Pandas UDF: Scalable Analysis with Python and PySpark (Li Jin, Two Sigma Investments)
About Me • Li Jin (icexelloss) • Software Engineer @ Two Sigma Investments • Analytics Tools Smith • Apache Arrow Committer • Other Open Source Projects: – Flint: A Time Series Library on Spark
Important Legal Information • The information presented here is offered for informational purposes only and should not be used for any other purpose (including, without limitation, the making of investment decisions). Examples provided herein are for illustrative purposes only and are not necessarily based on actual data. Nothing herein constitutes: an offer to sell or the solicitation of any offer to buy any security or other interest; tax advice; or investment advice. This presentation shall remain the property of Two Sigma Investments, LP (“Two Sigma”) and Two Sigma reserves the right to require the return of this presentation at any time. • Some of the images, logos or other material used herein may be protected by copyright and/or trademark. If so, such copyrights and/or trademarks are most likely owned by the entity that created the material and are used purely for identification and comment as fair use under international copyright and/or trademark laws. Use of such image, copyright or trademark does not imply any association with such organization (or endorsement of such organization) by Two Sigma, nor vice versa. • Copyright © 2018 TWO SIGMA INVESTMENTS, LP. All rights reserved
Outline • Overview: Data Science in Python and Spark • Pandas UDF in Spark 2.3 • Ongoing Work
Overview: Data Science in Python and Spark
Predictive Modeling: Read Data → Data Cleaning → Data Manipulation → Feature Engineering → Model Training → Model Testing
Predictive Modeling (Python): Read Data → Data Cleaning → Data Manipulation → Feature Engineering → Model Training → Model Testing, built on pandas, numpy, scipy, and sklearn
Predictive Modeling (Spark): Read Data, Data Cleaning, and Data Manipulation use Spark SQL; Feature Engineering, Model Training, and Model Testing use Spark ML
The Problem: Feature Gap • Much of the functionality available in Python is missing from Spark or hard to express in it
Stack Overflow Answer: Forward Fill (Python)
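The pandas answer is essentially a one-liner; a minimal sketch (the column name `v` is illustrative):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({"v": [1.0, np.nan, np.nan, 4.0, np.nan]})

# Forward fill: propagate the last valid observation forward
filled = df["v"].ffill()
print(filled.tolist())  # [1.0, 1.0, 1.0, 4.0, 4.0]
```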
Stack Overflow Answer: Forward Fill (Spark)
Feature Gap: Forward Fill • Spark SQL: – Previous/Next observation • Python: – Previous/Next observation – Interpolation • Linear • Quadratic • …
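The interpolation variants on the Python side are also one-liners in pandas; a small sketch:

```python
import numpy as np
import pandas as pd

s = pd.Series([1.0, np.nan, np.nan, 4.0])

# Previous-observation fill vs. linear interpolation
print(s.ffill().tolist())                       # [1.0, 1.0, 1.0, 4.0]
print(s.interpolate(method="linear").tolist())  # [1.0, 2.0, 3.0, 4.0]
```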
Feature Gap between Spark and Python • Data Cleaning and Manipulation – Fill missing values (pandas.DataFrame.fillna) – Rank features (scipy.stats.percentileofscore) – Exponential moving average (pandas.DataFrame.ewm) – Power transformations (scipy.stats.boxcox) – … • Model Training – …
Spark and Python • Spark: scalable • Python: functionality • How to combine the two?
Pandas UDF in Spark 2.3
Strength of Spark and Python • How (Spark SQL) – For each row – For each group – Over rolling window – Over entire data – … • What (Python) – Filling missing value – Rank features – …
Combine What and How: PySpark UDF • Interface for extending Spark with native Python libraries • UDF is executed in a separate Python process • Data is transferred between Python and Java
Existing UDF • Python function on each Row • Data serialized using Pickle • Data as Python objects (Python integer, Python lists, …)
Existing UDF (Functionality) • How (Spark SQL) – For each row only • What (Python) – Filling missing value – Rank features – … • With row-at-a-time UDFs, most of the relational functionality (groups, windows, whole-data operations) is taken away
Existing UDF (Usability) • Example: compute (v - v.mean()) / v.std() grouped by year and month
Existing UDF (Usability) • 80% of the code is boilerplate
Existing UDF (Performance) • Profiling a trivial UDF (lambda x: x + 1): throughput of 8 Mb/s, with 91.8% of the time spent in serialization/deserialization
Challenge • A more expressive API • Efficient data transfer between Java and Python (serialization) • Efficient data operations in Python
Pandas UDF in Spark 2.3: Scalar and Grouped Map
Existing UDF vs Pandas UDF • Existing UDF: Function on Row, Pickle serialization, Data as Python objects • Pandas UDF: Function on Row, Group and Window, Arrow serialization, Data as pd.Series (for column) and pd.DataFrame (for table)
Apache Arrow • In-memory columnar format for data analysis • Low cost to transfer data between systems
Apache Arrow • [Diagram] Before Arrow: each pair of systems (Pandas, Drill, Impala, HBase, Kudu, Cassandra, Parquet, Spark) needs its own copy & convert step • With Arrow: all systems share a single Arrow memory format
Scalar • Each Spark partition is serialized to pd.Series batches using Arrow • The function (an N -> N mapping) is applied to each pd.Series
Scalar Example: millisecond to timestamp
Scalar Example: cumulative density function
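This example presumably computes the normal CDF through a vectorized `scipy.stats.norm.cdf` call; a pandas-level sketch of the UDF body (the Spark wrapping, shown as a comment, is an assumption):

```python
import pandas as pd
from scipy import stats

# Body of a scalar Pandas UDF: a vectorized scipy call,
# pd.Series in -> pd.Series out. In Spark it would be wrapped,
# e.g., as cdf_udf = pandas_udf(cdf, DoubleType())
def cdf(v: pd.Series) -> pd.Series:
    return pd.Series(stats.norm.cdf(v))

print(cdf(pd.Series([0.0, 1.96])).round(3).tolist())  # [0.5, 0.975]
```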
Grouped Map • Operations on Groups of Rows – Each group: N -> Any – Similar to flatMapGroups and “groupby apply” in Pandas
Grouped Map • groupBy splits the data into groups by key • Each group is serialized to a pd.DataFrame using Arrow • The function (pd.DataFrame -> pd.DataFrame) is applied to each group
Grouped Map Example: Backward Fill
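A sketch of the per-group logic (column names `id` and `v` are illustrative; the Spark 2.3 wrapping, shown as comments, is an assumption):

```python
import pandas as pd

# Per-group logic for a grouped map Pandas UDF: one pd.DataFrame in,
# one pd.DataFrame out. In Spark 2.3 it would be declared as
#   @pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
# and invoked with df.groupby("id").apply(backward_fill).
def backward_fill(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(v=pdf["v"].bfill())

pdf = pd.DataFrame({"id": [1, 1, 1], "v": [None, None, 3.0]})
print(backward_fill(pdf)["v"].tolist())  # [3.0, 3.0, 3.0]
```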
Grouped Map Example: Model Fitting • Define constants and output schema • Define model (linear regression)
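A sketch of the per-group fitting logic (column names, sample data, and the linear-regression implementation via `np.polyfit` are illustrative assumptions; in Spark the output schema would be declared up front, e.g. "key long, a double, b double"):

```python
import numpy as np
import pandas as pd

# Per-group logic for a grouped map Pandas UDF: fit y = a*x + b
# within each group and return one row of coefficients
def fit_ols(pdf: pd.DataFrame) -> pd.DataFrame:
    a, b = np.polyfit(pdf["x"], pdf["y"], deg=1)
    return pd.DataFrame({"key": [pdf["key"].iloc[0]], "a": [a], "b": [b]})

pdf = pd.DataFrame({"key": [1, 1, 1], "x": [0.0, 1.0, 2.0],
                    "y": [1.0, 3.0, 5.0]})
print(fit_ols(pdf))  # a = 2.0, b = 1.0
```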
Improvements and Limitations
Improvement (Usability) • [Code comparison] Before vs. after: the same computation with far less boilerplate
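The "after" version of the normalization example from earlier: the whole (v - v.mean()) / v.std() computation becomes one small pandas function applied per group. A sketch (column names are illustrative, and the Spark invocation shown in the comment is an assumption):

```python
import pandas as pd

# Grouped map body: normalize v within each group. In Spark it would
# be applied with df.groupby("year", "month").apply(normalize).
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    v = pdf["v"]
    return pdf.assign(v=(v - v.mean()) / v.std())

pdf = pd.DataFrame({"v": [1.0, 2.0, 3.0]})
print(normalize(pdf)["v"].tolist())  # [-1.0, 0.0, 1.0]
```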
Improvement (Performance) • https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html
Pandas UDF Limitations • Data must be split into batches/groups • (Grouped Map) Each group must fit entirely in memory
Ongoing Work
Pandas UDF Roadmap • SPARK-22216 • Released in Spark 2.3 – Scalar – Grouped Map • Ongoing – Grouped Aggregate (not yet released) – Window (work in progress) – Memory efficiency – Complete type support (struct type, map type)
Thank you
