DEV Community

Amzar

Basic Kafka Stream in Python

Introduction

Kafka is an open-source distributed streaming platform, originally created at LinkedIn and now developed under the Apache Software Foundation. It is designed to handle large volumes of real-time data streams and is used for building real-time data pipelines and streaming applications.

For instance, if you want to fetch a real-time event log from a source (Twitter, etc.) and insert it into a target (CSV, SQLite, etc.), you can use a Kafka client library in Python. To do that, you first need to know what a producer and a consumer are in Kafka.

Producer and Consumer

  1. A producer is a program or application that sends data or messages to a Kafka topic.
  2. A consumer is a program or application that reads data or messages from a Kafka topic.
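
To make the two roles concrete, here is a minimal in-memory sketch. It is not how Kafka is implemented; `ToyTopic` and its methods are hypothetical names used only for illustration. It shows the idea that a producer appends to a log while each consumer group keeps its own read position.

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a Kafka topic: an append-only list of messages."""

    def __init__(self):
        self.log = []                    # messages in arrival order
        self.offsets = defaultdict(int)  # next offset to read, per consumer group

    def produce(self, message):
        """Producer side: append a message to the end of the log."""
        self.log.append(message)

    def consume(self, group_id):
        """Consumer side: return unread messages for this group and advance its offset."""
        start = self.offsets[group_id]
        unread = self.log[start:]
        self.offsets[group_id] = len(self.log)
        return unread


topic = ToyTopic()
topic.produce("hello")
topic.produce("world")

first_read = topic.consume("group-a")   # both messages
second_read = topic.consume("group-a")  # nothing new since the last read
other_group = topic.consume("group-b")  # a new group starts from the beginning
print(first_read, second_read, other_group)
```

Reading a message does not remove it from the log, which is why a second consumer group can still see everything.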

How to set up Kafka in Python?

Now, let's start with the requirements for using Kafka from Python.

Requirements

Kafka server

You can start with a local installation, or use AWS or GCP for a more advanced setup. In this tutorial, I launch the Kafka server using Docker Compose.

I assume Docker is installed and running; run docker --version to confirm.

Create a Docker Compose file (docker-compose.yaml) and copy in the snippet below.

```yaml
version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181

  kafka:
    image: confluentinc/cp-kafka:latest
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```
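
The compose file advertises two listeners, which is why the Python scripts in this tutorial connect to localhost:29092 rather than 9092. As a small sketch (the helper `parse_listeners` is a hypothetical name, not part of any Kafka library), the value can be split apart like this:

```python
def parse_listeners(advertised):
    """Split 'NAME://host:port,...' into {name: (host, port)}."""
    listeners = {}
    for entry in advertised.split(","):
        name, address = entry.split("://")
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Value copied from KAFKA_ADVERTISED_LISTENERS in the compose file
advertised = "PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092"
listeners = parse_listeners(advertised)

# Clients inside the Docker network reach the broker by its service name
print("inside Docker:", listeners["PLAINTEXT"])
# Scripts on the host machine use the port published by the compose file
print("from the host:", listeners["PLAINTEXT_HOST"])
```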

Then, build and start the containers:

docker-compose up --build --force-recreate -d

To confirm the containers are up, we need to check the status.

Docker Desktop
(Screenshot: Docker container status)

Command

```
╰─➤ docker-compose ps
      Name                   Command            State                     Ports
---------------------------------------------------------------------------------------------------
kafka_kafka_1       /etc/confluent/docker/run   Up      0.0.0.0:29092->29092/tcp, 9092/tcp
kafka_zookeeper_1   /etc/confluent/docker/run   Up      0.0.0.0:22181->2181/tcp, 2888/tcp, 3888/tcp
```
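
The same check can be roughly approximated from Python by testing whether the published ports accept TCP connections. This is only a sketch: `is_port_open` is a hypothetical helper, and an open port shows that the container is reachable, not that Kafka has finished starting.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Host ports published by the compose file above
for name, port in [("kafka", 29092), ("zookeeper", 22181)]:
    state = "reachable" if is_port_open("localhost", port) else "not reachable"
    print(f"{name} on localhost:{port} is {state}")
```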

Okay, the output shows that the containers were built successfully and are running. Next step: how do we create a topic and send a message?

Kafka producer (Python)

Create a topic

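```python
from confluent_kafka.admin import AdminClient, NewTopic

admin_client = AdminClient({"bootstrap.servers": "localhost:29092"})

topic_name = "my_first_topic"
num_partitions = 3

new_topic = NewTopic(topic_name, num_partitions)

# create_topics() is asynchronous: it returns a dict of {topic: future},
# and a failure only surfaces when the future's result() is called.
futures = admin_client.create_topics([new_topic])
for topic, future in futures.items():
    try:
        future.result()  # blocks until the broker responds
        print(f"{topic} is created!!")
    except Exception as e:
        print(f"{topic} is not created!! ({e})")
```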

I am using the confluent_kafka package (installed with pip install confluent-kafka) to talk to the Kafka server.

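Also, we can define the number of partitions for the topic. Note that the number of partitions can be increased later but never decreased, and adding partitions changes which partition a given key maps to, so it is worth choosing the number carefully up front.

If you have a topic with 10 partitions, up to 10 consumers in the same consumer group can process messages from that topic in parallel, each handling messages from the partitions assigned to it. Any extra consumers in that group sit idle.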
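
To illustrate why the partition count caps parallelism, here is a simplified round-robin sketch. Kafka's real assignment strategies are more involved, and `assign_partitions` is a hypothetical helper written only for this example.

```python
def assign_partitions(num_partitions, consumers):
    """Spread partition numbers over consumers round-robin (illustrative only)."""
    assignment = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owner = consumers[partition % len(consumers)]
        assignment[owner].append(partition)
    return assignment


# 3 partitions (as in my_first_topic) shared by 2 consumers
two = assign_partitions(3, ["consumer-1", "consumer-2"])
# 3 partitions but 4 consumers: the last consumer gets nothing to read
four = assign_partitions(3, ["consumer-1", "consumer-2", "consumer-3", "consumer-4"])
print(two)
print(four)
```

Each partition has exactly one owner within the group, so a fourth consumer on a three-partition topic has no work to do.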

```
╰─➤ python kafka_topic.py
my_first_topic is created!!
```

Okay, the topic has been created!

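Note: If you forgot to create the topic first, no worries. As long as the broker allows automatic topic creation (auto.create.topics.enable, which Kafka enables by default), the topic is created when you post the first message to it, using the broker's default number of partitions.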

Post a message

```python
import json
import uuid

from confluent_kafka import Producer


def delivery_report(err, msg):
    # Invoked during flush() once the broker acknowledges or rejects the message
    if err is not None:
        print(f"failed to post message: {err}")
    else:
        print(f"message posted!! --> {json.loads(msg.value())['comment']}")


def post_message(producer, data):
    # produce() only queues the message; flush() waits for delivery
    producer.produce(
        "my_first_topic",
        json.dumps(data).encode("utf-8"),
        on_delivery=delivery_report,
    )
    producer.flush()


data = {
    "user_session_id": str(uuid.uuid4()),
    "user_name": "John Doe",
    "comment": "Malaysia Boleh!",
}

producer = Producer({"bootstrap.servers": "localhost:29092"})
post_message(producer, data)
```

When you run the Python script, it posts the message:

```
╰─➤ python producer.py
message posted!! --> Malaysia Boleh!
```
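
Kafka itself only transports bytes, so the producer encodes the dictionary as UTF-8 JSON and the consumer has to reverse that. The round trip can be sketched without a broker; `serialize` and `deserialize` are hypothetical helper names, not part of confluent_kafka.

```python
import json
import uuid


def serialize(data):
    """Dict -> UTF-8 encoded JSON bytes, the form handed to produce()."""
    return json.dumps(data).encode("utf-8")


def deserialize(payload):
    """UTF-8 encoded JSON bytes -> dict, for use on the consumer side."""
    return json.loads(payload.decode("utf-8"))


data = {
    "user_session_id": str(uuid.uuid4()),
    "user_name": "John Doe",
    "comment": "Malaysia Boleh!",
}

payload = serialize(data)
restored = deserialize(payload)
print(type(payload).__name__, restored["comment"])
```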

Kafka consumer (Python)

Okay, let's move to the consumer to check whether the data was really posted.

```python
from confluent_kafka import Consumer

consumer = Consumer(
    {
        "bootstrap.servers": "localhost:29092",
        "group.id": "python-consumer",
        "auto.offset.reset": "earliest",
    }
)

consumer.subscribe(["my_first_topic"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message

        if msg is None:
            continue
        if msg.error():
            print("Error: {}".format(msg.error()))
            continue

        data = msg.value().decode("utf-8")
        topic = msg.topic()
        ts = msg.timestamp()  # (timestamp type, milliseconds since epoch)
        print(data, topic, ts)
except KeyboardInterrupt:
    pass  # stop on Ctrl+C
finally:
    consumer.close()  # leave the consumer group cleanly
```

And here is the output when you run the script:

```
╰─➤ python consumer.py
{"user_session_id": "5321202b-694a-4d82-9d2b-97174303595e", "user_name": "John Doe", "comment": "Malaysia Boleh!"} my_first_topic (1, 1678561082329)
```
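
The last value in that output is the tuple returned by msg.timestamp(): a timestamp type followed by milliseconds since the Unix epoch. A small sketch for making it readable follows; `describe_timestamp` is a hypothetical helper, and the numeric labels are meant to mirror confluent_kafka's TIMESTAMP_* constants.

```python
from datetime import datetime, timezone

# Numeric timestamp types; these mirror confluent_kafka's TIMESTAMP_* constants
TIMESTAMP_TYPES = {
    0: "not available",
    1: "create time",
    2: "log append time",
}


def describe_timestamp(ts):
    """Turn (type, milliseconds since epoch) into (label, UTC datetime)."""
    ts_type, ts_ms = ts
    seconds, millis = divmod(ts_ms, 1000)
    when = datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )
    return TIMESTAMP_TYPES.get(ts_type, "unknown"), when


# Tuple copied from the consumer output above
label, when = describe_timestamp((1, 1678561082329))
print(label, when.isoformat())
```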

Here is a demo: when you post a message, the consumer prints it within a second.

Demo

Enjoy ~