Skip to content

Commit cc4c8d8

Browse files
authored
Isolate use or orjson to JSON serdes (#2025)
1 parent caf51da commit cc4c8d8

File tree

10 files changed

+20
-19
lines changed

10 files changed

+20
-19
lines changed

requirements/requirements-examples.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ avro>=1.11.1,<2
1616

1717
pyrsistent
1818
jsonschema
19+
orjson >= 3.10
1920

2021
googleapis-common-protos
2122
protobuf

requirements/requirements-json.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
pyrsistent
2-
jsonschema
2+
jsonschema
3+
orjson >= 3.10

requirements/requirements-schemaregistry.txt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,3 @@ attrs>=21.2.0
22
cachetools>=5.5.0
33
httpx>=0.26
44
authlib>=1.0.0
5-
orjson >= 3.10

src/confluent_kafka/avro/serializer/message_serializer.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
# derived from https://github.com/verisign/python-confluent-schemaregistry.git
2121
#
2222
import io
23-
import orjson
23+
import json
2424
import logging
2525
import struct
2626
import sys
@@ -80,7 +80,7 @@ def __init__(self, registry_client, reader_key_schema=None, reader_value_schema=
8080
# Encoder support
8181
def _get_encoder_func(self, writer_schema):
8282
if HAS_FAST:
83-
schema = orjson.loads(str(writer_schema))
83+
schema = json.loads(str(writer_schema))
8484
parsed_schema = parse_schema(schema)
8585
return lambda record, fp: schemaless_writer(fp, parsed_schema, record)
8686
writer = avro.io.DatumWriter(writer_schema)
@@ -176,9 +176,9 @@ def _get_decoder_func(self, schema_id, payload, is_key=False):
176176
if HAS_FAST:
177177
# try to use fast avro
178178
try:
179-
fast_avro_writer_schema = parse_schema(orjson.loads(str(writer_schema_obj)))
179+
fast_avro_writer_schema = parse_schema(json.loads(str(writer_schema_obj)))
180180
if reader_schema_obj is not None:
181-
fast_avro_reader_schema = parse_schema(orjson.loads(str(reader_schema_obj)))
181+
fast_avro_reader_schema = parse_schema(json.loads(str(reader_schema_obj)))
182182
else:
183183
fast_avro_reader_schema = None
184184
schemaless_reader(payload, fast_avro_writer_schema)

src/confluent_kafka/kafkatest/verifiable_client.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414

1515

1616
import datetime
17-
import orjson
17+
import json
1818
import os
1919
import re
2020
import signal
@@ -61,8 +61,8 @@ def err(self, s, term=False):
6161
def send(self, d):
6262
""" Send dict as JSON to stdout for consumtion by kafkatest handler """
6363
d['_time'] = str(datetime.datetime.now())
64-
self.dbg('SEND: %s' % orjson.dumps(d).decode("utf-8"))
65-
sys.stdout.write('%s\n' % orjson.dumps(d).decode("utf-8"))
64+
self.dbg('SEND: %s' % json.dumps(d))
65+
sys.stdout.write('%s\n' % json.dumps(d))
6666
sys.stdout.flush()
6767

6868
@staticmethod

src/confluent_kafka/schema_registry/_async/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
import orjson
18+
import json
1919
from typing import Dict, Union, Optional, Callable
2020

2121
from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ async def __init_impl(
270270
# i.e. {"type": "string"} has a name of string.
271271
# This function does not comply.
272272
# https://github.com/fastavro/fastavro/issues/415
273-
schema_dict = orjson.loads(schema.schema_str)
273+
schema_dict = json.loads(schema.schema_str)
274274
schema_name = parsed_schema.get("name", schema_dict.get("type"))
275275
else:
276276
schema_name = None

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
#
1818
import asyncio
19-
import orjson
19+
import json
2020
import logging
2121
import time
2222
import urllib
@@ -416,7 +416,7 @@ async def send_request(
416416
" application/json"}
417417

418418
if body is not None:
419-
body = orjson.dumps(body).decode('utf-8')
419+
body = json.dumps(body)
420420
headers = {'Content-Length': str(len(body)),
421421
'Content-Type': "application/vnd.schemaregistry.v1+json"}
422422

src/confluent_kafka/schema_registry/_sync/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# See the License for the specific language governing permissions and
1616
# limitations under the License.
1717
import io
18-
import orjson
18+
import json
1919
from typing import Dict, Union, Optional, Callable
2020

2121
from fastavro import schemaless_reader, schemaless_writer
@@ -270,7 +270,7 @@ def __init_impl(
270270
# i.e. {"type": "string"} has a name of string.
271271
# This function does not comply.
272272
# https://github.com/fastavro/fastavro/issues/415
273-
schema_dict = orjson.loads(schema.schema_str)
273+
schema_dict = json.loads(schema.schema_str)
274274
schema_name = parsed_schema.get("name", schema_dict.get("type"))
275275
else:
276276
schema_name = None

src/confluent_kafka/schema_registry/_sync/schema_registry_client.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
# limitations under the License.
1717
#
1818

19-
import orjson
19+
import json
2020
import logging
2121
import time
2222
import urllib
@@ -416,7 +416,7 @@ def send_request(
416416
" application/json"}
417417

418418
if body is not None:
419-
body = orjson.dumps(body).decode('utf-8')
419+
body = json.dumps(body)
420420
headers = {'Content-Length': str(len(body)),
421421
'Content-Type': "application/vnd.schemaregistry.v1+json"}
422422

src/confluent_kafka/schema_registry/common/avro.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from collections import defaultdict
44
from copy import deepcopy
55
from io import BytesIO
6-
import orjson
6+
import json
77
from typing import Dict, Union, Optional, Set
88

99
from fastavro import repository, validate
@@ -91,7 +91,7 @@ def load(self, subject):
9191

9292
def parse_schema_with_repo(schema_str: str, named_schemas: Dict[str, AvroSchema]) -> AvroSchema:
9393
copy = deepcopy(named_schemas)
94-
copy["$root"] = orjson.loads(schema_str)
94+
copy["$root"] = json.loads(schema_str)
9595
repo = LocalSchemaRepository(copy)
9696
return load_schema("$root", repo=repo)
9797

0 commit comments

Comments
 (0)