Conversation

Member

@lucasbru lucasbru commented Oct 16, 2025

This adds code to pass committed partitions into ClassicGroup,
StreamsGroup and ConsumerGroup. These are not used in validation yet;
this is just preparation for later PRs.

The validation implementation in each group can return a validator
that will be used to validate each partition in the offset commit.

We also add streams group variants of the tests in
OffsetMetadataManagerTest, which previously only covered the consumer
group case. Right now the commit logic is exactly the same, but it will
diverge in the following PRs.

Reviewers: David Jacot <djacot@confluent.io>

Contributor

Copilot AI left a comment

Pull Request Overview

This PR adds partitions to offset commit validation as preparation for future validation logic. The changes add a new parameter containing committed partitions to all group types (ClassicGroup, StreamsGroup, ConsumerGroup, and ShareGroup) without using them in validation yet.

  • Adds a Supplier<List<TopicIdPartition>> parameter to validateOffsetCommit methods across all group implementations
  • Updates all test calls to include the new partitions parameter with placeholder List::of suppliers
  • Adds comprehensive StreamsGroup test coverage equivalent to existing ConsumerGroup tests in OffsetMetadataManagerTest

Reviewed Changes

Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.

File | Description
Group.java | Updated interface to include topicIdPartitions parameter in validateOffsetCommit
ClassicGroup.java | Added partitions parameter to validateOffsetCommit method with documentation
ConsumerGroup.java | Added partitions parameter to validateOffsetCommit method with documentation
StreamsGroup.java | Added partitions parameter to validateOffsetCommit method with documentation
ShareGroup.java | Added partitions parameter to validateOffsetCommit method
OffsetMetadataManager.java | Updated validation calls to supply TopicIdPartition collections from request data
GroupMetadataManager.java | Changed getOrMaybeCreatePersistedStreamsGroup visibility from private to package-private
ClassicGroupTest.java | Updated test calls to include partitions parameter
ConsumerGroupTest.java | Updated test calls to include partitions parameter
StreamsGroupTest.java | Updated test calls to include partitions parameter
ShareGroupTest.java | Updated test call to include partitions parameter
OffsetMetadataManagerTest.java | Added comprehensive StreamsGroup test coverage and extracted common verification methods


  @ParameterizedTest
- @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER"})
+ @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER", "STREAMS"})
Member Author

Some existing tests already used this kind of parametrization, and I reused it in the tests where it was already in place. In the general case, however, the setup often differed a little between the group types, or the existing parametrization made adopting this pattern painful, so I opted for a separate test method above and shared everything after the setup.

- int apiVersion
+ int apiVersion,
+ Supplier<List<TopicIdPartition>> topicIdPartitions
Member

I wonder whether this is the right way to do it, because it forces us to iterate over the topic-partitions twice: once in validateOffsetCommit and once later on when we construct the records. Have you considered inlining the check in the latter?

Member Author

They are only going to be iterated twice if our member epoch is outdated, which should be the minority of cases. Wouldn't that be a minor performance concern?

Inlining the check later could work, but it does create a bit of complexity, since we have to go through 6 methods in the commit request handler:

  • get the group (versions for non-txn vs. txn)
  • validate member id / epoch (versions for non-txn vs. txn) -- returns if member epochs match
  • validate assignment epoch -- short-circuited if member epochs match

I can give it a shot
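
For readers following the thread, the supplier-based signature under discussion can be sketched roughly as follows. This is only an illustration of the laziness argument, not the code in the PR: the class name, the String-based partitions, the counter and the placeholder check are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class LazyPartitionsSketch {
    // Counts how often the partition list is actually built (illustration only).
    static int materializations = 0;

    /** Hypothetical validation: the partitions are requested only on an epoch mismatch. */
    static void validateOffsetCommit(int requestEpoch, int currentEpoch,
                                     Supplier<List<String>> partitions) {
        if (requestEpoch == currentEpoch) {
            return; // common case: the supplier is never invoked
        }
        for (String partition : partitions.get()) {
            // Placeholder for a per-partition check; no real rule is assumed here.
            if (partition.isEmpty()) {
                throw new IllegalArgumentException("Empty partition name");
            }
        }
    }

    /** Hypothetical stand-in for collecting the partitions from the request. */
    static List<String> buildPartitions() {
        materializations++;
        List<String> result = new ArrayList<>();
        result.add("orders-0");
        result.add("orders-1");
        return result;
    }

    public static void main(String[] args) {
        validateOffsetCommit(5, 5, LazyPartitionsSketch::buildPartitions);
        System.out.println(materializations); // prints 0: epochs match, no list built
        validateOffsetCommit(4, 5, LazyPartitionsSketch::buildPartitions);
        System.out.println(materializations); // prints 1: stale epoch, list built once
    }
}
```

In this sketch the second pass over the partitions only happens on the stale-epoch path, which is the trade-off being weighed in this comment.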

Member Author

Can you have another look? I implemented inline validation. The downside is that, for every partition, we have to fetch the member identified by memberId from the group.

I added a partitionValidationRequired boolean to shortcut partition-level validation if we do not need it (in particular, if member epochs match).

I think the Group interface looks nicer, and there are no allocations, which is nice. Maybe the logic is a bit more complex now.

wdyt?
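
A rough sketch of the inline variant described here, to make the per-partition lookup cost concrete. Everything in it is hypothetical (the SketchGroup class, the boolean return value, the lookup counter, the "orders" topic); the comment only says that a partitionValidationRequired boolean skips the partition-level validation when member epochs match.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class InlineValidationSketch {
    /** Hypothetical group that counts partition-level member lookups. */
    static class SketchGroup {
        final Map<String, Integer> memberEpochs = new HashMap<>();
        int partitionLevelLookups = 0;

        /** Group-level check; returns true when partition-level validation is needed. */
        boolean validateOffsetCommit(String memberId, int epoch) {
            Integer current = memberEpochs.get(memberId);
            if (current == null) {
                throw new IllegalArgumentException("Unknown member " + memberId);
            }
            return current != epoch;
        }

        /** Partition-level check; it has to look the member up again on every call. */
        void validateOffsetCommitForPartition(String memberId, int epoch,
                                              String topic, int partition) {
            partitionLevelLookups++;
            Integer current = memberEpochs.get(memberId);
            if (current == null) {
                throw new IllegalArgumentException("Unknown member " + memberId);
            }
            // A real per-partition rule would go here; none is assumed in this sketch.
        }
    }

    /** Returns the number of partitions processed (stands in for building records). */
    static int commit(SketchGroup group, String memberId, int epoch, List<Integer> partitions) {
        boolean partitionValidationRequired = group.validateOffsetCommit(memberId, epoch);
        int written = 0;
        for (int partition : partitions) {
            if (partitionValidationRequired) {
                group.validateOffsetCommitForPartition(memberId, epoch, "orders", partition);
            }
            written++;
        }
        return written;
    }

    public static void main(String[] args) {
        SketchGroup group = new SketchGroup();
        group.memberEpochs.put("m1", 5);
        commit(group, "m1", 5, List.of(0, 1, 2)); // epochs match: no partition-level lookups
        commit(group, "m1", 4, List.of(0, 1, 2)); // stale epoch: one lookup per partition
        System.out.println(group.partitionLevelLookups); // prints 3
    }
}
```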

Comment on lines 623 to 629
group.validateOffsetCommitForPartition(
    request.memberId(),
    request.generationIdOrMemberEpoch(),
    topic.name(),
    topic.topicId(),
    partition.partitionIndex()
);
Member

This may be a bit inefficient because we have to look up the member for every partition. I wonder whether we could do something as follows.

Let's define an OffsetPartitionValidator. We could find a better name.

OffsetPartitionValidator { void validate(name, topicId, partitionId) } 

At the group level, we can have a method which returns a concrete OffsetPartitionValidator depending on the member epoch / group type. If the epoch matches the expected epoch, the validator is basically a no-op. Otherwise, we return one which does the partition-level check while caching the reference to the member.

Group { OffsetPartitionValidator offsetValidator(memberId, memberEpoch) } 

This would make the validation logic in this class more or less agnostic of the two-level validation.

Have you considered something like this? What do you think about it?
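
The suggestion above could look roughly like the following Java sketch. It is not the PR's implementation: SketchGroup, Member and the ownership rule used on the stale-epoch path are invented for illustration, and only the OffsetPartitionValidator and offsetValidator names are taken from the comment.

```java
import java.util.Map;
import java.util.Set;

public class ValidatorSketch {
    /** Per-partition check; the name follows the review comment. */
    interface OffsetPartitionValidator {
        void validate(String topicName, String topicId, int partitionId);

        // Returned when the request epoch is current: nothing to check per partition.
        OffsetPartitionValidator NO_OP = (topicName, topicId, partitionId) -> { };
    }

    /** Hypothetical member: its current epoch plus the partitions it owns. */
    static final class Member {
        final int epoch;
        final Set<String> ownedPartitions;

        Member(int epoch, Set<String> ownedPartitions) {
            this.epoch = epoch;
            this.ownedPartitions = ownedPartitions;
        }
    }

    /** Hypothetical stand-in for a group; this is not Kafka's Group interface. */
    static final class SketchGroup {
        private final Map<String, Member> members;

        SketchGroup(Map<String, Member> members) {
            this.members = members;
        }

        OffsetPartitionValidator offsetValidator(String memberId, int memberEpoch) {
            Member member = members.get(memberId); // single lookup per request
            if (member == null) {
                throw new IllegalArgumentException("Unknown member " + memberId);
            }
            if (memberEpoch == member.epoch) {
                return OffsetPartitionValidator.NO_OP;
            }
            // Stale epoch: the lambda captures `member`, so there are no further lookups.
            // The ownership rule below is made up for illustration only.
            return (topicName, topicId, partitionId) -> {
                if (!member.ownedPartitions.contains(topicName + "-" + partitionId)) {
                    throw new IllegalStateException(
                        "Stale epoch for " + topicName + "-" + partitionId);
                }
            };
        }
    }

    public static void main(String[] args) {
        SketchGroup group = new SketchGroup(
            Map.of("m1", new Member(5, Set.of("orders-0"))));
        // The caller obtains the validator once and applies it to every partition.
        OffsetPartitionValidator validator = group.offsetValidator("m1", 5);
        validator.validate("orders", "topic-id", 0);
        validator.validate("orders", "topic-id", 1);
        System.out.println(validator == OffsetPartitionValidator.NO_OP); // prints true
    }
}
```

With this shape, the calling code loops over the partitions without knowing whether partition-level checks are actually taking place.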

Member Author

I had considered introducing a superclass of ModernGroupMember and StreamsGroupMember and returning the member. But indeed, your approach seems cleaner and easier to implement. I will revise the PR.

Member Author

I have updated the code with this suggestion. It's a good idea. I don't see anything wrong with it. Can you take another look?

This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The partitions are passed as a supplier, so that we only instantiate the collection if we are not trying to commit with the latest member epoch. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs.
Member

@dajac dajac left a comment

@lucasbru Thanks for the update. It looks much better like this. I left a few minor comments/questions.

    TxnOffsetCommitRequestData request
) throws ApiException {
    validateTransactionalOffsetCommit(context, request);
    Group group = getGroupForTransactionalOffsetCommit(request);
Member

We don't really need the group here. Have you considered doing it the other way around? By this, I mean having validateTransactionalOffsetCommit return the CommitPartitionValidator? It seems we could also do the same for the non-transactional path but we would need to re-schedule the heartbeat for classic groups in the validation method.

Member Author

Yes, I considered this. It seemed that we did not want to move the re-scheduling of the heartbeat into the validation method; it looked like this was intentionally kept outside. I then kept the "getGroup" step in the txn path symmetric with the non-txn case for consistency. But if we are fine with moving the heartbeat inside, let's do it.

Member Author

Done

Comment on lines +709 to +711
} catch (StaleMemberEpochException ex) {
    throw Errors.ILLEGAL_GENERATION.exception();
}
Member

If you follow-up on my suggestion, I wonder if we could push this into the validator too.

Member Author

As discussed, keeping it this way for now
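
For reference, the translation kept at the call site follows the usual catch-and-rethrow pattern, sketched below. The two exception classes are local stand-ins that merely share their names with the Kafka classes, and validateAndTranslate and the Validation interface are hypothetical helpers introduced for this example.

```java
public class ErrorTranslationSketch {
    /** Local stand-in for the exception caught in the snippet above. */
    static class StaleMemberEpochException extends RuntimeException {
        StaleMemberEpochException(String message) {
            super(message);
        }
    }

    /** Local stand-in for what Errors.ILLEGAL_GENERATION.exception() would produce. */
    static class IllegalGenerationException extends RuntimeException {
        IllegalGenerationException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Hypothetical hook for whatever validation runs inside the try block. */
    interface Validation {
        void run();
    }

    /** Runs the validation and maps a stale member epoch onto an illegal generation. */
    static void validateAndTranslate(Validation validation) {
        try {
            validation.run();
        } catch (StaleMemberEpochException ex) {
            throw new IllegalGenerationException("Illegal generation", ex);
        }
    }

    public static void main(String[] args) {
        try {
            validateAndTranslate(() -> {
                throw new StaleMemberEpochException("stale epoch");
            });
        } catch (IllegalGenerationException ex) {
            System.out.println("translated: " + ex.getMessage());
        }
    }
}
```

Pushing this into the validator, as suggested, would mean the validator itself throws the translated error on this path.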

Member

@dajac dajac left a comment

lgtm

@lucasbru lucasbru merged commit 69dd4c8 into apache:trunk Oct 17, 2025
26 checks passed
JimmyWang6 pushed a commit to JimmyWang6/jimmy-KAFKA-14048 that referenced this pull request Oct 19, 2025
…#20714) This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The validation implementation in each group can return a validator, that will be used to validate each partition in the offset commit. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs. Reviewers: David Jacot <djacot@confluent.io>
eduwercamacaro pushed a commit to littlehorse-enterprises/kafka that referenced this pull request Nov 12, 2025
…#20714) This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The validation implementation in each group can return a validator, that will be used to validate each partition in the offset commit. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs. Reviewers: David Jacot <djacot@confluent.io>
2 participants