|
| 1 | +{ |
| 2 | + "cells": [ |
| 3 | + { |
| 4 | + "cell_type": "markdown", |
| 5 | + "metadata": {}, |
| 6 | + "source": [ |
| 7 | + "# Deploying Python ML in PySpark\n", |
| 8 | + "\n", |
| 9 | + "----\n", |
| 10 | + "\n", |
| 11 | + "This notebook intends to introduce a PySpark `pandas_udf` function that can be used to **deploy both python ML models and sophisticated pipelines**. With this in mind please excuse the barbaric use of `RandomForestRegressor`. \n", |
| 12 | + "\n", |
| 13 | + "In this notebook we deploy sklearn's `RandomForestRegressor` in PySpark. The [Titanic](https://raw.githubusercontent.com/amueller/scipy-2017-sklearn/091d371/notebooks/datasets/titanic3.csv) dataset is used simply for convenience. In the examples below a number of features are used to estimate ticket \"fare\", we will fit our models/pipelines in pandas and deploy in PySpark. **Please be aware** this notebook uses the `pyspark` package, therefore any user looking to run the cells below will need to install PySpark.\n", |
| 14 | + "\n", |
| 15 | + "In practice the model used below can be replaced by any other predictive python model, be that a `RandomForestClassifier`, `XGBoost`, `LightGBM` or any other package you care to use with an sklearn like API." |
| 16 | + ] |
| 17 | + }, |
| 18 | + { |
| 19 | + "cell_type": "code", |
| 20 | + "execution_count": null, |
| 21 | + "metadata": {}, |
| 22 | + "outputs": [], |
| 23 | + "source": [ |
| 24 | + "import shutil\n", |
| 25 | + "from datetime import datetime, timedelta\n", |
| 26 | + "\n", |
| 27 | + "import numpy as np\n", |
| 28 | + "import pandas as pd\n", |
| 29 | + "from sklearn.ensemble import RandomForestRegressor\n", |
| 30 | + "from sklearn.pipeline import Pipeline\n", |
| 31 | + "from sklearn.preprocessing import MinMaxScaler, OrdinalEncoder\n", |
| 32 | + "from sklearn.compose import ColumnTransformer\n", |
| 33 | + "import pyspark.sql\n", |
| 34 | + "from pyspark.sql import SparkSession\n", |
| 35 | + "import pyspark.sql.functions as sf\n", |
| 36 | + "from pyspark.sql.types import DoubleType\n", |
| 37 | + "import pyarrow" |
| 38 | + ] |
| 39 | + }, |
| 40 | + { |
| 41 | + "cell_type": "code", |
| 42 | + "execution_count": null, |
| 43 | + "metadata": {}, |
| 44 | + "outputs": [], |
| 45 | + "source": [ |
| 46 | + "# Defining data path, target and features\n", |
| 47 | + "TITANIC_URL = \"https://raw.githubusercontent.com/amueller/scipy-2017-sklearn/091d371/notebooks/datasets/titanic3.csv\"\n", |
| 48 | + "TARGET = \"fare\"\n", |
| 49 | + "NUMERICAL_FEATURES = [\n", |
| 50 | + " \"sibsp\",\n", |
| 51 | + " \"parch\",\n", |
| 52 | + " \"age\"\n", |
| 53 | + "]\n", |
| 54 | + "CATEGORICAL_FEATURES = [\n", |
| 55 | + " \"sex\",\n", |
| 56 | + " \"cabin\"\n", |
| 57 | + "]\n", |
| 58 | + "ALL_FEATURES = NUMERICAL_FEATURES + CATEGORICAL_FEATURES" |
| 59 | + ] |
| 60 | + }, |
| 61 | + { |
| 62 | + "cell_type": "code", |
| 63 | + "execution_count": null, |
| 64 | + "metadata": {}, |
| 65 | + "outputs": [], |
| 66 | + "source": [ |
| 67 | + "# It is necessary for us to set a SparkSession\n", |
| 68 | + "for dir in [\"metastore_db\", \"derby.log\", \".cache\"]:\n", |
| 69 | + " try:\n", |
| 70 | + " shutil.rmtree(dir)\n", |
| 71 | + " except OSError:\n", |
| 72 | + " pass\n", |
| 73 | + "\n", |
| 74 | + "spark = (SparkSession.builder\n", |
| 75 | + " .master(\"local[2]\")\n", |
| 76 | + " .appName(\"sklearn-deploy\")\n", |
| 77 | + " .config(\"spark.ui.enabled\", \"false\")\n", |
| 78 | + " .getOrCreate()\n", |
| 79 | + " )" |
| 80 | + ] |
| 81 | + }, |
| 82 | + { |
| 83 | + "cell_type": "code", |
| 84 | + "execution_count": null, |
| 85 | + "metadata": {}, |
| 86 | + "outputs": [], |
| 87 | + "source": [ |
| 88 | + "# Read the df, select relevant columns and drop any NaNs\n", |
| 89 | + "df = (\n", |
| 90 | + " pd.read_csv(TITANIC_URL)[NUMERICAL_FEATURES + CATEGORICAL_FEATURES + [TARGET]]\n", |
| 91 | + " .dropna()\n", |
| 92 | + ")\n", |
| 93 | + "\n", |
| 94 | + "for num_feat in NUMERICAL_FEATURES:\n", |
| 95 | + " df[num_feat] = df[num_feat].astype(float)" |
| 96 | + ] |
| 97 | + }, |
| 98 | + { |
| 99 | + "cell_type": "code", |
| 100 | + "execution_count": null, |
| 101 | + "metadata": {}, |
| 102 | + "outputs": [], |
| 103 | + "source": [ |
| 104 | + "# We have a number of numerical features: sibsp, parch and age.\n", |
| 105 | + "# And a two categorical features: cabin and sex\n", |
| 106 | + "# We will use those features to predict fare\n", |
| 107 | + "df.head()" |
| 108 | + ] |
| 109 | + }, |
| 110 | + { |
| 111 | + "cell_type": "code", |
| 112 | + "execution_count": null, |
| 113 | + "metadata": {}, |
| 114 | + "outputs": [], |
| 115 | + "source": [ |
| 116 | + "# In order to deploy our python model in PySpark we need a PySpark DataFrame\n", |
| 117 | + "ddf = spark.createDataFrame(df)\n", |
| 118 | + "ddf.show(5)" |
| 119 | + ] |
| 120 | + }, |
| 121 | + { |
| 122 | + "cell_type": "markdown", |
| 123 | + "metadata": {}, |
| 124 | + "source": [ |
| 125 | + "# (i) Deploying a simple Random Forest\n", |
| 126 | + "----\n", |
| 127 | + "\n", |
| 128 | + "To keep things simple we will start by using our predefined `NUMERICAL_FEATURES` to predict \"fare\". The cell below fits the model to all the data, in practice this is not advisable. Our goal is simply to create an object that is capable of making predictions, in the context of this quality of those predictions is of no interest.\n", |
| 129 | + "\n", |
| 130 | + "`spark_predict` is used to deploy our model in PySpark. The function is a wrapper around a `pandas_udf`, a wrapper is used to enable a python ml model to be passed to the `pandas_udf`. The function is based on the excellent blog post [\"Prediction at Scale with scikit-learn and PySpark Pandas UDFs\"](https://medium.com/civis-analytics/prediction-at-scale-with-scikit-learn-and-pyspark-pandas-udfs-51d5ebfb2cd8) written by **Michael Heilman**." |
| 131 | + ] |
| 132 | + }, |
| 133 | + { |
| 134 | + "cell_type": "code", |
| 135 | + "execution_count": null, |
| 136 | + "metadata": {}, |
| 137 | + "outputs": [], |
| 138 | + "source": [ |
| 139 | + "def spark_predict(model, *cols) -> pyspark.sql.column:\n", |
| 140 | + " \"\"\"This function deploys python ml in PySpark using the `predict` method of the `model` parameter.\n", |
| 141 | + " \n", |
| 142 | + " Args:\n", |
| 143 | + " model: python ml model with sklearn API\n", |
| 144 | + " *cols (list-like): Features used for predictions, required to be present as columns in the spark \n", |
| 145 | + " DataFrame used to make predictions.\n", |
| 146 | + " \"\"\"\n", |
| 147 | + " @sf.pandas_udf(returnType=DoubleType())\n", |
| 148 | + " def predict_pandas_udf(*cols):\n", |
| 149 | + " # cols will be a tuple of pandas.Series here.\n", |
| 150 | + " X = pd.concat(cols, axis=1)\n", |
| 151 | + " return pd.Series(model.predict(X))\n", |
| 152 | + " \n", |
| 153 | + " return predict_pandas_udf(*cols)" |
| 154 | + ] |
| 155 | + }, |
| 156 | + { |
| 157 | + "cell_type": "code", |
| 158 | + "execution_count": null, |
| 159 | + "metadata": {}, |
| 160 | + "outputs": [], |
| 161 | + "source": [ |
| 162 | + "rf = RandomForestRegressor()\n", |
| 163 | + "rf = rf.fit(df[NUMERICAL_FEATURES], df[TARGET])" |
| 164 | + ] |
| 165 | + }, |
| 166 | + { |
| 167 | + "cell_type": "code", |
| 168 | + "execution_count": null, |
| 169 | + "metadata": {}, |
| 170 | + "outputs": [], |
| 171 | + "source": [ |
| 172 | + "# Let's make some predictions for comparison against our PySpark predictions.\n", |
| 173 | + "rf.predict(df[NUMERICAL_FEATURES])[:5]" |
| 174 | + ] |
| 175 | + }, |
| 176 | + { |
| 177 | + "cell_type": "code", |
| 178 | + "execution_count": null, |
| 179 | + "metadata": {}, |
| 180 | + "outputs": [], |
| 181 | + "source": [ |
| 182 | + "# Here we deploy our model in PySpark using our previously defined `spark_predict`.\n", |
| 183 | + "# Upon looking at the DataFrame printed below we can see that the predictions in PySpark are same as made in python\n", |
| 184 | + "(\n", |
| 185 | + " ddf\n", |
| 186 | + " .select(NUMERICAL_FEATURES + [TARGET])\n", |
| 187 | + " .withColumn(\"prediction\", spark_predict(rf, *NUMERICAL_FEATURES).alias(\"prediction\"))\n", |
| 188 | + " .show(5)\n", |
| 189 | + ")" |
| 190 | + ] |
| 191 | + }, |
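+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# A quick sanity check (optional): collect the PySpark predictions back to pandas\n",
+ "# and confirm they match the local sklearn predictions. Row order is preserved here\n",
+ "# because this query involves no shuffle; for shuffled or repartitioned data, joining\n",
+ "# on a key column would be a safer way to compare.\n",
+ "spark_preds = (\n",
+ " ddf\n",
+ " .select(NUMERICAL_FEATURES)\n",
+ " .withColumn(\"prediction\", spark_predict(rf, *NUMERICAL_FEATURES))\n",
+ " .toPandas()[\"prediction\"]\n",
+ " .values\n",
+ ")\n",
+ "np.allclose(rf.predict(df[NUMERICAL_FEATURES]), spark_preds)"
+ ]
+ },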
| 192 | + { |
| 193 | + "cell_type": "markdown", |
| 194 | + "metadata": {}, |
| 195 | + "source": [ |
| 196 | + "# (ii) Deploying a Pipeline with Feature Scaling\n", |
| 197 | + "----\n", |
| 198 | + "\n", |
| 199 | + "It is common practice to scale numerical features, so in the example below we make things a little more interesting by scaling our `NUMERICAL_FEATURES` before fitting our model and making predictions. Feature scaling is performed using sklearn's `Pipeline`." |
| 200 | + ] |
| 201 | + }, |
| 202 | + { |
| 203 | + "cell_type": "code", |
| 204 | + "execution_count": null, |
| 205 | + "metadata": {}, |
| 206 | + "outputs": [], |
| 207 | + "source": [ |
| 208 | + "# Construct and fit a `Pipeline` to our Titanic dataset\n", |
| 209 | + "pipe = Pipeline(steps=[(\"scaler\", MinMaxScaler()), (\"predictor\", RandomForestRegressor())])\n", |
| 210 | + "pipe = pipe.fit(df[NUMERICAL_FEATURES], df[TARGET])" |
| 211 | + ] |
| 212 | + }, |
| 213 | + { |
| 214 | + "cell_type": "code", |
| 215 | + "execution_count": null, |
| 216 | + "metadata": {}, |
| 217 | + "outputs": [], |
| 218 | + "source": [ |
| 219 | + "# Again let's make some predictions for comparison against our PySpark predictions.\n", |
| 220 | + "pipe.predict(df[NUMERICAL_FEATURES])[:5]" |
| 221 | + ] |
| 222 | + }, |
| 223 | + { |
| 224 | + "cell_type": "code", |
| 225 | + "execution_count": null, |
| 226 | + "metadata": {}, |
| 227 | + "outputs": [], |
| 228 | + "source": [ |
| 229 | + "# Model deployment in PySpark using our `spark_predict` function\n", |
| 230 | + "(\n", |
| 231 | + " ddf\n", |
| 232 | + " .select(NUMERICAL_FEATURES + [TARGET])\n", |
| 233 | + " .withColumn(\"pipe_predict\", spark_predict(pipe, *NUMERICAL_FEATURES).alias(\"prediction\")).show(5)\n", |
| 234 | + ")" |
| 235 | + ] |
| 236 | + }, |
| 237 | + { |
| 238 | + "cell_type": "markdown", |
| 239 | + "metadata": {}, |
| 240 | + "source": [ |
| 241 | + "# (iii) Deploying a Pipeline with Mixed Feature Types\n", |
| 242 | + "----\n", |
| 243 | + "\n", |
| 244 | + "It is not uncommon to use both categorical and numerical features in an ML model. In the next example I demonstrate how we can build an sklearn `Pipeline` capable of encoding categorical features and scaling numerical features. This pipeline is then deployed in PySpark." |
| 245 | + ] |
| 246 | + }, |
| 247 | + { |
| 248 | + "cell_type": "code", |
| 249 | + "execution_count": null, |
| 250 | + "metadata": {}, |
| 251 | + "outputs": [], |
| 252 | + "source": [ |
| 253 | + "# We create the preprocessing pipelines for both numeric and categorical data\n", |
| 254 | + "categorical_transformer = Pipeline(steps=[(\"encoder\", OrdinalEncoder())])\n", |
| 255 | + "numerical_transformer = Pipeline(steps=[(\"scaler\", MinMaxScaler())])\n", |
| 256 | + "\n", |
| 257 | + "preprocessor = ColumnTransformer(\n", |
| 258 | + " transformers=[\n", |
| 259 | + " (\"cat\", categorical_transformer, [3, 4]),\n", |
| 260 | + " (\"num\", numerical_transformer, [0, 1, 2])]\n", |
| 261 | + ")\n", |
| 262 | + "\n", |
| 263 | + "# Append random forest to preprocessing pipeline. We now have a full prediction pipeline.\n", |
| 264 | + "preprocessor_pipe = Pipeline(steps=[(\"preprocessor\", preprocessor), (\"predictor\", RandomForestRegressor())])\n", |
| 265 | + "preprocessor_pipe = preprocessor_pipe.fit(df[ALL_FEATURES], df[TARGET])" |
| 266 | + ] |
| 267 | + }, |
| 268 | + { |
| 269 | + "cell_type": "code", |
| 270 | + "execution_count": null, |
| 271 | + "metadata": {}, |
| 272 | + "outputs": [], |
| 273 | + "source": [ |
| 274 | + "# Again let's make some predictions to compare our PySpark deployment against\n", |
| 275 | + "preprocessor_pipe.predict(df[ALL_FEATURES])[:5]" |
| 276 | + ] |
| 277 | + }, |
| 278 | + { |
| 279 | + "cell_type": "code", |
| 280 | + "execution_count": null, |
| 281 | + "metadata": {}, |
| 282 | + "outputs": [], |
| 283 | + "source": [ |
| 284 | + "# Again let's deploy our pipeline in PySpark using our `spark_predict` function\n", |
| 285 | + "(\n", |
| 286 | + " ddf\n", |
| 287 | + " .select(ALL_FEATURES + [TARGET])\n", |
| 288 | + " .withColumn(\"pipe_predict\", spark_predict(preprocessor_pipe, *ALL_FEATURES).alias(\"prediction\"))\n", |
| 289 | + " .show(5)\n", |
| 290 | + ")" |
| 291 | + ] |
| 292 | + }, |
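+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "# As noted in the introduction, any estimator exposing an sklearn-style `fit`/`predict`\n",
+ "# can be dropped into `spark_predict`. A minimal illustration using sklearn's\n",
+ "# GradientBoostingRegressor, chosen here purely as a stand-in for XGBoost, LightGBM, etc.\n",
+ "from sklearn.ensemble import GradientBoostingRegressor\n",
+ "\n",
+ "gbm = GradientBoostingRegressor()\n",
+ "gbm = gbm.fit(df[NUMERICAL_FEATURES], df[TARGET])\n",
+ "\n",
+ "(\n",
+ " ddf\n",
+ " .select(NUMERICAL_FEATURES + [TARGET])\n",
+ " .withColumn(\"gbm_predict\", spark_predict(gbm, *NUMERICAL_FEATURES))\n",
+ " .show(5)\n",
+ ")"
+ ]
+ },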
| 293 | + { |
| 294 | + "cell_type": "markdown", |
| 295 | + "metadata": {}, |
| 296 | + "source": [ |
| 297 | + "# Summary\n", |
| 298 | + "----\n", |
| 299 | + "The `spark_predict` function defined in this notebook is a versatile solution to python ml deployment in PySpark. \n", |
| 300 | + "We have demonstrated it's use in three **deployment** examples:\n", |
| 301 | + "- Simple `RandomForestRegressor`\n", |
| 302 | + "- Ml `Pipeline` that scales numerical features\n", |
| 303 | + "- ML `Pipeline` that is capable of processing mixed feature types" |
| 304 | + ] |
| 305 | + } |
| 306 | + ], |
| 307 | + "metadata": { |
| 308 | + "kernelspec": { |
| 309 | + "display_name": "Python 3", |
| 310 | + "language": "python", |
| 311 | + "name": "python3" |
| 312 | + }, |
| 313 | + "language_info": { |
| 314 | + "codemirror_mode": { |
| 315 | + "name": "ipython", |
| 316 | + "version": 3 |
| 317 | + }, |
| 318 | + "file_extension": ".py", |
| 319 | + "mimetype": "text/x-python", |
| 320 | + "name": "python", |
| 321 | + "nbconvert_exporter": "python", |
| 322 | + "pygments_lexer": "ipython3", |
| 323 | + "version": "3.7.0" |
| 324 | + } |
| 325 | + }, |
| 326 | + "nbformat": 4, |
| 327 | + "nbformat_minor": 2 |
| 328 | +} |