|  | 
|  | 1 | +// Copyright (c) Microsoft Corporation. All rights reserved. | 
|  | 2 | +// Licensed under the MIT License. | 
|  | 3 | + | 
|  | 4 | +import { traceError } from '../logger'; | 
|  | 5 | +import { createDeferred, Deferred } from './async'; | 
|  | 6 | + | 
|  | 7 | +interface IWorker { | 
|  | 8 | + /** | 
|  | 9 | + * Start processing of items. | 
|  | 10 | + * @method stop | 
|  | 11 | + */ | 
|  | 12 | + start(): void; | 
|  | 13 | + /** | 
|  | 14 | + * Stops any further processing of items. | 
|  | 15 | + * @method stop | 
|  | 16 | + */ | 
|  | 17 | + stop(): void; | 
|  | 18 | +} | 
|  | 19 | + | 
|  | 20 | +type NextFunc<T> = () => Promise<T>; | 
|  | 21 | +type WorkFunc<T, R> = (item: T) => Promise<R>; | 
|  | 22 | +type PostResult<T, R> = (item: T, result?: R, err?: Error) => void; | 
|  | 23 | + | 
|  | 24 | +interface IWorkItem<T> { | 
|  | 25 | + item: T; | 
|  | 26 | +} | 
|  | 27 | + | 
|  | 28 | +export enum QueuePosition { | 
|  | 29 | + Back, | 
|  | 30 | + Front | 
|  | 31 | +} | 
|  | 32 | + | 
|  | 33 | +export interface IWorkerPool<T, R> extends IWorker { | 
|  | 34 | + /** | 
|  | 35 | + * Add items to be processed to a queue. | 
|  | 36 | + * @method addToQueue | 
|  | 37 | + * @param {T} item: Item to process | 
|  | 38 | + * @param {QueuePosition} position: Add items to the front or back of the queue. | 
|  | 39 | + * @returns A promise that when resolved gets the result from running the worker function. | 
|  | 40 | + */ | 
|  | 41 | + addToQueue(item: T, position?: QueuePosition): Promise<R>; | 
|  | 42 | +} | 
|  | 43 | + | 
|  | 44 | +class Worker<T, R> implements IWorker { | 
|  | 45 | + private stopProcessing: boolean = false; | 
|  | 46 | + public constructor( | 
|  | 47 | + private readonly next: NextFunc<T>, | 
|  | 48 | + private readonly workFunc: WorkFunc<T, R>, | 
|  | 49 | + private readonly postResult: PostResult<T, R>, | 
|  | 50 | + private readonly name: string | 
|  | 51 | + ) {} | 
|  | 52 | + public stop() { | 
|  | 53 | + this.stopProcessing = true; | 
|  | 54 | + } | 
|  | 55 | + | 
|  | 56 | + public async start() { | 
|  | 57 | + while (!this.stopProcessing) { | 
|  | 58 | + try { | 
|  | 59 | + const workItem = await this.next(); | 
|  | 60 | + try { | 
|  | 61 | + const result = await this.workFunc(workItem); | 
|  | 62 | + this.postResult(workItem, result); | 
|  | 63 | + } catch (ex) { | 
|  | 64 | + this.postResult(workItem, undefined, ex); | 
|  | 65 | + } | 
|  | 66 | + } catch (ex) { | 
|  | 67 | + // Next got rejected. Likely worker pool is shutting down. | 
|  | 68 | + // continue here and worker will exit if the worker pool is shutting down. | 
|  | 69 | + traceError(`Error while running worker[${this.name}].`, ex); | 
|  | 70 | + continue; | 
|  | 71 | + } | 
|  | 72 | + } | 
|  | 73 | + } | 
|  | 74 | +} | 
|  | 75 | + | 
|  | 76 | +class WorkQueue<T, R> { | 
|  | 77 | + private readonly items: IWorkItem<T>[] = []; | 
|  | 78 | + private readonly results: Map<IWorkItem<T>, Deferred<R>> = new Map(); | 
|  | 79 | + public add(item: T, position?: QueuePosition): Promise<R> { | 
|  | 80 | + // Wrap the user provided item in a wrapper object. This will allow us to track multiple | 
|  | 81 | + // submissions of the same item. For example, addToQueue(2), addToQueue(2). If we did not | 
|  | 82 | + // wrap this, then from the map both submissions will look the same. Since this is a generic | 
|  | 83 | + // worker pool, we do not know if we can resolve both using the same promise. So, a better | 
|  | 84 | + // approach is to ensure each gets a unique promise, and let the worker function figure out | 
|  | 85 | + // how to handle repeat submissions. | 
|  | 86 | + const workItem: IWorkItem<T> = { item }; | 
|  | 87 | + if (position === QueuePosition.Front) { | 
|  | 88 | + this.items.unshift(workItem); | 
|  | 89 | + } else { | 
|  | 90 | + this.items.push(workItem); | 
|  | 91 | + } | 
|  | 92 | + | 
|  | 93 | + // This is the promise that will be resolved when the work | 
|  | 94 | + // item is complete. We save this in a map to resolve when | 
|  | 95 | + // the worker finishes and posts the result. | 
|  | 96 | + const deferred = createDeferred<R>(); | 
|  | 97 | + this.results.set(workItem, deferred); | 
|  | 98 | + | 
|  | 99 | + return deferred.promise; | 
|  | 100 | + } | 
|  | 101 | + | 
|  | 102 | + public completed(workItem: IWorkItem<T>, result?: R, error?: Error): void { | 
|  | 103 | + const deferred = this.results.get(workItem); | 
|  | 104 | + if (deferred !== undefined) { | 
|  | 105 | + this.results.delete(workItem); | 
|  | 106 | + if (error !== undefined) { | 
|  | 107 | + deferred.reject(error); | 
|  | 108 | + } | 
|  | 109 | + deferred.resolve(result); | 
|  | 110 | + } | 
|  | 111 | + } | 
|  | 112 | + | 
|  | 113 | + public next(): IWorkItem<T> | undefined { | 
|  | 114 | + return this.items.shift(); | 
|  | 115 | + } | 
|  | 116 | + | 
|  | 117 | + public clear(): void { | 
|  | 118 | + this.results.forEach((v: Deferred<R>, k: IWorkItem<T>, map: Map<IWorkItem<T>, Deferred<R>>) => { | 
|  | 119 | + v.reject(Error('Queue stopped processing')); | 
|  | 120 | + map.delete(k); | 
|  | 121 | + }); | 
|  | 122 | + } | 
|  | 123 | +} | 
|  | 124 | + | 
|  | 125 | +class WorkerPool<T, R> implements IWorkerPool<T, R> { | 
|  | 126 | + // This collection tracks the full set of workers. | 
|  | 127 | + private readonly workers: IWorker[] = []; | 
|  | 128 | + | 
|  | 129 | + // A collections that holds unblock callback for each worker waiting | 
|  | 130 | + // for a work item when the queue is empty | 
|  | 131 | + private readonly waitingWorkersUnblockQueue: { unblock(w: IWorkItem<T>): void; stop(): void }[] = []; | 
|  | 132 | + | 
|  | 133 | + // A collection that manages the work items. | 
|  | 134 | + private readonly queue = new WorkQueue<T, R>(); | 
|  | 135 | + | 
|  | 136 | + // State of the pool manages via stop(), start() | 
|  | 137 | + private stopProcessing = false; | 
|  | 138 | + | 
|  | 139 | + public constructor( | 
|  | 140 | + private readonly workerFunc: WorkFunc<T, R>, | 
|  | 141 | + private readonly numWorkers: number = 2, | 
|  | 142 | + private readonly name: string = 'Worker' | 
|  | 143 | + ) {} | 
|  | 144 | + | 
|  | 145 | + public addToQueue(item: T, position?: QueuePosition): Promise<R> { | 
|  | 146 | + if (this.stopProcessing) { | 
|  | 147 | + throw Error('Queue is stopped'); | 
|  | 148 | + } | 
|  | 149 | + | 
|  | 150 | + // This promise when resolved should return the processed result of the item | 
|  | 151 | + // being added to the queue. | 
|  | 152 | + const deferred = this.queue.add(item, position); | 
|  | 153 | + | 
|  | 154 | + const worker = this.waitingWorkersUnblockQueue.shift(); | 
|  | 155 | + if (worker) { | 
|  | 156 | + const workItem = this.queue.next(); | 
|  | 157 | + if (workItem !== undefined) { | 
|  | 158 | + // If we are here it means there were no items to process in the queue. | 
|  | 159 | + // At least one worker is free and waiting for a work item. Call 'unblock' | 
|  | 160 | + // and give the worker the newly added item. | 
|  | 161 | + worker.unblock(workItem); | 
|  | 162 | + } else { | 
|  | 163 | + // Something is wrong, we should not be here. we just added an item to | 
|  | 164 | + // the queue. It should not be empty. | 
|  | 165 | + traceError('Work queue was empty immediately after adding item.'); | 
|  | 166 | + } | 
|  | 167 | + } | 
|  | 168 | + | 
|  | 169 | + return deferred; | 
|  | 170 | + } | 
|  | 171 | + | 
|  | 172 | + public start() { | 
|  | 173 | + this.stopProcessing = false; | 
|  | 174 | + let num = this.numWorkers; | 
|  | 175 | + while (num > 0) { | 
|  | 176 | + this.workers.push( | 
|  | 177 | + new Worker<IWorkItem<T>, R>( | 
|  | 178 | + () => this.nextWorkItem(), | 
|  | 179 | + (workItem: IWorkItem<T>) => this.workerFunc(workItem.item), | 
|  | 180 | + (workItem: IWorkItem<T>, result?: R, error?: Error) => | 
|  | 181 | + this.queue.completed(workItem, result, error), | 
|  | 182 | + `${this.name} ${num}` | 
|  | 183 | + ) | 
|  | 184 | + ); | 
|  | 185 | + num = num - 1; | 
|  | 186 | + } | 
|  | 187 | + this.workers.forEach(async (w) => w.start()); | 
|  | 188 | + } | 
|  | 189 | + | 
|  | 190 | + public stop(): void { | 
|  | 191 | + this.stopProcessing = true; | 
|  | 192 | + | 
|  | 193 | + // Signal all registered workers with this worker pool to stop processing. | 
|  | 194 | + // Workers should complete the task they are currently doing. | 
|  | 195 | + let worker = this.workers.shift(); | 
|  | 196 | + while (worker) { | 
|  | 197 | + worker.stop(); | 
|  | 198 | + worker = this.workers.shift(); | 
|  | 199 | + } | 
|  | 200 | + | 
|  | 201 | + // Remove items from queue. | 
|  | 202 | + this.queue.clear(); | 
|  | 203 | + | 
|  | 204 | + // This is necessary to exit any worker that is waiting for an item. | 
|  | 205 | + // If we don't unblock here then the worker just remains blocked | 
|  | 206 | + // forever. | 
|  | 207 | + let blockedWorker = this.waitingWorkersUnblockQueue.shift(); | 
|  | 208 | + while (blockedWorker) { | 
|  | 209 | + blockedWorker.stop(); | 
|  | 210 | + blockedWorker = this.waitingWorkersUnblockQueue.shift(); | 
|  | 211 | + } | 
|  | 212 | + } | 
|  | 213 | + | 
|  | 214 | + public nextWorkItem(): Promise<IWorkItem<T>> { | 
|  | 215 | + // Note that next() will return `undefined` if the queue is empty. | 
|  | 216 | + const nextWorkItem = this.queue.next(); | 
|  | 217 | + if (nextWorkItem !== undefined) { | 
|  | 218 | + return Promise.resolve(nextWorkItem); | 
|  | 219 | + } | 
|  | 220 | + | 
|  | 221 | + // Queue is Empty, so return a promise that will be resolved when | 
|  | 222 | + // new items are added to the queue. | 
|  | 223 | + return new Promise<IWorkItem<T>>((resolve, reject) => { | 
|  | 224 | + this.waitingWorkersUnblockQueue.push({ | 
|  | 225 | + unblock: (workItem?: IWorkItem<T>) => { | 
|  | 226 | + // This will be called to unblock any worker waiting for items. | 
|  | 227 | + if (this.stopProcessing) { | 
|  | 228 | + // We should reject here since the processing should be stopped. | 
|  | 229 | + reject(); | 
|  | 230 | + } | 
|  | 231 | + // If we are here, the queue received a new work item. Resolve with that item. | 
|  | 232 | + resolve(workItem); | 
|  | 233 | + }, | 
|  | 234 | + stop: () => { | 
|  | 235 | + reject(); | 
|  | 236 | + } | 
|  | 237 | + }); | 
|  | 238 | + }); | 
|  | 239 | + } | 
|  | 240 | +} | 
|  | 241 | + | 
|  | 242 | +export function createWorkerPool<T, R>( | 
|  | 243 | + workerFunc: WorkFunc<T, R>, | 
|  | 244 | + numWorkers: number = 2, | 
|  | 245 | + name: string = 'Worker' | 
|  | 246 | +): IWorkerPool<T, R> { | 
|  | 247 | + const pool = new WorkerPool<T, R>(workerFunc, numWorkers, name); | 
|  | 248 | + pool.start(); | 
|  | 249 | + return pool; | 
|  | 250 | +} | 
0 commit comments