@@ -42,10 +42,11 @@ function tube.create_space(space_name, opts)
4242end
4343
4444-- start tube on space
45- function tube .new (space , on_task_change )
45+ function tube .new (space , on_task_change , opts )
4646 on_task_change = on_task_change or (function () end )
4747 local self = setmetatable ({
4848 space = space ,
49+ concurrent = opts .concurrent or 1 ,
4950 on_task_change = on_task_change ,
5051 }, { __index = method })
5152 return self
@@ -65,21 +66,46 @@ function method.put(self, data, opts)
6566 return task
6667end
6768
69+ -- check concurrency of a sub-queue
70+ function method .is_throttled (self , utube )
71+ if self .concurrent == 1 then
72+ local taken = self .space .index .utube :min {state .TAKEN , utube }
73+ return taken ~= nil and taken [3 ] == utube
74+ elseif self .concurrent ~= 1 / 0 then
75+ local num_taken = self .space .index .utube :count {state .TAKEN , utube }
76+ return num_taken == self .concurrent
77+ end
78+ return false
79+ end
80+
6881-- take task
69- function method .take (self )
70- for s , task in self .space .index .status :pairs (state .READY ,
71- { iterator = ' GE' }) do
72- if task [2 ] ~= state .READY then
73- break
82+ function method .take (self , opts )
83+ local task
84+ if opts and opts .utube then
85+ if not self :is_throttled (opts .utube ) then
86+ local t = self .space .index .utube :min {state .READY , opts .utube }
87+ if t and t [2 ] == state .READY and t [3 ] == opts .utube then
88+ task = t
89+ end
7490 end
75-
76- local taken = self .space .index .utube :min {state .TAKEN , task [3 ]}
77- if taken == nil or taken [2 ] ~= state .TAKEN then
78- task = self .space :update (task [1 ], { { ' =' , 2 , state .TAKEN } })
79- self .on_task_change (task , ' take' )
80- return task
91+ else
92+ local utubes = {}
93+ for s , t in self .space .index .status :pairs (state .READY ) do
94+ local utube = t [3 ]
95+ if not utubes [utube ] then
96+ if not self :is_throttled (utube ) then
97+ task = t
98+ break
99+ end
100+ utubes [utube ] = true
101+ end
81102 end
82103 end
104+ if task then
105+ task = self .space :update (task [1 ], { { ' =' , 2 , state .TAKEN } })
106+ self .on_task_change (task , ' take' )
107+ return task
108+ end
83109end
84110
85111-- touch task
0 commit comments