Mongodb数据迁移:MongoShake源码阅读
简述
- 近期用到了MongoShake做数据迁移, 顺便看看源码, 本篇为阅读源码的笔记
- 本文只讲数据迁移这块相关的
原理及架构
源码信息
- 源码版本: v2.8.1
- 源码仓库: https://github.com/alibaba/MongoShake
- 本文贴的源码片段为了只关注核心逻辑, 无关紧要的代码段会干掉或注释代替
一. 进程入口:
collector
的func main
为进程的入口, 核心源码如下:- 简单来讲做了3件事情
- 初始化和校验配置参数
- 加进程锁
- 使用了第三方工具
github.com/nightlyone/lockfile
实现的 - 简单讲就是在
conf.Options.LogDirectory
目录下建一个{conf.Options.Id}.pid
文件来实现进程锁; 所以你可以扫描该目录下有多少个pid文件来确认当前有多少MongoShake进程在运行
- 使用了第三方工具
- 开始跑数据迁移(startup)
1 | // cmd/collector/collector.go |
- 上面的
startup
里面的coordinator.Run()
会根据syncMode
走全量
,增量
或全量+增量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25switch syncMode {
case utils.VarSyncModeAll: // 全量+增量
if conf.Options.FullSyncReaderOplogStoreDisk {
LOG.Info("run parallel document oplog")
if err := coordinator.parallelDocumentOplog(fullBeginTs); err != nil {
return err
}
} else {
LOG.Info("run serialize document oplog")
if err := coordinator.serializeDocumentOplog(fullBeginTs); err != nil {
return err
}
}
case utils.VarSyncModeFull: // 全量
if err := coordinator.startDocumentReplication(); err != nil {
return err
}
case utils.VarSyncModeIncr: // 增量
if err := coordinator.startOplogReplication(int64(0), int64(0), startTsMap); err != nil {
return err
}
default:
LOG.Critical("unknown sync mode %v", conf.Options.SyncMode)
return errors.New("unknown sync mode " + conf.Options.SyncMode)
}
二. 全量同步
- 根据上面可知道, 全量同步的入口在
ReplicationCoordinator.startDocumentReplication
, 主要分为3步:获取同步源的所有表, 删除同步目标对应的表
- 上面说的
表
实际在源码中是ns(namespace), 格式为f’{database}.{collection}’, 后面会经常提到
- 上面说的
同步所有索引数据 (Option为后台索引时是放在这一步, 如果是前台索引则是同步db后才执行同步)
- 查询源库getIndexes, 然后在目标库createIndex
同步db(collection)数据
- 多线程同步, 线程数取决源数据地址和架构
- mongos只算一个地址即一条线程, 多线程是针对mongod的情况
- dbSyncer是同步的核心, 一条线程对应一个dbSyncer, 负责单个db实例的全量同步工作, 所以通常情况下只有1个dbSyncer
1 | // collector/coordinator/full.go |
全量同步的核心: dbSyncer
dbSyncer
负责单个db的全量同步工作, 同步逻辑如下的Start
:- 将同步任务细分为
nsList
,ns
即namespace, 其实就是collection
- 开n条线程, n=
conf.Options.FullSyncReaderCollectionParallel
- 将上面细分的任务
nsList
丢给这些线程去运行, 即1个collection
的同步是一个任务单元, 所以多个collection
是做到了并行同步的 - 任务单元的执行逻辑在
dbSyncer.collectionSync
- 将同步任务细分为
1 | // 每个collection为1个任务(nsList), 分配给n条线程执行 |
collectionSync
的逻辑如下, 在collection
这个粒度下还在splitter
中做了细分任务, 这个是根据splitKeys
去做分割的, 一般很少用到splitter
负责从源读数据, 数据reader入队到splitter.readerChan
splitSync
那上面出队的reader
进行读数据, 这里就是读数据的终点了- 读到的真实数据(
BSON
)会交给colExecutor
去同步给目标 colExecutor
也是一个生产-消费
模型, 上面读到的数据会推给colExecutor.docBatch (chan []*bson.Raw)
, 在colExecutor.Start()
中处理队列数据colExecutor
又又又细分了DocExecutor
,DocExecutor
拿到docBatch
里面的数据后, 才真正的进行同步exec.doSync(docs)
, 这里就是写数据的终点了- 写数据用的
BulkWrite
去批量写入
1 | // 单个collection的全量同步, splitter负责从FromMongoUrl-ns读, colExecutor负责往ToMongoUrl-toNS写 |
- 看完了全量同步流程, 很明显有个缺陷, 全量同步期间源端有数据更新, 两边数据就不一致了, 当然这个限制条件官方也是有指出的
三. 增量同步
增量同步逻辑在
OplogSyncer
OplogSyncer
和DbSyncer
类似- 多线程同步, 线程数取决源数据地址和架构
- mongos只算一个地址即一条线程, 多线程是针对mongod的情况
- 一条线程对应一个OplogSyncer, 负责单个db实例的增量同步工作, 所以通常情况下只有1个OplogSyncer
依旧是
生产-消费
模型- worker是消费者, Batcher是生产者
1 | // collector/coordinator/incr.go |
- 附oplog实例及字段意义
1
2
3
4
5
6
7
8
9
10
11
12
13
14{
"ts" : Timestamp(1582277156, 1), // 操作的时间戳,64位表示,高32位是时间戳,低32位是计数累加
"t" : NumberLong(1), // 对应raft协议里面的term,每次发生节点down掉,新节点加入,主从切换,term都会自增
"h" : NumberLong(0), // 操作的全局唯一id的hash结果
"v" : 2, // oplog的版本字段
"op" : "i", // "i"表示插入,"d"表示删除,"u"表示更新,"c"表示DDL操作,"n"表示心跳
"ns" : "zz.test", // 命名空间。操作发生在哪个表上面
"ui" : UUID("20d9f949-cfc7-496e-a80e-32ba633701a8"), // 表的uuid
"wall" : ISODate("2020-02-21T09:25:56.570Z"),
"o" : { // 具体的操作指令字段, 跟"op"一对
"_id" : ObjectId("5e4fa224a6717632d6ee2e85"),
"kick" : 1
}
}