Skip to content

Commit 8d75d9a

Browse files
authored
KIP-345 Consumer group static membership (#2625)
1 parent c95dad9 commit 8d75d9a

File tree

15 files changed

+520
-174
lines changed

15 files changed

+520
-174
lines changed

kafka/admin/client.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from kafka.admin.acl_resource import ACLOperation, ACLPermissionType, ACLFilter, ACL, ResourcePattern, ResourceType, \
1414
ACLResourcePatternType
1515
from kafka.client_async import KafkaClient, selectors
16-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment, ConsumerProtocol
16+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0, ConsumerProtocol_v0
1717
import kafka.errors as Errors
1818
from kafka.errors import (
1919
IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError,
@@ -1242,7 +1242,7 @@ def _describe_consumer_groups_process_response(self, response):
12421242
for (described_group_information, group_information_name, group_information_field) in zip(described_group, described_groups_field_schema.names, described_groups_field_schema.fields):
12431243
if group_information_name == 'protocol_type':
12441244
protocol_type = described_group_information
1245-
protocol_type_is_consumer = (protocol_type == ConsumerProtocol.PROTOCOL_TYPE or not protocol_type)
1245+
protocol_type_is_consumer = (protocol_type == ConsumerProtocol_v0.PROTOCOL_TYPE or not protocol_type)
12461246
if isinstance(group_information_field, Array):
12471247
member_information_list = []
12481248
member_schema = group_information_field.array_of
@@ -1251,9 +1251,9 @@ def _describe_consumer_groups_process_response(self, response):
12511251
for (member, member_field, member_name) in zip(members, member_schema.fields, member_schema.names):
12521252
if protocol_type_is_consumer:
12531253
if member_name == 'member_metadata' and member:
1254-
member_information.append(ConsumerProtocolMemberMetadata.decode(member))
1254+
member_information.append(ConsumerProtocolMemberMetadata_v0.decode(member))
12551255
elif member_name == 'member_assignment' and member:
1256-
member_information.append(ConsumerProtocolMemberAssignment.decode(member))
1256+
member_information.append(ConsumerProtocolMemberAssignment_v0.decode(member))
12571257
else:
12581258
member_information.append(member)
12591259
member_info_tuple = MemberInformation._make(member_information)

kafka/consumer/group.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import copy
44
import logging
5+
import re
56
import socket
67
import time
78

@@ -57,6 +58,14 @@ class KafkaConsumer(six.Iterator):
5758
committing offsets. If None, auto-partition assignment (via
5859
group coordinator) and offset commits are disabled.
5960
Default: None
61+
group_instance_id (str): A unique identifier of the consumer instance
62+
provided by end user. Only non-empty strings are permitted. If set,
63+
the consumer is treated as a static member, which means that only
64+
one instance with this ID is allowed in the consumer group at any
65+
time. This can be used in combination with a larger session timeout
66+
to avoid group rebalances caused by transient unavailability (e.g.
67+
process restarts). If not set, the consumer will join the group as
68+
a dynamic member, which is the traditional behavior. Default: None
6069
key_deserializer (callable): Any callable that takes a
6170
raw message key and returns a deserialized key.
6271
value_deserializer (callable): Any callable that takes a
@@ -276,6 +285,7 @@ class KafkaConsumer(six.Iterator):
276285
'bootstrap_servers': 'localhost',
277286
'client_id': 'kafka-python-' + __version__,
278287
'group_id': None,
288+
'group_instance_id': None,
279289
'key_deserializer': None,
280290
'value_deserializer': None,
281291
'enable_incremental_fetch_sessions': True,
@@ -408,6 +418,10 @@ def __init__(self, *topics, **configs):
408418
"Request timeout (%s) must be larger than session timeout (%s)" %
409419
(self.config['request_timeout_ms'], self.config['session_timeout_ms']))
410420

421+
if self.config['group_instance_id'] is not None:
422+
if self.config['group_id'] is None:
423+
raise KafkaConfigurationError("group_instance_id requires group_id")
424+
411425
self._subscription = SubscriptionState(self.config['auto_offset_reset'])
412426
self._fetcher = Fetcher(
413427
self._client, self._subscription, metrics=self._metrics, **self.config)
@@ -423,6 +437,16 @@ def __init__(self, *topics, **configs):
423437
self._subscription.subscribe(topics=topics)
424438
self._client.set_topics(topics)
425439

440+
def _validate_group_instance_id(self, group_instance_id):
441+
if not group_instance_id or not isinstance(group_instance_id, str):
442+
raise KafkaConfigurationError("group_instance_id must be non-empty string")
443+
if group_instance_id in (".", ".."):
444+
raise KafkaConfigurationError("group_instance_id cannot be \".\" or \"..\"")
445+
if len(group_instance_id) > 249:
446+
raise KafkaConfigurationError("group_instance_id can't be longer than 249 characters")
447+
if not re.match(r'^[A-Za-z0-9\.\_\-]+$', group_instance_id):
448+
raise KafkaConfigurationError("group_instance_id is illegal: it contains a character other than ASCII alphanumerics, '.', '_' and '-'")
449+
426450
def bootstrap_connected(self):
427451
"""Return True if the bootstrap is connected."""
428452
return self._client.bootstrap_connected()

kafka/coordinator/assignors/abstract.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,9 @@ def assign(self, cluster, members):
2323
2424
Arguments:
2525
cluster (ClusterMetadata): metadata for use in assignment
26-
members (dict of {member_id: MemberMetadata}): decoded metadata for
27-
each member in the group.
26+
members (dict of {member_id: Subscription}): decoded metadata
27+
for each member in the group, including group_instance_id
28+
when available.
2829
2930
Returns:
3031
dict: {member_id: MemberAssignment}

kafka/coordinator/assignors/range.py

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
from __future__ import absolute_import
22

33
import collections
4+
import itertools
45
import logging
56

67
from kafka.vendor import six
78

89
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
9-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
10+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
1011

1112
log = logging.getLogger(__name__)
1213

@@ -32,45 +33,49 @@ class RangePartitionAssignor(AbstractPartitionAssignor):
3233
version = 0
3334

3435
@classmethod
35-
def assign(cls, cluster, member_metadata):
36+
def assign(cls, cluster, group_subscriptions):
3637
consumers_per_topic = collections.defaultdict(list)
37-
for member, metadata in six.iteritems(member_metadata):
38-
for topic in metadata.subscription:
39-
consumers_per_topic[topic].append(member)
38+
for member_id, subscription in six.iteritems(group_subscriptions):
39+
for topic in subscription.topics:
40+
consumers_per_topic[topic].append((subscription.group_instance_id, member_id))
4041

4142
# construct {member_id: {topic: [partition, ...]}}
4243
assignment = collections.defaultdict(dict)
4344

45+
for topic in consumers_per_topic:
46+
# group by static members (True) v dynamic members (False)
47+
grouped = {k: list(g) for k, g in itertools.groupby(consumers_per_topic[topic], key=lambda ids: ids[0] is not None)}
48+
consumers_per_topic[topic] = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
49+
4450
for topic, consumers_for_topic in six.iteritems(consumers_per_topic):
4551
partitions = cluster.partitions_for_topic(topic)
4652
if partitions is None:
4753
log.warning('No partition metadata for topic %s', topic)
4854
continue
4955
partitions = sorted(partitions)
50-
consumers_for_topic.sort()
5156

5257
partitions_per_consumer = len(partitions) // len(consumers_for_topic)
5358
consumers_with_extra = len(partitions) % len(consumers_for_topic)
5459

55-
for i, member in enumerate(consumers_for_topic):
60+
for i, (_group_instance_id, member_id) in enumerate(consumers_for_topic):
5661
start = partitions_per_consumer * i
5762
start += min(i, consumers_with_extra)
5863
length = partitions_per_consumer
5964
if not i + 1 > consumers_with_extra:
6065
length += 1
61-
assignment[member][topic] = partitions[start:start+length]
66+
assignment[member_id][topic] = partitions[start:start+length]
6267

6368
protocol_assignment = {}
64-
for member_id in member_metadata:
65-
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
69+
for member_id in group_subscriptions:
70+
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
6671
cls.version,
6772
sorted(assignment[member_id].items()),
6873
b'')
6974
return protocol_assignment
7075

