Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ async def _client_run_async(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit == 0:
if self._link.current_link_credit <= 0:
await self._link.flow()
await self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,11 @@ async def _incoming_attach(self, frame):
async def _incoming_transfer(self, frame):
if self.network_trace:
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
self.current_link_credit -= 1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
if self.received_delivery_id is not None:
self._first_frame = frame
if not self.received_delivery_id and not self._received_payload:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ def _client_run(self, **kwargs):
:rtype: bool
"""
try:
if self._link.current_link_credit == 0:
if self._link.current_link_credit <= 0:
self._link.flow()
self._connection.listen(wait=self._socket_timeout, **kwargs)
except ValueError:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def _incoming_attach(self, frame):
def _incoming_transfer(self, frame):
if self.network_trace:
_LOGGER.debug("<- %r", TransferFrame(payload=b"***", *frame[:-1]), extra=self.network_trace_params)
self.current_link_credit -= 1
# If more is false --> this is the last frame of the message
if not frame[5]:
self.current_link_credit -= 1
self.delivery_count += 1
self.received_delivery_id = frame[1] # delivery_id
if self.received_delivery_id is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,27 @@ async def on_event_batch(partition_context, event_batch):
assert root_receive.children[1].links[1].headers['traceparent'] == traceparent2

settings.tracing_implementation.set_value(None)


@pytest.mark.liveTest
@pytest.mark.asyncio
async def test_receive_batch_large_event_async(connstr_senders, uamqp_transport):
connection_str, senders = connstr_senders
senders[0].send(EventData("A" * 15700))
client = EventHubConsumerClient.from_connection_string(connection_str, consumer_group='$default', uamqp_transport=uamqp_transport)

async def on_event(partition_context, event):
assert partition_context.partition_id == "0"
assert partition_context.consumer_group == "$default"
assert partition_context.fully_qualified_namespace in connection_str
assert partition_context.eventhub_name == senders[0]._client.eventhub_name
on_event.received += 1
assert client._event_processors[0]._consumers[0]._handler._link.current_link_credit == 1

on_event.received = 0
async with client:
task = asyncio.ensure_future(
client.receive(on_event, partition_id="0", starting_position="-1", prefetch=2))
await asyncio.sleep(10)
assert on_event.received == 1
await task
Original file line number Diff line number Diff line change
Expand Up @@ -328,3 +328,34 @@ def on_event_batch(partition_context, event_batch):
assert root_receive.children[1].links[1].headers['traceparent'] == traceparent2

settings.tracing_implementation.set_value(None)


@pytest.mark.liveTest
def test_receive_batch_large_event(connstr_senders, uamqp_transport):
connection_str, senders = connstr_senders
senders[0].send(EventData("A" * 15700))
client = EventHubConsumerClient.from_connection_string(
connection_str, consumer_group='$default', uamqp_transport=uamqp_transport
)

def on_event(partition_context, event):
on_event.received += 1
on_event.partition_id = partition_context.partition_id
on_event.consumer_group = partition_context.consumer_group
on_event.fully_qualified_namespace = partition_context.fully_qualified_namespace
on_event.eventhub_name = partition_context.eventhub_name
assert client._event_processors[0]._consumers[0]._handler._link.current_link_credit == 1

on_event.received = 0
with client:
worker = threading.Thread(target=client.receive_batch,
args=(on_event,),
kwargs={"starting_position": "-1",
"partition_id": "0", "prefetch": 2})
worker.start()
time.sleep(10)
assert on_event.received == 1
assert on_event.partition_id == "0"
assert on_event.consumer_group == "$default"
assert on_event.fully_qualified_namespace in connection_str
assert on_event.eventhub_name == senders[0]._client.eventhub_name
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from unittest.mock import Mock
from azure.eventhub._pyamqp.link import Link
from azure.eventhub._pyamqp.receiver import ReceiverLink
from azure.eventhub._pyamqp.constants import LinkState
import pytest

Expand Down Expand Up @@ -57,3 +58,54 @@ def test_link_should_not_detach(state):
link._outgoing_detach = Mock(return_value=None)
link.detach()
link._outgoing_detach.assert_not_called()

def test_receive_transfer_frame_multiple():
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)

link.current_link_credit = 2 # Set the link credit to 2

# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, bathable, payload
transfer_frame_one = [3, 0, b'/blah', 0, True, True, None, None, None, None, False, ""]
transfer_frame_two = [3, None, b'/blah', 0, True, False, None, None, None, None, False, ""]

link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 2
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 1

def test_receive_transfer_continuation_frame():
session = None
link = ReceiverLink(
session,
3,
source_address="test_source",
target_address="test_target",
network_trace=False,
network_trace_params={},
on_transfer=Mock(),
)

link.current_link_credit = 3 # Set the link credit to 2

# frame: handle, delivery_id, delivery_tag, message_format, settled, more, rcv_settle_mode, state, resume, aborted, batchable, payload
transfer_frame_one = [3, 0, b'/blah', 0, True, False, None, None, None, None, False, ""]
transfer_frame_two = [3, 1, b'/blah', 0, True, True, None, None, None, None, False, ""]
transfer_frame_three = [3, None, b'/blah', 0, True, False, None, None, None, None, False, ""]



link._incoming_transfer(transfer_frame_one)
assert link.current_link_credit == 2
link._incoming_transfer(transfer_frame_two)
assert link.current_link_credit == 2
link._incoming_transfer(transfer_frame_three)
assert link.current_link_credit == 1