Building Data Validation Pipelines: ETL Quality at Scale

Catch data quality issues before they propagate. Schema validation, anomaly detection, and data governance patterns.

📅 Published: April 4, 2026 | ✏️ Updated: April 4, 2026 | ⏱️ 11 min read

The Data Quality Problem: Garbage In, Garbage Out

Your data pipeline ingests data from 20 sources, then transforms, combines, and loads it into your data warehouse. Downstream, analytics break, ML models underperform, and reports show wrong numbers.

Why? Data quality issues weren't caught upstream.

  • Missing fields: Source system stopped sending email addresses
  • Invalid values: Revenue = -$1M (data entry error)
  • Duplicates: Same user appeared 3x
  • Schema changes: New field added upstream, pipeline breaks
  • Anomalies: 100x spike in transactions (real or fake?)

Result: Bad decisions based on bad data. Re-processing takes days. Trust in the data pipeline erodes.

Why Data Quality Is Hard

Challenge 1: Multiple Sources

Data comes from databases, APIs, files, and sensors. Each has its own formats, schemas, and update frequencies, so field names, types, and precision are inconsistent across sources.

Challenge 2: Schema Evolution

Source systems change. New fields added. Existing fields renamed or removed. Your pipeline must adapt without breaking.
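
One way to notice schema evolution before it breaks the pipeline is to compare the field names of each incoming record with the set the pipeline expects. The sketch below is only an illustration: `EXPECTED_FIELDS` and `detect_schema_drift` are hypothetical names, and the field set is assumed from the user record used later in this post.

```python
# Hypothetical expected field set for a user record (an assumption for illustration).
EXPECTED_FIELDS = {"id", "email", "name", "age", "created_at"}


def detect_schema_drift(record, expected_fields=EXPECTED_FIELDS):
    """Return the fields that were added or removed relative to the expected set."""
    actual = set(record)
    return {
        "added": sorted(actual - expected_fields),
        "removed": sorted(expected_fields - actual),
    }


# A source renamed "email" to "email_address": one field appears, one disappears.
drift = detect_schema_drift(
    {"id": 1, "email_address": "a@example.com", "name": "A", "age": 30,
     "created_at": "2024-01-01T00:00:00Z"}
)
print(drift)
```

Reporting added and removed fields separately lets a pipeline tolerate new optional fields while still alerting on removals, which are usually the breaking change.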

Challenge 3: Silent Failures

Data gets corrupted without raising an error. A missing value defaults to 0. A date is parsed in the wrong format. Null becomes the string "null". The pipeline completes "successfully", but the data is garbage.
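
Silent failures often leave fingerprints, such as placeholder strings where a real null used to be. A minimal sketch of a scan for them follows; the `SUSPICIOUS_STRINGS` list and the `find_suspicious_fields` helper are assumptions for illustration and would need tuning per source.

```python
# Placeholder strings that often indicate a null was stringified upstream.
# This list is an assumption; tune it to your own sources.
SUSPICIOUS_STRINGS = {"null", "none", "nan", "n/a", ""}


def find_suspicious_fields(record):
    """Return names of fields whose value looks like a stringified null."""
    flagged = []
    for field, value in record.items():
        if isinstance(value, str) and value.strip().lower() in SUSPICIOUS_STRINGS:
            flagged.append(field)
    return flagged


flagged = find_suspicious_fields(
    {"id": 7, "email": "null", "name": "Ana", "city": " N/A "}
)
print(flagged)
```

A check like this cannot tell a defaulted 0 from a genuine 0; for numeric fields, tracking the share of zeros over time is usually the more practical signal.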

Pattern 1: Schema Validation

Define expected schema. Validate every record against it. Reject or quarantine invalid records.

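    # Assumes the jsonschema package (pip install jsonschema).
    from jsonschema import FormatChecker, ValidationError, validate

    # Define schema for user data
    schema = {
        "type": "object",
        "required": ["id", "email", "name"],
        "properties": {
            "id": {"type": "integer"},
            "email": {"type": "string", "format": "email"},
            "name": {"type": "string", "minLength": 1},
            "age": {"type": "integer", "minimum": 0, "maximum": 150},
            "created_at": {"type": "string", "format": "date-time"},
        },
    }

    # Validate record
    record = {
        "id": 123,
        "email": "john@example.com",
        "name": "John",
        "age": 30,
        "created_at": "2024-01-01T00:00:00Z",
    }

    # validate() returns None on success and raises ValidationError on failure,
    # so branch on the exception rather than on a return value. "format" rules
    # are only checked when a format_checker is passed, and some formats need
    # optional dependencies to be installed.
    # load_to_warehouse and quarantine_record are your pipeline's own functions.
    try:
        validate(record, schema, format_checker=FormatChecker())
    except ValidationError as exc:
        # Record the actual failure reason instead of a hard-coded message
        quarantine_record(record, error=exc.message)
    else:
        load_to_warehouse(record)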

Key validation rules:

Rule                  | Validates              | Example
----------------------+------------------------+--------------------------------
Required Fields       | No null/missing values | email must be present
Type Checking         | Correct data type      | age must be integer
Range Validation      | Values within bounds   | age 0-150
Format Validation     | Correct format         | email is valid format
Referential Integrity | Foreign keys valid     | user_id exists in users table
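
The first four rules can live in a schema, but referential integrity needs a lookup against other data. As a rough sketch, assuming the set of valid user IDs fits in memory (in a real pipeline this would more likely be a query or join against the users table), a hypothetical `find_orphans` helper could look like this:

```python
def find_orphans(records, known_user_ids, key="user_id"):
    """Return records whose foreign key is missing or not in the known ID set."""
    return [r for r in records if r.get(key) not in known_user_ids]


# known_user_ids stands in for the users table (an in-memory assumption).
known_user_ids = {1, 2, 3}
orders = [
    {"order_id": 10, "user_id": 1},
    {"order_id": 11, "user_id": 99},  # no such user
    {"order_id": 12},                 # foreign key missing entirely
]
orphans = find_orphans(orders, known_user_ids)
print([o["order_id"] for o in orphans])
```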

Pattern 2: Anomaly Detection

Some data quality issues aren't schema violations; they're anomalies. A sale of $1M is valid if it's a special order, but anomalous if the typical order is $100.

Statistical Anomalies

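    # Track distribution of values
    def detect_anomaly(field_value, field_stats):
        mean = field_stats['mean']
        stddev = field_stats['stddev']

        # A constant baseline has no spread: any different value is anomalous
        if stddev == 0:
            return field_value != mean

        # Z-score: how many standard deviations from mean
        z_score = (field_value - mean) / stddev

        # Flag if > 3 standard deviations
        return abs(z_score) > 3

    # Example: compute the stats from a historical baseline that excludes the
    # value under test. Including the outlier inflates both mean and stddev:
    # for [100, 95, 110, 105, 1000000] the mean is 200082, the sample stddev
    # is about 447168, and the z-score of 1000000 is only about 1.8.
    baseline = [100, 95, 110, 105]
    # mean = 102.5, sample stddev is about 6.45
    # 1000000 has a z-score far above 3, so it is flagged as an anomaly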
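
Where the statistics come from matters as much as the threshold. The sketch below uses Python's standard `statistics` module to compare a z-score computed against a clean history with one computed from a sample that already contains the outlier; `baseline_stats` and `z_score` are illustrative helper names, not part of any library.

```python
from statistics import mean, stdev


def baseline_stats(values):
    """Summarize a historical sample; stdev is the sample standard deviation."""
    return {"mean": mean(values), "stddev": stdev(values)}


def z_score(value, stats):
    """Number of standard deviations between value and the baseline mean."""
    return (value - stats["mean"]) / stats["stddev"]


history = [100, 95, 110, 105]
candidate = 1_000_000

# Stats from history only: the candidate stands out clearly.
clean = z_score(candidate, baseline_stats(history))

# Stats that include the candidate: the outlier inflates mean and stddev,
# so its own z-score can never exceed (n - 1) / sqrt(n), about 1.79 for n = 5.
contaminated = z_score(candidate, baseline_stats(history + [candidate]))

print(clean > 3, contaminated > 3)
```

With small batches, a 3-sigma rule applied to statistics that include the suspect value will never fire, which is why the baseline is usually taken from a trailing window of earlier data.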

Time-Series Anomalies

Anomaly Type       | Detection Method                   | Example
-------------------+------------------------------------+------------------------------------
Spike              | Sudden increase from baseline      | Traffic jumps 10x in 1 hour
Drop               | Sudden decrease from baseline      | Revenue drops to 0 at midnight
Trend Shift        | Sustained change in trajectory     | Error rate trending up consistently
Seasonal Deviation | Wrong pattern for time of day/week | Heavy traffic on Sunday 3am
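
Spikes and drops can be approximated by comparing the newest point with a rolling baseline. The following is a minimal sketch, assuming non-negative counts and a plain moving average; `classify_point`, its window size, and its factors are illustrative choices rather than recommended thresholds. Trend shifts and seasonal deviations need a baseline that accounts for time of day or week and are not covered here.

```python
from statistics import mean


def classify_point(history, current, window=24, spike_factor=10.0, drop_factor=0.1):
    """Compare the current value with the mean of the last `window` points.

    Returns "spike", "drop", or "normal".
    """
    recent = history[-window:]
    if not recent:
        return "normal"  # no baseline yet
    baseline = mean(recent)
    if baseline == 0:
        # Any activity after a flat-zero baseline counts as a spike
        return "spike" if current > 0 else "normal"
    ratio = current / baseline
    if ratio >= spike_factor:
        return "spike"
    if ratio <= drop_factor:
        return "drop"
    return "normal"


hourly_requests = [100, 104, 98, 101, 97]
results = [classify_point(hourly_requests, value) for value in (1000, 0, 110)]
print(results)
```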

Pattern 3: Quality Metrics

Measure data quality. Track over time. Alert when degrading.

Metric       | What It Measures               | Calculation                 | Alert Threshold
-------------+--------------------------------+-----------------------------+----------------
Completeness | % fields with values           | rows with value / total rows | < 95%
Validity     | % values matching schema       | valid rows / total rows      | < 98%
Uniqueness   | % unique values where expected | unique IDs / total IDs       | < 99%
Timeliness   | Freshness of data              | now - max(timestamp)         | > 1 hour

    # Calculate data quality score
    # The count_* helpers and alert() are your pipeline's own functions.
    def calculate_quality_score(records):
        total = count_total(records)
        if total == 0:
            # An empty batch has no measurable quality; avoid dividing by zero
            alert("DATA QUALITY DEGRADED")
            return 0.0

        completeness = count_non_null(records) / total
        validity = count_valid_schema(records) / total
        uniqueness = count_unique_ids(records) / total

        # Weighted average
        score = (
            completeness * 0.4 +
            validity * 0.4 +
            uniqueness * 0.2
        )

        if score < 0.95:
            alert("DATA QUALITY DEGRADED")

        return score
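
Timeliness appears in the metrics table but not in the score. A small sketch of that check, under the assumption that record timestamps are timezone-aware `datetime` objects, might look like this (`is_stale` is a hypothetical helper, and the one-hour default mirrors the threshold in the table):

```python
from datetime import datetime, timedelta, timezone


def is_stale(timestamps, now=None, max_lag=timedelta(hours=1)):
    """Return True when the newest timestamp is older than max_lag.

    An empty batch counts as stale because there is no fresh data at all.
    """
    if not timestamps:
        return True
    now = now or datetime.now(timezone.utc)
    return now - max(timestamps) > max_lag


now = datetime(2026, 4, 4, 12, 0, tzinfo=timezone.utc)
fresh_batch = [now - timedelta(minutes=5), now - timedelta(hours=3)]
old_batch = [now - timedelta(hours=2), now - timedelta(hours=3)]
print(is_stale(fresh_batch, now=now), is_stale(old_batch, now=now))
```

Passing `now` explicitly keeps the check deterministic in tests; in production the default of the current UTC time applies.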

Pattern 4: Validation Pipeline Design

Structure your pipeline to catch quality issues early and handle them gracefully.

    # Validation pipeline stages
    def etl_pipeline(source_data):
        # Stage 1: Extract and parse
        records = extract(source_data)

        # Stage 2: Schema validation
        valid_records, schema_errors = [], []
        for record in records:
            if is_valid_schema(record):
                valid_records.append(record)
            else:
                schema_errors.append(record)

        # Stage 3: Anomaly detection
        # Collect survivors in a new list: calling remove() on a list while
        # iterating over it skips the element after each removal.
        anomalies, passed = [], []
        for record in valid_records:
            (anomalies if is_anomaly(record) else passed).append(record)
        valid_records = passed

        # Stage 4: Referential integrity
        integrity_errors, passed = [], []
        for record in valid_records:
            (passed if check_foreign_keys(record) else integrity_errors).append(record)
        valid_records = passed

        # Stage 5: Load clean data
        load_to_warehouse(valid_records)

        # Stage 6: Log issues
        log_quality_report({
            'total': len(records),
            'valid': len(valid_records),
            'schema_errors': len(schema_errors),
            'anomalies': len(anomalies),
            'integrity_errors': len(integrity_errors)
        })

        # Stage 7: Quarantine bad data
        quarantine(schema_errors + anomalies + integrity_errors)
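
Quarantined records are easier to triage and replay when each one carries the stage that rejected it and the reason. The sketch below shows one possible envelope; `to_quarantine_entry` and its field names are assumptions for illustration, not a standard format.

```python
import json
from datetime import datetime, timezone


def to_quarantine_entry(record, stage, reason, now=None):
    """Wrap a rejected record with the metadata needed to triage or replay it."""
    now = now or datetime.now(timezone.utc)
    return {
        "record": record,  # original payload, unmodified
        "stage": stage,    # which check rejected it
        "reason": reason,  # human-readable explanation
        "quarantined_at": now.isoformat(),
    }


entry = to_quarantine_entry(
    {"id": 123, "age": 212},
    stage="schema_validation",
    reason="age above maximum of 150",
    now=datetime(2026, 4, 4, tzinfo=timezone.utc),
)
print(json.dumps(entry))
```

Because the original payload is stored untouched, a record can be re-run through the pipeline once the upstream issue is fixed.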

Pattern 5: Data Remediation

Bad data happens. You need processes to detect it, fix it, and prevent it from recurring.

Detection → Investigation → Remediation

Phase 1: Detection
Data quality metric drops below threshold
Alert: "Completeness dropped from 99% to 87%"

Phase 2: Investigation
What field? When did it start? Which source?
Root cause: Email field not sent by source API after schema update

Phase 3: Remediation
Option A: Fix upstream source
Option B: Reload from backup
Option C: Fill with default value
Option D: Exclude affected period

Phase 4: Prevention
Add stricter validation
Add alerting to source system
Add data quality SLA to contract
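
For the investigation phase, a completeness breakdown per source answers "what field?" and "which source?" quickly. The following is a sketch only, assuming each record carries a `source` key; that key and the `completeness_by_source` helper are hypothetical.

```python
from collections import defaultdict


def completeness_by_source(records, field, source_key="source"):
    """Return {source: fraction of records where `field` is present and not None}."""
    totals = defaultdict(int)
    present = defaultdict(int)
    for record in records:
        source = record.get(source_key, "unknown")
        totals[source] += 1
        if record.get(field) is not None:
            present[source] += 1
    return {source: present[source] / totals[source] for source in totals}


records = [
    {"source": "crm", "email": "a@example.com"},
    {"source": "crm", "email": "b@example.com"},
    {"source": "signup_api", "email": None},
    {"source": "signup_api"},
]
breakdown = completeness_by_source(records, "email")
print(breakdown)
```

Running the same breakdown per hour or per day would also show when the drop started.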

Complete Data Validation Architecture

Production-Ready Data Quality Checklist:
✓ Schema validation on all incoming data
✓ Anomaly detection with statistical methods
✓ Quality metrics tracked and alerted
✓ Quarantine process for bad data
✓ Referential integrity checks
✓ Data lineage tracking
✓ Root cause analysis process
✓ Remediation procedures documented

Key Takeaways

Data quality issues compound. Catch them early, close to the source.

✓ Validate schemas on all incoming data
✓ Detect anomalies with statistical methods
✓ Track quality metrics continuously
✓ Quarantine bad data, don't delete
✓ Build remediation processes upfront
