提到事务必谈 ACID
特性, 基于悲观锁的实现会有读写冲突问题,性能很低,为了解决这个问题,主流数据库大多采用版本控制 mvcc[1] 技术,比如 oracle, mysql, postgresql 等等。读可以不加锁,只需要读历史版本即可 (写写还是冲突). 根据事务能看到不同版本的数据,还产生了隔离级别的问题,比如 mysql 默认的 repeatable-read
, oracle 默认的 read-commited
. 本文暂时只讲 mvcc
, 隔离实现放到下文。
mvcc 不同数据库实现也不同,mysql 原地更新数据,将多版本保存到 undo, 而 postgresql 直接插入不同版本数据,过期的数据由 vacuum
来删除。etcd 的实现类似 pg, 本次分享看一下 etcd 的实现原理。
Revision
可以先阅读我的文章 etcd 中让人头大的 version, revision, createRevision, modRevision[2] 来了解下几个版本的概念。
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
main
是版本 id, 逻辑时间戳全局递增。sub
表示当前事务内操作 changes 的顺序 id, 从 0 开始递增。
静态存储
etcd 的 mvcc 数据存储分两部分:内存保存所有 key 对应的版本信息,用于快速范围查询与点查,而磁盘存储所有不同版本的真实数据。
kvindex btree
内存数据由 btree 来维护,从图上可以看到,key 是用户真实的 key, value 是对应所有的版本信息。
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
// generation contains multiple revisions of a key.
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
}
keyIndex
保存 key 的所有版本信息,每删除一次都会生成一个 generation
, 每个 generation
保存了这个生命周期内从创建到删除中间的所有版本号。
磁盘 boltdb
磁盘负责存储所有数据,key 是 revision
, value 是 mvccpb.KeyValue
, 存储引擎是 boltdb
type KeyValue struct {
// key is the key in bytes. An empty key is not allowed.
Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// create_revision is the revision of last creation on this key.
CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"`
// mod_revision is the revision of last modification on this key.
ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"`
// version is the version of the key. A deletion resets
// the version to zero and any modification of the key
// increases its version.
Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"`
// value is the value held by the key, in bytes.
Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"`
// lease is the ID of the lease that attached to key.
// When the attached lease expires, the key will be deleted.
// If lease is 0, then no lease is attached to the key.
Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"`
}
mvccpb.KeyValue
存储本次操作的 key, value, 还有相关的所有版本信息。
Range 查找
每次数据操作,都会在 etcdserver 层开启一个事务 txn, Range 操作是 Read 读事务,然后调用 txn 的 Range 方法,直接看 mvcc
目录下 kvstore_txn.go 文件的实现。
func (tr *storeTxnRead) Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
return tr.rangeKeys(key, end, tr.Rev(), ro)
}
func (tr *storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (*RangeResult, error) {
rev := ro.Rev
if rev > curRev {
return &RangeResult{KVs: nil, Count: -1, Rev: curRev}, ErrFutureRev
}
if rev <= 0 {
rev = curRev
}
if rev return &RangeResult{KVs: nil, Count: -1, Rev: 0}, ErrCompacted
}
revpairs := tr.s.kvindex.Revisions(key, end, rev)
......
kvs := make([]mvccpb.KeyValue, limit)
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
......
if err := kvs[i].Unmarshal(vs[0]); err != nil {
......
}
}
tr.trace.Step("range keys from bolt db")
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
省略部份无关代码,直接看主干部份
- 检查所查找的 rev 版本是否有效,超过当前版本不行,被 compact 删除的也不行
- 根据指定版本去 kvindex 即内存 btree 中查找,所有符合 rev 版本从 key 到 end 的版本信息
- 遍历所有版本,
UnsafeRange
去底层磁盘 boltdb 中获取真实 key/value
Put 更新数据
etcdserver 层同样要开启事务,只不过是写事务。然后实现直接看 mvcc
目录下 kvstore_txn.go
func (tw *storeTxnWrite) put(key, value []byte, leaseID lease.LeaseID) {
rev := tw.beginRev + 1
c := rev
oldLease := lease.NoLease
// if the key exists before, use its previous created and
// get its previous leaseID
_, created, ver, err := tw.s.kvindex.Get(key, rev)
if err == nil {
c = created.main
oldLease = tw.s.le.GetLease(lease.LeaseItem{Key: string(key)})
}
tw.trace.Step("get key's previous created_revision and leaseID")
ibytes := newRevBytes()
idxRev := revision{main: rev, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ver = ver + 1
kv := mvccpb.KeyValue{
Key: key,
Value: value,
CreateRevision: c,
ModRevision: rev,
Version: ver,
Lease: int64(leaseID),
}
d, err := kv.Marshal()
......
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Put(key, idxRev)
tw.changes = append(tw.changes, kv)
tw.trace.Step("store kv pair into bolt db")
......
}
省去不太相关的 lease 操作,只看主干逻辑
- 根据当前版本 key, rev 查找内存 kvindex, 看看是否有当前 key 的版本记录。主要是获取这个 key 当前的
createdRevision
与Version
- 生成
mvccpb.KeyValue
信息,主要是确定这次操作的 ModRevision UnsafeSeqPut
操作写磁盘 boltdb, key 是Revision
, value 是mvccpb.KeyValue
序列化后的数据- 最后更新 kvindex btree
Delete 删除
同样需要开启写事务,直接看源码
func (tw *storeTxnWrite) DeleteRange(key, end []byte) (int64, int64) {
if n := tw.deleteRange(key, end); n != 0 || len(tw.changes) > 0 {
return n, tw.beginRev + 1
}
return 0, tw.beginRev
}
func (tw *storeTxnWrite) deleteRange(key, end []byte) int64 {
rrev := tw.beginRev
if len(tw.changes) > 0 {
rrev++
}
keys, _ := tw.s.kvindex.Range(key, end, rrev)
if len(keys) == 0 {
return 0
}
for _, key := range keys {
tw.delete(key)
}
return int64(len(keys))
}
同样需要先查找内存 kvindex, 找到所有符合的待删除版本,然后调用 delete
去删
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
if tw.storeTxnRead.s != nil && tw.storeTxnRead.s.lg != nil {
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
} else {
// TODO: remove this in v3.5
ibytes = appendMarkTombstone(nil, ibytes)
}
kv := mvccpb.KeyValue{Key: key}
d, err := kv.Marshal()
if err != nil {
if tw.storeTxnRead.s.lg != nil {
tw.storeTxnRead.s.lg.Fatal(
"failed to marshal mvccpb.KeyValue",
zap.Error(err),
)
} else {
plog.Fatalf("cannot marshal event: %v", err)
}
}
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
err = tw.s.kvindex.Tombstone(key, idxRev)
......
}
- 生成 ibytes, 然后追加一个
appendMarkTombstone
标记,表示这个 revision 是 delete,并且生成一个只含有 key 的 mvccpb.KeyValue UnsafeSeqPut
删除磁盘 boltdb, 注意这里底层只是标记删除,磁盘空间并不释放Tombstone
结束当前生命周期,生成一个新的空 generation, 更新 kvindex
数据过期
与 pg 比较像,过期与删除数据都是惰性删除的。etcd 可以配置只保留固定时间的数据,所以会周期性的 Compact
. 同样分为两部分,内存 btree 数据如果发现当前 generation
为空,并且最大 Revision 己过期,那就从 btree 中删除。
磁盘数据由 boltdb 维护,只会标记为删除,磁盘空间可以回收利用,但是不会自动释放,只有调用 Defrag
才会重建磁盘文件。
另外说到存储引擎 boltdb
, 这个东西性能一般,除了 etcd 没有什么知名项目在用。读事务是并发,但是写事务是串行的,所以内部会将尽可能多的写入 batch 一起操作,异步提交。
小结
这次分享就这些,以后面还会分享更多关于 etcd 的内容,如果感兴趣,可以关注并转发(:
参考资料
[1]什么是 mvcc: https://en.wikipedia.org/wiki/Multiversion_concurrency_control,
[2]etcd 中让人头大的 version, revision, createRevision, modRevision: https://mp.weixin.qq.com/s/TFcSEBBMnb0wJ_A3R4Jfqw,