- Notifications
You must be signed in to change notification settings - Fork 14.8k
KAFKA-19779: Add partitions to offset commit validation [1/N] #20714
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds partitions to offset commit validation as preparation for future validation logic. The changes add a new parameter containing committed partitions to all group types (ClassicGroup, StreamsGroup, ConsumerGroup, and ShareGroup) without using them in validation yet.
- Adds a
Supplier<List<TopicIdPartition>>parameter tovalidateOffsetCommitmethods across all group implementations - Updates all test calls to include the new partitions parameter with placeholder
List::ofsuppliers - Adds comprehensive StreamsGroup test coverage equivalent to existing ConsumerGroup tests in OffsetMetadataManagerTest
Reviewed Changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated no comments.
Show a summary per file
| File | Description |
|---|---|
| Group.java | Updated interface to include topicIdPartitions parameter in validateOffsetCommit |
| ClassicGroup.java | Added partitions parameter to validateOffsetCommit method with documentation |
| ConsumerGroup.java | Added partitions parameter to validateOffsetCommit method with documentation |
| StreamsGroup.java | Added partitions parameter to validateOffsetCommit method with documentation |
| ShareGroup.java | Added partitions parameter to validateOffsetCommit method |
| OffsetMetadataManager.java | Updated validation calls to supply TopicIdPartition collections from request data |
| GroupMetadataManager.java | Changed getOrMaybeCreatePersistedStreamsGroup visibility from private to package-private |
| ClassicGroupTest.java | Updated test calls to include partitions parameter |
| ConsumerGroupTest.java | Updated test calls to include partitions parameter |
| StreamsGroupTest.java | Updated test calls to include partitions parameter |
| ShareGroupTest.java | Updated test call to include partitions parameter |
| OffsetMetadataManagerTest.java | Added comprehensive StreamsGroup test coverage and extracted common verification methods |
Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.
| | ||
| @ParameterizedTest | ||
| @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER"}) | ||
| @EnumSource(value = Group.GroupType.class, names = {"CLASSIC", "CONSUMER", "STREAMS"}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some existing tests used this kind of parametrization. I used it as well in the test where we already had it, however, in the general case the setup was often a little different between the group types or existing parametrization made adopting this pattern painful, so I opted for a separate test method above but shared everything after the setup.
| int apiVersion | ||
| | ||
| int apiVersion, | ||
| Supplier<List<TopicIdPartition>> topicIdPartitions |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder whether this is the right way to do it because it forces us to iterate on the topic-partition twice, once in validateOffsetCommit and once later on when we construct the records. Have you considered inlining the check in the later?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's only going to be iterated twice if our member epoch is outdated, which should the minority of the cases. Wouldn't that be a minor performance concern?
Inlining the check later could work, but it does create a bit of complexity, since we have to go through 6 methods in the commit request handler
- get the group (versions for non-txn vs. txn)
- validate member id / epoch (versions for non-txn vs. txn) -- returns if member epochs match
- validate assignment epoch -- shortcutted if member epoch match
I can give it a shot
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you have another look? I implemented inline validation. The downside is, that for every partition, we will have fetched the member identified by memberId from the group.
I added a partitionValidationRequired boolean to shortcut partition-level validation if we do not need it (in particular, if member epochs match).
I think the Group interface looks nicer, and there are no allocations, which is nice. Maybe the logic is a bit more complex now.
wdyt?
| group.validateOffsetCommitForPartition( | ||
| request.memberId(), | ||
| request.generationIdOrMemberEpoch(), | ||
| topic.name(), | ||
| topic.topicId(), | ||
| partition.partitionIndex() | ||
| ); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This may be a bit inefficient because we have to lookup the member for every partition. I wonder whether we could do something as follow.
Let's define an OffsetPartitionValidator. We could find a better name.
OffsetPartitionValidator { void validate(name, topicId, partitionId) } At the group level, we can have a method which return the a concrete OffsetPartitionValidator depending on the member epoch / group type. If the epoch matches the expected epoch, the validator is basically a no op. Otherwise, we return one which does the partition level check while caching the reference to the member.
Group { OffsetPartitionValidator offsetValidator(memberId, memberEpoch) } This would make the validation logic in this class kind of agnostic of the two level validation.
Have you considered something like this? What do you think about it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I had considered introducing a super-class of ModernGroupMember and StreamsGroupMember and return the member. But indeed your approach seems cleaner and easier to implement. I will revise the PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have updated the code with this suggestion. It's a good idea. I don't see anything wrong with it. Can you take another look?
This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The partitions are passed as a supplier, so that we only instantiate the collection if we are not trying to commit with the latest member epoch. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs.
22f7ccb to 12dd2de Compare
dajac left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lucasbru Thanks for the update. I looks much better like this. I left few minor comments/questions.
| TxnOffsetCommitRequestData request | ||
| ) throws ApiException { | ||
| validateTransactionalOffsetCommit(context, request); | ||
| Group group = getGroupForTransactionalOffsetCommit(request); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't really need the group here. Have you considered doing it the other way around? By this, I mean having validateTransactionalOffsetCommit return the CommitPartitionValidator? It seems we could also do the same for the non-transactional path but we would need to re-schedule the heartbeat for classic groups in the validation method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I considered this. It seemed we do not want to move the re-scheduling of the heartbeat into the validation method - seemed like this was intentionally kept outside. And then I kept the "getGroup" for non-txn case symmetric for consistency. But if we are fine with moving the heartbeat inside, let's do it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done
| } catch (StaleMemberEpochException ex) { | ||
| throw Errors.ILLEGAL_GENERATION.exception(); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you follow-up on my suggestion, I wonder if we could push this into the validator too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As discussed, keeping it this way for now
dajac left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm
…#20714) This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The validation implementation in each group can return a validator, that will be used to validate each partition in the offset commit. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs. Reviewers: David Jacot <djacot@confluent.io>
…#20714) This adds code to pass committed partitions into ClassicGroup, StreamsGroup and ConsumerGroup. These are not used in validation yet, this is just preparation for later PRs. The validation implementation in each group can return a validator, that will be used to validate each partition in the offset commit. We also add variations of streams groups tests for the tests in OffsetMetadataManagerTest, which previously only tested the consumer group case. Right now the commit logic is exactly the same, but we will diverge in the following PRs. Reviewers: David Jacot <djacot@confluent.io>
This adds code to pass committed partitions into ClassicGroup,
StreamsGroup and ConsumerGroup. These are not used in validation yet,
this is just preparation for later PRs.
The validation implementation in each group can return a validator,
that will be used to validate each partition in the offset commit.
We also add variations of streams groups tests for the tests in
OffsetMetadataManagerTest, which previously only tested the consumer
group case. Right now the commit logic is exactly the same, but we will
diverge in the following PRs.
Reviewers: David Jacot djacot@confluent.io