This guide covers three pillars of modern data engineering individually — Apache Kafka for real-time event streaming, Apache Airflow for workflow orchestration, and PySpark for large-scale data transformation — then shows how they compose into a production pipeline stack.
Among them, this trio powers a huge proportion of real-world data platforms: Kafka handles ingestion and event routing, Airflow schedules and monitors batch/streaming jobs, and PySpark performs heavy transformations. Understanding each tool and their integration points is essential for any data engineering role.
A Kafka topic is an append-only, immutable log of events. Topics are split into partitions — the unit of parallelism and data distribution. Each message within a partition gets a monotonically increasing offset. Ordering is guaranteed only within a single partition, not across partitions. When producing, keys determine partition assignment via a hash (default: murmur2). Choosing the right partition count is critical — too few limits consumer parallelism; too many increases broker metadata overhead and end-to-end latency.
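As a concrete illustration, here is a minimal sketch of creating a topic with an explicit partition count using confluent-kafka's AdminClient. The broker address, topic name, partition count, and config values are illustrative assumptions, not prescriptions:

```python
from confluent_kafka.admin import AdminClient, NewTopic

# Hypothetical sizing: target ~60 MB/s with ~10 MB/s per consumer -> at least 6 partitions;
# we provision 12 for headroom so consumer parallelism can grow without repartitioning.
admin = AdminClient({"bootstrap.servers": "broker:9092"})

futures = admin.create_topics([
    NewTopic(
        "orders",
        num_partitions=12,
        replication_factor=3,
        config={"min.insync.replicas": "2"},  # pairs with acks=all on the producer
    )
])
for topic, future in futures.items():
    future.result()  # raises if creation failed
    print(f"Created topic {topic}")
```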
Consumers subscribe to topics as part of a consumer group. Kafka assigns each partition to exactly one consumer in the group, enabling horizontal scale-out. When consumers join or leave, a rebalance occurs. Older "eager" rebalancing stops all consumers during reassignment. Modern Kafka (2.4+) supports cooperative incremental rebalancing, which only revokes the specific partitions that need to move, minimizing downtime. The newer KIP-848 consumer group protocol (server-side assignment; early access in Kafka 3.7, GA in Kafka 4.0) further reduces rebalance latency.
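A minimal consumer sketch showing how cooperative rebalancing and rebalance callbacks fit together with the confluent-kafka client; the group ID, topic name, and callback logic are placeholders:

```python
from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "order-processor",
    # Incremental cooperative rebalancing: only the partitions that move are revoked
    "partition.assignment.strategy": "cooperative-sticky",
    "enable.auto.commit": False,
})

def on_assign(consumer, partitions):
    print(f"Assigned partitions: {[p.partition for p in partitions]}")

def on_revoke(consumer, partitions):
    # Commit offsets for work already processed before these partitions move away
    try:
        consumer.commit(asynchronous=False)
    except Exception:
        pass  # nothing consumed yet, so there is nothing to commit

consumer.subscribe(["orders"], on_assign=on_assign, on_revoke=on_revoke)
```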
Kafka supports three delivery guarantees: at-most-once, at-least-once, and exactly-once. EOS relies on two features: idempotent producers (deduplication via producer ID + sequence number) and transactions (atomic writes across multiple partitions). For consume-transform-produce patterns, use isolation.level=read_committed so consumers only see committed transaction data. EOS adds ~3-5% latency overhead and requires careful transactional.id management.
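A sketch of the consume-transform-produce pattern with transactions, using the confluent-kafka producer's transaction methods (init_transactions, begin_transaction, send_offsets_to_transaction, commit_transaction). The transform() function, topic names, and transactional.id are illustrative assumptions:

```python
from confluent_kafka import Consumer, Producer, TopicPartition

producer = Producer({
    "bootstrap.servers": "broker:9092",
    "transactional.id": "order-enricher-0",   # stable per processing instance
    "enable.idempotence": True,
})
consumer = Consumer({
    "bootstrap.servers": "broker:9092",
    "group.id": "order-enricher",
    "enable.auto.commit": False,
    "isolation.level": "read_committed",      # hide aborted/uncommitted records
})
consumer.subscribe(["orders"])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    # transform() is a placeholder for your business logic
    producer.produce("orders.enriched", key=msg.key(), value=transform(msg.value()))
    # Commit the input offset inside the same transaction (atomic with the output write)
    producer.send_offsets_to_transaction(
        [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)],
        consumer.consumer_group_metadata(),
    )
    producer.commit_transaction()
```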
Each partition has a leader and zero or more follower replicas. The ISR set contains followers that are caught up with the leader within replica.lag.time.max.ms. A produce is acknowledged based on acks: acks=0 (fire-and-forget), acks=1 (leader only), acks=all (all ISR members). Setting min.insync.replicas=2 with acks=all and replication.factor=3 is the standard production durability configuration.
In production, a Schema Registry (e.g., Confluent Schema Registry) manages Avro, Protobuf, or JSON schemas for topics. It enforces compatibility rules — BACKWARD, FORWARD, FULL — preventing producers from publishing data that would break consumers. This decouples producer and consumer deployments. The schema ID is embedded in the first 5 bytes of each message (magic byte + 4-byte ID).
Kafka supports two retention strategies: time/size-based deletion and log compaction. Compaction keeps only the latest value for each key, making the topic a materializable table. This is the foundation for Kafka as a "streaming database" — changelog topics in Kafka Streams use compaction. Compaction runs as a background thread and never removes the active segment, so recent duplicates may temporarily exist.
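For example, a changelog-style topic can be created with cleanup.policy=compact; the topic name and tuning values below are illustrative:

```python
from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "broker:9092"})
futures = admin.create_topics([
    NewTopic(
        "customer-profile-changelog",
        num_partitions=6,
        replication_factor=3,
        config={
            "cleanup.policy": "compact",           # keep only the latest value per key
            "min.cleanable.dirty.ratio": "0.5",    # how eagerly the log cleaner runs
            "delete.retention.ms": "86400000",     # keep tombstones for 1 day
        },
    )
])
for future in futures.values():
    future.result()
```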
E-commerce platforms use Kafka as the central nervous system between services. When a user places an order, the order service publishes to an orders topic. The inventory service, payment service, notification service, and analytics service each consume from this topic independently via their own consumer groups, enabling loose coupling and independent scaling.
Debezium connectors capture row-level changes from databases (PostgreSQL, MySQL, MongoDB) and publish them to Kafka topics. Downstream consumers replicate data to search indices (Elasticsearch), caches (Redis), or data warehouses in near-real-time. This eliminates fragile batch ETL scripts and provides a reliable, ordered change stream.
Financial services stream transaction events through Kafka. Kafka Streams or Flink applications window these events (e.g., 5-minute tumbling windows) to detect anomalous patterns — multiple high-value transactions from different locations, velocity checks — and publish alerts to a downstream topic that triggers blocking actions.
Organizations route application logs, metrics, and traces through Kafka before sinking them to Elasticsearch, Grafana Loki, or object storage. Kafka acts as a buffer that absorbs traffic spikes and decouples producers (applications) from consumers (observability stack), preventing backpressure from slowing services.
from confluent_kafka import Producer
from confluent_kafka.serialization import SerializationContext, MessageField
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# Schema Registry setup
sr_client = SchemaRegistryClient({"url": "http://schema-registry:8081"})
avro_schema = """
{
"type": "record",
"name": "OrderEvent",
"fields": [
{"name": "order_id", "type": "string"},
{"name": "user_id", "type": "string"},
{"name": "amount", "type": "double"},
{"name": "ts", "type": "long", "logicalType": "timestamp-millis"}
]
}
"""
serializer = AvroSerializer(sr_client, avro_schema)
# Idempotent producer config — ensures no duplicates on retry
producer = Producer({
"bootstrap.servers": "broker:9092",
"enable.idempotence": True,
"acks": "all",
"retries": 5,
"max.in.flight.requests.per.connection": 5,
})
def send_order(order: dict):
producer.produce(
topic="orders",
key=order["order_id"],
value=serializer(order, SerializationContext("orders", MessageField.VALUE)),
on_delivery=lambda err, msg: print(f"Delivered to {msg.partition()} @ {msg.offset()}") if not err else print(f"FAIL: {err}"),
)
producer.flush()
from confluent_kafka import Consumer, KafkaError
consumer = Consumer({
"bootstrap.servers": "broker:9092",
"group.id": "order-processor",
"auto.offset.reset": "earliest",
"enable.auto.commit": False, # manual commit for at-least-once
"isolation.level": "read_committed", # only read committed txn messages
})
consumer.subscribe(["orders"])
try:
while True:
msg = consumer.poll(timeout=1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
raise Exception(msg.error())
# Process message — must be idempotent for at-least-once
process_order(msg.value())
# Commit only after successful processing
consumer.commit(asynchronous=False)
finally:
consumer.close()
# Conceptual: Kafka Streams windowed count (Java API, shown as pseudocode)
# Counts events per user_id in 5-minute tumbling windows
stream = builder.stream("clickstream") \
.group_by_key() \
.windowed_by(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5))) \
.count(Materialized.as_("click-counts-store"))
# Output: KTable<Windowed<String>, Long>
# Can be forwarded to a topic or queried via interactive queries
from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
# Schema Registry ensures backward-compatible evolution
schema_registry = SchemaRegistryClient({
"url": "http://schema-registry:8081"
})
# Avro schema (v2 — added "loyalty_tier" field with default)
CUSTOMER_SCHEMA = """{
"type": "record",
"name": "Customer",
"namespace": "com.example.events",
"fields": [
{"name": "customer_id", "type": "string"},
{"name": "name", "type": "string"},
{"name": "email", "type": "string"},
{"name": "loyalty_tier", "type": "string", "default": "bronze"}
]
}"""
avro_serializer = AvroSerializer(
schema_registry, CUSTOMER_SCHEMA,
conf={"auto.register.schemas": True}
)
producer = SerializingProducer({
"bootstrap.servers": "broker:9092",
"key.serializer": lambda k, _: k.encode("utf-8"),
"value.serializer": avro_serializer,
})
# Produce an event — Schema Registry validates compatibility
producer.produce(
topic="customers",
key="cust-1234",
value={"customer_id": "cust-1234", "name": "Alice",
"email": "alice@example.com", "loyalty_tier": "gold"},
)
producer.flush()
from confluent_kafka import Consumer, Producer
import json
consumer = Consumer({
"bootstrap.servers": "broker:9092",
"group.id": "order-processor",
"enable.auto.commit": False,
})
consumer.subscribe(["orders"])
dlq_producer = Producer({"bootstrap.servers": "broker:9092"})
MAX_RETRIES = 3
while True:
msg = consumer.poll(1.0)
if msg is None or msg.error():
continue
for attempt in range(MAX_RETRIES):
try:
order = json.loads(msg.value())
process_order(order)
consumer.commit(asynchronous=False)
break
except Exception as e:
if attempt == MAX_RETRIES - 1:
# All retries exhausted — send to DLQ for manual review
dlq_producer.produce(
topic="orders.dlq",
key=msg.key(),
value=msg.value(),
headers={
"error": str(e).encode(),
"original_topic": "orders".encode(),
"retry_count": str(MAX_RETRIES).encode(),
},
)
dlq_producer.flush()
consumer.commit(asynchronous=False)
| Feature | Apache Kafka | AWS Kinesis | Google Pub/Sub | Apache Pulsar |
|---|---|---|---|---|
| Ordering | Per-partition | Per-shard | Per-key (ordering keys) | Per-partition |
| Retention | Configurable (days/forever via compaction) | 1–365 days | Configurable up to 31 days (7-day default) | Configurable + tiered storage |
| Replay | Full offset-based replay | Shard iterator reset | Seek to timestamp | Full offset replay |
| Throughput | Millions msg/sec (scales with partitions) | 1 MB/s per shard | Auto-scales | Millions msg/sec |
| Exactly-once | Native (idempotent + txns) | At-least-once (dedup in app) | At-least-once (exactly-once delivery opt-in) | Native (txns) |
| Managed option | Confluent Cloud, MSK, Aiven | Fully managed (AWS) | Fully managed (GCP) | StreamNative |
| Ecosystem | Connect, Streams, ksqlDB, Schema Registry | Lambda integration | Dataflow integration | Functions, IO connectors |
| Best for | High-throughput, replay, multi-consumer | AWS-native simple streaming | GCP-native pub/sub | Multi-tenant, geo-replication |
Choose Kafka when: You need high-throughput event streaming with replay, exactly-once semantics, or a rich connector ecosystem. Choose Kinesis/Pub/Sub when: You're committed to one cloud and want zero ops overhead. Choose Pulsar when: You need native multi-tenancy or built-in tiered storage without Confluent.
Common Kafka pitfalls and sizing guidance:
- Partition sizing: start from partitions = max(target_throughput / producer_throughput_per_partition, target_throughput / consumer_throughput_per_partition) and scale up with headroom.
- Rebalance handling: use on_revoke callbacks to flush in-flight work and commit offsets before partitions move to another consumer.
- Auto-commit data loss: with enable.auto.commit=true, if your processing takes longer than auto.commit.interval.ms, offsets may be committed before processing finishes, causing data loss on failure. Prefer manual commits after processing.

What happens if a consumer group has more consumers than partitions? The extra consumers sit idle. Kafka assigns at most one consumer per partition within a consumer group. If you have 8 consumers and 6 partitions, 2 consumers will not receive any messages. This is by design — partition count sets the upper bound on consumer parallelism.
What is the difference between acks=1 and acks=all, and when would you choose each? With acks=1, the leader writes the message to its local log and responds immediately without waiting for followers. This is faster but risks data loss if the leader crashes before replication. With acks=all, the leader waits for all in-sync replicas (ISR) to acknowledge the write. Combined with min.insync.replicas=2, this ensures durability even if one broker fails. Choose acks=1 for low-latency, loss-tolerant use cases (metrics, logs). Choose acks=all for financial transactions, orders, or any data where loss is unacceptable.
Time-based retention deletes entire log segments older than retention.ms regardless of content. Log compaction retains the latest value for each key and removes older duplicates, keeping the topic as a compact key-value snapshot. Compaction never removes records from the active segment. A compacted topic can be used as a materialized view — consumers reading from offset 0 can rebuild the full current state. Time-based retention is for event streams; compaction is for changelog/state topics.
In a message queue (e.g., RabbitMQ), any consumer can pick up any message, and once acknowledged, it's removed. Kafka ties partitions to consumers 1:1 within a group, so you can't have fine-grained work-stealing. Messages persist until retention expires — they're not removed on consumption. Also, Kafka's rebalance on consumer failure is slower than queue-based redelivery. However, newer Kafka releases introduce share groups (KIP-932, early access in Kafka 4.0) that enable queue-like semantics where multiple consumers can process from the same partition.
What is the purpose of transactional.id in Kafka producers? The transactional.id enables exactly-once semantics across producer restarts. When a producer starts with a transactional.id, the broker "fences" (disables) any previous producer with the same ID, preventing zombie instances from writing duplicate data. It ties together idempotent production and transaction coordination. Each instance in a consume-transform-produce loop should have a unique, stable transactional.id (often derived from input partition assignment).
In Airflow 2.x+, the TaskFlow API (@task decorator) replaces verbose operator instantiation with Pythonic functions. Tasks decorated with @task automatically handle XCom serialization — the return value of one task is passed as an argument to the next. This eliminates manual xcom_push/pull calls. DAGs should be treated as configuration, not business logic: keep the DAG file thin and push heavy computation into the task's execute() or delegated services (Spark, SQL engines).
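A minimal TaskFlow sketch showing return values flowing between tasks via automatic XCom serialization. The imports use the Airflow 2.x airflow.decorators path and the DAG ID and task logic are illustrative:

```python
import pendulum
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1, tz="UTC"), catchup=False)
def taskflow_demo():
    @task
    def extract() -> list[dict]:
        # The return value is serialized to XCom automatically
        return [{"order_id": "o-1", "amount": 42.0}]

    @task
    def total(orders: list[dict]) -> float:
        # The upstream return value arrives as a plain Python argument
        return sum(o["amount"] for o in orders)

    @task
    def report(amount: float):
        print(f"Daily total: {amount}")

    report(total(extract()))

taskflow_demo()
```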
Airflow 2.3+ supports dynamic task mapping via .expand(). Instead of hard-coding a fixed number of parallel tasks, you can fan-out at runtime based on the output of an upstream task. This is essential for ETL patterns where the number of files, partitions, or API pages is not known until execution. Under the hood, mapped tasks share the same operator definition but each gets a unique map_index. Combined with .partial() for static params, this replaces most SubDAG and TaskGroup-based fan-out hacks.
The executor determines how tasks run. LocalExecutor runs tasks as subprocesses on the scheduler node — fine for light workloads. CeleryExecutor distributes tasks across a pool of persistent workers via a message broker (Redis/RabbitMQ) — good for steady, large workloads. KubernetesExecutor spins up a new pod per task — optimal for heterogeneous resource requirements and auto-scaling, but with higher task startup latency (~10-30s). Choose based on scale, isolation needs, and infrastructure.
Sensors poll for an external condition (file exists, partition available). Classic sensors occupy a worker slot while waiting, which wastes resources. Airflow 2.2+ introduced deferrable operators (async) — they suspend the task, free the worker slot, and register a lightweight trigger that runs in the triggerer process. When the condition is met, the trigger resumes the task. This can reduce worker slot consumption by 10x for sensor-heavy DAGs.
Airflow stores credentials in Connections (host, login, password, extras JSON) and configuration in Variables. For production, use a secrets backend (AWS Secrets Manager, HashiCorp Vault, GCP Secret Manager) rather than the metadata DB. Critical best practice: never call Variable.get() at DAG parse-time (top-level code) — it triggers a DB query every scheduler heartbeat. Use Jinja templates ({{ var.value.my_var }}) or access variables inside task callables.
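A short sketch contrasting parse-time and execution-time access, assuming a hypothetical reporting_api_key variable:

```python
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable
from airflow.operators.bash import BashOperator

# BAD (don't do this): runs on every scheduler parse and hits the metadata DB each time
# API_KEY = Variable.get("reporting_api_key")

with DAG(
    dag_id="variable_access_demo",
    start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
    schedule="@daily",
    catchup=False,
):
    # GOOD: Jinja template, resolved only when the task actually runs
    export = BashOperator(
        task_id="export_report",
        bash_command="python export.py --key '{{ var.value.reporting_api_key }}'",
    )

    # ALSO GOOD: fetched inside the task callable, on the worker at execution time
    @task
    def call_api():
        api_key = Variable.get("reporting_api_key")
        print(f"Got key of length {len(api_key)}")

    export >> call_api()
```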
Airflow 2.4+ supports datasets as first-class scheduling triggers. A producing DAG declares outlets=[Dataset("s3://bucket/table")] on a task. A consuming DAG uses schedule=[Dataset("s3://bucket/table")] so it runs when the upstream task completes. This creates event-driven, cross-DAG dependencies without explicit TriggerDagRunOperator calls. Datasets are logical URIs — Airflow doesn't validate the URI target, so naming conventions matter.
A retailer runs a nightly DAG that: (1) extracts incremental data from PostgreSQL and Shopify APIs, (2) loads raw data to S3/GCS, (3) triggers dbt transformations in Snowflake/BigQuery, (4) runs data quality checks (Great Expectations or dbt tests), (5) sends a Slack notification. Airflow orchestrates this multi-step pipeline, handling retries, alerting on failure, and ensuring tasks run in the correct order.
A fintech company uses Airflow to orchestrate feature computation for fraud detection models. A daily DAG: (1) pulls transaction data from Kafka-backed tables, (2) submits PySpark jobs to compute rolling aggregates (e.g., 7-day spend per merchant category), (3) writes features to an online feature store (Feast/Tecton), (4) triggers model retraining via an MLflow API call. Dynamic task mapping fans out PySpark jobs across feature groups in parallel.
An enterprise with data in AWS (S3, Redshift) and Azure (ADLS, Synapse) uses Airflow to synchronize reference data bidirectionally. Airflow's provider ecosystem offers operators for both clouds, and the DAG handles schema drift detection, incremental sync, and reconciliation checks. Connection management via secrets backends keeps credentials cloud-specific.
import pendulum
from airflow.sdk import DAG, task
with DAG(
dag_id="dynamic_elt_pipeline",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
schedule="@daily",
catchup=False,
tags=["elt", "dynamic"],
) as dag:
@task()
def list_source_tables() -> list[str]:
"""Discover tables to extract — could query a metadata DB."""
return ["orders", "customers", "products", "inventory"]
@task()
def extract_and_load(table_name: str, ds=None):
"""Extract from source DB, load to S3 as Parquet."""
from airflow.hooks.base import BaseHook
conn = BaseHook.get_connection("source_postgres")
# ... extract logic using conn, partition by ds ...
print(f"Extracted {table_name} for {ds}")
@task()
def run_dbt_models():
"""Trigger dbt run via BashOperator or dbt Cloud API."""
import subprocess
subprocess.run(["dbt", "run", "--select", "tag:daily"], check=True)
tables = list_source_tables()
# Dynamic fan-out: one task per table, determined at runtime
loaded = extract_and_load.expand(table_name=tables)
loaded >> run_dbt_models()
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
wait_for_export = S3KeySensor(
task_id="wait_for_export_file",
bucket_name="data-lake-raw",
bucket_key="exports/{{ ds }}/transactions.parquet",
aws_conn_id="aws_default",
deferrable=True, # frees worker slot while waiting
poke_interval=60, # check every 60s (in triggerer, not worker)
timeout=3600, # fail after 1 hour
)
from airflow import DAG, Dataset
from airflow.decorators import task
import pendulum
ORDERS_DATASET = Dataset("s3://warehouse/silver/orders")
# Producer DAG — declares outlet
with DAG(dag_id="ingest_orders", schedule="@daily",
start_date=pendulum.datetime(2024, 1, 1)):
@task(outlets=[ORDERS_DATASET])
def load_orders():
# ... load logic ...
pass
load_orders()
# Consumer DAG — triggers when dataset is updated
with DAG(dag_id="transform_orders", schedule=[ORDERS_DATASET],
start_date=pendulum.datetime(2024, 1, 1)):
@task()
def build_order_metrics():
# runs automatically after ingest_orders completes
pass
build_order_metrics()
| Feature | Apache Airflow | Prefect | Dagster | Databricks Workflows |
|---|---|---|---|---|
| Scheduling | Cron, timetables, datasets | Cron, event-driven | Cron, sensors, assets | Cron, file arrival triggers |
| Paradigm | DAG of tasks (operators) | Flows & tasks (Python-native) | Assets & ops (data-aware) | Jobs & tasks (Spark-centric) |
| Dynamic tasks | expand() in 2.3+ | Native (map) | Native (dynamic partitions) | For-each tasks |
| Executor model | Celery, K8s, Local | Agents, Work Pools | Executors (K8s, etc.) | Databricks clusters |
| UI | Grid, Graph, Gantt | Cloud UI (free tier) | Dagit (asset lineage) | Databricks Workflows UI |
| Ecosystem | 80+ provider packages | Growing integrations | Growing integrations | Databricks-native (Spark, Delta) |
| Learning curve | Moderate (many concepts) | Low (Pythonic) | Moderate (asset model) | Low if already on Databricks |
| Best for | Complex multi-system orchestration | Python-native workflows | Data asset-centric lineage | Spark-centric workloads |
Choose Airflow when: You need a battle-tested orchestrator with a massive provider ecosystem, complex dependency patterns, and your team is comfortable with its operational overhead. Choose Prefect/Dagster when: You want a more Pythonic developer experience or asset-based lineage. Choose Databricks Workflows when: Your entire workload is Spark/Delta and you're already on the Databricks platform.
Common Airflow pitfalls:
- Top-level DAG code outside task callables or execute() methods runs every time the scheduler parses the DAG file (every ~30s). Importing heavy libraries (pandas, torch) or calling APIs/DBs at the top level tanks scheduler performance. Move imports inside task callables.
- Non-idempotent tasks: with a plain INSERT INTO and no deduplication, retries create duplicates. Always use UPSERT/MERGE, partition-based overwrites, or write to temp locations with atomic rename.
- Confusing dates: Airflow 2.2+ shifted from execution_date to data_interval_start/end. Many legacy patterns still reference execution_date, which is confusing (it's the start of the interval, not when the DAG actually runs). Always use data_interval_start for partitioning logic.

Practice exercises:
- Build a DAG that discovers source tables, fans out one extract/load task per table with .expand(), and, (3) after all loads complete, triggers a dbt run. Add a sensor that waits for a daily flag file before starting.
- Build a dataset-driven chain of three DAGs: ingest_raw (produces Dataset A), transform_silver (triggered by Dataset A, produces Dataset B), and build_gold (triggered by Dataset B). Verify in the Airflow UI that the Dataset graph shows the correct dependency chain.

What is the difference between execution_date and data_interval_start in Airflow 2.x? They are the same value — execution_date is the legacy name, and data_interval_start is the modern name introduced in Airflow 2.2. Both represent the start of the data interval the DAG run covers. The actual wall-clock time the DAG runs is data_interval_end (or slightly after). For a daily DAG scheduled for 2024-01-15, data_interval_start = 2024-01-15T00:00:00 and data_interval_end = 2024-01-16T00:00:00, but the DAG actually executes after midnight on the 16th.
Why shouldn't you call Variable.get() at the top level of a DAG file? The Airflow scheduler re-parses DAG files at a regular interval (every ~30s by default). Top-level Variable.get() triggers a database query on every parse, which hammers the metadata DB and slows down scheduling for all DAGs. Instead, use Jinja templates ({{ var.value.my_var }}), which are resolved only at task execution time, or call Variable.get() inside a task callable.
How does dynamic task mapping (.expand()) differ from a SubDAG? SubDAGs are deprecated. They created a separate DAG run with its own scheduler budget, leading to deadlocks and performance issues. Dynamic task mapping creates tasks within the same DAG run — they share the same pool, scheduling, and UI. Mapped tasks are determined at runtime (based on upstream output), while SubDAGs were defined at parse time. Mapped tasks are first-class: they appear in the UI grid with individual status, logs, and retry controls.
Deferrable operators allow a task to suspend execution, free its worker slot, and register an async trigger (a lightweight Python coroutine). The triggerer is a separate Airflow process that runs these triggers using asyncio. When a trigger fires (e.g., a file appears in S3), the triggerer notifies the scheduler, which resumes the task on a worker. This architecture is critical for sensor-heavy DAGs where tasks spend most of their time waiting.
Airflow uses a scheduler loop that periodically parses DAG files and evaluates task dependencies. The scheduler heartbeat is typically 1-5 seconds, and task execution involves worker assignment, potential pod/container creation (K8s executor), and metadata DB state transitions. This overhead makes Airflow suitable for batch orchestration (minutes to hours) but not for sub-second, real-time triggering. For sub-second needs, use a streaming framework (Kafka Streams, Flink) and use Airflow only to orchestrate the deployment/lifecycle of those streaming jobs.
PySpark's performance relies on the Catalyst optimizer, which transforms logical plans through rule-based and cost-based optimization (predicate pushdown, column pruning, join reordering). The optimized logical plan is converted to physical plan stages. Tungsten provides off-heap memory management and whole-stage code generation (WSCG), compiling pipeline stages into optimized Java bytecode. Understanding the query plan (df.explain(True)) is critical for diagnosing performance issues — look for BroadcastHashJoin vs. SortMergeJoin, filter placement, and exchange (shuffle) nodes.
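A small sketch of inspecting a plan with explain(True); the input paths are illustrative, and the comments note what to look for in the physical plan:

```python
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("plan_inspection").getOrCreate()

orders = spark.read.parquet("s3a://data-lake/silver/orders/")
products = spark.read.parquet("s3a://warehouse/dim_products/")

query = (
    orders
    .where(F.col("order_date") >= "2024-06-01")   # expect this filter pushed down to the scan
    .join(F.broadcast(products), "product_id")    # expect BroadcastHashJoin, no Exchange on products
    .groupBy("category")
    .agg(F.sum("revenue").alias("revenue"))
)

# Prints the parsed, analyzed, and optimized logical plans plus the physical plan
query.explain(True)
```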
Data in Spark is divided into partitions — the unit of parallelism. Transformations like groupBy, join, and repartition trigger a shuffle — data is serialized, written to disk, and redistributed across executors via the network. Shuffles are the most expensive operation in Spark. To minimize them: (1) use broadcast() for small dimension tables, (2) pre-partition data on common join keys, (3) use coalesce() instead of repartition() to reduce partitions, (4) leverage bucketing for repeated joins. Target partition sizes of 100-200 MB for optimal throughput.
When one side of a join fits in memory (default threshold: spark.sql.autoBroadcastJoinThreshold = 10 MB), Spark broadcasts the small table to all executors, avoiding a shuffle on the large table. This turns an O(n log n) sort-merge into an O(n) hash lookup. For dimensions up to ~1-2 GB, manually hint with F.broadcast(small_df). Sort-merge join is the default for two large tables — both sides are sorted on the join key and merged. If you're joining the same large tables repeatedly, consider bucketing them by the join key to eliminate the sort phase.
AQE (enabled by default since Spark 3.2) re-optimizes the query plan at runtime based on actual data statistics from completed map stages. Key features: (1) coalescing post-shuffle partitions — merges small partitions to reduce task overhead, (2) converting sort-merge join to broadcast join when runtime statistics show one side is small, (3) skew join optimization — splits skewed partitions into smaller ones. AQE significantly reduces the need for manual tuning. Verify it with spark.conf.get("spark.sql.adaptive.enabled"), or enable it explicitly with spark.conf.set("spark.sql.adaptive.enabled", "true").
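The main AQE knobs can be set on a SparkSession; the values below are illustrative, not tuned recommendations:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("aqe_demo").getOrCreate()

spark.conf.set("spark.sql.adaptive.enabled", "true")                       # on by default since 3.2
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")    # merge tiny post-shuffle partitions
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")              # split oversized skewed partitions
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")  # target post-coalesce partition size
print(spark.conf.get("spark.sql.adaptive.enabled"))
```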
PySpark's Structured Streaming uses the same DataFrame API for batch and stream processing. Key concepts: triggers (how often to process — processingTime, availableNow, continuous), output modes (append, complete, update), and checkpointing (stores offsets and state to durable storage for fault tolerance). The availableNow trigger (Spark 3.3+) processes all available data then stops — perfect for Airflow-triggered micro-batch patterns.
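A minimal Structured Streaming sketch tying these pieces together — a file source, append output mode, a checkpoint location, and the availableNow trigger; the paths and schema are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("stream_demo").getOrCreate()

# Stream over a directory of JSON files (illustrative path and schema)
events = (
    spark.readStream
    .schema("event_id string, user_id string, event_ts timestamp")
    .json("s3a://data-lake/landing/events/")
)

query = (
    events.writeStream
    .format("parquet")
    .outputMode("append")                                            # file sinks support append only
    .option("path", "s3a://data-lake/bronze/events/")
    .option("checkpointLocation", "s3a://checkpoints/demo/events")   # offsets + state live here
    .trigger(availableNow=True)                                      # drain everything available, then stop
    .start()
)
query.awaitTermination()
```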
Spark's unified memory model splits executor memory into execution memory (shuffles, joins, aggregations) and storage memory (cached data). They can borrow from each other. When memory is exhausted, Spark spills to disk — this is slow but prevents OOM. Common OOM causes: (1) large broadcast variables, (2) skewed partitions, (3) collecting large datasets to the driver (collect()). Monitor via Spark UI's Storage and Executor tabs. Set spark.executor.memory judiciously — overprovisioning wastes cluster resources; underprovisioning causes excessive spill.
A media streaming company processes 500 TB of daily event logs (plays, pauses, searches). PySpark reads Parquet files from S3, applies sessionization (grouping events by user within 30-min gaps), computes aggregates (daily active users, content popularity), and writes output to Delta Lake tables partitioned by date. Spark's distributed execution makes this feasible; a single-node tool would take days.
An insurance company uses PySpark to validate 200M+ claim records against business rules: coverage dates must precede claim dates, amounts must be positive, policy IDs must exist in a reference table. PySpark enables these checks as distributed joins and filters, outputting a quality report and quarantining bad records — all within 15 minutes.
A ride-sharing platform computes features for demand prediction: rolling 1-hour ride counts per geo-hex, driver supply ratios, weather-joined features. PySpark window functions (F.window) and geo-spatial UDFs process billions of ride events. Output features are written to a feature store (Feast) or Hive tables consumed by training jobs.
from pyspark.sql import SparkSession, functions as F
spark = SparkSession.builder \
.appName("incremental_load") \
.config("spark.sql.adaptive.enabled", "true") \
.getOrCreate()
# Read only the partitions we need — predicate pushdown to Parquet
date_filter = "2024-06-15"
raw_df = spark.read.parquet("s3a://data-lake/raw/events/") \
.where(F.col("event_date") == date_filter)
# Deduplicate on event_id (at-least-once upstream guarantee)
deduped_df = raw_df.dropDuplicates(["event_id"])
# Transform: add derived columns
enriched_df = deduped_df.withColumn(
"event_hour", F.hour("event_timestamp")
).withColumn(
"is_weekend", F.dayofweek("event_timestamp").isin([1, 7])
)
# Write to Delta Lake with MERGE-style overwrite per partition
enriched_df.write \
.format("delta") \
.mode("overwrite") \
.option("replaceWhere", f"event_date = '{date_filter}'") \
.partitionBy("event_date") \
.save("s3a://data-lake/silver/events/")
from pyspark.sql import functions as F
# Problem: joining on user_id but 1% of users generate 50% of events
SALT_BUCKETS = 10
# Salt the large (skewed) side
events_salted = events_df.withColumn(
"salt", (F.rand() * SALT_BUCKETS).cast("int")
).withColumn(
"salted_key", F.concat(F.col("user_id"), F.lit("_"), F.col("salt"))
)
# Explode the small (dimension) side to match all salt values
users_exploded = users_df.crossJoin(
spark.range(0, SALT_BUCKETS).withColumnRenamed("id", "salt")
).withColumn(
"salted_key", F.concat(F.col("user_id"), F.lit("_"), F.col("salt"))
)
# Now join on salted_key — evenly distributed
joined_df = events_salted.join(
users_exploded, on="salted_key", how="inner"
).drop("salt", "salted_key")
from pyspark.sql import Window, functions as F
SESSION_GAP = 1800 # 30 minutes in seconds
window_spec = Window.partitionBy("user_id").orderBy("event_ts")
sessionized = events_df \
.withColumn("prev_ts", F.lag("event_ts").over(window_spec)) \
.withColumn("gap_seconds",
F.unix_timestamp("event_ts") - F.unix_timestamp("prev_ts")
) \
.withColumn("new_session",
F.when(
(F.col("gap_seconds") > SESSION_GAP) | F.col("prev_ts").isNull(), 1
).otherwise(0)
) \
.withColumn("session_id",
F.concat(
F.col("user_id"), F.lit("_"),
F.sum("new_session").over(window_spec)
)
)
from pyspark.sql import functions as F
# Small dimension table (~50 MB) — broadcast to avoid shuffle
product_dim = spark.read.parquet("s3a://warehouse/dim_products/")
# Large fact table (~500 GB)
sales_fact = spark.read.parquet("s3a://warehouse/fact_sales/") \
.where(F.col("sale_date") >= "2024-01-01")
# Explicit broadcast hint — overrides autoBroadcastJoinThreshold
result = sales_fact.join(
F.broadcast(product_dim),
on="product_id",
how="left"
).groupBy("category", "sale_date") \
.agg(
F.sum("revenue").alias("total_revenue"),
F.countDistinct("order_id").alias("order_count")
)
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType
import pandas as pd
# BAD: Row-level Python UDF (10-100x slower)
# @udf(StringType())
# def clean_text(text):
# return text.lower().strip() if text else None
# GOOD: Pandas UDF — operates on Arrow batches (vectorized, near-native speed)
@F.pandas_udf(StringType())
def clean_text(series: pd.Series) -> pd.Series:
"""Vectorized text cleaning — runs on entire column batch."""
return series.str.lower().str.strip().str.replace(r'[^\w\s]', '', regex=True)
# Apply: Spark transfers data via Arrow → Pandas Series → Arrow
cleaned_df = raw_df.withColumn(
"clean_description", clean_text(F.col("product_description"))
)
# Grouped Map — apply a function per group via applyInPandas (Spark 3.x API;
# the older @pandas_udf(..., PandasUDFType.GROUPED_MAP) form is deprecated)
def compute_user_stats(pdf: pd.DataFrame) -> pd.DataFrame:
    """Compute per-user statistics using full Pandas power."""
    return pd.DataFrame({
        "user_id": [pdf["user_id"].iloc[0]],
        "event_count": [len(pdf)],
        "avg_duration": [pdf["duration_seconds"].mean()],
    })

user_stats = events_df.groupBy("user_id").applyInPandas(
    compute_user_stats,
    schema="user_id string, event_count long, avg_duration double",
)
| Feature | PySpark | Pandas / Polars | dbt (SQL) | Apache Flink |
|---|---|---|---|---|
| Scale | TB–PB (distributed) | GB (single-node) | Depends on warehouse | TB–PB (distributed) |
| Paradigm | DataFrame + SQL | DataFrame | SQL models | DataStream + Table API |
| Latency | Seconds–minutes (batch/micro-batch) | Milliseconds (local) | Depends on warehouse | Milliseconds (true streaming) |
| Streaming | Structured Streaming (micro-batch) | No | No | Native event-time streaming |
| Language | Python, Scala, SQL | Python, Rust (Polars) | SQL + Jinja | Java, Scala, Python, SQL |
| Ecosystem | MLlib, GraphX, Delta Lake | scikit-learn, NumPy | dbt packages, exposures | CDC connectors, Iceberg |
| Best for | Large-scale batch ETL, feature eng. | Small data, prototyping | Warehouse transformations | True low-latency streaming |
Choose PySpark when: Data exceeds single-node memory, you need distributed shuffles/joins, or you're already on a Spark platform (Databricks, EMR). Choose Pandas/Polars when: Data fits in memory and you want simpler development. Choose dbt when: Transformations are pure SQL against a warehouse. Choose Flink when: You need true event-time streaming with millisecond latency.
Common PySpark pitfalls:
- Calling collect() on large DataFrames: collect() pulls all data to the driver — instant OOM on large datasets. Use take(n), show(), or write to storage. If you need a count, use df.count() (distributed), not len(df.collect()).
- Row-level Python UDFs: prefer built-in functions from pyspark.sql.functions. If you must use UDFs, use Pandas UDFs (@pandas_udf), which operate on Arrow batches with near-native speed.
- Small-file explosion: coalesce() before write, or leverage Delta Lake's OPTIMIZE (auto-compaction) to merge small files.
- Data skew: a groupBy or join on a key with a skewed distribution (e.g., null keys, hot users) creates some tasks that process 100x more data than others. The Spark UI's "Task Duration" histogram reveals skew. Fix with AQE skew join hints, salting, or isolating skewed keys.
- Over-caching: df.cache() stores data in memory, but memory is limited. Caching a DataFrame you only use once wastes memory and can evict more valuable caches. Cache only when a DataFrame is reused multiple times in the same job. Always unpersist() when done.

Practice exercises:
- Create two DataFrames: big_events (10M rows, join key = user_id with 1% of values being a single "hot" user) and user_dim (100K rows). Join them naively. Use explain(True) to confirm it's a SortMergeJoin. Then: (a) apply a broadcast hint for user_dim, (b) verify the plan shows BroadcastHashJoin, (c) measure the execution time difference, (d) implement salting for the hot-key scenario and compare.
- Build a Structured Streaming job that reads events, deduplicates with dropDuplicates and a watermark, and writes output to a Parquet sink with checkpointing. Use trigger(availableNow=True) mode to simulate Airflow-triggered micro-batches.

What is the difference between repartition(n) and coalesce(n)? repartition(n) performs a full shuffle to create exactly n partitions — all data is redistributed. It can increase or decrease partition count. coalesce(n) reduces partition count without a full shuffle by combining adjacent partitions. It can only decrease the count and results in unevenly sized partitions. Use coalesce when reducing partitions (e.g., before writing) and repartition when you need evenly distributed data or are increasing partition count.
AQE re-optimizes the query plan at stage boundaries using runtime statistics from completed map stages. Three key optimizations: (1) Coalescing post-shuffle partitions — merges small partitions to avoid task overhead (reduces 200 tasks of 1 MB each to 10 tasks of 20 MB). (2) Converting sort-merge join to broadcast join — if a map output is smaller than expected, AQE switches to broadcast. (3) Skew join optimization — detects partition skew and splits oversized partitions into sub-partitions for parallel processing.
Python UDFs require row-by-row serialization between the JVM (Spark) and a Python process via a socket. Each row is serialized with pickle, sent to Python, processed, and sent back. This serialization overhead makes them 10-100x slower than native Spark functions. The alternative is Pandas UDFs (@pandas_udf) which use Apache Arrow for columnar transfer — entire batches of rows are transferred efficiently, and the UDF operates on a pandas.Series, leveraging vectorized NumPy operations.
Writing DataFrames with many partitions creates one file per partition per writer. A 1000-partition DataFrame written partitioned by date can create 1000 tiny files per date partition. This slows reads (high per-file overhead, especially on object storage with costly list operations). Fixes: (1) coalesce(N) before write where N targets 100-200 MB per file, (2) repartition(column) to consolidate data, (3) Delta Lake OPTIMIZE which compacts small files into larger ones, (4) Spark configuration spark.sql.files.maxRecordsPerFile to control row count per file.
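A short sketch of the write-side mitigations; the coalesce target, record cap, and paths are illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("compact_writes").getOrCreate()
events_df = spark.read.parquet("s3a://data-lake/raw/events/")   # illustrative input

# Cap rows per output file (a rough proxy for file size)
spark.conf.set("spark.sql.files.maxRecordsPerFile", 5_000_000)

(events_df
    .coalesce(32)                      # fewer, larger output files; avoids a full shuffle
    .write
    .mode("overwrite")
    .partitionBy("event_date")
    .parquet("s3a://data-lake/silver/events_compacted/"))

# On Delta Lake, small files can also be compacted after the fact:
# spark.sql("OPTIMIZE delta.`s3a://data-lake/silver/events_delta`")
```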
What does trigger(availableNow=True) do in Structured Streaming? availableNow (Spark 3.3+) processes all available data across potentially multiple batches, then terminates the query. It's ideal for micro-batch patterns where an external scheduler (like Airflow) triggers the streaming job on a schedule. Unlike trigger(once=True), which processes all data in a single batch (risking OOM), availableNow respects maxFilesPerTrigger and processes data in manageable increments while still draining everything available. This gives you streaming's exactly-once semantics with batch-schedule control.
The Kafka → Airflow → PySpark pipeline is the backbone of many modern data platforms. Kafka handles real-time ingestion and event routing. Airflow orchestrates scheduled batch/micro-batch jobs. PySpark performs the heavy transformations. Together, they implement a lambda-like architecture (streaming + batch) or, more commonly today, a micro-batch architecture where Airflow triggers PySpark Structured Streaming jobs to process accumulated Kafka data.
┌──────────────────────────────────────────────────────────────────────────┐
│ DATA SOURCES │
│ ┌─────────┐ ┌─────────┐ ┌─────────┐ ┌─────────────┐ │
│ │ App APIs │ │ IoT │ │ CDC │ │ Third-Party │ │
│ └────┬─────┘ └────┬────┘ └────┬────┘ └──────┬──────┘ │
└───────┼──────────────┼───────────┼───────────────┼──────────────────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ APACHE KAFKA CLUSTER │
│ ┌─────────────┐ ┌──────────────┐ ┌───────────────┐ │
│ │ raw.orders │ │ raw.events │ │ cdc.products │ ... │
│ │ (topic) │ │ (topic) │ │ (topic) │ │
│ └──────┬──────┘ └──────┬───────┘ └───────┬───────┘ │
│ │ Schema Registry manages schemas │ │
└─────────┼────────────────┼───────────────────┼──────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ APACHE AIRFLOW (Orchestrator) │
│ │
│ ┌────────────────────────────────────────────────────────────────┐ │
│ │ DAG: daily_pipeline (schedule: @daily) │ │
│ │ │ │
│ │ [sensor] [spark_submit] [spark_submit] │ │
│ │ wait_for_data ──▶ bronze_to_silver ──▶ silver_to_gold │ │
│ │ │ │ │
│ │ [quality_check] │ │
│ │ │ │ │
│ │ [notify_slack] │ │
│ └────────────────────────────────────────────────────────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
│ │ │
▼ ▼ ▼
┌──────────────────────────────────────────────────────────────────────────┐
│ PYSPARK / DATABRICKS CLUSTER │
│ │
│ ┌────────────┐ ┌──────────────┐ ┌──────────────┐ │
│ │ Bronze │ │ Silver │ │ Gold │ │
│ │ (raw from │───▶│ (cleaned, │───▶│ (aggregated, │ │
│ │ Kafka) │ │ deduped) │ │ modeled) │ │
│ └────────────┘ └──────────────┘ └──────┬───────┘ │
│ │ │
│ Delta Lake / Iceberg Tables │ │
└─────────────────────────────────────────────────┼────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────────────────┐
│ CONSUMPTION LAYER │
│ ┌───────────┐ ┌──────────┐ ┌────────────┐ ┌────────────┐ │
│ │ BI / │ │ ML │ │ Reverse │ │ Real-time │ │
│ │ Dashboards│ │ Training │ │ ETL │ │ APIs │ │
│ └───────────┘ └──────────┘ └────────────┘ └────────────┘ │
└──────────────────────────────────────────────────────────────────────────┘
Data flow: Sources produce events to Kafka topics (with Avro schemas). Airflow runs a daily DAG that (1) verifies data availability in Kafka (sensor), (2) submits a PySpark job to read from Kafka, deduplicate, and write to a Bronze Delta table, (3) submits a second PySpark job to clean, validate, and write to Silver, (4) submits a third job to aggregate into Gold dimension/fact tables, (5) runs quality checks, (6) notifies the team.
import pendulum
from airflow.sdk import DAG, task
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.providers.slack.notifications.slack import send_slack_notification
with DAG(
dag_id="kafka_spark_pipeline",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
schedule="@daily",
catchup=False,
default_args={"retries": 2, "retry_delay": pendulum.duration(minutes=5)},
on_failure_callback=[send_slack_notification(
text="Pipeline FAILED: {{ dag.dag_id }} on {{ ds }}",
channel="#data-alerts",
slack_conn_id="slack_default",
)],
) as dag:
@task()
def check_kafka_lag(ds=None):
"""Verify Kafka consumer lag is within acceptable bounds."""
from confluent_kafka.admin import AdminClient
admin = AdminClient({"bootstrap.servers": "broker:9092"})
# Check consumer group lag for the spark consumer group
# Raise AirflowException if lag exceeds threshold
print(f"Kafka lag check passed for {ds}")
bronze_job = SparkSubmitOperator(
task_id="kafka_to_bronze",
application="s3://spark-jobs/kafka_to_bronze.py",
conn_id="spark_default",
application_args=["--date", "{{ ds }}"],
conf={
"spark.sql.adaptive.enabled": "true",
"spark.jars.packages": "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0,io.delta:delta-spark_2.12:3.1.0",
},
executor_memory="4g",
executor_cores=4,
num_executors=8,
)
silver_job = SparkSubmitOperator(
task_id="bronze_to_silver",
application="s3://spark-jobs/bronze_to_silver.py",
conn_id="spark_default",
application_args=["--date", "{{ ds }}"],
conf={"spark.sql.adaptive.enabled": "true"},
executor_memory="8g",
executor_cores=4,
num_executors=12,
)
gold_job = SparkSubmitOperator(
task_id="silver_to_gold",
application="s3://spark-jobs/silver_to_gold.py",
conn_id="spark_default",
application_args=["--date", "{{ ds }}"],
)
@task()
def run_quality_checks(ds=None):
"""Run Great Expectations suite against Gold tables."""
import great_expectations as gx
context = gx.get_context()
result = context.run_checkpoint(checkpoint_name="gold_daily_check")
if not result.success:
raise Exception("Data quality checks failed")
check_kafka_lag() >> bronze_job >> silver_job >> gold_job >> run_quality_checks()
import sys
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.avro.functions import from_avro
spark = SparkSession.builder.appName("kafka_to_bronze").getOrCreate()
run_date = sys.argv[sys.argv.index("--date") + 1]
# Read from Kafka — Structured Streaming with availableNow trigger
kafka_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker:9092") \
.option("subscribe", "raw.orders,raw.events") \
.option("startingOffsets", "earliest") \
.option("maxOffsetsPerTrigger", 1000000) \
.load()
# Parse Kafka message: key, value (Avro), topic, partition, offset, timestamp
parsed_df = kafka_df.select(
F.col("topic"),
F.col("partition"),
F.col("offset"),
F.col("timestamp").alias("kafka_timestamp"),
F.col("key").cast("string").alias("message_key"),
F.col("value").cast("string").alias("message_value"),
F.lit(run_date).alias("load_date"),
)
# Write to Delta Bronze table — streaming with checkpointing.
# The checkpoint path must be stable across runs: a per-run path (e.g. keyed by
# run_date) would make Spark re-read from startingOffsets and duplicate data.
query = parsed_df.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", "s3a://checkpoints/bronze/kafka_events") \
.option("mergeSchema", "true") \
.trigger(availableNow=True) \
.partitionBy("topic", "load_date") \
.start("s3a://data-lake/bronze/kafka_events/")
query.awaitTermination()
spark.stop()
Common integration pitfalls:
- Unstable checkpoints: when using availableNow in an Airflow-triggered pattern, the checkpoint location must be stable (same path for the same logical job). Creating a new checkpoint per DAG run causes Spark to re-read from startingOffsets on each run, leading to duplicate processing. Use a persistent checkpoint and only reset it during disaster recovery.
- Monitoring with the wrong tool: don't use kafka-consumer-groups.sh to monitor Spark's progress — it won't show correct lag. Use Spark's streaming query progress listener or check the checkpoint's offsets/ directory.
- Schema drift: enable mergeSchema=true on Delta writes (for additive changes), and ensure downstream Spark jobs handle new columns gracefully (use try_cast or schema-on-read patterns).
- Empty runs: the availableNow trigger will simply process nothing if no new data exists. Make the Airflow task idempotent and let it complete with zero records processed as a valid outcome.

| Pattern | Kafka + Airflow + PySpark | Kafka + Flink | ADF + Synapse | Databricks end-to-end |
|---|---|---|---|---|
| Ingestion | Kafka (streaming) | Kafka (streaming) | ADF copy activity | Auto Loader / DLT |
| Orchestration | Airflow (flexible) | Flink jobs (self-contained) | ADF pipelines | Databricks Workflows |
| Transformation | PySpark (batch/micro-batch) | Flink (true streaming) | Mapping data flows / Synapse SQL | PySpark / SQL / DLT |
| Latency | Minutes (micro-batch) | Milliseconds to seconds | Minutes (batch) | Seconds–minutes |
| Complexity | High (3 systems to manage) | Medium (1 system for stream) | Low (Azure managed) | Medium (single platform) |
| Best for | Flexible multi-system, team knows Spark | Low-latency streaming at scale | Azure shops, less code | All-in on Databricks lakehouse |
Spark Structured Streaming tracks offsets in its own checkpoint directory (written to HDFS/S3), not in Kafka's __consumer_offsets topic. Spark reads from Kafka using assign mode internally (even when you use subscribe), bypassing the consumer group protocol. Therefore, Kafka CLI tools like kafka-consumer-groups.sh will show no committed offsets for Spark's group.id (or an incorrect/stale value). Monitor progress via query.lastProgress or the Spark UI's Structured Streaming tab.
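A small monitoring sketch using the StreamingQuery handle (here assumed to be the query started by the kafka_to_bronze job above) rather than Kafka's consumer-group tooling:

```python
import json

# Assuming `query` is the StreamingQuery returned by writeStream.start() in kafka_to_bronze
while query.isActive:
    progress = query.lastProgress            # None until the first micro-batch completes
    if progress:
        src = progress["sources"][0]
        print(f"batch={progress['batchId']} rows={progress['numInputRows']}")
        print(json.dumps({"start": src["startOffset"], "end": src["endOffset"]}, indent=2))
    query.awaitTermination(timeout=30)       # returns after 30s or when the query stops
```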
Splitting into separate tasks provides: (1) Independent retryability — if silver-to-gold fails, you retry only that step, not re-reading all data from Kafka. (2) Observability — Airflow shows per-stage success/failure, duration, and logs. (3) Resource optimization — each stage can request different executor memory/cores. (4) Checkpointing — Delta tables between stages act as durable checkpoints. The trade-off is increased overhead from multiple Spark context startups, but this is minor compared to reprocessing costs.
Use a multi-layer defense: (1) Schema Registry enforces BACKWARD compatibility — consumers with the old schema can still read new messages (the new field is ignored or has a default). (2) On the Spark side, enable mergeSchema=true on Delta writes so the new column is added automatically. (3) Downstream queries should use explicit SELECT lists (not SELECT *) or handle the new column with coalesce(new_col, default). (4) Update dbt models or Gold transformations to incorporate the new field.
What is the role of trigger(availableNow=True) in bridging streaming and batch orchestration? availableNow (Spark 3.3+) tells the streaming query to process all data that is currently available, across multiple micro-batches if needed, and then terminate. This allows Airflow to submit a Spark streaming job like a batch job — it runs, processes everything since the last checkpoint, and exits. It preserves streaming's checkpointing semantics (exactly-once offsets) while fitting into a batch orchestration model. Unlike trigger(once=True), it respects rate limits like maxFilesPerTrigger to avoid OOM on backfill.
Three-tool stack advantages: Vendor flexibility (run Kafka, Airflow, Spark on any cloud or on-prem), fine-grained control over each component, ability to swap individual pieces (e.g., replace Airflow with Prefect). Disadvantages: Higher operational burden (three systems to deploy, monitor, upgrade), more integration complexity (schema mismatches, checkpoint management), slower development loop. Databricks advantages: Single platform, integrated Delta Live Tables + Unity Catalog + Workflows, better developer experience for lakehouse patterns. Databricks disadvantages: Vendor lock-in, less flexibility for non-Spark workloads, higher cost at scale.