Continuation to the first part. This post assumes you are a developer with working knowledge on AWS, Lambda, EventBridge, MongoDB, NodeJs.
Technical Implementation:
1. Design State-Machine
This step you need to figure out, how do you want to orchestrate your jobs and create an object like below.
const stateMachine = { id: '1', purposeId: 'catch_the_fish', currentPhase: 'thinking', phases: { thinking: { steps: { `thinking`: { jobs: ['buy_fishing_gear'], }, buy_fishing_gear: { jobs: ['rent_a_boat'], }, rent_a_boat: { jobs: ['go_to_fishing_ground'], }, go_to_fishing_ground: { jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'], }, catch_fish: { jobs: ['go_home'], }, }, finished: [], next: 'end', waitFor: [ 'buy_fishing_gear', 'rent_a_boat', 'go_to_fishing_ground', 'cast_fishing_pole', 'drink_beer', 'catch_fish', 'go_home', ], }, }, };
In the above example we are running buy_fishing_gear
after thinking
job is finished in a sequential manner, but we run cast_fishing_pole
, drink_beer
and catch_fish
in parallel after go_to_fishing_ground
is successful.
2. Start the Process:
Creation of the statemachine could be anything from API to a cron job. Lets take API as an example as POST /process/catch-the-fish.
router.post('/process/catch-the-fish', async (req, res, next) => { const state = {}; // above mentioned state // Insert into state-machines const createdState = await db.collection('state-machines').insertOne(state); // Start the process by sending first event. As thinking is success we start the process of catching fish await db.collection('statuses').insertOne({ stateId: createdState._id, status: 'success', job: 'thinking', date: new Date(), // additional parameters as needed }); });
3. State Machine Job:
exports.handler = async (event, ctx, callback) => { // Mongodb Document from the trigger const document = event.detail.fullDocument; // Find the state machine const state = await db .collection('state-machines') .findOne({ _id: document.stateId }); // if its ended do nothing if (state?.currentPhase === 'end') { callback(); } // Update the job as finished for the state machine const currentPhase = state.phases[state.currentPhase]; currentPhase.finished.push(document.job); await db .collection('state-machines') .findOneAndUpdate( { _id: document.stateId }, { $set: { [`phases.${state.currentPhase}`]: currentPhase } } ); const jobsRemaining = currentPhase.waitFor.filter( (s) => !currentPhase.finished.includes(s) ); // If there are no remaining jobs in the current phase if (jobsRemaining.length === 0) { // Update the currentPhase to be the next Phase. await db.collection('state-machines').findOneAndUpdate( { _id: document.stateId }, { $set: { currentPhase: currentPhase.next, }, } ); // If the next phase is not end trigger the phase. if (currentPhase.next !== 'end') { await db.collection('statuses').insertOne({ ...document, job: `${currentPhase.next}Start`, status: 'success', }); } } };
Above, whenever a job succeeds this will update the finished jobs and check if the phase is finished and move onto next phase until it meets the end phase.
4. Orchestrator Job:
exports.handler = async (event, ctx, callback) => { ctx.callbackWaitsForEmptyEventLoop = false; const document = event.detail.fullDocument; const state = await db .collection('pipeline-state-machines') .findOne({ _id: document.stateId }); const currentPhase = state[state.currentPhase]; if (!document.stateId) { console.log('Cannot execute the pipeline without the state id:', state); return { success: false }; } if (state.currentPhase === 'end') { console.log('Cannot execute the pipeline without the state id:', state); return { success: false }; } if (!currentPhase) { console.log('Cannot execute the pipeline without the state id:', state); return { success: false }; } const step = currentPhase.steps[document.job]; if (step && step.jobs && step.jobs.length > 0) { await Promise.all( step.jobs.map(async (job) => { const payload = {} const event = { FunctionName: job, InvocationType: 'Event', LogType: 'Tail', Payload: JSON.stringify(payload), }; return lambda.invoke(event); }) ); } callback(); };
The above job receives the success event and finds the next jobs to trigger and invoke them.
5. Notifications Job:
As above the job receives all the events so we can use the data and structure the message to send.
What if there are multiple phases:
Then we just have to add another phase to existing state machine configuration.
const stateMachine = { id: '1', purposeId: 'catch_the_fish', currentPhase: 'thinking', phases: { thinking: { // Prev things }, cooking: { steps: { cookingStart: { jobs: ['clean'], }, clean: { jobs: ['cook'], }, }, waitFor: ['cook', 'clean'], finished: [], next: 'end', }, }, };
I hope the above code and explanation gives you a way to implement your own solutions. For anymore details please do comment, I would be happy to help. Thanks.
Top comments (0)