Kafka, Airflow & PySpark

This guide covers three pillars of modern data engineering individually — Apache Kafka for real-time event streaming, Apache Airflow for workflow orchestration, and PySpark for large-scale data transformation — then shows how they compose into a production pipeline stack.

Together, this trio powers a huge share of real-world data platforms: Kafka handles ingestion and event routing, Airflow schedules and monitors batch/streaming jobs, and PySpark performs heavy transformations. Understanding each tool and their integration points is essential for any data engineering role.

Part 1: Apache Kafka

Core Concepts

1. Topics, Partitions & Offsets

A Kafka topic is an append-only, immutable log of events. Topics are split into partitions — the unit of parallelism and data distribution. Each message within a partition gets a monotonically increasing offset. Ordering is guaranteed only within a single partition, not across partitions. When producing, keys determine partition assignment via a hash (default: murmur2). Choosing the right partition count is critical — too few limits consumer parallelism; too many increases broker metadata overhead and end-to-end latency.
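
As a concrete sketch (assuming the confluent-kafka package, a broker reachable at broker:9092, and at least three brokers to satisfy the replication factor — all illustrative), creating a topic with an explicit partition count looks like this:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})

# 6 partitions -> at most 6 consumers in one group can read in parallel;
# replication_factor=3 keeps two follower copies of every partition.
futures = admin.create_topics([
    NewTopic("orders", num_partitions=6, replication_factor=3)
])
for topic, fut in futures.items():
    fut.result()  # raises if the broker rejected the request
    print(f"Created topic {topic}")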

2. Consumer Groups & Rebalancing

Consumers subscribe to topics as part of a consumer group. Kafka assigns each partition to exactly one consumer in the group, enabling horizontal scale-out. When consumers join or leave, a rebalance occurs. Older "eager" rebalancing stops all consumers during reassignment. Modern Kafka (2.4+) supports cooperative incremental rebalancing, which only revokes the specific partitions that need to move, minimizing downtime. The next-generation KIP-848 consumer group protocol (server-side assignment, previewed in Kafka 3.7 and GA in Kafka 4.0) further reduces rebalance latency.
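
A sketch of a consumer opting into cooperative rebalancing (broker, group, and topic names are illustrative); the revoke callback commits whatever has been processed before the partitions move:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "order-processor",
    # cooperative-sticky = incremental rebalancing: only moved partitions are revoked
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": False,
})

def on_assign(consumer, partitions):
    print(f"Assigned: {[p.partition for p in partitions]}")

def on_revoke(consumer, partitions):
    # Commit processed offsets before these partitions move to another member
    try:
        consumer.commit(asynchronous=False)
    except Exception:
        pass  # nothing consumed yet, nothing to commit
    print(f"Revoked: {[p.partition for p in partitions]}")

consumer.subscribe(["orders"], on_assign=on_assign, on_revoke=on_revoke)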

3. Exactly-Once Semantics (EOS)

Kafka supports three delivery guarantees: at-most-once, at-least-once, and exactly-once. EOS relies on two features: idempotent producers (deduplication via producer ID + sequence number) and transactions (atomic writes across multiple partitions). For consume-transform-produce patterns, use isolation.level=read_committed so consumers only see committed transaction data. EOS adds ~3-5% latency overhead and requires careful transactional.id management.
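
A minimal consume-transform-produce sketch with transactions (confluent-kafka; topic names and the transform() stub are illustrative, and abort/error handling is omitted for brevity):

from confluent_kafka import Consumer, Producer, TopicPartition

def transform(raw: bytes) -> bytes:
    return raw  # placeholder enrichment

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "clicks-enricher",
    "enable.auto.commit": False,
    "isolation.level": "read_committed",   # see only committed transactional data
})
consumer.subscribe(["clicks.raw"])

producer = Producer({
    "bootstrap.servers": "broker:9092",
    "transactional.id": "clicks-enricher-1",  # stable per instance, fences zombies
})
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    producer.produce("clicks.enriched", key=msg.key(), value=transform(msg.value()))
    # Commit the consumer offset atomically with the produced output
    producer.send_offsets_to_transaction(
        [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)],
        consumer.consumer_group_metadata(),
    )
    producer.commit_transaction()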

4. Replication & ISR (In-Sync Replicas)

Each partition has a leader and zero or more follower replicas. The ISR set contains followers that are caught up with the leader within replica.lag.time.max.ms. A produce is acknowledged based on acks: acks=0 (fire-and-forget), acks=1 (leader only), acks=all (all ISR members). Setting min.insync.replicas=2 with acks=all and replication.factor=3 is the standard production durability configuration.
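
A sketch of that durability setup (topic name, partition count, and broker address are illustrative; assumes at least three brokers):

from confluent_kafka.admin import AdminClient, NewTopic
from confluent_kafka import Producer

admin = AdminClient({"bootstrap.servers": "broker:9092"})
admin.create_topics([NewTopic(
    "payments",
    num_partitions=12,
    replication_factor=3,
    config={"min.insync.replicas": "2"},  # writes need 2 in-sync acks to succeed
)])

producer = Producer({
    "bootstrap.servers": "broker:9092",
    "acks": "all",                # wait for the full ISR
    "enable.idempotence": True,   # no duplicates on retry
})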

5. Schema Registry & Schema Evolution

In production, a Schema Registry (e.g., Confluent Schema Registry) manages Avro, Protobuf, or JSON schemas for topics. It enforces compatibility rules — BACKWARD, FORWARD, FULL — preventing producers from publishing data that would break consumers. This decouples producer and consumer deployments. The schema ID is embedded in the first 5 bytes of each message (magic byte + 4-byte ID).

6. Log Compaction & Retention

Kafka supports two retention strategies: time/size-based deletion and log compaction. Compaction keeps only the latest value for each key, making the topic a materializable table. This is the foundation for Kafka as a "streaming database" — changelog topics in Kafka Streams use compaction. Compaction runs as a background thread and never removes the active segment, so recent duplicates may temporarily exist.
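
A sketch of creating a compacted, changelog-style topic (names are illustrative; the cleaner setting is optional tuning):

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})
admin.create_topics([NewTopic(
    "user-profiles",
    num_partitions=6,
    replication_factor=3,
    config={
        "cleanup.policy": "compact",          # keep only the latest value per key
        "min.cleanable.dirty.ratio": "0.1",   # compact more eagerly than the default
    },
)])

# A consumer that reads this topic from offset 0 rebuilds the current state:
# the newest record per key wins, and a null value (tombstone) deletes the key.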

↑ Back to top

Industry Use Cases

1. Real-Time Event-Driven Microservices

E-commerce platforms use Kafka as the central nervous system between services. When a user places an order, the order service publishes to an orders topic. The inventory service, payment service, notification service, and analytics service each consume from this topic independently via their own consumer groups, enabling loose coupling and independent scaling.

2. Change Data Capture (CDC)

Debezium connectors capture row-level changes from databases (PostgreSQL, MySQL, MongoDB) and publish them to Kafka topics. Downstream consumers replicate data to search indices (Elasticsearch), caches (Redis), or data warehouses in near-real-time. This eliminates fragile batch ETL scripts and provides a reliable, ordered change stream.

3. Real-Time Fraud Detection

Financial services stream transaction events through Kafka. Kafka Streams or Flink applications window these events (e.g., 5-minute tumbling windows) to detect anomalous patterns — multiple high-value transactions from different locations, velocity checks — and publish alerts to a downstream topic that triggers blocking actions.

4. Log Aggregation & Observability

Organizations route application logs, metrics, and traces through Kafka before sinking them to Elasticsearch, Grafana Loki, or object storage. Kafka acts as a buffer that absorbs traffic spikes and decouples producers (applications) from consumers (observability stack), preventing backpressure from slowing services.

↑ Back to top

Code Examples

1. Idempotent Producer with Avro (Python — confluent-kafka)

from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Schema Registry setup
sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_schema = """
{
  "type": "record",
  "name": "OrderEvent",
  "fields": [
    {"name": "order_id", "type": "string"},
    {"name": "user_id",  "type": "string"},
    {"name": "amount",   "type": "double"},
    {"name": "ts",       "type": "long", "logicalType": "timestamp-millis"}
  ]
}
"""

serializer = AvroSerializer(sr_client, avro_schema)

# Idempotent producer config — ensures no duplicates on retry
producer = Producer({
    "bootstrap.servers": "broker:9092",
    "enable.idempotence": True,
    "acks": "all",
    "retries": 5,
    "max.in.flight.requests.per.connection": 5,
})

def send_order(order: dict):
    producer.produce(
        topic="orders",
        key=order["order_id"],
        value=serializer(order, SerializationContext("orders", MessageField.VALUE)),
        on_delivery=lambda err, msg: print(f"Delivered to {msg.partition()} @ {msg.offset()}") if not err else print(f"FAIL: {err}"),
    )
    producer.flush()  # fine for a demo; for throughput, call producer.poll(0) here and flush() only at shutdown

2. Consumer Group with Manual Offset Commit

from confluent_kafka import Consumer, KafkaError

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "order-processor",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,  # manual commit for at-least-once
    "isolation.level": "read_committed",  # only read committed txn messages
})
consumer.subscribe(["orders"])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            if msg.error().code() == KafkaError._PARTITION_EOF:
                continue
            raise Exception(msg.error())

        # Process message — must be idempotent for at-least-once
        process_order(msg.value())

        # Commit only after successful processing
        consumer.commit(asynchronous=False)
finally:
    consumer.close()

3. Kafka Streams Topology — Windowed Aggregation (Java-style concept, Python pseudocode)

# Conceptual: Kafka Streams windowed count (Java API, shown as pseudocode)
# Counts events per user_id in 5-minute tumbling windows

stream = builder.stream("clickstream")  \
    .group_by_key()                        \
    .windowed_by(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) \
    .count(Materialized.as_("click-counts-store"))

# Output: KTable<Windowed<String>, Long>
# Can be forwarded to a topic or queried via interactive queries

4. Schema Registry — Avro Producer with Schema Evolution

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

# Schema Registry ensures backward-compatible evolution
schema_registry = SchemaRegistryClient({
    "url": "http://schema-registry:8081"
})

# Avro schema (v2 — added "loyalty_tier" field with default)
CUSTOMER_SCHEMA = """{
    "type": "record",
    "name": "Customer",
    "namespace": "com.example.events",
    "fields": [
        {"name": "customer_id", "type": "string"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": "string"},
        {"name": "loyalty_tier", "type": "string", "default": "bronze"}
    ]
}"""

avro_serializer = AvroSerializer(
    schema_registry, CUSTOMER_SCHEMA,
    conf={"auto.register.schemas": True}
)

producer = SerializingProducer({
    "bootstrap.servers": "broker:9092",
    "key.serializer": lambda k, _: k.encode("utf-8"),
    "value.serializer": avro_serializer,
})

# Produce an event — Schema Registry validates compatibility
producer.produce(
    topic="customers",
    key="cust-1234",
    value={"customer_id": "cust-1234", "name": "Alice",
           "email": "alice@example.com", "loyalty_tier": "gold"},
)
producer.flush()

5. Dead Letter Queue (DLQ) Pattern

from confluent_kafka import Consumer, Producer
import json

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "order-processor",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

dlq_producer = Producer({"bootstrap.servers": "broker:9092"})

MAX_RETRIES = 3

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue

    for attempt in range(MAX_RETRIES):
        try:
            order = json.loads(msg.value())
            process_order(order)
            consumer.commit(asynchronous=False)
            break
        except Exception as e:
            if attempt == MAX_RETRIES - 1:
                # All retries exhausted — send to DLQ for manual review
                dlq_producer.produce(
                    topic="orders.dlq",
                    key=msg.key(),
                    value=msg.value(),
                    headers={
                        "error": str(e).encode(),
                        "original_topic": "orders".encode(),
                        "retry_count": str(MAX_RETRIES).encode(),
                    },
                )
                dlq_producer.flush()
                consumer.commit(asynchronous=False)
↑ Back to top

Comparison / When to Use

| Feature | Apache Kafka | AWS Kinesis | Google Pub/Sub | Apache Pulsar |
| --- | --- | --- | --- | --- |
| Ordering | Per-partition | Per-shard | Per-key (ordering keys) | Per-partition |
| Retention | Configurable (days/forever via compaction) | 1–365 days | 7 days (31 with Lite) | Configurable + tiered storage |
| Replay | Full offset-based replay | Shard iterator reset | Seek to timestamp | Full offset replay |
| Throughput | Millions msg/sec (scales with partitions) | 1 MB/s per shard | Auto-scales | Millions msg/sec |
| Exactly-once | Native (idempotent + txns) | At-least-once (dedup in app) | At-least-once | Native (txns) |
| Managed option | Confluent Cloud, MSK, Aiven | Fully managed (AWS) | Fully managed (GCP) | StreamNative |
| Ecosystem | Connect, Streams, ksqlDB, Schema Registry | Lambda integration | Dataflow integration | Functions, IO connectors |
| Best for | High-throughput, replay, multi-consumer | AWS-native simple streaming | GCP-native pub/sub | Multi-tenant, geo-replication |

Choose Kafka when: You need high-throughput event streaming with replay, exactly-once semantics, or a rich connector ecosystem. Choose Kinesis/Pub/Sub when: You're committed to one cloud and want zero ops overhead. Choose Pulsar when: You need native multi-tenancy or built-in tiered storage without Confluent.

↑ Back to top

Gotchas & Anti-patterns

  1. Too many partitions per topic. Each partition costs memory on brokers (index files, replica fetchers). A common mistake is setting 100+ partitions "just in case." Start with partitions = max(target_throughput / producer_throughput_per_partition, target_throughput / consumer_throughput_per_partition) and scale up.
  2. Large messages (>1 MB). Kafka is optimized for small messages. Sending images or files through Kafka causes broker memory pressure and slows replication. Instead, store the blob in S3/GCS and send only a reference (URI) in the Kafka message.
  3. Not handling rebalances gracefully. During a rebalance, offsets for revoked partitions must be committed before they're reassigned. Forgetting this causes duplicate processing. Use on_revoke callbacks to flush and commit.
  4. Relying on message timestamps for ordering. Timestamps can be producer-set (CreateTime) or broker-set (LogAppendTime). Neither guarantees global order. Only offsets within a partition provide ordering guarantees.
  5. Auto-commit with slow processing. If enable.auto.commit=true and your processing takes longer than auto.commit.interval.ms, offsets may be committed before processing finishes, causing data loss on failure. Prefer manual commits after processing.
↑ Back to top

Exercises

  1. Build a CDC pipeline. Set up a PostgreSQL database with a products table. Deploy Debezium PostgreSQL connector to capture inserts/updates to a Kafka topic. Write a consumer that materializes the latest state of each product into a local dictionary (simulating a cache). Verify that deleting a row in PostgreSQL produces a tombstone message (null value).
  2. Implement exactly-once consume-transform-produce. Create a Kafka producer that sends raw clickstream events (user_id, page, timestamp). Write a transactional consumer that reads these events, enriches them with a session ID (events within 30 min of last event from same user_id = same session), and produces enriched events to a new topic — all within a transaction.
  3. Partition strategy experiment. Create a topic with 6 partitions. Produce 10,000 messages: half with null keys (round-robin), half with keys (hashed). Write a consumer group with 3 consumers and observe how partitions are assigned. Measure throughput difference between the two patterns. Then add a 4th consumer and observe rebalancing behavior.
↑ Back to top

Quiz

Q1: What happens if you have more consumers in a consumer group than partitions in the subscribed topic?

The extra consumers sit idle. Kafka assigns at most one consumer per partition within a consumer group. If you have 8 consumers and 6 partitions, 2 consumers will not receive any messages. This is by design — partition count sets the upper bound on consumer parallelism.

Q2: Explain the difference between acks=1 and acks=all. When would you choose each?

acks=1: the leader writes the message to its local log and responds immediately without waiting for followers. This is faster but risks data loss if the leader crashes before replication. acks=all: the leader waits for all in-sync replicas (ISR) to acknowledge the write. Combined with min.insync.replicas=2, this ensures durability even if one broker fails. Choose acks=1 for low-latency, loss-tolerant use cases (metrics, logs). Choose acks=all for financial transactions, orders, or any data where loss is unacceptable.

Q3: How does log compaction differ from time-based retention?

Time-based retention deletes entire log segments older than retention.ms regardless of content. Log compaction retains the latest value for each key and removes older duplicates, keeping the topic as a compact key-value snapshot. Compaction never removes records from the active segment. A compacted topic can be used as a materialized view — consumers reading from offset 0 can rebuild the full current state. Time-based retention is for event streams; compaction is for changelog/state topics.

Q4: Why is Kafka not suitable as a traditional message queue for task distribution?

In a message queue (e.g., RabbitMQ), any consumer can pick up any message, and once acknowledged, it's removed. Kafka ties partitions to consumers 1:1 within a group, so you can't have fine-grained work-stealing. Messages persist until retention expires — they're not removed on consumption. Also, Kafka's rebalance on consumer failure is slower than queue-based redelivery. However, Kafka 4.0 introduces share groups (KIP-932, initially in early access) that enable queue-like semantics where multiple consumers can process from the same partition.

Q5: What is the purpose of transactional.id in Kafka producers?

The transactional.id enables exactly-once semantics across producer restarts. When a producer starts with a transactional.id, the broker "fences" (disables) any previous producer with the same ID, preventing zombie instances from writing duplicate data. It ties together idempotent production and transaction coordination. Each instance in a consume-transform-produce loop should have a unique, stable transactional.id (often derived from input partition assignment).

↑ Back to top


Part 2: Apache Airflow

Core Concepts

1. DAG Design & the TaskFlow API

In Airflow 2.x+, the TaskFlow API (@task decorator) replaces verbose operator instantiation with Pythonic functions. Tasks decorated with @task automatically handle XCom serialization — the return value of one task is passed as an argument to the next. This eliminates manual xcom_push/pull calls. DAGs should be treated as configuration, not business logic: keep the DAG file thin and push heavy computation into the task's execute() or delegated services (Spark, SQL engines).
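
A minimal TaskFlow sketch (DAG and task names are illustrative) showing return values flowing between tasks as XComs:

import pendulum
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def orders_summary():

    @task
    def extract() -> list[dict]:
        # The return value is serialized to XCom automatically
        return [{"order_id": "o-1", "amount": 42.0}]

    @task
    def summarize(orders: list[dict]) -> float:
        # The upstream XCom arrives as a plain Python argument
        return sum(o["amount"] for o in orders)

    summarize(extract())

orders_summary()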

2. Dynamic Task Mapping (expand)

Airflow 2.3+ supports dynamic task mapping via .expand(). Instead of hard-coding a fixed number of parallel tasks, you can fan-out at runtime based on the output of an upstream task. This is essential for ETL patterns where the number of files, partitions, or API pages is not known until execution. Under the hood, mapped tasks share the same operator definition but each gets a unique map_index. Combined with .partial() for static params, this replaces most SubDAG and TaskGroup-based fan-out hacks.
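
A small fragment (meant to sit inside a DAG definition; names are illustrative) combining .partial() for static arguments with .expand() for the runtime fan-out:

from airflow.decorators import task

@task
def list_new_files() -> list[str]:
    return ["exports/a.parquet", "exports/b.parquet"]  # discovered at runtime

@task
def copy_file(src: str, dest_bucket: str):
    print(f"Copying {src} -> {dest_bucket}")

# partial() pins the static argument; expand() creates one mapped task per file
copy_file.partial(dest_bucket="s3://lake-bronze").expand(src=list_new_files())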

3. Executors: Celery, Kubernetes, Local

The executor determines how tasks run. LocalExecutor runs tasks as subprocesses on the scheduler node — fine for light workloads. CeleryExecutor distributes tasks across a pool of persistent workers via a message broker (Redis/RabbitMQ) — good for steady, large workloads. KubernetesExecutor spins up a new pod per task — optimal for heterogeneous resource requirements and auto-scaling, but with higher task startup latency (~10-30s). Choose based on scale, isolation needs, and infrastructure.

4. Sensors, Deferrable Operators & Triggers

Sensors poll for an external condition (file exists, partition available). Classic sensors occupy a worker slot while waiting, which wastes resources. Airflow 2.2+ introduced deferrable operators (async) — they suspend the task, free the worker slot, and register a lightweight trigger that runs in the triggerer process. When the condition is met, the trigger resumes the task. This can reduce worker slot consumption by 10x for sensor-heavy DAGs.

5. Connections, Variables & Secrets Backends

Airflow stores credentials in Connections (host, login, password, extras JSON) and configuration in Variables. For production, use a secrets backend (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager) rather than the metadata DB. Critical best practice: never call Variable.get() at DAG parse-time (top-level code) — it triggers a DB query every scheduler heartbeat. Use Jinja templates ({{ var.value.my_var }}) or access variables inside task callables.
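
A sketch of the difference (variable and task names are illustrative): the commented-out call hits the metadata DB at parse time, while the Jinja reference is rendered only when the task runs:

from airflow.operators.bash import BashOperator

# BAD (parse time): from airflow.models import Variable; Variable.get("reporting_api_key")
# -> a metadata-DB query on every scheduler parse of this file.

# GOOD: Jinja template, rendered only at task execution time
report = BashOperator(
    task_id="build_report",
    bash_command="report-cli --date {{ ds }}",
    env={"REPORTING_API_KEY": "{{ var.value.reporting_api_key }}"},  # env is a templated field
)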

6. Datasets & Data-Aware Scheduling

Airflow 2.4+ supports datasets as first-class scheduling triggers. A producing DAG declares outlets=[Dataset("s3://bucket/table")] on a task. A consuming DAG uses schedule=[Dataset("s3://bucket/table")] so it runs when the upstream task completes. This creates event-driven, cross-DAG dependencies without explicit TriggerDagRunOperator calls. Datasets are logical URIs — Airflow doesn't validate the URI target, so naming conventions matter.

↑ Back to top

Industry Use Cases

1. Daily Warehouse Refresh (ELT)

A retailer runs a nightly DAG that: (1) extracts incremental data from PostgreSQL and Shopify APIs, (2) loads raw data to S3/GCS, (3) triggers dbt transformations in Snowflake/BigQuery, (4) runs data quality checks (Great Expectations or dbt tests), (5) sends a Slack notification. Airflow orchestrates this multi-step pipeline, handling retries, alerting on failure, and ensuring tasks run in the correct order.

2. ML Feature Pipeline Orchestration

A fintech company uses Airflow to orchestrate feature computation for fraud detection models. A daily DAG: (1) pulls transaction data from Kafka-backed tables, (2) submits PySpark jobs to compute rolling aggregates (e.g., 7-day spend per merchant category), (3) writes features to an online feature store (Feast/Tecton), (4) triggers model retraining via an MLflow API call. Dynamic task mapping fans out PySpark jobs across feature groups in parallel.

3. Multi-Cloud Data Sync

An enterprise with data in AWS (S3, Redshift) and Azure (ADLS, Synapse) uses Airflow to synchronize reference data bidirectionally. Airflow's provider ecosystem offers operators for both clouds, and the DAG handles schema drift detection, incremental sync, and reconciliation checks. Connection management via secrets backends keeps credentials cloud-specific.

↑ Back to top

Code Examples

1. TaskFlow API with Dynamic Task Mapping

import pendulum
from airflow.sdk import DAG, task

with DAG(
    dag_id="dynamic_elt_pipeline",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
    tags=["elt", "dynamic"],
) as dag:

    @task()
    def list_source_tables() -> list[str]:
        """Discover tables to extract — could query a metadata DB."""
        return ["orders", "customers", "products", "inventory"]

    @task()
    def extract_and_load(table_name: str, ds=None):
        """Extract from source DB, load to S3 as Parquet."""
        from airflow.hooks.base import BaseHook
        conn = BaseHook.get_connection("source_postgres")
        # ... extract logic using conn, partition by ds ...
        print(f"Extracted {table_name} for {ds}")

    @task()
    def run_dbt_models():
        """Trigger dbt run via BashOperator or dbt Cloud API."""
        import subprocess
        subprocess.run(["dbt", "run", "--select", "tag:daily"], check=True)

    tables = list_source_tables()
    # Dynamic fan-out: one task per table, determined at runtime
    loaded = extract_and_load.expand(table_name=tables)
    loaded >> run_dbt_models()

2. Deferrable Sensor — Wait for S3 File

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_export = S3KeySensor(
    task_id="wait_for_export_file",
    bucket_name="data-lake-raw",
    bucket_key="exports/{{ ds }}/transactions.parquet",
    aws_conn_id="aws_default",
    deferrable=True,       # frees worker slot while waiting
    poke_interval=60,      # check every 60s (in triggerer, not worker)
    timeout=3600,          # fail after 1 hour
)

3. Dataset-Driven Cross-DAG Dependency

from airflow import DAG
from airflow.datasets import Dataset
from airflow.decorators import task
import pendulum

ORDERS_DATASET = Dataset("s3://warehouse/silver/orders")

# Producer DAG — declares outlet
with DAG(dag_id="ingest_orders", schedule="@daily",
         start_date=pendulum.datetime(2024, 1, 1)):

    @task(outlets=[ORDERS_DATASET])
    def load_orders():
        # ... load logic ...
        pass

    load_orders()

# Consumer DAG — triggers when dataset is updated
with DAG(dag_id="transform_orders", schedule=[ORDERS_DATASET],
         start_date=pendulum.datetime(2024, 1, 1)):

    @task()
    def build_order_metrics():
        # runs automatically after ingest_orders completes
        pass

    build_order_metrics()
↑ Back to top

Comparison / When to Use

| Feature | Apache Airflow | Prefect | Dagster | Databricks Workflows |
| --- | --- | --- | --- | --- |
| Scheduling | Cron, timetables, datasets | Cron, event-driven | Cron, sensors, assets | Cron, file arrival triggers |
| Paradigm | DAG of tasks (operators) | Flows & tasks (Python-native) | Assets & ops (data-aware) | Jobs & tasks (Spark-centric) |
| Dynamic tasks | expand() in 2.3+ | Native (map) | Native (dynamic partitions) | For-each tasks |
| Executor model | Celery, K8s, Local | Agents, Work Pools | Executors (K8s, etc.) | Databricks clusters |
| UI | Grid, Graph, Gantt | Cloud UI (free tier) | Dagit (asset lineage) | Databricks Workflows UI |
| Ecosystem | 80+ provider packages | Growing integrations | Growing integrations | Databricks-native (Spark, Delta) |
| Learning curve | Moderate (many concepts) | Low (Pythonic) | Moderate (asset model) | Low if already on Databricks |
| Best for | Complex multi-system orchestration | Python-native workflows | Data asset-centric lineage | Spark-centric workloads |

Choose Airflow when: You need a battle-tested orchestrator with a massive provider ecosystem, complex dependency patterns, and your team is comfortable with its operational overhead. Choose Prefect/Dagster when: You want a more Pythonic developer experience or asset-based lineage. Choose Databricks Workflows when: Your entire workload is Spark/Delta and you're already on the Databricks platform.

↑ Back to top

Gotchas & Anti-patterns

  1. Heavy computation at DAG parse time. Any code outside operator execute() methods runs every time the scheduler parses the DAG file (every ~30s). Importing heavy libraries (pandas, torch) or calling APIs/DB at the top level tanks scheduler performance. Move imports inside task callables.
  2. Abusing XComs for large data. XComs are stored in the metadata DB (default). Pushing DataFrames or large JSON blobs through XCom bloats the DB and hits serialization limits. XCom should carry small metadata (file paths, row counts, status flags). For large data, use object storage (S3, GCS) and pass the URI via XCom.
  3. Not making tasks idempotent. Tasks can be retried or manually re-run. If your task does INSERT INTO without deduplication, retries create duplicates. Always use UPSERT/MERGE, partition-based overwrites, or write to temp locations with atomic rename.
  4. Overly complex DAGs with 500+ tasks. The scheduler performance degrades with DAG complexity. Prefer multiple smaller DAGs connected via Datasets or TriggerDagRunOperator over a single monolithic DAG. Dynamic task mapping can also inadvertently create thousands of tasks — add safeguards.
  5. Ignoring time zones and data_interval_start. Airflow 2.2+ shifted from execution_date to data_interval_start/end. Many legacy patterns still reference execution_date, which is confusing (it's the start of the interval, not when the DAG actually runs). Always use data_interval_start for partitioning logic.
↑ Back to top

Exercises

  1. Build a dynamic ELT DAG. Create a DAG that: (1) queries a metadata table to discover which source tables need syncing, (2) dynamically maps an extract-load task per table using .expand(), (3) after all loads complete, triggers a dbt run. Add a sensor that waits for a daily flag file before starting.
  2. Implement cross-DAG dataset dependencies. Create three DAGs: ingest_raw (produces Dataset A), transform_silver (triggered by Dataset A, produces Dataset B), and build_gold (triggered by Dataset B). Verify in the Airflow UI that the Dataset graph shows the correct dependency chain.
  3. Convert a sensor-heavy DAG to deferrable. Take a DAG with 3 S3KeySensors that block worker slots for up to 30 minutes each. Convert them to deferrable mode. Compare worker slot utilization before and after using the Airflow pool metrics.
↑ Back to top

Quiz

Q1: What is the difference between execution_date and data_interval_start in Airflow 2.x?

They are the same value — execution_date is the legacy name, and data_interval_start is the modern name introduced in Airflow 2.2. Both represent the start of the data interval the DAG run covers. The actual wall-clock time the DAG runs is data_interval_end (or slightly after). For a daily DAG scheduled for 2024-01-15, data_interval_start = 2024-01-15T00:00:00 and data_interval_end = 2024-01-16T00:00:00, but the DAG actually executes after midnight on the 16th.

Q2: Why should you avoid calling Variable.get() at the top level of a DAG file?

The Airflow scheduler re-parses DAG files at a regular interval (every ~30s by default). Top-level Variable.get() triggers a database query on every parse, which hammers the metadata DB and slows down scheduling for all DAGs. Instead, use Jinja templates ({{ var.value.my_var }}) which are resolved only at task execution time, or call Variable.get() inside a task callable.

Q3: How does dynamic task mapping (.expand()) differ from a SubDAG?

SubDAGs are deprecated. They created a separate DAG run with its own scheduler budget, leading to deadlocks and performance issues. Dynamic task mapping creates tasks within the same DAG run — they share the same pool, scheduling, and UI. Mapped tasks are determined at runtime (based on upstream output), while SubDAGs were defined at parse time. Mapped tasks are first-class: they appear in the UI grid with individual status, logs, and retry controls.

Q4: Explain the purpose of deferrable operators and the triggerer component.

Deferrable operators allow a task to suspend execution, free its worker slot, and register an async trigger (a lightweight Python coroutine). The triggerer is a separate Airflow process that runs these triggers using asyncio. When a trigger fires (e.g., a file appears in S3), the triggerer notifies the scheduler, which resumes the task on a worker. This architecture is critical for sensor-heavy DAGs where tasks spend most of their time waiting.

Q5: What scheduling model does Airflow use and why is it not suitable for sub-second scheduling?

Airflow uses a scheduler loop that periodically parses DAG files and evaluates task dependencies. The scheduler heartbeat is typically 1-5 seconds, and task execution involves worker assignment, potential pod/container creation (K8s executor), and metadata DB state transitions. This overhead makes Airflow suitable for batch orchestration (minutes to hours) but not for sub-second, real-time triggering. For sub-second needs, use a streaming framework (Kafka Streams, Flink) and use Airflow only to orchestrate the deployment/lifecycle of those streaming jobs.

↑ Back to top


Part 3: PySpark

Core Concepts

1. Catalyst Optimizer & Tungsten Execution

PySpark's performance relies on the Catalyst optimizer, which transforms logical plans through rule-based and cost-based optimization (predicate pushdown, column pruning, join reordering). The optimized logical plan is converted to physical plan stages. Tungsten provides off-heap memory management and whole-stage code generation (WSCG), compiling pipeline stages into optimized Java bytecode. Understanding the query plan (df.explain(True)) is critical for diagnosing performance issues — look for BroadcastHashJoin vs. SortMergeJoin, filter placement, and exchange (shuffle) nodes.
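
A sketch of inspecting a plan with explain(True) (table paths are illustrative); the comments point at what to look for in the output:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("plan_inspection").getOrCreate()

sales = spark.read.parquet("s3a://warehouse/fact_sales/")
products = spark.read.parquet("s3a://warehouse/dim_products/")

result = (sales
    .where(F.col("sale_date") == "2024-06-15")     # expect PushedFilters on the Parquet scan
    .join(F.broadcast(products), "product_id")     # expect BroadcastHashJoin, not SortMergeJoin
    .groupBy("category")
    .agg(F.sum("revenue").alias("revenue")))

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan;
# Exchange nodes in the physical plan mark shuffles.
result.explain(True)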

2. Partitioning, Shuffles & the Shuffle Problem

Data in Spark is divided into partitions — the unit of parallelism. Transformations like groupBy, join, and repartition trigger a shuffle — data is serialized, written to disk, and redistributed across executors via the network. Shuffles are the most expensive operation in Spark. To minimize them: (1) use broadcast() for small dimension tables, (2) pre-partition data on common join keys, (3) use coalesce() instead of repartition() to reduce partitions, (4) leverage bucketing for repeated joins. Target partition sizes of 100-200 MB for optimal throughput.
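
A sketch of that shuffle hygiene (paths and numbers are illustrative starting points, not tuned values):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("shuffle_hygiene").getOrCreate()

# Size shuffle partitions so each lands around 100-200 MB
spark.conf.set("spark.sql.shuffle.partitions", "1200")

# Repartition both sides on the join key so the join can reuse the same layout
orders = spark.read.parquet("s3a://lake/silver/orders/").repartition(1200, "customer_id")
payments = spark.read.parquet("s3a://lake/silver/payments/").repartition(1200, "customer_id")
joined = orders.join(payments, "customer_id")

# coalesce() (no shuffle) before writing to avoid thousands of tiny output files
joined.coalesce(200).write.mode("overwrite").parquet("s3a://lake/gold/order_payments/")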

3. Broadcast Joins vs. Sort-Merge Joins

When one side of a join fits in memory (default threshold: spark.sql.autoBroadcastJoinThreshold = 10 MB), Spark broadcasts the small table to all executors, avoiding a shuffle on the large table. This turns an O(n log n) sort-merge into an O(n) hash lookup. For dimensions up to ~1-2 GB, manually hint with F.broadcast(small_df). Sort-merge join is the default for two large tables — both sides are sorted on the join key and merged. If you're joining the same large tables repeatedly, consider bucketing them by the join key to eliminate the sort phase.

4. Adaptive Query Execution (AQE)

AQE (enabled by default since Spark 3.2) re-optimizes the query plan at runtime based on actual data statistics from completed map stages. Key features: (1) coalescing post-shuffle partitions — merges small partitions to reduce task overhead, (2) converting sort-merge join to broadcast join when runtime statistics show one side is small, (3) skew join optimization — splits skewed partitions into smaller ones. AQE significantly reduces the need for manual tuning; confirm or tune it explicitly with the configuration sketch below.
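
A configuration sketch (these keys exist in Spark 3.x; the values are illustrative starting points):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.conf.set("spark.sql.adaptive.enabled", "true")                       # on by default since 3.2
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")    # merge tiny post-shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")              # split oversized skewed partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")  # target post-shuffle partition size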

5. Structured Streaming

PySpark's Structured Streaming uses the same DataFrame API for batch and stream processing. Key concepts: triggers (how often to process — processingTime, availableNow, continuous), output modes (append, complete, update), and checkpointing (stores offsets and state to durable storage for fault tolerance). The availableNow trigger (Spark 3.3+) processes all available data then stops — perfect for Airflow-triggered micro-batch patterns.

6. Memory Management & Spill

Spark's unified memory model splits executor memory into execution memory (shuffles, joins, aggregations) and storage memory (cached data). They can borrow from each other. When memory is exhausted, Spark spills to disk — this is slow but prevents OOM. Common OOM causes: (1) large broadcast variables, (2) skewed partitions, (3) collecting large datasets to the driver (collect()). Monitor via Spark UI's Storage and Executor tabs. Set spark.executor.memory judiciously — overprovisioning wastes cluster resources; underprovisioning causes excessive spill.
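
A sketch of session-level memory settings (values are illustrative; size them against what the Spark UI shows rather than copying them):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    .appName("memory_tuned_job")
    .config("spark.executor.memory", "8g")             # JVM heap per executor
    .config("spark.executor.memoryOverhead", "2g")     # off-heap, Python workers, shuffle buffers
    .config("spark.memory.fraction", "0.6")            # heap share for execution + storage
    .config("spark.sql.shuffle.partitions", "800")     # smaller tasks spill less
    .getOrCreate())

# After a run, check per-task "Spill (memory / disk)" and the Storage tab in the Spark UI
# to see whether cached data is squeezing execution memory.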

↑ Back to top

Industry Use Cases

1. Large-Scale ETL (Petabyte Data Lakes)

A media streaming company processes 500 TB of daily event logs (plays, pauses, searches). PySpark reads Parquet files from S3, applies sessionization (grouping events by user within 30-min gaps), computes aggregates (daily active users, content popularity), and writes output to Delta Lake tables partitioned by date. Spark's distributed execution makes this feasible; a single-node tool would take days.

2. Data Quality & Validation at Scale

An insurance company uses PySpark to validate 200M+ claim records against business rules: coverage dates must precede claim dates, amounts must be positive, policy IDs must exist in a reference table. PySpark enables these checks as distributed joins and filters, outputting a quality report and quarantining bad records — all within 15 minutes.

3. Feature Engineering for ML Pipelines

A ride-sharing platform computes features for demand prediction: rolling 1-hour ride counts per geo-hex, driver supply ratios, weather-joined features. PySpark window functions (F.window) and geo-spatial UDFs process billions of ride events. Output features are written to a feature store (Feast) or Hive tables consumed by training jobs.

↑ Back to top

Code Examples

1. Incremental Load with Partition Pruning

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder \
    .appName("incremental_load") \
    .config("spark.sql.adaptive.enabled", "true") \
    .getOrCreate()

# Read only the partitions we need — predicate pushdown to Parquet
date_filter = "2024-06-15"
raw_df = spark.read.parquet("s3a://data-lake/raw/events/") \
    .where(F.col("event_date") == date_filter)

# Deduplicate on event_id (at-least-once upstream guarantee)
deduped_df = raw_df.dropDuplicates(["event_id"])

# Transform: add derived columns
enriched_df = deduped_df.withColumn(
    "event_hour", F.hour("event_timestamp")
).withColumn(
    "is_weekend", F.dayofweek("event_timestamp").isin([1, 7])
)

# Write to Delta Lake with MERGE-style overwrite per partition
enriched_df.write \
    .format("delta") \
    .mode("overwrite") \
    .option("replaceWhere", f"event_date = '{date_filter}'") \
    .partitionBy("event_date") \
    .save("s3a://data-lake/silver/events/")

2. Skew-Resistant Join with Salting

from pyspark.sql import functions as F

# Problem: joining on user_id but 1% of users generate 50% of events
SALT_BUCKETS = 10

# Salt the large (skewed) side
events_salted = events_df.withColumn(
    "salt", (F.rand() * SALT_BUCKETS).cast("int")
).withColumn(
    "salted_key", F.concat(F.col("user_id"), F.lit("_"), F.col("salt"))
)

# Explode the small (dimension) side to match all salt values
users_exploded = users_df.crossJoin(
    spark.range(0, SALT_BUCKETS).withColumnRenamed("id", "salt")
).withColumn(
    "salted_key", F.concat(F.col("user_id"), F.lit("_"), F.col("salt"))
).drop("user_id", "salt")  # avoid ambiguous duplicate columns after the join

# Now join on salted_key — roughly even distribution across tasks
joined_df = events_salted.join(
    users_exploded, on="salted_key", how="inner"
).drop("salt", "salted_key")

3. Window Functions for Sessionization

from pyspark.sql import Window, functions as F

SESSION_GAP = 1800  # 30 minutes in seconds

window_spec = Window.partitionBy("user_id").orderBy("event_ts")

sessionized = events_df \
    .withColumn("prev_ts", F.lag("event_ts").over(window_spec)) \
    .withColumn("gap_seconds",
        F.unix_timestamp("event_ts") - F.unix_timestamp("prev_ts")
    ) \
    .withColumn("new_session",
        F.when(
            (F.col("gap_seconds") > SESSION_GAP) | F.col("prev_ts").isNull(), 1
        ).otherwise(0)
    ) \
    .withColumn("session_id",
        F.concat(
            F.col("user_id"), F.lit("_"),
            F.sum("new_session").over(window_spec)
        )
    )

4. Broadcast Join with Dimension Table

from pyspark.sql import functions as F

# Small dimension table (~50 MB) — broadcast to avoid shuffle
product_dim = spark.read.parquet("s3a://warehouse/dim_products/")

# Large fact table (~500 GB)
sales_fact = spark.read.parquet("s3a://warehouse/fact_sales/") \
    .where(F.col("sale_date") >= "2024-01-01")

# Explicit broadcast hint — overrides autoBroadcastJoinThreshold
result = sales_fact.join(
    F.broadcast(product_dim),
    on="product_id",
    how="left"
).groupBy("category", "sale_date") \
 .agg(
    F.sum("revenue").alias("total_revenue"),
    F.countDistinct("order_id").alias("order_count")
)

5. Pandas UDF — Vectorized Text Processing

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import pandas as pd

# BAD: Row-level Python UDF (10-100x slower)
# @udf(StringType())
# def clean_text(text):
#     return text.lower().strip() if text else None

# GOOD: Pandas UDF — operates on Arrow batches (vectorized, near-native speed)
@F.pandas_udf(StringType())
def clean_text(series: pd.Series) -> pd.Series:
    """Vectorized text cleaning — runs on entire column batch."""
    return series.str.lower().str.strip().str.replace(r'[^\w\s]', '', regex=True)

# Apply: Spark transfers data via Arrow → Pandas Series → Arrow
cleaned_df = raw_df.withColumn(
    "clean_description", clean_text(F.col("product_description"))
)

# Grouped map — apply a function per group via the modern applyInPandas API
def compute_user_stats(pdf: pd.DataFrame) -> pd.DataFrame:
    """Compute per-user statistics using full Pandas power."""
    return pd.DataFrame({
        "user_id": [pdf["user_id"].iloc[0]],
        "event_count": [len(pdf)],
        "avg_duration": [pdf["duration_seconds"].mean()],
    })

user_stats = events_df.groupBy("user_id").applyInPandas(
    compute_user_stats,
    schema="user_id string, event_count long, avg_duration double",
)
↑ Back to top

Comparison / When to Use

| Feature | PySpark | Pandas / Polars | dbt (SQL) | Apache Flink |
| --- | --- | --- | --- | --- |
| Scale | TB–PB (distributed) | GB (single-node) | Depends on warehouse | TB–PB (distributed) |
| Paradigm | DataFrame + SQL | DataFrame | SQL models | DataStream + Table API |
| Latency | Seconds–minutes (batch/micro-batch) | Milliseconds (local) | Depends on warehouse | Milliseconds (true streaming) |
| Streaming | Structured Streaming (micro-batch) | No | No | Native event-time streaming |
| Language | Python, Scala, SQL | Python, Rust (Polars) | SQL + Jinja | Java, Scala, Python, SQL |
| Ecosystem | MLlib, GraphX, Delta Lake | scikit-learn, NumPy | dbt packages, exposures | CDC connectors, Iceberg |
| Best for | Large-scale batch ETL, feature eng. | Small data, prototyping | Warehouse transformations | True low-latency streaming |

Choose PySpark when: Data exceeds single-node memory, you need distributed shuffles/joins, or you're already on a Spark platform (Databricks, EMR). Choose Pandas/Polars when: Data fits in memory and you want simpler development. Choose dbt when: Transformations are pure SQL against a warehouse. Choose Flink when: You need true event-time streaming with millisecond latency.

↑ Back to top

Gotchas & Anti-patterns

  1. Using collect() on large DataFrames. collect() pulls all data to the driver — instant OOM on large datasets. Use take(n), show(), or write to storage. If you need a count, use df.count() (distributed), not len(df.collect()).
  2. Python UDFs killing performance. Row-level Python UDFs serialize each row from JVM to Python and back — 10-100x slower than native Spark functions. Always prefer built-in pyspark.sql.functions. If you must use UDFs, use Pandas UDFs (@pandas_udf) which operate on Arrow batches with near-native speed.
  3. Too many small files (small file problem). Writing partitioned data with too many partitions creates thousands of tiny Parquet files. This destroys read performance (high per-file overhead, slow S3 listing). Use coalesce() before write, or leverage Delta Lake's OPTIMIZE (auto-compaction) to merge small files.
  4. Ignoring data skew in joins. A groupBy or join on a key with skewed distribution (e.g., null keys, hot users) creates some tasks that process 100x more data than others. The Spark UI's "Task Duration" histogram reveals skew. Fix with AQE skew join hints, salting, or isolating skewed keys.
  5. Caching everything. df.cache() stores data in memory, but memory is limited. Caching a DataFrame you only use once wastes memory and can evict more valuable caches. Cache only when a DataFrame is reused multiple times in the same job. Always unpersist() when done.
↑ Back to top

Exercises

  1. Optimize a slow join. Generate two DataFrames: big_events (10M rows, join key = user_id with 1% of values being a single "hot" user) and user_dim (100K rows). Join them naively. Use explain(True) to confirm it's a SortMergeJoin. Then: (a) apply a broadcast hint for user_dim, (b) verify the plan shows BroadcastHashJoin, (c) measure execution time difference, (d) implement salting for the hot-key scenario and compare.
  2. Build a sessionization pipeline. Create synthetic clickstream data (user_id, page, timestamp). Implement sessionization using window functions — sessions break after a 30-minute gap. Output: user_id, session_id, session_start, session_end, page_count, session_duration_seconds. Write results partitioned by date.
  3. Incremental micro-batch with Structured Streaming. Set up a local file source (CSV or JSON files arriving in a directory). Write a Structured Streaming job that reads new files, deduplicates by event_id using dropDuplicates with watermark, and writes output to a Parquet sink with checkpointing. Use trigger(availableNow=True) mode to simulate Airflow-triggered micro-batches.
↑ Back to top

Quiz

Q1: What is the difference between repartition(n) and coalesce(n)?

repartition(n) performs a full shuffle to create exactly n partitions — all data is redistributed. It can increase or decrease partition count. coalesce(n) reduces partition count without a full shuffle by combining adjacent partitions. It can only decrease the count and results in unevenly sized partitions. Use coalesce when reducing partitions (e.g., before writing) and repartition when you need evenly distributed data or are increasing partition count.

Q2: Explain Adaptive Query Execution (AQE) and name three optimizations it performs.

AQE re-optimizes the query plan at stage boundaries using runtime statistics from completed map stages. Three key optimizations: (1) Coalescing post-shuffle partitions — merges small partitions to avoid task overhead (reduces 200 tasks of 1 MB each to 10 tasks of 20 MB). (2) Converting sort-merge join to broadcast join — if a map output is smaller than expected, AQE switches to broadcast. (3) Skew join optimization — detects partition skew and splits oversized partitions into sub-partitions for parallel processing.

Q3: Why are Python UDFs slow in PySpark, and what is the alternative?

Python UDFs require row-by-row serialization between the JVM (Spark) and a Python process via a socket. Each row is serialized with pickle, sent to Python, processed, and sent back. This serialization overhead makes them 10-100x slower than native Spark functions. The alternative is Pandas UDFs (@pandas_udf) which use Apache Arrow for columnar transfer — entire batches of rows are transferred efficiently, and the UDF operates on a pandas.Series, leveraging vectorized NumPy operations.

Q4: What causes the "small file problem" and how do you fix it?

Writing DataFrames with many partitions creates one file per partition per writer. A 1000-partition DataFrame written partitioned by date can create 1000 tiny files per date partition. This slows reads (high per-file overhead, especially on object storage with costly list operations). Fixes: (1) coalesce(N) before write where N targets 100-200 MB per file, (2) repartition(column) to consolidate data, (3) Delta Lake OPTIMIZE which compacts small files into larger ones, (4) Spark configuration spark.sql.files.maxRecordsPerFile to control row count per file.

Q5: When would you use trigger(availableNow=True) in Structured Streaming?

availableNow (Spark 3.3+) processes all available data across potentially multiple batches, then terminates the query. It's ideal for micro-batch patterns where an external scheduler (like Airflow) triggers the streaming job on a schedule. Unlike trigger(once=True) which processes all data in a single batch (risking OOM), availableNow respects maxFilesPerTrigger and processes data in manageable increments while still draining everything available. This gives you streaming's exactly-once semantics with batch-schedule control.

↑ Back to top


Part 4: The Stack — End-to-End Pipeline

Architecture Overview

The Kafka → Airflow → PySpark pipeline is the backbone of many modern data platforms. Kafka handles real-time ingestion and event routing. Airflow orchestrates scheduled batch/micro-batch jobs. PySpark performs the heavy transformations. Together, they implement a lambda-like architecture (streaming + batch) or, more commonly today, a micro-batch architecture where Airflow triggers PySpark Structured Streaming jobs to process accumulated Kafka data.

Architecture Diagram (ASCII)


┌──────────────────────────────────────────────────────────────────────────┐
│                        DATA SOURCES                                     │
│  ┌─────────┐  ┌─────────┐  ┌─────────┐  ┌─────────────┐               │
│  │ App APIs │  │   IoT   │  │ CDC     │  │ Third-Party │               │
│  └────┬─────┘  └────┬────┘  └────┬────┘  └──────┬──────┘               │
└───────┼──────────────┼───────────┼───────────────┼──────────────────────┘
        │              │           │               │
        ▼              ▼           ▼               ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                    APACHE KAFKA CLUSTER                                  │
│  ┌─────────────┐  ┌──────────────┐  ┌───────────────┐                  │
│  │ raw.orders  │  │ raw.events   │  │ cdc.products  │  ...             │
│  │  (topic)    │  │  (topic)     │  │  (topic)      │                  │
│  └──────┬──────┘  └──────┬───────┘  └───────┬───────┘                  │
│         │  Schema Registry manages schemas   │                          │
└─────────┼────────────────┼───────────────────┼──────────────────────────┘
          │                │                   │
          ▼                ▼                   ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                   APACHE AIRFLOW (Orchestrator)                          │
│                                                                          │
│  ┌────────────────────────────────────────────────────────────────┐      │
│  │  DAG: daily_pipeline (schedule: @daily)                       │      │
│  │                                                                │      │
│  │  [sensor]          [spark_submit]        [spark_submit]       │      │
│  │  wait_for_data ──▶ bronze_to_silver ──▶ silver_to_gold       │      │
│  │                                                    │          │      │
│  │                                            [quality_check]    │      │
│  │                                                    │          │      │
│  │                                             [notify_slack]    │      │
│  └────────────────────────────────────────────────────────────────┘      │
└──────────────────────────────────────────────────────────────────────────┘
          │                │                   │
          ▼                ▼                   ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                  PYSPARK / DATABRICKS CLUSTER                            │
│                                                                          │
│  ┌────────────┐    ┌──────────────┐    ┌──────────────┐                 │
│  │  Bronze    │    │   Silver     │    │    Gold      │                 │
│  │ (raw from  │───▶│ (cleaned,    │───▶│ (aggregated, │                 │
│  │  Kafka)    │    │  deduped)    │    │  modeled)    │                 │
│  └────────────┘    └──────────────┘    └──────┬───────┘                 │
│                                                │                         │
│              Delta Lake / Iceberg Tables        │                         │
└─────────────────────────────────────────────────┼────────────────────────┘
                                                  │
                                                  ▼
┌──────────────────────────────────────────────────────────────────────────┐
│                     CONSUMPTION LAYER                                    │
│  ┌───────────┐  ┌──────────┐  ┌────────────┐  ┌────────────┐           │
│  │   BI /    │  │  ML      │  │  Reverse   │  │  Real-time │           │
│  │ Dashboards│  │ Training │  │  ETL       │  │  APIs      │           │
│  └───────────┘  └──────────┘  └────────────┘  └────────────┘           │
└──────────────────────────────────────────────────────────────────────────┘

Data flow: Sources produce events to Kafka topics (with Avro schemas). Airflow runs a daily DAG that (1) verifies data availability in Kafka (sensor), (2) submits a PySpark job to read from Kafka, deduplicate, and write to a Bronze Delta table, (3) submits a second PySpark job to clean, validate, and write to Silver, (4) submits a third job to aggregate into Gold dimension/fact tables, (5) runs quality checks, (6) notifies the team.

↑ Back to top

Combined Code Example

Airflow DAG: Orchestrating the End-to-End Pipeline

import pendulum
from airflow.sdk import DAG, task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.notifications.slack import send_slack_notification

with DAG(
    dag_id="kafka_spark_pipeline",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
    default_args={"retries": 2, "retry_delay": pendulum.duration(minutes=5)},
    on_failure_callback=[send_slack_notification(
        text="Pipeline FAILED: {{ dag.dag_id }} on {{ ds }}",
        channel="#data-alerts",
        slack_conn_id="slack_default",
    )],
) as dag:

    @task()
    def check_kafka_lag(ds=None):
        """Verify Kafka consumer lag is within acceptable bounds."""
        from confluent_kafka.admin import AdminClient
        admin = AdminClient({"bootstrap.servers": "broker:9092"})
        # Check consumer group lag for the spark consumer group
        # Raise AirflowException if lag exceeds threshold
        print(f"Kafka lag check passed for {ds}")

    bronze_job = SparkSubmitOperator(
        task_id="kafka_to_bronze",
        application="s3://spark-jobs/kafka_to_bronze.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
        conf={
            "spark.sql.adaptive.enabled": "true",
            "spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0",
        },
        executor_memory="4g",
        executor_cores=4,
        num_executors=8,
    )

    silver_job = SparkSubmitOperator(
        task_id="bronze_to_silver",
        application="s3://spark-jobs/bronze_to_silver.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
        conf={"spark.sql.adaptive.enabled": "true"},
        executor_memory="8g",
        executor_cores=4,
        num_executors=12,
    )

    gold_job = SparkSubmitOperator(
        task_id="silver_to_gold",
        application="s3://spark-jobs/silver_to_gold.py",
        conn_id="spark_default",
        application_args=["--date", "{{ ds }}"],
    )

    @task()
    def run_quality_checks(ds=None):
        """Run Great Expectations suite against Gold tables."""
        import great_expectations as gx
        context = gx.get_context()
        result = context.run_checkpoint(checkpoint_name="gold_daily_check")
        if not result.success:
            raise Exception("Data quality checks failed")

    check_kafka_lag() >> bronze_job >> silver_job >> gold_job >> run_quality_checks()

PySpark Job: kafka_to_bronze.py (Reading from Kafka)

import sys
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("kafka_to_bronze").getOrCreate()
run_date = sys.argv[sys.argv.index("--date") + 1]

# Read from Kafka — Structured Streaming with availableNow trigger
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "broker:9092") \
    .option("subscribe", "raw.orders,raw.events") \
    .option("startingOffsets", "earliest") \
    .option("maxOffsetsPerTrigger", 1000000) \
    .load()

# Keep the raw Kafka record fields: key, value, topic, partition, offset, timestamp
parsed_df = kafka_df.select(
    F.col("topic"),
    F.col("partition"),
    F.col("offset"),
    F.col("timestamp").alias("kafka_timestamp"),
    F.col("key").cast("string").alias("message_key"),
    F.col("value").cast("string").alias("message_value"),
    F.lit(run_date).alias("load_date"),
)

# Write to Delta Bronze table — streaming with a stable checkpoint location
# (reused across runs; a per-date checkpoint would re-read from startingOffsets each time)
query = parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://checkpoints/bronze/kafka_events") \
    .option("mergeSchema", "true") \
    .trigger(availableNow=True) \
    .partitionBy("topic", "load_date") \
    .start("s3a://data-lake/bronze/kafka_events/")

query.awaitTermination()
spark.stop()
↑ Back to top

Integration Gotchas

  1. Checkpoint location reuse across dates. When using Structured Streaming with availableNow in an Airflow-triggered pattern, the checkpoint location must be stable (same path for the same logical job). Creating a new checkpoint per DAG run causes Spark to re-read from startingOffsets on each run, leading to duplicate processing. Use a persistent checkpoint and only reset it during disaster recovery.
  2. Kafka offset management vs. Spark checkpointing. Spark manages its own offsets in the checkpoint, independent of Kafka consumer group offsets. Don't rely on kafka-consumer-groups.sh to monitor Spark's progress — it won't show correct lag. Use Spark's streaming query progress listener or check the checkpoint's offsets/ directory; a minimal progress-polling sketch follows this list.
  3. Airflow task granularity vs. Spark job boundaries. Don't submit one giant Spark job that does bronze → silver → gold. If the silver step fails, you'd have to rerun the expensive bronze read. Split into separate Airflow tasks so each step is independently retryable. Pass Data between steps via Delta tables (durable storage), not DataFrames.
  4. Schema evolution across the stack. A schema change in a Kafka topic (adding a field) must propagate through the entire pipeline. Use Schema Registry to enforce compatibility, enable mergeSchema=true on Delta writes (for additive changes), and ensure downstream Spark jobs handle new columns gracefully (use try_cast or schema-on-read patterns).
  5. Airflow sensing Kafka directly. There's no production-quality KafkaSensor in Airflow for checking "has new data arrived since last run." Instead, use the checkpoint-based approach: Spark's availableNow trigger will simply process nothing if no new data exists. Make the Airflow task idempotent and let it complete with zero records processed as a valid outcome.
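
A minimal progress-polling sketch, assuming query is the StreamingQuery returned by the bronze job's start() call:

import json
import time

while query.isActive:
    progress = query.lastProgress  # dict describing the most recent micro-batch, or None
    if progress:
        print(json.dumps({
            "batchId": progress["batchId"],
            "numInputRows": progress["numInputRows"],
            "endOffsets": [s.get("endOffset") for s in progress["sources"]],
        }, indent=2))
    time.sleep(30)  # with trigger(availableNow=True) the loop ends once the query drains and stops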
↑ Back to top

Comparison / When to Use This Stack

| Pattern | Kafka + Airflow + PySpark | Kafka + Flink | ADF + Synapse | Databricks end-to-end |
| --- | --- | --- | --- | --- |
| Ingestion | Kafka (streaming) | Kafka (streaming) | ADF copy activity | Auto Loader / DLT |
| Orchestration | Airflow (flexible) | Flink jobs (self-contained) | ADF pipelines | Databricks Workflows |
| Transformation | PySpark (batch/micro-batch) | Flink (true streaming) | Mapping data flows / Synapse SQL | PySpark / SQL / DLT |
| Latency | Minutes (micro-batch) | Milliseconds to seconds | Minutes (batch) | Seconds–minutes |
| Complexity | High (3 systems to manage) | Medium (1 system for stream) | Low (Azure managed) | Medium (single platform) |
| Best for | Flexible multi-system, team knows Spark | Low-latency streaming at scale | Azure shops, less code | All-in on Databricks lakehouse |
↑ Back to top

Quiz

Q1: In a Kafka → Spark Structured Streaming pipeline, where are consumer offsets tracked and why doesn't the Kafka consumer group show progress?

Spark Structured Streaming tracks offsets in its own checkpoint directory (written to HDFS/S3), not in Kafka's __consumer_offsets topic. Spark reads from Kafka using assign mode internally (even when you use subscribe), bypassing the consumer group protocol. Therefore, Kafka CLI tools like kafka-consumer-groups.sh will show no committed offsets for Spark's group.id (or an incorrect/stale value). Monitor progress via query.lastProgress or the Spark UI's Structured Streaming tab.

Q2: Why should you split bronze → silver → gold into separate Airflow tasks rather than one Spark job?

Splitting into separate tasks provides: (1) Independent retryability — if silver-to-gold fails, you retry only that step, not re-reading all data from Kafka. (2) Observability — Airflow shows per-stage success/failure, duration, and logs. (3) Resource optimization — each stage can request different executor memory/cores. (4) Checkpointing — Delta tables between stages act as durable checkpoints. The trade-off is increased overhead from multiple Spark context startups, but this is minor compared to reprocessing costs.

Q3: How would you handle a scenario where a Kafka topic's schema adds a new field?

Use a multi-layer defense: (1) Schema Registry enforces BACKWARD compatibility — consumers with the old schema can still read new messages (the new field is ignored or has a default). (2) On the Spark side, enable mergeSchema=true on Delta writes so the new column is added automatically. (3) Downstream queries should use explicit SELECT lists (not SELECT *) or handle the new column with coalesce(new_col, default). (4) Update dbt models or Gold transformations to incorporate the new field.

Q4: What is the role of trigger(availableNow=True) in bridging streaming and batch orchestration?

availableNow (Spark 3.3+) tells the streaming query to process all data that is currently available, across multiple micro-batches if needed, and then terminate. This allows Airflow to submit a Spark streaming job like a batch job — it runs, processes everything since the last checkpoint, and exits. It preserves streaming's checkpointing semantics (exactly-once offsets) while fitting into a batch orchestration model. Unlike trigger(once=True), it respects rate limits like maxFilesPerTrigger to avoid OOM on backfill.

Q5: What are the trade-offs of using this three-tool stack vs. a fully managed platform like Databricks?

Three-tool stack advantages: Vendor flexibility (run Kafka, Airflow, Spark on any cloud or on-prem), fine-grained control over each component, ability to swap individual pieces (e.g., replace Airflow with Prefect). Disadvantages: Higher operational burden (three systems to deploy, monitor, upgrade), more integration complexity (schema mismatches, checkpoint management), slower development loop. Databricks advantages: Single platform, integrated Delta Live Tables + Unity Catalog + Workflows, better developer experience for lakehouse patterns. Databricks disadvantages: Vendor lock-in, less flexibility for non-Spark workloads, higher cost at scale.

↑ Back to top
