Execute the primary Databricks workflow: Delta Lake ETL pipelines. Use when building data ingestion pipelines, implementing medallion architecture, or creating Delta Lake transformations. Trigger with phrases like "databricks ETL", "delta lake pipeline", "medallion architecture", "databricks data pipeline", "bronze silver gold".
Build production Delta Lake ETL pipelines using the medallion architecture (Bronze > Silver > Gold). The workflow uses Auto Loader (cloudFiles) for incremental ingestion, MERGE INTO for upserts, and Delta Live Tables for declarative pipelines.
Prerequisite: databricks-install-auth setup.

Raw Sources (S3/ADLS/GCS)
│ Auto Loader (cloudFiles)
▼
Bronze (raw + metadata)
│ Cleanse, deduplicate, type-cast
▼
Silver (conformed)
│ Aggregate, join, feature engineer
▼
Gold (analytics-ready)
Auto Loader (the cloudFiles format) incrementally processes new files as they arrive. It handles schema inference and schema evolution, and scales to millions of files.
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, input_file_name, lit

spark = SparkSession.builder.getOrCreate()

# Streaming ingestion with Auto Loader
bronze_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/checkpoints/bronze/orders/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("s3://data-lake/raw/orders/")
)

# Add ingestion metadata
# Note: input_file_name() is not supported on Unity Catalog shared clusters;
# use col("_metadata.file_path") there instead.
bronze_with_meta = (
    bronze_stream
    .withColumn("_ingested_at", current_timestamp())
    .withColumn("_source_file", input_file_name())
    .withColumn("_source_system", lit("orders-api"))
)

# Write to the bronze Delta table
(bronze_with_meta.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/checkpoints/bronze/orders/data")
    .option("mergeSchema", "true")
    .toTable("prod_catalog.bronze.raw_orders"))
Read from Bronze, apply business logic, and MERGE INTO Silver with upsert semantics.
from pyspark.sql.functions import col, trim, lower, to_timestamp, sha2
from delta.tables import DeltaTable

# Read from bronze (full-table batch read; the MERGE below keeps reruns idempotent)
bronze_df = spark.table("prod_catalog.bronze.raw_orders")

# Apply transformations
silver_df = (
    bronze_df
    .withColumn("order_id", col("order_id").cast("string"))
    .withColumn("customer_email", lower(trim(col("customer_email"))))
    .withColumn("order_date", to_timestamp(col("order_date"), "yyyy-MM-dd'T'HH:mm:ss"))
    .withColumn("amount", col("amount").cast("decimal(12,2)"))
    .withColumn("email_hash", sha2(col("customer_email"), 256))
    .filter(col("order_id").isNotNull())
    .dropDuplicates(["order_id"])
)
# Upsert into silver with MERGE (create the table on the first run)
if spark.catalog.tableExists("prod_catalog.silver.orders"):
    target = DeltaTable.forName(spark, "prod_catalog.silver.orders")
    (target.alias("t")
        .merge(silver_df.alias("s"), "t.order_id = s.order_id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
else:
    silver_df.write.format("delta").saveAsTable("prod_catalog.silver.orders")