Skip to content

Commit e29a2c0

Browse files
authored
feat: make publish futures compatible with concurrent.futures.as_completed() (#397)
* feat: make futures compatible with as_completed() The futures implementation is adjusted to work well with the built-in function with the same name in `concurrent.futures` package. * Fix two unit tests in pre-Python 3.8 If setting a result/exception on a concurrent.futures.Future object, an exception is raised only in Python3.8+, thus we conditionally disable two unit tests. This behavior change is fine, though, because users should never use the set_result() and set_exception() methods directly. * Cover missing code line with a test * Use double underscore for internal cancelled flag * Prefix manager reference with double underscore * Remove Future's completed parameter altogether This parameter is unlikely to be used by any 3rd party code, but even if it is, it's better to cause a loud error rather than silently changing its effect to a no-op.
1 parent 3ee887c commit e29a2c0

File tree

8 files changed

+127
-201
lines changed

8 files changed

+127
-201
lines changed

google/cloud/pubsub_v1/futures.py

Lines changed: 16 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -14,173 +14,45 @@
1414

1515
from __future__ import absolute_import
1616

17-
import threading
18-
import uuid
17+
import concurrent.futures
1918

2019
import google.api_core.future
21-
from google.cloud.pubsub_v1.publisher import exceptions
2220

2321

24-
class Future(google.api_core.future.Future):
22+
class Future(concurrent.futures.Future, google.api_core.future.Future):
2523
"""Encapsulation of the asynchronous execution of an action.
2624
2725
This object is returned from asychronous Pub/Sub calls, and is the
2826
interface to determine the status of those calls.
2927
3028
This object should not be created directly, but is returned by other
3129
methods in this library.
32-
33-
Args:
34-
completed (Optional[Any]): An event, with the same interface as
35-
:class:`threading.Event`. This is provided so that callers
36-
with different concurrency models (e.g. ``threading`` or
37-
``multiprocessing``) can supply an event that is compatible
38-
with that model. The ``wait()`` and ``set()`` methods will be
39-
used. If this argument is not provided, then a new
40-
:class:`threading.Event` will be created and used.
4130
"""
4231

43-
# This could be a sentinel object or None, but the sentinel object's ID
44-
# can change if the process is forked, and None has the possibility of
45-
# actually being a result.
46-
_SENTINEL = uuid.uuid4()
47-
48-
def __init__(self, completed=None):
49-
self._result = self._SENTINEL
50-
self._exception = self._SENTINEL
51-
self._callbacks = []
52-
if completed is None:
53-
completed = threading.Event()
54-
self._completed = completed
55-
56-
def cancel(self):
57-
"""Actions in Pub/Sub generally may not be canceled.
58-
59-
This method always returns False.
60-
"""
61-
return False
62-
63-
def cancelled(self):
64-
"""Actions in Pub/Sub generally may not be canceled.
65-
66-
This method always returns False.
67-
"""
68-
return False
69-
7032
def running(self):
71-
"""Actions in Pub/Sub generally may not be canceled.
33+
"""Return ``True`` if the associated Pub/Sub action has not yet completed.
7234
73-
Returns:
74-
bool: ``True`` if this method has not yet completed, or
75-
``False`` if it has completed.
35+
Returns: bool:
7636
"""
7737
return not self.done()
7838

79-
def done(self):
80-
"""Return True the future is done, False otherwise.
81-
82-
This still returns True in failure cases; checking :meth:`result` or
83-
:meth:`exception` is the canonical way to assess success or failure.
84-
"""
85-
return self._exception != self._SENTINEL or self._result != self._SENTINEL
86-
87-
def result(self, timeout=None):
88-
"""Resolve the future and return a value where appropriate.
89-
90-
Args:
91-
timeout (Union[int, float]): The number of seconds before this call
92-
times out and raises TimeoutError.
93-
94-
Raises:
95-
concurrent.futures.TimeoutError: If the request times out.
96-
Exception: For undefined exceptions in the underlying
97-
call execution.
98-
"""
99-
# Attempt to get the exception if there is one.
100-
# If there is not one, then we know everything worked, and we can
101-
# return an appropriate value.
102-
err = self.exception(timeout=timeout)
103-
if err is None:
104-
return self._result
105-
raise err
106-
107-
def exception(self, timeout=None):
108-
"""Return the exception raised by the call, if any.
109-
110-
Args:
111-
timeout (Union[int, float]): The number of seconds before this call
112-
times out and raises TimeoutError.
113-
114-
Raises:
115-
concurrent.futures.TimeoutError: If the request times out.
116-
117-
Returns:
118-
Exception: The exception raised by the call, if any.
119-
"""
120-
# Wait until the future is done.
121-
if not self._completed.wait(timeout=timeout):
122-
raise exceptions.TimeoutError("Timed out waiting for result.")
123-
124-
# If the batch completed successfully, this should return None.
125-
if self._result != self._SENTINEL:
126-
return None
127-
128-
# Okay, this batch had an error; this should return it.
129-
return self._exception
130-
131-
def add_done_callback(self, callback):
132-
"""Attach the provided callable to the future.
133-
134-
The provided function is called, with this future as its only argument,
135-
when the future finishes running.
136-
137-
Args:
138-
callback (Callable): The function to call.
139-
140-
Returns:
141-
None
142-
"""
143-
if self.done():
144-
return callback(self)
145-
self._callbacks.append(callback)
39+
def set_running_or_notify_cancel(self):
40+
raise NotImplementedError(
41+
"Only used by executors from `concurrent.futures` package."
42+
)
14643

14744
def set_result(self, result):
148-
"""Set the result of the future to the provided result.
45+
"""Set the return value of work associated with the future.
14946
150-
Args:
151-
result (Any): The result
47+
Do not use this method, it should only be used internally by the library and its
48+
unit tests.
15249
"""
153-
# Sanity check: A future can only complete once.
154-
if self.done():
155-
raise RuntimeError("set_result can only be called once.")
156-
157-
# Set the result and trigger the future.
158-
self._result = result
159-
self._trigger()
50+
return super().set_result(result=result)
16051

16152
def set_exception(self, exception):
162-
"""Set the result of the future to the given exception.
163-
164-
Args:
165-
exception (:exc:`Exception`): The exception raised.
166-
"""
167-
# Sanity check: A future can only complete once.
168-
if self.done():
169-
raise RuntimeError("set_exception can only be called once.")
170-
171-
# Set the exception and trigger the future.
172-
self._exception = exception
173-
self._trigger()
174-
175-
def _trigger(self):
176-
"""Trigger all callbacks registered to this Future.
177-
178-
This method is called internally by the batch once the batch
179-
completes.
53+
"""Set the result of the future as being the given exception.
18054
181-
Args:
182-
message_id (str): The message ID, as a string.
55+
Do not use this method, it should only be used internally by the library and its
56+
unit tests.
18357
"""
184-
self._completed.set()
185-
for callback in self._callbacks:
186-
callback(self)
58+
return super().set_exception(exception=exception)

google/cloud/pubsub_v1/publisher/_batch/thread.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ def publish(self, message):
380380

381381
# Track the future on this batch (so that the result of the
382382
# future can be set).
383-
future = futures.Future(completed=threading.Event())
383+
future = futures.Future()
384384
self._futures.append(future)
385385

386386
# Try to commit, but it must be **without** the lock held, since

google/cloud/pubsub_v1/publisher/futures.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,20 @@ class Future(futures.Future):
2525
ID, unless an error occurs.
2626
"""
2727

28+
def cancel(self):
29+
"""Actions in Pub/Sub generally may not be canceled.
30+
31+
This method always returns ``False``.
32+
"""
33+
return False
34+
35+
def cancelled(self):
36+
"""Actions in Pub/Sub generally may not be canceled.
37+
38+
This method always returns ``False``.
39+
"""
40+
return False
41+
2842
def result(self, timeout=None):
2943
"""Return the message ID or raise an exception.
3044
@@ -43,10 +57,4 @@ def result(self, timeout=None):
4357
Exception: For undefined exceptions in the underlying
4458
call execution.
4559
"""
46-
# Attempt to get the exception if there is one.
47-
# If there is not one, then we know everything worked, and we can
48-
# return an appropriate value.
49-
err = self.exception(timeout=timeout)
50-
if err is None:
51-
return self._result
52-
raise err
60+
return super().result(timeout=timeout)

google/cloud/pubsub_v1/subscriber/futures.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ class StreamingPullFuture(futures.Future):
2828

2929
def __init__(self, manager):
3030
super(StreamingPullFuture, self).__init__()
31-
self._manager = manager
32-
self._manager.add_close_callback(self._on_close_callback)
33-
self._cancelled = False
31+
self.__manager = manager
32+
self.__manager.add_close_callback(self._on_close_callback)
33+
self.__cancelled = False
3434

3535
def _on_close_callback(self, manager, result):
3636
if self.done():
@@ -47,12 +47,14 @@ def cancel(self):
4747
"""Stops pulling messages and shutdowns the background thread consuming
4848
messages.
4949
"""
50-
self._cancelled = True
51-
return self._manager.close()
50+
# NOTE: We circumvent the base future's self._state to track the cancellation
51+
# state, as this state has different meaning with streaming pull futures.
52+
self.__cancelled = True
53+
return self.__manager.close()
5254

5355
def cancelled(self):
5456
"""
5557
returns:
5658
bool: ``True`` if the subscription has been cancelled.
5759
"""
58-
return self._cancelled
60+
return self.__cancelled

tests/unit/pubsub_v1/publisher/test_futures_publisher.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,14 @@
2020

2121

2222
class TestFuture(object):
23+
def test_cancel(self):
24+
future = futures.Future()
25+
assert future.cancel() is False
26+
27+
def test_cancelled(self):
28+
future = futures.Future()
29+
assert future.cancelled() is False
30+
2331
def test_result_on_success(self):
2432
future = futures.Future()
2533
future.set_result("570307942214048")

tests/unit/pubsub_v1/subscriber/test_futures_subscriber.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,13 +31,12 @@ def make_future(self):
3131

3232
def test_default_state(self):
3333
future = self.make_future()
34+
manager = future._StreamingPullFuture__manager
3435

3536
assert future.running()
3637
assert not future.done()
3738
assert not future.cancelled()
38-
future._manager.add_close_callback.assert_called_once_with(
39-
future._on_close_callback
40-
)
39+
manager.add_close_callback.assert_called_once_with(future._on_close_callback)
4140

4241
def test__on_close_callback_success(self):
4342
future = self.make_future()
@@ -71,8 +70,9 @@ def test__on_close_callback_future_already_done(self):
7170

7271
def test_cancel(self):
7372
future = self.make_future()
73+
manager = future._StreamingPullFuture__manager
7474

7575
future.cancel()
7676

77-
future._manager.close.assert_called_once()
77+
manager.close.assert_called_once()
7878
assert future.cancelled()

tests/unit/pubsub_v1/subscriber/test_subscriber_client.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,8 @@ def test_subscribe(manager_open, creds):
137137
future = client.subscribe("sub_name_a", callback=mock.sentinel.callback)
138138
assert isinstance(future, futures.StreamingPullFuture)
139139

140-
assert future._manager._subscription == "sub_name_a"
140+
manager = future._StreamingPullFuture__manager
141+
assert manager._subscription == "sub_name_a"
141142
manager_open.assert_called_once_with(
142143
mock.ANY,
143144
callback=mock.sentinel.callback,
@@ -164,10 +165,11 @@ def test_subscribe_options(manager_open, creds):
164165
)
165166
assert isinstance(future, futures.StreamingPullFuture)
166167

167-
assert future._manager._subscription == "sub_name_a"
168-
assert future._manager.flow_control == flow_control
169-
assert future._manager._scheduler == scheduler
170-
assert future._manager._await_callbacks_on_shutdown is mock.sentinel.await_callbacks
168+
manager = future._StreamingPullFuture__manager
169+
assert manager._subscription == "sub_name_a"
170+
assert manager.flow_control == flow_control
171+
assert manager._scheduler == scheduler
172+
assert manager._await_callbacks_on_shutdown is mock.sentinel.await_callbacks
171173
manager_open.assert_called_once_with(
172174
mock.ANY,
173175
callback=mock.sentinel.callback,

0 commit comments

Comments
 (0)