Eino-learning-notes-1-ChatModel-en

ChatModel is Eino’s abstraction over conversational large language models. It provides a unified API for talking to different model backends (OpenAI, Ollama, and so on).

This component is especially useful for:

  • Natural-language dialogue
  • Text generation and completion
  • Generating parameters for tool calls
  • Multimodal interaction (text, images, audio, etc.)

Component definition

Interface definition

Source: eino/components/model/interface.go

type ChatModel interface {
    Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
    Stream(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.StreamReader[*schema.Message], error)
    BindTools(tools []*schema.ToolInfo) error
}

Generate

  • Purpose: produce a complete model response in one shot.
  • Parameters:
    • ctx: context for request-scoped data and for passing the callback manager.
    • input: list of input messages.
    • opts: optional knobs for model behavior.
  • Returns:
    • *schema.Message: the model’s reply.
    • error: anything that went wrong during generation.

Stream

  • Purpose: stream the model response chunk by chunk.
  • Parameters: same as Generate.
  • Returns:
    • *schema.StreamReader[*schema.Message]: reader for the streamed reply.
    • error: errors during streaming.

BindTools

  • Purpose: attach tools the model may call.
  • Parameters:
    • tools: tool metadata list.
  • Returns:
    • error: binding failures.

Core role — This interface is the main abstraction for chat models and supports two call styles:

  • Generate: synchronous, full response (typical chat).
  • Stream: streamed output (long text, live UX).

Architectural traits

type ChatModel interface {
    // Synchronous generation (typical chat loop)
    Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)

    // Streaming (good for incremental output)
    Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
        *schema.StreamReader[*schema.Message], error)

    // Tool binding (extensibility / function calling)
    BindTools(tools []*schema.ToolInfo) error
}

Design highlights

  • Multiple backends — One interface, many engines (OpenAI, MAAS, …).
  • Context-aware — context.Context carries deadlines, cancellation, tracing, etc.
  • Extensible options — ...Option lets each implementation add its own config.
  • Runtime tool binding — BindTools augments capabilities at runtime (e.g. function calling).

Engineering practice

Using //go:generate to produce ChatModelMock signals:

  • Interface-first design.
  • Strong unit-test support.
  • Dependency injection for different environments.

Caveats

  • Concurrency — The source comments warn that BindTools and Generate are not guaranteed to be atomic with respect to each other; add your own synchronization if you bind tools while generation is in flight.
  • Message contract — Behavior depends on schema.Message; dig into the schema when needed.
  • Stream lifecycle — Pair StreamReader with Close so resources are released.
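The Recv/EOF/Close contract behind that last caveat can be sketched with a channel-backed stand-in (the `chunkReader` below is a local illustration, not eino's `StreamReader`): drain with `Recv` until `io.EOF`, and `defer Close()` so cleanup happens even if the loop exits early on an error.

```go
package main

import (
	"fmt"
	"io"
)

// chunkReader is a minimal channel-backed stand-in for a stream reader,
// just to show the Recv/EOF/Close contract.
type chunkReader struct {
	ch     chan string
	closed bool
}

func newChunkReader(chunks ...string) *chunkReader {
	r := &chunkReader{ch: make(chan string, len(chunks))}
	for _, c := range chunks {
		r.ch <- c
	}
	close(r.ch) // producer done: subsequent Recv calls drain then hit EOF
	return r
}

// Recv returns io.EOF once the stream is drained, mirroring the
// convention used by the streaming loop in the usage section.
func (r *chunkReader) Recv() (string, error) {
	c, ok := <-r.ch
	if !ok {
		return "", io.EOF
	}
	return c, nil
}

// Close marks the reader released; in a real stream it frees resources.
func (r *chunkReader) Close() { r.closed = true }

func collect(r *chunkReader) (string, error) {
	defer r.Close() // always pair the reader with Close
	var out string
	for {
		chunk, err := r.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return "", err
		}
		out += chunk
	}
	return out, nil
}

func main() {
	text, _ := collect(newChunkReader("Hel", "lo"))
	fmt.Println(text) // prints "Hello"
}
```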

Message struct

Source: eino/schema/message.go

type Message struct {
    // Role is the message role (system/user/assistant/tool)
    Role RoleType
    // Content is the text content of the message
    Content string
    // MultiContent is multimodal content: text, images, audio, etc.
    MultiContent []ChatMessagePart
    // Name is the sender name of the message
    Name string
    // ToolCalls carries the tool-call information in assistant messages
    ToolCalls []ToolCall
    // ToolCallID is the tool-call ID in tool messages
    ToolCallID string
    // ResponseMeta holds response metadata
    ResponseMeta *ResponseMeta
    // Extra stores additional information
    Extra map[string]any
}

Message is the basic unit for model I/O. It supports:

  • Several roles: system, user, assistant, tool.
  • Multimodal parts: text, images, audio, video, files.
  • Tool calls: model-invoked tools and functions.
  • Metadata: finish reason, token usage, etc.
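The role and tool-call fields can be exercised with a trimmed, self-contained mirror of the struct above (local `RoleType`/`Message` stand-ins; the real types live in eino/schema, and the `call_1` ID is a made-up example value):

```go
package main

import "fmt"

// Local stand-ins for schema.RoleType and a few schema.Message fields,
// trimmed from the struct quoted above so the example runs on its own.
type RoleType string

const (
	System    RoleType = "system"
	User      RoleType = "user"
	Assistant RoleType = "assistant"
	Tool      RoleType = "tool"
)

type Message struct {
	Role       RoleType
	Content    string
	ToolCallID string // links a tool message back to the assistant's tool call
}

// buildHistory assembles a typical turn sequence: system prompt, user
// question, and a tool result keyed by a (hypothetical) call ID.
func buildHistory() []*Message {
	return []*Message{
		{Role: System, Content: "You are a helpful assistant."},
		{Role: User, Content: "What is the weather in Paris?"},
		{Role: Tool, ToolCallID: "call_1", Content: `{"temp_c": 18}`},
	}
}

func main() {
	for _, m := range buildHistory() {
		fmt.Printf("%s: %s\n", m.Role, m.Content)
	}
}
```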

Shared Options

The model component exposes common options:

Source: eino/components/model/option.go

type Options struct {
    // Temperature controls output randomness
    Temperature *float32
    // MaxTokens caps the number of generated tokens
    MaxTokens *int
    // Model specifies the model name to use
    Model *string
    // TopP controls output diversity
    TopP *float32
    // Stop lists the stop sequences that end generation
    Stop []string
}

Set options like this:

// Set the temperature
WithTemperature(temperature float32) Option

// Set the maximum token count
WithMaxTokens(maxTokens int) Option

// Set the model name
WithModel(name string) Option

// Set the top_p value
WithTopP(topP float32) Option

// Set the stop words
WithStop(stop []string) Option
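These setters follow Go's functional-options pattern. A minimal self-contained reimplementation (local `Option`/`Options` types whose names mirror the struct above, not the eino ones) shows how the variadic `opts` fold into a single struct, which is essentially the job `model.GetCommonOptions` does for implementations:

```go
package main

import "fmt"

// Local mirror of the shared options struct for illustration only.
type Options struct {
	Temperature *float32
	MaxTokens   *int
	Model       *string
}

// Option mutates an Options value; each With* setter returns one.
type Option func(*Options)

func WithTemperature(t float32) Option { return func(o *Options) { o.Temperature = &t } }
func WithMaxTokens(n int) Option       { return func(o *Options) { o.MaxTokens = &n } }
func WithModel(name string) Option     { return func(o *Options) { o.Model = &name } }

// apply folds the variadic options into one Options value.
func apply(opts ...Option) *Options {
	o := &Options{}
	for _, fn := range opts {
		fn(o)
	}
	return o
}

func main() {
	o := apply(WithTemperature(0.8), WithMaxTokens(2000))
	fmt.Println(*o.Temperature, *o.MaxTokens) // prints "0.8 2000"
}
```

Because the fields are pointers, an unset option stays `nil`, letting implementations distinguish "not specified" from a zero value.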

Usage

Standalone

import (
    "context"
    "fmt"
    "io"

    "github.com/cloudwego/eino-ext/components/model/openai"
    "github.com/cloudwego/eino/components/model"
    "github.com/cloudwego/eino/schema"
)

// Initialize the model (OpenAI as an example)
cm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
    // config parameters
})

// Prepare the input messages
messages := []*schema.Message{
    {
       Role:    schema.System,
       Content: "You are a helpful assistant.",
    },
    {
       Role:    schema.User,
       Content: "Hello!",
    },
}

// Generate a response
response, err := cm.Generate(ctx, messages, model.WithTemperature(0.8))

// Handle the response
fmt.Print(response.Content)

// Streamed generation
streamResult, err := cm.Stream(ctx, messages)

defer streamResult.Close()

for {
    chunk, err := streamResult.Recv()
    if err == io.EOF {
       break
    }
    if err != nil {
       // error handling
    }
    // handle the response chunk
    fmt.Print(chunk.Content)
}
}

Inside composition (chain / graph)

import (
    "github.com/cloudwego/eino/schema"
    "github.com/cloudwego/eino/compose"
)

/*
 * Initialize the ChatModel
 * cm, err := xxx
 */

// Use within a Chain
c := compose.NewChain[[]*schema.Message, *schema.Message]()
c.AppendChatModel(cm)


// Use within a Graph
g := compose.NewGraph[[]*schema.Message, *schema.Message]()
g.AddChatModelNode("model_node", cm)

Options and callbacks

Option example

import "github.com/cloudwego/eino/components/model"

// Use options
response, err := cm.Generate(ctx, messages,
    model.WithTemperature(0.7),
    model.WithMaxTokens(2000),
    model.WithModel("gpt-4"),
)

Callback example

import (
    "context"
    "fmt"

    "github.com/cloudwego/eino/callbacks"
    "github.com/cloudwego/eino/components/model"
    "github.com/cloudwego/eino/compose"
    "github.com/cloudwego/eino/schema"
    callbacksHelper "github.com/cloudwego/eino/utils/callbacks"
)

// Create a callback handler
handler := &callbacksHelper.ModelCallbackHandler{
    OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *model.CallbackInput) context.Context {
       fmt.Printf("Generation started, input message count: %d\n", len(input.Messages))
       return ctx
    },
    OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *model.CallbackOutput) context.Context {
       fmt.Printf("Generation finished, token usage: %+v\n", output.TokenUsage)
       return ctx
    },
    OnEndWithStreamOutput: func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[*model.CallbackOutput]) context.Context {
       fmt.Println("Starting to receive streamed output")
       defer output.Close()
       return ctx
    },
}

// Use the callback handler
helper := callbacksHelper.NewHandlerHelper().
    ChatModel(handler).
    Handler()

/*
 * Compose a chain
 * chain := NewChain
 * chain.appendxxx().
 *       appendxxx().
 *       ...
 */

// Use at run time
runnable, err := chain.Compile()
if err != nil {
    return err
}
result, err := runnable.Invoke(ctx, messages, compose.WithCallbacks(helper))

Existing implementations

  1. OpenAI ChatModel — the GPT family via OpenAI
  2. Ollama ChatModel — local models via Ollama
  3. ARK ChatModel — models hosted on the ARK platform

Implementing your own

When you build a custom ChatModel:

  1. Implement the shared options.
  2. Wire up the callback hooks.
  3. On streaming paths, close the writer when you are done.

Option mechanism

If you need options beyond the shared set, use the component helpers to define implementation-specific options, for example:

import (
    "time"

    "github.com/cloudwego/eino/components/model"
)

// Define the Option struct
type MyChatModelOptions struct {
    Options    *model.Options
    RetryCount int
    Timeout    time.Duration
}

// Define the Option functions
func WithRetryCount(count int) model.Option {
    return model.WrapImplSpecificOptFn(func(o *MyChatModelOptions) {
       o.RetryCount = count
    })
}

func WithTimeout(timeout time.Duration) model.Option {
    return model.WrapImplSpecificOptFn(func(o *MyChatModelOptions) {
       o.Timeout = timeout
    })
}

Callback handling

A ChatModel implementation should fire callbacks at the right times. The component defines:

import (
    "github.com/cloudwego/eino/schema"
)

// Callback input/output definitions
type CallbackInput struct {
    Messages    []*schema.Message
    Model       string
    Temperature *float32
    MaxTokens   *int
    Extra       map[string]any
}

type CallbackOutput struct {
    Message    *schema.Message
    TokenUsage *schema.TokenUsage
    Extra      map[string]any
}

End-to-end implementation sketch

import (
    "context"
    "errors"
    "net/http"
    "time"

    "github.com/cloudwego/eino/callbacks"
    "github.com/cloudwego/eino/components/model"
    "github.com/cloudwego/eino/schema"
)

type MyChatModel struct {
    client     *http.Client
    apiKey     string
    baseURL    string
    model      string
    timeout    time.Duration
    retryCount int
}

type MyChatModelConfig struct {
    APIKey string
}

func NewMyChatModel(config *MyChatModelConfig) (*MyChatModel, error) {
    if config.APIKey == "" {
       return nil, errors.New("api key is required")
    }

    return &MyChatModel{
       client: &http.Client{},
       apiKey: config.APIKey,
    }, nil
}

func (m *MyChatModel) Generate(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.Message, error) {
    // 1. Handle options
    options := &MyChatModelOptions{
       Options: &model.Options{
          Model: &m.model,
       },
       RetryCount: m.retryCount,
       Timeout:    m.timeout,
    }
    options.Options = model.GetCommonOptions(options.Options, opts...)
    options = model.GetImplSpecificOptions(options, opts...)

    // 2. Callback before generation starts
    ctx = callbacks.OnStart(ctx, &model.CallbackInput{
       Messages: messages,
       Config: &model.Config{
          Model: *options.Options.Model,
       },
    })

    // 3. Run the generation logic
    response, err := m.doGenerate(ctx, messages, options)

    // 4. Handle errors and the completion callback
    if err != nil {
       ctx = callbacks.OnError(ctx, err)
       return nil, err
    }

    ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
       Message: response,
    })

    return response, nil
}

func (m *MyChatModel) Stream(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.StreamReader[*schema.Message], error) {
    // 1. Handle options
    options := &MyChatModelOptions{
       Options: &model.Options{
          Model: &m.model,
       },
       RetryCount: m.retryCount,
       Timeout:    m.timeout,
    }
    options.Options = model.GetCommonOptions(options.Options, opts...)
    options = model.GetImplSpecificOptions(options, opts...)

    // 2. Callback before streamed generation starts
    ctx = callbacks.OnStart(ctx, &model.CallbackInput{
       Messages: messages,
       Config: &model.Config{
          Model: *options.Options.Model,
       },
    })

    // 3. Create the streamed response
    // Pipe produces a StreamReader and a StreamWriter; whatever is written to the StreamWriter
    // can be read from the StreamReader, and the pair is safe for concurrent use.
    // The implementation writes generated content to the StreamWriter asynchronously and
    // returns the StreamReader to the caller.
    // NOTE: a StreamReader is a one-shot stream that can be read only once. A component that
    // implements callbacks itself must both hand a stream to the callback via
    // OnEndWithStreamOutput and return a stream to its caller, so the stream must be copied.
    // Because a copy is always needed in this situation, OnEndWithStreamOutput copies the
    // stream internally and returns an unread copy.
    // The code below shows one way to handle the stream; it is not the only way.
    sr, sw := schema.Pipe[*model.CallbackOutput](1)

    // 4. Start asynchronous generation
    go func() {
       defer sw.Close()

       // write chunks into the stream
       m.doStream(ctx, messages, options, sw)
    }()

    // 5. Completion callback
    _, nsr := callbacks.OnEndWithStreamOutput(ctx, sr)

    return schema.StreamReaderWithConvert(nsr, func(t *model.CallbackOutput) (*schema.Message, error) {
       return t.Message, nil
    }), nil
}

func (m *MyChatModel) BindTools(tools []*schema.ToolInfo) error {
    // implement the tool-binding logic
    return nil
}

func (m *MyChatModel) doGenerate(ctx context.Context, messages []*schema.Message, opts *MyChatModelOptions) (*schema.Message, error) {
    // implement the generation logic
    return nil, nil
}

func (m *MyChatModel) doStream(ctx context.Context, messages []*schema.Message, opts *MyChatModelOptions, sw *schema.StreamWriter[*model.CallbackOutput]) {
    // write the streamed generated text into sw
    return
}

References