Generate realistic synthetic data using Faker and Spark, with non-uniform distributions and integrity constraints, and save it to Databricks. Use when creating test data, demo datasets, or synthetic tables.
Generate realistic, story-driven synthetic data for Databricks using Python with Faker and Spark.
These libraries are useful for generating realistic synthetic data:

- `faker` - realistic names, companies, and dates
- `holidays` - holiday calendars for seasonal patterns

These are typically NOT pre-installed on Databricks. Install them using the execute_databricks_command tool:

- code: "%pip install faker holidays"

Save the returned cluster_id and context_id for subsequent calls.

Write the generation code to a local file (e.g., scripts/generate_data.py), then execute it with the run_python_file_on_databricks MCP tool, reusing the returned cluster_id and context_id. Always work with local files first, then execute. This makes debugging easier - you can see and edit the code.
The first execution auto-selects a running cluster and creates an execution context. Reuse this context for follow-up calls - it's much faster (~1s vs ~15s) and shares variables/imports:
First execution - use run_python_file_on_databricks tool:
- file_path: "scripts/generate_data.py"

Returns: { success, output, error, cluster_id, context_id, ... }
Save cluster_id and context_id for follow-up calls.
If execution fails, edit the local file and re-run with the run_python_file_on_databricks tool:

- file_path: "scripts/generate_data.py"
- cluster_id: "<saved_cluster_id>"
- context_id: "<saved_context_id>"

Follow-up executions reuse the context (faster, shares state):

- file_path: "scripts/validate_data.py"
- cluster_id: "<saved_cluster_id>"
- context_id: "<saved_context_id>"

When execution fails:

- Retry with the same cluster_id and context_id (faster, keeps installed libraries)
- If the context itself is broken, drop context_id to create a fresh one

Databricks provides Spark, pandas, numpy, and common data libraries by default. Only install a library if you get an import error.
Use execute_databricks_command tool:
- code: "%pip install faker"
- cluster_id: "<cluster_id>"
- context_id: "<context_id>"

The library is immediately available in the same context.
Note: Keeping the same context_id means installed libraries persist across calls.
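Since a library should only be installed on an actual import failure, the generation script itself can check availability up front. A minimal sketch (the helper name `needs_install` is illustrative, not part of any tool API):

```python
import importlib.util

def needs_install(module_name: str) -> bool:
    """True if the module is not importable in the current context."""
    return importlib.util.find_spec(module_name) is None

# Check before installing - most data libraries are already on the cluster
missing = [lib for lib in ("faker", "holidays") if needs_install(lib)]
if missing:
    print(f"Install via execute_databricks_command: %pip install {' '.join(missing)}")
```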
By default, use the ai_dev_kit catalog. Ask the user which schema to use:
"I'll save the data to ai_dev_kit.<schema>. What schema name would you like to use? (You can also specify a different catalog if needed.)"
If the user provides just a schema name, use ai_dev_kit.{schema}. If they provide catalog.schema, use that instead.
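This resolution rule can be sketched as a small helper (the function name `resolve_target` is hypothetical; the default catalog is the skill's `ai_dev_kit`):

```python
DEFAULT_CATALOG = "ai_dev_kit"

def resolve_target(user_input: str) -> str:
    """Resolve the user's answer into a fully qualified catalog.schema."""
    parts = user_input.strip().split(".")
    if len(parts) == 2:
        return user_input.strip()            # user gave catalog.schema - use as-is
    return f"{DEFAULT_CATALOG}.{parts[0]}"   # bare schema name - use default catalog

print(resolve_target("demo"))        # ai_dev_kit.demo
print(resolve_target("main.sales"))  # main.sales
```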
Always create the catalog, schema, and volume inside the Python script using spark.sql(). Do NOT make separate MCP SQL calls - it's much slower.
The spark variable is available by default on Databricks clusters.
```python
# =============================================================================
# CREATE INFRASTRUCTURE (inside the Python script)
# =============================================================================
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.raw_data")
```
Always save data to a Volume as parquet files, never directly to tables (unless the user explicitly requests tables). This is the input for the downstream Spark Declarative Pipeline (SDP) that will handle bronze/silver/gold layers.
```python
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/raw_data"

# Save as parquet files (raw data)
spark.createDataFrame(customers_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/customers")
spark.createDataFrame(orders_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/orders")
spark.createDataFrame(tickets_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/tickets")
```
By default, generate raw, transactional data only. Do not create fields that represent sums, totals, averages, or counts.
Examples of fields to avoid: `total_orders`, `sum_revenue`, `avg_csat`, `order_count`.

Why? A Spark Declarative Pipeline (SDP) will typically be built after data generation to clean, join, and aggregate the raw data into bronze/silver/gold layers. The synthetic data is the source for this pipeline. Aggregations happen downstream.
Note: If the user specifically requests aggregated fields or summary tables, follow their instructions.
```python
# GOOD - Raw transactional data

# Customer table: one row per customer, no aggregated fields
customers_data.append({
    "customer_id": cid,
    "name": fake.company(),
    "tier": "Enterprise",
    "region": "North",
})

# Order table: one row per order
orders_data.append({
    "order_id": f"ORD-{i:06d}",
    "customer_id": cid,
    "amount": 150.00,  # This order's amount
    "order_date": "2024-10-15",
})

# BAD - Don't add pre-aggregated fields
# customers_data.append({
#     "customer_id": cid,
#     "total_orders": 47,         # NO - this is an aggregation
#     "total_revenue": 12500.00,  # NO - this is a sum
#     "avg_order_value": 265.95,  # NO - this is an average
# })
```
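To preview what the downstream SDP would compute from this raw data, the aggregates can be derived ad hoc with pandas - a sketch using the column names from the example above:

```python
import pandas as pd

orders_pdf = pd.DataFrame({
    "order_id": ["ORD-000001", "ORD-000002", "ORD-000003"],
    "customer_id": ["CUST-00001", "CUST-00001", "CUST-00002"],
    "amount": [150.00, 200.00, 75.00],
})

# These belong in the gold layer, not in the generated raw tables
summary = orders_pdf.groupby("customer_id")["amount"].agg(
    total_orders="count", total_revenue="sum", avg_order_value="mean"
)
print(summary)
```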
Always generate data for the last ~6 months ending at the current date, so dashboards, date filters, and "recent data" queries built on top of it work out of the box:
```python
from datetime import datetime, timedelta

# Dynamic date range - last 6 months from today
END_DATE = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
START_DATE = END_DATE - timedelta(days=180)

# Place special events within this range (e.g., incident 3 weeks ago)
INCIDENT_END = END_DATE - timedelta(days=21)
INCIDENT_START = INCIDENT_END - timedelta(days=10)
```
Generate enough data so patterns remain visible after downstream aggregation (SDP pipelines often aggregate by day/week/region/category). Rules of thumb:
| Grain | Minimum Records | Rationale |
|---|---|---|
| Daily time series | 50-100/day | See trends after weekly rollup |
| Per category | 500+ per category | Statistical significance |
| Per customer | 5-20 events/customer | Enough for customer-level analysis |
| Total rows | 10K-50K minimum | Patterns survive GROUP BY |
```python
# Example: 8000 tickets over 180 days = ~44/day average
# After weekly aggregation: ~310 tickets per week (~78 per week per category)
# After monthly by region: still enough to see patterns
N_TICKETS = 8000
N_CUSTOMERS = 2500  # Each has ~3 tickets on average
N_ORDERS = 25000    # ~10 orders per customer average
```
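The rules of thumb above can be checked with quick arithmetic before generating anything - a sketch using the example sizes:

```python
N_TICKETS, DAYS, N_CATEGORIES = 8000, 180, 4

per_day = N_TICKETS / DAYS                       # ~44.4
per_week = per_day * 7                           # ~311
per_week_per_category = per_week / N_CATEGORIES  # ~78

# Patterns should survive weekly and category-level GROUP BYs
assert per_day >= 40
assert per_week_per_category >= 50
print(f"{per_day:.0f}/day, {per_week:.0f}/week, {per_week_per_category:.0f}/week/category")
```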
Always structure scripts with configuration variables at the top:
```python
"""Generate synthetic data for [use case]."""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from faker import Faker
import holidays
from pyspark.sql import SparkSession

# =============================================================================
# CONFIGURATION - Edit these values
# =============================================================================
CATALOG = "my_catalog"
SCHEMA = "my_schema"
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/raw_data"

# Data sizes - enough for aggregation patterns to survive
N_CUSTOMERS = 2500
N_ORDERS = 25000
N_TICKETS = 8000

# Date range - last 6 months from today
END_DATE = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
START_DATE = END_DATE - timedelta(days=180)

# Special events (within the date range)
INCIDENT_END = END_DATE - timedelta(days=21)
INCIDENT_START = INCIDENT_END - timedelta(days=10)

# Holiday calendar for realistic patterns
US_HOLIDAYS = holidays.US(years=[START_DATE.year, END_DATE.year])

# Reproducibility
SEED = 42

# =============================================================================
# SETUP
# =============================================================================
np.random.seed(SEED)
Faker.seed(SEED)
fake = Faker()
spark = SparkSession.builder.getOrCreate()

# ... rest of script
```
Generate data with pandas (faster, easier), convert to Spark for saving:
```python
import pandas as pd

# Generate with pandas
customers_pdf = pd.DataFrame({
    "customer_id": [f"CUST-{i:05d}" for i in range(N_CUSTOMERS)],
    "name": [fake.company() for _ in range(N_CUSTOMERS)],
    "tier": np.random.choice(['Free', 'Pro', 'Enterprise'], N_CUSTOMERS, p=[0.6, 0.3, 0.1]),
    "region": np.random.choice(['North', 'South', 'East', 'West'], N_CUSTOMERS, p=[0.4, 0.25, 0.2, 0.15]),
    "created_at": [fake.date_between(start_date='-2y', end_date='-6m') for _ in range(N_CUSTOMERS)],
})

# Convert to Spark and save
customers_df = spark.createDataFrame(customers_pdf)
customers_df.write.mode("overwrite").parquet(f"{VOLUME_PATH}/customers")
```
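Spark's type inference over pandas `object` columns can be ambiguous (e.g., dates stored as Python objects, numbers accidentally generated as strings), so it helps to normalize dtypes in pandas before calling `spark.createDataFrame`. A sketch with hypothetical columns:

```python
import pandas as pd

customers_pdf = pd.DataFrame({
    "customer_id": ["CUST-00001", "CUST-00002"],
    "created_at": [pd.Timestamp("2024-01-05").date(), pd.Timestamp("2024-03-09").date()],
    "arr": ["1200.50", "0"],  # accidentally generated as strings
})

# Normalize before handing to spark.createDataFrame
customers_pdf["created_at"] = pd.to_datetime(customers_pdf["created_at"])
customers_pdf["arr"] = customers_pdf["arr"].astype(float)
print(customers_pdf.dtypes)
```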
Generate master tables first, then iterate on them to create related tables with matching IDs:
```python
# 1. Generate customers (master table)
customers_pdf = pd.DataFrame({
    "customer_id": [f"CUST-{i:05d}" for i in range(N_CUSTOMERS)],
    "tier": np.random.choice(['Free', 'Pro', 'Enterprise'], N_CUSTOMERS, p=[0.6, 0.3, 0.1]),
    # ...
})

# 2. Create lookup for foreign key generation
customer_ids = customers_pdf["customer_id"].tolist()
customer_tier_map = dict(zip(customers_pdf["customer_id"], customers_pdf["tier"]))

# Weight by tier - Enterprise customers generate more orders
tier_weights = customers_pdf["tier"].map({'Enterprise': 5.0, 'Pro': 2.0, 'Free': 1.0})
customer_weights = (tier_weights / tier_weights.sum()).tolist()

# 3. Generate orders with valid foreign keys and tier-based logic
orders_data = []
for i in range(N_ORDERS):
    cid = np.random.choice(customer_ids, p=customer_weights)
    tier = customer_tier_map[cid]
    # Amount depends on tier
    if tier == 'Enterprise':
        amount = np.random.lognormal(7, 0.8)
    elif tier == 'Pro':
        amount = np.random.lognormal(5, 0.7)
    else:
        amount = np.random.lognormal(3.5, 0.6)
    orders_data.append({
        "order_id": f"ORD-{i:06d}",
        "customer_id": cid,
        "amount": round(amount, 2),
        "order_date": fake.date_between(start_date=START_DATE, end_date=END_DATE),
    })
orders_pdf = pd.DataFrame(orders_data)

# 4. Generate tickets that reference both customers and orders
order_ids = orders_pdf["order_id"].tolist()
tickets_data = []
for i in range(N_TICKETS):
    cid = np.random.choice(customer_ids, p=customer_weights)
    oid = np.random.choice(order_ids)  # Or None for general inquiry
    tickets_data.append({
        "ticket_id": f"TKT-{i:06d}",
        "customer_id": cid,
        "order_id": oid if np.random.random() > 0.3 else None,
        # ...
    })
tickets_pdf = pd.DataFrame(tickets_data)
```
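The tier weighting can be sanity-checked in isolation - Enterprise customers should be sampled roughly 5x as often as Free customers. A self-contained sketch with a reduced population:

```python
import numpy as np
import pandas as pd

np.random.seed(0)
tiers = pd.Series(np.random.choice(['Free', 'Pro', 'Enterprise'], 1000, p=[0.6, 0.3, 0.1]))
weights = tiers.map({'Enterprise': 5.0, 'Pro': 2.0, 'Free': 1.0})
probs = (weights / weights.sum()).to_numpy()

# Draw many foreign keys and compare per-customer pick rates by tier
picks = np.random.choice(tiers.index, size=20_000, p=probs)
rate = tiers.loc[picks].value_counts(normalize=True)
ent_rate = rate['Enterprise'] / (tiers == 'Enterprise').mean()
free_rate = rate['Free'] / (tiers == 'Free').mean()
print(round(ent_rate / free_rate, 1))  # close to 5
```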
Never use uniform distributions - real data is rarely uniform:
```python
# BAD - Uniform (unrealistic)
prices = np.random.uniform(10, 1000, size=N_ORDERS)

# GOOD - Log-normal (realistic for prices, salaries, order amounts)
prices = np.random.lognormal(mean=4.5, sigma=0.8, size=N_ORDERS)

# GOOD - Pareto/power law (popularity, wealth, page views)
popularity = (np.random.pareto(a=2.5, size=N_PRODUCTS) + 1) * 10

# GOOD - Exponential (time between events, resolution time)
resolution_hours = np.random.exponential(scale=24, size=N_TICKETS)

# GOOD - Weighted categorical
regions = np.random.choice(
    ['North', 'South', 'East', 'West'],
    size=N_CUSTOMERS,
    p=[0.40, 0.25, 0.20, 0.15]
)
```
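Picking lognormal parameters is easiest via the median: for `np.random.lognormal(mean=mu, sigma=s)`, the median of the samples is `exp(mu)`, so set `mu = log(target_median)`. A quick check (the $90 target is an illustrative assumption):

```python
import numpy as np

np.random.seed(0)
target_median = 90.0        # e.g., a typical order of ~$90
mu = np.log(target_median)  # mean of the underlying normal
prices = np.random.lognormal(mean=mu, sigma=0.8, size=100_000)

print(round(float(np.median(prices)), 1))  # close to 90
print(round(float(prices.mean()), 1))      # mean > median (right-skewed)
```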
Add weekday/weekend effects, holidays, seasonality, and event spikes:
```python
import holidays
import numpy as np
import pandas as pd

# Load holiday calendar
US_HOLIDAYS = holidays.US(years=[START_DATE.year, END_DATE.year])

BASE_DAILY_TICKETS = 50  # weekday baseline volume (~44/day after weekend dips)

def get_daily_multiplier(date):
    """Calculate volume multiplier for a given date."""
    multiplier = 1.0
    # Weekend drop
    if date.weekday() >= 5:
        multiplier *= 0.6
    # Holiday drop (even lower than weekends)
    if date in US_HOLIDAYS:
        multiplier *= 0.3
    # Q4 seasonality (higher in Oct-Dec)
    multiplier *= 1 + 0.15 * (date.month - 6) / 6
    # Incident spike
    if INCIDENT_START <= date <= INCIDENT_END:
        multiplier *= 3.0
    # Random noise
    multiplier *= np.random.normal(1, 0.1)
    return max(0.1, multiplier)

# Distribute tickets across dates with realistic patterns
date_range = pd.date_range(START_DATE, END_DATE, freq='D')
daily_volumes = [int(BASE_DAILY_TICKETS * get_daily_multiplier(d)) for d in date_range]
```
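Because of the multiplier noise, the daily volumes rarely sum to the target row count exactly. If an exact total matters, rescale after the fact - a self-contained sketch (the stand-in `raw` volumes are illustrative; in the real script they come from `get_daily_multiplier`):

```python
import numpy as np

np.random.seed(0)
N_TICKETS = 8000
raw = np.maximum(np.random.normal(44, 8, size=180), 1)  # stand-in daily volumes
daily_volumes = np.floor(raw * N_TICKETS / raw.sum()).astype(int)

# Floor rounding undershoots; give the remainder to the busiest days
shortfall = N_TICKETS - daily_volumes.sum()
if shortfall > 0:
    daily_volumes[np.argsort(raw)[-shortfall:]] += 1
print(daily_volumes.sum())  # exactly 8000
```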
Attributes within a row should correlate logically:
```python
def generate_ticket(customer_id, tier, date):
    """Generate a coherent ticket where attributes correlate."""
    # Priority correlates with tier
    if tier == 'Enterprise':
        priority = np.random.choice(['Critical', 'High', 'Medium'], p=[0.3, 0.5, 0.2])
    else:
        priority = np.random.choice(['Critical', 'High', 'Medium', 'Low'], p=[0.05, 0.2, 0.45, 0.3])
    # Resolution time correlates with priority
    resolution_scale = {'Critical': 4, 'High': 12, 'Medium': 36, 'Low': 72}
    resolution_hours = np.random.exponential(scale=resolution_scale[priority])
    # CSAT correlates with resolution time
    if resolution_hours < 4:
        csat = np.random.choice([4, 5], p=[0.3, 0.7])
    elif resolution_hours < 24:
        csat = np.random.choice([3, 4, 5], p=[0.2, 0.5, 0.3])
    else:
        csat = np.random.choice([1, 2, 3, 4], p=[0.1, 0.3, 0.4, 0.2])
    return {
        "customer_id": customer_id,
        "priority": priority,
        "resolution_hours": round(resolution_hours, 1),
        "csat_score": csat,
        "created_at": date,
    }
```
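Each correlation can be checked in isolation. For example, the exponential scales above imply that mean resolution time strictly increases as priority drops - a minimal self-contained check:

```python
import numpy as np

np.random.seed(0)
res_scale = {'Critical': 4, 'High': 12, 'Medium': 36, 'Low': 72}
means = {
    p: np.random.exponential(scale=s, size=50_000).mean()
    for p, s in res_scale.items()
}

# An exponential's mean equals its scale parameter
assert means['Critical'] < means['High'] < means['Medium'] < means['Low']
print({p: round(m, 1) for p, m in means.items()})
```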
Save as scripts/generate_data.py:
```python
"""Generate synthetic customer, order, and ticket data."""
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
from faker import Faker
import holidays
from pyspark.sql import SparkSession

# =============================================================================
# CONFIGURATION
# =============================================================================
CATALOG = "my_catalog"
SCHEMA = "my_schema"
VOLUME_PATH = f"/Volumes/{CATALOG}/{SCHEMA}/raw_data"

N_CUSTOMERS = 2500
N_ORDERS = 25000
N_TICKETS = 8000

# Date range - last 6 months from today
END_DATE = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
START_DATE = END_DATE - timedelta(days=180)

# Special events (within the date range)
INCIDENT_END = END_DATE - timedelta(days=21)
INCIDENT_START = INCIDENT_END - timedelta(days=10)

# Holiday calendar
US_HOLIDAYS = holidays.US(years=[START_DATE.year, END_DATE.year])

SEED = 42

# =============================================================================
# SETUP
# =============================================================================
np.random.seed(SEED)
Faker.seed(SEED)
fake = Faker()
spark = SparkSession.builder.getOrCreate()

# =============================================================================
# CREATE INFRASTRUCTURE
# =============================================================================
print("Creating catalog/schema/volume if needed...")
spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")
spark.sql(f"CREATE VOLUME IF NOT EXISTS {CATALOG}.{SCHEMA}.raw_data")

print(f"Generating: {N_CUSTOMERS:,} customers, {N_ORDERS:,} orders, {N_TICKETS:,} tickets")

# =============================================================================
# 1. CUSTOMERS (Master Table)
# =============================================================================
print("Generating customers...")
customers_pdf = pd.DataFrame({
    "customer_id": [f"CUST-{i:05d}" for i in range(N_CUSTOMERS)],
    "name": [fake.company() for _ in range(N_CUSTOMERS)],
    "tier": np.random.choice(['Free', 'Pro', 'Enterprise'], N_CUSTOMERS, p=[0.6, 0.3, 0.1]),
    "region": np.random.choice(['North', 'South', 'East', 'West'], N_CUSTOMERS, p=[0.4, 0.25, 0.2, 0.15]),
})

# ARR correlates with tier
customers_pdf["arr"] = customers_pdf["tier"].apply(
    lambda t: round(np.random.lognormal(11, 0.5), 2) if t == 'Enterprise'
    else round(np.random.lognormal(8, 0.6), 2) if t == 'Pro' else 0
)

# Lookups for foreign keys
customer_ids = customers_pdf["customer_id"].tolist()
customer_tier_map = dict(zip(customers_pdf["customer_id"], customers_pdf["tier"]))
tier_weights = customers_pdf["tier"].map({'Enterprise': 5.0, 'Pro': 2.0, 'Free': 1.0})
customer_weights = (tier_weights / tier_weights.sum()).tolist()
print(f"  Created {len(customers_pdf):,} customers")

# =============================================================================
# 2. ORDERS (References Customers)
# =============================================================================
print("Generating orders...")
orders_data = []
for i in range(N_ORDERS):
    cid = np.random.choice(customer_ids, p=customer_weights)
    tier = customer_tier_map[cid]
    amount = np.random.lognormal(7 if tier == 'Enterprise' else 5 if tier == 'Pro' else 3.5, 0.7)
    orders_data.append({
        "order_id": f"ORD-{i:06d}",
        "customer_id": cid,
        "amount": round(amount, 2),
        "status": np.random.choice(['completed', 'pending', 'cancelled'], p=[0.85, 0.10, 0.05]),
        "order_date": fake.date_between(start_date=START_DATE, end_date=END_DATE),
    })
orders_pdf = pd.DataFrame(orders_data)
print(f"  Created {len(orders_pdf):,} orders")

# =============================================================================
# 3. TICKETS (References Customers, with incident spike)
# =============================================================================
print("Generating tickets...")

def get_daily_volume(date, base=50):
    # base=50 weekday tickets -> ~44/day average -> ~8,000 over 180 days
    vol = base * (0.6 if date.weekday() >= 5 else 1.0)
    if date in US_HOLIDAYS:
        vol *= 0.3  # Even lower on holidays
    if INCIDENT_START <= date <= INCIDENT_END:
        vol *= 3.0
    return max(0, int(vol * np.random.normal(1, 0.15)))

# Distribute tickets across dates
tickets_data = []
ticket_idx = 0
for day in pd.date_range(START_DATE, END_DATE):
    daily_count = get_daily_volume(day.to_pydatetime())
    is_incident = INCIDENT_START <= day.to_pydatetime() <= INCIDENT_END
    for _ in range(daily_count):
        if ticket_idx >= N_TICKETS:
            break
        cid = np.random.choice(customer_ids, p=customer_weights)
        tier = customer_tier_map[cid]
        # Category - Auth dominates during incident
        if is_incident:
            category = np.random.choice(['Auth', 'Network', 'Billing', 'Account'], p=[0.65, 0.15, 0.1, 0.1])
        else:
            category = np.random.choice(['Auth', 'Network', 'Billing', 'Account'], p=[0.25, 0.30, 0.25, 0.20])
        # Priority correlates with tier
        if tier == 'Enterprise':
            priority = np.random.choice(['Critical', 'High', 'Medium'], p=[0.3, 0.5, 0.2])
        else:
            priority = np.random.choice(['Critical', 'High', 'Medium', 'Low'], p=[0.05, 0.2, 0.45, 0.3])
        # Resolution time correlates with priority
        res_scale = {'Critical': 4, 'High': 12, 'Medium': 36, 'Low': 72}
        resolution = np.random.exponential(scale=res_scale[priority])
        # CSAT degrades during incident for Auth
        if is_incident and category == 'Auth':
            csat = np.random.choice([1, 2, 3, 4, 5], p=[0.15, 0.25, 0.35, 0.2, 0.05])
        else:
            csat = 5 if resolution < 4 else (4 if resolution < 12 else np.random.choice([2, 3, 4], p=[0.2, 0.5, 0.3]))
        tickets_data.append({
            "ticket_id": f"TKT-{ticket_idx:06d}",
            "customer_id": cid,
            "category": category,
            "priority": priority,
            "resolution_hours": round(resolution, 1),
            "csat_score": csat,
            "created_at": day.strftime("%Y-%m-%d"),
        })
        ticket_idx += 1
    if ticket_idx >= N_TICKETS:
        break
tickets_pdf = pd.DataFrame(tickets_data)
print(f"  Created {len(tickets_pdf):,} tickets")

# =============================================================================
# 4. SAVE TO VOLUME
# =============================================================================
print(f"\nSaving to {VOLUME_PATH}...")
spark.createDataFrame(customers_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/customers")
spark.createDataFrame(orders_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/orders")
spark.createDataFrame(tickets_pdf).write.mode("overwrite").parquet(f"{VOLUME_PATH}/tickets")
print("Done!")

# =============================================================================
# 5. VALIDATION
# =============================================================================
print("\n=== VALIDATION ===")
print(f"Tier distribution: {customers_pdf['tier'].value_counts(normalize=True).to_dict()}")
print(f"Avg order by tier: {orders_pdf.merge(customers_pdf[['customer_id', 'tier']]).groupby('tier')['amount'].mean().to_dict()}")
incident_tickets = tickets_pdf[tickets_pdf['created_at'].between(
    INCIDENT_START.strftime("%Y-%m-%d"), INCIDENT_END.strftime("%Y-%m-%d")
)]
print(f"Incident period tickets: {len(incident_tickets):,} ({len(incident_tickets)/len(tickets_pdf)*100:.1f}%)")
print(f"Incident Auth %: {(incident_tickets['category'] == 'Auth').mean()*100:.1f}%")
```
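Beyond distribution checks, it is worth validating referential integrity before handing the data off to the pipeline - every foreign key should resolve to a master row. A pandas sketch of the check (shown self-contained with toy rows; in the script it would run on the real `customers_pdf`, `orders_pdf`, and `tickets_pdf`):

```python
import pandas as pd

customers_pdf = pd.DataFrame({"customer_id": ["CUST-00001", "CUST-00002"]})
orders_pdf = pd.DataFrame({
    "order_id": ["ORD-000001", "ORD-000002"],
    "customer_id": ["CUST-00001", "CUST-00002"],
})
tickets_pdf = pd.DataFrame({
    "ticket_id": ["TKT-000001"],
    "customer_id": ["CUST-00002"],
    "order_id": [None],  # None is allowed (general inquiry)
})

# Every order and ticket must reference an existing customer
valid_customers = set(customers_pdf["customer_id"])
assert orders_pdf["customer_id"].isin(valid_customers).all()
assert tickets_pdf["customer_id"].isin(valid_customers).all()

# Non-null ticket order_ids must reference an existing order
linked = tickets_pdf["order_id"].dropna()
assert linked.isin(set(orders_pdf["order_id"])).all()
print("Referential integrity OK")
```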
Execute using run_python_file_on_databricks tool:
- file_path: "scripts/generate_data.py"

If it fails, edit the file and re-run with the same cluster_id and context_id.
After successful execution, use get_volume_folder_details tool to verify the generated data:
- volume_path: "my_catalog/my_schema/raw_data/customers"
- format: "parquet"
- table_stat_level: "SIMPLE"

This returns schema, row counts, and column statistics to confirm the data was written correctly.
Checklist:

- Default to the ai_dev_kit catalog; ask the user for a schema name
- CREATE CATALOG/SCHEMA/VOLUME IF NOT EXISTS inside the Python script
- No total_x, sum_x, avg_x fields - the SDP pipeline computes those
- Save parquet to /Volumes/{catalog}/{schema}/raw_data/<input_datasource_name>
- Use datetime.now() - timedelta(days=180) for the last 6 months
- Reuse cluster_id and context_id for faster iterations
- Install faker and holidays first; most others are pre-installed