@ #MDBlocal How MongoDB 4.2 Pipeline is Powering Queries, Updates and Views Asya Kamsky asya999 AGGREGATION POWER++
PREVIOUSLY ... ... 2017
#MDBW17 Analytics with MongoDB Aggregation Framework @asya999 by Asya Kamsky, Lead MongoDB Maven PIPELINE POWER
STORE RETRIEVE
#MDBLocal ps ax | grep mongod | head -n 1 *nix command line pipe PIPELINE
#MDBLocal $match $group | $sort| Input stream {} {} {} {} Result {} {} ... PIPELINE MongoDB document pipeline
DATA PIPELINE STAGES
Stage 1 Stage 2 Stage 3 Stage 4 {} {} {} {} {} {} {} {} DATA PIPELINE {} {} {} {} {"$stage":{ ... }} START Collection View Special stage STAGES
{title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id: "Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...}
$cursor
{title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id: "Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...} $group $sort 1
#MDBLocal INPUT STAGE RESULTS STAGE STREAMING RESOURCE USE Each document is streamed through in RAM
#MDBLocal INPUT STAGE RESULTS STAGE BLOCKING RESOURCE USE Everything has to be kept in RAM (or spill)
$sort $match $group start=ISODate("...") end=ISODate("...") { user: "303900", ipaddr: "71.56.112.56", ts:ISODate("2017-05-08T...") } {$match:{ts:{$gte:start,$lt:end}}}, {$sort:{ts:1}}, {$group:{_id:"$user",ips:{$push:{ip:"$ipaddr", ts:"$ts"}}, diffIps:{$addToSet:"$ipaddr"}}}, {$match:{"diffIps.1":{$exists:true}}}, {$addFields:{diffs: {$filter:{ input:{$map:{ input: {$range:[0,{$subtract:[{$size:"$ips"},1]}]}, as:"i", in:{$let:{vars:{ip1:{$arrayElemAt:["$ips","$$i"]}, ip2:{$arrayElemAt:["$ips",{$add:["$$i",1]}]}}, in:{ diff:{$cond:{ if:{$ne:["$$ip1.ip","$$ip2.ip"]}, then:{$divide:[{$subtract:["$$ip2.ts","$$ip1.ts"]},60000]}, else: 9999 }}, ip1:"$$ip1.ip", t1:"$$ip1.ts", ip2:"$$ip2.ip", t2:"$$ip2.ts" }}}}}, cond:{$lt:["$$this.diff",10]} }}}}, {$match:{"diffs":{$ne:[]}}}, {$project:{_id:0, user:"$_id", suspectLogins:"$diffs"}} { "user" : "35237073", "suspectLogins" : [ {"diff": 4.8333333333, "ip1": "106.220.151.16", "t1":"2017-05-08T06:58", "ip2": "223.182.113.15", "t2":"2017-05-08T07:03" }, {"diff": 8.3, "ip1": "223.182.113.15", "t1":"2017-05-08T07:03", "ip2": "49.206.217.26", "t2":"2017-05-08T07:11" } ] } $match $addFields $match $project
https://github.com/asya999/mdbw17
5 minute review https://github.com/asya999/mdbw17
#MDBW17 Analytics with MongoDB Aggregation Framework @asya999 by Asya Kamsky, Lead MongoDB Maven PIPELINE POWER https://github.com/asya999/mdbw17
PREVIOUSLY ... ... 2017
PREVIOUSLY ... ... 2017 ...
PREVIOUSLY ... ... 2017 ... 2018
#MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
#MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
#MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
#MDBLocal THE FUTURE OF AGGREGATION More options for output Unify different languages
#MDBLocal THE PRESENT OF AGGREGATION More options for output Unify different languages
#MDBLocal Unify Different Languages
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION db.c.aggregate([ {$addFields:{ numChildren:{$size:"$children"}, numDependents:{$size:{ $filter:{ input:"$children.dep", cond: "$$this" } }} }}, ... ])
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND db.c.aggregate([ {$addFields:{ numChildren:{$size:"$children"}, numDependents:{$size:{ $filter:{ input:"$children.dep", cond: "$$this" } }} }}, ... ])
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND db.c.find ( {$expr:{ $lt:[ {$size:{$filter:{ input: "$children.dep", cond: "$$this" }}}, 2 ] }} )
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND UPDATE db.c.find ( {$expr:{ $lt:[ {$size:{$filter:{ input: "$children.dep", cond: "$$this" }}}, 2 ] }} )
#MDBLocal Unify Different Languages {children: [ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND UPDATE db.c.update( {$expr:{ $anyElementTrue:{$map:{ input:"$children", in: {$and:[ {$lt:["$$this.dob","1997-01-22"]}, "$$this.dep" ]} }} }}, {$set:{ audit:true }} )
#MDBLocal Update db.coll.update( <query>, <update>, <options> )
#MDBLocal Update db.coll.update( <query>, <update>, <options> )
#MDBLocal Update db.coll.update( <query>, <update>, <options> ) <update>
#MDBLocal Update { f1: <value>, f2: <value>, ... } { $set: { }, $inc: { }, $... } <update>
#MDBLocal Update in 4.2 { } OR [ ] <update>
#MDBLocal Update in 4.2 { <same> } [ ] <update>
#MDBLocal Update in 4.2 { <same> } [ <aggregation-pipeline> ] <update>
Updates Using Aggregation Pipeline
#MDBLocal { $addFields: { } } { $project: { } } { $replaceRoot: { } } { $set: { } } { $unset: [ ] } { $replaceWith: { } }
#MDBLocal db.coll.update({_id:1}, {$inc:{a:1}}, {upsert:true}) { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "Cannot apply $inc to a value of non-numeric type."
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } { _id: 1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$sum:["$a",1]}}} ], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "$add only supports numeric or date types, not string" db.coll.update({_id:1}, [ {$set:{a:{$add:["$a",1]}}} ], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$ }} ], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: , then: , else: }}}}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: , else: }}}}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}}}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}}}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]} }}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]} }}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]}, prev_a:"$a" }}], {upsert:true})
#MDBLocal { _id: 1 } { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11, prev_a: 10 } { _id: 1, a: 100, prev_a: 100 } { _id:1, a: 21 } { _id:1, a: 1, prev_a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]}, prev_a:"$a" }}], {upsert:true})
#MDBLocal Set Defaults
#MDBLocal Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset"
#MDBLocal Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{ }} ], {multi:true})
#MDBLocal Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ ]}} ], {multi:true})
#MDBLocal Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
#MDBLocal Set Defaults {_id: 1, a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true}) {_id: 1, a: 5, b: 12, c: "unset"} {_id: 2, a: 15, b: 0, c: "abc"} {_id: 3, a: 0, b: 99, c: "xyz"}
#MDBLocal { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: then: else: }}}}], {upsert:true})
#MDBLocal { id: 1, d: ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: {$in:[Z,{$ifNull:["$h.hour",[]]}]}, then:{$map:{ input:"$h", in: {$cond:{ if:{$ne:["$$this.hour",Z]}, then:"$$this", else: {hour: Z, value: {$sum:[ "$$this.value", VAL]}} }}}}, else:{$concatArrays:[{$ifNull:["$h",[]]},[{hour:Z,value:VAL}]]} }}}}], {upsert:true}) if: then: else:
#MDBLocal Recap: Updates can be specified with an aggregation pipeline. All fields from the existing document can be accessed. Slightly slower, but a lot more powerful.
#MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
#MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
#MDBLocal THE FUTURE OF AGGREGATION More options for output
#MDBLocal More Options for Output
#MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out
#MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out db.coll.aggregate( [ {pipeline}, ... {$out: "new_coll"} ]);
#MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out db.coll.aggregate( [ {pipeline}, ... {$out: "new_coll"} ]); new_coll ○ must be unsharded ○ overwrites existing
New $merge stage in aggregation pipeline
#MDBLocal MongoDB 4.2 $merge coll coll2 $merge
#MDBLocal MongoDB 4.2 $merge db.coll.aggregate( [ {pipeline}, ..., {$merge: { ... }} ]); coll coll2 $merge
#MDBLocal MongoDB 4.2 $merge db.coll.aggregate( [ {pipeline}, ..., {$merge: { ... }} ]); coll2 can exist same or different 'db' can be sharded coll coll2 $merge
#MDBLocal coll coll2 $merge { } { } { } { } { } { } { } { } MongoDB 4.2
#MDBLocal { $merge: { into: <target> } } $merge syntax
#MDBLocal {$merge: "collection2"} $merge syntax { $merge: { into: <target> } }
#MDBLocal {$merge: {into: {db: "db2", coll: "collection2"}}} $merge syntax { $merge: { into: <target> } }
#MDBLocal { $merge: { into: <target> } } $merge syntax
#MDBLocal { $merge: { into: <target>, on: <fields> } } on: "_id" on: [ "_id", "shardkey(s)" ] must be unique $merge syntax
#MDBLocal { $merge: { into: <target>, on: <fields> } } $merge syntax
#MDBLocal Actions source target
#MDBLocal Actions nothing matched: source target
#MDBLocal Actions nothing matched: usually insert source target
#MDBLocal Actions nothing matched: usually insert document matched: source target
#MDBLocal Actions nothing matched: usually insert document matched: overwrite? update? ??? source target
#MDBLocal Actions nothing matched: usually insert document matched: update source target
#MDBLocal Actions nothing matched: usually insert document matched: update (merge) source target
#MDBLocal $merge syntax { $merge: { into: <target>, whenNotMatched: whenMatched: } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenNotMatched:"insert", whenMatched: } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenNotMatched:"insert", whenMatched:"merge" } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge" } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge"|"replace"|"keepExisting"|"fail"|[...] } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenMatched:[...] } }
#MDBLocal $merge syntax { $merge: { into: <target>, whenMatched:[<custom pipeline>] } }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$addFields:{ }} ] } }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$addFields:{ total:{$sum:["$total","$$new.total"]} }} ] } }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
#MDBLocal $merge example { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "yyy" }
#MDBLocal $merge example 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } }
#MDBLocal $merge example 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
#MDBLocal $merge example 2 { $merge: { into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "x" }
#MDBLocal $merge syntax { $merge: { into: <target>, whenMatched:[...] } }
#MDBLocal $merge syntax { $merge: { into: <target>, let: { ... }, whenMatched:[ ...] } }
#MDBLocal $merge syntax { $merge: { into: <target>, let: {new: "$$ROOT"}, whenMatched:[ ...] } }
#MDBLocal { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
#MDBLocal { $merge: { into: <target>, let: {itotal: "$total"}, whenMatched:[ {$set:{ total:{$sum:["$total","$$itotal"]} }} ] } } { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
EXAMPLES APPEND from TEMP collection
#MDBLocal temp real data real Using $merge to append loaded and cleansed records loaded into db
#MDBLocal aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]);
#MDBLocal aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]); Similar to SQL's INSERT INTO T1 SELECT * from T2
EXAMPLES Maintain Single View
#MDBLocal mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services
#MDBLocal mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
#MDBLocal $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is unique username mflix_pipeline = [ { "$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
#MDBLocal $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is unique username mflix_pipeline = [ { "$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) db.users.aggregate(mflix_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
#MDBLocal $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is unique username mfriendbook_pipeline = [ { "$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
#MDBLocal $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is unique username mfriendbook_pipeline = [ { "$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) db.users.aggregate(mfriendbook_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... }, mfriendbook: { ... } }
EXAMPLES Populate ROLLUPS into summary table
registrations real regsummary real Using $merge to incrementally update periodic rollups in summary
#MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true});
#MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
#MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 }
#MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day)
#MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
#MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 } { "event" : "MDBW19", "date" : "2019-05-22", "total" : 34 }
#MDBlocal Shard-N: Aggregation Pipeline Power++: How MongoDB 4.2 Pipeline Empowers Queries, Updates, and Materialized Views Asya Kamsky https://www.surveymonkey.com/r/KTG5CKP
https://github.com/asya999/mdbw17
MongoDB .local Toronto 2019: Aggregation Pipeline Power++: How MongoDB 4.2 Pipeline Empowers Queries, Updates, and Materialized Views
MongoDB .local Toronto 2019: Aggregation Pipeline Power++: How MongoDB 4.2 Pipeline Empowers Queries, Updates, and Materialized Views

