Thomas Brittain

Posted on • Originally published at ladvien.com

Distributing Machine Learning Jobs

Boss

A human sends a machine learning Job to the Boss. A Job is a JSON object containing the desired machine learning script and the parameters needed for successful execution. The Boss stores the Job and creates an Order. The Order is another JSON object representing the state of a requested Job.
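The relationship between a Job and its Order can be sketched in a few lines. This is only an in-memory illustration, not the Boss's real `work.create` code (which isn't shown in this post). The `createJobAndOrder` function and the `store` object are hypothetical names; the Order fields mirror the sample Order object in this post.

```javascript
// Hypothetical in-memory stand-in for the Boss's database.
const store = { jobs: [], orders: [] };
let nextId = 1;

// Store the Job, then create an Order that tracks the Job's state.
function createJobAndOrder(job) {
  const storedJob = { ...job, _id: String(nextId++) };
  store.jobs.push(storedJob);

  const order = {
    _id: String(nextId++),
    jobId: storedJob._id,      // links the Order back to its Job
    createdDate: new Date(),
    status: 'unassigned',      // every new Order starts out unassigned
  };
  store.orders.push(order);
  return order;
}

const order = createJobAndOrder({ scriptName: 'nn.py', epochs: 3000 });
console.log(order.status, order.jobId);
```

Keeping the Order separate from the Job means the Job's parameters never change; only the small Order object is updated as the work moves along.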

[Diagram: a human sends Job #4 to the Boss. The Boss already holds Order #1 through Order #3, each paired with Job #1 through Job #3.]

Worker

The Worker uses node-schedule to fire an HTTP request to the Boss, letting it know the Worker is "bored." The Boss then searches through the Orders for the oldest unassigned Order. If it finds one, it returns this Order to the Worker as a JSON object. At this point, the Boss updates the Order's status to "assigned."
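The Boss's selection rule is easier to see without the database in the way. Here is a rough sketch over a plain array; `claimOldestOrder` is a hypothetical helper, and the real Boss does this with a Mongoose query instead.

```javascript
// Return the oldest unassigned Order and mark it "assigned",
// or return null when there is no work to hand out.
function claimOldestOrder(orders) {
  const unassigned = orders
    .filter((o) => o.status === 'unassigned')
    .sort((a, b) => a.createdDate - b.createdDate); // ascending: oldest first
  if (unassigned.length === 0) return null;

  const oldest = unassigned[0];
  oldest.status = 'assigned';
  return oldest;
}

const orders = [
  { _id: 'b', status: 'unassigned', createdDate: new Date('2018-10-21T15:00:00Z') },
  { _id: 'a', status: 'unassigned', createdDate: new Date('2018-10-21T14:00:00Z') },
  { _id: 'c', status: 'assigned', createdDate: new Date('2018-10-21T13:00:00Z') },
];
const claimed = claimOldestOrder(orders);
console.log(claimed._id); // a
```

Order `c` is older but already assigned, so it is skipped. With several Workers polling at once, the find and the status update would need to happen as one atomic database operation, or two Workers could be handed the same Order.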

The Worker sends another HTTP request, this time requesting the Job information associated with the Order the Boss had assigned.
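Put together, one "bored" tick on the Worker is two requests. The sketch below assumes a hypothetical `http` client with `post(url)` and `get(url)` methods that resolve to `{ data }`, which is the shape axios responses have. The `pollOnce` name is made up for this illustration; the `/bored/...` and `/retrieve/job/...` routes are the ones used in the code at the end of this post.

```javascript
// One polling tick: ask the Boss for an Order, then fetch the related Job.
async function pollOnce(worker, http, bossAddress) {
  if (worker.status !== 'bored') return null;

  const orderRes = await http.post(`${bossAddress}/bored/${worker.name}`);
  const { _id: orderId, jobId } = orderRes.data || {};
  if (!jobId) return null; // the Boss had no unassigned Order

  const jobRes = await http.get(`${bossAddress}/retrieve/job/${jobId}`);
  worker.status = 'busy';
  // Attach what the Worker needs later to report the Outcome.
  return { ...jobRes.data, assignmentId: orderId, callbackAddress: bossAddress };
}

// Usage with a fake client, so the sketch runs without a real Boss.
const fakeHttp = {
  post: async () => ({ data: { _id: 'order-1', jobId: 'job-1' } }),
  get: async (url) => ({ data: { _id: url.split('/').pop(), scriptName: 'nn.py' } }),
};
const worker = { name: 'worker-1', status: 'bored' };
const pending = pollOnce(worker, fakeHttp, 'http://localhost:3000');
pending.then((job) => console.log(job));
```

Passing the client in as an argument is just a convenience for trying the flow out; the real Worker calls axios directly.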

[Diagram: the Worker checks with the Boss periodically for the oldest submitted Order. If the Boss finds an unassigned Order, it is returned, the Worker requests the related Job, and the Boss updates the Order status to "assigned". Here the Worker receives Order #1 and Job #1.]

The Worker passes the Job information into the appropriate machine learning Python script as a JSON-encoded command-line argument. The script is executed and, whether successful or not, it prints an Outcome object that the Worker reads back through stdout.
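Since the script may print progress before its final result, the Worker only cares about the last line of stdout. A small sketch of that step follows; `parseOutcome` is a hypothetical helper, and the failure shape (including the 500 status) is an assumption for illustration, not something the real scripts are known to return.

```javascript
// `lines` is the array of stdout lines printed by the Python script.
// The last line is expected to be the JSON-encoded Outcome.
function parseOutcome(lines, jobId) {
  try {
    const outcome = JSON.parse(lines[lines.length - 1]);
    if (outcome && typeof outcome === 'object') return outcome;
  } catch (err) {
    // Not valid JSON: fall through to the failure Outcome below.
  }
  // Assumed failure shape, so the Boss still hears back about the Job.
  return { status: 500, jobId, error: 'Script did not print a JSON Outcome.' };
}

const ok = parseOutcome(
  ['Epoch 1/3000', '{"status": 200, "jobId": "abc", "loss": 1.5}'],
  'abc'
);
const bad = parseOutcome(['Traceback (most recent call last):'], 'abc');
console.log(ok.status, bad.status); // 200 500
```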

[Diagram: the Worker passes Job #1 to the Python script, which returns Outcome #1 to the Worker.]

The Worker then makes a callback API call and passes the Outcome object to the Boss to be stored in the database.

[Diagram: the Worker sends Outcome #1 back to the Boss, which stores it alongside its Orders and Jobs.]

MongoDB on Mac

    brew install mongodb
    nano /usr/local/etc/mongod.conf

Your file should look something like this:

    systemLog:
      destination: file
      path: /usr/local/var/log/mongodb/mongo.log
      logAppend: true
    storage:
      dbPath: /usr/local/var/mongodb
    net:
      bindIp: 127.0.0.1

Change the dbPath to where you'd like Mongo to store your databases. Then, start and enable Mongo with brew's services.

    brew services start mongodb

Sample Objects

Order

    {
        "_id" : "5bcc93d67f0b3f4844c87c7a",
        "jobId" : "5bcc93d67f0b3f4844c87c79",
        "createdDate" : ISODate("2018-10-21T14:57:26.980Z"),
        "status" : "unassigned"
    }

Job

    {
        "_id" : ObjectId("5bcc93d67f0b3f4844c87c79"),
        "hiddenLayers" : [
            {
                "activation" : "relu",
                "widthModifier" : 4,
                "dropout" : 0.2
            },
            {
                "activation" : "relu",
                "widthModifier" : 2.3,
                "dropout" : 0.2
            },
            {
                "activation" : "relu",
                "widthModifier" : 1.3,
                "dropout" : 0.2
            }
        ],
        "dataFileName" : "wine_data.csv",
        "scriptName" : "nn.py",
        "projectName" : "wine_data",
        "depedentVariable" : "con_lot",
        "crossValidateOnly" : true,
        "crossValidationCrossingType" : "neg_mean_squared_error",
        "batchSize" : 100000,
        "epochs" : 3000,
        "patienceRate" : 0.05,
        "slowLearningRate" : 0.01,
        "loss" : "mse",
        "pcaComponents" : -1,
        "extraTreesKeepThreshd" : 0,
        "saveWeightsOnlyAtEnd" : false,
        "optimizer" : "rmsprop",
        "lastLayerActivator" : "",
        "learningRate" : 0.05,
        "l1" : 0.1,
        "l2" : 0.1,
        "minDependentVarValue" : 0,
        "maxDependentVarValue" : 1500,
        "scalerType" : "standard"
    }

Outcome

    {
        "_id" : ObjectId("5bcc88fa7f0b3f4844c87c78"),
        "status" : 200,
        "jobId" : "5bcc724d7449f746b5aa6fe8",
        "loss" : 15109.168650257,
        "metric" : 14281.4453526111
    }

Code

Worker

server.js

    var express = require('express');
    var bodyParser = require('body-parser');
    var pythonRunner = require('./preprocessing-services/python-runner');
    var schedule = require('node-schedule');
    var axios = require('axios');
    var fs = require('fs');
    var {Worker} = require('./worker/worker');

    // Get Worker Node configuration. readFileSync and JSON.parse both throw
    // on failure, so a missing or malformed file ends up in the catch.
    var config;
    try {
        config = JSON.parse(fs.readFileSync('./python-scripts/worker-node-configure.json', 'utf8'));
    } catch (err) {
        console.log('No configuration file found.');
        process.exit(1);
    }

    // Boss' address
    var bossAddress = config.bossAddress;
    var nodeName = config.nodeName;
    console.log(`Boss's address is ${bossAddress}`);
    console.log(`This worker's name is ${nodeName}`);

    var worker = new Worker('bored');

    // Start server and add Middleware
    var app = express();
    const port = 3000;
    app.use(bodyParser.json())

    // Start checking for Boredom (once a minute)
    var j = schedule.scheduleJob('*/1 * * * *', function(){
        if (worker.status === 'bored') {
            console.log('Worker is bored.');
            axios({
                method: 'post',
                url: bossAddress + `/bored/${nodeName}`
            }).then((response) => {
                let orderId = response.data._id;
                let jobId = response.data.jobId;
                // The Boss replies with only a message when there is no work.
                if (!jobId) {
                    console.log('Boss has no work right now.');
                    return;
                }
                console.log(`Boss provided jobID #${jobId}`);
                axios({
                    method: 'get',
                    url: bossAddress + `/retrieve/job/${jobId}`
                }).then((response) => {
                    let job = response.data;
                    console.log(`Worker found the details for jobID #${jobId}`);
                    job.callbackAddress = bossAddress;
                    job.assignmentId = orderId;
                    pythonRunner.scriptRun(job, worker)
                    .then((response) => {
                        console.log('Worker started job, will let Boss know when finished.');
                    });
                }).catch((error) => {
                    console.log(error);
                });
            }).catch((error) => {
                console.log('Failed to find new job.')
            });
        }
    });

    // Python script runner interface
    app.post('/scripts/run', (req, res) => {
        let pythonJob = req.body;
        // scriptRun updates worker.status, so the worker must be passed in.
        pythonRunner.scriptRun(pythonJob, worker)
        .then((response) => {
            console.log(response);
            res.send(response);
        }).catch((err) => {
            res.status(500).send({'error': err.message});
        });
    });

    app.listen(port, () => {
        console.log(`Started on port ${port}`);
    });

python-runner.js

    let {PythonShell} = require('python-shell')
    var path = require('path');
    var axios = require('axios');

    var scriptRun = function(pythonJob, worker){
        worker.status = 'busy';
        return new Promise((resolve, reject) => {
            try {
                let callbackAddress = pythonJob.callbackAddress;
                let options = {
                    mode: 'text',
                    pythonOptions: ['-u'], // get print results in real-time
                    scriptPath: path.relative(process.cwd(), 'python-scripts/'),
                    // The Job reaches the script as a single JSON string argument.
                    args: [JSON.stringify(pythonJob)]
                };
                // Callback-style API, as python-shell offered at the time of writing.
                PythonShell.run(pythonJob.scriptName, options, function (err, results) {
                    // Throwing here would leave the Worker stuck on "busy".
                    if (err) {
                        console.log(err);
                        worker.status = 'bored';
                        return;
                    }
                    try {
                        // The script prints the Outcome as JSON on its last line of stdout.
                        let result = JSON.parse(results.pop());
                        if (result) {
                            console.log(callbackAddress + '/callback')
                            axios({
                                method: 'post',
                                url: callbackAddress + '/callback',
                                data: result
                            }).then((response) => {
                                console.log(`Worker let the Boss know job is complete.`);
                                worker.status = 'bored';
                            }).catch((error) => {
                                worker.status = 'bored'
                            });
                        } else {
                            worker.status = 'bored'
                        }
                    } catch (err) {
                        worker.status = 'bored'
                    }
                });
                resolve({'message': 'job started'});
            } catch (err) {
                worker.status = 'bored'
                reject(err)
            }
        });
    }
    module.exports = {scriptRun}

Boss

server.js

    const express = require('express');
    const bodyParser = require('body-parser');
    const axios = require('axios');
    var timeout = require('connect-timeout')

    const {mongoose} = require('./backend/database-services/dl-mongo');
    const workerNode = require('./backend/services/worker-node');
    const work = require('./backend/services/work');

    // Database collection
    var {Job} = require('./backend/database-services/models/job');
    var {Order} = require('./backend/database-services/models/order');

    const bossAddress = 'http://maddatum.com'

    // Server setup.
    var app = express();
    const port = 3000;

    // Add request parameters.
    app.use((req, res, next) => {
        res.setHeader('Access-Control-Allow-Origin', '*');
        res.setHeader('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept');
        res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, PATCH, DELETE, OPTIONS');
        next();
    });

    // Add the middleware.
    app.use(bodyParser.json())

    /*
        This route is for creating new Jobs
        on the queue
    */
    app.post('/job/:method', (req, res) => {
        // Send the message back; a bare return would leave the request hanging.
        if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
        try {
            switch (req.params.method) {
                case 'create':
                    work.create(req.body)
                    .then((response) => {
                        res.send(response);
                    }).catch((error) => {
                        res.send({'error': error })
                    });
                    break;
                default:
                    res.send({'error': 'No method selected.'})
            }
        } catch (err) {
            res.send({'error': 'Error with request shape.', err})
        }
    });

    /*
        This route is for adding new WorkerNodes
        to the database.
    */
    app.post('/worker-node/:method', (req, res) => {
        if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
        try {
            switch (req.params.method) {
                case 'create':
                    workerNode.create(req.body)
                    .then((response) => {
                        res.send(response);
                    }).catch((error) => {
                        res.send({'error': error.message});
                    })
                    break;
                default:
                    res.send({'error': 'No method selected.'})
            }
        } catch (err) {
            res.send({'error': 'Error with request shape.', err })
        }
    });

    /*
        Route for the Worker to hand back the Outcome of a Job.
    */
    app.post('/callback', (req, res) => {
        if (!req.body) { return res.send({ 'message': 'No request provided.' }); }
        let outcome = req.body;
        console.log(outcome);
        work.file(outcome)
        .then((response) => {
            console.log(response);
            res.send(response);
        }).catch((err) => {
            res.send({'error': 'Error with request shape.', err })
        });
    });

    /*
        Route for Worker Node to let the Boss know
        it needs a Job.  The oldest Job which
        is unassigned is provided.
    */
    app.post('/bored/:id', (req, res) => {
        let workerNodeId = req.params.id;
        console.log(`${workerNodeId} said it's bored.`);
        // Sort ascending on createdDate (the field shown in the sample Order)
        // so the oldest unassigned Order comes first.
        Order.findOne({ status: 'unassigned' }).sort({ createdDate: 1 })
        .then((order) => {
            if (!order) {
                return res.send({'message': `No work to do. Don't get used to it.`});
            }
            console.log(`Found a work order, #${order._id}`)
            order.status = 'assigned';
            console.log(`Provided ${workerNodeId} with ${order.jobId}`);
            return order.save()
            .then((doc) => {
                console.log(`Updated the Order #${doc.id}'s status to ${order.status}`);
                res.send(doc);
            });
        })
        .catch((err) => {
            res.send({'error': 'Error with request shape.', err })
        });
    });

    /*
        Retrieve Orders or Job
    */
    app.get('/retrieve/:type/:id?/:param1?', (req, res) => {
        let type = req.params.type;
        let id = req.params.id;
        switch(type) {
            case 'order':
                Order.find().then((response) => {
                    res.send(response);
                }).catch((err) => {
                    res.send({'error': 'Error with request shape.', err })
                });
                break;
            case 'job':
                if (!id) { return res.send({'error': 'Missing Id'}); }
                Job.findOne({'_id': id })
                .then((response) => {
                    res.send(response);
                }).catch((err) => {
                    res.send({'error': 'Error with request shape.', err })
                });
                break;
            default:
                res.send({'error': 'Unknown type.'})
        }
    })

    app.listen(port, () => {
        console.log(`Started on port ${port}`);
    });
