Skip to content

Cum să construiești un pipeline ETL în Python

DodaTech Updated 2025-01-15 2 min read

In this tutorial, you'll learn about Cum să construiești un pipeline ETL în Python. We cover key concepts, practical examples, and best practices.

Un pipeline ETL (Extract, Transform, Load) extrage date din surse multiple, le transformă și le încarcă într-o destinație, centralizând informațiile pentru analiză.

The Problem

Fără un pipeline ETL bine structurat, datele rămân izolate în surse diferite (fișiere, baze de date, API-uri), iar transformările se aplică manual, generând erori și inconsistențe. Procesarea devine greu de reprodus și de întreținut.

The Wrong Way

# extragere manuală, fără reutilizare
import csv

with open("vanzari.csv") as f:
    reader = csv.DictReader(f)
    data = []
    for row in reader:
        if row["pret"] and float(row["pret"]) > 0:
            data.append(row)

with open("vanzari_curate.csv", "w") as f:
    writer = csv.DictWriter(f, fieldnames=data[0].keys())
    writer.writeheader()
    writer.writerows(data)

Problemă: Codul este rigid, greu de extins și nu separă clar etapele de extragere, transformare și încărcare.

The Right Way

import pandas as pd

def extract(filepath):
    return pd.read_csv(filepath)

def transform(df):
    df = df.dropna(subset=["pret"])
    df["pret"] = df["pret"].astype(float)
    df["total"] = df["cantitate"] * df["pret"]
    return df

def load(df, output_path):
    df.to_csv(output_path, index=False)

pipeline = load(transform(extract("vanzari.csv")), "raport.csv")
print("Pipeline ETL finalizat cu succes.")

Output:

Pipeline ETL finalizat cu succes.

Step-by-Step Fix

1. Definește funcția de extracție

Citește datele din sursă (CSV, JSON, SQL, API) și returnează un DataFrame.

def extract_from_db(connection_string, query):
    from sqlalchemy import create_engine
    engine = create_engine(connection_string)
    return pd.read_sql(query, engine)

2. Implementează transformările

Aplică operații de curățare, filtrare, agregare sau îmbinare.

def transform(df):
    df = df.drop_duplicates()
    df["data"] = pd.to_datetime(df["data"])
    return df

3. Scrie funcția de încărcare

Salvează rezultatul în destinația finală (CSV, bază de date, API).

def load_to_db(df, table_name, engine):
    df.to_sql(table_name, engine, if_exists="replace", index=False)

Prevention Tips

  • Separă clar etapele ETL pentru reutilizare și testare
  • Folosește try/except pentru a gestiona erorile de conectare
  • Loghează fiecare etapă pentru depanare
  • Testează fiecare funcție individual înainte de a o integra în pipeline

Common Mistakes

  1. Amestecarea etapelor — extracție, transformare și încărcare în aceeași funcție
  2. Ignorarea erorilor — fără handling pentru fișiere lipsă sau conexiuni întrerupte
  3. Fără logging — greu de depanat când pipeline-ul eșuează la pasul X
  4. Hardcodarea căilor — folosește configurare sau variabile de mediu
  5. Fără validare — încarcă date chiar dacă transformarea a eșuat parțial

FAQ

### Ce înseamnă ETL?

Extract, Transform, Load — un proces care extrage date din diverse surse, le transformă într-un format util și le încarcă într-o destinație finală.

### Care e diferența dintre ETL și ELT?

În ETL, transformarea are loc înainte de încărcare; în ELT, datele brute se încarcă mai întâi, apoi se transformă în destinație (comun în data warehousing).

### Ce biblioteci Python sunt utile pentru ETL?

Pandas, SQLAlchemy, requests (API), pyarrow (Parquet), Great Expectations (validare).

Construit de dezvoltătorii Doda Browser, DodaZIP și Durga Antivirus Pro. Instrumentele DodaTech se integrează perfect cu pipeline-urile ETL pentru productivitate și securitate sporite.

Built by the developers of DodaTech

Doda Browser, DodaZIP & Durga Antivirus Pro