Skip to content
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,5 @@ tmp-KafkaCluster
.venv
venv_test
venv_examples
.vscode/
.dmypy.json
6 changes: 6 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[tool.mypy]
show_error_codes=true
disallow_untyped_defs=true
disallow_untyped_calls=true
warn_redundant_casts=true
strict_optional=true
9 changes: 8 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,20 @@
INSTALL_REQUIRES = [
'futures;python_version<"3.2"',
'enum34;python_version<"3.4"',
'six'
]

TEST_REQUIRES = [
'pytest==4.6.4;python_version<"3.0"',
'pytest;python_version>="3.0"',
'pytest-timeout',
'flake8'
'flake8',
# Cap the version to avoid issues with newer editions. Should be periodically updated!
'mypy<=0.991',
'types-protobuf',
'types-jsonschema',
'types-requests',
'types-six'
]

DOC_REQUIRES = ['sphinx', 'sphinx-rtd-theme']
Expand Down
10 changes: 5 additions & 5 deletions src/confluent_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,19 @@ class ThrottleEvent(object):
:ivar float throttle_time: The amount of time (in seconds) the broker throttled (delayed) the request
"""

def __init__(self, broker_name,
broker_id,
throttle_time):
def __init__(self, broker_name: str,
broker_id: int,
throttle_time: float):
self.broker_name = broker_name
self.broker_id = broker_id
self.throttle_time = throttle_time

def __str__(self):
def __str__(self) -> str:
return "{}/{} throttled for {} ms".format(self.broker_name, self.broker_id,
int(self.throttle_time * 1000))


def _resolve_plugins(plugins):
def _resolve_plugins(plugins: str) -> str:
""" Resolve embedded plugins from the wheel's library directory.

For internal module use only.
Expand Down
111 changes: 60 additions & 51 deletions src/confluent_kafka/admin/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
"""
Kafka admin client: create, view, alter, and delete topics and resources.
"""
import concurrent.futures
from concurrent.futures import Future
from typing import Callable, Dict, List, Optional, Sequence, Tuple, Type, TypeVar

from pytest import Config

# Unused imports are keeped to be accessible using this public module
from ._config import (ConfigSource, # noqa: F401
Expand Down Expand Up @@ -45,7 +48,7 @@
RESOURCE_BROKER)


class AdminClient (_AdminClientImpl):
class AdminClient(_AdminClientImpl):
"""
AdminClient provides admin operations for Kafka brokers, topics, groups,
and other resource types supported by the broker.
Expand All @@ -67,7 +70,7 @@ class AdminClient (_AdminClientImpl):
Requires broker version v0.11.0.0 or later.
"""

def __init__(self, conf):
def __init__(self, conf: Dict):
"""
Create a new AdminClient using the provided configuration dictionary.

Expand All @@ -80,13 +83,14 @@ def __init__(self, conf):
super(AdminClient, self).__init__(conf)

@staticmethod
def _make_topics_result(f, futmap):
def _make_topics_result(f: Future, futmap: Dict[str, Future]) -> None:
"""
Map per-topic results to per-topic futures in futmap.
The result value of each (successful) future is None.
"""
try:
result = f.result()
assert isinstance(result, Dict)
for topic, error in result.items():
fut = futmap.get(topic, None)
if fut is None:
Expand All @@ -104,13 +108,14 @@ def _make_topics_result(f, futmap):
fut.set_exception(e)

@staticmethod
def _make_resource_result(f, futmap):
def _make_resource_result(f: Future, futmap: Dict[ConfigResource, Future]) -> None:
"""
Map per-resource results to per-resource futures in futmap.
The result value of each (successful) future is a ConfigResource.
"""
try:
result = f.result()
assert isinstance(result, Dict)
for resource, configs in result.items():
fut = futmap.get(resource, None)
if fut is None:
Expand All @@ -128,8 +133,10 @@ def _make_resource_result(f, futmap):
for resource, fut in futmap.items():
fut.set_exception(e)

_acl_type = TypeVar("_acl_type", bound=AclBinding)

@staticmethod
def _make_acls_result(f, futmap):
def _make_acls_result(f: Future, futmap: Dict[_acl_type, Future]) -> None:
"""
Map create ACL binding results to corresponding futures in futmap.
For create_acls the result value of each (successful) future is None.
Expand All @@ -155,14 +162,16 @@ def _make_acls_result(f, futmap):
fut.set_exception(e)