7176
@classmethod
7277
def metadata(cls, topics):
73-
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
78+
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
7479

7580
@classmethod
7681
def on_assignment(cls, assignment):

kafka/coordinator/assignors/roundrobin.py

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from kafka.vendor import six
88

99
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
10-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
10+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
1111
from kafka.structs import TopicPartition
1212

1313
log = logging.getLogger(__name__)
@@ -49,10 +49,10 @@ class RoundRobinPartitionAssignor(AbstractPartitionAssignor):
4949
version = 0
5050

5151
@classmethod
52-
def assign(cls, cluster, member_metadata):
52+
def assign(cls, cluster, group_subscriptions):
5353
all_topics = set()
54-
for metadata in six.itervalues(member_metadata):
55-
all_topics.update(metadata.subscription)
54+
for subscription in six.itervalues(group_subscriptions):
55+
all_topics.update(subscription.topics)
5656

5757
all_topic_partitions = []
5858
for topic in all_topics:
@@ -67,29 +67,34 @@ def assign(cls, cluster, member_metadata):
6767
# construct {member_id: {topic: [partition, ...]}}
6868
assignment = collections.defaultdict(lambda: collections.defaultdict(list))
6969

70-
member_iter = itertools.cycle(sorted(member_metadata.keys()))
70+
# Sort static and dynamic members separately to maintain stable static assignments
71+
ungrouped = [(subscription.group_instance_id, member_id) for member_id, subscription in six.iteritems(group_subscriptions)]
72+
grouped = {k: list(g) for k, g in itertools.groupby(ungrouped, key=lambda ids: ids[0] is not None)}
73+
member_list = sorted(grouped.get(True, [])) + sorted(grouped.get(False, [])) # sorted static members first, then sorted dynamic
74+
member_iter = itertools.cycle(member_list)
75+
7176
for partition in all_topic_partitions:
72-
member_id = next(member_iter)
77+
_group_instance_id, member_id = next(member_iter)
7378

