Boss
A human sends a machine learning Job to the Boss. A Job is a JSON object containing the desired machine learning script and the parameters needed for successful execution. The Boss stores the Job and creates an Order. The Order is another JSON object representing the state of a requested Job.
Job #4 0 Boss /|\ +-----------------> ____ / \ +""+ +__+ [ ==.]`) +----+====== 0 +--+ + | Order #3 Job #3 | | Order #2 Job #2 | | Order #1 Job #1
Worker
The Worker uses node-schedule to fire an HTTP request to the Boss letting it know the Worker is "bored." The Boss then searches through the Orders for the oldest unassigned Order; if it finds one, it returns this Order to the Worker as a JSON object. At this point, the Boss updates the Order's status to "assigned."
The Worker sends another HTTP request, this time requesting the Job information associated with the Order the Boss had assigned.
Boss ____ +""+ +__+ [ ==.]`) +----+====== 0 +--+ + + If the Boss finds an unassigned Order #3 Job #3 Order it is returned. The worker requests the + + related Job. The Boss updates the Order #2 Job #2 the Order status to "assigned" + + Worker Order #1 Job #1<-+ ____ ^ +-----------> +""+ | +__+ +------------------------------------+ [ ==.]`) The worker checks with the boss periodically for the oldest submitted Order.
The Worker passes the Job information to the appropriate machine learning Python script as a JSON-encoded command-line argument. The script is executed and, whether successful or not, an Outcome object is passed back to the Worker Node through stdout.
Worker ____ +""+ Job #1 +__+ +---------------> Python Script [ ==.] + ^ | | | | v +------------------------ Outcome #1
The Worker then makes a callback API call and passes the Outcome object to the Boss to be stored in the database.
Boss Worker ____ ____ +""+ +""+ +__+ +__+ [ ==.]`) [ ==.]`) +----+====== 0 +------+ + | | | | Order #3 Job #3 Outcome #1 <---------------+ | | Order #2 Job #2 | | Order #1 Job #1
MongoDB on Mac
brew install mongodb
nano /usr/local/etc/mongod.conf
Your file should look something like this
systemLog:
  destination: file
  path: /usr/local/var/log/mongodb/mongo.log
  logAppend: true
storage:
  dbPath: /usr/local/var/mongodb
net:
  bindIp: 127.0.0.1
Change the dbPath to where you'd like Mongo to store your databases. Then, start and enable Mongo with brew's services.
brew services start mongodb
Sample Objects
Order
{ "_id" : "5bcc93d67f0b3f4844c87c7a", "jobId" : "5bcc93d67f0b3f4844c87c79", "createdDate" : ISODate("2018-10-21T14:57:26.980Z"), "status" : "unassigned", }
Job
{ "_id" : ObjectId("5bcc93d67f0b3f4844c87c79"), "hiddenLayers" : [ { "activation" : "relu", "widthModifier" : 4, "dropout" : 0.2 }, { "activation" : "relu", "widthModifier" : 2.3, "dropout" : 0.2 }, { "activation" : "relu", "widthModifier" : 1.3, "dropout" : 0.2 } ], "dataFileName" : "wine_data.csv", "scriptName" : "nn.py", "projectName" : "wine_data", "depedentVariable" : "con_lot", "crossValidateOnly" : true, "crossValidationCrossingType" : "neg_mean_squared_error", "batchSize" : 100000, "epochs" : 3000, "patienceRate" : 0.05, "slowLearningRate" : 0.01, "loss" : "mse", "pcaComponents" : -1, "extraTreesKeepThreshd" : 0, "saveWeightsOnlyAtEnd" : false, "optimizer" : "rmsprop", "lastLayerActivator" : "", "learningRate" : 0.05, "l1" : 0.1, "l2" : 0.1, "minDependentVarValue" : 0, "maxDependentVarValue" : 1500, "scalerType" : "standard", }
Outcomes
{ "_id" : ObjectId("5bcc88fa7f0b3f4844c87c78"), "status" : 200, "jobId" : "5bcc724d7449f746b5aa6fe8", "loss" : 15109.168650257, "metric" : 14281.4453526111, }
Code
Worker
server.js
var express = require('express');
var bodyParser = require('body-parser');
var pythonRunner = require('./preprocessing-services/python-runner');
var schedule = require('node-schedule');
var axios = require('axios');
var fs = require('fs');
var {Worker} = require('./worker/worker');

