Skip to content
3 changes: 2 additions & 1 deletion queue/abstract.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ queue.driver = {
fifo = require('queue.abstract.driver.fifo'),
fifottl = require('queue.abstract.driver.fifottl'),
utube = require('queue.abstract.driver.utube'),
utubettl = require('queue.abstract.driver.utubettl')
utubettl = require('queue.abstract.driver.utubettl'),
limfifottl = require('queue.abstract.driver.limfifottl')
}

-- tube methods
Expand Down
42 changes: 42 additions & 0 deletions queue/abstract/driver/limfifottl.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
local fiber = require('fiber')
local fifottl = require('queue.abstract.driver.fifottl')

local tube = {}
local methods = {}

tube.create_space = fifottl.create_space

-- start tube on space
function tube.new(space, on_task_change, opts)
local state = {
capacity = opts.capacity or 0,
parent = fifottl.new(space, on_task_change, opts)
}

-- put task in space
local put = function (self, data, opts)
local timeout = opts.timeout or 0
timeout = timeout * 1000000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If fiber.time64() will be changed on fiber.time(), then this multiplication will be not needed.


while true do
local tube_size = self.space:count()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self.space:len() is more preferable here, because it is faster, but has less precision.

if tube_size < state.capacity or state.capacity == 0 then
return state.parent.put(self, data, opts)
else
if timeout == 0 then
return nil
end

local started = fiber.time64()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think time64 is redundant, and I prefer to use fiber.time().

fiber.yield()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fiber.yield() may cause a big cpu load, so I prefer to use fiber.sleep(0.01)

local elapsed = fiber.time64() - started

timeout = timeout > elapsed and timeout - elapsed or 0
end
end
end

return setmetatable({put = put}, {__index = state.parent})
end

return tube
17 changes: 9 additions & 8 deletions rockspecs/queue-1.0.2-1.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,15 @@ build = {
type = 'builtin',

modules = {
['queue.abstract'] = 'queue/abstract.lua',
['queue.abstract.state'] = 'queue/abstract/state.lua',
['queue.abstract.driver.fifottl'] = 'queue/abstract/driver/fifottl.lua',
['queue.abstract.driver.utubettl'] = 'queue/abstract/driver/utubettl.lua',
['queue.abstract.driver.fifo'] = 'queue/abstract/driver/fifo.lua',
['queue.abstract.driver.utube'] = 'queue/abstract/driver/utube.lua',
['queue.compat'] = 'queue/compat.lua',
['queue'] = 'queue/init.lua'
['queue.abstract'] = 'queue/abstract.lua',
['queue.abstract.state'] = 'queue/abstract/state.lua',
['queue.abstract.driver.fifottl'] = 'queue/abstract/driver/fifottl.lua',
['queue.abstract.driver.utubettl'] = 'queue/abstract/driver/utubettl.lua',
['queue.abstract.driver.fifo'] = 'queue/abstract/driver/fifo.lua',
['queue.abstract.driver.utube'] = 'queue/abstract/driver/utube.lua',
['queue.abstract.driver.limfifottl'] = 'queue/abstract/driver/limfifottl.lua',
['queue.compat'] = 'queue/compat.lua',
['queue'] = 'queue/init.lua'
}
}

Expand Down
60 changes: 60 additions & 0 deletions t/100-limfifottl.t
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/usr/bin/env tarantool
local yaml = require('yaml')
local fiber = require('fiber')

local test = require('tap').test()
test:plan(8)

local queue = require('queue')
local state = require('queue.abstract.state')

local engine = os.getenv('ENGINE') or 'memtx'

local tnt = require('t.tnt')
tnt.cfg{}

test:ok(rawget(box, 'space'), 'box started')
test:ok(queue, 'queue is loaded')

local tube = queue.create_tube('lim3_tube', 'limfifottl', { engine = engine, capacity = 3 })
local unlim_tube = queue.create_tube('unlim_tube', 'limfifottl', { engine = engine })

test:ok(tube, 'test tube created')
test:is(tube.name, 'lim3_tube', 'tube.name')
test:is(tube.type, 'limfifottl', 'tube.type')

test:test('Put timeout is reached', function(test)
test:plan(4)

test:ok(tube:put{1}, 'task 1 was put')
test:ok(tube:put{2}, 'task 2 was put')
test:ok(tube:put{3}, 'task 3 was put')
test:is(tube:put({4}, {timeout = 0.1}), nil, 'task 4 wasn\'t put cause timeout')
end)

test:test('Put after freeing up space', function(test)
test:plan(3)
local put_fiber = fiber.create(function()
test:ok(tube:put({4}, {timeout = 1}), 'task 4 was put')
end)

local task = tube:take()
test:ok(task, 'task 3 was taken')
test:is(tube:ack(task[1])[2], state.DONE, 'task 3 is done')

while put_fiber:status() ~= 'dead' do
fiber.yield()
end
end)

test:test('Unlimited tube put', function(test)
test:plan(3)

test:is(unlim_tube:take(0), nil, 'tube is empty')
test:ok(unlim_tube:put{1}, 'task 1 was put')
test:ok(unlim_tube:put{2}, 'task 2 was put')
end)

tnt.finish()
os.exit(test:check() == true and 0 or -1)
-- vim: set ft=lua :