This guide walks through getting the initial data ingestion pipeline running after deploying a new DataSurface Yellow model: tagging the model release, creating the required secrets, and activating the DAGs.
This is a demo project for customers doing their first DataSurface setup. Before using this skill, ensure you have run:

- /setup-walkthrough to create the DataSurface Yellow environment on Docker Desktop with Kubernetes
- /create-customer-data-simulator to start a simulator that generates customer and address data in the source database

Once the environment is running and test data is being generated, use this skill to start the ingestion pipeline.
Understanding the pipeline flow helps diagnose where issues occur:
Infrastructure → Factory → Ingestion → Reconcile/CQRS
| DAG | Purpose |
|---|---|
| demo-psp_infrastructure | Main orchestrator. Loads model from git (tagged releases only), writes DAG configs to database, creates factory and system DAGs. |
| scd2_factory_dag | Factory pattern. Reads ingestion configs from DB, dynamically creates/updates/removes ingestion DAGs. |
| scd2_datatransformer_factory | Factory for DataTransformer jobs (masking, aggregation, derived columns). Creates transformer DAGs from DB configs. |
| scd2__CustomerDB_ingestion | Dynamic ingestion DAG. Runs on cron schedule, snapshots source tables, writes to staging, performs SCD2 merge (tracks inserts/updates/deletes with history). |
| Demo_PSP_K8sMergeDB_reconcile | Creates workspace views for consumers after ingestion tables exist. Maps DSG assignments to database views. |
| Demo_PSP_default_K8sMergeDB_cqrs | CQRS replication. Copies merged data to target databases for querying. Separates write path (merge DB) from read paths. |
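At any point you can check which of these DAGs Airflow has registered (the dynamic ingestion DAG only appears after the factory has run). This uses the same demo1 namespace and airflow-api-server deployment as the commands later in this guide:

kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null | \
  grep -E "demo-psp|scd2|Demo_PSP"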
The reconcile and ingestion jobs only pick up tagged releases matching the pattern v*.*.*-demo. Check the existing demo tags and which one the deployment is running:
# Fetch remote tags and list the newest demo tags
git fetch --tags
git tag -l "v*-demo" --sort=-version:refname | head -5
# Check which tag/commit the running MCP server loaded
kubectl logs -n demo1 deployment/demo-psp-mcp-server --tail=50 | grep -E "tag:|commit:"
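To compare your newest local tag against what the server reports, here is a minimal sketch (it assumes the MCP server log line contains the tag name, as grepped above):

# Newest local demo tag
LATEST_TAG=$(git tag -l "v*-demo" --sort=-version:refname | head -1)
echo "Latest local demo tag: $LATEST_TAG"
# Deployed tag reported by the MCP server (log line format assumed)
kubectl logs -n demo1 deployment/demo-psp-mcp-server --tail=50 | grep "tag:" | tail -1

If no suitable tag exists yet, create and push one: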
# Tag current commit
git tag v1.0.1-demo
git push origin v1.0.1-demo
# Or tag a specific commit
git tag v1.0.1-demo <commit-hash>
git push origin v1.0.1-demo
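To bump the patch version automatically instead of typing it, a small sketch (assumes tags strictly follow the vMAJOR.MINOR.PATCH-demo shape):

# Derive the next patch tag from the newest v*.*.*-demo tag
LATEST=$(git tag -l "v*-demo" --sort=-version:refname | head -1)   # e.g. v1.0.1-demo
BASE=${LATEST%-demo}; BASE=${BASE#v}                               # e.g. 1.0.1
MAJOR=${BASE%%.*}; REST=${BASE#*.}
MINOR=${REST%%.*}; PATCH=${REST#*.}
NEXT="v${MAJOR}.${MINOR}.$((PATCH + 1))-demo"
git tag "$NEXT" && git push origin "$NEXT"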
REQUIRED: The ingestion DAG needs credentials to access the source database. Create this secret before proceeding.
kubectl get secret customer-source-credential -n demo1 2>/dev/null && echo "Secret exists" || echo "Secret MISSING - create it below"
kubectl create secret generic customer-source-credential \
--from-literal=USER=postgres \
--from-literal=PASSWORD=password \
-n demo1
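To confirm the secret landed with the expected keys, decode it with standard kubectl jsonpath (key names as created above):

# List the keys stored in the secret
kubectl get secret customer-source-credential -n demo1 -o jsonpath='{.data}'
# Decode one key to verify its value
kubectl get secret customer-source-credential -n demo1 -o jsonpath='{.data.USER}' | base64 -d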
Naming rules for secrets:

- Underscores (_) become hyphens (-)
- Secret keys: USER, PASSWORD, TOKEN

See the /create-k8-credential skill for detailed credential creation.
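As a quick illustration of the underscore-to-hyphen rule, a hypothetical model credential named customer_source_credential maps to the secret name used above:

echo "customer_source_credential" | tr '_' '-'
# prints: customer-source-credential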
IMPORTANT: DAGs are paused by default. You MUST unpause them BEFORE triggering, otherwise triggers will be ignored.
# Unpause all key DAGs FIRST
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause demo-psp_infrastructure
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2_factory_dag
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2_datatransformer_factory
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause Demo_PSP_K8sMergeDB_reconcile
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause Demo_PSP_default_K8sMergeDB_cqrs
Verify DAGs are unpaused (is_paused should be False):
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null
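For a quick pass/fail check instead of scanning the table by eye, this sketch assumes the dags list output prints a True/False paused column (as in recent Airflow releases):

kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null | \
  grep -E "demo-psp|scd2|Demo_PSP" | grep -qi "true" \
  && echo "Some key DAGs are still paused" \
  || echo "All key DAGs are unpaused"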
After secrets are created and DAGs are unpaused, trigger the pipeline in order:
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger demo-psp_infrastructure
Wait ~20 seconds for completion.
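A fixed sleep is usually enough for the demo, but you can poll instead; this sketch assumes an Airflow 2.x-style dags list-runs command whose output includes the run state:

# Poll until the infrastructure run reports success (up to ~60s)
for i in $(seq 1 12); do
  STATE=$(kubectl exec -n demo1 deployment/airflow-api-server -- \
    airflow dags list-runs -d demo-psp_infrastructure 2>/dev/null | \
    grep -oE "success|failed|running" | head -1)
  [ "$STATE" = "success" ] && break
  sleep 5
done
echo "demo-psp_infrastructure latest run state: ${STATE:-unknown}"

Then verify the factory and system DAGs were created: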
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null | grep -E "(factory|reconcile|cqrs|infrastructure)"
Expected DAGs:
- demo-psp_infrastructure - Main infrastructure DAG (triggers model merge)
- Demo_PSP_K8sMergeDB_reconcile - Creates workspace views after tables exist
- Demo_PSP_default_K8sMergeDB_cqrs - CQRS replication DAG
- scd2_datatransformer_factory - Factory for data transformer DAGs
- scd2_factory_dag - Factory that creates ingestion DAGs
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger scd2_factory_dag
Wait ~15 seconds, then verify:
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list | grep ingestion
Expected output:
scd2__CustomerDB_ingestion
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2__CustomerDB_ingestion
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger scd2__CustomerDB_ingestion
Check the latest run's logs on the worker:

# List recent run directories for the ingestion DAG
kubectl exec -n demo1 airflow-worker-0 -c worker -- \
  ls -lt "/opt/airflow/logs/dag_id=scd2__CustomerDB_ingestion/" | head -5
# Get the latest run
LATEST_RUN=$(kubectl exec -n demo1 airflow-worker-0 -c worker -- \
ls -t "/opt/airflow/logs/dag_id=scd2__CustomerDB_ingestion/" | head -1)
# Check for success
kubectl exec -n demo1 airflow-worker-0 -c worker -- \
cat "/opt/airflow/logs/dag_id=scd2__CustomerDB_ingestion/$LATEST_RUN/task_id=snapshot_merge_job/attempt=1.log" | \
grep -E "(RESULT_CODE|completed|Ingested|total_records)"
Expected output:
"Job completed successfully"
"total_records_ingested": 354
"DATASURFACE_RESULT_CODE=0"
| Issue | Check | Solution |
|---|---|---|
| No dynamic DAGs | Model not tagged | Create and push tag v*.*.*-demo |
| DAG creation failed | Missing secret | Create K8s secret (see Step 2) |
| Trigger ignored / DAG not running | DAGs paused | Unpause BEFORE triggering (see Step 3) |
| Factory finds 0 configs | Infrastructure hasn't run | Unpause and trigger demo-psp_infrastructure first |
| Reconcile finds no tables | Ingestion hasn't run | Wait for ingestion DAG to complete |
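The checks in this table can be strung together into a one-shot diagnostic, assembled from the commands above:

# Is the model tagged?
git tag -l "v*-demo" --sort=-version:refname | head -1
# Does the source credential secret exist?
kubectl get secret customer-source-credential -n demo1 >/dev/null 2>&1 \
  && echo "secret: OK" || echo "secret: MISSING"
# Which pipeline DAGs exist so far?
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null | \
  grep -E "infrastructure|factory|ingestion|reconcile|cqrs"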
Run these in sequence to start initial ingestion:
# 1. Ensure model is tagged and pushed (use appropriate version)
git tag v1.0.1-demo && git push origin v1.0.1-demo
# 2. Create the required customer-source-credential secret
kubectl create secret generic customer-source-credential \
--from-literal=USER=postgres \
--from-literal=PASSWORD=password \
-n demo1
# 3. Unpause all DAGs FIRST (triggers are ignored on paused DAGs)
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause demo-psp_infrastructure
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2_factory_dag
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2_datatransformer_factory
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause Demo_PSP_K8sMergeDB_reconcile
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause Demo_PSP_default_K8sMergeDB_cqrs
# 4. Trigger infrastructure DAG to load model and create factory DAGs
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger demo-psp_infrastructure
sleep 20
# 5. Trigger factory DAG to create ingestion DAGs
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger scd2_factory_dag
sleep 15
# 6. Unpause and trigger ingestion DAG (created by factory in step 5)
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags unpause scd2__CustomerDB_ingestion
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags trigger scd2__CustomerDB_ingestion
# Check all DAGs are unpaused (is_paused should be False)
kubectl exec -n demo1 deployment/airflow-api-server -- airflow dags list 2>/dev/null
# Check recent ingestion logs for success
kubectl exec -n demo1 airflow-worker-0 -c worker -- \
ls -lt "/opt/airflow/logs/dag_id=scd2__CustomerDB_ingestion/" | head -3