Skip to content

Commit be5bc1d

Browse files
authored
[KIP-848] Added online upgrade and downgrade test (#2012)
1 parent c4048a5 commit be5bc1d

11 files changed

+166
-19
lines changed

tests/common/__init__.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,28 @@ def use_group_protocol_consumer():
5555

5656
@staticmethod
5757
def update_conf_group_protocol(conf=None):
58-
if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer():
58+
if TestUtils.can_upgrade_group_protocol_to_consumer(conf):
5959
conf['group.protocol'] = 'consumer'
6060

61+
@staticmethod
62+
def can_upgrade_group_protocol_to_consumer(conf):
63+
return (conf is not None and 'group.id' in conf and
64+
'group.protocol' not in conf and TestUtils.use_group_protocol_consumer())
65+
6166
@staticmethod
6267
def remove_forbidden_conf_group_protocol_consumer(conf):
63-
if conf is None:
68+
if (conf is None or
69+
not TestUtils.use_group_protocol_consumer() or
70+
conf.get('group.protocol', 'consumer') != 'consumer'):
6471
return
65-
if TestUtils.use_group_protocol_consumer():
66-
forbidden_conf_properties = ["session.timeout.ms",
67-
"partition.assignment.strategy",
68-
"heartbeat.interval.ms",
69-
"group.protocol.type"]
70-
for prop in forbidden_conf_properties:
71-
if prop in conf:
72-
print("Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
73-
del conf[prop]
72+
forbidden_conf_properties = ["session.timeout.ms",
73+
"partition.assignment.strategy",
74+
"heartbeat.interval.ms",
75+
"group.protocol.type"]
76+
for prop in forbidden_conf_properties:
77+
if prop in conf:
78+
print(f"Skipping setting forbidden configuration {prop} for `CONSUMER` protocol")
79+
del conf[prop]
7480

7581

7682
class TestConsumer(Consumer):

tests/integration/cluster_fixture.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,19 @@ def create_topic(self, prefix, conf=None, **create_topic_kwargs):
234234
future_topic.get(name).result()
235235
return name
236236

237+
def delete_topic(self, topic):
238+
"""
239+
Deletes a topic with this cluster.
240+
241+
:param str topic: topic name
242+
"""
243+
future = self.admin().delete_topics([topic])
244+
try:
245+
future.get(topic).result()
246+
print("Topic {} deleted".format(topic))
247+
except Exception as e:
248+
print("Failed to delete topic {}: {}".format(topic, e))
249+
237250
def create_topic_and_wait_propogation(self, prefix, conf=None, **create_topic_kwargs):
238251
"""
239252
Creates a new topic with this cluster. Wait for the topic to be propogated to all brokers.

tests/integration/consumer/test_consumer_error.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717
#
1818

1919
import pytest

tests/integration/consumer/test_consumer_memberid.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717

1818
import pytest
1919
from tests.common import TestUtils

tests/integration/consumer/test_consumer_topicpartition_metadata.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717

1818
from confluent_kafka import TopicPartition
1919

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Copyright 2025 Confluent Inc.
5+
#
6+
# Licensed under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License.
8+
# You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS,
14+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
# See the License for the specific language governing permissions and
16+
# limitations under the License.
17+
18+
import pytest
19+
from enum import Enum
20+
from confluent_kafka import ConsumerGroupType, KafkaException
21+
from tests.common import TestUtils
22+
23+
topic_prefix = "test_consumer_upgrade_downgrade_"
24+
number_of_partitions = 10
25+
26+
27+
def get_group_protocol_type(a, group_id):
28+
futureMap = a.describe_consumer_groups([group_id])
29+
try:
30+
future = futureMap[group_id]
31+
g = future.result()
32+
return g.type
33+
except KafkaException as e:
34+
print("Error while describing group id '{}': {}".format(group_id, e))
35+
except Exception:
36+
raise
37+
38+
39+
def check_consumer(kafka_cluster, consumers, admin_client, group_id, topic, expected_protocol):
40+
no_of_messages = 100
41+
total_msg_read = 0
42+
expected_partitions_per_consumer = number_of_partitions // len(consumers)
43+
while len(consumers[-1].assignment()) != expected_partitions_per_consumer:
44+
for consumer in consumers:
45+
consumer.poll(0.1)
46+
47+
all_assignments = set()
48+
for consumer in consumers:
49+
assignment = consumer.assignment()
50+
all_assignments.update(assignment)
51+
assert len(assignment) == expected_partitions_per_consumer
52+
assert len(all_assignments) == number_of_partitions
53+
54+
assert get_group_protocol_type(admin_client, group_id) == expected_protocol
55+
56+
# Produce some messages to the topic
57+
test_data = ['test-data{}'.format(i) for i in range(0, no_of_messages)]
58+
test_keys = ['test-key{}'.format(i) for i in range(0, no_of_messages)] # we want each partition to have data
59+
kafka_cluster.seed_topic(topic, test_data, test_keys)
60+
61+
while total_msg_read < no_of_messages:
62+
for consumer in consumers:
63+
# Poll for messages
64+
msg = consumer.poll(0.1)
65+
if msg is not None:
66+
total_msg_read += 1
67+
68+
assert total_msg_read == no_of_messages, f"Expected to read {no_of_messages} messages, but read {total_msg_read}"
69+
70+
71+
class Operation(Enum):
72+
ADD = 0
73+
REMOVE = 1
74+
75+
76+
def perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(
77+
kafka_cluster, partition_assignment_strategy):
78+
"""
79+
Test consumer upgrade and downgrade.
80+
"""
81+
topic_name_prefix = f"{topic_prefix}_{partition_assignment_strategy}"
82+
topic = kafka_cluster.create_topic_and_wait_propogation(topic_name_prefix,
83+
{
84+
"num_partitions": number_of_partitions
85+
})
86+
admin_client = kafka_cluster.admin()
87+
88+
consumer_conf = {'group.id': topic,
89+
'auto.offset.reset': 'earliest'}
90+
consumer_conf_classic = {
91+
'group.protocol': 'classic',
92+
'partition.assignment.strategy': partition_assignment_strategy,
93+
**consumer_conf
94+
}
95+
consumer_conf_consumer = {
96+
'group.protocol': 'consumer',
97+
**consumer_conf
98+
}
99+
100+
test_scenarios = [(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CLASSIC),
101+
(Operation.ADD, consumer_conf_consumer, ConsumerGroupType.CONSUMER),
102+
(Operation.REMOVE, None, ConsumerGroupType.CONSUMER),
103+
(Operation.ADD, consumer_conf_classic, ConsumerGroupType.CONSUMER),
104+
(Operation.REMOVE, None, ConsumerGroupType.CLASSIC)]
105+
consumers = []
106+
107+
for operation, conf, expected_protocol in test_scenarios:
108+
if operation == Operation.ADD:
109+
consumer = kafka_cluster.consumer(conf)
110+
assert consumer is not None
111+
consumer.subscribe([topic])
112+
consumers.append(consumer)
113+
elif operation == Operation.REMOVE:
114+
consumer_to_remove = consumers.pop(0)
115+
consumer_to_remove.close()
116+
check_consumer(kafka_cluster, consumers, admin_client, topic, topic, expected_protocol)
117+
118+
assert len(consumers) == 1
119+
consumers[0].close()
120+
kafka_cluster.delete_topic(topic)
121+
122+
123+
@pytest.mark.skipif(not TestUtils.use_group_protocol_consumer(),
124+
reason="Skipping test as group protocol consumer is not enabled")
125+
def test_consumer_upgrade_downgrade(kafka_cluster):
126+
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'roundrobin')
127+
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'range')
128+
perform_consumer_upgrade_downgrade_test_with_partition_assignment_strategy(kafka_cluster, 'cooperative-sticky')

tests/integration/consumer/test_cooperative_rebalance_1.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717

1818
import pytest
1919
import time

tests/integration/consumer/test_cooperative_rebalance_2.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717

1818
import pytest
1919
import time

tests/integration/consumer/test_incremental_assign.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717

1818
import pytest
1919
from uuid import uuid1

tests/integration/producer/test_transactions.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
# distributed under the License is distributed on an "AS IS" BASIS,
1414
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1515
# See the License for the specific language governing permissions and
16-
# limit
16+
# limitations under the License.
1717
#
1818
import inspect
1919
import sys

0 commit comments

Comments
 (0)