@staticmethod
def _create_future():
f = concurrent.futures.Future()
def _create_future() -> Future:
f: Future = Future()
if not f.set_running_or_notify_cancel():
raise RuntimeError("Future was cancelled prematurely")
return f

_futures_map_key = TypeVar("_futures_map_key")

@staticmethod
def _make_futures(futmap_keys, class_check, make_result_fn):
def _make_futures(futmap_keys: List[_futures_map_key], class_check: Optional[Type], make_result_fn: Callable[[Future, Dict[_futures_map_key, Future]], None]) -> Tuple[Future, Dict[_futures_map_key, Future]]:
"""
Create futures and a futuremap for the keys in futmap_keys,
and create a request-level future to be bassed to the C API.
Expand All @@ -182,10 +191,10 @@ def _make_futures(futmap_keys, class_check, make_result_fn):
return f, futmap

@staticmethod
def _has_duplicates(items):
def _has_duplicates(items: Sequence) -> bool:
return len(set(items)) != len(items)

def create_topics(self, new_topics, **kwargs):
def create_topics(self, new_topics: List[NewTopic], **kwargs: object) -> Dict[str, Future]: # type: ignore[override]
"""
Create one or more new topics.

Expand Down Expand Up @@ -219,7 +228,7 @@ def create_topics(self, new_topics, **kwargs):

return futmap

def delete_topics(self, topics, **kwargs):
def delete_topics(self, topics: List[str], **kwargs: object) -> Dict[str, Future]: # type: ignore[override]
"""
Delete one or more topics.

Expand Down Expand Up @@ -249,15 +258,15 @@ def delete_topics(self, topics, **kwargs):

return futmap

def list_topics(self, *args, **kwargs):
def list_topics(self, *args: object, **kwargs: object) -> object:

return super(AdminClient, self).list_topics(*args, **kwargs)

def list_groups(self, *args, **kwargs):
def list_groups(self, *args: object, **kwargs: object) -> object:

return super(AdminClient, self).list_groups(*args, **kwargs)

def create_partitions(self, new_partitions, **kwargs):
def create_partitions(self, new_partitions: List[NewPartitions], **kwargs: object) -> Dict[str, Future]: # type: ignore[override]
"""
Create additional partitions for the given topics.

Expand Down Expand Up @@ -290,7 +299,7 @@ def create_partitions(self, new_partitions, **kwargs):

return futmap

def describe_configs(self, resources, **kwargs):
def describe_configs(self, resources: List[ConfigResource], **kwargs: object) -> Dict[ConfigResource, Future]: # type: ignore[override]
"""
Get the configuration of the specified resources.

Expand Down Expand Up @@ -322,7 +331,7 @@ def describe_configs(self, resources, **kwargs):

return futmap

def alter_configs(self, resources, **kwargs):
def alter_configs(self, resources: List[ConfigResource], **kwargs: object) -> Dict[ConfigResource, Future]: # type: ignore[override]
"""
Update configuration properties for the specified resources.
Updates are not transactional so they may succeed for a subset
Expand Down Expand Up @@ -365,7 +374,7 @@ def alter_configs(self, resources, **kwargs):

return futmap

def create_acls(self, acls, **kwargs):
def create_acls(self, acls: List[AclBinding], **kwargs: object) -> Dict[AclBinding, Future]: # type: ignore[override]
"""
Create one or more ACL bindings.

Expand Down Expand Up @@ -394,7 +403,7 @@ def create_acls(self, acls, **kwargs):

return futmap

def describe_acls(self, acl_binding_filter, **kwargs):
def describe_acls(self, acl_binding_filter: List[AclBindingFilter], **kwargs: object) -> Future: # type: ignore[override]
"""
Match ACL bindings by filter.

Expand Down Expand Up @@ -429,7 +438,7 @@ def describe_acls(self, acl_binding_filter, **kwargs):

return f

def delete_acls(self, acl_binding_filters, **kwargs):
def delete_acls(self, acl_binding_filters: List[AclBindingFilter], **kwargs: object) -> Dict[AclBindingFilter, Future]: # type: ignore[override]
"""
Delete ACL bindings matching one or more ACL binding filters.

Expand Down Expand Up @@ -477,24 +486,24 @@ class ClusterMetadata (object):
This class is typically not user instantiated.
"""

