Skip to content

Commit 4fd5317

Browse files
committed
Added delta_lake_scikit_learn_local_training_and_serving sample code
1 parent 8c3b1ca commit 4fd5317

File tree

5 files changed

+166
-0
lines changed

5 files changed

+166
-0
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
delta-sharing
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License").
4+
# You may not use this file except in compliance with the License.
5+
# A copy of the License is located at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# or in the "license" file accompanying this file. This file is distributed
10+
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
11+
# express or implied. See the License for the specific language governing
12+
# permissions and limitations under the License.
13+
14+
from __future__ import print_function
15+
16+
import argparse
17+
import os
18+
import numpy as np
19+
20+
import joblib
21+
import pandas as pd
22+
from sklearn.model_selection import train_test_split
23+
from sklearn.linear_model import LinearRegression
24+
from sklearn.metrics import mean_squared_error, r2_score
25+
import delta_sharing
26+
27+
28+
if __name__ == "__main__":
    # Training entry point: read the boston-housing table through Delta Sharing, fit a
    # LinearRegression model, report metrics and save the fitted model to the model directory.
    print("Training Started")
    parser = argparse.ArgumentParser()

    # Hyperparameters are described here. In this simple example we are just including one
    # hyperparameter. NOTE: --max_leaf_nodes is parsed but is not used by the model trained below.
    parser.add_argument("--max_leaf_nodes", type=int, default=-1)

    # SageMaker-specific arguments. The defaults are read eagerly from the environment, so a
    # missing SM_* variable raises KeyError here, before the command line is parsed.
    # NOTE: --output-data-dir is parsed but is not used further down.
    parser.add_argument("--output-data-dir", type=str, default=os.environ["SM_OUTPUT_DATA_DIR"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"])

    args = parser.parse_args()
    print("Got Args: {}".format(args))

    # Take the profile file from the "train" channel directory. os.listdir() returns entries in
    # arbitrary order, so sort the listing to make the selected file deterministic.
    profile_files = sorted(os.path.join(args.train, name) for name in os.listdir(args.train))
    if not profile_files:
        raise ValueError(
            (
                "There are no files in {}.\n"
                "This usually indicates that the channel ({}) was incorrectly specified,\n"
                "the data specification in S3 was incorrectly specified or the role specified\n"
                "does not have permission to access the data."
            ).format(args.train, "train")
        )

    profile_file = profile_files[0]
    print(f'Found profile file: {profile_file}')

    # Create a SharingClient.
    # NOTE: load_as_pandas() below is given the table URL directly and does not use this object.
    client = delta_sharing.SharingClient(profile_file)
    table_url = profile_file + "#delta_sharing.default.boston-housing"

    # Load the table as a Pandas DataFrame
    print('Loading boston-housing table from Delta Lake')
    train_data = delta_sharing.load_as_pandas(table_url)
    print(f'Train data shape: {train_data.shape}')

    # Drop null values - this should be done in a pre-processing stage as best practice
    train_data = train_data.dropna()

    # Select features and target by position.
    # Assumes column 0 is not a feature, columns 1-13 are features and column 14 is the
    # target - TODO confirm against the shared table's schema.
    X = train_data.iloc[:, 1:14]
    Y = train_data.iloc[:, 14]

    # Split the data into training and testing sets
    X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=5)
    print(f'X_train.shape: {X_train.shape}')
    print(f'X_test.shape: {X_test.shape}')
    print(f'Y_train.shape: {Y_train.shape}')
    print(f'Y_test.shape: {Y_test.shape}')

    linear_model = LinearRegression()
    linear_model.fit(X_train, Y_train)

    # Model evaluation. The held-out split is evaluated as well as the training split, so the
    # 20% of rows reserved above are actually used to report generalization performance.
    for split_name, features, target in (
        ("training", X_train, Y_train),
        ("testing", X_test, Y_test),
    ):
        predictions = linear_model.predict(features)
        rmse = np.sqrt(mean_squared_error(target, predictions))
        r2 = r2_score(target, predictions)

        print(f"The model performance for {split_name} set")
        print("--------------------------------------")
        print(f'RMSE is {rmse}')
        print(f'R2 score is {r2}')

    # Save model. model_fn() loads the model back using the same file name.
    joblib.dump(linear_model, os.path.join(args.model_dir, "model.joblib"))

    print("Training Completed")
97+
98+
99+
def model_fn(model_dir):
    """Deserialize and return the fitted model stored in ``model_dir``.

    Note that the file name used here ("model.joblib") must be the same as the
    one used when the model is serialized in the main section of this script.
    """
    model_path = os.path.join(model_dir, "model.joblib")
    return joblib.load(model_path)
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
# This is a sample Python program that trains a simple scikit-learn model
2+
# on the boston-housing dataset fetched from Delta Lake.
3+
# This implementation will work on your *local computer* or in the *AWS Cloud*.
4+
#
5+
# Delta Sharing: An Open Protocol for Secure Data Sharing
6+
# https://github.com/delta-io/delta-sharing
7+
#
8+
# Prerequisites:
9+
# 1. Install required Python packages:
10+
# `pip install -r requirements.txt`
11+
# 2. Docker Desktop installed and running on your computer:
12+
# `docker ps`
13+
# 3. You should have AWS credentials configured on your local machine
14+
# in order to be able to pull the docker image from ECR.
15+
###############################################################################################
16+
17+
18+
from sagemaker.sklearn import SKLearn
19+
20+
21+
DUMMY_IAM_ROLE = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'
22+
23+
24+
def main():
    """Train the scikit-learn model with the SageMaker SKLearn estimator in local mode.

    The Delta Sharing profile file is passed to the training job as the "train" channel.
    """
    print('Starting model training.')
    print('Note: if launching for the first time in local mode, container image download might take a few minutes to complete.')

    # Estimator configuration: the training script lives in the "code" source directory and
    # instance_type "local" is used for the training job.
    estimator_settings = {
        "entry_point": "scikit_boston_housing.py",
        "source_dir": 'code',
        "framework_version": "0.23-1",
        "instance_type": "local",
        "role": DUMMY_IAM_ROLE,
    }
    estimator = SKLearn(**estimator_settings)

    training_channels = {"train": "file://./profile/open-datasets.share"}
    estimator.fit(training_channels)
    print('Completed model training')

    # Optional deployment step, left disabled:
    # print('Deploying endpoint in local mode')
    # predictor = estimator.deploy(initial_instance_count=1, instance_type='local')
    #
    # print('About to delete the endpoint to stop paying (if in cloud mode).')
    # predictor.delete_endpoint(predictor.endpoint_name)


if __name__ == "__main__":
    main()
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"shareCredentialsVersion": 1,
3+
"endpoint": "https://sharing.delta.io/delta-sharing/",
4+
"bearerToken": "faaie590d541265bcab1f2de9813274bf233"
5+
}
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
numpy
2+
pandas
3+
sagemaker>=2.0.0,<3.0.0
4+
sagemaker[local]

0 commit comments

Comments
 (0)