Large-scale Feature Aggregation Using Apache Spark
Pulkit Bhanot, Amit Nene
Risk Platform, Uber
Agenda
• Motivation
• Challenges
• Architecture Deep Dive
• Role of Spark
• Takeaways
Team Mission
Build a scalable, self-service Feature Engineering Platform for predictive decisioning (based on ML and Business Rules).
• Feature Engineering: use of domain knowledge to create Features
• Self-Service: Data Scientists and Analysts onboard without reliance on Engineers
• Generic Platform: consolidates work towards wider ML efforts at Uber
Sample use case: Fraud detection
Detect and prevent bad actors in real time, across Payments, Promotions, and other domains. Example features:
• number of trips over X hours/weeks
• trips cancelled over Y months
• count of referrals over lifetime
• ...
Time Series Aggregations
Needs:
• Lifetime of entity
• Sliding long window: days/weeks/months
• Sliding short window: mins/hours
• Real-time
Existing solutions (indexed databases, streaming aggregations, warehouse):
• None satisfies all of the above
• Complex onboarding
None fits the bill!
Technical Challenges
• Scale: 1000s of aggregations for 100s of millions of business entities
• Long-window aggregation queries are slow even with indexes (seconds); milliseconds at high QPS are needed
• Onboarding complexity: many moving parts
• Recovery from accrual of errors
Our approach
One-stop shop for aggregation:
• Single system to interact with
• Single spec: automate configuration of the underlying systems
Scalable:
• Leverage the scale of the Batch system for long windows
• Combine with real-time aggregation for freshness
• Rollups: aggregate over time intervals
• Fast queries over rolled-up aggregates
• Caveat: summable functions only (see the sketch below)
Self-healing:
• Batch system auto-corrects any accrued errors
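To see why the summable caveat matters, here is a minimal sketch (plain Java, names illustrative, not platform code): a 7-day window over daily rollups is just the sum of seven per-day buckets, which works for sum/count but not for functions like exact distinct counts or medians, since those cannot be reassembled from rollups.

import java.util.Map;

// Minimal sketch: answer a windowed query by summing per-day rollup
// buckets. Only summable functions can be merged this way.
final class RollupMerge {
    // dailyBuckets: date string -> per-day count, e.g. {"2018-04-10": 3, ...}
    static long windowSum(Map<String, Long> dailyBuckets, String fromDate, String toDate) {
        return dailyBuckets.entrySet().stream()
                .filter(e -> e.getKey().compareTo(fromDate) >= 0
                          && e.getKey().compareTo(toDate) <= 0)
                .mapToLong(Map.Entry::getValue)
                .sum();
    }
}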
Aggregator as a black box
Raw events (streaming + Hive) go in; aggregated Features come out. Input parameters to the black box:
➔ Source events
➔ Grain size: 5 min (realtime), 1 day (offline), etc.
➔ Aggregation functions: sum, count, etc.
➔ Aggregation windows: LTD, 7 days, 30 days, etc.
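To make the parameters concrete, a hypothetical spec object follows; the class and field names are illustrative, not the platform's actual schema.

import java.time.Duration;
import java.util.List;

// Hypothetical spec sketch (illustrative names only): a single spec carries
// everything the black box needs to derive both batch and streaming pipelines.
record AggregationSpec(
        String sourceEvents,        // raw event stream / Hive table
        Duration grainSize,         // rollup grain: 5 min (realtime), 1 day (offline)
        String aggregationFunction, // summable function: "sum", "count", ...
        List<String> windows) {}    // "LTD", "7d", "30d", ...

class SpecExample {
    static final AggregationSpec TRIP_COUNT = new AggregationSpec(
            "trips", Duration.ofDays(1), "count", List.of("LTD", "7d", "30d"));
}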
Overall architecture
Specs drive four components:
• Batch Aggregator (Spark Apps)
  – Long window: weeks, months
  – Bootstrap and incremental modes
• Real-time Aggregator (Streaming, e.g. Kafka events)
  – Short window (<24 hrs)
  – Near real time
• Feature Access (Microservice)
  – Merges offline and streaming results at read time
• Feature Store
  – Saves rolled-up aggregates in Hive and Cassandra
Batch Spark Engine
[Architecture diagram: Specs and a Scheduler drive the Batch Aggregator (Spark Apps), whose stages include a Feature Extractor, Rollup Generator, Bootstrap and Periodic Snapshot Manager, and Optimizer. Computation reads daily Hive partitions (Tbl1:<2018-04-09>, Tbl1:<2018-04-10>), produces Full, Incremental, and Optimized Snapshots, and Dispersal writes them to the Feature Store (Cassandra), which backs Feature Access for the Decisioning System.]
Batch Storage
Daily-partitioned source tables in Hive are rolled up into per-entity aggregate tables:
• Table-1 (source): daily partitions <2018-04-10> .. <2018-04-13>, raw columns col1, col2, col3, col4
• ATable-1_daily (Daily Incremental Rollup): daily partitions of (uuid, f1, f2); features involving sliding-window computation
• ATable-1_Lifetime (Daily Lifetime Snapshot): daily partitions of (uuid, f1_ltd, f2_ltd); features involving Lifetime computation
• ATable-1_joined: daily partitions of (uuid, f1, f2, f1_ltd, f2_ltd); dispersed to the real-time store
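A minimal sketch of how these two rollup shapes can be produced incrementally with Spark SQL (table and column names are illustrative, not Uber's actual pipeline code): the daily rollup aggregates one source partition, and the lifetime snapshot adds the new day onto the previous snapshot so it never rescans full history.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch only: one day's incremental rollup plus the lifetime snapshot.
class DailyRollupJob {
    static void rollup(SparkSession spark, String day, String prevDay) {
        // Daily incremental rollup: aggregate a single source partition.
        Dataset<Row> daily = spark.sql(
            "SELECT uuid, count(*) AS f1, sum(fare) AS f2 " +
            "FROM table_1 WHERE datestr = '" + day + "' GROUP BY uuid");
        daily.createOrReplaceTempView("daily_rollup");

        // Daily lifetime snapshot: previous day's snapshot + today's increment.
        Dataset<Row> lifetime = spark.sql(
            "SELECT coalesce(p.uuid, d.uuid) AS uuid, " +
            "       coalesce(p.f1_ltd, 0) + coalesce(d.f1, 0) AS f1_ltd, " +
            "       coalesce(p.f2_ltd, 0) + coalesce(d.f2, 0) AS f2_ltd " +
            "FROM (SELECT * FROM atable_1_lifetime WHERE datestr = '" + prevDay + "') p " +
            "FULL OUTER JOIN daily_rollup d ON p.uuid = d.uuid");
        // Both results would then be written out as the <day> partitions of
        // ATable-1_daily and ATable-1_Lifetime.
    }
}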
Role of Spark
● Orchestrator of ETL pipelines
  ○ Scheduling of subtasks
  ○ Recording incremental progress
● Optimally resizes HDFS files, scaling with the size of the data set
● Rich set of APIs to enable complex optimizations

Example of an optimization in bootstrap dispersal: the daily buckets and the lifetime (_ltd) value are joined on the entity key into one row per entity, e.g.

uuid: 44b7dc88, _ltd: 1534, daily_buckets: [{"2017-10-24":"4"},{"2017-08-22":"3"},{"2017-09-21":"4"},{"2017-08-08":"3"},{"2017-10-03":"3"},{"2017-10-19":"5"},{"2017-09-06":"1"},{"2017-08-17":"5"},{"2017-09-09":"12"},{"2017-10-05":"5"},{"2017-09-25":"4"},{"2017-09-17":"13"}]

// Outer join of the daily dataset with the LTD data on the entity key
// (scala.collection.JavaConverters bridges the Java list to a Scala Seq)
dailyDataset.join(
    ltdData,
    JavaConverters.asScalaIteratorConverter(
        Arrays.asList(pipelineConfig.getEntityKey()).iterator())
        .asScala()
        .toSeq(),
    "outer");
Role of Spark (continued)
• Ability to disperse billions of records: HashPartitioner to the rescue. The parent RDD is split into hash partitions P1 .. Pn, and each partition is processed and checkpointed independently:

// Partition the data by hash of the entity key
HashPartitioner hashPartitioner = new HashPartitioner(partitionNumber);
JavaPairRDD<String, Row> hashedRDD = keyedRDD.partitionBy(hashPartitioner);

// Fetch each hash partition and process it on its own
for (int partitionId = 0; partitionId < partitionNumber; partitionId++) {
    // filterRows keeps only this partition's rows and raises an error on a
    // partition mismatch
    JavaRDD<Tuple2<String, Row>> filteredHashRDD = filterRows(hashedRDD, partitionId);
    // rowEncoder: an ExpressionEncoder<Row>, e.g. RowEncoder.apply(schema)
    Dataset<Row> filteredDataSet = etlContext.getSparkSession().createDataset(
        filteredHashRDD.map(tuple -> tuple._2()).rdd(), rowEncoder);
    // Repartition filteredDataSet; update the checkpoint and the count of
    // records processed after successful completion.
}
Role of Spark in Dispersal
[Diagram: full computation snapshots (2018-02-01, 2018-02-02, 2018-02-03, ... 2018-03-01) are turned into optimized snapshots and bootstrap-dispersed to C*.]
• Global throttling
  – The Feature Store can be the bottleneck
  – coalesce() to limit the number of concurrently writing executors
• Inspect data
  – Disperse a row only if some column has changed
• Monitoring and alerting
  – Create custom metrics
A minimal sketch of the throttling idea follows.
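This is a sketch under assumptions, not the actual dispersal path: writeToFeatureStore stands in for the Uber-internal Cassandra client. coalesce() caps the number of partitions, and therefore the number of tasks hitting the store at once.

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch of global throttling: coalesce to N partitions so at most N
// writers touch the Feature Store concurrently.
class ThrottledDispersal {
    static void disperse(Dataset<Row> snapshot, int maxConcurrentWriters) {
        snapshot
            .coalesce(maxConcurrentWriters)
            .foreachPartition((ForeachPartitionFunction<Row>) rows -> {
                while (rows.hasNext()) {
                    writeToFeatureStore(rows.next()); // stand-in for the C* client
                }
            });
    }
    static void writeToFeatureStore(Row row) { /* illustrative stub */ }
}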
Real-time streaming engine
● Real-time, summable aggregations for windows < 24 hours
● Semantically equivalent to the offline computation
● Aggregation rollups (5 mins) maintained in the feature store (Cassandra)
[Diagram: raw Kafka events from microservices pass through event enrichment and transform stages (xform_0, xform_1, xform_2) in streaming computation pipelines on Uber's Athena streaming platform; a time-window aggregator writes to C* and answers RPCs.]
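The talk's pipeline runs on Uber's internal Athena streaming platform; as a rough stand-in only, the same 5-minute summable rollup can be sketched with Spark Structured Streaming (topic and column names are illustrative):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

// Stand-in sketch, not the Athena pipeline: 5-minute summable rollups
// keyed by entity uuid, mergeable with the batch rollups at read time.
class RealtimeRollup {
    static Dataset<Row> fiveMinuteCounts(SparkSession spark) {
        Dataset<Row> events = spark.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092") // illustrative
            .option("subscribe", "trip_events")               // illustrative topic
            .load()
            .selectExpr("CAST(key AS STRING) AS uuid", "timestamp");
        return events
            .withWatermark("timestamp", "10 minutes") // tolerate late events
            .groupBy(col("uuid"), window(col("timestamp"), "5 minutes"))
            .count(); // summable aggregation per 5-minute grain
    }
}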
Final aggregation in real time
● Uses time series and clustering key support in Cassandra
  ○ One table for Lifetime (LTD) values: entity_common_aggr_bt_ltd (UUID PK, trip_count_ltd)
  ○ One table for daily batch buckets: entity_common_aggr_bt (UUID PK, eventbucket CK, trip_count)
  ○ Multiple tables for realtime values at 5-minute grain, one per day: entity_common_aggr_rt_2018_05_08, entity_common_aggr_rt_2018_05_09, entity_common_aggr_rt_2018_05_10 (UUID PK, eventbucket CK, trip_count)
● The Feature Access service consults the Metadata service, and a Query Planner assembles the buckets into a single result at feature access time, e.g.
  - lifetime trip count
  - trips over last 51 hrs
  - trips over previous 2 days
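A simplified sketch of that read-time merge (plain Java; the bucket maps stand in for rows already fetched from the Cassandra tables, and this is not the service's actual code): a query such as "trips over last 51 hrs" sums the whole daily buckets the window covers plus the 5-minute realtime buckets on the window's partial edge.

import java.time.Instant;
import java.time.LocalDate;
import java.util.NavigableMap;

// Sketch: merge daily batch buckets with 5-minute realtime buckets.
class QueryPlannerSketch {
    static long tripsOverWindow(NavigableMap<LocalDate, Long> dailyBuckets,
                                NavigableMap<Instant, Long> realtimeBuckets,
                                LocalDate firstFullDay, LocalDate lastFullDay,
                                Instant windowStart, Instant headEnd) {
        long wholeDays = dailyBuckets
            .subMap(firstFullDay, true, lastFullDay, true)
            .values().stream().mapToLong(Long::longValue).sum();
        long partialHead = realtimeBuckets
            .subMap(windowStart, true, headEnd, false)
            .values().stream().mapToLong(Long::longValue).sum();
        return wholeDays + partialHead;
    }
}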
Self-service onboarding
1. Create Query (Spark SQL)
2. Configure Spec
3. Test Spec
4. Commit to Prod
An illustrative query for step 1 appears below.
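For illustration only (not the platform's actual spec format), the Spark SQL a data scientist writes in step 1 might look like this; the {DATE} placeholder and column names are assumptions.

// Illustrative onboarding query: everything else is derived from the spec.
class OnboardingQuery {
    static final String FEATURE_QUERY =
        "SELECT uuid, " +
        "       count(*) AS trip_count, " +
        "       sum(CASE WHEN status = 'CANCELLED' THEN 1 ELSE 0 END) AS trips_cancelled " +
        "FROM trips " +
        "WHERE datestr = '{DATE}' " + // partition placeholder, filled per run
        "GROUP BY uuid";
}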
Machine learning support
Backfill support: what was the value of a feature f1 for an entity E1 at each point from Thist to Tnow?
• Bootstrap to a historic point in time Thist
• Incrementally compute from Thist to Tnow
How?
• Lifetime: for feature f1 on Thist, access partition Thist
• Windowed: for feature f2 on Thist with window N days, merge partitions Thist-N to Thist
[Timeline: T-120, T-119, ..., T-90, ..., T-1, T, illustrating the lifetime value on a given date and the last-30-day trips at T-90, T-89, ...]
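A sketch of the windowed backfill under the summable-rollup assumption (table and column names are illustrative): the 30-day value at a historic date T is reconstructed by summing the daily rollup partitions covering the window ending at T.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch: merge N daily rollup partitions to recover a windowed feature
// value at a historic date, without recomputing from raw events.
class BackfillSketch {
    static Dataset<Row> windowedFeatureAt(SparkSession spark, String windowStart, String t) {
        return spark.sql(
            "SELECT uuid, sum(f2) AS f2_30d " +
            "FROM atable_1_daily " +
            "WHERE datestr BETWEEN '" + windowStart + "' AND '" + t + "' " +
            "GROUP BY uuid");
    }
}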
Takeaways
● Use of Spark to achieve massive scale
● Combined with streaming aggregation for freshness
● Low-latency access in production (P99 <= 20ms) at high QPS
● Simplified onboarding via a single spec; onboarding time in hours
● Huge computational cost improvements
Proprietary and confidential © 2018 Uber Technologies, Inc. All rights reserved. No part of this document may be reproduced or utilized in any form or by any means, electronic or mechanical, including photocopying, recording, or by any information storage or retrieval systems, without permission in writing from Uber. This document is intended only for the use of the individual or entity to whom it is addressed and contains information that is privileged, confidential or otherwise exempt from disclosure under applicable law. All recipients of this document are notified that the information contained herein includes proprietary and confidential information of Uber, and recipient may not make use of, disseminate, or in any way disclose this document or any of the enclosed information to any person other than employees of addressee to the extent necessary for consultations with authorized personnel of Uber.
Contact: bhanotp@uber.com, anene@uber.com
