1 change: 1 addition & 0 deletions .gitignore
@@ -27,3 +27,4 @@ tests/docker/conf/tls/*
.idea
tmp-KafkaCluster
.venv
venv_test/*
Contributor:
👍
but a newline would be nice.

Contributor:
No need for *, the dir is sufficient.

84 changes: 69 additions & 15 deletions src/confluent_kafka/deserializing_consumer.py
@@ -130,29 +130,83 @@ def poll(self, timeout=-1):
if msg.error() is not None:
raise ConsumeError(msg.error(), kafka_message=msg)

ctx = SerializationContext(msg.topic(), MessageField.VALUE)
value = msg.value()
return self._parse_deserialize_message(msg)

def consume(self, num_messages=1, timeout=-1):
"""
Consume up to num_messages messages, blocking for at most timeout seconds.

Args:
num_messages (int): The maximum number of messages to wait for.
timeout (float): Maximum time to block waiting for messages (seconds).

Returns:
A list of :py:class:`Message` objects, possibly empty on timeout.

Raises:
KeyDeserializationError: If an error occurs during key
deserialization.

ValueDeserializationError: If an error occurs during value
deserialization.

ConsumeError: If an error was encountered while polling.

RuntimeError: If num_messages is less than 1.
"""
if num_messages < 1:
raise RuntimeError("The maximum number of messages must be greater than or equal to 1.")

messages = super(DeserializingConsumer, self).consume(num_messages, timeout)

if messages is None:
return []

deserialized_messages = []

for message in messages:
if message.error() is not None:
raise ConsumeError(message.error(), kafka_message=message)
Contributor:
this implementation effectively causes messages without an error to be thrown away, which is probably not ideal.

thanks for the PR, currently re-considering the need for batch consume.

Author:
Good point, only did this to ensure that we kept parity with the singular API, but returning the messages with the error code is likely the better idea. I removed the exception throw.


deserialized_messages.append(self._parse_deserialize_message(message))

return deserialized_messages

def _parse_deserialize_message(self, message):
"""
Internal helper that deserializes a message, used by both poll and consume to keep their behaviour consistent.

Takes a raw serialized message (from cimpl) and returns it with its key and value deserialized.

Args:
message (cimpl.Message): The serialized message returned from the base consumer class.

Returns:
:py:class:`Message` on successful deserialization

Raises:
KeyDeserializationError: If an error occurs during key
deserialization.

ValueDeserializationError: If an error occurs during value
deserialization.
"""
ctx = SerializationContext(message.topic(), MessageField.VALUE)
value = message.value()
if self._value_deserializer is not None:
try:
value = self._value_deserializer(value, ctx)
except Exception as se:
raise ValueDeserializationError(exception=se, kafka_message=msg)
raise ValueDeserializationError(exception=se, kafka_message=message)

key = msg.key()
key = message.key()
ctx.field = MessageField.KEY
if self._key_deserializer is not None:
try:
key = self._key_deserializer(key, ctx)
except Exception as se:
raise KeyDeserializationError(exception=se, kafka_message=msg)

msg.set_key(key)
msg.set_value(value)
return msg
raise KeyDeserializationError(exception=se, kafka_message=message)

def consume(self, num_messages=1, timeout=-1):
"""
:py:func:`Consumer.consume` not implemented, use
:py:func:`DeserializingConsumer.poll` instead
"""
raise NotImplementedError
message.set_key(key)
message.set_value(value)
return message
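For reference, a minimal usage sketch of the batch consume API added in this diff. The broker address, group id, topic, and string deserializers below are illustrative assumptions, not part of the change; per the review discussion above, errored messages may be returned rather than raised, so the caller checks error() on each message.

from confluent_kafka import DeserializingConsumer
from confluent_kafka.serialization import StringDeserializer

# All configuration values here are assumptions for illustration only.
consumer = DeserializingConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'example-group',
    'auto.offset.reset': 'earliest',
    'key.deserializer': StringDeserializer('utf_8'),
    'value.deserializer': StringDeserializer('utf_8'),
})
consumer.subscribe(['example-topic'])

try:
    while True:
        # Fetch up to 100 deserialized messages, blocking for at most 1 second.
        batch = consumer.consume(num_messages=100, timeout=1.0)
        for message in batch:
            # Per the review discussion, a message may carry an error rather
            # than raising, so check error() before using its key/value.
            if message.error() is not None:
                continue
            print(message.key(), message.value())
finally:
    consumer.close()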
4 changes: 2 additions & 2 deletions tests/README.md
@@ -18,7 +18,7 @@ A python3 env suitable for running tests:

$ python3 -m venv venv_test
$ source venv_test/bin/activate
$ pip install -r test/requirements.txt
$ pip install -r tests/requirements.txt
Contributor:
👍

$ python setup.py build
$ python setup.py install

@@ -28,7 +28,7 @@ When you're finished with it:

## Unit tests

"Unit" tests are the ones directly in the `./test` directory. These tests do
"Unit" tests are the ones directly in the `./tests` directory. These tests do
not require an active Kafka cluster.

You can run them selectively like so:
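For example (the unit tests are run with pytest; the test module name below is only illustrative):

$ pytest -v tests/test_misc.py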