Skip to content

Commit 7d2138a

Browse files
authored
Add heartbeating to the streaming pull manager (googleapis#5413)
1 parent 0f2b1af commit 7d2138a

File tree

4 files changed

+250
-7
lines changed

4 files changed

+250
-7
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# Copyright 2018, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from __future__ import absolute_import
16+
17+
import logging
18+
import threading
19+
20+
21+
_LOGGER = logging.getLogger(__name__)
22+
_HEARTBEAT_WORKER_NAME = 'Thread-Heartbeater'
23+
# How often to send heartbeats in seconds. Determined as half the period of
24+
# time where the Pub/Sub server will close the stream as inactive, which is
25+
# 60 seconds.
26+
_DEFAULT_PERIOD = 30
27+
28+
29+
class Heartbeater(object):
30+
def __init__(self, manager, period=_DEFAULT_PERIOD):
31+
self._thread = None
32+
self._operational_lock = threading.Lock()
33+
self._manager = manager
34+
self._stop_event = threading.Event()
35+
self._period = period
36+
37+
def heartbeat(self):
38+
"""Periodically send heartbeats."""
39+
while self._manager.is_active and not self._stop_event.is_set():
40+
self._manager.heartbeat()
41+
_LOGGER.debug('Sent heartbeat.')
42+
self._stop_event.wait(timeout=self._period)
43+
44+
_LOGGER.info('%s exiting.', _HEARTBEAT_WORKER_NAME)
45+
46+
def start(self):
47+
with self._operational_lock:
48+
if self._thread is not None:
49+
raise ValueError('Heartbeater is already running.')
50+
51+
# Create and start the helper thread.
52+
self._stop_event.clear()
53+
thread = threading.Thread(
54+
name=_HEARTBEAT_WORKER_NAME,
55+
target=self.heartbeat)
56+
thread.daemon = True
57+
thread.start()
58+
_LOGGER.debug('Started helper thread %s', thread.name)
59+
self._thread = thread
60+
61+
def stop(self):
62+
with self._operational_lock:
63+
self._stop_event.set()
64+
65+
if self._thread is not None:
66+
# The thread should automatically exit when the consumer is
67+
# inactive.
68+
self._thread.join()
69+
70+
self._thread = None

pubsub/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from google.cloud.pubsub_v1 import types
2727
from google.cloud.pubsub_v1.subscriber._protocol import bidi
2828
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
29+
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
2930
from google.cloud.pubsub_v1.subscriber._protocol import histogram
3031
from google.cloud.pubsub_v1.subscriber._protocol import leaser
3132
from google.cloud.pubsub_v1.subscriber._protocol import requests
@@ -115,6 +116,7 @@ def __init__(self, client, subscription, flow_control=types.FlowControl(),
115116
self._dispatcher = None
116117
self._leaser = None
117118
self._consumer = None
119+
self._heartbeater = None
118120

119121
@property
120122
def is_active(self):
@@ -263,6 +265,15 @@ def send(self, request):
263265
else:
264266
self._rpc.send(request)
265267

268+
def heartbeat(self):
269+
"""Sends an empty request over the streaming pull RPC.
270+
271+
This always sends over the stream, regardless of if
272+
``self._UNARY_REQUESTS`` is set or not.
273+
"""
274+
if self._rpc is not None and self._rpc.is_active:
275+
self._rpc.send(types.StreamingPullRequest())
276+
266277
def open(self, callback):
267278
"""Begin consuming messages.
268279
@@ -292,6 +303,7 @@ def open(self, callback):
292303
self._consumer = bidi.BackgroundConsumer(
293304
self._rpc, self._on_response)
294305
self._leaser = leaser.Leaser(self)
306+
self._heartbeater = heartbeater.Heartbeater(self)
295307

296308
# Start the thread to pass the requests.
297309
self._dispatcher.start()
@@ -302,6 +314,9 @@ def open(self, callback):
302314
# Start the lease maintainer thread.
303315
self._leaser.start()
304316

317+
# Start the stream heartbeater thread.
318+
self._heartbeater.start()
319+
305320
def close(self, reason=None):
306321
"""Stop consuming messages and shutdown all helper threads.
307322
@@ -332,6 +347,9 @@ def close(self, reason=None):
332347
_LOGGER.debug('Stopping dispatcher.')
333348
self._dispatcher.stop()
334349
self._dispatcher = None
350+
_LOGGER.debug('Stopping heartbeater.')
351+
self._heartbeater.stop()
352+
self._heartbeater = None
335353

336354
self._rpc = None
337355
self._closed = True
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
# Copyright 2018, Google LLC
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
import logging
16+
import threading
17+
18+
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
19+
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
20+
21+
import mock
22+
import pytest
23+
24+
25+
def test_heartbeat_inactive(caplog):
26+
caplog.set_level(logging.INFO)
27+
manager = mock.create_autospec(
28+
streaming_pull_manager.StreamingPullManager, instance=True)
29+
manager.is_active = False
30+
31+
heartbeater_ = heartbeater.Heartbeater(manager)
32+
33+
heartbeater_.heartbeat()
34+
35+
assert 'exiting' in caplog.text
36+
37+
38+
def test_heartbeat_stopped(caplog):
39+
caplog.set_level(logging.INFO)
40+
manager = mock.create_autospec(
41+
streaming_pull_manager.StreamingPullManager, instance=True)
42+
43+
heartbeater_ = heartbeater.Heartbeater(manager)
44+
heartbeater_.stop()
45+
46+
heartbeater_.heartbeat()
47+
48+
assert 'exiting' in caplog.text
49+
50+
51+
def make_sleep_mark_manager_as_inactive(heartbeater):
52+
# Make sleep mark the manager as inactive so that heartbeat()
53+
# exits at the end of the first run.
54+
def trigger_inactive(timeout):
55+
assert timeout
56+
heartbeater._manager.is_active = False
57+
58+
heartbeater._stop_event.wait = trigger_inactive
59+
60+
61+
def test_heartbeat_once():
62+
manager = mock.create_autospec(
63+
streaming_pull_manager.StreamingPullManager, instance=True)
64+
heartbeater_ = heartbeater.Heartbeater(manager)
65+
make_sleep_mark_manager_as_inactive(heartbeater_)
66+
67+
heartbeater_.heartbeat()
68+
69+
manager.heartbeat.assert_called_once()
70+
71+
72+
@mock.patch('threading.Thread', autospec=True)
73+
def test_start(thread):
74+
manager = mock.create_autospec(
75+
streaming_pull_manager.StreamingPullManager, instance=True)
76+
heartbeater_ = heartbeater.Heartbeater(manager)
77+
78+
heartbeater_.start()
79+
80+
thread.assert_called_once_with(
81+
name=heartbeater._HEARTBEAT_WORKER_NAME,
82+
target=heartbeater_.heartbeat)
83+
84+
thread.return_value.start.assert_called_once()
85+
86+
assert heartbeater_._thread is not None
87+
88+
89+
@mock.patch('threading.Thread', autospec=True)
90+
def test_start_already_started(thread):
91+
manager = mock.create_autospec(
92+
streaming_pull_manager.StreamingPullManager, instance=True)
93+
heartbeater_ = heartbeater.Heartbeater(manager)
94+
heartbeater_._thread = mock.sentinel.thread
95+
96+
with pytest.raises(ValueError):
97+
heartbeater_.start()
98+
99+
thread.assert_not_called()
100+
101+
102+
def test_stop():
103+
manager = mock.create_autospec(
104+
streaming_pull_manager.StreamingPullManager, instance=True)
105+
heartbeater_ = heartbeater.Heartbeater(manager)
106+
thread = mock.create_autospec(threading.Thread, instance=True)
107+
heartbeater_._thread = thread
108+
109+
heartbeater_.stop()
110+
111+
assert heartbeater_._stop_event.is_set()
112+
thread.join.assert_called_once()
113+
assert heartbeater_._thread is None
114+
115+
116+
def test_stop_no_join():
117+
heartbeater_ = heartbeater.Heartbeater(mock.sentinel.manager)
118+
119+
heartbeater_.stop()

pubsub/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 43 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
from google.cloud.pubsub_v1.subscriber import scheduler
2424
from google.cloud.pubsub_v1.subscriber._protocol import bidi
2525
from google.cloud.pubsub_v1.subscriber._protocol import dispatcher
26+
from google.cloud.pubsub_v1.subscriber._protocol import heartbeater
2627
from google.cloud.pubsub_v1.subscriber._protocol import leaser
2728
from google.cloud.pubsub_v1.subscriber._protocol import requests
2829
from google.cloud.pubsub_v1.subscriber._protocol import streaming_pull_manager
@@ -216,6 +217,26 @@ def test_send_streaming():
216217
manager._rpc.send.assert_called_once_with(mock.sentinel.request)
217218

218219

220+
def test_heartbeat():
221+
manager = make_manager()
222+
manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
223+
manager._rpc.is_active = True
224+
225+
manager.heartbeat()
226+
227+
manager._rpc.send.assert_called_once_with(types.StreamingPullRequest())
228+
229+
230+
def test_heartbeat_inactive():
231+
manager = make_manager()
232+
manager._rpc = mock.create_autospec(bidi.BidiRpc, instance=True)
233+
manager._rpc.is_active = False
234+
235+
manager.heartbeat()
236+
237+
manager._rpc.send.assert_not_called()
238+
239+
219240
@mock.patch(
220241
'google.cloud.pubsub_v1.subscriber._protocol.bidi.ResumableBidiRpc',
221242
autospec=True)
@@ -228,11 +249,20 @@ def test_send_streaming():
228249
@mock.patch(
229250
'google.cloud.pubsub_v1.subscriber._protocol.dispatcher.Dispatcher',
230251
autospec=True)
231-
def test_open(dispatcher, leaser, background_consumer, resumable_bidi_rpc):
252+
@mock.patch(
253+
'google.cloud.pubsub_v1.subscriber._protocol.heartbeater.Heartbeater',
254+
autospec=True)
255+
def test_open(
256+
heartbeater, dispatcher, leaser, background_consumer,
257+
resumable_bidi_rpc):
232258
manager = make_manager()
233259

234260
manager.open(mock.sentinel.callback)
235261

262+
heartbeater.assert_called_once_with(manager)
263+
heartbeater.return_value.start.assert_called_once()
264+
assert manager._heartbeater == heartbeater.return_value
265+
236266
dispatcher.assert_called_once_with(manager, manager._scheduler.queue)
237267
dispatcher.return_value.start.assert_called_once()
238268
assert manager._dispatcher == dispatcher.return_value
@@ -285,39 +315,45 @@ def make_running_manager():
285315
dispatcher.Dispatcher, instance=True)
286316
manager._leaser = mock.create_autospec(
287317
leaser.Leaser, instance=True)
318+
manager._heartbeater = mock.create_autospec(
319+
heartbeater.Heartbeater, instance=True)
288320

289321
return (
290322
manager, manager._consumer, manager._dispatcher, manager._leaser,
291-
manager._scheduler)
323+
manager._heartbeater, manager._scheduler)
292324

