
Conversation

fangnx
Member

@fangnx fangnx commented Sep 16, 2025

What

Add new unit and integration tests for the async consumer:

  • test_async_consumer_joins_and_leaves_rebalance: Tests the complete lifecycle of consumers joining and leaving a consumer group, ensuring proper partition redistribution during rebalancing events
  • test_async_topic_partition_changes_rebalance: Tests dynamic partition addition to existing topics and verifies that consumers properly detect and rebalance across the new partitions
  • test_async_callback_exception_behavior: Tests how the async consumer handles exceptions raised in a callback (currently the exception propagates and the consumer fails). TODO: check whether this behavior is intended
  • TestAIOConsumer: new unit tests for error handling

Checklist

  • Contains customer facing changes? Including API/behavior changes
  • Did you add sufficient unit test and/or integration test coverage for this PR?
    • If not, please explain why it is not required

References

JIRA: https://confluentinc.atlassian.net/browse/NONJAVACLI-3988

Test & Review

Open questions / Follow-ups

@confluent-cla-assistant

🎉 All Contributor License Agreements have been signed. Ready to merge.
Please push an empty commit if you would like to re-run the checks to verify CLA status for all contributors.

@fangnx fangnx changed the title from "WIP: Add more async consumer integration tests" to "Add more async consumer unit & integration tests" Sep 16, 2025
@fangnx fangnx marked this pull request as ready for review September 16, 2025 21:42
@fangnx fangnx requested review from a team and MSeal as code owners September 16, 2025 21:42
Contributor

@MSeal MSeal left a comment

Minor change request, glad to see wider edge case coverage

def add_partitions(self, topic_name, new_partition_count):
    """Add partitions to an existing topic"""
    try:
        from confluent_kafka.admin import AdminClient, NewPartitions
Contributor

move imports to top, no need for JIT here

finally:
    await consumer1.close()

asyncio.run(async_rebalance_test())
Contributor

From a quick look, ducktape doesn't seem to support async test defs, which would let us avoid this wrapper. Maybe we could make an async-supporting wrapper class that does this in the metaclass? We could contribute it back to enable async tests in ducktape later on. This can be a follow-up task; don't block this PR on it.

Member Author

Yes ducktape doesn't support it right now. Agreed - it's kinda verbose, and I can add a backlog ticket to work on after the main tasks are done
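
A minimal sketch of the metaclass idea discussed in this thread, using only the standard library. The names AsyncTestMeta and ExampleAsyncTest are hypothetical, and whether such a metaclass composes cleanly with ducktape's own Test base class and test discovery is an assumption that has not been verified here.

```python
import asyncio
import functools
import inspect


class AsyncTestMeta(type):
    """Hypothetical metaclass: wraps each `async def test_*` in asyncio.run()."""

    def __new__(mcls, name, bases, namespace):
        for attr, value in list(namespace.items()):
            if attr.startswith("test_") and inspect.iscoroutinefunction(value):
                namespace[attr] = mcls._make_sync(value)
        return super().__new__(mcls, name, bases, namespace)

    @staticmethod
    def _make_sync(coro_fn):
        @functools.wraps(coro_fn)
        def wrapper(*args, **kwargs):
            # Each test runs on a fresh event loop, like a manual asyncio.run() call.
            return asyncio.run(coro_fn(*args, **kwargs))

        return wrapper


class ExampleAsyncTest(metaclass=AsyncTestMeta):
    """Hypothetical test class; a real one would also inherit the framework base."""

    async def test_example(self):
        await asyncio.sleep(0)
        return "done"
```

With this in place, a test body could be written as a plain `async def test_...` method and called synchronously by the runner, instead of defining an inner coroutine and calling asyncio.run() at the end of every test.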

await consumer3.subscribe([topic_name], on_assign=track_rebalance)

# Poll all consumers until they detect new partitions and rebalance
for _ in range(15): # Max 15 seconds for partition discovery
Contributor

This might take longer than 15 seconds, given that each consumer also polls for 1 second consecutively, correct?
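
To make the timing concern concrete: assuming three consumers that each poll for up to 1 second in sequence, 15 loop iterations could take up to 15 × 3 × 1 s = 45 s. One way to keep the stated budget is a deadline-based loop, sketched below. The helper name poll_until and its parameters are hypothetical, and it assumes each consumer exposes an awaitable poll(timeout=...).

```python
import asyncio
import time


async def poll_until(consumers, done, budget_s=15.0, poll_timeout_s=1.0):
    """Poll all consumers concurrently until done() is true or budget_s elapses.

    Returns True if done() became true within the budget, False otherwise.
    """
    deadline = time.monotonic() + budget_s
    while not done():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return False
        # Never wait longer than what is left of the overall budget.
        timeout = min(poll_timeout_s, remaining)
        await asyncio.gather(*(c.poll(timeout=timeout) for c in consumers))
    return True
```

Because the polls are gathered, one round costs roughly one poll timeout rather than one per consumer, and the wall-clock limit is enforced by the deadline instead of by the iteration count.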


asyncio.run(async_topic_change_test())

# TODO: verify if the current behavior is correct/intended
Contributor

I would be surprised if it swallowed a callback error without reraising tbh
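
A minimal sketch of the behavior under discussion, where an exception raised in a rebalance callback reaches the caller of poll() rather than being swallowed. StubConsumer is a hypothetical stand-in written for illustration; it is not the real async consumer and says nothing about how the library implements this.

```python
import asyncio


class StubConsumer:
    """Hypothetical stand-in for an async consumer (not the real class)."""

    def __init__(self):
        self._on_assign = None

    async def subscribe(self, topics, on_assign=None):
        self._on_assign = on_assign

    async def poll(self, timeout=1.0):
        # Simulate a rebalance served during poll. The callback's exception
        # is deliberately not caught here, so it propagates to the caller.
        if self._on_assign is not None:
            await self._on_assign(self, ["partition-0"])
        return None


async def failing_on_assign(consumer, partitions):
    raise ValueError("callback failed")


async def callback_error_reaches_caller():
    consumer = StubConsumer()
    await consumer.subscribe(["topic"], on_assign=failing_on_assign)
    try:
        await consumer.poll(timeout=1.0)
    except ValueError as exc:
        return str(exc)
    return None
```

A test that pins down the intended behavior would assert that the error surfaces, which is the opposite of asserting that poll() returns normally after a failing callback.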

@fangnx fangnx requested a review from MSeal September 17, 2025 17:20

@pytest.mark.asyncio
async def test_concurrent_operations_error_handling(self, mock_consumer, mock_common, basic_config):
    """Test concurrent async operations handle errors gracefully."""
    # Mock: 2 poll calls fail, assignment succeeds
Member

Does this test verify that the consumer gets a KafkaError when the assignment is empty?

Member Author

I don't think assign() with an empty list (empty partitions) should cause the consumer to return errors, unless I am mistaken @k-raina
I removed the assignment reference (since it's not doing much) in this test case to avoid confusion
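
For reference, a rough sketch of what a test limited to failing polls could look like once the assignment call is dropped. It uses only the standard library: the consumer is a bare AsyncMock rather than the PR's mock_consumer fixture, and RuntimeError stands in for KafkaError, both as assumptions for illustration.

```python
import asyncio
from unittest.mock import AsyncMock


async def run_concurrent_polls():
    consumer = AsyncMock()  # hypothetical stand-in for the mocked consumer
    # Two poll calls fail and one succeeds; exceptions in side_effect are raised.
    consumer.poll.side_effect = [
        RuntimeError("poll failed"),
        RuntimeError("poll failed"),
        "message",
    ]
    results = await asyncio.gather(
        consumer.poll(timeout=1.0),
        consumer.poll(timeout=1.0),
        consumer.poll(timeout=1.0),
        return_exceptions=True,  # collect failures instead of raising the first one
    )
    failures = [r for r in results if isinstance(r, Exception)]
    successes = [r for r in results if not isinstance(r, Exception)]
    return failures, successes
```

Counting failures and successes, rather than checking positions in the result list, keeps the assertions independent of the order in which the concurrent calls consume the side effects.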

from confluent_kafka.admin import AdminClient, NewPartitions

admin_client = AdminClient({'bootstrap.servers': self.bootstrap_servers_str})
metadata = admin_client.list_topics(timeout=10)
Member

I see multiple instances of timeout=10. Maybe define it as a static constant?
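
One possible shape for that suggestion is a single module-level constant used as the default everywhere. The names ADMIN_TIMEOUT_S and partition_count are hypothetical; the sketch assumes the admin client is passed in and that list_topics(timeout=...) returns metadata with a topics mapping whose entries expose partitions, as in the snippet under review.

```python
# Hypothetical shared constant replacing the repeated literal timeout=10.
ADMIN_TIMEOUT_S = 10


def partition_count(admin_client, topic_name, timeout=ADMIN_TIMEOUT_S):
    """Return the number of partitions currently reported for topic_name."""
    metadata = admin_client.list_topics(timeout=timeout)
    return len(metadata.topics[topic_name].partitions)
```

Callers that need a different wait can still override the timeout per call, while the default lives in one place.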

)

topic_name = f"test-{consumer_type}-consumer-topic"
def _run_consumer_performance_test(self, consumer_type, operation_type, batch_size=None):
Member

Maybe move these private helper functions to bottom of file?

Member Author

Will do

@sonarqube-confluent

Failed

  • 67.50% Coverage on New Code (is less than 80.00%)

Analysis Details

22 Issues

  • 2 Bugs
  • 0 Vulnerabilities
  • 20 Code Smells

Coverage and Duplications

  • 67.50% Coverage (66.10% estimated after merge)
  • No duplication information (5.10% estimated after merge)

Project ID: confluent-kafka-python

View in SonarQube

@fangnx fangnx merged commit c99d6bd into async Sep 22, 2025
1 of 3 checks passed
@fangnx fangnx deleted the consumer-more-tests branch September 22, 2025 22:49
MSeal added a commit that referenced this pull request Oct 2, 2025
* Asyncio Producer and Consumer implementation with same API as the sync ones * Add unit tests for asyncio producer + consumer (#2036) * update * update * fix producer and add consumer uts * refactor * rename * Add Benchmark Framework for ducktape (#2030) * Integrate Schema Registry with ducktape load tests (#2027) * basic sr test * more tests * update * update * lint * Add ducktape benchmark tests for consumer (sync + async) (#2045) * draft * update and cleanup * add perf comments, add batch_size param to consume test, lint fix * linter fix * Fix linter issues in consumer testing code (#2051) * Add remaining functions missing in async producer & consumer (#2050) * Add semaphore block for ducktape tests (#2037) * Add semaphore block for ducktape tests * Increase kafka start timeout * Increase kafka start timeout * Increase kafka start timeout * Add logs to debug pipeline * Start kafka in kraft mode * Fix directory failures * Fix directory failures * Fix directory failures * templatise path * Fix ductape run * Fix kafka broker listner * Fix ducktape version error * Cleanup * Fix bound voilation should fail tests * Now expand bounds for success * Add schema registry instance * Update Schema Registry hostname * Update Schema Registry hostname * Update Schema Registry hostname * Fix for linux CI environment * Address minor feedback * Fix semaphore * Minor fix after rebase * Add more async consumer unit & integration tests (#2052) * basic rebalance test * rebalance tests * refactor and linter fix * feebdack * refactor and cleanup * update * remove jit imports * Add produce batch api to producer (#2047) * Add integration tests for transactions (#2056) * add tests * cleanup and linter fix * remove jit import * refactor * cleanup * minot rlinter * Update AsyncIO producer architecture to improve performance (#2044) * Fix helper function name to avoid ducktape test discovery * Integrate schema registry with producer sync/async performance test + clean up the old SR test 
(#2063) * Add comprehensive producer benchmark tests with Schema Registry support - Updated message serialization to use comprehensive structure with all protobuf fields - Implemented proper strategy pattern for sync/async serializers - Added Schema Registry authentication configuration - Fixed JSON serialization issues (schema title, async serializer initialization) - Added performance validation with configurable JSON validation - Enhanced producer strategies with comprehensive Avro, JSON, and Protobuf support * remove * remove confusing msg * Minor: Producer close calls flush() (#2066) * Integrate schema registry with consumer sync/async performance test (#2059) * update * remove auth * cleanup and ensure same msg size * more cleanup * Add comprehensive producer benchmark tests with Schema Registry support - Updated message serialization to use comprehensive structure with all protobuf fields - Implemented proper strategy pattern for sync/async serializers - Added Schema Registry authentication configuration - Fixed JSON serialization issues (schema title, async serializer initialization) - Added performance validation with configurable JSON validation - Enhanced producer strategies with comprehensive Avro, JSON, and Protobuf support * update * Group messages by topic partition before passing to produce_batch API (#2069) * Merge master to async (#2068) * Pre release (#2067) * Attempting to add python versioning to read from project toml and setting beta flag * Updated docs to read project toml version as well * Updated to read from c file for now. 
Updaed docs and fixed bad AI code * NPI-7572: Add content for AsyncIO Python client (#2070) * Updates for AsyncIO and other improvements * Add updates based on asyncio blog * Add SR updates relatd to AsyncIO * Reorganize content, remove redundancy, and improve content * Edits to diagram and other content * Add why to use this client in both readme files * Improve CHANGELOG title * Add release dates to versions in CHANGELOG * Add release dates back to v2.4.0 * Edits based on feedback * AsyncIO: Only clear messages from buffer if executor passed (#2071) * Fix async producer transaction behavior + add transactional produce benchmark test (#2072) * update * linter fix * Fix the async transaction behavior related to flush() (#2073) * fix * linter * more linter fix * linter and add link * Removed very old librdkafka version checks * Resolved admin import conflict issue * Fix test_version unit test (#2079) * Fix broken tests (#2077) * fix tests * fix linter * Removed set operation from test --------- Co-authored-by: Matthew Seal <mseal@confluent.io> * Async fix buffer cleanup (#2078) * Fix buffer cleanup logic * Add tests * fix linter * Remove SR key * Removed incorrect assert * Change ducktape tests to install more dependencies * Fix semaphore for producer ducktape tests + clean up files that should've been removed (#2081) * update * use warning for producer validate * remove unnecessary assert --------- Co-authored-by: Emanuele Sabellico <esabellico@confluent.io> Co-authored-by: Matthew Seal <mseal@confluent.io> Co-authored-by: Kaushik Raina <103954755+k-raina@users.noreply.github.com> Co-authored-by: Matthew Seal <mseal007@gmail.com> Co-authored-by: Steve Bang <sbang@confluent.io>
3 participants