Skip to content

Commit 04e261c

Browse files
feat: Add support for server-side flow control (#143)
* chore: Remove notes about ordering keys being experimental. * Revert "chore: Remove notes about ordering keys being experimental." This reverts commit 38b2a3e. * feat: Add support for server-side flow control * Add unit test for flow control
1 parent 1cb6746 commit 04e261c

File tree

2 files changed

+11
-0
lines changed

2 files changed

+11
-0
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,8 @@ def _get_initial_request(self, stream_ack_deadline_seconds):
575575
stream_ack_deadline_seconds=stream_ack_deadline_seconds,
576576
subscription=self._subscription,
577577
client_id=self._client_id,
578+
max_outstanding_messages=self._flow_control.max_messages,
579+
max_outstanding_bytes=self._flow_control.max_bytes,
578580
)
579581

580582
# Return the initial request.

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,15 @@ def test_client_id():
159159
assert client_id_1 != client_id_2
160160

161161

162+
def test_streaming_flow_control():
163+
manager = make_manager(
164+
flow_control=types.FlowControl(max_messages=10, max_bytes=1000)
165+
)
166+
request = manager._get_initial_request(stream_ack_deadline_seconds=10)
167+
assert request.max_outstanding_messages == 10
168+
assert request.max_outstanding_bytes == 1000
169+
170+
162171
def test_ack_deadline_with_max_duration_per_lease_extension():
163172
manager = make_manager()
164173
manager._flow_control = types.FlowControl(max_duration_per_lease_extension=5)

0 commit comments

Comments
 (0)