Skip to content

Building a Data Analysis Pipeline with Python

DodaTech 4 min read

In this tutorial, you'll learn how to build a data analysis pipeline with Python. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.

What You'll Learn

Build a complete data analysis pipeline — load data from multiple sources, clean and validate, engineer features, generate reports, and automate the workflow.

Why It Matters

Manual analysis doesn't scale. A pipeline automates repetitive tasks, ensures consistency, and makes your analysis reproducible and shareable.

Real-World Use

A daily sales report that runs automatically, a customer churn analysis that processes new data each week, or a data quality check that alerts when anomalies appear.

Pipeline Architecture

Extract → Validate → Clean → Transform → Analyze → Report

1. Extract: Load from CSV, API, database
2. Validate: Check schema, data types, ranges
3. Clean: Handle missing values, outliers, duplicates
4. Transform: Feature engineering, aggregations
5. Analyze: Statistics, correlations, visualizations
6. Report: Generate charts, summaries, export

Step 1: Configuration

# config.yaml
"""
data:
  raw_path: "data/raw/"
  processed_path: "data/processed/"
  reports_path: "outputs/reports/"

sources:
  - name: "sales"
    type: "csv"
    path: "data/raw/sales_{date}.csv"
    schema:
      date: datetime
      product: str
      revenue: float
      quantity: int
      region: str

cleaning:
  max_missing_pct: 0.2
  outlier_method: "iqr"
  outlier_threshold: 1.5

reporting:
  charts: true
  summary_stats: true
  save_format: ["csv", "parquet"]
"""
# config.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta

def load_config(path="config.yaml"):
    """Load the pipeline configuration from a YAML file.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        The parsed configuration. An empty file yields an empty dict
        instead of ``None`` so callers can use mapping access directly.

    Raises:
        OSError: If the file cannot be opened.
        yaml.YAMLError: If the content is not valid YAML.
    """
    # Explicit encoding: the platform default is not UTF-8 everywhere, which
    # would garble non-ASCII paths or names in the config.
    with open(path, encoding="utf-8") as f:
        config = yaml.safe_load(f)
    return {} if config is None else config

def get_data_paths(config, days_back=1):
    """Resolve every configured source's file path for one target date.

    The target date is today minus ``days_back`` days, formatted as
    ``YYYY-MM-DD`` and substituted for the ``{date}`` placeholder in each
    source's ``path`` template. Only that single day is resolved; this does
    not expand to a range of days.

    Args:
        config: Parsed configuration. Its ``sources`` entry, when present,
            is a list of dicts with ``name`` and ``path`` keys and an
            optional ``schema`` mapping.
        days_back: How many days before today the target date lies.

    Returns:
        A list of ``{"name", "path", "schema"}`` dicts, one per source, in
        configuration order. Empty when no sources are configured.
    """
    target_day = datetime.now() - timedelta(days=days_back)
    date = target_day.strftime("%Y-%m-%d")

    # A missing or null ``sources`` entry means "nothing to load" rather
    # than a KeyError/TypeError deep inside the pipeline.
    sources = config.get("sources") or []
    return [
        {
            "name": source["name"],
            "path": source["path"].format(date=date),
            # ``schema:`` left blank in YAML parses to None; normalise it.
            "schema": source.get("schema") or {},
        }
        for source in sources
    ]

Step 2: Extract

# extract.py
import pandas as pd
from pathlib import Path

class DataExtractor:
    """Extract data from multiple sources."""

    def extract_csv(self, path, schema=None):
        """Load a CSV file, parsing the date columns named by the schema.

        Args:
            path: CSV file to read.
            schema: Optional mapping of column name to a type name. Every
                column typed ``"datetime"``, plus a column literally named
                ``date``, is parsed as a date when it exists in the file.

        Returns:
            The loaded DataFrame.
        """
        kwargs = {}
        if schema:
            wanted = [
                col for col, kind in schema.items()
                if col == "date" or kind == "datetime"
            ]
            if wanted:
                # read_csv raises when parse_dates names an absent column.
                # Peek at the header so a missing column can be reported
                # later by validation instead of aborting extraction here.
                header = pd.read_csv(path, nrows=0).columns
                present = [col for col in wanted if col in header]
                if present:
                    kwargs["parse_dates"] = present

        df = pd.read_csv(path, **kwargs)
        print(f"  Loaded {path}: {df.shape[0]} rows ร— {df.shape[1]} cols")
        return df

    def extract_all(self, sources):
        """Extract data from all configured sources.

        Args:
            sources: Iterable of dicts with ``name`` and ``path`` keys and
                an optional ``schema`` mapping.

        Returns:
            Dict mapping source name to its DataFrame. Sources whose file
            does not exist are reported and left out.
        """
        data = {}
        for source in sources:
            print(f"๐Ÿ“ฅ Extracting {source['name']}...")
            path = source["path"]
            if Path(path).exists():
                # ``schema`` is optional on a source entry.
                data[source["name"]] = self.extract_csv(path, source.get("schema"))
            else:
                print(f"  โš ๏ธ File not found: {path}")
        return data

Step 3: Validate

