Event Streaming with Kafka


Demo Kafka Consumers

Learn how to create a multi-partition Kafka topic, produce messages using Python, and consume them programmatically. This step-by-step guide walks you through each phase, from topic creation to message processing.


Table of Contents

  • Prerequisites
  • 1. Create a Kafka Topic
  • 2. Produce Messages with Python
  • 3. Consume Messages with Python
  • 4. Key Concepts

Prerequisites

  • A running Kafka broker on localhost:9092
  • Python 3.6+ installed
  • Basic familiarity with the command line

Warning

Make sure your Kafka server is up and running. If you haven’t installed Kafka yet, follow the Apache Kafka Quickstart.


1. Create a Kafka Topic

Create a topic named multi-partition-topic with 3 partitions and a replication factor of 1:

cd ~/kafka/bin
./kafka-topics.sh \
  --create \
  --topic multi-partition-topic \
  --bootstrap-server localhost:9092 \
  --partitions 3 \
  --replication-factor 1

Expected output:

Created topic multi-partition-topic.
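To verify the partition layout, you can describe the topic (this assumes the same broker address used above):

```shell
cd ~/kafka/bin
./kafka-topics.sh \
  --describe \
  --topic multi-partition-topic \
  --bootstrap-server localhost:9092
```

The output lists each of the three partitions along with its leader, replicas, and in-sync replicas.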

2. Produce Messages with Python

2.1 Set Up Python Environment

  1. Update package lists and install the virtual environment tools:

    sudo apt update
    sudo apt install -y python3-venv python3-pip
    
  2. Create and activate a virtual environment:

    python3 -m venv kafka-env
    source kafka-env/bin/activate
    
  3. Install the Kafka client library:

    pip install kafka-python
    

2.2 Write the Producer Script

Create a file named kafka_producer_example.py:

import random
import logging
from kafka import KafkaProducer

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)

# Initialize producer
producer = KafkaProducer(bootstrap_servers='localhost:9092')

# Coffee shop data samples
coffee_shops = [
    {"name": "Espresso Bliss",    "location": "Downtown",    "rating": 4.5},
    {"name": "Cappuccino Corner", "location": "Uptown",      "rating": 4.2},
    {"name": "Latte Lounge",      "location": "Suburbs",     "rating": 4.8},
    {"name": "Mocha Magic",       "location": "City Center", "rating": 4.6},
    {"name": "Coffee Haven",      "location": "East Side",   "rating": 4.3},
]

# Send 10 messages
for i in range(10):
    shop = random.choice(coffee_shops)
    message = (
        f"Coffee Shop: {shop['name']}, "
        f"Location: {shop['location']}, "
        f"Rating: {shop['rating']}"
    )
    try:
        future = producer.send(
            'multi-partition-topic',
            key=str(i).encode('utf-8'),
            value=message.encode('utf-8')
        )
        metadata = future.get(timeout=10)
        logging.info(
            f"Message delivered to {metadata.topic} "
            f"[partition {metadata.partition}]"
        )
    except Exception as e:
        logging.error(f"Delivery failed: {e}")

producer.flush()
logging.info("Finished sending messages")

Run the producer:

python3 kafka_producer_example.py

You should see logs indicating messages delivered across different partitions.
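The script above encodes keys and values by hand; kafka-python can also do this for you if you pass `value_serializer` and `key_serializer` to `KafkaProducer`. A minimal sketch of that serialization step itself, runnable without a broker (the function names are illustrative):

```python
import json

# Candidates for KafkaProducer(value_serializer=..., key_serializer=...)
def serialize_value(shop: dict) -> bytes:
    """Encode a coffee-shop record as UTF-8 JSON bytes."""
    return json.dumps(shop).encode("utf-8")

def serialize_key(i: int) -> bytes:
    """Encode the message key the same way the script above does."""
    return str(i).encode("utf-8")

shop = {"name": "Espresso Bliss", "location": "Downtown", "rating": 4.5}
payload = serialize_value(shop)

print(payload)
print(json.loads(payload) == shop)  # prints True: the payload round-trips cleanly
```

With serializers configured on the producer, `producer.send(...)` can then take the dict and integer directly instead of pre-encoded bytes.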


3. Consume Messages with Python

Create a file named kafka_consumer.py:

from kafka import KafkaConsumer

# Initialize consumer
consumer = KafkaConsumer(
    'multi-partition-topic',
    bootstrap_servers='localhost:9092',
    group_id='partition-checker-group',
    auto_offset_reset='earliest'  # Start from the oldest message
)

try:
    print("Listening for messages...")
    for msg in consumer:
        text = msg.value.decode('utf-8')
        print(f"Received: {text}")
        print(f"→ Partition: {msg.partition}, Offset: {msg.offset}\n")
except KeyboardInterrupt:
    print("Interrupted by user. Exiting...")
finally:
    consumer.close()

Run the consumer:

python3 kafka_consumer.py

Each message will print alongside its partition number and offset.
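Because the producer sends plain formatted strings, downstream code often needs to parse them back into structured fields. A small sketch of that step (`parse_message` is an illustrative helper, not part of kafka-python):

```python
def parse_message(text: str) -> dict:
    """Parse 'Coffee Shop: X, Location: Y, Rating: Z' back into a dict."""
    fields = {}
    for part in text.split(", "):
        key, _, value = part.partition(": ")
        fields[key] = value
    return {
        "name": fields["Coffee Shop"],
        "location": fields["Location"],
        "rating": float(fields["Rating"]),
    }

msg = "Coffee Shop: Latte Lounge, Location: Suburbs, Rating: 4.8"
print(parse_message(msg))
```

In the consumer loop above, you would call this on `msg.value.decode('utf-8')` before handing the record to the rest of your pipeline.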


4. Key Concepts

Concept                  Description
Consumer Group           group_id lets multiple consumers share partition consumption.
Offset                   Position within a partition. With auto_offset_reset='earliest', a consumer group with no committed offset starts from the oldest record.
Partition Distribution   Messages with different keys are hashed to different partitions, enabling parallel consumption.
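Kafka's default partitioner hashes the message key (with murmur2) and takes the result modulo the partition count. The sketch below illustrates the idea using CRC32 in place of Kafka's actual hash, so the exact partition numbers will differ from a real broker's:

```python
import zlib

NUM_PARTITIONS = 3  # matches the topic created earlier

def pick_partition(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Illustrative key-to-partition mapping (Kafka itself uses murmur2)."""
    return zlib.crc32(key) % num_partitions

# The same key always lands on the same partition; different keys spread out
for i in range(5):
    key = str(i).encode("utf-8")
    print(f"key={i} -> partition {pick_partition(key)}")
```

The important property is determinism: a given key always maps to the same partition, which is what preserves per-key ordering.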

In production, consumers typically process events, persist data, update dashboards, or forward messages to other systems instead of printing to the console.
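For instance, the loop body could fold each message into per-location rating averages. A minimal sketch of that processing step, fed with sample strings here rather than a live consumer:

```python
from collections import defaultdict

# Running (sum, count) of ratings per location
totals = defaultdict(lambda: [0.0, 0])

def process(text: str) -> None:
    """One consumer-side step: fold a message into per-location totals."""
    fields = dict(part.split(": ", 1) for part in text.split(", "))
    loc, rating = fields["Location"], float(fields["Rating"])
    totals[loc][0] += rating
    totals[loc][1] += 1

# In a real consumer this would run inside `for msg in consumer:`
for sample in [
    "Coffee Shop: Latte Lounge, Location: Suburbs, Rating: 4.8",
    "Coffee Shop: Mocha Magic, Location: City Center, Rating: 4.6",
    "Coffee Shop: Latte Lounge, Location: Suburbs, Rating: 4.8",
]:
    process(sample)

for loc, (total, count) in totals.items():
    print(f"{loc}: average rating {total / count:.2f}")
```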

