Introdução

Este projeto foi desenhado para ir além de um ETL simples. A proposta é receber arquivos CSV brutos de corridas de táxi de Nova York, padronizar schema, aplicar regras de qualidade, gerar métricas analíticas e disponibilizar o resultado para consumo por SQL e dashboard.

O valor técnico da solução está menos no fato de "ler arquivos" e mais na forma como ela organiza responsabilidades: ingestão, transformação, qualidade, agregação e consumo ficam separadas, tornando o fluxo mais fácil de manter, testar e evoluir.


Objetivo

Transformar dados brutos de Yellow Taxi em uma camada analítica confiável, com rastreabilidade de erros e artefatos prontos para exploração.

Fluxo lógico:

CSV bruto
  -> ingestão
  -> padronização de colunas
  -> transformação de tipos e criação de features
  -> aplicação de regras de qualidade
  -> separação entre dados válidos e inválidos
  -> geração de agregações
  -> persistência em Parquet
  -> consumo via DuckDB e Streamlit

Estrutura do projeto

app/
  dashboard.py
data/
  raw/
  processed/
sql/
src/
  analytics.py
  ingest.py
  pipeline.py
  quality.py
  transform.py
  utils.py
README.md
requirements.txt

Decisão arquitetural

A estrutura foi separada por responsabilidade:

  • src/ concentra a lógica da pipeline.

  • data/raw/ preserva a matéria-prima.

  • data/processed/ guarda os datasets gerados.

  • app/ representa a camada de consumo.

  • sql/ suporta exploração e consultas analíticas.

Essa divisão evita acoplamento desnecessário. Notebook, dashboard e pipeline deixam de competir pelo mesmo papel.

Orquestração da pipeline

O ponto de entrada está em src/pipeline.py:

def run_pipeline(raw_dir: Path = DATA_RAW_DIR, processed_dir: Path = DATA_PROCESSED_DIR, nrows: int | None = None) -> None:
    ensure_directories()

    df_raw = ingest_raw_data(raw_dir=raw_dir, nrows=nrows)
    df_transformed = transform_base(df_raw)
    df_quality = apply_quality_rules(df_transformed)

    df_valid, df_invalid = split_valid_invalid(df_quality)
    df_quality_summary = build_quality_summary(df_quality)
    df_quality_rules = quality_by_rule(df_quality)

    df_daily = build_daily_metrics(df_valid)
    df_weekly = build_weekly_metrics(df_valid)
    df_shift = build_shift_metrics(df_valid)
    df_vendor = build_vendor_metrics(df_valid)
    df_payment = build_payment_metrics(df_valid)

Por que isso é importante?

Essa função deixa explícita a ordem das etapas e, com isso, comunica a arquitetura do projeto:

  1. Ingestão

  2. Transformação

  3. Qualidade

  4. Separação entre válido e inválido

  5. Agregação

  6. Persistência

Isso reduz ambiguidade e evita um problema comum em projetos pequenos: concentrar toda a lógica em um único arquivo gigante, difícil de revisar e perigoso de alterar.

Ingestão dos arquivos

A ingestão está em src/ingest.py.

Descoberta automática dos CSVs

def discover_csv_files(raw_dir: Path = DATA_RAW_DIR, pattern: str = CSV_PATTERN) -> list[Path]:
    files = sorted(raw_dir.glob(pattern))
    if not files:
        raise FileNotFoundError(
            f"Nenhum arquivo encontrado em '{raw_dir}' com pattern '{pattern}'."
        )
    return files

Decisão crítica 1: usar pattern em vez de nomes fixos

O projeto define:

CSV_PATTERN = "yellow_tripdata_*.csv"

Isso permite que novos arquivos sejam reconhecidos automaticamente, sem alterar código. É uma decisão simples, mas importante, porque reduz erro operacional e melhora reusabilidade.

Decisão crítica 2: falhar cedo

Se não houver arquivo, a pipeline lança erro imediatamente. Isso é melhor do que seguir com DataFrame vazio e descobrir o problema só depois, em alguma etapa indireta.

