TaxiPipeline ETL
TaxiPipeline ETL
Pipeline ETL profissional para ingestão, limpeza, validação, deduplicação e carga de dados públicos de corridas de táxi amarelo de Nova York (NYC Yellow Taxi Trip Data), utilizando SQL Server como banco de dados e C#/.NET 9 como orquestrador.

Suporta três modos de ingestão: arquivos CSV locais, arquivos Parquet locais e download direto da API pública da NYC TLC.

SQL Server C# / .NET 9 ETL Clean Architecture Data Engineering

Objetivo

Demonstrar domínio de engenharia de dados com foco em arquitetura de dados em camadas, validação de qualidade, rastreabilidade operacional e carga confiável.

  • Arquitetura de dados em camadas (Landing → Staging → Core)
  • Stored procedures T-SQL com conversão segura, validação e deduplicação
  • Código C# orientado a objetos com separação de responsabilidades (Clean Architecture)
  • Controle de lote, rastreabilidade e tratamento de erros
  • Carga idempotente e deduplicação por hash SHA2_256
  • Qualidade de dados com 11 regras de validação e rejeição documentada
  • Ingestão multi-formato (CSV, Parquet e API pública)

Arquitetura

O pipeline foi desenhado para suportar múltiplas entradas de dados, leitura automática por formato e processamento em camadas até a carga final confiável.

CSV Local
Arquivos planos processados diretamente do diretório de entrada.
Parquet Local
Arquivos colunares lidos com suporte a formatação e limitação de registros.
API NYC TLC
Download streaming do dataset público oficial da NYC TLC.
TaxiApiService Responsável pela aquisição dos dados externos com download em streaming.
FileReaderResolver Detecta automaticamente o tipo de arquivo e roteia o processamento para o leitor apropriado.
CsvFileReaderService
Leitura flexível de CSV com mapeamento entre formatos de coluna.
ParquetFileReaderService
Leitura de arquivos Parquet com conversão de tipos para o domínio.
Landing
Ingestão de dados brutos preservando o conteúdo original.
Staging
Limpeza, tipagem, validação e deduplicação.
Core
Base final confiável e pronta para consumo analítico.
Ops
Controle operacional, logs, métricas e auditoria.

Arquitetura em Camadas — C#

A aplicação foi estruturada para separar entrada, orquestração, infraestrutura e domínio, facilitando manutenção e evolução do projeto.

Console Program.cs como ponto de entrada, configuração de CLI e injeção de dependência.
Application PipelineOrchestrator coordena as 8 etapas do pipeline.
Database
Persistência, controle de lote e execução de procedures.
FileSystem
Leitura, escrita e gerenciamento de arquivos.
API
Integração com a fonte pública da NYC TLC.
Logging
Observabilidade, auditoria e diagnóstico.
Domain Entidades, interfaces, enums, regras e contratos centrais do sistema.

Fluxo do Pipeline

Cada execução segue 8 etapas sequenciais do início do lote até a carga final e fechamento com métricas.

Etapa 1 — StartBatchAbre registro de execução em ops.BatchControl.
Etapa 2 — ReadFileLê arquivo CSV ou Parquet e converte para List<TripRecord>.
Etapa 3 — InsertLandingRealiza carga em landing.YellowTripRaw usando SqlBulkCopy.
Etapa 4 — CleanDataConverte tipos com TRY_CAST e gera hash para staging.YellowTripClean.
Etapa 5 — RejectInvalidAplica 11 regras de validação e move inválidos para staging.YellowTripRejected.
Etapa 6 — DeduplicateUsa SHA2_256 para detectar duplicados e marca is_duplicate = 1.
Etapa 7 — LoadCoreInsere apenas registros válidos e únicos em core.Trip.
Etapa 8 — FinishBatchAtualiza métricas e status da execução no lote.

Fluxo de Processamento de Dados

O processamento evolui do dado bruto até a base confiável, preservando rastreabilidade e controle de qualidade.

Arquivo de EntradaCSV, Parquet ou API.
landing.YellowTripRawPreserva o dado bruto com todas as colunas em NVARCHAR.
staging.YellowTripCleanTipagem correta, hash gerado para deduplicação e cálculo de duração da corrida.
Registros Válidos
Dados que passaram pelas regras de validação.
staging.YellowTripRejected
Armazena registros inválidos com motivo de rejeição.
DeduplicaçãoMarca registros com is_duplicate igual a 0 ou 1.
core.TripTabela final confiável com índice UNIQUE baseado em hash.

