Build Large-Scale Data Analytics and AI Pipeline Using RayDP Carson Wang, Xianyang Liu Intel
Agenda ▪ Carson Wang ▪ Big Data & AI Background ▪ Ray & RayDP Overview ▪ Xianyang Liu ▪ RayDP Deep Dive ▪ RayDP Examples
Big Data & AI Background
Big Data & AI • Massive data is critical for better AI. • Distributed training will be a norm. • Many community efforts to integrate big data with AI: HorovodOnSpark, TensorflowOnSpark, BigDL, Analytics-Zoo, CaffeOnSpark, Petastorm, XGBoostOnSpark, spark-tensorflow-distributor, spark-tensorflow-connector.
Separate Spark and AI Cluster Challenges: • Data movement between clusters. • Overhead of managing two clusters. • Segmented application and glue code. Spark Cluster Data Preprocessing ML/DL Cluster Model Training ML/DL Storage
Running ML/DL Frameworks on Spark Challenges: • Specific to Spark and requires ML/DL frameworks supported on Spark. • Data exchange between frameworks relies on distributed filesystems like HDFS or S3. Spark Cluster Data Preprocessing Model Training
Running on Kubernetes Challenges: • The pipeline must be written in multiple programs and configuration files (vs. a single Python program). • Data exchange between frameworks relies on distributed filesystems like HDFS or S3. [Diagram: Kubernetes Cluster running Data Preprocessing and Model Training]
Ray & RayDP Overview
What is Ray? A general-purpose framework that provides a simple, universal API for building distributed applications.
What is RayDP? RayDP provides simple APIs for running Spark on Ray and integrating Spark with distributed ML/DL frameworks. RayDP PyTorch/Tensorflow Estimator Ray MLDataset Converter Spark on Ray Ray Libraries
Build End-to-End Pipeline using RayDP and Ray Data Preprocessing Model Training/Tuning Model Serving End-to-End Integrated Python Program Object Store
Scale From Laptop To Cloud/Kubernetes Seamlessly • Your Python program, written with Ray, RayDP, PySpark, TensorFlow, PyTorch, etc. APIs. • Develop locally. • Scale to Cloud/Kubernetes with the Ray Cluster Launcher.
Why RayDP? • Increased Productivity • Simplifies building and managing end-to-end pipelines. Write Spark, XGBoost, TensorFlow, PyTorch, and Horovod code in a single Python program. • Better Performance • In-memory data exchange. • Built-in Spark optimization. • Increased Resource Utilization • Auto scaling at the cluster level and the application level.
RayDP Deep Dive
Spark on Ray API

    import ray
    import raydp

    ray.init(address='auto')
    spark = raydp.init_spark(app_name='Spark on Ray',  # The Spark application name
                             num_executors=2,          # The number of executors
                             executor_cores=2,         # CPU cores for each executor
                             executor_memory='1G',     # Memory size for each executor
                             configs=None)             # Extra configs to set for the application
    df = spark.read.parquet(…)
    raydp.stop_spark()                                 # Stop the Spark cluster
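As a rough aid for sizing the call above, the following plain-Python sketch adds up what a given set of `init_spark` arguments asks the Ray cluster for. The helpers `parse_memory` and `requested_resources` are hypothetical and not part of RayDP. The sketch assumes memory strings carry a single-letter unit suffix (K, M, G, T) and it counts executor resources only, ignoring the AppMaster and any memory overhead.

```python
# Hypothetical helpers (not part of RayDP): estimate the executor resources
# implied by num_executors, executor_cores and executor_memory.

_UNITS = {"K": 1024, "M": 1024 ** 2, "G": 1024 ** 3, "T": 1024 ** 4}


def parse_memory(size: str) -> int:
    """Convert a string such as '1G' or '512M' to bytes (assumed format)."""
    size = size.strip().upper()
    if size.endswith("B"):  # accept '1GB' as well as '1G'
        size = size[:-1]
    unit = size[-1:]
    if unit not in _UNITS:
        raise ValueError(f"unsupported memory string: {size!r}")
    return int(size[:-1]) * _UNITS[unit]


def requested_resources(num_executors: int, executor_cores: int,
                        executor_memory: str) -> dict:
    """Total CPU cores and memory requested by all executors together."""
    return {
        "cpus": num_executors * executor_cores,
        "memory_bytes": num_executors * parse_memory(executor_memory),
    }


# Same values as the slide: 2 executors, 2 cores each, 1G each.
total = requested_resources(num_executors=2, executor_cores=2,
                            executor_memory="1G")
print(total)
```

Comparing these totals with the resources the Ray cluster actually has is one way to check up front whether all executors can be scheduled.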
Spark on Ray Architecture • One Ray actor for Spark AppMaster to start/stop Spark executors. • All Spark executors are in Ray Java actors. • Leverage object store for data exchange between Spark and other Ray libraries. [Diagram: Driver node with Spark Driver; AppMaster (Java Actor); Worker nodes each with a Spark Executor (Java Actor), ObjectStore, and Raylet; GCS; Web UI, Debugging Tools, Profiling Tools]
PyTorch/Tensorflow Estimator

    estimator = TorchEstimator(num_workers=2,               # The number of workers for distributed model training
                               model=your_model,            # The model for training
                               optimizer=optimizer,         # The optimizer instance
                               loss=criterion,              # The loss function
                               feature_columns=features,    # Feature columns of the Spark DataFrame
                               label_column="fare_amount",  # Label column of the Spark DataFrame
                               batch_size=64,               # Training batch size
                               num_epochs=30)               # The total number of epochs
    estimator.fit_on_spark(train_df, test_df)               # Fit on Spark DataFrame directly
Ray MLDataset Converter

    from raydp.spark import RayMLDataset

    spark_df = …
    torch_ds = (RayMLDataset
                .from_spark(spark_df, …)
                .transform(func)
                .to_torch(…))
    torch_dataloader = DataLoader(torch_ds.get_shard(shard_index), …)

• Create from Spark DataFrame, in-memory objects, etc.
• Transform using user-defined functions.
• Convert to PyTorch/Tensorflow Dataset.

Transformations are lazy and executed in a pipeline.

[Diagram. Phase 1, Operations: build the operation graph (Spark DataFrame → from_spark → ML Dataset → transform → ML Dataset → to_torch → ML Dataset). Phase 2, Execution Planning: Ray Scheduler; Spark Actor (Spark DataFrame); Object Store (Object 1, Object 2, Object 3); MLDataset Actor (MLDataset Shard); PyTorch Actor (PyTorch Dataset); steps 1. from_spark, 2. transform + to_torch.]
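The "lazy, executed in pipeline" behaviour described on this slide can be illustrated without Ray or Spark. The sketch below is a toy stand-in, not RayMLDataset itself: the class `LazyDataset` and its `from_items` method are hypothetical, and the transform here is applied per record purely to keep the example small. It only shows the general pattern: `transform` records an operation, and the work happens when a shard is consumed.

```python
from typing import Callable, Iterator, List, Optional


class LazyDataset:
    """Toy sharded dataset with lazy transforms (hypothetical, not RayMLDataset)."""

    def __init__(self, shards: List[list],
                 ops: Optional[List[Callable]] = None):
        self._shards = shards
        self._ops = list(ops or [])

    @classmethod
    def from_items(cls, items: list, num_shards: int) -> "LazyDataset":
        # Round-robin the items into num_shards shards.
        return cls([list(items[i::num_shards]) for i in range(num_shards)])

    def transform(self, func: Callable) -> "LazyDataset":
        # Phase 1: only extend the operation graph; nothing is computed yet.
        return LazyDataset(self._shards, self._ops + [func])

    def get_shard(self, index: int) -> Iterator:
        # Phase 2: operations run here, record by record, as the shard is read.
        for record in self._shards[index]:
            for op in self._ops:
                record = op(record)
            yield record


calls = []  # records every input the transform actually sees


def double(x):
    calls.append(x)
    return x * 2


ds = LazyDataset.from_items(list(range(6)), num_shards=2).transform(double)
calls_before = list(calls)       # snapshot taken before any shard is read
shard0 = list(ds.get_shard(0))   # consuming the shard triggers the transform
print(calls_before, shard0)
```

Because only shard 0 is read, the transform never touches the records in shard 1, which is the property that lets each training worker pay only for its own data.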
RayDP Examples
Spark + XGBoost on Ray (End-to-End Integrated Python Program)

    import ray
    import raydp
    from raydp.spark import RayMLDataset
    from raydp.utils import random_split

    # Data Preprocessing (RayDP)
    ray.init(address='auto')
    spark = raydp.init_spark('Spark + XGBoost',
                             num_executors=2,
                             executor_cores=4,
                             executor_memory='8G')
    df = spark.read.csv(...)
    ...
    train_df, test_df = random_split(df, [0.9, 0.1])
    train_dataset = RayMLDataset.from_spark(train_df, ...)
    test_dataset = RayMLDataset.from_spark(test_df, ...)

    # Model Training
    from xgboost_ray import RayDMatrix, train, RayParams
    dtrain = RayDMatrix(train_dataset, label='fare_amount')
    dtest = RayDMatrix(test_dataset, label='fare_amount')
    …
    bst = train(
        config, dtrain,
        evals=[(dtest, "eval")],
        evals_result=evals_result,
        ray_params=RayParams(…),
        num_boost_round=10)
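The `random_split(df, [0.9, 0.1])` step above divides rows between a training and a test set by weight. A minimal plain-Python sketch of that idea follows; it is not RayDP's implementation, and the function `random_split_rows` and its `seed` parameter are hypothetical names chosen for illustration. Each row draws a uniform random number and goes to the first split whose cumulative normalized weight exceeds it, so split sizes are approximate rather than exact.

```python
import random
from itertools import accumulate
from typing import List, Sequence


def random_split_rows(rows: Sequence, weights: Sequence[float],
                      seed: int = 0) -> List[list]:
    """Randomly assign each row to one split, in proportion to weights."""
    total = float(sum(weights))
    bounds = list(accumulate(w / total for w in weights))  # e.g. [0.9, 1.0]
    rng = random.Random(seed)
    splits: List[list] = [[] for _ in weights]
    for row in rows:
        r = rng.random()
        for i, bound in enumerate(bounds):
            if r < bound:
                splits[i].append(row)
                break
        else:
            # Guard against floating-point rounding in the last bound.
            splits[-1].append(row)
    return splits


rows = list(range(1000))
train_rows, test_rows = random_split_rows(rows, [0.9, 0.1], seed=42)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one split, and fixing the seed makes the split reproducible across runs.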
Spark + Horovod on Ray (End-to-End Integrated Python Program)

    import ray
    import raydp
    import horovod.torch as hvd
    import torch.nn as nn
    from raydp.spark import RayMLDataset

    # Data Preprocessing (RayDP)
    ray.init(address='auto')
    spark = raydp.init_spark('Spark + Horovod',
                             num_executors=2,
                             executor_cores=4,
                             executor_memory='8G')
    df = spark.read.csv(...)
    ...
    torch_ds = RayMLDataset.from_spark(df, …).to_torch(...)

    # Model Training
    # PyTorch model
    class My_Model(nn.Module):
        …

    # Horovod on Ray
    def train_fn(dataset, num_features):
        hvd.init()
        rank = hvd.rank()
        train_data = dataset.get_shard(rank)
        ...

    from horovod.ray import RayExecutor
    executor = RayExecutor(settings, num_hosts=1, num_slots=1, cpus_per_slot=1)
    executor.start()
    executor.run(train_fn, args=[torch_ds, num_features])
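In `train_fn` above, each Horovod worker reads only the shard that matches its rank, and Horovod averages gradients across workers. The sketch below simulates that data-parallel pattern in a single process with plain Python, under clearly stated assumptions: the "workers" are loop iterations, `allreduce_mean` is a hypothetical stand-in for Horovod's allreduce, and the model is a one-parameter linear fit `y = w * x` rather than a PyTorch module.

```python
from typing import List, Tuple

Sample = Tuple[float, float]


def shard_for_rank(data: List[Sample], rank: int,
                   world_size: int) -> List[Sample]:
    """Each rank takes every world_size-th sample, starting at its rank."""
    return data[rank::world_size]


def local_gradient(w: float, shard: List[Sample]) -> float:
    """Gradient of mean squared error for y_hat = w * x on one shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def allreduce_mean(values: List[float]) -> float:
    """Stand-in for an allreduce that averages one value per worker."""
    return sum(values) / len(values)


# Synthetic data following y = 3x, split evenly across 2 simulated workers.
data = [(float(x), 3.0 * x) for x in range(1, 9)]
world_size = 2
w = 0.0
for _ in range(50):
    grads = [local_gradient(w, shard_for_rank(data, rank, world_size))
             for rank in range(world_size)]
    w -= 0.01 * allreduce_mean(grads)  # every worker applies the same update

print(round(w, 6))
```

Because the shards have equal size, the averaged gradient equals the gradient over the full dataset, so the simulated workers follow the same trajectory a single worker would.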
Spark + Horovod + RayTune on Ray (End-to-End Integrated Python Program)

    from typing import Dict

    import ray
    import raydp
    import torch.nn as nn
    from ray import tune
    from ray.tune.integration.horovod import DistributedTrainableCreator
    from raydp.spark import RayMLDataset

    # Data Preprocessing (RayDP)
    ray.init(address='auto')
    spark = raydp.init_spark('Spark + Horovod',
                             num_executors=2,
                             executor_cores=4,
                             executor_memory='8G')
    df = spark.read.csv(...)
    ...
    torch_ds = RayMLDataset.from_spark(df, …).to_torch(...)

    # Model Training/Tuning
    # PyTorch model
    class My_Model(nn.Module):
        …

    # Horovod on Ray + Ray Tune
    def train_fn(config: Dict):
        ...

    trainable = DistributedTrainableCreator(
        train_fn, num_slots=2, use_gpu=use_gpu)
    analysis = tune.run(
        trainable,
        num_samples=2,
        config={
            "epochs": tune.grid_search([1, 2, 3]),
            "lr": tune.grid_search([0.1, 0.2, 0.3]),
        }
    )
    print(analysis.best_config)
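To see how many trials the `tune.run` call above launches, the search space can be expanded by hand. The sketch below uses only the standard library; `expand_grid` is a hypothetical helper, not a Ray Tune function. It relies on the documented Ray Tune behaviour that, when `grid_search` is used, the whole grid is repeated `num_samples` times.

```python
from itertools import product
from typing import Dict, List


def expand_grid(grid: Dict[str, list], num_samples: int = 1) -> List[dict]:
    """Enumerate every grid combination, repeated num_samples times."""
    keys = list(grid)
    combos = [dict(zip(keys, values))
              for values in product(*(grid[k] for k in keys))]
    return [dict(combo) for _ in range(num_samples) for combo in combos]


# Same search space and num_samples as the slide.
search_space = {"epochs": [1, 2, 3], "lr": [0.1, 0.2, 0.3]}
trials = expand_grid(search_space, num_samples=2)
print(len(trials))  # 18
print(trials[0])    # {'epochs': 1, 'lr': 0.1}
```

With 3 values for `epochs`, 3 for `lr`, and `num_samples=2`, the run consists of 18 trials, each of which is a distributed Horovod job using `num_slots=2` workers.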
Summary • Ray is a general-purpose framework that can be used as a single substrate for end-to-end data analytics and AI pipelines. • RayDP provides simple APIs for running Spark on Ray and integrating Spark with distributed ML/DL frameworks. • For more information, please visit https://github.com/oap-project/raydp 
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.
