- Notifications
You must be signed in to change notification settings - Fork 933
Fixed some testing docs and consume in DeserializingConsumer #1085
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -27,3 +27,4 @@ tests/docker/conf/tls/* | |
| .idea | ||
| tmp-KafkaCluster | ||
| .venv | ||
| venv_test/* | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -130,29 +130,83 @@ def poll(self, timeout=-1): | |
| if msg.error() is not None: | ||
| raise ConsumeError(msg.error(), kafka_message=msg) | ||
| | ||
| ctx = SerializationContext(msg.topic(), MessageField.VALUE) | ||
| value = msg.value() | ||
| return self._parse_deserialize_message(msg) | ||
| | ||
| def consume(self, num_messages=1, timeout=-1): | ||
| """ | ||
| Consume up to the number of messages specified with a timeout for each request | ||
| | ||
| Args: | ||
| num_messages (int): The maximum number of messages to wait for. | ||
| timeout (float): Maximum time to block waiting for message(Seconds). | ||
| | ||
| Returns: | ||
| :py:class:`Message` or None on timeout | ||
| | ||
| Raises: | ||
| KeyDeserializationError: If an error occurs during key | ||
| deserialization. | ||
| | ||
| ValueDeserializationError: If an error occurs during value | ||
| deserialization. | ||
| | ||
| ConsumeError if an error was encountered while polling. | ||
| | ||
| RuntimeError if the number of messages is less than 1 | ||
| """ | ||
| if num_messages < 1: | ||
| raise RuntimeError("The maximum number of messages must be greater than or equal to 1.") | ||
| | ||
| messages = super(DeserializingConsumer, self).consume(num_messages, timeout) | ||
| | ||
| if messages is None: | ||
| return [] | ||
| | ||
| deserialized_messages = [] | ||
| | ||
| for message in messages: | ||
| if message.error() is not None: | ||
| raise ConsumeError(message.error(), kafka_message=message) | ||
| ||
| | ||
| deserialized_messages.append(self._parse_deserialize_message(message)) | ||
| | ||
| return deserialized_messages | ||
| | ||
| def _parse_deserialize_message(self, message): | ||
| """ | ||
| Internal class method for deserializing and maintaining consistency between poll and consume classes. | ||
| | ||
| This function will take in a raw serialized message (from cimpl) and return a deserialized message back. | ||
| | ||
| Args: | ||
| message (cimpl.Message): The serialized message returned from the base consumer class. | ||
| | ||
| Returns: | ||
| :py:class:`Message` on sucessful deserialization | ||
| | ||
| Raises: | ||
| KeyDeserializationError: If an error occurs during key | ||
| deserialization. | ||
| | ||
| ValueDeserializationError: If an error occurs during value | ||
| deserialization. | ||
| """ | ||
| ctx = SerializationContext(message.topic(), MessageField.VALUE) | ||
| value = message.value() | ||
| if self._value_deserializer is not None: | ||
| try: | ||
| value = self._value_deserializer(value, ctx) | ||
| except Exception as se: | ||
| raise ValueDeserializationError(exception=se, kafka_message=msg) | ||
| raise ValueDeserializationError(exception=se, kafka_message=message) | ||
| | ||
| key = msg.key() | ||
| key = message.key() | ||
| ctx.field = MessageField.KEY | ||
| if self._key_deserializer is not None: | ||
| try: | ||
| key = self._key_deserializer(key, ctx) | ||
| except Exception as se: | ||
| raise KeyDeserializationError(exception=se, kafka_message=msg) | ||
| | ||
| msg.set_key(key) | ||
| msg.set_value(value) | ||
| return msg | ||
| raise KeyDeserializationError(exception=se, kafka_message=message) | ||
| | ||
| def consume(self, num_messages=1, timeout=-1): | ||
| """ | ||
| :py:func:`Consumer.consume` not implemented, use | ||
| :py:func:`DeserializingConsumer.poll` instead | ||
| """ | ||
| raise NotImplementedError | ||
| message.set_key(key) | ||
| message.set_value(value) | ||
| return message | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| | @@ -18,7 +18,7 @@ A python3 env suitable for running tests: | |
| | ||
| $ python3 -m venv venv_test | ||
| $ source venv_test/bin/activate | ||
| $ pip install -r test/requirements.txt | ||
| $ pip install -r tests/requirements.txt | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 | ||
| $ python setup.py build | ||
| $ python setup.py install | ||
| | ||
| | @@ -28,7 +28,7 @@ When you're finished with it: | |
| | ||
| ## Unit tests | ||
| | ||
| "Unit" tests are the ones directly in the `./test` directory. These tests do | ||
| "Unit" tests are the ones directly in the `./tests` directory. These tests do | ||
| not require an active Kafka cluster. | ||
| | ||
| You can run them selectively like so: | ||
| | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
but a newline would be nice.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need for
*, the dir is sufficient.