Scale an application using Ray

The Snowflake container runtime integrates with Ray, an open-source unified framework for scaling AI and Python applications. This integration allows you to use Ray’s distributed computing capabilities on Snowflake for your machine learning workloads.

Ray is pre-installed and runs as a background process within the Snowflake ML container runtime. You can access Ray from the Container Runtime for ML in the following ways:

  • Snowflake Notebooks: An interactive environment where you can connect to Ray, define tasks, and scale your cluster dynamically for development and experimentation.

  • Snowflake ML Jobs: Submit your Ray applications as structured, repeatable jobs. You can specify the cluster size as part of the job configuration for production workloads.

When you run the container runtime within a Snowflake Notebook or ML Job, the Ray process is automatically initiated as part of that container.

Use the following Python code to connect to the cluster:

import ray

# Connect to the pre-existing Ray cluster within the Snowflake environment
ray.init(address="auto", ignore_reinit_error=True)

print(f"Ray cluster resources: {ray.cluster_resources()}")

Important

Make sure you always use the "auto" address when you’re connecting to the Ray cluster. Initializing with the "auto" address directs your application to the head node of the Ray cluster that Snowflake has provisioned for your session.

Scaling your Ray cluster

After you connect to the Ray cluster, you can adjust its size to meet the computational demands of your workload.

Use the following approaches to scale your Ray cluster:

Within a notebook, you can dynamically scale your cluster up or down using the scale_cluster function. This is ideal for interactive workflows where resource needs might change.

The expected_cluster_size value includes the head node. For example, when you specify expected_cluster_size=5, you get 1 head node and 4 worker nodes.

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Check current cluster size
print(f"Current cluster size: {len(get_nodes())} nodes")

# Scale up to 4 nodes (1 head + 3 workers)
print("Scaling up cluster...")
scale_cluster(expected_cluster_size=4)
print(f"New cluster size: {len(get_nodes())} nodes")

After you’ve finished using your cluster, you can scale it down. For more information, see Cleaning up.

Monitoring with the Ray Dashboard

If you’re running a job from a Snowflake Notebook, you can use the Ray Dashboard to monitor your cluster. The dashboard is a web interface that allows you to view the cluster’s resources, jobs, tasks, and performance. Use the following code to get the dashboard URL:

from snowflake.ml.runtime_cluster import get_ray_dashboard_url

# This function is available in Notebooks to retrieve the dashboard URL
dashboard_url = get_ray_dashboard_url()
print(f"Access the Ray Dashboard here: {dashboard_url}")

Open the URL in a new browser tab and log in with your Snowflake credentials.

Advanced use cases

This section covers advanced Ray features for complex workloads and for migrating existing applications.

Creating and operating distributed workloads with Ray

Ray provides components that enable you to create and operate distributed workloads. Ray Core supplies the foundational primitives for building and scaling these workloads.

It also includes the following libraries that enable you to build your own workflows for data preprocessing, ML training, hyperparameter tuning, and model inference:

  • Ray Data: Scalable data processing and transformation

  • Ray Train: Distributed training and fine-tuning of ML models

  • Ray Tune: Hyperparameter optimization with advanced search algorithms

  • Ray Serve: Model serving and inference

The following sections describe how you can use these libraries directly, while native Snowflake interfaces built over Ray provide additional tools to build, deploy, and operationalize Ray-based applications.

Ray Core: Tasks and Actors

Ray provides the following distributed computing primitives:

  • Tasks: Stateless functions that run remotely and return values

  • Actors: Stateful classes that can be instantiated remotely and called multiple times

  • Objects: Immutable values stored in Ray’s distributed object store

  • Resources: CPU, GPU, and custom resource requirements for tasks and actors

The following example demonstrates how to use basic Ray Tasks and Actors to run a simple linear regression workload:

import ray
import numpy as np
import pandas as pd
from sklearn.linear_model import LinearRegression

# Initialize Ray (automatically connects to cluster in Snowflake ML)
ray.init(address="auto", ignore_reinit_error=True)

# Create sample data
large_dataset = np.random.randn(1000, 10)
batch_data = pd.DataFrame(np.random.randn(100, 5), columns=[f'feature_{i}' for i in range(5)])

# Ray Tasks - stateless remote functions
@ray.remote
def compute_heavy_task(data):
    """CPU-intensive computation example"""
    # Simulate heavy computation (matrix operations)
    result = np.dot(data, data.T)
    return np.mean(result)

# Ray Actors - stateful remote classes
@ray.remote
class DataProcessor:
    def __init__(self):
        # Load a simple model
        self.model = LinearRegression()
        # Train on dummy data
        X_dummy = np.random.randn(100, 5)
        y_dummy = np.random.randn(100)
        self.model.fit(X_dummy, y_dummy)

    def process_batch(self, batch):
        # Convert to numpy if it's a DataFrame
        if isinstance(batch, pd.DataFrame):
            batch_array = batch.values
        else:
            batch_array = batch
        return self.model.predict(batch_array)

# Submit tasks and get object references
future = compute_heavy_task.remote(large_dataset)
result = ray.get(future)  # Blocks until task completes
print(f"Task result: {result}")

# Create and use actors
processor = DataProcessor.remote()
batch_result = ray.get(processor.process_batch.remote(batch_data))
print(f"Batch processing result shape: {batch_result.shape}")

Ray Train: Distributed Training

Ray Train is a library that enables distributed training and fine-tuning of models. You can run your training code on a single machine or an entire cluster. For Ray on Snowflake, you can use Ray Train for single-node execution, but not multi-node execution.

For distributed multi-node training, use the Optimized Training functions in the container runtime. These functions provide integrated XGBoost, LightGBM, and PyTorch distributed training with automatic storage handling, and they internally use the same Ray cluster.
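If you have existing Ray Train code, a minimal single-worker run might look like the following sketch. It uses the standard Ray Train TorchTrainer with num_workers=1 on synthetic data, and it assumes PyTorch is available in the runtime image; it is not a Snowflake-specific API.

import torch
import torch.nn as nn

import ray
from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

ray.init(address="auto", ignore_reinit_error=True)

def train_loop_per_worker(config):
    # Illustrative training loop: fit a tiny linear model on synthetic data
    model = nn.Linear(10, 1)
    optimizer = torch.optim.SGD(model.parameters(), lr=config["lr"])
    loss_fn = nn.MSELoss()

    X = torch.randn(256, 10)
    y = torch.randn(256, 1)

    for epoch in range(config["epochs"]):
        optimizer.zero_grad()
        loss = loss_fn(model(X), y)
        loss.backward()
        optimizer.step()
        # Report metrics back to Ray Train after each epoch
        train.report({"epoch": epoch, "loss": loss.item()})

# Single worker only: multi-node Ray Train is not supported on Ray in Snowflake
trainer = TorchTrainer(
    train_loop_per_worker,
    train_loop_config={"lr": 0.01, "epochs": 5},
    scaling_config=ScalingConfig(num_workers=1, use_gpu=False),
)
result = trainer.fit()
print(f"Final reported loss: {result.metrics['loss']:.4f}")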

Ray Data: Scalable Data Processing

Ray Data provides scalable, distributed data processing for ML workloads. It can handle datasets larger than cluster memory through streaming execution and lazy evaluation.

Note

Snowflake offers a native integration to transform any Snowflake data source to Ray Data. For more information, see the Data Connector and Ray Data Ingestion pages.
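As a brief sketch of that integration, a Snowpark DataFrame can be wrapped in a DataConnector and exposed to Ray Data. The table name below is purely illustrative, and the to_ray_dataset() method is assumed to be available as described on the Ray Data Ingestion page.

from snowflake.ml.data import DataConnector
from snowflake.snowpark.context import get_active_session

# Assumes an active Snowpark session inside the Container Runtime;
# the table name is illustrative only
session = get_active_session()
snowpark_df = session.table("MY_DB.MY_SCHEMA.TRAINING_DATA")

# Wrap the Snowflake data source and hand it to Ray Data
connector = DataConnector.from_dataframe(snowpark_df)
ray_ds = connector.to_ray_dataset()

print(f"Ray Dataset schema: {ray_ds.schema()}")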

Use Ray Data for:

  • Processing large datasets that don’t fit in single-node memory

  • Distributed data preprocessing and feature engineering

  • Building data pipelines that integrate with other Ray libraries

import ray
import ray.data as rd
import pandas as pd
import numpy as np
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for better performance with large datasets or CPU-intensive operations
# Scaling benefits Ray Data when:
# - Processing datasets larger than single-node memory (>10GB)
# - Performing CPU-intensive transformations (complex feature engineering, ML preprocessing)
# - Need faster processing through parallelization across multiple nodes
scale_cluster(expected_cluster_size=4)

# Create sample dataset
np.random.seed(42)
n_samples = 50000
n_features = 15

# Generate features with some correlation structure
base_features = np.random.randn(n_samples, 5)
derived_features = np.column_stack([
    base_features[:, 0] * base_features[:, 1],   # interaction
    np.sin(base_features[:, 2]),                 # non-linear
    base_features[:, 3] ** 2,                    # polynomial
    np.random.randn(n_samples, n_features - 8)   # additional random features
])
X = np.column_stack([base_features, derived_features])
y = (X[:, 0] + 0.5 * X[:, 1] - 0.3 * X[:, 2] + 0.1 * X[:, 5] + np.random.randn(n_samples) * 0.2 > 0).astype(int)

sample_data = pd.DataFrame(X, columns=[f'feature_{i}' for i in range(n_features)])
sample_data['target'] = y
print(f"Created dataset with {n_samples} samples and {n_features} features")

# Create Ray Dataset from pandas DataFrame
ray_dataset = rd.from_pandas(sample_data)

# Transform data with Ray Data operations
def preprocess_batch(batch):
    """Preprocess a batch of data"""
    # Get all feature columns
    feature_cols = [col for col in batch.columns if col.startswith('feature_')]

    # Normalize numerical features (first 3 for demo)
    for col in feature_cols[:3]:
        if col in batch.columns:
            batch[f'{col}_scaled'] = (batch[col] - batch[col].mean()) / batch[col].std()

    # Add derived features using actual column names
    if 'feature_0' in batch.columns and 'feature_1' in batch.columns:
        batch['feature_0_squared'] = batch['feature_0'] ** 2
        batch['feature_interaction'] = batch['feature_0'] * batch['feature_1']

    return batch

# Apply transformations lazily
processed_dataset = ray_dataset.map_batches(
    preprocess_batch,
    batch_format="pandas"
)

# Repartition for optimal performance across cluster nodes
processed_dataset = processed_dataset.repartition(num_blocks=8)

# Convert to different formats for downstream use
print("Converting to pandas...")
pandas_df = processed_dataset.to_pandas()  # Collect to pandas
print(f"Processed dataset shape: {pandas_df.shape}")
print(f"New columns: {list(pandas_df.columns)}")

# Iterate through batches for memory efficiency
print("Processing batches...")
batch_count = 0
for batch in processed_dataset.iter_batches(batch_size=1000, batch_format="pandas"):
    batch_count += 1
    print(f"Batch {batch_count}: {batch.shape}")
    if batch_count >= 3:  # Just show first 3 batches
        break

print(f"Total batches processed: {batch_count}")

Ray Tune: Distributed Hyperparameter Tuning

Ray Tune provides distributed hyperparameter optimization with advanced search algorithms and early stopping capabilities. For a more integrated and optimized experience when reading from Snowflake data sources, use the native Hyperparameter Optimization (HPO) API. For more information, see Optimize a model’s hyperparameters.

If you’re looking for a more customizable approach to a distributed HPO implementation, use Ray Tune.

You can use Ray Tune for the following use cases:

  • Hyperparameter optimization across multiple trials in parallel

  • Advanced search algorithms (Bayesian optimization, population-based training)

  • Large-scale hyperparameter sweeps requiring distributed execution

import ray
from ray import tune
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
from snowflake.ml.runtime_cluster import scale_cluster

# Initialize Ray
ray.init(address="auto", ignore_reinit_error=True)

# Optional: Scale cluster for hyperparameter tuning
# Scaling benefits Ray Tune when:
# - Running many trials in parallel
# - Each trial is computationally intensive
# - Need faster hyperparameter search
scale_cluster(expected_cluster_size=6)

# Create sample dataset
np.random.seed(42)
n_samples = 5000
n_features = 10

X = np.random.randn(n_samples, n_features)
y = ((X[:, 0] + X[:, 1] * X[:, 2] + np.sin(X[:, 3]) + np.random.randn(n_samples) * 0.3) > 0).astype(int)

# Split data
X_train, X_val, y_train, y_val = train_test_split(X, y, test_size=0.2, random_state=42)

def train_function(config):
    """Training function that gets hyperparameters from Ray Tune"""
    # Train model with current hyperparameters
    model = RandomForestClassifier(
        n_estimators=config["n_estimators"],
        max_depth=config["max_depth"],
        min_samples_split=config["min_samples_split"],
        random_state=42,
        n_jobs=-1
    )
    model.fit(X_train, y_train)

    # Evaluate and report results
    val_predictions = model.predict(X_val)
    accuracy = accuracy_score(y_val, val_predictions)

    # Report metrics back to Ray Tune
    return {"accuracy": accuracy}

# Define search space
search_space = {
    "n_estimators": tune.randint(50, 200),
    "max_depth": tune.randint(3, 15),
    "min_samples_split": tune.randint(2, 10)
}

# Configure and run hyperparameter optimization
tuner = tune.Tuner(
    tune.with_resources(
        train_function,
        resources={"CPU": 2}
    ),
    param_space=search_space,
    tune_config=tune.TuneConfig(
        metric="accuracy",
        mode="max",
        num_samples=20,  # Number of trials
        max_concurrent_trials=4
    )
)

print("Starting hyperparameter optimization...")
results = tuner.fit()

# Get best results
best_result = results.get_best_result()
print(f"✅ Hyperparameter tuning completed!")
print(f"   Best accuracy: {best_result.metrics['accuracy']:.4f}")
print(f"   Best parameters: {best_result.config}")

# Show results summary
df_results = results.get_dataframe()
print(f"\nTop 5 results:")
top_results = df_results.nlargest(5, 'accuracy')
for i, (_, row) in enumerate(top_results.iterrows(), 1):
    print(f"   {i}. Accuracy: {row['accuracy']:.4f}, "
          f"n_estimators: {row['config/n_estimators']}, "
          f"max_depth: {row['config/max_depth']}")

Model Serving

For model serving, you can use Snowflake’s native capabilities. For more information, see Model Serving in Snowpark Container Services.

Submit and manage distributed applications on Ray clusters

Use Ray Jobs to submit and manage distributed applications on Ray clusters with better resource isolation and lifecycle management. For all job-based executions that require access to a Ray cluster, Snowflake recommends using an ML Job, where you can define the Ray application logic. For cases where you require direct access to the Ray Job interface, such as when migrating an existing implementation, you can use the Ray Job primitive directly, as described in the Ray documentation.
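As a sketch of that recommended path, an ML Job payload can contain the Ray application logic and declare the cluster size for the job. The decorator arguments and job-handle methods shown below are assumptions drawn from the ML Jobs interface; consult the Snowflake ML Jobs documentation for the exact API.

from snowflake.ml.jobs import remote

# The compute pool, stage name, and target_instances values are illustrative;
# check the ML Jobs documentation for the exact decorator signature
@remote("MY_COMPUTE_POOL", stage_name="payload_stage", target_instances=3)
def run_ray_app() -> int:
    import ray
    ray.init(address="auto", ignore_reinit_error=True)

    @ray.remote
    def square(x: int) -> int:
        return x * x

    # The Ray application logic runs against the cluster provisioned for this job
    return sum(ray.get([square.remote(i) for i in range(10)]))

job = run_ray_app()   # Submits the payload as an ML Job and returns a job handle
job.wait()            # Block until the job completes
print(job.result())   # Retrieve the function's return value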

Use Ray jobs for:

  • Production ML pipelines and scheduled workflows

  • Long-running workloads requiring fault tolerance

  • Batch processing and large-scale data processing

import ray
from ray.job_submission import JobSubmissionClient
import os

# Initialize Ray and get job client
ray.init(address="auto", ignore_reinit_error=True)

# Get Ray dashboard address for job submission
node_ip = os.getenv("NODE_IP_ADDRESS", "0.0.0.0")
dashboard_port = os.getenv("DASHBOARD_PORT", "9999")
dashboard_address = f"http://{node_ip}:{dashboard_port}"

client = JobSubmissionClient(dashboard_address)

# Simple job script
job_script = '''
import ray

@ray.remote
def compute_task(x):
    return x * x

# Submit tasks to Ray cluster
futures = [compute_task.remote(i) for i in range(5)]
results = ray.get(futures)
print(f"Results: {results}")
'''

# Submit job
job_id = client.submit_job(
    entrypoint=f"python -c '{job_script}'",
    runtime_env={"pip": ["numpy"]},
    submission_id="my-ray-job"
)

print(f"Submitted job: {job_id}")

# Monitor job status
status = client.get_job_status(job_id)
print(f"Job status: {status}")

Scaling Ray Clusters with Options

From a Snowflake Notebook, you can scale your Ray clusters to precisely match computational demands. A cluster consists of a head node (coordinator) and worker nodes (for task execution).

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Asynchronous scaling - returns immediately
scale_cluster(
    expected_cluster_size=2,
    is_async=True  # Don't wait for all nodes to be ready
)

# Scaling with custom options
scale_cluster(
    expected_cluster_size=3,
    options={
        "rollback_after_seconds": 300,      # Auto-rollback after 5 minutes
        "block_until_min_cluster_size": 2   # Return when at least 2 nodes ready
    }
)

# Scale down for cost efficiency
scale_cluster(expected_cluster_size=2)

Resource monitoring

import ray
from snowflake.ml.runtime_cluster import get_nodes
from snowflake.ml.runtime_cluster.cluster_manager import (
    get_available_cpu,
    get_available_gpu,
    get_num_cpus_per_node
)

# Check available resources
available_cpus = get_available_cpu()
available_gpus = get_available_gpu()
cpus_per_node = get_num_cpus_per_node()

print(f"Available CPUs: {available_cpus}")
print(f"Available GPUs: {available_gpus}")
print(f"CPUs per node: {cpus_per_node}")

# Get Ray's view of resources
ray_resources = ray.available_resources()
print(f"Ray available resources: {ray_resources}")

# Calculate resource utilization
total_cpus = ray.cluster_resources().get('CPU', 0)
used_cpus = total_cpus - available_cpus
utilization = (used_cpus / total_cpus * 100) if total_cpus > 0 else 0
print(f"CPU Utilization: {utilization:.1f}%")

Cleaning up

After you’re finished with the cluster, you can scale it down to avoid additional charges. Use the following code to scale it down:

from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Scale down when finished to conserve resources
print("Scaling down cluster...")
scale_cluster(expected_cluster_size=1)
print(f"Final cluster size: {len(get_nodes())} nodes")