Extracts and maps data lineage from various sources including SQL, dbt, Airflow, and Spark, generating comprehensive lineage graphs for impact analysis.
Extracts and maps data lineage from various sources to provide comprehensive data flow visibility.
This skill parses and extracts data lineage information from SQL queries, dbt projects, Airflow DAGs, and Spark jobs. It generates comprehensive lineage graphs showing data flow from source to destination, enabling impact analysis and data governance.
{
"sources": {
"type": "array",
"required": true,
"items": {
"type": {
"type": "string",
"enum": ["sql", "dbt", "airflow", "spark", "file"]
},
"content": {
"type": "string|object",
"description": "SQL string, file path, or manifest object"
},
"metadata": {
"type": "object",
"properties": {
"database": "string",
"schema": "string",
"catalog": "string"
}
}
}
},
"existingLineage": {
"type": "object",
"description": "Existing lineage graph to merge with"
},
"targetCatalog": {
"type": "string",
"enum": ["datahub", "amundsen", "alation", "openlineage", "json"],
"default": "json",
"description": "Target format for lineage export"
},
"options": {
"type": "object",
"properties": {
"columnLevel": {
"type": "boolean",
"default": true,
"description": "Extract column-level lineage"
},
"resolveViews": {
"type": "boolean",
"default": false,
"description": "Resolve views to underlying tables"
},
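"includeTemporary": {
"type": "boolean",
"default": false,
"description": "Include temporary/CTE tables in lineage"
},
"impactAnalysisTarget": {
"type": "string",
"description": "Name of the table to run impact analysis for, e.g. \"raw.customers\""
}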
}
}
}
{
"lineageGraph": {
"type": "object",
"properties": {
"nodes": {
"type": "array",
"items": {
"id": "string",
"type": "table|view|file|external",
"name": "string",
"database": "string",
"schema": "string",
"columns": "array"
}
},
"edges": {
"type": "array",
"items": {
"source": "string",
"target": "string",
"transformationType": "string",
"sql": "string"
}
}
}
},
"columnLineage": {
"type": "array",
"items": {
"targetColumn": {
"table": "string",
"column": "string"
},
"sourceColumns": {
"type": "array",
"items": {
"table": "string",
"column": "string",
"transformation": "string"
}
},
"transformationLogic": "string"
}
},
"impactAnalysis": {
"type": "object",
"properties": {
"upstream": {
"type": "array",
"description": "All upstream dependencies"
},
"downstream": {
"type": "array",
"description": "All downstream dependents"
},
"criticalPath": {
"type": "array",
"description": "Most important lineage path"
}
}
},
"catalogIntegration": {
"type": "object",
"description": "Export format for target catalog",
"properties": {
"format": "string",
"payload": "object|string"
}
},
"statistics": {
"tablesCount": "number",
"columnsCount": "number",
"edgesCount": "number",
"maxDepth": "number"
}
}
{
"sources": [
{
"type": "sql",
"content": "INSERT INTO analytics.fct_orders SELECT o.order_id, c.customer_name FROM staging.orders o JOIN staging.customers c ON o.customer_id = c.id",
"metadata": {
"database": "warehouse",
"schema": "analytics"
}
}
],
"options": {
"columnLevel": true
}
}
{
"sources": [
{
"type": "dbt",
"content": "./target/manifest.json"
}
],
"targetCatalog": "datahub",
"options": {
"resolveViews": true
}
}
{
"sources": [
{
"type": "dbt",
"content": "./analytics/target/manifest.json"
},
{
"type": "airflow",
"content": "./dags/etl_pipeline.py"
},
{
"type": "sql",
"content": "SELECT * FROM external_db.customers"
}
],
"targetCatalog": "openlineage"
}
{
"sources": [
{
"type": "dbt",
"content": "./target/manifest.json"
}
],
"options": {
"columnLevel": true,
"impactAnalysisTarget": "raw.customers"
}
}
| Statement Type | Extracted Information |
|---|---|
| SELECT | Source tables, column mappings |
| INSERT INTO...SELECT | Target table, source tables |
| CREATE TABLE AS | New table, source lineage |
| MERGE | Target, source, update/insert columns |
| UPDATE...FROM | Target table, source join tables |
Extracts from manifest.json:
- `ref()` and `source()`
- `catalog.json`

Maps lineage from:
Parses lineage from:
| Type | Example | Lineage |
|---|---|---|
| Direct | SELECT customer_id | 1:1 mapping |
| Rename | customer_id AS cust_id | Rename mapping |
| Expression | CONCAT(first, last) AS name | Multi-column → single |
| Aggregation | SUM(amount) AS total | Many → single with agg |
| Case | CASE WHEN... | Conditional mapping |
{
"columnLineage": [
{
"targetColumn": {
"table": "fct_orders",
"column": "customer_name"
},
"sourceColumns": [
{
"table": "stg_customers",
"column": "first_name",
"transformation": "CONCAT"
},
{
"table": "stg_customers",
"column": "last_name",
"transformation": "CONCAT"
}
],
"transformationLogic": "CONCAT(first_name, ' ', last_name)"
}
]
}
{
"format": "datahub",
"payload": {
"entities": [...],
"relationships": [...]
}
}
{
"format": "openlineage",
"payload": {
"eventType": "COMPLETE",
"run": {...},
"job": {...},
"inputs": [...],
"outputs": [...]
}
}
{
"format": "amundsen",
"payload": {
"tables": [...],
"columns": [...],
"lineage": [...]
}
}
- `data-lineage.js`
- `data-catalog.js`
- `dbt-project-setup.js`