ML Data Pipelines with Apache Airflow and Prefect
This tutorial covers ML data pipelines built with Apache Airflow and Prefect: the key concepts, worked examples, and best practices for running them in production.
ML Data Pipelines orchestrate the sequence of data extraction, transformation, feature engineering, model training, and deployment tasks, ensuring reliable and reproducible ML workflows.
What You'll Learn
You'll learn to build ML data pipelines with Apache Airflow and Prefect, covering task definition, dependency management, scheduling, retry logic, and pipeline monitoring for production ML workflows.
Why It Matters
ML workflows involve many steps: data extraction from multiple sources, feature engineering, training, evaluation, and deployment. Running these manually is error-prone and does not scale. Pipeline orchestrators automate the sequence with dependency management, retry logic, and alerting. If a step fails, the orchestrator retries it or raises an alert, and steps that already succeeded do not need to be re-run.
Real-World Use
DodaZIP uses Airflow to run its compression optimization pipeline nightly. The pipeline fetches new files from storage, computes entropy features, trains updated compression models, runs A/B evaluation against the current model, and deploys if improvement is significant. If any step fails, alerts go to the engineering team with full logs.
Pipeline Concepts
A pipeline is a directed acyclic graph (DAG) of tasks. Each task is a single unit of work — a Python function, a SQL query, a shell command, or a Docker container. Tasks have dependencies: some tasks must complete before others start. The orchestrator schedules tasks according to their dependencies, retries failed tasks with configurable backoff, and provides a UI for monitoring progress and inspecting logs.
from datetime import datetime
import time
class MockDAG:
    """Toy stand-in for an orchestrator's DAG: records tasks and their upstream dependencies."""
    def __init__(self, dag_id, schedule_interval, start_date):
        self.dag_id = dag_id
        self.schedule_interval = schedule_interval
        self.start_date = start_date
        self.tasks = []
    def add_task(self, task_id, func, dependencies=None):
        self.tasks.append({
            'id': task_id,
            'func': func,
            'deps': dependencies or [],
            'status': 'pending'
        })
def extract():
    time.sleep(0.2)  # simulate I/O latency
    return {'data': [1, 2, 3, 4, 5], 'status': 'success'}
def transform(raw_data):
    time.sleep(0.2)
    return {'transformed': [x * 2 for x in raw_data['data']]}
def load(data):
    time.sleep(0.1)
    return {'loaded': True, 'records': len(data['transformed'])}
dag = MockDAG('ml_pipeline', '@daily', datetime(2026, 1, 1))
dag.add_task('extract', extract)
dag.add_task('transform', transform, dependencies=['extract'])
dag.add_task('load', load, dependencies=['transform'])
# The mock only records the graph; the tasks are called here by hand in dependency
# order, which is the job a real orchestrator's scheduler performs.
extract_result = extract()
transform_result = transform(extract_result)
load_result = load(transform_result)
print(f"DAG: {dag.dag_id}")
print(f"Extract status: {extract_result['status']}")
print(f"Transform result: {transform_result['transformed']}")
print(f"Load records: {load_result['records']}")
Expected output:
DAG: ml_pipeline
Extract status: success
Transform result: [2, 4, 6, 8, 10]
Load records: 5
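The orchestrator behaviour described in Pipeline Concepts, running each task only after its upstream tasks finish and retrying failures with backoff, can be sketched in plain Python. This is a minimal illustration under stated assumptions, not how Airflow or Prefect are implemented: `run_dag`, `flaky_extract`, and the retry parameters are hypothetical, and the standard library's `graphlib.TopologicalSorter` (Python 3.9+) supplies the dependency order and rejects cyclic graphs.

```python
from graphlib import TopologicalSorter
import time


def run_dag(tasks, deps, retries=2, base_delay=0.01):
    """Hypothetical runner: execute tasks in dependency order with exponential-backoff retries.

    tasks: {task_id: callable taking a dict of upstream results}
    deps:  {task_id: [upstream task_ids]}
    """
    # static_order() yields each task after all of its dependencies; raises CycleError on a cycle
    graph = {t: deps.get(t, []) for t in tasks}
    order = list(TopologicalSorter(graph).static_order())
    results = {}
    for task_id in order:
        upstream = {d: results[d] for d in deps.get(task_id, [])}
        for attempt in range(retries + 1):
            try:
                results[task_id] = tasks[task_id](upstream)
                break
            except Exception as exc:
                if attempt == retries:
                    raise RuntimeError(f"{task_id} failed after {retries + 1} attempts") from exc
                delay = base_delay * 2 ** attempt  # 1x, 2x, 4x, ... the base delay
                print(f"{task_id} failed ({exc}); retrying in {delay:.2f}s")
                time.sleep(delay)
    return order, results


calls = {'extract': 0}


def flaky_extract(upstream):
    # Hypothetical task that fails on its first call, simulating a transient network error
    calls['extract'] += 1
    if calls['extract'] == 1:
        raise ConnectionError("simulated timeout")
    return [1, 2, 3, 4, 5]


tasks = {
    'extract': flaky_extract,
    'transform': lambda up: [x * 2 for x in up['extract']],
    'load': lambda up: len(up['transform']),
}
deps = {'transform': ['extract'], 'load': ['transform']}

order, results = run_dag(tasks, deps)
print(order)
print(results['load'])
```

Running it prints one retry message for `extract`, then `['extract', 'transform', 'load']` and `5`. Keeping the retry loop inside the runner, rather than inside each task, is the same separation orchestrators use: tasks stay simple functions, and retry policy is configuration.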
Apache Airflow
Airflow is one of the most widely used workflow orchestrators. DAGs are defined in Python files that Airflow parses and schedules. Each DAG has a schedule (a cron expression or a preset such as @daily), and task dependencies are declared with the >> operator or set_downstream(). Airflow provides a rich UI, a large operator library (PythonOperator, BashOperator, SQL operators such as SQLExecuteQueryOperator, DockerOperator), and a mature community.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'email_on_failure': True,  # only takes effect if 'email' and SMTP are configured
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
dag = DAG(
    'ml_training_pipeline',
    default_args=default_args,
    description='Daily ML model training',
    schedule='@daily',  # Airflow 2.4+; older versions use schedule_interval
    catchup=False,
)
def extract_features(**context):
    # Heavy imports live inside the callable so the scheduler can parse the DAG file quickly
    import numpy as np
    X = np.random.randn(1000, 10)
    y = (X[:, 0] > 0).astype(int)
    # Acceptable for a small demo; for real datasets write to object storage and push only the path
    context['ti'].xcom_push(key='features', value=X.tolist())
    context['ti'].xcom_push(key='labels', value=y.tolist())
    print(f"Extracted {len(X)} samples with {X.shape[1]} features")
def train_model(**context):
    import numpy as np
    from sklearn.ensemble import RandomForestClassifier
    ti = context['ti']
    X = np.array(ti.xcom_pull(task_ids='extract_features', key='features'))
    y = np.array(ti.xcom_pull(task_ids='extract_features', key='labels'))
    model = RandomForestClassifier(n_estimators=50, random_state=42)
    model.fit(X, y)
    # Accuracy on the training data is optimistic; use a held-out split in practice
    score = model.score(X, y)
    print(f"Model trained. Training accuracy: {score:.4f}")
    return {'accuracy': score, 'model_type': 'RandomForest'}
extract_task = PythonOperator(
    task_id='extract_features',
    python_callable=extract_features,
    dag=dag,
)
train_task = PythonOperator(
    task_id='train_model',
    python_callable=train_model,
    dag=dag,
)
extract_task >> train_task
class FakeTaskInstance:
    """Stand-in for Airflow's TaskInstance that keeps XComs in a dict (local testing only)."""
    def __init__(self):
        self._xcoms = {}
    def xcom_push(self, key, value):
        self._xcoms[key] = value
    def xcom_pull(self, task_ids=None, key=None):
        return self._xcoms.get(key)
def simulate_airflow_run():
    # Call the task callables directly, in dependency order, with a fake context.
    # For a full local run against a metadata database, Airflow 2.5+ also provides dag.test().
    context = {'ti': FakeTaskInstance()}
    extract_features(**context)
    result = train_model(**context)
    print(f"\nAirflow DAG simulated: {dag.dag_id}")
    for k, v in result.items():
        print(f" {k}: {v}")
simulate_airflow_run()
Expected output (the data is random and unseeded, so the exact accuracy varies between runs):
Extracted 1000 samples with 10 features
Model trained. Training accuracy: 0.9980
Airflow DAG simulated: ml_training_pipeline
accuracy: 0.998
model_type: RandomForest
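Airflow implements `>>` with Python operator overloading: `a >> b` marks `b` as downstream of `a` and evaluates to `b`, so `a >> b >> c` chains left to right, and a list on either side fans out or fans in. The `ToyTask` class below is a hypothetical, stripped-down illustration of that mechanism, not Airflow's own code, and it needs no Airflow install.

```python
class ToyTask:
    """Hypothetical minimal task that mimics Airflow-style >> dependency syntax."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        # Accept a single task or a list of tasks
        others = other if isinstance(other, list) else [other]
        self.downstream.extend(others)

    def __rshift__(self, other):
        # a >> b: record b as downstream of a, then return b so a chain continues from b
        self.set_downstream(other)
        return other

    def __rrshift__(self, other):
        # [a, b] >> c: lists do not define >>, so Python falls back to c.__rrshift__([a, b])
        for task in other:
            task.set_downstream(self)
        return self


def edges(*tasks):
    """List (upstream, downstream) task_id pairs for the given tasks."""
    return [(t.task_id, d.task_id) for t in tasks for d in t.downstream]


extract, validate, train = ToyTask('extract'), ToyTask('validate'), ToyTask('train')
evaluate, report, publish = ToyTask('evaluate'), ToyTask('report'), ToyTask('publish')

extract >> validate >> train >> [evaluate, report]  # fan-out after training
[evaluate, report] >> publish                        # fan-in before publishing

print(edges(extract, validate, train, evaluate, report))
```

Returning the right-hand operand is what makes chaining work: each `>>` in the first chain starts from the task the previous `>>` just returned.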
Prefect
Prefect is a newer workflow orchestrator with a Pythonic, decorator-based API: flows and tasks are ordinary Python functions marked with @flow and @task, so a flow can run as a plain script on a laptop and later be deployed to infrastructure such as Docker or Kubernetes. Built-in features include configurable task retries (including exponential backoff), result caching that can skip a task when its inputs are unchanged, and async task execution. The Prefect UI (called Orion in early Prefect 2 releases) provides real-time monitoring of flow runs.
from prefect import flow, task
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
@task(retries=2, retry_delay_seconds=5)  # retry transient failures up to twice, 5 s apart
def load_training_data():
    X = np.random.randn(1000, 10)
    y = (X[:, 0] > 0).astype(int)
    print(f"Loaded data: {len(X)} samples")
    return X, y
@task
def train_rf(X, y):
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X, y)
    return model
@task
def evaluate(model, X, y):
    # Scored on the training data for brevity; use a held-out split in practice
    preds = model.predict(X)
    acc = accuracy_score(y, preds)
    print(f"Evaluation complete. Accuracy: {acc:.4f}")
    return acc
@flow
def ml_training_flow():
    # Calling a task inside a flow runs it and returns its result directly
    X, y = load_training_data()
    model = train_rf(X, y)
    accuracy = evaluate(model, X, y)
    return {"accuracy": accuracy, "samples": len(X)}
if __name__ == "__main__":
    result = ml_training_flow()
    print(f"\nFlow result: accuracy={result['accuracy']:.4f}, samples={result['samples']}")
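Expected output (Prefect also prints its own log lines, and the unseeded data means the exact accuracy varies between runs):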
Loaded data: 1000 samples
Evaluation complete. Accuracy: 0.9970
Flow result: accuracy=0.9970, samples=1000
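The idea behind input-based caching is that a task run gets a cache key derived from its inputs, and a later call with the same key returns the stored result instead of executing again. The decorator below is a hypothetical, in-memory sketch of that idea using a SHA-256 hash of the pickled arguments; it is not Prefect's implementation, which is configured on `@task` and adds options such as cache expiration and persisted results.

```python
import functools
import hashlib
import pickle


def cache_by_inputs(func):
    """Hypothetical decorator: skip re-execution when called again with identical inputs."""
    cache = {}

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Key = function name + hash of the pickled arguments (kwargs sorted for stability).
        # Pickle output is stable for simple built-in values like these, not for every object.
        payload = pickle.dumps((func.__name__, args, sorted(kwargs.items())))
        key = hashlib.sha256(payload).hexdigest()
        if key in cache:
            wrapper.hits += 1
            return cache[key]
        result = func(*args, **kwargs)
        cache[key] = result
        return result

    wrapper.hits = 0
    return wrapper


executions = []


@cache_by_inputs
def compute_features(rows, scale=1.0):
    executions.append(len(rows))  # only real executions are recorded
    return [r * scale for r in rows]


a = compute_features([1, 2, 3], scale=2.0)     # runs
b = compute_features([1, 2, 3], scale=2.0)     # same inputs: served from the cache
c = compute_features([1, 2, 3, 4], scale=2.0)  # new inputs: runs again
print(a, b, c, len(executions), compute_features.hits)
```

The function body executes twice and the cache answers once. The design choice worth noting is that the key depends only on inputs, so a task whose output also depends on external state (a table that changes daily, the current time) needs that state folded into its inputs or it will return stale results.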
ML Pipeline Architecture
flowchart TD
A[Scheduler] -->|Triggers| B[Pipeline DAG]
B --> C[Data Extraction]
C --> D[Data Validation]
D --> E[Feature Engineering]
E --> F[Train/Test Split]
F --> G[Model Training]
G --> H[Model Evaluation]
H --> I{Pass Threshold?}
I -->|Yes| J[Register Model]
I -->|No| K[Alert Team]
J --> L[Deploy to Staging]
L --> M[Integration Tests]
M --> N[Deploy to Production]
C --> O[Log Metrics]
G --> O
H --> O
O --> P[Monitoring Dashboard]
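The `Pass Threshold?` decision in the diagram is usually a small, pure function called by the evaluation task, which keeps it easy to unit test apart from the orchestrator. The sketch below is hypothetical: the accuracy metric, the absolute threshold, and the minimum gain over the current production model are illustrative assumptions, and a fixed margin like this is a simplification rather than a statistical significance test.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class GateDecision:
    promote: bool
    reason: str


def promotion_gate(candidate_acc: float, production_acc: Optional[float] = None,
                   min_acc: float = 0.90, min_gain: float = 0.005) -> GateDecision:
    """Hypothetical gate: promote only if the candidate clears an absolute bar and,
    when a production model exists, beats it by at least min_gain."""
    if candidate_acc < min_acc:
        return GateDecision(False, f"accuracy {candidate_acc:.3f} below threshold {min_acc:.3f}")
    if production_acc is not None:
        gain = candidate_acc - production_acc
        if gain < min_gain:
            return GateDecision(False, f"gain {gain:+.3f} below required {min_gain:.3f}")
    return GateDecision(True, "passed threshold checks")


def route(decision: GateDecision) -> str:
    # Mirrors the diagram: Yes -> Register Model, No -> Alert Team
    return "register_model" if decision.promote else "alert_team"


print(route(promotion_gate(0.93, production_acc=0.91)))    # register_model
print(route(promotion_gate(0.85)))                         # alert_team
print(route(promotion_gate(0.915, production_acc=0.912)))  # alert_team: gain too small
```

Returning a reason string alongside the boolean gives the alerting step something concrete to send to the team.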
Airflow vs Prefect
| Feature | Airflow | Prefect |
|---|---|---|
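| DAG definition | Python (explicit DAG object and operators) | Python (@flow and @task decorators) |
| Execution | Executors (Sequential, Local, Celery, Kubernetes) | Task runners (concurrent/thread pool by default; Dask and Ray via integrations) |
| Caching | No built-in result caching | Built-in caching, configurable per task (e.g., keyed on inputs) |
| Retries | Task-level (retries, retry_delay, retry_exponential_backoff) | Task-level (retries, retry_delay_seconds, including exponential backoff) |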
| UI | Mature, feature-rich | Modern, simpler |
| Best for | Complex enterprise workflows | Faster setup, smaller teams |
Common Errors and Mistakes
| Mistake | Why It Happens | How to Fix |
|---|---|---|
| Tasks too granular | Too many small tasks create overhead | Group related operations into single tasks |
| No retry configuration | Transient failures kill the pipeline | Add retries with exponential backoff |
| Hardcoded dates in backfill | Every backfilled run reprocesses the same hardcoded date range | Parametrize tasks by the run's logical date (e.g., the {{ ds }} template or data_interval_start) |
| No alerting on failure | Pipeline fails silently | Set up email/Slack alerts on task failure |
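| Passing large data through XCom | Airflow stores XComs in its metadata database by default, so large payloads bloat it and slow tasks | Save data to S3/GCS and pass only the reference (URI) via XCom |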
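Two of the fixes in the table, passing references instead of large payloads and parametrizing by execution date, combine naturally: each run writes its output to a location keyed by its logical date and hands only that path downstream. The sketch below uses a local temporary directory as a stand-in for S3/GCS; the function names and the `ds=YYYY-MM-DD` partition layout are hypothetical. In Airflow the date string would come from the run context (for example the `ds` template variable) instead of a hardcoded list.

```python
import json
import tempfile
from pathlib import Path


def extract_partition(ds, storage_root):
    """Hypothetical extract task: write this run's data and return only its path."""
    rows = [{"date": ds, "value": i} for i in range(1000)]  # stand-in for a real query
    path = Path(storage_root) / "features" / f"ds={ds}" / "part-0.json"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(rows))
    return str(path)  # a short string is cheap to pass between tasks (e.g., via XCom)


def train_from_partition(path):
    """Hypothetical downstream task: load the data from the reference it received."""
    rows = json.loads(Path(path).read_text())
    return {"rows": len(rows), "source": path}


results = []
with tempfile.TemporaryDirectory() as root:
    # A backfill runs the same code once per logical date; each run gets its own partition
    for ds in ["2026-01-01", "2026-01-02"]:
        ref = extract_partition(ds, root)
        summary = train_from_partition(ref)
        results.append((ds, Path(ref).parent.name, summary["rows"]))

print(results)
```

Because the output location is derived from the logical date, re-running or backfilling a single day overwrites only that day's partition.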
Practice Questions
- What is a DAG in the context of ML pipelines?
Answer: A DAG (Directed Acyclic Graph) defines the pipeline structure: tasks are nodes and dependencies are edges. It must be acyclic (no loops) so the orchestrator can compute a valid execution order; with a cycle, some task would wait on itself and never become runnable.
- How does Prefect's caching improve pipeline efficiency?
Answer: When a task is configured to cache, Prefect computes a cache key for each run, commonly a hash of the task's inputs. If the task runs with the same key as a previous successful run, Prefect skips execution and returns the cached result. This saves time when re-running pipelines whose upstream inputs have not changed.
- What is the difference between Airflow's schedule_interval and catchup?
Answer: schedule_interval (called schedule in Airflow 2.4 and later) defines how often the DAG runs (e.g., @daily). catchup determines whether Airflow creates DAG runs for every interval between start_date and now that has not yet run. Catching up is useful for processing historical data but can overload the scheduler and workers.
- Why use task retries in ML pipelines?
Answer: Transient failures (network timeouts, database connection drops, resource contention) are common. Automatic retries with backoff handle these without manual intervention, making pipelines more robust.
- How do you pass data between Airflow tasks?
Answer: XCom (cross-communication) stores small values in the Airflow metadata database. For large data, tasks save to S3/GCS and pass the reference URI via XCom. In Prefect, task return values are passed directly to downstream tasks as Python objects, and persisting results to storage is optional and configurable.
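To make the catchup answer concrete: with a daily schedule, each run covers one data interval and is created only after that interval ends, so enabling catchup on a DAG whose start_date is in the past creates one run per completed interval that has not yet run. The helper below is a hypothetical, plain-Python approximation of that bookkeeping for a fixed daily interval; Airflow's scheduler also handles cron expressions, timetables, and time zones.

```python
from datetime import datetime, timedelta


def missed_daily_intervals(start_date, now, already_run=()):
    """Hypothetical approximation: daily data intervals that catchup=True would schedule.

    A run for [start, end) is only created once `end` has passed, so the interval
    containing `now` is not included. `already_run` holds interval start times.
    """
    intervals = []
    start = start_date
    while start + timedelta(days=1) <= now:
        end = start + timedelta(days=1)
        if start not in already_run:
            intervals.append((start, end))
        start = end
    return intervals


start_date = datetime(2026, 1, 1)
now = datetime(2026, 1, 5, 9, 30)
pending = missed_daily_intervals(start_date, now, already_run={datetime(2026, 1, 2)})
for s, e in pending:
    print(s.date(), "->", e.date())
print(len(pending), "catch-up runs")
```

Here the intervals starting January 1, 3, and 4 are scheduled; January 2 already ran, and January 5 is still in progress. With catchup=False, Airflow would instead create only the most recent completed interval.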
Challenge
Build a complete ML pipeline with Airflow or Prefect that: extracts data from a CSV, validates schema and detects missing values, engineers features (lag variables, rolling statistics), trains an XGBoost model with hyperparameter tuning, evaluates with cross-validation, and registers the model if accuracy exceeds a threshold. Include retry logic, alerting, and a monitoring dashboard.
Real-World Task
Design a model retraining pipeline for a recommendation system that receives 1M daily user interactions. The pipeline must: extract new interaction data, merge with existing user profiles, compute feature vectors, retrain the recommendation model with early stopping, evaluate against the current production model using A/B metrics, and deploy if the new model is statistically better. Orchestrate with Airflow and handle failure at any stage.
Next Steps
Containerize pipeline tasks with Docker and scale with Kubernetes. Track pipeline metrics with MLflow. Version pipeline code with Git and use CI/CD for pipeline testing.
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.