Skip to content

Commit 909da2b

Browse files
authored
fix(move-to-delayed): consider promoting delayed jobs (#1493)
1 parent a1ee543 commit 909da2b

File tree

4 files changed

+141
-58
lines changed

4 files changed

+141
-58
lines changed

src/classes/scripts.ts

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -496,7 +496,11 @@ export class Scripts {
496496
}
497497

498498
// Note: We have an issue here with jobs using custom job ids
499-
moveToDelayedArgs(jobId: string, timestamp: number, token: string): string[] {
499+
moveToDelayedArgs(
500+
jobId: string,
501+
timestamp: number,
502+
token: string,
503+
): (string | number)[] {
500504
//
501505
// Bake in the job id first 12 bits into the timestamp
502506
// to guarantee correct execution order of delayed jobs
@@ -510,12 +514,28 @@ export class Scripts {
510514
timestamp = timestamp * 0x1000 + (+jobId & 0xfff);
511515
}
512516

513-
const keys = ['active', 'delayed', jobId].map(name => {
517+
const keys: (string | number)[] = [
518+
'wait',
519+
'active',
520+
'priority',
521+
'delayed',
522+
jobId,
523+
].map(name => {
514524
return this.queue.toKey(name);
515525
});
516-
keys.push.apply(keys, [this.queue.keys.events, this.queue.keys.delay]);
526+
keys.push.apply(keys, [
527+
this.queue.keys.events,
528+
this.queue.keys.paused,
529+
this.queue.keys.meta,
530+
]);
517531

518-
return keys.concat([JSON.stringify(timestamp), jobId, token]);
532+
return keys.concat([
533+
this.queue.keys[''],
534+
Date.now(),
535+
JSON.stringify(timestamp),
536+
jobId,
537+
token,
538+
]);
519539
}
520540

521541
moveToWaitingChildrenArgs(

src/commands/moveToDelayed-5.lua

Lines changed: 0 additions & 54 deletions
This file was deleted.

src/commands/moveToDelayed-8.lua

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
--[[
2+
Moves job from active to delayed set.
3+
4+
Input:
5+
KEYS[1] wait key
6+
KEYS[2] active key
7+
KEYS[3] priority key
8+
KEYS[4] delayed key
9+
KEYS[5] job key
10+
KEYS[6] events stream
11+
KEYS[7] paused key
12+
KEYS[8] meta key
13+
14+
ARGV[1] key prefix
15+
ARGV[2] timestamp
16+
ARGV[3] delayedTimestamp
17+
ARGV[4] the id of the job
18+
ARGV[5] queue token
19+
20+
Output:
21+
0 - OK
22+
-1 - Missing job.
23+
-3 - Job not in active set.
24+
25+
Events:
26+
- delayed key.
27+
]]
28+
local rcall = redis.call
29+
30+
-- Includes
31+
--- @include "includes/promoteDelayedJobs"
32+
33+
promoteDelayedJobs(KEYS[4], KEYS[1], KEYS[3], KEYS[7], KEYS[8], KEYS[6], ARGV[1], ARGV[2])
34+
35+
if rcall("EXISTS", KEYS[5]) == 1 then
36+
37+
if ARGV[5] ~= "0" then
38+
local lockKey = KEYS[5] .. ':lock'
39+
if rcall("GET", lockKey) == ARGV[5] then
40+
rcall("DEL", lockKey)
41+
else
42+
return -2
43+
end
44+
end
45+
46+
local jobId = ARGV[4]
47+
local score = tonumber(ARGV[3])
48+
local delayedTimestamp = (score / 0x1000)
49+
50+
local numRemovedElements = rcall("LREM", KEYS[2], -1, jobId)
51+
52+
if(numRemovedElements < 1) then
53+
return -3
54+
end
55+
56+
rcall("ZADD", KEYS[4], score, jobId)
57+
58+
rcall("XADD", KEYS[6], "*", "event", "delayed", "jobId", jobId, "delay", delayedTimestamp);
59+
60+
return 0
61+
else
62+
return -1
63+
end

tests/test_delay.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -198,6 +198,60 @@ describe('Delayed jobs', function () {
198198
await worker2.close();
199199
});
200200

201+
describe('when delayed jobs are ready when pending jobs are moved to delayed', function () {
202+
it('processes jobs without getting stuck', async () => {
203+
const countJobs = 28;
204+
205+
for (let j = 0; j < countJobs; j++) {
206+
await queue.add(
207+
'test',
208+
{ foo: `bar${j}` },
209+
{ attempts: 2, backoff: 10 },
210+
);
211+
}
212+
213+
const concurrency = 50;
214+
215+
let worker: Worker;
216+
const processedJobs: { data: any }[] = [];
217+
const processing = new Promise<void>(resolve => {
218+
worker = new Worker(
219+
queueName,
220+
async (job: Job) => {
221+
if (job.attemptsMade == 1) {
222+
await delay(250);
223+
224+
throw new Error('error');
225+
}
226+
227+
await delay(25);
228+
229+
processedJobs.push({ data: job.data });
230+
231+
if (processedJobs.length == countJobs) {
232+
resolve();
233+
}
234+
235+
return;
236+
},
237+
{
238+
connection,
239+
concurrency,
240+
},
241+
);
242+
worker.on('error', err => {
243+
console.error(err);
244+
});
245+
});
246+
247+
await processing;
248+
249+
expect(processedJobs.length).to.be.equal(countJobs);
250+
251+
await worker.close();
252+
}).timeout(4000);
253+
});
254+
201255
it('should process delayed jobs with exact same timestamps in correct order (FIFO)', async function () {
202256
let order = 1;
203257
const numJobs = 43;

0 commit comments

Comments
 (0)