// Location of the Worker Node configuration file.
const CONFIG_PATH = './python-scripts/worker-node-configure.json';

/**
 * Read and parse the Worker Node configuration file.
 *
 * @param {string} configPath - Path of the JSON configuration file.
 * @returns {Object|null} The parsed configuration, or null when the file
 *     cannot be read or does not contain valid JSON.
 */
function loadConfig(configPath) {
    try {
        return JSON.parse(fs.readFileSync(configPath, 'utf8'));
    } catch (err) {
        // readFileSync / JSON.parse throw; report the reason instead of
        // letting the process die with a raw stack trace.
        console.log(err.message);
        return null;
    }
}

// Get Worker Node configuration
var config = loadConfig(CONFIG_PATH);
if (!config) {
    console.log('No configuration file found.')
    // Non-zero exit code so supervisors can tell this was a failure.
    process.exit(1);
}

// Boss' address
// NOTE(review): these were implicit globals before; they are now
// module-scoped. Confirm nothing else reads them from the global object.
const bossAddress = config.bossAddress;
const nodeName = config.nodeName;
console.log(`Boss's address is ${bossAddress}`);
console.log(`This worker's name is ${nodeName}`);

var worker = new Worker('bored');

// Start server and add Middleware
var app = express();
const port = 3000;
app.use(bodyParser.json())

/**
 * Fetch the Job belonging to an assigned Order and hand it to the Python
 * runner.
 *
 * @param {string} orderId - `_id` of the Order the Boss returned.
 * @param {string} jobId - `jobId` of that Order.
 */
function fetchAndRunJob(orderId, jobId) {
    axios({
        method: 'get',
        url: bossAddress + `/retrieve/job/${jobId}`
    }).then((response) => {
        const job = response.data;
        if (!job) {
            throw new Error(`Boss returned no details for jobID #${jobId}`);
        }
        console.log(`Worker found the details for jobID #${jobId}`);
        job.callbackAddress = bossAddress;
        job.assignmentId = orderId;
        // Returning the promise routes a scriptRun rejection to the
        // .catch below instead of leaving it unhandled.
        return pythonRunner.scriptRun(job, worker);
    }).then(() => {
        console.log('Worker started job, will let Boss know when finished.');
    }).catch((error) => {
        console.log(error);
    });
}

/**
 * Tell the Boss this worker is bored and, when an Order comes back,
 * start working on it.
 */
function requestWork() {
    console.log('Worker is bored.');
    axios({
        method: 'post',
        url: bossAddress + `/bored/${nodeName}`
    }).then((response) => {
        const order = response.data || {};
        // The Boss answers with a plain message (no jobId) when the queue
        // is empty; do not request `/retrieve/job/undefined` in that case.
        if (!order.jobId) {
            console.log('Boss has no work for this worker.');
            return;
        }
        console.log(`Boss provided jobID #${order.jobId}`);
        fetchAndRunJob(order._id, order.jobId);
    }).catch((error) => {
        console.log('Failed to find new job.')
    });
}

// Start checking for Boredom
var j = schedule.scheduleJob('*/1 * * * *', function(){
    if (worker.status === 'bored') {
        requestWork();
    }
});

// Python script runner interface
app.post('/scripts/run', (req, res) => {
    const pythonJob = req.body;
    // Promise.resolve().then(...) turns a synchronous throw from scriptRun
    // into a rejection, so a single .catch covers both failure modes.
    Promise.resolve()
        // scriptRun updates worker.status, so the worker must be passed in.
        .then(() => pythonRunner.scriptRun(pythonJob, worker))
        .then((response) => {
            console.log(response);
            res.send(response);
        })
        .catch((err) => {
            console.log(err);
            res.status(500).send({'error': err && err.message ? err.message : String(err)});
        });
});

app.listen(port, () => {
    console.log(`Started on port ${port}`);
});
python-runner.js
let {PythonShell} = require('python-shell')
var fs = require('fs');
var path = require('path');
var axios = require('axios');

