ETL Studio

CSV → validated datasets with background jobs, SSE progress, retries/DLQ, and S3-backed uploads

Role
Solo
Focus
Background jobs, SSE progress, Multi-tenant, Observability
Testing
Integration tests (API + worker), Snapshot tests for mapping, E2E run flows
Key Patterns
ETL, FastAPI, Celery, SSE
ETL · FastAPI · Celery · SSE · S3/MinIO · Reliability · Multi-tenant · Observability

What it does

  • Uploads a CSV and maps columns into a canonical schema (date, campaign, channel, spend, clicks, conversions)
  • Runs an import as a background job and streams progress live to the UI via SSE
  • Persists cleaned records and row-level validation errors; exports both cleaned CSV and errors CSV
  • Publishes versioned schema/rules, reruns the same file, and compares runs to quantify rule changes
  • Scopes datasets and runs to organizations (teams/members/invites) for multi-tenant workflows
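The mapping-plus-validation step above can be sketched in plain Python. All names here are illustrative (the real importer persists to Postgres and enforces the published schema version); this only shows the shape of "map source columns to the canonical schema, keep clean rows, capture row-level errors":

```python
import csv
import io

# Canonical fields from the schema described above.
CANONICAL = ["date", "campaign", "channel", "spend", "clicks", "conversions"]

def map_and_validate(raw_csv: str, mapping: dict[str, str]):
    """Map source columns onto the canonical schema and collect row-level errors.

    mapping: {source_column: canonical_field}
    Returns (clean_rows, errors); errors carry the source line number and a
    reason, mirroring the row-level validation errors the importer persists.
    """
    clean, errors = [], []
    reader = csv.DictReader(io.StringIO(raw_csv))
    for lineno, row in enumerate(reader, start=2):  # header is line 1
        mapped = {canon: row.get(src, "") for src, canon in mapping.items()}
        try:
            mapped["spend"] = float(mapped["spend"])
            mapped["clicks"] = int(mapped["clicks"])
            mapped["conversions"] = int(mapped["conversions"])
        except (ValueError, KeyError) as exc:
            errors.append({"line": lineno, "error": str(exc), "row": dict(row)})
            continue
        clean.append(mapped)
    return clean, errors
```

Clean rows and errors go to separate sinks, which is what makes the "cleaned CSV + errors CSV" export pair cheap to produce.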

Architecture

┌────────────────────────┐      ┌─────────────────────────┐
│  Next.js UI            │─────►│  FastAPI API            │
│  (datasets/runs)       │      │  Auth + datasets + runs │
└───────────┬────────────┘      └───────────┬─────────────┘
            │                               │
            │ SSE /runs/:id/events          │ Postgres (metadata)
            ▼                               ▼
┌────────────────────────┐      ┌──────────────────────────┐
│  SSE Stream            │      │  Postgres                │
│  (snapshot/progress)   │      │  datasets/runs/records   │
└────────────────────────┘      └────────────┬─────────────┘
                                             │ enqueue job
                                             ▼
                                    ┌──────────────────┐
                                    │  Redis (broker)  │
                                    └────────┬─────────┘
                                             │
                                             ▼
                                    ┌──────────────────┐
                                    │  Celery Worker   │
                                    │  parse/validate  │
                                    └────────┬─────────┘
                                             │ read/write
                                             ▼
                                    ┌──────────────────┐
                                    │  S3/MinIO        │
                                    │  (uploads)       │
                                    └──────────────────┘
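The SSE leg of the diagram (snapshot first, then incremental progress) can be sketched as a plain frame generator; in the real API a FastAPI endpoint would wrap a generator like this in a streaming response with media type `text/event-stream`. Names are illustrative:

```python
import json

def sse_frame(event: str, data: dict) -> str:
    """Format one Server-Sent Events frame: event name, JSON payload, blank line."""
    return f"event: {event}\ndata: {json.dumps(data)}\n\n"

def run_event_stream(snapshot: dict, progress_updates):
    """Yield a snapshot frame first, so a (re)connecting client recovers the
    current run state immediately, then stream incremental progress frames --
    the snapshot/progress split shown in the diagram."""
    yield sse_frame("snapshot", snapshot)
    for update in progress_updates:
        yield sse_frame("progress", update)
```

Sending the snapshot on every connect is what makes client disconnects a non-event: reconnect, replay state, resume.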

Reliability & Guardrails

  • Idempotent run execution — safe reruns clear prior rows/errors for the run and reprocess from a single source of truth
  • Retries + DLQ — transient failures retry with backoff; exhausted retries land in DLQ with attempt history for debugging
  • Guardrails — upload size/row/column limits, strict mapping/schema checks, and bounded error payloads
  • Observability — OpenTelemetry spans across API + worker for end-to-end tracing (enqueue → process → SSE)
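The retry-with-backoff-then-DLQ behavior above is Celery's retry machinery plus a DLQ table in the real system; this stdlib sketch (all names hypothetical) just shows the bookkeeping, including the attempt history that lands in the DLQ:

```python
import time

def run_with_retries(task, max_attempts=3, base_delay=0.5, dlq=None, sleep=time.sleep):
    """Retry a callable with exponential backoff; on exhaustion, park the full
    attempt history in the dead-letter queue (a plain list here) so failures
    are debuggable rather than silently dropped."""
    attempts = []
    for attempt in range(1, max_attempts + 1):
        try:
            return task()
        except Exception as exc:  # treat as transient until retries are exhausted
            attempts.append({"attempt": attempt, "error": repr(exc)})
            if attempt < max_attempts:
                sleep(base_delay * 2 ** (attempt - 1))  # exponential backoff
    if dlq is not None:
        dlq.append({"task": getattr(task, "__name__", "task"), "attempts": attempts})
    return None
```

Injecting `sleep` keeps the backoff testable; the DLQ entry carries every attempt's error, not just the last one.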

Key Engineering Decisions

  • SSE for run progress instead of polling — lower latency, reconnect-friendly; server pushes state; avoids thundering herd on status endpoints.
  • DLQ + attempt history instead of silent retry exhaustion — transient failures get retries; permanent failures land in DLQ with full context for debugging.
  • Versioned schema/rules with rerun + compare — reproducible imports; quantify impact of rule changes before production rollout.
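The rerun + compare decision reduces to a set diff over row-level errors from two runs of the same file under different rule versions. A minimal sketch (field names assumed, not the real API):

```python
def compare_runs(old_errors: list[dict], new_errors: list[dict]) -> dict:
    """Quantify the impact of a rule change: which source lines newly fail,
    which are fixed, and which still fail after the new schema version."""
    old_lines = {e["line"] for e in old_errors}
    new_lines = {e["line"] for e in new_errors}
    return {
        "newly_failing": sorted(new_lines - old_lines),
        "fixed": sorted(old_lines - new_lines),
        "still_failing": sorted(old_lines & new_lines),
    }
```

Because the same source file is reprocessed under both versions, the diff isolates the rule change itself rather than data drift.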

Failure Modes Handled

  • Worker crash mid-run — job retries from Redis; idempotent run clears prior rows and reprocesses
  • S3 timeout or credential rotation — retry with backoff; eventually DLQ with attempt history
  • Corrupt CSV or malformed rows — row-level validation errors captured; bounded context for debugging
  • Schema migration breaks existing runs — versioned schema; old runs stay on old version
  • SSE client disconnects — snapshot endpoint on reconnect; no state loss
  • Org isolation breach — RBAC on all dataset/run queries; invite-based membership
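The worker-crash and rerun modes above rely on one idempotency rule: every execution first clears the run's prior rows/errors, then reprocesses from the source file. A sketch with an in-memory stand-in for the Postgres tables (names hypothetical):

```python
def execute_run(store: dict, run_id: str, rows, process):
    """Idempotent run execution: drop any partial output from a prior
    (possibly crashed) attempt, then reprocess every row from the source.
    Running this twice yields identical state, never duplicated rows."""
    store["records"].pop(run_id, None)  # clear partial output from a crashed run
    store["errors"].pop(run_id, None)
    records, errors = [], []
    for lineno, row in enumerate(rows, start=1):
        try:
            records.append(process(row))
        except ValueError as exc:
            errors.append({"line": lineno, "error": str(exc)})
    store["records"][run_id] = records
    store["errors"][run_id] = errors
    return len(records), len(errors)
```

Delete-then-reprocess is coarser than exactly-once delivery, but with the uploaded file as the single source of truth it makes at-least-once retries from Redis safe.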

How to demo in 2 minutes

  1. Clone the repo and install deps (frontend/backend/worker)
  2. docker compose up -d (Postgres + Redis + MinIO)
  3. Run migrations + seed demo org/user (see README demo section)
  4. Start backend, worker, and frontend dev servers
  5. Create dataset → upload sample.csv → map columns → Start run
  6. Watch SSE progress → open Results → download cleaned CSV + errors CSV
  7. Publish a new schema version → rerun → Compare runs

Links