Tecnologias

Componente Tecnologia
Banco de dadosSQL Server 2019+
LinguagemC# / .NET 9
Acesso ao bancoMicrosoft.Data.SqlClient 5.2.2
Leitura ParquetParquet.Net
Injeção de dependênciaMicrosoft.Extensions.DependencyInjection
ConfiguraçãoMicrosoft.Extensions.Configuration
LoggingMicrosoft.Extensions.Logging
Bulk insertSqlBulkCopy
Download APIHttpClient (streaming)
DeduplicaçãoHASHBYTES SHA2_256

Estrutura do Projeto

TaxiPipeline/ ├── TaxiPipeline.slnx ├── .gitignore ├── README.md ├── data/ │ ├── input/ │ └── archive/ ├── docs/ │ └── DOCUMENTACAO_TECNICA.md ├── sql/ │ ├── 01_database/ │ ├── 02_schemas/ │ ├── 03_tables/ │ ├── 04_indexes/ │ └── 05_stored_procedures/ └── src/ ├── TaxiPipeline.Domain/ ├── TaxiPipeline.Application/ ├── TaxiPipeline.Infrastructure/ └── TaxiPipeline.Console/

Pré-requisitos

Requisito Versão mínima
SQL Server2019 (Express, Developer ou Standard)
.NET SDK9.0
Permissõessysadmin ou dbcreator no SQL Server

Instalação e Configuração

1. Clonar o Repositório

git clone https://github.com/TonFLY/Taxi-ETL-Pipeline---SQL-Server-CSHARP.git cd Taxi-ETL-Pipeline---SQL-Server-CSHARP/TaxiPipeline

2. Criar o Banco de Dados

Execute os scripts SQL na ordem numérica no SQL Server Management Studio, Azure Data Studio ou sqlcmd.

sql/01_database/001_create_database.sql sql/02_schemas/001_create_schemas.sql sql/03_tables/001_ops_tables.sql sql/03_tables/002_landing_tables.sql sql/03_tables/003_staging_tables.sql sql/03_tables/004_core_tables.sql sql/04_indexes/001_create_indexes.sql sql/05_stored_procedures/ops/001_usp_start_batch.sql sql/05_stored_procedures/ops/002_usp_finish_batch.sql sql/05_stored_procedures/ops/003_usp_log_error.sql sql/05_stored_procedures/landing/001_usp_insert_yellow_trip_raw.sql sql/05_stored_procedures/staging/001_usp_clean_yellow_trip_data.sql sql/05_stored_procedures/staging/002_usp_reject_invalid_yellow_trip_data.sql sql/05_stored_procedures/staging/003_usp_deduplicate_yellow_trip_data.sql sql/05_stored_procedures/core/001_usp_load_trip.sql

3. Configurar a Connection String

Crie o arquivo src/TaxiPipeline.Console/appsettings.Development.json.

{ "Pipeline": { "ConnectionString": "Server=SEU_SERVIDOR;Database=TaxiPipelineDB;User Id=SEU_USUARIO;Password=SUA_SENHA;TrustServerCertificate=True;", "InputDirectory": "C:\\caminho\\para\\data\\input", "ArchiveDirectory": "C:\\caminho\\para\\data\\archive", "DownloadDirectory": "C:\\caminho\\para\\data\\input", "ArchiveAfterProcessing": false, "MaxRecordsFromApi": 1000 } }
Importante: nunca coloque credenciais reais no appsettings.json. Use appsettings.Development.json ou appsettings.Local.json.

4. Restaurar Dependências e Compilar

dotnet restore src/TaxiPipeline.Console/TaxiPipeline.Console.csproj dotnet build src/TaxiPipeline.Console/TaxiPipeline.Console.csproj

Modos de Execução

Processar todos os arquivos do diretório de entrada

dotnet run --project src/TaxiPipeline.Console

Processa todos os arquivos que correspondam ao padrão FilePattern no InputDirectory.

Processar um arquivo específico

dotnet run --project src/TaxiPipeline.Console -- "C:\caminho\para\arquivo.csv" dotnet run --project src/TaxiPipeline.Console -- "C:\caminho\para\arquivo.parquet"

Download e processamento da API NYC TLC