/**
 * Start a Python script for the given job and report its Outcome to the
 * job's callback address once the script finishes.
 *
 * The returned promise settles as soon as the script has been launched;
 * it does not wait for the script to finish. While the script runs the
 * worker's status is 'busy'; it goes back to 'bored' when the run ends,
 * whether it succeeded or failed.
 *
 * @param {Object} pythonJob - Job description. Reads `scriptName` and
 *     `callbackAddress`; the whole object is passed to the script as a
 *     single JSON-encoded command-line argument.
 * @param {Object} [worker] - Object whose `status` property is updated.
 *     Optional: when omitted, a throwaway status holder is used.
 * @returns {Promise<Object>} Resolves with `{message: 'job started'}`;
 *     rejects when the script could not be launched.
 */
var scriptRun = function(pythonJob, worker){
    // A caller in this project invokes scriptRun(job) without a worker;
    // fall back to a local holder instead of throwing a TypeError.
    const tracker = worker || { status: 'bored' };
    const markBored = () => { tracker.status = 'bored'; };

    tracker.status = 'busy';

    return new Promise((resolve, reject) => {
        try {
            const callbackAddress = pythonJob.callbackAddress;
            const options = {
                mode: 'text',
                pythonOptions: ['-u'], // get print results in real-time
                scriptPath: path.relative(process.cwd(), 'python-scripts/'),
                args: [JSON.stringify(pythonJob)]
            };

            PythonShell.run(pythonJob.scriptName, options, function (err, results) {
                // This callback runs later, outside the surrounding try, so
                // throwing here would escape every handler and leave the
                // worker stuck in 'busy'. Log and release instead.
                if (err) {
                    console.log(err);
                    markBored();
                    return;
                }

                let result;
                try {
                    // The Outcome is expected on the last line the script printed.
                    result = JSON.parse(results.pop());
                } catch (parseErr) {
                    console.log(parseErr);
                    markBored();
                    return;
                }

                if (!result) {
                    markBored();
                    return;
                }

                console.log(callbackAddress + '/callback')
                axios({
                    method: 'post',
                    url: callbackAddress + '/callback',
                    data: result
                }).then((response) => {
                    console.log(`Worker let let the Boss know job is complete.`);
                    markBored();
                }).catch((error) => {
                    console.log(error);
                    markBored();
                });
            });

            resolve({'message': 'job started'});
        } catch (err) {
            // Release the worker before rejecting so a caller reacting to
            // the rejection already sees it as available.
            markBored();
            reject(err)
        }
    });
}

module.exports = {scriptRun}
Boss
server.js
const express = require('express');
const bodyParser = require('body-parser');
const axios = require('axios');
var timeout = require('connect-timeout')
const {mongoose} = require('./backend/database-services/dl-mongo');
const workerNode = require('./backend/services/worker-node');
const work = require('./backend/services/work');

// Database collection
var {Job} = require('./backend/database-services/models/job');
var {Order} = require('./backend/database-services/models/order');

const bossAddress = 'http://maddatum.com'

// Server setup.
var app = express();
const port = 3000;

// Add request parameters.
app.use((req, res, next) => {
    res.setHeader('Access-Control-Allow-Origin', '*');
    res.setHeader('Access-Control-Allow-Headers', 'Origin, X-Requested-With, Content-Type, Accept');
    res.setHeader('Access-Control-Allow-Methods', 'GET, POST, PUT, PATCH, DELETE, OPTIONS');
    next();
});

// Add the middleware.
app.use(bodyParser.json())

/**
 * Reject a request that arrived without a body.
 *
 * @param {Object} req - Express request.
 * @param {Object} res - Express response.
 * @returns {boolean} true when a body is present; false after a 400 reply
 *     has been sent (the route handler must then stop).
 */
function requireBody(req, res) {
    if (req.body) {
        return true;
    }
    // Returning a value from a route handler sends nothing; answer
    // explicitly so the client is not left waiting.
    res.status(400).send({ 'message': 'No request provided.' });
    return false;
}

/**
 * Send the generic "bad request shape" payload used by every route.
 *
 * @param {Object} res - Express response.
 * @param {*} err - Error detail included in the payload.
 */
function sendShapeError(res, err) {
    res.send({'error': 'Error with request shape.', err });
}

