Event Streaming with Kafka
Kafka Producers & Consumers: The Message Flow
What is a Kafka Producer?
Apache Kafka is a distributed streaming platform designed for high-throughput, real-time data pipelines. A Kafka Producer is the client application responsible for publishing messages (records) to one or more Kafka topics. Understanding how producers work and how to configure them effectively is essential for building reliable, scalable data pipelines.
Overview
A Kafka Producer’s main duties include:
- Serialization: Convert message keys and values into byte arrays.
- Partitioning: Decide which partition each record should go to (see the partitioner sketch below).
- Batching & Buffering: Group messages to optimize network usage.
- Retries & Error Handling: Automatically retry on transient failures.
- Acknowledgments: Wait for durability guarantees based on the configured `acks` level.
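To make the partitioning duty concrete, here is a minimal sketch of a custom partitioner with the kafka-python client; the `stable_hash_partitioner` name and the CRC32 routing scheme are illustrative choices, not part of any standard API:

```python
import zlib

from kafka import KafkaProducer

def stable_hash_partitioner(key_bytes, all_partitions, available_partitions):
    """Deterministically map each serialized key to a partition id."""
    if key_bytes is None:
        # Keyless records: fall back to the first partition with a leader
        partitions = available_partitions or all_partitions
        return partitions[0]
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    partitioner=stable_hash_partitioner,  # called after key serialization
)
```

Because the hash is stable, records with the same key always land on the same partition, which preserves per-key ordering.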
Note
By default, Kafka Producers send records asynchronously, yielding high throughput and non-blocking application threads.
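The asynchronous `send()` returns a future, so you can opt into blocking delivery when needed; a minimal sketch with kafka-python (broker address and topic name are placeholders):

```python
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(bootstrap_servers=['localhost:9092'])

# Asynchronous (default): send() returns a future immediately
future = producer.send('example-topic', b'fire-and-forget')

# Synchronous: block until the broker acknowledges or the timeout expires
try:
    metadata = future.get(timeout=10)
    print(metadata.topic, metadata.partition, metadata.offset)
except KafkaError as err:
    print(f"Delivery failed: {err}")
```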
Producer Workflow
1. Configure Producer Properties
   Define bootstrap servers, serializers, acknowledgments, and other settings.
2. Instantiate the Producer
   Create a `KafkaProducer` client using the configuration.
3. Send Records Asynchronously
   Use `ProducerRecord` objects (in Python, the `send()` call) to publish data to a specified topic.
4. Handle Callbacks (Optional)
   Attach success and error callbacks to monitor delivery results.
5. Flush and Close
   Ensure all buffered records are sent before shutting down the client.
Basic Python Example
```python
from kafka import KafkaProducer
import json

# Configure the producer: serializers convert keys/values to bytes
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    acks='all'  # Strong durability: wait for full ISR acknowledgment
)

topic = 'example-topic'

for i in range(5):
    key = f'key-{i}'
    value = {'count': i}
    # Send asynchronously with callbacks for success and failure
    producer.send(topic, key=key, value=value)\
        .add_callback(lambda md: print(
            f"Sent to {md.topic} partition {md.partition} offset {md.offset}"
        ))\
        .add_errback(lambda err: print(f"Send error: {err}"))

# Drain buffered records, then release client resources
producer.flush()
producer.close()
```
Sample Console Output
```
Sent to example-topic partition 2 offset 10
Sent to example-topic partition 0 offset 11
Sent to example-topic partition 1 offset 12
Sent to example-topic partition 2 offset 13
Sent to example-topic partition 0 offset 14
```
Configuration Reference
| Property | Description | Example |
|---|---|---|
| `bootstrap.servers` | Initial Kafka broker addresses (host:port) | `['broker1:9092']` |
| `key.serializer` | Converts the record key to bytes | `StringSerializer()` |
| `value.serializer` | Converts the record value to bytes | `JsonSerializer()` |
| `acks` | `0`, `1`, or `all` (wait for leader/ISR acknowledgment) | `'all'` |
| `retries` | Number of automatic retries on transient failures | `3` |
| `linger.ms` | Time to wait for additional messages before sending a batch (ms) | `5` |
| `batch.size` | Maximum batch size in bytes | `16384` |
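In kafka-python, these dotted property names become underscored constructor arguments; a minimal sketch using the table's example values (starting points to tune, not recommendations):

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['broker1:9092'],
    acks='all',        # wait for the full ISR before confirming a write
    retries=3,         # automatically retry transient failures
    linger_ms=5,       # wait up to 5 ms for more records to fill a batch
    batch_size=16384,  # maximum batch size in bytes
)
```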
Warning
Setting `acks=0` offers the lowest latency but no durability guarantees. Use it with caution in critical data flows.
To enable idempotent, exactly-once-per-partition delivery, set `enable.idempotence=true` (Kafka ≥ 0.11); a sketch follows below.
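kafka-python has historically not exposed this flag, so the sketch below uses the separate confluent-kafka client, where the config key matches the dotted name above; broker address, topic, and payload are placeholders:

```python
from confluent_kafka import Producer

# Idempotence lets the broker de-duplicate retried batches,
# giving exactly-once delivery per partition
producer = Producer({
    'bootstrap.servers': 'localhost:9092',
    'enable.idempotence': True,  # also enforces acks='all' internally
})

producer.produce('example-topic', key='key-0', value='{"count": 0}')
producer.flush()  # block until outstanding deliveries complete
```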
Best Practices
- Tune `batch.size` and `linger.ms` to strike a balance between latency and throughput.
- Implement robust error handling in callbacks for custom retry or logging logic.
- Enable idempotence (`enable.idempotence=true`) for exactly-once delivery in supported Kafka versions.
- Monitor key producer metrics (e.g., `record-send-rate`, `request-latency`) via JMX or your monitoring stack; a client-side sketch follows below.
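If JMX is unavailable, kafka-python also exposes a client-side snapshot via `KafkaProducer.metrics()`; a minimal sketch (the metric group and names can vary by client version, so treat them as assumptions):

```python
# Assumes `producer` is the KafkaProducer from the example above
metrics = producer.metrics()
stats = metrics.get('producer-metrics', {})
print('record-send-rate:   ', stats.get('record-send-rate'))
print('request-latency-avg:', stats.get('request-latency-avg'))
```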