Skip to content
This repository was archived by the owner on Aug 11, 2020. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 289 additions & 0 deletions paperspace.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
import inspect
import json
import os
import sys
import tempfile
import time
import zipfile

import boto3
import botocore
import requests

PAPERSPACE_API_KEY = ''
CONFIG_HOST = 'https://api.paperspace.io'
CONFIG_LOG_HOST = 'https://logs.paperspace.io'

if 'PAPERSPACE_API_KEY' in os.environ:
PAPERSPACE_API_KEY = os.environ['PAPERSPACE_API_KEY']


def zip_to_tmp(obj_name):
zipname = os.path.join(tempfile.gettempdir(),
os.path.basename(obj_name)) + '.zip'
outZipFile = zipfile.ZipFile(zipname, 'w', zipfile.ZIP_DEFLATED)

if os.path.isdir(obj_name):
for dirpath, dirnames, filenames in os.walk(obj_name):
for filename in dirnames + filenames:
filepath = os.path.join(dirpath, filename)
basename = os.path.basename(filepath)
if ('/.git/' not in filepath
and basename not in ['.git', '.gitignore']):
arcname = os.path.relpath(filepath, obj_name)
outZipFile.write(filepath, arcname)
else:
outZipFile.write(obj_name, os.path.basename(obj_name))

outZipFile.close()
return zipname


def print_json_pretty(res):
print(json.dumps(res, indent=2, sort_keys=True))


def paperspace(category, method, params):

if method in ['artifactsGet', 'artifactsList', 'getJob', 'getJobs',
'getLogs']:

http_method = 'GET'
path = '/' + category + '/' + method

elif method in ['artifactsDestroy', 'clone', 'destroy', 'stop']:

http_method = 'POST'
path = '/' + category + '/' + params['jobId'] + '/' + method
del params['jobId']

else:

http_method = 'POST'
path = '/' + category + '/' + method

files = None
if method == 'createJob' and 'workspace' in params:
workspace = params['workspace']
workspace_file = None
if workspace and workspace != 'none':
workspace_path = os.path.expanduser(workspace)
if os.path.exists(workspace_path):
if (not workspace_path.endswith('.zip')
and not workspace_path.endswith('.gz')):
workspace_file = zip_to_tmp(workspace_path)
else:
workspace_file = workspace_path
files = {'file': open(workspace_file, 'rb')}
params['workspaceFileName'] = os.path.basename(workspace_file)
del params['workspace']

r = requests.request(http_method, CONFIG_HOST + path,
headers={'x-api-key': PAPERSPACE_API_KEY},
params=params, files=files)

return r.json()


def jobs_logs(params, tail=False, json=False):
last_line = 0
PSEOF = False
json_res = []
MAX_BACKOFF = 30
backoff = 0

if 'line' not in params:
params['line'] = 0

while True:
r = requests.request('GET', CONFIG_LOG_HOST + '/jobs/logs',
headers={'x-api-key': PAPERSPACE_API_KEY},
params=params)
try:
res = r.json()
except ValueError:
res = []
if json:
json_res += res
else:
for l in res:
m = l['message']
if m != 'PSEOF':
print(m)

if res:
last_line = res[-1]['line']
PSEOF = res[-1]['message'] == 'PSEOF'

if PSEOF:
break

if last_line > params['line']:
params['line'] = last_line
backoff = 0
continue

if tail:
if backoff:
time.sleep(backoff)
backoff = min(backoff * 2, MAX_BACKOFF)
else:
backoff = 1
else:
break

if json:
print_json_pretty(json_res)
return json_res
return True


def jobs_waitfor(params):
while True:
job = paperspace('jobs', 'getJob', params)
if 'state' not in job:
return job
state = job['state']

if (state == params['state']
or (state == 'Running' and params['state'] == 'Pending')
or state == 'Error'
or state == 'Stopped'
or state == 'Failed'):
return job
time.sleep(5)