/*
    This route is for creating new Jobs on the queue
*/
app.post('/job/:method', (req, res) => {
    if (!requireBody(req, res)) { return; }
    try {
        switch (req.params.method) {
            case 'create':
                work.create(req.body)
                    .then((response) => {
                        res.send(response);
                    }).catch((error) => {
                        res.send({'error': error })
                    });
                break;
            default:
                res.send({'error': 'No method selected.'})
        }
    } catch (err) {
        sendShapeError(res, err);
    }
});

/*
    This route is for adding new WorkerNodes to the database.
*/
app.post('/worker-node/:method', (req, res) => {
    if (!requireBody(req, res)) { return; }
    try {
        switch (req.params.method) {
            case 'create':
                workerNode.create(req.body)
                    .then((response) => {
                        res.send(response);
                    }).catch((error) => {
                        res.send({'error': error.message});
                    })
                break;
            default:
                // Throw a real Error instead of referencing an undefined
                // identifier; the catch below produces the same reply.
                throw new Error('No method selected.');
        }
    } catch (err) {
        sendShapeError(res, err);
    }
});

/*
    Callback used by a Worker Node to hand a finished Job's Outcome to the
    Boss for storage.
*/
app.post('/callback', (req, res) => {
    if (!requireBody(req, res)) { return; }
    const outcome = req.body;
    console.log(outcome);
    try {
        work.file(outcome)
            .then((response) => {
                console.log(response);
                res.send(response);
            })
            .catch((error) => {
                // Without this handler a storage failure would leave the
                // Worker's callback request unanswered.
                console.log(error);
                res.send({'error': error && error.message ? error.message : error});
            });
    } catch (err) {
        sendShapeError(res, err);
    }
});

/*
    Route for Worker Node to let the Boss know it needs a Job.
    The oldest Job which is unassigned is provided.
*/
app.post('/bored/:id', (req, res) => {
    if (!requireBody(req, res)) { return; }
    const workerNodeId = req.params.id;
    console.log(`${workerNodeId} said it's bored.`);
    if (!workerNodeId) {
        sendShapeError(res, {'error': 'No id provided.'});
        return;
    }

    const noWork = () => {
        res.send({'message': `No work to do. Don't get used to it.`});
    };

    // Ascending sort on the creation date yields the oldest Order first.
    // NOTE(review): the field name follows the sample Order document
    // (`createdDate`); confirm it against the Order schema.
    // NOTE(review): find-then-save is not atomic; two Workers polling at the
    // same moment could receive the same Order.
    Order.findOne({ status: 'unassigned' }, {}, { sort: { 'createdDate': 1 } })
        .then((order) => {
            if (!order) {
                // Empty queue: findOne yields null rather than an error.
                noWork();
                return null;
            }
            console.log(`Found a work order, #${order._id}`)
            order.status = 'assigned';
            console.log(`Provided ${workerNodeId} with ${order.jobId}`);
            return order.save()
                .then((doc) => {
                    console.log(`Updated the Order #${doc.id}'s status to ${order.status}`);
                    res.send(doc);
                });
        })
        .catch((err) => {
            console.log(err);
            noWork();
        });
});

/*
    Retrieve Orders or Job
*/
app.get('/retrieve/:type/:id?/:param1?', (req, res) => {
    try {
        const type = req.params.type;
        const id = req.params.id;
        switch (type) {
            case 'order':
                Order.find()
                    .then((response) => {
                        res.send(response);
                    })
                    .catch((error) => {
                        console.log(error);
                        sendShapeError(res, error);
                    });
                break;
            case 'job':
                if (!id) {
                    sendShapeError(res, {'error': 'Missing Id'});
                    return;
                }
                Job.findOne({'_id': id })
                    .then((response) => {
                        res.send(response);
                    })
                    .catch((error) => {
                        // e.g. an id that cannot be cast to an ObjectId.
                        console.log(error);
                        sendShapeError(res, error);
                    });
                break;
            default:
                throw new Error('Unknown retrieve type.');
        }
    } catch (err) {
        sendShapeError(res, err);
    }
})

app.listen(port, () => {
    console.log(`Started on port ${port}`);
});
Top comments (0)