Skip to content

Commit 8724197

Browse files
authored
DGS-21988 Fix transformation of nullable JSON props (#2032)
* DGS-21988 Fix transformation of nullable JSON props * Minor cleanup
1 parent aa4c836 commit 8724197

File tree

3 files changed

+156
-0
lines changed

3 files changed

+156
-0
lines changed

src/confluent_kafka/schema_registry/common/json_schema.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,14 @@ def transform(
7373
field_ctx = ctx.current_field()
7474
if field_ctx is not None:
7575
field_ctx.field_type = get_type(schema)
76+
original_type = schema.get("type")
77+
if isinstance(original_type, list) and len(original_type) > 0:
78+
subschema = _validate_subtypes(schema, message, ref_registry)
79+
try:
80+
if subschema is not None:
81+
return transform(ctx, subschema, ref_registry, ref_resolver, path, message, field_transform)
82+
finally:
83+
schema["type"] = original_type # restore original type
7684
all_of = schema.get("allOf")
7785
if all_of is not None:
7886
subschema = _validate_subschemas(all_of, message, ref_registry)
@@ -136,6 +144,22 @@ def _transform_field(
136144
ctx.exit_field()
137145

138146

147+
def _validate_subtypes(
148+
schema: JsonSchema, message: JsonMessage, registry: Registry
149+
) -> Optional[JsonSchema]:
150+
schema_type = schema.get("type")
151+
if not isinstance(schema_type, list) or len(schema_type) == 0:
152+
return None
153+
for typ in schema_type:
154+
schema["type"] = typ
155+
try:
156+
validate(instance=message, schema=schema, registry=registry)
157+
return schema
158+
except ValidationError:
159+
pass
160+
return None
161+
162+
139163
def _validate_subschemas(
140164
subschemas: List[JsonSchema],
141165
message: JsonMessage,

tests/schema_registry/_async/test_json_serdes.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,72 @@ async def test_json_cel_field_transform():
573573
assert obj2 == newobj
574574

575575

576+
async def test_json_cel_field_transform_with_nullable():
577+
conf = {'url': _BASE_URL}
578+
client = AsyncSchemaRegistryClient.new_client(conf)
579+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
580+
schema = {
581+
"type": "object",
582+
"properties": {
583+
"intField": {"type": "integer"},
584+
"doubleField": {"type": "number"},
585+
"stringField": {
586+
"type": ["string", "null"],
587+
"confluent:tags": ["PII"]
588+
},
589+
"booleanField": {"type": "boolean"},
590+
"bytesField": {
591+
"type": "string",
592+
"contentEncoding": "base64",
593+
"confluent:tags": ["PII"]
594+
}
595+
}
596+
}
597+
598+
rule = Rule(
599+
"test-cel",
600+
"",
601+
RuleKind.TRANSFORM,
602+
RuleMode.WRITE,
603+
"CEL_FIELD",
604+
None,
605+
None,
606+
"name == 'stringField' ; value + '-suffix'",
607+
None,
608+
None,
609+
False
610+
)
611+
await client.register_schema(_SUBJECT, Schema(
612+
json.dumps(schema),
613+
"JSON",
614+
[],
615+
None,
616+
RuleSet(None, [rule])
617+
))
618+
619+
obj = {
620+
'intField': 123,
621+
'doubleField': 45.67,
622+
'stringField': 'hi',
623+
'booleanField': True,
624+
'bytesField': base64.b64encode(b'foobar').decode('utf-8'),
625+
}
626+
ser = await AsyncJSONSerializer(json.dumps(schema), client, conf=ser_conf)
627+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
628+
obj_bytes = await ser(obj, ser_ctx)
629+
630+
obj2 = {
631+
'intField': 123,
632+
'doubleField': 45.67,
633+
'stringField': 'hi-suffix',
634+
'booleanField': True,
635+
'bytesField': base64.b64encode(b'foobar').decode('utf-8'),
636+
}
637+
deser = await AsyncJSONDeserializer(None, schema_registry_client=client)
638+
newobj = await deser(obj_bytes, ser_ctx)
639+
assert obj2 == newobj
640+
641+
576642
async def test_json_cel_field_transform_with_def():
577643
conf = {'url': _BASE_URL}
578644
client = AsyncSchemaRegistryClient.new_client(conf)

tests/schema_registry/_sync/test_json_serdes.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -573,6 +573,72 @@ def test_json_cel_field_transform():
573573
assert obj2 == newobj
574574

575575

576+
def test_json_cel_field_transform_with_nullable():
577+
conf = {'url': _BASE_URL}
578+
client = SchemaRegistryClient.new_client(conf)
579+
ser_conf = {'auto.register.schemas': False, 'use.latest.version': True}
580+
schema = {
581+
"type": "object",
582+
"properties": {
583+
"intField": {"type": "integer"},
584+
"doubleField": {"type": "number"},
585+
"stringField": {
586+
"type": ["string", "null"],
587+
"confluent:tags": ["PII"]
588+
},
589+
"booleanField": {"type": "boolean"},
590+
"bytesField": {
591+
"type": "string",
592+
"contentEncoding": "base64",
593+
"confluent:tags": ["PII"]
594+
}
595+
}
596+
}
597+
598+
rule = Rule(
599+
"test-cel",
600+
"",
601+
RuleKind.TRANSFORM,
602+
RuleMode.WRITE,
603+
"CEL_FIELD",
604+
None,
605+
None,
606+
"name == 'stringField' ; value + '-suffix'",
607+
None,
608+
None,
609+
False
610+
)
611+
client.register_schema(_SUBJECT, Schema(
612+
json.dumps(schema),
613+
"JSON",
614+
[],
615+
None,
616+
RuleSet(None, [rule])
617+
))
618+
619+
obj = {
620+
'intField': 123,
621+
'doubleField': 45.67,
622+
'stringField': 'hi',
623+
'booleanField': True,
624+
'bytesField': base64.b64encode(b'foobar').decode('utf-8'),
625+
}
626+
ser = JSONSerializer(json.dumps(schema), client, conf=ser_conf)
627+
ser_ctx = SerializationContext(_TOPIC, MessageField.VALUE)
628+
obj_bytes = ser(obj, ser_ctx)
629+
630+
obj2 = {
631+
'intField': 123,
632+
'doubleField': 45.67,
633+
'stringField': 'hi-suffix',
634+
'booleanField': True,
635+
'bytesField': base64.b64encode(b'foobar').decode('utf-8'),
636+
}
637+
deser = JSONDeserializer(None, schema_registry_client=client)
638+
newobj = deser(obj_bytes, ser_ctx)
639+
assert obj2 == newobj
640+
641+
576642
def test_json_cel_field_transform_with_def():
577643
conf = {'url': _BASE_URL}
578644
client = SchemaRegistryClient.new_client(conf)

0 commit comments

Comments
 (0)