Skip to content
This repository was archived by the owner on Jun 8, 2023. It is now read-only.

Commit 3ed7d7a

Browse files
committed
allow queuing tasks with a delay #release-note
1 parent ce86e1b commit 3ed7d7a

File tree

7 files changed

+120
-36
lines changed

7 files changed

+120
-36
lines changed

README.rst

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,24 @@ Launch a python shell and play with the api
133133
>>> task.status_string
134134
'SUCCESS'
135135
136+
You can also queue tasks with a ``delay``
137+
138+
.. code:: python
139+
140+
>>> task = api.queue_task('addition', a=4, b=6, callback=False, delay=2).wait()
141+
>>> task.status_string
142+
'QUEUED'
143+
>>> task.time_queued # timestamp
144+
>>> task = task.done_callback.wait()
145+
>>> task.status_string
146+
'SUCCESS'
147+
>>> task.time_started - task.time_queued
148+
2.00
149+
150+
**NOTE**: The ``wait`` method of a task future can only be used in the shell
151+
or when the event loop is not running. In all other cases one should ``await``
152+
the task future in a coroutine.
153+
136154
API
137155
=============
138156

pq/mq.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -79,14 +79,19 @@ def queue(self, task, callback=True):
7979
called back once the task is done, otherwise return a future
8080
called back once the task is queued
8181
'''
82-
future = TaskFuture(task.id, self.backend, loop=self._loop)
82+
future_done = TaskFuture(task.id, self.backend, loop=self._loop)
8383
if task.queue:
84-
self.callbacks[task.id] = future
84+
self.callbacks[task.id] = future_done
8585
else: # the task is not queued instead it is executed immediately
8686
coro = self.backend._execute_task(task)
87+
return chain_future(coro, next=future_done)
88+
coro = self._queue_task(task, future_done)
89+
if callback:
90+
ensure_future(coro, loop=self._loop)
91+
return future_done
92+
else:
93+
future = TaskFuture(task.id, self.backend, loop=self._loop)
8794
return chain_future(coro, next=future)
88-
result = ensure_future(self._queue_task(task, future), loop=self._loop)
89-
return future if callback else result
9095

9196
@abstractmethod
9297
async def size(self, *queues): # pragma nocover

pq/server/consumer.py

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,26 @@ async def _execute_task(self, task, worker=None):
3232
expiry = task.expiry
3333
if expiry and time_ended > expiry:
3434
raise TaskTimeout
35-
else:
36-
kwargs = task.kwargs or {}
37-
task.status = states.STARTED
38-
task.time_started = time_ended
39-
if worker:
40-
task.worker = worker.aid
41-
logger.info(task.lazy_info())
42-
await self.pubsub.publish('started', task)
43-
job = JobClass(self, task)
44-
# This may block for a while
45-
task.result = await self._consume(job, kwargs)
35+
36+
if task.delay: # Task with delay
37+
start_time = task.time_queued + task.delay
38+
gap = start_time - time_ended
39+
if gap > 0:
40+
self._loop.call_later(gap, self._queue_again, task)
41+
if self._concurrent_tasks:
42+
self._concurrent_tasks.discard(task_id)
43+
return task
44+
45+
kwargs = task.kwargs or {}
46+
task.status = states.STARTED
47+
task.time_started = time_ended
48+
if worker:
49+
task.worker = worker.aid
50+
logger.info(task.lazy_info())
51+
await self.pubsub.publish('started', task)
52+
job = JobClass(self, task)
53+
# This may block for a while
54+
task.result = await self._consume(job, kwargs)
4655
else:
4756
raise TaskError('Invalid status %s' % task.status_string)
4857
except TaskTimeout:
@@ -108,6 +117,9 @@ async def _consume_in_subprocess(self, job, kwargs):
108117
raise RemoteStackTrace
109118
return job.task.result
110119

120+
def _queue_again(self, task):
121+
self.broker.queue(task, False)
122+
111123
def json_params(self):
112124
for name, value in self.cfg.items():
113125
try:

pq/server/producer.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,23 @@ class TaskProducer(models.RegistryMixin, ExecutorMixin, ABC):
2626
"""
2727
app = None
2828

29-
def __init__(self, cfg, logger=None, **kw):
29+
def __init__(self, cfg, *, logger=None, **kw):
3030
self.cfg = cfg
3131
self.logger = logger or logging.getLogger('pulsar.queue')
3232
self._closing = False
33-
store = create_store(cfg.data_store)
33+
loop = cfg.params.pop('loop', None)
34+
store = create_store(cfg.data_store, loop=loop)
3435
if not cfg.message_broker:
3536
broker = store
3637
else:
37-
broker = create_store(cfg.message_broker)
38+
broker = create_store(cfg.message_broker, loop=loop)
3839
if self.cfg.callable:
3940
self.app = self.cfg.callable(self)
4041
self.store_task = getattr(self.app, 'store_task', store_task)
4142
self.pubsub = PubSub(self, store)
4243
self.broker = brokers.get(broker.name)(self, broker)
4344
self.green_pool = getattr(self.app, 'green_pool', GreenPool())
44-
self.http = getattr(self.app, 'http', HttpClient())
45+
self.http = getattr(self.app, 'http', HttpClient(loop=loop))
4546

4647
def __str__(self):
4748
return repr(self)
@@ -103,7 +104,11 @@ def queue_task(self, jobname, callback=True, **kwargs):
103104
'''
104105
task = self._create_task(jobname, **kwargs)
105106
if task:
106-
return self.broker.queue(task, callback)
107+
future = self.broker.queue(task, callback)
108+
if self._loop.is_running():
109+
return self.green_pool.wait(future)
110+
else:
111+
return future
107112