def __init__(self):
def __init__(self) -> None:
self.cluster_id = None
"""Cluster id string, if supported by the broker, else None."""
self.controller_id = -1
self.controller_id: int = -1
"""Current controller broker id, or -1."""
self.brokers = {}
self.brokers: Dict[object, object] = {}
"""Map of brokers indexed by the broker id (int). Value is a BrokerMetadata object."""
self.topics = {}
self.topics: Dict[object, object] = {}
"""Map of topics indexed by the topic name. Value is a TopicMetadata object."""
self.orig_broker_id = -1
self.orig_broker_id: int = -1
"""The broker this metadata originated from."""
self.orig_broker_name = None
"""The broker name/address this metadata originated from."""

def __repr__(self):
def __repr__(self) -> str:
return "ClusterMetadata({})".format(self.cluster_id)

def __str__(self):
def __str__(self) -> str:
return str(self.cluster_id)


Expand All @@ -505,18 +514,18 @@ class BrokerMetadata (object):
This class is typically not user instantiated.
"""

def __init__(self):
def __init__(self) -> None:
self.id = -1
"""Broker id"""
self.host = None
"""Broker hostname"""
self.port = -1
"""Broker port"""

def __repr__(self):
def __repr__(self) -> str:
return "BrokerMetadata({}, {}:{})".format(self.id, self.host, self.port)

def __str__(self):
def __str__(self) -> str:
return "{}:{}/{}".format(self.host, self.port, self.id)


Expand All @@ -530,22 +539,22 @@ class TopicMetadata (object):
# Sphinx issue where it tries to reference the same instance variable
# on other classes which raises a warning/error.

def __init__(self):
self.topic = None
def __init__(self) -> None:
self.topic: Optional[str] = None
"""Topic name"""
self.partitions = {}
self.partitions: Dict[object, object] = {}
"""Map of partitions indexed by partition id. Value is a PartitionMetadata object."""
self.error = None
self.error: Optional[KafkaError] = None
"""Topic error, or None. Value is a KafkaError object."""

def __repr__(self):
def __repr__(self) -> str:
if self.error is not None:
return "TopicMetadata({}, {} partitions, {})".format(self.topic, len(self.partitions), self.error)
else:
return "TopicMetadata({}, {} partitions)".format(self.topic, len(self.partitions))

def __str__(self):
return self.topic
def __str__(self) -> str:
return str(self.topic)


class PartitionMetadata (object):
Expand All @@ -560,25 +569,25 @@ class PartitionMetadata (object):
of a broker id in the brokers dict.
"""

def __init__(self):
self.id = -1
def __init__(self) -> None:
self.id: int = -1
"""Partition id."""
self.leader = -1
self.leader: int = -1
"""Current leader broker for this partition, or -1."""
self.replicas = []
self.replicas: List[object] = []
"""List of replica broker ids for this partition."""
self.isrs = []
self.isrs: List[object] = []
"""List of in-sync-replica broker ids for this partition."""
self.error = None
"""Partition error, or None. Value is a KafkaError object."""

def __repr__(self):
def __repr__(self) -> str:
if self.error is not None:
return "PartitionMetadata({}, {})".format(self.id, self.error)
else:
return "PartitionMetadata({})".format(self.id)

def __str__(self):
def __str__(self) -> str:
return "{}".format(self.id)


Expand All @@ -591,7 +600,7 @@ class GroupMember(object):
This class is typically not user instantiated.
""" # noqa: E501

def __init__(self,):
def __init__(self) -> None:
self.id = None
"""Member id (generated by broker)."""
self.client_id = None
Expand All @@ -610,10 +619,10 @@ class GroupMetadata(object):
This class is typically not user instantiated.
"""

def __init__(self):
def __init__(self) -> None:
self.broker = None
"""Originating broker metadata."""
self.id = None
self.id: Optional[str] = None
"""Group name."""
self.error = None
"""Broker-originated error, or None. Value is a KafkaError object."""
Expand All @@ -623,14 +632,14 @@ def __init__(self):
"""Group protocol type."""
self.protocol = None
"""Group protocol."""
self.members = []
self.members: List[object] = []
"""Group members."""

def __repr__(self):
def __repr__(self) -> str:
if self.error is not None:
return "GroupMetadata({}, {})".format(self.id, self.error)
else:
return "GroupMetadata({})".format(self.id)

def __str__(self):
return self.id
def __str__(self) -> str:
return str(self.id)
Loading