Distributed implementation of an LSTM on Spark and Tensorflow Emanuel Di Nardo Source code: https://github.com/EmanuelOverflow/LSTM-TensorSpark
Overview ● Introduction ● Apache Spark ● Tensorflow ● RNN-LSTM ● Implementation ● Results ● Conclusions
Introduction Distributed environment: ● Many computation units; ● Each unit is called ‘node’; ● Node collaboration/competition; ● Message passing; ● Synchronization and global state management;
Apache Spark ● Large-scale data processing framework; ● In-memory processing; ● General purpose: ○ MapReduce; ○ Batch and streaming processing; ○ Machine learning; ○ Graph theory; ○ Etc… ● Scalable; ● Open source;
Apache Spark ● Resilient Distributed Dataset (RDD): ○ Fault-tolerant collection of elements; ○ Transformations and actions; ○ Lazy computation; ● Spark core: ○ Task dispatching; ○ Scheduling; ○ I/O; ● Essentially: ○ A master driver organizes the nodes and delegates tasks to the workers, passing an RDD; ○ Worker executors run the tasks and return the results in a new RDD;
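To make the transformation/action distinction concrete, here is a minimal PySpark sketch (an illustrative example, not code from the project): transformations only describe the computation, and the action is what actually triggers the distributed job.

    from pyspark import SparkContext

    sc = SparkContext(appName="rdd-demo")

    rdd = sc.parallelize(range(10), numSlices=3)   # an RDD split into 3 partitions
    squared = rdd.map(lambda x: x * x)             # transformation: lazy, nothing runs yet
    total = squared.reduce(lambda a, b: a + b)     # action: triggers the distributed computation
    print(total)                                   # 285

    sc.stop()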
Apache Spark Streaming ● Streaming computation; ● Mini-batch strategy; ● Latency depends on mini-batch processing time/size; ● Easy to combine with the batch strategy; ● Fault tolerance;
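The mini-batch idea can be illustrated with a small, hypothetical Spark Streaming sketch (not part of the project): the stream is cut into fixed-length batches, and each batch is processed with ordinary RDD operations.

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="streaming-demo")
    ssc = StreamingContext(sc, batchDuration=5)        # mini-batch interval: 5 seconds

    # assumes a plain text server on localhost:9999 (e.g. `nc -lk 9999`)
    lines = ssc.socketTextStream("localhost", 9999)
    counts = (lines.flatMap(lambda l: l.split())
                   .map(lambda w: (w, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()                                    # print the result of each mini-batch

    ssc.start()
    ssc.awaitTermination()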
Apache Spark ● API for many languages: ○ Java; ○ Python; ○ Scala; ○ R; ● Runs on: ○ Hadoop; ○ Mesos; ○ Standalone; ○ Cloud; ● It can access diverse data sources including: ○ HDFS; ○ Cassandra; ○ HBase;
Tensorflow ● Numerical computation library; ● Computation is graph-based: ○ Nodes are mathematical operations; ○ Edges are I/O multidimensional arrays (tensors); ● Distributed over multiple CPUs/GPUs; ● API: ○ Python; ○ C++; ● Open source; ● A Google product;
Tensorflow ● Data Flow Graph: ○ Directed graph; ○ Nodes are mathematical operations or data I/O; ○ Edges are I/O tensors; ○ Operations are asynchronous and parallel: ■ Performed once all input tensors are available; ● Flexible and easily extensible; ● Auto-differentiation; ● Lazy computation;
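A minimal sketch of the lazy, graph-based model with the TensorFlow 1.x graph API (an illustrative example, not the project's code): building ops only defines the graph, and nothing is executed until Session.run().

    import tensorflow as tf

    a = tf.placeholder(tf.float32, shape=[None, 3])   # graph node: input tensor
    w = tf.Variable(tf.random_normal([3, 2]))         # graph node: trainable weights
    y = tf.matmul(a, w)                               # graph node: operation, not executed yet

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())   # variables must be initialized first
        out = sess.run(y, feed_dict={a: [[1.0, 2.0, 3.0]]})
        print(out.shape)                              # (1, 2)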
RNN-LSTM ● Recurrent Neural Network; ● Cyclic networks: ○ At each training step the output of the previous step is fed back into the same layer together with new input data; ● The input Xt is transformed by the hidden layer A, whose output is also fed back into A itself; *Image from http://colah.github.io/posts/2015-08-Understanding-LSTMs/
RNN-LSTM ● Recurrent Neural Network; ● Cyclic networks: ○ At each training step the output of the previous step is fed back into the same layer together with new input data; ● Unrolled network: ○ Each input feeds the network; ○ The output is passed to the next step as supplementary input data; *Image from http://colah.github.io/posts/2015-08-Understanding-LSTMs/
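The unrolled recurrence can be written in a few lines of plain NumPy (a hypothetical sketch of a vanilla RNN step, not the LSTM used in the project): the same weights are reused at every step, and the hidden state from step t-1 is combined with the input of step t.

    import numpy as np

    def rnn_step(x_t, h_prev, W_x, W_h, b):
        # one recurrent step: new hidden state from current input and previous output
        return np.tanh(x_t @ W_x + h_prev @ W_h + b)

    hidden = 4
    W_x = np.random.randn(3, hidden)
    W_h = np.random.randn(hidden, hidden)
    b = np.zeros(hidden)

    h = np.zeros(hidden)                    # initial state
    for x_t in np.random.randn(5, 3):       # a sequence of 5 inputs, each with 3 features
        h = rnn_step(x_t, h, W_x, W_h, b)   # unrolling: the same cell is applied at every step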
RNN-LSTM ● This kind of network has a major problem: ○ It is unable to learn long data sequences; ○ It works only over the short term; ● A 'long memory' model is needed: ○ Long short-term memory; ● The hidden layer is able to memorize long data sequences using: ○ The current input; ○ The previous output; ○ The network memory state; *Image from http://colah.github.io/posts/2015-08-Understanding-LSTMs/
RNN-LSTM ● The hidden layer is able to memorize long data sequences using: ○ The current input; ○ The previous output; ○ The network memory state; ● Four 'gate layers' to preserve information: ○ Forget gate layer; ○ Input gate layer; ○ 'Candidate' gate layer; ○ Output gate layer; ● Multiple activation functions: ○ Sigmoid for the forget, input and output gates; ○ Tanh for the candidate values and for squashing the memory state into the output; *Image from http://colah.github.io/posts/2015-08-Understanding-LSTMs/
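For concreteness, here is a hypothetical NumPy sketch of a single LSTM step in the standard formulation (not the repository's TensorFlow code): the forget, input and output gates use the sigmoid, the candidate values use tanh, and the memory state is carried from step to step.

    import numpy as np

    def sigmoid(z):
        return 1.0 / (1.0 + np.exp(-z))

    def lstm_step(x_t, h_prev, c_prev, W, b):
        # W maps [x_t, h_prev] to the stacked pre-activations of the four gate layers
        z = np.concatenate([x_t, h_prev]) @ W + b
        f, i, g, o = np.split(z, 4)
        f, i, o = sigmoid(f), sigmoid(i), sigmoid(o)   # forget, input, output gates
        g = np.tanh(g)                                 # candidate values
        c = f * c_prev + i * g                         # updated memory (cell) state
        h = o * np.tanh(c)                             # new output / hidden state
        return h, c

    hidden, features = 4, 3
    W = np.random.randn(features + hidden, 4 * hidden)
    b = np.zeros(4 * hidden)

    h, c = np.zeros(hidden), np.zeros(hidden)
    for x_t in np.random.randn(5, features):           # a sequence of 5 inputs
        h, c = lstm_step(x_t, h, c, W, b)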
Implementation ● RNN-LSTM: ○ Distributed on Spark; ○ Mathematical operations with Tensorflow; ● Distribution of the mini-batch computation: ○ Each partition takes care of a subset of the whole dataset; ○ Each subset has the same size; equal sizes are not strictly required by the mini-batch strategy (proper techniques can handle unbalanced partitions), but we want to test the performance of all partitions under a balanced load; ● Tensorflow provides several LSTM implementations (a sketch of one built-in cell follows), but it was decided to implement the network from scratch for learning purposes;
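For comparison, this is roughly what using a built-in cell looks like in the TensorFlow 1.x API (a hedged sketch; the project does not use it):

    import tensorflow as tf

    inputs = tf.placeholder(tf.float32, [None, 5, 4])           # [batch, time steps, features]
    cell = tf.nn.rnn_cell.BasicLSTMCell(num_units=8)            # built-in LSTM cell
    outputs, state = tf.nn.dynamic_rnn(cell, inputs, dtype=tf.float32)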
Implementation ● A master driver splits the input data into partitions organized by key: ○ The input data is shuffled and normalized; ○ Each partition will have its own RDD; ● Each Spark worker runs an entire LSTM training cycle: ○ We will have as many LSTMs as partitions; ○ It is possible to choose the number of epochs, the number of hidden layers and the number of partitions; ○ As well as the memory to assign to each worker and many other parameters; ● At the end of the training step the returned RDD is mapped into a key-value data structure holding the weight and bias values; ● Finally, all elements in the RDDs are averaged to obtain the final result;
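A hedged PySpark sketch of this driver-side flow (names and shapes are illustrative, not the project's actual API): one LSTM is trained per partition with mapPartitions, and the driver averages the returned weights and biases.

    import numpy as np
    from pyspark import SparkContext

    def train_partition(rows):
        # runs a full LSTM training cycle on one partition and yields its parameters;
        # dummy arrays stand in here for the learned weights and biases
        data = list(rows)
        yield {"weights": np.zeros((4, 4)), "biases": np.zeros(4)}

    sc = SparkContext(appName="lstm-tensorspark-sketch")
    samples = np.random.randn(150, 4).tolist()        # stand-in for the shuffled, normalized dataset
    dataset = sc.parallelize(samples, numSlices=5)    # one RDD partition per worker / per LSTM

    params = dataset.mapPartitions(train_partition).collect()
    final = {k: np.mean([p[k] for p in params], axis=0)
             for k in ("weights", "biases")}          # average across partitions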
Implementation ● A new LSTM is created with Tensorflow mathematical operations: ○ Operations are executed in a lazy manner; ○ Initialization builds and organizes the data graph; ● Weights and biases are initialized randomly; ● An optimizer is chosen and an OutputLayer is instantiated; ● Because of the lazy strategy, all operations must be placed inside a 'session': ○ The session handles the initialization ops and the graph execution; ○ All variables must be initialized before any run; ● Taking advantage of Python function passing, all gate layers are computed with a single method: ○ Each time a different activation function and the right variables are used;
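A minimal sketch of the "one method, different function" idea with TensorFlow 1.x ops (illustrative names, not the repository's exact code): the same helper builds every gate, and the caller passes in the activation.

    import tensorflow as tf

    def gate(x, h_prev, W, U, b, activation):
        # generic gate layer: the caller chooses tf.sigmoid or tf.tanh
        return activation(tf.matmul(x, W) + tf.matmul(h_prev, U) + b)

    def make_params(n_in=4, n_hidden=8):
        return (tf.Variable(tf.random_normal([n_in, n_hidden])),
                tf.Variable(tf.random_normal([n_hidden, n_hidden])),
                tf.Variable(tf.zeros([n_hidden])))

    x = tf.placeholder(tf.float32, [None, 4])
    h_prev = tf.placeholder(tf.float32, [None, 8])

    f = gate(x, h_prev, *make_params(), activation=tf.sigmoid)   # forget gate
    i = gate(x, h_prev, *make_params(), activation=tf.sigmoid)   # input gate
    g = gate(x, h_prev, *make_params(), activation=tf.tanh)      # candidate values

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())              # all variables before any run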
Implementation ● At the end, minimization is performed: ○ The loss function is computed in the output layer; ○ Minimization uses Tensorflow auto-differentiation; ● Finally, the data are organized in a key-value structure with weights and biases; ● It is also possible to perform data evaluation, but since it is not a time-consuming task it is not reported.
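A hedged TensorFlow 1.x sketch of the minimization step (a simple linear layer stands in for the LSTM output layer; the actual loss and optimizer in the project may differ): the optimizer derives the gradients of the loss with respect to every trainable variable automatically.

    import tensorflow as tf

    x = tf.placeholder(tf.float32, [None, 4])         # e.g. the 4 Iris features
    labels = tf.placeholder(tf.float32, [None, 3])    # e.g. the 3 Iris classes, one-hot

    W = tf.Variable(tf.random_normal([4, 3]))
    b = tf.Variable(tf.zeros([3]))
    logits = tf.matmul(x, W) + b                      # stand-in for the LSTM output layer

    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
    train_op = tf.train.GradientDescentOptimizer(0.01).minimize(loss)   # auto-differentiation

    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        sess.run(train_op, feed_dict={x: [[5.1, 3.5, 1.4, 0.2]], labels: [[1, 0, 0]]})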
Results ● Tested locally in a multicore environment: ○ A distributed environment was not available; ○ Each partition is assigned to a core; ● No GPU usage; ● Iris dataset*; ● Overloaded CPUs vs idle CPUs; ● 12 cores - 64 GB RAM; * http://archive.ics.uci.edu/ml/datasets/Iris
Results ● 3 partitions:

Partition              T. exec (s)   T. exec (m)
1                      1385.62       ~23
2                      1675.76       ~28
3                      1692.48       ~28
Tot + weight average   1704.81       ~28
Tot + repartition      1704.81       ~28
Results ● 5 partitions:

Partition              T. exec (s)   T. exec (m)
1                      867.18        ~14
2                      834.31        ~14
3                      995.37        ~16
4                      970.46        ~16
5                      1015.47       ~17
Tot + weight average   1023.43       ~17
Tot + repartition      1023.43       ~17
Results ● 15 partitions:

Part.   T. exec (s)   T. exec (m)     Part.   T. exec (s)   T. exec (m)     Part.   T. exec (s)   T. exec (m)
1       476.76        ~8              6       482.82        ~8              11      458.05        ~8
2       448.91        ~7              7       499.66        ~8              12      504.85        ~8
3       472.05        ~8              8       454.78        ~8              13      470.93        ~8
4       493.39        ~8              9       479.61        ~8              14      450.84        ~8
5       485.66        ~8              10      493.21        ~8              15      454.29        ~8

Tot + weight average   510.89   ~9
Tot + repartition      510.89   ~9
Results ● Comparison with the non-distributed implementations:

System        T. exec (s)   T. exec (m)   Speed up vs mb   Speed up vs local
dist-3        1704.81       ~28           96%              61%
dist-5        1023.91       ~17           97%              76%
dist-15       510.89        ~9            98%              88%
local-opt     4080.94       ~68           89%              6%
local         4335.66       ~72           88%              -
local-mb-10   34699.58      ~578          -                -

local: non-distributed implementation
local-opt: non-distributed, optimized implementation
local-mb-10: non-distributed implementation with mini-batches of 10 elements (like the dist-15 organization)
Results ● 3 partitions [overloaded vs idle]:

Part.   T. exec busy (s)   T. exec busy (m)   T. exec idle (s)   T. exec idle (m)
1       2679.76            ~44                1385.62            ~23
2       2910.69            ~48                1675.76            ~28
3       3063.88            ~51                1692.48            ~28
Tot     3078.15            ~51                1704.81            ~28
Results ● 5 partitions [overloaded vs idle]:

Part.   T. exec busy (s)   T. exec busy (m)   T. exec idle (s)   T. exec idle (m)
1       1356.44            ~22                867.18             ~14
2       1358.28            ~22                834.31             ~14
3       1373.25            ~22                995.37             ~16
4       1370.11            ~23                970.46             ~16
5       1372.25            ~23                1015.47            ~17
Tot     1393.91            ~23                1023.43            ~17
