{
  "$type": "site.standard.document",
  "bskyPostRef": {
    "cid": "bafyreifbskcetrnyc5kfp5wje56iqdjq7w57eho622g7cvbm3xchlj4u5e",
    "uri": "at://did:plc:25rdn5elo5izoxrmtis34zuk/app.bsky.feed.post/3mozn7lvpz6o2"
  },
  "coverImage": {
    "$type": "blob",
    "ref": {
      "$link": "bafkreiebye66ijnxpckj3qikkvmtu2mwusqvrpv7lkrhv7v26sti6jba3q"
    },
    "mimeType": "image/webp",
    "size": 309328
  },
  "path": "/jubinsoni/apache-spark-query-optimization-on-databricks-catalyst-aqe-and-photon-engine-4e5p",
  "publishedAt": "2026-06-24T09:32:37.000Z",
  "site": "https://dev.to",
  "tags": [
    "databricks",
    "spark",
    "python",
    "performance",
    "https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf",
    "https://docs.databricks.com/en/optimizations/aqe.html",
    "https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution",
    "https://docs.databricks.com/en/compute/photon.html",
    "https://www.databricks.com/blog/2021/06/17/photon-a-fast-query-engine-for-lakehouse-systems.html",
    "https://docs.databricks.com/en/optimizations/cost-based-optimizer.html",
    "https://spark.apache.org/docs/latest/sql-performance-tuning.html",
    "https://docs.databricks.com/en/optimizations/join-reorder.html",
    "https://dl.acm.org/doi/10.1145/3514221.3526054",
    "https://sparkbyexamples.com/spark/spark-explain-plan-modes/",
    "Part 4: Real-Time AI Feature Engineering with Spark Structured Streaming",
    "Part 3: Apache Spark Query Optimization — Catalyst, AQE, and Photon Engine",
    "Part 1: Building Production-Grade Delta Lake Pipelines with Apache Spark",
    "Part 2: Fine-Tuning LLMs at Scale with Databricks MLflow and Spark"
  ],
  "textContent": "> A deep dive into how Spark transforms your SQL into a physical execution plan — and how Databricks layers Adaptive Query Execution and the Photon vectorized engine on top to squeeze out maximum performance.\n\n##  Table of Contents\n\n  * Why Query Optimization Matters\n  * The Catalyst Optimizer Pipeline\n  * Stage 1: Parsing — From SQL to Unresolved Logical Plan\n  * Stage 2: Analysis — Binding to the Catalog\n  * Stage 3: Logical Optimization — Rule-Based Rewrites\n  * Stage 4: Physical Planning — Strategies and Cost Models\n  * Adaptive Query Execution (AQE)\n  * The Photon Engine\n  * Reading Explain Plans\n  * Tuning Reference Table\n  * References\n\n\n\n##  Why Query Optimization Matters\n\nA Spark query written by a human and a Spark query executed by the engine are often very different things. The gap between them — the _optimization_ — is what separates a job that runs in 3 minutes from one that runs in 3 hours on identical hardware.\n\nDatabricks compounds Spark's native Catalyst optimizer with two additional layers:\n\n  * **Adaptive Query Execution (AQE)** — re-optimizes the query _at runtime_ using actual statistics collected mid-job\n  * **Photon** — a C++ vectorized execution engine that replaces the JVM-based Spark executor for eligible operators\n\n\n\nUnderstanding all three lets you write queries that cooperate with the engine rather than fight it.\n\n##  The Catalyst Optimizer Pipeline\n\nCatalyst is Spark's rule-based and cost-based query optimizer. 
Every query — whether written in SQL, DataFrame API, or Dataset API — passes through the same four-stage pipeline before a single byte of data is read.\n\n##  Stage 1: Parsing — From SQL to Unresolved Logical Plan\n\n\n    # ── Catalyst Stage 1: Parsing ─────────────────────────────────────────────────\n    # Spark uses ANTLR4 to parse SQL into an Abstract Syntax Tree (AST).\n    # At this point column names are NOT validated — the plan is \"unresolved\".\n\n    from pyspark.sql import SparkSession\n\n    spark = SparkSession.builder.appName(\"catalyst-demo\").getOrCreate()\n\n    # Both of these produce equivalent plans (only the aggregate's output column name differs)\n    df_api = (\n        spark.table(\"prod.silver.events_clean\")\n            .filter(\"event_type = 'purchase'\")\n            .groupBy(\"platform\")\n            .agg({\"revenue\": \"sum\"})\n    )\n\n    sql_api = spark.sql(\"\"\"\n        SELECT platform, SUM(revenue) AS total_revenue\n        FROM prod.silver.events_clean\n        WHERE event_type = 'purchase'\n        GROUP BY platform\n    \"\"\")\n\n    # Inspect the unresolved logical plan (before analysis).\n    # mode=\"extended\" prints the parsed, analyzed, optimized, and physical plans;\n    # mode=\"formatted\" would show only the physical plan.\n    sql_api.explain(mode=\"extended\")\n    # Output includes a section resembling (exact text varies by Spark version):\n    # == Parsed Logical Plan ==\n    # 'Aggregate ['platform], ['platform, unresolvedAlias('sum('revenue), None)]\n    #   +- 'Filter ('event_type = 'purchase)\n    #      +- 'UnresolvedRelation [prod, silver, events_clean]\n\n\nThe key insight here: `UnresolvedRelation` and `unresolvedAlias` mean Spark hasn't touched the catalog yet. Column names could be typos at this point and Catalyst doesn't know.\n\n##  Stage 2: Analysis — Binding to the Catalog\n\nThe Analyzer walks the unresolved AST and looks up every relation and attribute against the **Catalog** (in Databricks, this is Unity Catalog). 
It resolves column names, infers data types, validates references, and binds functions.\n\n\n\n    # ── Catalyst Stage 2: Analysis ────────────────────────────────────────────────\n    # After analysis, every column is resolved to a specific attribute with a type.\n    # AnalysisException is thrown HERE if a column doesn't exist.\n\n    from pyspark.sql import functions as F\n    from pyspark.sql.utils import AnalysisException\n\n    # Example of what Analysis catches:\n    try:\n        spark.table(\"prod.silver.events_clean\") \\\n            .select(\"nonexistent_column\") \\\n            .show()\n    except AnalysisException as e:\n        print(f\"Analysis failed: {e}\")\n        # → AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION]\n        #   A column or function parameter with name `nonexistent_column` cannot be resolved.\n\n    # After successful analysis, inspect the resolved plan\n    df = (\n        spark.table(\"prod.silver.events_clean\")\n            .filter(F.col(\"event_type\") == \"purchase\")\n            .select(\"platform\", \"revenue\", \"user_id\")\n    )\n\n    # The analyzed plan shows fully qualified attribute IDs like:\n    # == Analyzed Logical Plan ==\n    # platform: string, revenue: double, user_id: string\n    # Project [platform#42, revenue#67, user_id#31]\n    #   +- Filter (event_type#39 = purchase)\n    #      +- Relation prod.silver.events_clean[...] parquet\n    print(df._jdf.queryExecution().analyzed())\n\n\n##  Stage 3: Logical Optimization — Rule-Based Rewrites\n\nThis is where Catalyst applies its ~100+ built-in rules to produce an equivalent but cheaper logical plan. 
Rules fire repeatedly in fixed-point iteration until the plan stabilises or the iteration limit is reached.\n\n\n\n    # ── Catalyst Stage 3: Key Optimization Rules ──────────────────────────────────\n\n    # RULE 1: Predicate Pushdown\n    # Catalyst moves filters as close to the data source as possible,\n    # so Spark reads fewer rows from Parquet.\n\n    df_before = (\n        spark.table(\"prod.silver.events_clean\")\n            .join(\n                spark.table(\"prod.silver.users_clean\"),\n                on=\"user_id\"\n            )\n            .filter(F.col(\"event_type\") == \"purchase\")  # ← filter AFTER join\n    )\n\n    # Catalyst rewrites this internally as if you wrote:\n    df_after_equivalent = (\n        spark.table(\"prod.silver.events_clean\")\n            .filter(F.col(\"event_type\") == \"purchase\")  # ← filter BEFORE join\n            .join(\n                spark.table(\"prod.silver.users_clean\"),\n                on=\"user_id\"\n            )\n    )\n    # Result: potentially millions fewer rows shuffled during the join\n\n    # RULE 2: Column Pruning\n    # Catalyst removes columns not needed by downstream operators.\n    # Even if you select(\"*\"), Spark reads only the columns it needs.\n    df_pruned = (\n        spark.table(\"prod.silver.events_clean\")\n            .select(\"*\")\n            .filter(F.col(\"event_type\") == \"purchase\")\n            .groupBy(\"platform\")\n            .agg(F.sum(\"revenue\").alias(\"total_revenue\"))\n    )\n    # Internally, Catalyst prunes all columns except: event_type, platform, revenue\n\n    # RULE 3: Constant Folding\n    # Expressions with only literals are evaluated at plan time, not per-row.\n    # Build the expression from F.lit() values; a plain Python `2 + 3 * 4`\n    # would be evaluated by Python before Catalyst ever sees it.\n    df_constants = spark.range(1000).select(\n        (F.lit(2) + F.lit(3) * F.lit(4)).alias(\"always_14\"),  # folded to a literal 14 at plan time\n        (F.col(\"id\") * F.lit(1)).alias(\"same_id\"),            # id * 1 simplified to just col(\"id\")\n    )\n\n    # RULE 4: Boolean Simplification\n    # AND/OR chains with tautologies or contradictions 
are collapsed\n    df_simplified = spark.range(100).filter(\n        (F.col(\"id\") > 10) & F.lit(True)   # simplified to just (col(\"id\") > 10)\n    )\n\n    # See all optimizations applied:\n    print(df_pruned._jdf.queryExecution().optimizedPlan())\n\n\n##  Stage 4: Physical Planning — Strategies and Cost Models\n\nThe physical planner maps each logical operator to one or more physical implementations and selects the best one using a cost model. The most impactful decision here is **join strategy selection**.\n\n\n\n    # ── Catalyst Stage 4: Physical Planning & Join Strategies ────────────────────\n\n    # JOIN STRATEGY 1: Broadcast Hash Join (BHJ)\n    # Best when one side is small enough to fit in executor memory.\n    # No shuffle — the small table is broadcast to all workers.\n\n    spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"10mb\")  # default\n\n    large_df = spark.table(\"prod.silver.events_clean\")      # 500GB\n    small_df = spark.table(\"prod.gold.product_catalog\")     # 8MB ← will be broadcast\n\n    result_bhj = large_df.join(small_df, on=\"product_id\")  # BHJ auto-selected\n\n    # Force BHJ with a broadcast hint (overrides threshold check):\n    from pyspark.sql.functions import broadcast\n    result_forced = large_df.join(broadcast(small_df), on=\"product_id\")\n\n\n    # JOIN STRATEGY 2: Sort Merge Join (SMJ)\n    # Default for large-large joins. Both sides are sorted and merged.\n    # Requires a full shuffle — expensive but handles any size.\n\n    spark.conf.set(\"spark.sql.autoBroadcastJoinThreshold\", \"-1\")  # disable BHJ\n    large_df2 = spark.table(\"prod.silver.transactions_clean\")   # 200GB\n\n    result_smj = large_df.join(large_df2, on=\"user_id\")  # SMJ selected\n\n\n    # JOIN STRATEGY 3: Shuffle Hash Join (SHJ)\n    # Hash-based, no sort. 
Considered when one side is much smaller\n    # than the other but still above the broadcast threshold; disabling the\n    # sort-merge preference lets the planner pick it.\n    spark.conf.set(\"spark.sql.join.preferSortMergeJoin\", \"false\")\n\n\n    # WHOLE-STAGE CODEGEN: Spark fuses multiple operators into a single\n    # Java function to avoid virtual dispatch overhead and intermediate objects.\n    # Verify it's active in your plan:\n    spark.conf.set(\"spark.sql.codegen.wholeStage\", \"true\")  # default\n\n    # result_forced carries a broadcast hint, so it still plans as a BHJ even\n    # though autoBroadcastJoinThreshold was set to -1 above.\n    result_forced.explain(mode=\"formatted\")\n    # Look for BroadcastHashJoin marked with * (printed as *(N) in simple mode):\n    # the marker means the operator runs inside WholeStageCodegen stage N.\n    # With AQE on, the marker may only show up in the final plan after execution.\n\n\n##  Adaptive Query Execution (AQE)\n\nAQE is an Apache Spark feature that Databricks enables by default, and it is the most impactful runtime optimization layer. It collects shuffle map output statistics at **shuffle boundaries** and uses them to make three key decisions _after_ data has been partially processed.\n\n\n\n\n\n    # ── AQE Configuration ─────────────────────────────────────────────────────────\n\n    # AQE is ON by default in Databricks Runtime 7.3+\n    spark.conf.set(\"spark.sql.adaptive.enabled\", \"true\")\n\n    # 1. Dynamic Partition Coalescing\n    # Merges small post-shuffle partitions to avoid thousands of tiny tasks\n    spark.conf.set(\"spark.sql.adaptive.coalescePartitions.enabled\",       \"true\")\n    spark.conf.set(\"spark.sql.adaptive.advisoryPartitionSizeInBytes\",     \"128mb\")\n    spark.conf.set(\"spark.sql.adaptive.coalescePartitions.minPartitionNum\", \"1\")\n\n    # 2. Dynamic Join Strategy Switching\n    # Lets AQE switch SMJ → BHJ at runtime if a side turns out small;\n    # the local shuffle reader then reads the already-shuffled data locally.\n    spark.conf.set(\"spark.sql.adaptive.localShuffleReader.enabled\",       \"true\")\n\n    # AQE broadcast threshold (can be higher than static threshold since\n    # we now KNOW the actual size)\n    spark.conf.set(\"spark.sql.adaptive.autoBroadcastJoinThreshold\",       \"30mb\")\n\n    # 3. 
Skew Join Optimization\n    # Splits oversized partitions and replicates the non-skewed side\n    spark.conf.set(\"spark.sql.adaptive.skewJoin.enabled\",                 \"true\")\n    spark.conf.set(\"spark.sql.adaptive.skewJoin.skewedPartitionFactor\",   \"5\")    # 5x median\n    spark.conf.set(\"spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes\", \"256mb\")\n\n    # Verify AQE decisions in the query plan:\n    df = (\n        spark.table(\"prod.silver.events_clean\")\n            .join(spark.table(\"prod.silver.users_clean\"), on=\"user_id\")\n            .groupBy(\"platform\")\n            .agg(F.sum(\"revenue\").alias(\"total\"))\n    )\n    df.explain(mode=\"formatted\")\n    # Look for: AdaptiveSparkPlan isFinalPlan=true\n    # and: == Final Physical Plan == (shows post-AQE decisions)\n\n\n##  The Photon Engine\n\nPhoton is Databricks' native vectorized query engine written in C++. It replaces the JVM-based Spark executor for eligible operations, processing data in **column-oriented batches** (vectors) rather than row-by-row.\n\n\n\n    # ── Photon Configuration & Verification ───────────────────────────────────────\n    # Photon is available on Databricks Runtime 9.1+ with Photon-enabled clusters.\n    # Enable it at the cluster level (UI: Cluster > Configuration > Enable Photon)\n    # or via config:\n\n    spark.conf.set(\"spark.databricks.photon.enabled\", \"true\")\n\n    # Photon-accelerated operators (as of DBR 13.x):\n    # ✅ Scan (Parquet, Delta)         ✅ Filter / Project\n    # ✅ Hash Aggregate                ✅ Sort\n    # ✅ Broadcast Hash Join           ✅ Sort Merge Join\n    # ✅ Window functions              ✅ Union / Expand\n    # ✅ String functions              ✅ Math functions\n    # ❌ UDFs (Python/Scala)          ❌ Some complex types\n    # ❌ Streaming (partial)          ❌ RDD-based operations\n\n    # Verify Photon is executing your query:\n    df = spark.sql(\"\"\"\n        SELECT\n            platform,\n            
DATE_TRUNC('month', event_ts)   AS month,\n            SUM(revenue)                    AS total_revenue,\n            COUNT(DISTINCT user_id)         AS unique_buyers,\n            AVG(revenue)                    AS avg_order_value\n        FROM prod.silver.events_clean\n        WHERE event_type = 'purchase'\n          AND event_ts >= '2024-01-01'\n        GROUP BY platform, DATE_TRUNC('month', event_ts)\n        ORDER BY month DESC, total_revenue DESC\n    \"\"\")\n\n    df.explain(mode=\"formatted\")\n    # Look for operators prefixed with \"Photon\" in the physical plan:\n    # == Physical Plan ==\n    # PhotonResultStage\n    # +- PhotonSort [month DESC NULLS LAST, total_revenue DESC NULLS LAST]\n    #    +- PhotonShuffleExchangeSink hashpartitioning(platform, month)\n    #       +- PhotonGroupingAgg [platform, month], [sum(revenue), count(user_id), avg(revenue)]\n    #          +- PhotonFilter (event_type = purchase AND event_ts >= 2024-01-01)\n    #             +- PhotonScan parquet prod.silver.events_clean\n\n    # Photon performance metrics appear in Spark UI under \"Photon Metrics\":\n    # - Photon scan time\n    # - Photon total compute time\n    # - Rows processed by Photon vs fallback JVM\n\n\n##  Reading Explain Plans\n\nThe `explain(mode=\"formatted\")` output is your primary debugging tool. 
Here's how to read it efficiently:\n\n\n\n    # ── Explain Plan Modes ────────────────────────────────────────────────────────\n\n    df = (\n        spark.table(\"prod.silver.events_clean\")\n            .filter(F.col(\"event_type\") == \"purchase\")\n            .join(broadcast(spark.table(\"prod.gold.product_catalog\")), on=\"product_id\")\n            .groupBy(\"platform\", \"category\")\n            .agg(\n                F.sum(\"revenue\").alias(\"total_revenue\"),\n                F.count(\"*\").alias(\"transaction_count\")\n            )\n    )\n\n    # Mode 1: simple (default) — compact tree\n    df.explain()\n\n    # Mode 2: extended — all 4 plan stages side by side\n    df.explain(mode=\"extended\")\n\n    # Mode 3: formatted — human-readable with operator details (RECOMMENDED)\n    df.explain(mode=\"formatted\")\n\n    # Mode 4: cost — includes estimated row counts and sizes (requires ANALYZE TABLE)\n    df.explain(mode=\"cost\")\n\n    # Mode 5: codegen — shows generated Java code for WholeStageCodegen\n    df.explain(mode=\"codegen\")\n\n\n    # ── Key Signals to Look For ───────────────────────────────────────────────────\n\n    # ✅ GOOD signs:\n    # *(N) prefix          → WholeStageCodegen active (operators fused)\n    # BroadcastHashJoin    → small table correctly broadcast, no shuffle\n    # PhotonXxx            → Photon accelerating this operator\n    # AdaptiveSparkPlan    → AQE is engaged\n    # PartitionFilters     → Delta/Parquet file skipping active\n    # PushedFilters        → filters pushed to Parquet reader\n\n    # ❌ WARNING signs:\n    # Exchange (shuffle)   → unexpected shuffle (missing broadcast hint?)\n    # SortMergeJoin        → large-large join (may need Z-ORDER or AQE tuning)\n    # HashAggregate x2     → partial + final agg = shuffle involved\n    # CartesianProduct     → missing join condition! 
Will OOM on large tables\n    # ObjectHashAggregate  → non-codegen path, JVM overhead\n    # GenerateXxx          → explode() or similar, can't be fused\n\n\n    # ── ANALYZE TABLE: feed statistics to CBO ─────────────────────────────────────\n    # Without stats, Catalyst falls back to coarse size estimates (for example,\n    # on-disk file size) and has no row counts or column distributions to work with.\n    # Run ANALYZE to give the Cost-Based Optimizer real numbers.\n\n    spark.sql(\"ANALYZE TABLE prod.silver.events_clean COMPUTE STATISTICS\")\n    spark.sql(\"\"\"\n        ANALYZE TABLE prod.silver.events_clean\n        COMPUTE STATISTICS FOR COLUMNS user_id, event_type, platform, revenue\n    \"\"\")\n    # Now explain(mode=\"cost\") shows real row counts and sizes\n\n\n##  Tuning Reference Table\n\nA quick-reference guide for the most impactful Spark/Databricks configs, what they control, and when to change them:\n\nConfig Key | Default | What It Controls | When to Tune\n---|---|---|---\n`spark.sql.adaptive.enabled` | `true` | Master AQE switch | Keep on; only disable for debugging\n`spark.sql.adaptive.advisoryPartitionSizeInBytes` | `64mb` | Target post-coalesce partition size | Increase to `128mb`–`256mb` for large shuffles\n`spark.sql.adaptive.skewJoin.enabled` | `true` | AQE skew split | Keep on; tune `skewedPartitionFactor` if needed\n`spark.sql.autoBroadcastJoinThreshold` | `10mb` | Static BHJ threshold | Increase to `50mb`–`100mb` if executor memory allows\n`spark.sql.adaptive.autoBroadcastJoinThreshold` | `30mb` | AQE runtime BHJ threshold | Increase if AQE isn't catching small tables\n`spark.sql.shuffle.partitions` | `200` | Default shuffle partition count | Set to `8 × num_cores` for your cluster\n`spark.sql.files.maxPartitionBytes` | `128mb` | Max bytes per Parquet read partition | Reduce for high-parallelism scans\n`spark.databricks.photon.enabled` | `true` | Photon vectorized engine | Keep on; disable only for UDF-heavy jobs\n`spark.sql.codegen.wholeStage` | `true` | Whole-Stage CodeGen fusion | Keep on; disable only for 
debugging\n`spark.sql.statistics.histogram.enabled` | `false` | Column histograms for CBO | Enable before running `ANALYZE TABLE ... FOR COLUMNS` so histograms are collected\n`spark.sql.cbo.enabled` | `true` | Cost-Based Optimizer | Keep on; requires ANALYZE TABLE to be useful\n`spark.databricks.delta.optimizeWrite.enabled` | `true` | Auto bin-pack write files | Keep on for all Delta writes\n\n##  Key Takeaways\n\n  * **Catalyst has four stages**: Parse → Analyze → Optimize → Plan. Each stage has a distinct job, and understanding them tells you exactly where to look when a query misbehaves.\n  * **Predicate pushdown and column pruning** are the two most impactful automatic optimizations — they reduce the data volume Spark has to move before any aggregation or join.\n  * **AQE is not a set-and-forget feature**: tune `advisoryPartitionSizeInBytes` to your actual data sizes, and verify its decisions with `explain(mode=\"formatted\")` by looking for `AdaptiveSparkPlan isFinalPlan=true`.\n  * **Photon drops in transparently** for most SQL and DataFrame operations. The exceptions are Python UDFs, RDD operations, and some complex types — refactor these away from hot paths.\n  * **Run `ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS`** on your most-joined tables. The CBO's join ordering and strategy decisions improve dramatically with real statistics vs. default estimates.\n  * **`explain(mode=\"formatted\")`** is your most important debugging tool — learn to read it before reaching for cluster config changes.\n\n\n\n##  References\n\n  1. **Apache Spark — Catalyst Optimizer (Deep Dive Paper, Armbrust et al., SIGMOD 2015)**\n🔗 https://people.csail.mit.edu/matei/papers/2015/sigmod_spark_sql.pdf\n\n  2. **Databricks — Adaptive Query Execution**\n🔗 https://docs.databricks.com/en/optimizations/aqe.html\n\n  3. **Apache Spark Docs — Adaptive Query Execution**\n🔗 https://spark.apache.org/docs/latest/sql-performance-tuning.html#adaptive-query-execution\n\n  4. 
**Databricks — Photon Runtime**\n🔗 https://docs.databricks.com/en/compute/photon.html\n\n  5. **Databricks Blog — Photon: A Fast Query Engine for Lakehouse Systems**\n🔗 https://www.databricks.com/blog/2021/06/17/photon-a-fast-query-engine-for-lakehouse-systems.html\n\n  6. **Databricks — Cost-Based Optimizer**\n🔗 https://docs.databricks.com/en/optimizations/cost-based-optimizer.html\n\n  7. **Apache Spark — Performance Tuning Guide**\n🔗 https://spark.apache.org/docs/latest/sql-performance-tuning.html\n\n  8. **Databricks — Broadcast Join Hints**\n🔗 https://docs.databricks.com/en/optimizations/join-reorder.html\n\n  9. **\"Photon: A Fast Query Engine for Lakehouse Systems\" (Behm et al., SIGMOD 2022)**\n🔗 https://dl.acm.org/doi/10.1145/3514221.3526054\n\n  10. **Spark by Examples — Explain Plan Modes**\n🔗 https://sparkbyexamples.com/spark/spark-explain-plan-modes/\n\n\n\n\n_Next in the series:_\n\n  * _Part 4: Real-Time AI Feature Engineering with Spark Structured Streaming_\n  * _Part 3: Apache Spark Query Optimization — Catalyst, AQE, and Photon Engine_\n\n\n\n_Previous in the series_\n\n  * _Part 1: Building Production-Grade Delta Lake Pipelines with Apache Spark_\n  * _Part 2: Fine-Tuning LLMs at Scale with Databricks MLflow and Spark_\n\n",
  "title": "Apache Spark Query Optimization on Databricks: Catalyst, AQE, and Photon Engine"
}