Mark Hamilton, Microsoft, marhamil@microsoft.com Apache Spark Serving: Unifying Batch, Streaming, and RESTful Serving #UnifiedAnalytics #SparkAISummit
Overview • Spark Serving 101 – Basic Usage – Anatomy of a Query – Performance • Architecture – Fault Tolerance – Serving Shuffles – Replying from within computations • HTTP on Spark • Deployment – Kubernetes, Azure Machine Learning
Motivation • RESTful model deployment makes it easy to integrate ML into other systems • Current solutions rely on exporting Spark pipelines or using the high-latency batch API • Spark’s philosophy is to unify computing with a single, easy-to-use API
Batch API: spark.read.format("parquet").load(…).select(…)
Streaming API: spark.readStream.format("kafka").load(…).select(…)
Serving API: ???
• Sub-millisecond latencies • Fully Distributed • Spins up in seconds • Same API as Batch and Streaming • Scala, Python, R and Java • Fully Open Source
Spark Serving: Lightning Fast Web Services on Any Spark Cluster (www.aka.ms/spark)

val server = spark
  .readStream
  .server("0.0.0.0", 80, "api")
  .option("name", "my_api")
  .load()
  .parseRequest(schema)
  .mlTransform(model)
  .makeReply("scores")
  .writeStream
  .server()
  .option("name", "my_api")
  .start()
Deploying a Deep Network • Demo/Code walkthrough
Anatomy

val server = spark
  .readStream
  .server("0.0.0.0", 80, "api")
  .option("name", "my_api")
  .load()
  .parseRequest(schema)
  .mlTransform(model)
  .makeReply("scores")
  .writeStream
  .server()
  .option("name", "my_api")
  .start()

1) Read a streaming data source
2) Use the "server" source with host, port, and API path
3) Load the dataframe
4) Parse the incoming request body to a target schema
5) Transform the dataframe with a SparkML model (same as model.transform)
6) Pack the target column into the body of a web response
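The request/reply halves of this anatomy can be sketched outside of Spark. The snippet below is a minimal stand-in for what parseRequest and makeReply do on the server side; the "text" field, the schema, and the length-based "model" are illustrative placeholders, not the MMLSpark implementation.

```python
import json

# Hypothetical target schema for the incoming request body.
SCHEMA = {"text": str}

def parse_request(body: bytes) -> dict:
    """Parse the incoming HTTP body into a row matching the target schema."""
    row = json.loads(body)
    for field, typ in SCHEMA.items():
        assert isinstance(row[field], typ), f"bad type for field {field!r}"
    return row

def make_reply(row: dict, column: str) -> bytes:
    """Pack one output column into the body of the web response."""
    return json.dumps({column: row[column]}).encode("utf-8")

# Stand-in "model": score by text length (a real deployment would call
# mlTransform with a trained SparkML model here).
row = parse_request(b'{"text": "hello spark"}')
row["scores"] = len(row["text"]) / 100.0
reply = make_reply(row, "scores")
```

In the real pipeline these steps run per-row inside the streaming query; the sketch only shows the body-to-schema and column-to-body conversions at the edges.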
Performance • PMML, ONNX, and MLeap require writing exporters for each model in SparkML • Clipper and AML leverage the Batch API, which incurs a steep ~500 ms overhead but does not require additional code

df.writeStream
  .server()
  .option("name", "my_api")
  .trigger(continuous="20 seconds")
  .start()
Architecture Basics
[Diagram: users and apps send HTTP requests through a load balancer to a server embedded in each Spark worker; each server feeds its worker's partitions, and the workers are coordinated by the Spark master.]
Architecture Details: Microbatch Fault Tolerance
[Diagram: each Spark worker runs a server with a per-epoch Request Queue (a LinkedBlockingQueue[Request]) and a History Store mapping (Epoch, Partition) → List[Request], feeding Partitions 1–3.]
1. Handler adds the request to the queue
2. Partitions pull requests from the queue
3. Partitions add their requests to the history store
4. A retried partition pulls its requests from the history store instead of the queue
5. The next epoch starts with a fresh request queue (Epoch 2)
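The fault-tolerance scheme above can be sketched in a single process: a shared queue feeds partitions, each partition records what it consumed in a history store keyed by (epoch, partition), and a retried partition replays from that store rather than the now-drained queue. All names here are illustrative stand-ins, not the MMLSpark internals.

```python
import queue
from collections import defaultdict

request_queue: "queue.Queue[str]" = queue.Queue()  # LinkedBlockingQueue[Request]
history_store = defaultdict(list)                  # (epoch, partition) -> [Request]

def handle(request: str) -> None:
    """Step 1: the HTTP handler enqueues the incoming request."""
    request_queue.put(request)

def process_partition(epoch: int, partition: int, n: int) -> list:
    """Steps 2-3: pull up to n requests and record them in the history store."""
    pulled = []
    for _ in range(n):
        if request_queue.empty():
            break
        pulled.append(request_queue.get())
    history_store[(epoch, partition)].extend(pulled)
    return pulled

def retry_partition(epoch: int, partition: int) -> list:
    """Step 4: a retried task replays its requests from the history store."""
    return list(history_store[(epoch, partition)])

handle("req-1"); handle("req-2"); handle("req-3")
first = process_partition(epoch=1, partition=2, n=2)
replayed = retry_partition(epoch=1, partition=2)  # same requests, no re-enqueue
```

Keying the store by (epoch, partition) is what lets a retry stay deterministic: the replayed task sees exactly the requests its failed attempt claimed, while later requests remain queued for other partitions and epochs.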
Reply From within Pipelines

df.withColumn("sentReplies",
    when(col("condition"),
      ServingUDFs.sendReplyUDF(apiName, col("replies"), col("id")))
    .otherwise(lit(null)))
  .filter(col("sentReplies").isNull)
  .makeReply("value")
Architecture Details: Serving Shuffles
[Diagram: worker nodes 1…N each run a web server and a routing service over their partitions; a Serving Query Object and Serving Monitor on the driver node send service-status info to the load balancer outside of the hot path. At request time, a request is handled by direct function dispatch when its partition is local, and routed between machines in case of shuffles; at response time, replies travel back along the same routes.]
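The routing decision in that diagram reduces to a table lookup: each web server knows which node owns each partition, dispatches locally when it can, and forwards otherwise. This is a toy sketch; the node names, table, and forwarding mechanism are hypothetical placeholders for the routing service.

```python
# Hypothetical routing table maintained by each node's routing service:
# partition id -> owning worker node.
routing_table = {1: "worker-1", 2: "worker-1", 3: "worker-N"}

def route(request_partition: int, local_node: str) -> str:
    """Decide how a web server handles a request for a given partition."""
    owner = routing_table[request_partition]
    if owner == local_node:
        # Fast path: the partition is on this machine, so the reply is a
        # direct function dispatch with no network hop.
        return "local function dispatch"
    # Slow path: a shuffle moved the partition, so forward to its owner.
    return f"forward to {owner}"
```

Keeping the status updates that build this table off the hot path (via the Serving Monitor) is what keeps the per-request cost to a lookup plus, at worst, one inter-machine hop.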
HTTP on Spark • Full Integration between HTTP Protocol and Spark SQL • Spark as a Microservice Orchestrator • Spark + X

df = SimpleHTTPTransformer() \
  .setInputParser(JSONInputParser()) \
  .setOutputParser(JSONOutputParser()
    .setDataType(schema)) \
  .setOutputCol("results") \
  .setUrl(…)
HTTP on Spark
[Diagram: each partition on a Spark worker holds HTTP clients that exchange requests and responses with an external web service, or with a local service running on the same worker.]
Spark as a Microservice Orchestrator • Can use Spark as a distributed web server and client • Can incorporate web services into SparkML pipelines, then deploy these composite models as services • Can compose Spark with other ecosystems via orchestration frameworks
[Diagram: a Spark worker fans out via HTTP on Spark to Web Services 1–3 while exposing its own endpoint through Spark Serving.]
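The orchestration idea above can be sketched without Spark: treat a web-service call as just another row-to-row pipeline stage, in the spirit of SimpleHTTPTransformer, so a composite of local models and remote services composes like any other pipeline. The service here is a stand-in function rather than a real endpoint, and all names are illustrative.

```python
def http_stage(call, output_col):
    """Wrap a request->response callable as a row-level pipeline stage."""
    def stage(rows):
        return [{**row, output_col: call(row)} for row in rows]
    return stage

def pipeline(*stages):
    """Compose stages left to right, like a SparkML PipelineModel."""
    def run(rows):
        for stage in stages:
            rows = stage(rows)
        return rows
    return run

# Stand-in for a remote sentiment service reached over HTTP on Spark.
fake_sentiment = lambda row: {"score": 0.9 if "good" in row["text"] else 0.1}

model = pipeline(http_stage(fake_sentiment, "results"))
out = model([{"text": "good movie"}, {"text": "bad movie"}])
```

Because the HTTP-backed stage has the same shape as a local transform, the composite "model" can itself be deployed behind Spark Serving, which is the composition the slide describes.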
Azure Kubernetes Service + Helm • Works on any k8s cluster • Helm: Package Manager for Kubernetes
[Diagram: on Kubernetes (AKS, ACS, GKE, on-prem, etc.), each k8s worker runs a Spark Worker alongside a Cognitive Service container reached via HTTP on Spark; Spark Serving sits behind a load balancer on the hot path for REST requests to deployed models; users and apps submit jobs, run notebooks, and manage the cluster through Jupyter, Zeppelin, Livy, or spark-submit; Spark Readers connect to storage, other databases, and cloud Cognitive Services.]

helm repo add mmlspark https://dbanda.github.io/charts
helm install mmlspark/spark --set localTextApi=true

Dalitso Banda, dbanda@microsoft.com, Microsoft AI Development Acceleration Program
Deployment: Azure ML • Can use Spark Serving to improve latency of Azure ML Services • Just modify the AML scoring script
[Diagram: an incoming request hits the public AML Flask server inside the AML container, which passes it through to a local Spark Serving server.]
Microsoft Machine Learning for Apache Spark v0.16: Microsoft’s Open Source Contributions to Apache Spark (www.aka.ms/spark, Azure/mmlspark): Cognitive Services, Spark Serving, Model Interpretability, LightGBM Gradient Boosting, Deep Networks with CNTK, HTTP on Spark
Conclusions • Spark Serving: idiomatic way to deploy any Spark streaming computation as a web service • Millisecond latencies • Get started now with interactive examples! • The Azure Cognitive Services on Spark: Clusters with Embedded Intelligent Services, 3:30pm Room 2009
Help us advance Spark: www.aka.ms/spark, Azure/mmlspark
Contact: marhamil@microsoft.com, mmlspark-support@microsoft.com
Thanks To • Sudarshan Raghunathan, Anand Raman, Pablo Castro • Ilya Matiach • Andrew Schonhoffer • Microsoft Development Acceleration Team: – Dalitso Banda, Casey Hong, Karthik Rajendran, Manon Knoertzer, Tayo Amuneke, Alejandro Buendia • Daniel Ciborowski, Markus Cosowicz, Scott Graham, Jeremy Reynolds, Miguel Fierro, Tao Wu • Azure CAT Team + AzureML Team