Introduction
Tailable cursors, and in particular tailing MongoDB's oplog, is a popular feature with many use cases, such as real-time notifications of all the changes happening in your database. A tailable cursor is conceptually the same as the Unix "tail -f" command: once you reach the end of the result set, the cursor is not closed; instead, it waits until new data arrives and then returns those results as well.
Tailing the oplog is very easy on a replica set, but it gets slightly more complicated on a sharded cluster. In this post we explain how to tail MongoDB's oplog in a sharded cluster.
Why tail the oplog?
Tailable cursors can be used on capped collections and are often used for publish-subscribe types of data flows. In particular, the oplog that MongoDB uses internally for replication is a capped collection, and secondaries use a tailable cursor on it to fetch the operations they need to replicate.
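The same mechanism works on any capped collection, not just the oplog. As a minimal sketch of that publish-subscribe pattern, assuming a made-up collection called "events" and an arbitrary 1 MB size, you could try the following in the mongo shell:

// Hypothetical example: a small capped collection acting as a message queue
db.createCollection( "events", { capped : true, size : 1048576 } )
db.events.insert( { msg : "hello subscribers" } )

// A subscriber opens a tailable cursor on it; the cursor stays open at the end
// of the collection and returns new documents as soon as they are inserted
var c = db.events.find().addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData )
while ( c.hasNext() ) printjson( c.next() )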
Third-party ETL tools and connectors for heterogeneous replication can also read events from the MongoDB oplog; several existing tools work exactly this way.
But with such a powerful interface you can do much more than just replication! Reactive, real-time applications have become a mainstream trend, especially for HTML5 and JavaScript apps: some modern JavaScript frameworks will automatically and immediately update the user interface as soon as a value in the data model changes.
Tailing a MongoDB collection, or even the whole database, by way of the oplog is a perfect match for this programming model: it means that any change happening anywhere in the database can trigger a real-time notification to the application server.
In fact, one great JavaScript framework is already doing exactly this: Meteor. Their website has a demo you can watch. This makes Meteor a fully reactive, full-stack platform: changes propagate automatically all the way from the database to the UI.
Reading the oplog with a tailable cursor
Here's an example of how to open a tailable cursor on the oplog from the mongo shell:
shard01:PRIMARY> c = db.oplog.rs.find( { fromMigrate : { $exists : false } } ).addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData )
{ "ts" : Timestamp(1422998530, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998574, 1), "h" : NumberLong("-6781014703318499311"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998579, 1), "h" : NumberLong("-217362260421471244"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998584, 1), "h" : NumberLong("7215322058367374253"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 5, "data" : "hello" } }
shard01:PRIMARY> c.hasNext()
true
shard01:PRIMARY> c.next()
{ "ts" : Timestamp(1423049506, 1), "h" : NumberLong("5775895302295493166"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 12, "data" : "hello" } }
shard01:PRIMARY> c.hasNext()
false
As you can see, when used interactively in the shell the cursor does not wait forever; it times out after a few seconds. After that you can use the hasNext() and next() methods to check whether any new data has arrived. And indeed, it has!
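In an actual listener process you would of course not poll by hand. A minimal sketch of a tailing loop in the mongo shell, assuming you run it against the local database of a replica set member, might look like this (printjson stands in for whatever notification logic your application needs):

// A sketch only: a real tailer would also remember the last "ts" value it has
// processed, so that it can resume with { ts : { $gt : lastSeen } } after a restart
var cursor = db.oplog.rs.find()
                 .addOption( DBQuery.Option.tailable )
                 .addOption( DBQuery.Option.awaitData );

while ( true ) {
    while ( cursor.hasNext() ) {
        printjson( cursor.next() );   // placeholder: hand the event to the application here
    }
    if ( cursor.isExhausted() ) {
        break;                        // the server closed the cursor, e.g. we fell off the end of the oplog
    }
    sleep( 1000 );                    // wait a moment before polling again
}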
Of course, you can add filters to the find() so that you only receive the events you are interested in. For example, this is what the cursor Meteor uses to tail the oplog looks like (as shown by db.currentOp()):
meteor:PRIMARY> db.currentOp()
{
    "inprog" : [
        {
            "opid" : 345,
            "active" : true,
            "secs_running" : 4,
            "op" : "getmore",
            "ns" : "local.oplog.rs",
            "query" : {
                "ns" : { "$regex" : "^meteor\\." },
                "$or" : [
                    { "op" : { "$in" : [ "i", "u", "d" ] } },
                    { "op" : "c", "o.drop" : { "$exists" : true } }
                ],
                "ts" : { "$gt" : Timestamp(1422200128, 7) }
            },
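If you wanted to build a similar filter yourself, a rough reconstruction of the query shown above would look like the following; the "meteor." namespace prefix and the Timestamp value are specific to that app and to where its reader last stopped, so treat them as placeholders:

var lastTs = Timestamp(1422200128, 7);   // resume point: the last ts already processed
var c = db.oplog.rs.find(
    {
        ns : { $regex : "^meteor\\." },                    // only this application's collections
        $or : [
            { op : { $in : [ "i", "u", "d" ] } },          // inserts, updates and deletes
            { op : "c", "o.drop" : { $exists : true } }    // plus collection drops
        ],
        ts : { $gt : lastTs }
    }
).addOption( DBQuery.Option.tailable ).addOption( DBQuery.Option.awaitData );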
Tailing the oplog on a sharded cluster
But what happens when you use sharding? First of all, you will have to tail the oplog of each shard separately.
That is still feasible, but there are further complications. In a sharded cluster, the MongoDB balancer will occasionally migrate data from one shard to another. This means that on one shard you will see a batch of deletes, and at the same time on the next shard you will see the corresponding batch of inserts. These are purely MongoDB internals: if you are tailing the oplog to capture changes to the database, you most likely don't want to see them, and they might even confuse your application. For example, a Meteor app tailing the oplog of a sharded cluster might mysteriously delete some data!
Let me show you an example. First, let's set up a sharded cluster with the mlaunch utility:
$ mlaunch --sharded 2 --replicaset
launching: mongod on port 27018
launching: mongod on port 27019
launching: mongod on port 27020
launching: mongod on port 27021
launching: mongod on port 27022
launching: mongod on port 27023
launching: config server on port 27024
replica set 'shard01' initialized.
replica set 'shard02' initialized.
launching: mongos on port 27017
adding shards. can take up to 30 seconds...
Then we connect to the mongos, shard a collection, and insert some data into it:
$ mongo
MongoDB shell version: 2.6.7
connecting to: test
mongos> sh.enableSharding( "test" )
{ "ok" : 1 }
mongos> sh.shardCollection( "test.mycollection", { _id : 1 }, true )
{ "collectionsharded" : "test.mycollection", "ok" : 1 }
mongos> db.mycollection.insert( { _id : 1, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 3, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 5, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 7, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 9, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 11, data : "hello" } )
WriteResult({ "nInserted" : 1 })
Now, if we connect to the mongod on shard01, we can see that all the data is there, and we can also see the inserts in its oplog:
$ mongo --port 27018
MongoDB shell version: 2.6.7
connecting to: 127.0.0.1:27018/test
shard01:PRIMARY> show collections
mycollection
system.indexes
shard01:PRIMARY> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
{ "_id" : 7, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> show collections
me
oplog.rs
slaves
startup_log
system.indexes
system.replset
shard01:PRIMARY> db.oplog.rs.find().pretty()
{ "ts" : Timestamp(1422998530, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998574, 1), "h" : NumberLong("-6781014703318499311"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998579, 1), "h" : NumberLong("-217362260421471244"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998584, 1), "h" : NumberLong("7215322058367374253"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 5, "data" : "hello" } }
{ "ts" : Timestamp(1422998588, 1), "h" : NumberLong("-5372877897993278968"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 7, "data" : "hello" } }
{ "ts" : Timestamp(1422998591, 1), "h" : NumberLong("-243188455606213719"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 9, "data" : "hello" } }
{ "ts" : Timestamp(1422998597, 1), "h" : NumberLong("5040618552262309692"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 11, "data" : "hello" } }
On shard02, on the other hand, there is no data yet; the data set is so small that the balancer has not run. Let's now split the data into 2 chunks, which will trigger a balancer round:
mongos> sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("54d13c0555c0347d23e33cdd")
}
  shards:
    {  "_id" : "shard01",  "host" : "shard01/hingo-sputnik:27018,hingo-sputnik:27019,hingo-sputnik:27020" }
    {  "_id" : "shard02",  "host" : "shard02/hingo-sputnik:27021,hingo-sputnik:27022,hingo-sputnik:27023" }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : true,  "primary" : "shard01" }
        test.mycollection
            shard key: { "_id" : 1 }
            chunks:
                shard01  1
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(1, 0)

mongos> sh.splitAt( "test.mycollection", { _id : 6 } )
{ "ok" : 1 }
mongos> sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "version" : 4,
    "minCompatibleVersion" : 4,
    "currentVersion" : 5,
    "clusterId" : ObjectId("54d13c0555c0347d23e33cdd")
}
  shards:
    {  "_id" : "shard01",  "host" : "shard01/hingo-sputnik:27018,hingo-sputnik:27019,hingo-sputnik:27020" }
    {  "_id" : "shard02",  "host" : "shard02/hingo-sputnik:27021,hingo-sputnik:27022,hingo-sputnik:27023" }
  databases:
    {  "_id" : "admin",  "partitioned" : false,  "primary" : "config" }
    {  "_id" : "test",  "partitioned" : true,  "primary" : "shard01" }
        test.mycollection
            shard key: { "_id" : 1 }
            chunks:
                shard02  1
                shard01  1
            { "_id" : { "$minKey" : 1 } } -->> { "_id" : 6 } on : shard02 Timestamp(2, 0)
            { "_id" : 6 } -->> { "_id" : { "$maxKey" : 1 } } on : shard01 Timestamp(2, 1)

mongos>
As you can see, the collection has been split into 2 chunks and the balancer has done its job: the data has been migrated so that it is spread evenly across the shards. If we go back to shard01, we can see how half of the records have disappeared ({"op" : "d"} denotes a delete operation):
shard01:PRIMARY> use test
switched to db test
shard01:PRIMARY> db.mycollection.find()
{ "_id" : 7, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> db.oplog.rs.find().pretty()
{ "ts" : Timestamp(1422998530, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998574, 1), "h" : NumberLong("-6781014703318499311"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998579, 1), "h" : NumberLong("-217362260421471244"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998584, 1), "h" : NumberLong("7215322058367374253"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 5, "data" : "hello" } }
{ "ts" : Timestamp(1422998588, 1), "h" : NumberLong("-5372877897993278968"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 7, "data" : "hello" } }
{ "ts" : Timestamp(1422998591, 1), "h" : NumberLong("-243188455606213719"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 9, "data" : "hello" } }
{ "ts" : Timestamp(1422998597, 1), "h" : NumberLong("5040618552262309692"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 11, "data" : "hello" } }
{ "ts" : Timestamp(1422998892, 1), "h" : NumberLong("3056127588031004421"), "v" : 2, "op" : "d", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 1 } }
{ "ts" : Timestamp(1422998892, 2), "h" : NumberLong("-7633416138502997855"), "v" : 2, "op" : "d", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 3 } }
{ "ts" : Timestamp(1422998892, 3), "h" : NumberLong("1499304029305069766"), "v" : 2, "op" : "d", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 5 } }
shard01:PRIMARY>
And on shard02 the same records have appeared:
$ mongo --port 27021
MongoDB shell version: 2.6.7
connecting to: 127.0.0.1:27021/test
shard02:PRIMARY> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
shard02:PRIMARY> use local
switched to db local
shard02:PRIMARY> db.oplog.rs.find().pretty()
{ "ts" : Timestamp(1422998531, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998890, 1), "h" : NumberLong("-6780991630754185199"), "v" : 2, "op" : "i", "ns" : "test.system.indexes", "fromMigrate" : true, "o" : { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "test.mycollection" } }
{ "ts" : Timestamp(1422998890, 2), "h" : NumberLong("-165956952201849851"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998890, 3), "h" : NumberLong("-7432242710082771022"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998890, 4), "h" : NumberLong("6790671206092100026"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 5, "data" : "hello" } }
If we now insert some more data...
mongos> db.mycollection.insert( { _id : 2, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 4, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 6, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 8, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.insert( { _id : 10, data : "hello" } )
WriteResult({ "nInserted" : 1 })
mongos> db.mycollection.find()
{ "_id" : 1, "data" : "hello" }
{ "_id" : 7, "data" : "hello" }
{ "_id" : 3, "data" : "hello" }
{ "_id" : 9, "data" : "hello" }
{ "_id" : 5, "data" : "hello" }
{ "_id" : 11, "data" : "hello" }
{ "_id" : 2, "data" : "hello" }
{ "_id" : 6, "data" : "hello" }
{ "_id" : 4, "data" : "hello" }
{ "_id" : 8, "data" : "hello" }
{ "_id" : 10, "data" : "hello" }
...then, as expected, the new inserts show up in the oplog on shard01:
shard01:PRIMARY> use local
switched to db local
shard01:PRIMARY> db.oplog.rs.find().pretty()
...beginning is the same as above, omitted for brevity...
{ "ts" : Timestamp(1422998892, 3), "h" : NumberLong("1499304029305069766"), "v" : 2, "op" : "d", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 5 } }
{ "ts" : Timestamp(1422999422, 1), "h" : NumberLong("-6691556866108433789"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 6, "data" : "hello" } }
{ "ts" : Timestamp(1422999426, 1), "h" : NumberLong("-3908881761176526422"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 8, "data" : "hello" } }
{ "ts" : Timestamp(1422999429, 1), "h" : NumberLong("-4997431625184830993"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 10, "data" : "hello" } }
shard01:PRIMARY>
And on shard02:
shard02:PRIMARY> use local
switched to db local
shard02:PRIMARY> db.oplog.rs.find().pretty()
{ "ts" : Timestamp(1422998531, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998890, 1), "h" : NumberLong("-6780991630754185199"), "v" : 2, "op" : "i", "ns" : "test.system.indexes", "fromMigrate" : true, "o" : { "v" : 1, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "test.mycollection" } }
{ "ts" : Timestamp(1422998890, 2), "h" : NumberLong("-165956952201849851"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998890, 3), "h" : NumberLong("-7432242710082771022"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998890, 4), "h" : NumberLong("6790671206092100026"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 5, "data" : "hello" } }
{ "ts" : Timestamp(1422999414, 1), "h" : NumberLong("8160426227798471967"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 2, "data" : "hello" } }
{ "ts" : Timestamp(1422999419, 1), "h" : NumberLong("-3554656302824563522"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 4, "data" : "hello" } }
shard02:PRIMARY>
Separating internal operations from application operations
So, if an application like Meteor were reading the above, it would certainly be challenging to figure out the end state of the data model. If we simply merged the oplog events from all the shards, the sequence of inserts and deletes would look like this:
OPERATION | _ID |
insert | 1 |
insert | 3 |
insert | 5 |
insert | 7 |
insert | 9 |
insert | 11 |
insert | 1 |
insert | 3 |
insert | 5 |
delete | 1 |
delete | 3 |
delete | 5 |
insert | 2 |
insert | 4 |
insert | 6 |
insert | 8 |
insert | 10 |
So, given the above sequence, do the documents with _id 1, 3 and 5 exist in the database or not?
Fortunately, it is possible to distinguish the cluster-internal operations from application operations. You may have noticed that the operations caused by migrations have the fromMigrate flag set:
{ "ts" : Timestamp(1422998890, 2), "h" : NumberLong("-165956952201849851"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "fromMigrate" : true, "o" : { "_id" : 1, "data" : "hello" }}
Since we are only interested in operations that actually change the state of the database when the cluster is considered as a whole, we can filter out everything where this flag is set. Note that the correct way is to test with $exists : false rather than fromMigrate : false, because the field is simply absent from normal operations:
shard01:PRIMARY> db.oplog.rs.find( { fromMigrate : false } ).pretty()
shard01:PRIMARY> db.oplog.rs.find( { fromMigrate : { $exists : false } } ).pretty()
{ "ts" : Timestamp(1422998530, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422998574, 1), "h" : NumberLong("-6781014703318499311"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 1, "data" : "hello" } }
{ "ts" : Timestamp(1422998579, 1), "h" : NumberLong("-217362260421471244"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 3, "data" : "hello" } }
{ "ts" : Timestamp(1422998584, 1), "h" : NumberLong("7215322058367374253"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 5, "data" : "hello" } }
{ "ts" : Timestamp(1422998588, 1), "h" : NumberLong("-5372877897993278968"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 7, "data" : "hello" } }
{ "ts" : Timestamp(1422998591, 1), "h" : NumberLong("-243188455606213719"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 9, "data" : "hello" } }
{ "ts" : Timestamp(1422998597, 1), "h" : NumberLong("5040618552262309692"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 11, "data" : "hello" } }
{ "ts" : Timestamp(1422999422, 1), "h" : NumberLong("-6691556866108433789"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 6, "data" : "hello" } }
{ "ts" : Timestamp(1422999426, 1), "h" : NumberLong("-3908881761176526422"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 8, "data" : "hello" } }
{ "ts" : Timestamp(1422999429, 1), "h" : NumberLong("-4997431625184830993"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 10, "data" : "hello" } }
shard01:PRIMARY>
And on shard02:
shard02:PRIMARY> db.oplog.rs.find( { fromMigrate : { $exists : false } } ).pretty()
{ "ts" : Timestamp(1422998531, 1), "h" : NumberLong(0), "v" : 2, "op" : "n", "ns" : "", "o" : { "msg" : "initiating set" } }
{ "ts" : Timestamp(1422999414, 1), "h" : NumberLong("8160426227798471967"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 2, "data" : "hello" } }
{ "ts" : Timestamp(1422999419, 1), "h" : NumberLong("-3554656302824563522"), "v" : 2, "op" : "i", "ns" : "test.mycollection", "o" : { "_id" : 4, "data" : "hello" } }
shard02:PRIMARY>
Exactly what we wanted!
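Putting the pieces together for a sharded cluster, the approach is: open one tailable cursor per shard primary, each with the fromMigrate filter, and merge the resulting streams in your application. Below is a minimal sketch in the mongo shell; the host list matches the mlaunch example above but is otherwise an assumption, and a production reader would connect to each shard's replica set, order the merged events by ts, and handle failover and resume logic:

// A sketch only: one filtered, tailable cursor per shard primary
var shardHosts = [ "localhost:27018", "localhost:27021" ];   // shard01 and shard02 from the example

var cursors = shardHosts.map( function( host ) {
    var oplog = new Mongo( host ).getDB( "local" ).getCollection( "oplog.rs" );
    return oplog.find( { fromMigrate : { $exists : false } } )
                .addOption( DBQuery.Option.tailable )
                .addOption( DBQuery.Option.awaitData );
} );

// Poll every shard's cursor in turn and merge the events into a single stream
while ( true ) {
    cursors.forEach( function( c ) {
        while ( c.hasNext() ) {
            printjson( c.next() );   // placeholder: hand the event to the application here
        }
    } );
    sleep( 1000 );
}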
If you're interested in learning more about operational best practices for MongoDB, download our guide:
This article is a translation of the original post: