{
"$type": "site.standard.document",
"bskyPostRef": {
"cid": "bafyreifbskcetrnyc5kfp5wje56iqdjq7w57eho622g7cvbm3xchlj4u5e",
"uri": "at://did:plc:25rdn5elo5izoxrmtis34zuk/app.bsky.feed.post/3mozn7lvpz6o2"
},
"coverImage": {
"$type": "blob",
"ref": {
"$link": "bafkreiebye66ijnxpckj3qikkvmtu2mwusqvrpv7lkrhv7v26sti6jba3q"
},
"mimeType": "image/webp",
"size": 309328
},
"path": "/jubinsoni/apache-spark-query-optimization-on-databricks-catalyst-aqe-and-photon-engine-4e5p",
"publishedAt": "2026-06-24T09:32:37.000Z",
"site": "https://dev.to",
"tags": [
"databricks",
"spark",
"python",
"performance"
],
"textContent": "> A deep dive into how Spark transforms your SQL into a physical execution plan — and how Databricks layers Adaptive Query Execution and the Photon vectorized engine on top to squeeze out maximum performance.\n\n## Table of Contents\n\n * Why Query Optimization Matters\n * The Catalyst Optimizer Pipeline\n * Stage 1: Parsing — From SQL to Unresolved Logical Plan\n * Stage 2: Analysis — Binding to the Catalog\n * Stage 3: Logical Optimization — Rule-Based Rewrites\n * Stage 4: Physical Planning — Strategies and Cost Models\n * Adaptive Query Execution (AQE)\n * The Photon Engine\n * Reading Explain Plans\n * Tuning Reference Table\n * References\n\n\n\n## Why Query Optimization Matters\n\nA Spark query written by a human and a Spark query executed by the engine are often very different things. The gap between them — the _optimization_ — is what separates a job that runs in 3 minutes from one that runs in 3 hours on identical hardware.\n\nDatabricks compounds Spark's native Catalyst optimizer with two additional layers:\n\n * **Adaptive Query Execution (AQE)** — re-optimizes the query _at runtime_ using actual statistics collected mid-job\n * **Photon** — a C++ vectorized execution engine that replaces the JVM-based Spark executor for eligible operators\n\n\n\nUnderstanding all three lets you write queries that cooperate with the engine rather than fight it.\n\n## The Catalyst Optimizer Pipeline\n\nCatalyst is Spark's rule-based and cost-based query optimizer. 
Every query — whether written in SQL, DataFrame API, or Dataset API — passes through the same four-stage pipeline before a single byte of data is read.\n\n## Stage 1: Parsing — From SQL to Unresolved Logical Plan\n\n\n # ── Catalyst Stage 1: Parsing ─────────────────────────────────────────────────\n # Spark uses ANTLR4 to parse SQL into an Abstract Syntax Tree (AST).\n # At this point column names are NOT validated — the plan is \"unresolved\".\n\n from pyspark.sql import SparkSession\n\n spark = SparkSession.builder.appName(\"catalyst-demo\").getOrCreate()\n\n # Both of these produce equivalent optimized plans\n df_api = (\n spark.table(\"prod.silver.events_clean\")\n .filter(\"event_type = 'purchase'\")\n .groupBy(\"platform\")\n .agg({\"revenue\": \"sum\"})\n )\n\n sql_api = spark.sql(\"\"\"\n SELECT platform, SUM(revenue) AS total_revenue\n FROM prod.silver.events_clean\n WHERE event_type = 'purchase'\n GROUP BY platform\n \"\"\")\n\n # Inspect the unresolved logical plan (before analysis).\n # Only mode=\"extended\" prints the parsed plan; mode=\"formatted\" shows the physical plan only.\n sql_api.explain(mode=\"extended\")\n # Output includes a section shaped roughly like this (exact text varies by version and API):\n # == Parsed Logical Plan ==\n # 'Aggregate ['platform], ['platform, unresolvedAlias('sum('revenue), None)]\n # +- 'Filter ('event_type = 'purchase)\n # +- 'UnresolvedRelation [prod, silver, events_clean]\n\n\nThe key insight here: `UnresolvedRelation` and `unresolvedAlias` mean Spark hasn't touched the catalog yet. Column names could be typos at this point and Catalyst doesn't know.\n\n## Stage 2: Analysis — Binding to the Catalog\n\nThe Analyzer walks the unresolved AST and looks up every relation and attribute against the **Catalog** (in Databricks, this is Unity Catalog). 
It resolves column names, infers data types, validates references, and binds functions.\n\n\n\n # ── Catalyst Stage 2: Analysis ────────────────────────────────────────────────\n # After analysis, every column is resolved to a specific attribute with a type.\n # AnalysisException is thrown HERE if a column doesn't exist.\n\n from pyspark.sql import functions as F\n from pyspark.sql.utils import AnalysisException\n\n # Example of what Analysis catches:\n try:\n spark.table(\"prod.silver.events_clean\") \\\n .select(\"nonexistent_column\") \\\n .show()\n except AnalysisException as e:\n print(f\"Analysis failed: {e}\")\n # → AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION]\n # A column or function parameter with name `nonexistent_column` cannot be resolved.\n\n # After successful analysis, inspect the resolved plan\n df = (\n spark.table(\"prod.silver.events_clean\")\n .filter(F.col(\"event_type\") == \"purchase\")\n .select(\"platform\", \"revenue\", \"user_id\")\n )\n\n # The analyzed plan shows fully qualified attribute IDs like:\n # == Analyzed Logical Plan ==\n # platform: string, revenue: double, user_id: string\n # Project [platform#42, revenue#67, user_id#31]\n # +- Filter (event_type#39 = purchase)\n # +- Relation prod.silver.events_clean[...] parquet\n print(df._jdf.queryExecution().analyzed())\n\n\n## Stage 3: Logical Optimization — Rule-Based Rewrites\n\nThis is where Catalyst applies its ~100+ built-in rules to produce an equivalent but cheaper logical plan. 
Rules fire repeatedly in fixed-point iteration until the plan stabilises.\n\n\n\n # ── Catalyst Stage 3: Key Optimization Rules ──────────────────────────────────\n\n # RULE 1: Predicate Pushdown\n # Catalyst moves filters as close to the data source as possible,\n # so Spark reads fewer rows from Parquet.\n\n df_before = (\n spark.table(\"prod.silver.events_clean\")\n .join(\n spark.table(\"prod.silver.users_clean\"),\n on=\"user_id\"\n )\n .filter(F.col(\"event_type\") == \"purchase\") # ← filter AFTER join\n )\n\n # Catalyst rewrites this internally as if you wrote:\n df_after_equivalent = (\n spark.table(\"prod.silver.events_clean\")\n .filter(F.col(\"event_type\") == \"purchase\") # ← filter BEFORE join\n .join(\n spark.table(\"prod.silver.users_clean\"),\n on=\"user_id\"\n )\n )\n # Result: potentially millions fewer rows shuffled during the join\n\n # RULE 2: Column Pruning\n # Catalyst removes columns not needed by downstream operators.\n # Even if you select(*), Spark will only read the columns it needs.\n df_pruned = (\n spark.table(\"prod.silver.events_clean\")\n .select(\"*\")\n .filter(F.col(\"event_type\") == \"purchase\")\n .groupBy(\"platform\")\n .agg(F.sum(\"revenue\").alias(\"total_revenue\"))\n )\n # Internally, Catalyst prunes all columns except: event_type, platform, revenue\n\n # RULE 3: Constant Folding\n # Expressions with only literals are evaluated at plan time, not per-row.\n # Build the expression from F.lit() so that Spark, not Python, sees the arithmetic.\n df_constants = spark.range(1000).select(\n (F.lit(2) + F.lit(3) * F.lit(4)).alias(\"always_14\"), # folded to Literal(14) at plan time\n (F.col(\"id\") * F.lit(1)).alias(\"same_id\"), # parentheses make the alias apply to the whole product\n )\n\n # RULE 4: Boolean Simplification\n # AND/OR chains with tautologies or contradictions are collapsed\n df_simplified = spark.range(100).filter(\n (F.col(\"id\") > 10) & F.lit(True) # simplified to just (col(\"id\") > 10)\n )\n\n # See all optimizations applied:\n print(df_pruned._jdf.queryExecution().optimizedPlan())\n\n\n## Stage 4: Physical Planning — Strategies 
and Cost Models\n\nThe physical planner maps each logical operator to one or more physical implementations and selects the best one using a cost model. The most impactful decision here is **join strategy selection**.\n\n\n\n # ── Catalyst Stage 4: Physical Planning & Join Strategies ────────────────────\n\n # JOIN STRATEGY 1: Broadcast Hash Join (BHJ)\n # Best when one side is small enough to fit in executor memory.\n # No shuffle — the small table is broadcast to all workers.\n\n spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"10mb\") # default\n\n large_df = spark.table(\"prod.silver.events_clean\") # 500GB\n small_df = spark.table(\"prod.gold.product_catalog\") # 8MB ← will be broadcast\n\n result_bhj = large_df.join(small_df, on=\"product_id\") # BHJ auto-selected\n\n # Force BHJ with a broadcast hint (overrides threshold check):\n from pyspark.sql.functions import broadcast\n result_forced = large_df.join(broadcast(small_df), on=\"product_id\")\n\n\n # JOIN STRATEGY 2: Sort Merge Join (SMJ)\n # Default for large-large joins. Both sides are sorted and merged.\n # Requires a full shuffle — expensive but handles any size.\n\n spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\") # disable BHJ\n large_df2 = spark.table(\"prod.silver.transactions_clean\") # 200GB\n\n result_smj = large_df.join(large_df2, on=\"user_id\") # SMJ selected\n\n\n # JOIN STRATEGY 3: Shuffle Hash Join (SHJ)\n # Hash-based, no sort. 
Typically chosen when one side is much smaller\n # than the other but still above the broadcast threshold.\n spark.conf.set(\"spark.sql.join.preferSortMergeJoin\", \"false\")\n\n\n # WHOLE-STAGE CODEGEN: Spark fuses multiple operators into a single\n # Java function to avoid virtual dispatch overhead and intermediate objects.\n # Verify it's active in your plan:\n spark.conf.set(\"spark.sql.codegen.wholeStage\", \"true\") # default\n\n result_bhj.explain(mode=\"formatted\")\n # Look for: *(1) BroadcastHashJoin — the *(N) prefix = WholeStageCodegen stage N\n\n\n## Adaptive Query Execution (AQE)\n\nAQE is an Apache Spark feature (introduced in Spark 3.0) that Databricks enables by default, and it is the most impactful runtime optimization layer. It materializes shuffle map output statistics at **shuffle boundaries** and uses them to make three key decisions _after_ data has been partially processed.\n\n\n\n\n\n # ── AQE Configuration ─────────────────────────────────────────────────────────\n\n # AQE is ON by default in Databricks Runtime 7.3+\n spark.conf.set(\"spark.sql.adaptive.enabled\", \"true\")\n\n # 1. Dynamic Partition Coalescing\n # Merges small post-shuffle partitions to avoid thousands of tiny tasks\n spark.conf.set(\"spark.sql.adaptive.coalescePartitions.enabled\", \"true\")\n spark.conf.set(\"spark.sql.adaptive.advisoryPartitionSizeInBytes\", \"128mb\")\n spark.conf.set(\"spark.sql.adaptive.coalescePartitions.minPartitionNum\", \"1\")\n\n # 2. Dynamic Join Strategy Switching\n # With AQE on, a planned SMJ is converted to BHJ at runtime if a side turns out small.\n # The local shuffle reader lets the converted join read its shuffle files locally.\n spark.conf.set(\"spark.sql.adaptive.localShuffleReader.enabled\", \"true\")\n\n # AQE broadcast threshold (can be higher than static threshold since\n # we now KNOW the actual size)\n spark.conf.set(\"spark.sql.adaptive.autoBroadcastJoinThreshold\", \"30mb\")\n\n # 3. 
Skew Join Optimization\n # Splits oversized partitions and replicates the non-skewed side\n spark.conf.set(\"spark.sql.adaptive.skewJoin.enabled\", \"true\")\n spark.conf.set(\"spark.sql.adaptive.skewJoin.skewedPartitionFactor\", \"5\") # 5x median\n spark.conf.set(\"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes\", \"256mb\")\n\n # Verify AQE decisions in the query plan:\n df = (\n spark.table(\"prod.silver.events_clean\")\n .join(spark.table(\"prod.silver.users_clean\"), on=\"user_id\")\n .groupBy(\"platform\")\n .agg(F.sum(\"revenue\").alias(\"total\"))\n )\n df.explain(mode=\"formatted\")\n # Before the query runs this shows: AdaptiveSparkPlan isFinalPlan=false\n # Run an action (e.g. df.collect()) and call explain() again, or open the query in the Spark UI:\n # isFinalPlan=true and the final plan section then show the post-AQE decisions\n\n\n## The Photon Engine\n\nPhoton is Databricks' native vectorized query engine written in C++. It replaces the JVM-based Spark executor for eligible operations, processing data in **column-oriented batches** (vectors) rather than row-by-row.\n\n\n\n # ── Photon Configuration & Verification ───────────────────────────────────────\n # Photon is available on Databricks Runtime 9.1+ with Photon-enabled clusters.\n # Enable it at the cluster level (UI: Cluster > Configuration > Enable Photon)\n # or via config:\n\n spark.conf.set(\"spark.databricks.photon.enabled\", \"true\")\n\n # Photon-accelerated operators (as of DBR 13.x):\n # ✅ Scan (Parquet, Delta) ✅ Filter / Project\n # ✅ Hash Aggregate ✅ Sort\n # ✅ Broadcast Hash Join ✅ Sort Merge Join\n # ✅ Window functions ✅ Union / Expand\n # ✅ String functions ✅ Math functions\n # ❌ UDFs (Python/Scala) ❌ Some complex types\n # ❌ Streaming (partial) ❌ RDD-based operations\n\n # Verify Photon is executing your query:\n df = spark.sql(\"\"\"\n SELECT\n platform,\n DATE_TRUNC('month', event_ts) AS month,\n SUM(revenue) AS total_revenue,\n COUNT(DISTINCT user_id) AS unique_buyers,\n AVG(revenue) AS avg_order_value\n FROM prod.silver.events_clean\n WHERE event_type = 'purchase'\n AND event_ts >= '2024-01-01'\n GROUP 
BY platform, DATE_TRUNC('month', event_ts)\n ORDER BY month DESC, total_revenue DESC\n \"\"\")\n\n df.explain(mode=\"formatted\")\n # Look for operators prefixed with \"Photon\" in the physical plan:\n # == Physical Plan ==\n # PhotonResultStage\n # +- PhotonSort [month DESC NULLS LAST, total_revenue DESC NULLS LAST]\n # +- PhotonShuffleExchangeSink hashpartitioning(platform, month)\n # +- PhotonGroupingAgg [platform, month], [sum(revenue), count(user_id), avg(revenue)]\n # +- PhotonFilter (event_type = purchase AND event_ts >= 2024-01-01)\n # +- PhotonScan parquet prod.silver.events_clean\n\n # Photon performance metrics appear in Spark UI under \"Photon Metrics\":\n # - Photon scan time\n # - Photon total compute time\n # - Rows processed by Photon vs fallback JVM\n\n\n## Reading Explain Plans\n\nThe `explain(mode=\"formatted\")` output is your primary debugging tool. Here's how to read it efficiently:\n\n\n\n # ── Explain Plan Modes ────────────────────────────────────────────────────────\n\n df = (\n spark.table(\"prod.silver.events_clean\")\n .filter(F.col(\"event_type\") == \"purchase\")\n .join(broadcast(spark.table(\"prod.gold.product_catalog\")), on=\"product_id\")\n .groupBy(\"platform\", \"category\")\n .agg(\n F.sum(\"revenue\").alias(\"total_revenue\"),\n F.count(\"*\").alias(\"transaction_count\")\n )\n )\n\n # Mode 1: simple (default) — compact tree\n df.explain()\n\n # Mode 2: extended — all 4 plan stages side by side\n df.explain(mode=\"extended\")\n\n # Mode 3: formatted — human-readable with operator details (RECOMMENDED)\n df.explain(mode=\"formatted\")\n\n # Mode 4: cost — includes estimated row counts and sizes (requires ANALYZE TABLE)\n df.explain(mode=\"cost\")\n\n # Mode 5: codegen — shows generated Java code for WholeStageCodegen\n df.explain(mode=\"codegen\")\n\n\n # ── Key Signals to Look For ───────────────────────────────────────────────────\n\n # ✅ GOOD signs:\n # *(N) prefix → WholeStageCodegen active (operators fused)\n # 
BroadcastHashJoin → small table correctly broadcast, no shuffle\n # PhotonXxx → Photon accelerating this operator\n # AdaptiveSparkPlan → AQE is engaged\n # PartitionFilters → Delta/Parquet file skipping active\n # PushedFilters → filters pushed to Parquet reader\n\n # ❌ WARNING signs:\n # Exchange (shuffle) → unexpected shuffle (missing broadcast hint?)\n # SortMergeJoin → large-large join (may need Z-ORDER or AQE tuning)\n # HashAggregate x2 → partial + final agg = shuffle involved\n # CartesianProduct → missing join condition! Will OOM on large tables\n # ObjectHashAggregate → non-codegen path, JVM overhead\n # GenerateXxx → explode() or similar, can't be fused\n\n\n # ── ANALYZE TABLE: feed statistics to CBO ─────────────────────────────────────\n # Without stats, Catalyst falls back to coarse estimates (for file sources, mostly\n # on-disk size) and has no column-level statistics to work with.\n # Run ANALYZE to give the Cost-Based Optimizer real numbers.\n\n spark.sql(\"ANALYZE TABLE prod.silver.events_clean COMPUTE STATISTICS\")\n spark.sql(\"\"\"\n ANALYZE TABLE prod.silver.events_clean\n COMPUTE STATISTICS FOR COLUMNS user_id, event_type, platform, revenue\n \"\"\")\n # Now explain(mode=\"cost\") shows real row counts and sizes\n\n\n## Tuning Reference Table\n\nA quick-reference guide for the most impactful Spark/Databricks configs, what they control, and when to change them:\n\nConfig Key | Default | What It Controls | When to Tune\n---|---|---|---\n`spark.sql.adaptive.enabled` | `true` | Master AQE switch | Keep on; only disable for debugging\n`spark.sql.adaptive.advisoryPartitionSizeInBytes` | `64mb` | Target post-coalesce partition size | Increase to `128mb`–`256mb` for large shuffles\n`spark.sql.adaptive.skewJoin.enabled` | `true` | AQE skew split | Keep on; tune `skewedPartitionFactor` if needed\n`spark.sql.autoBroadcastJoinThreshold` | `10mb` | Static BHJ threshold | Increase to `50mb`–`100mb` if executor memory allows\n`spark.sql.adaptive.autoBroadcastJoinThreshold` | `30mb` | AQE runtime BHJ threshold | Increase if AQE 
isn't catching small tables\n`spark.sql.shuffle.partitions` | `200` | Default shuffle partition count | Set to `8 × num_cores` for your cluster\n`spark.sql.files.maxPartitionBytes` | `128mb` | Max bytes per Parquet read partition | Reduce for high-parallelism scans\n`spark.databricks.photon.enabled` | `true` | Photon vectorized engine | Keep on; disable only for UDF-heavy jobs\n`spark.sql.codegen.wholeStage` | `true` | Whole-Stage CodeGen fusion | Keep on; disable only for debugging\n`spark.sql.statistics.histogram.enabled` | `false` | Column histograms for CBO | Enable before running `ANALYZE TABLE ... FOR COLUMNS` so histograms are collected\n`spark.sql.cbo.enabled` | `true` | Cost-Based Optimizer | Keep on; requires ANALYZE TABLE to be useful\n`spark.databricks.delta.optimizeWrite.enabled` | `true` | Auto bin-pack write files | Keep on for all Delta writes\n\n## Key Takeaways\n\n * **Catalyst has four stages**: Parse → Analyze → Optimize → Plan. Each stage has a distinct job, and understanding them tells you exactly where to look when a query misbehaves.\n * **Predicate pushdown and column pruning** are the two most impactful automatic optimizations — they reduce the data volume Spark has to move before any aggregation or join.\n * **AQE is not a set-and-forget feature**: tune `advisoryPartitionSizeInBytes` to your actual data sizes, and verify its decisions with `explain(mode=\"formatted\")` — once the query has run, look for `AdaptiveSparkPlan isFinalPlan=true`.\n * **Photon drops in transparently** for most SQL and DataFrame operations. The exceptions are Python UDFs, RDD operations, and some complex types — refactor these away from hot paths.\n * **Run `ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS`** on your most-joined tables. The CBO's join ordering and strategy decisions improve dramatically with real statistics vs. default estimates.\n * **`explain(mode=\"formatted\")`** is your most important debugging tool — learn to read it before reaching for cluster config changes.\n\n\n\n## References\n\n 1. 
**Apache Spark — Catalyst Optimizer (Deep Dive Paper, Armbrust et al., SIGMOD 2015)**\n🔗 https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf\n\n 2. **Databricks — Adaptive Query Execution**\n🔗 https://docs.databricks.com/en/optimizations/aqe.html\n\n 3. **Apache Spark Docs — Adaptive Query Execution**\n🔗 https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution\n\n 4. **Databricks — Photon Runtime**\n🔗 https://docs.databricks.com/en/compute/photon.html\n\n 5. **Databricks Blog — Photon: A Fast Query Engine for Lakehouse Systems**\n🔗 https://www.databricks.com/blog/2021/06/17/photon-a-fast-query-engine-for-lakehouse-systems.html\n\n 6. **Databricks — Cost-Based Optimizer**\n🔗 https://docs.databricks.com/en/optimizations/cost-based-optimizer.html\n\n 7. **Apache Spark — Performance Tuning Guide**\n🔗 https://spark.apache.org/docs/latest/sql-performance-tuning.html\n\n 8. **Databricks — Broadcast Join Hints**\n🔗 https://docs.databricks.com/en/optimizations/join-reorder.html\n\n 9. **\"Photon: A Fast Query Engine for Lakehouse Systems\" (Behm et al., SIGMOD 2022)**\n🔗 https://dl.acm.org/doi/10.1145/3514221.3526054\n\n 10. **Spark by Examples — Explain Plan Modes**\n🔗 https://sparkbyexamples.com/spark/spark-explain-plan-modes/\n\n\n\n\n_Next in the series:_\n\n * _Part 4: Real-Time AI Feature Engineering with Spark Structured Streaming_\n\n\n\n_Previous in the series:_\n\n * _Part 1: Building Production-Grade Delta Lake Pipelines with Apache Spark_\n * _Part 2: Fine-Tuning LLMs at Scale with Databricks MLflow and Spark_\n\n",
"title": "Apache Spark Query Optimization on Databricks: Catalyst, AQE, and Photon Engine"
}