Scaling Data and ML with Feast and Apache Spark Willem Pienaar Data Science Platform Lead
Agenda ▪ Overview ▪ Data challenges in production ML ▪ What is Feast? ▪ Getting data into Feast ▪ Feature serving ▪ Feature statistics and validation ▪ Takeaways ▪ The road ahead
Gojek ■ Ride hailing ■ Food delivery ■ Digital payments ■ Logistics ■ Lifestyle services 100m+ app downloads 500k+ merchants 4 countries 1m+ drivers 100m+ monthly bookings Indonesia Singapore Thailand Vietnam
Machine learning at Gojek ■ Matchmaking ■ Dynamic pricing ■ Routing ■ Recommendation systems ■ Incentive optimization ■ Supply positioning ■ Fraud prevention
Machine learning life cycle prior to Feast
[diagram: Jupyter Notebook → Model Serving → Production System; "Features ??" marks the unclear path for getting features into production]
Machine learning life cycle prior to Feast
[pipeline diagram: Streams → Stream Processing → Data Lake → Spark (Transform Data) → Train Model → Deploy Model → Model Serving → Production System, with features flowing through each stage]
Problems with end-to-end ML systems ● Monolithic end-to-end systems are hard to iterate on ● Training code needs to be rewritten for serving ● Training and serving features are inconsistent ● Data quality monitoring and validation is absent ● Lack of feature reuse and sharing
Feast is a system that attempts to solve the key data challenges with productionizing machine learning
Feast background ▪ The feature store was built as a collaboration between Gojek and Google Cloud ▪ Open-sourced in January 2019 ▪ Community driven, with adoption and contributions from multiple tech companies
Machine learning life cycle prior to Feast
[pipeline diagram, repeated for contrast: Streaming Data → Stream Processing → Data Lake → Spark (Transform Data) → Train Model → Deploy Model → Model Serving → Production System, with features flowing through each stage]
Machine learning life cycle with Feast
[diagram: Streaming Data, Stream Processing, and the Data Lake feed Feast; Spark is used to Create Features into Feast, which then serves both Train Model and Model Serving → Production System (Create Features → Train Model → Serve Model)]
What is Feast? Feast is an ML-specific data system that attempts to solve the key challenges with productionizing ML ▪ Manages ingestion and storage of both streaming and batch data ▪ Allows for standardized definitions of features regardless of environment ▪ Encourages sharing and re-use of features through semantic references ▪ Ensures data consistency between training and serving ▪ Provides a point-in-time correct view of features for model training ▪ Ensures model performance by tracking, validating, and monitoring features
What is Feast not? ▪ A workflow scheduler (Airflow, Luigi) ▪ Just a data warehouse or data lake (Hive, BigQuery, Snowflake) ▪ A data transformation/processing tool (Pandas, Spark, DBT) ▪ A data discovery or cataloguing system (Amundsen, DataHub) ▪ Data version control or lineage (Dolt, Pachyderm) ▪ Model serving or metadata tracking (KFServing, Seldon, MLflow)
Getting data into Feast
Create entities and features using feature sets

name: driver_weekly
entities:
  - name: driver_id
    valueType: INT64
features:
  - name: acc_rate
    valueType: FLOAT
  - name: conv_rate
    valueType: FLOAT
  - name: avg_daily_trips
    valueType: FLOAT

▪ Feature sets allow for the definition of entities and features and their associated properties
▪ Allows for bulk definition of features as they occur in a data source, e.g., Kafka
▪ Feature sets are not a grouping for serving features
Ingesting a DataFrame into Feast

# Load dataframe
driver_df = pd.read_csv("driver_weekly_data.csv")

# Create feature set from dataframe
driver_fs = FeatureSet("driver_weekly")
driver_fs.infer_fields_from_df(driver_df)

# Register driver feature set
feast_client.apply(driver_fs)

# Load feature data into Feast
feast_client.ingest(driver_fs, driver_df)

Inferred feature set:
name: driver_weekly
entities:
  - name: driver_id
    valueType: INT64
features:
  - name: acc_rate
    valueType: FLOAT
  - name: conv_rate
    valueType: FLOAT
  - name: avg_daily_trips
    valueType: FLOAT
Ingesting streams into Feast

# Create feature set from a Kafka stream
driver_stream_fs = FeatureSet(
    name="driver_stream",
    entities=[Entity(name="driver_id", dtype=ValueType.INT64)],
    features=[Feature(name="trips_today", dtype=ValueType.INT64)],
    source=KafkaSource(brokers="kafka:9092", topic="driver-stream-topic"),
)

# Register driver stream feature set
feast_client.apply(driver_stream_fs)

[diagram: events arriving on the stream are ingested continuously]
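To make the stream ingestion concrete, the sketch below assumes a JSON payload on the driver-stream-topic and splits it into an entity key and feature values. This is purely illustrative: `event_to_feature_row` is a hypothetical helper, not a Feast API, and the actual wire format Feast consumes from Kafka is a protobuf rather than raw JSON.

```python
import json

def event_to_feature_row(raw_event: bytes):
    """Split a JSON stream event into its entity key and feature values
    (hypothetical shape matching the driver_stream feature set above)."""
    event = json.loads(raw_event)
    entity_key = {"driver_id": event["driver_id"]}
    feature_values = {"trips_today": event["trips_today"]}
    return entity_key, feature_values

# Example event as it might appear on the topic
key, values = event_to_feature_row(
    b'{"driver_id": 42, "trips_today": 7, "event_timestamp": "2020-05-01T10:00:00Z"}'
)
```

The entity key is what the online store is indexed by; the feature values are what gets written against it.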
What happens to the data?

[architecture diagram: Your data (Stream, Data Warehouse, Data Lake, Jupyter Notebook) → Ingestion layer (Apache Beam) → Storage (Historical Feature Store; Online Feature Storage: Redis, Cassandra) → Feast Serving → Production (Model Training, Model Serving)]

Feast Core
● Registry of features and entities
● Manages ingestion jobs
● Allows for search and discovery of features
● Allows for generation of feature statistics

Feast Serving
● Retrieve point-in-time correct training datasets
● Retrieve consistent online features at low latency

Ingestion
● Unified ingestion ensures online/historical consistency
● Provides feature schema based statistics and alerting
Feature serving
Feature references and retrieval

[diagram: Model Training requests a training dataset from Feast Serving with features = [avg_daily_trips, conv_rate, acc_rate, trips_today, target]; Model Serving requests online features (< 10 ms) from Feast Serving with features = [avg_daily_trips, conv_rate, acc_rate, trips_today]]

■ Each feature is identified through a feature reference
■ Feature references allow clients to request either online or historical feature data from Feast
■ Models have a single consistent view of features in both training and serving
■ Feature references are persisted with model binaries, allowing full automation of online serving
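For illustration, a feature reference of the form `feature_set:feature` could be split with a small helper. This mirrors the reference style shown on these slides rather than Feast's exact grammar, and `parse_feature_ref` is a hypothetical function, not part of the Feast SDK.

```python
def parse_feature_ref(ref: str):
    """Split a feature reference string into (feature_set, feature).
    A bare feature name (no feature set qualifier) returns (None, name)."""
    if ":" in ref:
        feature_set, feature = ref.split(":", 1)
        return feature_set, feature
    return None, ref

# Qualified and unqualified references resolve differently
qualified = parse_feature_ref("driver_weekly:acc_rate")
unqualified = parse_feature_ref("trips_today")
```

Because the reference is just a string, it can be stored alongside the model binary and replayed at serving time, which is what enables the "full automation of online serving" point above.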
Events throughout time

[timeline figure: feature values (acceptance rate, average daily trips, conversion rate, trips today) plotted over time; a rider booking event marks where the prediction is made, and a later booking outcome event marks the outcome of that prediction]
Ensuring point-in-time correctness

[timeline figure: to train on the booking outcome, feature values (acceptance rate, average daily trips, conversion rate, trips today) must be read as they were at the rider booking, when the prediction was made, not at the time of the outcome]
Point-in-time joins
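The point-in-time join idea can be approximated in pandas with `merge_asof`, which picks, for each entity row, the latest feature value at or before that row's timestamp, so no future values leak into training. This is a minimal sketch of the concept, not Feast's implementation.

```python
import pandas as pd

# Entity rows: the (driver_id, timestamp) pairs we want features for,
# i.e. the moments predictions were made.
entity_rows = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2020-05-01 10:00", "2020-05-03 10:00", "2020-05-02 09:00"]
    ),
})

# Feature data: values as they were recorded over time.
feature_data = pd.DataFrame({
    "driver_id": [1, 1, 2],
    "event_timestamp": pd.to_datetime(
        ["2020-04-30 00:00", "2020-05-02 00:00", "2020-05-01 00:00"]
    ),
    "conv_rate": [0.50, 0.55, 0.70],
})

# merge_asof (backward direction, per driver_id) selects the most recent
# feature value at or before each entity timestamp; both sides must be
# sorted on the join key.
joined = pd.merge_asof(
    entity_rows.sort_values("event_timestamp"),
    feature_data.sort_values("event_timestamp"),
    on="event_timestamp",
    by="driver_id",
)
```

Here driver 1's row at 2020-05-01 gets the 0.50 value recorded on 2020-04-30, not the 0.55 recorded a day later, which is exactly the leakage the timeline figures above warn about.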
Getting features for model training

features = [
    "acc_rate",
    "conv_rate",
    "avg_daily_trips",
    "trips_today",
]

# Fetch historical data
historic_features = client.get_batch_features(
    entity_rows=drivers, feature_ids=features
).to_dataframe()

# Train model
my_model = ml_framework.fit(historic_features)

[diagram: batch and stream feature data are joined with the target to form the training dataset]
Getting features during online serving

features = [
    "acc_rate",
    "conv_rate",
    "avg_daily_trips",
    "trips_today",
]

# Fetch online features
online_features = client.get_online_features(
    entity_rows=drivers, feature_ids=features
)

# Make prediction
result = trip_comp_model.predict(online_features)
Feature statistics and validation
Feature validation in Feast ▪ TFX: Feast has interoperability with TFDV as part of feature specifications ▪ Statistics: Allows users to generate feature statistics and visualize with Facets ▪ Dataset validation: Schemas can be used for validating data during training ▪ Monitoring & Alerting: Feast metrics and schemas can be used for monitoring and alerting
Infer TFDV schemas for features

# Get statistics based on source data inside of Feast
stats = feast_client.get_statistics(
    feature_set_ref='iris',
    start_date=start_date,
    end_date=end_date
)

# Infer schema using TFDV
schema = tfdv.infer_schema(statistics=stats)

# User tweaks schema
tfdv.set_domain(schema, 'petal_width', schema_pb2.FloatDomain(min=0))

# Retrieve the existing Feast feature set for the Iris data
iris_feature_set = feast_client.get_feature_set('iris')

# Update the entities and features with constraints defined in the schema
iris_feature_set.import_tfx_schema(schema)

# Persist feature set with TFDV schema in Feast
feast_client.apply(iris_feature_set)

Resulting feature set:
name: iris
entities:
  - name: class
    valueType: STRING
features:
  - name: sepal_length
    valueType: DOUBLE
    presence:
      minFraction: 1
      minCount: 1
    shape:
      dim:
        - size: 1
  - name: sepal_width
    valueType: DOUBLE
    presence:
      minFraction: 1
      minCount: 1
    shape:
      dim:
        - size: 1
...
Visualize and validate training dataset

# Retrieve a training dataset from Feast
dataset = client.get_batch_features(entity_rows=drivers, feature_ids=features)

# Get statistics based on training dataset
stats = dataset.get_statistics()

# Get schema based on training dataset
schema = dataset.export_tfx_schema()

# Use TFDV to validate statistics generated from training dataset
anomalies = tfdv.validate_statistics(statistics=stats, schema=schema)

# Use TFDV to visualize statistics with Facets for debugging
tfdv.visualize_statistics(stats)
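Independent of TFDV, the core idea of schema-based validation can be sketched in plain Python: per-feature constraints checked row by row, emitting anomalies for violations. The constraint format below is hypothetical and far simpler than a real TFDV schema, which also expresses presence, shape, and domain constraints.

```python
# Hypothetical constraint set for two of the driver features above
schema = {
    "acc_rate": {"min": 0.0, "max": 1.0},
    "avg_daily_trips": {"min": 0.0},
}

def find_anomalies(rows, schema):
    """Return (feature, value, reason) tuples for each constraint violation."""
    anomalies = []
    for row in rows:
        for feature, constraints in schema.items():
            if feature not in row or row[feature] is None:
                anomalies.append((feature, None, "missing"))
                continue
            value = row[feature]
            if "min" in constraints and value < constraints["min"]:
                anomalies.append((feature, value, "below min"))
            if "max" in constraints and value > constraints["max"]:
                anomalies.append((feature, value, "above max"))
    return anomalies

bad = find_anomalies([{"acc_rate": 1.2, "avg_daily_trips": 5.0}], schema)
ok = find_anomalies([{"acc_rate": 0.9, "avg_daily_trips": 10.0}], schema)
```

Running the same checks at ingestion time and at training time is what turns a schema into the monitoring and alerting signal mentioned in the previous slide.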
Takeaways
What value does Feast unlock? ▪ Sharing: New projects start with feature selection and not creation ▪ Iteration speed: Stages of the ML life cycle can be iterated on independently ▪ Consistency: Improved model performance through consistency and point-in-time correctness ▪ Definitions: Feature creators can encode domain knowledge into feature definitions ▪ Quality: Ensures the quality of data that reaches models through validation and alerting
The road ahead
Roadmap
▪ Feast 0.6
  ▪ Statistics and validation functionality
  ▪ Improved discovery and metadata functionality
▪ Under development
  ▪ Databricks, Azure, AWS support (community driven)
  ▪ SQL-based sources
  ▪ JDBC storage (MySQL, PostgreSQL, Snowflake)
▪ Planned
  ▪ Automated training-serving skew detection
  ▪ Derived features
  ▪ Feature discovery UI
Get involved! ▪ Homepage: feast.dev ▪ Source code: github.com/feast-dev/feast ▪ Slack: #Feast ▪ Mailing list: https://groups.google.com/d/forum/feast-discuss ▪ These slides: https://tinyurl.com/feast-spark-deck