dotnet run --project src/TaxiPipeline.Console -- --api dotnet run --project src/TaxiPipeline.Console -- --api 2025 1 dotnet run --project src/TaxiPipeline.Console -- --api 2024 6
A NYC publica dados com cerca de 2 meses de atraso. O pipeline realiza download streaming do Parquet, salva localmente e processa automaticamente.

Exibir ajuda

dotnet run --project src/TaxiPipeline.Console -- --help

Schemas do Banco de Dados

Schema Responsabilidade Tabelas
opsControle operacional: batch, logs, erros, métricas de qualidade4 tabelas
landingDados brutos exatamente como vieram do arquivo2 tabelas
stagingDados limpos, tipados, validados e deduplicados2 tabelas
coreDados finais confiáveis — single source of truth1 tabela

Stored Procedures

Procedure Schema O que faz
usp_start_batchopsCria registro no BatchControl e retorna @batch_id.
usp_finish_batchopsAtualiza status, métricas e timestamps do batch.
usp_log_erroropsRegistra erros no ExecutionError e ExecutionLog.
usp_insert_yellow_trip_rawlandingInsert individual no landing como backup ao BulkCopy.
usp_clean_yellow_trip_datastagingConverte com TRY_CAST, calcula duração e gera hash SHA2_256.
usp_reject_invalid_yellow_trip_datastagingAplica 11 regras e move inválidos para rejected.
usp_deduplicate_yellow_trip_datastagingMarca duplicados intra-batch e cross-batch.
usp_load_tripcoreInsere apenas registros únicos e válidos.

Todas as procedures são idempotentes: limpam dados do batch antes de reprocessar.

Regras de Validação

A procedure staging.usp_reject_invalid_yellow_trip_data aplica 11 regras de qualidade.

Código Regra Campo
R001pickup_datetime não pode ser NULL ou inválidopickup_datetime
R002dropoff_datetime não pode ser NULL ou inválidodropoff_datetime
R003dropoff deve ser posterior ao pickuppickup/dropoff
R004trip_distance não pode ser negativatrip_distance
R005fare_amount não pode ser negativofare_amount
R006total_amount não pode ser negativototal_amount
R007passenger_count deve estar entre 0 e 9passenger_count
R008Duração da corrida não pode exceder 12 horascalculado
R009total_amount não pode ser menor que fare_amounttotal/fare
R010pickup_location_id deve estar entre 1 e 265pickup_location_id
R011dropoff_location_id deve estar entre 1 e 265dropoff_location_id

Deduplicação

A deduplicação acontece em dois níveis para garantir consistência e idempotência.

1. Intra-batch
Registros com o mesmo row_hash dentro do mesmo lote mantêm o menor clean_id e marcam os demais como duplicados.
2. Cross-batch
Se o row_hash já existir em core.Trip, o registro é marcado como duplicado.

Composição do hash (SHA2_256)

HASHBYTES('SHA2_256', CONCAT( vendor_id, '|', pickup_datetime, '|', dropoff_datetime, '|', passenger_count, '|', trip_distance, '|', pickup_location_id, '|', dropoff_location_id, '|', fare_amount, '|', total_amount ))

O índice UNIQUE em core.Trip.row_hash é a garantia final contra duplicatas.

Projetos C#

TaxiPipeline.Domain

ArquivoDescrição
AppSettings.csPOCO com connection string, diretórios, API e limites.
BatchContext.csContexto de execução com BatchId e métricas.
TripRecord.csEntidade com 19 campos como string?.
BatchStatus.csEnum com estados do batch.
PipelineStep.csEnum com as 8 etapas do pipeline.
I*Service.csInterfaces que definem os contratos do sistema.

TaxiPipeline.Application

ArquivoDescrição
PipelineOrchestrator.csExecuta as 8 etapas em sequência com tratamento de erro e logging.

TaxiPipeline.Infrastructure

ArquivoDescrição
TaxiApiService.csDownload streaming com cache local e rename atômico.
SqlConnectionFactory.csCentraliza criação de conexões SQL Server.
BatchService.csStart/Finish batch via stored procedures.
RawLoadService.csSqlBulkCopy com DataTable e mapeamento de colunas.
StoredProcedureExecutor.csExecução genérica de SPs com output parameter.
CsvFileReaderService.csParser CSV com mapeamento flexível.
ParquetFileReaderService.csLeitura Parquet com suporte a limite de registros.
FileReaderResolver.csDetecta extensão e delega ao leitor correto.
ExecutionLogger.csLogging duplo: console e banco.

Configurações

