Cloud Vehicle — Phase 2 Reference

Five modules: GKE, BigQuery, Terraform, Pub/Sub, VPC Service Controls. Real configs, real patterns, real tradeoffs — GCP as the reference implementation.

Autopilot vs Standard

Autopilot: GCP manages nodes, scaling, patching. You define Pods; GCP provisions exactly the compute needed. Correct default for FDE deployments — you're not a platform team. Standard: You manage node pools, machine types, autoscalers. Use only when you need GPUs, custom node taints, or spot-instance cost optimization that Autopilot can't express.

Workload Identity — the gold standard

Never mount a JSON service account key into a Pod. Workload Identity binds a Kubernetes ServiceAccount to a GCP IAM ServiceAccount via annotation. The Pod gets GCP credentials automatically — scoped, audited, rotatable without redeployment. If you see --set serviceAccountKey=$(cat key.json) in a Helm values file, that's a security incident waiting to happen.

Private Cluster

Enterprise security requirement: nodes have no public IPs. All egress routes through a NAT gateway; all API calls route through Private Google Access. The cost: your CI/CD runner needs to be inside the VPC (or use a bastion). Budget an extra half-day for the networking when standing up a private cluster for the first time.

Helm = repeatable deployments

Helm is the package manager for Kubernetes manifests. A Chart is a parameterized template; values.yaml is your environment config. FDE use: one Chart, three values files (dev.yaml, staging.yaml, prod.yaml). Promote through environments by changing values, not manifests. Every deployment is auditable and repeatable.

GKE Autopilot cluster — Terraform
Private cluster, Workload Identity, binary auth

# ── GKE Autopilot cluster ─────────────────────────────────────────────────
# Autopilot = Google manages nodes. You manage Pods.
# Right choice for FDE deployments: no node ops overhead,
# automatic bin-packing, built-in security hardening.
#
# Key decisions documented here:
#   private_cluster_config   → no public node IPs (enterprise requirement)
#   workload_identity_config → pods get GCP creds without JSON keys
#   binary_authorization     → only signed container images can run
#   release_channel REGULAR  → automatic patch updates, ~1-week lag from RAPID

resource "google_container_cluster" "main" {
  name     = "${var.project_id}-cluster"
  location = var.region
  project  = var.project_id

  # Autopilot mode — Google manages node pools entirely
  enable_autopilot = true

  # ── Networking ──────────────────────────────────────────────────────────
  network    = google_compute_network.main.name
  subnetwork = google_compute_subnetwork.main.name

  # Private cluster: nodes have RFC-1918 IPs only.
  # master_ipv4_cidr_block must not overlap with your VPC subnets.
  private_cluster_config {
    enable_private_nodes    = true
    enable_private_endpoint = false          # keep public endpoint for kubectl; IAP protects it
    master_ipv4_cidr_block  = "172.16.0.32/28"
  }

  # Only your CI/CD runner + bastion IPs can reach the API server
  master_authorized_networks_config {
    cidr_blocks {
      cidr_block   = var.authorized_cidr    # e.g. "10.0.0.0/8"
      display_name = "internal-networks"
    }
  }

  # ── Workload Identity ────────────────────────────────────────────────────
  # This single line enables WI at the cluster level.
  # Per-namespace KSA→GSA binding is in workload-identity.yaml.
  workload_identity_config {
    workload_pool = "${var.project_id}.svc.id.goog"
  }

  # ── Release channel ──────────────────────────────────────────────────────
  # REGULAR: auto-patches, ~1-week delay from RAPID.
  # Good default for production. Use STABLE for regulated/finance clients.
  release_channel {
    channel = "REGULAR"
  }

  # ── Add-ons ──────────────────────────────────────────────────────────────
  addons_config {
    # Backup for GKE agent — scheduled backups of workloads + volumes.
    # (Managed Prometheus is enabled below in monitoring_config.)
    gke_backup_agent_config { enabled = true }
  }

  # ── Logging / Monitoring → Cloud Operations ──────────────────────────────
  logging_config {
    enable_components = ["SYSTEM_COMPONENTS", "WORKLOADS"]
  }
  monitoring_config {
    enable_components = ["SYSTEM_COMPONENTS", "APISERVER", "WORKLOADS"]
    managed_prometheus { enabled = true }
  }

  # ── Deletion protection ──────────────────────────────────────────────────
  # Prevents accidental `terraform destroy` on production.
  # Set to false in dev/staging tf workspaces.
  deletion_protection = var.environment == "prod"

  lifecycle {
    ignore_changes = [node_config]   # Autopilot manages node config; don't drift
  }
}

# ── Cloud NAT for private node egress ────────────────────────────────────
# Private nodes have no public IPs, so outbound internet traffic
# (pulling container images from Docker Hub, etc.) routes through NAT.
resource "google_compute_router" "main" {
  name    = "${var.project_id}-router"
  network = google_compute_network.main.name
  region  = var.region
}

resource "google_compute_router_nat" "main" {
  name                               = "${var.project_id}-nat"
  router                             = google_compute_router.main.name
  region                             = var.region
  nat_ip_allocate_option             = "AUTO_ONLY"
  source_subnetwork_ip_ranges_to_nat = "ALL_SUBNETWORKS_ALL_IP_RANGES"

  log_config {
    enable = true
    filter = "ERRORS_ONLY"
  }
}
Why Autopilot over Standard for FDE work? Standard clusters require you to right-size node pools, manage autoscaler configs, handle node upgrades, and respond to node-level issues. Autopilot eliminates all of that — GCP bin-packs your Pods onto nodes it manages. The tradeoff is less control over node machine types. In practice, FDEs are deploying AI agents and data pipelines, not kernel modules — Autopilot is the right default 90% of the time.
Workload Identity binding
KSA → GSA without JSON keys

# workload-identity.yaml
# ─────────────────────────────────────────────────────────────────────────
# Workload Identity: Pods get GCP IAM credentials without managing keys.
#
# The chain:
#   Pod spec → serviceAccountName: agent-sa       (Kubernetes SA)
#   KSA annotation → iam.gke.io/gcp-service-account (GCP SA binding)
#   IAM binding → roles/iam.workloadIdentityUser    (trust grant)
#
# Result: when the agent code calls `google.auth.default()`, GCP returns
# credentials scoped to the GSA — automatically, without any key file.
# The credentials rotate every hour. Nothing to manage. Nothing to leak.

---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: agent-sa
  namespace: agents
  annotations:
    # This annotation is the binding point.
    # The value is the GSA email. The form PROJECT_ID.svc.id.goog[NAMESPACE/KSA_NAME]
    # is the IAM member used in the workloadIdentityUser binding below.
    iam.gke.io/gcp-service-account: "agent-runner@PROJECT_ID.iam.gserviceaccount.com"

---
# The IAM binding is also required (set via Terraform, not YAML):
#
# resource "google_service_account_iam_binding" "workload_identity" {
#   service_account_id = google_service_account.agent_runner.name
#   role               = "roles/iam.workloadIdentityUser"
#   members = [
#     "serviceAccount:PROJECT_ID.svc.id.goog[agents/agent-sa]"
#   ]
# }

---
# Verify it works after deployment (kubectl 1.24+ removed --serviceaccount,
# so set the KSA via --overrides):
# kubectl run -it --rm wi-test \
#   --image=google/cloud-sdk:slim \
#   --namespace=agents \
#   --overrides='{"spec":{"serviceAccountName":"agent-sa"}}' \
#   -- gcloud auth print-identity-token
#
# If you see a token: binding is working.
# If you see "permission denied": check the IAM binding above.
The JSON key problem. A JSON service account key is a static credential. It doesn't expire. If it leaks (committed to git, exposed in logs, embedded in a container image layer), the attacker has permanent access until you manually rotate. Workload Identity tokens expire every hour and are never written to disk. This is the difference between a locked door and a door that re-locks itself every 60 minutes.
Agent deployment manifest
Production-grade Pod spec with resource limits, probes, secrets

# agent-deployment.yaml
# A production-grade Deployment for a Vertex AI / ADK agent workload.
# Every field here is intentional — annotations explain the why.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: data-agent
  namespace: agents
  labels:
    app: data-agent
    version: "1.0.0"
    tier: application
spec:
  replicas: 2                       # min 2 for HA; HPA scales this up
  selector:
    matchLabels:
      app: data-agent
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0             # never remove an old pod before its replacement is ready
      maxSurge: 1                   # spin up 1 extra pod before terminating old

  template:
    metadata:
      labels:
        app: data-agent
        version: "1.0.0"
      annotations:
        # Prometheus scrape config — picked up by managed Prometheus
        prometheus.io/scrape: "true"
        prometheus.io/port: "8080"
        prometheus.io/path: "/metrics"

    spec:
      # ── Identity ─────────────────────────────────────────────────────
      serviceAccountName: agent-sa    # the KSA with Workload Identity binding

      # ── Security context ──────────────────────────────────────────────
      securityContext:
        runAsNonRoot: true
        runAsUser: 1000
        fsGroup: 2000
        seccompProfile:
          type: RuntimeDefault

      # ── Containers ────────────────────────────────────────────────────
      containers:
      - name: agent
        image: REGION-docker.pkg.dev/PROJECT_ID/agents/data-agent:VERSION
        imagePullPolicy: Always      # always re-pull; pin an immutable tag (never :latest in production)

        ports:
        - containerPort: 8080
          name: http

        # ── Resource requests and limits ────────────────────────────
        # Requests: what GKE uses for scheduling (guaranteed).
        # Limits: hard ceiling — Pod is OOM-killed if it exceeds memory limit.
        # Rule of thumb: limit/request ratio ≤ 2x.
        # Autopilot will NOT schedule a Pod if requests exceed available capacity.
        resources:
          requests:
            cpu: "500m"             # 0.5 vCPU
            memory: "1Gi"
          limits:
            cpu: "2000m"            # 2 vCPU burst
            memory: "2Gi"

        # ── Probes ────────────────────────────────────────────────────
        # Liveness: if this fails, Kubernetes restarts the container.
        # Readiness: if this fails, the Pod is removed from the Service
        #            endpoint — traffic stops routing to it without a restart.
        # Always have both. A hung agent (liveness) is different from
        # a starting-up agent (readiness).
        livenessProbe:
          httpGet:
            path: /healthz
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
          failureThreshold: 3

        readinessProbe:
          httpGet:
            path: /readyz
            port: 8080
          initialDelaySeconds: 10
          periodSeconds: 5
          failureThreshold: 2

        # ── Environment variables ──────────────────────────────────
        # Non-sensitive config via ConfigMap, secrets via Secret.
        # Never inline secret values in manifests.
        env:
        - name: PROJECT_ID
          valueFrom:
            configMapKeyRef:
              name: agent-config
              key: project_id
        - name: BIGQUERY_DATASET
          valueFrom:
            configMapKeyRef:
              name: agent-config
              key: bq_dataset
        # Vertex AI API key (if needed), read from a K8s Secret; sync it
        # from Secret Manager (e.g. External Secrets or the Secrets Store CSI driver)
        - name: VERTEX_API_KEY
          valueFrom:
            secretKeyRef:
              name: vertex-api-key
              key: api-key

      # ── Topology constraints ──────────────────────────────────────────
      # Spread pods across zones — single-zone failure doesn't take both down.
      topologySpreadConstraints:
      - maxSkew: 1
        topologyKey: topology.kubernetes.io/zone
        whenUnsatisfiable: DoNotSchedule
        labelSelector:
          matchLabels:
            app: data-agent

---
# Horizontal Pod Autoscaler: scale 2→10 based on CPU
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: data-agent-hpa
  namespace: agents
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: data-agent
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70    # scale up when avg CPU > 70%
The probe trap FDEs hit. Agents often have a slow startup (loading model weights, initializing DB connections). Without initialDelaySeconds tuned correctly, Kubernetes restarts the container before it's finished starting — and you get a crash loop that looks like a code bug. Rule: set initialDelaySeconds to 2x your measured startup time. Separate liveness from readiness: a Pod can be alive (not crashed) but not yet ready (still warming up).
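A startupProbe is the cleaner alternative to inflating initialDelaySeconds; a minimal sketch, assuming the same /healthz endpoint as the liveness probe above:

```yaml
# Sketch: a startupProbe gates the liveness probe until it first succeeds,
# so slow startup (model weights, DB pools) never triggers a restart loop.
# Tolerates up to 30 x 10s = 300s of startup before Kubernetes gives up.
startupProbe:
  httpGet:
    path: /healthz
    port: 8080
  periodSeconds: 10
  failureThreshold: 30
```

Once the startupProbe passes, the liveness and readiness probes take over at their normal cadence.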
Helm values — prod vs dev
One chart, three environments

# helm/values/prod.yaml
# ─────────────────────────────────────────────────────────────────────────
# Helm lets you write the Kubernetes manifest once (the Chart) and
# parameterize environment differences here. Promote dev→prod by
# changing this file — not by editing raw YAML.
#
# Deploy:  helm upgrade --install data-agent ./chart -f values/prod.yaml
# Diff:    helm diff upgrade data-agent ./chart -f values/prod.yaml

replicaCount: 3              # prod: 3 replicas min

image:
  repository: us-central1-docker.pkg.dev/my-project/agents/data-agent
  tag: "1.4.2"               # pin exact tag — never "latest" in prod
  pullPolicy: Always

resources:
  requests:
    cpu: "1000m"
    memory: "2Gi"
  limits:
    cpu: "4000m"
    memory: "4Gi"

autoscaling:
  enabled: true
  minReplicas: 3
  maxReplicas: 20
  targetCPUUtilizationPercentage: 70

config:
  projectId: "my-prod-project"
  bigqueryDataset: "gold"
  vertexRegion: "us-central1"
  logLevel: "INFO"            # prod: INFO, dev: DEBUG

ingress:
  enabled: true
  annotations:
    kubernetes.io/ingress.class: "gce"
    kubernetes.io/ingress.global-static-ip-name: "agent-ip"
  host: "agent.company.com"

serviceAccount:
  name: agent-sa              # must match Workload Identity binding

---
# helm/values/dev.yaml  (differences from prod only)
replicaCount: 1

image:
  tag: "latest-dev"           # dev: float on latest build

resources:
  requests:
    cpu: "250m"
    memory: "512Mi"
  limits:
    cpu: "1000m"
    memory: "1Gi"

autoscaling:
  enabled: false              # dev: fixed replica count

config:
  projectId: "my-dev-project"
  bigqueryDataset: "dev_gold"
  logLevel: "DEBUG"           # dev: verbose logging

ingress:
  enabled: false              # dev: use kubectl port-forward instead
The Helm mental model. A Chart is a parameterized description of your application. values.yaml is the environment manifest. The separation means your Kubernetes YAML never has hardcoded project IDs, image tags, or resource limits — those live in values/prod.yaml which is reviewed, version-controlled, and promotable. helm diff (from the helm-diff plugin) shows you exactly what will change before you deploy — essential for client-site deployments where surprises are costly.
Partitioning vs Clustering

Partitioning: physical data segmentation by a column (usually date). BQ skips entire partitions that don't match your filter. A WHERE event_date = '2025-04-02' on a date-partitioned table touches only that day's data — potentially 1/365th the cost. Clustering: within each partition, rows are sorted by up to 4 columns. WHERE region = 'Southeast' on a clustered table skips irrelevant row groups. Use both together: partition by date, cluster by the next most selective filter column.
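The pruning payoff is easy to quantify; a back-of-envelope sketch in Python (table size and partition counts are illustrative, not from any real dataset):

```python
# Back-of-envelope partition-pruning savings. All numbers are illustrative.
COST_PER_TB_USD = 6.25  # BigQuery on-demand rate quoted in this doc


def scan_cost_usd(table_tb: float, partitions: int, partitions_hit: int) -> float:
    """Cost of a query that prunes to `partitions_hit` of `partitions`,
    assuming rows are spread evenly across partitions."""
    return table_tb * partitions_hit / partitions * COST_PER_TB_USD


# 10 TB table, 365 daily partitions:
full_scan = scan_cost_usd(10, 365, 365)  # no date filter: whole table
one_day = scan_cost_usd(10, 365, 1)      # WHERE event_date = '...': one partition
print(f"full scan: ${full_scan:.2f}, one day: ${one_day:.2f}")
# → full scan: $62.50, one day: $0.17
```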

Slot vs On-Demand pricing

On-demand: pay per TB scanned ($6.25/TB). Correct default for variable/unpredictable workloads. Slots: buy compute capacity (100-slot increments), unlimited scans. Correct when monthly scan cost > slot reservation cost (~$1,600/month for 100 slots). FDE heuristic: if a client's BQ bill exceeds $5K/month, model out the slot break-even. You'll look like a hero.
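The break-even is one line of arithmetic; the sketch below uses the prices quoted above (verify against current GCP pricing before presenting it to a client):

```python
# Slot vs on-demand break-even, using the prices quoted in this doc.
ON_DEMAND_PER_TB_USD = 6.25
SLOTS_100_MONTHLY_USD = 1600.0   # approximate 100-slot monthly reservation


def breakeven_tb_per_month(slot_cost: float = SLOTS_100_MONTHLY_USD,
                           per_tb: float = ON_DEMAND_PER_TB_USD) -> float:
    """Monthly scan volume above which the reservation is cheaper."""
    return slot_cost / per_tb


print(f"break-even: {breakeven_tb_per_month():.0f} TB/month")
# → break-even: 256 TB/month
```

So a client scanning more than ~256 TB/month on on-demand pricing is a candidate for a 100-slot reservation.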

Materialized Views

BQ can maintain a pre-computed aggregation that refreshes automatically as base tables change. Use for Gold-layer aggregations queried >10x/day by dashboards. The query hits the materialized view's pre-computed result instead of scanning the fact table. Caveats: refreshes consume compute, and results can lag the base table within the refresh window — not suitable for hard real-time requirements.
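A minimal sketch of such a Gold-layer materialized view, reusing the gold.fct_orders names that appear elsewhere in this doc (project is a placeholder; the refresh interval is tunable to the dashboard's staleness tolerance):

```sql
-- Sketch: daily revenue by region, maintained automatically by BQ.
-- Table/column names follow the gold.fct_orders example in this doc.
CREATE MATERIALIZED VIEW `project.gold.mv_daily_revenue`
OPTIONS (enable_refresh = true, refresh_interval_minutes = 30)
AS
SELECT
    order_date,
    region,
    SUM(order_total_usd) AS revenue,
    COUNT(*)             AS order_count
FROM `project.gold.fct_orders`
GROUP BY order_date, region
```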

Column-level security

BigQuery Policy Tags let you mark columns as PII/PHI and restrict which IAM principals can read the data. A data analyst with roles/bigquery.dataViewer is blocked on the tagged column (or, with a data-masking rule attached, sees a masked value such as NULL in the ssn column); a data engineer granted the fine-grained reader role sees the real value. This is how you satisfy HIPAA/GDPR without maintaining separate datasets for different audiences.

BigQuery table schema — Terraform
Partitioned + clustered Gold table with column security

# ── BigQuery dataset + Gold fact table ────────────────────────────────────
# Every configuration decision below has a cost and security implication.

resource "google_bigquery_dataset" "gold" {
  dataset_id                  = "gold"
  location                    = var.region      # DATA RESIDENCY: match your VPC region
  description                 = "Business-ready Gold layer. Source of truth for BI and AI."
  delete_contents_on_destroy  = false           # never auto-drop in prod

  # Default encryption with CMEK (Customer-Managed Encryption Key)
  # Required for: HIPAA, FedRAMP, some finance clients
  default_encryption_configuration {
    kms_key_name = google_kms_crypto_key.bq_key.id
  }

  # Access control: analysts can query; engineers can write; no public access
  access {
    role          = "READER"
    group_by_email = "data-analysts@company.com"
  }
  access {
    role          = "WRITER"
    group_by_email = "data-engineers@company.com"
  }
}

resource "google_bigquery_table" "fct_orders" {
  dataset_id = google_bigquery_dataset.gold.dataset_id
  table_id   = "fct_orders"
  project    = var.project_id

  description = "One row per order. Partitioned by order_date. Clustered by region, status."

  # ── Partitioning ──────────────────────────────────────────────────────
  # TIME-BASED: BQ creates one physical partition per day.
  # Queries filtered on order_date only scan matching partitions.
  # A 3-year table with 1000 daily rows = 1095 partitions.
  # WHERE order_date = '2025-04-02' scans exactly 1/1095 of the data.
  #
  # require_partition_filter = true enforces that every query MUST
  # include an order_date filter. Prevents accidental full-table scans.
  time_partitioning {
    type                     = "DAY"
    field                    = "order_date"
    expiration_ms            = 63072000000   # 730 days (2 years)
    require_partition_filter = true
  }

  # ── Clustering ────────────────────────────────────────────────────────
  # Within each partition, rows are physically sorted by these columns.
  # Queries filtering on region AND/OR order_status skip entire row groups.
  # BQ automatically maintains clustering as data is inserted.
  # Max 4 columns. Order matters: most selective filter column first.
  clustering = ["region", "order_status", "customer_tier"]

  # ── Schema ───────────────────────────────────────────────────────────
  schema = jsonencode([
    { name = "order_id",        type = "INTEGER",   mode = "REQUIRED" },
    { name = "customer_id",     type = "INTEGER",   mode = "REQUIRED" },
    { name = "order_date",      type = "DATE",      mode = "REQUIRED" },
    { name = "order_total_usd", type = "NUMERIC",   mode = "NULLABLE" },
    { name = "order_status",    type = "STRING",    mode = "REQUIRED" },
    { name = "region",          type = "STRING",    mode = "REQUIRED" },
    { name = "customer_tier",   type = "INTEGER",   mode = "NULLABLE" },
    { name = "customer_email",  type = "STRING",    mode = "NULLABLE",
      policyTags = { names = [google_data_catalog_policy_tag.pii.id] }
    },
    { name = "_loaded_at",      type = "TIMESTAMP", mode = "REQUIRED" },
    { name = "_row_hash",       type = "STRING",    mode = "REQUIRED" }
  ])

  # ── Prevent accidental deletion ──────────────────────────────────────
  lifecycle {
    prevent_destroy = true
  }
}

# ── Column-level security: PII policy tag ────────────────────────────────
resource "google_data_catalog_policy_tag" "pii" {
  taxonomy     = google_data_catalog_taxonomy.main.id
  display_name = "PII"
  description  = "Personally Identifiable Information. Masked for most roles."
}

# Grant unmasked access only to the data-engineers group
resource "google_data_catalog_policy_tag_iam_binding" "pii_access" {
  policy_tag = google_data_catalog_policy_tag.pii.name
  role       = "roles/datacatalog.categoryFineGrainedReader"
  members    = ["group:data-engineers@company.com"]
}
The partition filter enforcement trick. require_partition_filter = true is the most underused BigQuery setting. Without it, a single rogue query like SELECT * FROM gold.fct_orders LIMIT 10 from a notebook will scan the entire table — potentially thousands of dollars. With it, BQ rejects the query with "table requires a filter on partition column." Annoying to developers the first time. Saves enormous money long-term.
Gold fact model — dbt + BigQuery SQL
Star schema, merge pattern, window aggregations

-- models/gold/fct_orders.sql
-- Gold is the final consumer-facing layer.
-- One row per order. Denormalized for query performance.
-- Optimized for the access pattern: filter by date + region + status.

-- incremental_strategy 'merge' uses BigQuery-native MERGE: upsert on unique_key.
{{
  config(
    materialized          = 'incremental',
    unique_key            = 'order_id',
    partition_by          = { 'field': 'order_date', 'data_type': 'date', 'granularity': 'day' },
    cluster_by            = ['region', 'order_status', 'customer_tier'],
    incremental_strategy  = 'merge',
    on_schema_change      = 'sync_all_columns',
    tags                  = ['gold', 'orders', 'daily'],
  )
}}

WITH

orders AS (
    SELECT * FROM {{ ref('stg_orders') }}
    {% if is_incremental() %}
    -- Only process Silver rows newer than the last Gold run.
    -- _loaded_at is the Bronze ingestion timestamp — always set.
    WHERE _loaded_at > (
        SELECT MAX(_loaded_at) FROM {{ this }}
    )
    {% endif %}
),

customers AS (
    -- Snapshot of customer attributes at query time.
    -- For historical accuracy, use a Type 2 SCD dim_customers table here.
    SELECT
        customer_id,
        customer_sk,
        email      AS customer_email,
        region,
        customer_tier,
        is_active
    FROM {{ ref('stg_customers') }}
),

-- ── Core join ─────────────────────────────────────────────────────────
joined AS (
    SELECT
        o.order_id,
        o.customer_id,
        c.customer_sk,
        o.order_date,
        o.order_total_usd,
        o.order_status,
        o.is_large_order,
        c.customer_email,
        COALESCE(c.region, 'UNKNOWN')        AS region,
        COALESCE(c.customer_tier, 0)         AS customer_tier,
        c.is_active                          AS customer_is_active,
        o._loaded_at,
        o._row_hash
    FROM orders o
    LEFT JOIN customers c USING (customer_id)
    -- LEFT JOIN preserves orders even if customer record is missing.
    -- The orphan test in Silver warns us; Gold doesn't silently drop orders.
),

-- ── Window aggregations added at Gold ─────────────────────────────────
-- These are analytical facts (not definitional) — they belong in Gold.
-- Running totals, lag comparisons, and cohort metrics live here.
enriched AS (
    SELECT
        *,

        -- Customer's running order count at the time of this order
        COUNT(order_id) OVER (
            PARTITION BY customer_id
            ORDER BY order_date
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        )                                    AS customer_order_sequence,

        -- Revenue contribution vs same customer's all-time avg
        ROUND(
            order_total_usd /
            NULLIF(AVG(order_total_usd) OVER (PARTITION BY customer_id), 0),
        2)                                   AS order_vs_customer_avg,

        -- Days since this customer's previous order
        DATE_DIFF(
            order_date,
            LAG(order_date) OVER (
                PARTITION BY customer_id
                ORDER BY order_date
            ),
            DAY
        )                                    AS days_since_last_order

    FROM joined
)

SELECT * FROM enriched
Why LEFT JOIN in Gold and not INNER JOIN. An INNER JOIN silently drops every order whose customer_id has no matching customer. In a CDC pipeline, customer records can briefly lag behind order records. An INNER JOIN hides that revenue from dashboards until the customer record arrives. LEFT JOIN + COALESCE + the Silver orphan test gives you visibility into the problem without hiding the orders from the business.
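That visibility can be a standing one-query check; a sketch against gold.fct_orders, using the region = 'UNKNOWN' marker the COALESCE above produces (the 7-day window is arbitrary):

```sql
-- Sketch: count recent orders whose customer record hasn't arrived yet.
-- The order_date filter also satisfies require_partition_filter.
SELECT order_date, COUNT(*) AS orphaned_orders
FROM `project.gold.fct_orders`
WHERE order_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
  AND region = 'UNKNOWN'
GROUP BY order_date
ORDER BY order_date
```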
Reading query plans + diagnosing skew
The most valuable debugging skill in BQ

-- ── Query plan basics ──────────────────────────────────────────────────
-- BigQuery has no EXPLAIN statement. Read the plan instead from:
--   the "Execution details" tab in the BQ console after a run, or
--   a dry run (estimated bytes, zero cost), or
--   INFORMATION_SCHEMA.JOBS (job_stages) for completed jobs.
--
-- Key stages to look for in the execution plan:

-- STAGE 1: Input  → reads and filters partitions
-- STAGE 2: Join   → hash join or broadcast join
-- STAGE 3: Agg    → GROUP BY / window functions
-- STAGE 4: Output → writes result

-- What you're looking for:
--   "estimated bytes processed": if this is >> what you expected,
--   your partition filter isn't being pushed down.
--
--   "shuffle": data moved between workers. High shuffle = potential skew.
--
--   "max worker time >> avg worker time": DATA SKEW.
--   One worker is processing most of the data.

-- ── Detect partition filter push-down ─────────────────────────────────
-- Good: only 1 partition scanned
SELECT COUNT(*) FROM `project.gold.fct_orders`
WHERE order_date = '2025-04-02'       -- partition filter: BQ reads 1 day

-- Bad: full table scan (no date filter) — NEVER let this reach production
-- SELECT COUNT(*) FROM `project.gold.fct_orders`
-- WHERE order_status = 'COMPLETED'   -- not the partition key: reads ALL

-- ── Detect and fix data skew ───────────────────────────────────────────
-- Symptom: GROUP BY country takes 45min; 1 worker handles 90% of rows.
-- Diagnosis: check the distribution of the GROUP BY key.

SELECT
    region,
    COUNT(*) AS row_count,
    ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) AS pct
FROM `project.gold.fct_orders`
GROUP BY region
ORDER BY row_count DESC

-- If one value has >50% of rows, you have skew on that join/group key.

-- Fix 1: Pre-aggregate the heavy key separately, then JOIN
WITH se_orders AS (
    SELECT customer_id, SUM(order_total_usd) AS regional_total
    FROM `project.gold.fct_orders`
    WHERE region = 'Southeast'        -- handle the fat partition alone
    GROUP BY customer_id
),
other_orders AS (
    SELECT customer_id, SUM(order_total_usd) AS regional_total
    FROM `project.gold.fct_orders`
    WHERE region != 'Southeast'
    GROUP BY customer_id
)
SELECT * FROM se_orders
UNION ALL
SELECT * FROM other_orders

-- Fix 2: Salt the join key to spread a hot key across workers
-- (when skew is in a JOIN, not a GROUP BY). Sketch of the fact side only:
-- the dimension side must be cross-joined against salt values 0-9 so
-- every (key, salt) pair finds a match.
SELECT
    CAST(FLOOR(RAND() * 10) AS INT64) AS salt,   -- random 0-9 bucket
    order_id,
    customer_id
FROM `project.gold.fct_orders`
WHERE region = 'Southeast'

-- ── BigQuery INFORMATION_SCHEMA: operational intelligence ─────────────
-- How much did each job cost? Which queries are the most expensive?

SELECT
    job_id,
    user_email,
    query,
    total_bytes_processed / POW(1024, 4)    AS tb_scanned,
    total_bytes_processed / POW(1024, 4) * 6.25 AS estimated_cost_usd,
    total_slot_ms / 1000                    AS slot_seconds,
    creation_time
FROM `region-us`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
WHERE creation_time >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR)
  AND statement_type = 'SELECT'
ORDER BY total_bytes_processed DESC
LIMIT 20
The INFORMATION_SCHEMA query is your FinOps weapon. Run it weekly on a client's BigQuery project and you'll almost always find 1-2 queries responsible for 60-80% of the cost — usually from a dashboard tool refreshing on a schedule without partition filters. Fixing those 2 queries can cut BQ spend by 50%+ in a single week. That's the kind of "quick win" that justifies an FDE engagement.
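The "1-2 queries dominate the bill" claim is easy to operationalize once the JOBS_BY_PROJECT rows are in hand; a pure-Python sketch of the ranking, with hypothetical sample data:

```python
# Find the smallest set of queries covering `share` of total scan volume.
# Input mirrors (query label, TB scanned) from the INFORMATION_SCHEMA
# query above; the sample data below is hypothetical.
def top_offenders(jobs: list[tuple[str, float]], share: float = 0.8) -> list[str]:
    ranked = sorted(jobs, key=lambda j: j[1], reverse=True)
    total = sum(tb for _, tb in ranked)
    picked, running = [], 0.0
    for label, tb in ranked:
        picked.append(label)
        running += tb
        if running >= share * total:
            break
    return picked


jobs = [("dashboard_refresh", 410.0), ("ad_hoc_notebook", 95.0),
        ("dbt_gold_build", 30.0), ("monitoring_query", 5.0)]
print(top_offenders(jobs))
# → ['dashboard_refresh', 'ad_hoc_notebook']
```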
Cost guard + dry-run pattern
Never let a bad query reach production billing

"""
bq_cost_guard.py — Dry-run any BQ query before execution.

BigQuery dry-run returns bytes_processed without running the query.
Use this in CI/CD: fail the pipeline if a query would scan > threshold.
Use interactively: sanity-check a new query before the analyst runs it.
"""
from google.cloud import bigquery
from google.cloud.bigquery import QueryJobConfig

client = bigquery.Client()
COST_PER_TB_USD = 6.25


def dry_run(sql: str, max_gb: float = 100.0) -> dict:
    """
    Returns bytes estimate. Raises if query would exceed max_gb.
    Does NOT execute the query — zero cost.
    """
    config = QueryJobConfig(dry_run=True, use_query_cache=False)
    job = client.query(sql, job_config=config)

    bytes_est  = job.total_bytes_processed
    gb_est     = bytes_est / 1e9
    tb_est     = bytes_est / 1e12
    cost_est   = tb_est * COST_PER_TB_USD

    result = {
        "bytes":    bytes_est,
        "gb":       round(gb_est, 2),
        "tb":       round(tb_est, 4),
        "cost_usd": round(cost_est, 4),
    }

    if gb_est > max_gb:
        raise ValueError(
            f"Query would scan {gb_est:.1f} GB (limit: {max_gb} GB).\n"
            f"  Estimated cost: ${cost_est:.2f}\n"
            f"  Add a partition filter or review the query before running."
        )

    print(f"  [ok] Dry-run: {gb_est:.2f} GB | ${cost_est:.4f} | safe to run.")
    return result


def print_query_budget_setup(project_id: str, monthly_limit_usd: float = 500.0):
    """
    BigQuery custom quotas can hard-cap daily on-demand scan volume:
    queries past the quota fail with an error instead of charging.
    There is no API call that sets a dollar cap directly; convert the
    budget to bytes and set the quota once per project in the Console.
    """
    # Docs: https://cloud.google.com/bigquery/docs/custom-quotas
    daily_usd = monthly_limit_usd / 30
    daily_tb = daily_usd / COST_PER_TB_USD
    print(f"Set for {project_id} via Cloud Console: IAM & Admin > Quotas > BigQuery API")
    print(f"  'Query usage per day' ≈ {daily_tb:.1f} TB "
          f"(~${daily_usd:.0f}/day = ${monthly_limit_usd:.0f}/month)")


# ── CI/CD integration example ─────────────────────────────────────────────
if __name__ == "__main__":
    test_query = """
        SELECT region, SUM(order_total_usd) AS revenue
        FROM `my-project.gold.fct_orders`
        WHERE order_date BETWEEN '2025-01-01' AND '2025-03-31'
        GROUP BY region
    """
    stats = dry_run(test_query, max_gb=50)
    print(f"Safe to run: {stats}")
The dry-run pattern belongs in CI/CD. Every new dbt model touching BigQuery should run a dry-run in the PR pipeline. If the model would scan >500GB, the PR fails with the byte estimate and a prompt to add partition filters. This catches expensive queries before they reach production billing — not after the end-of-month invoice arrives. It's also a great way to prove to a skeptical client CFO that you're managing their cloud costs proactively.
State file = the source of truth

Terraform tracks what it has created in a state file. If two engineers run terraform apply simultaneously against the same state, you get race conditions and drift. Solution: remote state with locking — store state in GCS; the GCS backend locks the state object automatically during plan/apply (DynamoDB locking is the equivalent for AWS S3 backends). The state file can contain secrets (service account keys, DB passwords) — never commit it to git.
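A minimal remote-state sketch for the GCS backend (bucket name and prefix are placeholders):

```hcl
# backend.tf — remote state in GCS. The gcs backend takes a lock
# automatically during plan/apply; no separate lock table is needed.
terraform {
  backend "gcs" {
    bucket = "my-project-tfstate"   # placeholder: pre-created, versioned bucket
    prefix = "landing-zone"         # one prefix (state path) per root module
  }
}
```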

Workspaces = environments

Terraform workspaces let you use one configuration for multiple environments. terraform workspace new prod creates a separate state file. Reference the workspace in resource names: ${terraform.workspace}-cluster. Prod resources are named prod-cluster; dev gets dev-cluster. One set of .tf files; three isolated environments.

Modules = reusable infra components

A Terraform module is a directory of .tf files with input variables and outputs. Write a module once for "GKE + BigQuery + IAM baseline"; instantiate it for each client project with different variables. FDE use: your fde-landing-zone module spins up a complete, secure client environment in 10 minutes. That's the "deploy forward in 5 minutes" capability the FDE doc demands.
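A sketch of instantiating such a module; the module path, variables, and output name here are hypothetical:

```hcl
# Hypothetical instantiation of an fde-landing-zone module for a new client.
module "client_env" {
  source      = "./modules/fde-landing-zone"   # assumed local module path
  project_id  = "acme-prod"
  region      = "europe-west4"
  subnet_cidr = "10.40.0.0/20"
}

output "gke_cluster_name" {
  value = module.client_env.cluster_name       # assumed module output
}
```

Each client project gets its own module block (or its own workspace); the module body never changes.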

Plan before apply — always

terraform plan -out=tfplan generates an execution plan without making changes. Review it: are you creating what you expect? Destroying anything unexpected? On a client site, always run plan first and get a second set of eyes on destructive operations (resources the plan marks with - / "will be destroyed"). terraform apply tfplan applies exactly the plan you reviewed, with no surprises.
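One way to automate that second set of eyes, sketched against the JSON form of a saved plan (produced by terraform show -json tfplan); the gating policy itself is an assumption:

```python
import json
import sys

def count_destroys(plan_json: str) -> int:
    """Count resources a plan would destroy (input: terraform show -json tfplan)."""
    plan = json.loads(plan_json)
    return sum(
        1
        for rc in plan.get("resource_changes", [])
        if "delete" in rc.get("change", {}).get("actions", [])
    )

def gate_destroys(plan_json: str, allow_destroy: bool = False) -> None:
    """CI policy sketch: block any plan that destroys resources."""
    n = count_destroys(plan_json)
    if n and not allow_destroy:
        sys.exit(f"Plan destroys {n} resource(s); require manual approval.")
```

Note that a replace shows up as actions = ["delete", "create"], so it counts as a destroy here, which is exactly what you want a reviewer to see.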

FDE landing zone — main.tf
Complete GCP environment in one apply

# main.tf — FDE Landing Zone
# Provisions: VPC + subnets, GKE Autopilot, BigQuery datasets,
#             Cloud Storage buckets (Bronze/Silver/Gold), Artifact Registry
#
# Deploy:   terraform init && terraform workspace new prod && terraform apply
# Destroy:  terraform workspace select dev && terraform destroy
# (prod has prevent_destroy on key resources)

terraform {
  required_version = ">= 1.5"
  required_providers {
    google = {
      source  = "hashicorp/google"
      version = "~> 5.0"
    }
    google-beta = {
      source  = "hashicorp/google-beta"
      version = "~> 5.0"
    }
  }
}

provider "google" {
  project = var.project_id
  region  = var.region
}

locals {
  env    = terraform.workspace                         # "dev", "staging", "prod"
  prefix = "${var.project_id}-${local.env}"
  is_prod = local.env == "prod"
}

# ── VPC ──────────────────────────────────────────────────────────────────
resource "google_compute_network" "main" {
  name                    = "${local.prefix}-vpc"
  auto_create_subnetworks = false   # custom subnets only — never auto
  routing_mode            = "GLOBAL"
}

resource "google_compute_subnetwork" "main" {
  name          = "${local.prefix}-subnet"
  network       = google_compute_network.main.id
  region        = var.region
  ip_cidr_range = var.subnet_cidr     # e.g. "10.0.0.0/20"

  # Secondary ranges for GKE Pods and Services
  secondary_ip_range {
    range_name    = "pods"
    ip_cidr_range = var.pods_cidr     # e.g. "10.1.0.0/16"
  }
  secondary_ip_range {
    range_name    = "services"
    ip_cidr_range = var.services_cidr # e.g. "10.2.0.0/20"
  }

  # VPC Flow Logs: essential for security auditing and debugging
  log_config {
    aggregation_interval = "INTERVAL_5_SEC"
    flow_sampling        = 0.5
    metadata             = "INCLUDE_ALL_METADATA"
  }
}

# ── Cloud Storage buckets (Medallion layers) ─────────────────────────────
resource "google_storage_bucket" "bronze" {
  name          = "${local.prefix}-bronze"
  location      = var.region
  force_destroy = !local.is_prod

  versioning { enabled = true }     # recover overwritten or deleted objects
  uniform_bucket_level_access = true

  # Auto-delete Bronze after 90 days (cost management)
  lifecycle_rule {
    condition { age = 90 }
    action    { type = "Delete" }
  }
  lifecycle_rule {
    condition { age = 30 }
    action {
      type          = "SetStorageClass"
      storage_class = "NEARLINE"
    }
  }
}

resource "google_storage_bucket" "silver" {
  name          = "${local.prefix}-silver"
  location      = var.region
  force_destroy = !local.is_prod
  uniform_bucket_level_access = true
  versioning { enabled = true }
}

resource "google_storage_bucket" "gold" {
  name          = "${local.prefix}-gold"
  location      = var.region
  force_destroy = !local.is_prod
  uniform_bucket_level_access = true
}

# ── Artifact Registry (container images) ─────────────────────────────────
resource "google_artifact_registry_repository" "agents" {
  repository_id = "agents"
  format        = "DOCKER"
  location      = var.region
  description   = "Agent container images"

  # Immutable tags in prod: a pushed tag can never be re-pointed to a
  # different image digest, so "v1.2.3" always means the same build.
  docker_config { immutable_tags = local.is_prod }
}

# ── BigQuery datasets ─────────────────────────────────────────────────────
resource "google_bigquery_dataset" "bronze" {
  dataset_id  = "${local.env}_bronze"
  location    = var.region
  description = "Raw landing zone. Managed by dbt Bronze models."
  delete_contents_on_destroy = !local.is_prod
}

resource "google_bigquery_dataset" "silver" {
  dataset_id  = "${local.env}_silver"
  location    = var.region
  description = "Cleaned, deduplicated. Single Source of Truth."
  delete_contents_on_destroy = !local.is_prod
}

resource "google_bigquery_dataset" "gold" {
  dataset_id  = "${local.env}_gold"
  location    = var.region
  description = "Business-ready aggregates. Drives BI and AI."
  delete_contents_on_destroy = !local.is_prod
}
The local.env = terraform.workspace pattern is what makes one codebase serve three environments. Every resource name, bucket name, and dataset name includes the workspace prefix: prod-bronze, dev-bronze. Destroy dev with confidence — prod state is entirely separate. When onboarding a new client environment, terraform workspace new client-name && terraform apply provisions the full stack in under 10 minutes.
IAM — least-privilege bindings
The most important security file in your stack

# iam.tf — Least-privilege IAM
# Rule: grant the minimum role that allows the task. Nothing more.
# Every binding here has a one-line comment explaining why it exists.

# ── Service Accounts ─────────────────────────────────────────────────────
resource "google_service_account" "agent_runner" {
  account_id   = "${local.prefix}-agent"
  display_name = "Data Agent Runner"
  description  = "Used by GKE Pods via Workload Identity. Reads BQ, writes GCS."
}

resource "google_service_account" "dbt_runner" {
  account_id   = "${local.prefix}-dbt"
  display_name = "dbt Transform Runner"
  description  = "Runs dbt models in CI/CD. BQ job runner + dataset writer."
}

resource "google_service_account" "ingest" {
  account_id   = "${local.prefix}-ingest"
  display_name = "Ingest Service"
  description  = "Writes raw data to Bronze GCS bucket only."
}

# ── Agent runner: read BQ Gold, read/write Silver GCS ────────────────────
resource "google_bigquery_dataset_iam_member" "agent_gold_reader" {
  dataset_id = google_bigquery_dataset.gold.dataset_id
  role       = "roles/bigquery.dataViewer"       # read-only: no DDL, no writes
  member     = "serviceAccount:${google_service_account.agent_runner.email}"
}

resource "google_storage_bucket_iam_member" "agent_silver_reader" {
  bucket = google_storage_bucket.silver.name
  role   = "roles/storage.objectViewer"          # read Silver Parquet files
  member = "serviceAccount:${google_service_account.agent_runner.email}"
}

# ── dbt runner: needs to create tables and run jobs in BQ ────────────────
# roles/bigquery.dataEditor: create/update/delete tables (not drop datasets)
# roles/bigquery.jobUser:    run query jobs (required to execute any SQL)
# These two together = dbt can do its job and nothing else.
resource "google_bigquery_dataset_iam_member" "dbt_silver_editor" {
  dataset_id = google_bigquery_dataset.silver.dataset_id
  role       = "roles/bigquery.dataEditor"
  member     = "serviceAccount:${google_service_account.dbt_runner.email}"
}

resource "google_bigquery_dataset_iam_member" "dbt_gold_editor" {
  dataset_id = google_bigquery_dataset.gold.dataset_id
  role       = "roles/bigquery.dataEditor"
  member     = "serviceAccount:${google_service_account.dbt_runner.email}"
}

resource "google_project_iam_member" "dbt_job_user" {
  project = var.project_id
  role    = "roles/bigquery.jobUser"             # run BQ jobs
  member  = "serviceAccount:${google_service_account.dbt_runner.email}"
}

# ── Ingest service: ONLY write to Bronze bucket ───────────────────────────
resource "google_storage_bucket_iam_member" "ingest_bronze_writer" {
  bucket = google_storage_bucket.bronze.name
  role   = "roles/storage.objectCreator"         # create only — cannot delete/overwrite
  member = "serviceAccount:${google_service_account.ingest.email}"
}
# Ingest service has NO BigQuery access — by design.
# If Bronze ingest is compromised, attacker cannot read/modify BQ data.

# ── Workload Identity: bind GKE KSA to GSA ───────────────────────────────
resource "google_service_account_iam_binding" "agent_workload_identity" {
  service_account_id = google_service_account.agent_runner.name
  role               = "roles/iam.workloadIdentityUser"
  members = [
    "serviceAccount:${var.project_id}.svc.id.goog[agents/agent-sa]"
  ]
}
The blast radius principle. Every IAM binding limits the damage if that service account is compromised. The ingest service has roles/storage.objectCreator on Bronze only — if it's compromised, the attacker can write garbage to Bronze but cannot read production data, modify Silver/Gold, or run BigQuery jobs. This is the Principle of Least Privilege in practice: design so that any single compromise has a bounded, recoverable blast radius.
variables.tf + terraform.tfvars
Parameterize once, deploy anywhere

# variables.tf — all input parameters with descriptions and validation
# This file is the "interface" to your Terraform module.
# Anyone deploying this stack can read variables.tf and know
# exactly what they need to provide — no reading the main.tf required.

variable "project_id" {
  type        = string
  description = "GCP project ID. Must already exist."
  validation {
    condition     = length(var.project_id) > 0
    error_message = "project_id cannot be empty."
  }
}

variable "region" {
  type        = string
  description = "Primary GCP region. All resources deploy here."
  default     = "us-central1"
  validation {
    condition     = contains(["us-central1", "us-east1", "europe-west1", "europe-west4", "asia-northeast1"], var.region)
    error_message = "Must be a supported region. Check your data residency requirements."
  }
}

variable "environment" {
  type        = string
  description = "Deployment environment. Controls resource naming and deletion protection."
  validation {
    condition     = contains(["dev", "staging", "prod"], var.environment)
    error_message = "environment must be dev, staging, or prod."
  }
}

variable "subnet_cidr" {
  type        = string
  description = "Primary subnet CIDR. Must not overlap with on-prem or other VPCs."
  default     = "10.0.0.0/20"
}

variable "pods_cidr" {
  type        = string
  description = "GKE Pod IP range. Needs to accommodate max_pods_per_node * max_nodes."
  default     = "10.1.0.0/16"   # 65,536 pod IPs — safe for most deployments
}

variable "services_cidr" {
  type        = string
  description = "GKE Service IP range."
  default     = "10.2.0.0/20"   # 4,096 service IPs
}

variable "authorized_cidr" {
  type        = string
  description = "CIDR block allowed to reach the GKE API server. Your VPN or bastion range."
  default     = "10.0.0.0/8"
}

---
# terraform.tfvars — actual values (NOT committed to git for prod)
# Use separate .tfvars files per environment:
#   dev.tfvars, staging.tfvars, prod.tfvars
# In CI/CD: terraform apply -var-file=prod.tfvars

project_id      = "my-client-project-prod"
region          = "us-central1"
environment     = "prod"
subnet_cidr     = "10.10.0.0/20"
pods_cidr       = "10.20.0.0/16"
services_cidr   = "10.30.0.0/20"
authorized_cidr = "10.0.0.0/8"
Validation blocks are documentation that runs. Without validation blocks, a typo like environment = "production" (vs "prod") silently creates a resource named production-cluster and breaks every naming convention in your stack. With validation, Terraform fails immediately with a clear error message. Write validation for every variable with a constrained set of valid values — it costs 3 lines and saves hours of debugging.
backend.tf — remote state + locking
Never lose state. Never have concurrent applies.

# backend.tf — Remote state configuration
# Run this BEFORE anything else in a new project:
#   1. Create the GCS bucket manually (or via bootstrap script)
#   2. Add this file
#   3. terraform init   (migrates local state to GCS)
#
# Why remote state?
#   - Local state breaks team workflows (who has the latest state.tfstate?)
#   - GCS provides versioning + automatic locking
#   - State is encrypted at rest by default in GCS
#
# WARNING: the state file contains secrets. Enable:
#   - Uniform bucket-level access (no public ACLs)
#   - Customer-managed encryption (CMEK) for regulated clients
#   - Object versioning (recover from accidental corruption)

terraform {
  backend "gcs" {
    bucket  = "my-project-terraform-state"   # created manually before init
    prefix  = "envs/prod"                    # separate folder per workspace
    # GCS provides object-level locking natively — no separate lock table needed
    # (unlike S3, which requires DynamoDB for locking)
  }
}

# ── Bootstrap script: create the state bucket ────────────────────────────
# Run this once, manually, before the first terraform init.
# The state bucket itself is NOT managed by Terraform (chicken-and-egg).
#
# gcloud storage buckets create gs://my-project-terraform-state \
#   --location=us-central1 \
#   --uniform-bucket-level-access \
#   --public-access-prevention
#
# gcloud storage buckets update gs://my-project-terraform-state \
#   --versioning
#
# gcloud storage buckets update gs://my-project-terraform-state \
#   --default-encryption-key=projects/my-project/locations/us-central1/keyRings/main/cryptoKeys/terraform

# ── Team workflow ─────────────────────────────────────────────────────────
# Developer A:
#   terraform plan   → acquires lock, generates plan, releases lock
#   terraform apply  → acquires lock, applies, releases lock
#
# Developer B (while A is applying):
#   terraform apply  → BLOCKED with "state is locked by process X"
#   → This is correct behavior. Wait for A to finish.
#
# Emergency unlock (if a process died holding the lock):
#   terraform force-unlock LOCK_ID
#   → Only use this if you're certain no other process is running.
The state bucket is the most important infrastructure you manage manually. It must exist before terraform init, it must have versioning enabled (to recover from state corruption), and it must have strict access controls (the state file contains every secret value Terraform has ever created). On a client site, propose GCS + CMEK for the state bucket on day 1. It's 30 minutes of setup that prevents catastrophic recovery scenarios later.
Three paths through the streaming layer:
  Real-time processing: Legacy SQL / CRM → Pub/Sub Topic → Push/Pull Subscription → Cloud Run Agent
  High-volume ingest:   Pub/Sub Topic → Dataflow (BQ Storage Write) → BigQuery (streaming buffer)
  Failure path:         Dead Letter Topic → Cloud Function → Alert + Bronze quarantine
Pub/Sub topic + subscriptions — Terraform
With dead-letter topic and retention

# pubsub.tf — streaming glue between legacy systems and your platform

# ── Main topic ────────────────────────────────────────────────────────────
resource "google_pubsub_topic" "orders" {
  name    = "${local.prefix}-orders"
  project = var.project_id

  # Topic-level retention: keep all messages for 7 days, independent of
  # subscription state. This is your replay window: a subscription can
  # seek back and reprocess up to 7 days of history, and a subscriber
  # that goes down for a week can catch up. Without it, only unacked
  # messages are retained, per subscription.
  message_retention_duration = "604800s"   # 7 days in seconds

  # Schema enforcement: reject messages that don't match the Avro schema.
  # Prevents malformed events from reaching your pipeline at all.
  # schema_settings {
  #   schema   = google_pubsub_schema.order_event.id
  #   encoding = "JSON"
  # }
}

# ── Dead Letter Topic ─────────────────────────────────────────────────────
# Messages that fail to be acknowledged after max_delivery_attempts
# are forwarded here instead of being dropped.
# A Cloud Function or subscriber monitors DLT and alerts + quarantines.
resource "google_pubsub_topic" "orders_dlq" {
  name    = "${local.prefix}-orders-dlq"
  project = var.project_id
  message_retention_duration = "604800s"   # 7 days to investigate failures
}

# ── Pull subscription: for Cloud Run agent consumer ──────────────────────
resource "google_pubsub_subscription" "orders_agent" {
  name    = "${local.prefix}-orders-agent"
  topic   = google_pubsub_topic.orders.id
  project = var.project_id

  # How long subscriber has to ACK before message is redelivered.
  # Set this to > your processing time. If an order takes 10s to process,
  # set ack_deadline to 30s to avoid redelivery mid-processing.
  ack_deadline_seconds = 30

  # Retain unacked messages for 3 days (within the topic's 7-day window)
  message_retention_duration = "259200s"

  # Retry policy: exponential backoff on processing failures
  retry_policy {
    minimum_backoff = "10s"
    maximum_backoff = "600s"   # max 10 minutes between retries
  }

  # Dead letter: after 5 failed delivery attempts, send to DLQ
  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.orders_dlq.id
    max_delivery_attempts = 5
  }

  # Ordering: deliver messages with the same ordering_key in order.
  # Use when message sequence matters (e.g. ORDER_CREATED → ORDER_SHIPPED).
  enable_message_ordering = true
}

# ── Push subscription: for Cloud Run (webhook model) ─────────────────────
# Pub/Sub delivers messages directly to your Cloud Run endpoint.
# Good for low-volume, latency-sensitive processing.
# Pull is better for high-volume (you control the polling rate).
resource "google_pubsub_subscription" "orders_cloudrun" {
  name    = "${local.prefix}-orders-cloudrun"
  topic   = google_pubsub_topic.orders.id

  push_config {
    push_endpoint = "${google_cloud_run_v2_service.agent.uri}/pubsub"
    oidc_token {
      service_account_email = google_service_account.agent_runner.email
    }
  }

  ack_deadline_seconds = 60
  dead_letter_policy {
    dead_letter_topic     = google_pubsub_topic.orders_dlq.id
    max_delivery_attempts = 5
  }
}
The dead-letter topic is non-negotiable. Without a DLT, messages that fail processing after max_delivery_attempts are silently dropped. You'll never know an order was lost until a customer calls support three days later. With a DLT, every failed message is quarantined, visible, alertable, and replayable. The 10 minutes it takes to add a DLT in Terraform is worth weeks of forensic debugging later.
publisher.py — publishing with ordering keys
Batching, ordering, error handling

"""
pubsub/publisher.py — Production Pub/Sub publisher.

Key patterns:
  - OrderingKey: guarantees in-order delivery per customer
  - Batching: reduce API calls for high-volume publishing
  - Future callbacks: non-blocking publish with error handling
  - Exponential backoff: retry transient failures
"""
import json
import logging
from datetime import datetime, timezone
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1
from google.api_core.exceptions import GoogleAPIError

logger = logging.getLogger(__name__)

PROJECT_ID = "my-project"
TOPIC_ID   = "prod-orders"


def get_publisher() -> pubsub_v1.PublisherClient:
    """
    Publisher with batching and message ordering enabled.
    Pub/Sub batches messages before sending: fewer API calls and
    higher throughput, at the cost of a small latency increase.
    """
    batch_settings = pubsub_v1.types.BatchSettings(
        max_messages = 100,        # send when 100 messages queued
        max_bytes    = 1_000_000,  # or when batch hits 1MB
        max_latency  = 0.01,       # or after 10ms — whichever first
    )
    # Required when publishing with an ordering_key: without this option,
    # publish() rejects ordered messages.
    publisher_options = pubsub_v1.types.PublisherOptions(
        enable_message_ordering = True,
    )
    return pubsub_v1.PublisherClient(
        batch_settings    = batch_settings,
        publisher_options = publisher_options,
    )


def publish_order_event(
    order: dict,
    event_type: str = "ORDER_CREATED",
) -> str:
    """
    Publish a single order event with ordering key.

    OrderingKey = customer_id:
      All events for the same customer arrive at the subscriber
      in the order they were published. Essential for:
        ORDER_CREATED → ORDER_SHIPPED → ORDER_DELIVERED
      Without ordering: subscriber might process DELIVERED before CREATED.

    Returns the message ID on success.
    """
    publisher  = get_publisher()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)

    # Envelope pattern: wrap payload with metadata
    # The subscriber uses event_type to route to the right handler.
    message = {
        "event_type":    event_type,
        "event_version": "1.0",
        "event_time":    datetime.now(timezone.utc).isoformat(),
        "source":        "order-service",
        "payload":       order,
    }

    future = publisher.publish(
        topic_path,
        data          = json.dumps(message).encode("utf-8"),
        ordering_key  = f"customer-{order['customer_id']}",  # per-customer ordering
        event_type    = event_type,          # message attributes (filterable)
        environment   = "prod",
    )

    try:
        message_id = future.result(timeout=30)
        logger.info(f"Published {event_type} for order {order['order_id']}: {message_id}")
        return message_id
    except TimeoutError:
        logger.error(f"Publish timed out for order {order['order_id']}")
        raise
    except GoogleAPIError as e:
        logger.error(f"Pub/Sub API error: {e}")
        raise


def publish_batch(orders: list[dict], event_type: str = "ORDER_CREATED") -> list:
    """
    Publish a batch of orders non-blocking.
    All futures are started, then we wait for results.
    Much faster than sequential publish for bulk ingestion.
    """
    publisher  = get_publisher()
    topic_path = publisher.topic_path(PROJECT_ID, TOPIC_ID)
    futures    = []

    for order in orders:
        message = json.dumps({
            "event_type": event_type,
            "event_time": datetime.now(timezone.utc).isoformat(),
            "payload":    order,
        }).encode("utf-8")

        future = publisher.publish(
            topic_path,
            data         = message,
            ordering_key = f"customer-{order['customer_id']}",
        )
        futures.append((order["order_id"], future))

    # Collect results
    results, errors = [], []
    for order_id, future in futures:
        try:
            msg_id = future.result(timeout=60)
            results.append({"order_id": order_id, "message_id": msg_id})
        except Exception as e:
            errors.append({"order_id": order_id, "error": str(e)})
            logger.error(f"Failed to publish order {order_id}: {e}")

    if errors:
        logger.warning(f"{len(errors)} publish failures — check DLQ")
    logger.info(f"Published {len(results)}/{len(orders)} messages")
    return results
The ordering key contract. enable_message_ordering = true in the subscription + ordering_key in the publisher = guaranteed in-order delivery per key. Without this, a subscriber might process ORDER_SHIPPED before ORDER_CREATED for the same customer, leaving your Silver layer in an inconsistent state. The cost: slightly lower throughput (messages with the same key can't be parallelized). Always use it for event streams where sequence matters.
subscriber.py — pull consumer with ACK management
The most important Pub/Sub pattern to get right

"""
pubsub/subscriber.py — Pull subscriber with correct ACK semantics.

The most common Pub/Sub mistake: ACKing before processing.
If you ACK and then your processing crashes, the message is gone.
If you NACK or let the deadline expire, Pub/Sub redelivers it.
ACK only after you have confirmed the work is done and durable.
"""
import json
import logging
from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

logger = logging.getLogger(__name__)

PROJECT_ID       = "my-project"
SUBSCRIPTION_ID  = "prod-orders-agent"


def process_order_event(message: pubsub_v1.subscriber.message.Message) -> None:
    """
    Callback invoked for each message.

    ACK/NACK contract:
      message.ack()   — processing succeeded. Pub/Sub stops delivering.
      message.nack()  — processing failed. Pub/Sub redelivers after backoff.
      (no response)   — ack_deadline expires, Pub/Sub redelivers.

    Never ACK before your work is committed to a durable store.
    The typical sequence:
      1. Parse and validate the message
      2. Write to DuckDB / BigQuery / GCS   ← durable commit
      3. message.ack()                      ← only now
    """
    order_id = None   # defined before try so the except handlers can log it
    try:
        # ── Parse ────────────────────────────────────────────────────────
        envelope = json.loads(message.data.decode("utf-8"))
        event_type = envelope.get("event_type", "UNKNOWN")
        payload    = envelope.get("payload", {})
        order_id   = payload.get("order_id")

        logger.info(f"Processing {event_type} for order {order_id} "
                    f"(delivery #{message.delivery_attempt})")

        # ── Route by event type ──────────────────────────────────────────
        if event_type == "ORDER_CREATED":
            handle_order_created(payload)
        elif event_type == "ORDER_SHIPPED":
            handle_order_shipped(payload)
        elif event_type == "ORDER_CANCELLED":
            handle_order_cancelled(payload)
        else:
            # Unknown event type: ACK to prevent infinite redelivery,
            # but log for investigation.
            logger.warning(f"Unknown event_type '{event_type}' — ACKing to drain")
            message.ack()
            return

        # ── ACK only after successful processing ────────────────────────
        message.ack()
        logger.info(f"ACKed order {order_id}")

    except json.JSONDecodeError as e:
        # Malformed JSON: NACK once to let retry policy try again,
        # but after max_delivery_attempts it goes to DLQ automatically.
        logger.error(f"Malformed message payload: {e}")
        message.nack()

    except Exception as e:
        # Unexpected error: NACK so Pub/Sub redelivers with backoff.
        # After max_delivery_attempts (5), message goes to DLQ.
        logger.error(f"Processing failed for order {order_id}: {e}", exc_info=True)
        message.nack()


def handle_order_created(payload: dict) -> None:
    """Write new order to Bronze Parquet / streaming BQ insert."""
    # Implementation: call ingest.py or BQ streaming insert
    logger.info(f"  → ORDER_CREATED: {payload.get('order_id')}")


def handle_order_shipped(payload: dict) -> None:
    """Update order status in Silver (via BQ MERGE or DuckDB UPDATE)."""
    logger.info(f"  → ORDER_SHIPPED: {payload.get('order_id')}")


def handle_order_cancelled(payload: dict) -> None:
    """Mark order cancelled. Trigger refund event if needed."""
    logger.info(f"  → ORDER_CANCELLED: {payload.get('order_id')}")


def start_subscriber(timeout: float | None = None) -> None:
    """Start the pull subscriber. Blocks until timeout or KeyboardInterrupt."""
    subscriber = pubsub_v1.SubscriberClient()
    sub_path   = subscriber.subscription_path(PROJECT_ID, SUBSCRIPTION_ID)

    # Flow control: limit in-flight messages to avoid OOM
    flow_control = pubsub_v1.types.FlowControl(
        max_messages       = 50,       # process at most 50 concurrent messages
        max_bytes          = 50_000_000,  # 50MB in flight
    )

    streaming_pull_future = subscriber.subscribe(
        sub_path,
        callback     = process_order_event,
        flow_control = flow_control,
    )

    logger.info(f"Listening for messages on {sub_path}")
    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()
        streaming_pull_future.result()
    except KeyboardInterrupt:
        streaming_pull_future.cancel()
ACK-after-commit is the only safe ACK pattern. ACK before your work is durable means a process crash between ACK and commit loses the message permanently — Pub/Sub won't redeliver an ACKed message. ACK after commit means at-least-once delivery: if the process crashes after commit but before ACK, the message is redelivered and processed again. Design your handlers to be idempotent — processing the same order twice should be safe. This is the correct tradeoff.
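A sketch of the idempotency side of that contract. The in-memory set here is a stand-in for a durable dedupe store (a BQ MERGE key, a Firestore document, or a Redis set in production):

```python
_processed_ids: set[str] = set()   # stand-in for a durable dedupe store

def handle_once(order_id: str, apply_change) -> bool:
    """
    Apply the change exactly once per order_id.
    Redelivered (duplicate) messages become safe no-ops.
    """
    if order_id in _processed_ids:
        return False               # duplicate delivery: already committed
    apply_change(order_id)         # the durable commit happens here
    _processed_ids.add(order_id)   # record only AFTER the commit succeeds
    return True
```

Recording the ID after the commit matters: a crash between commit and record means one extra (safe) reprocess, never a lost message.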
dead_letter.py — DLQ monitor + quarantine
What to do when messages fail

"""
pubsub/dead_letter.py — Dead Letter Queue monitor.

Messages land here after max_delivery_attempts (5) failed.
This process:
  1. Reads from the DLQ subscription
  2. Categorizes the failure (malformed JSON, missing fields, etc.)
  3. Writes to Bronze quarantine folder for investigation
  4. Sends a Slack/PagerDuty alert
  5. ACKs from DLQ (removes from queue)

Run as a separate Cloud Run service on a slow poll (every 5 minutes).
Not time-critical — DLQ is for investigation, not real-time processing.
"""
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
from google.cloud import pubsub_v1, storage

logger = logging.getLogger(__name__)

PROJECT_ID       = "my-project"
DLQ_SUB_ID       = "prod-orders-dlq-monitor"
QUARANTINE_BUCKET = "prod-bronze"
QUARANTINE_PREFIX = "quarantine/orders"


def categorize_failure(message_data: bytes) -> str:
    """Determine why the message likely failed."""
    try:
        payload = json.loads(message_data.decode("utf-8"))
        if "event_type" not in payload:
            return "missing_event_type"
        if "payload" not in payload:
            return "missing_payload"
        if "order_id" not in payload.get("payload", {}):
            return "missing_order_id"
        return "processing_error"   # parsed fine but processor crashed
    except (json.JSONDecodeError, UnicodeDecodeError):
        return "malformed_json"


def quarantine_message(
    message: pubsub_v1.subscriber.message.Message,
    failure_reason: str,
) -> str:
    """Write failed message to Bronze quarantine for investigation."""
    client = storage.Client()
    bucket = client.bucket(QUARANTINE_BUCKET)

    timestamp = datetime.now(timezone.utc).strftime("%Y/%m/%d/%H")
    blob_name = (
        f"{QUARANTINE_PREFIX}/{timestamp}/"
        f"{failure_reason}_{message.message_id}.json"
    )

    quarantine_record = {
        "message_id":        message.message_id,
        "failure_reason":    failure_reason,
        "delivery_attempts": message.delivery_attempt,
        "publish_time":      message.publish_time.isoformat() if message.publish_time else None,
        "quarantined_at":    datetime.now(timezone.utc).isoformat(),
        "attributes":        dict(message.attributes),
        "data_base64":       message.data.hex(),  # store as hex for safety
        "data_text":         message.data.decode("utf-8", errors="replace"),
    }

    blob = bucket.blob(blob_name)
    blob.upload_from_string(
        json.dumps(quarantine_record, indent=2),
        content_type="application/json",
    )

    logger.warning(
        f"Quarantined message {message.message_id} "
        f"(reason: {failure_reason}) → gs://{QUARANTINE_BUCKET}/{blob_name}"
    )
    return f"gs://{QUARANTINE_BUCKET}/{blob_name}"


def send_alert(failure_reason: str, message_id: str, quarantine_path: str) -> None:
    """Send alert to Slack / PagerDuty. Replace with your alerting integration."""
    alert = {
        "severity":       "warning" if failure_reason != "malformed_json" else "error",
        "message":        f"DLQ message quarantined: {failure_reason}",
        "message_id":     message_id,
        "quarantine_path": quarantine_path,
        "action_required": "Inspect quarantine file and determine if replay is needed",
    }
    logger.error(f"ALERT: {json.dumps(alert)}")
    # TODO: post to Slack webhook / PagerDuty API


def process_dlq(max_messages: int = 100) -> dict:
    """Pull and process DLQ messages."""
    subscriber = pubsub_v1.SubscriberClient()
    sub_path   = subscriber.subscription_path(PROJECT_ID, DLQ_SUB_ID)

    response = subscriber.pull(
        request={"subscription": sub_path, "max_messages": max_messages},
        timeout=30.0,  # don't hang the run if Pub/Sub is slow to respond
    )

    stats = {"processed": 0, "quarantined": 0, "categories": {}}

    for received_message in response.received_messages:
        msg = received_message.message

        failure_reason = categorize_failure(msg.data)
        quarantine_path = quarantine_message(msg, failure_reason)
        send_alert(failure_reason, msg.message_id, quarantine_path)

        # ACK from DLQ — message is now in quarantine (durable)
        subscriber.acknowledge(
            request={"subscription": sub_path, "ack_ids": [received_message.ack_id]}
        )

        stats["processed"] += 1
        stats["quarantined"] += 1
        stats["categories"][failure_reason] = stats["categories"].get(failure_reason, 0) + 1

    if stats["processed"] > 0:
        logger.warning(f"DLQ processed: {json.dumps(stats)}")
    return stats


if __name__ == "__main__":
    result = process_dlq()
    print(f"DLQ run complete: {result}")
The quarantine pattern completes the pipeline. Bronze is immutable. Silver is cleaned. Gold is business-ready. But what about the data that was too broken to make it through? The quarantine folder in Bronze is the answer: it's the fourth, invisible layer — the "anti-Bronze" that captures everything the pipeline rejected. Every quarantined message has enough metadata to investigate, fix the root cause, and replay it into the main pipeline once the upstream issue is resolved.
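Replay is the second half of the pattern. A minimal sketch of the rebuild step, assuming the quarantine record layout written above; the publisher wiring in the trailing comment is illustrative, not part of this pipeline:

```python
from typing import Dict, Tuple


def build_replay_message(record: dict) -> Tuple[bytes, Dict[str, str]]:
    """Turn a quarantine record back into (data, attributes) for republish.

    The quarantine writer stores the raw payload as hex. Original message
    attributes are preserved, plus markers so downstream consumers (and the
    DLQ itself) can tell a replay from a first delivery.
    """
    payload_hex = record.get("data_hex") or record.get("data_base64", "")  # hex either way
    data = bytes.fromhex(payload_hex)
    attributes = dict(record.get("attributes", {}))
    attributes["replay_of"] = record["message_id"]
    attributes["original_failure"] = record["failure_reason"]
    return data, attributes


# Usage (sketch; topic name and client construction are assumptions):
# publisher = pubsub_v1.PublisherClient()
# record = json.loads(blob.download_as_text())
# data, attrs = build_replay_message(record)
# publisher.publish(publisher.topic_path(PROJECT_ID, MAIN_TOPIC_ID), data, **attrs)
```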
What VPC SC actually does

VPC Service Controls creates a security perimeter around GCP managed services (BigQuery, GCS, Vertex AI, Pub/Sub). Even if an attacker compromises a service account, they cannot exfiltrate data to a project outside the perimeter. Calls requiring bigquery.tables.export from an unauthorized network return 403 PERMISSION_DENIED — regardless of IAM roles.
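A VPC SC denial looks like any other 403 at first glance, but the message body typically carries a distinct marker plus a unique identifier that Google support and audit logs key on. A small client-side triage sketch; the helper name and the exact message wording are assumptions to verify against your own denial logs:

```python
import re
from typing import Optional

# Marker and identifier format are assumptions based on typical VPC SC
# denial messages; verify the exact wording against your own audit logs.
_VPC_SC_MARKER = "Request is prohibited by organization's policy"
_UID_RE = re.compile(r"vpcServiceControlsUniqueIdentifier:\s*([\w-]+)")


def parse_vpc_sc_denial(error_message: str) -> Optional[dict]:
    """Return {'unique_id': ...} if a 403 looks like a VPC SC denial,
    or None for an ordinary IAM denial (wrong roles, not wrong network)."""
    if _VPC_SC_MARKER not in error_message:
        return None
    match = _UID_RE.search(error_message)
    return {"unique_id": match.group(1) if match else None}
```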

When you need it

Mandatory for: HIPAA, FedRAMP, PCI-DSS, most defense/government clients, any client whose compliance team has a "data exfiltration prevention" requirement. Budget an extra day for the initial setup and half a day for each new service you add to the perimeter — ingress/egress rules require careful tuning.

Perimeter vs IAM

IAM controls who can do what. VPC SC controls where they can do it from. IAM: "this service account can read BigQuery." VPC SC: "only from inside this network perimeter." They are complementary, not substitutes. You need both for a hardened environment.

Dry-run mode is essential

Before enforcing a perimeter, run it in dry-run mode first. Dry-run logs every request that would be blocked without actually blocking it. Run dry-run for 48-72 hours, review Cloud Logging, and add ingress/egress rules for everything that would fail. Skipping dry-run on a production environment will break services mid-day — and the error messages are cryptic.

VPC Service Controls perimeter — Terraform
Wraps BQ, GCS, Vertex AI in a data exfiltration fence

# vpc_sc.tf — VPC Service Controls perimeter
# ─────────────────────────────────────────────────────────────────────────
# IMPORTANT: Always start in dry-run mode (use_explicit_dry_run_spec = true).
# Enforced mode blocks traffic immediately. Use dry-run to find what breaks.
# Migration path:
#   1. Deploy with use_explicit_dry_run_spec = true
#   2. Monitor violations for 48-72 hours in Cloud Logging
#   3. Add ingress/egress rules for all legitimate traffic
#   4. Change spec to status and remove dry run config
# ─────────────────────────────────────────────────────────────────────────

# ── Access Policy (org-level, created once) ───────────────────────────────
# An Access Policy is the container for all Service Perimeters.
# One per organization. If your client already has one, use data source:
#   data "google_access_context_manager_access_policy" "main" {}
resource "google_access_context_manager_access_policy" "main" {
  parent = "organizations/${var.org_id}"
  title  = "${var.project_id} Access Policy"
}

# ── Access Level: who can access perimeter-protected services ─────────────
# This level permits access from:
#   - Your approved corporate IP ranges
#   - The GKE cluster's node CIDR (for workloads inside the cluster)
resource "google_access_context_manager_access_level" "corporate" {
  parent = "accessPolicies/${google_access_context_manager_access_policy.main.name}"
  name   = "accessPolicies/${google_access_context_manager_access_policy.main.name}/accessLevels/corporate_network"
  title  = "Corporate Network"

  basic {
    conditions {
      ip_subnetworks = [
        var.corporate_cidr,          # e.g. "203.0.113.0/24" — office NAT IP
        var.vpn_cidr,                # e.g. "10.0.0.0/8" — on-prem VPN range
        var.gke_nodes_cidr,          # GKE node CIDR for workload access
      ]
      # Optional: require specific device policies (managed devices only)
      # device_policy { require_screen_lock = true }
    }
  }
}

# ── Service Perimeter ────────────────────────────────────────────────────
resource "google_access_context_manager_service_perimeter" "main" {
  parent = "accessPolicies/${google_access_context_manager_access_policy.main.name}"
  name   = "accessPolicies/${google_access_context_manager_access_policy.main.name}/servicePerimeters/data_perimeter"
  title  = "Data Exfiltration Prevention Perimeter"

  # ENFORCED spec — blocks real traffic.
  # Remove this block and use spec{} below for dry-run.
  status {
    restricted_services = [
      "bigquery.googleapis.com",
      "storage.googleapis.com",
      "aiplatform.googleapis.com",   # Vertex AI
      "pubsub.googleapis.com",
      "cloudkms.googleapis.com",
    ]

    resources = [
      "projects/${var.project_number}",
    ]

    access_levels = [
      google_access_context_manager_access_level.corporate.name,
    ]

    # VPC Accessible Services: only services in restricted_services
    # can be called from inside the perimeter. Prevents internal lateral movement.
    vpc_accessible_services {
      enable_restriction = true
      allowed_services   = [
        "bigquery.googleapis.com",
        "storage.googleapis.com",
        "aiplatform.googleapis.com",
        "pubsub.googleapis.com",
      ]
    }

    # Ingress/egress rules: see ingress_egress.tf.
    # Note: ingress_policies / egress_policies are repeated nested blocks
    # (ingress_policies { ... }), not list attributes — assigning `= [...]`
    # here is invalid HCL.
  }

  # DRY-RUN spec: log violations without blocking.
  # During initial setup, deploy this instead of status{} above.
  # spec {
  #   restricted_services = [ ... same as status ... ]
  #   resources           = [ ... ]
  #   access_levels       = [ ... ]
  # }
  # use_explicit_dry_run_spec = true

  lifecycle {
    # Prevent accidental perimeter deletion — would immediately expose data.
    # lifecycle meta-arguments must be literal values; Terraform rejects
    # expressions like `var.environment == "prod" ? true : false` here.
    prevent_destroy = true
  }
}
The dry-run lesson every FDE learns once. Skipping dry-run mode and enforcing VPC SC immediately on a production project will break services within minutes — Cloud Run calling BigQuery, Vertex AI writing to GCS, dbt running from CI/CD. The error message is always the same cryptic 403, and tracking down every affected service takes hours. The 48-72 hour dry-run investment always pays for itself.
Ingress + egress rules — the tuning work
Allowing legitimate traffic through the perimeter

# ingress_egress.tf
# ─────────────────────────────────────────────────────────────────────────
# Ingress rule: "Who outside the perimeter can call services inside it?"
# Egress rule:  "Who inside the perimeter can call services outside it?"
#
# Common rules you'll need:
#   Ingress: CI/CD service account (dbt runner from GitHub Actions)
#   Ingress: Looker / BI tool IP calling BigQuery
#   Egress:  Vertex AI calling GCS (model artifacts)
#   Egress:  Cloud Run calling external APIs
# ─────────────────────────────────────────────────────────────────────────

# ── Ingress: CI/CD dbt runner can call BigQuery from outside perimeter ────
# GitHub Actions runs dbt from outside your VPC. Without this rule,
# dbt's BQ calls get 403 PERMISSION_DENIED even with correct IAM.
resource "google_access_context_manager_service_perimeter" "main" {
  # ... (extends the resource from vpc_sc.tf — shown here inline for clarity)

  status {
    ingress_policies {
      ingress_from {
        # Allow from specific service accounts (not IP-based)
        identities = [
          "serviceAccount:${google_service_account.dbt_runner.email}",
        ]
        # From any source (GitHub-hosted runners have variable IPs)
        sources {
          access_level = "*"   # any access level = any source
        }
      }
      ingress_to {
        resources = ["projects/${var.project_number}"]
        operations {
          service_name = "bigquery.googleapis.com"
          method_selectors {
            method = "google.cloud.bigquery.v2.JobService.InsertJob"
          }
          method_selectors {
            method = "google.cloud.bigquery.v2.TableService.InsertTable"
          }
        }
      }
    }

    # ── Ingress: Looker / BI tool by IP ─────────────────────────────────
    ingress_policies {
      ingress_from {
        sources {
          # Looker has fixed outbound IPs — allow them by CIDR via access level
          access_level = google_access_context_manager_access_level.bi_tool.name
        }
        identities = ["serviceAccount:${var.looker_service_account}"]
      }
      ingress_to {
        resources = ["projects/${var.project_number}"]
        operations {
          service_name = "bigquery.googleapis.com"
          method_selectors { method = "*" }   # all BQ methods
        }
      }
    }

    # ── Egress: Vertex AI can read/write GCS for model artifacts ─────────
    egress_policies {
      egress_from {
        identities = [
          "serviceAccount:service-${var.project_number}@gcp-sa-aiplatform.iam.gserviceaccount.com",
        ]
      }
      egress_to {
        resources = ["projects/${var.project_number}"]   # same project = same perimeter, but explicit
        operations {
          service_name = "storage.googleapis.com"
          method_selectors { method = "google.storage.objects.get" }
          method_selectors { method = "google.storage.objects.create" }
        }
      }
    }
  }
}
Ingress/egress rules are where VPC SC becomes an ongoing maintenance task. Every new service, every new CI/CD integration, every new BI tool requires a new ingress or egress rule. The operational discipline: when a service account starts getting 403s after VPC SC enforcement, the first question is always "does it need an ingress rule?" Keep a running doc of every rule in your perimeter and why it exists — this becomes the compliance audit trail.
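That "does it need an ingress rule?" triage can be codified. A sketch (function name is mine) of the same prefix-to-remediation mapping the dry-run tuning query uses, handy inside a bot or CLI that watches for fresh 403s:

```python
def suggest_remediation(method_name: str) -> str:
    """Map an audit-log API method to the perimeter rule it likely needs.

    Mirrors the triage CASE in the dry-run tuning query: prefix-match the
    method name, suggest a rule type, and flag anything unrecognized for
    manual investigation.
    """
    rules = [
        ("google.cloud.bigquery",   "Add BigQuery ingress rule for this principal"),
        ("google.storage",          "Add GCS ingress or egress rule"),
        ("google.cloud.aiplatform", "Add Vertex AI ingress rule"),
    ]
    for prefix, action in rules:
        if method_name.startswith(prefix):
            return action
    return f"Investigate: {method_name}"
```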
Dry-run violation query — Cloud Logging
Find what would break before it does

-- ── Query dry-run VPC SC violations in Cloud Logging ─────────────────────
-- Run in the GCP Logs Explorer after deploying in dry-run mode.
-- This shows every API call that WOULD be blocked by the perimeter.
-- Add ingress/egress rules for all legitimate traffic before enforcing.
--
-- Log filter (Logs Explorer syntax). Dry-run violations are recorded in the
-- policy-denied audit log with a dryRun marker in the metadata:
log_id("cloudaudit.googleapis.com/policy")
protoPayload.metadata.dryRun="true"

-- ── BigQuery query against log exports (if using Log Sink to BQ) ─────────
-- NOTE: field names below assume the sink preserves protoPayload as-is.
-- BigQuery audit-log sinks typically export it as a `protopayload_auditlog`
-- record with metadata in a `metadataJson` string column, and dry-run
-- violations land in the policy log export, not data_access; adjust the
-- table and field references to match your sink's schema.
SELECT
  timestamp,
  protoPayload.authenticationInfo.principalEmail AS caller,
  protoPayload.requestMetadata.callerIp           AS caller_ip,
  protoPayload.methodName                         AS api_method,
  protoPayload.resourceName                       AS resource,
  protoPayload.status.code                        AS status_code,
  protoPayload.status.message                     AS status_message,
  -- VPC SC specific fields
  JSON_VALUE(protoPayload.metadata, '$.violationReason') AS violation_reason,
  JSON_VALUE(protoPayload.metadata, '$.accessLevels')    AS access_levels
FROM `my-project.log_exports.cloudaudit_googleapis_com_data_access`
WHERE
  timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 48 HOUR)
  AND JSON_VALUE(protoPayload.metadata, '$.dryRun') = 'true'  -- dry-run only
ORDER BY timestamp DESC

-- ── Categorize violations for perimeter tuning ────────────────────────────
SELECT
  protoPayload.authenticationInfo.principalEmail AS principal,
  protoPayload.methodName                         AS api_method,
  COUNT(*) AS violation_count,
  -- Determine what ingress/egress rule is needed
  CASE
    WHEN protoPayload.methodName LIKE 'google.cloud.bigquery%'
    THEN 'Add BigQuery ingress rule for this principal'
    WHEN protoPayload.methodName LIKE 'google.storage%'
    THEN 'Add GCS ingress or egress rule'
    WHEN protoPayload.methodName LIKE 'google.cloud.aiplatform%'
    THEN 'Add Vertex AI ingress rule'
    ELSE 'Investigate: ' || protoPayload.methodName
  END AS remediation_action
FROM `my-project.log_exports.cloudaudit_googleapis_com_data_access`
WHERE
  timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 72 HOUR)
  AND JSON_VALUE(protoPayload.metadata, '$.dryRun') = 'true'
GROUP BY principal, api_method
ORDER BY violation_count DESC

-- ── Check: are there any ENFORCED violations (after going live)? ──────────
-- This query should return zero rows in a healthy deployment.
-- Any rows = legitimate traffic being blocked = add an ingress/egress rule.
SELECT
  timestamp,
  protoPayload.authenticationInfo.principalEmail AS blocked_caller,
  protoPayload.methodName                         AS blocked_method,
  protoPayload.resourceName                       AS blocked_resource
FROM `my-project.log_exports.cloudaudit_googleapis_com_data_access`
WHERE
  timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 HOUR)
  AND protoPayload.status.code = 403
  AND JSON_VALUE(protoPayload.metadata, '$.dryRun') IS NULL   -- enforced, not dry-run
ORDER BY timestamp DESC
The last query is your production health check. Set it up as a scheduled BigQuery query that runs every 15 minutes and writes results to a monitoring table. If COUNT(*) > 0, a Cloud Function fires a Slack alert: "VPC SC is blocking legitimate traffic — investigate immediately." This is your Day 2 operations pattern for VPC SC: it either returns zero rows (healthy) or tells you exactly which principal, which method, and which resource is being blocked.
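The alerting half of that loop might look like this. The webhook, the monitoring table, and the Cloud Function wiring are assumptions; the testable core is turning blocked-caller rows into an actionable Slack message:

```python
from typing import Dict, List


def format_vpc_sc_alert(blocked_rows: List[Dict]) -> str:
    """Render rows from the enforced-violation health check as a
    Slack-ready alert. Empty input means healthy: return "" so the
    caller can skip posting entirely.
    """
    if not blocked_rows:
        return ""
    lines = [
        f"VPC SC is blocking {len(blocked_rows)} request(s): investigate immediately"
    ]
    for row in blocked_rows[:10]:  # cap the message; full detail lives in BQ
        lines.append(
            f"* {row['blocked_caller']} -> {row['blocked_method']} "
            f"on {row['blocked_resource']}"
        )
    return "\n".join(lines)


# Sketch of the Cloud Function wiring (webhook URL and query are assumptions):
# def on_schedule(event, context):
#     rows = [dict(r) for r in bq_client.query(HEALTH_CHECK_SQL).result()]
#     text = format_vpc_sc_alert(rows)
#     if text:
#         requests.post(SLACK_WEBHOOK_URL, json={"text": text})
```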