def jobs_create(params):
job = paperspace('jobs', 'createJob', params)
if 'id' not in job:
print_json_pretty(job)
return job
jobId = job['id']
print('New jobId: %s' % jobId)
print('Job %s' % job['state'])

if job['state'] == 'Pending':
print('Waiting for job to run...')
job = jobs_waitfor({'jobId': jobId, 'state': 'Running'})
if 'state' not in job:
print_json_pretty(job)
return job

if job['state'] != 'Error':
print('Awaiting logs...')
jobs_logs({'jobId': jobId}, tail=True)
job = paperspace('jobs', 'getJob', {'jobId': jobId})
if 'state' not in job:
print_json_pretty(job)
return job

if job['state'] != 'Error':
print('Job %s; exitCode %d' % (job['state'], job['exitCode']))
else:
print('Job %s: %s' % (job['state'], job['jobError']))
return job


def jobs_artifactsGet(params):
if 'dest' in params:
dest = os.path.abspath(os.path.expanduser(params['dest']))
if not os.path.exists(dest):
os.makedirs(dest)
else:
if not os.path.isdir(dest):
print('Destination path not is not directory: %s' % dest)
return False
del params['dest']
else:
dest = os.getcwd()

artifacts_list = paperspace('jobs', 'artifactsList', params)
if artifacts_list:

creds = paperspace('jobs', 'artifactsGet', params)
if creds:
bucket = creds['bucket']
folder = creds['folder']
credentials = creds['Credentials']

session = boto3.Session(
aws_access_key_id=credentials['AccessKeyId'],
aws_secret_access_key=credentials['SecretAccessKey'],
aws_session_token=credentials['SessionToken']
)
s3 = session.resource('s3')

for item in artifacts_list:
file = item['file']
dest_file = os.path.join(dest, file)

dest_dir = os.path.dirname(dest_file)
if not os.path.exists(dest_dir):
os.makedirs(dest_dir)

key = folder + '/' + file
print('Downloading %s' % file)

try:
s3.Bucket(bucket).download_file(key, dest_file)
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
print("The s3 object does not exist: %s" % key)
else:
raise

print('Download complete')
return True

return False


# TO DO:
# create/use project config
# deal with timeouts/server unreachable
# deal with returned errors
# deal with invalid directories, e.g. root for workspace
# detect running interactively
# stream file uploads/downloads


def runas_job(params={}):
if 'PAPERSPACE_JOB_RUNNER' in os.environ:
return

stack = inspect.stack()
obj = __import__(stack[1][0].f_globals['__name__'])
src = inspect.getsource(obj)
src_file = os.path.basename(inspect.getsourcefile(obj))

# TO DO: remove these replacements once we are auto importing paperspace on the job runner
# and have defined the PAPERSPACE_JOB_RUNNER env var and passed it into the container
src = src.replace('import paperspace', '# import paperspace')
src = src.replace('paperspace.PAPERSPACE_API_KEY', 'pass # paperspace.PAPERSPACE_API_KEY')
src = src.replace('paperspace.CONFIG_HOST', 'pass # paperspace.CONFIG_HOST')
src = src.replace('paperspace.CONFIG_LOG_HOST', 'pass # paperspace.CONFIG_LOG_HOST')
src = src.replace('paperspace.runas_job', 'paperspace_null_runas_job')
src = "\ndef paperspace_null_runas_job(*args, **kwargs):\n return None\n" + src

src_path = os.path.join(tempfile.gettempdir(), src_file)
with open(src_path, "w") as file:
file.write(src)

if 'project' not in params:
params['project'] = 'paperspace-python'
if 'machineType' not in params:
params['machineType'] = 'GPU+'
if 'container' not in params:
params['container'] = 'Test-Container'
params['command'] = 'python3 ' + src_file
params['workspace'] = src_path

jobs_create(params)
sys.exit(0)


# TO DO:
# automatic install of imported dependencies
# make console logging optional
# allow return results
9 changes: 9 additions & 0 deletions remote_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import os
import paperspace

paperspace.PAPERSPACE_API_KEY = '14a4bc1cbc414...'

