Skip to content

Commit aa4c836

Browse files
authored
Add missing endpoints (/schemas, /subjects) to SchemaRegistryClient (#2017)
* update * update * tests * fix integration tests * regenerate sync code + fix it * fix some unit tests broken from unasync * missing params and address feedback * lint * revert breaking changes * update * update * address feedback * remove unused import
1 parent c15fdf8 commit aa4c836

File tree

7 files changed

+466
-81
lines changed

7 files changed

+466
-81
lines changed

DEVELOPER.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ If librdkafka is installed in a non-standard location provide the include and li
1111

1212
$ C_INCLUDE_PATH=/path/to/include LIBRARY_PATH=/path/to/lib python -m build
1313

14-
**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB
14+
**Note**: On Windows the variables for Visual Studio are named INCLUDE and LIB
1515

1616
## Generate Documentation
1717

@@ -45,4 +45,3 @@ If you make any changes to the async code (in `src/confluent_kafka/schema_regist
4545

4646

4747
See [tests/README.md](tests/README.md) for instructions on how to run tests.
48-

src/confluent_kafka/schema_registry/_async/schema_registry_client.py

Lines changed: 165 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
from confluent_kafka.schema_registry.error import SchemaRegistryError, OAuthTokenError
3434
from confluent_kafka.schema_registry.common.schema_registry_client import (
3535
RegisteredSchema,
36+
SchemaVersion,
3637
ServerConfig,
3738
is_success,
3839
is_retriable,
@@ -663,17 +664,19 @@ async def register_schema_full_response(
663664
return registered_schema
664665

665666
async def get_schema(
666-
self, schema_id: int, subject_name: Optional[str] = None, fmt: Optional[str] = None
667+
self, schema_id: int, subject_name: Optional[str] = None,
668+
fmt: Optional[str] = None, reference_format: Optional[str] = None,
667669
) -> 'Schema':
668670
"""
669671
Fetches the schema associated with ``schema_id`` from the
670672
Schema Registry. The result is cached so subsequent attempts will not
671673
require an additional round-trip to the Schema Registry.
672674
673675
Args:
674-
schema_id (int): Schema id
675-
subject_name (str): Subject name the schema is registered under
676-
fmt (str): Format of the schema
676+
schema_id (int): Schema id.
677+
subject_name (str): Subject name the schema is registered under.
678+
fmt (str): Desired output format, dependent on schema type.
679+
reference_format (str): Desired output format for references.
677680
678681
Returns:
679682
Schema: Schema instance identified by the ``schema_id``
@@ -682,19 +685,21 @@ async def get_schema(
682685
SchemaRegistryError: If schema can't be found.
683686
684687
See Also:
685-
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_
688+
`GET Schema API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id>`_
686689
""" # noqa: E501
687690

688691
result = self._cache.get_schema_by_id(subject_name, schema_id)
689692
if result is not None:
690693
return result[1]
691694

692-
query = {'subject': subject_name} if subject_name is not None else None
695+
query = {}
696+
if subject_name is not None:
697+
query['subject'] = subject_name
693698
if fmt is not None:
694-
if query is not None:
695-
query['format'] = fmt
696-
else:
697-
query = {'format': fmt}
699+
query['format'] = fmt
700+
if reference_format is not None:
701+
query['reference_format'] = reference_format
702+
698703
response = await self._rest_client.get('schemas/ids/{}'.format(schema_id), query)
699704

700705
registered_schema = RegisteredSchema.from_dict(response)
@@ -741,24 +746,102 @@ async def get_schema_by_guid(
741746

742747
return registered_schema.schema
743748

749+
async def get_schema_types(self) -> List[str]:
750+
"""
751+
Lists all supported schema types in the Schema Registry.
752+
753+
Returns:
754+
list(str): List of supported schema types (e.g., ['AVRO', 'JSON', 'PROTOBUF'])
755+
756+
Raises:
757+
SchemaRegistryError: if schema types can't be retrieved
758+
759+
See Also:
760+
`GET Schema Types API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-types>`_
761+
""" # noqa: E501
762+
763+
return await self._rest_client.get('schemas/types')
764+
765+
async def get_subjects_by_schema_id(
766+
self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False,
767+
offset: int = 0, limit: int = -1
768+
) -> List[str]:
769+
"""
770+
Retrieves all the subjects associated with ``schema_id``.
771+
772+
Args:
773+
schema_id (int): Schema ID.
774+
subject_name (str): Subject name that results can be filtered by.
775+
deleted (bool): Whether to include subjects where the schema was deleted.
776+
offset (int): Pagination offset for results.
777+
limit (int): Pagination size for results. Ignored if negative.
778+
779+
Returns:
780+
list(str): List of subjects matching the specified parameters.
781+
782+
Raises:
783+
SchemaRegistryError: if subjects can't be found
784+
"""
785+
query = {'offset': offset, 'limit': limit}
786+
if subject_name is not None:
787+
query['subject'] = subject_name
788+
if deleted:
789+
query['deleted'] = deleted
790+
return await self._rest_client.get('schemas/ids/{}/subjects'.format(schema_id), query)
791+
792+
async def get_schema_versions(
793+
self, schema_id: int, subject_name: Optional[str] = None, deleted: bool = False,
794+
offset: int = 0, limit: int = -1
795+
) -> List[SchemaVersion]:
796+
"""
797+
Gets all subject-version pairs of a schema by its ID.
798+
799+
Args:
800+
schema_id (int): Schema ID.
801+
subject_name (str): Subject name that results can be filtered by.
802+
deleted (bool): Whether to include subject versions where the schema was deleted.
803+
offset (int): Pagination offset for results.
804+
limit (int): Pagination size for results. Ignored if negative.
805+
806+
Returns:
807+
list(SchemaVersion): List of subject-version pairs. Each pair contains:
808+
- subject (str): Subject name.
809+
- version (int): Version number.
810+
811+
Raises:
812+
SchemaRegistryError: if schema versions can't be found.
813+
814+
See Also:
815+
`GET Schema Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--schemas-ids-int-%20id-versions>`_
816+
""" # noqa: E501
817+
818+
query = {'offset': offset, 'limit': limit}
819+
if subject_name is not None:
820+
query['subject'] = subject_name
821+
if deleted:
822+
query['deleted'] = deleted
823+
response = await self._rest_client.get('schemas/ids/{}/versions'.format(schema_id), query)
824+
return [SchemaVersion.from_dict(item) for item in response]
825+
744826
async def lookup_schema(
745827
self, subject_name: str, schema: 'Schema',
746-
normalize_schemas: bool = False, deleted: bool = False
828+
normalize_schemas: bool = False, fmt: Optional[str] = None, deleted: bool = False
747829
) -> 'RegisteredSchema':
748830
"""
749831
Returns ``schema`` registration information for ``subject``.
750832
751833
Args:
752-
subject_name (str): Subject name the schema is registered under
834+
subject_name (str): Subject name the schema is registered under.
753835
schema (Schema): Schema instance.
754-
normalize_schemas (bool): Normalize schema before registering
836+
normalize_schemas (bool): Normalize schema before looking it up.
837+
fmt (str): Desired output format, dependent on schema type.
755838
deleted (bool): Whether to include deleted schemas.
756839
757840
Returns:
758841
RegisteredSchema: Subject registration information for this schema.
759842
760843
Raises:
761-
SchemaRegistryError: If schema or subject can't be found
844+
SchemaRegistryError: If schema or subject can't be found.
762845
763846
See Also:
764847
`POST Subject API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)>`_
@@ -770,9 +853,17 @@ async def lookup_schema(
770853

771854
request = schema.to_dict()
772855

856+
query_params = {
857+
'normalize': normalize_schemas,
858+
'deleted': deleted
859+
}
860+
if fmt is not None:
861+
query_params['format'] = fmt
862+
863+
query_string = '&'.join(f"{key}={value}" for key, value in query_params.items())
864+
773865
response = await self._rest_client.post(
774-
'subjects/{}?normalize={}&deleted={}'.format(
775-
_urlencode(subject_name), normalize_schemas, deleted),
866+
'subjects/{}?{}'.format(_urlencode(subject_name), query_string),
776867
body=request
777868
)
778869

@@ -791,9 +882,19 @@ async def lookup_schema(
791882

792883
return registered_schema
793884

794-
async def get_subjects(self) -> List[str]:
885+
async def get_subjects(
886+
self, subject_prefix: Optional[str] = None, deleted: bool = False, deleted_only: bool = False,
887+
offset: int = 0, limit: int = -1
888+
) -> List[str]:
795889
"""
796-
Lists all subjects registered with the Schema Registry
890+
Lists all subjects registered with the Schema Registry.
891+
892+
Args:
893+
subject_prefix (str): Subject name prefix that results can be filtered by.
894+
deleted (bool): Whether to include deleted subjects.
895+
deleted_only (bool): Whether to return deleted subjects only. If both deleted and deleted_only are True, deleted_only takes precedence.
896+
offset (int): Pagination offset for results.
897+
limit (int): Pagination size for results. Ignored if negative.
797898
798899
Returns:
799900
list(str): Registered subject names
@@ -805,7 +906,10 @@ async def get_subjects(self) -> List[str]:
805906
`GET subjects API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects>`_
806907
""" # noqa: E501
807908

808-
return await self._rest_client.get('subjects')
909+
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
910+
if subject_prefix is not None:
911+
query['subject'] = subject_prefix
912+
return await self._rest_client.get('subjects', query)
809913

810914
async def delete_subject(self, subject_name: str, permanent: bool = False) -> List[int]:
811915
"""
@@ -899,7 +1003,9 @@ async def get_latest_with_metadata(
8991003
if registered_schema is not None:
9001004
return registered_schema
9011005

902-
query = {'deleted': deleted, 'format': fmt} if fmt is not None else {'deleted': deleted}
1006+
query = {'deleted': deleted}
1007+
if fmt is not None:
1008+
query['format'] = fmt
9031009
keys = metadata.keys()
9041010
if keys:
9051011
query['key'] = [_urlencode(key) for key in keys]
@@ -920,13 +1026,13 @@ async def get_version(
9201026
deleted: bool = False, fmt: Optional[str] = None
9211027
) -> 'RegisteredSchema':
9221028
"""
923-
Retrieves a specific schema registered under ``subject_name``.
1029+
Retrieves a specific schema registered under ``subject_name`` and ``version``.
9241030
9251031
Args:
9261032
subject_name (str): Subject name.
927-
version (int): version number. Defaults to latest version.
1033+
version (Union[int, str]): Version of the schema or string "latest". Defaults to latest version.
9281034
deleted (bool): Whether to include deleted schemas.
929-
fmt (str): Format of the schema
1035+
fmt (str): Format of the schema.
9301036
9311037
Returns:
9321038
RegisteredSchema: Registration information for this version.
@@ -935,7 +1041,7 @@ async def get_version(
9351041
SchemaRegistryError: if the version can't be found or is invalid.
9361042
9371043
See Also:
938-
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_
1044+
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-(versionId-%20version)>`_
9391045
""" # noqa: E501
9401046

9411047
registered_schema = self._cache.get_registered_by_subject_version(subject_name, version)
@@ -953,12 +1059,44 @@ async def get_version(
9531059

9541060
return registered_schema
9551061

956-
async def get_versions(self, subject_name: str) -> List[int]:
1062+
async def get_referenced_by(
1063+
self, subject_name: str, version: Union[int, str] = "latest", offset: int = 0, limit: int = -1
1064+
) -> List[int]:
1065+
"""
1066+
Get a list of IDs of schemas that reference the schema with the given ``subject_name`` and ``version``.
1067+
1068+
Args:
1069+
subject_name (str): Subject name
1070+
version (int or str): Version number or "latest"
1071+
offset (int): Pagination offset for results.
1072+
limit (int): Pagination size for results. Ignored if negative.
1073+
1074+
Returns:
1075+
list(int): List of schema IDs that reference the specified schema.
1076+
1077+
Raises:
1078+
SchemaRegistryError: if the schema version can't be found or referenced schemas can't be retrieved
1079+
1080+
See Also:
1081+
`GET Subject Version Referenced By API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions-versionId-%20version-referencedby>`_
1082+
""" # noqa: E501
1083+
1084+
query = {'offset': offset, 'limit': limit}
1085+
return await self._rest_client.get('subjects/{}/versions/{}/referencedby'.format(
1086+
_urlencode(subject_name), version), query)
1087+
1088+
async def get_versions(
1089+
self, subject_name: str, deleted: bool = False, deleted_only: bool = False, offset: int = 0, limit: int = -1
1090+
) -> List[int]:
9571091
"""
9581092
Get a list of all versions registered with this subject.
9591093
9601094
Args:
9611095
subject_name (str): Subject name.
1096+
deleted (bool): Whether to include deleted schemas.
1097+
deleted_only (bool): Whether to return deleted versions only. If both deleted and deleted_only are True, deleted_only takes precedence.
1098+
offset (int): Pagination offset for results.
1099+
limit (int): Pagination size for results. Ignored if negative.
9621100
9631101
Returns:
9641102
list(int): Registered versions
@@ -970,7 +1108,8 @@ async def get_versions(self, subject_name: str) -> List[int]:
9701108
`GET Subject Versions API Reference <https://docs.confluent.io/current/schema-registry/develop/api.html#get--subjects-(string-%20subject)-versions>`_
9711109
""" # noqa: E501
9721110

973-
return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)))
1111+
query = {'deleted': deleted, 'deleted_only': deleted_only, 'offset': offset, 'limit': limit}
1112+
return await self._rest_client.get('subjects/{}/versions'.format(_urlencode(subject_name)), query)
9741113

9751114
async def delete_version(self, subject_name: str, version: int, permanent: bool = False) -> int:
9761115
"""

0 commit comments

Comments
 (0)