- Notifications
You must be signed in to change notification settings - Fork 935
Closed
Labels
component:schema-registry — Any schema registry related issues rather than Kafka-isolated ones
Description
The `ctx` argument in `ProtobufDeserializer` is marked as `Optional`:
def __call__(self, data: bytes, ctx: Optional[SerializationContext] = None) -> Optional[Message]:
In previous versions of the library, calling `ProtobufDeserializer.__call__` with `ctx=None` worked. With the new version it fails with an `AttributeError`, around here.
I think this was introduced by the recent change #1852.
There is a workaround, which consists of passing a `subject.name.strategy` that ignores the context:
ProtobufDeserializer(xxx, {"subject.name.strategy": lambda *_: None})
I would argue that the code should be able to handle a `None` ctx without requiring a change to the config.
This matters in particular for a `ProtobufDeserializer` that doesn't have a schema registry set (and is using schema on read), because in this case the subject is not used, and therefore the ctx is not needed.
Steps to reproduce:
import dataclasses

import pytest
from confluent_kafka.schema_registry import RegisteredSchema, Schema
from confluent_kafka.schema_registry.protobuf import (
    ProtobufDeserializer,
    ProtobufSerializer,
)
from confluent_kafka.serialization import MessageField, SerializationContext
from google.protobuf.wrappers_pb2 import DoubleValue


@dataclasses.dataclass
class InMemorySchemaRegistryClient:
    """In-memory stand-in for a schema registry client, for tests.

    Registered schemas are kept in ``schemas``, keyed by the schema object
    passed to ``register_schema``; no network access is performed by this
    class itself.
    """

    # Maps the schema passed to register_schema -> its RegisteredSchema record.
    schemas: dict = dataclasses.field(default_factory=dict)

    def register_schema(self, subject_name, schema, *_, **__) -> int:
        """Return the id for ``schema``, registering it on first sight.

        A schema that is already stored keeps its existing id. A new schema
        gets the current number of stored schemas as its id (so ids start at
        0) and is stored with version 1 under ``subject_name``. Extra
        positional and keyword arguments are accepted and ignored.
        """
        try:
            return self.schemas[schema].schema_id
        except KeyError:
            schema_id = len(self.schemas)
            self.schemas[schema] = RegisteredSchema(
                schema_id=schema_id,
                schema=Schema(schema, "PROTOBUF", []),
                subject=subject_name,
                version=1,
            )
            return schema_id

    # noinspection PyUnusedLocal
    def lookup_schema(self, subject_name, schema):
        """Return the stored record for ``schema``, or ``None`` if unknown.

        ``subject_name`` is accepted but not used for the lookup.
        """
        return self.schemas.get(schema, None)


def test_end_to_end_kafka():
    """Reproduce the ``ctx=None`` failure of ``ProtobufDeserializer``.

    Serializes a ``DoubleValue`` with a real context, then shows that
    deserializing with ``ctx=None`` raises ``AttributeError`` unless a
    ``subject.name.strategy`` that ignores the context is configured.
    """
    context = SerializationContext("test-topic-1", MessageField.VALUE)
    serializer = ProtobufSerializer(
        DoubleValue, InMemorySchemaRegistryClient(), {"use.deprecated.format": False}
    )
    # The deserializer is built without a schema registry client.
    deserializer = ProtobufDeserializer(DoubleValue, {"use.deprecated.format": False})
    msg_in = DoubleValue(value=3.14)
    kafka_data = serializer(msg_in, context)
    assert isinstance(kafka_data, bytes)
    proto_data = msg_in.SerializeToString()
    # The serialized payload ends with the raw protobuf bytes...
    assert kafka_data[-len(proto_data) :] == proto_data
    # ...preceded by exactly 6 extra bytes.
    # NOTE(review): presumably the schema-registry framing header (magic byte,
    # schema id, message index) - confirm against the wire-format docs.
    assert len(kafka_data) - len(proto_data) == 6
    # Sanity check: the raw protobuf bytes parse with the configured class.
    deserializer._msg_class().ParseFromString(proto_data)
    # This is the reported failure: with the default configuration, passing
    # ctx=None raises AttributeError instead of deserializing.
    with pytest.raises(
        AttributeError, match="'NoneType' object has no attribute 'topic'"
    ):
        deserializer(kafka_data, ctx=None)
    # With an explicit context the same deserializer works.
    msg_out = deserializer(
        kafka_data, ctx=SerializationContext("test-topic-1", MessageField.VALUE)
    )
    # Workaround: a subject name strategy that ignores its arguments lets
    # ctx=None through.
    deserializer_no_subject_name = ProtobufDeserializer(
        DoubleValue,
        {"use.deprecated.format": False, "subject.name.strategy": lambda *_: None},
    )
    msg_out2 = deserializer_no_subject_name(kafka_data, ctx=None)
    assert msg_out == msg_out2
    assert isinstance(msg_out, DoubleValue)
    assert msg_out == msg_in
    # Round trip: re-serializing the decoded message yields the same bytes.
    kafka_data_back = serializer(msg_out, context)
    assert kafka_data_back == kafka_data


if __name__ == "__main__":
    test_end_to_end_kafka()
and-ratajski
Metadata
Metadata
Assignees
Labels
component:schema-registry — Any schema registry related issues rather than Kafka-isolated ones