O arquivo appsettings.json contém as configurações base sem credenciais reais.

{ "Pipeline": { "ConnectionString": "Server=YOUR_SERVER;Database=TaxiPipelineDB;User Id=YOUR_USER;Password=YOUR_PASSWORD;TrustServerCertificate=True;", "InputDirectory": "..\\..\\..\\..\\..\\data\\input", "ArchiveDirectory": "..\\..\\..\\..\\..\\data\\archive", "DownloadDirectory": "..\\..\\..\\..\\..\\data\\input", "FilePattern": "*.csv", "BulkCopyBatchSize": 5000, "BulkCopyTimeoutSeconds": 120, "ArchiveAfterProcessing": true, "CsvDelimiter": ",", "ApiBaseUrl": "https://d37ci6vzurychx.cloudfront.net/trip-data/", "ApiFileNamePattern": "yellow_tripdata_{0}-{1:D2}.parquet", "MaxRecordsFromApi": 10000 } }

Hierarquia de Configuração

1. appsettings.json 2. appsettings.Development.json 3. appsettings.Local.json 4. Variáveis de ambiente (prefixo TAXIPIPELINE_)

Segurança — Proteção de Credenciais

MecanismoDetalhes
appsettings.jsonContém apenas placeholders.
appsettings.Development.jsonCredenciais reais, ignorado pelo Git.
appsettings.Local.jsonAlternativa local, também ignorado pelo Git.
Variáveis de ambientePrefixo TAXIPIPELINE_ com maior prioridade.
.gitignoreExclui arquivos locais de configuração e dados baixados.

Consultas de Verificação

-- Status dos batches SELECT batch_id, source_file_name, batch_status, total_rows_read, total_rows_landed, total_rows_cleaned, total_rows_rejected, total_rows_loaded, started_at, finished_at FROM ops.BatchControl ORDER BY batch_id DESC; -- Log de execução do último batch SELECT step_name, step_status, rows_affected, message, started_at, finished_at FROM ops.ExecutionLog WHERE batch_id = (SELECT MAX(batch_id) FROM ops.BatchControl) ORDER BY log_id; -- Total de corridas carregadas SELECT COUNT(*) AS total_trips FROM core.Trip;

Decisões Técnicas

  • Landing com NVARCHAR: preserva o dado original sem perda por conversão prematura.
  • TRY_CAST no staging: conversão segura que retorna NULL em vez de erro.
  • SqlBulkCopy: performance muito superior ao insert linha a linha.
  • SHA2_256: reduz drasticamente o risco de colisão.
  • Idempotência: cada procedure limpa dados do batch antes de reprocessar.
  • 4 schemas: separação clara entre bruto, tratado, final e controle.
  • Logging duplo: console para dev e banco para auditoria.
  • Download atômico: uso de arquivo temporário antes do rename final.

Melhorias Futuras

  • Processamento paralelo de múltiplos arquivos
  • Dashboard de monitoramento com métricas de qualidade
  • Testes unitários e de integração
  • Containerização com Docker + Docker Compose
  • CI/CD com GitHub Actions
  • Notificações por email ou Slack em caso de falha
  • Particionamento por data em core.Trip
  • Suporte a carga incremental por watermark
  • Migração automática de schema com DbUp ou Flyway
  • Suporte a múltiplos tipos de táxi

Fonte de Dados

InformaçãoValor
Fonte oficialNYC Taxi & Limousine Commission (TLC)
CDN (Parquet)https://d37ci6vzurychx.cloudfront.net/trip-data/
Período disponível2009 até cerca de 2 meses atrás
Volume mensalAproximadamente 3 milhões de registros
FormatoApache Parquet

Material de Estudos e Referências

Este projeto foi construído com base em vivência profissional com SQL Server e C#, complementada por documentação oficial e material de estudo.

Curso

CursoPlataformaInstrutor
Programação Orientada a Objetos com C#UdemyNelio Alves

Documentação Oficial

  • C# Language Reference
  • .NET 9 Documentation
  • Dependency Injection in .NET
  • Configuration in .NET
  • Logging in .NET
  • Microsoft.Data.SqlClient
  • SqlBulkCopy Class
  • HttpClient Class
  • Clean Architecture (.NET)
  • T-SQL Reference
  • TRY_CAST
  • HASHBYTES (SHA2_256)
  • Stored Procedures
  • ROW_NUMBER
  • Indexes
  • Schemas