KubeEdge-Sedna Source Code Analysis (Repost)

Original author: jaypume
Original lecture video: https://www.bilibili.com/video/BV1hg4y1b78L
Original article: https://github.com/jaypume/article/blob/main/sedna/边云协同AI框架Sedna源码解析/README.MD
Reposted for personal study and easier reference.

KubeEdge–Sedna overview

Sedna is an edge–cloud collaborative AI project incubated in the KubeEdge SIG AI. Building on KubeEdge’s edge–cloud capabilities, Sedna supports collaborative training and inference across edge and cloud—for example joint inference, incremental learning, federated learning, and lifelong learning. It works with widely used AI frameworks such as TensorFlow, PyTorch, and MindSpore, so existing AI workloads can migrate to Sedna with minimal friction to gain collaborative training and inference, with potential benefits in cost, model quality, and data privacy.

Project home:
https://github.com/kubeedge/sedna
Documentation:
https://sedna.readthedocs.io

Overall architecture

Sedna’s edge–cloud collaboration relies on the following KubeEdge capabilities:

  • Unified orchestration of applications across edge and cloud
  • Router: a reliable management-plane messaging channel between cloud and edge
  • EdgeMesh: data-plane cross-edge–cloud service discovery and traffic management

Core components:

  • GlobalManager
    • Unified management of edge–cloud collaborative AI jobs
    • Cross-edge–cloud coordination and management
    • Central configuration management
  • LocalController
    • Local control flow for collaborative AI jobs
    • Local common management: models, datasets, status sync, etc.
  • Lib
    • For AI and application developers: exposes collaborative AI capabilities to applications
  • Worker
    • Runs training or inference jobs—training/inference programs built on existing AI frameworks
    • Each feature maps to a worker group; workers can run on edge or cloud and cooperate

Repository layout

Directory     Description
.github       Sedna GitHub CI/CD pipeline configuration.
LICENSES      Sedna licenses and related vendor licenses.
build         Dockerfiles for building GM/LC and other control-plane components; generated CRD YAML; sample CRD YAML.
cmd           Entrypoints for GM/LC control-plane binaries.
components    Monitoring and visualization components.
docs          Proposals and installation docs.
examples      Examples for joint inference, incremental learning, lifelong learning, and federated learning.
hack          Code generators and other scripts for developers.
lib           Sedna Library: the Python dependency for building collaborative AI applications.
pkg           API definitions; generated client-go code for CRDs; core Sedna GM/LC control-plane code.
scripts       Installation scripts for users.
test          E2E tests and tooling.
vendor        Vendored third-party source.

Sedna control plane source (Go)

GM: Global Manager

GM as a Kubernetes operator

What is an operator?

An Operator is an application-specific controller that extends the Kubernetes API to create, configure and manage instances of complex stateful applications on behalf of a Kubernetes user. It builds upon the basic Kubernetes resource and controller concepts, but also includes domain or application-specific knowledge to automate common tasks better managed by computers. [1]

For Sedna, the project governs how collaborative AI applications configure worker startup parameters, how they coordinate, and how data and artifacts flow. We can define it this way: Sedna GM is a domain-specific controller for “edge–cloud collaborative AI applications.”

The following components form the three main parts of an operator:

  • API: The data that describes the operand’s configuration. The API includes:
    • Custom resource definition (CRD), which defines a schema of settings available for configuring the operand.
    • Programmatic API, which defines the same data schema as the CRD and is implemented using the operator’s programming language, such as Go.
    • Custom resource (CR), which specifies values for the settings defined by the CRD; these values describe the configuration of an operand.
  • Controller: The brains of the operator. The controller creates managed resources based on the description in the custom resource; controllers are implemented using the operator’s programming language, such as Go. [2]

From Red Hat’s definition, the main pieces of a Kubernetes operator are CRD, API, CR, and Controller.

The following diagram illustrates the Sedna GM operator:

The following sections follow that breakdown—CR, CRD, API, and Controller—with Controller as the main control logic.

CR

Sedna supports collaborative inference, incremental learning, lifelong learning, and federated learning. For clarity, this article walks through lifelong learning using its concrete behavior and examples. The other three features share similar patterns in the codebase.

CR example

Below is a lifelong learning CR sample. You can create the corresponding lifelong learning object with kubectl from this CR; see this example for full steps. Important fields:

  • dataset: name of the dataset object (itself a CR).
  • trainSpec: container settings for the training worker in lifelong learning—image, env, etc.
  • trigger: conditions that start the training worker in lifelong learning.
  • evalSpec: container settings for the evaluation worker—image, env, etc.
  • deploySpec: container settings for the inference worker—image, env, etc.
  • outputDir: where trained model artifacts are written in lifelong learning.

build/crd-samples/sedna/lifelonglearningjobv1alpha1.yaml

apiVersion: sedna.io/v1alpha1
kind: LifelongLearningJob
metadata:
  name: atcii-classifier-demo
spec:
  dataset:
    name: "lifelong-dataset"
    trainProb: 0.8
  trainSpec:
    template:
      spec:
        nodeName:  "edge-node"
        containers:
          - image: kubeedge/sedna-example-lifelong-learning-atcii-classifier:v0.3.0
            name:  train-worker
            imagePullPolicy: IfNotPresent
            args: ["train.py"]
            env:
              - name: "early_stopping_rounds"
                value: "100"
              - name: "metric_name"
                value: "mlogloss"
    trigger:
      checkPeriodSeconds: 60
      timer:
        start: 02:00
        end: 24:00
      condition:
        operator: ">"
        threshold: 500
        metric: num_of_samples
  evalSpec:
    template:
      spec:
        nodeName:  "edge-node"
        containers:
          - image: kubeedge/sedna-example-lifelong-learning-atcii-classifier:v0.3.0
            name:  eval-worker
            imagePullPolicy: IfNotPresent
            args: ["eval.py"]
            env:
              - name: "metrics"
                value: "precision_score"
              - name: "metric_param"
                value: "{'average': 'micro'}"
              - name: "model_threshold"
                value: "0.5"
  deploySpec:
    template:
      spec:
        nodeName:  "edge-node"
        containers:
        - image: kubeedge/sedna-example-lifelong-learning-atcii-classifier:v0.3.0
          name:  infer-worker
          imagePullPolicy: IfNotPresent
          args: ["inference.py"]
          env:
          - name: "UT_SAVED_URL"
            value: "/ut_saved_url"
          - name: "infer_dataset_url"
            value: "/data/testData.csv"
          volumeMounts:
          - name: utdir
            mountPath: /ut_saved_url
          - name: inferdata
            mountPath: /data/
          resources:
            limits:
              memory: 2Gi
        volumes:
          - name: utdir
            hostPath:
              path: /lifelong/unseen_task/
              type: DirectoryOrCreate
          - name: inferdata
            hostPath:
              path:  /data/
              type: DirectoryOrCreate
  outputDir: "/output"

CRD

A CRD is the template for CRs. Before you can create CRs of a kind, the CRD must be registered in the cluster. CRD YAML can be written by hand or generated; for complex CRDs, generation is recommended. Sedna uses kubebuilder’s controller-gen. The repo wraps this in scripts—run make crds to generate or refresh CRDs under build/crds/. See the crds: controller-gen target in Makefile.

A CRD must declare group, version, and kind—often shortened to GVK. CR instances are resources; loosely, Resource is like an object instance and Kind is like a class—so a Resource is an instance of a Kind. The table below maps lifelong learning CRD and CR to GVR/GVK:

Object   Group                  Version    Resource                        Kind
CRD      apiextensions.k8s.io   v1         lifelonglearningjobs.sedna.io   CustomResourceDefinition
CR       sedna.io               v1alpha1   lifelonglearningjobs            LifelongLearningJob

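In Kubernetes, resources in a named API group are exposed via REST URIs of the form /apis/<group>/<version>/namespaces/<namespace>/<resource>/<name>. The namespaces/<namespace> segment is dropped for cluster-scoped resources and for cluster-wide lists, and the trailing <name> is dropped when listing a collection.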

With these rules you can construct REST URIs for resources—useful when you cannot rely on kubectl or client-go. Examples:

Fetch the lifelong learning CRD via REST:

curl -k --cert ./client.crt --key ./client.key https://127.0.0.1:5443/apis/apiextensions.k8s.io/v1/customresourcedefinitions/lifelonglearningjobs.sedna.io

List lifelong learning CRs via REST:

curl -k --cert ./client.crt --key ./client.key https://127.0.0.1:5443/apis/sedna.io/v1alpha1/lifelonglearningjobs

Languages without an official Kubernetes client can wrap these REST patterns uniformly.
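As a rough illustration of wrapping these URI rules, the sketch below builds the path portion of such a request. The helper name resourcePath is hypothetical and not part of Sedna or client-go. The "default" namespace is assumed for the example, while the job name is borrowed from the CR sample above.

```go
package main

import "fmt"

// resourcePath builds the API-server path for a resource in a named API
// group. An empty namespace yields the cluster-wide path; an empty name
// yields the collection (list) path.
// Hypothetical helper for illustration only.
func resourcePath(group, version, namespace, resource, name string) string {
	path := fmt.Sprintf("/apis/%s/%s", group, version)
	if namespace != "" {
		path += "/namespaces/" + namespace
	}
	path += "/" + resource
	if name != "" {
		path += "/" + name
	}
	return path
}

func main() {
	// List LifelongLearningJob CRs across all namespaces.
	fmt.Println(resourcePath("sedna.io", "v1alpha1", "", "lifelonglearningjobs", ""))
	// Fetch one job from an assumed "default" namespace.
	fmt.Println(resourcePath("sedna.io", "v1alpha1", "default", "lifelonglearningjobs", "atcii-classifier-demo"))
	// Fetch the cluster-scoped CRD object itself.
	fmt.Println(resourcePath("apiextensions.k8s.io", "v1", "", "customresourcedefinitions", "lifelonglearningjobs.sedna.io"))
}
```

Prefixing the result with the API server address gives the same URLs as the curl commands above.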

Key fields in Sedna’s lifelong learning CRD:

  • apiVersion: apiextensions.k8s.io/v1 — CRDs extend this API version.
  • kind: CustomResourceDefinition — all CRDs use this kind.
  • spec.group: sedna.io — API group for custom resources.
  • spec.names.kind: LifelongLearningJob — the new resource type.
  • spec.names.shortNames: ll (the kubectl short name for LifelongLearningJob).

build/crds/sedna.io_lifelonglearningjobs.yaml

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  annotations:
    controller-gen.kubebuilder.io/version: v0.4.1
  creationTimestamp: null
  name: lifelonglearningjobs.sedna.io
spec:
  group: sedna.io
  names:
    kind: LifelongLearningJob
    listKind: LifelongLearningJobList
    plural: lifelonglearningjobs
    shortNames:
    - ll
    singular: lifelonglearningjob
  scope: Namespaced
  versions:
  - name: v1alpha1
    ...
status:
  acceptedNames:
    kind: ""
    plural: ""
  conditions: []
  storedVersions: []

API

The CRDs are auto-generated—where do the underlying API definitions live?
pkg/apis/sedna/v1alpha1/lifelonglearningjob_types.go

package v1alpha1

import (
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// The marker comments below are inputs to the code generators (client-gen, deepcopy-gen, controller-gen).
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +kubebuilder:resource:shortName=ll
// +kubebuilder:subresource:status

// LifelongLearningJob API definition: primarily Spec and Status—desired vs observed state.
type LifelongLearningJob struct {
	metav1.TypeMeta   `json:",inline"`
	metav1.ObjectMeta `json:"metadata"`
	Spec              LLJobSpec   `json:"spec"`
	Status            LLJobStatus `json:"status,omitempty"`
}

// Parameters required when creating a LifelongLearningJob; extend lifelong-learning fields here.
type LLJobSpec struct {
	Dataset    LLDataset    `json:"dataset"`
	TrainSpec  LLTrainSpec  `json:"trainSpec"`
	EvalSpec   LLEvalSpec   `json:"evalSpec"`
	DeploySpec LLDeploySpec `json:"deploySpec"`

	// the credential referer for OutputDir
	CredentialName string `json:"credentialName,omitempty"`
	OutputDir      string `json:"outputDir"`
}

type LLDataset struct {
	Name      string  `json:"name"`
	TrainProb float64 `json:"trainProb"`
}

// Additional struct definitions are omitted.

Takeaways from the snippet:

  • // +kubebuilder... — comments are inputs to kubebuilder and similar generators.
  • type LifelongLearningJob struct{...} — top-level API for the CRD; holds Spec (desired) and Status (observed).
  • type LLJobSpec struct {...} — fields for the CR; extend here when adding lifelong-learning parameters.

API types for joint inference, incremental learning, and federated learning live under pkg/apis/sedna/v1alpha1/.
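To make the role of the json struct tags concrete, here is a minimal sketch that decodes a fragment of the sample CR's spec into Go structs. The types datasetRef and jobSpec are trimmed-down, hypothetical stand-ins for LLDataset and LLJobSpec (the train/eval/deploy fields are left out). Only the standard library is used, not the Sedna packages.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// datasetRef mirrors the two LLDataset fields shown above.
type datasetRef struct {
	Name      string  `json:"name"`
	TrainProb float64 `json:"trainProb"`
}

// jobSpec is a hypothetical, reduced stand-in for LLJobSpec.
type jobSpec struct {
	Dataset        datasetRef `json:"dataset"`
	CredentialName string     `json:"credentialName,omitempty"`
	OutputDir      string     `json:"outputDir"`
}

// decodeSpec maps JSON keys to struct fields through the json tags.
func decodeSpec(raw []byte) (jobSpec, error) {
	var spec jobSpec
	if err := json.Unmarshal(raw, &spec); err != nil {
		return jobSpec{}, fmt.Errorf("decode spec: %w", err)
	}
	return spec, nil
}

func main() {
	// JSON equivalent of part of the spec in the sample CR.
	raw := []byte(`{"dataset":{"name":"lifelong-dataset","trainProb":0.8},"outputDir":"/output"}`)
	spec, err := decodeSpec(raw)
	if err != nil {
		panic(err)
	}
	fmt.Printf("dataset=%s trainProb=%.1f outputDir=%s\n",
		spec.Dataset.Name, spec.Dataset.TrainProb, spec.OutputDir)
}
```

Fields tagged omitempty, such as credentialName, may be absent from the CR. Fields without it are expected to be present.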

Regenerate client-go code

After you add or change definitions in *_types.go, refresh generated clients:

bash hack/update-codegen.sh

Generated code is under pkg/client:

➜  pkg tree client -L 2
client
├── clientset
│   └── versioned
├── informers
│   └── externalversions
└── listers
    └── sedna

These clients are used throughout Controller logic.

Regenerate CRD manifests

After API changes, refresh CRD YAML:

make crds

CRDs land in build/crds. Re-apply them to the cluster with kubectl apply so the cluster picks up changes.

Controller

The main lifelong-learning control logic lives in pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go—when train/eval workers run, how parameters sync to the edge, etc.

Before diving in, the overall call flow can be sketched as:

cmd/sedna-gm/sedna-gm.go/main() 【1】
pkg/globalmanager/controllers/manager.go/New() 【2】 load GM config.
pkg/globalmanager/controllers/manager.go/Start() 【3】 start GM.
    - clientset.NewForConfig():【4】 build Sedna CRD client from client-go.
    - NewUpstreamController():【5】 one UpstreamController per GM process
    - uc.Run(stopCh): goroutine loop handling
        - pkg/globalmanager/controllers/upstream.go/syncEdgeUpdate() 
    - NewRegistry():【6】 register all feature controllers.
        - f.SetDownstreamSendFunc()【7】
            -> pkg/globalmanager/controllers/lifelonglearning/downstream.go
        - f.SetUpstreamHandler()【8】
            -> pkg/globalmanager/controllers/lifelonglearning/upstream.go/updateFromEdge()
        - f.Run()【9】
    - ws.ListenAndServe() 【10】

The following subsections follow markers 【1】–【10】.

【1】 main entrypoint

sedna-gm.go is the GM binary entry: logging setup, app.NewControllerCommand() parses flags and starts GM controllers.

cmd/sedna-gm/sedna-gm.go

func main() {
   rand.Seed(time.Now().UnixNano())

   command := app.NewControllerCommand()
   logs.InitLogs()
   defer logs.FlushLogs()

   if err := command.Execute(); err != nil {
      os.Exit(1)
   }
}

【2】 Load GM configuration

GM loads cluster config, WebSocket listen address/port, knowledge-base (KB) endpoints, etc.

pkg/globalmanager/controllers/manager.go

// New creates the controller manager
func New(cc *config.ControllerConfig) *Manager {
   config.InitConfigure(cc)
   return &Manager{
      Config: cc,
   }
}

pkg/globalmanager/config/config.go

// ControllerConfig indicates the config of controller
type ControllerConfig struct {
   // KubeAPIConfig indicates the kubernetes cluster info which controller will connected
   KubeConfig string `json:"kubeConfig,omitempty"`

   // Master indicates the address of the Kubernetes API server. Overrides any value in KubeConfig.
   // such as https://127.0.0.1:8443
   // default ""
   Master string `json:"master"`
   // Namespace indicates which namespace the controller listening to.
   // default ""
   Namespace string `json:"namespace,omitempty"`

   // websocket server config
   // Since the current limit of kubeedge(1.5), GM needs to build the websocket channel for communicating between GM and LCs.
   WebSocket WebSocket `json:"websocket,omitempty"`

   // lc config to info the worker
   LC LCConfig `json:"localController,omitempty"`

   // kb config to info the worker
   KB KBConfig `json:"knowledgeBaseServer,omitempty"`

   // period config min resync period
   // default 30s
   MinResyncPeriodSeconds int64 `json:"minResyncPeriodSeconds,omitempty"`
}

【3】 GM startup sequence

Startup initializes the Sedna CRD client, wires edge–cloud messaging, starts per-feature controllers, and opens the WebSocket listener.

pkg/globalmanager/controllers/manager.go

// Start starts the controllers it has managed
func (m *Manager) Start() error {
   ...
   // Initialize Sedna CRD client; controllers watch Sedna CR changes and react.
   sednaClient, err := clientset.NewForConfig(kubecfg)

   ...
   sednaInformerFactory := sednainformers.NewSharedInformerFactoryWithOptions(sednaClient, genResyncPeriod(minResyncPeriod), sednainformers.WithNamespace(namespace))

   // UpstreamController handles messages uploaded from edge LCs
   uc, _ := NewUpstreamController(context)
   downstreamSendFunc := messagelayer.NewContextMessageLayer().SendResourceObject
   stopCh := make(chan struct{})
   go uc.Run(stopCh)

   // For each feature (joint inference, lifelong learning, ...), bind message handlers
   for name, factory := range NewRegistry() {
      ...
      f.SetDownstreamSendFunc(downstreamSendFunc)
      f.SetUpstreamHandler(uc.Add)
      ...
      // Start that feature’s controller
      go f.Run(stopCh)
   }

   ...

   // GM WebSocket server, default 0.0.0.0:9000
   ws := websocket.NewServer(addr)
   ...
}

【4】 Initialize the CRD client

clientset.NewForConfig() is implemented in pkg/client/clientset/versioned/clientset.go—generated from the Sedna API types for typed CRUD.

New for the lifelong-learning controller does roughly:

  • Obtain the LifelongLearningJob informer—a local cache backed by the API server to reduce load.
  • Wire controller fields: Kubernetes client, Sedna client, shared GM config.
  • Register Add/Update/Delete handlers on LifelongLearningJob.

pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go

// New creates a new LifelongLearningJob controller that keeps the relevant pods
// in sync with their corresponding LifelongLearningJob objects.
func New(cc *runtime.ControllerContext) (runtime.FeatureControllerI, error) {
   cfg := cc.Config

   podInformer := cc.KubeInformerFactory.Core().V1().Pods()

   // LifelongLearningJob informer
   jobInformer := cc.SednaInformerFactory.Sedna().V1alpha1().LifelongLearningJobs()

   eventBroadcaster := record.NewBroadcaster()
   eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: cc.KubeClient.CoreV1().Events("")})

   // Controller fields
   jc := &Controller{
      kubeClient: cc.KubeClient,
      client:     cc.SednaClient.SednaV1alpha1(),
      queue:      workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(runtime.DefaultBackOff, runtime.MaxBackOff), Name),
      cfg:        cfg,
   }

   // LifelongLearningJob Add/Update/Delete callbacks
   jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc: func(obj interface{}) {
         jc.enqueueController(obj, true)
         jc.syncToEdge(watch.Added, obj)
      },
      UpdateFunc: func(old, cur interface{}) {
         jc.enqueueController(cur, true)
         jc.syncToEdge(watch.Added, cur)
      },
      DeleteFunc: func(obj interface{}) {
         jc.enqueueController(obj, true)
         jc.syncToEdge(watch.Deleted, obj)
      },
   })
   jc.jobLister = jobInformer.Lister()
   jc.jobStoreSynced = jobInformer.Informer().HasSynced

   // Pod Add/Update/Delete callbacks
   podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
      AddFunc:    jc.addPod,
      UpdateFunc: jc.updatePod,
      DeleteFunc: jc.deletePod,
   })
   jc.podStore = podInformer.Lister()
   jc.podStoreSynced = podInformer.Informer().HasSynced

   return jc, nil
}

The screenshot below shows other modules referencing the Sedna CRD client.

【5】 Message handling setup

uc.Run() drives the UpstreamController, which processes all messages from edges. A loop reads context.upstreamChannel; on each message it looks up uc.updateHandlers[kind] and dispatches. That map holds handlers for joint inference, incremental learning, federated learning, lifelong learning, etc.

pkg/globalmanager/controllers/upstream.go

// syncEdgeUpdate receives the updates from edge and syncs these to k8s.
func (uc *UpstreamController) syncEdgeUpdate() {
   for {
      select {
      case <-uc.messageLayer.Done():
         klog.Info("Stop sedna upstream loop")
         return
      default:
      }

      update, err := uc.messageLayer.ReceiveResourceUpdate()
      ...

      handler, ok := uc.updateHandlers[kind]
      if ok {
         err := handler(name, namespace, operation, update.Content)
         ...
      }
   }
}

ReceiveFromEdge blocks on a channel carrying nodeMessage values from edge LCs.

pkg/globalmanager/messagelayer/ws/context.go

// ReceiveResourceUpdate receives and handles the update
func (cml *ContextMessageLayer) ReceiveResourceUpdate() (*ResourceUpdateSpec, error) {
   nodeName, msg, err := wsContext.ReceiveFromEdge()
   ...
}
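
The dispatch pattern described in this step can be sketched without any Sedna dependencies. In the sketch below, the update and dispatcher types and the kind string are hypothetical simplifications. Only the handler signature (name, namespace, operation, content) follows the updateFromEdge signature that appears later in this article.

```go
package main

import "fmt"

// update is a hypothetical, simplified message reported by an edge node.
type update struct {
	Kind, Namespace, Name, Operation string
	Content                          []byte
}

// handler has the same parameter list as a feature's updateFromEdge.
type handler func(name, namespace, operation string, content []byte) error

// dispatcher routes updates to per-kind handlers.
type dispatcher struct {
	handlers map[string]handler
}

func newDispatcher() *dispatcher {
	return &dispatcher{handlers: map[string]handler{}}
}

// add registers a handler for a kind and rejects duplicates.
func (d *dispatcher) add(kind string, h handler) error {
	if _, dup := d.handlers[kind]; dup {
		return fmt.Errorf("a handler for kind %q is already registered", kind)
	}
	d.handlers[kind] = h
	return nil
}

// run drains the channel until it is closed and returns how many updates
// were handled successfully. Unknown kinds and failing handlers are skipped.
func (d *dispatcher) run(updates <-chan update) int {
	handled := 0
	for u := range updates {
		h, ok := d.handlers[u.Kind]
		if !ok {
			continue
		}
		if err := h(u.Name, u.Namespace, u.Operation, u.Content); err != nil {
			continue
		}
		handled++
	}
	return handled
}

func main() {
	d := newDispatcher()
	_ = d.add("lifelonglearningjob", func(name, namespace, operation string, content []byte) error {
		fmt.Printf("%s %s/%s: %s\n", operation, namespace, name, content)
		return nil
	})

	updates := make(chan update, 2)
	updates <- update{Kind: "lifelonglearningjob", Namespace: "default", Name: "demo",
		Operation: "status", Content: []byte(`{"phase":"train","status":"completed"}`)}
	updates <- update{Kind: "unknown-kind", Name: "ignored"}
	close(updates)

	fmt.Println("handled:", d.run(updates))
}
```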

【6】 Controller registry

NewRegistry() registers constructors for every feature; add a New function here when introducing a new collaborative capability.

pkg/globalmanager/controllers/registry.go

func NewRegistry() Registry {
   return Registry{
      ji.Name:      ji.New,
      fe.Name:      fe.New,
      fl.Name:      fl.New,
      il.Name:      il.New,
      ll.Name:      ll.New,
      reid.Name:    reid.New,
      va.Name:      va.New,
      dataset.Name: dataset.New,
      objs.Name:    objs.New,
   }
}
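
A registry of this shape is essentially a map from feature name to constructor. The following sketch shows that idea under simplifying assumptions. The controller interface, the namedController type, and the two feature names are hypothetical. The real constructors take a controller context and return runtime.FeatureControllerI.

```go
package main

import (
	"fmt"
	"sort"
)

// controller is a hypothetical, minimal stand-in for a feature controller.
type controller interface {
	Name() string
}

// constructor builds one controller; registry maps feature names to constructors.
type constructor func() (controller, error)
type registry map[string]constructor

type namedController string

func (n namedController) Name() string { return string(n) }

// newRegistry lists every feature once. Adding a feature means adding an entry.
func newRegistry() registry {
	return registry{
		"lifelonglearning": func() (controller, error) { return namedController("lifelonglearning"), nil },
		"jointinference":   func() (controller, error) { return namedController("jointinference"), nil },
	}
}

// buildAll constructs every registered controller. Names are sorted so the
// result is deterministic, because Go map iteration order is random.
func buildAll(r registry) ([]controller, error) {
	names := make([]string, 0, len(r))
	for name := range r {
		names = append(names, name)
	}
	sort.Strings(names)

	built := make([]controller, 0, len(names))
	for _, name := range names {
		c, err := r[name]()
		if err != nil {
			return nil, fmt.Errorf("create %s controller: %w", name, err)
		}
		built = append(built, c)
	}
	return built, nil
}

func main() {
	controllers, err := buildAll(newRegistry())
	if err != nil {
		panic(err)
	}
	for _, c := range controllers {
		fmt.Println("registered:", c.Name())
	}
}
```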

【7】 Cloud → edge sync

f.SetDownstreamSendFunc() binds each feature’s syncToEdge() implementation.

For lifelong learning, syncing roughly:

  • Resolve the node named on the Dataset CR.
  • Read train/eval/deploy node names from annotations.
  • Depending on the job stage, send to the appropriate node.

pkg/globalmanager/controllers/lifelonglearning/downstream.go

func (c *Controller) syncToEdge(eventType watch.EventType, obj interface{}) error {

   // Dataset CR carries the target node name
   ds, err := c.client.Datasets(job.Namespace).Get(context.TODO(), dataName, metav1.GetOptions{})
   
   // Train / eval / deploy node names from annotations
   getAnnotationsNodeName := func(nodeName sednav1.LLJobStage) string {
      return runtime.AnnotationsKeyPrefix + string(nodeName)
   }
   ann := job.GetAnnotations()
   if ann != nil {
      trainNodeName = ann[getAnnotationsNodeName(sednav1.LLJobTrain)]
      evalNodeName = ann[getAnnotationsNodeName(sednav1.LLJobEval)]
      deployNodeName = ann[getAnnotationsNodeName(sednav1.LLJobDeploy)]
   }
   
   ...

   // Route by current stage
   switch jobStage {
   case sednav1.LLJobTrain:
      doJobStageEvent(trainNodeName)
   case sednav1.LLJobEval:
      doJobStageEvent(evalNodeName)
   case sednav1.LLJobDeploy:
      doJobStageEvent(deployNodeName)
   }

   return nil
}
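
The stage-based routing can be reduced to a small lookup, sketched below. The annotation prefix value, the stage strings, and the fallback to the dataset's node are assumptions made for illustration. The real values come from runtime.AnnotationsKeyPrefix and the sednav1.LLJobStage constants, and the real function sends messages rather than returning a node name.

```go
package main

import "fmt"

// annotationPrefix is an assumed value standing in for runtime.AnnotationsKeyPrefix.
const annotationPrefix = "sedna.io/"

// stage is a hypothetical stand-in for sednav1.LLJobStage.
type stage string

const (
	stageTrain  stage = "Train"
	stageEval   stage = "Eval"
	stageDeploy stage = "Deploy"
)

// nodeForStage returns the node annotated for the given stage. When no
// annotation exists it falls back to datasetNode (an assumption of this sketch).
func nodeForStage(annotations map[string]string, s stage, datasetNode string) (string, error) {
	switch s {
	case stageTrain, stageEval, stageDeploy:
	default:
		return "", fmt.Errorf("unknown job stage %q", s)
	}
	// Reading from a nil map is safe in Go and yields the zero value.
	if node := annotations[annotationPrefix+string(s)]; node != "" {
		return node, nil
	}
	return datasetNode, nil
}

func main() {
	ann := map[string]string{
		annotationPrefix + "Train": "edge-node-1",
		annotationPrefix + "Eval":  "edge-node-2",
	}
	for _, s := range []stage{stageTrain, stageEval, stageDeploy} {
		node, err := nodeForStage(ann, s, "dataset-node")
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %s\n", s, node)
	}
}
```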

【8】 Edge → cloud sync

f.SetUpstreamHandler() binds each feature’s updateFromEdge().

For lifelong learning it:

  • Updates aggregate LifelongLearningJob status from per-edge progress.
  • Persists status to the API server (Status on the CR).
  • Parses JSON payloads from the edge—for example:
{
    "phase": "train",
    "status": "completed",
    "output": {
        "models": [{
            "classes":  ["road", "fence"],
            "current_metric": null,
            "format": "pkl",
            "metrics": null,
            "url": "/output/train/1/index.pkl"
        }],
        "ownerInfo": null
    }
}

pkg/globalmanager/controllers/lifelonglearning/upstream.go

// updateFromEdge syncs the edge updates to k8s
func (c *Controller) updateFromEdge(name, namespace, operation string, content []byte) error {
   var jobStatus struct {
      Phase  string `json:"phase"`
      Status string `json:"status"`
   }
   
   // Parse edge JSON
   err := json.Unmarshal(content, &jobStatus)
   ...
   cond := sednav1.LLJobCondition{
      Status:             v1.ConditionTrue,
      LastHeartbeatTime:  metav1.Now(),
      LastTransitionTime: metav1.Now(),
      Data:               string(condDataBytes),
      Message:            "reported by lc",
   }

   // Map edge status into LifelongLearningJob conditions
   switch strings.ToLower(jobStatus.Status) {
   case "ready":
      cond.Type = sednav1.LLJobStageCondReady
   case "completed":
      cond.Type = sednav1.LLJobStageCondCompleted
   case "failed":
      cond.Type = sednav1.LLJobStageCondFailed
   case "waiting":
      cond.Type = sednav1.LLJobStageCondWaiting
   default:
      return fmt.Errorf("invalid condition type: %v", jobStatus.Status)
   }

   // Write back Status on the LifelongLearningJob CR
   err = c.appendStatusCondition(name, namespace, cond)
   ...
}
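
The parsing and mapping step can be tried in isolation with the sample payload shown earlier in this step. In this sketch, the returned condition names ("Ready", "Completed", and so on) are placeholder strings rather than the actual sednav1.LLJobStageCond* values, and parseEdgeStatus is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// edgeStatus picks out only the two fields the controller switches on.
// Other keys in the payload (such as "output") are ignored by json.Unmarshal.
type edgeStatus struct {
	Phase  string `json:"phase"`
	Status string `json:"status"`
}

// parseEdgeStatus decodes the payload and maps the reported status to a
// condition name. The names returned here are placeholders.
func parseEdgeStatus(content []byte) (phase, cond string, err error) {
	var st edgeStatus
	if err := json.Unmarshal(content, &st); err != nil {
		return "", "", fmt.Errorf("parse edge status: %w", err)
	}
	switch strings.ToLower(st.Status) {
	case "ready":
		cond = "Ready"
	case "completed":
		cond = "Completed"
	case "failed":
		cond = "Failed"
	case "waiting":
		cond = "Waiting"
	default:
		return "", "", fmt.Errorf("invalid condition type: %v", st.Status)
	}
	return st.Phase, cond, nil
}

func main() {
	payload := []byte(`{
	    "phase": "train",
	    "status": "completed",
	    "output": {"models": [{"format": "pkl", "url": "/output/train/1/index.pkl"}]}
	}`)
	phase, cond, err := parseEdgeStatus(payload)
	if err != nil {
		panic(err)
	}
	fmt.Printf("phase=%s condition=%s\n", phase, cond)
}
```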

【9】 Core controller loop

f.Run() starts each feature controller. For lifelong learning, Run() waits for informer sync, then starts worker goroutines.

pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go

// Run starts the main goroutine responsible for watching and syncing jobs.
func (c *Controller) Run(stopCh <-chan struct{}) {
   workers := 1

   defer utilruntime.HandleCrash()
   defer c.queue.ShutDown()

   klog.Infof("Starting %s controller", Name)
   defer klog.Infof("Shutting down %s controller", Name)

   if !cache.WaitForNamedCacheSync(Name, stopCh, c.podStoreSynced, c.jobStoreSynced) {
      klog.Errorf("failed to wait for %s caches to sync", Name)

      return
   }
   klog.Infof("Starting %s workers", Name)
   for i := 0; i < workers; i++ {
      go wait.Until(c.worker, time.Second, stopCh)
   }

   <-stopCh
}

worker loops on processNextWorkItem() until the queue shuts down; the work queue guarantees that syncHandler is never invoked concurrently for the same key.

pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (c *Controller) worker() {
   for c.processNextWorkItem() {
   }
}

processNextWorkItem() invokes sync().

pkg/globalmanager/controllers/lifelonglearning/lifelonglearningjob.go

func (c *Controller) sync(key string) (bool, error) {
   // Part of the implementation omitted
   ns, name, err := cache.SplitMetaNamespaceKey(key)

   sharedJob, err := c.jobLister.LifelongLearningJobs(ns).Get(name)

   // if job was finished previously, we don't want to redo the termination
   if IsJobFinished(&job) {
      return true, nil
   }

   // transit this job's state machine
   needUpdated, err = c.transitJobState(&job)


   if needUpdated {
      if err := c.updateJobStatus(&job); err != nil {
         return forget, err
      }

      if jobFailed && !IsJobFinished(&job) {
         // returning an error will re-enqueue LifelongLearningJob after the backoff period
         return forget, fmt.Errorf("failed pod(s) detected for lifelonglearningjob key %q", key)
      }

      forget = true
   }

   return forget, err
}

sync handles a single job:

  • Split the work-queue key into namespace and name.
  • Load the LifelongLearningJob via the lister.
  • transitJobState advances train → eval → deploy as appropriate.
  • If status changed, updateJobStatus writes it back so kubectl reflects current phase, model paths, etc.
  • Handle failures and retries.

// transit this job's state machine
needUpdated, err = c.transitJobState(&job)

transitJobState() is the state machine—when each stage starts/stops. Use the diagram below together with the code.

【10】 WebSocket server

GM listens for edge messages (as in 【8】), defaulting to 0.0.0.0:9000.

pkg/globalmanager/controllers/manager.go

addr := fmt.Sprintf("%s:%d", m.Config.WebSocket.Address, m.Config.WebSocket.Port)

ws := websocket.NewServer(addr)
err = ws.ListenAndServe()

LC: Local Controller

LC runs on edge nodes for local job management and message relaying. The binary entry is cmd/sedna-lc/sedna-lc.go (same pattern as GM). Below is where feature managers are registered:

cmd/sedna-lc/app/server.go

// runServer runs server
func runServer() {
   c := gmclient.NewWebSocketClient(Options)
   if err := c.Start(); err != nil {
      return
   }

   dm := dataset.New(c, Options)
   mm := model.New(c)
   jm := jointinference.New(c)
   fm := federatedlearning.New(c)
   im := incrementallearning.New(c, dm, mm, Options)
   lm := lifelonglearning.New(c, dm, Options)
   s := server.New(Options)

   for _, m := range []managers.FeatureManager{
      dm, mm, jm, fm, im, lm,
   } {
      s.AddFeatureManager(m)
      c.Subscribe(m)
      err := m.Start()
      if err != nil {
         klog.Errorf("failed to start manager %s: %v",
            m.GetName(), err)
         return
      }
      klog.Infof("manager %s is started", m.GetName())
   }

   s.ListenAndServe()
}

Local job management

The Manager struct models edge-side job management:

pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go

// LifelongLearningJobManager defines lifelong-learning-job Manager
type Manager struct {
   Client                 clienttypes.ClientI
   WorkerMessageChannel   chan workertypes.MessageContent
   DatasetManager         *dataset.Manager
   LifelongLearningJobMap map[string]*Job
   VolumeMountPrefix      string
}

startJob() illustrates the flow:

  • Watch synced Dataset objects—for example trigger training when sample count crosses a threshold.
  • Per stage, drive train/eval/deploy by reporting state up to GM; GM schedules the actual workloads rather than LC starting pods directly.

pkg/localcontroller/managers/lifelonglearning/lifelonglearningjob.go

// startJob starts a job
func (lm *Manager) startJob(name string) {
   ...
    
   // Watch Dataset CRs synced to the edge
   go lm.handleData(job)

   tick := time.NewTicker(JobIterationIntervalSeconds * time.Second)
   for {
      // Drive train/eval/deploy by stage
      select {
      case <-job.JobConfig.Done:
         return
      case <-tick.C:
         cond := lm.getLatestCondition(job)
         jobStage := cond.Stage

         switch jobStage {
         case sednav1.LLJobTrain:
            err = lm.trainTask(job)
         case sednav1.LLJobEval:
            err = lm.evalTask(job)

         case sednav1.LLJobDeploy:
            err = lm.deployTask(job)
         default:
            klog.Errorf("invalid phase: %s", jobStage)
            continue
         }
         ...
      }
   }
}

Beyond orchestration, LC also covers dataset monitoring, model downloads, and local persistence.
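
As an illustration of the threshold trigger mentioned above (and configured under trigger.condition in the CR sample), the sketch below evaluates an operator/threshold/metric triple against reported metrics. The condition type and the set of supported operators are assumptions of this sketch, not Sedna's actual trigger implementation.

```go
package main

import "fmt"

// condition mirrors the three trigger.condition fields from the CR sample.
type condition struct {
	Operator  string
	Threshold float64
	Metric    string
}

// met reports whether the named metric satisfies the condition.
// The operator set handled here is illustrative.
func (c condition) met(metrics map[string]float64) (bool, error) {
	value, ok := metrics[c.Metric]
	if !ok {
		return false, fmt.Errorf("metric %q has not been reported", c.Metric)
	}
	switch c.Operator {
	case ">":
		return value > c.Threshold, nil
	case ">=":
		return value >= c.Threshold, nil
	case "<":
		return value < c.Threshold, nil
	case "<=":
		return value <= c.Threshold, nil
	default:
		return false, fmt.Errorf("unsupported operator %q", c.Operator)
	}
}

func main() {
	// Values taken from the lifelong learning CR sample.
	c := condition{Operator: ">", Threshold: 500, Metric: "num_of_samples"}

	for _, samples := range []float64{120, 500, 501} {
		ok, err := c.met(map[string]float64{"num_of_samples": samples})
		if err != nil {
			panic(err)
		}
		fmt.Printf("num_of_samples=%v start training: %v\n", samples, ok)
	}
}
```

With the sample's ">" operator, training is not triggered at exactly 500 samples, only once the count exceeds it.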

Message proxy

Besides pushing status to the cloud, LC serves HTTP on 0.0.0.0:9100, aggregating messages from the Lib SDK before forwarding to GM. Route registration:

pkg/localcontroller/server/server.go

// register registers api
func (s *Server) register(container *restful.Container) {
	ws := new(restful.WebService)
	ws.Path(fmt.Sprintf("/%s", constants.ServerRootPath)).
		Consumes(restful.MIME_XML, restful.MIME_JSON).
		Produces(restful.MIME_JSON, restful.MIME_XML)

	ws.Route(ws.POST("/workers/{worker-name}/info").
		To(s.messageHandler).
		Doc("receive worker message"))
	container.Add(ws)
}

pkg/localcontroller/server/server.go

// messageHandler handles message from the worker
func (s *Server) messageHandler(request *restful.Request, response *restful.Response) {
   var err error
   workerName := request.PathParameter("worker-name")
   workerMessage := workertypes.MessageContent{}

   err = request.ReadEntity(&workerMessage)
   if workerMessage.Name != workerName || err != nil {
      var msg string
      if workerMessage.Name != workerName {
         msg = fmt.Sprintf("worker name(name=%s) in the api is different from that(name=%s) in the message body",
            workerName, workerMessage.Name)
      } else {
         msg = fmt.Sprintf("read worker(name=%s) message body failed, error: %v", workerName, err)
      }

      klog.Errorf(msg)
      err = s.reply(response, http.StatusBadRequest, msg)
      if err != nil {
         klog.Errorf("reply messge to worker(name=%s) failed, error: %v", workerName, err)
      }
   }

   if m, ok := s.fmm[workerMessage.OwnerKind]; ok {
      m.AddWorkerMessage(workerMessage)
   }

   err = s.reply(response, http.StatusOK, "OK")
   if err != nil {
      klog.Errorf("reply message to worker(name=%s) failed, error: %v", workerName, err)
      return
   }
}
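
The validation at the top of messageHandler can be isolated as a pure function, which makes the two failure cases easy to test. In the sketch below, the workerMessage JSON field names and the helper checkWorkerMessage are assumptions for illustration. Unlike the excerpt above, the helper returns as soon as validation fails, so a caller would send exactly one reply per request.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// workerMessage is a reduced stand-in for workertypes.MessageContent.
// The JSON field names are assumed.
type workerMessage struct {
	Name      string `json:"name"`
	OwnerKind string `json:"ownerKind"`
}

// checkWorkerMessage decodes the body and verifies that the worker name in
// the URL path matches the one in the body. It returns the HTTP status the
// server should reply with.
func checkWorkerMessage(pathName string, body []byte) (workerMessage, int, error) {
	var msg workerMessage
	if err := json.Unmarshal(body, &msg); err != nil {
		return workerMessage{}, http.StatusBadRequest,
			fmt.Errorf("read worker(name=%s) message body failed: %w", pathName, err)
	}
	if msg.Name != pathName {
		return workerMessage{}, http.StatusBadRequest,
			fmt.Errorf("worker name(name=%s) in the api is different from that(name=%s) in the message body",
				pathName, msg.Name)
	}
	return msg, http.StatusOK, nil
}

func main() {
	bodies := []string{
		`{"name":"train-worker","ownerKind":"LifelongLearningJob"}`,
		`{"name":"eval-worker","ownerKind":"LifelongLearningJob"}`,
	}
	for _, body := range bodies {
		msg, status, err := checkWorkerMessage("train-worker", []byte(body))
		if err != nil {
			fmt.Println(status, err)
			continue
		}
		fmt.Println(status, "accepted message for", msg.OwnerKind)
	}
}
```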

Sedna Lib source (Python)

Lib is the Python SDK for AI and application developers to adapt existing code to edge–cloud collaboration.

Directory layout:

➜  sedna tree lib -L 2
lib
├── __init__.py
├── MANIFEST.in
├── OWNERS
├── requirements.dev.txt
├── requirements.txt    // Sedna Python dependencies
├── sedna
│   ├── algorithms  // Collaborative algorithms
│   ├── backend     // Backends: TensorFlow, PyTorch, ...
│   ├── common
│   ├── core        // Feature implementations
│   ├── datasources // Formats such as txt, csv
│   ├── __init__.py
│   ├── README.md
│   ├── service     // Components that run servers (e.g. KB)
│   ├── VERSION
│   └── __version__.py
└── setup.py

Highlights by area:

core

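core implements each collaboration feature and wraps the user's callbacks. The train path below, taken from lifelong learning, wires together the post-processing callback, training against the cloud knowledge base (KB), the KB update (lifelong learning continuously refreshes models and samples), and status reporting to the LC (completion, metrics).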

lib/sedna/core/lifelong_learning/lifelong_learning.py

def train(self, train_data,
          valid_data=None,
          post_process=None,
          **kwargs):
    is_completed_initilization = \
        str(Context.get_parameters("HAS_COMPLETED_INITIAL_TRAINING",
                                   "false")).lower()

    if is_completed_initilization == "true":
        return self.update(train_data,
                           valid_data=valid_data,
                           post_process=post_process,
                           **kwargs)

    # Configure post-processing callback
    callback_func = None
    if post_process is not None:
        callback_func = ClassFactory.get_cls(
            ClassType.CALLBACK, post_process)
    # Train on seen tasks through the cloud knowledge base
    res, seen_task_index = \
        self.cloud_knowledge_management.seen_estimator.train(
            train_data=train_data,
            valid_data=valid_data,
            **kwargs
        )

    # Train on unseen tasks through the cloud knowledge base
    unseen_res, unseen_task_index = \
        self.cloud_knowledge_management.unseen_estimator.train()

    # Refresh cloud KB indices
    task_index = dict(
        seen_task=seen_task_index,
        unseen_task=unseen_task_index)
    task_index_url = FileOps.dump(
        task_index, self.cloud_knowledge_management.local_task_index_url)

    task_index = self.cloud_knowledge_management.update_kb(task_index_url)
    res.update(unseen_res)

    ...
    
    # Report status to LC (completion, metrics, ...)
    self.report_task_info(
        None, K8sResourceKindStatus.COMPLETED.value, task_info_res)
    self.log.info(f"Lifelong learning Train task Finished, "
                  f"KB index save in {task_index}")
    return callback_func(self.estimator, res) if callback_func else res
    
    ...
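
The two decisions at the edges of `train` (initial training versus update, and the optional post-processing callback) can be isolated in a small sketch. It assumes that `Context.get_parameters` reads environment variables and that `ClassFactory` behaves like a name-to-callable registry. `REGISTRY`, `register`, `round_metrics`, and `run_train` are hypothetical names introduced only for illustration.

```python
import os

# Hypothetical stand-in for ClassFactory: maps a callback name to a callable.
REGISTRY = {}


def register(name):
    """Register a post-processing callback under the given name."""
    def decorator(func):
        REGISTRY[name] = func
        return func
    return decorator


@register("round_metrics")
def round_metrics(estimator, res):
    """Example callback: round every metric to two decimals."""
    return {k: round(v, 2) for k, v in res.items()}


def run_train(first_train, update, post_process=None, env=None):
    """Pick the initial-training or the update path, then post-process."""
    env = os.environ if env is None else env
    done = str(env.get("HAS_COMPLETED_INITIAL_TRAINING", "false")).lower()
    if done == "true":
        # Initial training already happened: refresh existing knowledge.
        return update()
    res = first_train()
    callback = REGISTRY.get(post_process) if post_process else None
    return callback(None, res) if callback else res


first = run_train(lambda: {"acc": 0.91234}, lambda: "updated",
                  post_process="round_metrics", env={})
again = run_train(lambda: {"acc": 0.91234}, lambda: "updated",
                  env={"HAS_COMPLETED_INITIAL_TRAINING": "True"})
print(first, again)
```

Resolving the callback by name keeps the job configuration declarative: the name can come from a parameter string instead of a Python object.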

backend

MSBackend shows how Sedna plugs in MindSpore. A framework becomes a Lib backend once train, predict, and evaluate are implemented, so existing AI code needs only a thin wrapper to join edge–cloud collaboration.

lib/sedna/backend/mindspore/__init__.py

class MSBackend(BackendBase):
    def __init__(self, estimator, fine_tune=True, **kwargs):
        super(MSBackend, self).__init__(estimator=estimator,
                                        fine_tune=fine_tune,
                                        **kwargs)
        self.framework = "mindspore"
        if self.use_npu:
            context.set_context(mode=context.GRAPH_MODE,
                                device_target="Ascend")
        elif self.use_cuda:
            context.set_context(mode=context.GRAPH_MODE,
                                device_target="GPU")
        else:
            context.set_context(mode=context.GRAPH_MODE,
                                device_target="CPU")

        if callable(self.estimator):
            self.estimator = self.estimator()

    def train(self, train_data, valid_data=None, **kwargs):
        if callable(self.estimator):
            self.estimator = self.estimator()
        if self.fine_tune and FileOps.exists(self.model_save_path):
            self.finetune()
        self.has_load = True
        varkw = self.parse_kwargs(self.estimator.train, **kwargs)
        return self.estimator.train(train_data=train_data,
                                    valid_data=valid_data,
                                    **varkw)

    def predict(self, data, **kwargs):
        if not self.has_load:
            self.load()
        varkw = self.parse_kwargs(self.estimator.predict, **kwargs)
        return self.estimator.predict(data=data, **varkw)

    def evaluate(self, data, **kwargs):
        if not self.has_load:
            self.load()
        varkw = self.parse_kwargs(self.estimator.evaluate, **kwargs)
        return self.estimator.evaluate(data, **varkw)
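
The same contract can be shown without any AI framework. The sketch below assumes that `parse_kwargs` keeps only the keyword arguments the target method accepts; `filter_kwargs`, `ToyEstimator`, and `ToyBackend` are hypothetical names, not part of Sedna.

```python
import inspect


def filter_kwargs(func, **kwargs):
    """Keep only the keyword arguments that func declares (assumed behavior)."""
    params = inspect.signature(func).parameters
    if any(p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()):
        return kwargs  # func takes **kwargs, so everything is acceptable
    return {k: v for k, v in kwargs.items() if k in params}


class ToyEstimator:
    """Hypothetical user model: predicts the mean of the training values."""

    def __init__(self):
        self.mean = 0.0

    def train(self, train_data, valid_data=None, epochs=1):
        self.mean = sum(train_data) / len(train_data)
        return {"epochs": epochs}

    def predict(self, data):
        return [self.mean for _ in data]


class ToyBackend:
    """Hypothetical backend: accepts an estimator class or an instance."""

    def __init__(self, estimator):
        self.estimator = estimator() if callable(estimator) else estimator

    def train(self, train_data, valid_data=None, **kwargs):
        varkw = filter_kwargs(self.estimator.train, **kwargs)
        return self.estimator.train(train_data=train_data,
                                    valid_data=valid_data, **varkw)

    def predict(self, data, **kwargs):
        varkw = filter_kwargs(self.estimator.predict, **kwargs)
        return self.estimator.predict(data=data, **varkw)


backend = ToyBackend(ToyEstimator)
# batch_size is not a parameter of ToyEstimator.train, so it is dropped.
history = backend.train([1.0, 2.0, 3.0], epochs=5, batch_size=32)
preds = backend.predict([0, 0])
print(history, preds)
```

Filtering keyword arguments this way lets one job configuration feed estimators with different signatures without raising `TypeError`.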

datasources

datasources packages parsers for common dataset formats so callers avoid boilerplate.

lib/sedna/datasources/__init__.py

class CSVDataParse(BaseDataSource, ABC):
    """
    csv file which contain Structured Data parser
    """
    # Helpers to parse tabular datasets
    def parse(self, *args, **kwargs):
        x_data = []
        y_data = []
        label = kwargs.pop("label") if "label" in kwargs else ""
        usecols = kwargs.get("usecols", "")
        if usecols and isinstance(usecols, str):
            usecols = usecols.split(",")
        if len(usecols):
            if label and label not in usecols:
                usecols.append(label)
            kwargs["usecols"] = usecols
        for f in args:
            if isinstance(f, (dict, list)):
                res = self.parse_json(f, **kwargs)
            else:
                if not (f and FileOps.exists(f)):
                    continue
                res = pd.read_csv(f, **kwargs)
            if self.process_func and callable(self.process_func):
                res = self.process_func(res)
            if label:
                if label not in res.columns:
                    continue
                y = res[label]
                y_data.append(y)
                res.drop(label, axis=1, inplace=True)
            x_data.append(res)
        if not x_data:
            return
        self.x = pd.concat(x_data)
        self.y = pd.concat(y_data)
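
The column handling in `parse` (split a comma-separated `usecols` string, keep the label column, then separate features from labels) can be illustrated with the standard library alone. This sketch uses `csv.DictReader` in place of pandas; `normalize_usecols`, `split_features_labels`, and the sample columns are hypothetical.

```python
import csv
import io


def normalize_usecols(usecols, label):
    """Split a comma-separated column string and make sure label is kept."""
    if usecols and isinstance(usecols, str):
        usecols = usecols.split(",")
    usecols = list(usecols or [])
    if usecols and label and label not in usecols:
        usecols.append(label)
    return usecols


def split_features_labels(text, label, usecols=""):
    """Return (x, y): feature rows as dicts and the label column as a list."""
    cols = normalize_usecols(usecols, label)
    x, y = [], []
    for row in csv.DictReader(io.StringIO(text)):
        if cols:
            # Keep only the requested columns (the label is always included).
            row = {k: v for k, v in row.items() if k in cols}
        y.append(row.pop(label))
        x.append(row)
    return x, y


sample = "temp,humidity,site,comfort\n21,40,a,1\n30,80,b,0\n"
x, y = split_features_labels(sample, label="comfort", usecols="temp,humidity")
print(x, y)
```

Appending the label to the requested columns matters: without it, a caller who lists only feature columns would silently lose the target column.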

algorithms

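Sedna ships algorithms tuned for edge–cloud settings. One example is hard example mining: the CrossEntropy filter below computes an entropy-based confidence score from the class scores and flags a sample as hard when the score falls below a threshold. The goal is not only to bundle baselines but also to offer an extensible surface for new algorithms that improve end-to-end training and inference.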

lib/sedna/algorithms/hard_example_mining/hard_example_mining.py

@ClassFactory.register(ClassType.HEM, alias="CrossEntropy")
class CrossEntropyFilter(BaseFilter, abc.ABC):
    """
    **Object detection** Hard samples discovery methods named `CrossEntropy`

    Parameters
    ----------
    threshold_cross_entropy: float
        hard coefficient threshold score to filter img, default to 0.5.
    """

    def __init__(self, threshold_cross_entropy=0.5, **kwargs):
        self.threshold_cross_entropy = float(threshold_cross_entropy)

    def __call__(self, infer_result=None) -> bool:
        """judge the img is hard sample or not.

        Parameters
        ----------
        infer_result: array_like
            prediction classes list, such as
            [class1-score, class2-score, class2-score,....],
            where class-score is the score corresponding to the class,
            class-score value is in [0,1], who will be ignored if its
            value not in [0,1].

        Returns
        -------
        is hard sample: bool
            `True` means hard sample, `False` means not.
        """

        if not infer_result:
            # if invalid input, return False
            return False

        log_sum = 0.0
        data_check_list = [class_probability for class_probability
                           in infer_result
                           if self.data_check(class_probability)]

        if len(data_check_list) != len(infer_result):
            return False

        for class_data in data_check_list:
            log_sum += class_data * math.log(class_data)
        confidence_score = 1 + 1.0 * log_sum / math.log(
            len(infer_result))
        return confidence_score < self.threshold_cross_entropy
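
The score computed in `__call__` is 1 + Σ p·ln(p) / ln(n) for n class scores: it is 0 for a uniform distribution (maximum uncertainty) and 1 when one class has probability 1. The standalone sketch below reproduces that arithmetic. `confidence_score` and `is_hard_example` are hypothetical names, and the guards for zero scores and for fewer than two classes are additions of this sketch, not part of the excerpt.

```python
import math


def confidence_score(probabilities):
    """Return 1 + sum(p * ln p) / ln(n) for n class scores."""
    n = len(probabilities)
    if n < 2:
        # ln(1) == 0 would make the normalization divide by zero.
        raise ValueError("need at least two class scores")
    # p * ln(p) tends to 0 as p tends to 0, so zero scores contribute nothing.
    log_sum = sum(p * math.log(p) for p in probabilities if p > 0)
    return 1 + log_sum / math.log(n)


def is_hard_example(probabilities, threshold=0.5):
    """A sample is hard when its confidence score is below the threshold."""
    return confidence_score(probabilities) < threshold


uniform = confidence_score([0.5, 0.5])   # maximally uncertain prediction
skewed = confidence_score([0.9, 0.1])    # fairly confident prediction
print(round(uniform, 3), round(skewed, 3))
print(is_hard_example([0.5, 0.5]), is_hard_example([0.9, 0.1]))
```

With the default threshold of 0.5, the uniform prediction is flagged as a hard example and the skewed one is not.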

  1. https://www.redhat.com/en/topics/containers/what-is-a-kubernetes-operator
  2. https://developers.redhat.com/articles/2021/06/22/kubernetes-operators-101-part-2-how-operators-work