paperspace.runas_job({'project': 'myprojec8', 'machineType': 'GPU+', 'container': 'Test-Container'})

print(os.getcwd())
print('something useful')
75 changes: 75 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import paperspace

# Tests:

paperspace.PAPERSPACE_API_KEY = '14a4bc1cbc414...'

project = 'myproject7'
print('project: %s' % project)

print("paperspace.paperspace('jobs', 'getJobs', {'project': project})")
jobs = paperspace.paperspace('jobs', 'getJobs', {'project': project})
if 'error' in jobs:
paperspace.print_json_pretty(jobs)
else:
for job in jobs:
print(job['id'])

print("jobs_create({'project': project, 'machineType': 'GPU+', 'container': 'Test-Container', 'command': './do.sh', 'workspace': '~/myproject3'})")
job = paperspace.jobs_create({'project': project,
'machineType': 'GPU+', 'container': 'Test-Container',
'command': './do.sh', 'workspace': '~/myproject3'})
jobId = job['id']

print("paperspace.paperspace('jobs', 'artifactsList', {'jobId': jobId, 'links': True})")
artifacts = paperspace.paperspace('jobs', 'artifactsList', {'jobId': jobId, 'links': True})
if artifacts:
paperspace.print_json_pretty(artifacts)

print("paperspace.jobs_artifactsGet({'jobId': jobId, 'dest': '~/temp1'})")
paperspace.jobs_artifactsGet({'jobId': jobId, 'dest': '~/temp1'})

print("paperspace.paperspace('jobs', 'getJob', {'jobId': jobId})")
job = paperspace.paperspace('jobs', 'getJob', {'jobId': jobId})
paperspace.print_json_pretty(job)

print("paperspace.jobs_logs({'jobId': jobId, 'limit': 4}, tail=True)")
paperspace.jobs_logs({'jobId': jobId, 'limit': 4}, tail=True)

print("paperspace.paperspace('jobs', 'stop', {'jobId': jobId})")
res = paperspace.paperspace('jobs', 'stop', {'jobId': jobId})
paperspace.print_json_pretty(res)

print("paperspace.paperspace('jobs', 'clone', {'jobId': jobId})")
clonedJob = paperspace.paperspace('jobs', 'clone', {'jobId': jobId})
paperspace.print_json_pretty(clonedJob)

print("paperspace.jobs_waitfor({'jobId': clonedJob['id'], 'state': 'Stopped'})")
waitforJob = paperspace.jobs_waitfor({'jobId': clonedJob['id'], 'state': 'Stopped'})
paperspace.print_json_pretty(waitforJob)

print("paperspace.paperspace('jobs', 'artifactsList', {'jobId': clonedJob['id']})")
artifacts = paperspace.paperspace('jobs', 'artifactsList', {'jobId': clonedJob['id']})
if artifacts:
paperspace.print_json_pretty(artifacts)
print("paperspace.paperspace('jobs', 'artifactsDestroy', {'jobId': clonedJob['id']})")
paperspace.paperspace('jobs', 'artifactsDestroy', {'jobId': clonedJob['id']})

print("paperspace.paperspace('jobs', 'artifactsList', {'jobId': clonedJob['id']})")
artifacts = paperspace.paperspace('jobs', 'artifactsList', {'jobId': clonedJob['id']})
if artifacts:
paperspace.print_json_pretty(artifacts)

print("paperspace.paperspace('jobs', 'getJobs', {'project': project})")
jobs = paperspace.paperspace('jobs', 'getJobs', {'project': project})
for job in jobs:
print(job['id'])

print("paperspace.paperspace('jobs', 'destroy', {'jobId': clonedJob['id']})")
res = paperspace.paperspace('jobs', 'destroy', {'jobId': clonedJob['id']})
paperspace.print_json_pretty(res)

print("paperspace.paperspace('jobs', 'getJobs', {'project': project})")
jobs = paperspace.paperspace('jobs', 'getJobs', {'project': project})
for job in jobs:
print(job['id'])