{
"$type": "site.standard.document",
"bskyPostRef": {
"cid": "bafyreidq4mmjjc2u5k4f5fqmh6353apwfddub352m2asfvo7zam6nl4zcq",
"uri": "at://did:plc:25rdn5elo5izoxrmtis34zuk/app.bsky.feed.post/3mpcugjthj7m2"
},
"coverImage": {
"$type": "blob",
"ref": {
"$link": "bafkreihj6672bg2hvm5xdsn37xllbozfomub7o73oq6pqitlvtiok523ui"
},
"mimeType": "image/webp",
"size": 93116
},
"path": "/jubinsoni/azure-databricks-for-feature-engineering-at-scale-with-apache-spark-delta-lake-and-mlflow-3k4n",
"publishedAt": "2026-06-28T01:35:55.000Z",
"site": "https://dev.to",
"tags": [
"azure",
"databricks",
"spark",
"mlops",
"Azure Databricks Documentation",
"Delta Lake — The Definitive Guide",
"Apache Spark SQL — Window Functions",
"MLflow Tracking Documentation",
"MLflow Model Registry",
"Medallion Architecture on Databricks",
"Delta Lake Time Travel",
"Databricks Feature Store Overview"
],
"textContent": "Raw data doesn't win model competitions. Features do. And when your raw data is tens of billions of rows sitting across multiple sources, you can't afford to run pandas in a notebook and call it a day.\n\nIn this tutorial I'll walk through building a production-grade feature engineering pipeline on **Azure Databricks** using:\n\n * **Apache Spark** for distributed transformation at scale\n * **Delta Lake** for reliable, versioned feature storage with ACID guarantees\n * **MLflow** for tracking feature pipeline runs, parameters, and the models trained on top of them\n\nThe use case is a customer churn prediction system, but the patterns apply to any ML feature pipeline.\n\n## Architecture Overview\n\nThe pipeline follows the **Medallion Architecture** — a layered approach where data gets progressively cleaner and more feature-ready as it moves from Bronze to Silver to Gold. MLflow sits across all three layers, tracking every run.\n\n## Layer Breakdown\n\nLayer | Delta Table | What happens here | Typical latency\n---|---|---|---\n**Bronze** | `churn.bronze.events` | Raw ingest, no transforms, append-only | Minutes\n**Silver** | `churn.silver.customers` | Deduplication, null handling, schema enforcement | Minutes\n**Gold** | `churn.gold.features` | Per-customer aggregations, 30/90-day activity counts, encoding | Minutes to hours\n**MLflow Run** | N/A | Training, metric logging, artifact storage | Hours\n**Registry** | N/A | Versioned model store, stage promotion | On demand\n\n## Step 1 — Bronze Layer: Raw Ingest\n\nThe Bronze layer is append-only. No transforms. No business logic. 
Just get the data in and preserve it exactly as it arrived so you can always replay from source.\n\n\n\n    from pyspark.sql import SparkSession\n    from pyspark.sql.functions import current_timestamp, lit\n\n    spark = SparkSession.builder.getOrCreate()\n\n    # Read raw JSON event files landed in ADLS Gen2 (replace the storage account and path with your own)\n    raw_events = spark.read.format('json').load('abfss://raw@yourstorage.dfs.core.windows.net/events/')\n\n    # Add ingestion metadata — never mutate source columns\n    bronze_df = raw_events.withColumn('_ingested_at', current_timestamp()) \\\n                          .withColumn('_source', lit('events_api'))\n\n    # Write to Bronze Delta table — append only, no overwrites\n    bronze_df.write \\\n        .format('delta') \\\n        .mode('append') \\\n        .option('mergeSchema', 'true') \\\n        .saveAsTable('churn.bronze.events')\n\n    print(f\"Bronze rows written: {bronze_df.count()}\")\n\n\n> **Why append-only?** If your downstream pipeline produces bad features, you want to replay from Bronze without re-ingesting from source. Overwriting Bronze breaks that ability.\n\n## Step 2 — Silver Layer: Clean and Validate\n\nSilver is where you enforce schema, handle nulls, deduplicate, and standardize. 
Think of it as your canonical, trusted dataset.\n\n\n\n    from pyspark.sql.functions import col, lit, to_timestamp, when, trim, upper\n    from delta.tables import DeltaTable\n\n    bronze = spark.table('churn.bronze.events')\n\n    silver_df = bronze \\\n        .filter(col('customer_id').isNotNull()) \\\n        .filter(col('event_type').isNotNull()) \\\n        .dropDuplicates(['customer_id', 'event_id']) \\\n        .withColumn('event_ts', to_timestamp(col('event_timestamp'))) \\\n        .withColumn('event_type', upper(trim(col('event_type')))) \\\n        .withColumn('country_code', when(col('country').isNull(), lit('UNKNOWN'))\n                                    .otherwise(upper(col('country')))) \\\n        .select(\n            'customer_id',\n            'event_id',\n            'event_type',\n            'event_ts',\n            'country_code',\n            'product_id',\n            'session_id',\n            '_ingested_at',\n        )\n\n    # Insert-only MERGE: add (customer_id, event_id) pairs not already in Silver and skip the rest,\n    # so re-running the job on the same Bronze data is idempotent.\n    # DeltaTable.isDeltaTable() expects a storage path, so check the catalog for the table name instead.\n    if spark.catalog.tableExists('churn.silver.customers'):\n        silver_table = DeltaTable.forName(spark, 'churn.silver.customers')\n        silver_table.alias('tgt').merge(\n            silver_df.alias('src'),\n            'tgt.customer_id = src.customer_id AND tgt.event_id = src.event_id'\n        ).whenNotMatchedInsertAll().execute()\n    else:\n        silver_df.write.format('delta').saveAsTable('churn.silver.customers')\n\n    print(f\"Silver table updated. Total rows: {spark.table('churn.silver.customers').count()}\")\n\n\n## Step 3 — Gold Layer: Feature Engineering\n\nThis is the heart of the pipeline. We compute the per-customer aggregates, 30/90-day activity counts, and encoded features that the model will actually train on.\n\n\n\n    from pyspark.sql.functions import (\n        col, count, countDistinct, sum as _sum,\n        datediff, max as _max, min as _min,\n        current_date, lit, when\n    )\n\n    silver = spark.table('churn.silver.customers')\n\n    # ------------------------------------------------------------------\n    # 1. 
Aggregate features per customer over 30/90-day windows\n    # ------------------------------------------------------------------\n    import datetime\n\n    # Fix the run date once on the driver so the date math below and the\n    # replaceWhere predicate in step 3 use the same value. (Interpolating a\n    # Column such as current_date() into an f-string yields its repr, not a date.)\n    run_date = datetime.date.today().isoformat()\n    today = lit(run_date).cast('date')\n\n    agg_features = silver \\\n        .withColumn('days_since_event', datediff(today, col('event_ts'))) \\\n        .groupBy('customer_id') \\\n        .agg(\n            count('event_id')           .alias('total_events'),\n            countDistinct('session_id') .alias('total_sessions'),\n            countDistinct('product_id') .alias('distinct_products'),\n            _sum(when(col('days_since_event') <= 30, 1).otherwise(0)) .alias('events_last_30d'),\n            _sum(when(col('days_since_event') <= 90, 1).otherwise(0)) .alias('events_last_90d'),\n            _max('event_ts')            .alias('last_event_ts'),\n            _min('event_ts')            .alias('first_event_ts'),\n        ) \\\n        .withColumn('days_since_last_event', datediff(today, col('last_event_ts'))) \\\n        .withColumn('customer_tenure_days',  datediff(today, col('first_event_ts'))) \\\n        .withColumn('avg_events_per_day',\n                    col('total_events') / (col('customer_tenure_days') + 1))\n\n    # ------------------------------------------------------------------\n    # 2. Encode churn risk tier as ordinal feature\n    # ------------------------------------------------------------------\n    feature_df = agg_features \\\n        .withColumn('recency_tier',\n            when(col('days_since_last_event') <= 7,  lit(3))   # active\n            .when(col('days_since_last_event') <= 30, lit(2))  # at risk\n            .otherwise(lit(1))                                  # lapsed (30+ days inactive)\n        ) \\\n        .withColumn('engagement_score',\n            (col('events_last_30d') * 0.6 + col('events_last_90d') * 0.4) /\n            (col('customer_tenure_days') + 1)\n        )\n\n    # ------------------------------------------------------------------\n    # 3. Write to Gold feature store, partitioned by date; overwrite only this run's partition\n    # ------------------------------------------------------------------\n    feature_df \\\n        .withColumn('feature_date', today) \\\n        .write \\\n        .format('delta') \\\n        .mode('overwrite') \\\n        .partitionBy('feature_date') \\\n        .option('replaceWhere', f\"feature_date = '{run_date}'\") \\\n        .saveAsTable('churn.gold.features')\n\n    print(f\"Gold features written: {feature_df.count()} customers\")\n\n\n## Step 4 — MLflow: Track the Training Run\n\nWith features in Gold, we train a scikit-learn model and use MLflow to track the run and register the resulting model. Notice we log the Delta table version so we can always reproduce exactly which feature snapshot trained which model.\n\n\n\n    import mlflow\n    import mlflow.sklearn\n    from mlflow.models.signature import infer_signature\n    from delta.tables import DeltaTable\n    from pyspark.sql.functions import col, max as _max\n    from sklearn.ensemble import GradientBoostingClassifier\n    from sklearn.model_selection import train_test_split\n    from sklearn.metrics import roc_auc_score, f1_score\n\n    mlflow.set_experiment('/churn-prediction/feature-pipeline')\n\n    # Read Gold features — capture Delta version for reproducibility\n    gold_table = DeltaTable.forName(spark, 'churn.gold.features')\n    delta_version = gold_table.history(1).select('version').collect()[0][0]\n\n    # Read exactly that version, then keep only the newest feature_date\n    # (the Gold table accumulates one partition per run date)\n    gold_df = spark.read.option('versionAsOf', delta_version).table('churn.gold.features')\n    latest_date = gold_df.agg(_max('feature_date')).first()[0]\n    features_pdf = gold_df.filter(col('feature_date') == latest_date).toPandas()\n\n    FEATURE_COLS = [\n        'total_events', 'total_sessions', 'distinct_products',\n        'events_last_30d', 'events_last_90d', 'days_since_last_event',\n        'customer_tenure_days', 'avg_events_per_day',\n        'recency_tier', 'engagement_score',\n    ]\n    # The Gold table has no label column. Join a 0/1 'churned' label per\n    # customer_id from your own label source before this point.\n    TARGET = 'churned'\n\n    X = features_pdf[FEATURE_COLS]\n    y = features_pdf[TARGET]\n    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)\n\n    with mlflow.start_run(run_name=f'gbm-features-v{delta_version}') as run:\n\n        params = {'n_estimators': 200, 'max_depth': 5, 'learning_rate': 0.05}\n        model = GradientBoostingClassifier(**params, random_state=42)\n        model.fit(X_train, y_train)\n\n        y_pred = model.predict(X_test)\n        
y_prob = model.predict_proba(X_test)[:, 1]\n        roc_auc = roc_auc_score(y_test, y_prob)\n\n        # Log everything\n        mlflow.log_params(params)\n        mlflow.log_metric('roc_auc', roc_auc)\n        mlflow.log_metric('f1_score', f1_score(y_test, y_pred))\n        mlflow.log_param('delta_feature_version', delta_version)\n        mlflow.log_param('feature_columns', ','.join(FEATURE_COLS))\n        mlflow.log_param('training_rows', len(X_train))\n\n        # Log model with a signature inferred from the training inputs and the model's outputs on them\n        signature = infer_signature(X_train, model.predict(X_train))\n        mlflow.sklearn.log_model(\n            model,\n            artifact_path='churn-gbm',\n            signature=signature,\n            registered_model_name='churn-prediction-gbm',\n        )\n\n    print(f\"Run ID: {run.info.run_id}\")\n    print(f\"ROC-AUC: {roc_auc:.4f}\")\n    print(f\"Feature Delta version logged: {delta_version}\")\n\n\n## Bonus: Delta Lake Time Travel for Feature Reproducibility\n\nOne of the best things about Delta Lake is time travel. If a model behaves unexpectedly in production, you can reload the exact feature snapshot it was trained on.\n\n\n\n    # Reload the exact feature version that trained a specific model run\n    import mlflow\n\n    run = mlflow.get_run('your-run-id-here')\n    feature_version = int(run.data.params['delta_feature_version'])\n\n    # Rehydrate that exact feature snapshot\n    historical_features = spark.read \\\n        .format('delta') \\\n        .option('versionAsOf', feature_version) \\\n        .table('churn.gold.features')\n\n    print(f\"Loaded feature snapshot from Delta version {feature_version}\")\n    print(f\"Row count: {historical_features.count()}\")\n\n    # Retraining on this snapshot with the logged params and the same random_state should reproduce the result\n\n\n## Service Comparison\n\nTool | Role in pipeline | Why not the alternative\n---|---|---\n**Apache Spark** | Distributed feature computation | Pandas (single node, OOM at scale), Dask (less native Databricks integration)\n**Delta Lake** | Feature storage with versioning | Parquet (no ACID, no time travel), Hive tables (no merge support)\n**MLflow Tracking** | Experiment and param logging | Manual logging (not 
reproducible), W&B (extra cost, less native on Databricks)\n**MLflow Registry** | Model versioning and promotion | Custom model store (more ops overhead)\n**Medallion Architecture** | Pipeline layer separation | Flat pipelines (hard to debug, no replay capability)\n**Delta MERGE** | Idempotent, insert-only Silver merges | Overwrite (destroys history), append (creates duplicates)\n\n## Things to Watch in Production\n\n**Shuffle partitions matter.** Spark defaults to 200 shuffle partitions, which is fine for small data but becomes a bottleneck at scale. Set `spark.conf.set(\"spark.sql.shuffle.partitions\", \"auto\")` on Databricks Runtime 10+ or tune it manually to roughly 2-3x your cluster's total core count.\n\n**Z-ordering on Gold features.** If you're querying Gold by `customer_id` frequently, run `OPTIMIZE churn.gold.features ZORDER BY (customer_id)` after the write. This co-locates rows with nearby `customer_id` values in the same files, so Delta's data skipping can prune most files on `customer_id` lookups; the benefit grows with table size.\n\n**Log Delta version in every MLflow run.** This is non-negotiable for reproducibility. Without it you can't prove which feature snapshot trained which model, which becomes a compliance problem in regulated industries.\n\n**Cluster autoscaling for feature jobs.** Feature engineering jobs tend to have spiky resource needs (big during aggregation, small during writes). 
Enable autoscaling on your Databricks cluster and set a min/max node count rather than a fixed size.\n\n## Wrapping Up\n\nThe combination of Spark, Delta Lake, and MLflow on Databricks gives you a feature engineering pipeline that is reproducible (Delta time travel + MLflow param logging), scalable (Spark handles billions of rows), and auditable (every run is tracked, every feature version is stored).\n\nThe Medallion Architecture keeps the pipeline modular — you can rerun just the Gold layer if you change a feature definition without touching Bronze or Silver, and MLflow ties model performance back to the exact feature version that produced it.\n",
"title": "Azure Databricks for MLOps and Feature Engineering at Scale with Apache Spark, Delta Lake, and MLflow"
}