Testing Data Pipelines — ETL, Data Quality & Validation Guide
Data pipeline testing ensures that data moving through ETL processes remains accurate, complete, and consistent from source to destination. A broken pipeline can silently corrupt data for days before anyone notices. In this guide, you will learn how to test ETL transformations with sample data, enforce data quality checks using Great Expectations, validate records against a schema, run an end-to-end pipeline check, and test incremental loads. The Durga Antivirus Pro team tests every threat intelligence data pipeline with quality gates that block bad data before it reaches the analysis engine.
Learning Path
Data Engineering Basics → ETL Pipelines → Testing Data Pipelines (you are here) → Data Quality Monitoring → Production Validation
ETL Transformation Testing
Test that your transformation logic produces correct output for known input:
import pandas as pd


def transform_events(raw_df):
    df = raw_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["event_date"] = df["timestamp"].dt.date
    df["user_id"] = df["user_id"].astype(str)
    return df[["event_date", "user_id", "event_type", "value"]]


def test_event_transformation():
    raw = pd.DataFrame({
        "timestamp": ["2026-06-24 10:30:00", "2026-06-24 11:00:00"],
        "user_id": [1001, 1002],
        "event_type": ["page_view", "purchase"],
        "value": [1, 49.99],
        "extra_col": ["ignore", "this"]
    })
    result = transform_events(raw)
    assert len(result.columns) == 4
    assert result.iloc[0]["event_date"] == pd.Timestamp("2026-06-24").date()
    assert result.iloc[1]["user_id"] == "1002"
    assert "extra_col" not in result.columns
    print("ETL transformation test passed")


test_event_transformation()
Expected output:
ETL transformation test passed
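The test above checks individual cells. A complementary approach is to compare the whole output against a hand-built expected frame, so that any unexpected column, row, or value fails the test. The following is a minimal sketch using `pandas.testing.assert_frame_equal`; the test name and the second input row (just after midnight, to exercise the date boundary) are illustrative choices, not part of the original example.

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def transform_events(raw_df):
    # Same transformation as in the example above, repeated so this block runs on its own.
    df = raw_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["event_date"] = df["timestamp"].dt.date
    df["user_id"] = df["user_id"].astype(str)
    return df[["event_date", "user_id", "event_type", "value"]]


def test_transformation_matches_expected_frame():
    raw = pd.DataFrame({
        "timestamp": ["2026-06-24 10:30:00", "2026-06-25 00:05:00"],
        "user_id": [1001, 1002],
        "event_type": ["page_view", "purchase"],
        "value": [1.0, 49.99],
    })
    expected = pd.DataFrame({
        "event_date": [pd.Timestamp("2026-06-24").date(),
                       pd.Timestamp("2026-06-25").date()],
        "user_id": ["1001", "1002"],
        "event_type": ["page_view", "purchase"],
        "value": [1.0, 49.99],
    })
    result = transform_events(raw).reset_index(drop=True)
    # Compare values, column names, and row order. Dtypes are not compared
    # because string column dtypes can differ between pandas versions.
    assert_frame_equal(result, expected, check_dtype=False)
    print("Whole-frame transformation test passed")


test_transformation_matches_expected_frame()
```

Comparing full frames is stricter than spot checks, which makes it a good fit for small, fixed sample inputs.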
Data Quality Testing with Great Expectations
Great Expectations provides declarative data quality validation:
import great_expectations as ge


def test_data_quality():
    # Note: ge.read_csv belongs to the legacy (pre-1.0) Great Expectations
    # dataset API; newer releases use a different entry point.
    df = ge.read_csv("sample_events.csv")

    expect_column_to_exist = df.expect_column_to_exist("user_id")
    assert expect_column_to_exist["success"]

    expect_not_null = df.expect_column_values_to_not_be_null("user_id")
    assert expect_not_null["success"]

    expect_range = df.expect_column_values_to_be_between("value", 0, 10000)
    assert expect_range["success"]

    # A single column is checked with the column-level uniqueness expectation.
    expect_unique = df.expect_column_values_to_be_unique("event_id")
    assert expect_unique["success"]

    print(f"Quality checks: {expect_column_to_exist['success']}, "
          f"{expect_not_null['success']}, "
          f"{expect_range['success']}, "
          f"{expect_unique['success']}")


test_data_quality()
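If the Great Expectations version you have installed does not expose `ge.read_csv` (it is part of the older dataset-style API), the same four checks can be approximated in plain pandas. The sketch below is not Great Expectations code; `run_quality_checks` and the two sample frames are hypothetical names introduced for illustration.

```python
import pandas as pd


def run_quality_checks(df):
    """Return a dict mapping each check name to True (passed) or False (failed)."""
    has = df.columns.__contains__
    return {
        "user_id_exists": has("user_id"),
        "user_id_not_null": has("user_id") and bool(df["user_id"].notna().all()),
        # between() is inclusive on both ends and treats missing values as failures.
        "value_in_range": has("value") and bool(df["value"].between(0, 10000).all()),
        "event_id_unique": has("event_id") and bool(df["event_id"].is_unique),
    }


good = pd.DataFrame({
    "event_id": [1, 2, 3],
    "user_id": ["1001", "1002", "1003"],
    "value": [1.0, 49.99, 0.0],
})
bad = pd.DataFrame({
    "event_id": [1, 1, 2],              # duplicate event_id
    "user_id": ["1001", None, "1003"],  # missing user_id
    "value": [1.0, -5.0, 20000.0],      # values outside 0..10000
})

assert all(run_quality_checks(good).values())
failed = [name for name, ok in run_quality_checks(bad).items() if not ok]
print(f"Failed checks on bad data: {failed}")
```

Feeding the checks a deliberately bad frame, as done here, confirms that the quality gate can actually fail; a gate that has only ever been run on clean data is untested.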
Schema Validation
Test that source and destination schemas match expectations:
import json

# Map type names used in the schema to Python types (avoids calling eval on schema content).
TYPE_MAP = {"str": str, "int": int, "float": float, "bool": bool}


def validate_schema(schema_json, sample_data):
    # The schema is passed as a JSON string, not a file path.
    schema = json.loads(schema_json)
    errors = []
    for field in schema["required_fields"]:
        if field not in sample_data:
            errors.append(f"Missing required field: {field}")
            continue
        expected_type = schema["field_types"].get(field)
        if expected_type and not isinstance(sample_data[field], TYPE_MAP[expected_type]):
            errors.append(f"Field {field} has wrong type: "
                          f"expected {expected_type}, got {type(sample_data[field]).__name__}")
    return errors


schema = {
    "required_fields": ["user_id", "event_type", "timestamp"],
    "field_types": {
        "user_id": "str",
        "event_type": "str",
        "timestamp": "str"
    }
}
record = {"user_id": 1001, "event_type": "purchase", "timestamp": "2026-06-24"}
errors = validate_schema(json.dumps(schema), record)
print(f"Schema validation errors: {errors}")
Expected output:
Schema validation errors: ['Field user_id has wrong type: expected str, got int']
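Record-level validation catches bad values, but comparing the source schema with the destination schema also requires looking at the column sets themselves. A minimal sketch, assuming each schema is available as a plain mapping of column name to type name; `diff_schemas` and the sample mappings are hypothetical and not part of the original example.

```python
def diff_schemas(expected, actual):
    """Compare two {column: type_name} mappings and describe the differences."""
    missing = sorted(set(expected) - set(actual))
    unexpected = sorted(set(actual) - set(expected))
    changed = {
        col: (expected[col], actual[col])
        for col in sorted(set(expected) & set(actual))
        if expected[col] != actual[col]
    }
    return {"missing": missing, "unexpected": unexpected, "changed": changed}


expected_schema = {"user_id": "str", "event_type": "str", "timestamp": "str"}
actual_schema = {"user_id": "int", "event_type": "str", "source": "str"}

drift = diff_schemas(expected_schema, actual_schema)
print(drift)
# {'missing': ['timestamp'], 'unexpected': ['source'], 'changed': {'user_id': ('str', 'int')}}

# A pipeline gate could refuse to run when any of the three buckets is non-empty.
has_drift = any(drift.values())
```

Reporting missing, unexpected, and changed columns separately makes the failure message actionable, since each case usually needs a different fix.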
Pipeline Integrity Test
End-to-end test for a complete pipeline flow:
import sqlite3

import pandas as pd


def run_pipeline(input_csv):
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE events (
            event_id INTEGER PRIMARY KEY,
            user_id TEXT,
            event_type TEXT,
            value REAL,
            event_date TEXT
        )
    """)
    df = pd.read_csv(input_csv)
    df.to_sql("events", conn, if_exists="append", index=False)
    result = conn.execute("""
        SELECT event_date, COUNT(*) as cnt, SUM(value) as total
        FROM events
        GROUP BY event_date
        ORDER BY event_date
    """).fetchall()
    conn.close()
    return result


results = run_pipeline("test_events.csv")
for row in results:
    print(f"{row[0]}: {row[1]} events, total {row[2]:.2f}")
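The pipeline run above prints its aggregates but does not assert anything about them. One way to turn it into a test is to generate a small CSV with known contents and compare the output with totals computed by hand. This is a sketch under assumptions: the fixture rows and the test name are made up for illustration, and `run_pipeline` is repeated in condensed form so the block runs on its own.

```python
import csv
import os
import sqlite3
import tempfile

import pandas as pd


def run_pipeline(input_csv):
    # Same load-and-aggregate flow as above, condensed.
    conn = sqlite3.connect(":memory:")
    try:
        conn.execute(
            "CREATE TABLE events (event_id INTEGER PRIMARY KEY, user_id TEXT, "
            "event_type TEXT, value REAL, event_date TEXT)"
        )
        pd.read_csv(input_csv).to_sql("events", conn, if_exists="append", index=False)
        return conn.execute(
            "SELECT event_date, COUNT(*), SUM(value) FROM events "
            "GROUP BY event_date ORDER BY event_date"
        ).fetchall()
    finally:
        conn.close()


def test_pipeline_end_to_end():
    rows = [
        (1, "1001", "page_view", 1.0, "2026-06-24"),
        (2, "1002", "purchase", 49.99, "2026-06-24"),
        (3, "1001", "purchase", 20.0, "2026-06-25"),
    ]
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "test_events.csv")
        with open(path, "w", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(["event_id", "user_id", "event_type", "value", "event_date"])
            writer.writerows(rows)
        results = run_pipeline(path)

    # Completeness: every input row is counted exactly once.
    assert sum(cnt for _, cnt, _ in results) == len(rows)
    # Accuracy: per-day aggregates match values computed by hand.
    assert [r[0] for r in results] == ["2026-06-24", "2026-06-25"]
    assert results[0][1] == 2 and abs(results[0][2] - 50.99) < 1e-9
    assert results[1][1] == 1 and abs(results[1][2] - 20.0) < 1e-9
    print("Pipeline integrity test passed")


test_pipeline_end_to_end()
```

Writing the fixture inside the test keeps the expected totals next to the data that produces them, so the test does not depend on an external `test_events.csv`.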
Testing Incremental Loads
Verify that incremental pipelines only process new data:
def test_incremental_load():
    processed_ids = set()

    def process_batch(batch):
        new_records = [r for r in batch if r["id"] not in processed_ids]
        for r in new_records:
            processed_ids.add(r["id"])
        return new_records

    batch1 = [{"id": 1, "data": "A"}, {"id": 2, "data": "B"}]
    batch2 = [{"id": 2, "data": "B"}, {"id": 3, "data": "C"}]

    result1 = process_batch(batch1)
    assert len(result1) == 2

    result2 = process_batch(batch2)
    assert len(result2) == 1
    assert result2[0]["id"] == 3
    print("Incremental load test passed")


test_incremental_load()
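Tracking every processed ID works for small volumes. Many incremental pipelines instead keep a high-watermark, the latest change timestamp seen so far, and load only rows newer than it. This is a different technique from the ID set used above, sketched here under assumptions: the `updated_at` field, the `load_incrementally` helper, and the ISO 8601 string timestamps (which sort correctly as plain strings when they share one format) are all hypothetical.

```python
def load_incrementally(records, watermark):
    """Return (new_records, new_watermark) for records strictly newer than watermark."""
    new_records = [r for r in records if r["updated_at"] > watermark]
    new_watermark = max((r["updated_at"] for r in new_records), default=watermark)
    return new_records, new_watermark


def test_watermark_incremental_load():
    source = [
        {"id": 1, "updated_at": "2026-06-24T10:00:00"},
        {"id": 2, "updated_at": "2026-06-24T11:00:00"},
    ]
    # An empty string sorts before any timestamp, so the first run loads everything.
    first, watermark = load_incrementally(source, watermark="")
    assert [r["id"] for r in first] == [1, 2]
    assert watermark == "2026-06-24T11:00:00"

    # Re-running with no new data must load nothing and keep the watermark.
    again, watermark = load_incrementally(source, watermark)
    assert again == []
    assert watermark == "2026-06-24T11:00:00"

    # Only the newly arrived record is picked up on the next run.
    source.append({"id": 3, "updated_at": "2026-06-24T12:00:00"})
    third, watermark = load_incrementally(source, watermark)
    assert [r["id"] for r in third] == [3]
    print("Watermark incremental load test passed")


test_watermark_incremental_load()
```

One caveat of the strict comparison: a record that arrives late with a timestamp equal to or older than the watermark is skipped, so a test for late-arriving data is worth adding if the source can produce it.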
Practice Questions
1. Why is data pipeline testing important?
Silent data corruption can go unnoticed for days. Pipeline tests catch transformation errors, schema drift, and quality regressions before they affect downstream consumers.
2. What is Great Expectations and how does it help with data testing?
It is a declarative data quality framework that allows you to define expectations about your data — missing values, value ranges, uniqueness — and validate them automatically.
3. How do you test incremental data loads?
Track processed record IDs and verify that only new records are processed in each batch, preventing duplicate processing.
4. What is schema drift and how do you test for it?
Schema drift occurs when source data changes structure (new columns, changed types). Test by validating source schemas before each pipeline run and alerting on mismatches.
Challenge: Build an ETL pipeline that reads CSV files, transforms date formats, validates required fields, and loads into a SQLite database. Write tests for: transformation correctness, data quality (null checks, value ranges), schema validation, incremental loading deduplication, and error handling for malformed input.
Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.