...
User-specified applyOps operations can happen in some scenarios we care about, such as using mongomirror to import data into atlas. They may also happen in some backup scenarios and in mongorestore, but I don't know much about that. It seems incorrect for change streams to miss events generated in this way, though it's not clear when this might come up. One scenario might be cutting over to atlas and using a resume token from the old cluster. From the query perspective, in order to support this, it seems tlike we just have to take out some restrictions such as these filters on lsid and txnNumber to get applyOps to show up. As a separate piece of work that is very related, I filed SERVER-44450. Original Description From jesse, this comment means change streams could miss important operations. I'm not sure why this was deliberately done: Change streams deliberately ignore user-initiated applyOps commands, they only generate events from transactions' applyOps oplog entries. (Change streams require a txnNumber and lsid in an applyOps oplog entry.) Change streams even ignore the individual oplog entries that are generated when a user-initiated applyOps is executed non-atomically. Such entries have "fromMigrate: true", and change streams filter such entries out.
jesse commented on Tue, 12 Nov 2019 15:50:27 +0000: Debugging more carefully, I see a few things going on. An insert operation in a user-executed applyOps command produces either an applyOps oplog entry with lsid but no txnNumber or, if it's not atomic, a regular oplog entry with fromMigrate: true. Say we do a regular insert, an applyOps insert, and a non-atomic applyOps insert: replset:PRIMARY> db.collection.insert({}) WriteResult({ "nInserted" : 1 }) replset:PRIMARY> db.runCommand({applyOps: [{op: "i", ns: 'test.collection', o: {_id: ObjectId()}}]}) ... replset:PRIMARY> db.runCommand({applyOps: [{op: "i", ns: 'test.collection', o: {_id: ObjectId()}}], allowAtomic: false}) ... The oplog entries are an insert, an applyOps oplog entry with lsid but no txnNumber, and an insert with fromMigrate: replset:PRIMARY> db.getSiblingDB('local').oplog.rs.find({$or: [{ns: 'test.collection'}, {'o.applyOps': {$exists: 1}}]}).pretty() // from regular insert { "op" : "i", "ns" : "test.collection", "ui" : UUID("0823a4c8-4dfb-4ef9-aa85-c80d7ee54b33"), "o" : { "_id" : ObjectId("5dcad279d03e388ec211e0c6") }, "ts" : Timestamp(1573573241, 2), "t" : NumberLong(1), "wall" : ISODate("2019-11-12T15:40:41.501Z"), "v" : NumberLong(2) } // from user-executed applyOps { "op" : "c", "ns" : "test.$cmd", "o" : { "applyOps" : [ { "op" : "i", "ns" : "test.collection", "o" : { "_id" : ObjectId("5dcad282d03e388ec211e0c7") }, "ui" : UUID("0823a4c8-4dfb-4ef9-aa85-c80d7ee54b33") } ], "lsid" : { "id" : UUID("4833ef9e-59a9-4db9-a703-b98a774600fa") }, "$clusterTime" : { "clusterTime" : Timestamp(1573573241, 2), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "$db" : "test" }, "ts" : Timestamp(1573573250, 1), "t" : NumberLong(1), "wall" : ISODate("2019-11-12T15:40:50.699Z"), "v" : NumberLong(2) } // from user-executed non-atomic applyOps { "op" : "i", "ns" : "test.collection", "ui" : UUID("0823a4c8-4dfb-4ef9-aa85-c80d7ee54b33"), "o" : { "_id" : ObjectId("5dcad292d03e388ec211e0c8") }, "ts" : Timestamp(1573573267, 1), "t" : NumberLong(1), "wall" : ISODate("2019-11-12T15:41:07Z"), "v" : NumberLong(2), "fromMigrate" : true // <-- FROM MIGRATE } A change stream includes the regular insert entry, but it ignores the applyOps oplog entry (because it has an lsid but no txnNumber?) and the second insert (because it has fromMigrate). (To avoid confusion I'm going to edit my previous comment, which was wrong about fromMigrate.) charlie.swanson commented on Wed, 6 Nov 2019 18:12:31 +0000: Ok thanks all! Without any hard evidence that we have 'fromMigrate' oplog entries I think we can just proceed under the assumption it's just about the lsid and txnNumber. We can start there and see what we find in testing. That reproducer is helpful jesse! I'm putting this back in to "Needs Scheduling" now that I understand this better. jesse commented on Wed, 6 Nov 2019 17:42:42 +0000: There is some logic that makes change streams ignore applyOps events, however. When I do this on a recently-built replica set with Python: >>> import pymongo >>> c = pymongo.MongoClient() >>> for i in c.test.collection.watch(): ... print(i) ... ... and do this in the shell: replset:PRIMARY> db.collection.insert({}) WriteResult({ "nInserted" : 1 }) replset:PRIMARY> db.runCommand({applyOps: [{op: "i", ns: 'test.collection', o: {_id: ObjectId()}}]}) { "applied" : 1, "results" : [ true ], "ok" : 1, "$clusterTime" : { "clusterTime" : Timestamp(1573061959, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } }, "operationTime" : Timestamp(1573061959, 1) } ... the Python shell prints an event for the first insert but not the second. milkie commented on Wed, 6 Nov 2019 15:34:21 +0000: Back in 2011 I added "fromMigrate" to oplog entries generated by the system for chunk moves. I'm not sure if we populate that field for any other reasons since. judah.schvimer commented on Wed, 6 Nov 2019 12:46:07 +0000: I did some digging, and I can't find where we would add fromMigrate. jesse, do you remember at all where you saw this fromMigrate issue or why we add that field? charlie.swanson commented on Tue, 5 Nov 2019 22:51:45 +0000: Ok I spoke with judah.schvimer and have some updated context here. judah.schvimer I realized we didn't speak about the 'fromMigrate' issue mentioned in the description: do you have any insight as to why this might happen and whether any tools are known to do this? judah.schvimer commented on Fri, 16 Aug 2019 14:57:31 +0000: This may also require changing atomic applyOps commands to log the work they actually do in the oplog rather than what the user sends exactly. For example, if the user sends an upsert and it does an insert we should log it as an insert in the applyOps oplog entry rather than having to say "alwaysUpsert: true". Transactions and non-atomic applyOps (AFAIK) log what actually occurs, not what the user specified exactly.