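MongoDB's official backup tool: a source-code walkthrough of MongoDump
Introduction
- I used this module recently and read through its source along the way, so these are my notes.
- The first few sections cover MongoDump's overall flow; the later ones cover implementation details in the code.
- Source version: 100.6.1
- Source repository: https://github.com/mongodb/mongo-tools/blob/100.6.1/mongodump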
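0. Backup overview: MongoDump.Dump
The backup logic lives in the Dump method of MongoDump (mongodump/mongodump.go/MongoDump.Dump).
Dump has four main steps:
- Various initialization work
- Back up base data such as metadata, indexes, users, roles, and version
- Back up collections
- Back up the oplog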
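The term Intent comes up often below. It is MongoDump's own abstraction and can be thought of as a unit of backup work: backing up one collection corresponds to one Intent, backing up the oplog corresponds to one Intent, and so on. When reading the source you can mentally replace Intent with Task. Intent is covered in detail in a later section.
The core logic is in the source and comments below (I trimmed some non-essential logic for readability):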
// Dump is a method of MongoDump
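As a rough picture of the four steps listed earlier, here is a minimal sketch of phases that run in order and stop at the first failure. It is not the mongo-tools code: `phase`, `runPhases`, `dumpPhases`, and `errOplog` are hypothetical names, and the phase bodies are placeholders.

```go
package main

import (
	"errors"
	"fmt"
)

// errOplog is a made-up error used to simulate a failing oplog phase.
var errOplog = errors.New("oplog overflow")

// phase is a hypothetical stand-in for one stage of a dump.
type phase struct {
	name string
	run  func() error
}

// runPhases executes the phases in order and stops at the first failure.
// It returns the names of the phases that completed.
func runPhases(phases []phase) ([]string, error) {
	done := make([]string, 0, len(phases))
	for _, p := range phases {
		if err := p.run(); err != nil {
			return done, fmt.Errorf("%s: %w", p.name, err)
		}
		done = append(done, p.name)
	}
	return done, nil
}

// dumpPhases mirrors the four steps described in the text. Only the last
// phase can fail here, which is an assumption made to keep the demo short.
func dumpPhases(oplogErr error) []phase {
	ok := func() error { return nil }
	return []phase{
		{"init", ok},
		{"base data", ok},
		{"collections", ok},
		{"oplog", func() error { return oplogErr }},
	}
}

func main() {
	done, err := runPhases(dumpPhases(nil))
	fmt.Println(done, err)

	done, err = runPhases(dumpPhases(errOplog))
	fmt.Println(len(done), err)
}
```

The second call shows that the first three phases still count as completed when the last one fails.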
1. Backing up metadata
Backing up metadata is straightforward: the Metadata struct is marshaled to JSON and written to intent.MetadataFile (an I/O target).
The source is as follows:
type Metadata struct {
	Options        bson.M   `bson:"options,omitempty"`
	Indexes        []bson.D `bson:"indexes"`
	UUID           string   `bson:"uuid,omitempty"`
	CollectionName string   `bson:"collectionName"`
	Type           string   `bson:"type,omitempty"`
}
func (dump *MongoDump) dumpMetadata(intent *intents.Intent, buffer resettableOutputBuffer) (err error) {
	// 1. Populate Metadata from the `intent` argument
	meta := Metadata{
		Indexes: []bson.D{},
	}
	meta.Options = intent.Options
	meta.UUID = intent.UUID
	meta.CollectionName = intent.C
	if intent.Type != "" {
		meta.Type = intent.Type
	}
	session, err := dump.SessionProvider.GetSession()
	if err != nil {
		return err
	}
	// Fetch the indexes from the source and set them on meta.Indexes
	if dump.OutputOptions.ViewsAsCollections || intent.IsView() {
		dump.Logger.Logvf(log.DebugLow, "not dumping indexes metadata for '%v' because it is a view", intent.Namespace())
	} else {
		// get the indexes
		indexesIter, err := db.GetIndexes(session.Database(intent.DB).Collection(intent.C))
		if err != nil {
			return err
		}
		if indexesIter == nil {
			dump.Logger.Logvf(log.Always, "the collection %v appears to have been dropped after the dump started", intent.Namespace())
			return nil
		}
		defer indexesIter.Close(context.Background())
		ctx := context.Background()
		for indexesIter.Next(ctx) {
			indexOpts := &bson.D{}
			err := indexesIter.Decode(indexOpts)
			if err != nil {
				return fmt.Errorf("error converting index: %v", err)
			}
			meta.Indexes = append(meta.Indexes, *indexOpts)
		}
		if err := indexesIter.Err(); err != nil {
			return fmt.Errorf("error getting indexes for collection `%v`: %v", intent.Namespace(), err)
		}
	}
	// 2. Write Metadata to intent.MetadataFile
	/* The rest simply marshals meta to JSON and writes it to intent.MetadataFile */
	jsonBytes, err := bson.MarshalExtJSON(meta, true, false)
	if err != nil {
		return fmt.Errorf("error marshalling metadata json for collection `%v`: %v", intent.Namespace(), err)
	}
	err = intent.MetadataFile.Open()
	if err != nil {
		return err
	}
	defer func() {
		closeErr := intent.MetadataFile.Close()
		if err == nil && closeErr != nil {
			err = fmt.Errorf("error writing metadata for collection `%v` to disk: %v", intent.Namespace(), closeErr)
		}
	}()
	var f io.Writer
	f = intent.MetadataFile
	if buffer != nil {
		buffer.Reset(f)
		f = buffer
		defer func() {
			closeErr := buffer.Close()
			if err == nil && closeErr != nil {
				err = fmt.Errorf("error writing metadata for collection `%v` to disk: %v", intent.Namespace(), closeErr)
			}
		}()
	}
	_, err = f.Write(jsonBytes)
	if err != nil {
		err = fmt.Errorf("error writing metadata for collection `%v` to disk: %v", intent.Namespace(), err)
	}
	return
}
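One detail of dumpMetadata worth isolating is the named return value `err`, which lets the deferred functions report a failure from Close instead of silently dropping it. The sketch below shows that pattern on its own, under stated assumptions: `opener` and `memFile` are hypothetical stand-ins for the type behind intent.MetadataFile, `metadata` is a trimmed-down struct, and the standard library's encoding/json replaces bson.MarshalExtJSON so the example has no external dependency.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
)

// errDiskFull is a made-up error used to simulate a failing Close.
var errDiskFull = errors.New("disk full")

// metadata is a trimmed-down, hypothetical version of the Metadata struct.
type metadata struct {
	Indexes        []map[string]interface{} `json:"indexes"`
	CollectionName string                   `json:"collectionName"`
}

// opener is an assumed stand-in for the type behind intent.MetadataFile.
type opener interface {
	io.WriteCloser
	Open() error
}

// memFile is an in-memory opener whose Close can be made to fail.
type memFile struct {
	buf      bytes.Buffer
	closeErr error
}

func (f *memFile) Open() error                 { return nil }
func (f *memFile) Write(p []byte) (int, error) { return f.buf.Write(p) }
func (f *memFile) Close() error                { return f.closeErr }

// writeMetadata marshals meta and writes it to f. Because err is a named
// return value, the deferred function can replace a nil result with the
// error from Close, so a failed close is still reported to the caller.
func writeMetadata(meta metadata, f opener) (err error) {
	data, err := json.Marshal(meta)
	if err != nil {
		return err
	}
	if err = f.Open(); err != nil {
		return err
	}
	defer func() {
		if closeErr := f.Close(); err == nil && closeErr != nil {
			err = fmt.Errorf("closing metadata file: %w", closeErr)
		}
	}()
	_, err = f.Write(data)
	return err
}

func main() {
	ok := &memFile{}
	err := writeMetadata(metadata{Indexes: []map[string]interface{}{}, CollectionName: "users"}, ok)
	fmt.Println(ok.buf.String(), err)

	bad := &memFile{closeErr: errDiskFull}
	fmt.Println(writeMetadata(metadata{CollectionName: "users"}, bad))
}
```

Without the named return, the second call would return nil even though the close failed.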
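2. Backing up collections
The source is below: intents are repeatedly taken from the intent manager and handed to n worker goroutines for backup.
Each intent corresponds to the backup task for one collection.
// Back up collections concurrently, using NumParallelCollections workers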
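The worker pattern described above can be sketched as follows. This is an illustration only: `intent`, `intentQueue`, and `dumpAll` are hypothetical names, and the mutex-protected slice is an assumed simplification of the real intent manager.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// intent is a hypothetical, minimal stand-in for intents.Intent.
type intent struct{ DB, C string }

// intentQueue is an assumed simplification of the intent manager: a
// mutex-protected list from which workers pop the next unit of work.
type intentQueue struct {
	mu      sync.Mutex
	pending []intent
}

// Pop returns the next intent, or false once the queue is drained.
func (q *intentQueue) Pop() (intent, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.pending) == 0 {
		return intent{}, false
	}
	next := q.pending[0]
	q.pending = q.pending[1:]
	return next, true
}

// dumpAll starts `workers` goroutines. Each one keeps popping intents and
// passing them to dumpOne until the queue is empty or dumpOne fails for
// that worker. It returns the sorted namespaces that were dumped and the
// first error reported by any worker.
func dumpAll(q *intentQueue, workers int, dumpOne func(intent) error) ([]string, error) {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		dumped   []string
		firstErr error
	)
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for {
				in, ok := q.Pop()
				if !ok {
					return
				}
				err := dumpOne(in)
				mu.Lock()
				if err != nil {
					if firstErr == nil {
						firstErr = err
					}
					mu.Unlock()
					return
				}
				dumped = append(dumped, in.DB+"."+in.C)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	sort.Strings(dumped)
	return dumped, firstErr
}

func main() {
	q := &intentQueue{pending: []intent{{"shop", "orders"}, {"shop", "users"}, {"blog", "posts"}}}
	dumped, err := dumpAll(q, 2, func(in intent) error { return nil })
	fmt.Println(dumped, err)
}
```

Because every worker pulls from the same queue, a large collection does not block the others: idle workers simply take the next intent.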
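3. Backing up the oplog
Backing up the oplog is also straightforward: the result of the oplog query is written to the BSONFile of the oplog's intent.
If you are not familiar with the oplog, see the official documentation: https://www.mongodb.com/zh-cn/docs/manual/core/replica-set-oplog/
// Query the oplog between start and end and write it to dump.manager.Oplog().BSONFile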
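To make the "between start and end" idea concrete, here is a small sketch that filters in-memory entries by timestamp and writes the matches to an output. Everything in it is hypothetical (`ts`, `oplogEntry`, `sink`, `dumpOplogBetween`); the real tool queries the server rather than filtering a slice, and the inclusive bounds are an assumption of this sketch.

```go
package main

import (
	"fmt"
	"io"
)

// ts mimics the shape of an oplog timestamp: seconds plus an ordinal.
type ts struct{ T, I uint32 }

// less reports whether a sorts strictly before b.
func (a ts) less(b ts) bool { return a.T < b.T || (a.T == b.T && a.I < b.I) }

// oplogEntry is a hypothetical, already-serialized oplog document.
type oplogEntry struct {
	TS  ts
	Raw []byte
}

// sink is a minimal in-memory io.Writer standing in for the oplog BSONFile.
type sink struct{ data []byte }

func (s *sink) Write(p []byte) (int, error) {
	s.data = append(s.data, p...)
	return len(p), nil
}

// dumpOplogBetween writes every entry with start <= TS <= end to out and
// returns how many entries were written.
func dumpOplogBetween(entries []oplogEntry, start, end ts, out io.Writer) (int, error) {
	n := 0
	for _, e := range entries {
		if e.TS.less(start) || end.less(e.TS) {
			continue // outside the requested window
		}
		if _, err := out.Write(e.Raw); err != nil {
			return n, err
		}
		n++
	}
	return n, nil
}

func main() {
	entries := []oplogEntry{
		{ts{100, 1}, []byte("a")},
		{ts{100, 2}, []byte("b")},
		{ts{101, 1}, []byte("c")},
		{ts{102, 1}, []byte("d")},
	}
	var out sink
	n, err := dumpOplogBetween(entries, ts{100, 2}, ts{101, 1}, &out)
	fmt.Println(n, string(out.data), err)
}
```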
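Backup unit: Intent
An Intent is a unit of backup work; roughly, the backup task for one collection is one intent. The work is split this way so that it can be executed concurrently.
Intent's structure and key methods:
type Intent struct {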
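The useful property of an Intent is that it carries its own output destination, so the dumping code can write to it without knowing what is behind it. The sketch below illustrates that idea with a reduced, hypothetical `intent` type and an assumed `file` interface (Open, Write, Close); `memFile` and `dumpIntent` are invented for the example and are not taken from the mongo-tools source.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// file is an assumed, reduced form of the interface behind BSONFile and
// MetadataFile: an output that is opened lazily, written to, then closed.
type file interface {
	io.WriteCloser
	Open() error
}

// intent is a hypothetical, trimmed-down Intent: a namespace plus the
// destination its documents should be written to.
type intent struct {
	DB, C    string
	BSONFile file
}

// Namespace returns "db.collection".
func (in *intent) Namespace() string { return in.DB + "." + in.C }

// memFile is an in-memory file used only for this illustration.
type memFile struct {
	open bool
	data []byte
}

func (f *memFile) Open() error  { f.open = true; return nil }
func (f *memFile) Close() error { f.open = false; return nil }
func (f *memFile) Write(p []byte) (int, error) {
	if !f.open {
		return 0, errors.New("write on a file that is not open")
	}
	f.data = append(f.data, p...)
	return len(p), nil
}

// dumpIntent writes docs to whatever destination the intent carries. The
// caller never needs to know whether that is a file on disk, an archive
// stream, or (as here) a byte slice.
func dumpIntent(in *intent, docs [][]byte) (written int, err error) {
	if err = in.BSONFile.Open(); err != nil {
		return 0, err
	}
	defer func() {
		if closeErr := in.BSONFile.Close(); err == nil {
			err = closeErr
		}
	}()
	for _, doc := range docs {
		n, werr := in.BSONFile.Write(doc)
		written += n
		if werr != nil {
			return written, werr
		}
	}
	return written, nil
}

func main() {
	f := &memFile{}
	in := &intent{DB: "shop", C: "orders", BSONFile: f}
	n, err := dumpIntent(in, [][]byte{[]byte("doc1"), []byte("doc2")})
	fmt.Println(in.Namespace(), n, string(f.data), err)
}
```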
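Multiplexed I/O module: archive.Writer/Reader
This is the only reasonably complex module in MongoDump. Since this article only covers backup, it only discusses archive.Writer; archive.Reader is used during restore and works on the same principle.
The core of the multiplexing is Multiplexer, whose key component is MuxIn. MuxIn is in fact the implementation of the BSONFile mentioned above: each call to BSONFile.Open is equivalent to creating a new MuxIn and handing it to the Multiplexer to manage. A MuxIn is one "lane" of the multiplexer.
How is the multiplexing implemented? In essence it is many inputs, one output. The MuxIn described above provides the many inputs, while the key logic for the single output is in Multiplexer.formatBody (see the source below). It writes a header carrying the namespace to keep each namespace's data separate, and combined with the Multiplexer's select over channels this gives multiplexed reads and writes. The idea is worth learning from.
That is a lot of concepts at once, so let us tie them all together: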
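- In a single backup there is exactly one archive.Writer, and therefore exactly one Multiplexer. That Multiplexer manages n MuxIn instances, where n equals the number of Intents. How many Intents are there? len(collections) + 1 + 1, where the two 1s are metadata and oplog.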
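The "many inputs, one output" framing can be sketched without any concurrency. In the simplified format below, every record is a 4-byte little-endian length followed by its payload, a length of 0xFFFFFFFF ends a block, and the first record of each block is the namespace header. This is an assumption-laden illustration, not the real archive format: `chunk`, `mux`, `demux`, `writeRecord`, and `writeTerminator` are hypothetical, and the real header is a BSON document with more fields.

```go
package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
)

// terminator is the length value that marks the end of a block.
const terminator = 0xFFFFFFFF

// chunk is one write arriving from some input lane.
type chunk struct {
	ns   string
	data []byte
}

// writeRecord appends a length-prefixed payload to out.
func writeRecord(out *bytes.Buffer, payload []byte) {
	var size [4]byte
	binary.LittleEndian.PutUint32(size[:], uint32(len(payload)))
	out.Write(size[:])
	out.Write(payload)
}

// writeTerminator appends the end-of-block marker to out.
func writeTerminator(out *bytes.Buffer) {
	var t [4]byte
	binary.LittleEndian.PutUint32(t[:], terminator)
	out.Write(t[:])
}

// mux writes chunks in arrival order. Whenever the namespace changes it
// closes the current block and starts a new one with a header record.
func mux(chunks []chunk) []byte {
	var out bytes.Buffer
	current := ""
	for _, c := range chunks {
		if c.ns != current {
			if current != "" {
				writeTerminator(&out)
			}
			writeRecord(&out, []byte(c.ns))
			current = c.ns
		}
		writeRecord(&out, c.data)
	}
	if current != "" {
		writeTerminator(&out)
	}
	return out.Bytes()
}

// demux reverses mux: it reads blocks and concatenates each namespace's data.
func demux(stream []byte) map[string]string {
	result := map[string]string{}
	current, expectHeader := "", true
	for len(stream) >= 4 {
		size := binary.LittleEndian.Uint32(stream[:4])
		stream = stream[4:]
		if size == terminator {
			expectHeader = true
			continue
		}
		if int(size) > len(stream) {
			break // truncated stream
		}
		payload := stream[:size]
		stream = stream[size:]
		if expectHeader {
			current, expectHeader = string(payload), false
			continue
		}
		result[current] += string(payload)
	}
	return result
}

func main() {
	stream := mux([]chunk{
		{"shop.orders", []byte("o1")},
		{"shop.users", []byte("u1")},
		{"shop.orders", []byte("o2")},
	})
	fmt.Println(demux(stream))
}
```

Interleaved writes from two namespaces end up in a single stream, yet each namespace's data can be recovered in order, which is the property restore relies on.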
Core methods from the Multiplexer source:
type Writer struct {
	Out     io.WriteCloser
	Prelude *Prelude
	Mux     *Multiplexer
}
type Multiplexer struct {
	Out       io.WriteCloser
	Control   chan *MuxIn
	Completed chan error
	shutdownInputs notifier
	// ins and selectCases are correlating slices
	ins              []*MuxIn
	selectCases      []reflect.SelectCase
	currentNamespace string
}
type notifier interface {
	Notify()
}
func NewMultiplexer(out io.WriteCloser, shutdownInputs notifier) *Multiplexer {
	mux := &Multiplexer{
		Out:            out,
		Control:        make(chan *MuxIn),
		Completed:      make(chan error),
		shutdownInputs: shutdownInputs,
		ins: []*MuxIn{
			nil, // There is no MuxIn for the Control case
		},
	}
	// A channel select implemented with reflection, a rarely seen technique!
	mux.selectCases = []reflect.SelectCase{
		{
			Dir:  reflect.SelectRecv,
			Chan: reflect.ValueOf(mux.Control),
			Send: reflect.Value{},
		},
	}
	return mux
}
// Core event loop: handles MuxIn add/remove events and write events coming from MuxIns
func (mux *Multiplexer) Run() {
	var err, completionErr error
	for {
		// select via reflection, a technique worth noting
		index, value, notEOF := reflect.Select(mux.selectCases)
		EOF := !notEOF
		if index == 0 { // index 0 is mux.Control, which receives new MuxIns
			if EOF {
				log.Logvf(log.DebugLow, "Mux finish")
				mux.Out.Close()
				if completionErr != nil {
					mux.Completed <- completionErr
				} else if len(mux.selectCases) != 1 {
					mux.Completed <- fmt.Errorf("Mux ending but selectCases still open %v",
						len(mux.selectCases))
				} else {
					mux.Completed <- nil
				}
				return
			}
			muxIn, ok := value.Interface().(*MuxIn)
			if !ok {
				mux.Completed <- fmt.Errorf("non MuxIn received on Control chan") // one for the MuxIn.Open
				return
			}
			log.Logvf(log.DebugLow, "Mux open namespace %v", muxIn.Intent.DataNamespace())
			mux.selectCases = append(mux.selectCases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(muxIn.writeChan),
				Send: reflect.Value{},
			})
			mux.ins = append(mux.ins, muxIn)
		} else { // index > 0 is a MuxIn.writeChan, which receives the data passed to MuxIn.Write
			if EOF {
				mux.ins[index].writeCloseFinishedChan <- struct{}{}
				err = mux.formatEOF(index, mux.ins[index])
				if err != nil {
					mux.shutdownInputs.Notify()
					mux.Out = &nopCloseNopWriter{}
					completionErr = err
				}
				log.Logvf(log.DebugLow, "Mux close namespace %v", mux.ins[index].Intent.DataNamespace())
				mux.currentNamespace = ""
				mux.selectCases = append(mux.selectCases[:index], mux.selectCases[index+1:]...)
				mux.ins = append(mux.ins[:index], mux.ins[index+1:]...)
			} else {
				bsonBytes, ok := value.Interface().([]byte)
				if !ok {
					mux.Completed <- fmt.Errorf("multiplexer received a value that wasn't a []byte")
					return
				}
				// format bsonBytes, then mux.Out.Write(bsonBytes)
				err = mux.formatBody(mux.ins[index], bsonBytes)
				if err != nil {
					mux.shutdownInputs.Notify()
					mux.Out = &nopCloseNopWriter{}
					completionErr = err
				}
			}
		}
	}
}
// Core logic: the header separates the data of different namespaces, which is what achieves the multiplexing; restore also relies on the header
// Writes the header and bsonBytes via mux.Out.Write; Out here is in fact dump.archive.Out
func (mux *Multiplexer) formatBody(in *MuxIn, bsonBytes []byte) error {
	var err error
	var length int
	defer func() {
		in.writeLenChan <- length
	}()
	if in.Intent.DataNamespace() != mux.currentNamespace {
		// Handle the change of which DB/Collection we're writing docs for
		// If mux.currentNamespace then we need to terminate the current block
		if mux.currentNamespace != "" {
			l, err := mux.Out.Write(terminatorBytes)
			if err != nil {
				return err
			}
			if l != len(terminatorBytes) {
				return io.ErrShortWrite
			}
		}
		header, err := bson.Marshal(NamespaceHeader{
			Database:   in.Intent.DB,
			Collection: in.Intent.DataCollection(),
		})
		if err != nil {
			return err
		}
		l, err := mux.Out.Write(header)
		if err != nil {
			return err
		}
		if l != len(header) {
			return io.ErrShortWrite
		}
	}
	mux.currentNamespace = in.Intent.DataNamespace()
	length, err = mux.Out.Write(bsonBytes)
	if err != nil {
		return err
	}
	return nil
}
MuxIn source:
type MuxIn struct {
	writeChan              chan []byte
	writeLenChan           chan int
	writeCloseFinishedChan chan struct{}
	buf                    []byte
	hash                   hash.Hash64
	Intent                 *intents.Intent
	Mux                    *Multiplexer
}
func (muxIn *MuxIn) Read([]byte) (int, error) {
	return 0, nil
}
func (muxIn *MuxIn) Pos() int64 {
	return 0
}
// Flushes the buffer and closes writeChan and writeLenChan; the multiplexer sees the close, signals muxIn.writeCloseFinishedChan, and then writes the EOF marker via formatEOF
func (muxIn *MuxIn) Close() error {
	// the mux side of this gets closed in the mux when it gets an eof on the read
	log.Logvf(log.DebugHigh, "MuxIn close %v", muxIn.Intent.DataNamespace())
	if bufferWrites {
		muxIn.writeChan <- muxIn.buf
		length := <-muxIn.writeLenChan
		if length != len(muxIn.buf) {
			return io.ErrShortWrite
		}
		muxIn.buf = nil
	}
	close(muxIn.writeChan)
	close(muxIn.writeLenChan)
	<-muxIn.writeCloseFinishedChan
	return nil
}
// Initializes muxIn, then sends itself to muxIn.Mux
func (muxIn *MuxIn) Open() error {
	log.Logvf(log.DebugHigh, "MuxIn open %v", muxIn.Intent.DataNamespace())
	muxIn.writeChan = make(chan []byte)
	muxIn.writeLenChan = make(chan int)
	muxIn.writeCloseFinishedChan = make(chan struct{})
	muxIn.buf = make([]byte, 0, bufferSize)
	muxIn.hash = crc64.New(crc64.MakeTable(crc64.ECMA))
	if bufferWrites {
		muxIn.buf = make([]byte, 0, db.MaxBSONSize)
	}
	muxIn.Mux.Control <- muxIn
	return nil
}
// Appends buf to muxIn.buf; if it would not fit, first sends muxIn.buf to muxIn.writeChan and resets muxIn.buf
func (muxIn *MuxIn) Write(buf []byte) (int, error) {
	if bufferWrites { // fixed to true
		if len(muxIn.buf)+len(buf) > cap(muxIn.buf) {
			muxIn.writeChan <- muxIn.buf
			length := <-muxIn.writeLenChan
			if length != len(muxIn.buf) {
				return 0, io.ErrShortWrite
			}
			muxIn.buf = muxIn.buf[:0]
		}
		muxIn.buf = append(muxIn.buf, buf...)
	} else {
		muxIn.writeChan <- buf
		length := <-muxIn.writeLenChan
		if length != len(buf) {
			return 0, io.ErrShortWrite
		}
	}
	muxIn.hash.Write(buf)
	return len(buf), nil
}
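To see the two ideas from this section working together, namely a select over a set of channels that changes at run time and the write-then-wait-for-length handshake, here is a reduced sketch. It uses the standard library's reflect.Select, but the rest is hypothetical: `input`, `run`, and `demo` are invented names, there is no buffering, no header framing, and no error handling, and unlike the real code the loop simply returns when the control channel is closed.

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
	"sync"
)

// input is a hypothetical, reduced MuxIn: a data channel plus an ack channel.
type input struct {
	name  string
	write chan []byte
	ack   chan int
}

// Write hands p to the multiplexer and blocks until it reports how many
// bytes it consumed, mirroring the writeChan/writeLenChan handshake.
func (in *input) Write(p []byte) int {
	in.write <- p
	return <-in.ack
}

// run is the event loop. Case 0 is the control channel that delivers new
// inputs; every later case is the write channel of one registered input.
// A closed write channel unregisters its input; a closed control channel
// ends the loop.
func run(control chan *input, out *strings.Builder) {
	ins := []*input{nil} // index 0 has no input, it is the control case
	cases := []reflect.SelectCase{{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(control)}}
	for {
		i, v, ok := reflect.Select(cases)
		switch {
		case i == 0 && !ok:
			return
		case i == 0:
			in := v.Interface().(*input)
			ins = append(ins, in)
			cases = append(cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(in.write)})
		case !ok:
			ins = append(ins[:i], ins[i+1:]...)
			cases = append(cases[:i], cases[i+1:]...)
		default:
			p := v.Interface().([]byte)
			out.WriteString(ins[i].name + ":" + string(p) + ";")
			ins[i].ack <- len(p)
		}
	}
}

// demo runs two concurrent writers through one multiplexer and returns
// the combined output once both have finished.
func demo() string {
	control := make(chan *input)
	var out strings.Builder
	done := make(chan struct{})
	go func() {
		run(control, &out)
		close(done)
	}()

	var wg sync.WaitGroup
	for _, name := range []string{"a", "b"} {
		in := &input{name: name, write: make(chan []byte), ack: make(chan int)}
		wg.Add(1)
		go func() {
			defer wg.Done()
			control <- in // register with the multiplexer
			in.Write([]byte("1"))
			in.Write([]byte("2"))
			close(in.write) // unregister
		}()
	}
	wg.Wait()
	close(control)
	<-done
	return out.String()
}

func main() {
	fmt.Println(demo())
}
```

The interleaving of the two writers varies between runs, but each writer's own chunks always appear in order, because Write does not return until the loop has acknowledged the previous chunk. An ordinary select statement cannot express this, since its cases are fixed at compile time.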