Mockafka-py is a Python library designed for in-memory mocking of Kafka.
- Compatible with confluent-kafka
- Compatible with aiokafka (async support)
- Supports Produce, Consume, and AdminClient operations with ease.
pip install mockafka-py # or using poetry poetry add mockafka-pyIn the following examples, we showcase the usage of multiple decorators to simulate different scenarios in a Mockafka environment. These scenarios include producing, consuming, and setting up Kafka topics using the provided decorators.
from mockafka import produce, consume @produce(topic='test', key='test_key', value='test_value', partition=4) @consume(topics=['test']) def test_produce_and_consume_decorator(message): """ This test showcases the usage of both @produce and @consume decorators in a single test case. It produces a message to the 'test' topic and then consumes it to perform further logic. # Notice you may get message None """ # Your test logic for processing the consumed message here if not message: return passfrom mockafka import produce @produce(topic='test', key='test_key', value='test_value', partition=4) @produce(topic='test', key='test_key1', value='test_value1', partition=0) def test_produce_twice(): # Your test logic here passfrom mockafka import bulk_produce, consume @bulk_produce(list_of_messages=sample_for_bulk_produce) @consume(topics=['test']) def test_bulk_produce_and_consume_decorator(message): """ This test showcases the usage of both @bulk_produce and @consume decorators in a single test case. It does bulk produces messages to the 'test' topic and then consumes them to perform further logic. """ # Your test logic for processing the consumed message here passfrom mockafka import setup_kafka, produce @setup_kafka(topics=[{"topic": "test_topic", "partition": 16}]) @produce(topic='test_topic', partition=5, key='test_', value='test_value1') def test_produce_with_kafka_setup_decorator(): # Your test logic here passfrom mockafka import setup_kafka, produce, consume @setup_kafka(topics=[{"topic": "test_topic", "partition": 16}]) @produce(topic='test_topic', partition=5, key='test_', value='test_value1') @produce(topic='test_topic', partition=5, key='test_', value='test_value1') @consume(topics=['test_topic']) def test_consumer_decorator(message: Message = None): if message is None: return # Your test logic for processing the consumed message here passfrom mockafka import FakeProducer, FakeConsumer, FakeAdminClientImpl from mockafka.admin_client import NewTopic from random import randint # Create topic admin = FakeAdminClientImpl() admin.create_topics([ NewTopic(topic='test', num_partitions=5) ]) # Produce messages producer = FakeProducer() for i in range(0, 10): producer.produce( topic='test', key=f'test_key{i}', value=f'test_value{i}', partition=randint(0, 4) ) # Subscribe consumer consumer = FakeConsumer() consumer.subscribe(topics=['test']) # Consume messages while True: message = consumer.poll() print(message) consumer.commit() if message is None: breakOutput:
""" <mockafka.message.Message object at 0x7fe84b4c3310> <mockafka.message.Message object at 0x7fe84b4c3370> <mockafka.message.Message object at 0x7fe84b4c33a0> <mockafka.message.Message object at 0x7fe84b4c33d0> <mockafka.message.Message object at 0x7fe84b4c3430> <mockafka.message.Message object at 0x7fe84b4c32e0> <mockafka.message.Message object at 0x7fe84b4c31f0> <mockafka.message.Message object at 0x7fe84b4c32b0> <mockafka.message.Message object at 0x7fe84b4c3400> <mockafka.message.Message object at 0x7fe84b4c3340> None """ import pytest from mockafka import aproduce, aconsume, asetup_kafka @pytest.mark.asyncio @asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True) @aproduce(topic='test_topic', value='test_value', key='test_key', partition=0) @aconsume(topics=['test_topic']) async def test_produce_and_consume_with_decorator(message=None): if not message: return assert message.key() == 'test_key' assert message.value() == 'test_value'import pytest from mockafka import aproduce, asetup_kafka from mockafka.aiokafka import FakeAIOKafkaConsumer @pytest.mark.asyncio @asetup_kafka(topics=[{'topic': 'test_topic', 'partition': 16}], clean=True) @aproduce(topic='test_topic', value='test_value', key='test_key', partition=0) async def test_produce_with_decorator(): consumer = FakeAIOKafkaConsumer() await consumer.start() consumer.subscribe(['test_topic']) message = await consumer.getone() assert message.key() == 'test_key' assert message.value() == 'test_value'