Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
utubettl: if the task being deleted was not taken, do not wake neighb…
…our task
  • Loading branch information
egor.iskandarov committed Sep 19, 2019
commit cae6b858e90c8b96c5f93d2f57bae80216d68449
36 changes: 17 additions & 19 deletions queue/abstract/driver/utubettl.lua
Original file line number Diff line number Diff line change
Expand Up @@ -290,27 +290,24 @@ function method.take(self)
end
end

local function process_neighbour(self, task, operation)
self:on_task_change(task, operation)
if task ~= nil then
local neighbour = self.space.index.utube:min{state.READY, task[i_utube]}
if neighbour ~= nil
and neighbour[i_utube] == task[i_utube]
and neighbour[i_status] == state.READY
then
self:on_task_change(neighbour)
end
local function wake_neighbour(self, utube)
local neighbour = self.space.index.utube:select({state.READY, utube}, {limit=1})[1]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think using min() with checks is more correct (with link to tarantool/tarantool#3167)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aren't they equivalent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the behavior of max() was different from expected and I see no reason to replace min() to select(). I agree with you that the default iterator is ALL(same as GE) and the results will be equivalent. But generally min () is used in utubettl driver and IMHO: it better reflects the idea. I may be wrong. What is the reason for choosing select()?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from the 1.10 docs:

In a future version of Tarantool, index:min(key value) will return nothing if key value is not equal to a value in the index.

so we cannot be sure that neighbour will be from the same utube on tarantool 1.10

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, as I understand you say about tarantool/tarantool@8bbd4a6 . Looks like you're right! Please add a comment. If you add a test, it will be great!

Copy link
Contributor

@LeonidVas LeonidVas Aug 11, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant that the behavior of max() was different from expected and I see no reason to replace min() to select(). I agree with you that the default iterator is ALL(same as GE) and the results will be equivalent.

The default iterator is 'EQ'.

if neighbour ~= nil then
self:on_task_change(neighbour)
end
return task
end

-- delete task
function method.delete(self, id)
local task = self.space:get(id)
self.space:delete(id)
if task ~= nil then
self.space:delete(id)
if task[i_status] == state.TAKEN then
wake_neighbour(self, task[i_utube])
end
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be implemented in less intruisive way, without refactoring of suppounding code.

local is_taken = task[i_status] == state.TAKEN task = task:transform(i_status, 1, state.DONE) if is_taken then return process_neighbour(self, task, 'delete') else self:on_task_change(task, 'delete') return task end
task = task:transform(i_status, 1, state.DONE)
return process_neighbour(self, task, 'delete')
self:on_task_change(task, 'delete')
return task
end
self:on_task_change(task, 'delete')
end
Expand All @@ -327,9 +324,9 @@ function method.release(self, id, opts)
{ '=', i_next_event, event_time(opts.delay) },
{ '+', i_ttl, time(opts.delay) }
})
if task ~= nil then
return process_neighbour(self, task, 'release')
end
self:on_task_change(task, 'release')
wake_neighbour(self, task[i_utube])
return task
else
task = self.space:update(id, {
{ '=', i_status, state.READY },
Expand All @@ -344,9 +341,10 @@ end
function method.bury(self, id)
local task = self.space:update(id, {{ '=', i_status, state.BURIED }})
if task ~= nil then
return process_neighbour(
self, task:transform(i_status, 1, state.BURIED), 'bury'
)
wake_neighbour(self, task[i_utube])
task = task:transform(i_status, 1, state.BURIED)
self:on_task_change(task, 'bury')
return task
end
self:on_task_change(task, 'bury')
end
Expand Down