This repository was archived by the owner on Jun 8, 2023. It is now read-only.

Commit 65d3eca (parent 490e294)

bug fix for closing consumer - added tests

File tree: 9 files changed, +105 −38 lines

.travis.yml

Lines changed: 2 additions & 7 deletions
@@ -3,7 +3,7 @@ language: python
 sudo: false
 
 python:
-  - 3.5.1
+  - 3.5.2
 
 services:
   - redis-server
@@ -14,12 +14,7 @@ install:
 script:
   - python setup.py test --coverage -q
   - flake8
-  - if [[ $TRAVIS_PYTHON_VERSION == '3.5.1' ]]; then python setup.py test --coveralls; fi
+  - python setup.py test --coveralls
 
 notifications:
   email: false
-
-branches:
-  only:
-    - master
-    - dev

README.rst

Lines changed: 6 additions & 0 deletions
@@ -331,6 +331,12 @@ value.
   Data store used for publishing and subscribing to messages (redis is the
   only backend available at the moment)
 
+* **max_requests** (``--max-requests 0``)
+
+  The maximum number of tasks a worker will process before restarting.
+  A value of 0 (the default) means no maximum; workers will process
+  tasks forever.
+
 * **message_broker** (``--message-broker ...``)
 
   Data store used as distributed task queue. If not provided (default) the
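For context, a minimal sketch of how the new option can be set programmatically, mirroring how tests/app.py and tests/example/manage.py in this commit construct the application (the value 100 is purely illustrative):

from pq.api import PulsarQueue

if __name__ == '__main__':
    # Restart each worker after 100 processed tasks
    # (same effect as passing --max-requests 100 on the command line).
    PulsarQueue(name='taskqueue',
                config=__file__,
                max_requests=100).start()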

pq/server/apps.py

Lines changed: 2 additions & 0 deletions
@@ -38,6 +38,8 @@ async def monitor_start(self, monitor, exc=None):
         if not exc:
             if self.cfg.schedule_periodic:
                 self.backend_factory = TaskScheduler
+            elif self.cfg.workers:
+                self.backend_factory = TaskProducer
         self._backend = await self._start(monitor)
 
     def monitor_task(self, monitor):

pq/server/consumer.py

Lines changed: 17 additions & 19 deletions
@@ -132,7 +132,6 @@ def __new__(cls, *args, **kwargs):
         o = super().__new__(cls)
         o._processed = 0
         o._next_time = 1
-        o._closing_msg = None
         o._closing_waiter = None
         o._concurrent_tasks = set()
         return o
@@ -161,7 +160,7 @@ async def start(self, worker):
         '''Starts consuming tasks
         '''
         await self.pubsub.start()
-        self._pool_tasks(worker)
+        self._poll_tasks(worker)
         self.logger.warning('%s started polling tasks', self)
         return self
 
@@ -171,24 +170,22 @@ def closing(self):
     def close(self, msg=None):
         if not self.closing():
             self._closing_waiter = Future(loop=self._loop)
-            self._closing_msg = msg
+            self.logger.warning(msg)
         return self._closing_waiter
 
     # #######################################################################
     # # PRIVATE METHODS
     # #######################################################################
-    def _pool_tasks(self, worker, next_time=None):
-        if self.closing():
-            if not self._concurrent_tasks:
-                self._do_close()
-        else:
-            if worker.is_running() and not next_time:
-                ensure_future(self._may_pool_task(worker), loop=worker._loop)
-            elif not worker.after_run():
-                next_time = next_time or 0
-                worker._loop.call_later(next_time, self._pool_tasks, worker)
-
-    async def _may_pool_task(self, worker):
+    def _poll_tasks(self, worker, next_time=None):
+        if self.closing() and not self._concurrent_tasks:
+            self._do_close()
+        elif worker.is_running() and not next_time:
+            ensure_future(self._may_poll_task(worker), loop=worker._loop)
+        elif not worker.after_run():
+            next_time = next_time or 0
+            worker._loop.call_later(next_time, self._poll_tasks, worker)
+
+    async def _may_poll_task(self, worker):
         # Called in the ``worker`` event loop.
         #
         # It pools a new task if possible, and add it to the queue of
@@ -200,7 +197,8 @@ async def _may_pool_task(self, worker):
         max_tasks = self.cfg.max_requests
         if max_tasks and self._processed >= max_tasks:
             self.close(
-                'Processed %s tasks. Restarting.' % self._processed
+                'Processed %s tasks. Stop polling tasks.'
+                % self._processed
             )
 
         if not self.closing():
@@ -238,7 +236,7 @@ async def _may_pool_task(self, worker):
             next_time = self._next_time
         await self._broadcast(worker)
 
-        self._pool_tasks(worker, next_time)
+        self._poll_tasks(worker, next_time)
 
     def _broadcast(self, worker):
         info = self.info()
@@ -247,7 +245,7 @@ def _broadcast(self, worker):
         return self.pubsub.publish(consumer_event, info)
 
     def _do_close(self):
-        if self._closing_msg:
-            self.logger.warning(self._closing_msg)
+        self.logger.warning('Closing %s', self)
         self.manager.close()
         self._closing_waiter.set_result(True)
+        self._loop.call_later(1, self._loop.stop)
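To make the fixed control flow easier to follow, here is a self-contained sketch using plain asyncio (not pulsar-queue's actual classes; the MiniConsumer name and timings are invented for illustration). It shows the same poll-and-close pattern: the poll reschedules itself with call_later and resolves the closing waiter only once no concurrent task is left in flight, which is what the old _pool_tasks failed to do while tasks were still running.

import asyncio


class MiniConsumer:
    """Polls toy tasks and shuts down cleanly after max_requests."""

    def __init__(self, loop, max_requests=5):
        self._loop = loop
        self.max_requests = max_requests
        self.processed = 0
        self._concurrent_tasks = set()
        self._closing_waiter = None

    def closing(self):
        return self._closing_waiter is not None

    def close(self, msg=None):
        if not self.closing():
            self._closing_waiter = self._loop.create_future()
            print(msg)
        return self._closing_waiter

    def poll_tasks(self):
        # The essence of the fix: once closing, keep rescheduling until no
        # task is in flight, then resolve the waiter instead of stalling.
        if self.closing() and not self._concurrent_tasks:
            self._closing_waiter.set_result(True)
            return
        if not self.closing():
            task = self._loop.create_task(self._work())
            self._concurrent_tasks.add(task)
            task.add_done_callback(self._concurrent_tasks.discard)
        self._loop.call_later(0.01, self.poll_tasks)

    async def _work(self):
        await asyncio.sleep(0.02)
        self.processed += 1
        if self.max_requests and self.processed >= self.max_requests:
            self.close('Processed %s tasks. Stop polling tasks.'
                       % self.processed)


async def main(loop):
    consumer = MiniConsumer(loop)
    consumer.poll_tasks()
    while not consumer.closing():   # wait until the request limit is hit
        await asyncio.sleep(0.01)
    await consumer.close()          # resolves once in-flight work has drained
    print('closed after', consumer.processed, 'tasks')


loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(main(loop))

Running the sketch prints the "Processed 5 tasks" message and then exits only after the remaining in-flight tasks finish, which is the behaviour this commit restores for the real consumer.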

pq/server/producer.py

Lines changed: 7 additions & 3 deletions
@@ -29,8 +29,7 @@ class TaskProducer(models.RegistryMixin, ExecutorMixin):
     def __init__(self, cfg, *, logger=None, **kw):
         self.cfg = cfg
         self.logger = logger or logging.getLogger('pulsar.queue')
-        self._closing_waiter = None
-        self._closing = None
+        self._closing = False
         loop = cfg.params.pop('loop', None)
         store = create_store(cfg.data_store, loop=loop)
         if not cfg.message_broker:
@@ -88,12 +87,17 @@ def flush_queues(self, *queues):
     def on_events(self, callback):
         self.pubsub.on_events(callback)
 
+    def closing(self):
+        return self._closing
+
     def close(self):
         '''Close this :class:`.TaskBackend`.
 
         Invoked by the :class:`.Actor` when stopping.
         '''
-        self.manager.close()
+        if not self._closing:
+            self._closing = True
+            self.manager.close()
 
     def queue_task(self, jobname, callback=True, **kwargs):
         '''Try to queue a new :task

tests/app.py

Lines changed: 10 additions & 2 deletions
@@ -29,6 +29,8 @@ class TaskQueueBase:
     # used for both keep-alive and timeout in JsonProxy
     # long enough to allow to wait for tasks
     rpc_timeout = 500
+    max_requests = 0
+    concurrent_tasks = 5
     tq_app = None
     rpc = None
     schedule_periodic = False
@@ -46,15 +48,17 @@ def rpc_name(cls):
     async def setUpClass(cls):
         # The name of the task queue application
         params = cls.params()
-        params.update(dict(
+        params.update(
             wsgi=True,
             schedule_periodic=cls.schedule_periodic,
             rpc_bind='127.0.0.1:0',
             concurrency=cls.concurrency,
+            concurrent_tasks=cls.concurrent_tasks,
+            max_requests=cls.max_requests,
             message_serializer=cls.message_serializer,
             rpc_concurrency=cls.concurrency,
             rpc_keep_alive=cls.rpc_timeout
-        ))
+        )
         pq = api.PulsarQueue(**params)
         cfgs = await pq.start()
         cls.tq_app = cfgs[0].app()
@@ -285,6 +289,10 @@ async def test_thread_io(self):
         self.assertNotEqual(task.result['thread'], threading.get_ident())
         self.assertEqual(task.result['text'], 306)
 
+    async def test_bad_task(self):
+        task = await self.tq.queue_task('asynchronous', sleep=2)
+        self.assertEqual(task.status_string, 'FAILURE')
+
     def _test_sync(self):
         loop = asyncio.new_event_loop()
         tasks = api.TaskApp(loop=loop, **self.params()).api()

tests/example/manage.py

Lines changed: 2 additions & 7 deletions
@@ -1,11 +1,6 @@
-task_paths = ['sampletasks.*']
-
-
-def app():
-    from pq.api import TaskApp
-    return TaskApp(config=__file__)
+task_paths = ['jobs.*', 'pq.jobs']
 
 
 if __name__ == '__main__':    # pragma nocover
     from pq.api import PulsarQueue
-    PulsarQueue('taskqueue', config='manage.py').start()
+    PulsarQueue(name='taskqueue', config=__file__).start()

tests/test_api.py

Lines changed: 23 additions & 0 deletions
@@ -38,3 +38,26 @@ def test_format_time(self):
         st2 = format_time(timestamp)
         self.assertEqual(st, st2)
         self.assertEqual(format_time(None), '?')
+
+    def test_close(self):
+        t = api.TaskApp().api()
+        self.assertEqual(t.closing(), False)
+        t.close()
+        self.assertEqual(t.closing(), True)
+        warn = mock.MagicMock()
+        t.logger.warning = warn
+        self.assertFalse(t.queue_task('foo'))
+        self.assertEqual(warn.call_count, 1)
+        self.assertEqual(
+            warn.call_args[0][0],
+            'Cannot queue task, task backend closing'
+        )
+
+    def test_task_not_available(self):
+        t = api.TaskApp().api()
+        self.assertRaises(api.TaskNotAvailable,
+                          t.queue_task, 'jsdbcjsdhbc')
+
+    def test_no_queues(self):
+        t = api.TaskApp().api()
+        self.assertEqual(t.queues(), ())

tests/test_close.py

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
+"""Tests closing the worker after 10 requests"""
+import unittest
+import asyncio
+from random import random
+
+from tests import app
+
+
+class TestMsgPackQueue(app.TaskQueueBase, unittest.TestCase):
+    max_requests = 10
+    concurrent_tasks = 20
+
+    async def test_max_requests(self):
+        tasks = [self.tq.queue_task('asynchronous', lag=random())
+                 for _ in range(18)]
+        tasks = await asyncio.gather(*tasks)
+        self.assertEqual(len(tasks), 18)
+        workers = set()
+        for task in tasks:
+            self.assertEqual(task.status_string, 'SUCCESS')
+            workers.add(task.worker)
+
+        self.assertEqual(len(workers), 2)
+
+        # FAILURES
+        tasks = [self.tq.queue_task('asynchronous', sleep=1)
+                 for _ in range(6)]
+
+        tasks = await asyncio.gather(*tasks)
+        self.assertEqual(len(tasks), 6)
+        workers = set()
+        for task in tasks:
+            self.assertEqual(task.status_string, 'FAILURE')
+            workers.add(task.worker)
+
+        self.assertEqual(len(workers), 2)
