Event Streaming with Kafka
Kafka Connect: Effortless Data Pipelines
Demo: Kafka Connect in Action
In this tutorial, you’ll send JSON events to an Apache Kafka topic and watch Kafka Connect’s S3 sink connector automatically persist them to Amazon S3. This workflow highlights how event streaming architectures offload storage concerns and streamline analytics.
Prerequisites
- A running Kafka broker and Connect cluster on your EC2 instance
- AWS credentials configured for the S3 sink connector (a sample connector registration is sketched after this list)
- An S3 bucket created for storing topic data (e.g., my-kafka-bucket)
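If the S3 sink connector is not registered yet, you can create it through the Kafka Connect REST API. The snippet below is a minimal sketch rather than the exact configuration used in this environment: the connector name s3-sink-cartevent, the worker URL localhost:8083, the region, and the flush.size of 5 are assumptions to adapt, and it relies on the worker's default key/value converters.
# Register a sample S3 sink connector (adjust name, bucket, region, and worker URL)
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "s3-sink-cartevent",
    "config": {
      "connector.class": "io.confluent.connect.s3.S3SinkConnector",
      "tasks.max": "1",
      "topics": "cartevent",
      "s3.bucket.name": "my-kafka-bucket",
      "s3.region": "us-east-1",
      "storage.class": "io.confluent.connect.s3.storage.S3Storage",
      "format.class": "io.confluent.connect.s3.format.json.JsonFormat",
      "flush.size": "5"
    }
  }'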
1. Start the Kafka Console Producer
- SSH into the EC2 instance and elevate to root:
sudo su
- Navigate to your Kafka installation:
cd ~/kafka_2.13-3.0.0
- Launch the console producer for the cartevent topic:
bin/kafka-console-producer.sh \
  --bootstrap-server 98.81.233.254:9092 \
  --topic cartevent
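If the cartevent topic does not exist yet, you can create it first; the single partition and replication factor below are assumptions for a one-broker demo cluster:
# Create the topic on a single-broker cluster (adjust partitions/replication as needed)
bin/kafka-topics.sh \
  --bootstrap-server 98.81.233.254:9092 \
  --create --topic cartevent \
  --partitions 1 --replication-factor 1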
2. Publish Sample Car Events
Copy and paste each line below into the producer terminal. Each JSON message represents a single car event:
{"eventId":1,"carId":"CAR001","eventType":"SPEED_CHANGE","fuelLevel":65,"timestamp":"2025-02-11T10:00:52Z"}
{"eventId":2,"carId":"CAR002","eventType":"ENGINE_START","timestamp":"2025-02-11T10:01:02Z"}
{"eventId":3,"carId":"CAR006","eventType":"LOW_FUEL","fuelLevel":10,"timestamp":"2025-02-11T10:05:30Z"}
{"eventId":4,"carId":"CAR002","eventType":"LOW_FUEL","fuelLevel":15,"timestamp":"2025-02-11T10:01:30Z"}
{"eventId":5,"carId":"CAR006","eventType":"LOW_FUEL","fuelLevel":10,"timestamp":"2025-02-11T10:05:30Z"}
{"eventId":6,"carId":"CAR008","eventType":"LOW_FUEL","fuelLevel":1,"timestamp":"2025-02-11T10:01:30Z"}
{"eventId":7,"carId":"CAR008","eventType":"LOW_FUEL","fuelLevel":1,"timestamp":"2025-02-11T10:02:30Z"}
{"eventId":8,"carId":"CAR009","eventType":"LOW_FUEL","fuelLevel":2,"timestamp":"2025-02-11T10:03:30Z"}
Note
The S3 sink connector writes a file once it has buffered flush.size records (5 in this setup) or when its rotation interval elapses. After sending at least five events, you can verify data landing in S3 right away.
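Before checking S3, you can confirm the connector and its task are healthy through the Connect REST API; the connector name below is the one assumed in the registration sketch above:
# Both the connector and its task should report state "RUNNING"
curl -s http://localhost:8083/connectors/s3-sink-cartevent/status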
3. Verify JSON Files in Amazon S3
Navigate to the S3 console and open the prefix topics/cartevent/partition=0/ in your bucket. You should see JSON files named using the convention topic+partition+startOffset.json.
Refreshing the prefix after more events arrive shows additional files.
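You can also verify the objects from the command line with the AWS CLI, assuming the my-kafka-bucket name from the prerequisites; replace the object key in the second command with one from your own listing:
# List the flushed JSON files for partition 0
aws s3 ls s3://my-kafka-bucket/topics/cartevent/partition=0/
# Print one file's contents to stdout
aws s3 cp s3://my-kafka-bucket/topics/cartevent/partition=0/cartevent+0+0000000000.json -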
4. How It Works
- The Kafka Connect S3 sink connector continuously polls the cartevent topic.
- Upon reaching the configured batch size or rotation interval, it writes a bulk JSON file to S3.
- The connector handles offset management, retries, and error handling automatically (see the consumer-group check after this list).
- You can query these files directly with Amazon Athena or integrate with downstream analytics tools.
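One way to observe the offset management is to inspect the connector's consumer group. Kafka Connect sink connectors consume under a group named connect-<connector-name>, so with the connector name assumed earlier:
# Show committed offsets and lag for the sink connector's consumer group
bin/kafka-consumer-groups.sh \
  --bootstrap-server 98.81.233.254:9092 \
  --describe --group connect-s3-sink-cartevent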
Supported Kafka Connectors
Connector Type | Examples | Use Case
---|---|---
Sink | Amazon S3, Google Cloud Storage, RDS | Persist Kafka data to external stores
Source | FileSource, JDBC Source, S3 Source | Ingest external data into Kafka
For a full list of available connectors and configurations, see the Confluent Hub.
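To see which connector plugins are installed on your own Connect worker, you can also query its REST API directly:
# List the connector plugins available on this worker
curl -s http://localhost:8083/connector-plugins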
That wraps up this demo of the Kafka Connect S3 sink. Next, explore additional connectors and integration patterns to build scalable event-driven architectures!