Architecture Deep Dive
Airflow is a workflow orchestration platform — not an ETL tool, not a data pipeline tool. It schedules and monitors arbitrary directed acyclic graphs of tasks. Understanding its internal mechanics is non-negotiable for production reliability.
[Architecture diagram: DAG files in ~/dags/ → DagBag parse by the scheduler → state persisted in the metadata DB (Postgres/MySQL) → executor (Celery/K8s) → workers run tasks]
Component Breakdown
Scheduler: Continuously scans DAG files. Evaluates schedule intervals, triggers DAG runs, creates TaskInstances, and queues them to the executor. The heartbeat loop runs every ~5s. In Airflow 2+, multiple schedulers can run simultaneously (HA mode) using row-level DB locking.
Executor: Determines how tasks run. LocalExecutor (subprocess), CeleryExecutor (distributed queue via Redis/RabbitMQ), KubernetesExecutor (pod-per-task), CeleryKubernetesExecutor (hybrid). The executor itself mostly dispatches — tasks run in workers, not the scheduler.
Metadata Database: All state lives here. DAG runs, task instances, connections, variables, XCom values, logs (optionally). Use Postgres 13+ in production. Never use SQLite beyond local dev. Connection pool sizing is critical — each scheduler, webserver, and worker process has its own pool.
Triggerer: Runs an asyncio event loop for deferred tasks. Enables sensors and operators to yield control back while waiting (I/O-bound waits), dramatically reducing worker slot consumption. Critical for any workflow with polling sensors.
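The connection-pool warning above can be made concrete with a back-of-envelope calculation (a sketch; the `pool_size`/`max_overflow` defaults here mirror common `[database]` settings, not values read from any config):

```python
def total_db_connections(processes: int, pool_size: int = 5, max_overflow: int = 10) -> int:
    """Worst-case metadata-DB connections: every long-lived Airflow process
    (each scheduler, webserver, and Celery worker) opens its own SQLAlchemy
    pool of up to pool_size + max_overflow connections."""
    return processes * (pool_size + max_overflow)

# e.g. 2 schedulers + 1 webserver + 8 workers = 11 processes
peak = total_db_connections(11)  # must stay under Postgres max_connections
```

If the result approaches your database's `max_connections`, either shrink the per-process pool or use a connection proxy such as PgBouncer.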
The DagBag Parse Cycle — Critical to Understand
The DAG processor re-lists dags_folder every dag_dir_list_interval seconds (default 300s) to discover new files, and re-parses each known file at least every min_file_process_interval seconds (default 30s). Each parse is a full Python import of the module. This is why top-level code in DAG files is catastrophic — it runs on every parse, not just on execution.
```python
# ✗ BAD: top-level code — runs on EVERY parse
import tensorflow as tf                            # 3s import; scheduler grinds to a halt
records = db.fetch_all()                           # ✗ live DB call on every parse!
config = requests.get('http://config-svc').json()  # ✗ network call on every parse!

# ✓ GOOD: defer everything into callables
from airflow.decorators import dag, task
from airflow.models import Variable
import pendulum

@dag(
    dag_id='safe_dag',
    schedule='@daily',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['production'],
    default_args={
        'owner': 'data-eng',
        'retries': 3,
        'retry_delay': pendulum.duration(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': pendulum.duration(hours=2),
    },
)
def safe_dag():
    @task
    def load_config():
        # ✓ resolved at execution time — still prefer a Secrets backend for secrets
        return Variable.get('pipeline_config', deserialize_json=True)

    @task
    def fetch_records(config: dict):
        # ✓ DB connection is created inside the task — at execution time
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='prod_postgres')
        return hook.get_records("SELECT * FROM events LIMIT 1000")

    cfg = load_config()
    fetch_records(cfg)

safe_dag_instance = safe_dag()
```
Executor Comparison Matrix
| Executor | Concurrency | Isolation | Overhead | Best For |
|---|---|---|---|---|
| SequentialExecutor | 1 task at a time | None | Zero | Dev/debug only |
| LocalExecutor | N subprocesses | Process | Low | Small teams, single node |
| CeleryExecutor | Unlimited workers | Process | Medium | Production, horizontal scale |
| KubernetesExecutor | Unlimited pods | Container | High (pod start) | True isolation, variable workloads |
| CeleryK8sExecutor | Both modes | Mixed | Medium-High | Hybrid: fast small + isolated large |
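Whichever row fits, the executor is selected with a single core setting (a minimal fragment; adjust to your deployment):

```ini
# airflow.cfg
[core]
executor = CeleryExecutor
# or equivalently via environment variable:
#   AIRFLOW__CORE__EXECUTOR=KubernetesExecutor
```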
DAG Design Mastery
Dynamic DAG Generation — The Right Way
Dynamic DAGs are one of Airflow's most powerful features and also the most commonly abused. The key insight: generate structure at parse time, not execution time.
""" Pattern 1: Config-driven DAG factory Generate N DAGs from a single config — one DAG per tenant/environment. """ from airflow.decorators import dag, task from airflow.operators.empty import EmptyOperator import pendulum import yaml from pathlib import Path # ✓ Read config at parse time — fast file I/O is acceptable CONFIG_PATH = Path("/opt/airflow/config/pipelines.yaml") PIPELINE_CONFIGS: list[dict] = yaml.safe_load(CONFIG_PATH.read_text()) for pipeline_cfg in PIPELINE_CONFIGS: tenant = pipeline_cfg['tenant'] schedule = pipeline_cfg['schedule'] tables = pipeline_cfg['tables'] # list of tables to process sla_minutes = pipeline_cfg.get('sla_minutes', 60) @dag( dag_id=f"pipeline_{tenant}", schedule=schedule, start_date=pendulum.datetime(2024, 1, 1, tz='UTC'), catchup=False, tags=['auto-generated', tenant], default_args={ 'owner': pipeline_cfg.get('owner', 'platform'), 'retries': 2, 'sla': pendulum.duration(minutes=sla_minutes), } ) def tenant_pipeline(cfg=pipeline_cfg): start = EmptyOperator(task_id='start') end = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success') @task(task_id='validate_source') def validate_source(): # runs validation query against source system pass table_tasks = [] for table in cfg['tables']: @task(task_id=f"process_{table}", retries=3) def process_table(table_name=table): from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook hook = BigQueryHook(gcp_conn_id='gcp_prod') # Execute table-specific transformation hook.run_query(f""" INSERT INTO `prod.{cfg['tenant']}.{table_name}` SELECT * FROM `staging.{table_name}` WHERE _partitiontime = '{{ ds }}' """) table_tasks.append(process_table()) start >> validate_source() >> table_tasks >> end # ✓ Inject into globals — this is the canonical way to expose dynamic DAGs globals()[f"pipeline_{tenant}"] = tenant_pipeline()
""" Pattern 2: TaskFlow API with branching, XCom, and dynamic task mapping. Dynamic task mapping (Airflow 2.3+) is a game-changer — fan-out without knowing N at parse time. """ from airflow.decorators import dag, task, task_group from airflow.operators.python import BranchPythonOperator from airflow.utils.trigger_rule import TriggerRule import pendulum @dag( dag_id='advanced_taskflow', schedule='0 4 * * *', start_date=pendulum.datetime(2024, 1, 1, tz='UTC'), catchup=False, max_active_tasks=16, # per-DAG concurrency cap max_active_runs=1, # prevent overlapping runs ) def advanced_pipeline(): @task def get_work_items() -> list[dict]: """Returns variable-length list — dynamic map target""" from airflow.providers.postgres.hooks.postgres import PostgresHook hook = PostgresHook('prod') rows = hook.get_records("SELECT id, payload FROM work_queue WHERE status='pending'") return [{'id': r[0], 'payload': r[1]} for r in rows] # ✓ Dynamic task mapping — creates N parallel tasks at runtime @task( max_active_tis_per_dag=8, # throttle parallelism retries=2, ) def process_item(item: dict) -> dict: """Runs once per item in work_items list""" result = heavy_computation(item['payload']) return {'id': item['id'], 'status': 'ok', 'result': result} @task(trigger_rule=TriggerRule.ALL_DONE) # run even if some mapped tasks fail def aggregate_results(results: list[dict]) -> dict: ok = [r for r in results if r and r.get('status') == 'ok'] return {'total': len(results), 'ok': len(ok), 'failed': len(results) - len(ok)} @task_group(group_id='notifications') def notify(summary: dict): @task def send_slack(s=summary): from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator if s['failed'] > 0: msg = f"⚠️ Pipeline complete: {s['ok']}/{s['total']} ok, {s['failed']} failed" else: msg = f"✅ Pipeline complete: all {s['total']} items processed" SlackWebhookOperator(task_id='_inner', slack_webhook_conn_id='slack_ops', message=msg).execute({}) @task def 
update_dashboard(s=summary): # POST metrics to internal observability service import httpx httpx.post('https://metrics.internal/ingest', json=s, timeout=10) send_slack() update_dashboard() items = get_work_items() # .expand() is the magic — one call generates N mapped task instances processed = process_item.expand(item=items) summary = aggregate_results(processed) notify(summary) advanced_pipeline()
Trigger Rules — The Full Picture
| Trigger Rule | When Task Runs | Common Use Case |
|---|---|---|
| ALL_SUCCESS | All upstreams succeeded | Default — normal pipeline steps |
| ALL_FAILED | All upstreams failed | Failure-path cleanup tasks |
| ALL_DONE | All upstreams finished (any state) | Always-run summaries, notifications |
| ONE_SUCCESS | At least one upstream succeeded | Fan-in where partial success is OK |
| ONE_FAILED | At least one upstream failed | Early alerting before all complete |
| NONE_FAILED | No upstreams failed (skips allowed) | Branching — tasks after a branch |
| NONE_SKIPPED | No upstreams were skipped | Strict flows with no optional paths |
| NONE_FAILED_MIN_ONE_SUCCESS | No failures + at least one success | Join after optional parallel tasks |
With BranchPythonOperator, all tasks NOT on the chosen branch are set to the skipped state. Any downstream join task must use NONE_FAILED_MIN_ONE_SUCCESS or NONE_FAILED — otherwise the join task itself gets skipped too.
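The join semantics can be sanity-checked in plain Python — a minimal simulation of the two join-safe rules, not Airflow's actual evaluator:

```python
def none_failed(upstream: list[str]) -> bool:
    """NONE_FAILED: run if no upstream failed; skips are tolerated."""
    return not any(s in ('failed', 'upstream_failed') for s in upstream)

def none_failed_min_one_success(upstream: list[str]) -> bool:
    """NONE_FAILED_MIN_ONE_SUCCESS: additionally require one real success."""
    return none_failed(upstream) and 'success' in upstream

# After a branch: one path ran, the other was skipped — the join still runs.
assert none_failed_min_one_success(['success', 'skipped'])
# Everything skipped: NONE_FAILED would still run; the stricter rule won't.
assert none_failed(['skipped', 'skipped'])
assert not none_failed_min_one_success(['skipped', 'skipped'])
```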
Building Production-Grade Operators
When built-in operators don't fit, write your own. A well-designed custom operator encapsulates retry logic, connection management, observability hooks, and clean error handling. Here's the full pattern.
```python
from __future__ import annotations

import logging
import random
import time
from typing import Any, Callable, Sequence

from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context

log = logging.getLogger(__name__)


class RobustAPIOperator(BaseOperator):
    """
    Production-grade API operator with:
      - Exponential backoff with jitter
      - Caller-provided response validation
      - Optional skip on empty responses
      - Clean error handling and XCom push
    """

    # Tells Airflow which fields to render with Jinja
    template_fields: Sequence[str] = ('endpoint', 'payload', 'headers')
    template_fields_renderers = {'payload': 'json', 'headers': 'json'}
    ui_color = '#1c7a4a'    # custom color in the Airflow UI graph view
    ui_fgcolor = '#ffffff'

    def __init__(
        self,
        *,
        conn_id: str,
        endpoint: str,
        method: str = 'GET',
        payload: dict | None = None,
        headers: dict | None = None,
        response_check: Callable | None = None,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        timeout: int = 30,
        skip_on_empty: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.endpoint = endpoint
        self.method = method.upper()
        self.payload = payload or {}
        self.headers = headers or {}
        self.response_check = response_check
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.skip_on_empty = skip_on_empty

    def execute(self, context: Context) -> Any:
        from airflow.providers.http.hooks.http import HttpHook

        hook = HttpHook(method=self.method, http_conn_id=self.conn_id)
        attempt = 0
        last_exc = None
        while attempt <= self.max_retries:
            try:
                log.info("[%s] Attempt %d/%d → %s %s", self.task_id,
                         attempt + 1, self.max_retries + 1, self.method, self.endpoint)
                response = hook.run(
                    endpoint=self.endpoint,
                    data=self.payload,
                    headers=self.headers,
                    extra_options={'timeout': self.timeout, 'verify': True},
                )
                response.raise_for_status()
                result = response.json()

                # Optional: caller-provided validation function
                if self.response_check and not self.response_check(result):
                    raise AirflowException(f"response_check() failed on: {result}")

                if self.skip_on_empty and not result:
                    log.info("Empty response with skip_on_empty=True — skipping")
                    raise AirflowSkipException("Empty response")

                # Push to XCom for downstream tasks
                context['ti'].xcom_push(key='response', value=result)
                log.info("[%s] Success on attempt %d", self.task_id, attempt + 1)
                return result

            except AirflowSkipException:
                raise
            except Exception as exc:
                last_exc = exc
                attempt += 1
                if attempt > self.max_retries:
                    break
                # Exponential backoff with jitter
                sleep_time = self.backoff_factor ** attempt + random.uniform(0, 1)
                log.warning("[%s] Attempt %d failed (%s). Retrying in %.1fs",
                            self.task_id, attempt, exc, sleep_time)
                time.sleep(sleep_time)

        raise AirflowException(
            f"All {self.max_retries + 1} attempts failed. Last error: {last_exc}"
        ) from last_exc

    def on_kill(self):
        """Called when the task is externally killed — clean up resources here."""
        log.warning("[%s] Task killed — performing cleanup", self.task_id)
        # Cancel any in-flight requests, release connections, etc.
```
""" Deferrable Sensor — uses Airflow Triggerer for async polling. Does NOT occupy a worker slot while waiting. Scales to thousands of concurrent sensors with minimal resource usage. """ from __future__ import annotations import asyncio from datetime import timedelta from typing import Any, AsyncIterator from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.sensors.base import BaseSensorOperator from airflow.utils.context import Context class S3KeyTrigger(BaseTrigger): """Async trigger that polls S3 for a key — runs in Triggerer process.""" def __init__(self, bucket: str, key: str, poll_interval: float = 30.0): super().__init__() self.bucket = bucket self.key = key self.poll_interval = poll_interval def serialize(self) -> tuple[str, dict]: # Must be serializable — stored in DB while trigger awaits return ( "my_package.triggers.S3KeyTrigger", {"bucket": self.bucket, "key": self.key, "poll_interval": self.poll_interval} ) async def run(self) -> AsyncIterator[TriggerEvent]: import aiobotocore.session as aioboto session = aioboto.get_session() async with session.create_client('s3') as s3: while True: try: await s3.head_object(Bucket=self.bucket, Key=self.key) yield TriggerEvent({"status": "found", "key": self.key}) return except s3.exceptions.ClientError: await asyncio.sleep(self.poll_interval) # yields control to event loop! 
class DeferrableS3Sensor(BaseSensorOperator): def __init__(self, *, bucket: str, key: str, poll_interval: float = 30.0, **kwargs): super().__init__(**kwargs) self.bucket = bucket self.key = key self.poll_interval = poll_interval def execute(self, context: Context): # Immediately defer — do NOT poll in execute() self.defer( trigger=S3KeyTrigger(self.bucket, self.key, self.poll_interval), method_name="execute_complete", timeout=timedelta(hours=6), ) def execute_complete(self, context: Context, event: dict) -> str: # Called by Triggerer when TriggerEvent fires — resumes in a worker slot if event["status"] != "found": raise AirflowException(f"Unexpected trigger event: {event}") self.log.info("S3 key found: s3://%s/%s", self.bucket, event["key"]) return event["key"]
Monitoring, Alerting & SLAs
Callback-Based Alerting System
""" Centralized callback system — attach to default_args to alert on every DAG failure, retry, SLA miss, and task success event. """ from __future__ import annotations import json import logging from typing import TYPE_CHECKING from airflow.models import TaskInstance, DagRun from airflow.utils.state import State if TYPE_CHECKING: from airflow.utils.context import Context log = logging.getLogger(__name__) def _build_slack_blocks(title: str, color: str, fields: list[dict]) -> list[dict]: """Build Slack Block Kit message.""" return [ {"type": "header", "text": {"type": "plain_text", "text": title}}, {"type": "section", "fields": [ {"type": "mrkdwn", "text": f"*{f['label']}*\n{f['value']}"} for f in fields ]}, ] def on_failure_callback(context: Context) -> None: """Fires when a task fails. Sends structured Slack alert + Pagerduty if high priority.""" import httpx from airflow.models import Variable ti: TaskInstance = context['task_instance'] dag_run: DagRun = context['dag_run'] exception = context.get('exception') log_url = ti.log_url severity = ti.dag.tags and 'critical' in ti.dag.tags blocks = _build_slack_blocks( title=f"{'🔴 CRITICAL' if severity else '🟠 FAILURE'}: {ti.dag_id}.{ti.task_id}", color='danger', fields=[ {'label': 'DAG', 'value': ti.dag_id}, {'label': 'Task', 'value': ti.task_id}, {'label': 'Run ID', 'value': dag_run.run_id}, {'label': 'Try #', 'value': str(ti.try_number)}, {'label': 'Error', 'value': f"`{str(exception)[:200]}`"}, {'label': 'Logs', 'value': f"<{log_url}|View logs>"}, ] ) slack_token = Variable.get('slack_bot_token') slack_channel = Variable.get('slack_alerts_channel', '#data-alerts') httpx.post( 'https://slack.com/api/chat.postMessage', headers={'Authorization': f'Bearer {slack_token}'}, json={'channel': slack_channel, 'blocks': blocks}, timeout=10 ) if severity: _trigger_pagerduty(ti, exception, context) def on_retry_callback(context: Context) -> None: ti: TaskInstance = context['task_instance'] log.warning("RETRY: %s.%s attempt %d/%d — 
%s", ti.dag_id, ti.task_id, ti.try_number, ti.max_tries + 1, context.get('exception') ) # Lightweight — just log on retries, Slack only on final failure def on_sla_miss_callback( dag, task_list, blocking_task_list, slas, blocking_tis ) -> None: """SLA miss fires when a task hasn't completed by dag.sla relative to execution_date.""" import httpx from airflow.models import Variable sla_str = ", ".join([f"{s.dag_id}.{s.task_id}" for s in slas]) log.error("SLA MISS: %s", sla_str) # Post to ops channel — SLA misses are high priority def _trigger_pagerduty(ti: TaskInstance, exc, context: Context): import httpx from airflow.models import Variable pd_key = Variable.get('pagerduty_routing_key') httpx.post('https://events.pagerduty.com/v2/enqueue', json={ 'routing_key': pd_key, 'event_action': 'trigger', 'payload': { 'summary': f"Airflow CRITICAL: {ti.dag_id}.{ti.task_id} failed", 'severity': 'critical', 'source': 'airflow', 'custom_details': {'run_id': ti.run_id, 'error': str(exc)[:500]}, } }, timeout=10) # ── Usage in DAG default_args ──────────────────────────────────── DEFAULT_ARGS = { 'owner': 'data-engineering', 'retries': 3, 'on_failure_callback': on_failure_callback, 'on_retry_callback': on_retry_callback, 'sla': __import__('pendulum').duration(hours=2), }
StatsD / Prometheus Metrics
```ini
# airflow.cfg (or env vars: AIRFLOW__METRICS__STATSD_*)
[metrics]
statsd_on = True
statsd_host = statsd-exporter.monitoring.svc.cluster.local
statsd_port = 9125
statsd_prefix = airflow
# statsd_allow_list filters which metrics are sent — reduces cardinality
statsd_allow_list = dag.,scheduler.,executor.,pool.

[scheduler]
# How often the scheduler emits its own heartbeat metric.
# Alert externally if airflow_scheduler_heartbeat goes stale for 30+ seconds.
scheduler_heartbeat_sec = 5
```
```yaml
groups:
  - name: airflow.critical
    rules:
      # Scheduler died or stalled
      - alert: AirflowSchedulerDead
        expr: time() - airflow_scheduler_heartbeat > 60
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Airflow scheduler heartbeat missing for {{ $value }}s"

      # Zombie tasks accumulating
      - alert: AirflowZombieTasksHigh
        expr: airflow_zombies_killed_total > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Zombie tasks detected: {{ $value }}"

      # Task queue depth too high (workers overwhelmed)
      - alert: AirflowTaskQueueDepth
        expr: airflow_executor_queued_tasks > 200
        for: 5m
        labels:
          severity: warning

      # DAG import errors present
      - alert: AirflowDagImportErrors
        expr: airflow_dag_processing_import_errors > 0
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "{{ $value }} DAG(s) failing to parse"

      # DB connection pool saturation
      - alert: AirflowDBPoolSaturation
        expr: airflow_pool_starving_tasks > 0
        for: 5m
        labels:
          severity: warning
```
Debugging & Incident Response
CLI Combat Reference
```bash
# ── DAG STATE ────────────────────────────────────────────────────
# List all active DAG runs
airflow dags list-runs --dag-id my_dag --state running

# Check which tasks are in-flight for a specific run
airflow tasks states-for-dag-run my_dag scheduled__2024-01-15T04:00:00+00:00

# Show the task tree for a DAG
airflow tasks list my_dag --tree

# ── MANUALLY CONTROL TASKS ───────────────────────────────────────
# Run a single task for a specific execution date (re-run)
airflow tasks run my_dag my_task 2024-01-15T04:00:00 --local --ignore-all-dependencies

# Clear failed tasks and re-queue them
airflow tasks clear my_dag --start-date 2024-01-15 --end-date 2024-01-16 --yes

# Clear just one task across a date range
airflow tasks clear my_dag -t my_failing_task --start-date 2024-01-01 --yes

# Mark a task as success without actually executing it
airflow tasks run my_dag my_task 2024-01-15T04:00:00 --mark-success

# ── DAG RUNS ─────────────────────────────────────────────────────
# Backfill a specific date range (spawns new DAG runs)
airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-07 --reset-dagruns

# Trigger a manual run with config
airflow dags trigger my_dag --conf '{"env": "prod", "batch_size": 500}'

# Pause/unpause a DAG
airflow dags pause my_dag
airflow dags unpause my_dag

# ── VARIABLES & CONNECTIONS ──────────────────────────────────────
airflow variables get pipeline_config
airflow variables set pipeline_config '{"batch_size": 1000}' --json
airflow connections get prod_postgres
airflow connections test prod_postgres   # ← actually tests the connection!

# ── SCHEDULER HEALTH ─────────────────────────────────────────────
airflow jobs check --job-type SchedulerJob --limit 1
airflow scheduler --num-runs 1   # run the scheduler for exactly one loop (debug)

# ── DAG PARSE TIME PROFILING ─────────────────────────────────────
python -c "
import time
from airflow.models import DagBag
start = time.time()
bag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
elapsed = time.time() - start
print(f'Parse time: {elapsed:.2f}s')
print(f'Import errors: {bag.import_errors}')
for dag_id, dag in bag.dags.items():
    print(f'  {dag_id}: {len(dag.tasks)} tasks')
"
```
DB-Level Debugging — Direct SQL Queries
```sql
-- ── FIND STUCK/ZOMBIE TASKS ──────────────────────────────────
SELECT
    ti.dag_id, ti.task_id, ti.run_id, ti.state,
    ti.start_date, ti.end_date,
    EXTRACT(EPOCH FROM (NOW() - ti.start_date)) / 60 AS running_minutes,
    ti.hostname, ti.pid
FROM task_instance ti
WHERE ti.state = 'running'
  AND ti.start_date < NOW() - INTERVAL '2 hours'
ORDER BY running_minutes DESC;

-- ── TASK SUCCESS RATE PER DAG (last 7 days) ──────────────────
SELECT
    dag_id,
    COUNT(*) AS total_runs,
    SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS successes,
    ROUND(
        100.0 * SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) / COUNT(*), 2
    ) AS success_rate_pct,
    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_secs
FROM task_instance
WHERE start_date > NOW() - INTERVAL '7 days'
GROUP BY dag_id
ORDER BY success_rate_pct ASC;

-- ── XCOM SIZE AUDIT (large XComs kill performance) ───────────
SELECT
    dag_id, task_id, run_id, key,
    pg_size_pretty(LENGTH(value::text)::bigint) AS xcom_size,
    LENGTH(value::text) AS raw_bytes
FROM xcom
WHERE LENGTH(value::text) > 10000  -- anything > 10KB is suspicious
ORDER BY raw_bytes DESC
LIMIT 20;

-- ── SCHEDULER LAG: time between data interval end and actual start ──
SELECT
    dag_id,
    AVG(EXTRACT(EPOCH FROM (start_date - data_interval_end))) AS avg_lag_secs,
    MAX(EXTRACT(EPOCH FROM (start_date - data_interval_end))) AS max_lag_secs
FROM dag_run
WHERE start_date > NOW() - INTERVAL '24 hours'
  AND run_type = 'scheduled'
GROUP BY dag_id
HAVING AVG(EXTRACT(EPOCH FROM (start_date - data_interval_end))) > 300
ORDER BY avg_lag_secs DESC;
```
Python Debugging Toolkit — Run Locally
""" Local DAG inspector — run this WITHOUT a running Airflow instance. Point AIRFLOW__DATABASE__SQL_ALCHEMY_CONN at your prod DB (read-only replica!) to inspect live state from your laptop. """ import os from airflow.models import DagBag, TaskInstance, DagRun from airflow.utils.session import create_session from sqlalchemy import func os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_CONN'] = ( 'postgresql+psycopg2://readonly_user:pass@prod-db:5432/airflow' ) def inspect_dag_health(dag_id: str, lookback_hours: int = 24): """Full health report for a specific DAG.""" import pendulum cutoff = pendulum.now('UTC').subtract(hours=lookback_hours) with create_session() as session: tis = ( session.query(TaskInstance) .filter( TaskInstance.dag_id == dag_id, TaskInstance.start_date >= cutoff ) .all() ) state_counts = {} durations = [] for ti in tis: state_counts[ti.state] = state_counts.get(ti.state, 0) + 1 if ti.duration: durations.append(ti.duration) print(f"\n{'='*60}") print(f"DAG Health Report: {dag_id}") print(f"Lookback: {lookback_hours}h | Total TIs: {len(tis)}") print(f"State breakdown: {state_counts}") if durations: durations.sort() print(f"Duration p50={durations[len(durations)//2]:.1f}s " f"p95={durations[int(len(durations)*0.95)]:.1f}s " f"max={max(durations):.1f}s") if state_counts.get('failed', 0) > 0: print(f"⚠ {state_counts['failed']} failures in last {lookback_hours}h!") def find_slow_tasks(percentile: float = 0.95, min_duration: int = 300) -> None: """Find tasks that consistently take longer than usual — potential bottlenecks.""" with create_session() as session: results = session.execute(""" SELECT dag_id, task_id, COUNT(*) as runs, PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY duration) AS p50, PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration) AS p95, MAX(duration) AS max_dur FROM task_instance WHERE state = 'success' AND start_date > NOW() - INTERVAL '7 days' AND duration IS NOT NULL GROUP BY dag_id, task_id HAVING PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER 
BY duration) > :min_dur ORDER BY p95 DESC LIMIT 20 """, {"min_dur": min_duration}) print(f"\n{'DAG.TASK':<50} {'runs':>6} {'p50':>8} {'p95':>8} {'max':>8}") for r in results: key = f"{r.dag_id}.{r.task_id}" print(f"{key:<50} {r.runs:>6} {r.p50:>7.0f}s {r.p95:>7.0f}s {r.max_dur:>7.0f}s") if __name__ == '__main__': inspect_dag_health('my_critical_dag', lookback_hours=48) find_slow_tasks(min_duration=300)
Common Failure Patterns & Fixes
Zombie tasks: A worker process is killed (OOM, node eviction) while its task is running — the DB says running, but the process is dead. Fix: `airflow tasks clear` the affected tasks, and tune `zombie_detection_interval` and `scheduler_zombie_task_threshold` in the scheduler config.
Sensor deadlock: Non-deferrable sensors consume worker slots while sleeping. With 32 worker slots and 30 slow sensors, almost nothing else runs. Fix: migrate sensors to the deferrable pattern, or use a dedicated sensor pool with limited slots.
Silently vanishing DAGs: A Python syntax error or bad import makes the DAG disappear from the UI without an obvious warning. Check DAG Import Errors in the UI, or run `airflow dags list-import-errors`. Set up the Prometheus alert above.
XCom bloat: XCom is stored in the metastore DB. Passing dataframes or large JSON blobs through XCom causes DB bloat, slow queries, and scheduler degradation. Fix: use S3/GCS as an intermediary and push only the path via XCom.
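The S3-intermediary fix can be sketched independently of Airflow — the bucket URI and the in-memory `store` here are stand-ins (in a real task you would call S3Hook/boto3 and return only the URI from the task, so just the string goes through XCom):

```python
import json

def push_result(store: dict, run_id: str, payload: dict) -> str:
    """Persist the payload to a (stubbed) object store; return only its URI.
    The returned string is what the task pushes to XCom — a few bytes
    instead of the full payload."""
    uri = f"s3://my-bucket/results/{run_id}.json"  # hypothetical bucket
    store[uri] = json.dumps(payload).encode()      # stand-in for s3.put_object(...)
    return uri

def pull_result(store: dict, uri: str) -> dict:
    """Downstream task resolves the XCom'd URI back into the payload."""
    return json.loads(store[uri].decode())
```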
Performance & Concurrency Tuning
```ini
# ── SCHEDULER ────────────────────────────────────────────────────
[scheduler]
parsing_processes = 4                   # DAG parser processes (was max_threads); 2-4 for most setups
dag_dir_list_interval = 30              # scan dags/ every 30s (down from default 300)
min_file_process_interval = 30          # min time between parsing the same file
file_parsing_sort_mode = modified_time  # prioritise recently changed files
max_dagruns_to_create_per_loop = 10
max_tis_per_query = 512                 # batch size for TI state fetching
use_row_level_locking = True            # required for HA scheduler (2+ schedulers)
zombie_detection_interval = 60          # how often to check for zombies
scheduler_zombie_task_threshold = 300   # task running > 5m with no heartbeat = zombie

# ── DATABASE ─────────────────────────────────────────────────────
[database]
sql_alchemy_pool_size = 5               # connections per process
sql_alchemy_max_overflow = 10           # burst connections
sql_alchemy_pool_recycle = 1800         # recycle connections every 30 min
sql_alchemy_pool_pre_ping = True        # test connection before use (avoids stale conns)

# ── CELERY EXECUTOR ──────────────────────────────────────────────
[celery]
worker_concurrency = 16                 # tasks per worker
worker_prefetch_multiplier = 1          # CRITICAL: set to 1 for fair task distribution
task_acks_late = True                   # task acked after completion, not on pickup
task_track_started = True
worker_enable_remote_control = False    # security: disable remote kill in prod
operation_timeout = 1.0
celery_app_name = airflow.executors.celery_executor

# ── CORE ─────────────────────────────────────────────────────────
[core]
parallelism = 128                       # global max concurrent tasks
max_active_tasks_per_dag = 64           # per-DAG concurrency cap
max_active_runs_per_dag = 5             # prevent catchup storms
dagbag_import_timeout = 30              # kill slow-parsing DAG files
dag_file_processor_timeout = 50         # slightly higher than import timeout
killed_task_cleanup_time = 60
default_task_execution_timeout = 3600   # 1h global task timeout

# ── WEBSERVER ────────────────────────────────────────────────────
[webserver]
workers = 4                             # gunicorn workers
web_server_worker_timeout = 120
```
Pool Strategy — Slot-Based Throttling
""" Airflow Pools are the primary mechanism to control concurrency at a resource level — not just task level. Use pools to: - Limit concurrent API calls to an external service - Cap DB connections from tasks - Separate workloads (sensors vs compute vs IO) - Reserve slots for critical DAGs """ from airflow.models.pool import Pool from airflow.utils.session import create_session POOLS = [ # Default pool — never set to unlimited in production {'name': 'default_pool', 'slots': 128, 'description': 'General compute tasks'}, # Sensor pool — prevents sensor deadlock {'name': 'sensor_pool', 'slots': 32, 'description': 'Polling sensors (use deferrable!)'}, # External API limits {'name': 'salesforce_api', 'slots': 5, 'description': 'Salesforce API rate limit: 5 concurrent'}, {'name': 'bigquery_pool', 'slots': 50, 'description': 'BigQuery concurrent query limit'}, # Heavy compute jobs — prevent resource contention {'name': 'spark_jobs', 'slots': 8, 'description': 'Spark cluster: max concurrent submissions'}, # Priority pool — reserved for SLA-bound DAGs {'name': 'priority_pool', 'slots': 16, 'description': 'Reserved for critical SLA DAGs'}, ] def upsert_pools(): with create_session() as session: for p in POOLS: existing = session.query(Pool).filter(Pool.pool == p['name']).first() if existing: existing.slots = p['slots'] existing.description = p['description'] else: session.add(Pool(pool=p['name'], slots=p['slots'], description=p['description'])) print(f"Pool '{p['name']}': {p['slots']} slots") # Usage in tasks: # @task(pool='salesforce_api', pool_slots=1, priority_weight=10) # def call_salesforce(): ... # Or in operators: SomeOperator(pool='spark_jobs', pool_slots=2, ...)
XCom Backend — S3/GCS for Large Payloads
""" Custom XCom Backend that stores large values in S3 and small ones in DB. Configure: AIRFLOW__CORE__XCOM_BACKEND=my_package.xcom_backends.s3_xcom.S3XComBackend """ import json, pickle from typing import Any from airflow.models.xcom import BaseXCom from airflow.utils.session import provide_session XCOM_S3_BUCKET = 'my-airflow-xcom' SIZE_THRESHOLD_BYTES = 48_000 # store in S3 if > 48KB S3_PREFIX = 's3://' class S3XComBackend(BaseXCom): @staticmethod def serialize_value( value: Any, *, key: str = None, task_id: str = None, dag_id: str = None, run_id: str = None, map_index: int = None, ) -> bytes: import boto3 serialized = pickle.dumps(value) if len(serialized) < SIZE_THRESHOLD_BYTES: # Small value — store inline in DB as normal return BaseXCom.serialize_value(value) # Large value — upload to S3, store reference in DB s3_key = f"xcom/{dag_id}/{run_id}/{task_id}/{key}.pkl" s3 = boto3.client('s3') s3.put_object( Bucket=XCOM_S3_BUCKET, Key=s3_key, Body=serialized, ServerSideEncryption='aws:kms', ) reference = {'__xcom_s3_key__': s3_key, 'bucket': XCOM_S3_BUCKET} return json.dumps(reference).encode() @staticmethod def deserialize_value(result: "S3XComBackend") -> Any: import boto3 raw = result.value try: ref = json.loads(raw) if '__xcom_s3_key__' in ref: s3 = boto3.client('s3') body = s3.get_object(Bucket=ref['bucket'], Key=ref['__xcom_s3_key__'])['Body'] return pickle.loads(body.read()) except (json.JSONDecodeError, KeyError): pass return BaseXCom.deserialize_value(result)
Production Hardening
Secrets Management — Never Use Variables for Secrets
```python
# airflow.cfg:
#   [secrets]
#   backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
#   backend_kwargs = {"connections_prefix": "airflow/connections",
#                     "variables_prefix": "airflow/variables"}
#
# Airflow auto-resolves connections stored as:
#   AWS secret: airflow/connections/prod_postgres
#   value:      postgresql+psycopg2://user:pass@host:5432/db
#
# And variables as:
#   AWS secret: airflow/variables/pipeline_config
#   value:      {"batch_size": 1000}

# In code — completely transparent. No code changes needed.
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Variable.get resolves from Secrets Manager automatically
config = Variable.get('pipeline_config', deserialize_json=True)

# PostgresHook resolves its connection from Secrets Manager automatically
hook = PostgresHook(postgres_conn_id='prod_postgres')

# Resolution order: env var → secrets backend → metastore DB → exception
# Set AIRFLOW_CONN_PROD_POSTGRES=... in the env for a local dev override
```
Zero-Downtime DAG Deployments
```bash
#!/bin/bash
# Safe DAG deployment strategy:
#   1. Validate all DAGs parse clean
#   2. Sync to the DAG store
#   3. Wait for the scheduler to pick up changes
#   4. Verify no import errors post-deploy
set -euo pipefail

DAG_BUCKET="s3://my-airflow-dags/dags"
LOCAL_DAG_DIR="./dags"
AIRFLOW_HOST="https://airflow.internal"

echo "=== Pre-deploy validation ==="
# 1. Validate all DAGs parse without errors
python -c "
import sys
from airflow.models import DagBag

bag = DagBag(dag_folder='$LOCAL_DAG_DIR', include_examples=False)
if bag.import_errors:
    print('PARSE ERRORS DETECTED:')
    for fname, err in bag.import_errors.items():
        print(f'  {fname}: {err}')
    sys.exit(1)
print(f'OK: {len(bag.dags)} DAGs validated')
"

echo "=== Syncing DAGs to S3 ==="
# 2. Sync with delete — remove orphaned files
aws s3 sync "$LOCAL_DAG_DIR" "$DAG_BUCKET" \
    --exclude "*.pyc" \
    --exclude "__pycache__/*" \
    --exclude "*.DS_Store" \
    --delete \
    --exact-timestamps

echo "=== Waiting for scheduler to pick up changes ==="
sleep 35  # must exceed dag_dir_list_interval (set to 30s in this deployment)

echo "=== Post-deploy verification ==="
# 3. Check for import errors via the REST API
IMPORT_ERRORS=$(curl -sf \
    -u "${AIRFLOW_USER}:${AIRFLOW_PASS}" \
    "$AIRFLOW_HOST/api/v1/importErrors" \
    | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['total_entries'])")

if [ "$IMPORT_ERRORS" -gt "0" ]; then
    echo "ALERT: $IMPORT_ERRORS import errors detected post-deploy!"
    curl -sf -u "${AIRFLOW_USER}:${AIRFLOW_PASS}" \
        "$AIRFLOW_HOST/api/v1/importErrors" | python3 -m json.tool
    exit 1
fi

echo "Deploy complete. $IMPORT_ERRORS import errors. All DAGs healthy."
```
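Step 3 of the script boils down to reading `total_entries` from the importErrors response. Isolated as a standalone helper (the response shape matches Airflow's stable REST API list format; the function name is mine):

```python
import json


def import_error_count(api_response_body: str) -> int:
    """Parse a /api/v1/importErrors response and return the error count."""
    return json.loads(api_response_body)['total_entries']


healthy = '{"import_errors": [], "total_entries": 0}'
print(import_error_count(healthy))  # 0 — deploy passes verification

broken = '{"import_errors": [{"filename": "dags/bad.py"}], "total_entries": 1}'
print(import_error_count(broken))   # 1 — deploy script exits non-zero
```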
Kubernetes Executor — Pod Template
```yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
spec:
  restartPolicy: Never
  serviceAccountName: airflow-worker
  securityContext:
    runAsNonRoot: true
    runAsUser: 50000
  containers:
    - name: base
      image: my-registry/airflow:2.9.0-prod
      imagePullPolicy: Always
      resources:
        requests:
          cpu: "500m"
          memory: "1Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql-alchemy-conn
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
          readOnly: true
        - name: logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags-efs
    - name: logs
      emptyDir: {}
  tolerations:
    - key: "airflow-worker"
      operator: "Exists"
      effect: "NoSchedule"
```
Use `executor_config` per task to override pod resources for heavy tasks. A SparkSubmitOperator might need `{"pod_override": {"spec": {"containers": [{"resources": {"requests": {"memory": "8Gi"}}}]}}}` while most tasks run on the 1Gi default.
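A small sketch of building that override in DAG code, using the plain-dict form shown above (the helper name is illustrative; Airflow also accepts `kubernetes.client` model objects in `pod_override`):

```python
def pod_override_memory(memory_request: str) -> dict:
    """Build an executor_config dict requesting extra memory for one task."""
    return {
        "pod_override": {
            "spec": {
                "containers": [
                    {"name": "base",
                     "resources": {"requests": {"memory": memory_request}}}
                ]
            }
        }
    }


# Hypothetical usage on a heavy task:
# heavy = SparkSubmitOperator(
#     task_id='spark_job',
#     executor_config=pod_override_memory('8Gi'),
#     ...
# )
```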
Production Recipes
Idempotent Incremental Load Pattern
""" Gold standard incremental load: DELETE + INSERT within a transaction. Idempotent — can be re-run for any execution_date without side effects. """ from airflow.decorators import dag, task import pendulum @dag(schedule='@daily', start_date=pendulum.datetime(2024, 1, 1), catchup=False) def incremental_load(): @task def upsert_partition(**context): from airflow.providers.postgres.hooks.postgres import PostgresHook ds = context['ds'] # '2024-01-15' — the data interval date hook = PostgresHook('prod_postgres') conn = hook.get_conn() with conn.cursor() as cur: cur.execute("BEGIN") try: # Step 1: atomically delete the partition being replaced cur.execute("DELETE FROM fact_events WHERE event_date = %s", (ds,)) print(f"Deleted {cur.rowcount} rows for {ds}") # Step 2: insert fresh data cur.execute(""" INSERT INTO fact_events (event_date, user_id, event_type, revenue) SELECT event_date, user_id, event_type, SUM(amount) as revenue FROM raw_events WHERE event_date = %s AND _loaded_at >= NOW() - INTERVAL '2 days' -- recency guard GROUP BY event_date, user_id, event_type """, (ds,)) print(f"Inserted {cur.rowcount} rows for {ds}") cur.execute("COMMIT") except Exception as e: cur.execute("ROLLBACK") raise @task def validate_row_count(**context): from airflow.providers.postgres.hooks.postgres import PostgresHook ds = context['ds'] hook = PostgresHook('prod_postgres') count = hook.get_first( "SELECT COUNT(*) FROM fact_events WHERE event_date = %s", parameters=(ds,) )[0] print(f"Row count for {ds}: {count}") if count == 0: raise ValueError(f"Zero rows loaded for {ds} — pipeline may be broken") return count upsert_partition() >> validate_row_count() incremental_load()
Data Quality Gate Pattern
""" Reusable data quality gate using Great Expectations or custom rules. Halts the pipeline if quality checks fail — no silent bad data. """ from dataclasses import dataclass, field from typing import Callable from airflow.decorators import task @dataclass class QualityCheck: name: str query: str threshold: float comparison: str = '>=' # actual_value {comparison} threshold severity: str = 'error' # 'error' halts, 'warning' logs only DEFAULT_CHECKS = [ QualityCheck( name='null_rate_user_id', query="SELECT 1 - (COUNT(user_id)::float / NULLIF(COUNT(*),0)) FROM {table} WHERE event_date='{ds}'", threshold=0.01, comparison='<=', # null rate must be <= 1% ), QualityCheck( name='row_count_vs_yesterday', query=""" SELECT COUNT(*) FROM {table} WHERE event_date='{ds}' / NULLIF((SELECT COUNT(*) FROM {table} WHERE event_date='{ds}'::date - 1), 0) """, threshold=0.5, comparison='>=', # today must be >= 50% of yesterday severity='warning', ), ] def make_quality_gate_task(table: str, checks: list[QualityCheck] = DEFAULT_CHECKS): @task(task_id=f"quality_gate_{table}") def quality_gate(**context): from airflow.providers.postgres.hooks.postgres import PostgresHook ds = context['ds'] hook = PostgresHook('prod_postgres') failures, warnings = [], [] for check in checks: sql = check.query.format(table=table, ds=ds) result = hook.get_first(sql) actual = float(result[0]) if result else 0.0 ops = {'>=': lambda a,t: a >= t, '<=': lambda a,t: a <= t, '>': lambda a,t: a > t, '<': lambda a,t: a < t} passed = ops[check.comparison](actual, check.threshold) status = 'PASS' if passed else check.severity.upper() msg = f"[{status}] {check.name}: {actual:.4f} {check.comparison} {check.threshold}" print(msg) if not passed: (failures if check.severity == 'error' else warnings).append(msg) if warnings: print(f"WARNINGS ({len(warnings)}): " + "; ".join(warnings)) if failures: raise ValueError( f"Quality gate FAILED for {table} on {ds}:\n" + "\n".join(failures) ) return {'table': table, 'ds': ds, 
'checks_passed': len(checks) - len(failures)} return quality_gate()
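The severity semantics deserve emphasis: only `'error'` checks halt the run, while `'warning'` checks log and continue. A toy sketch of that routing, separate from the task above (names are mine):

```python
def gate(results: list) -> dict:
    """Partition check results into failures (halt) and warnings (log only).

    results: (check_name, passed, severity) triples — shape is illustrative.
    """
    failures = [n for n, ok, sev in results if not ok and sev == 'error']
    warnings = [n for n, ok, sev in results if not ok and sev == 'warning']
    return {'failures': failures, 'warnings': warnings, 'halt': bool(failures)}


out = gate([('null_rate_user_id', True, 'error'),
            ('row_count_vs_yesterday', False, 'warning')])
print(out['halt'])      # False — warnings alone never halt the pipeline
print(out['warnings'])  # ['row_count_vs_yesterday']
```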
Production Checklist
- No top-level I/O in DAG files. All connections, API calls, and heavy imports inside task callables.
- catchup=False on all new DAGs. Set explicitly — never rely on global default.
- max_active_runs=1 by default. Override only when overlapping runs are safe and tested.
- on_failure_callback on every DAG. Silence is not acceptable in production.
- SLA set on time-critical DAGs. Use `sla` in default_args + `sla_miss_callback` on the DAG.
- Pools on every external resource interaction. APIs, databases, cloud services — all rate-limited.
- Deferrable sensors everywhere possible. Non-deferrable sensors in prod = eventual deadlock.
- XCom only for metadata / small values. Never pass DataFrames, large dicts, or binary data via XCom.
- Idempotent tasks. Every task must be safely re-runnable for any execution_date.
- Secrets in secrets backend, not Variables. Variables are visible to any Airflow user.
- Parse time < 5 seconds per file. Monitor with the `airflow_dag_processing_last_duration` metric.
- Postgres connection pool sized correctly. (schedulers × pool_size) + (webserver workers × pool_size) + (celery workers × pool_size) must not exceed Postgres max_connections.
- DAG parse errors alert in <1 minute. Silent parse failures are the #1 source of mysterious outages.
- Log remote storage configured. Never rely on local worker disk for task logs in production.
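The connection-pool arithmetic from the checklist, worked for a hypothetical deployment (all component counts and the pool size below are illustrative):

```python
def required_pg_connections(schedulers: int, web_workers: int,
                            celery_workers: int, pool_size: int) -> int:
    """Upper bound on simultaneous Postgres connections Airflow can open:
    every scheduler, webserver worker, and celery worker holds its own pool."""
    return (schedulers + web_workers + celery_workers) * pool_size


# 2 HA schedulers, 4 gunicorn web workers, 8 celery workers, pool_size=5
needed = required_pg_connections(2, 4, 8, 5)
print(needed)  # 70 — keep Postgres max_connections comfortably above this
```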