Skip to content

Commit 358a1d8

Browse files
fix: Numerous small performance and correctness issues (#211)
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-pubsublite/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes #<issue_number_goes_here> 🦕
1 parent 1248cd8 commit 358a1d8

15 files changed

+177
-182
lines changed

google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py

Lines changed: 36 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -16,40 +16,33 @@
1616
import threading
1717
from typing import Generic, TypeVar, Callable, Dict, Awaitable
1818

19-
from google.api_core.exceptions import FailedPrecondition
20-
2119
_Key = TypeVar("_Key")
2220
_Client = TypeVar("_Client")
2321

2422

2523
class ClientMultiplexer(Generic[_Key, _Client]):
26-
_OpenedClientFactory = Callable[[], _Client]
24+
_OpenedClientFactory = Callable[[_Key], _Client]
2725
_ClientCloser = Callable[[_Client], None]
2826

27+
_factory: _OpenedClientFactory
2928
_closer: _ClientCloser
3029
_lock: threading.Lock
3130
_live_clients: Dict[_Key, _Client]
3231

3332
def __init__(
34-
self, closer: _ClientCloser = lambda client: client.__exit__(None, None, None)
33+
self,
34+
factory: _OpenedClientFactory,
35+
closer: _ClientCloser = lambda client: client.__exit__(None, None, None),
3536
):
37+
self._factory = factory
3638
self._closer = closer
3739
self._lock = threading.Lock()
3840
self._live_clients = {}
3941

40-
def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
42+
def get_or_create(self, key: _Key) -> _Client:
4143
with self._lock:
4244
if key not in self._live_clients:
43-
self._live_clients[key] = factory()
44-
return self._live_clients[key]
45-
46-
def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
47-
with self._lock:
48-
if key in self._live_clients:
49-
raise FailedPrecondition(
50-
f"Cannot create two clients with the same key. {_Key}"
51-
)
52-
self._live_clients[key] = factory()
45+
self._live_clients[key] = self._factory(key)
5346
return self._live_clients[key]
5447

5548
def try_erase(self, key: _Key, client: _Client):
@@ -75,52 +68,49 @@ def __exit__(self, exc_type, exc_val, exc_tb):
7568

7669

7770
class AsyncClientMultiplexer(Generic[_Key, _Client]):
78-
_OpenedClientFactory = Callable[[], Awaitable[_Client]]
71+
_OpenedClientFactory = Callable[[_Key], Awaitable[_Client]]
7972
_ClientCloser = Callable[[_Client], Awaitable[None]]
8073

74+
_factory: _OpenedClientFactory
8175
_closer: _ClientCloser
82-
_lock: asyncio.Lock
83-
_live_clients: Dict[_Key, _Client]
76+
_live_clients: Dict[_Key, Awaitable[_Client]]
8477

8578
def __init__(
86-
self, closer: _ClientCloser = lambda client: client.__aexit__(None, None, None)
79+
self,
80+
factory: _OpenedClientFactory,
81+
closer: _ClientCloser = lambda client: client.__aexit__(None, None, None),
8782
):
83+
self._factory = factory
8884
self._closer = closer
8985
self._live_clients = {}
9086

91-
async def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
92-
async with self._lock:
93-
if key not in self._live_clients:
94-
self._live_clients[key] = await factory()
95-
return self._live_clients[key]
96-
97-
async def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
98-
async with self._lock:
99-
if key in self._live_clients:
100-
raise FailedPrecondition(
101-
f"Cannot create two clients with the same key. {_Key}"
102-
)
103-
self._live_clients[key] = await factory()
104-
return self._live_clients[key]
87+
async def get_or_create(self, key: _Key) -> _Client:
88+
if key not in self._live_clients:
89+
self._live_clients[key] = asyncio.ensure_future(self._factory(key))
90+
return await self._live_clients[key]
10591

10692
async def try_erase(self, key: _Key, client: _Client):
107-
async with self._lock:
108-
if key not in self._live_clients:
109-
return
110-
current_client = self._live_clients[key]
111-
if current_client is not client:
112-
return
113-
del self._live_clients[key]
93+
if key not in self._live_clients:
94+
return
95+
client_future = self._live_clients[key]
96+
current_client = await client_future
97+
if current_client is not client:
98+
return
99+
# duplicate check after await that no one raced with us
100+
if (
101+
key not in self._live_clients
102+
or self._live_clients[key] is not client_future
103+
):
104+
return
105+
del self._live_clients[key]
114106
await self._closer(client)
115107

116108
async def __aenter__(self):
117-
self._lock = asyncio.Lock()
118109
return self
119110

120111
async def __aexit__(self, exc_type, exc_val, exc_tb):
121-
live_clients: Dict[_Key, _Client]
122-
async with self._lock:
123-
live_clients = self._live_clients
124-
self._live_clients = {}
112+
live_clients: Dict[_Key, Awaitable[_Client]]
113+
live_clients = self._live_clients
114+
self._live_clients = {}
125115
for topic, client in live_clients.items():
126-
await self._closer(client)
116+
await self._closer(await client)

google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ class ManagedEventLoop(ContextManager):
2222
_loop: AbstractEventLoop
2323
_thread: Thread
2424

25-
def __init__(self):
25+
def __init__(self, name=None):
2626
self._loop = new_event_loop()
27-
self._thread = Thread(target=lambda: self._loop.run_forever())
27+
self._thread = Thread(target=lambda: self._loop.run_forever(), name=name)
2828

2929
def __enter__(self):
3030
self._thread.start()

google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,14 @@ class MultiplexedAsyncPublisherClient(AsyncPublisherClientInterface):
3838

3939
def __init__(self, publisher_factory: AsyncPublisherFactory):
4040
self._publisher_factory = publisher_factory
41-
self._multiplexer = AsyncClientMultiplexer()
41+
self._multiplexer = AsyncClientMultiplexer(
42+
lambda topic: self._create_and_open(topic)
43+
)
44+
45+
async def _create_and_open(self, topic: TopicPath):
46+
client = self._publisher_factory(topic)
47+
await client.__aenter__()
48+
return client
4249

4350
@overrides
4451
async def publish(
@@ -51,12 +58,7 @@ async def publish(
5158
if isinstance(topic, str):
5259
topic = TopicPath.parse(topic)
5360

54-
async def create_and_open():
55-
client = self._publisher_factory(topic)
56-
await client.__aenter__()
57-
return client
58-
59-
publisher = await self._multiplexer.get_or_create(topic, create_and_open)
61+
publisher = await self._multiplexer.get_or_create(topic)
6062
try:
6163
return await publisher.publish(
6264
data=data, ordering_key=ordering_key, **attrs

google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,6 @@
2323

2424
from google.cloud.pubsub_v1.subscriber.message import Message
2525

26-
from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import (
27-
AsyncClientMultiplexer,
28-
)
2926
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
3027
AsyncSubscriberFactory,
3128
AsyncSingleSubscriber,
@@ -66,11 +63,11 @@ def __aiter__(self):
6663

6764
class MultiplexedAsyncSubscriberClient(AsyncSubscriberClientInterface):
6865
_underlying_factory: AsyncSubscriberFactory
69-
_multiplexer: AsyncClientMultiplexer[SubscriptionPath, AsyncSingleSubscriber]
66+
_live_clients: Set[AsyncSingleSubscriber]
7067

7168
def __init__(self, underlying_factory: AsyncSubscriberFactory):
7269
self._underlying_factory = underlying_factory
73-
self._multiplexer = AsyncClientMultiplexer()
70+
self._live_clients = set()
7471

7572
@overrides
7673
async def subscribe(
@@ -82,25 +79,28 @@ async def subscribe(
8279
if isinstance(subscription, str):
8380
subscription = SubscriptionPath.parse(subscription)
8481

85-
async def create_and_open():
86-
client = self._underlying_factory(
87-
subscription, fixed_partitions, per_partition_flow_control_settings
88-
)
89-
await client.__aenter__()
90-
return client
91-
92-
subscriber = await self._multiplexer.get_or_create(
93-
subscription, create_and_open
82+
subscriber = self._underlying_factory(
83+
subscription, fixed_partitions, per_partition_flow_control_settings
9484
)
85+
await subscriber.__aenter__()
86+
self._live_clients.add(subscriber)
87+
9588
return _SubscriberAsyncIterator(
96-
subscriber, lambda: self._multiplexer.try_erase(subscription, subscriber)
89+
subscriber, lambda: self._try_remove_client(subscriber)
9790
)
9891

9992
@overrides
10093
async def __aenter__(self):
101-
await self._multiplexer.__aenter__()
10294
return self
10395

96+
async def _try_remove_client(self, client: AsyncSingleSubscriber):
97+
if client in self._live_clients:
98+
self._live_clients.remove(client)
99+
await client.__aexit__(None, None, None)
100+
104101
@overrides
105102
async def __aexit__(self, exc_type, exc_value, traceback):
106-
await self._multiplexer.__aexit__(exc_type, exc_value, traceback)
103+
live_clients = self._live_clients
104+
self._live_clients = set()
105+
for client in live_clients:
106+
await client.__aexit__(None, None, None)

google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,9 @@ class MultiplexedPublisherClient(PublisherClientInterface):
3838

3939
def __init__(self, publisher_factory: PublisherFactory):
4040
self._publisher_factory = publisher_factory
41-
self._multiplexer = ClientMultiplexer()
41+
self._multiplexer = ClientMultiplexer(
42+
lambda topic: self._create_and_start_publisher(topic)
43+
)
4244

4345
@overrides
4446
def publish(
@@ -51,9 +53,7 @@ def publish(
5153
if isinstance(topic, str):
5254
topic = TopicPath.parse(topic)
5355
try:
54-
publisher = self._multiplexer.get_or_create(
55-
topic, lambda: self._create_and_start_publisher(topic)
56-
)
56+
publisher = self._multiplexer.get_or_create(topic)
5757
except GoogleAPICallError as e:
5858
failed = Future()
5959
failed.set_exception(e)

google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,10 @@
1414

1515
from concurrent.futures.thread import ThreadPoolExecutor
1616
from typing import Union, Optional, Set
17+
from threading import Lock
1718

1819
from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture
1920

20-
from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import (
21-
ClientMultiplexer,
22-
)
2321
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
2422
AsyncSubscriberFactory,
2523
)
@@ -40,22 +38,16 @@ class MultiplexedSubscriberClient(SubscriberClientInterface):
4038
_executor: ThreadPoolExecutor
4139
_underlying_factory: AsyncSubscriberFactory
4240

43-
_multiplexer: ClientMultiplexer[SubscriptionPath, StreamingPullFuture]
41+
_lock: Lock
42+
_live_clients: Set[StreamingPullFuture]
4443

4544
def __init__(
4645
self, executor: ThreadPoolExecutor, underlying_factory: AsyncSubscriberFactory
4746
):
4847
self._executor = executor
4948
self._underlying_factory = underlying_factory
50-
51-
def cancel_streaming_pull_future(fut: StreamingPullFuture):
52-
try:
53-
fut.cancel()
54-
fut.result()
55-
except: # noqa: E722
56-
pass
57-
58-
self._multiplexer = ClientMultiplexer(cancel_streaming_pull_future)
49+
self._lock = Lock()
50+
self._live_clients = set()
5951

6052
@overrides
6153
def subscribe(
@@ -68,28 +60,40 @@ def subscribe(
6860
if isinstance(subscription, str):
6961
subscription = SubscriptionPath.parse(subscription)
7062

71-
def create_and_open():
72-
underlying = self._underlying_factory(
73-
subscription, fixed_partitions, per_partition_flow_control_settings
74-
)
75-
subscriber = SubscriberImpl(underlying, callback, self._executor)
76-
future = StreamingPullFuture(subscriber)
77-
subscriber.__enter__()
78-
return future
79-
80-
future = self._multiplexer.create_or_fail(subscription, create_and_open)
81-
future.add_done_callback(
82-
lambda fut: self._multiplexer.try_erase(subscription, future)
63+
underlying = self._underlying_factory(
64+
subscription, fixed_partitions, per_partition_flow_control_settings
8365
)
66+
subscriber = SubscriberImpl(underlying, callback, self._executor)
67+
future = StreamingPullFuture(subscriber)
68+
subscriber.__enter__()
69+
future.add_done_callback(lambda fut: self._try_remove_client(future))
8470
return future
8571

72+
@staticmethod
73+
def _cancel_streaming_pull_future(fut: StreamingPullFuture):
74+
try:
75+
fut.cancel()
76+
fut.result()
77+
except: # noqa: E722
78+
pass
79+
80+
def _try_remove_client(self, future: StreamingPullFuture):
81+
with self._lock:
82+
if future not in self._live_clients:
83+
return
84+
self._live_clients.remove(future)
85+
self._cancel_streaming_pull_future(future)
86+
8687
@overrides
8788
def __enter__(self):
8889
self._executor.__enter__()
89-
self._multiplexer.__enter__()
9090
return self
9191

9292
@overrides
9393
def __exit__(self, exc_type, exc_value, traceback):
94-
self._multiplexer.__exit__(exc_type, exc_value, traceback)
94+
with self._lock:
95+
live_clients = self._live_clients
96+
self._live_clients = set()
97+
for client in live_clients:
98+
self._cancel_streaming_pull_future(client)
9599
self._executor.__exit__(exc_type, exc_value, traceback)

google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class SinglePublisherImpl(SinglePublisher):
3030

3131
def __init__(self, underlying: AsyncSinglePublisher):
3232
super().__init__()
33-
self._managed_loop = ManagedEventLoop()
33+
self._managed_loop = ManagedEventLoop("PublisherLoopThread")
3434
self._underlying = underlying
3535

3636
def publish(

google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ def __init__(
5454
self._underlying = underlying
5555
self._callback = callback
5656
self._unowned_executor = unowned_executor
57-
self._event_loop = ManagedEventLoop()
57+
self._event_loop = ManagedEventLoop("SubscriberLoopThread")
5858
self._close_lock = threading.Lock()
5959
self._failure = None
6060
self._close_callback = None

0 commit comments

Comments
 (0)