Table of Contents
- 1. Overview
- 2. Environment
- 3. Registration
- 4. Core Concepts
- 4.1. blob
- 4.2. ingest
- 5. Abstract Interfaces
- 5.1. Manager Interface
- 5.2. Provider
- 5.3. IngestManager
- 5.4. Ingester
- 6. Core Implementation
- 6.1. Info
- 6.2. Update
- 6.3. Walk
- 6.4. Delete
- 6.5. ReaderAt
- 6.6. Status
- 6.7. ListStatuses
- 6.8. Abort
- 6.9. Writer
- 7. Summary
1. Overview
Today we look at ContentPlugin and work out what functionality ContentPlugin actually provides to containerd.
Although the figure below highlights ContentService, this article does not analyze ContentService itself. The reason is that ContentService is built on top of ContentPlugin.
Let's look at the registration code of the ContentService service:
// services/content/store.go
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.ServicePlugin,
		ID:   services.ContentService,
		Requires: []plugin.Type{
			plugin.EventPlugin,
			plugin.MetadataPlugin,
		},
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			// Get the metadata plugin; under the hood it relies on boltdb to store key-value pairs
			m, err := ic.Get(plugin.MetadataPlugin)
			if err != nil {
				return nil, err
			}
			// Get the event plugin; this is effectively the event biz layer, a bit like injecting a biz-layer dependency into a service
			ep, err := ic.Get(plugin.EventPlugin)
			if err != nil {
				return nil, err
			}
			// The metadata ContentStore is essentially create/read/update/delete over blobs
			s, err := newContentStore(m.(*metadata.DB).ContentStore(), ep.(events.Publisher))
			return s, err
		},
	})
}

// metadata/plugin/plugin.go
func init() {
	plugin.Register(&plugin.Registration{
		Type: plugin.MetadataPlugin,
		ID:   "bolt",
		Requires: []plugin.Type{
			plugin.ContentPlugin, // MetadataPlugin depends on ContentPlugin
			plugin.SnapshotPlugin,
		},
		Config: &BoltConfig{
			ContentSharingPolicy: SharingPolicyShared,
		},
		// unimportant code omitted...
	})
}
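The so-called ContentService is simply the plugin whose type is ServicePlugin and whose ID is ContentService. As the registration code above shows, it depends on the ContentStore service of the MetadataPlugin. Debugging the source shows that the ContentStore capability of MetadataPlugin is provided by the ContentPlugin we analyze today. Note the plugin.ContentPlugin entry in MetadataPlugin's Requires list.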
2. Environment
containerd tag version: v1.7.2
3. Registration
The ContentPlugin registration code is shown below, with some unimportant details omitted.
// services/server/server.go
func LoadPlugins(ctx context.Context, config *srvconfig.Config) ([]*plugin.Registration, error) {
	// load all plugins into containerd
	// If no plugin directory is specified, plugins are loaded from /var/lib/containerd/plugins by default
	path := config.PluginDir
	if path == "" {
		path = filepath.Join(config.Root, "plugins")
	}
	// In practice this directory is currently empty, so no plugins are loaded here
	if err := plugin.Load(path); err != nil {
		return nil, err
	}
	// load additional plugins that don't automatically register themselves
	// TODO: what exactly does the content plugin do?
	// How does this plugin differ from the content-service plugin?
	plugin.Register(&plugin.Registration{
		Type: plugin.ContentPlugin,
		ID:   "content",
		InitFn: func(ic *plugin.InitContext) (interface{}, error) {
			// TODO: what is this exported data used for?
			ic.Meta.Exports["root"] = ic.Root
			// Note: every plugin's root directory is rewritten at init time, following the rule <root>/<plugin-type>.<plugin-id>
			// For the content plugin, the root directory is /var/lib/containerd/io.containerd.content.v1.content
			return local.NewStore(ic.Root)
		},
	})
	// unimportant code omitted...
}
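containerd's registration code is fairly simple. Registering merely wraps the plugin's metadata in a Registration and does nothing else; the store itself is only created later, when containerd calls the plugin's InitFn.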
As you can see, the ContentPlugin's InitFn simply instantiates local.store.
Next, let's see what capabilities local.store actually provides.
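4. Core Concepts
4.1. blob
When I first read the code, this concept left me quite confused. After some searching, I found that blob stands for Binary Large Object. The concept was not invented by containerd; it has existed in the storage world for a long time.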
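A blob is simply a way of storing data, somewhat similar to object storage. In containerd, a blob can be thought of as an image layer. Strictly speaking, the image index, manifest, and config are stored as blobs too. An image is built from multiple stacked layers, which is especially visible while the image is being downloaded. Once the download finishes, the content is stored in /var/lib/containerd/io.containerd.content.v1.content/blobs: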
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/blobs# tree
.
└── sha256
    ├── 00a1f6deb2b5d3294cb50e0a59dfc47f67650398d2f0151911e49a56bfd9c355
    ├── 01085d60b3a624c06a7132ff0749efc6e6565d9f2531d7685ff559fb5d0f669f
    ├── 029a81f05585f767fb7549af85a8f24479149e2a73710427a8775593fbe86159
    ├── 05a79c7279f71f86a2a0d05eb72fcb56ea36139150f0a75cd87e80a4272e4e39
    ├── 06212d50621c9654d97e7ec78e972b5017e139b11763375ee4c28bace1fcc087
    ├── 0bbbd1f379fc1f577c5db15c9deac4a218637e4af0196d97e6771f59d9815355
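4.2. ingest
The ingest concept is also rather puzzling. In containerd it can likewise be thought of as an image layer. Unlike a blob, however, an ingest refers specifically to content that has not finished downloading. This happens when a download is interrupted for some reason, such as a network failure or the user pressing Ctrl+C. The partially downloaded data then stays in the ingest directory.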
Ingest data is usually stored in the /var/lib/containerd/io.containerd.content.v1.content/ingest directory. Normally this directory is empty, but we can make ingest data appear by interrupting an image download:
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest#
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest# ls
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest# tree
.

0 directories, 0 files
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest#
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest# nerdctl image rm redis:6.2
FATA[0000] 1 errors:
no such image: redis:6.2
docker.io/library/redis:6.2: resolved |++++++++++++++++++++++++++++++++++++++|
index-sha256:9e75c88539241ad7f61bc9c39ea4913b354064b8a75ca5fc40e1cef41b645bc0: done |++++++++++++++++++++++++++++++++++++++|
manifest-sha256:3b2deb4fdf85229e72229c44bb80c3939e0f93ce93ce8a00cb6b363b0e40b490: done |++++++++++++++++++++++++++++++++++++++|
config-sha256:808c9871bf9dae251c8be67691c3a827742c06f3fb5cf8658568aa7eb0738227: downloading |--------------------------------------| 0.0 B/7.6 KiB
elapsed: 4.2 s total: 3.4 Ki (817.0 B/s)
^C
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest# tree
.
└── 9b372caec3ac7f53c08336ba4b8aba9006b1cf5d56f205fe9f22e828bf9d2ffa
    ├── data
    ├── ref
    ├── startedat
    ├── total
    └── updatedat

1 directory, 5 files
root@containerd:/var/lib/containerd/io.containerd.content.v1.content/ingest#
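5. Abstract Interfaces
5.1. Manager Interface
The Manager interface wraps viewing, updating, walking, and deleting blob data. The interface is simple, so I won't explain it at length; its concrete implementation, covered later, makes it easy to follow.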
// Manager essentially wraps getting information about, modifying, walking, and deleting image layers
type Manager interface {
	// Info will return metadata about content available in the content store.
	//
	// If the content is not present, ErrNotFound will be returned.
	// Returns the size, creation time, update time, and labels of the layer identified by the digest. dgst acts as the layer's ID, and Info is obtained by reading the layer file directly from the OS
	Info(ctx context.Context, dgst digest.Digest) (Info, error)

	// Update updates mutable information related to content.
	// If one or more fieldpaths are provided, only those
	// fields will be updated.
	// Mutable fields:
	//  labels.*
	// Updates a layer's labels. TODO: it looks like containerd does not implement layer info updates
	Update(ctx context.Context, info Info, fieldpaths ...string) (Info, error)

	// Walk will call fn for each item in the content store which
	// match the provided filters. If no filters are given all
	// items will be walked.
	// Walks the layers stored by containerd, skipping those that fail the given filters. Filters can be on digest, labels, or size, although
	// according to the source, filtering by size and by labels is not implemented
	Walk(ctx context.Context, fn WalkFunc, filters ...string) error

	// Delete removes the content from the store.
	// Deletes a layer by digest
	Delete(ctx context.Context, dgst digest.Digest) error
}
5.2. Provider
Provider wraps reading of blobs.
// This interface reads a layer's data (by digest) and allows an offset to be specified
type Provider interface {
	// ReaderAt only requires desc.Digest to be set.
	// Other fields in the descriptor may be used internally for resolving
	// the location of the actual data.
	ReaderAt(ctx context.Context, desc ocispec.Descriptor) (ReaderAt, error)
}
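5.3. IngestManager
The IngestManager interface wraps querying and aborting ingest data. The Abort implementation essentially deletes the ingest, and at first I didn't understand why the method is called Abort rather than Delete. The interface comment below offers a likely explanation: an ingest is an in-progress write operation, so Abort cancels that operation and cleans up its data rather than deleting stored content.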
// IngestManager provides methods for managing ingestions. An ingestion is a
// not-yet-complete writing operation initiated using Ingester and identified
// by a ref string.
// How should the ingest concept be understood? According to the comment above, an ingest is a write operation that has not completed yet, and the write here is certainly the writing of an image
// IngestManager abstracts querying and deleting incomplete image layers
type IngestManager interface {
	// Status returns the status of the provided ref.
	Status(ctx context.Context, ref string) (Status, error)

	// ListStatuses returns the status of any active ingestions whose ref match
	// the provided regular expression. If empty, all active ingestions will be
	// returned.
	// Returns the status of all ingests, filtering out those that don't match
	ListStatuses(ctx context.Context, filters ...string) ([]Status, error)

	// Abort completely cancels the ingest operation targeted by ref.
	// Removes all data of the ingest that ref points to
	Abort(ctx context.Context, ref string) error
}
5.4. Ingester
The Ingester interface abstracts writing ingest data.
// Ingester writes content
// Write interface for ingests
type Ingester interface {
	// Writer initiates a writing operation (aka ingestion). A single ingestion
	// is uniquely identified by its ref, provided using a WithRef option.
	// Writer can be called multiple times with the same ref to access the same
	// ingestion.
	// Once all the data is written, use Writer.Commit to complete the ingestion.
	Writer(ctx context.Context, opts ...WriterOpt) (Writer, error)
}
6. Core Implementation
6.1. Info
The Info method uses the digest to locate a blob under /var/lib/containerd/io.containerd.content.v1.content/blobs and returns the blob's size, creation time, and update time.
// Info: the content store implements Info very simply. It builds the location of the layer from its digest, then treats it as an ordinary file and reads its size, creation time, update time, and so on
// Info returns a layer's information by digest; what it actually inspects is a binary file, which containerd calls a blob
func (s *store) Info(ctx context.Context, dgst digest.Digest) (content.Info, error) {
	// blob is short for binary large object
	// For the blob concept, see: https://www.cloudflare.com/zh-cn/learning/cloud/what-is-blob-storage/
	// The logic is simple: build the path of the layer the digest points to, i.e. /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256/<digest>
	p, err := s.blobPath(dgst)
	if err != nil {
		return content.Info{}, fmt.Errorf("calculating blob info path: %w", err)
	}
	// Check whether the layer exists: on the OS a blob is just an ordinary file, so a user may have deleted it
	fi, err := os.Stat(p)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
		}
		return content.Info{}, err
	}
	var labels map[string]string
	if s.ls != nil {
		labels, err = s.ls.Get(dgst)
		if err != nil {
			return content.Info{}, err
		}
	}
	// Read the file's size, modification time, creation time, etc. directly from the OS
	return s.info(dgst, fi, labels), nil
}
6.2. Update
The Update method updates a blob's labels and its update time.
// Update updates a layer's labels. TODO: it looks like containerd does not implement layer info updates
// Updates a layer's information by digest; a layer is just a binary file, which containerd calls a blob.
func (s *store) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) {
	// Without a label store, labels obviously cannot be changed
	if s.ls == nil {
		return content.Info{}, fmt.Errorf("update not supported on immutable content store: %w", errdefs.ErrFailedPrecondition)
	}
	// Get the layer's storage path: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256/<digest>
	p, err := s.blobPath(info.Digest)
	if err != nil {
		return content.Info{}, fmt.Errorf("calculating blob path for update: %w", err)
	}
	// Check whether the layer exists
	fi, err := os.Stat(p)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("content %v: %w", info.Digest, errdefs.ErrNotFound)
		}
		return content.Info{}, err
	}

	var (
		all    bool
		labels map[string]string
	)
	if len(fieldpaths) > 0 {
		for _, path := range fieldpaths {
			if strings.HasPrefix(path, "labels.") {
				if labels == nil {
					labels = map[string]string{}
				}
				key := strings.TrimPrefix(path, "labels.")
				labels[key] = info.Labels[key]
				continue
			}

			switch path {
			case "labels":
				all = true
				labels = info.Labels
			default:
				return content.Info{}, fmt.Errorf("cannot update %q field on content info %q: %w", path, info.Digest, errdefs.ErrInvalidArgument)
			}
		}
	} else {
		all = true
		labels = info.Labels
	}

	if all {
		err = s.ls.Set(info.Digest, labels)
	} else {
		labels, err = s.ls.Update(info.Digest, labels)
	}
	if err != nil {
		return content.Info{}, err
	}

	info = s.info(info.Digest, fi, labels)
	info.UpdatedAt = time.Now()

	if err := os.Chtimes(p, info.UpdatedAt, info.CreatedAt); err != nil {
		log.G(ctx).WithError(err).Warnf("could not change access time for %s", info.Digest)
	}

	return info, nil
}
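6.3. Walk
The Walk method is simple, and anyone who has walked a directory tree in Go will find it familiar. It walks /var/lib/containerd/io.containerd.content.v1.content/blobs, uses the filters to select matching blobs, and calls the caller-supplied fn for each one.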
// Walk iterates over all layers currently in containerd; a layer is just a binary file, which containerd calls a blob.
// If filters are specified, only layers matching them are visited
func (s *store) Walk(ctx context.Context, fn content.WalkFunc, fs ...string) error {
	// Storage path for blobs: /var/lib/containerd/io.containerd.content.v1.content/blobs
	root := filepath.Join(s.root, "blobs")

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return err
	}

	var alg digest.Algorithm
	// A by-the-book directory walk
	return filepath.Walk(root, func(path string, fi os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		// If the current layer's algorithm is unavailable, skip this entry
		if !fi.IsDir() && !alg.Available() {
			return nil
		}

		// TODO(stevvooe): There are few more cases with subdirs that should be
		// handled in case the layout gets corrupted. This isn't strict enough
		// and may spew bad data.

		// Skip the root directory
		if path == root {
			return nil
		}
		if filepath.Dir(path) == root {
			alg = digest.Algorithm(filepath.Base(path))

			if !alg.Available() {
				alg = ""
				return filepath.SkipDir
			}

			// descending into a hash directory
			return nil
		}

		dgst := digest.NewDigestFromEncoded(alg, filepath.Base(path))
		if err := dgst.Validate(); err != nil {
			// log error but don't report
			log.L.WithError(err).WithField("path", path).Error("invalid digest for blob path")
			// if we see this, it could mean some sort of corruption of the
			// store or extra paths not expected previously.
		}

		var labels map[string]string
		if s.ls != nil {
			labels, err = s.ls.Get(dgst)
			if err != nil {
				return err
			}
		}

		info := s.info(dgst, fi, labels)
		if !filter.Match(content.AdaptInfo(info)) {
			return nil
		}
		return fn(info)
	})
}
6.4. Delete
The Delete method deletes a blob by its digest.
// Delete removes a blob by its digest.
//
// While this is safe to do concurrently, safe exist-removal logic must hold
// some global lock on the store.
// Deletes a layer by digest; a layer is just a binary file, which containerd calls a blob
func (s *store) Delete(ctx context.Context, dgst digest.Digest) error {
	// Find the layer's storage path: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256/<digest>
	bp, err := s.blobPath(dgst)
	if err != nil {
		return fmt.Errorf("calculating blob path for delete: %w", err)
	}

	// Delete the file
	if err := os.RemoveAll(bp); err != nil {
		if !os.IsNotExist(err) {
			return err
		}

		return fmt.Errorf("content %v: %w", dgst, errdefs.ErrNotFound)
	}

	return nil
}
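6.5. ReaderAt
The ReaderAt method is also very simple, and anyone who has used file.ReadAt (the io.ReaderAt interface) will find it familiar. The content.ReaderAt returned by store.ReaderAt is simply a reader over the blob file.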
// ReaderAt returns an io.ReaderAt for the blob.
// ReaderAt reads a layer by digest; it simply reads the blob file (think of a layer as a binary file, which containerd calls a blob)
func (s *store) ReaderAt(ctx context.Context, desc ocispec.Descriptor) (content.ReaderAt, error) {
	// Build the path of the layer the digest points to: /var/lib/containerd/io.containerd.content.v1.content/blobs/sha256/<digest>
	p, err := s.blobPath(desc.Digest)
	if err != nil {
		return nil, fmt.Errorf("calculating blob path for ReaderAt: %w", err)
	}

	reader, err := OpenReader(p)
	if err != nil {
		return nil, fmt.Errorf("blob %s expected at %s: %w", desc.Digest, p, err)
	}

	return reader, nil
}
6.6. Status
The Status method reads the file information of an ingest under /var/lib/containerd/io.containerd.content.v1.content/ingest.
// Status reads the ingest information identified by ref
func (s *store) Status(ctx context.Context, ref string) (content.Status, error) {
	return s.status(s.ingestRoot(ref))
}

// status works like stat above except uses the path to the ingest.
func (s *store) status(ingestPath string) (content.Status, error) {
	dp := filepath.Join(ingestPath, "data")
	fi, err := os.Stat(dp)
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
		}
		return content.Status{}, err
	}

	ref, err := readFileString(filepath.Join(ingestPath, "ref"))
	if err != nil {
		if os.IsNotExist(err) {
			err = fmt.Errorf("%s: %w", err.Error(), errdefs.ErrNotFound)
		}
		return content.Status{}, err
	}

	startedAt, err := readFileTimestamp(filepath.Join(ingestPath, "startedat"))
	if err != nil {
		return content.Status{}, fmt.Errorf("could not read startedat: %w", err)
	}

	updatedAt, err := readFileTimestamp(filepath.Join(ingestPath, "updatedat"))
	if err != nil {
		return content.Status{}, fmt.Errorf("could not read updatedat: %w", err)
	}

	// because we don't write updatedat on every write, the mod time may
	// actually be more up to date.
	if fi.ModTime().After(updatedAt) {
		updatedAt = fi.ModTime()
	}

	return content.Status{
		Ref:       ref,
		Offset:    fi.Size(),
		Total:     s.total(ingestPath),
		UpdatedAt: updatedAt,
		StartedAt: startedAt,
	}, nil
}
6.7. ListStatuses
This method works like Status, except that it returns a slice and the caller can specify filters.
// ListStatuses walks the ingest information of all images in containerd
func (s *store) ListStatuses(ctx context.Context, fs ...string) ([]content.Status, error) {
	fp, err := os.Open(filepath.Join(s.root, "ingest"))
	if err != nil {
		return nil, err
	}

	defer fp.Close()

	fis, err := fp.Readdir(-1)
	if err != nil {
		return nil, err
	}

	filter, err := filters.ParseAll(fs...)
	if err != nil {
		return nil, err
	}

	var active []content.Status
	for _, fi := range fis {
		p := filepath.Join(s.root, "ingest", fi.Name())
		stat, err := s.status(p)
		if err != nil {
			if !os.IsNotExist(err) {
				return nil, err
			}

			// TODO(stevvooe): This is a common error if uploads are being
			// completed while making this listing. Need to consider taking a
			// lock on the whole store to coordinate this aspect.
			//
			// Another option is to cleanup downloads asynchronously and
			// coordinate this method with the cleanup process.
			//
			// For now, we just skip them, as they really don't exist.
			continue
		}

		if filter.Match(adaptStatus(stat)) {
			active = append(active, stat)
		}
	}

	return active, nil
}
6.8. Abort
The Abort method simply deletes the ingest identified by ref.
// Abort an active transaction keyed by ref. If the ingest is active, it will
// be cancelled. Any resources associated with the ingest will be cleaned.
// Removes all data of the ingest that ref points to
func (s *store) Abort(ctx context.Context, ref string) error {
	// Get the ingest path: /var/lib/containerd/io.containerd.content.v1.content/ingest/<digest>
	root := s.ingestRoot(ref)
	if err := os.RemoveAll(root); err != nil {
		if os.IsNotExist(err) {
			return fmt.Errorf("ingest ref %q: %w", ref, errdefs.ErrNotFound)
		}

		return err
	}

	return nil
}
6.9. Writer
The method that writes ingest data.
// Writer begins or resumes the active writer identified by ref. If the writer
// is already in use, an error is returned. Only one writer may be in use per
// ref at a time.
//
// The argument `ref` is used to uniquely identify a long-lived writer transaction.
// Creates the ingest files
func (s *store) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) {
	var wOpts content.WriterOpts
	for _, opt := range opts {
		if err := opt(&wOpts); err != nil {
			return nil, err
		}
	}
	// TODO(AkihiroSuda): we could create a random string or one calculated based on the context
	// https://github.com/containerd/containerd/issues/2129#issuecomment-380255019
	if wOpts.Ref == "" {
		return nil, fmt.Errorf("ref must not be empty: %w", errdefs.ErrInvalidArgument)
	}
	var lockErr error
	// To write this ingest we must first lock it; otherwise someone else might read or write it concurrently
	for count := uint64(0); count < 10; count++ {
		if err := tryLock(wOpts.Ref); err != nil {
			if !errdefs.IsUnavailable(err) {
				return nil, err
			}

			lockErr = err
		} else {
			lockErr = nil
			break
		}
		time.Sleep(time.Millisecond * time.Duration(randutil.Intn(1<<count)))
	}

	if lockErr != nil {
		return nil, lockErr
	}

	w, err := s.writer(ctx, wOpts.Ref, wOpts.Desc.Size, wOpts.Desc.Digest)
	if err != nil {
		unlock(wOpts.Ref)
		return nil, err
	}

	return w, nil // lock is now held by w.
}
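7. Summary
Data is central to any system, which makes persisting it especially important. containerd's data mainly includes images, containers, snapshots, events, checkpointer, tasks, Lease, sandboxes, and so on. Some of this data is stored as files, such as images, while some is kept in a KV database, such as containers, events, and Leases.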
ContentPlugin is a core component of containerd. It stores each layer of an image as a blob in the /var/lib/containerd/io.containerd.content.v1.content/blobs directory. If an image download is interrupted, the incomplete data is kept in the /var/lib/containerd/io.containerd.content.v1.content/ingest directory.