Eino-learning-notes-1-ChatModel-en
ChatModel is Eino’s abstraction over conversational large language models. It provides a unified API for talking to different model backends (OpenAI, Ollama, and so on).
This component matters especially for:
- Natural-language dialogue
- Text generation and completion
- Generating parameters for tool calls
- Multimodal interaction (text, images, audio, etc.)
Component definition
Interface definition
Source:
eino/components/model/interface.go
type ChatModel interface {
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.StreamReader[*schema.Message], error)
BindTools(tools []*schema.ToolInfo) error
}
Generate
- Purpose: produce a complete model response in one shot.
- Parameters:
  - ctx: context for request-scoped data and for passing the callback manager.
  - input: list of input messages.
  - opts: optional knobs for model behavior.
- Returns:
  - *schema.Message: the model’s reply.
  - error: anything that went wrong during generation.
Stream
- Purpose: stream the model response chunk by chunk.
- Parameters: same as Generate.
- Returns:
  - *schema.StreamReader[*schema.Message]: reader for the streamed reply.
  - error: errors during streaming.
BindTools
- Purpose: attach tools the model may call.
- Parameters:
  - tools: tool metadata list.
- Returns:
  - error: binding failures.
Core role — This interface is the main abstraction for chat models and supports two call styles:
- Generate: synchronous, full response (typical chat).
- Stream: streamed output (long text, live UX).
Architectural traits
type ChatModel interface {
// Synchronous generation (typical chat loop)
Generate(ctx context.Context, input []*schema.Message, opts ...Option) (*schema.Message, error)
// Streaming (good for incremental output)
Stream(ctx context.Context, input []*schema.Message, opts ...Option) (
*schema.StreamReader[*schema.Message], error)
// Tool binding (extensibility / function calling)
BindTools(tools []*schema.ToolInfo) error
}
Design highlights
- Multiple backends — One interface, many engines (OpenAI, MAAS, …).
- Context-aware — context.Context for deadlines, tracing, etc.
- Extensible options — ...Option lets each implementation add config.
- Runtime tool binding — BindTools augments capabilities at runtime (e.g. function calling).
Engineering practice
Using //go:generate to produce ChatModelMock signals:
- Interface-first design.
- Strong unit-test support.
- Dependency injection for different environments.
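A directive of roughly the following shape could sit next to the interface to produce the mock. The mockgen paths and package name here are illustrative assumptions, not the repository's exact command:

```go
//go:generate mockgen -source=interface.go -destination=../../internal/mock/model/chatmodel_mock.go -package=model
```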
Caveats
- Concurrency — Comments warn that BindTools vs Generate may not be atomic; you may need synchronization.
- Message contract — Behavior depends on schema.Message; dig into the schema when needed.
- Stream lifecycle — Pair StreamReader with Close so resources are released.
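The concurrency caveat can be sketched with a sync.RWMutex guard. The types below are simplified local stand-ins, not Eino's real schema types; the point is only the locking pattern an implementation might use so that BindTools never races with in-flight Generate calls.

```go
package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins for schema.ToolInfo and schema.Message,
// used only to illustrate the locking pattern.
type ToolInfo struct{ Name string }
type Message struct{ Content string }

// guardedModel serializes BindTools against in-flight Generate calls.
type guardedModel struct {
	mu    sync.RWMutex
	tools []*ToolInfo
}

func (m *guardedModel) BindTools(tools []*ToolInfo) error {
	m.mu.Lock() // exclusive: no Generate may read tools concurrently
	defer m.mu.Unlock()
	m.tools = tools
	return nil
}

func (m *guardedModel) Generate(input string) (*Message, error) {
	m.mu.RLock() // shared: concurrent Generate calls see a stable tool set
	tools := m.tools
	m.mu.RUnlock()
	return &Message{Content: fmt.Sprintf("%s (with %d tools)", input, len(tools))}, nil
}

func main() {
	m := &guardedModel{}
	_ = m.BindTools([]*ToolInfo{{Name: "search"}})
	msg, _ := m.Generate("hello")
	fmt.Println(msg.Content)
}
```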
Message struct
Source:
eino/schema/message.go
type Message struct {
// Role is the message role (system/user/assistant/tool)
Role RoleType
// Content is the plain-text content of the message
Content string
// MultiContent is multimodal content: text, images, audio, etc.
MultiContent []ChatMessagePart
// Name is the name of the message sender
Name string
// ToolCalls carries the tool calls on an assistant message
ToolCalls []ToolCall
// ToolCallID is the tool-call ID on a tool message
ToolCallID string
// ResponseMeta holds response metadata
ResponseMeta *ResponseMeta
// Extra stores additional information
Extra map[string]any
}
Message is the basic unit for model I/O. It supports:
- Several roles: system, user, assistant, tool.
- Multimodal parts: text, images, audio, video, files.
- Tool calls: model-invoked tools and functions.
- Metadata: finish reason, token usage, etc.
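The tool-call contract can be illustrated with a minimal round trip. The structs below are simplified stand-ins for schema.Message and its ToolCall (field names mirror the definition above; everything else is hypothetical): an assistant message requests a tool via ToolCalls, and the follow-up tool message echoes that call's ID in ToolCallID.

```go
package main

import "fmt"

// Simplified local stand-ins for schema.Message and its tool-call
// fields, to illustrate the role/ToolCallID contract.
type ToolCall struct {
	ID   string
	Name string
	Args string
}

type Message struct {
	Role       string // system / user / assistant / tool
	Content    string
	ToolCalls  []ToolCall // set on assistant messages that request tools
	ToolCallID string     // set on tool messages, echoes the call ID
}

func main() {
	conv := []*Message{
		{Role: "system", Content: "You are a helpful assistant."},
		{Role: "user", Content: "What is the weather in Beijing?"},
		// The model answers with a tool call instead of text.
		{Role: "assistant", ToolCalls: []ToolCall{{ID: "call_1", Name: "get_weather", Args: `{"city":"Beijing"}`}}},
		// The tool result is fed back, linked by ToolCallID.
		{Role: "tool", ToolCallID: "call_1", Content: `{"temp":"22C"}`},
	}
	for _, m := range conv {
		fmt.Println(m.Role)
	}
}
```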
Shared Options
The model component exposes common options:
Source:
eino/components/model/option.go
type Options struct {
// Temperature controls randomness of the output
Temperature *float32
// MaxTokens caps the number of generated tokens
MaxTokens *int
// Model selects the model name to use
Model *string
// TopP controls diversity of the output
TopP *float32
// Stop lists the stop sequences that end generation
Stop []string
}
Set options like this:
// set the temperature
WithTemperature(temperature float32) Option
// set the maximum number of tokens
WithMaxTokens(maxTokens int) Option
// set the model name
WithModel(name string) Option
// set the top_p value
WithTopP(topP float32) Option
// set the stop sequences
WithStop(stop []string) Option
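Note that every shared option field is a pointer, so a merge step can tell "not set" (nil) apart from a legitimate zero value. The sketch below reimplements that pattern with local stand-ins; getCommonOptions loosely mirrors what model.GetCommonOptions does, but it is not Eino's actual code.

```go
package main

import "fmt"

// Simplified sketch of the shared-options pattern: pointer fields let a
// merge step distinguish "caller did not set this" (nil) from zero.
type Options struct {
	Temperature *float32
	MaxTokens   *int
	Model       *string
}

type Option func(*Options)

func WithTemperature(t float32) Option { return func(o *Options) { o.Temperature = &t } }
func WithMaxTokens(n int) Option       { return func(o *Options) { o.MaxTokens = &n } }
func WithModel(name string) Option     { return func(o *Options) { o.Model = &name } }

// getCommonOptions applies caller options over implementation defaults.
func getCommonOptions(base *Options, opts ...Option) *Options {
	for _, apply := range opts {
		apply(base)
	}
	return base
}

func main() {
	defaultModel := "my-default"
	base := &Options{Model: &defaultModel}
	merged := getCommonOptions(base, WithTemperature(0.8), WithModel("gpt-4"))
	// Model is overridden, Temperature is set, MaxTokens stays nil (unset).
	fmt.Println(*merged.Model, *merged.Temperature, merged.MaxTokens == nil)
}
```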
Usage
Standalone
import (
"context"
"fmt"
"io"
"github.com/cloudwego/eino-ext/components/model/openai"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
)
// initialize the model (OpenAI as an example)
ctx := context.Background()
cm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
// config parameters
})
// prepare input messages
messages := []*schema.Message{
{
Role: schema.System,
Content: "You are a helpful assistant.",
},
{
Role: schema.User,
Content: "Hello!",
},
}
// generate a response
response, err := cm.Generate(ctx, messages, model.WithTemperature(0.8))
// handle the response
fmt.Print(response.Content)
// streaming generation
streamResult, err := cm.Stream(ctx, messages)
defer streamResult.Close()
for {
chunk, err := streamResult.Recv()
if err == io.EOF {
break
}
if err != nil {
// error handling
break
}
// handle each response chunk
fmt.Print(chunk.Content)
}
Inside composition (chain / graph)
import (
"github.com/cloudwego/eino/schema"
"github.com/cloudwego/eino/compose"
)
/*** initialize the ChatModel
* cm, err := xxx
*/
// use inside a Chain
c := compose.NewChain[[]*schema.Message, *schema.Message]()
c.AppendChatModel(cm)
// use inside a Graph
g := compose.NewGraph[[]*schema.Message, *schema.Message]()
g.AddChatModelNode("model_node", cm)
Options and callbacks
Option example
import "github.com/cloudwego/eino/components/model"
// pass Options
response, err := cm.Generate(ctx, messages,
model.WithTemperature(0.7),
model.WithMaxTokens(2000),
model.WithModel("gpt-4"),
)
Callback example
import (
"context"
"fmt"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/compose"
"github.com/cloudwego/eino/schema"
callbacksHelper "github.com/cloudwego/eino/utils/callbacks"
)
// create a callback handler
handler := &callbacksHelper.ModelCallbackHandler{
OnStart: func(ctx context.Context, info *callbacks.RunInfo, input *model.CallbackInput) context.Context {
fmt.Printf("generation started, input message count: %d\n", len(input.Messages))
return ctx
},
OnEnd: func(ctx context.Context, info *callbacks.RunInfo, output *model.CallbackOutput) context.Context {
fmt.Printf("generation finished, token usage: %+v\n", output.TokenUsage)
return ctx
},
OnEndWithStreamOutput: func(ctx context.Context, info *callbacks.RunInfo, output *schema.StreamReader[*model.CallbackOutput]) context.Context {
fmt.Println("started receiving streaming output")
defer output.Close()
return ctx
},
}
// use the callback handler
helper := callbacksHelper.NewHandlerHelper().
ChatModel(handler).
Handler()
/*** compose a chain
* chain := NewChain
* chain.appendxxx().
* appendxxx().
* ...
*/
// use at runtime
runnable, err := chain.Compile()
if err != nil {
return err
}
result, err := runnable.Invoke(ctx, messages, compose.WithCallbacks(helper))
Existing implementations
- OpenAI ChatModel — GPT family via the OpenAI API
- Ollama ChatModel — local models via Ollama
- ARK ChatModel — models on the ARK platform
Implementing your own
When you build a custom ChatModel:
- Implement the shared options.
- Wire up the callback hooks.
- On streaming paths, close the writer when you are done.
Option mechanism
If you need options beyond the shared set, use the component helpers to define implementation-specific options, for example:
import (
"time"
"github.com/cloudwego/eino/components/model"
)
// define the Option struct
type MyChatModelOptions struct {
Options *model.Options
RetryCount int
Timeout time.Duration
}
// define Option functions
func WithRetryCount(count int) model.Option {
return model.WrapImplSpecificOptFn(func(o *MyChatModelOptions) {
o.RetryCount = count
})
}
func WithTimeout(timeout time.Duration) model.Option {
return model.WrapImplSpecificOptFn(func(o *MyChatModelOptions) {
o.Timeout = timeout
})
}
Callback handling
A ChatModel implementation should fire callbacks at the right times. The component defines:
import (
"github.com/cloudwego/eino/schema"
)
// callback input/output definitions
type CallbackInput struct {
Messages []*schema.Message
Model string
Temperature *float32
MaxTokens *int
Extra map[string]any
}
type CallbackOutput struct {
Message *schema.Message
TokenUsage *schema.TokenUsage
Extra map[string]any
}
End-to-end implementation sketch
import (
"context"
"errors"
"net/http"
"time"
"github.com/cloudwego/eino/callbacks"
"github.com/cloudwego/eino/components/model"
"github.com/cloudwego/eino/schema"
)
type MyChatModel struct {
client *http.Client
apiKey string
baseURL string
model string
timeout time.Duration
retryCount int
}
type MyChatModelConfig struct {
APIKey string
}
func NewMyChatModel(config *MyChatModelConfig) (*MyChatModel, error) {
if config.APIKey == "" {
return nil, errors.New("api key is required")
}
return &MyChatModel{
client: &http.Client{},
apiKey: config.APIKey,
}, nil
}
func (m *MyChatModel) Generate(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.Message, error) {
// 1. handle options
options := &MyChatModelOptions{
Options: &model.Options{
Model: &m.model,
},
RetryCount: m.retryCount,
Timeout: m.timeout,
}
options.Options = model.GetCommonOptions(options.Options, opts...)
options = model.GetImplSpecificOptions(options, opts...)
// 2. callback before generation starts
ctx = callbacks.OnStart(ctx, &model.CallbackInput{
Messages: messages,
Config: &model.Config{
Model: *options.Options.Model,
},
})
// 3. run the generation logic
response, err := m.doGenerate(ctx, messages, options)
// 4. handle errors and the completion callback
if err != nil {
ctx = callbacks.OnError(ctx, err)
return nil, err
}
ctx = callbacks.OnEnd(ctx, &model.CallbackOutput{
Message: response,
})
return response, nil
}
func (m *MyChatModel) Stream(ctx context.Context, messages []*schema.Message, opts ...model.Option) (*schema.StreamReader[*schema.Message], error) {
// 1. handle options
options := &MyChatModelOptions{
Options: &model.Options{
Model: &m.model,
},
RetryCount: m.retryCount,
Timeout: m.timeout,
}
options.Options = model.GetCommonOptions(options.Options, opts...)
options = model.GetImplSpecificOptions(options, opts...)
// 2. callback before streaming generation starts
ctx = callbacks.OnStart(ctx, &model.CallbackInput{
Messages: messages,
Config: &model.Config{
Model: *options.Options.Model,
},
})
// 3. create the streaming response
// Pipe produces a StreamReader and a StreamWriter; whatever is written to the StreamWriter can be read from the StreamReader, and the pair is safe for concurrent use.
// The implementation writes generated content to the StreamWriter asynchronously and returns the StreamReader as the result.
// ***A StreamReader is a data stream that can be read only once. When a component implements callbacks itself, it must both pass a stream to the callback via OnEndWithStreamOutput and return a stream to the caller, so the stream has to be copied once.
// Since this situation always requires a copy, OnEndWithStreamOutput copies the stream internally and returns an unread copy.
// The code below shows one way to handle the stream; it is not the only way.
sr, sw := schema.Pipe[*model.CallbackOutput](1)
// 4. start asynchronous generation
go func() {
defer sw.Close()
// streamed writes
m.doStream(ctx, messages, options, sw)
}()
// 5. completion callback
_, nsr := callbacks.OnEndWithStreamOutput(ctx, sr)
return schema.StreamReaderWithConvert(nsr, func(t *model.CallbackOutput) (*schema.Message, error) {
return t.Message, nil
}), nil
}
func (m *MyChatModel) BindTools(tools []*schema.ToolInfo) error {
// implement the tool-binding logic
return nil
}
func (m *MyChatModel) doGenerate(ctx context.Context, messages []*schema.Message, opts *MyChatModelOptions) (*schema.Message, error) {
// implement the generation logic
return nil, nil
}
func (m *MyChatModel) doStream(ctx context.Context, messages []*schema.Message, opts *MyChatModelOptions, sr *schema.StreamWriter[*model.CallbackOutput]) {
// write the streamed generated text into sr
return
}