### Leitura com rastreabilidade

def read_taxi_csv(file_path: Path, nrows: int | None = None) -> pd.DataFrame:
    df = pd.read_csv(file_path, low_memory=False, nrows=nrows)
    df = normalize_columns(df)
    df["arquivo_origem"] = file_path.name
return df

Decisão crítica 3: registrar arquivo_origem

Essa coluna é um detalhe extremamente valioso, porque viabiliza:

  • análise de qualidade por lote;

  • identificação rápida de arquivo problemático;

  • comparação entre entregas;

  • geração de sumário global e por origem.

Sem esse campo, o pipeline ainda funcionaria, mas perderia grande parte da rastreabilidade operacional.

Decisão crítica 4: usar low_memory=False

Em CSVs grandes, a inferência por blocos do pandas pode gerar tipos inconsistentes. Essa configuração torna a leitura mais previsível, o que é especialmente útil quando a base contém valores malformados.

Consolidação dos lotes

def ingest_raw_data(raw_dir: Path = DATA_RAW_DIR, nrows: int | None = None) -> pd.DataFrame:
   files = discover_csv_files(raw_dir=raw_dir)
   frames = [read_taxi_csv(file_path=csv_file, nrows=nrows) for csv_file in files]
   combined = pd.concat(frames, ignore_index=True)
   return normalize_columns(combined)

Para o escopo do projeto, concatenar os arquivos em memória é uma decisão equilibrada: mantém a solução simples e suficiente, sem antecipar complexidade de particionamento ou processamento distribuído.


Transformação e padronização da base

Essa etapa está em src/transform.py.

Conversão explícita de datas

out["tpep_pickup_datetime"] = pd.to_datetime(out["tpep_pickup_datetime"], errors="coerce")
out["tpep_dropoff_datetime"] = pd.to_datetime(out["tpep_dropoff_datetime"], errors="coerce")

Decisão crítica 5: usar errors="coerce"

Esse parâmetro evita que um valor inválido derrube a pipeline inteira. Em vez disso, o dado problemático vira NaT e segue para a camada de qualidade, onde será classificado corretamente.

Essa separação de responsabilidade é muito boa:

  • transformação converte;

  • qualidade avalia;

  • pipeline preserva observabilidade.

Conversão explícita de numéricos

