@h_ingo Analytics with MongoDB alone and with Hadoop Connector Henrik Ingo Solution Architect, MongoDB
The Science in Data Science • Collect data • Explore the data, use visualization • Use math • Make predictions • Test predictions – Collect even more data • Repeat...
Why MongoDB? When MongoDB?
5 NoSQL categories • Key Value: Redis • Wide Column: Cassandra • Graph: Neo4j • Document • Map Reduce: Hadoop
MongoDB and Enterprise IT Stack • Applications: CRM, ERP, Collaboration, Mobile, BI • Data Management: Online Data, Offline Data (RDBMS, RDBMS, Hadoop, EDW) • Infrastructure: OS & Virtualization, Compute, Storage, Network • Security & Auditing • Management & Monitoring
How do we do it with MongoDB?
Collect data
Exponential Data Growth http://www.worldwidewebsize.com/
Volume Velocity Variety
Volume Velocity Variety • Upserts avoid unnecessary reads • Asynchronous writes • Spread writes over multiple shards • Writes buffered in RAM and flushed to disk in bulk [Diagram: data sources]
Volume Velocity Variety MongoDB RDBMS
{
  _id : ObjectId("4c4ba5e5e8aabf3"),
  employee_name: "Dunham, Justin",
  department : "Marketing",
  title : "Product Manager, Web",
  report_up: "Neray, Graham",
  pay_band: "C",
  benefits : [
    { type : "Health", plan : "PPO Plus" },
    { type : "Dental", plan : "Standard" }
  ]
}
Visualization
Visualization d3js.org, …
Use math
Data Processing in MongoDB • Pre-aggregated documents • Aggregation Framework • Map/Reduce • Hadoop Connector
Pre-aggregated documents Design Pattern
Pre-Aggregation Data for URL / Date
{
  _id: "20101010/site-1/apache_pb.gif",
  metadata: {
    date: ISODate("2000-10-10T00:00:00Z"),
    site: "site-1",
    page: "/apache_pb.gif"
  },
  daily: 5468426,
  hourly: { "0": 227850, "1": 210231, ... "23": 20457 },
  minute: { "0": 3612, "1": 3241, ... "1439": 2819 }
}
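Pre-Aggregation Data for URL / Date
query = { '_id': "20101010/site-1/apache_pb.gif" }
update = { '$inc': { 'hourly.12' : 1, 'minute.739': 1 } }
# upsert=True creates the day's document if it does not exist yet
# (update_one replaces the older update() call, which PyMongo 4 removed)
db.stats.daily.update_one(query, update, upsert=True)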
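The counters touched by such an update can be derived from the hit's timestamp. A minimal sketch, assuming the "YYYYMMDD/site/page" _id convention shown on the slide and a minute-of-day index for the minute counters; the helper name build_preagg_update is hypothetical and not part of any MongoDB driver:

```python
from datetime import datetime, timezone


def build_preagg_update(site, page, ts):
    """Return (query, update) documents for one page hit at time ts.

    Hypothetical helper: it only builds plain dicts and never talks
    to a database.
    """
    # _id follows the slide's "YYYYMMDD/site/page" convention.
    day_id = "{}/{}{}".format(ts.strftime("%Y%m%d"), site, page)
    # Minute counters are indexed by minute of the day (0..1439).
    minute_of_day = ts.hour * 60 + ts.minute
    query = {"_id": day_id}
    update = {
        "$inc": {
            "hourly.{}".format(ts.hour): 1,
            "minute.{}".format(minute_of_day): 1,
        }
    }
    return query, update


# A hit at 12:19 UTC falls into hour 12 and minute 739 of the day.
query, update = build_preagg_update(
    "site-1",
    "/apache_pb.gif",
    datetime(2010, 10, 10, 12, 19, tzinfo=timezone.utc),
)
print(query)
print(update)
```

The two dicts can then be passed to the driver's upsert call, so a single write both creates the day's document when needed and bumps the counters.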
Aggregation framework
Dynamic Queries
Find all logs for a URL
db.logs.find( { 'path' : '/index.html' } )
Find all logs for a time range
db.logs.find( { 'time' : { '$gte': new Date(2013, 0), '$lt': new Date(2013, 1) } } )
Find all logs for a host over a range of dates
db.logs.find( { 'host' : '127.0.0.1', 'time' : { '$gte': new Date(2013, 0), '$lt': new Date(2013, 1) } } )
Aggregation Framework
Requests per day by URL
db.logs.aggregate( [
  { '$match': {
      'time': { '$gte': new Date(2013, 0), '$lt': new Date(2013, 1) } } },
  { '$project': {
      'path': 1,
      'date': {
        'y': { '$year': '$time' },
        'm': { '$month': '$time' },
        'd': { '$dayOfMonth': '$time' } } } },
  { '$group': {
      '_id': { 'p': '$path', 'y': '$date.y', 'm': '$date.m', 'd': '$date.d' },
      'hits': { '$sum': 1 } } }
] )
Aggregation Framework
{ 'ok': 1,
  'result': [
    { '_id': {'p': '/index.html', 'y': 2013, 'm': 1, 'd': 1 }, 'hits': 124 },
    { '_id': {'p': '/index.html', 'y': 2013, 'm': 1, 'd': 2 }, 'hits': 245 },
    { '_id': {'p': '/index.html', 'y': 2013, 'm': 1, 'd': 3 }, 'hits': 322 },
    { '_id': {'p': '/index.html', 'y': 2013, 'm': 1, 'd': 4 }, 'hits': 175 },
    { '_id': {'p': '/index.html', 'y': 2013, 'm': 1, 'd': 5 }, 'hits': 94 }
  ]
}
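To see what the $match, $project and $group stages compute, here is a rough pure-Python equivalent over an in-memory list. It is only a sketch for intuition: the function name and the sample log documents are made up, and the real pipeline runs inside MongoDB, not in client code.

```python
from collections import Counter
from datetime import datetime


def hits_per_day_by_url(logs, start, end):
    """Count hits per (path, year, month, day) for start <= time < end."""
    counts = Counter()
    for doc in logs:
        t = doc["time"]
        if start <= t < end:  # $match: keep documents in the time range
            # $project: reduce each document to path plus year/month/day
            key = (doc["path"], t.year, t.month, t.day)
            counts[key] += 1  # $group with 'hits': {'$sum': 1}
    return [
        {"_id": {"p": p, "y": y, "m": m, "d": d}, "hits": n}
        for (p, y, m, d), n in sorted(counts.items())
    ]


# Hypothetical sample data, not taken from the slides.
sample_logs = [
    {"path": "/index.html", "time": datetime(2013, 1, 1, 9, 30)},
    {"path": "/index.html", "time": datetime(2013, 1, 1, 17, 5)},
    {"path": "/index.html", "time": datetime(2013, 1, 2, 8, 0)},
    {"path": "/index.html", "time": datetime(2013, 2, 1, 0, 0)},  # outside range
]
result = hits_per_day_by_url(
    sample_logs, datetime(2013, 1, 1), datetime(2013, 2, 1)
)
for row in result:
    print(row)
```

The output has the same shape as the 'result' array on the slide: one document per URL and day, with a hit count.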
Aggregation Framework Benefits • Real-time • Simple yet powerful interface • Scale-out • Declared in JSON, executes in C++ • Runs inside MongoDB on local data
Map Reduce in MongoDB
MongoDB Map/Reduce
Map Reduce – Map Phase
Generate hourly rollups from log data
var map = function() {
  // Key: the path plus the timestamp truncated to the hour
  var key = {
    p: this.path,
    d: new Date(
      this.ts.getFullYear(),
      this.ts.getMonth(),
      this.ts.getDate(),
      this.ts.getHours(),
      0, 0, 0)
  };
  emit( key, { hits: 1 } );
}
Map Reduce – Reduce Phase
Generate hourly rollups from log data
var reduce = function(key, values) {
  // Sum the hit counts emitted for this key
  var r = { hits: 0 };
  values.forEach(function(v) {
    r.hits += v.hits;
  });
  return r;
}
Map Reduce - Execution
// '$lt' Feb 1 covers all of January, including hits during Jan 31
query = { 'ts': { '$gte': new Date(2013, 0, 1), '$lt': new Date(2013, 1, 1) } }
db.logs.mapReduce( map, reduce, { 'query': query, 'out': { 'reduce' : 'stats.monthly' } } )
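The map, group and reduce steps, plus the 'reduce' output mode, can be imitated in a few lines of plain Python. This is a sketch for intuition only: map_fn, reduce_fn and map_reduce are hypothetical names, the sample documents are invented, and MongoDB runs the real JavaScript functions server-side.

```python
from collections import defaultdict
from datetime import datetime


def map_fn(doc):
    """Emit one (key, value) pair per log document, keyed by path and hour."""
    ts = doc["ts"]
    hour = ts.replace(minute=0, second=0, microsecond=0)
    yield (doc["path"], hour), {"hits": 1}


def reduce_fn(key, values):
    """Sum the hit counts emitted for one key."""
    total = {"hits": 0}
    for v in values:
        total["hits"] += v["hits"]
    return total


def map_reduce(docs, existing=None):
    """Run map, group by key, then reduce.

    'existing' stands in for the output collection: when a key is already
    present, its stored value is reduced together with the new values,
    which mimics the 'reduce' output mode.
    """
    grouped = defaultdict(list)
    for doc in docs:
        for key, value in map_fn(doc):
            grouped[key].append(value)
    out = dict(existing or {})
    for key, values in grouped.items():
        if key in out:
            values = [out[key]] + values
        out[key] = reduce_fn(key, values)
    return out


# Hypothetical sample data.
first_batch = [
    {"path": "/a", "ts": datetime(2013, 1, 1, 10, 5)},
    {"path": "/a", "ts": datetime(2013, 1, 1, 10, 40)},
    {"path": "/a", "ts": datetime(2013, 1, 1, 11, 0)},
    {"path": "/b", "ts": datetime(2013, 1, 1, 10, 59)},
]
rollup = map_reduce(first_batch)
# A later run merges new hits into the earlier rollup.
rollup = map_reduce([{"path": "/a", "ts": datetime(2013, 1, 1, 10, 15)}], rollup)
print(rollup)
```

Because the reducer only adds counts, re-reducing a stored total with new values gives the same answer as reducing everything at once, which is what makes incremental rollups with the 'reduce' output mode safe.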
MongoDB Map/Reduce Benefits • Runs inside MongoDB • Sharding supported • JavaScript – Pro: functionality, expressiveness – Con: overhead • Input can be a collection or query! • Output directly to document or collection • Easy, when you don’t want overhead of Hadoop
Hadoop Connector
MongoDB with Hadoop
How it works • Adapter examines the MongoDB input collection and calculates a set of splits from the data • Each split is assigned to a Hadoop node • In parallel, Hadoop pulls data from the splits on MongoDB (or BSON) and starts processing locally • Hadoop merges results and streams output back to the MongoDB (or BSON) output collection
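The split, process-in-parallel, merge flow described above can be sketched in plain Python. This is an illustration under simplifying assumptions, not the connector's implementation: splits here are plain index ranges over an in-memory list, worker threads stand in for Hadoop nodes, and all function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor


def calculate_splits(doc_count, split_size):
    """Return half-open (start, end) ranges that cover doc_count documents."""
    return [
        (start, min(start + split_size, doc_count))
        for start in range(0, doc_count, split_size)
    ]


def process_split(docs, split):
    """Work done by one worker: read only its own split and aggregate it."""
    start, end = split
    return sum(doc["hits"] for doc in docs[start:end])


def run_job(docs, split_size, workers):
    """Compute splits, process them in parallel, then merge the partials."""
    splits = calculate_splits(len(docs), split_size)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = list(pool.map(lambda s: process_split(docs, s), splits))
    return sum(partials)


# Hypothetical input: ten documents with hit counts 0..9.
docs = [{"hits": i} for i in range(10)]
print(calculate_splits(len(docs), 3))
print(run_job(docs, split_size=3, workers=4))
```

The point of the sketch is the shape of the job: no worker needs to see the whole collection, and the final result is just the merge of the per-split results.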
Read From MongoDB (or BSON)
mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat
mongo.input.uri=mongodb://my-db:27017/enron.messages

mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat
mapred.input.dir=file:///tmp/messages.bson
mapred.input.dir=hdfs:///tmp/messages.bson
mapred.input.dir=s3:///tmp/messages.bson
Write To MongoDB (or BSON)
mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat
mongo.output.uri=mongodb://my-db:27017/enron.results_out

mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
mapred.output.dir=file:///tmp/results.bson
mapred.output.dir=hdfs:///tmp/results.bson
mapred.output.dir=s3:///tmp/results.bson
Document Example
{
  "_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"),
  "body" : "Here is our forecast\n\n ",
  "filename" : "1.",
  "headers" : {
    "From" : "phillip.allen@enron.com",
    "Subject" : "Forecast Info",
    "X-bcc" : "",
    "To" : "tim.belden@enron.com",
    "X-Origin" : "Allen-P",
    "X-From" : "Phillip K Allen",
    "Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)",
    "X-To" : "Tim Belden ",
    "Message-ID" : "<18782981.1075855378110.JavaMail.evans@thyme>",
    "Content-Type" : "text/plain; charset=us-ascii",
    "Mime-Version" : "1.0"
  }
}
Graph Sketch
Receiver Sender Pairs
{"_id": {"t":"bob@enron.com", "f":"alice@enron.com"}, "count" : 14}
{"_id": {"t":"bob@enron.com", "f":"eve@enron.com"}, "count" : 9}
{"_id": {"t":"alice@enron.com", "f":"charlie@enron.com"}, "count" : 99}
{"_id": {"t":"charlie@enron.com", "f":"bob@enron.com"}, "count" : 48}
{"_id": {"t":"eve@enron.com", "f":"charlie@enron.com"}, "count" : 20}
Map Phase – each document goes through the mapper function
@Override
public void map(NullWritable key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
    BSONObject headers = (BSONObject) val.get("headers");
    if (headers.containsKey("From") && headers.containsKey("To")) {
        String from = (String) headers.get("From");
        String to = (String) headers.get("To");
        // "To" may hold several comma-separated recipients
        String[] recips = to.split(",");
        for (int i = 0; i < recips.length; i++) {
            String recip = recips[i].trim();
            // Emit one (sender, recipient) pair per recipient
            context.write(new MailPair(from, recip), new IntWritable(1));
        }
    }
}
Reduce Phase – map outputs are grouped by key and passed to the reducer
public void reduce(final MailPair pKey, final Iterable<IntWritable> pValues, final Context pContext)
        throws IOException, InterruptedException {
    // Add up the 1s emitted for this (sender, recipient) pair
    int sum = 0;
    for (final IntWritable value : pValues) {
        sum += value.get();
    }
    // start() is a static factory method on BasicDBObjectBuilder
    BSONObject outDoc = BasicDBObjectBuilder.start()
        .add("f", pKey.from)
        .add("t", pKey.to)
        .get();
    BSONWritable pkeyOut = new BSONWritable(outDoc);
    pContext.write(pkeyOut, new IntWritable(sum));
}
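For readers who want to check the logic without a Hadoop cluster, the same sender/recipient counting fits in a few lines of Python. This is a local sketch, not part of the connector: mail_pairs and count_pairs are hypothetical names and the sample messages use made-up example.com addresses.

```python
from collections import Counter


def mail_pairs(message):
    """Map step: yield one (sender, recipient) pair per recipient."""
    headers = message.get("headers", {})
    if "From" in headers and "To" in headers:
        # "To" may hold several comma-separated recipients
        for recip in headers["To"].split(","):
            yield headers["From"], recip.strip()


def count_pairs(messages):
    """Reduce step: sum the occurrences of each pair."""
    counts = Counter()
    for message in messages:
        for pair in mail_pairs(message):
            counts[pair] += 1
    # Same document shape as the "Receiver Sender Pairs" output
    return [
        {"_id": {"t": to, "f": frm}, "count": n}
        for (frm, to), n in sorted(counts.items())
    ]


# Hypothetical sample messages.
messages = [
    {"headers": {"From": "alice@example.com",
                 "To": "bob@example.com, eve@example.com"}},
    {"headers": {"From": "alice@example.com", "To": "bob@example.com"}},
    {"headers": {"From": "eve@example.com"}},  # no "To" header: skipped
]
pairs = count_pairs(messages)
for doc in pairs:
    print(doc)
```

Running the logic on a handful of documents like this is a cheap way to validate the mapper and reducer before submitting the Java job.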
Query Data
mongos> db.streaming.output.find({"_id.t": /^kenneth.lay/})
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "15126-1267@m2.innovyx.com" }, "count" : 1 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "2586207@www4.imakenews.com" }, "count" : 1 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "40enron@enron.com" }, "count" : 2 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..davis@enron.com" }, "count" : 2 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..hughes@enron.com" }, "count" : 4 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..lindholm@enron.com" }, "count" : 1 }
{ "_id" : { "t" : "kenneth.lay@enron.com", "f" : "a..schroeder@enron.com" }, "count" : 1 }
Hadoop Connector Benefits • Full multi-core parallelism to process MongoDB data • mongo.input.query • Full integration w/ Hadoop and JVM ecosystem • Mahout, et al. • Can be used on Amazon Elastic MapReduce • Read and write backup files to local, HDFS and S3 • Vanilla Java MapReduce, Hadoop Streaming, Pig, Hive
Make predictions & test
A/B testing • Hey, it looks like teenage girls clicked a lot on that ad with a pink background... • Hypothesis: Given otherwise the same ad, teenage girls are more likely to click on ads with pink backgrounds than white • Test 50-50 pink vs white ads • Collect click stream stats in MongoDB or Hadoop • Analyze results
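The "Analyze results" step usually comes down to asking whether the difference in click rates could be chance. One common choice, assumed here because the slides do not name a method, is a one-sided two-proportion z-test. The click and view counts below are invented for illustration.

```python
import math


def two_proportion_z_test(clicks_a, views_a, clicks_b, views_b):
    """One-sided test of 'variant A has a higher click rate than B'.

    Returns (z, p_value). Uses the pooled-proportion standard error and
    the normal approximation, so it assumes reasonably large samples.
    """
    rate_a = clicks_a / views_a
    rate_b = clicks_b / views_b
    pooled = (clicks_a + clicks_b) / (views_a + views_b)
    std_err = math.sqrt(pooled * (1 - pooled) * (1 / views_a + 1 / views_b))
    z = (rate_a - rate_b) / std_err
    # P(Z >= z) for a standard normal Z, written with the complementary
    # error function: 1 - Phi(z) = 0.5 * erfc(z / sqrt(2))
    p_value = 0.5 * math.erfc(z / math.sqrt(2))
    return z, p_value


# Hypothetical click stream totals: pink ad vs. white ad, 50-50 split.
z, p_value = two_proportion_z_test(
    clicks_a=120, views_a=1000,  # pink
    clicks_b=90, views_b=1000,   # white
)
print("z = {:.2f}, one-sided p = {:.4f}".format(z, p_value))
```

A small p-value supports the hypothesis that pink performs better; a large one means the observed difference is consistent with noise, so collect more data before acting on it.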
Recommendations – social filtering • "Customers who bought this book also bought" • Computed offline / nightly • As easy as it sounds! Google it: Amazon item-to-item algorithm
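A nightly "also bought" job can start from simple co-purchase counts. The sketch below is a deliberately simplified stand-in, not the published Amazon item-to-item algorithm (which scores item pairs with a similarity measure rather than raw counts); the function names and order data are hypothetical.

```python
from collections import Counter, defaultdict
from itertools import permutations


def also_bought(orders):
    """Count, for every item, how often each other item shares an order."""
    co_counts = defaultdict(Counter)
    for items in orders:
        # set() so a repeated item within one order is counted once
        for a, b in permutations(set(items), 2):
            co_counts[a][b] += 1
    return co_counts


def recommend(co_counts, item, n=3):
    """Return up to n items most often bought together with item."""
    return [other for other, _ in co_counts[item].most_common(n)]


# Hypothetical order history.
orders = [
    ["book_a", "book_b"],
    ["book_a", "book_b", "book_c"],
    ["book_a", "book_c"],
    ["book_a", "book_b"],
]
co_counts = also_bought(orders)
print(recommend(co_counts, "book_a"))
print(recommend(co_counts, "book_c"))
```

Since the counts depend only on past orders, they can be computed offline and stored per item, leaving only a cheap lookup at page-render time.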
Personalization • "Even if you are a teenage girl, you seem to be 60% more likely to click on blue ads than pink." • User-specific recommendations: a hybrid of offline & online recommendations • User profile in MongoDB • May even be updated in real time
@h_ingo Questions? Henrik Ingo Solution Architect, MongoDB

Analytics with MongoDB Aggregation Framework and Hadoop Connector
