KubeEdge Cloud-Edge Communication Framework

Not guaranteed up to date—for authoritative behavior, read the source.

Beehive

Beehive is KubeEdge’s core messaging framework: it registers modules and routes messages between them. Both CloudCore and EdgeCore depend on Beehive, so understanding Beehive is a prerequisite for grasping KubeEdge’s design.

Beehive is built on Go channels. Its responsibilities split into module lifecycle (ModuleContext) and messaging (MessageContext). The high-level picture:

Message format

Before diving into Beehive internals, note how messages are shaped. A Message is the carrier between modules and has three parts:

  • Header
    • ID: message id (UUID string).
    • ParentID: set only on replies to synchronous calls.
    • TimeStamp: when the message was created.
    • Sync: true means this is a synchronous message.
  • Route
    • Source: who sent it.
    • Group: logical group.
    • Operation: what is being done to the resource.
    • Resource: the resource in question.
  • Content: payload.
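
The three-part shape above can be sketched as plain Go structs. This is a hedged sketch, not the real beehive model package: field names follow the description, and the real implementation generates UUIDs for the id.

```go
package main

import (
	"fmt"
	"time"
)

// MessageHeader mirrors the Header part described above.
type MessageHeader struct {
	ID        string // unique message id (a UUID string in KubeEdge)
	ParentID  string // set only on replies to synchronous calls
	Timestamp int64  // creation time
	Sync      bool   // true for synchronous messages
}

// MessageRoute mirrors the Route part.
type MessageRoute struct {
	Source    string // sending module
	Group     string // logical group
	Operation string // what is being done to the resource
	Resource  string // the resource in question
}

// Message is the carrier: header, route, payload.
type Message struct {
	Header  MessageHeader
	Router  MessageRoute
	Content interface{} // payload
}

// NewMessage fills the header the way the description implies.
func NewMessage(source, group, operation, resource string, content interface{}) Message {
	return Message{
		Header: MessageHeader{
			ID:        "msg-1", // a real implementation would generate a UUID here
			Timestamp: time.Now().UnixMilli(),
		},
		Router:  MessageRoute{Source: source, Group: group, Operation: operation, Resource: resource},
		Content: content,
	}
}

func main() {
	m := NewMessage("edgehub", "resource", "update", "pod/nginx", "payload")
	fmt.Println(m.Router.Source, m.Router.Operation)
}
```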

Context data structure

Both ModuleContext and MessageContext are implemented by Context:

// Context is the object for the Context channel
type Context struct {
	// ConfigFactory goarchaius.ConfigurationFactory
	channels     map[string]chan model.Message
	chsLock      sync.RWMutex
	typeChannels map[string]map[string]chan model.Message
	typeChsLock  sync.RWMutex
	anonChannels map[string]chan model.Message
	anonChsLock  sync.RWMutex
}

Full source: context_channel.go

  • channels — module name → message channel; used to deliver to a module.
  • chsLock — lock for channels.
  • typeChannels — two-level map: group → module → that module’s channel.
  • typeChsLock — lock for typeChannels.
  • anonChannels — maps a message’s parent id to a channel for synchronous replies.
  • anonChsLock — lock for anonChannels.

Beehive module management

In Beehive, a module is any type that implements the Module interface below; CloudHub, EdgeHub, EdgeController, etc., all do.

// Module interface
type Module interface {
	Name() string
	Group() string
	Start()
	Enable() bool
}

Full source: module.go

Supported module operations:

type ModuleContext interface {
	AddModule(info *common.ModuleInfo)
	AddModuleGroup(module, group string)
	Cleanup(module string)
}

Full source: context.go

  • AddModule — register a module: allocate a message channel and store it in channels.
  • AddModuleGroup — put a module in a group: look up the module’s channel in channels, then record group/module/channel in typeChannels.
  • Cleanup — tear down a module: remove its entries from channels and typeChannels.
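
A toy version of the three operations, assuming the channels/typeChannels layout described above; this is an illustrative stand-in, not the real beehive code.

```go
package main

import (
	"fmt"
	"sync"
)

type Message struct{ Content string }

// Context keeps the two maps the lifecycle operations work on.
type Context struct {
	chsLock      sync.RWMutex
	channels     map[string]chan Message            // module name -> channel
	typeChannels map[string]map[string]chan Message // group -> module -> channel
}

func NewContext() *Context {
	return &Context{
		channels:     map[string]chan Message{},
		typeChannels: map[string]map[string]chan Message{},
	}
}

// AddModule allocates a message channel and stores it in channels.
func (c *Context) AddModule(name string) {
	c.chsLock.Lock()
	defer c.chsLock.Unlock()
	c.channels[name] = make(chan Message, 1024)
}

// AddModuleGroup looks up the module's channel and records it under the group.
func (c *Context) AddModuleGroup(name, group string) {
	c.chsLock.Lock()
	defer c.chsLock.Unlock()
	if _, ok := c.typeChannels[group]; !ok {
		c.typeChannels[group] = map[string]chan Message{}
	}
	c.typeChannels[group][name] = c.channels[name]
}

// Cleanup removes the module from both maps.
func (c *Context) Cleanup(name string) {
	c.chsLock.Lock()
	defer c.chsLock.Unlock()
	delete(c.channels, name)
	for _, group := range c.typeChannels {
		delete(group, name)
	}
}

func main() {
	ctx := NewContext()
	ctx.AddModule("edgehub")
	ctx.AddModuleGroup("edgehub", "hub")
	fmt.Println(len(ctx.channels), len(ctx.typeChannels["hub"]))
	ctx.Cleanup("edgehub")
	fmt.Println(len(ctx.channels))
}
```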

Beehive messaging

Registered modules can talk to each other through several patterns:

// MessageContext is the interface for message syncing
type MessageContext interface {
	// async mode
	Send(module string, message model.Message)
	Receive(module string) (model.Message, error)
	// sync mode
	SendSync(module string, message model.Message, timeout time.Duration) (model.Message, error)
	SendResp(message model.Message)
	// group broadcast
	SendToGroup(group string, message model.Message)
	SendToGroupSync(group string, message model.Message, timeout time.Duration) error
}

Full source: context.go

  • Send — async to one module: look up the module channel in channels and enqueue the message.
  • Receive — read for a module: block on that module’s channel until a message arrives.
  • SendSync — sync to one module: enqueue on the module channel, register a reply channel in anonChannels keyed by message id, then wait until timeout for the response.
  • SendResp — reply to a sync call: use ParentID to find the waiter channel in anonChannels and deliver the response (or log if missing).
  • SendToGroup — async broadcast: enumerate the group’s modules in typeChannels and Send to each.
  • SendToGroupSync — sync broadcast: collect all group modules, create an anonymous channel sized to the member count, fan out with Send, then wait until that many replies arrive.

Registering and starting modules

When CloudCore or EdgeCore boots, every module registers with Beehive, which keeps a name→module map.

// registerModules registers all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
	cloudhub.Register(c.Modules.Cloudhub)
	edgecontroller.Register(c.Modules.EdgeController)
	devicecontroller.Register(c.Modules.DeviceController)
	nodeupgradejobcontroller.Register(c.Modules.NodeUpgradeJobController)
	synccontroller.Register(c.Modules.SyncController)
	cloudstream.Register(c.Modules.CloudStream, c.CommonConfig)
	router.Register(c.Modules.Router)
	dynamiccontroller.Register(c.Modules.DynamicController)
	policycontroller.Register(client.CrdConfig)
}

server.go#L155-L166

At Beehive startup it walks every registered module and, for each:

  1. Builds a ModuleInfo based on the module’s context type.
  2. Calls beehiveContext.AddModule.
  3. Calls beehiveContext.AddModuleGroup.
  4. Invokes the module’s Start.

// StartModules starts modules that are registered
func StartModules() {
	// only register channel mode; to use socket mode as well,
	// also pass in the common.MsgCtxTypeUS parameter
	beehiveContext.InitContext([]string{common.MsgCtxTypeChannel})

	modules := GetModules()

	for name, module := range modules {
		var m common.ModuleInfo
		switch module.contextType {
		case common.MsgCtxTypeChannel:
			m = common.ModuleInfo{
				ModuleName: name,
				ModuleType: module.contextType,
			}
		......

		default:
			klog.Exitf("unsupported context type: %s", module.contextType)
		}

		beehiveContext.AddModule(&m)
		beehiveContext.AddModuleGroup(name, module.module.Group())

		go moduleKeeper(name, module, m)
		klog.Infof("starting module %s", name)
	}
}

core.go
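
The register-then-start flow can be sketched end to end with a self-contained toy. Register, StartModules, and demoModule here are illustrative stand-ins (the real code also wraps Start in moduleKeeper so crashed modules can be restarted).

```go
package main

import (
	"fmt"
)

// Module matches the beehive module interface shown earlier.
type Module interface {
	Name() string
	Group() string
	Start()
	Enable() bool
}

// modules is the name -> module registry beehive keeps.
var modules = map[string]Module{}

// Register adds a module to the registry if it is enabled.
func Register(m Module) {
	if m.Enable() {
		modules[m.Name()] = m
	}
}

// demoModule signals on a channel when started, so we can observe Start.
type demoModule struct {
	name, group string
	started     chan struct{}
}

func (d demoModule) Name() string  { return d.name }
func (d demoModule) Group() string { return d.group }
func (d demoModule) Enable() bool  { return true }
func (d demoModule) Start() {
	select {
	case d.started <- struct{}{}:
	default: // already signalled; ignore repeated starts
	}
}

// StartModules walks the registry and launches each module,
// mirroring the loop in beehive's core package.
func StartModules() {
	for name, m := range modules {
		fmt.Println("starting module", name)
		go m.Start() // the real code runs this under moduleKeeper
	}
}

func main() {
	started := make(chan struct{}, 1)
	Register(demoModule{name: "edgehub", group: "hub", started: started})
	StartModules()
	<-started // wait until the module reports it is running
}
```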

Viaduct

Viaduct is the cloud–edge transport layer: a small set of abstractions with concrete servers/clients for several protocols, managing edge connectivity and data plane traffic. It hides protocol differences so upper layers see one API; operators pick WebSocket, QUIC, etc., via config. New protocols can be plugged in through Viaduct as requirements evolve.

Viaduct splits into server/client interfaces and protocol implementations. CloudCore runs the server side (accept edges, move bytes); EdgeCore runs the client (dial in). Architecture:

The following uses WebSocket as the example.

Connection interface

Connection is the heart of Viaduct. Cloud and edge are full-duplex; both sides construct a Connection after connect. Definition:

// the operation set of a connection
type Connection interface {
	// ServeConn processes messages from the connection;
	// the server side keeps reading messages from the channel
	ServeConn()

	// SetReadDeadline sets the deadline for future Read calls
	// and any currently-blocked Read call.
	// A zero value for t means Read will not time out.
	SetReadDeadline(t time.Time) error

	// SetWriteDeadline sets the deadline for future Write calls
	// and any currently-blocked Write call.
	// Even if Write times out, it may return n > 0, indicating that
	// some of the data was successfully written.
	// A zero value for t means Write will not time out.
	SetWriteDeadline(t time.Time) error

	// Read reads raw data from the connection
	Read(raw []byte) (int, error)

	// Write writes raw data to the connection;
	// it will open a stream for the raw data
	Write(raw []byte) (int, error)

	// WriteMessageAsync writes data to the connection without waiting for the response
	WriteMessageAsync(msg *model.Message) error

	// WriteMessageSync writes data to the connection and waits for the response
	WriteMessageSync(msg *model.Message) (*model.Message, error)

	// ReadMessage reads a message from the connection;
	// it blocks until a message is received.
	// If you want to use this API for message reading,
	// make sure AutoRoute is false
	ReadMessage(msg *model.Message) error

	// RemoteAddr returns the remote network address
	RemoteAddr() net.Addr

	// LocalAddr returns the local network address
	LocalAddr() net.Addr

	// ConnectionState returns the current connection state
	ConnectionState() ConnectionState

	// Close closes the connection.
	// Any blocked Read or Write operations will be unblocked and return errors
	Close() error
}

conn.go

  • ServeConn — server loop: continuously read, decode to Message, dispatch via callback.
  • Read — read raw bytes from the wire.
  • Write — write raw bytes.
  • WriteMessageAsync — send a Message without waiting for a reply.
  • WriteMessageSync — send a Message and block for the peer’s response.
  • ReadMessage — read and decode into a Message.

Server API and WebSocket server

The protocol server surface is small:

// protocol server
type ProtocolServer interface {
	ListenAndServeTLS() error
	Close() error
}

WebSocket implementation:

func (srv *WSServer) ListenAndServeTLS() error {
	return srv.server.ListenAndServeTLS("", "")
}

func (srv *WSServer) Close() error {
	if srv.server != nil {
		return srv.server.Close()
	}
	return nil
}

ws.go (server)

The interesting part is ServeHTTP, which handles edge attach. Flow:

Client API and WebSocket client

Client side is also minimal: Connect dials CloudCore and returns a Connection.

// each protocol (websocket/quic) provides Connect
type ProtocolClient interface {
	Connect() (conn.Connection, error)
}

The WebSocket client uses Dial; on success it may run a user callback, then wraps the socket in conn.NewConnection:

// Connect tries to connect to the remote server
func (c *WSClient) Connect() (conn.Connection, error) {
	header := c.exOpts.Header
	header.Add("ConnectionUse", string(c.options.ConnUse))
	wsConn, resp, err := c.dialer.Dial(c.options.Addr, header)
	if err == nil {
		klog.Infof("dial %s successfully", c.options.Addr)

		// do user's processing on connection or response
		if c.exOpts.Callback != nil {
			c.exOpts.Callback(wsConn, resp)
		}
		return conn.NewConnection(&conn.ConnectionOptions{
			ConnType: api.ProtocolTypeWS,
			ConnUse:  c.options.ConnUse,
			Base:     wsConn,
			Consumer: c.options.Consumer,
			Handler:  c.options.Handler,
			CtrlLane: lane.NewLane(api.ProtocolTypeWS, wsConn),
			State: &conn.ConnectionState{
				State:   api.StatConnected,
				Headers: c.exOpts.Header.Clone(),
			},
			AutoRoute: c.options.AutoRoute,
		}), nil
	}

	// something went wrong!
	var respMsg string
	if resp != nil {
		body, errRead := io.ReadAll(io.LimitReader(resp.Body, comm.MaxReadLength))
		if errRead == nil {
			respMsg = fmt.Sprintf("response code: %d, response body: %s", resp.StatusCode, string(body))
		} else {
			respMsg = fmt.Sprintf("response code: %d", resp.StatusCode)
		}
		resp.Body.Close()
	}
	klog.Errorf("dial websocket error(%+v), response message: %s", err, respMsg)

	return nil, err
}

ws.go (client)

CloudHub

CloudHub is a CloudCore module: it accepts edges and shuttles data between controllers and EdgeCore—downstream Kubernetes events (e.g. pod updates) and upstream state from the edge. Placement in the stack:

Major internal pieces:

  • HTTP server — certificate workflows (CA fetch, issuance, rotation).
  • WebSocket server — optional edge attach over WS.
  • QUIC server — optional edge attach over QUIC.
  • CSI socket server — talks to the CSI driver on cloud.
  • Token manager — edge join tokens (default 12h rotation).
  • Certificate manager — issues/rotates edge certificates.
  • Message handler — connection setup and edge message dispatch.
  • Node session manager — per-edge session lifecycle.
  • Message dispatcher — upstream/downstream routing.

CloudHub startup

CloudHub registers when CloudCore starts, then Beehive calls Start().

cloudhub.Register(c.Modules.Cloudhub)

Startup order (simplified): launch dispatcher.DispatchDownstream for async downstream work; initialize TLS material (generate CA/service certs if missing); start token manager; StartHTTPServer() for cert enrollment; start the hub listener via Viaduct (WebSocket over TCP or QUIC over UDP); optionally start the CSI UDS server.

func (ch *cloudHub) Start() {
	if !cache.WaitForCacheSync(beehiveContext.Done(), ch.informersSyncedFuncs...) {
		klog.Errorf("unable to sync caches for objectSyncController")
		os.Exit(1)
	}

	// start dispatching messages from the cloud to the edge node
	go ch.dispatcher.DispatchDownstream()

	// check whether the certificates exist in the local directory,
	// then check whether they exist in the secret;
	// generate them if they don't exist
	if err := httpserver.PrepareAllCerts(); err != nil {
		klog.Exit(err)
	}

	DoneTLSTunnelCerts <- true
	close(DoneTLSTunnelCerts)

	// generate token
	if err := httpserver.GenerateToken(); err != nil {
		klog.Exit(err)
	}

	// the HTTP server is mainly used to issue certificates for the edge
	go httpserver.StartHTTPServer()

	servers.StartCloudHub(ch.messageHandler)

	if hubconfig.Config.UnixSocket.Enable {
		// the uds server is only used to communicate with the csi driver from kubeedge on cloud;
		// it is not used to communicate between cloud and edge
		go udsserver.StartServer(hubconfig.Config.UnixSocket.Address)
	}
}

cloudhub.go

Core responsibilities: edge attach and message flow. Internal view:

Downstream delivery modes

Two downstream modes affect session behavior:

ACK mode — After the edge persists a downstream message locally it must ACK the cloud. If the cloud never sees the ACK it retries until one arrives.

NO-ACK mode — No ACK required; the cloud assumes success. Messages can be lost. Often used when the edge is waiting for a synchronous reply; missing replies trigger retries on the edge side.

Edge attach

Attach logic lives in messageHandler. Interface:

type Handler interface {
	// HandleConnection is invoked when a new connection arrives
	HandleConnection(connection conn.Connection)

	// HandleMessage is invoked when a new message arrives
	HandleMessage(container *mux.MessageContainer, writer mux.ResponseWriter)

	// OnEdgeNodeConnect is invoked when a new connection is established
	OnEdgeNodeConnect(info *model.HubInfo, connection conn.Connection) error

	// OnEdgeNodeDisconnect is invoked when a connection is lost
	OnEdgeNodeDisconnect(info *model.HubInfo, connection conn.Connection)

	// OnReadTransportErr is invoked when reading a message from the connection fails
	OnReadTransportErr(nodeID, projectID string)
}

message_handler.go

HandleConnection runs the attach path—for WebSocket, HTTP upgrades to WS, Viaduct builds Connection, then the handler:

  1. Pre-checks such as max node count.
nodeID := connection.ConnectionState().Headers.Get("node_id")
projectID := connection.ConnectionState().Headers.Get("project_id")

if mh.SessionManager.ReachLimit() {
	klog.Errorf("Fail to serve node %s, reach node limit", nodeID)
	return
}
  2. Allocates nodeMessagePool and registers it with the dispatcher (the queue for downstream work).
// init node message pool and add it to the dispatcher
nodeMessagePool := common.InitNodeMessagePool(nodeID)
mh.MessageDispatcher.AddNodeMessagePool(nodeID, nodeMessagePool)

nodeMessagePool holds downstream queues per edge—separate stores/queues for ACK vs NO-ACK, matching the modes above.

// NodeMessagePool is a collection of all downstream messages sent to an
// edge node. There are two types of messages: one that requires an ack
// and another that does not. For each type of message, we use the 'queue'
// to mark the order of sending, and the 'store' to hold the specific messages.
type NodeMessagePool struct {
	// AckMessageStore stores messages that will be sent to the edge node
	// and require acknowledgement from the edge node
	AckMessageStore cache.Store
	// AckMessageQueue stores the keys of messages that will be sent to the
	// edge node and require acknowledgement from the edge node
	AckMessageQueue workqueue.RateLimitingInterface
	// NoAckMessageStore stores messages that will be sent to the edge node
	// and do not require acknowledgement from the edge node
	NoAckMessageStore cache.Store
	// NoAckMessageQueue stores the keys of messages that will be sent to the
	// edge node and do not require acknowledgement from the edge node
	NoAckMessageQueue workqueue.RateLimitingInterface
}

message_pool.go
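
The queue/store split the comment describes can be illustrated in isolation: the queue holds only message keys in sending order, while the store holds the latest full body per key, so a newer revision can overwrite an older one that is still waiting. A slice and a map stand in for workqueue and cache.Store here; this is a teaching sketch, not the real pool.

```go
package main

import "fmt"

type Message struct {
	Key     string // e.g. resource identity
	Content string // e.g. the serialized object at some resourceVersion
}

type pool struct {
	queue []string           // order of sending (keys only)
	store map[string]Message // key -> latest message body
}

func newPool() *pool { return &pool{store: map[string]Message{}} }

// Enqueue queues the key once and keeps only the newest body for it.
func (p *pool) Enqueue(m Message) {
	if _, seen := p.store[m.Key]; !seen {
		p.queue = append(p.queue, m.Key)
	}
	p.store[m.Key] = m // a newer revision replaces an older pending one
}

// Dequeue pops the next key in order and returns its latest body.
func (p *pool) Dequeue() (Message, bool) {
	if len(p.queue) == 0 {
		return Message{}, false
	}
	key := p.queue[0]
	p.queue = p.queue[1:]
	m := p.store[key]
	delete(p.store, key)
	return m, true
}

func main() {
	p := newPool()
	p.Enqueue(Message{Key: "pod/nginx", Content: "rv=1"})
	p.Enqueue(Message{Key: "pod/nginx", Content: "rv=2"}) // supersedes rv=1
	m, _ := p.Dequeue()
	fmt.Println(m.Content) // rv=2
}
```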

  3. Creates a nodeSession, registers it with the SessionManager, and starts it.
// create a node session for each edge node
nodeSession := session.NewNodeSession(nodeID, projectID, connection,
	keepaliveInterval, nodeMessagePool, mh.reliableClient)
// add the node session to the session manager
mh.SessionManager.AddSession(nodeSession)

// Start the session for each edge node; it will keep running until
// it encounters a transport error from the underlying connection.
nodeSession.Start()

Each edge gets one NodeSession abstraction; SessionManager tracks every live session. Start spins goroutines for keepalive, SendAckMessage, and SendNoAckMessage.

// Start the main goroutine responsible for serving the node session
func (ns *NodeSession) Start() {
	klog.Infof("Start session for edge node %s", ns.nodeID)

	go ns.KeepAliveCheck()
	go ns.SendAckMessage()
	go ns.SendNoAckMessage()

	<-ns.ctx.Done()
}

Upstream / downstream dispatch

HandleMessage is thin: Viaduct decodes the frame into a MessageContainer, the handler validates it, then MessageDispatcher.DispatchUpstream routes it toward EdgeController, DeviceController, etc.

// HandleMessage handles all requests from the node
func (mh *messageHandler) HandleMessage(container *mux.MessageContainer, writer mux.ResponseWriter) {
	nodeID := container.Header.Get("node_id")
	projectID := container.Header.Get("project_id")

	// validate message
	if container.Message == nil {
		klog.Errorf("The message is nil for node: %s", nodeID)
		return
	}

	klog.V(4).Infof("[messageHandler]get msg from node(%s): %+v", nodeID, container.Message)

	// dispatch upstream message
	mh.MessageDispatcher.DispatchUpstream(container.Message, &model.HubInfo{ProjectID: projectID, NodeID: nodeID})
}

ACK downstream path (summary):

  1. KubeEdge uses the ObjectSync CRD to remember the latest resourceVersion successfully delivered per resource on each edge. On hub start it reconciles pending vs already-synced versions to avoid replaying stale events.
  2. Controllers enqueue to CloudHub; MessageDispatcher routes by node into the right NodeMessagePool, picking ACK vs NO-ACK from the message. Enqueue consults ObjectSync to skip duplicates.
  3. SendAckMessage drains the pool in order, tracks pending ACKs; when the edge ACKs, persist the new resourceVersion to ObjectSync and send the next item.
  4. EdgeCore persists locally then returns ACK. If the hub misses the ACK it retries (implementation retries a bounded number of times, then drops).
  5. SyncController handles stragglers—e.g. ACK lost in transit—by re-driving delivery until state converges.
func (ns *NodeSession) SendMessageWithRetry(copyMsg, msg *beehivemodel.Message) error {
	ackChan := make(chan struct{})
	ns.ackMessageCache.Store(copyMsg.GetID(), ackChan)

	// initialize retry count and timer for sending the message
	retryCount := 0
	ticker := time.NewTimer(sendRetryInterval)

	err := ns.connection.WriteMessageAsync(copyMsg)
	if err != nil {
		return err
	}

	for {
		select {
		case <-ackChan:
			ns.saveSuccessPoint(msg)
			return nil

		case <-ticker.C:
			if retryCount == 4 {
				return ErrWaitTimeout
			}
			err := ns.connection.WriteMessageAsync(copyMsg)
			if err != nil {
				return err
			}
			retryCount++
			ticker.Reset(sendRetryInterval)
		}
	}
}

cloud/pkg/cloudhub/session

SyncController

Edge networks flap; cloud–edge traffic can drop mid-flight. SyncController is the CloudCore module that keeps delivery honest. KubeEdge persists sync state in ObjectSync CRs: for each edge the cloud records the latest resourceVersion acknowledged, so after restarts or disconnects ordering stays consistent and old revisions are not resent blindly. SyncController also periodically reconciles cloud vs edge views for eventual consistency.

It registers like other modules:

synccontroller.Register(c.Modules.SyncController)

Start waits for informer sync, prunes stale ObjectSync objects, then runs reconcile every 5s.

func (sctl *SyncController) Start() {
	if !cache.WaitForCacheSync(beehiveContext.Done(), sctl.informersSyncedFuncs...) {
		klog.Errorf("unable to sync caches for sync controller")
		return
	}

	// check for outdated syncs before starting to reconcile
	sctl.deleteObjectSyncs()
	go wait.Until(sctl.reconcile, 5*time.Second, beehiveContext.Done())
}

synccontroller

ObjectSync names combine node id and object UUID. The controller compares stored resourceVersion against live API objects to decide retries/deletes. When CloudHub enqueues, it also compares against pool state to drop superseded work.
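
Assuming the name format "<node id>.<object UUID>" and integer-comparable resourceVersions, the two checks look roughly like this. Function names here are illustrative, not the real synccontroller API.

```go
package main

import (
	"fmt"
	"strconv"
)

// BuildObjectSyncName combines the node id and the object UUID,
// per the naming scheme described above (assumed separator: ".").
func BuildObjectSyncName(nodeID, objectUID string) string {
	return nodeID + "." + objectUID
}

// NeedsResend reports whether the live object is newer than the version
// last acknowledged by the edge, i.e. whether delivery must be re-driven.
func NeedsResend(ackedRV, liveRV string) bool {
	acked, err1 := strconv.ParseInt(ackedRV, 10, 64)
	live, err2 := strconv.ParseInt(liveRV, 10, 64)
	if err1 != nil || err2 != nil {
		return true // unparsable state: err on the side of resending
	}
	return live > acked
}

func main() {
	name := BuildObjectSyncName("edge-node-1", "object-uid-1")
	fmt.Println(name, NeedsResend("100", "101"), NeedsResend("101", "101"))
}
```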

EdgeHub

EdgeHub is the WebSocket or QUIC client facing CloudCore—pulling cloud updates, pushing host/device deltas, etc. It registers during EdgeCore startup:

func Register(eh *v1alpha2.EdgeHub, nodeName string) {
	config.InitConfigure(eh, nodeName)
	core.Register(newEdgeHub(eh.Enable))
}

Startup outline:

func (eh *EdgeHub) Start() {
	eh.certManager = certificate.NewCertManager(config.Config.EdgeHub, config.Config.NodeName)
	eh.certManager.Start()
	for _, v := range GetCertSyncChannel() {
		v <- true
		close(v)
	}

	go eh.ifRotationDone()

	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub stop")
			return
		default:
		}
		err := eh.initial()
		if err != nil {
			klog.Exitf("failed to init controller: %v", err)
			return
		}
		waitTime := time.Duration(config.Config.Heartbeat) * time.Second * 2

		err = eh.chClient.Init()
		if err != nil {
			klog.Errorf("connection failed: %v, will reconnect after %s", err, waitTime.String())
			time.Sleep(waitTime)
			continue
		}
		// execute hook funcs after connect
		eh.pubConnectInfo(true)
		go eh.routeToEdge()
		go eh.routeToCloud()
		go eh.keepalive()

		// wait for the stop signal,
		// then stop the authinfo manager/websocket connection
		<-eh.reconnectChan
		eh.chClient.UnInit()

		// execute hook function after disconnect
		eh.pubConnectInfo(false)

		// sleep one heartbeat period, then try to connect to cloud hub again
		klog.Warningf("connection is broken, will reconnect after %s", waitTime.String())
		time.Sleep(waitTime)

	// clean channel
	clean:
		for {
			select {
			case <-eh.reconnectChan:
			default:
				break clean
			}
		}
	}
}

edgehub.go

Rough sequence:

  1. Cert bootstrap from CloudCore (or local files), rotation hooks, then the reconnect loop.
  2. eh.initial() builds chClient; Init() opens the Viaduct connection.
  3. pubConnectInfo(true) tells other EdgeCore modules the tunnel is up.
  4. Three goroutines: routeToEdge, routeToCloud, keepalive.

routeToEdge reads cloud→edge traffic; for sync replies it uses SendResp, otherwise routes by Beehive group.

func (eh *EdgeHub) routeToEdge() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToEdge stop")
			return
		default:
		}
		message, err := eh.chClient.Receive()
		if err != nil {
			klog.Errorf("websocket read error: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}

		// sync replies go back via SendResp; everything else is routed by group
		err = eh.dispatch(message)
		if err != nil {
			klog.Errorf("failed to dispatch message, discard: %v", err)
		}
	}
}

routeToCloud reads from Beehive and writes to CloudHub.

func (eh *EdgeHub) routeToCloud() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub RouteToCloud stop")
			return
		default:
		}
		message, err := beehiveContext.Receive(modules.EdgeHubModuleName)
		if err != nil {
			klog.Errorf("failed to receive message from edge: %v", err)
			time.Sleep(time.Second)
			continue
		}

		err = eh.tryThrottle(message.GetID())
		if err != nil {
			klog.Errorf("msgID: %s, client rate limiter returned an error: %v", message.GetID(), err)
			continue
		}

		// post message to cloud hub
		err = eh.sendToCloud(message)
		if err != nil {
			klog.Errorf("failed to send message to cloud: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
	}
}

keepalive periodically pings the cloud.

func (eh *EdgeHub) keepalive() {
	for {
		select {
		case <-beehiveContext.Done():
			klog.Warning("EdgeHub KeepAlive stop")
			return
		default:
		}
		msg := model.NewMessage("").
			BuildRouter(modules.EdgeHubModuleName, "resource", "node", messagepkg.OperationKeepalive).
			FillBody("ping")

		// post message to cloud hub
		err := eh.sendToCloud(*msg)
		if err != nil {
			klog.Errorf("websocket write error: %v", err)
			eh.reconnectChan <- struct{}{}
			return
		}
		time.Sleep(time.Duration(config.Config.Heartbeat) * time.Second)
	}
}

On transport errors EdgeHub tears down and re-initializes the client to dial CloudHub again.
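
One detail of the reconnect loop worth isolating: before redialing, Start drains reconnectChan, so that duplicate reconnect signals queued during teardown (routeToCloud and keepalive can both push one) don't trigger spurious extra reconnect cycles. The non-blocking drain pattern on its own:

```go
package main

import "fmt"

// drain consumes every signal currently buffered in ch without blocking,
// returning how many it removed; the labeled break exits once ch is empty.
func drain(ch chan struct{}) int {
	drained := 0
clean:
	for {
		select {
		case <-ch:
			drained++
		default:
			break clean // channel empty: stop draining
		}
	}
	return drained
}

func main() {
	ch := make(chan struct{}, 4)
	ch <- struct{}{} // e.g. signal from routeToCloud
	ch <- struct{}{} // e.g. signal from keepalive
	fmt.Println(drain(ch)) // both queued signals consumed
}
```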