Building a Feature Store around Dataframes and Apache Spark Jim Dowling, CEO @ Logical Clocks AB Fabio Buso, Head of Engineering @ Logical Clocks AB
When Data Engineers are asked to re-use other teams’ features* *Hide-the-pain-Harold smiles and says ‘yes’, but inside he’s in a world of pain
Known Feature Stores in Production
▪ Logical Clocks – Hopsworks (world’s first/only fully open source)
▪ Uber – Michelangelo
▪ Airbnb – Bighead/Zipline
▪ Comcast
▪ Twitter
▪ GO-JEK – Feast
▪ Conde Nast
▪ Facebook – FB Learner
▪ Netflix
▪ Reference: www.featurestore.org
Feature Store in Banking
▪ Problem: manage TBs of transactions as ML features; develop models to reduce the costs of fraud.
▪ Solution: Hopsworks provides the platform to train machine learning models that classify transactions as suspected fraud or not. The fraud dataset contains billions of records (40 TB), and the solution uses deep learning (GPUs) to detect structural patterns in bank transactions and temporal patterns in the frequency of transactions executed.
▪ Reference: Swedbank talk at Spark+AI Summit Europe 2019
Data Teams are moving from Analytics to ML
[Diagram: data pipelines move raw and event data into a data lake, which serves BI platforms via SQL]
Data Teams are moving from Analytics to ML
[Diagram: the same data lake now also feeds feature pipelines into the Hopsworks Feature Store, which supports model training, online model serving, and analytical (batch) model scoring]
Features are created/updated at different cadences
▪ User-entered features: < 2 secs
▪ Click features: every 10 secs
▪ CDC data: every 30 secs
▪ User profile updates: every hour
▪ Featurized weblogs data: every day
[Diagram: real-time and event data feed the online feature store (<10 ms reads, serving online apps); SQL DWs and S3/HDFS feed the offline feature store (TBs/PBs, serving training and batch apps)]
No existing database is both scalable (PBs) and low latency (<10 ms). Hence, online + offline feature stores.
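The online/offline split can be pictured as the same features served two ways. This is a conceptual sketch only (plain Python, not the Hopsworks API, with made-up feature names): the online store behaves like a key-value lookup of the latest values by primary key, while the offline store keeps the full history for batch training.

```python
# Online store: primary key -> latest feature vector, built for <10 ms reads.
online_store = {
    "user_42": {"clicks_10s": 3, "balance": 1250.0},
}

# Offline store: append-only history, scanned in batch for training.
offline_store = [
    {"user_id": "user_42", "ts": 1, "clicks_10s": 1, "balance": 900.0},
    {"user_id": "user_42", "ts": 2, "clicks_10s": 3, "balance": 1250.0},
]

def serve(user_id):
    """Online path: one key lookup, latest values only."""
    return online_store[user_id]

def training_rows():
    """Offline path: every historical row; high latency is fine."""
    return list(offline_store)
```

The design trade-off on the slide falls out of this shape: a key-value store can hit <10 ms but cannot cheaply scan PBs, and a batch store can scan PBs but not answer in <10 ms.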
FeatureGroup Ingestion in the Hopsworks Feature Store
[Diagram: user clicks → ClickFeatureGroup, DB updates (CDC) → TableFeatureGroup, user profile updates → UserFeatureGroup, weblogs → LogsFeatureGroup, all ingested from event data, SQL DWs, and S3/HDFS via the DataFrame API; real-time features flow through Kafka and Flink into RTFeatureGroup; the feature store serves online apps and training/batch apps]
No More End-to-End ML Pipelines!
[Diagram: a feature pipeline moves raw and event data from the data lake into the feature store; train/validate, model serving, and monitoring stages start from the feature store]
ML Pipelines start and stop at the Feature Store
Feature Store Concepts
▪ Features: name, Pclass, Sex, Survived, Balance
▪ Feature Groups: Titanic Passenger List; Passenger Bank Account
▪ Train/Test Datasets: features from multiple feature groups, combined on a join key (e.g. name)
▪ File formats: .tfrecord, .npy, .csv, .hdf5, .petastorm, etc.
▪ Storage: GCS, Amazon S3, HopsFS
Features, FeatureGroups, and Train/Test Datasets are all versioned.
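The join-key idea from the slide can be sketched with plain pandas (illustrative only, with made-up rows; the actual joins run inside the feature store): two feature groups share the key "name", and joining on it yields one training dataset.

```python
import pandas as pd

# Feature group 1: Titanic passenger list.
passengers = pd.DataFrame({
    "name": ["Braund", "Cumings"],
    "Pclass": [3, 1],
    "Sex": ["male", "female"],
    "Survived": [0, 1],
})

# Feature group 2: passenger bank account.
accounts = pd.DataFrame({
    "name": ["Braund", "Cumings"],
    "Balance": [120.0, 5400.0],
})

# The join key ("name") stitches the feature groups into one train/test dataset.
train_df = passengers.merge(accounts, on="name")
```

Versioning each side independently is what makes this reuse safe: a training dataset records which version of each feature group it was joined from.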
Register a FeatureGroup with the Feature Store

from hops import featurestore as fs

df = ...  # Spark or Pandas dataframe
# Do feature engineering on df

# Register the dataframe as a FeatureGroup
fs.create_featuregroup(df, "titanic_df", online=True)
Hopsworks Feature Store
[Diagram: raw data, structured data, and events are ingested from the data lake into the online and offline feature stores; online apps use the online store, while batch apps and train/test dataset creation use the offline store]
Create Train/Test Datasets using the Feature Store

from hops import featurestore as fs

sample_data = fs.get_features(["name", "Pclass", "Sex", "Balance", "Survived"])
fs.create_training_dataset(sample_data, "titanic_training_dataset",
                           data_format="tfrecords",
                           training_dataset_version=1)
Online Feature Store
[Diagram: an online application spanning us-west-1a/1b/1c, each availability zone running a MySQL NDB data node and a model replica]
1. Build a feature vector using the online feature store (JDBC, ~2-20 ms)
2. Send the feature vector to a model for prediction (~5-50 ms)
Good Decisions we took in Version 1
▪ General-purpose DataFrame API (a DSL could be added later)
▪ The feature store is a cache for materialized features, not a library
▪ Online and offline feature stores to support low latency and scale, respectively
▪ Reuse of features means JOINS – Spark as a join engine
Feature Store API v2
▪ Enforce feature-group scope and versioning (as best practice)
▪ Better support for multiple feature stores – join features from development and production feature stores
▪ Better support for complex joins of features
▪ First-class API support for time-travel
▪ More consistent developer experience
Connect and Support for Multiple Feature Stores

import hsfs

# Connect to the production feature store
conn = hsfs.connection(host="ea2.aws.hopsworks.ai", project="prod")
prod_fs = conn.get_feature_store()
dev_fs = conn.get_feature_store("dev")
Feature Group Operations

# Create feature group metadata
fg = dev_fs.create_feature_group("temperature",
                                 description="Temperature Features",
                                 version=1,
                                 online_enabled=True)
# Schema is inferred from the dataframe
fg.save(dataframe)

# Read the feature group as a dataframe
df = fg.read()

# Append more data to the feature group
fg.insert(dataframe, overwrite=False)
Tags
▪ Allow feature groups, features, and training datasets to be discoverable
▪ Tags are searchable from the Hopsworks UI

fg = dev_fs.get_feature_group("temperature", version=1)
fg.add_tag("country", "SE")
fg.add_tags({"country": "SE", "year": 2020})
Schema Version Management
Non-breaking schema changes (e.g. adding a feature) can be applied without bumping the version.

fg.add_feature("new_feature", type="int", default_value=0)
Exploratory Data Analysis

fg = dev_fs.get_feature_group("temperature", version=1)

# Returns a dataframe object
df = fg.read()

# Show a sample of 10 rows in the feature group
fg.show(10)
fg.select(["date", "location", "avg"]).show(10)
fg.select(["date", "location", "avg"]).read() \
  .filter(col("location") == "Stockholm").show(10)
Joins – Pandas-Style API

crop_fg = prod_fs.get_feature_group("crop", version=1)
temperature = dev_fs.get_feature_group("temperature", version=1)
rain = dev_fs.get_feature_group("rain", version=1)

joined_features = crop_fg.select(["location", "yield"]) \
    .join(temperature.select(["location", "season_avg"])) \
    .join(rain.select(["location", "avg_mm"]),
          on=["location"], join_type="left")

dataframe = joined_features.read()
Time-Travel

fs.get_feature_group("temperature", version=1,
                     wallclock_time=None,
                     wallclock_time_start=None,
                     wallclock_time_end=None)

▪ Explore how the feature group looked at a point in time in the past
▪ List value changes between timestamps
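Under the hood, time-travel is a point-in-time lookup over committed values. A conceptual stdlib sketch of the two operations on the slide (not the hsfs API; `history`, `as_of`, and `changes_between` are hypothetical names):

```python
from bisect import bisect_right

# Each commit stores (wallclock_time, feature values), sorted by time.
history = [
    (100, {"avg": 4.2}),
    (200, {"avg": 5.1}),
    (300, {"avg": 3.9}),
]

def as_of(history, t):
    """Return the feature values as they looked at wallclock time t:
    the last commit at or before t, or None if nothing existed yet."""
    times = [ts for ts, _ in history]
    i = bisect_right(times, t)
    return history[i - 1][1] if i else None

def changes_between(history, start, end):
    """List value changes committed in the interval (start, end]."""
    return [(ts, v) for ts, v in history if start < ts <= end]
```

Reading with `wallclock_time` set corresponds to `as_of`; reading with `wallclock_time_start`/`wallclock_time_end` corresponds to `changes_between`.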
Create Train/Test Data from Joined Features

connector = fs.get_storage_connector("s3connector", "S3")

td = fs.create_training_dataset(name='crop_model',
                                description='Dataset to train the crop model',
                                version=1,
                                data_format='tfrecords',
                                connector=connector,
                                splits={'train': 0.7, 'test': 0.2, 'validate': 0.1})
td.save(joined_features)
Get Feature Vector for Online Serving
▪ Returns a feature vector from the online feature store
▪ Feature order is maintained

fs.get_feature_vector(training_dataset="crop",
                      id=[{'location': 'Stockholm', 'crop': 'wheat'}])
Demo Using Hopsworks Feature Store in Databricks
Thank You!
Get Started: hopsworks.ai · github.com/logicalclocks/hopsworks
Twitter: @logicalclocks
Web: www.logicalclocks.com
Feature Store contributions from colleagues: Moritz Meister, Kim Hammar, Alex Ormenisan, Robin Andersson, Ermias Gebremeskel, Theofilos Kakantousis
Thanks to the Logical Clocks Team!
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.