108113
def queue_task_local(self, jobname, **kwargs):
109114
kwargs['queue'] = self.node_name
@@ -118,7 +123,7 @@ def execute_task(self, jobname, **kwargs):
118123

119124
# INTERNALS
120125
def _create_task(self, jobname, meta_params=None, expiry=None, queue=True,
121-
**kwargs):
126+
delay=None, **kwargs):
122127
'''Try to queue a new :ref:`Task`.
123128
124129
This method returns a :class:`.Future` which results in the
@@ -160,6 +165,7 @@ def _create_task(self, jobname, meta_params=None, expiry=None, queue=True,
160165
expiry=expiry,
161166
kwargs=kwargs,
162167
status=states.QUEUED,
168+
delay=delay,
163169
**meta_params)
164170
else:
165171
raise TaskNotAvailable(jobname)

pq/server/pubsub.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ def __call__(self, channel, message):
6464

6565
def _channel(self, event=''):
6666
event = 'task_%s' % event
67-
prefix = self.cfg.task_queue_prefix
67+
prefix = self.cfg.name
6868
return '%s_%s' % (prefix, event) if prefix else event
6969

7070

pq/tasks/task.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ class Task:
3737
exception = None
3838
stacktrace = None
3939
worker = None
40+
delay = None
4041

4142
def __init__(self, id=None, name=None, time_queued=None,
4243
expiry=None, status=None, kwargs=None, queue=None,

tests/test_app.py

Lines changed: 56 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -46,19 +46,16 @@ def rpc_name(cls):
4646
@classmethod
4747
async def setUpClass(cls):
4848
# The name of the task queue application
49-
name = cls.name()
50-
queues = ['%s1' % name, '%s2' % name]
51-
pq = api.PulsarQueue(cls.name(),
52-
wsgi=True,
53-
config='tests.config',
54-
queue_callable=dummy,
55-
task_queues=queues,
56-
default_task_queue=queues[0],
57-
schedule_periodic=cls.schedule_periodic,
58-
rpc_bind='127.0.0.1:0',
59-
concurrency=cls.concurrency,
60-
rpc_concurrency=cls.concurrency,
61-
rpc_keep_alive=cls.rpc_timeout)
49+
params = cls.params()
50+
params.update(dict(
51+
wsgi=True,
52+
schedule_periodic=cls.schedule_periodic,
53+
rpc_bind='127.0.0.1:0',
54+
concurrency=cls.concurrency,
55+
rpc_concurrency=cls.concurrency,
56+
rpc_keep_alive=cls.rpc_timeout
57+
))
58+
pq = api.PulsarQueue(**params)
6259
cfgs = await pq.start()
6360
cls.tq = cfgs[0].app()
6461
cls.rpc = cfgs[1].app()
@@ -67,14 +64,30 @@ async def setUpClass(cls):
6764
timeout=cls.rpc_timeout)
6865
# Now flush the task queue
6966
backend = await cls.tq.backend.start()
70-
await backend.flush_queues(*queues)
67+
await backend.flush_queues(*cls.queues())
7168

7269
@classmethod
7370
def tearDownClass(cls):
7471
coros = [send('arbiter', 'kill_actor', a.name) for a in
7572
(cls.tq, cls.rpc) if a is not None]
7673
return asyncio.gather(*coros)
7774

75+
@classmethod
76+
def queues(cls):
77+
name = cls.name()
78+
return ['%s1' % name, '%s2' % name]
79+
80+
@classmethod
81+
def params(cls):
82+
queues = cls.queues()
83+
return dict(
84+
name=cls.name(),
85+
config='tests.config',
86+
queue_callable=dummy,
87+
task_queues=queues,
88+
default_task_queue=queues[0]
89+
)
90+
7891

7992
class TestTaskQueue(TaskQueueBase, unittest.TestCase):
8093

@@ -261,6 +274,35 @@ async def test_scrape(self):
261274
self.assertEqual(task.status_string, 'SUCCESS')
262275
self.assertTrue(task.result)
263276

277+
async def test_delay(self):
278+
task = await self.tq.backend.queue_task('scrape',
279+
delay=2,
280+
url='https://www.bbc.co.uk/')
281+
282+
self.assertEqual(task.status_string, 'SUCCESS')
283+
self.assertEqual(task.delay, 2)
284+
self.assertTrue(task.time_started - task.time_queued > 2)
285+
self.assertTrue(task.result)
286+
287+
def test_sync(self):
288+
loop = asyncio.new_event_loop()
289+
tasks = api.TaskApp(loop=loop, **self.params()).backend
290+
self.assertEqual(tasks._loop, loop)
291+
task = tasks.queue_task('scrape', url='https://github.com')
292+
self.assertIsInstance(task, asyncio.Future)
293+
self.assertTrue(task.task_id)
294+
task = task.wait()
295+
self.assertEqual(task.status_string, 'SUCCESS')
296+
self.assertFalse(tasks._loop.is_running())
297+
#
298+
task = tasks.queue_task('scrape', url='https://github.com',
299+
callback=False)
300+
task = task.wait()
301+
self.assertEqual(task.status_string, 'QUEUED')
302+
self.assertTrue(task.done_callback)
303+
task = task.done_callback.wait()
304+
self.assertEqual(task.status_string, 'SUCCESS')
305+
264306
# RPC
265307
async def test_rpc_job_list(self):
266308
data = await self.proxy.job_list()

0 commit comments

Comments
 (0)