```python

numeric_cols = [ 
"passenger_count"
,"trip_distance"
,"pickup_longitude"
,"pickup_latitude"
,"RateCodeID"
,"dropoff_longitude"
,"dropoff_latitude"
,"payment_type"
,"fare_amount"
,"extra"
,"mta_tax"
,"tip_amount"
,"tolls_amount"
,"improvement_surcharge"
,"total_amount"
,"VendorID"
]
for col in numeric_cols:
   if col in out.columns:
         out[col] = pd.to_numeric(out[col], errors="coerce")

Decisão crítica 6: não confiar no schema implícito do CSV

CSV não preserva tipagem robusta. Forçar a conversão é essencial para evitar comportamento inconsistente nas etapas seguintes.

Derivação de colunas temporais

out["pickup_date"] = out["tpep_pickup_datetime"].dt.normalize()
out["pickup_year"] = out["tpep_pickup_datetime"].dt.year
out["pickup_month"] = out["tpep_pickup_datetime"].dt.month
out["pickup_day"] = out["tpep_pickup_datetime"].dt.day
out["pickup_hour"] = out["tpep_pickup_datetime"].dt.hour
out["pickup_year_month"] = out["tpep_pickup_datetime"].dt.strftime("%Y-%m")

Essas colunas aparecem cedo porque serão reutilizadas em qualidade, agregações e dashboard. Centralizar essa semântica em um único lugar evita divergência entre camadas.


Métricas derivadas e proteção contra erros matemáticos

Uma parte crítica da transformação é a criação de indicadores derivados:

out["trip_duration_min"] = (
    out["tpep_dropoff_datetime"] - out["tpep_pickup_datetime"]
).dt.total_seconds() / 60
out["trip_duration_hr"] = out["trip_duration_min"] / 60
out["avg_speed_mph"] = safe_divide(out["trip_distance"], out["trip_duration_hr"])
out["tip_pct"] = safe_divide(out["tip_amount"] * 100, out["fare_amount"])
out["revenue_per_minute"] = safe_divide(out["total_amount"], out["trip_duration_min"])
out["revenue_per_mile"] = safe_divide(out["total_amount"], out["trip_distance"])

O helper safe_divide, em src/utils.py, é:

def safe_divide(
    numerator: pd.Series,
    denominator: pd.Series,
) -> pd.Series:
    denominator_arr = pd.Series(denominator).replace(0, pd.NA)
    result = pd.Series(numerator) / denominator_arr
    return result.replace([float("inf"), float("-inf")], pd.NA)

Decisão crítica 7: encapsular divisão segura

Essa função protege a pipeline de gerar valores infinitos ou enganosos quando o denominador é zero. Isso é crucial em métricas como:

  • velocidade média;

  • percentual de gorjeta;

  • receita por minuto;

  • receita por milha.

Sem esse cuidado, agregações posteriores poderiam incorporar inf ou resultados absurdos, degradando a qualidade analítica da base.


Enriquecimento semântico da base

Classificação por turno

def shiftfrom_hour(hour: float | int | None) -> str:
    if pd.isna(hour):
        return "unknown"
    hour = int(hour)
    if 5 <= hour <= 11:
        return "morning"
    if 12 <= hour <= 16:
        return "afternoon"
    if 17 <= hour <= 21:
        return "evening"
    else:
        return "night"

Depois:

out["shift_of_day"] = out["pickup_hour"].apply(_shift_from_hour)

Decisão crítica 8: materializar regra de negócio na camada de dados

Turno não é um campo do arquivo original. É uma interpretação analítica. Colocá-lo na transformação, e não apenas no dashboard, torna a base muito mais reutilizável.

Faixas de distância e duração

out["distance_band"] = pd.cut(
    out["trip_distance"],
    bins=[0, 1, 3, 10, 30, float("inf")],
    labels=["0-1", "1-3", "3-10", "10-30", "30+"],
    include_lowest=True,
).astype("string")

out["duration_band"] = pd.cut(
    out["trip_duration_min"],
    bins=[0, 5, 15, 30, 60, 120, 300, float("inf")],
    labels=["0-5", "5-15", "15-30", "30-60", "60-120", "120-300", "300+"],
    include_lowest=True,
).astype("string")

Por que essas faixas importam?

Porque elas transformam variáveis contínuas em dimensões fáceis de interpretar. Isso melhora muito a exploração analítica e a construção de narrativas no dashboard ou em relatórios.


Consistência financeira

out["total_calculado"] = (
    out["fare_amount"].fillna(0)
   + out["extra"].fillna(0)
   + out["mta_tax"].fillna(0)
   + out["tip_amount"].fillna(0)
   + out["tolls_amount"].fillna(0)
   + out["improvement_surcharge"].fillna(0)
)

Decisão crítica 9: recalcular total_amount

Em vez de aceitar o total como verdade absoluta, o projeto recompõe o valor a partir de seus componentes. Isso é importante porque cria uma verificação de coerência entre colunas monetárias, algo muito mais sofisticado do que apenas validar tipo e presença.


Validação do mês esperado pelo nome do arquivo

out["ano_mes_esperado"] = out["arquivo_origem"].map(extract_year_month_from_filename)

Com o helper:

def extract_year_month_from_filename(file_name: str) -> str | None:
    match = re.search(r"(\d{4}-\d{2})", file_name)
    return match.group(1) if match else None

Decisão crítica 10: usar metadado do lote como controle

Essa é uma escolha muito inteligente porque compara o contexto do arquivo com o conteúdo do registro. Assim, a pipeline consegue identificar mistura indevida de partições ou nomeação incorreta dos lotes.


Qualidade de dados como parte da arquitetura

As regras de qualidade estão em src/quality.py.

Catálogo de flags

FLAG_COLUMNS = [
    "flag_data_nula",
    "flag_ordem_temporal_invalida",
    "flag_distancia_invalida",
    "flag_passageiros_invalidos",
    "flag_tarifa_invalida",
    "flag_duracao_invalida",
    "flag_velocidade_invalida",
    "flag_vendor_invalido",
    "flag_payment_type_invalido",
    "flag_store_and_fwd_invalido",
    "flag_ratecode_invalido",
    "flag_duplicado",
    "flag_total_inconsistente",
    "flag_pickup_coord_invalida",
    "flag_dropoff_coord_invalida",
    "flag_fora_mes",
]

Decisão crítica 11: modelar qualidade por regra, não só por status final

Ter uma flag por regra aumenta explicabilidade. Em vez de apenas saber que um registro é inválido, sabemos exatamente por quê.

Isso viabiliza:

  • auditoria;

  • priorização de correções;

  • diagnóstico por tipo de erro;

  • métricas de qualidade por regra.

Regras principais

Datas nulas:

out["flag_data_nula"] = out["tpep_pickup_datetime"].isna() | out["tpep_dropoff_datetime"].isna()

Ordem temporal inválida:

out["flag_ordem_temporal_invalida"] = out["tpep_dropoff_datetime"] < out["tpep_pickup_datetime"]

Distância fora do plausível:

out["flag_distancia_invalida"] = out["trip_distance"].isna() | (out["trip_distance"] <= 0) | (out["trip_distance"] > 100)

Passageiros inválidos:

out["flag_passageiros_invalidos"] = (
        out["passenger_count"].isna() | (out["passenger_count"] <= 0) | (out["passenger_count"] > 6)
)

Tarifa inválida:

out["flag_tarifa_invalida"] = out["fare_amount"].isna() | (out["fare_amount"] <= 0) | (out["fare_amount"] > 500)

Duração inválida:

out["flag_duracao_invalida"] = (
    out["trip_duration_min"].isna() | (out["trip_duration_min"] <= 0) (out["trip_duration_min"] > 300)
)

Velocidade inválida:

out["flag_velocidade_invalida"] = (
    out["avg_speed_mph"].isna() | (out["avg_speed_mph"] <= 0) | (out["avg_speed_mph"] > 85)
)

Leitura técnica dessas decisões

Essas regras combinam dois tipos de validação:

  • integridade básica, como nulos e ordem temporal;

  • plausibilidade de negócio, como velocidade, distância e tarifa.

Essa combinação é importante porque dados podem estar "preenchidos" e ainda assim serem analiticamente ruins.

Validação de domínios categóricos

vendors_validos = {1, 2}
payment_types_validos = {1, 2, 3, 4, 5, 6}
store_fwd_validos = {"Y", "N"}
rate_codes_validos = {1, 2, 3, 4, 5, 6}
out["flag_vendor_invalido"] = ~out["VendorID"].isin(vendors_validos)
out["flag_payment_type_invalido"] = ~out["payment_type"].isin(payment_types_validos)
out["flag_store_and_fwd_invalido"] = ~store_and_fwd.isin(store_fwd_validos)
out["flag_ratecode_invalido"] = ~out["RateCodeID"].isin(rate_codes_validos)

Validar domínio impede categorias inesperadas de contaminarem agregações e visualizações.

Duplicidade

out["flag_duplicado"] = out.duplicated(
    subset=[
        "VendorID",
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "trip_distance",
        "fare_amount",
        "total_amount",
    ],
        keep=False,
    )

Decisão crítica 12: marcar duplicado em vez de remover silenciosamente

Essa escolha preserva evidência de problema upstream. Em vez de "limpar e esquecer", a pipeline registra o erro e permite análise posterior.

Inconsistência financeira

out["flag_total_inconsistente"] = (out["total_amount"] - out["total_calculado"]).abs() > 0.01

O uso de tolerância evita falsos positivos por arredondamento.

Coordenadas fora da área esperada

out["flag_pickup_coord_invalida"] = (
    out["pickup_longitude"].isna()
    | out["pickup_latitude"].isna()
    | (out["pickup_longitude"] < -74.30)
    | (out["pickup_longitude"] > -73.65)
    | (out["pickup_latitude"] < 40.45)
    | (out["pickup_latitude"] > 40.95)
)

Existe regra equivalente para dropoff. Isso protege a base contra anomalias geográficas.

Registros fora do mês esperado

out["flag_fora_mes"] = (
expected_month.isna()
    | ~expected_month.isin(EXPECTED_MONTHS)
    | (out["pickup_year_month"] != expected_month)
)

Essa regra cruza conteúdo e metadado do lote, o que é um sinal de maturidade no desenho da qualidade.

Consolidação da qualidade

out[FLAG_COLUMNS] = out[FLAG_COLUMNS].fillna(False)
out["qtd_regras_invalidas"] = out[FLAG_COLUMNS].astype(int).sum(axis=1)
out["flag_invalido"] = out["qtd_regras_invalidas"] > 0

Decisão crítica 13: medir intensidade do problema

flag_invalido responde se o registro pode ir para a camada clean.

qtd_regras_invalidas responde quão comprometido ele está.

Isso enriquece muito a análise de qualidade, porque permite olhar não apenas volume de erros, mas severidade acumulada por registro e por lote.


Separação entre camada válida e inválida

def split_valid_invalid(df_quality: pd.DataFrame) -> tuple[pd.DataFrame, pd.DataFrame]:
    valid = df_quality[~df_quality["flag_invalido"]].copy()
    invalid = df_quality[df_quality["flag_invalido"]].copy()
    return valid, invalid

Decisão crítica 14: preservar o dado inválido

Essa é uma escolha muito acertada. Registros inválidos não são apenas "lixo". Eles servem para:

  • auditoria;

  • diagnóstico de problemas upstream;

  • revisão de regras de negócio;

  • monitoramento de qualidade ao longo do tempo.

Apagar esses registros significaria perder parte importante da inteligência operacional da pipeline.


Sumário de qualidade por lote

grouped = (
    df_quality.groupby("arquivo_origem", dropna=False)
    .agg(
        total_registros=("arquivo_origem", "size"),
        registros_invalidos=("flag_invalido", "sum"),
        media_regras_invalidas=("qtd_regras_invalidas", "mean"),
    )
    .reset_index()
)

Depois são derivados:

  • registros_validos

  • pct_invalidos

  • score_qualidade

E ainda existe uma linha global:

global_row = {
    "arquivo_origem": "__GLOBAL__",
    ...
}

Decisão crítica 15: combinar visão por arquivo e visão global

Essa decisão é ótima porque atende dois públicos ao mesmo tempo:

  • o técnico-operacional, que precisa saber qual lote deu problema;

  • o executivo-analítico, que quer saber a qualidade geral do pipeline.


Análise de impacto por regra

for flag in FLAG_COLUMNS:
    invalid_count = int(df_quality[flag].sum())
    rule_stats.append(
        {
            "regra": flag,
            "invalid_count": invalid_count,
            "invalid_pct": (invalid_count / total * 100) if total else 0.0,
        }
    )

Por que isso é valioso?

Porque qualidade sem priorização vira apenas relatório. Quando sabemos quais regras mais disparam, conseguimos atacar as causas que geram maior impacto.


Geração das métricas analíticas

Em src/analytics.py, a função central é _metrics_by_group:

def metricsby_group(df: pd.DataFrame, group_cols: list[str]) -> pd.DataFrame:
    grouped = (
        df.groupby(group_cols, dropna=False)
        .agg(
            total_trips=("arquivo_origem", "size"),
            avg_distance=("trip_distance", "mean"),
            avg_fare=("fare_amount", "mean"),
            avg_duration=("trip_duration_min", "mean"),
            avg_speed=("avg_speed_mph", "mean"),
            avg_tip_pct=("tip_pct", "mean"),
            total_revenue=("total_amount", "sum"),
        )
        .reset_index()
    )

Decisão crítica 16: centralizar a lógica de agregação

Em vez de repetir blocos parecidos em várias funções, o projeto extrai o comportamento comum para um helper.

Isso gera benefícios claros:

  • menos duplicação;

  • menor risco de divergência entre métricas;

  • manutenção mais barata;

  • evolução mais segura.

Percentual de participação

total_trips_base = grouped["total_trips"].sum()
grouped["pct_trips"] = 0.0
if total_trips_base > 0:
    grouped["pct_trips"] = grouped["total_trips"] / total_trips_base * 100

Esse campo adiciona contexto relativo, muito útil para dashboard e storytelling analítico.


Visões analíticas geradas

Diário

def build_daily_metrics(df_clean: pd.DataFrame) -> pd.DataFrame:
    return metricsby_group(df_clean, ["pickup_date"]).sort_values("pickup_date")

Ideal para séries temporais e comportamento diário da operação.

Semanal

out["pickup_week_start"] = (
    out["pickup_date"] - pd.to_timedelta(out["pickup_date"].dt.dayofweek, unit="D")
)

Decisão crítica 17: normalizar para início da semana

Essa chave de agregação torna a análise semanal consistente e fácil de consumir, sem ambiguidade de calendário.

Turno, vendor e pagamento

def build_shift_metrics(df_clean: pd.DataFrame) -> pd.DataFrame:
    return metricsby_group(df_clean, ["shift_of_day"]).sort_values("total_trips", ascending=False)
def build_payment_metrics(df_clean: pd.DataFrame) -> pd.DataFrame:
    return metricsby_group(df_clean, ["payment_type"]).sort_values("total_trips", ascending=False)
def build_vendor_metrics(df_clean: pd.DataFrame) -> pd.DataFrame:
    return metricsby_group(df_clean, ["VendorID"]).sort_values("total_trips", ascending=False)

Essas tabelas já saem prontas para responder perguntas operacionais e de receita.


Persistência em Parquet

def saveparquet(df, output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(output_path, index=False)

Decisão crítica 18: usar Parquet como formato da camada processada

Parquet foi uma escolha muito adequada porque oferece:

  • melhor compressão do que CSV;

  • schema mais consistente;

  • leitura mais rápida;

  • ótima integração com engines analíticas como DuckDB.

Artefatos gerados

  • taxi_clean.parquet

  • taxi_invalid.parquet

  • taxi_quality_summary.parquet

  • taxi_quality_by_rule.parquet

  • taxi_daily.parquet

  • taxi_weekly.parquet

  • taxi_shift.parquet

  • taxi_vendor.parquet

  • taxi_payment_type.parquet

Decisão crítica 19: persistir datasets temáticos

Essa materialização evita que cada consumidor precise recalcular agregações. O dashboard, por exemplo, fica mais simples e mais rápido porque consome visões já prontas.


Consumo com DuckDB e Streamlit

No app/dashboard.py, a função _query monta views sobre os Parquets:

with duckdb.connect() as conn:
    conn.execute(
        f"""
        CREATE OR REPLACE VIEW vw_taxi_clean AS
        SELECT * FROM read_parquet('{(PROCESSED_DIR / "taxi_clean.parquet").as_posix()}');
        ...
        """
    )
    return conn.execute(sql).df()

Decisão crítica 20: usar DuckDB entre armazenamento e aplicação

Essa escolha traz vários ganhos:

  • permite consultar arquivos Parquet com SQL;

  • reduz a quantidade de lógica de transformação dentro do dashboard;

  • facilita expansão futura da camada analítica;

  • mantém a interface focada em consumo, não em processamento.

Validação prévia dos artefatos

def ensuredata():
    required = [
        "taxi_clean.parquet",
        "taxi_daily.parquet",
        "taxi_weekly.parquet",
        "taxi_quality_summary.parquet",
        "taxi_quality_by_rule.parquet",
        "taxi_shift.parquet",
        "taxi_vendor.parquet",
        "taxi_payment_type.parquet",
    ]

Essa verificação melhora muito a experiência operacional. Em vez de um stack trace genérico, o usuário recebe uma instrução clara para executar a pipeline antes do dashboard.


Exemplo de KPI consumido no dashboard

SELECT
        COUNT(*)               AS 'Total de viagens'
    ,   AVG(trip_distance)     AS 'Distancia media (mi)'
    ,   AVG(fare_amount)       AS 'Tarifa media ($)'
    ,   AVG(trip_duration_min) AS 'Duracao media (min)'
    ,   AVG(avg_speed_mph)     AS 'Velocidade media (mph)'
    ,   AVG(tip_pct)           AS 'Pct de gorjeta'
    ,   SUM(total_amount)      AS 'Receita total ($)'
FROM vw_taxi_clean

Esse trecho mostra um ponto importante: o dashboard trabalha sobre a camada clean, ou seja, os indicadores finais já nascem protegidos das distorções causadas por dados inválidos.


Principais méritos técnicos da solução

1. Qualidade antes da agregação

Esse é o principal acerto do projeto. As métricas são construídas sobre df_valid, não sobre a base bruta completa.

2. Dado inválido preservado para auditoria

O pipeline não descarta evidência. Ele separa, registra e permite análise posterior.

3. Regras de negócio explícitas

Limites de velocidade, duração, distância, geografia e domínio aparecem no código e podem ser revisados com clareza.

4. Camada analítica desacoplada do dashboard

O app consome artefatos e SQL, em vez de concentrar toda a inteligência na interface.

5. Estrutura simples, mas madura

Mesmo sem frameworks pesados, o projeto demonstra princípios corretos de engenharia de dados.


Limitações e oportunidades de evolução

Processamento em memória

Hoje todos os CSVs são concatenados em um DataFrame. Para volumes maiores, seria natural evoluir para processamento particionado ou engine mais escalável.

Regras hardcoded

As regras funcionam bem no escopo atual, mas poderiam migrar para configuração externa ou framework de qualidade.

Ausência de testes automatizados

Os melhores candidatos para teste unitário seriam:

  • safe_divide

  • _shift_from_hour

  • extract_year_month_from_filename

  • regras de qualidade com casos limítrofes

  • agregações com massas pequenas controladas

Observabilidade ainda simples

Evoluções interessantes:

  • logging estruturado;

  • métricas de execução;

  • volume processado por lote;

  • alertas de degradação de qualidade.


Conclusão

O projeto mostra como uma pipeline relativamente enxuta pode incorporar princípios reais de engenharia de dados:

  • modularização;

  • schema explícito;

  • qualidade auditável;

  • separação entre dado válido e inválido;

  • persistência em formato analítico;

  • consumo desacoplado.

O ponto mais forte da implementação é tratar qualidade como parte do desenho da solução, e não como correção tardia. Isso faz com que a camada consumida pelo dashboard seja mais confiável e, ao mesmo tempo, mantém os erros acessíveis para investigação.


Comandos de execução

Ambiente

python -m venv .venv
.venv\Scripts\activate
pip install -r requirements.txt

Pipeline completa

python -m src.pipeline

Pipeline com amostra

python -m src.pipeline --nrows 100000

Dashboard

streamlit run app/dashboard.py

Resumo dos arquivos principais

src/ingest.py

Descoberta e leitura dos CSVs brutos, com rastreabilidade por arquivo.

src/transform.py

Conversão de tipos, criação de métricas derivadas e enriquecimento semântico.

src/quality.py

Aplicação das regras de qualidade, separação entre válidos e inválidos e geração dos diagnósticos.

src/analytics.py

Construção das agregações analíticas reutilizáveis.

src/pipeline.py

Orquestração fim a fim e persistência dos artefatos.

app/dashboard.py

Consulta dos Parquets com DuckDB e apresentação dos indicadores em Streamlit.