AIRFLOW MASTERY
Production Field Guide — v2.x / v3.x
Scheduler · Workers · Webserver · Triggerer
Python 3.11+ · Celery / K8s Executors
// 01 — Core Concepts

Architecture Deep Dive

Airflow is a workflow orchestration platform — not an ETL tool, not a data pipeline tool. It schedules and monitors arbitrary directed acyclic graphs of tasks. Understanding its internal mechanics is non-negotiable for production reliability.

Architecture flow:
DAG Files (~/dags/) → Scheduler (DagBag parse) → Metastore DB (Postgres/MySQL) → Executor (Celery/K8s) → Workers (run tasks)

Component Breakdown

Scheduler
The Brain

Continuously scans DAG files. Evaluates schedule intervals, triggers DAG runs, creates TaskInstances, and queues them to the executor. The heartbeat loop runs every ~5s. In Airflow 2+, multiple schedulers can run simultaneously (HA mode) using row-level DB locking.

Executor
The Dispatcher

Determines how tasks run: LocalExecutor (subprocess), CeleryExecutor (distributed queue via Redis/RabbitMQ), KubernetesExecutor (pod-per-task), CeleryKubernetesExecutor (hybrid). The executor only dispatches — tasks execute in workers, not in the scheduler process, so the executor itself is rarely the bottleneck.

Metastore DB
The Source of Truth

All state lives here: DAG runs, task instances, connections, variables, XCom values, and (optionally) logs. Use Postgres 13+ in production; never use SQLite beyond local dev. Connection pool sizing is critical — each scheduler, webserver, and worker process maintains its own pool.

Triggerer
Async Deferral (2.2+)

Runs an asyncio event loop for deferred tasks. Enables sensors and operators to release their worker slot while waiting on I/O-bound events, dramatically reducing worker slot consumption. Critical for any workflow with polling sensors.

The DagBag Parse Cycle — Critical to Understand

The scheduler's DAG processor re-parses each file in dags_folder at least every min_file_process_interval seconds (default 30s), and rescans the folder for new files every dag_dir_list_interval seconds (default 300s). Each parse is a full Python import of the module. This is why top-level code in DAG files is catastrophic — it runs on every parse, not just on execution.

NEVER do this at module level: database connections, API calls, file I/O, heavy imports, environment variable fetches that call external systems. The scheduler will call this code hundreds of times per hour across all its workers.
python — dag_parse_pitfalls.py
# ✗ BAD: top-level import of a slow library — runs on EVERY parse
import tensorflow as tf          # 3s import. Scheduler grinds to halt.
records = db.fetch_all()         # ✗ live DB call on every parse!
config = requests.get('http://config-svc').json()  # ✗ network call!

# ✓ GOOD: defer everything into callables
from airflow.decorators import dag, task
from airflow.models import Variable
import pendulum

@dag(
    dag_id='safe_dag',
    schedule='@daily',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    tags=['production'],
    default_args={
        'owner': 'data-eng',
        'retries': 3,
        'retry_delay': pendulum.duration(minutes=5),
        'retry_exponential_backoff': True,
        'max_retry_delay': pendulum.duration(hours=2),
    }
)
def safe_dag():
    @task
    def load_config():
        # ✓ Variable.get uses cache — still prefer Secrets backend
        config_json = Variable.get('pipeline_config', deserialize_json=True)
        return config_json

    @task
    def fetch_records(config: dict):
        # ✓ DB connection is created inside the task — at execution time
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook(postgres_conn_id='prod_postgres')
        return hook.get_records("SELECT * FROM events LIMIT 1000")

    cfg = load_config()
    fetch_records(cfg)

safe_dag_instance = safe_dag()

Executor Comparison Matrix

Executor              Concurrency        Isolation   Overhead          Best For
SequentialExecutor    1 task at a time   None        Zero              Dev/debug only
LocalExecutor         N subprocesses     Process     Low               Small teams, single node
CeleryExecutor        Unlimited workers  Process     Medium            Production, horizontal scale
KubernetesExecutor    Unlimited pods     Container   High (pod start)  True isolation, variable workloads
CeleryK8sExecutor     Both modes         Mixed       Medium-High       Hybrid: fast small + isolated large
// 02 — Patterns & Anti-Patterns

DAG Design Mastery

Dynamic DAG Generation — The Right Way

Dynamic DAGs are one of Airflow's most powerful features and also the most commonly abused. The key insight: generate structure at parse time, not execution time.

python — dynamic_dags.py
"""
Pattern 1: Config-driven DAG factory
Generate N DAGs from a single config — one DAG per tenant/environment.
"""
from airflow.decorators import dag, task
from airflow.operators.empty import EmptyOperator
import pendulum
import yaml
from pathlib import Path

# ✓ Read config at parse time — fast file I/O is acceptable
CONFIG_PATH = Path("/opt/airflow/config/pipelines.yaml")
PIPELINE_CONFIGS: list[dict] = yaml.safe_load(CONFIG_PATH.read_text())

for pipeline_cfg in PIPELINE_CONFIGS:
    tenant = pipeline_cfg['tenant']
    schedule = pipeline_cfg['schedule']
    tables = pipeline_cfg['tables']       # list of tables to process
    sla_minutes = pipeline_cfg.get('sla_minutes', 60)

    @dag(
        dag_id=f"pipeline_{tenant}",
        schedule=schedule,
        start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
        catchup=False,
        tags=['auto-generated', tenant],
        default_args={
            'owner': pipeline_cfg.get('owner', 'platform'),
            'retries': 2,
            'sla': pendulum.duration(minutes=sla_minutes),
        }
    )
    def tenant_pipeline(cfg=pipeline_cfg):
        start = EmptyOperator(task_id='start')
        end   = EmptyOperator(task_id='end', trigger_rule='none_failed_min_one_success')

        @task(task_id='validate_source')
        def validate_source():
            # runs validation query against source system
            pass

        table_tasks = []
        for table in cfg['tables']:
            @task(task_id=f"process_{table}", retries=3)
            def process_table(table_name=table, ds=None):    # ds injected from Airflow context
                from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook
                hook = BigQueryHook(gcp_conn_id='gcp_prod')
                # Jinja is NOT rendered inside @task bodies — use the injected ds,
                # not a '{{ ds }}' literal (the f-string would eat the braces anyway)
                hook.run_query(f"""
                    INSERT INTO `prod.{cfg['tenant']}.{table_name}`
                    SELECT * FROM `staging.{table_name}`
                    WHERE _partitiontime = '{ds}'
                """)
            table_tasks.append(process_table())

        start >> validate_source() >> table_tasks >> end

    # ✓ Inject into globals — this is the canonical way to expose dynamic DAGs
    globals()[f"pipeline_{tenant}"] = tenant_pipeline()
python — taskflow_advanced.py
"""
Pattern 2: TaskFlow API with branching, XCom, and dynamic task mapping.
Dynamic task mapping (Airflow 2.3+) is a game-changer — fan-out
without knowing N at parse time.
"""
from airflow.decorators import dag, task, task_group
from airflow.operators.python import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
import pendulum

@dag(
    dag_id='advanced_taskflow',
    schedule='0 4 * * *',
    start_date=pendulum.datetime(2024, 1, 1, tz='UTC'),
    catchup=False,
    max_active_tasks=16,          # per-DAG concurrency cap
    max_active_runs=1,             # prevent overlapping runs
)
def advanced_pipeline():

    @task
    def get_work_items() -> list[dict]:
        """Returns variable-length list — dynamic map target"""
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        hook = PostgresHook('prod')
        rows = hook.get_records("SELECT id, payload FROM work_queue WHERE status='pending'")
        return [{'id': r[0], 'payload': r[1]} for r in rows]

    # ✓ Dynamic task mapping — creates N parallel tasks at runtime
    @task(
        max_active_tis_per_dag=8,    # throttle parallelism
        retries=2,
    )
    def process_item(item: dict) -> dict:
        """Runs once per item in the work_items list"""
        result = heavy_computation(item['payload'])   # placeholder for your domain logic
        return {'id': item['id'], 'status': 'ok', 'result': result}

    @task(trigger_rule=TriggerRule.ALL_DONE)   # run even if some mapped tasks fail
    def aggregate_results(results: list[dict]) -> dict:
        ok = [r for r in results if r and r.get('status') == 'ok']
        return {'total': len(results), 'ok': len(ok), 'failed': len(results) - len(ok)}

    @task_group(group_id='notifications')
    def notify(summary: dict):
        @task
        def send_slack(s: dict):
            from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
            if s['failed'] > 0:
                msg = f"⚠️ Pipeline complete: {s['ok']}/{s['total']} ok, {s['failed']} failed"
            else:
                msg = f"✅ Pipeline complete: all {s['total']} items processed"
            # Use the hook directly — nesting an operator inside a task is unidiomatic
            SlackWebhookHook(slack_webhook_conn_id='slack_ops').send(text=msg)

        @task
        def update_dashboard(s: dict):
            # POST metrics to internal observability service
            import httpx
            httpx.post('https://metrics.internal/ingest', json=s, timeout=10)

        # ✓ Pass XComArgs as call arguments — binding them as default args
        #   captures the unresolved XComArg object, not the runtime value
        send_slack(summary)
        update_dashboard(summary)

    items = get_work_items()
    # .expand() is the magic — one call generates N mapped task instances
    processed = process_item.expand(item=items)
    summary = aggregate_results(processed)
    notify(summary)

advanced_pipeline()

Trigger Rules — The Full Picture

Trigger Rule                   When Task Runs                        Common Use Case
ALL_SUCCESS                    All upstreams succeeded               Default — normal pipeline steps
ALL_FAILED                     All upstreams failed                  Failure-path cleanup tasks
ALL_DONE                       All upstreams finished (any state)    Always-run summaries, notifications
ONE_SUCCESS                    At least one upstream succeeded       Fan-in where partial success is OK
ONE_FAILED                     At least one upstream failed          Early alerting before all complete
NONE_FAILED                    No upstreams failed (skips allowed)   Branching — tasks after a branch
NONE_SKIPPED                   No upstreams were skipped             Strict flows with no optional paths
NONE_FAILED_MIN_ONE_SUCCESS    No failures + at least one success    Join after optional parallel tasks
Branching gotcha: When using BranchPythonOperator, all tasks NOT on the chosen branch receive a skipped state. Any downstream join task must use NONE_FAILED_MIN_ONE_SUCCESS or NONE_FAILED — otherwise the join task itself gets skipped too.
// 03 — Custom Operators & Hooks

Building Production-Grade Operators

When built-in operators don't fit, write your own. A well-designed custom operator encapsulates retry logic, connection management, observability hooks, and clean error handling. Here's the full pattern.

python — operators/base_api_operator.py
from __future__ import annotations
import time
import logging
from typing import Any, Callable, Sequence
from airflow.models.baseoperator import BaseOperator
from airflow.utils.context import Context
from airflow.exceptions import AirflowException, AirflowSkipException

log = logging.getLogger(__name__)

class RobustAPIOperator(BaseOperator):
    """
    Production-grade API operator with:
      - Exponential backoff with jitter
      - Circuit breaker pattern
      - OpenTelemetry span injection
      - Response schema validation
      - Graceful degradation to cache
    """

    # Tells Airflow which template fields to render with Jinja
    template_fields: Sequence[str] = ('endpoint', 'payload', 'headers')
    template_fields_renderers = {'payload': 'json', 'headers': 'json'}
    ui_color = '#1c7a4a'     # custom color in Airflow UI graph view
    ui_fgcolor = '#ffffff'

    def __init__(
        self,
        *,
        conn_id: str,
        endpoint: str,
        method: str = 'GET',
        payload: dict | None = None,
        headers: dict | None = None,
        response_check: Callable | None = None,
        max_retries: int = 3,
        backoff_factor: float = 2.0,
        timeout: int = 30,
        do_xcom_push: bool = True,
        skip_on_empty: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.endpoint = endpoint
        self.method = method.upper()
        self.payload = payload or {}
        self.headers = headers or {}
        self.response_check = response_check
        self.max_retries = max_retries
        self.backoff_factor = backoff_factor
        self.timeout = timeout
        self.do_xcom_push = do_xcom_push
        self.skip_on_empty = skip_on_empty

    def execute(self, context: Context) -> Any:
        from airflow.providers.http.hooks.http import HttpHook
        import random

        hook = HttpHook(method=self.method, http_conn_id=self.conn_id)

        attempt = 0
        last_exc = None

        while attempt <= self.max_retries:
            try:
                log.info("[%s] Attempt %d/%d → %s %s",
                         self.task_id, attempt + 1, self.max_retries + 1,
                         self.method, self.endpoint)

                response = hook.run(
                    endpoint=self.endpoint,
                    data=self.payload,
                    headers=self.headers,
                    extra_options={'timeout': self.timeout, 'verify': True}
                )
                response.raise_for_status()

                result = response.json()

                # Optional: caller-provided validation function
                if self.response_check and not self.response_check(result):
                    raise AirflowException(f"response_check() failed on: {result}")

                if self.skip_on_empty and not result:
                    log.info("Empty response with skip_on_empty=True — skipping")
                    raise AirflowSkipException("Empty response")

                # Push to XCom for downstream tasks (honor the do_xcom_push flag)
                if self.do_xcom_push:
                    context['ti'].xcom_push(key='response', value=result)
                log.info("[%s] Success on attempt %d", self.task_id, attempt + 1)
                return result

            except AirflowSkipException:
                raise
            except Exception as exc:
                last_exc = exc
                attempt += 1
                if attempt > self.max_retries:
                    break
                # Exponential backoff with full jitter
                sleep_time = self.backoff_factor ** attempt + random.uniform(0, 1)
                log.warning("[%s] Attempt %d failed (%s). Retrying in %.1fs",
                            self.task_id, attempt, exc, sleep_time)
                time.sleep(sleep_time)

        raise AirflowException(
            f"All {self.max_retries + 1} attempts failed. Last error: {last_exc}"
        ) from last_exc

    def on_kill(self):
        """Called when task is externally killed — clean up resources here."""
        log.warning("[%s] Task killed — performing cleanup", self.task_id)
        # Cancel any in-flight requests, release connections, etc.
python — operators/deferred_sensor.py
"""
Deferrable Sensor — uses Airflow Triggerer for async polling.
Does NOT occupy a worker slot while waiting. Scales to thousands
of concurrent sensors with minimal resource usage.
"""
from __future__ import annotations
import asyncio
from datetime import timedelta
from typing import AsyncIterator
from airflow.exceptions import AirflowException
from airflow.triggers.base import BaseTrigger, TriggerEvent
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context

class S3KeyTrigger(BaseTrigger):
    """Async trigger that polls S3 for a key — runs in Triggerer process."""
    def __init__(self, bucket: str, key: str, poll_interval: float = 30.0):
        super().__init__()
        self.bucket = bucket
        self.key = key
        self.poll_interval = poll_interval

    def serialize(self) -> tuple[str, dict]:
        # Must be serializable — stored in the DB while the trigger awaits.
        # The classpath string must match where the Triggerer can import this class.
        return (
            "my_package.triggers.S3KeyTrigger",
            {"bucket": self.bucket, "key": self.key, "poll_interval": self.poll_interval}
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        import aiobotocore.session as aioboto
        session = aioboto.get_session()
        async with session.create_client('s3') as s3:
            while True:
                try:
                    await s3.head_object(Bucket=self.bucket, Key=self.key)
                    yield TriggerEvent({"status": "found", "key": self.key})
                    return
                except s3.exceptions.ClientError:
                    await asyncio.sleep(self.poll_interval)  # yields control to event loop!

class DeferrableS3Sensor(BaseSensorOperator):
    def __init__(self, *, bucket: str, key: str, poll_interval: float = 30.0, **kwargs):
        super().__init__(**kwargs)
        self.bucket = bucket
        self.key = key
        self.poll_interval = poll_interval

    def execute(self, context: Context):
        # Immediately defer — do NOT poll in execute()
        self.defer(
            trigger=S3KeyTrigger(self.bucket, self.key, self.poll_interval),
            method_name="execute_complete",
            timeout=timedelta(hours=6),
        )

    def execute_complete(self, context: Context, event: dict) -> str:
        # Called by Triggerer when TriggerEvent fires — resumes in a worker slot
        if event["status"] != "found":
            raise AirflowException(f"Unexpected trigger event: {event}")
        self.log.info("S3 key found: s3://%s/%s", self.bucket, event["key"])
        return event["key"]
// 04 — Observability in Production

Monitoring, Alerting & SLAs

Production SLO targets:
  99.2%   Target DAG success rate
  <30s    Max scheduler heartbeat staleness
  0       Zombie tasks tolerated
  <60s    Task queue wait (p95)
  <5s     Max DAG parse time

Callback-Based Alerting System

python — callbacks/alerting.py
"""
Centralized callback system — attach to default_args to alert on
every DAG failure, retry, SLA miss, and task success event.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING
from airflow.models import TaskInstance, DagRun

if TYPE_CHECKING:
    from airflow.utils.context import Context

log = logging.getLogger(__name__)

def _build_slack_blocks(title: str, color: str, fields: list[dict]) -> list[dict]:
    """Build Slack Block Kit message."""
    return [
        {"type": "header", "text": {"type": "plain_text", "text": title}},
        {"type": "section", "fields": [
            {"type": "mrkdwn", "text": f"*{f['label']}*\n{f['value']}"}
            for f in fields
        ]},
    ]

def on_failure_callback(context: Context) -> None:
    """Fires when a task fails. Sends structured Slack alert + PagerDuty if high priority."""
    import httpx
    from airflow.models import Variable

    ti: TaskInstance = context['task_instance']
    dag_run: DagRun   = context['dag_run']
    exception         = context.get('exception')
    log_url           = ti.log_url

    severity = 'critical' in (context['dag'].tags or [])

    blocks = _build_slack_blocks(
        title=f"{'🔴 CRITICAL' if severity else '🟠 FAILURE'}: {ti.dag_id}.{ti.task_id}",
        color='danger',
        fields=[
            {'label': 'DAG',          'value': ti.dag_id},
            {'label': 'Task',         'value': ti.task_id},
            {'label': 'Run ID',       'value': dag_run.run_id},
            {'label': 'Try #',        'value': str(ti.try_number)},
            {'label': 'Error',        'value': f"`{str(exception)[:200]}`"},
            {'label': 'Logs',         'value': f"<{log_url}|View logs>"},
        ]
    )

    slack_token = Variable.get('slack_bot_token')
    slack_channel = Variable.get('slack_alerts_channel', '#data-alerts')

    httpx.post(
        'https://slack.com/api/chat.postMessage',
        headers={'Authorization': f'Bearer {slack_token}'},
        json={'channel': slack_channel, 'blocks': blocks},
        timeout=10
    )

    if severity:
        _trigger_pagerduty(ti, exception, context)

def on_retry_callback(context: Context) -> None:
    ti: TaskInstance = context['task_instance']
    log.warning("RETRY: %s.%s attempt %d/%d — %s",
        ti.dag_id, ti.task_id, ti.try_number, ti.max_tries + 1,
        context.get('exception')
    )
    # Lightweight — just log on retries, Slack only on final failure

def on_sla_miss_callback(
    dag, task_list, blocking_task_list, slas, blocking_tis
) -> None:
    """Fires when a task hasn't completed within its `sla` relative to the data interval."""
    sla_str = ", ".join(f"{s.dag_id}.{s.task_id}" for s in slas)
    log.error("SLA MISS: %s", sla_str)
    # Post to ops channel — SLA misses are high priority

def _trigger_pagerduty(ti: TaskInstance, exc, context: Context):
    import httpx
    from airflow.models import Variable
    pd_key = Variable.get('pagerduty_routing_key')
    httpx.post('https://events.pagerduty.com/v2/enqueue', json={
        'routing_key': pd_key,
        'event_action': 'trigger',
        'payload': {
            'summary': f"Airflow CRITICAL: {ti.dag_id}.{ti.task_id} failed",
            'severity': 'critical',
            'source': 'airflow',
            'custom_details': {'run_id': ti.run_id, 'error': str(exc)[:500]},
        }
    }, timeout=10)

# ── Usage in DAG default_args ────────────────────────────────────
import pendulum

DEFAULT_ARGS = {
    'owner': 'data-engineering',
    'retries': 3,
    'on_failure_callback': on_failure_callback,
    'on_retry_callback': on_retry_callback,
    'sla': pendulum.duration(hours=2),
}

StatsD / Prometheus Metrics

ini — airflow.cfg (metrics config)
# airflow.cfg or env vars (AIRFLOW__METRICS__STATSD_*)
[metrics]
statsd_on = True
statsd_host = statsd-exporter.monitoring.svc.cluster.local
statsd_port = 9125
statsd_prefix = airflow
statsd_allow_list = dag.,scheduler.,executor.,pool.
# statsd_allow_list filters metrics sent — reduces cardinality

[scheduler]
# How often scheduler emits its own heartbeat metric
scheduler_heartbeat_sec = 5
# Alert externally if airflow_scheduler_heartbeat goes stale for 30+ seconds
yaml — prometheus/airflow_alerts.yaml
groups:
  - name: airflow.critical
    rules:

    # Scheduler died or stalled — heartbeat counter stopped incrementing
    - alert: AirflowSchedulerDead
      expr: rate(airflow_scheduler_heartbeat[2m]) == 0
      for: 2m
      labels:
        severity: critical
      annotations:
        summary: "Airflow scheduler heartbeat has stopped incrementing"

    # Zombie tasks accumulating
    - alert: AirflowZombieTasksHigh
      expr: airflow_zombies_killed_total > 5
      for: 10m
      labels:
        severity: warning
      annotations:
        summary: "Zombie tasks detected: {{ $value }}"

    # Task queue depth too high (workers overwhelmed)
    - alert: AirflowTaskQueueDepth
      expr: airflow_executor_queued_tasks > 200
      for: 5m
      labels:
        severity: warning

    # DAG import errors present
    - alert: AirflowDagImportErrors
      expr: airflow_dag_processing_import_errors > 0
      for: 1m
      labels:
        severity: critical
      annotations:
        summary: "{{ $value }} DAG(s) failing to parse"

    # Airflow pool starvation (tasks waiting on pool slots)
    - alert: AirflowPoolStarvation
      expr: airflow_pool_starving_tasks > 0
      for: 5m
      labels:
        severity: warning
// 05 — Diagnosing Live Systems

Debugging & Incident Response

Debugging hierarchy: Logs → DB state → Scheduler logs → Worker logs → StatsD metrics. Always start with task logs, never the worker process directly.

CLI Combat Reference

bash — airflow CLI (live debugging)
# ── DAG STATE ────────────────────────────────────────────────────
# List all active DAG runs
airflow dags list-runs --dag-id my_dag --state running

# Check which tasks are in-flight
airflow tasks states-for-dag-run my_dag scheduled__2024-01-15T04:00:00+00:00

# Show the task tree for a DAG (zombies themselves are found via metastore queries)
airflow tasks list my_dag --tree

# ── MANUALLY CONTROL TASKS ───────────────────────────────────────
# Trigger a single task for a specific execution date (re-run)
airflow tasks run my_dag my_task 2024-01-15T04:00:00 --local --ignore-all-dependencies

# Clear failed tasks and re-queue them
airflow tasks clear my_dag --start-date 2024-01-15 --end-date 2024-01-16 --yes

# Clear just one task across a date range
airflow tasks clear my_dag -t my_failing_task --start-date 2024-01-01 --yes

# Mark a task as success without running it
airflow tasks run my_dag my_task 2024-01-15T04:00:00 --mark-success

# ── DAG RUNS ─────────────────────────────────────────────────────
# Backfill specific date range (spawns new dag runs)
airflow dags backfill my_dag --start-date 2024-01-01 --end-date 2024-01-07 --reset-dagruns

# Trigger a manual run with config
airflow dags trigger my_dag --conf '{"env": "prod", "batch_size": 500}'

# Pause/unpause a DAG
airflow dags pause my_dag
airflow dags unpause my_dag

# ── VARIABLES & CONNECTIONS ──────────────────────────────────────
airflow variables get pipeline_config
airflow variables set pipeline_config '{"batch_size": 1000}' --json
airflow connections get prod_postgres
airflow connections test prod_postgres       # ← actually tests the connection!

# ── SCHEDULER HEALTH ─────────────────────────────────────────────
airflow jobs check --job-type SchedulerJob --limit 1
airflow scheduler --num-runs 1               # run scheduler for exactly 1 loop (debug)

# ── DAG PARSE TIME PROFILING ─────────────────────────────────────
python -c "
import time
from airflow.models import DagBag
start = time.time()
bag = DagBag(dag_folder='/opt/airflow/dags', include_examples=False)
elapsed = time.time() - start
print(f'Parse time: {elapsed:.2f}s')
print(f'Import errors: {bag.import_errors}')
for dag_id, dag in bag.dags.items():
    print(f'  {dag_id}: {len(dag.tasks)} tasks')
"

DB-Level Debugging — Direct SQL Queries

sql — postgres (airflow metastore queries)
-- ── FIND STUCK/ZOMBIE TASKS ──────────────────────────────────
SELECT
    ti.dag_id, ti.task_id, ti.run_id, ti.state,
    ti.start_date, ti.end_date,
    EXTRACT(EPOCH FROM (NOW() - ti.start_date)) / 60 AS running_minutes,
    ti.hostname, ti.pid
FROM task_instance ti
WHERE ti.state = 'running'
  AND ti.start_date < NOW() - INTERVAL '2 hours'
ORDER BY running_minutes DESC;

-- ── TASK SUCCESS RATE PER DAG (last 7 days) ──────────────────
SELECT
    dag_id,
    COUNT(*) AS total_runs,
    SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) AS successes,
    ROUND(
        100.0 * SUM(CASE WHEN state = 'success' THEN 1 ELSE 0 END) / COUNT(*), 2
    ) AS success_rate_pct,
    AVG(EXTRACT(EPOCH FROM (end_date - start_date))) AS avg_duration_secs
FROM task_instance
WHERE start_date > NOW() - INTERVAL '7 days'
GROUP BY dag_id
ORDER BY success_rate_pct ASC;

-- ── XCOM SIZE AUDIT (large XComs kill performance) ───────────
SELECT
    dag_id, task_id, run_id, key,
    pg_size_pretty(LENGTH(value::text)::bigint) AS xcom_size,
    LENGTH(value::text) AS raw_bytes
FROM xcom
WHERE LENGTH(value::text) > 10000   -- anything > 10KB is suspicious
ORDER BY raw_bytes DESC
LIMIT 20;

-- ── SCHEDULER LAG: time between data_interval_end and actual start ──
SELECT
    dag_id,
    AVG(EXTRACT(EPOCH FROM (start_date - data_interval_end))) AS avg_lag_secs,
    MAX(EXTRACT(EPOCH FROM (start_date - data_interval_end))) AS max_lag_secs
FROM dag_run
WHERE start_date > NOW() - INTERVAL '24 hours'
  AND run_type = 'scheduled'
GROUP BY dag_id
HAVING AVG(EXTRACT(EPOCH FROM (start_date - data_interval_end))) > 300
ORDER BY avg_lag_secs DESC;

Python Debugging Toolkit — Run Locally

python — debug_tools/dag_inspector.py
"""
Local DAG inspector — run this WITHOUT a running Airflow instance.
Point AIRFLOW__DATABASE__SQL_ALCHEMY_CONN at your prod DB (read-only replica!)
to inspect live state from your laptop.
"""
import os

# ⚠ Set BEFORE importing anything from airflow — the config is read at import time
os.environ['AIRFLOW__DATABASE__SQL_ALCHEMY_CONN'] = (
    'postgresql+psycopg2://readonly_user:pass@prod-db:5432/airflow'
)

from airflow.models import TaskInstance
from airflow.utils.session import create_session
from sqlalchemy import text

def inspect_dag_health(dag_id: str, lookback_hours: int = 24):
    """Full health report for a specific DAG."""
    import pendulum
    cutoff = pendulum.now('UTC').subtract(hours=lookback_hours)

    with create_session() as session:
        tis = (
            session.query(TaskInstance)
            .filter(
                TaskInstance.dag_id == dag_id,
                TaskInstance.start_date >= cutoff
            )
            .all()
        )

    state_counts = {}
    durations = []
    for ti in tis:
        state_counts[ti.state] = state_counts.get(ti.state, 0) + 1
        if ti.duration:
            durations.append(ti.duration)

    print(f"\n{'='*60}")
    print(f"DAG Health Report: {dag_id}")
    print(f"Lookback: {lookback_hours}h | Total TIs: {len(tis)}")
    print(f"State breakdown: {state_counts}")
    if durations:
        durations.sort()
        print(f"Duration p50={durations[len(durations)//2]:.1f}s "
              f"p95={durations[int(len(durations)*0.95)]:.1f}s "
              f"max={max(durations):.1f}s")
    if state_counts.get('failed', 0) > 0:
        print(f"⚠ {state_counts['failed']} failures in last {lookback_hours}h!")

def find_slow_tasks(min_duration: int = 300) -> None:
    """Find tasks that consistently take longer than usual — potential bottlenecks."""
    with create_session() as session:
        from sqlalchemy import text  # bind :min_dur safely
        results = session.execute(text("""
            SELECT dag_id, task_id,
                   COUNT(*) as runs,
                   PERCENTILE_CONT(0.5)  WITHIN GROUP (ORDER BY duration) AS p50,
                   PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration) AS p95,
                   MAX(duration) AS max_dur
            FROM task_instance
            WHERE state = 'success'
              AND start_date > NOW() - INTERVAL '7 days'
              AND duration IS NOT NULL
            GROUP BY dag_id, task_id
            HAVING PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY duration) > :min_dur
            ORDER BY p95 DESC
            LIMIT 20
        """), {"min_dur": min_duration})

        print(f"\n{'DAG.TASK':<50} {'runs':>6} {'p50':>8} {'p95':>8} {'max':>8}")
        for r in results:
            key = f"{r.dag_id}.{r.task_id}"
            print(f"{key:<50} {r.runs:>6} {r.p50:>7.0f}s {r.p95:>7.0f}s {r.max_dur:>7.0f}s")

if __name__ == '__main__':
    inspect_dag_health('my_critical_dag', lookback_hours=48)
    find_slow_tasks(min_duration=300)

Common Failure Patterns & Fixes

Zombie Tasks
Worker Lost Mid-Task

Worker process killed (OOM, node eviction) while the task was running. The DB says running; the process is dead. Fix: airflow tasks clear the affected tasks, and tune zombie_detection_interval and scheduler_zombie_task_threshold in [scheduler] so zombies are reaped faster.

Sensor Deadlock
All Worker Slots Held by Sensors

Non-deferrable sensors consume worker slots while sleeping. With 32 workers and 30 slow sensors, nothing else runs. Fix: migrate sensors to deferrable pattern, or use a dedicated sensor pool with limited slots.

Parse Errors
Silent DAG Disappearance

A Python syntax error or bad import causes the DAG to disappear from the UI without obvious warning. Check DAG Import Errors in the UI, or run airflow dags list-import-errors. Set up the Prometheus alert above.

XCom Bloat
Large Payloads via XCom

XCom is stored in the metastore DB. Passing dataframes or large JSONs through XCom causes DB bloat, slow queries, and scheduler degradation. Fix: use S3/GCS as an intermediary and push only the path via XCom.

// 06 — Tuning for Scale

Performance & Concurrency Tuning

ini — airflow.cfg (production tuning)
# ── SCHEDULER ────────────────────────────────────────────────────
[scheduler]
parsing_processes = 4               # DAG parser processes (formerly max_threads); 2-4 for most setups
dag_dir_list_interval = 30          # scan dags/ every 30s (reduce from 300)
min_file_process_interval = 30      # min time between parsing same file
file_parsing_sort_mode = modified_time  # prioritise recently changed files
max_dagruns_to_create_per_loop = 10
max_tis_per_query = 512            # batch size for TI state fetching
use_row_level_locking = True        # required for HA scheduler (2+ schedulers)
zombie_detection_interval = 60      # how often to check for zombies
scheduler_zombie_task_threshold = 300  # task running > 5m with no heartbeat = zombie

# ── DATABASE ──────────────────────────────────────────────────────
[database]
sql_alchemy_pool_size = 5           # connections per process
sql_alchemy_max_overflow = 10       # burst connections
sql_alchemy_pool_recycle = 1800     # recycle connections every 30min
sql_alchemy_pool_pre_ping = True    # test connection before use (avoids stale conn)

# ── CELERY EXECUTOR ──────────────────────────────────────────────
[celery]
worker_concurrency = 16             # tasks per worker process
worker_prefetch_multiplier = 1      # CRITICAL: set to 1 for fair task distribution
task_acks_late = True               # task acked after completion, not on pickup
task_track_started = True
worker_enable_remote_control = False # security: disable remote kill in prod
operation_timeout = 1.0
celery_app_name = airflow.executors.celery_executor

# ── CORE ─────────────────────────────────────────────────────────
[core]
parallelism = 128                   # global max concurrent tasks
max_active_tasks_per_dag = 64       # per-dag concurrency cap
max_active_runs_per_dag = 5         # prevent catchup storms
dagbag_import_timeout = 30          # kill slow-parsing DAG files
dag_file_processor_timeout = 50     # slightly higher than import_timeout
killed_task_cleanup_time = 60
default_task_execution_timeout = 3600  # 1hr global task timeout

# ── WEBSERVER ────────────────────────────────────────────────────
[webserver]
workers = 4                         # gunicorn workers
web_server_worker_timeout = 120     # gunicorn worker timeout (seconds)
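These knobs interact: a task instance runs only when every layer of the hierarchy has free capacity. A back-of-envelope helper (pure Python, the function is ours) makes the effective ceiling explicit:

```python
def effective_concurrency(parallelism: int,
                          max_active_tasks_per_dag: int,
                          pool_slots_free: int,
                          worker_slots_free: int) -> int:
    """A task instance runs only when ALL layers have capacity:
    global parallelism, the per-DAG cap, its pool, and executor slots."""
    return min(parallelism, max_active_tasks_per_dag,
               pool_slots_free, worker_slots_free)

# With the config above plus a 5-slot salesforce_api pool and
# 4 Celery workers x 16 concurrency, at most 5 salesforce tasks
# run, regardless of the 128 global parallelism.
print(effective_concurrency(128, 64, 5, 4 * 16))  # → 5
```

The tightest constraint always wins — which is the whole point of pools as a throttle.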

Pool Strategy — Slot-Based Throttling

pools/setup_pools.py
"""
Airflow Pools are the primary mechanism to control concurrency
at a resource level — not just task level.

Use pools to:
  - Limit concurrent API calls to an external service
  - Cap DB connections from tasks
  - Separate workloads (sensors vs compute vs IO)
  - Reserve slots for critical DAGs
"""
from airflow.models.pool import Pool
from airflow.utils.session import create_session

POOLS = [
    # Default pool — never set to unlimited in production
    {'name': 'default_pool',        'slots': 128, 'description': 'General compute tasks'},
    # Sensor pool — prevents sensor deadlock
    {'name': 'sensor_pool',          'slots': 32,  'description': 'Polling sensors (use deferrable!)'},
    # External API limits
    {'name': 'salesforce_api',       'slots': 5,   'description': 'Salesforce API rate limit: 5 concurrent'},
    {'name': 'bigquery_pool',        'slots': 50,  'description': 'BigQuery concurrent query limit'},
    # Heavy compute jobs — prevent resource contention
    {'name': 'spark_jobs',           'slots': 8,   'description': 'Spark cluster: max concurrent submissions'},
    # Priority pool — reserved for SLA-bound DAGs
    {'name': 'priority_pool',        'slots': 16,  'description': 'Reserved for critical SLA DAGs'},
]

def upsert_pools():
    with create_session() as session:
        for p in POOLS:
            existing = session.query(Pool).filter(Pool.pool == p['name']).first()
            if existing:
                existing.slots = p['slots']
                existing.description = p['description']
            else:
                session.add(Pool(pool=p['name'], slots=p['slots'],
                                 description=p['description']))
            print(f"Pool '{p['name']}': {p['slots']} slots")

# Usage in tasks:
# @task(pool='salesforce_api', pool_slots=1, priority_weight=10)
# def call_salesforce(): ...
# Or in operators: SomeOperator(pool='spark_jobs', pool_slots=2, ...)

if __name__ == '__main__':
    upsert_pools()

XCom Backend — S3/GCS for Large Payloads

xcom_backends/s3_xcom.py
"""
Custom XCom Backend that stores large values in S3 and small ones in DB.
Configure: AIRFLOW__CORE__XCOM_BACKEND=my_package.xcom_backends.s3_xcom.S3XComBackend
"""
import json
import pickle
from typing import Any
from airflow.models.xcom import BaseXCom

XCOM_S3_BUCKET = 'my-airflow-xcom'
SIZE_THRESHOLD_BYTES = 48_000  # store in S3 if serialized size > 48KB

class S3XComBackend(BaseXCom):

    @staticmethod
    def serialize_value(
        value: Any,
        *,
        key: str | None = None,
        task_id: str | None = None,
        dag_id: str | None = None,
        run_id: str | None = None,
        map_index: int | None = None,
    ) -> Any:
        import boto3

        serialized = pickle.dumps(value)

        if len(serialized) < SIZE_THRESHOLD_BYTES:
            # Small value — store inline in DB as normal
            return BaseXCom.serialize_value(value)

        # Large value — upload to S3, store reference in DB
        s3_key = f"xcom/{dag_id}/{run_id}/{task_id}/{key}.pkl"
        s3 = boto3.client('s3')
        s3.put_object(
            Bucket=XCOM_S3_BUCKET,
            Key=s3_key,
            Body=serialized,
            ServerSideEncryption='aws:kms',
        )
        reference = {'__xcom_s3_key__': s3_key, 'bucket': XCOM_S3_BUCKET}
        return json.dumps(reference).encode()

    @staticmethod
    def deserialize_value(result: BaseXCom) -> Any:
        import boto3
        raw = result.value
        try:
            ref = json.loads(raw)
            if isinstance(ref, dict) and '__xcom_s3_key__' in ref:
                s3 = boto3.client('s3')
                body = s3.get_object(Bucket=ref['bucket'], Key=ref['__xcom_s3_key__'])['Body']
                return pickle.loads(body.read())
        except (json.JSONDecodeError, TypeError, KeyError):
            pass
        return BaseXCom.deserialize_value(result)
// 07 — Live System Operations

Production Hardening

Secrets Management — Never Use Variables for Secrets

secrets/aws_secrets_backend.py
# airflow.cfg:
# [secrets]
# backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
# backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}
#
# Airflow will auto-resolve connections stored as:
#   aws secret: airflow/connections/prod_postgres
#   value: postgresql+psycopg2://user:pass@host:5432/db
#
# And variables as:
#   aws secret: airflow/variables/pipeline_config
#   value: {"batch_size": 1000}

# In code — completely transparent. No code changes needed.
from airflow.models import Variable
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Variable.get resolves from Secrets Manager automatically
config = Variable.get('pipeline_config', deserialize_json=True)

# PostgresHook resolves connection from Secrets Manager automatically
hook = PostgresHook(postgres_conn_id='prod_postgres')

# Resolution order: secrets backend → env var → metastore DB → not found
# Set AIRFLOW_CONN_PROD_POSTGRES=... in env for local dev, where no backend is configured
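The lookup chain can be sketched as a first-hit-wins loop — a toy model of the behaviour, not Airflow's actual code; the stub backends and URIs are invented:

```python
from typing import Callable, Optional

def resolve_conn(conn_id: str,
                 backends: list[Callable[[str], Optional[str]]]) -> str:
    """First-hit-wins: secrets backend, then env vars, then metastore."""
    for lookup in backends:
        uri = lookup(conn_id)
        if uri is not None:
            return uri
    raise KeyError(f"Connection '{conn_id}' not found in any backend")

# Stub backends standing in for SecretsManager, env vars, and the DB.
secrets   = {"prod_postgres": "postgresql://user:***@host:5432/db"}.get
env_vars  = {}.get
metastore = {"prod_postgres": "postgresql://stale@oldhost/db"}.get

# The secrets backend wins even though the metastore also has an entry.
print(resolve_conn("prod_postgres", [secrets, env_vars, metastore]))
```

This is why a stale metastore connection can lurk unnoticed: it is only ever consulted when the earlier backends miss.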

Zero-Downtime DAG Deployments

deploy/deploy_dags.sh
#!/bin/bash
# Safe DAG deployment strategy:
# 1. Validate all DAGs parse clean
# 2. Sync to DAG store
# 3. Wait for scheduler to pick up
# 4. Verify no import errors post-deploy

set -euo pipefail

DAG_BUCKET="s3://my-airflow-dags/dags"
LOCAL_DAG_DIR="./dags"
AIRFLOW_HOST="https://airflow.internal"

echo "=== Pre-deploy validation ==="

# 1. Validate all DAGs parse without errors
python -c "
import sys
from airflow.models import DagBag
bag = DagBag(dag_folder='$LOCAL_DAG_DIR', include_examples=False)
if bag.import_errors:
    print('PARSE ERRORS DETECTED:')
    for fname, err in bag.import_errors.items():
        print(f'  {fname}: {err}')
    sys.exit(1)
print(f'OK: {len(bag.dags)} DAGs validated')
"

echo "=== Syncing DAGs to S3 ==="

# 2. Sync with delete — remove orphaned files
aws s3 sync "$LOCAL_DAG_DIR" "$DAG_BUCKET" \
    --exclude "*.pyc" \
    --exclude "__pycache__/*" \
    --exclude "*.DS_Store" \
    --delete \
    --exact-timestamps

echo "=== Waiting for scheduler to pick up changes ==="
sleep 35  # Wait > dag_dir_list_interval (30s)

echo "=== Post-deploy verification ==="

# 3. Check for import errors via API
IMPORT_ERRORS=$(curl -sf \
    -u "${AIRFLOW_USER}:${AIRFLOW_PASS}" \
    "$AIRFLOW_HOST/api/v1/importErrors" \
    | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['total_entries'])")

if [ "$IMPORT_ERRORS" -gt "0" ]; then
    echo "ALERT: $IMPORT_ERRORS import errors detected post-deploy!"
    curl -sf -u "${AIRFLOW_USER}:${AIRFLOW_PASS}" \
        "$AIRFLOW_HOST/api/v1/importErrors" | python3 -m json.tool
    exit 1
fi

echo "Deploy complete. $IMPORT_ERRORS import errors. All DAGs healthy."

Kubernetes Executor — Pod Template

k8s/pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  labels:
    app: airflow-worker
spec:
  restartPolicy: Never
  serviceAccountName: airflow-worker
  securityContext:
    runAsNonRoot: true
    runAsUser: 50000
  containers:
    - name: base
      image: my-registry/airflow:2.9.0-prod
      imagePullPolicy: Always
      resources:
        requests:
          cpu: "500m"
          memory: "1Gi"
        limits:
          cpu: "2"
          memory: "4Gi"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: KubernetesExecutor
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql-alchemy-conn
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
          readOnly: true
        - name: logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: dags
      persistentVolumeClaim:
        claimName: airflow-dags-efs
    - name: logs
      emptyDir: {}
  tolerations:
    - key: "airflow-worker"
      operator: "Exists"
      effect: "NoSchedule"
K8s Executor tip: Use executor_config per-task to override pod resources for heavy tasks. A SparkSubmitOperator might need {"pod_override": {"spec": {"containers": [{"resources": {"requests": {"memory": "8Gi"}}}]}}} while most tasks run on 1Gi defaults.
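That override dict is tedious to write inline, so a small helper can build it. A sketch in plain dicts, following the documented pod_override shape — the helper name is ours, and Airflow also accepts kubernetes.client.V1Pod objects here:

```python
from typing import Optional

def pod_override(memory_request: str, cpu_request: Optional[str] = None) -> dict:
    """Build an executor_config pod_override for a heavy task.

    Targets the 'base' container that the KubernetesExecutor runs tasks in.
    """
    requests: dict = {"memory": memory_request}
    if cpu_request:
        requests["cpu"] = cpu_request
    return {
        "pod_override": {
            "spec": {
                "containers": [
                    {"name": "base", "resources": {"requests": requests}}
                ]
            }
        }
    }

# Hypothetical usage:
#   SparkSubmitOperator(..., executor_config=pod_override("8Gi", "2"))
print(pod_override("8Gi"))
```

Most tasks keep the 1Gi template defaults; only the outliers carry an override.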
// 08 — Copy-Paste Patterns

Production Recipes

Idempotent Incremental Load Pattern

recipes/incremental_load.py
"""
Gold standard incremental load: DELETE + INSERT within a transaction.
Idempotent — can be re-run for any execution_date without side effects.
"""
from airflow.decorators import dag, task
import pendulum

@dag(schedule='@daily', start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def incremental_load():

    @task
    def upsert_partition(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        ds = context['ds']  # '2024-01-15' — the data interval date
        hook = PostgresHook('prod_postgres')
        conn = hook.get_conn()  # psycopg2: autocommit off, a transaction opens implicitly
        try:
            with conn.cursor() as cur:
                # Step 1: delete the partition being replaced
                cur.execute("DELETE FROM fact_events WHERE event_date = %s", (ds,))
                print(f"Deleted {cur.rowcount} rows for {ds}")

                # Step 2: insert fresh data
                cur.execute("""
                    INSERT INTO fact_events (event_date, user_id, event_type, revenue)
                    SELECT
                        event_date, user_id, event_type,
                        SUM(amount) as revenue
                    FROM raw_events
                    WHERE event_date = %s
                      AND _loaded_at >= NOW() - INTERVAL '2 days'  -- recency guard
                    GROUP BY event_date, user_id, event_type
                """, (ds,))
                print(f"Inserted {cur.rowcount} rows for {ds}")

            conn.commit()  # both statements land atomically
        except Exception:
            conn.rollback()
            raise

    @task
    def validate_row_count(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        ds = context['ds']
        hook = PostgresHook('prod_postgres')
        count = hook.get_first(
            "SELECT COUNT(*) FROM fact_events WHERE event_date = %s", parameters=(ds,)
        )[0]
        print(f"Row count for {ds}: {count}")
        if count == 0:
            raise ValueError(f"Zero rows loaded for {ds} — pipeline may be broken")
        return count

    upsert_partition() >> validate_row_count()

incremental_load()
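The idempotency claim is easy to prove in isolation: running DELETE + INSERT twice for the same date must leave the table identical. A self-contained demonstration of the same pattern with SQLite and toy data:

```python
import sqlite3

def upsert_partition(conn: sqlite3.Connection, ds: str, rows: list) -> None:
    """DELETE + INSERT for one date, inside a single transaction."""
    with conn:  # commits on success, rolls back on exception
        conn.execute("DELETE FROM fact_events WHERE event_date = ?", (ds,))
        conn.executemany(
            "INSERT INTO fact_events (event_date, user_id, revenue) VALUES (?, ?, ?)",
            rows,
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE fact_events (event_date TEXT, user_id INT, revenue REAL)")

data = [("2024-01-15", 1, 9.99), ("2024-01-15", 2, 4.50)]
upsert_partition(conn, "2024-01-15", data)
upsert_partition(conn, "2024-01-15", data)  # re-run: no duplicates

count = conn.execute(
    "SELECT COUNT(*) FROM fact_events WHERE event_date = '2024-01-15'"
).fetchone()[0]
print(count)  # → 2, identical after the second run
```

A plain append-only INSERT would double the rows on every retry or backfill; the delete step is what makes clearing and re-running a task safe.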

Data Quality Gate Pattern

recipes/data_quality_gate.py
"""
Reusable data quality gate built from declarative custom rules (swap in Great Expectations if preferred).
Halts the pipeline if quality checks fail — no silent bad data.
"""
from dataclasses import dataclass
from airflow.decorators import task

@dataclass
class QualityCheck:
    name: str
    query: str
    threshold: float
    comparison: str = '>='   # actual_value {comparison} threshold
    severity: str = 'error'  # 'error' halts, 'warning' logs only

DEFAULT_CHECKS = [
    QualityCheck(
        name='null_rate_user_id',
        query="SELECT 1 - (COUNT(user_id)::float / NULLIF(COUNT(*),0)) FROM {table} WHERE event_date='{ds}'",
        threshold=0.01, comparison='<=',   # null rate must be <= 1%
    ),
    QualityCheck(
        name='row_count_vs_yesterday',
        query="""
            SELECT (SELECT COUNT(*) FROM {table} WHERE event_date = '{ds}')::float
                   / NULLIF((SELECT COUNT(*) FROM {table}
                             WHERE event_date = '{ds}'::date - 1), 0)
        """,
        threshold=0.5, comparison='>=',   # today must be >= 50% of yesterday
        severity='warning',
    ),
]

def make_quality_gate_task(table: str, checks: list[QualityCheck] = DEFAULT_CHECKS):
    @task(task_id=f"quality_gate_{table}")
    def quality_gate(**context):
        from airflow.providers.postgres.hooks.postgres import PostgresHook
        ds = context['ds']
        hook = PostgresHook('prod_postgres')

        failures, warnings = [], []

        for check in checks:
            sql = check.query.format(table=table, ds=ds)
            result = hook.get_first(sql)
            actual = float(result[0]) if result and result[0] is not None else 0.0

            ops = {'>=': lambda a,t: a >= t, '<=': lambda a,t: a <= t,
                   '>': lambda a,t: a > t, '<': lambda a,t: a < t}
            passed = ops[check.comparison](actual, check.threshold)

            status = 'PASS' if passed else check.severity.upper()
            msg = f"[{status}] {check.name}: {actual:.4f} {check.comparison} {check.threshold}"
            print(msg)

            if not passed:
                (failures if check.severity == 'error' else warnings).append(msg)

        if warnings:
            print(f"WARNINGS ({len(warnings)}): " + "; ".join(warnings))

        if failures:
            raise ValueError(
                f"Quality gate FAILED for {table} on {ds}:\n" +
                "\n".join(failures)
            )

        return {'table': table, 'ds': ds, 'checks_passed': len(checks) - len(failures)}

    return quality_gate()
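The gate's pass/fail semantics can be exercised without a database by replicating its operator table against hand-picked values — a dry run of the same comparison logic, independent of PostgresHook:

```python
def evaluate_check(actual: float, comparison: str, threshold: float) -> bool:
    """The same operator table the gate uses internally."""
    ops = {'>=': lambda a, t: a >= t, '<=': lambda a, t: a <= t,
           '>':  lambda a, t: a > t,  '<':  lambda a, t: a < t}
    return ops[comparison](actual, threshold)

# null rate 0.3% <= 1%  → pass (error-severity check stays quiet)
assert evaluate_check(0.003, '<=', 0.01)
# today/yesterday ratio 0.4 >= 0.5 → fail (warning-severity: logged, not fatal)
assert not evaluate_check(0.4, '>=', 0.5)
print("gate semantics verified")
```

Keeping the comparison logic this small is deliberate: the checks themselves stay declarative data, so adding a rule never touches control flow.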

Production Checklist