MongoDB .local Toronto 2019: Aggregation Pipeline Power++: How MongoDB 4.2 Pipeline Empowers Queries, Updates, and Materialized Views

  • 1.
    @ #MDBlocal How MongoDB 4.2 Pipeline is Powering Queries, Updates and Views Asya Kamsky asya999 AGGREGATION POWER++
  • 2.
  • 3.
    #MDBW17 Analytics with MongoDB Aggregation Framework @asya999 by Asya Kamsky, Lead MongoDB Maven PIPELINE POWER
  • 4.
  • 5.
    #MDBLocal ps ax | grep mongod | head -n 1 *nix command line pipe PIPELINE
  • 6.
    #MDBLocal $match $group |$sort| Input stream {} {} {} {} Result {} {} ... PIPELINE MongoDB document pipeline
  • 7.
  • 8.
    Stage 1 Stage 2 Stage 3 Stage 4 {} {} {} {} {} {} {} {} DATA PIPELINE {} {} {} {} {"$stage":{ ... }} START Collection View Special stage STAGES
  • 9.
    {title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id: "Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...}
  • 11.
  • 12.
    {title: "The Great Gatsby", language: "English", subjects: "Long Island"} {title: "The Great Gatsby", language: "English", subjects: "New York"} {title: "The Great Gatsby", language: "English", subjects: "1920s"} {title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, {"$match":{"language":"English"}} $match { _id:"Long Island", count: 1 }, $group { _id: "New York", count: 2 }, $unwind { _id: "1920s", count: 1 }, $sort $skip $limit $project {"$unwind":"$subjects"} {"$group":{"_id":"$subjects", "count":{"$sum":1}}} { _id: "Harlem", count: 1 }, { _id: "Long Island", count: 1 }, { _id: "New York", count: 2 }, { _id: "1920s", count: 1 }, {title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] } { title: "The Great Gatsby", language: "English", subjects: [ "Long Island", "New York", "1920s"] }, { title: "War and Peace", language: "Russian", subjects: [ "Russia", "War of 1812", "Napoleon"] }, { title: "Open City", language: "English", subjects: [ "New York", "Harlem" ] }, {title: "Open City", language: "English", subjects: "New York"} {title: "Open City", language: "English", subjects: "Harlem"} { _id: "Harlem", count: 1 }, {"$sort":{"count":-1}} {"$limit":3} {"$project":...} $group $sort 1
  • 13.
    #MDBLocal INPUT STAGE RESULTS STAGE STREAMING RESOURCE USE Each document is streamed through in RAM
  • 14.
    #MDBLocal INPUT STAGE RESULTS STAGE BLOCKING RESOURCE USE Everything has to be kept in RAM (or spill)
  • 20.
    $sort $match $group start=ISODate("...") end=ISODate("...") { user: "303900", ipaddr: "71.56.112.56", ts:ISODate("2017-05-08T...") } {$match:{ts:{$gte:start,$lt:end}}}, {$sort:{ts:1}}, {$group:{_id:"$user",ips:{$push:{ip:"$ipaddr", ts:"$ts"}}, diffIps:{$addToSet:"$ipaddr"}}}, {$match:{"diffIps.1":{$exists:true}}}, {$addFields:{diffs: {$filter:{ input:{$map:{ input: {$range:[0,{$subtract:[{$size:"$ips"},1]}]}, as:"i", in:{$let:{vars:{ip1:{$arrayElemAt:["$ips","$$i"]}, ip2:{$arrayElemAt:["$ips",{$add:["$$i",1]}]}}, in:{ diff:{$cond:{ if:{$ne:["$$ip1.ip","$$ip2.ip"]}, then:{$divide:[{$subtract:["$$ip2.ts","$$ip1.ts"]},60000]}, else: 9999 }}, ip1:"$$ip1.ip", t1:"$$ip1.ts", ip2:"$$ip2.ip", t2:"$$ip2.ts" }}}}}, cond:{$lt:["$$this.diff",10]} }}}}, {$match:{"diffs":{$ne:[]}}}, {$project:{_id:0, user:"$_id", suspectLogins:"$diffs"}} { "user" : "35237073", "suspectLogins" : [ {"diff": 4.8333333333, "ip1": "106.220.151.16", "t1":"2017-05-08T06:58", "ip2": "223.182.113.15", "t2":"2017-05-08T07:03" }, {"diff": 8.3, "ip1": "223.182.113.15", "t1":"2017-05-08T07:03", "ip2": "49.206.217.26", "t2":"2017-05-08T07:11" } ] } $match $addFields $match $project
  • 21.
  • 22.
  • 23.
    #MDBW17 Analytics with MongoDB Aggregation Framework @asya999 by Asya Kamsky, Lead MongoDB Maven PIPELINE POWER https://github.com/asya999/mdbw17
  • 24.
  • 25.
  • 26.
  • 27.
    #MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
  • 28.
    #MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
  • 29.
    #MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
  • 30.
    #MDBLocal THE FUTURE OF AGGREGATION More options for output Unify different languages
  • 31.
    #MDBLocal THE PRESENT OF AGGREGATION More options for output Unify different languages
  • 32.
  • 33.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION
  • 34.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION db.c.aggregate([ {$addFields:{ numChildren:{$size:"$children"}, numDependents:{$size:{ $filter:{ input:"$children.dep", cond: "$$this" } }} }}, ... ])
  • 35.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND db.c.aggregate([ {$addFields:{ numChildren:{$size:"$children"}, numDependents:{$size:{ $filter:{ input:"$children.dep", cond: "$$this" } }} }}, ... ])
  • 36.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND db.c.find ( {$expr:{ $lt:[ {$size:{$filter:{ input: "$children.dep", cond: "$$this" }}}, 2 ] }} )
  • 37.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND UPDATE db.c.find ( {$expr:{ $lt:[ {$size:{$filter:{ input: "$children.dep", cond: "$$this" }}}, 2 ] }} )
  • 38.
    #MDBLocal Unify Different Languages {children:[ {name:"Max", dob:"1994-12-01", dep:true}, {name:"Sam", dob:"1997-09-28", dep:true}, {name:"Kim", dob:"2000-02-29", dep:true} ]} AGGREGATION FIND UPDATE db.c.update( {$expr:{ $anyElementTrue:{$map:{ input:"$children", in: {$and:[ {$lt:["$$this.dob","1997-01-22"]}, "$$this.dep" ]} }} }}, {$set:{ audit:true }} )
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
    #MDBLocal Update in 4.2 {} OR [ ] <update>
  • 44.
    #MDBLocal Update in 4.2 {<same> } [ ] <update>
  • 45.
    #MDBLocal Update in 4.2 {<same> } [ <aggregation-pipeline> ] <update>
  • 46.
  • 47.
    #MDBLocal { $addFields: {} } { $project: { } } { $replaceRoot: { } } { $set: { } } { $unset: [ ] } { $replaceWith: { } }
  • 48.
    #MDBLocal db.coll.update({_id:1}, {$inc:{a:1}}, {upsert:true}) { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "Cannot apply $inc to a value of non-numeric type."
  • 49.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } { _id: 1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$sum:["$a",1]}}} ], {upsert:true})
  • 50.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id: 1, a: 1 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id: 1, a: 1 } "errmsg" : "$add only supports numeric or date types, not string" db.coll.update({_id:1}, [ {$set:{a:{$add:["$a",1]}}} ], {upsert:true})
  • 51.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$ }} ], {upsert:true})
  • 52.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: , then: , else: }}}}], {upsert:true})
  • 53.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: , else: }}}}], {upsert:true})
  • 54.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 101 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}}}], {upsert:true})
  • 55.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}}}], {upsert:true})
  • 56.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]} }}], {upsert:true})
  • 57.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]} }}], {upsert:true})
  • 58.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11 } { _id: 1, a: 100 } { _id:1, a: 21 } { _id:1, a: 1 } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]}, prev_a:"$a" }}], {upsert:true})
  • 59.
    #MDBLocal { _id: 1} { _id: 1, a: 10 } { _id: 1, a: 100 } --- { _id: 1, a: "10" } { _id:1, a: 21 } { _id: 1, a: 11, prev_a: 10 } { _id: 1, a: 100, prev_a: 100 } { _id:1, a: 21 } { _id:1, a: 1, prev_a: "10" } db.coll.update({_id:1}, [ {$set:{a:{$min:[100, {$cond:{ if: {$eq:[{$type:"$a"},"missing"]}, then: 21, else: {$sum:["$a",1]} }}]}, prev_a:"$a" }}], {upsert:true})
  • 60.
  • 61.
    #MDBLocal Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset"
  • 62.
    #MDBLocal Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{ }} ], {multi:true})
  • 63.
    #MDBLocal Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ ]}} ], {multi:true})
  • 64.
    #MDBLocal Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true})
  • 65.
    #MDBLocal Set Defaults {_id: 1,a: 5, b: 12} {_id: 2, a: 15, c: "abc"} {_id: 3, b: 99, c: "xyz"} If a or b are missing, set to 0, if c is missing -> "unset" db.coll.update({}, [ {$replaceWith:{$mergeObjects:[ { a:0, b:0, c:"unset" }, "$$ROOT" ]}} ], {multi:true}) {_id: 1, a: 5, b: 12, c: "unset"} {_id: 2, a: 15, b: 0, c: "abc"} {_id: 3, a: 0, b: 99, c: "xyz"}
  • 66.
    #MDBLocal { id: 1, d:ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: then: else: }}}}], {upsert:true})
  • 67.
    #MDBLocal { id: 1, d:ISODate("2019-06-04T00:00:00"), h: [ { hour:"11", value: 296 }, { hour:"12", value: 300 } ]} id: X, d:Y, hour:Z, value: VAL db.coll.update({id:X, d:Y}, [ {$set:{h:{$cond:{ if: {$in:[Z,{$ifNull:["$h.hour",[]]}]}, then:{$map:{ input:"$h", in: {$cond:{ if:{$ne:["$$this.hour",Z]}, then:"$$this", else: {hour: Z, value: {$sum:[ "$$this.value", VAL]}} }}}}, else:{$concatArrays:[{$ifNull:["$h",[]]},[{hour:Z,value:VAL}]]} }}}}], {upsert:true}) if: then: else:
  • 68.
    #MDBLocal Recap: Updates can be specified with an aggregation pipeline. All fields from the existing document can be accessed. Slightly slower, but a lot more powerful.
  • 69.
    #MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
  • 70.
    #MDBLocal THE FUTURE OF AGGREGATION Better performance & optimizations More stages & expressions More options for output Compass helper for aggregate Unify different languages
  • 71.
    #MDBLocal THE FUTURE OF AGGREGATION More options for output
  • 72.
  • 73.
    #MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out
  • 74.
    #MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out db.coll.aggregate( [ {pipeline}, ... {$out: "new_coll"} ]);
  • 75.
    #MDBLocal Prior to MongoDB 4.2 $out coll new_coll $out db.coll.aggregate( [ {pipeline}, ... {$out: "new_coll"} ]); new_coll ○ must be unsharded ○ overwrites existing
  • 76.
    New $merge stage in aggregation pipeline
  • 77.
  • 78.
    #MDBLocal MongoDB 4.2 $merge db.coll.aggregate( [ {pipeline},..., {$merge: { ... }} ]); coll coll2 $merge
  • 79.
    #MDBLocal MongoDB 4.2 $merge db.coll.aggregate( [ {pipeline},..., {$merge: { ... }} ]); coll2 can exist same or different 'db' can be sharded coll coll2 $merge
  • 80.
    #MDBLocal coll coll2 $merge { } {} { } { } { } { } { } { } MongoDB 4.2
  • 81.
  • 82.
  • 83.
    #MDBLocal {$merge: {into: {db:"db2", coll: "collection2"}}} $merge syntax { $merge: { into: <target> } }
  • 84.
  • 85.
    #MDBLocal { $merge: { into: <target>, on:<fields> } } on: "_id" on: [ "_id", "shardkey(s)" ] must be unique $merge syntax
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
    #MDBLocal Actions nothing matched: usually insert document matched: source target
  • 91.
    #MDBLocal Actions nothing matched: usually insert document matched: overwrite? update? ??? source target
  • 92.
    #MDBLocal Actions nothing matched: usually insert document matched: update source target
  • 93.
    #MDBLocal Actions nothing matched: usually insert document matched: update (merge) source target
  • 94.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenNotMatched: whenMatched: } }
  • 95.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenNotMatched:"insert", whenMatched: } }
  • 96.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenNotMatched:"insert", whenMatched:"merge" } }
  • 97.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge" } }
  • 98.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenNotMatched:"insert"|"discard"|"fail", whenMatched:"merge"|"replace"|"keepExisting"|"fail"|[...] } }
  • 99.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenMatched:[...] } }
  • 100.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenMatched:[<custom pipeline>] } }
  • 101.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$addFields:{ }} ] } }
  • 102.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$addFields:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 103.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 104.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 105.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
  • 106.
    #MDBLocal $merge example { $merge: { into:<target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "yyy" }
  • 107.
    #MDBLocal $merge example 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } }
  • 108.
    #MDBLocal $merge example 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { }
  • 109.
    #MDBLocal $merge example 2 { $merge:{ into: <target>, whenMatched:[ {$replaceWith:{$mergeObjects:[ "$$new", {total:{$sum:["$$new.total", "$total"]}} ]}} ] } } Incoming Target { _id: "37", total: 64, f1: "x" } { _id: "37", total: 245, f1: "yyy" } Result: { _id: "37", total: 309, f1: "x" }
  • 110.
    #MDBLocal $merge syntax { $merge: { into:<target>, whenMatched:[...] } }
  • 111.
    #MDBLocal $merge syntax { $merge: { into:<target>, let: { ... }, whenMatched:[ ...] } }
  • 112.
    #MDBLocal $merge syntax { $merge: { into:<target>, let: {new: "$$ROOT"}, whenMatched:[ ...] } }
  • 113.
  • 114.
    #MDBLocal { $merge: { into: <target>, let:{itotal: "$total"}, whenMatched:[ {$set:{ total:{$sum:["$total","$$itotal"]} }} ] } } { $merge: { into: <target>, whenMatched:[ {$set:{ total:{$sum:["$total","$$new.total"]} }} ] } }
  • 115.
  • 116.
    #MDBLocal temp real data real Using $merge to append loaded and cleansed records loaded into db
  • 117.
    #MDBLocal aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]);
  • 118.
    #MDBLocal aggregate 'temp' and append valid records to 'data' db.temp.aggregate( [ { ... } /* pipeline to massage and cleanse data in temp */, {$merge:{ into: "data", whenMatched: "fail" }} ]); Similar to SQL's INSERT INTO T1 SELECT * from T2
  • 119.
  • 120.
    #MDBLocal mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services
  • 121.
    #MDBLocal mflix users users mfriendbook users sv Using $merge to populate/update user fields from other services sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
  • 122.
    #MDBLocal $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username mflix_pipeline = [ { "$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy" }
  • 123.
    #MDBLocal $merge updates fields from mflix.users collection into sv.users collection. Our "_id" field is the unique username mflix_pipeline = [ { "$project" : { "_id" : "$username", "mflix" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mflix) db.users.aggregate(mflix_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
  • 124.
    #MDBLocal $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username mfriendbook_pipeline = [ { "$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... } }
  • 125.
    #MDBLocal $merge updates fields from mfriendbook.users collection into sv.users collection. Our "_id" field is the unique username mfriendbook_pipeline = [ { "$project" : { "_id" : "$username", "mfriendbook" : "$$ROOT" }}, { "$merge" : { "into" : { "db": "sv", "coll" : "users" }, "whenNotMatched" : "discard" }} ] (in mfriendbook) db.users.aggregate(mfriendbook_pipeline) sv.users { _id: "user253", dob: ISODate(...), f1: "yyy", mflix: { ... }, mfriendbook: { ... } }
  • 126.
  • 127.
    registrations real regsummary real Using $merge to incrementally update periodic rollups in summary
  • 128.
    #MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true});
  • 129.
    #MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
  • 130.
    #MDBLocal $merge to create/update periodic rollups in summary collection (for all days) db.regsummary.createIndex({event:1, date:1}, {unique: true}); db.registrations.aggregate([ {$match: {event_id: "MDBW19"}}, {$group:{ _id:{$dateToString:{date:"$date",format:"%Y-%m-%d"}}, count: {$sum:1} }}, {$project: {_id:0,event:"MDBW19",date:"$_id",total:"$count"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 }
  • 131.
    #MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day)
  • 132.
    #MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ])
  • 133.
    #MDBLocal $merge to incrementally update periodic rollups in summary collection (for single day) db.registrations.aggregate([ {$match: { event_id: "MDBW19", date:{$gte:ISODate("2019-05-22"),$lt:ISODate("2019-05-23")} }}, {$count: "total"}, {$addFields: {event:"MDBW19", "date":"2019-05-22"}}, {$merge: { into: "regsummary", on: ["event", "date"] }} ]) { "event" : "MDBW19", "date" : "2019-05-19", "total" : 33 } { "event" : "MDBW19", "date" : "2019-05-20", "total" : 15 } { "event" : "MDBW19", "date" : "2019-05-21", "total" : 24 } { "event" : "MDBW19", "date" : "2019-05-22", "total" : 34 }
  • 135.
    #MDBlocal Shard-N: Aggregation Pipeline Power++: How MongoDB 4.2 Pipeline Empowers Queries, Updates, and Materialized Views Asya Kamsky https://www.surveymonkey.com/r/KTG5CKP
  • 136.