1. Environment
- containerd version: v1.7.2 (containerd debug setup tutorial link)
- OS: Ubuntu 22.04
2. Initialization flow
The entry point of containerd is cmd/containerd/main.go, shown below:
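```go
func main() {
	// TODO: instantiate containerd
	app := command.App()
	// The app here is a wrapper provided by a command-line framework. app.Run follows a fixed flow whose
	// main purpose is to execute app.Action, so we only need to focus on what app.Action does.
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintf(os.Stderr, "containerd: %s\n", err)
		os.Exit(1)
	}
}
```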
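App here is really just a wrapper around a command-line framework. When app.Run executes, what actually runs is the App's Action method, so what we need to understand is how containerd implements this Action.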
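To make the "Run ends up calling Action" point concrete, here is a minimal stdlib-only sketch. The `App` type, its `Name`/`Action` fields and its `Run` method are hypothetical stand-ins that only mimic the general shape of a urfave/cli application; they are not the real library API, which also handles flag parsing, subcommands and help output.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// App is a hypothetical, heavily simplified stand-in for a CLI framework's
// application type. It is NOT the urfave/cli API.
type App struct {
	Name   string
	Action func(args []string) error // user-supplied entry point
}

// Run performs the framework's fixed steps and then hands control to Action.
// A real framework would parse flags and dispatch subcommands first.
func (a *App) Run(args []string) error {
	if a.Action == nil {
		return errors.New("no action defined")
	}
	// args[0] is the program name; Action only sees the remaining arguments.
	rest := []string{}
	if len(args) > 1 {
		rest = args[1:]
	}
	return a.Action(rest)
}

func newApp() *App {
	return &App{
		Name: "containerd-sketch",
		Action: func(args []string) error {
			fmt.Printf("action called with %d args\n", len(args))
			return nil
		},
	}
}

func main() {
	app := newApp()
	if err := app.Run(os.Args); err != nil {
		fmt.Fprintf(os.Stderr, "%s: %s\n", app.Name, err)
		os.Exit(1)
	}
}
```

The point of the design is that `main` stays trivial: everything containerd-specific lives in the function assigned to `Action`.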
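That said, with cobra so dominant these days, it is a little surprising that containerd uses urfave/cli as its command-line framework. How the two compare is something worth looking into later.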
containerd's App has many details we don't need to care about. Here we focus on how containerd implements the Action method, since Action is the first thing executed once containerd starts running.
```go
func App() *cli.App {
	... // code we don't need to care about is omitted
	app.Action = func(context *cli.Context) error {
		var (
			start       = time.Now()
			signals     = make(chan os.Signal, 2048)
			serverC     = make(chan *server.Server, 1)
			ctx, cancel = gocontext.WithCancel(gocontext.Background())
			config      = defaultConfig()
		)

		defer cancel()

		// Only try to load the config if it either exists, or the user explicitly
		// told us to load this path.
		configPath := context.GlobalString("config") // path of the config file
		_, err := os.Stat(configPath)
		if !os.IsNotExist(err) || context.GlobalIsSet("config") {
			// parse the config file (/etc/containerd/config.toml by default) into the config object
			if err := srvconfig.LoadConfig(configPath, config); err != nil {
				return err
			}
		}

		// Apply flags to the config (command-line flags override values from the file)
		if err := applyFlags(context, config); err != nil {
			return err
		}

		if config.GRPC.Address == "" {
			return fmt.Errorf("grpc address cannot be empty: %w", errdefs.ErrInvalidArgument)
		}
		if config.TTRPC.Address == "" {
			// If TTRPC was not explicitly configured, use defaults based on GRPC.
			config.TTRPC.Address = config.GRPC.Address + ".ttrpc"
			config.TTRPC.UID = config.GRPC.UID
			config.TTRPC.GID = config.GRPC.GID
		}

		// Make sure top-level directories are created early. (ensures the required directories exist)
		if err := server.CreateTopLevelDirectories(config); err != nil {
			return err
		}

		// Stop if we are registering or unregistering against Windows SCM. (Windows only)
		stop, err := registerUnregisterService(config.Root)
		if err != nil {
			logrus.Fatal(err)
		}
		if stop {
			return nil
		}

		done := handleSignals(ctx, signals, serverC, cancel) // handle exit signals
		// start the signal handler as soon as we can to make sure that
		// we don't miss any signals during boot
		signal.Notify(signals, handledSignals...)

		// cleanup temp mounts
		if err := mount.SetTempMountLocation(filepath.Join(config.Root, "tmpmounts")); err != nil {
			return fmt.Errorf("creating temp mount location: %w", err)
		}
		// unmount all temp mounts on boot for the server
		warnings, err := mount.CleanupTempMounts(0)
		if err != nil {
			log.G(ctx).WithError(err).Error("unmounting temp mounts")
		}
		for _, w := range warnings {
			log.G(ctx).WithError(w).Warn("cleanup temp mount")
		}

		log.G(ctx).WithFields(log.Fields{
			"version":  version.Version,
			"revision": version.Revision,
		}).Info("starting containerd")

		type srvResp struct {
			s   *server.Server
			err error
		}

		// run server initialization in a goroutine so we don't end up blocking important things like SIGTERM handling
		// while the server is initializing.
		// As an example, opening the bolt database blocks forever if a containerd instance
		// is already running, which must then be forcibly terminated (SIGKILL) to recover.
		chsrv := make(chan srvResp)
		go func() {
			defer close(chsrv)

			// TODO: what happens in here?
			server, err := server.New(ctx, config)
			if err != nil {
				select {
				case chsrv <- srvResp{err: err}:
				case <-ctx.Done():
				}
				return
			}

			// Launch as a Windows Service if necessary (Windows adaptation only, ignore it)
			if err := launchService(server, done); err != nil {
				logrus.Fatal(err)
			}
			select {
			case <-ctx.Done():
				server.Stop()
			case chsrv <- srvResp{s: server}:
			}
		}()

		var server *server.Server
		select { // wait for the containerd server to finish initializing
		case <-ctx.Done():
			return ctx.Err()
		case r := <-chsrv:
			if r.err != nil {
				return r.err
			}
			server = r.s
		}

		// We don't send the server down serverC directly in the goroutine above because we need it lower down.
		select { // TODO: why is it written this way? I don't quite understand the comment above
		case <-ctx.Done():
			return ctx.Err()
		case serverC <- server:
		}

		// enable containerd's debug endpoint; once enabled, some containerd data can be
		// inspected via URLs such as /debug/vars and /debug/pprof
		if config.Debug.Address != "" {
			var l net.Listener
			if isLocalAddress(config.Debug.Address) {
				if l, err = sys.GetLocalListener(config.Debug.Address, config.Debug.UID, config.Debug.GID); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			} else {
				if l, err = net.Listen("tcp", config.Debug.Address); err != nil {
					return fmt.Errorf("failed to get listener for debug endpoint: %w", err)
				}
			}
			serve(ctx, l, server.ServeDebug)
		}
		// containerd metrics
		if config.Metrics.Address != "" {
			l, err := net.Listen("tcp", config.Metrics.Address)
			if err != nil {
				return fmt.Errorf("failed to get listener for metrics endpoint: %w", err)
			}
			serve(ctx, l, server.ServeMetrics)
		}
		// setup the ttrpc endpoint (creates the containerd.sock.ttrpc socket)
		tl, err := sys.GetLocalListener(config.TTRPC.Address, config.TTRPC.UID, config.TTRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main ttrpc endpoint: %w", err)
		}
		serve(ctx, tl, server.ServeTTRPC)

		if config.GRPC.TCPAddress != "" {
			l, err := net.Listen("tcp", config.GRPC.TCPAddress)
			if err != nil {
				return fmt.Errorf("failed to get listener for TCP grpc endpoint: %w", err)
			}
			serve(ctx, l, server.ServeTCP)
		}
		// setup the main grpc endpoint (creates the containerd.sock socket)
		l, err := sys.GetLocalListener(config.GRPC.Address, config.GRPC.UID, config.GRPC.GID)
		if err != nil {
			return fmt.Errorf("failed to get listener for main endpoint: %w", err)
		}
		serve(ctx, l, server.ServeGRPC)

		readyC := make(chan struct{})
		go func() {
			server.Wait()
			close(readyC)
		}()

		select {
		case <-readyC:
			if err := notifyReady(ctx); err != nil {
				log.G(ctx).WithError(err).Warn("notify ready failed")
			}
			// containerd booted successfully
			log.G(ctx).Infof("containerd successfully booted in %fs", time.Since(start).Seconds())
			<-done
		case <-done:
		}
		return nil
	}
	return app
}
```
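In the Action code above, some less important code has been omitted so that we can focus on how Action is implemented.
Analyzing it, we find that Action mainly does the following: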
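- 1. Load containerd's configuration and validate some of its values. If no config file path is specified, containerd uses the default config file /etc/containerd/config.toml.
- 2. Create containerd's root and state directories. The root directory is where containerd keeps its persistent data, such as images, runtime data and snapshots; it defaults to /var/lib/containerd. The state directory holds ephemeral data, including the containerd socket files; it defaults to /run/containerd.
- 3. Listen for the SIGPIPE, SIGUSR1, SIGTERM and SIGINT signals.
- 4. Clean up temporary mounts (under the tmpmounts directory inside root).
- 5. Instantiate the containerd server. This is the key step, and we analyze it in detail below.
- 6. If configured, expose the debug endpoint; once enabled, some containerd data can be inspected via URLs such as /debug/vars and /debug/pprof.
- 7. If configured, expose the metrics endpoint.
- 8. Serve the TTRPC endpoint, the optional TCP gRPC endpoint and the main gRPC endpoint.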
Next, let's look at how the containerd server is instantiated.
```go
func New(ctx context.Context, config *srvconfig.Config) (*Server, error) {
	// mainly sets the OOM score and the cgroup
	if err := apply(ctx, config); err != nil {
		return nil, err
	}
	// set the timeout parameters; they are stored in a map
	for key, sec := range config.Timeouts {
		d, err := time.ParseDuration(sec)
		if err != nil {
			return nil, fmt.Errorf("unable to parse %s into a time duration", sec)
		}
		timeout.Set(key, d)
	}
	// TODO: load plugins
	plugins, err := LoadPlugins(ctx, config)
	if err != nil {
		return nil, err
	}
	// TODO: what is a StreamProcessor?
	for id, p := range config.StreamProcessors {
		diff.RegisterProcessor(diff.BinaryHandler(id, p.Returns, p.Accepts, p.Path, p.Args, p.Env))
	}

	// TODO: GRPC server options (interceptor chains)
	serverOpts := []grpc.ServerOption{
		grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(
			otelgrpc.StreamServerInterceptor(),
			grpc_prometheus.StreamServerInterceptor,
			streamNamespaceInterceptor,
		)),
		grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
			otelgrpc.UnaryServerInterceptor(),
			grpc_prometheus.UnaryServerInterceptor,
			unaryNamespaceInterceptor,
		)),
	}
	// maximum size of a message GRPC can receive
	if config.GRPC.MaxRecvMsgSize > 0 {
		serverOpts = append(serverOpts, grpc.MaxRecvMsgSize(config.GRPC.MaxRecvMsgSize))
	}
	// maximum size of a message GRPC can send
	if config.GRPC.MaxSendMsgSize > 0 {
		serverOpts = append(serverOpts, grpc.MaxSendMsgSize(config.GRPC.MaxSendMsgSize))
	}
	// instantiate the TTRPC server. TTRPC is a lightweight, gRPC-like protocol
	// ("GRPC for low-memory environments") served over a local socket; it is not gRPC over TLS.
	ttrpcServer, err := newTTRPCServer()
	if err != nil {
		return nil, err
	}
	tcpServerOpts := serverOpts
	// configure the TLS certificate (TCP gRPC endpoint only)
	if config.GRPC.TCPTLSCert != "" {
		log.G(ctx).Info("setting up tls on tcp GRPC services...")

		tlsCert, err := tls.LoadX509KeyPair(config.GRPC.TCPTLSCert, config.GRPC.TCPTLSKey)
		if err != nil {
			return nil, err
		}
		tlsConfig := &tls.Config{Certificates: []tls.Certificate{tlsCert}}

		if config.GRPC.TCPTLSCA != "" {
			caCertPool := x509.NewCertPool()
			caCert, err := os.ReadFile(config.GRPC.TCPTLSCA)
			if err != nil {
				return nil, fmt.Errorf("failed to load CA file: %w", err)
			}
			caCertPool.AppendCertsFromPEM(caCert)
			tlsConfig.ClientCAs = caCertPool
			tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
		}

		tcpServerOpts = append(tcpServerOpts, grpc.Creds(credentials.NewTLS(tlsConfig)))
	}

	// grpcService allows GRPC services to be registered with the underlying server
	type grpcService interface {
		Register(*grpc.Server) error
	}

	// tcpService allows GRPC services to be registered with the underlying tcp server
	type tcpService interface {
		RegisterTCP(*grpc.Server) error
	}

	// ttrpcService allows TTRPC services to be registered with the underlying server
	type ttrpcService interface {
		RegisterTTRPC(*ttrpc.Server) error
	}

	var (
		grpcServer = grpc.NewServer(serverOpts...)
		tcpServer  = grpc.NewServer(tcpServerOpts...)

		grpcServices  []grpcService
		tcpServices   []tcpService
		ttrpcServices []ttrpcService

		s = &Server{
			grpcServer:  grpcServer,
			tcpServer:   tcpServer,
			ttrpcServer: ttrpcServer,
			config:      config,
		}
		// TODO: Remove this in 2.0 and let event plugin crease it
		events      = exchange.NewExchange()
		initialized = plugin.NewPluginSet()
		required    = make(map[string]struct{})
	)
	for _, r := range config.RequiredPlugins {
		required[r] = struct{}{}
	}
	for _, p := range plugins {
		id := p.URI()
		reqID := id
		if config.GetVersion() == 1 {
			reqID = p.ID
		}
		log.G(ctx).WithField("type", p.Type).Infof("loading plugin %q...", id)

		initContext := plugin.NewContext(
			ctx,
			p,
			initialized,
			config.Root,
			config.State,
		)
		initContext.Events = events
		initContext.Address = config.GRPC.Address
		initContext.TTRPCAddress = config.TTRPC.Address
		initContext.RegisterReadiness = s.RegisterReadiness

		// load the plugin specific configuration if it is provided
		if p.Config != nil {
			// decode this plugin's own configuration
			pc, err := config.Decode(p)
			if err != nil {
				return nil, err
			}
			initContext.Config = pc
		}
		// call the plugin's InitFn to instantiate it
		result := p.Init(initContext)
		if err := initialized.Add(result); err != nil {
			return nil, fmt.Errorf("could not add plugin result to plugin set: %w", err)
		}

		// get the plugin instance, along with any error raised while instantiating it
		instance, err := result.Instance()
		if err != nil {
			if plugin.IsSkipPlugin(err) {
				log.G(ctx).WithError(err).WithField("type", p.Type).Infof("skip loading plugin %q...", id)
			} else {
				log.G(ctx).WithError(err).Warnf("failed to load plugin %s", id)
			}
			if _, ok := required[reqID]; ok {
				return nil, fmt.Errorf("load required plugin %s: %w", id, err)
			}
			continue
		}

		// every plugin that loads successfully is removed from required
		delete(required, reqID)
		// check for grpc services that should be registered with the server
		if src, ok := instance.(grpcService); ok {
			grpcServices = append(grpcServices, src)
		}
		if src, ok := instance.(ttrpcService); ok {
			ttrpcServices = append(ttrpcServices, src)
		}
		if service, ok := instance.(tcpService); ok {
			tcpServices = append(tcpServices, service)
		}

		s.plugins = append(s.plugins, result)
	}
	// all plugins have been processed but some required plugin is still missing,
	// so containerd initialization has to fail
	if len(required) != 0 {
		var missing []string
		for id := range required {
			missing = append(missing, id)
		}
		return nil, fmt.Errorf("required plugin %s not included", missing)
	}

	// register services after all plugins have been initialized
	for _, service := range grpcServices {
		if err := service.Register(grpcServer); err != nil {
			return nil, err
		}
	}
	for _, service := range ttrpcServices {
		if err := service.RegisterTTRPC(ttrpcServer); err != nil {
			return nil, err
		}
	}
	for _, service := range tcpServices {
		if err := service.RegisterTCP(tcpServer); err != nil {
			return nil, err
		}
	}
	return s, nil
}
```
The New function above is the initialization logic of the containerd server. It mainly does the following:
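- 1. Set the OOM and cgroup parameters according to containerd's configuration.
- 2. Set the timeout parameters, mainly the following:
  io.containerd.timeout.task.state = 2s
  io.containerd.timeout.bolt.open = 0s
  io.containerd.timeout.metrics.shimstats = 2s
  io.containerd.timeout.shim.cleanup = 5s
  io.containerd.timeout.shim.load = 5s
  io.containerd.timeout.shim.shutdown = 3s
- 3. Load plugins:
  - First, dynamically load the plugins contained in the plugin_dir directory. If you trace into the code, you will find a comment saying plugins are not supported until 1.8. This most likely refers to Go 1.8, the release that introduced Go's plugin package, rather than to containerd 1.8, so the code you land on is the fallback for builds without Go plugin support.
  - Second, load the content plugin. We will analyze what it does later; readers who have looked at containerd may already remember it. My guess is that this plugin is key to implementing ContentService, which we will come back to.
  - Third, load proxy plugins, i.e. external plugins declared under proxy_plugins in the config file and reached over a local socket. Their details are left for later: today we only look at the overall flow, after all I am a beginner too.
  - When debugging the source in IDEA, you will find that containerd ends up with 50 registered plugins (in my environment). Yet while stepping through, you never see any code registering them. Tracing further shows that, apart from the content plugin and any proxy plugins registered in LoadPlugins, every plugin registers itself in its own package's init() function, so all of them are registered as soon as the containerd process starts.
- 4. Handle the stream processor configuration: each configured entry is registered as a diff processor via diff.RegisterProcessor. I don't yet know what it is used for; we will analyze it later.
- 5. Instantiate the registered plugins one by one, passing each its own configuration. If any required plugin fails to load or is missing, containerd initialization is considered to have failed.
- 6. Register the gRPC, TTRPC and TCP services exposed by the plugins.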
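Steps 3, 5 and 6 together form a small plugin lifecycle: plugins register themselves from `init()` before `main` runs (which is why no registration call shows up while stepping through startup), then the loader initializes each one, skips optional failures, aborts on required ones, and only registers services once every plugin is up. The following stdlib-only sketch models that lifecycle under simplifying assumptions; `Registration`, `Register`, `grpcService`, `contentService` and `loadPlugins` are hypothetical names, and the real `plugin.Registration` also carries types, dependencies and per-plugin config.

```go
package main

import (
	"errors"
	"fmt"
	"sort"
)

// Registration is a hypothetical, trimmed-down plugin registration.
type Registration struct {
	ID     string
	InitFn func() (interface{}, error)
}

// registry is filled by init() functions before main runs.
var registry []*Registration

func Register(r *Registration) { registry = append(registry, r) }

// Two "plugin packages" registering themselves at init time.
func init() {
	Register(&Registration{ID: "example.content", InitFn: func() (interface{}, error) {
		return &contentService{}, nil
	}})
}

func init() {
	Register(&Registration{ID: "example.broken", InitFn: func() (interface{}, error) {
		return nil, errors.New("missing dependency")
	}})
}

// grpcService plays the role of the interface declared inside New:
// any plugin instance implementing it gets registered as a service.
type grpcService interface{ Register(names *[]string) error }

type contentService struct{}

func (c *contentService) Register(names *[]string) error {
	*names = append(*names, "content")
	return nil
}

// loadPlugins initializes every registration, skips failures unless the plugin
// is required, and collects instances that implement grpcService.
func loadPlugins(required []string) ([]string, error) {
	req := make(map[string]struct{})
	for _, id := range required {
		req[id] = struct{}{}
	}
	var services []grpcService
	for _, r := range registry {
		instance, err := r.InitFn()
		if err != nil {
			if _, ok := req[r.ID]; ok {
				return nil, fmt.Errorf("load required plugin %s: %w", r.ID, err)
			}
			continue // optional plugin: containerd logs and skips it
		}
		delete(req, r.ID) // loaded successfully, no longer missing
		if s, ok := instance.(grpcService); ok {
			services = append(services, s)
		}
	}
	if len(req) != 0 {
		var missing []string
		for id := range req {
			missing = append(missing, id)
		}
		sort.Strings(missing)
		return nil, fmt.Errorf("required plugin %v not included", missing)
	}
	// Register services only after every plugin has been initialized.
	var names []string
	for _, s := range services {
		if err := s.Register(&names); err != nil {
			return nil, err
		}
	}
	return names, nil
}

func main() {
	fmt.Println(loadPlugins(nil))
	fmt.Println(loadPlugins([]string{"example.broken"}))
	fmt.Println(loadPlugins([]string{"example.absent"}))
}
```

Deferring service registration until the loop finishes means a required-plugin failure aborts startup before any half-initialized service is exposed.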
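That is containerd's overall initialization flow. Today we only got a rough picture; there are still many parts I don't understand, and we will tackle them one by one in later posts.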