Martin Zapletal @zapletal_martin Cake Solutions @cakesolutions Machine learning at scale with Apache Spark
Scaling computation ● Analytics tools with poor scalability and integration ● Manual processes ● Slow iterations ● Not suitable for large amounts of data ● We want fast iteration, reliability, integration ● Serial implementation ● Parallel ● GPUs ● Distributed
Scaling neural networks
Perceptron ● Basic building block of neural networks ● a = f(Σ(y * w) + b), where y are the inputs, w the weights, b the bias and f the activation function
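The perceptron formula can be sketched in a few lines of Python. This is a minimal illustration, not code from the talk: the step activation, the function names `step` and `perceptron`, and the hand-picked AND weights are all assumptions.

```python
def step(z):
    """Threshold activation: 1 if the weighted sum is positive, else 0."""
    return 1 if z > 0 else 0


def perceptron(inputs, weights, bias, activation=step):
    """Compute a = f(sum(y * w) + b) for a single neuron."""
    weighted_sum = sum(y * w for y, w in zip(inputs, weights))
    return activation(weighted_sum + bias)


# Hand-picked (assumed) parameters that make the neuron behave like logical AND.
AND_WEIGHTS = [1.0, 1.0]
AND_BIAS = -1.5

and_outputs = [perceptron([a, b], AND_WEIGHTS, AND_BIAS)
               for a in (0, 1) for b in (0, 1)]
print(and_outputs)  # [0, 0, 0, 1]
```

A single perceptron can only separate classes with a straight line, which is why XOR needs a network with a hidden layer.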
Artificial neural network ● Network training ○ Many “optimal” solutions ○ Optimization and training techniques - LBFGS, Backpropagation, batch and online gradient descent, Downpour SGD, Sandblaster LBFGS, … ○ Vanishing gradient, amplifying parameters, ... ○ New methods for large networks - deep learning
XOR ● Diagram: trained network with the learned weight on each connection ● Output 2.613296075440797E-4 for input Vector(0, 0) ● Output 0.9989222606269823 for input Vector(0, 1) ● Output 0.9995952194411893 for input Vector(1, 0) ● Output 4.0074182099155245E-7 for input Vector(1, 1)
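A small network with one hidden layer is enough to represent XOR. The sketch below uses hand-picked weights (an assumption chosen for readability, not the trained weights from the slide) so that the result is deterministic; the names `sigmoid`, `neuron` and `xor_network` are hypothetical.

```python
import math


def sigmoid(z):
    return 1.0 / (1.0 + math.exp(-z))


def neuron(inputs, weights, bias):
    """One sigmoid unit: f(sum(y * w) + b)."""
    return sigmoid(sum(x * w for x, w in zip(inputs, weights)) + bias)


def xor_network(x1, x2):
    # Hidden layer: one unit approximates OR, the other NAND (assumed weights).
    h_or = neuron([x1, x2], [20.0, 20.0], -10.0)
    h_nand = neuron([x1, x2], [-20.0, -20.0], 30.0)
    # Output unit approximates AND of the two hidden units, which gives XOR.
    return neuron([h_or, h_nand], [20.0, 20.0], -30.0)


for x1, x2 in [(0, 0), (0, 1), (1, 0), (1, 1)]:
    print((x1, x2), round(xor_network(x1, x2), 4))
```

As with the trained network, the outputs are close to 0 or 1 rather than exact, because the sigmoid never saturates completely.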
Scaling computation ● Different programming models, different languages, different levels ● Sequential ○ R, Matlab, Python, Scala ● Parallel ○ Theano, Torch, Caffe, TensorFlow, Deeplearning4j ● Figure: elapsed times for 20 PageRank iterations [3, 4]
Machine learning ● Linear algebra ● Vectors, matrices, vector spaces, matrix transformations, eigenvectors/values ● Many machine learning algorithms are optimization problems ● Goal is to solve them in reasonable (bounded) time ● Goal not always to find the best possible model (data size, feature engineering vs. algorithm/model complexity) ● Goal is to solve them reliably, at scale, support application needs and improve [5]
Distributed environment ● Asynchronous and unreliable ● CAP theorem ● Consistency ● Availability ● Partition tolerance
Consistency, time and order in DS ● Sequential program always one total order of operations ● No order guarantees in distributed system ● At-most-once. Messages may be lost. ● At-least-once. Messages may be duplicated but not lost. ● Exactly-once.
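In practice, exactly-once behaviour is often approximated by combining at-least-once delivery with an idempotent consumer. The following is a toy sketch of that idea, assuming every message carries a unique id; `IdempotentConsumer` and `deliver_at_least_once` are hypothetical names, not part of any library.

```python
class IdempotentConsumer:
    """Applies each message at most once, even if it is delivered repeatedly."""

    def __init__(self):
        self.seen_ids = set()
        self.total = 0

    def handle(self, message_id, amount):
        if message_id in self.seen_ids:
            return False  # duplicate delivery: ignore it
        self.seen_ids.add(message_id)
        self.total += amount
        return True


def deliver_at_least_once(messages, consumer, redeliver_ids):
    """Toy at-least-once channel: every message arrives, some arrive twice."""
    for message_id, amount in messages:
        consumer.handle(message_id, amount)
        if message_id in redeliver_ids:
            # Simulated retry after a lost acknowledgement.
            consumer.handle(message_id, amount)


messages = [("m1", 10), ("m2", 20), ("m3", 30)]
consumer = IdempotentConsumer()
deliver_at_least_once(messages, consumer, redeliver_ids={"m2", "m3"})
print(consumer.total)  # 60
```

Without the `seen_ids` check the duplicated deliveries would be counted twice, which is the kind of double-charging failure that retries can cause.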
Failure in distributed system ● Node failures, network partitions, message loss, split brains, inconsistencies ● Microsoft's data centers: average failure rate of 5.2 devices per day and 40.8 links per day, with a median time to repair of approximately five minutes (and a maximum of one week). ● Google, a new cluster over one year: five rack issues (40-80 machines seeing 50 percent packet loss), eight network maintenance events (four of which might cause ~30-minute random connectivity losses), three router failures (resulting in the need to pull traffic immediately for an hour). ● CENIC: 500 isolating network partitions, with median durations of 2.7 and 32 minutes and 95th percentiles of 19.9 minutes and 3.7 days, respectively, for software and hardware problems [6]
Failure in distributed system ● A partition separated a MongoDB primary from its 2 secondaries. 2 hours later the old primary rejoined and rolled back everything on the new primary ● A network partition isolated the Redis primary from all secondaries. Every API call caused the billing system to recharge customer credit cards automatically, resulting in 1.1 percent of customers being overbilled over a period of 40 minutes. ● The partition caused inconsistency in the MySQL database. Because foreign key relationships were not consistent, GitHub showed private repositories to the wrong users' dashboards and incorrectly routed some newly created repositories. ● For several seconds, Elasticsearch is happy to believe two nodes in the same cluster are both primaries, will accept writes on both of those nodes, and later discard the writes to one side. ● RabbitMQ lost ~35% of acknowledged writes under those conditions. ● Redis threw away 56% of the writes it told us succeeded. ● In Riak, last-write-wins resulted in dropping 30-70% of writes, even with the strongest consistency settings ● MongoDB “strictly consistent” reads see stale versions of documents, but they can also return garbage data from writes that never should have occurred. [6]
Algorithm parallelization ● Diagram: data feeding several parallel computations
Algorithm parallelization [7]
Neural network parallelism [8]
import tensorflow as tf

def init_weights(shape):
    return tf.Variable(tf.random_normal(shape, stddev=0.01))

def model(X, w_h, w_o):
    h = tf.nn.sigmoid(tf.matmul(X, w_h))
    return tf.matmul(h, w_o)

X = tf.placeholder("float", [None, 784])
Y = tf.placeholder("float", [None, 10])

w_h = init_weights([784, 625])
w_o = init_weights([625, 10])

py_x = model(X, w_h, w_o)

cost = tf.reduce_mean(
    tf.nn.softmax_cross_entropy_with_logits(py_x, Y))
train_op = tf.train.GradientDescentOptimizer(0.05).minimize(cost)
predict_op = tf.argmax(py_x, 1)

sess = tf.Session()
init = tf.initialize_all_variables()
sess.run(init)
sess.run(train_op, …)
sess.run(predict_op, …)

[9, 10]
Model parallelism [11] ● Diagram: a model partitioned across Machine1, Machine2, Machine3 and Machine4
Data parallelism [11] ● Diagram: the data split into separate parts
Parameter server ● Model and data parallelism ● Failures and slow machines ● Additional stochasticity due to asynchrony (relaxed consistency, out-of-date parameters, ordering not guaranteed, …) [11]
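The effect of stale parameters can be illustrated with a toy, single-process simulation. This is a sketch under stated assumptions, not the system described in [11]: the `ParameterServer` class, the one-parameter model y = w * x, the data and the learning rate are all made up for illustration, and the asynchrony is simulated by letting every worker compute its gradient from the same snapshot before any update is applied.

```python
class ParameterServer:
    """Holds the shared parameter and applies pushed gradients."""

    def __init__(self, initial_weight, learning_rate):
        self.weight = initial_weight
        self.learning_rate = learning_rate

    def pull(self):
        return self.weight

    def push(self, gradient):
        self.weight -= self.learning_rate * gradient


def shard_gradient(weight, shard):
    """Gradient of mean squared error for the model y = weight * x on one shard."""
    return sum(2 * (weight * x - y) * x for x, y in shard) / len(shard)


def train(server, shards, rounds):
    for _ in range(rounds):
        # All workers pull the same snapshot of the parameter...
        snapshots = [server.pull() for _ in shards]
        # ...and push one after another, so later gradients are already stale.
        for snapshot, shard in zip(snapshots, shards):
            server.push(shard_gradient(snapshot, shard))
    return server.weight


# Two data shards (data parallelism) drawn from y = 3x.
shards = [[(1.0, 3.0), (2.0, 6.0)], [(3.0, 9.0), (4.0, 12.0)]]
server = ParameterServer(initial_weight=0.0, learning_rate=0.01)
learned = train(server, shards, rounds=50)
print(round(learned, 3))
```

With a small enough learning rate the stale gradients still point in a useful direction, so training converges despite the relaxed consistency.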
Examples ● “Their network for face detection from YouTube comprised millions of neurons and 1 billion connection weights. They trained it on a dataset of 10 million 200x200 pixel RGB images to learn 20,000 object categories. The training simulation ran for three days on a cluster of 1,000 servers totaling 16,000 CPU cores. Each instantiation of the network spanned 170 servers” (Google) ● “We demonstrate near-perfect weak scaling on a 16 rack IBM Blue Gene/Q (262144 CPUs, 256 TB memory), achieving an unprecedented scale of 256 million neurosynaptic cores containing 65 billion neurons and 16 trillion synapses” (TrueNorth, part of the IBM SyNAPSE project) [11, 12]
Examples [13]
Architecture
Diagram: Data → Preprocessing → Features → Training / Testing → Error %
Data processing pipeline ● Whole lifecycle of data ● Data processing ● Data stores ● Integration ● Distributed computing primitives ● Cluster managers and task schedulers ● Deployment, configuration management and DevOps ● Data analytics and machine learning
[14] Diagrams: CQRS (client, command and query sides, DBs, denormalise / precompute), Kappa architecture, batch pipeline and Lambda architecture (batch layer, serving layer, fast stream layer, serving DB, queries). Components shown include all your data, Kafka, Spark, a stream processor, views, clients, NoSQL and SQL stores, Flume, Sqoop, Hive, Impala, Oozie and HDFS
[15, 16]
Apache Spark
Apache Spark ● In-memory dataflow distributed data processing framework, streaming and batch ● Distributes computation using a higher level API ● Load balancing ● Moves computation to data ● Fault tolerant
Spark distributed programming model ● Resilient Distributed Datasets ● Fault tolerance ● Caching ● Serialization ● Transformations ○ Lazy, form the DAG ○ map, filter, flatMap, union, group, reduce, sort, join, repartition, cartesian, glom, ... ● Actions ○ Execute DAG, retrieve result ○ reduce, collect, count, first, take, foreach, saveAs…, min, max, ... ● Accumulators, Broadcast Variables ● SQL ● Integration ● Streaming ● Machine Learning ● Graph Processing
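The split between lazy transformations and eager actions can be mimicked in plain Python. The class below is a toy stand-in written for this illustration, not Spark's API: `LazyDataset` and its methods are hypothetical, and it runs on a single machine. Transformations only record a step; nothing is computed until an action such as `collect` or `count` runs the recorded chain.

```python
class LazyDataset:
    """Toy stand-in for an RDD: records transformations, runs them on an action."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)  # the recorded chain of transformations

    # Transformations: return a new dataset, compute nothing.
    def map(self, fn):
        return LazyDataset(self._source, self._steps + (("map", fn),))

    def filter(self, predicate):
        return LazyDataset(self._source, self._steps + (("filter", predicate),))

    def _iterate(self):
        items = iter(self._source)
        for kind, fn in self._steps:
            items = map(fn, items) if kind == "map" else filter(fn, items)
        return items

    # Actions: execute the recorded chain and return a result.
    def collect(self):
        return list(self._iterate())

    def count(self):
        return sum(1 for _ in self._iterate())


calls = []


def traced_square(x):
    calls.append(x)  # lets us observe when the function really runs
    return x * x


squares = LazyDataset(range(5)).map(traced_square).filter(lambda v: v % 2 == 0)
print(calls)              # [] because no action has run yet
print(squares.collect())  # [0, 4, 16]
```

Because each dataset is just its source plus the recorded steps, the same object can be re-evaluated at any time, which is also the basis for recomputing lost data.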
Distributed computation ● Spark streaming ● Computing, processing, transforming, analytics [17]
RDD [18] ● Diagram: textFile → map → map → reduceByKey → collect

sc.textFile("counts")
  .map(line => line.split("\t"))
  .map(word => (word(0), word(1).toInt))
  .reduceByKey(_ + _)
  .collect()
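What the pipeline above computes can be sketched without a cluster. The following plain-Python version is an illustration only (the function names and sample lines are assumptions): each partition first sums its own counts per key, then the partial sums are merged, which is the general shape of a reduce-by-key.

```python
from collections import defaultdict


def parse(line):
    """Split a tab-separated "word<TAB>count" line into (word, count)."""
    word, count = line.split("\t")
    return word, int(count)


def combine_partition(lines):
    """Sum counts per key within one partition."""
    partial = defaultdict(int)
    for word, count in map(parse, lines):
        partial[word] += count
    return partial


def reduce_by_key(partitions):
    """Merge the per-partition partial sums into one result."""
    totals = defaultdict(int)
    for partition in partitions:
        for word, count in combine_partition(partition).items():
            totals[word] += count
    return dict(totals)


# Two made-up partitions of the input file.
partitions = [
    ["spark\t2", "scala\t1"],
    ["spark\t3", "akka\t4"],
]
print(sorted(reduce_by_key(partitions).items()))
# [('akka', 4), ('scala', 1), ('spark', 5)]
```

Combining inside each partition first means only one partial sum per key has to cross the network, instead of every record.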
Graph lineage ● Master and worker failures

val data2a = data2
  .map(x => x.label -> x.features)
val dataa = data
  .map(x => x.label -> x.features)
  .union(data2a)
  .cache()
val data3a = data3
  .map(x => x.label -> x.features)
val datab = dataa
  .join(data3a, 4)
  .cache()
  .mapPartitions(it => it.map(x => x._1 + 1 -> x._2))
  .groupByKey(4)
  .reduceByKey((it1, it2) => it1 ++ it2)
  .collect()
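The idea behind lineage-based recovery is that a lost partition is recomputed from its inputs rather than restored from a replica. Below is a toy single-process sketch of that idea; `LineageDataset` is a hypothetical class for illustration and the source partitions are assumed to be durable.

```python
class LineageDataset:
    """Toy dataset that remembers how each partition was derived."""

    def __init__(self, source_partitions, transform):
        self.source_partitions = source_partitions  # assumed durable input
        self.transform = transform                   # the recorded lineage step
        self.cache = {}                              # computed partitions
        self.recomputations = 0

    def partition(self, index):
        if index not in self.cache:
            # Cache miss (first use or lost data): recompute from the source.
            self.recomputations += 1
            self.cache[index] = [self.transform(x)
                                 for x in self.source_partitions[index]]
        return self.cache[index]

    def lose_partition(self, index):
        """Simulate a worker failure that drops one cached partition."""
        self.cache.pop(index, None)

    def collect(self):
        return [x
                for i in range(len(self.source_partitions))
                for x in self.partition(i)]


dataset = LineageDataset([[1, 2], [3, 4]], transform=lambda x: x * 10)
before = dataset.collect()
dataset.lose_partition(1)
after = dataset.collect()
print(before == after)          # True
print(dataset.recomputations)   # 3: two initial partitions plus one recovery
```

Only the lost partition is recomputed; the surviving cached partition is reused as is.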
Optimizations ● Multiple phases ● Catalyst [19]
Optimizations [20] ● Diagram: Spark master, Spark worker, Cassandra
Optimizations ● CPU and memory bottlenecks, not IO ● Project Tungsten ○ Explicit memory management and binary processing ○ Cache-aware computation ○ Code generation ● Daytona Gray Sort 100TB Benchmark won by Apache Spark ○ Optimized memory layout, shuffle algorithm, ... [20]
MLlib ● Data types ● Basic statistics ○ summary statistics, correlations, stratified sampling, hypothesis testing, streaming significance testing, random data generation ● Classification and regression ○ SVMs, logistic regression, linear regression, naive Bayes, decision trees, ensembles of trees (Random Forests and Gradient-Boosted Trees), isotonic regression, multilayer perceptron classifier, one-vs-rest classifier, survival regression ● Collaborative filtering ○ alternating least squares (ALS) ● Clustering ○ k-means, Gaussian mixture, power iteration clustering (PIC), latent Dirichlet allocation (LDA), bisecting k-means, streaming k-means ● Dimensionality reduction ○ singular value decomposition (SVD), principal component analysis (PCA) ● Feature extraction and transformation ○ TF-IDF, word2vec, normalizers, scaling ● Frequent pattern mining ○ FP-growth, association rules, PrefixSpan ● Evaluation metrics ● PMML model export ● Optimization (developer) ○ stochastic gradient descent, limited-memory BFGS (L-BFGS)
Example application
Muvr [21]
7 * Dumbbell Alternating Bicep Curl
Muvr architecture
Reactive ● Responsive ● Resilient ● Elastic ● Message driven
Muvr ● Classify finished (in progress) exercises ● Gather data for improved classification ● Predict next exercises ● Predict weights, intensity ● Design a schedule of exercises and improvements (personal trainer) ● Monitor exercise quality
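Scaling model training

import org.apache.spark.SparkContext
import org.apache.spark.ml.classification.MultilayerPerceptronClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

val sc = new SparkContext("local[4]", "NN")
val data = ...

// Input layer, two hidden layers (250 and 50 units), output layer.
val layers = Array[Int](inputSize, 250, 50, outputSize)
val trainer = new MultilayerPerceptronClassifier()
  .setLayers(layers)
  .setBlockSize(128)
  .setSeed(1234L)
  .setMaxIter(100)

val model = trainer.fit(data)
val result = model.transform(data)
// foreach already prints each row; wrapping it in println would only print ().
result.select(result("prediction")).foreach(println)

val predictionAndLabels = result.select("prediction", "label")
val evaluator = new MulticlassClassificationEvaluator()
  .setMetricName("precision")
println("Precision:" + evaluator.evaluate(predictionAndLabels))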
Scaling model training ● Deeplearning4j, Neon, TensorFlow on Spark ● Diagram: Model 1 training, Model 2 training, Model 3 training → Best model
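Training several candidate models side by side and keeping the best one can be sketched on a single machine. This is a minimal illustration, assuming a one-parameter model and a thread pool in place of a cluster; the data, the candidate learning rates and the `train` function are made up for the example.

```python
from concurrent.futures import ThreadPoolExecutor

DATA = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0)]  # toy data for y = 2x


def train(learning_rate, epochs=20):
    """Fit y = w * x with plain gradient descent; return (loss, rate, w)."""
    w = 0.0
    for _ in range(epochs):
        gradient = sum(2 * (w * x - y) * x for x, y in DATA) / len(DATA)
        w -= learning_rate * gradient
    loss = sum((w * x - y) ** 2 for x, y in DATA) / len(DATA)
    return loss, learning_rate, w


# Each candidate configuration is trained independently of the others.
candidate_rates = [0.001, 0.01, 0.1]
with ThreadPoolExecutor(max_workers=3) as pool:
    results = list(pool.map(train, candidate_rates))

# Tuples compare by their first element, so min() picks the lowest loss.
best_loss, best_rate, best_w = min(results)
print(best_rate, round(best_w, 3))
```

Because the trainings share no state, the same pattern maps naturally onto one task per model on a cluster.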
# Body of a model-building function; the enclosing def is not shown on the slide.
init_norm = Uniform(low=-0.1, high=0.1)
bias_init = Constant(val=1.0)

layers = []
layers.append(Conv(
    fshape=(1, 3, 16),
    init=init_norm,
    bias=bias_init,
    activation=Rectlin()))
layers.append(Pooling(
    op="max",
    fshape=(2, 1),
    strides=2))
layers.append(Conv(
    fshape=(1, 3, 32),
    init=init_norm,
    bias=bias_init,
    activation=Rectlin()))
layers.append(Pooling(
    op="max",
    fshape=(2, 1),
    strides=2))
layers.append(Affine(
    nout=100,
    init=init_norm,
    bias=bias_init,
    activation=Rectlin()))
layers.append(Dropout(
    name="do_2",
    keep=0.9))
layers.append(Affine(
    nout=dataset.num_labels,
    init=init_norm,
    bias=bias_init,
    activation=Logistic()))

return Model(layers=layers)
backend = gen_backend(
    backend='cpu',
    batch_size=self.batch_size,
    rng_seed=self.random_seed,
    stochastic_round=False)
# backend = gen_backend(rng_seed=0, gpu='cudanet')

cost = GeneralizedCost(
    name='cost',
    costfunc=CrossEntropyMulti())

optimizer = GradientDescentMomentum(
    learning_rate=self.lrate,
    momentum_coef=0.9)

model.fit(
    dataset.train(),
    optimizer=optimizer,
    num_epochs=self.max_epochs,
    cost=cost,
    callbacks=callbacks)
# Parentheses let the method chain span several lines in Python.
(sc
    .cassandraTable(conf["cassandra"]["data_keyspace"], conf["cassandra"]["data_table"])
    .select("user_id", "model_id", "file_name", "time", "x", "y", "z", "exercise")
    .spanBy("user_id", "model_id")
    .map(train_model_for_user)
    .saveToCassandra(conf["cassandra"]["model_keyspace"], conf["cassandra"]["model_table"]))
[22]
val events = sc.eventTable().cache().toDF()

val lr = new LinearRegression()
val pipeline = new Pipeline().setStages(Array(
  new UserFilter(),
  new ZScoreNormalizer(),
  new IntensityFeatureExtractor(),
  lr))

// build() turns the grid into the Array[ParamMap] that setEstimatorParamMaps expects.
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .addGrid(lr.fitIntercept, Array(true, false))
  .build()

getEligibleUsers(events, sessionEndedBefore)
  .map { user =>
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(new RegressionEvaluator)
      .setEstimatorParamMaps(paramGrid)

    val model = trainValidationSplit.fit(
      events, ParamMap(ParamPair(userIdParam, user)))
    val testData = // Prepare test data.
    val predictions = model.transform(testData)
    submitResult(userId, predictions, config)
  }
Queries and analytics

val events: RDD[(JournalKey, Any)] = sc.eventTable().cache()
  .filterClass[EntireResistanceExerciseSession]
  .flatMap(_.deviations)

// SQL
val deviationsFrequency = sqlContext.sql(
  """SELECT planned.exercise, hour(time), COUNT(1)
     FROM exerciseDeviations
     WHERE planned.exercise = 'bench press'
     GROUP BY planned.exercise, hour(time)""")

// DataFrame API
val deviationsFrequency2 = exerciseDeviationsDF
  .where(exerciseDeviationsDF("planned.exercise") === "bench press")
  .groupBy(
    exerciseDeviationsDF("planned.exercise"),
    exerciseDeviationsDF("time"))
  .count()

// RDD API
val deviationsFrequency3 = exerciseDeviations
  .filter(_.planned.exercise == "bench press")
  .groupBy(d => (d.planned.exercise, d.time.getHours))
  .map(d => (d._1, d._2.size))
Clustering

def toVector(user: User): mllib.linalg.Vector =
  Vectors.dense(
    user.frequency,
    user.performanceIndex,
    user.improvementIndex)

val events: RDD[(JournalKey, Any)] = sc.eventTable().cache()
val users: RDD[User] = events.filterClass[User]

val kmeans = new KMeans()
  .setK(5)
  .set...

// toVector is a standalone function, so it is passed to map directly.
val clusters = kmeans.run(users.map(toVector))
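For readers unfamiliar with k-means, the basic assign-and-update loop (Lloyd's algorithm) can be sketched in plain Python. This is a simplified illustration and not MLlib's implementation: the initial centroids are chosen by hand, the iteration count is fixed, and the user vectors are made-up values for (frequency, performanceIndex, improvementIndex).

```python
def squared_distance(a, b):
    return sum((x - y) ** 2 for x, y in zip(a, b))


def closest(point, centroids):
    """Index of the centroid nearest to the point."""
    return min(range(len(centroids)),
               key=lambda i: squared_distance(point, centroids[i]))


def mean(points):
    """Component-wise mean of a non-empty list of equal-length tuples."""
    return tuple(sum(dim) / len(points) for dim in zip(*points))


def kmeans(points, centroids, iterations=10):
    for _ in range(iterations):
        # Assignment step: attach every point to its nearest centroid.
        clusters = [[] for _ in centroids]
        for point in points:
            clusters[closest(point, centroids)].append(point)
        # Update step: move each centroid to the mean of its cluster.
        # An empty cluster keeps its previous centroid.
        centroids = [mean(c) if c else old
                     for c, old in zip(clusters, centroids)]
    return centroids


# Hypothetical (frequency, performanceIndex, improvementIndex) vectors.
users = [(1.0, 0.2, 0.1), (1.2, 0.3, 0.1), (5.0, 0.9, 0.8), (5.2, 0.8, 0.9)]
centroids = kmeans(users, centroids=[users[0], users[2]])
assignments = [closest(u, centroids) for u in users]
print(assignments)  # [0, 0, 1, 1]
```

The assignment step is independent per point, which is what makes the algorithm straightforward to distribute over partitions of users.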
Recommendations

val events: RDD[(JournalKey, Any)] = sc.eventTable().cache()

// One rating per (session, exercise) pair, weighted by how often it occurs.
val ratings = events
  .filterClass[EntireResistanceExerciseSession]
  .flatMap(session =>
    session.sets.flatMap(set =>
      set.sets.map(exercise => (session.id.id, exercise.exercise))))
  .groupBy(e => e)
  .map(g => Rating(normalize(g._1._1), normalize(g._1._2), normalize(g._2.size)))

val model = new ALS().run(ratings)
val predictions = model.predict(recommend)

Ratings table from the slide (flattened; empty cells are not recoverable):
bench press, bicep curl, dead lift
user 1: 5, 2
user 2: 4, 3
user 3: 5, 2
user 4: 3, 1
Graph analysis

val events: RDD[(JournalKey, Any)] = sc.eventTable().cache()
val connections = events.filterClass[Connections]

val vertices: RDD[(VertexId, Long)] =
  connections.map(c => (c.id, 1L))
val edges: RDD[Edge[Long]] = connections
  .flatMap(c => c.connections
    .map(Edge(c.id, _, 1L)))

val graph = Graph(vertices, edges)
val ranks = graph.pageRank(0.0001).vertices
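The ranking step can be illustrated with a textbook PageRank by power iteration. This sketch is not GraphX's implementation: the damping factor, the handling of nodes without outgoing edges, the normalisation to a total rank of 1 and the small example graph are all assumptions made for the illustration.

```python
def pagerank(edges, damping=0.85, tolerance=1e-4, max_iterations=100):
    """Textbook PageRank by power iteration on a list of (src, dst) edges."""
    nodes = sorted({n for edge in edges for n in edge})
    out_links = {n: [dst for src, dst in edges if src == n] for n in nodes}
    ranks = {n: 1.0 / len(nodes) for n in nodes}
    for _ in range(max_iterations):
        # Rank held by nodes without outgoing edges is spread evenly.
        dangling = sum(ranks[n] for n in nodes if not out_links[n])
        base = (1 - damping) / len(nodes) + damping * dangling / len(nodes)
        new_ranks = {n: base for n in nodes}
        # Every node passes its rank in equal shares along its outgoing edges.
        for src in nodes:
            for dst in out_links[src]:
                new_ranks[dst] += damping * ranks[src] / len(out_links[src])
        converged = max(abs(new_ranks[n] - ranks[n]) for n in nodes) < tolerance
        ranks = new_ranks
        if converged:
            break
    return ranks


# Hypothetical connection graph: users 1, 2 and 3 all connect to user 4.
edges = [(1, 4), (2, 4), (3, 4), (4, 1)]
ranks = pagerank(edges)
print(max(ranks, key=ranks.get))  # 4
```

The tolerance plays the same role as the `0.0001` argument on the slide: iteration stops once no rank changes by more than that amount.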
Conclusions ● Scaling systems, data pipelines and machine learning ● Reactive ○ Elasticity ○ Resilience ○ Responsiveness ○ Message driven
Questions
Thank you ● Jobs at www.cakesolutions.net/careers ● Code at https://github.com/muvr ● Martin Zapletal @zapletal_martin
References
[1] http://arxiv.org/abs/1112.6209
[2] SuperComputing 2012 two weeks ago and part of the IBM SyNAPSE project
[3] http://www.csie.ntu.edu.tw/~cjlin/talks/twdatasci_cjlin.pdf
[4] http://blog.acolyer.org/2015/06/05/scalability-but-at-what-cost/
[5] https://www.tensorflow.org/versions/master/tutorials/mnist/beginners/index.html
[6] https://queue.acm.org/detail.cfm?id=2655736
[7] http://fa.bianp.net/blog/2013/isotonic-regression/
[8] http://briandolhansky.com/blog/2014/10/30/artificial-neural-networks-matrix-form-part-5
[9] https://github.com/nlintz/TensorFlow-Tutorials/blob/master/3_net.py
[10] https://www.tensorflow.org/
[11] http://static.googleusercontent.com/media/research.google.com/en/us/archive/large_deep_networks_nips2012.pdf
[12] https://www.quora.com/How-big-is-the-largest-feedforward-neural-network-ever-trained-and-what-for
[13] http://static.googleusercontent.com/media/research.google.com/en//archive/unsupervised_icml2012.pdf
[14] http://www.benstopford.com/2015/04/28/elements-of-scale-composing-and-scaling-data-platforms/
[15] http://malteschwarzkopf.de/research/assets/google-stack.pdf
[16] http://malteschwarzkopf.de/research/assets/facebook-stack.pdf
[17] https://twitter.com/tsantero/status/695013012525060097
[18] http://www.slideshare.net/LisaHua/spark-overview-37479609
[19] https://ogirardot.wordpress.com/2015/05/29/rdds-are-the-new-bytecode-of-apache-spark/
[20] https://kayousterhout.github.io/trace-analysis/
[21] https://github.com/muvr
[22] https://databricks.com/blog/2016/01/25/deep-learning-with-spark-and-tensorflow.html
Twitter: @cakesolutions Tel: 0845 617 1200 Email: enquiries@cakesolutions.net