# validate.py
class DataValidator:
    """Validate data quality and schema."""

    def validate_schema(self, df, expected_schema):
        """Check that columns exist and have the expected types.

        Args:
            df: DataFrame to inspect.
            expected_schema: Mapping of column name to a type name. The
                recognised names are ``datetime``, ``float`` (any numeric
                dtype is accepted), ``int`` and ``str``; other names are
                only checked for column presence.

        Returns:
            A list of human-readable issue strings; empty when all checks pass.
        """
        type_checks = {
            "datetime": pd.api.types.is_datetime64_any_dtype,
            # Whole-number CSV columns load as integers, so accept any
            # numeric dtype where a float is expected.
            "float": pd.api.types.is_numeric_dtype,
            "int": pd.api.types.is_integer_dtype,
            "str": lambda s: (
                pd.api.types.is_object_dtype(s) or pd.api.types.is_string_dtype(s)
            ),
        }

        issues = []
        for col, expected_type in (expected_schema or {}).items():
            if col not in df.columns:
                issues.append(f"Missing column: {col}")
                continue
            check = type_checks.get(expected_type) if isinstance(expected_type, str) else None
            if check is not None and not check(df[col]):
                actual_type = df[col].dtype
                issues.append(f"Column '{col}' should be {expected_type}, got {actual_type}")
        return issues

    def check_data_quality(self, df):
        """Run quality checks and return a report.

        Returns:
            Dict with ``rows``, ``columns``, per-column ``missing`` counts,
            the number of fully duplicated rows, and ``negative_values``
            (count of negatives for each numeric column that has any).
        """
        report = {
            "rows": len(df),
            "columns": len(df.columns),
            "missing": df.isnull().sum().to_dict(),
            # Plain ints keep the report free of numpy scalar types.
            "duplicates": int(df.duplicated().sum()),
            "negative_values": {},
        }

        for col in df.select_dtypes(include="number").columns:
            neg = int((df[col] < 0).sum())
            if neg > 0:
                report["negative_values"][col] = neg

        return report

    def validate_all(self, data, config):
        """Validate all datasets against their configured schemas.

        Args:
            data: Dict mapping dataset name to DataFrame.
            config: Pipeline configuration. Each dataset's schema is taken
                from the entry in ``config["sources"]`` with the same
                ``name``; a top-level ``schema`` key is used as a fallback.

        Returns:
            Dict mapping dataset name to ``{"issues": [...], "quality": {...}}``.
        """
        # Schemas are declared per source, not at the top level of the config.
        sources = config.get("sources") or []
        schemas = {src.get("name"): src.get("schema") or {} for src in sources}
        fallback = config.get("schema", {})

        results = {}
        for name, df in data.items():
            print(f"๐Ÿ” Validating {name}...")
            schema = schemas.get(name) or fallback
            issues = self.validate_schema(df, schema)
            for issue in issues:
                print(f"  โŒ {issue}")

            quality = self.check_data_quality(df)
            print(f"  Rows: {quality['rows']}, Duplicates: {quality['duplicates']}")
            missing_cols = {k: v for k, v in quality["missing"].items() if v > 0}
            if missing_cols:
                print(f"  Missing values: {missing_cols}")

            results[name] = {"issues": issues, "quality": quality}
        return results

Step 4: Clean

# clean.py
class DataCleaner:
    """Clean and preprocess data."""

    def __init__(self, config):
        """Read cleaning settings from ``config``, falling back to defaults.

        Args:
            config: Mapping that may contain ``max_missing_pct``,
                ``outlier_method`` and ``outlier_threshold``.
        """
        self.max_missing_pct = config.get("max_missing_pct", 0.2)
        # Stored for reference; remove_outliers only implements IQR capping.
        self.outlier_method = config.get("outlier_method", "iqr")
        self.outlier_threshold = config.get("outlier_threshold", 1.5)

    def remove_duplicates(self, df):
        """Remove duplicate rows.

        Returns:
            Tuple of (de-duplicated DataFrame, number of rows removed).
        """
        before = len(df)
        df = df.drop_duplicates()
        return df, before - len(df)

    def handle_missing(self, df):
        """Drop sparse columns and fill remaining gaps by column type.

        Columns whose missing fraction exceeds ``max_missing_pct`` are
        dropped. Other numeric columns are filled with their median, and
        everything else with the most frequent value (or ``""`` when the
        column has no values at all).

        Returns:
            A new DataFrame; the input is left untouched.
        """
        # Work on a copy so column assignment neither mutates the caller's
        # frame nor triggers chained-assignment warnings on a slice.
        df = df.copy()
        for col in list(df.columns):
            series = df[col]
            missing = series.isnull()
            if missing.mean() > self.max_missing_pct:
                df = df.drop(columns=[col])
                continue
            if not missing.any():
                continue  # nothing to fill

            # Any numeric dtype (float32, int32, nullable ints, ...) gets
            # the median; booleans are treated as categorical.
            is_numeric = (
                pd.api.types.is_numeric_dtype(series)
                and not pd.api.types.is_bool_dtype(series)
            )
            if is_numeric:
                df[col] = series.fillna(series.median())
            else:
                modes = series.mode()
                fill_value = modes.iloc[0] if not modes.empty else ""
                df[col] = series.fillna(fill_value)
        return df

    def remove_outliers(self, df):
        """Cap outliers in numeric columns using the IQR method.

        Values outside ``[Q1 - k*IQR, Q3 + k*IQR]`` (``k`` being
        ``outlier_threshold``) are clipped to the nearest bound; no rows
        are removed.

        Returns:
            A new DataFrame; the input is left untouched.
        """
        df = df.copy()
        numeric_cols = df.select_dtypes(include="number").columns
        for col in numeric_cols:
            q1 = df[col].quantile(0.25)
            q3 = df[col].quantile(0.75)
            spread = self.outlier_threshold * (q3 - q1)
            df[col] = df[col].clip(q1 - spread, q3 + spread)
        return df

    def clean_all(self, data):
        """Clean all datasets.

        Args:
            data: Dict mapping dataset name to DataFrame. Entries are
                replaced in place with their cleaned versions.

        Returns:
            The same dict, now holding the cleaned DataFrames.
        """
        for name, df in data.items():
            print(f"๐Ÿงน Cleaning {name}...")
            df, dupes = self.remove_duplicates(df)
            print(f"  Removed {dupes} duplicates")
            df = self.handle_missing(df)
            df = self.remove_outliers(df)
            data[name] = df
        return data