293325

294326
def test_close():
295-
manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
327+
manager, consumer, dispatcher, leaser, heartbeater, scheduler = (
328+
make_running_manager())
296329

297330
manager.close()
298331

299332
consumer.stop.assert_called_once()
300333
leaser.stop.assert_called_once()
301334
dispatcher.stop.assert_called_once()
335+
heartbeater.stop.assert_called_once()
302336
scheduler.shutdown.assert_called_once()
303337

304338
assert manager.is_active is False
305339

306340

307341
def test_close_inactive_consumer():
308-
manager, consumer, dispatcher, leaser, scheduler = make_running_manager()
342+
manager, consumer, dispatcher, leaser, heartbeater, scheduler = (
343+
make_running_manager())
309344
consumer.is_active = False
310345

311346
manager.close()
312347

313348
consumer.stop.assert_not_called()
314349
leaser.stop.assert_called_once()
315350
dispatcher.stop.assert_called_once()
351+
heartbeater.stop.assert_called_once()
316352
scheduler.shutdown.assert_called_once()
317353

318354

319355
def test_close_idempotent():
320-
manager, _, _, _, scheduler = make_running_manager()
356+
manager, _, _, _, _, scheduler = make_running_manager()
321357

322358
manager.close()
323359
manager.close()
@@ -326,7 +362,7 @@ def test_close_idempotent():
326362

327363

328364
def test_close_callbacks():
329-
manager, _, _, _, _ = make_running_manager()
365+
manager, _, _, _, _, _ = make_running_manager()
330366

331367
callback = mock.Mock()
332368

@@ -352,7 +388,7 @@ def test__get_initial_request():
352388

353389

354390
def test_on_response():
355-
manager, _, dispatcher, _, scheduler = make_running_manager()
391+
manager, _, dispatcher, _, _, scheduler = make_running_manager()
356392
manager._callback = mock.sentinel.callback
357393

358394
# Set up the messages.

0 commit comments

Comments
 (0)