
Commit d8673c4

Added dask_bring_your_own_container_local_processing Sample Code

1 parent 9d883ac · commit d8673c4

File tree

5 files changed: +237 -0 lines changed

container/Dockerfile
container/dask_config/dask.yaml
container/program/bootstrap.py
dask_bring_your_own_container_local_processing.py
processing_script.py
container/Dockerfile: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
FROM continuumio/miniconda3:4.7.12

ENV PYTHONHASHSEED 0
ENV PYTHONIOENCODING UTF-8

# Install required Python packages for Dask
RUN conda install --yes dask distributed dask-ml boto3

# Install additional Python packages
RUN conda install --yes aiohttp boto3

# Dumb init
RUN wget -O /usr/local/bin/dumb-init https://github.com/Yelp/dumb-init/releases/download/v1.2.0/dumb-init_1.2.0_amd64
RUN chmod +x /usr/local/bin/dumb-init

RUN mkdir /opt/app /etc/dask
COPY dask_config/dask.yaml /etc/dask/

# Set up bootstrapping program and Dask configuration
COPY program /opt/program
RUN chmod +x /opt/program/bootstrap.py

ENTRYPOINT ["/opt/program/bootstrap.py"]
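The driver script further down builds this image locally with the tag sagemaker-dask-processing-local. As a quick sanity check after the build, a small sketch like the following (an assumption for illustration, not part of the commit) confirms that the Dask packages installed by the Dockerfile are importable inside the image:

# Sketch: build the image and smoke-test the Dask install inside it.
# Assumes Docker is running and the Dockerfile lives under ./container,
# matching the docker build command in the driver script's prerequisites.
import subprocess

subprocess.run(
    ["docker", "build", "-t", "sagemaker-dask-processing-local", "container/."],
    check=True,
)
subprocess.run(
    ["docker", "run", "--rm", "--entrypoint", "python",
     "sagemaker-dask-processing-local", "-c",
     "import dask, distributed, dask_ml; print(dask.__version__)"],
    check=True,
)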
container/dask_config/dask.yaml: 11 additions & 0 deletions
@@ -0,0 +1,11 @@
logging:
  distributed: info
  bokeh: critical
  tornado: critical

scheduler:
  work-stealing: True
  allowed-failures: 10

admin:
  log-format: '%(name)s - %(levelname)s - %(message)s'
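The Dockerfile copies this file to /etc/dask/, one of the directories Dask searches for YAML configuration by default. As an illustrative check (assuming the installed Dask version loads config from /etc/dask, which is the default), the baked-in values can be read back inside the container with dask.config:

# Sketch: verify the /etc/dask/dask.yaml values are picked up. Run inside
# the container; not part of the commit.
import dask.config

print(dask.config.get("scheduler.work-stealing"))     # True
print(dask.config.get("scheduler.allowed-failures"))  # 10
print(dask.config.get("admin.log-format"))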
container/program/bootstrap.py: 94 additions & 0 deletions
@@ -0,0 +1,94 @@
#!/usr/bin/env python3
import json
import os
import socket
import subprocess
import sys
import time
from subprocess import Popen

DASK_PATH = "/opt/conda/bin"


def get_resource_config():
    resource_config_path = "/opt/ml/config/resourceconfig.json"
    with open(resource_config_path, "r") as f:
        resource_config_json = json.load(f)
    return resource_config_json


def start_daemons(master_ip):
    resource_config = get_resource_config()
    print(f"resource_config: {resource_config}")
    current_host = resource_config["current_host"]
    print(f"current_host: {current_host}")
    scheduler_host = resource_config["hosts"][0]
    print(f"scheduler_host: {scheduler_host}")

    cmd_start_scheduler = os.path.join(DASK_PATH, "dask-scheduler")
    cmd_start_worker = os.path.join(DASK_PATH, "dask-worker")
    schedule_conn_string = "tcp://{ip}:8786".format(ip=master_ip)
    if current_host == scheduler_host:
        # The first host runs both the scheduler and a worker
        Popen([cmd_start_scheduler])
        Popen([cmd_start_worker, schedule_conn_string])
    else:
        Popen([cmd_start_worker, schedule_conn_string])


def get_ip_from_host(host_name):
    ip_wait_time = 200
    counter = 0
    ip = ""

    while counter < ip_wait_time and ip == "":
        try:
            ip = socket.gethostbyname(host_name)
            break
        except socket.gaierror:
            counter += 1
            time.sleep(1)

    if counter == ip_wait_time and ip == "":
        raise Exception(
            "Exceeded max wait time of {}s for hostname resolution".format(ip_wait_time)
        )

    return ip


if __name__ == "__main__":
    resource_config = get_resource_config()
    master_host = resource_config["hosts"][0]
    scheduler_ip = get_ip_from_host(master_host)
    current_host = resource_config["current_host"]

    # Start the Dask cluster on all nodes
    start_daemons(scheduler_ip)

    # Wait a few seconds until the cluster is up and running
    time.sleep(10)

    # Submit the processing job on the cluster from the first instance only.
    # The job needs to be submitted just once, from one node.
    if current_host == master_host:
        cmd_string = ["/opt/conda/bin/python", str(sys.argv[1])]
        cmd_string.extend(sys.argv[2:])
        cmd_string.append(scheduler_ip)
        print(f"cmd_string: {cmd_string}")
        result = subprocess.Popen(cmd_string)
        _ = result.communicate()[0]
        exit_code = result.returncode
        sys.exit(exit_code)
    else:
        # Worker nodes stay alive while the scheduler port is reachable and
        # exit once the scheduler goes away, i.e. the job has finished.
        while True:
            scheduler = (scheduler_ip, 8786)
            alive_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            alive_check = alive_socket.connect_ex(scheduler)
            alive_socket.close()
            if alive_check != 0:
                print("Scheduler is no longer reachable - shutting down worker node")
                sys.exit(0)
            time.sleep(2)
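bootstrap.py reads the cluster layout from /opt/ml/config/resourceconfig.json, which SageMaker Processing writes into every container; the script relies only on the current_host and hosts keys. A minimal sketch of that host-selection logic, driven by a hand-written config (the "algo-1"/"algo-2" hostnames are placeholders), looks like this:

# Sketch of the scheduler/worker decision made by bootstrap.py, using a
# hand-written resource config. Hostnames below are placeholders.
import json

sample_resource_config = {
    "current_host": "algo-1",
    "hosts": ["algo-1", "algo-2"],
}

scheduler_host = sample_resource_config["hosts"][0]
if sample_resource_config["current_host"] == scheduler_host:
    print("this node starts dask-scheduler and a dask-worker")
else:
    print("this node starts only a dask-worker")
print(json.dumps(sample_resource_config, indent=2))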
dask_bring_your_own_container_local_processing.py (local-mode driver): 54 additions & 0 deletions
@@ -0,0 +1,54 @@
# This is a sample Python program that runs a Dask Processing job on a JSON Lines file fetched from a web site.
# The output of the processing is the list of file names found in the JSON.
# This implementation will work on your *local computer*.
#
# Prerequisites:
#   1. Install required Python packages:
#      pip install boto3 sagemaker pandas scikit-learn
#      pip install 'sagemaker[local]'
#   2. Docker Desktop must be installed and running on your computer.
#   3. Open a terminal and run the following command:
#      docker build -t sagemaker-dask-processing-local container/.
########################################################################################################################

from sagemaker.local import LocalSession
from sagemaker.processing import ScriptProcessor, ProcessingInput, ProcessingOutput
import boto3


s3 = boto3.client('s3')
sagemaker_session = LocalSession()
sagemaker_session.config = {'local': {'local_code': True}}

# For local training a dummy role will be sufficient
role = 'arn:aws:iam::111111111111:role/service-role/AmazonSageMaker-ExecutionRole-20200101T000001'

dask_processor = ScriptProcessor(command=["/opt/program/bootstrap.py"],
                                 image_uri='sagemaker-dask-processing-local',
                                 role=role,
                                 instance_count=1,
                                 instance_type='local')

dask_processor.run(code='processing_script.py',
                   outputs=[ProcessingOutput(
                       output_name='filenames_processed_data',
                       source='/opt/ml/processing/processed_data/')],
                   arguments=['site_uri', 'https://archive.analytics.mybinder.org/index.jsonl'])

preprocessing_job_description = dask_processor.jobs[-1].describe()
output_config = preprocessing_job_description['ProcessingOutputConfig']

print(output_config)

for output in output_config['Outputs']:
    if output['OutputName'] == 'filenames_processed_data':
        filenames_processed_data_file = output['S3Output']['S3Uri']
        bucket = filenames_processed_data_file.split("/")[2]
        output_file_name = '/'.join(filenames_processed_data_file.split("/")[3:]) + "/filenames_in_json.txt"

print(f'Opening processing output file: {"s3://" + bucket + "/" + output_file_name}')
data = s3.get_object(Bucket=bucket, Key=output_file_name)
contents = data['Body'].read()
print('Processing output file content\n-----------\n')
print(contents.decode("utf-8"))
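The same processor definition can be pointed at managed SageMaker infrastructure once the image is pushed to ECR. A hedged sketch of that variant follows; the image URI, role ARN, and instance settings are placeholders and not part of this sample:

# Sketch: cloud variant of the same job. The ECR image URI and role ARN are
# placeholders. bootstrap.py starts one scheduler plus a worker per host, so
# instance_count > 1 gives a multi-node Dask cluster.
from sagemaker.processing import ScriptProcessor, ProcessingOutput

cloud_processor = ScriptProcessor(
    command=["/opt/program/bootstrap.py"],
    image_uri="<account-id>.dkr.ecr.<region>.amazonaws.com/sagemaker-dask-processing:latest",
    role="<your-sagemaker-execution-role-arn>",
    instance_count=2,
    instance_type="ml.m5.xlarge",
)

cloud_processor.run(
    code="processing_script.py",
    outputs=[ProcessingOutput(
        output_name="filenames_processed_data",
        source="/opt/ml/processing/processed_data/")],
    arguments=["site_uri", "https://archive.analytics.mybinder.org/index.jsonl"],
)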
processing_script.py: 55 additions & 0 deletions
@@ -0,0 +1,55 @@
import os
import sys
import logging
from dask.distributed import Client
import dask.bag as db
import json


processed_data_path = '/opt/ml/processing/processed_data'


def main():
    print("Processing Started")

    # Convert command line args into a map of args
    args_iter = iter(sys.argv[1:])
    args = dict(zip(args_iter, args_iter))
    # bootstrap.py appends the scheduler IP as the last command line argument
    scheduler_ip = sys.argv[-1]
    print(f"scheduler_ip: {scheduler_ip}")

    # Start the Dask cluster client
    try:
        print("initiating client")
        client = Client("tcp://{ip}:8786".format(ip=scheduler_ip))
        print("Cluster information: {}".format(client))
    except Exception as err:
        logging.exception(err)

    print(f"Received arguments {args}")

    if args.get("site_uri"):
        print(f"Processing web site JSON: {args['site_uri']}")
        filenames = (db.read_text(args['site_uri'])
                     .map(json.loads)
                     .pluck('name')
                     .compute())

        filenames = ['https://archive.analytics.mybinder.org/' + fn for fn in filenames]
        print(f"Total filenames: {len(filenames)}")
        print(f"Sample filenames found: {filenames[:5]}")

        output_file = os.path.join(processed_data_path, "filenames_in_json.txt")
        print(f'Writing output file: {output_file}')
        with open(output_file, 'w') as outfile:
            outfile.write(json.dumps(filenames))
    else:
        print("No `site_uri` parameter - doing nothing")

    print("Processing Complete")

    print(client)
    sys.exit(os.EX_OK)


if __name__ == "__main__":
    main()
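The Dask bag pipeline at the heart of this script (read_text, then json.loads, then pluck('name')) can also be exercised outside the container against an in-process cluster, which is handy while iterating on the processing logic. A minimal standalone sketch, assuming dask, distributed, and aiohttp are installed locally:

# Sketch: the same bag pipeline against a local in-process Dask cluster.
# Assumes `pip install "dask[bag]" distributed aiohttp` (aiohttp is needed
# to read the JSONL file over HTTP). Not part of the commit.
import json

import dask.bag as db
from dask.distributed import Client, LocalCluster

if __name__ == "__main__":
    cluster = LocalCluster(n_workers=2, threads_per_worker=1)
    client = Client(cluster)

    filenames = (db.read_text("https://archive.analytics.mybinder.org/index.jsonl")
                 .map(json.loads)
                 .pluck("name")
                 .compute())

    print(f"Total filenames: {len(filenames)}")
    print(f"Sample filenames: {filenames[:5]}")

    client.close()
    cluster.close()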
