Skip to content

Commit dab13d5

Browse files
authored
fix: mypy errors (#622)
* fix: mypy errors * fixing linter errors * addressing review comments * fixing test coverage * fixing test comments * fixing test comments * linting for test * fixing test comment
1 parent 8f02850 commit dab13d5

File tree

2 files changed

+75
-10
lines changed

2 files changed

+75
-10
lines changed

google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import logging
2121
import threading
2222
import typing
23-
from typing import Any, Callable, Iterable, List, Optional, Tuple, Union
23+
from typing import Any, Dict, Callable, Iterable, List, Optional, Tuple, Union
2424
import uuid
2525

2626
import grpc # type: ignore
@@ -49,7 +49,6 @@
4949

5050
if typing.TYPE_CHECKING: # pragma: NO COVER
5151
from google.cloud.pubsub_v1 import subscriber
52-
from google.protobuf.internal import containers
5352

5453

5554
_LOGGER = logging.getLogger(__name__)
@@ -141,9 +140,7 @@ def _get_status(exc: exceptions.GoogleAPICallError,) -> Optional["status_pb2.Sta
141140
return None
142141

143142

144-
def _get_ack_errors(
145-
exc: exceptions.GoogleAPICallError,
146-
) -> Optional["containers.ScalarMap"]:
143+
def _get_ack_errors(exc: exceptions.GoogleAPICallError,) -> Optional[Dict[str, str]]:
147144
status = _get_status(exc)
148145
if not status:
149146
_LOGGER.debug("Unable to get status of errored RPC.")
@@ -159,8 +156,8 @@ def _get_ack_errors(
159156

160157
def _process_requests(
161158
error_status: Optional["status_pb2.Status"],
162-
ack_reqs_dict: "containers.ScalarMap",
163-
errors_dict: Optional["containers.ScalarMap"],
159+
ack_reqs_dict: Dict[str, requests.AckRequest],
160+
errors_dict: Optional[Dict[str, str]],
164161
):
165162
"""Process requests by referring to error_status and errors_dict.
166163
@@ -182,9 +179,9 @@ def _process_requests(
182179
exc = AcknowledgeError(AcknowledgeStatus.INVALID_ACK_ID, info=None)
183180
else:
184181
exc = AcknowledgeError(AcknowledgeStatus.OTHER, exactly_once_error)
185-
186182
future = ack_reqs_dict[ack_id].future
187-
future.set_exception(exc)
183+
if future is not None:
184+
future.set_exception(exc)
188185
requests_completed.append(ack_reqs_dict[ack_id])
189186
# Temporary GRPC errors are retried
190187
elif (
@@ -201,12 +198,14 @@ def _process_requests(
201198
else:
202199
exc = AcknowledgeError(AcknowledgeStatus.OTHER, str(error_status))
203200
future = ack_reqs_dict[ack_id].future
204-
future.set_exception(exc)
201+
if future is not None:
202+
future.set_exception(exc)
205203
requests_completed.append(ack_reqs_dict[ack_id])
206204
# Since no error occurred, requests with futures are completed successfully.
207205
elif ack_reqs_dict[ack_id].future:
208206
future = ack_reqs_dict[ack_id].future
209207
# success
208+
assert future is not None
210209
future.set_result(AcknowledgeStatus.SUCCESS)
211210
requests_completed.append(ack_reqs_dict[ack_id])
212211
# All other requests are considered completed.

tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,6 +1713,21 @@ def test_process_requests_no_errors():
17131713
assert not requests_to_retry
17141714

17151715

1716+
def test_process_requests_no_errors_no_future():
1717+
# no errors, request should be completed, even when future is None.
1718+
ack_reqs_dict = {
1719+
"ackid1": requests.AckRequest(
1720+
ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None
1721+
)
1722+
}
1723+
errors_dict = {}
1724+
requests_completed, requests_to_retry = streaming_pull_manager._process_requests(
1725+
None, ack_reqs_dict, errors_dict
1726+
)
1727+
assert requests_completed[0].ack_id == "ackid1"
1728+
assert not requests_to_retry
1729+
1730+
17161731
def test_process_requests_permanent_error_raises_exception():
17171732
# a permanent error raises an exception
17181733
future = futures.Future()
@@ -1735,6 +1750,40 @@ def test_process_requests_permanent_error_raises_exception():
17351750
assert not requests_to_retry
17361751

17371752

1753+
def test_process_requests_permanent_error_other_raises_exception():
1754+
# a permanent error of other raises an exception
1755+
future = futures.Future()
1756+
ack_reqs_dict = {
1757+
"ackid1": requests.AckRequest(
1758+
ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=future
1759+
)
1760+
}
1761+
errors_dict = {"ackid1": "PERMANENT_FAILURE_OTHER"}
1762+
requests_completed, requests_to_retry = streaming_pull_manager._process_requests(
1763+
None, ack_reqs_dict, errors_dict
1764+
)
1765+
assert requests_completed[0].ack_id == "ackid1"
1766+
with pytest.raises(subscriber_exceptions.AcknowledgeError) as exc_info:
1767+
future.result()
1768+
assert exc_info.value.error_code == subscriber_exceptions.AcknowledgeStatus.OTHER
1769+
assert not requests_to_retry
1770+
1771+
1772+
def test_process_requests_permanent_error_other_raises_exception_no_future():
1773+
# with a permanent error, request is completed even when future is None.
1774+
ack_reqs_dict = {
1775+
"ackid1": requests.AckRequest(
1776+
ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None
1777+
)
1778+
}
1779+
errors_dict = {"ackid1": "PERMANENT_FAILURE_OTHER"}
1780+
requests_completed, requests_to_retry = streaming_pull_manager._process_requests(
1781+
None, ack_reqs_dict, errors_dict
1782+
)
1783+
assert requests_completed[0].ack_id == "ackid1"
1784+
assert not requests_to_retry
1785+
1786+
17381787
def test_process_requests_transient_error_returns_request_for_retrying():
17391788
# a transient error returns the request in `requests_to_retry`
17401789
future = futures.Future()
@@ -1872,6 +1921,23 @@ def test_process_requests_other_error_status_raises_exception():
18721921
assert not requests_to_retry
18731922

18741923

1924+
def test_process_requests_other_error_status_raises_exception_no_future():
1925+
# with an unrecognized error status, requests are completed, even when
1926+
# future is None.
1927+
ack_reqs_dict = {
1928+
"ackid1": requests.AckRequest(
1929+
ack_id="ackid1", byte_size=0, time_to_ack=20, ordering_key="", future=None
1930+
)
1931+
}
1932+
st = status_pb2.Status()
1933+
st.code = code_pb2.Code.OUT_OF_RANGE
1934+
requests_completed, requests_to_retry = streaming_pull_manager._process_requests(
1935+
st, ack_reqs_dict, None
1936+
)
1937+
assert requests_completed[0].ack_id == "ackid1"
1938+
assert not requests_to_retry
1939+
1940+
18751941
def test_process_requests_mixed_success_and_failure_acks():
18761942
# mixed success and failure (acks)
18771943
future1 = futures.Future()

0 commit comments

Comments
 (0)