7479
# Because we constructed all_topic_partitions from the set of
7580
# member subscribed topics, we should be safe assuming that
7681
# each topic in all_topic_partitions is in at least one member
7782
# subscription; otherwise this could yield an infinite loop
78-
while partition.topic not in member_metadata[member_id].subscription:
83+
while partition.topic not in group_subscriptions[member_id].topics:
7984
member_id = next(member_iter)
8085
assignment[member_id][partition.topic].append(partition.partition)
8186

8287
protocol_assignment = {}
83-
for member_id in member_metadata:
84-
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment(
88+
for member_id in group_subscriptions:
89+
protocol_assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
8590
cls.version,
8691
sorted(assignment[member_id].items()),
8792
b'')
8893
return protocol_assignment
8994

9095
@classmethod
9196
def metadata(cls, topics):
92-
return ConsumerProtocolMemberMetadata(cls.version, list(topics), b'')
97+
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), b'')
9398

9499
@classmethod
95100
def on_assignment(cls, assignment):

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
66
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
77
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet
8-
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata, ConsumerProtocolMemberAssignment
8+
from kafka.coordinator.protocol import ConsumerProtocolMemberMetadata_v0, ConsumerProtocolMemberAssignment_v0
99
from kafka.coordinator.protocol import Schema
1010
from kafka.protocol.struct import Struct
1111
from kafka.protocol.types import String, Array, Int32
@@ -66,6 +66,7 @@ class StickyAssignorUserDataV1(Struct):
6666

6767
class StickyAssignmentExecutor:
6868
def __init__(self, cluster, members):
69+
# a mapping of member_id => StickyAssignorMemberMetadataV1
6970
self.members = members
7071
# a mapping between consumers and their assigned partitions that is updated during assignment procedure
7172
self.current_assignment = defaultdict(list)
@@ -603,7 +604,7 @@ def assign(cls, cluster, members):
603604

604605
assignment = {}
605606
for member_id in members:
606-
assignment[member_id] = ConsumerProtocolMemberAssignment(
607+
assignment[member_id] = ConsumerProtocolMemberAssignment_v0(
607608
cls.version, sorted(executor.get_final_assignment(member_id)), b''
608609
)
609610
return assignment
@@ -625,24 +626,24 @@ def parse_member_metadata(cls, metadata):
625626
user_data = metadata.user_data
626627
if not user_data:
627628
return StickyAssignorMemberMetadataV1(
628-
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
629+
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
629630
)
630631

631632
try:
632633
decoded_user_data = StickyAssignorUserDataV1.decode(user_data)
633-
except Exception as e:
634+
except Exception:
634635
# ignore the consumer's previous assignment if it cannot be parsed
635-
log.error("Could not parse member data", e) # pylint: disable=logging-too-many-args
636+
log.exception("Could not parse member data")
636637
return StickyAssignorMemberMetadataV1(
637-
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.subscription
638+
partitions=[], generation=cls.DEFAULT_GENERATION_ID, subscription=metadata.topics
638639
)
639640

640641
member_partitions = []
641642
for topic, partitions in decoded_user_data.previous_assignment: # pylint: disable=no-member
642643
member_partitions.extend([TopicPartition(topic, partition) for partition in partitions])
643644
return StickyAssignorMemberMetadataV1(
644645
# pylint: disable=no-member
645-
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.subscription
646+
partitions=member_partitions, generation=decoded_user_data.generation, subscription=metadata.topics
646647
)
647648

648649
@classmethod
@@ -661,7 +662,7 @@ def _metadata(cls, topics, member_assignment_partitions, generation=-1):
661662
partitions_by_topic[topic_partition.topic].append(topic_partition.partition)
662663
data = StickyAssignorUserDataV1(list(partitions_by_topic.items()), generation)
663664
user_data = data.encode()
664-
return ConsumerProtocolMemberMetadata(cls.version, list(topics), user_data)
665+
return ConsumerProtocolMemberMetadata_v0(cls.version, list(topics), user_data)
665666

666667
@classmethod
667668
def on_assignment(cls, assignment):

0 commit comments

Comments
 (0)