Demo: Data Pipeline Orchestration
Welcome to this comprehensive guide on data pipeline orchestration using Apache Airflow. In this lesson, you will explore a hands-on demo that illustrates how Airflow processes real-time data from an IoT device in a factory setting. Imagine a manufacturing facility where multiple machines are monitored through IoT devices collecting various metrics. Our objective is to process this data at scheduled intervals using Airflow’s scheduling feature and then share the processed results with maintenance technicians. Airflow streamlines task orchestration and simplifies troubleshooting for data engineers, data warehouse engineers, and MLOps professionals.
In the sections below, we break down each component of the pipeline including Docker image initialization, Airflow configuration, and the creation of DAGs for ETL operations.
Setting Up the Airflow Environment
In our KodeKloud playground, we start by creating a working directory for our orchestration example and then initialize Airflow using its official Docker image.
Create the Working Directory
Begin by creating and navigating to your project directory:
admin@docker-host:~$ mkdir example-orchestration
admin@docker-host:~$ cd example-orchestration/
admin@docker-host:~/example-orchestration$
Download the Docker Compose File and Configure Folders
Download the required Docker Compose file and set up the necessary folder structure for DAGs, logs, plugins, and configuration:
admin@docker-host:~/example-orchestration$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.10.2/docker-compose.yaml'
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
100 11342  100 11342    0     0  29307      0 --:--:-- --:--:-- --:--:-- 29307
admin@docker-host:~/example-orchestration$ ls -lrt
total 12
-rw-r--r-- 1 admin admin 11342 Dec  8 07:09 docker-compose.yaml
admin@docker-host:~/example-orchestration$ mkdir -p ./dags ./logs ./plugins ./config
admin@docker-host:~/example-orchestration$ echo -e "AIRFLOW_UID=$(id -u)" > .env
After configuring your environment, open the Docker Compose YAML file to review Airflow's settings. This file details the Airflow image, its environment variables, and services like Redis and Postgres.
Tip
Key components include setting the executor type, defining database connections, and mapping volumes for DAGs, logs, plugins, and configuration.
Airflow Configuration Snippet
Below is a snippet from the Docker Compose file, highlighting the Airflow image and essential environment variables:
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.10.2}
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
    AIRFLOW_CONFIG: '/opt/airflow/config/airflow.cfg'
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: '${AIRFLOW_UID:-50000}:0'
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy
Additional services such as Redis, the Airflow web server, scheduler, and worker are defined similarly. For example, the Redis service is configured as follows:
redis:
  image: redis:7.2-bookworm
  expose:
    - 6379
  healthcheck:
    test: ["CMD", "redis-cli", "ping"]
    interval: 10s
    timeout: 30s
    retries: 50
    start_period: 30s
  restart: always
And the Airflow web server exposes port 8080 (mapped to your local port 8081):
airflow-webserver:
  <<: *airflow-common
  command: webserver
  ports:
    - "8081:8080"
  healthcheck:
    test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
    interval: 30s
    timeout: 10s
    retries: 5
    start_period: 30s
  restart: always
  depends_on:
    <<: *airflow-common-depends-on
    airflow-init:
      condition: service_completed_successfully
Initializing Airflow
After reviewing the configuration, initialize Airflow with the following commands:
admin@docker-host:~$ cd example-orchestration/
admin@docker-host:~/example-orchestration$ ls -lrt
total 12
-rw-rw-r-- 1 admin admin 11342 Dec 8 07:09 docker-compose.yaml
admin@docker-host:~/example-orchestration$ mkdir -p ./dags ./logs ./plugins ./config
admin@docker-host:~/example-orchestration$ echo -e "AIRFLOW_UID=$(id -u)" > .env
admin@docker-host:~/example-orchestration$ vim docker-compose.yaml
admin@docker-host:~/example-orchestration$ docker compose up airflow-init
During initialization, Airflow downloads the necessary images and sets up database tables and user permissions. A successful initialization is indicated by a zero exit code in the logs.
Once initialization is complete, start all containers:
admin@docker-host:~/example-orchestration$ docker compose up
You'll observe that containers for Redis, Postgres, airflow-init, airflow-scheduler, airflow-webserver, airflow-worker, and airflow-triggerer are created and started. The appearance of the Airflow logo in the logs confirms that the service is running.
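Once the stack is up, you can check the state of each service from a second terminal:

admin@docker-host:~/example-orchestration$ docker compose ps

Because the Compose file defines healthchecks for the services, each long-running container (postgres, redis, airflow-webserver, airflow-scheduler, airflow-worker, airflow-triggerer) should eventually report a running (healthy) status.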
Accessing the Airflow Web Interface
After starting the containers (which might take 5–10 minutes), open your browser and navigate to http://localhost:8081. You will see the Airflow web interface.
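If the page is not reachable yet, you can poll the web server's health endpoint (the same endpoint the container healthcheck uses) until it responds:

admin@docker-host:~/example-orchestration$ curl http://localhost:8081/health

The endpoint returns a small JSON document describing the status of components such as the metadatabase and scheduler.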
Log in using the credentials:
- Username: airflow
- Password: airflow
In the UI, you'll find a list of DAGs (ETL workflows). Most example DAGs are initially paused; simply toggle them to activate. For example, the interface may display a tutorial DAG along with associated code details.
Clicking a DAG’s code button reveals its underlying implementation, offering further insights into the orchestration process.
Example: Bash Operator DAG
The following example demonstrates how to use the BashOperator to run Linux commands as part of an ETL workflow. This DAG, titled "example_bash_operator", runs on a daily cron schedule and uses an EmptyOperator as a no-op placeholder step in the workflow.
from __future__ import annotations
import datetime
import pendulum
from airflow.models import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
with DAG(
    dag_id="example_bash_operator",
    schedule="0 0 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    dagrun_timeout=datetime.timedelta(minutes=60),
    tags=["example", "example2"],
    params={"example_key": "example_value"},
) as dag:
    run_this_last = EmptyOperator(
        task_id="run_this_last",
    )

    # [START howto_operator_bash]
    run_this = BashOperator(
        task_id="run_after_loop",
        bash_command="echo https://airflow.apache.org/",
    )
    # [END howto_operator_bash]

    run_this >> run_this_last

    for i in range(3):
        task = BashOperator(
            task_id=f"run_{i}",
            bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
        )
        task >> run_this
Switch to the “Graph” view in the Airflow UI to visually inspect the DAG execution. Each box in the graph represents an individual task and the arrows depict task dependencies. Checking individual task logs can help diagnose any issues during execution.
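You can also exercise a single task from the command line with airflow tasks test, which runs the task in place without recording state in the metadata database, which is handy for debugging. A minimal sketch, executed inside the scheduler container:

admin@docker-host:~/example-orchestration$ docker compose exec airflow-scheduler \
    airflow tasks test example_bash_operator run_after_loop 2021-01-01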
Creating a Custom IoT Data Pipeline DAG
In this section, you will create a custom DAG that simulates an IoT data pipeline. The DAG will perform three main functions:
- Generate random IoT data.
- Aggregate the collected data.
- (Optionally) Send an email with the aggregated results.
Create the DAG File
Open a new file named iot_dag.py inside the dags folder:

cd dags/
vim iot_dag.py

Paste the following code into iot_dag.py:
from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.utils.dates import days_ago
import random
import time


# Function to generate random IoT data
def generate_iot_data(**kwargs):
    data = []
    for _ in range(60):  # 60 readings (1 per second) over one minute
        data.append(random.choice([0, 1]))
        time.sleep(1)  # simulate a 1-second interval
    return data


# Function to aggregate the IoT data
def aggregate_machine_data(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='getting_iot_data')
    count_0 = data.count(0)
    count_1 = data.count(1)
    aggregated_data = {'count_0': count_0, 'count_1': count_1}
    return aggregated_data


# Email content generation (runs as its own task so it can read the aggregated XCom value)
def create_email_content(**kwargs):
    ti = kwargs['ti']
    aggregated_data = ti.xcom_pull(task_ids='aggregate_machine_data')
    return (f"Aggregated IoT Data:\n"
            f"Count of 0: {aggregated_data['count_0']}\n"
            f"Count of 1: {aggregated_data['count_1']}")


# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

with DAG(
    dag_id='iot_data_pipeline',
    default_args=default_args,
    schedule=None,
    catchup=False,
) as dag:
    start_task = EmptyOperator(task_id='start_task')

    getting_iot_data = PythonOperator(
        task_id='getting_iot_data',
        python_callable=generate_iot_data,
    )

    aggregating_data = PythonOperator(
        task_id='aggregate_machine_data',
        python_callable=aggregate_machine_data,
    )

    creating_email_content = PythonOperator(
        task_id='create_email_content',
        python_callable=create_email_content,
    )

    # Optionally, use an EmailOperator to send the results
    # (requires SMTP settings to be configured for Airflow)
    send_email = EmailOperator(
        task_id='send_email',
        to='[email protected]',
        subject='IoT Data Aggregation Results',
        html_content="{{ ti.xcom_pull(task_ids='create_email_content') }}",
    )

    end_task = EmptyOperator(task_id='end_task')

    # Define the task dependencies
    start_task >> getting_iot_data >> aggregating_data >> creating_email_content >> send_email >> end_task
Save the file. Airflow automatically detects new DAG definitions in the dags folder, and the "iot_data_pipeline" DAG will appear in the Airflow UI.
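To confirm that the scheduler has picked up the new file, you can list the registered DAGs and check for import errors from the command line; for example:

admin@docker-host:~/example-orchestration$ docker compose exec airflow-scheduler airflow dags list | grep iot_data_pipeline
admin@docker-host:~/example-orchestration$ docker compose exec airflow-scheduler airflow dags list-import-errors

If the DAG does not appear, the import-errors listing usually points to the offending line in iot_dag.py.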
Running and Monitoring the DAG
To run your new DAG:
- Unpause "iot_data_pipeline" in the Airflow UI.
- Trigger a manual run by clicking the play (trigger) button, or use the Airflow CLI as shown below.
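Both steps can also be performed with the Airflow CLI. A minimal sketch, run against the scheduler container:

admin@docker-host:~/example-orchestration$ docker compose exec airflow-scheduler airflow dags unpause iot_data_pipeline
admin@docker-host:~/example-orchestration$ docker compose exec airflow-scheduler airflow dags trigger iot_data_pipeline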
The DAG executes the following tasks sequentially:
- start_task: Indicates the workflow start.
- getting_iot_data: Generates simulated IoT data.
- aggregate_machine_data: Aggregates the collected data.
- create_email_content: Builds the email body from the aggregated counts.
- send_email: (Optional) Sends an email with the aggregated results (requires SMTP configuration in Airflow).
- end_task: Marks the end of the pipeline.
Monitor the current state of each task (running, success, or failure) via the real-time UI. For detailed insights, click on a running task (e.g., "getting_iot_data") and check the logs.
After execution, the aggregated data might show, for example, 38 instances of “0” and 22 instances of “1”. In real-world applications, these counts could represent metrics such as successful operations versus machine errors, thereby aiding maintenance engineers in troubleshooting issues.
Extendability
This pipeline can be extended to write data to databases or data lakes, depending on your application needs.
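For example, the aggregated counts could be written to a Postgres table instead of (or in addition to) being returned via XCom. The sketch below is illustrative only: it assumes the apache-airflow-providers-postgres package is available (it ships with the official image), that a connection with the ID iot_results_db has been created under Admin > Connections, and that a machine_counts table exists; all of these names are hypothetical.

from airflow.providers.postgres.hooks.postgres import PostgresHook


def store_aggregated_data(**kwargs):
    """Hypothetical task: persist the aggregated counts to a Postgres table."""
    ti = kwargs['ti']
    aggregated = ti.xcom_pull(task_ids='aggregate_machine_data')

    # 'iot_results_db' is an assumed connection ID configured in the Airflow UI
    hook = PostgresHook(postgres_conn_id='iot_results_db')
    hook.run(
        "INSERT INTO machine_counts (count_0, count_1) VALUES (%s, %s)",
        parameters=(aggregated['count_0'], aggregated['count_1']),
    )

Wired in as another PythonOperator between aggregate_machine_data and send_email, a task like this would give maintenance teams a queryable history of machine readings.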
Conclusion
This lesson demonstrated how Apache Airflow simplifies the scheduling and orchestration of complex ETL workflows, replacing multiple cron jobs with a single, manageable system. With Airflow, you can efficiently set up pipelines to collect, process, and analyze data from IoT devices with minimal effort.
Thank you for following along, and happy orchestrating!
For more detailed information on Apache Airflow and data orchestration, consider visiting the Apache Airflow Documentation.