
Testing Data Pipelines — ETL, Data Quality & Validation Guide

DodaTech Updated 2026-06-24 4 min read

Data pipeline testing ensures that data moving through ETL processes remains accurate, complete, and consistent from source to destination. A broken pipeline can silently corrupt data for days before anyone notices. In this guide, you will learn how to test ETL transformations with sample data, enforce data quality checks using Great Expectations, validate records against an expected schema, and build end-to-end and incremental-load tests for a pipeline. The Durga Antivirus Pro team tests every threat intelligence data pipeline with quality gates that block bad data before it reaches the analysis engine.

Learning Path

flowchart LR
  A[Data Engineering Basics] --> B[ETL Pipelines]
  B --> C[Testing Data Pipelines<br/>You are here]
  C --> D[Data Quality Monitoring]
  D --> E[Production Validation]
  style C fill:#f90,color:#fff

ETL Transformation Testing

Test that your transformation logic produces correct output for known input:

import pandas as pd

def transform_events(raw_df):
    df = raw_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["event_date"] = df["timestamp"].dt.date
    df["user_id"] = df["user_id"].astype(str)
    return df[["event_date", "user_id", "event_type", "value"]]

def test_event_transformation():
    raw = pd.DataFrame({
        "timestamp": ["2026-06-24 10:30:00", "2026-06-24 11:00:00"],
        "user_id": [1001, 1002],
        "event_type": ["page_view", "purchase"],
        "value": [1, 49.99],
        "extra_col": ["ignore", "this"]
    })

    result = transform_events(raw)

    assert len(result.columns) == 4
    assert result.iloc[0]["event_date"] == pd.Timestamp("2026-06-24").date()
    assert result.iloc[1]["user_id"] == "1002"
    assert "extra_col" not in result.columns
    print("ETL transformation test passed")

test_event_transformation()

Expected output:

ETL transformation test passed
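Field-by-field assertions like the ones above can miss a changed dtype or an unexpected column order. A stricter sketch compares the entire output with pandas.testing.assert_frame_equal, which checks values, dtypes, column order, and index in one call. The second sample row (dated 2026-06-25) and the test name are illustrative assumptions:

```python
import pandas as pd
from pandas.testing import assert_frame_equal


def transform_events(raw_df):
    # Same transformation as above, repeated so this block runs on its own
    df = raw_df.copy()
    df["timestamp"] = pd.to_datetime(df["timestamp"])
    df["event_date"] = df["timestamp"].dt.date
    df["user_id"] = df["user_id"].astype(str)
    return df[["event_date", "user_id", "event_type", "value"]]


def test_event_transformation_full_frame():
    raw = pd.DataFrame({
        "timestamp": ["2026-06-24 10:30:00", "2026-06-25 09:00:00"],
        "user_id": [1001, 1002],
        "event_type": ["page_view", "purchase"],
        "value": [1.0, 49.99],
        "extra_col": ["ignore", "this"],
    })
    expected = pd.DataFrame({
        "event_date": [pd.Timestamp("2026-06-24").date(),
                       pd.Timestamp("2026-06-25").date()],
        "user_id": ["1001", "1002"],
        "event_type": ["page_view", "purchase"],
        "value": [1.0, 49.99],
    })
    # One assertion covers values, dtypes, column order, and index
    assert_frame_equal(transform_events(raw).reset_index(drop=True), expected)
    return True


test_event_transformation_full_frame()
print("Full-frame transformation test passed")
```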

Data Quality Testing with Great Expectations

Great Expectations provides declarative data quality validation. This example uses the legacy pandas-dataset API (ge.read_csv) from pre-1.0 releases:

import great_expectations as ge

def test_data_quality():
    # Legacy API: ge.read_csv returns a pandas-backed dataset with expect_* methods
    df = ge.read_csv("sample_events.csv")

    expect_column_to_exist = df.expect_column_to_exist("user_id")
    assert expect_column_to_exist["success"]

    expect_not_null = df.expect_column_values_to_not_be_null("user_id")
    assert expect_not_null["success"]

    expect_range = df.expect_column_values_to_be_between("value", 0, 10000)
    assert expect_range["success"]

    # A single-column uniqueness check uses expect_column_values_to_be_unique
    expect_unique = df.expect_column_values_to_be_unique("event_id")
    assert expect_unique["success"]

    print("All data quality checks passed")

test_data_quality()
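To make explicit what each of the four checks tests, here is a plain-pandas sketch of the same rules. It is not how Great Expectations implements them; run_quality_checks is a hypothetical helper, and the sample frame deliberately contains one duplicate ID, one missing user, and one out-of-range value:

```python
import pandas as pd


def run_quality_checks(df):
    # Hypothetical helper: plain-pandas versions of the four checks above
    has_user_id = "user_id" in df.columns
    return {
        "user_id_exists": has_user_id,
        # notna() treats both None and NaN as missing
        "user_id_not_null": has_user_id and bool(df["user_id"].notna().all()),
        # Nulls are dropped first so a missing value is reported only by a not-null check
        "value_in_range": bool(df["value"].dropna().between(0, 10000).all()),
        "event_id_unique": bool(df["event_id"].is_unique),
    }


sample = pd.DataFrame({
    "event_id": [1, 2, 2],              # duplicate ID
    "user_id": ["1001", None, "1003"],  # missing user
    "value": [10.0, 20000.0, 5.0],      # out-of-range value
})
print(run_quality_checks(sample))
```

Returning a dict of named results rather than asserting inside the helper lets a pipeline report every failing check at once instead of stopping at the first one.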

Schema Validation

Test that source and destination schemas match expectations:

import json

# Map the type names used in the schema to Python types (avoids calling eval() on strings)
TYPE_MAP = {"str": str, "int": int, "float": float, "bool": bool}

def validate_schema(schema, sample_data):
    # Accept either a parsed schema dict or a path to a JSON schema file
    if isinstance(schema, str):
        with open(schema) as f:
            schema = json.load(f)

    errors = []
    for field in schema["required_fields"]:
        if field not in sample_data:
            errors.append(f"Missing required field: {field}")
            continue
        expected_type = schema["field_types"].get(field)
        if expected_type and not isinstance(sample_data[field], TYPE_MAP[expected_type]):
            errors.append(f"Field {field} has wrong type: "
                          f"expected {expected_type}, got {type(sample_data[field]).__name__}")
    return errors

schema = {
    "required_fields": ["user_id", "event_type", "timestamp"],
    "field_types": {
        "user_id": "str",
        "event_type": "str",
        "timestamp": "str"
    }
}

record = {"user_id": 1001, "event_type": "purchase", "timestamp": "2026-06-24"}
errors = validate_schema(schema, record)
print(f"Schema validation errors: {errors}")

Expected output:

Schema validation errors: ['Field user_id has wrong type: expected str, got int']
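Checking one record at a time catches type errors, but schema drift usually shows up across a whole batch as added, dropped, or retyped columns. A minimal sketch that compares a DataFrame's columns and dtypes against an expected mapping; detect_schema_drift and the expected dtypes are illustrative assumptions:

```python
import pandas as pd


def detect_schema_drift(expected_dtypes, df):
    # Hypothetical helper: expected_dtypes maps column name -> pandas dtype name
    actual = {col: str(dtype) for col, dtype in df.dtypes.items()}
    missing = sorted(set(expected_dtypes) - set(actual))
    unexpected = sorted(set(actual) - set(expected_dtypes))
    type_changed = {
        col: (expected_dtypes[col], actual[col])
        for col in expected_dtypes
        if col in actual and actual[col] != expected_dtypes[col]
    }
    return {"missing": missing, "unexpected": unexpected, "type_changed": type_changed}


expected = {"event_id": "int64", "user_id": "int64", "value": "float64"}
# This batch dropped user_id, added session, and now delivers value as integers
batch = pd.DataFrame({"event_id": [1, 2], "value": [10, 20], "session": [7, 8]})
print(detect_schema_drift(expected, batch))
```

Running a check like this before each load, and failing the run on any non-empty result, turns drift into an explicit error instead of silently mis-typed data downstream.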

Pipeline Integrity Test

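End-to-end run of a complete pipeline flow: load a CSV into an in-memory SQLite table, then aggregate events per day. The input CSV must provide the columns event_id, user_id, event_type, value, and event_date: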

import sqlite3
import pandas as pd

def run_pipeline(input_csv):
    conn = sqlite3.connect(":memory:")
    conn.execute("""
        CREATE TABLE events (
            event_id INTEGER PRIMARY KEY,
            user_id TEXT,
            event_type TEXT,
            value REAL,
            event_date TEXT
        )
    """)

    df = pd.read_csv(input_csv)
    df.to_sql("events", conn, if_exists="append", index=False)

    result = conn.execute("""
        SELECT event_date, COUNT(*) as cnt, SUM(value) as total
        FROM events
        GROUP BY event_date
        ORDER BY event_date
    """).fetchall()
    conn.close()
    return result

results = run_pipeline("test_events.csv")
for row in results:
    print(f"{row[0]}: {row[1]} events, total {row[2]:.2f}")
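The run above prints its aggregates but asserts nothing, and it needs a CSV file on disk. A self-contained sketch that feeds an in-memory CSV (pd.read_csv accepts file-like objects) through a condensed copy of run_pipeline and checks the per-day counts and totals; the sample rows and the test name are illustrative assumptions:

```python
import io
import sqlite3

import pandas as pd


def run_pipeline(input_csv):
    # Condensed copy of the pipeline above; input_csv may be a path or a file-like object
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE events (event_id INTEGER PRIMARY KEY, user_id TEXT, "
        "event_type TEXT, value REAL, event_date TEXT)"
    )
    pd.read_csv(input_csv).to_sql("events", conn, if_exists="append", index=False)
    result = conn.execute(
        "SELECT event_date, COUNT(*) AS cnt, SUM(value) AS total "
        "FROM events GROUP BY event_date ORDER BY event_date"
    ).fetchall()
    conn.close()
    return result


def test_pipeline_aggregates():
    csv_text = (
        "event_id,user_id,event_type,value,event_date\n"
        "1,1001,page_view,1.0,2026-06-24\n"
        "2,1002,purchase,49.99,2026-06-24\n"
        "3,1001,purchase,10.0,2026-06-25\n"
    )
    result = run_pipeline(io.StringIO(csv_text))
    assert [row[0] for row in result] == ["2026-06-24", "2026-06-25"]
    assert [row[1] for row in result] == [2, 1]
    # Compare float sums with a tolerance rather than exact equality
    assert abs(result[0][2] - 50.99) < 1e-9
    assert abs(result[1][2] - 10.0) < 1e-9
    return result


test_pipeline_aggregates()
print("Pipeline integrity test passed")
```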

Testing Incremental Loads

Verify that incremental pipelines only process new data:

def test_incremental_load():
    processed_ids = set()

    def process_batch(batch):
        new_records = [r for r in batch if r["id"] not in processed_ids]
        for r in new_records:
            processed_ids.add(r["id"])
        return new_records

    batch1 = [{"id": 1, "data": "A"}, {"id": 2, "data": "B"}]
    batch2 = [{"id": 2, "data": "B"}, {"id": 3, "data": "C"}]

    result1 = process_batch(batch1)
    assert len(result1) == 2

    result2 = process_batch(batch2)
    assert len(result2) == 1
    assert result2[0]["id"] == 3
    print("Incremental load test passed")

test_incremental_load()
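Tracking every processed ID works for small tests, but the set grows without bound. A common alternative is a high-water mark on a monotonically increasing column. A minimal sketch, assuming each record carries an integer updated_at field (a hypothetical name) and that the test also checks an idempotent rerun:

```python
def make_watermark_loader():
    # State: the highest updated_at value loaded so far
    state = {"watermark": None}

    def load(batch):
        wm = state["watermark"]
        new = [r for r in batch if wm is None or r["updated_at"] > wm]
        if new:
            state["watermark"] = max(r["updated_at"] for r in new)
        return new

    return load


def test_watermark_incremental_load():
    load = make_watermark_loader()
    batch1 = [{"id": 1, "updated_at": 100}, {"id": 2, "updated_at": 105}]
    assert [r["id"] for r in load(batch1)] == [1, 2]
    # Re-running the same batch must load nothing
    assert load(batch1) == []
    batch2 = [{"id": 2, "updated_at": 105}, {"id": 3, "updated_at": 110}]
    assert [r["id"] for r in load(batch2)] == [3]
    return True


test_watermark_incremental_load()
print("Watermark incremental load test passed")
```

The trade-off: a record that arrives late with an updated_at at or below the watermark is skipped, so pipelines with late data typically reprocess a small overlap window and deduplicate.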

Practice Questions

1. Why is data pipeline testing important?

Silent data corruption can go unnoticed for days. Pipeline tests catch transformation errors, schema drift, and quality regressions before they affect downstream consumers.

2. What is Great Expectations and how does it help with data testing?

It is a declarative data quality framework that allows you to define expectations about your data — missing values, value ranges, uniqueness — and validate them automatically.

3. How do you test incremental data loads?

Track processed record IDs and verify that only new records are processed in each batch, preventing duplicate processing.

4. What is schema drift and how do you test for it?

Schema drift occurs when source data changes structure (new columns, changed types). Test by validating source schemas before each pipeline run and alerting on mismatches.

Challenge: Build an ETL pipeline that reads CSV files, transforms date formats, validates required fields, and loads into a SQLite database. Write tests for: transformation correctness, data quality (null checks, value ranges), schema validation, incremental loading deduplication, and error handling for malformed input.

FAQ

What is data pipeline testing?

Data pipeline testing validates that ETL processes correctly transform data from source to destination, maintaining accuracy, completeness, and schema compatibility.

How is data pipeline testing different from application testing?

Data pipeline testing focuses on data accuracy, schema evolution, idempotency, and quality gates rather than feature functionality.
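Idempotency, for example, can be tested by running the same load twice and checking that the destination table is unchanged. A minimal sketch using SQLite's INSERT OR REPLACE keyed on event_id; load_events and the two-column table are illustrative assumptions:

```python
import sqlite3


def load_events(conn, rows):
    # INSERT OR REPLACE deletes any row with the same primary key, then inserts the new one,
    # so re-running the same load cannot create duplicates
    conn.executemany("INSERT OR REPLACE INTO events (event_id, value) VALUES (?, ?)", rows)
    conn.commit()


def test_load_is_idempotent():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id INTEGER PRIMARY KEY, value REAL)")
    rows = [(1, 10.0), (2, 20.0)]
    load_events(conn, rows)
    first = conn.execute("SELECT * FROM events ORDER BY event_id").fetchall()
    load_events(conn, rows)  # simulate a retried pipeline run
    second = conn.execute("SELECT * FROM events ORDER BY event_id").fetchall()
    conn.close()
    assert first == second == [(1, 10.0), (2, 20.0)]
    return True


test_load_is_idempotent()
print("Idempotency test passed")
```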

What tools can I use for data pipeline testing?

Great Expectations for data quality, dbt for transformation testing, pytest for pipeline logic, and Apache Beam testing utilities for streaming pipelines.

How do I test streaming data pipelines?

Use time-bounded test windows with known input data. Verify that the output matches expected aggregations within each window.
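A minimal sketch of the idea, simulating tumbling (fixed, non-overlapping) windows over a finite list of (timestamp_seconds, value) events rather than using a real streaming framework such as Apache Beam; the function name and event format are assumptions:

```python
from collections import defaultdict


def tumbling_window_sums(events, window_seconds):
    # Each event belongs to the window starting at ts - ts % window_seconds
    sums = defaultdict(float)
    for ts, value in events:
        window_start = ts - (ts % window_seconds)
        sums[window_start] += value
    return dict(sorted(sums.items()))


def test_tumbling_windows():
    # Known input: three events in [0, 60), one in [60, 120), one in [120, 180)
    events = [(0, 1.0), (30, 2.0), (59, 3.0), (60, 4.0), (125, 5.0)]
    assert tumbling_window_sums(events, 60) == {0: 6.0, 60: 4.0, 120: 5.0}
    return True


test_tumbling_windows()
print("Tumbling window test passed")
```

Boundary events (59 and 60 here) are the ones most worth including, since off-by-one errors in window assignment show up there first.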

What's Next

A/B Testing — Statistical Significance Guide
Testing Microservices Integration
CI/CD Testing Pipeline

Built by the developers of Doda Browser, DodaZIP, and Durga Antivirus Pro.
