KubeEdge源码学习1:杂记

本文仅为个人的学习笔记(速记),不具任何参考意义。
主入口,其它主要模式、机制记录。

阅读疑问

注册模块,模块怎么理解?

模块中还有消息源?

发送、接收消息为一类函数
创建消息、创建回复消息为一类函数

主入口

使用cobra框架。cobra在较多golang项目中使用。以“命令”形式添加命令行参数或子命令。其形式可参考docker、k8s、git等等。

多用NewXXX形式函数创建。云边均用server.go作入口源码文件。

云端主函数(cloud/cmd/cloudcore.go):

1
2
3
4
5
6
7
8
9
func main() {
truecommand := app.NewCloudCoreCommand()
truelogs.InitLogs()
truedefer logs.FlushLogs()

trueif err := command.Execute(); err != nil {
truetrueos.Exit(1)
true}
}

创建“命令”(cloud/cmd/cloudcore/app/server.go):

1
2
3
4
5
6
7
8
9
func NewCloudCoreCommand() *cobra.Command {
trueopts := options.NewCloudCoreOptions()
truecmd := &cobra.Command{
...
Run: func(cmd *cobra.Command, args []string) {
config, err := opts.Config() // 配置
registerModules(config) // 注册模块
core.Run() // 运行模块
}

注册的模块:

1
2
3
4
5
6
7
// registerModules register all the modules started in cloudcore
func registerModules(c *v1alpha1.CloudCoreConfig) {
truecloudhub.Register(c.Modules.CloudHub, c.KubeAPIConfig)
trueedgecontroller.Register(c.Modules.EdgeController, c.KubeAPIConfig, "", false)
truedevicecontroller.Register(c.Modules.DeviceController, c.KubeAPIConfig)
truesynccontroller.Register(c.Modules.SyncController, c.KubeAPIConfig)
}

边缘端主函数(edge/cmd/edgecore.go):

1
2
3
4
5
6
7
8
9
func main() {
truecommand := app.NewEdgeCoreCommand()
truelogs.InitLogs()
truedefer logs.FlushLogs()

trueif err := command.Execute(); err != nil {
truetrueos.Exit(1)
true}
}

NewEdgeCoreCommand函数:

1
2
3
4
5
6
7
8
9
10
// NewEdgeCoreCommand create edgecore cmd
func NewEdgeCoreCommand() *cobra.Command {
trueopts := options.NewEdgeCoreOptions()
truecmd := &cobra.Command{
truetrueRun: func(cmd *cobra.Command, args []string) {
config, err := opts.Config()
registerModules(config) // 注册模块
core.Run() // 运行
}
}

注册模块:

1
2
3
4
5
6
7
8
9
10
11
12
13
14

// registerModules register all the modules started in edgecore
func registerModules(c *v1alpha1.EdgeCoreConfig) {
truedevicetwin.Register(c.Modules.DeviceTwin, c.Modules.Edged.HostnameOverride)
trueedged.Register(c.Modules.Edged)
trueedgehub.Register(c.Modules.EdgeHub, c.Modules.Edged.HostnameOverride)
trueeventbus.Register(c.Modules.EventBus, c.Modules.Edged.HostnameOverride)
trueedgemesh.Register(c.Modules.EdgeMesh)
truemetamanager.Register(c.Modules.MetaManager)
trueservicebus.Register(c.Modules.ServiceBus)
truetest.Register(c.Modules.DBTest)
true// Nodte: Need to put it to the end, and wait for all models to register before executing
truedbm.InitDBConfig(c.DataBase.DriverName, c.DataBase.AliasName, c.DataBase.DataSource)
}

其它类似的:

1
2
3
func NewAdmissionCommand() *cobra.Command {
func NewCSIDriverCommand() *cobra.Command {
func NewEdgeSiteCommand() *cobra.Command {

keadm不在此列

初始化参数选项

使用--minconfig--defaultconfig选项,可生成默认信息,默认打印到终端(因为用fmt.Print),需要手动重定向到文件,默认配置文件为/etc/kubeedge/cofig/cloudcore.ymal/etc/kubeedge/cofig/edgecore.ymal,可用--config指定,建议默认。
参数初始化概述:
打印配置,然后退出程序:

1
2
flag.PrintMinConfigAndExitIfRequested(v1alpha1.NewMinCloudCoreConfig())
flag.PrintDefaultConfigAndExitIfRequested(v1alpha1.NewDefaultCloudCoreConfig())

检测是否合法:opts.Validate(),获取配置结构体:opts.Config(),再检测一次:ValidateCloudCoreConfiguration(config)

beehive

云边的各种模块使用beehive框架管理、通信,与阿里的不知有否密切联系。个人理解,一是方便各模块注册、运行,从代码结构上看较清晰(即使不用通信功能)。二是各模块间的通信(使用channel通信,在socket,但未实现)。
使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
创建结构体,字段自定义,但必须有enable字段
type cloudHub struct {
trueenable bool
}

func newCloudHub(enable bool) *cloudHub {
truereturn &cloudHub{
truetrueenable: enable,
true}
}

注册,可初始化本模块其它参数
func Register() {
truehubconfig.InitConfigure(hub, kubeAPIConfig)
truecore.Register(newCloudHub(hub.Enable))
}

// 以下几个必须具备
// 用于通信的类别(本模块名,本模块所属组)
func (a *cloudHub) Name() string {
truereturn "cloudhub"
}

func (a *cloudHub) Group() string {
truereturn "cloudhub"
}

// 使能,之前通过配置文件,当前传参
// Enable indicates whether enable this module
func (a *cloudHub) Enable() bool {
truereturn a.enable
}

// 启动本模块,之前带参数,当前无参数,并且不用实现cleanup函数
func (a *cloudHub) Start() {
...
}

beehive核心机制简要说明:
全局表,模块表modules和disabledModules。Register函数中判断使能,是则加入modules。运行函数中,初始化上下文beehiveContext,再遍历模块表,用go启动协程。退出判断系统中断。

核心运行函数:

1
2
3
4
5
6
7
// Run starts the modules and in the end does module cleanup
func Run() {
true// Address the module registration and start the core
trueStartModules()
true// monitor system signal and shutdown gracefully
trueGracefulShutdown()
}

StartModules函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// StartModules starts modules that are registered
func StartModules() {
// MsgCtxTypeChannel值为channel,目前只支持此类型
truebeehiveContext.InitContext(beehiveContext.MsgCtxTypeChannel)

// 获取所有模块,再用go启动模块的start函数
truemodules := GetModules()
truefor name, module := range modules {
truetrue//Init the module
truetruebeehiveContext.AddModule(name)
truetrue//Assemble typeChannels for sendToGroup
truetruebeehiveContext.AddModuleGroup(name, module.Group())
truetruego module.Start()
truetrueklog.Infof("Starting module %v", name)
true}
}

GracefulShutdown函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// GracefulShutdown is if it gets the special signals it does modules cleanup
func GracefulShutdown() {
truec := make(chan os.Signal)
truesignal.Notify(c, syscall.SIGINT, syscall.SIGHUP, syscall.SIGTERM,
truetruesyscall.SIGQUIT, syscall.SIGILL, syscall.SIGTRAP, syscall.SIGABRT)
trueselect {
truecase s := <-c:
truetrueklog.Infof("Get os signal %v", s.String())
truetrue//Cleanup each modules
truetruebeehiveContext.Cancel()
truetruemodules := GetModules()
truetruefor name, _ := range modules {
truetruetrueklog.Infof("Cleanup module %v", name)
truetruetruebeehiveContext.Cleanup(name)
truetrue}
true}
}

viaduct

通信框架。具体细节暂无研究,边缘端发送消息即通过该框架,如1.2版本自动注册节点。

消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
消息名:ModuleNameEdgeHub

edged 发送:
resource := fmt.Sprintf("%s/%s/%s", e.namespace, model.ResourceTypeNodeStatus, e.nodeName)
nodeInfoMsg := message.BuildMsg(modules.MetaGroup, "", modules.EdgedModuleName, resource, model.InsertOperation, node)
res, err := beehiveContext.SendSync(edgehub.ModuleNameEdgeHub, *nodeInfoMsg, syncMsgRespTimeout)

edgehub 接收
message, err := beehiveContext.Receive(ModuleNameEdgeHub)
发送云端
eh.sendToCloud(message)

===============

消息名:EdgeControllerModuleName
发送:
msg.BuildRouter(constants.EdgeControllerModuleName, constants.GroupResource, resource, operation)
msg.Content = secret
err = dc.messageLayer.Send(*msg)

接收:


其它

常量定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// Constants for database operations and resource type settings
const (
trueInsertOperation = "insert"
trueDeleteOperation = "delete"
trueQueryOperation = "query"
trueUpdateOperation = "update"
trueResponseOperation = "response"
trueResponseErrorOperation = "error"

trueResourceTypePod = "pod"
trueResourceTypeConfigmap = "configmap"
trueResourceTypeSecret = "secret"
trueResourceTypeNode = "node"
trueResourceTypePodlist = "podlist"
trueResourceTypePodStatus = "podstatus"
trueResourceTypeNodeStatus = "nodestatus"
)


// constants for resource types
const (
trueResNode = "node"
trueResMember = "membership"
trueResTwin = "twin"
trueResAuth = "auth_info"
trueResDevice = "device"
)

// constants for resource operations
const (
trueOpGet = "get"
trueOpResult = "get_result"
trueOpList = "list"
trueOpDetail = "detail"
trueOpDelta = "delta"
trueOpDoc = "document"
trueOpUpdate = "updated"
trueOpInsert = "insert"
trueOpDelete = "deleted"
trueOpConnect = "connected"
trueOpDisConnect = "disconnected"
trueOpKeepalive = "keepalive"
)

// constants for message source
const (
trueSrcCloudHub = "cloudhub"
trueSrcEdgeController = "edgecontroller"
trueSrcDeviceController = "devicecontroller"
trueSrcManager = "edgemgr"
)


// constants for identifier information for edge hub
const (
trueProjectID = "project_id"
trueNodeID = "node_id"
)