Step 5: Transform and Analyze

# analyze.py
class DataAnalyzer:
    """Compute descriptive statistics and correlations for DataFrames."""

    def summary_statistics(self, df):
        """Summarise a DataFrame.

        Returns:
            Dict with ``numeric`` (the result of ``df.describe()``) and
            ``categorical`` (value counts for up to the first five
            object-dtype columns, keyed by column name).
        """
        numeric_summary = df.describe()

        value_counts = {}
        for column in df.select_dtypes("object").columns[:5]:
            value_counts[column] = df[column].value_counts().to_dict()

        return {"numeric": numeric_summary, "categorical": value_counts}

    def top_correlations(self, df, target=None, n=10):
        """Correlate the numeric columns of ``df``.

        Returns:
            ``{}`` when fewer than two numeric columns exist. When
            ``target`` names a numeric column, the ``n`` largest absolute
            correlations of the other columns with it, in descending order.
            Otherwise the full correlation matrix.
        """
        numeric_part = df.select_dtypes(include="number")
        if len(numeric_part.columns) < 2:
            return {}

        matrix = numeric_part.corr()
        if not (target and target in matrix.columns):
            return matrix

        # Drop the target's self-correlation before ranking.
        against_target = matrix[target].drop(target)
        ranked = against_target.abs().sort_values(ascending=False)
        return ranked.head(n)

    def analyze_all(self, data):
        """Analyze every dataset in ``data``.

        Returns:
            Dict mapping dataset name to its ``summary``, ``shape`` and
            list of ``columns``.
        """
        results = {}
        for name, frame in data.items():
            print(f"๐Ÿ“Š Analyzing {name}...")
            entry = {}
            entry["summary"] = self.summary_statistics(frame)
            entry["shape"] = frame.shape
            entry["columns"] = list(frame.columns)
            results[name] = entry
        return results

Step 6: Run the Pipeline

# pipeline.py
from pathlib import Path

class DataPipeline:
    """Run extraction, validation, cleaning, analysis and saving in order."""

    def __init__(self, config_path="config.yaml"):
        """Load the configuration that drives every stage."""
        self.config = load_config(config_path)

    def run(self, days_back=1):
        """Execute the whole pipeline once.

        Args:
            days_back: Passed to ``get_data_paths`` to pick the input files.

        Returns:
            ``(data, results)`` — the cleaned datasets and their analysis —
            or ``None`` when extraction produced no datasets.
        """
        banner = "=" * 60
        print(banner)
        print("๐Ÿ“Š DATA ANALYSIS PIPELINE")
        print(banner)

        # Resolve inputs, then load whatever exists on disk.
        sources = get_data_paths(self.config, days_back)
        datasets = DataExtractor().extract_all(sources)
        if not datasets:
            print("โŒ No data found")
            return None

        # Validation only reports; it does not alter the datasets.
        DataValidator().validate_all(datasets, self.config)

        cleaning_settings = self.config.get("cleaning", {})
        datasets = DataCleaner(cleaning_settings).clean_all(datasets)

        results = DataAnalyzer().analyze_all(datasets)

        # Persist each cleaned dataset as Parquet under the processed path.
        processed_dir = Path(self.config["data"]["processed_path"])
        processed_dir.mkdir(parents=True, exist_ok=True)
        for name, frame in datasets.items():
            frame.to_parquet(processed_dir / f"{name}_processed.parquet")

        print("\nโœ… Pipeline complete!")
        return datasets, results

# Run
if __name__ == "__main__":
    pipeline = DataPipeline()
    # run() returns None when no input files were found, so unpacking its
    # result directly would raise TypeError on an empty run.
    outcome = pipeline.run(days_back=7)
    if outcome is not None:
        data, results = outcome

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro