Building a Data Analysis Pipeline with Python
DodaTech
4 min read
In this tutorial, you'll learn how to build a data analysis pipeline with Python. We cover key concepts, practical examples, and best practices to help you understand and apply this topic effectively.
What You'll Learn
Build a complete data analysis pipeline — load data from multiple sources, clean and validate, engineer features, generate reports, and automate the workflow.
Why It Matters
Manual analysis doesn't scale. A pipeline automates repetitive tasks, ensures consistency, and makes your analysis reproducible and shareable.
Real-World Use
A daily sales report that runs automatically, a customer churn analysis that processes new data each week, or a data quality check that alerts when anomalies appear.
Pipeline Architecture
Extract → Validate → Clean → Transform → Analyze → Report
1. Extract: Load from CSV, API, database
2. Validate: Check schema, data types, ranges
3. Clean: Handle missing values, outliers, duplicates
4. Transform: Feature engineering, aggregations
5. Analyze: Statistics, correlations, visualizations
6. Report: Generate charts, summaries, export
Step 1: Configuration
# config.yaml
"""
data:
  raw_path: "data/raw/"
  processed_path: "data/processed/"
  reports_path: "outputs/reports/"
sources:
  - name: "sales"
    type: "csv"
    path: "data/raw/sales_{date}.csv"
    schema:
      date: datetime
      product: str
      revenue: float
      quantity: int
      region: str
cleaning:
  max_missing_pct: 0.2
  outlier_method: "iqr"
  outlier_threshold: 1.5
reporting:
  charts: true
  summary_stats: true
  save_format: ["csv", "parquet"]
"""
# config.py
import yaml
from pathlib import Path
from datetime import datetime, timedelta
def load_config(path="config.yaml"):
    """Load the pipeline configuration from a YAML file.

    Args:
        path: Location of the YAML configuration file.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file content (a dict
        for a mapping document).

    Raises:
        OSError: If the file cannot be opened.
    """
    # Pin the encoding: without it open() uses the platform default, which
    # can mis-decode non-ASCII paths or names in the config.
    with open(path, encoding="utf-8") as f:
        return yaml.safe_load(f)
def get_data_paths(config, days_back=1):
    """Resolve every source's path template for one specific date.

    The date is ``days_back`` days before now, formatted as ``YYYY-MM-DD``
    and substituted for ``{date}`` in each source's ``path``.

    Args:
        config: Mapping with a ``"sources"`` list; each source needs
            ``"name"`` and ``"path"`` and may carry a ``"schema"``.
        days_back: How many days before now the target date lies.

    Returns:
        A list of dicts with keys ``"name"``, ``"path"`` and ``"schema"``
        (an empty dict when the source declares no schema).
    """
    target_day = datetime.now() - timedelta(days=days_back)
    stamp = target_day.strftime("%Y-%m-%d")

    def resolve(entry):
        # Fill the template first so a missing "path" surfaces before "name".
        filled = entry["path"].format(date=stamp)
        return {
            "name": entry["name"],
            "path": filled,
            "schema": entry.get("schema", {}),
        }

    return [resolve(entry) for entry in config["sources"]]
Step 2: Extract
# extract.py
import pandas as pd
from pathlib import Path
class DataExtractor:
    """Extract data from multiple sources.

    Every source is read with ``pandas.read_csv``; sources whose file does
    not exist are reported and skipped.
    """

    def extract_csv(self, path, schema=None):
        """Load a CSV file, parsing the date columns named by *schema*.

        Args:
            path: Location of the CSV file.
            schema: Optional mapping of column name to declared type name.
                Columns declared as ``"datetime"`` are parsed as dates. A
                column named ``"date"`` is always parsed, whatever its
                declared type, which keeps the earlier behaviour.

        Returns:
            The loaded DataFrame.
        """
        kwargs = {}
        if schema:
            date_cols = [
                col
                for col, declared in schema.items()
                if col == "date" or declared == "datetime"
            ]
            if date_cols:
                kwargs["parse_dates"] = date_cols
        df = pd.read_csv(path, **kwargs)
        print(f" Loaded {path}: {df.shape[0]} rows ร {df.shape[1]} cols")
        return df

    def extract_all(self, sources):
        """Extract data from all configured sources.

        Args:
            sources: Iterable of dicts with ``"name"`` and ``"path"`` and an
                optional ``"schema"``.

        Returns:
            Dict mapping source name to its DataFrame; sources whose file
            is missing are left out.
        """
        data = {}
        for source in sources:
            print(f"๐ฅ Extracting {source['name']}...")
            path = source["path"]
            if Path(path).exists():
                # .get: a source without a schema is still loadable.
                data[source["name"]] = self.extract_csv(path, source.get("schema"))
            else:
                print(f" โ ๏ธ File not found: {path}")
        return data
Step 3: Validate
# validate.py
class DataValidator:
    """Validate data quality and schema."""

    def validate_schema(self, df, expected_schema):
        """Check that declared columns exist and datetime columns are datetimes.

        Only the ``"datetime"`` type is verified against the actual dtype;
        other declared types are checked for presence only.

        Args:
            df: DataFrame to inspect.
            expected_schema: Mapping of column name to declared type name.

        Returns:
            A list of human-readable issue strings (empty when none found).
        """
        issues = []
        for col, expected_type in expected_schema.items():
            if col not in df.columns:
                issues.append(f"Missing column: {col}")
                continue
            actual_type = df[col].dtype
            if expected_type == "datetime" and not pd.api.types.is_datetime64_any_dtype(df[col]):
                issues.append(f"Column '{col}' should be datetime, got {actual_type}")
        return issues

    def check_data_quality(self, df):
        """Run quality checks and return a report.

        Returns:
            Dict with ``rows``, ``columns``, per-column ``missing`` counts,
            the number of ``duplicates`` and, for numeric columns that have
            any, the count of ``negative_values``. Counts are plain ``int``
            so the report can be serialized directly.
        """
        report = {
            "rows": len(df),
            "columns": len(df.columns),
            "missing": df.isnull().sum().to_dict(),
            # .sum() yields a numpy integer; convert to a builtin int.
            "duplicates": int(df.duplicated().sum()),
            "negative_values": {},
        }
        for col in df.select_dtypes(include="number").columns:
            neg = int((df[col] < 0).sum())
            if neg > 0:
                report["negative_values"][col] = neg
        return report

    def _schema_for(self, name, config):
        """Return the schema declared for dataset *name*, or ``{}``."""
        # Schemas are declared per source, so look the dataset up by name.
        for source in config.get("sources") or []:
            if source.get("name") == name:
                return source.get("schema") or {}
        # Fall back to a top-level "schema" key, the only place looked at before.
        return config.get("schema") or {}

    def validate_all(self, data, config):
        """Validate all datasets and print the findings.

        Args:
            data: Dict mapping dataset name to DataFrame.
            config: Pipeline configuration; each dataset's schema is taken
                from the entry in ``config["sources"]`` with the same name.

        Returns:
            Dict mapping dataset name to ``{"issues": [...], "quality": {...}}``.
        """
        findings = {}
        for name, df in data.items():
            print(f"๐ Validating {name}...")
            schema = self._schema_for(name, config)
            issues = self.validate_schema(df, schema)
            if issues:
                for issue in issues:
                    print(f" โ {issue}")
            quality = self.check_data_quality(df)
            print(f" Rows: {quality['rows']}, Duplicates: {quality['duplicates']}")
            missing_cols = {k: v for k, v in quality["missing"].items() if v > 0}
            if missing_cols:
                print(f" Missing values: {missing_cols}")
            findings[name] = {"issues": issues, "quality": quality}
        return findings
Step 4: Clean
# clean.py
class DataCleaner:
    """Clean and preprocess data."""

    def __init__(self, config):
        """Read cleaning settings from *config*, with defaults.

        Args:
            config: Mapping that may hold ``max_missing_pct`` (default 0.2),
                ``outlier_method`` (default ``"iqr"``) and
                ``outlier_threshold`` (default 1.5).
        """
        self.max_missing_pct = config.get("max_missing_pct", 0.2)
        # Stored only; remove_outliers always applies the IQR rule.
        self.outlier_method = config.get("outlier_method", "iqr")
        self.outlier_threshold = config.get("outlier_threshold", 1.5)

    def remove_duplicates(self, df):
        """Remove duplicate rows.

        Returns:
            A ``(deduplicated_df, removed_count)`` tuple.
        """
        before = len(df)
        df = df.drop_duplicates()
        return df, before - len(df)

    def handle_missing(self, df):
        """Handle missing values based on column type.

        Columns whose missing fraction exceeds ``max_missing_pct`` are
        dropped. Remaining gaps are filled with the median for numeric
        columns and with the most frequent value otherwise (an empty
        string when the column has no mode).

        Returns:
            A new DataFrame; the input is left untouched.
        """
        # Work on a copy so the caller's frame is never modified in place.
        df = df.copy()
        for col in list(df.columns):
            missing = df[col].isnull()
            if missing.mean() > self.max_missing_pct:
                df = df.drop(columns=[col])
                continue
            if not missing.any():
                continue  # nothing to fill
            # Any numeric dtype takes the median, not only float64/int64.
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].fillna(df[col].median())
            else:
                modes = df[col].mode()
                df[col] = df[col].fillna(modes.iloc[0] if not modes.empty else "")
        return df

    def remove_outliers(self, df):
        """Cap outliers using the IQR method.

        Values in numeric columns are clipped to
        ``[Q1 - t * IQR, Q3 + t * IQR]`` with ``t = outlier_threshold``;
        no rows are removed.

        Returns:
            A new DataFrame; the input is left untouched.
        """
        df = df.copy()
        numeric_cols = df.select_dtypes(include="number").columns
        for col in numeric_cols:
            q1 = df[col].quantile(0.25)
            q3 = df[col].quantile(0.75)
            iqr = q3 - q1
            lower = q1 - self.outlier_threshold * iqr
            upper = q3 + self.outlier_threshold * iqr
            df[col] = df[col].clip(lower, upper)
        return df

    def clean_all(self, data):
        """Clean all datasets.

        Runs duplicate removal, missing-value handling and outlier capping
        on every DataFrame.

        Args:
            data: Dict mapping dataset name to DataFrame. Its values are
                replaced with the cleaned frames.

        Returns:
            The same dict, now holding the cleaned frames.
        """
        for name, df in data.items():
            print(f"๐งน Cleaning {name}...")
            df, dupes = self.remove_duplicates(df)
            print(f" Removed {dupes} duplicates")
            df = self.handle_missing(df)
            df = self.remove_outliers(df)
            data[name] = df
        return data
Step 5: Transform and Analyze
# analyze.py
class DataAnalyzer:
    """Analyze data and generate insights."""

    def summary_statistics(self, df):
        """Summarize a DataFrame.

        Returns:
            Dict with ``"numeric"`` (the result of ``df.describe()``) and
            ``"categorical"`` (value counts for up to the first five
            object-dtype columns).
        """
        described = df.describe()
        text_columns = df.select_dtypes("object").columns[:5]
        counts = {}
        for label in text_columns:
            counts[label] = df[label].value_counts().to_dict()
        return {"numeric": described, "categorical": counts}

    def top_correlations(self, df, target=None, n=10):
        """Correlate the numeric columns of *df*.

        Returns:
            ``{}`` when fewer than two numeric columns exist. With a
            *target* that is a numeric column, the *n* largest absolute
            correlations against it as a Series. Otherwise the full
            correlation matrix.
        """
        numeric_part = df.select_dtypes(include="number")
        if numeric_part.shape[1] < 2:
            return {}
        matrix = numeric_part.corr()
        if not target or target not in matrix.columns:
            return matrix
        strength = matrix[target].drop(target).abs()
        ranked = strength.sort_values(ascending=False)
        return ranked.head(n)

    def analyze_all(self, data):
        """Run the analysis on every dataset.

        Returns:
            Dict mapping dataset name to its ``"summary"``, ``"shape"``
            and ``"columns"``.
        """
        results = {}
        for name in data:
            frame = data[name]
            print(f"๐ Analyzing {name}...")
            results[name] = dict(
                summary=self.summary_statistics(frame),
                shape=frame.shape,
                columns=list(frame.columns),
            )
        return results
Step 6: Run the Pipeline
# pipeline.py
from pathlib import Path
class DataPipeline:
    """End-to-end data analysis pipeline.

    Runs the stages in order: resolve source paths, extract, validate,
    clean, analyze, then write each cleaned dataset to Parquet.
    """

    def __init__(self, config_path="config.yaml"):
        """Load the pipeline configuration from *config_path*."""
        self.config = load_config(config_path)

    def run(self, days_back=1):
        """Execute every pipeline stage.

        Args:
            days_back: Passed to ``get_data_paths`` to pick the file date.

        Returns:
            A ``(data, results)`` tuple of cleaned DataFrames and analysis
            results, both keyed by source name, or ``None`` when no source
            file could be loaded.
        """
        print("=" * 60)
        print("๐ DATA ANALYSIS PIPELINE")
        print("=" * 60)

        # Get data sources
        sources = get_data_paths(self.config, days_back)

        # Extract
        extractor = DataExtractor()
        data = extractor.extract_all(sources)
        if not data:
            print("โ No data found")
            return

        # Validate
        validator = DataValidator()
        validator.validate_all(data, self.config)

        # Clean
        cleaner = DataCleaner(self.config.get("cleaning", {}))
        data = cleaner.clean_all(data)

        # Analyze
        analyzer = DataAnalyzer()
        results = analyzer.analyze_all(data)

        # Save processed data
        output_dir = Path(self.config["data"]["processed_path"])
        output_dir.mkdir(parents=True, exist_ok=True)
        for name, df in data.items():
            df.to_parquet(output_dir / f"{name}_processed.parquet")

        # The literal was split across two lines by a mis-decoded check
        # mark, which made the file unparsable; it is one string again.
        print("\n✅ Pipeline complete!")
        return data, results
# Run
if __name__ == "__main__":
    pipeline = DataPipeline()
    # run() returns None when no input file was found, so unpacking its
    # result directly would raise TypeError on that path.
    outcome = pipeline.run(days_back=7)
    data, results = outcome if outcome is not None else ({}, {})
← Previous
Data Visualization Best Practices
Next →
Data Science Projects for Beginners — Build Your Portfolio
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro