Cum să construiești un pipeline ETL în Python
In this tutorial, you'll learn about Cum să construiești un pipeline ETL în Python. We cover key concepts, practical examples, and best practices.
Un pipeline ETL (Extract, Transform, Load) extrage date din surse multiple, le transformă și le încarcă într-o destinație, centralizând informațiile pentru analiză.
The Problem
Fără un pipeline ETL bine structurat, datele rămân izolate în surse diferite (fișiere, baze de date, API-uri), iar transformările se aplică manual, generând erori și inconsistențe. Procesarea devine greu de reprodus și de întreținut.
The Wrong Way
# extragere manuală, fără reutilizare
import csv
with open("vanzari.csv") as f:
reader = csv.DictReader(f)
data = []
for row in reader:
if row["pret"] and float(row["pret"]) > 0:
data.append(row)
with open("vanzari_curate.csv", "w") as f:
writer = csv.DictWriter(f, fieldnames=data[0].keys())
writer.writeheader()
writer.writerows(data)
Problemă: Codul este rigid, greu de extins și nu separă clar etapele de extragere, transformare și încărcare.
The Right Way
import pandas as pd
def extract(filepath):
return pd.read_csv(filepath)
def transform(df):
df = df.dropna(subset=["pret"])
df["pret"] = df["pret"].astype(float)
df["total"] = df["cantitate"] * df["pret"]
return df
def load(df, output_path):
df.to_csv(output_path, index=False)
pipeline = load(transform(extract("vanzari.csv")), "raport.csv")
print("Pipeline ETL finalizat cu succes.")
Output:
Pipeline ETL finalizat cu succes.
Step-by-Step Fix
1. Definește funcția de extracție
Citește datele din sursă (CSV, JSON, SQL, API) și returnează un DataFrame.
def extract_from_db(connection_string, query):
from sqlalchemy import create_engine
engine = create_engine(connection_string)
return pd.read_sql(query, engine)
2. Implementează transformările
Aplică operații de curățare, filtrare, agregare sau îmbinare.
def transform(df):
df = df.drop_duplicates()
df["data"] = pd.to_datetime(df["data"])
return df
3. Scrie funcția de încărcare
Salvează rezultatul în destinația finală (CSV, bază de date, API).
def load_to_db(df, table_name, engine):
df.to_sql(table_name, engine, if_exists="replace", index=False)
Prevention Tips
- Separă clar etapele ETL pentru reutilizare și testare
- Folosește
try/exceptpentru a gestiona erorile de conectare - Loghează fiecare etapă pentru depanare
- Testează fiecare funcție individual înainte de a o integra în pipeline
Common Mistakes
- Amestecarea etapelor — extracție, transformare și încărcare în aceeași funcție
- Ignorarea erorilor — fără handling pentru fișiere lipsă sau conexiuni întrerupte
- Fără logging — greu de depanat când pipeline-ul eșuează la pasul X
- Hardcodarea căilor — folosește configurare sau variabile de mediu
- Fără validare — încarcă date chiar dacă transformarea a eșuat parțial
FAQ
Construit de dezvoltătorii Doda Browser, DodaZIP și Durga Antivirus Pro. Instrumentele DodaTech se integrează perfect cu pipeline-urile ETL pentru productivitate și securitate sporite.
Built by the developers of DodaTech
Doda Browser, DodaZIP & Durga Antivirus Pro