KubeEdge Cloud-Edge Communication Framework
Not guaranteed up to date—for authoritative behavior, read the source.
References
- KubeEdge cloud-native edge computing open class 15 — project layout & cloud–edge comms source walkthrough
- KubeEdge source analysis (2): CloudHub
Beehive
Beehive is KubeEdge’s core messaging framework: it registers modules and routes messages between them. Both CloudCore and EdgeCore depend on Beehive, so understanding Beehive is a prerequisite for grasping KubeEdge’s design.
Beehive is built on Go channels. Its responsibilities split into module lifecycle (ModuleContext) and messaging (MessageContext). The high-level picture:

Message format
Before diving into Beehive internals, note how messages are shaped. A Message is the carrier between modules and has three parts:
- Header
- ID: message id (UUID string).
- ParentID: present on replies to synchronous calls (only on sync responses).
- TimeStamp: when the message was created.
- Sync: true means this is a synchronous message.
- Route
- Source: who sent it.
- Group: logical group.
- Operation: what is being done to the resource.
- Resource: the resource in question.
- Content: payload.
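To make the layout concrete, here is a minimal sketch of such a message in Go. The field names follow the list above; the constructor and builder helpers, and the random hex ID used in place of a real UUID library, are assumptions made for illustration and are not copied from the Beehive model package.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"time"
)

// Header carries identity and sync metadata.
type Header struct {
	ID        string // unique message id
	ParentID  string // set only on replies to sync requests
	Timestamp int64  // creation time in milliseconds
	Sync      bool   // true for synchronous messages
}

// Route says who sent the message and what it targets.
type Route struct {
	Source    string
	Group     string
	Operation string
	Resource  string
}

// Message is the carrier exchanged between modules.
type Message struct {
	Header  Header
	Router  Route
	Content interface{}
}

// newID returns 16 random bytes as hex (a stand-in for a UUID).
func newID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// NewMessage creates a message; parentID is empty unless this is a reply.
func NewMessage(parentID string) *Message {
	return &Message{Header: Header{
		ID:        newID(),
		ParentID:  parentID,
		Timestamp: time.Now().UnixMilli(),
	}}
}

// BuildRouter fills in the routing part and returns the message for chaining.
func (m *Message) BuildRouter(source, group, resource, operation string) *Message {
	m.Router = Route{Source: source, Group: group, Resource: resource, Operation: operation}
	return m
}

// FillBody sets the payload and returns the message for chaining.
func (m *Message) FillBody(content interface{}) *Message {
	m.Content = content
	return m
}

// NewRespByMessage builds a reply whose ParentID points at the request.
func NewRespByMessage(req *Message, content interface{}) *Message {
	return NewMessage(req.Header.ID).FillBody(content)
}

func main() {
	req := NewMessage("").BuildRouter("edgehub", "resource", "node", "keepalive").FillBody("ping")
	resp := NewRespByMessage(req, "pong")
	fmt.Println(resp.Header.ParentID == req.Header.ID) // true
}
```

The link between a reply's ParentID and the request's ID is what later allows a synchronous caller to be matched with its response.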
Context data structure
Both ModuleContext and MessageContext are implemented by Context:
// Context is object for Context channel
type Context struct {
    //ConfigFactory goarchaius.ConfigurationFactory
    channels     map[string]chan model.Message
    chsLock      sync.RWMutex
    typeChannels map[string]map[string]chan model.Message
    typeChsLock  sync.RWMutex
    anonChannels map[string]chan model.Message
    anonChsLock  sync.RWMutex
}
Full source: context_channel.go
- channels: module name → message channel; used to deliver to a module.
- chsLock: lock for channels.
- typeChannels: two-level map: group → module → that module’s channel.
- typeChsLock: lock for typeChannels.
- anonChannels: maps the ID of a synchronous request (which is the parent ID of its reply) to the channel on which the caller waits for that reply.
- anonChsLock: lock for anonChannels.
Beehive module management
In Beehive, a module is any type that implements the Module interface; CloudHub, EdgeHub, EdgeController, etc., all do.
// Module interface
type Module interface {
    Name() string
    Group() string
    Start()
    Enable() bool
}
Full source: module.go
Supported module operations:
type ModuleContext interface {
    AddModule(info *common.ModuleInfo)
    AddModuleGroup(module, group string)
    Cleanup(module string)
}
Full source: context.go
| API | Role | How it works |
|---|---|---|
| AddModule | Register a module | Allocate a message channel and store it in channels. |
| AddModuleGroup | Put a module in a group | Look up the module’s channel in channels, then record group/module/channel in typeChannels. |
| Cleanup | Tear down a module | Remove entries from channels and typeChannels. |
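The three operations in the table can be sketched with two maps and two locks. This is a reduced illustration under stated assumptions: the Message type, the channel buffer size and the GroupSize helper are hypothetical, and module names are passed as plain strings rather than a ModuleInfo.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a placeholder payload type for this sketch.
type Message struct{ Content string }

// Context is a reduced channel context: two maps, each guarded by its own lock.
type Context struct {
	chsLock      sync.RWMutex
	channels     map[string]chan Message
	typeChsLock  sync.RWMutex
	typeChannels map[string]map[string]chan Message
}

func NewContext() *Context {
	return &Context{
		channels:     make(map[string]chan Message),
		typeChannels: make(map[string]map[string]chan Message),
	}
}

// AddModule allocates a channel for the module and stores it by name.
func (c *Context) AddModule(module string) {
	c.chsLock.Lock()
	defer c.chsLock.Unlock()
	c.channels[module] = make(chan Message, 16) // buffer size is an assumption
}

// AddModuleGroup records group -> module -> channel for a registered module.
// It reports false when the module has no channel yet.
func (c *Context) AddModuleGroup(module, group string) bool {
	c.chsLock.RLock()
	ch, ok := c.channels[module]
	c.chsLock.RUnlock()
	if !ok {
		return false
	}
	c.typeChsLock.Lock()
	defer c.typeChsLock.Unlock()
	if c.typeChannels[group] == nil {
		c.typeChannels[group] = make(map[string]chan Message)
	}
	c.typeChannels[group][module] = ch
	return true
}

// Cleanup removes the module from both maps.
func (c *Context) Cleanup(module string) {
	c.chsLock.Lock()
	delete(c.channels, module)
	c.chsLock.Unlock()

	c.typeChsLock.Lock()
	defer c.typeChsLock.Unlock()
	for _, members := range c.typeChannels {
		delete(members, module)
	}
}

// GroupSize is a hypothetical helper used only to inspect the sketch.
func (c *Context) GroupSize(group string) int {
	c.typeChsLock.RLock()
	defer c.typeChsLock.RUnlock()
	return len(c.typeChannels[group])
}

func main() {
	ctx := NewContext()
	ctx.AddModule("cloudhub")
	fmt.Println(ctx.AddModuleGroup("cloudhub", "hub"), ctx.GroupSize("hub")) // true 1
	ctx.Cleanup("cloudhub")
	fmt.Println(ctx.GroupSize("hub")) // 0
}
```

Because typeChannels stores the same channel value that channels holds, a message sent by name and a message sent by group end up in the same inbox.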
Beehive messaging
Registered modules can talk to each other through several patterns:
// MessageContext is interface for message syncing
type MessageContext interface {
    // async mode
    Send(module string, message model.Message)
    Receive(module string) (model.Message, error)
    // sync mode
    SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error)
    SendResp(message model.Message)
    // group broadcast
    SendToGroup(group string, message model.Message)
    SendToGroupSync(group string, message model.Message, timeout time.Duration) error
}
| API | Role | How it works |
|---|---|---|
| Send | Async to one module | Look up the module channel in channels and enqueue the message. |
| Receive | Read for a module | Block on that module’s channel until a message arrives. |
| SendSync | Sync to one module | Enqueue on the module channel, register a reply channel in anonChannels keyed by message id, then wait until timeout for the response. |
| SendResp | Reply to a sync call | Use parentID to find the waiter channel in anonChannels and deliver the response (or log if missing). |
| SendToGroup | Async broadcast | Enumerate modules under the group in typeChannels and Send to each. |
| SendToGroupSync | Sync broadcast | Collect all group modules, create an anonymous channel sized to the member count, fan out with Send, then wait until that many replies arrive. |
Registering and starting modules
When CloudCore or EdgeCore boots, every module registers with Beehive, which keeps a name→module map.
// registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
    cloudhub.Register(c.Modules.CloudHub)
    edgecontroller.Register(c.Modules.EdgeController)
    devicecontroller.Register(c.Modules.DeviceController)
    nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController)
    synccontroller.Register(c.Modules.SyncController)
    cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
    router.Register(c.Modules.Router)
    dynamiccontroller.Register(c.Modules.DynamicController)
    policycontroller.Register(client.CrdConfig)
}
At Beehive startup it walks every registered module and, for each:
- Builds a ModuleInfo based on the module’s context type.
- Calls beehiveContext.AddModule.
- Calls beehiveContext.AddModuleGroup.
- Invokes the module’s Start in its own goroutine (via moduleKeeper).
// StartModules starts modules that are registered
func StartModules() {
    // only register channel mode; if we want to use socket mode, we should also pass in the common.MsgCtxTypeUS parameter
    beehiveContext.InitContext([]string{common.MsgCtxTypeChannel})
    modules := GetModules()
    for name, module := range modules {
        var m common.ModuleInfo
        switch module.contextType {
        case common.MsgCtxTypeChannel:
            m = common.ModuleInfo{
                ModuleName: name,
                ModuleType: module.contextType,
            }
        ......
        default:
            klog.Exitf("unsupported context type: %s", module.contextType)
        }
        beehiveContext.AddModule(&m)
        beehiveContext.AddModuleGroup(name, module.module.Group())
        go moduleKeeper(name, module, m)
        klog.Infof("starting module %s", name)
    }
}
Viaduct
Viaduct is the cloud–edge transport layer: a small set of abstractions with concrete servers/clients for several protocols, managing edge connectivity and data plane traffic. It hides protocol differences so upper layers see one API; operators pick WebSocket, QUIC, etc., via config. New protocols can be plugged in through Viaduct as requirements evolve.
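The idea of one API over several protocols can be sketched with a small factory. Everything here is hypothetical and heavily reduced: the Connection interface has only two methods, the clients return fake connections instead of dialing, and the protocol strings are assumptions rather than Viaduct's constants.

```go
package main

import (
	"errors"
	"fmt"
)

// Connection is a reduced stand-in for the transport-neutral connection.
type Connection interface {
	Protocol() string
	Close() error
}

// ProtocolClient captures the idea that each protocol only has to provide Connect.
type ProtocolClient interface {
	Connect() (Connection, error)
}

// fakeConn pretends to be an established connection.
type fakeConn struct{ proto string }

func (c fakeConn) Protocol() string { return c.proto }
func (c fakeConn) Close() error     { return nil }

// wsClient and quicClient are placeholders for real protocol clients.
type wsClient struct{ addr string }

func (c wsClient) Connect() (Connection, error) { return fakeConn{proto: "websocket"}, nil }

type quicClient struct{ addr string }

func (c quicClient) Connect() (Connection, error) { return fakeConn{proto: "quic"}, nil }

// NewClient picks the implementation from configuration; callers only see
// the ProtocolClient interface.
func NewClient(protocol, addr string) (ProtocolClient, error) {
	switch protocol {
	case "websocket":
		return wsClient{addr: addr}, nil
	case "quic":
		return quicClient{addr: addr}, nil
	default:
		return nil, errors.New("unsupported protocol: " + protocol)
	}
}

func main() {
	for _, proto := range []string{"websocket", "quic", "mqtt"} {
		client, err := NewClient(proto, "cloudcore.example:10000")
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		conn, _ := client.Connect()
		fmt.Println("connected via", conn.Protocol())
		conn.Close()
	}
}
```

Adding a protocol in this shape means adding one type and one case in the factory; code that consumes Connection does not change.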
Viaduct splits into server/client interfaces and protocol implementations. CloudCore runs the server side (accept edges, move bytes); EdgeCore runs the client (dial in). Architecture:

The following uses WebSocket as the example.
Connection interface
Connection is the heart of Viaduct. Cloud and edge are full-duplex; both sides construct a Connection after connect. Definition:
// the operation set of connection
type Connection interface {
    // process message from the connection
    ServeConn() // the server side continuously reads messages from the connection
    // SetReadDeadline sets the deadline for future Read calls
    // and any currently-blocked Read call.
    // A zero value for t means Read will not time out.
    SetReadDeadline(t time.Time) error
    // SetWriteDeadline sets the deadline for future Write calls
    // and any currently-blocked Write call.
    // Even if Write times out, it may return n > 0, indicating that
    // some of the data was successfully written.
    // A zero value for t means Write will not time out.
    SetWriteDeadline(t time.Time) error
    // Read reads raw data from the connection
    Read(raw []byte) (int, error)
    // Write writes raw data to the connection
    // it will open a stream for raw data
    Write(raw []byte) (int, error)
    // WriteMessageAsync writes data to the connection and doesn't care about the response
    WriteMessageAsync(msg *model.Message) error // asynchronous
    // WriteMessageSync writes data to the connection and cares about the response
    WriteMessageSync(msg *model.Message) (*model.Message, error) // synchronous
    // ReadMessage reads message from the connection
    // it will be blocked when no message received
    // if you want to use this api for message reading
    // make sure AutoRoute be false
    ReadMessage(msg *model.Message) error
    // RemoteAddr returns the remote network address
    RemoteAddr() net.Addr
    // LocalAddr returns the local network address
    LocalAddr() net.Addr
    // ConnectionState returns the current connection state
    ConnectionState() ConnectionState
    // Close closes the connection
    // Any blocked Read or Write operations will be unblocked and return errors
    Close() error
}
| Method | Role |
|---|---|
| ServeConn | Server loop: continuously read, decode to Message, dispatch via callback. |
| Read | Read raw bytes from the wire. |
| Write | Write raw bytes. |
| WriteMessageAsync | Send a Message without waiting for a reply. |
| WriteMessageSync | Send a Message and block for the peer’s response. |
| ReadMessage | Read and decode into Message. |
Server API and WebSocket server
The protocol server surface is small:
// protocol server
type ProtocolServer interface {
    ListenAndServeTLS() error
    Close() error
}
WebSocket implementation:
func (srv *WSServer) ListenAndServeTLS() error {
    return srv.server.ListenAndServeTLS("", "")
}
func (srv *WSServer) Close() error {
    if srv.server != nil {
        return srv.server.Close()
    }
    return nil
}
The interesting part is ServeHTTP, which handles edge attach. Flow:

Client API and WebSocket client
Client side is also minimal: Connect dials CloudCore and returns a Connection.
// each protocol (websocket/quic) provides Connect
type ProtocolClient interface {
    Connect() (conn.Connection, error)
}
The WebSocket client uses Dial; on success it may run a user callback, then wraps the socket in conn.NewConnection:
// Connect tries to connect to the remote server
func (c *WSClient) Connect() (conn.Connection, error) {
    header := c.exOpts.Header
    header.Add("ConnectionUse", string(c.options.ConnUse))
    wsConn, resp, err := c.dialer.Dial(c.options.Addr, header)
    if err == nil {
        klog.Infof("dial %s successfully", c.options.Addr)
        // do user's processing on connection or response
        if c.exOpts.Callback != nil {
            c.exOpts.Callback(wsConn, resp)
        }
        return conn.NewConnection(&conn.ConnectionOptions{
            ConnType: api.ProtocolTypeWS,
            ConnUse:  c.options.ConnUse,
            Base:     wsConn,
            Consumer: c.options.Consumer,
            Handler:  c.options.Handler,
            CtrlLane: lane.NewLane(api.ProtocolTypeWS, wsConn),
            State: &conn.ConnectionState{
                State:   api.StatConnected,
                Headers: c.exOpts.Header.Clone(),
            },
            AutoRoute: c.options.AutoRoute,
        }), nil
    }
    // something wrong!
    var respMsg string
    if resp != nil {
        body, errRead := io.ReadAll(io.LimitReader(resp.Body, comm.MaxReadLength))
        if errRead == nil {
            respMsg = fmt.Sprintf("response code: %d, response body: %s", resp.StatusCode, string(body))
        } else {
            respMsg = fmt.Sprintf("response code: %d", resp.StatusCode)
        }
        resp.Body.Close()
    }
    klog.Errorf("dial websocket error(%+v), response message: %s", err, respMsg)
    return nil, err
}
CloudHub
CloudHub is a CloudCore module: it accepts edges and shuttles data between controllers and EdgeCore—downstream Kubernetes events (e.g. pod updates) and upstream state from the edge. Placement in the stack:

Major internal pieces:

- HTTP server — certificate workflows (CA fetch, issuance, rotation).
- WebSocket server — optional edge attach over WS.
- QUIC server — optional edge attach over QUIC.
- CSI socket server — talks to the CSI driver on cloud.
- Token manager — edge join tokens (default 12h rotation).
- Certificate manager — issues/rotates edge certificates.
- Message handler — connection setup and edge message dispatch.
- Node session manager — per-edge session lifecycle.
- Message dispatcher — upstream/downstream routing.
CloudHub startup
CloudHub registers when CloudCore starts, then Beehive calls Start().
cloudhub.Register(c.Modules.CloudHub)
Startup order (simplified): launch dispatcher.DispatchDownstream for async downstream work; initialize TLS material (generate CA/service certs if missing); start token manager; StartHTTPServer() for cert enrollment; start the hub listener via Viaduct (WebSocket over TCP or QUIC over UDP); optionally start the CSI UDS server.
func (ch *cloudHub) Start() {
    if !cache.WaitForCacheSync(beehiveContext.Done(), ch.informersSyncedFuncs...) {
        klog.Errorf("unable to sync caches for objectSyncController")
        os.Exit(1)
    }
    // start dispatching messages from the cloud to edge nodes
    go ch.dispatcher.DispatchDownstream()
    // check whether the certificates exist in the local directory,
    // and then check whether the certificates exist in the secret;
    // generate them if they don't exist
    if err := httpserver.PrepareAllCerts(); err != nil {
        klog.Exit(err)
    }
    DoneTLSTunnelCerts <- true
    close(DoneTLSTunnelCerts)
    // generate Token
    if err := httpserver.GenerateToken(); err != nil {
        klog.Exit(err)
    }
    // HttpServer is mainly used to issue certificates for the edge
    go httpserver.StartHTTPServer()
    servers.StartCloudHub(ch.messageHandler)
    if hubconfig.Config.UnixSocket.Enable {
        // The uds server is only used to communicate with the csi driver from kubeedge on cloud.
        // It is not used to communicate between cloud and edge.
        go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
    }
}
Core responsibilities: edge attach and message flow. Internal view:

Downstream delivery modes
Two downstream modes affect session behavior:
ACK mode — After the edge persists a downstream message locally it must ACK the cloud. If the cloud never sees the ACK it retries until one arrives.
NO-ACK mode — No ACK required; the cloud assumes success. Messages can be lost. Often used when the edge is waiting for a synchronous reply; missing replies trigger retries on the edge side.
Edge attach
Attach logic lives in messageHandler. Interface:
type Handler interface {
    // HandleConnection is invoked when a new connection arrives
    HandleConnection(connection conn.Connection)
    // HandleMessage is invoked when a new message arrives
    HandleMessage(container *mux.MessageContainer, writer mux.ResponseWriter)
    // OnEdgeNodeConnect is invoked when a new connection is established
    OnEdgeNodeConnect(info *model.HubInfo, connection conn.Connection) error
    // OnEdgeNodeDisconnect is invoked when a connection is lost
    OnEdgeNodeDisconnect(info *model.HubInfo, connection conn.Connection)
    // OnReadTransportErr is invoked when the connection read message err
    OnReadTransportErr(nodeID, projectID string)
}
HandleConnection runs the attach path—for WebSocket, HTTP upgrades to WS, Viaduct builds Connection, then the handler:
- Pre-checks such as max node count.
nodeID := connection.ConnectionState().Headers.Get("node_id")
projectID := connection.ConnectionState().Headers.Get("project_id")
if mh.SessionManager.ReachLimit() {
    klog.Errorf("Fail to serve node %s, reach node limit", nodeID)
    return
}
- Allocates nodeMessagePool and registers it with the dispatcher (queue for downstream work).
// init node message pool and add to the dispatcher
nodeMessagePool := common.InitNodeMessagePool(nodeID)
mh.MessageDispatcher.AddNodeMessagePool(nodeID, nodeMessagePool)
nodeMessagePool holds downstream queues per edge—separate stores/queues for ACK vs NO-ACK, matching the modes above.
// NodeMessagePool is a collection of all downstream messages sent to an
// edge node. There are two types of messages, one that requires an ack
// and another that does not. For each type of message, we use the 'queue'
// to mark the order of sending, and use the 'store' to store specific messages
type NodeMessagePool struct {
    // AckMessageStore store message that will send to edge node
    // and require acknowledgement from edge node
    AckMessageStore cache.Store
    // AckMessageQueue store message key that will send to edge node
    // and require acknowledgement from edge node
    AckMessageQueue workqueue.RateLimitingInterface
    // NoAckMessageStore store message that will send to edge node
    // and do not require acknowledgement from edge node
    NoAckMessageStore cache.Store
    // NoAckMessageQueue store message key that will send to edge node
    // and do not require acknowledgement from edge node
    NoAckMessageQueue workqueue.RateLimitingInterface
}
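The store-plus-queue split can be illustrated with a small in-memory pool. This is a sketch under assumptions: a plain map and slice replace cache.Store and the rate-limiting work queue, the Message fields are hypothetical, and resource versions are plain integers. It shows why keeping keys in the queue and payloads in the store lets a newer update replace an older one that has not been sent yet.

```go
package main

import (
	"fmt"
	"sync"
)

// Message is a hypothetical downstream message for one resource.
type Message struct {
	Key             string // identifies the resource, e.g. "pod/default/a"
	ResourceVersion uint64
}

// Pool keeps send order in queue and the latest payload per key in store.
type Pool struct {
	mu     sync.Mutex
	store  map[string]Message
	queue  []string
	queued map[string]bool
}

func NewPool() *Pool {
	return &Pool{store: map[string]Message{}, queued: map[string]bool{}}
}

// Add stores the newest message for a key and enqueues the key at most once.
// Messages that are not newer than the stored one are ignored.
func (p *Pool) Add(m Message) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if old, ok := p.store[m.Key]; ok && old.ResourceVersion >= m.ResourceVersion {
		return
	}
	p.store[m.Key] = m
	if !p.queued[m.Key] {
		p.queued[m.Key] = true
		p.queue = append(p.queue, m.Key)
	}
}

// Next pops the oldest key and returns the latest message stored for it.
func (p *Pool) Next() (Message, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if len(p.queue) == 0 {
		return Message{}, false
	}
	key := p.queue[0]
	p.queue = p.queue[1:]
	delete(p.queued, key)
	m := p.store[key]
	delete(p.store, key)
	return m, true
}

func main() {
	p := NewPool()
	p.Add(Message{Key: "pod/a", ResourceVersion: 1})
	p.Add(Message{Key: "pod/b", ResourceVersion: 1})
	p.Add(Message{Key: "pod/a", ResourceVersion: 2}) // supersedes the first update

	for m, ok := p.Next(); ok; m, ok = p.Next() {
		fmt.Println(m.Key, m.ResourceVersion)
	}
	// Output:
	// pod/a 2
	// pod/b 1
}
```

A real pool would hold two such pairs, one for messages that need an ACK and one for messages that do not.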
- Creates nodeSession, registers it with SessionManager, and starts it.
// create a node session for each edge node
nodeSession := session.NewNodeSession(nodeID, projectID, connection,
    keepaliveInterval, nodeMessagePool, mh.reliableClient)
// add node session to the session manager
mh.SessionManager.AddSession(nodeSession)
// Start session for each edge node and it will keep running until
// it encounters some Transport Error from underlying connection.
nodeSession.Start()
Each edge gets one NodeSession abstraction; SessionManager tracks every live session. Start spins goroutines for keepalive, SendAckMessage, and SendNoAckMessage.
// Start the main goroutine responsible for serving node session
func (ns *NodeSession) Start() {
    klog.Infof("Start session for edge node %s", ns.nodeID)
    go ns.KeepAliveCheck()
    go ns.SendAckMessage()
    go ns.SendNoAckMessage()
    <-ns.ctx.Done()
}
Upstream / downstream dispatch
HandleMessage is thin: Viaduct decodes into MessageContainer, validate, then MessageDispatcher.DispatchUpstream toward EdgeController, DeviceController, etc.
// HandleMessage handles all the requests from node
func (mh *messageHandler) HandleMessage(container *mux.MessageContainer, writer mux.ResponseWriter) {
    nodeID := container.Header.Get("node_id")
    projectID := container.Header.Get("project_id")
    // validate message
    if container.Message == nil {
        klog.Errorf("The message is nil for node: %s", nodeID)
        return
    }
    klog.V(4).Infof("[messageHandler]get msg from node(%s): %+v", nodeID, container.Message)
    // dispatch upstream message
    mh.MessageDispatcher.DispatchUpstream(container.Message, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}
ACK downstream path (summary):
- KubeEdge uses the ObjectSync CRD to remember the latest resourceVersion successfully delivered per resource on each edge. On hub start it reconciles pending vs already-synced versions to avoid replaying stale events.
- Controllers enqueue to CloudHub; MessageDispatcher routes by node into the right NodeMessagePool, picking ACK vs NO-ACK from the message. Enqueue consults ObjectSync to skip duplicates.
- SendAckMessage drains the pool in order and tracks pending ACKs; when the edge ACKs, it persists the new resourceVersion to ObjectSync and sends the next item.
- EdgeCore persists locally then returns ACK. If the hub misses the ACK it retries (implementation retries a bounded number of times, then drops).
- SyncController handles stragglers, e.g. an ACK lost in transit, by re-driving delivery until state converges.
func (ns *NodeSession) SendMessageWithRetry(copyMsg, msg *beehivemodel.Message) error {
    ackChan := make(chan struct{})
    ns.ackMessageCache.Store(copyMsg.GetID(), ackChan)
    // initialize retry count and timer for sending message
    retryCount := 0
    ticker := time.NewTimer(sendRetryInterval)
    err := ns.connection.WriteMessageAsync(copyMsg)
    if err != nil {
        return err
    }
    for {
        select {
        case <-ackChan:
            // the edge acknowledged: record the delivered version
            ns.saveSuccessPoint(msg)
            return nil
        case <-ticker.C:
            // no ACK within the interval: give up after the retry budget is spent
            if retryCount == 4 {
                return ErrWaitTimeout
            }
            err := ns.connection.WriteMessageAsync(copyMsg)
            if err != nil {
                return err
            }
            retryCount++
            ticker.Reset(sendRetryInterval)
        }
    }
}
SyncController
Edge networks flap; cloud–edge traffic can drop mid-flight. SyncController is the CloudCore module that keeps delivery honest. KubeEdge persists sync state in ObjectSync CRs: for each edge the cloud records the latest resourceVersion acknowledged, so after restarts or disconnects ordering stays consistent and old revisions are not resent blindly. SyncController also periodically reconciles cloud vs edge views for eventual consistency.
It registers like other modules:
synccontroller.Register(c.Modules.SyncController)
Start waits for informer sync, prunes stale ObjectSync objects, then runs reconcile every 5s.
func (sctl *SyncController) Start() {
    if !cache.WaitForCacheSync(beehiveContext.Done(), sctl.informersSyncedFuncs...) {
        klog.Errorf("unable to sync caches for sync controller")
        return
    }
    sctl.deleteObjectSyncs() // check for outdated syncs before starting to reconcile
    go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
}
ObjectSync names combine node id and object UUID. The controller compares stored resourceVersion against live API objects to decide retries/deletes. When CloudHub enqueues, it also compares against pool state to drop superseded work.
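The comparison step can be sketched as a pure function. The assumptions are loud here: the "." separator in the name, the Action values, and treating resourceVersion as a decimal integer are all choices made for this illustration, not statements about the SyncController's code.

```go
package main

import (
	"fmt"
	"strconv"
)

// ObjectSyncName joins node id and object UID; the "." separator is an assumption.
func ObjectSyncName(nodeID, uid string) string {
	return nodeID + "." + uid
}

// compareRV compares two resource versions as unsigned decimal integers.
// It returns -1, 0 or 1, or an error if either value is not numeric.
func compareRV(a, b string) (int, error) {
	x, err := strconv.ParseUint(a, 10, 64)
	if err != nil {
		return 0, err
	}
	y, err := strconv.ParseUint(b, 10, 64)
	if err != nil {
		return 0, err
	}
	switch {
	case x < y:
		return -1, nil
	case x > y:
		return 1, nil
	}
	return 0, nil
}

// Action is what the reconciler should do for one ObjectSync record.
type Action string

const (
	Noop   Action = "noop"   // edge already has the live version
	Resend Action = "resend" // edge is behind: deliver the live object again
	Delete Action = "delete" // object is gone from the API server
)

// Reconcile decides the action for one record. exists reports whether the
// object is still present in the API server.
func Reconcile(syncedRV, liveRV string, exists bool) (Action, error) {
	if !exists {
		return Delete, nil
	}
	c, err := compareRV(syncedRV, liveRV)
	if err != nil {
		return Noop, err
	}
	if c < 0 {
		return Resend, nil
	}
	return Noop, nil
}

func main() {
	fmt.Println(ObjectSyncName("edge-node-1", "1234-abcd")) // edge-node-1.1234-abcd
	a, _ := Reconcile("100", "105", true)
	b, _ := Reconcile("105", "105", true)
	c, _ := Reconcile("105", "", false)
	fmt.Println(a, b, c) // resend noop delete
}
```

Comparing as integers rather than strings matters: as strings, "99" would sort after "105".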

EdgeHub
EdgeHub is the WebSocket or QUIC client facing CloudCore—pulling cloud updates, pushing host/device deltas, etc. It registers during EdgeCore startup:
func Register(eh *v1alpha2.EdgeHub, nodeName string) {
    config.InitConfigure(eh, nodeName)
    core.Register(newEdgeHub(eh.Enable))
}
Startup outline:
func (eh *EdgeHub) Start() {
    eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
    eh.certManager.Start()
    for _, v := range GetCertSyncChannel() {
        v <- true
        close(v)
    }
    go eh.ifRotationDone()
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("EdgeHub stop")
            return
        default:
        }
        err := eh.initial()
        if err != nil {
            klog.Exitf("failed to init controller: %v", err)
            return
        }
        waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2
        err = eh.chClient.Init()
        if err != nil {
            klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
            time.Sleep(waitTime)
            continue
        }
        // execute hook func after connect
        eh.pubConnectInfo(true)
        go eh.routeToEdge()
        go eh.routeToCloud()
        go eh.keepalive()
        // wait the stop signal
        // stop authinfo manager/websocket connection
        <-eh.reconnectChan
        eh.chClient.UnInit()
        // execute hook function after disconnect
        eh.pubConnectInfo(false)
        // sleep one period of heartbeat, then try to connect cloud hub again
        klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
        time.Sleep(waitTime)
        // clean channel: drop reconnect signals left over from the old connection
    clean:
        for {
            select {
            case <-eh.reconnectChan:
            default:
                break clean
            }
        }
    }
}
Rough sequence:
- Cert bootstrap from CloudCore (or local files), rotation hooks, then the reconnect loop.
- eh.initial() builds chClient; Init() opens the Viaduct connection.
- pubConnectInfo(true) tells other EdgeCore modules the tunnel is up.
- Three goroutines: routeToEdge, routeToCloud, keepalive.
routeToEdge reads cloud→edge traffic; for sync replies it uses SendResp, otherwise routes by Beehive group.
func (eh *EdgeHub) routeToEdge() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("EdgeHub RouteToEdge stop")
            return
        default:
        }
        // abridged excerpt: the step that dispatches the received message is not shown
        message, err := eh.chClient.Receive()
        if err != nil {
            klog.Errorf("failed to dispatch message, discard: %v", err)
        }
    }
}
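The routing decision described for routeToEdge (sync replies go to SendResp, everything else goes to a Beehive group) can be sketched in isolation. The Dispatcher type, its pending and groups maps, and the recorder used for the demo are hypothetical; only the decision itself comes from the description above.

```go
package main

import "fmt"

// Message carries only the fields the routing decision needs.
type Message struct {
	ID, ParentID, Group string
}

// Router is the slice of the messaging API this sketch needs.
type Router interface {
	SendResp(m Message)
	SendToGroup(group string, m Message)
}

// Dispatcher is hypothetical: it tracks pending sync requests and known groups.
type Dispatcher struct {
	router  Router
	pending map[string]bool // IDs of sync requests still waiting for a reply
	groups  map[string]bool // groups that have at least one module
}

// Dispatch hands a cloud message to the waiting caller if it answers a
// pending sync request, otherwise to the module group named in the message.
func (d *Dispatcher) Dispatch(m Message) error {
	if m.ParentID != "" && d.pending[m.ParentID] {
		delete(d.pending, m.ParentID)
		d.router.SendResp(m)
		return nil
	}
	if !d.groups[m.Group] {
		return fmt.Errorf("no module group %q for message %s", m.Group, m.ID)
	}
	d.router.SendToGroup(m.Group, m)
	return nil
}

// recorder is a fake Router that records where each message went.
type recorder struct{ log []string }

func (r *recorder) SendResp(m Message) { r.log = append(r.log, "resp:"+m.ID) }

func (r *recorder) SendToGroup(group string, m Message) {
	r.log = append(r.log, group+":"+m.ID)
}

func main() {
	rec := &recorder{}
	d := &Dispatcher{
		router:  rec,
		pending: map[string]bool{"req-1": true},
		groups:  map[string]bool{"meta": true},
	}
	fmt.Println(d.Dispatch(Message{ID: "m1", ParentID: "req-1", Group: "meta"})) // <nil>
	fmt.Println(d.Dispatch(Message{ID: "m2", Group: "meta"}))                    // <nil>
	fmt.Println(d.Dispatch(Message{ID: "m3", Group: "unknown"}) != nil)          // true
	fmt.Println(rec.log)                                                         // [resp:m1 meta:m2]
}
```

A message for an unknown group is reported as an error and dropped, which matches the "discard" wording of the log line in the excerpt.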
routeToCloud reads from Beehive and writes to CloudHub.
func (eh *EdgeHub) routeToCloud() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("EdgeHub RouteToCloud stop")
            return
        default:
        }
        message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
        if err != nil {
            klog.Errorf("failed to receive message from edge: %v", err)
            time.Sleep(time.Second)
            continue
        }
        err = eh.tryThrottle(message.GetID())
        if err != nil {
            klog.Errorf("msgID: %s, client rate limiter returned an error: %v", message.GetID(), err)
            continue
        }
        // post message to cloud hub
        err = eh.sendToCloud(message)
        if err != nil {
            klog.Errorf("failed to send message to cloud: %v", err)
            eh.reconnectChan <- struct{}{}
            return
        }
    }
}
keepalive periodically pings the cloud.
func (eh *EdgeHub) keepalive() {
    for {
        select {
        case <-beehiveContext.Done():
            klog.Warning("EdgeHub KeepAlive stop")
            return
        default:
        }
        msg := model.NewMessage("").
            BuildRouter(modules.EdgeHubModuleName, "resource", "node", messagepkg.OperationKeepalive).
            FillBody("ping")
        // post message to cloud hub
        err := eh.sendToCloud(*msg)
        if err != nil {
            klog.Errorf("websocket write error: %v", err)
            eh.reconnectChan <- struct{}{}
            return
        }
        time.Sleep(time.Duration(config.Config.Heartbeat) * time.Second)
    }
}
On transport errors EdgeHub tears down and re-initializes the client to dial CloudHub again.