Creates Prowler Attack Paths openCypher queries using the Cartography schema as the source of truth for node labels, properties, and relationships. Also covers Prowler-specific additions (Internet node, ProwlerFinding, internal isolation labels) and $provider_uid scoping for predefined queries. Trigger: When creating or updating Attack Paths queries.
Attack Paths queries are openCypher queries that analyze cloud infrastructure graphs (ingested via Cartography) to detect security risks like privilege escalation paths, network exposure, and misconfigurations.
Queries are written in openCypher Version 9 for compatibility with both Neo4j and Amazon Neptune.
This skill covers two types of queries with different isolation mechanisms:
| | Predefined queries | Custom queries |
|---|---|---|
| Where they live | api/src/backend/api/attack_paths/queries/{provider}.py | User/LLM-supplied via the custom query API endpoint |
| Provider isolation | AWSAccount {id: $provider_uid} anchor + path connectivity | Automatic _Provider_{uuid} label injection via cypher_sanitizer.py |
| What to write | Chain every MATCH from the aws variable | Plain Cypher, no isolation boilerplate needed |
| Internal labels | Never use (_ProviderResource, _Tenant_*, _Provider_*) | Never use (injected automatically by the system) |
For predefined queries: every node must be reachable from the AWSAccount root via graph traversal. This is the isolation boundary.
For custom queries: write natural Cypher without isolation concerns. The query runner injects a _Provider_{uuid} label into every node pattern before execution, and a post-query filter catches edge cases.
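The two rules for predefined queries (anchor on the account, never touch internal labels) lend themselves to a quick text check. The sketch below is a hypothetical helper, not part of Prowler: `lint_predefined_query` and both regexes are assumptions made for illustration. It runs on the rendered query string (single braces, after f-string evaluation) and cannot prove path connectivity; it only flags the obvious mistakes.

```python
import re

# Internal isolation labels that a query must never reference directly.
INTERNAL_LABEL_RE = re.compile(r":\s*(_ProviderResource|_Tenant_\w+|_Provider_\w+)")
# The account anchor every predefined query is expected to start from.
ANCHOR_RE = re.compile(
    r"\(\s*aws\s*:\s*AWSAccount\s*\{\s*id\s*:\s*\$provider_uid\s*\}\s*\)"
)


def lint_predefined_query(cypher: str) -> list[str]:
    """Return human-readable problems found in a rendered predefined query."""
    problems = []
    if not ANCHOR_RE.search(cypher):
        problems.append("missing (aws:AWSAccount {id: $provider_uid}) anchor")
    for match in INTERNAL_LABEL_RE.finditer(cypher):
        problems.append(f"internal label used: {match.group(1)}")
    return problems


good = "MATCH path = (aws:AWSAccount {id: $provider_uid})--(r:EC2Instance) RETURN path"
bad = "MATCH (r:EC2Instance:_ProviderResource) RETURN r"
print(lint_predefined_query(good))
print(lint_predefined_query(bad))
```

A check like this is only a first pass. Whether every later MATCH actually chains from `aws` still needs a manual read of the query.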
Queries can be created from:
pathfinding.cloud ID (e.g., ECS-001, GLUE-001)
paths.json is too large for WebFetch. Use Bash:
# Fetch a single path by ID
curl -s https://raw.githubusercontent.com/DataDog/pathfinding.cloud/main/docs/paths.json \
| jq '.[] | select(.id == "ecs-002")'
# List all path IDs and names
curl -s https://raw.githubusercontent.com/DataDog/pathfinding.cloud/main/docs/paths.json \
| jq -r '.[] | "\(.id): \(.name)"'
# Filter by service prefix
curl -s https://raw.githubusercontent.com/DataDog/pathfinding.cloud/main/docs/paths.json \
| jq -r '.[] | select(.id | startswith("ecs")) | "\(.id): \(.name)"'
If jq is not available, use python3 -c "import json,sys; ..." as a fallback.
Natural language description from the user
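For the pathfinding.cloud source, the jq lookups can be mirrored in Python when jq is unavailable. This is a minimal sketch that assumes paths.json is a JSON array of objects with `id` and `name` keys, as the jq filters imply. The function names and the sample entries are placeholders, not real path data.

```python
import json
from typing import Optional


def find_path(paths: list[dict], path_id: str) -> Optional[dict]:
    """Return the entry with the given id (like jq's select(.id == ...)).

    Matching is case-insensitive as a convenience, so "ECS-001" finds "ecs-001".
    """
    wanted = path_id.lower()
    return next((p for p in paths if str(p.get("id", "")).lower() == wanted), None)


def list_paths(paths: list[dict], prefix: str = "") -> list[str]:
    """Return "id: name" lines, optionally filtered by a service prefix."""
    return [f"{p['id']}: {p['name']}" for p in paths if p["id"].startswith(prefix)]


# Placeholder data standing in for the downloaded file.
SAMPLE = '[{"id": "ecs-001", "name": "Sample A"}, {"id": "glue-001", "name": "Sample B"}]'
paths = json.loads(SAMPLE)

print(find_path(paths, "ECS-001"))
print(list_paths(paths, "glue"))
```

To run it against the real file, replace `json.loads(SAMPLE)` with `json.load(sys.stdin)` (plus `import sys`) and pipe the curl output into the script.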
One parameter is injected automatically by the query runner:
| Parameter | Property it matches | Used on | Purpose |
|---|---|---|---|
| $provider_uid | id | AWSAccount | Scopes to a specific AWS account |
All other nodes are isolated by path connectivity from the AWSAccount anchor.
All query files start with these imports:
from api.attack_paths.queries.types import (
AttackPathsQueryAttribution,
AttackPathsQueryDefinition,
AttackPathsQueryParameterDefinition,
)
from tasks.jobs.attack_paths.config import PROWLER_FINDING_LABEL
The PROWLER_FINDING_LABEL constant (value: "ProwlerFinding") is used via f-string interpolation in all queries. Never hardcode the label string.
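Because the Cypher lives inside an f-string, literal Cypher braces must be doubled while the label constant uses single braces. The snippet below illustrates that rendering. It defines the constant locally as a stand-in for the real import so that it runs on its own, and the query itself is a trimmed-down example rather than one taken from aws.py.

```python
# Stand-in for: from tasks.jobs.attack_paths.config import PROWLER_FINDING_LABEL
PROWLER_FINDING_LABEL = "ProwlerFinding"

# {{ and }} render as literal { and }; {PROWLER_FINDING_LABEL} is interpolated.
cypher = f"""
    MATCH (aws:AWSAccount {{id: $provider_uid}})--(n)
    OPTIONAL MATCH (n)-[pfr]-(pf:{PROWLER_FINDING_LABEL} {{status: 'FAIL'}})
    RETURN n, pf, pfr
"""

print(cypher)
```

The rendered text contains `{id: $provider_uid}` and `(pf:ProwlerFinding {status: 'FAIL'})`. `$provider_uid` is left untouched because `$` has no special meaning in an f-string.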
There are four distinct privilege escalation patterns. Choose based on the attack type:
| Sub-pattern | Target | path_target shape | Example |
|---|---|---|---|
| Self-escalation | Principal's own policies | (aws)--(target_policy:AWSPolicy)--(principal) | IAM-001 |
| Lateral to user | Other IAM users | (aws)--(target_user:AWSUser) | IAM-002 |
| Assume-role lateral | Assumable roles | (aws)--(target_role:AWSRole)<-[:STS_ASSUMEROLE_ALLOW]-(principal) | IAM-014 |
| PassRole + service | Service-trusting roles | (aws)--(target_role:AWSRole)-[:TRUSTS_AWS_PRINCIPAL]->(...) | EC2-001 |
The principal modifies resources attached to itself. path_target loops back to principal:
AWS_{QUERY_NAME} = AttackPathsQueryDefinition(
id="aws-{kebab-case-name}",
name="{Human-friendly label} ({REFERENCE_ID})",
short_description="{Brief explanation, no technical permissions.}",
description="{Detailed description of the attack vector and impact.}",
attribution=AttackPathsQueryAttribution(
text="pathfinding.cloud - {REFERENCE_ID} - {permission}",
link="https://pathfinding.cloud/paths/{reference_id_lowercase}",
),
provider="aws",
cypher=f"""
// Find principals with {permission}
MATCH path_principal = (aws:AWSAccount {{id: $provider_uid}})--(principal:AWSPrincipal)--(policy:AWSPolicy)--(stmt:AWSPolicyStatement)
WHERE stmt.effect = 'Allow'
AND any(action IN stmt.action WHERE
toLower(action) = '{permission_lowercase}'
OR toLower(action) = '{service}:*'
OR action = '*'
)
// Find target resources attached to the same principal
MATCH path_target = (aws)--(target_policy:AWSPolicy)--(principal)
WHERE target_policy.arn CONTAINS $provider_uid
AND any(resource IN stmt.resource WHERE
resource = '*'
OR target_policy.arn CONTAINS resource
)
WITH collect(path_principal) + collect(path_target) AS paths
UNWIND paths AS p
UNWIND nodes(p) AS n
WITH paths, collect(DISTINCT n) AS unique_nodes
UNWIND unique_nodes AS n
OPTIONAL MATCH (n)-[pfr]-(pf:{PROWLER_FINDING_LABEL} {{status: 'FAIL'}})
RETURN paths, collect(DISTINCT pf) as dpf, collect(DISTINCT pfr) as dpfr
""",
parameters=[],
)
path_target shapes
The other three sub-patterns share the same path_principal, deduplication tail, and RETURN as self-escalation. Only the path_target MATCH differs:
// Lateral to user (e.g., IAM-002) - targets other IAM users
MATCH path_target = (aws)--(target_user:AWSUser)
WHERE any(resource IN stmt.resource WHERE resource = '*' OR target_user.arn CONTAINS resource OR resource CONTAINS target_user.name)
// Assume-role lateral (e.g., IAM-014) - targets roles the principal can assume
MATCH path_target = (aws)--(target_role:AWSRole)<-[:STS_ASSUMEROLE_ALLOW]-(principal)
WHERE any(resource IN stmt.resource WHERE resource = '*' OR target_role.arn CONTAINS resource OR resource CONTAINS target_role.name)
// PassRole + service (e.g., EC2-001) - targets roles trusting a service
MATCH path_target = (aws)--(target_role:AWSRole)-[:TRUSTS_AWS_PRINCIPAL]->(:AWSPrincipal {arn: '{service}.amazonaws.com'})
WHERE any(resource IN stmt.resource WHERE resource = '*' OR target_role.arn CONTAINS resource OR resource CONTAINS target_role.name)
Multi-permission: PassRole queries require a second permission. Add MATCH (principal)--(policy2:AWSPolicy)--(stmt2:AWSPolicyStatement) with its own WHERE before path_target, then check BOTH stmt.resource AND stmt2.resource against the target. See IAM-015 or EC2-001 in aws.py for examples.
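The shape of the second-permission MATCH can be sketched by generating it from the same action predicate the template uses. The helpers below (`action_clause`, `second_permission_match`) are hypothetical and exist only to show the resulting text, and `ec2:RunInstances` is an assumed example permission. Existing queries write this clause inline, so treat IAM-015 and EC2-001 in aws.py as the reference for exact wording.

```python
def action_clause(stmt_var: str, permission: str) -> str:
    """Build the 'statement allows this permission' predicate for one statement variable."""
    service = permission.split(":", 1)[0].lower()
    return (
        f"{stmt_var}.effect = 'Allow'\n"
        f"    AND any(action IN {stmt_var}.action WHERE\n"
        f"        toLower(action) = '{permission.lower()}'\n"
        f"        OR toLower(action) = '{service}:*'\n"
        f"        OR action = '*'\n"
        f"    )"
    )


def second_permission_match(permission: str) -> str:
    """MATCH for the extra permission, bound to policy2/stmt2 on the same principal."""
    return (
        "MATCH (principal)--(policy2:AWSPolicy)--(stmt2:AWSPolicyStatement)\n"
        f"WHERE {action_clause('stmt2', permission)}"
    )


print(second_permission_match("ec2:RunInstances"))
```

The generated block goes between path_principal and path_target. The path_target WHERE then needs a resource check on both stmt.resource and stmt2.resource.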
The Internet node is reached via CAN_ACCESS through the already-scoped resource, not via a standalone lookup:
AWS_{QUERY_NAME} = AttackPathsQueryDefinition(
id="aws-{kebab-case-name}",
name="{Human-friendly label}",
short_description="{Brief explanation.}",
description="{Detailed description.}",
provider="aws",
cypher=f"""
// Match exposed resources (MUST chain from `aws`)
MATCH path = (aws:AWSAccount {{id: $provider_uid}})--(resource:EC2Instance)
WHERE resource.exposed_internet = true
// Internet node reached via path connectivity through the resource
OPTIONAL MATCH (internet:Internet)-[can_access:CAN_ACCESS]->(resource)
WITH collect(path) AS paths, head(collect(internet)) AS internet, collect(can_access) AS can_access
UNWIND paths AS p
UNWIND nodes(p) AS n
WITH paths, internet, can_access, collect(DISTINCT n) AS unique_nodes
UNWIND unique_nodes AS n
OPTIONAL MATCH (n)-[pfr]-(pf:{PROWLER_FINDING_LABEL} {{status: 'FAIL'}})
RETURN paths, collect(DISTINCT pf) as dpf, collect(DISTINCT pfr) as dpfr,
internet, can_access
""",
parameters=[],
)
Add to the {PROVIDER}_QUERIES list at the bottom of the file:
AWS_QUERIES: list[AttackPathsQueryDefinition] = [
# ... existing queries ...
AWS_{NEW_QUERY_NAME}, # Add here
]
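After registering a query, a quick consistency pass over the list can catch copy-paste slips such as a duplicated id. The sketch below is not part of Prowler: `QueryStub` is a minimal stand-in for AttackPathsQueryDefinition, and `check_registry` is a hypothetical helper that only checks the `{provider}-{kebab-case-name}` id convention shown in the templates.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class QueryStub:
    """Minimal stand-in for AttackPathsQueryDefinition (only the fields checked here)."""

    id: str
    provider: str


KEBAB_RE = re.compile(r"^[a-z0-9]+(-[a-z0-9]+)*$")


def check_registry(queries: list[QueryStub], provider: str) -> list[str]:
    """Return problems: duplicate ids, ids off-convention, or a mismatched provider."""
    problems = []
    seen = set()
    for q in queries:
        if q.id in seen:
            problems.append(f"duplicate id: {q.id}")
        seen.add(q.id)
        if not q.id.startswith(f"{provider}-") or not KEBAB_RE.match(q.id):
            problems.append(f"id not '{provider}-' kebab-case: {q.id}")
        if q.provider != provider:
            problems.append(f"wrong provider on {q.id}: {q.provider}")
    return problems


stubs = [QueryStub("aws-a", "aws"), QueryStub("aws-a", "aws"), QueryStub("AWS_Bad", "aws")]
print(check_registry(stubs, "aws"))
```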
FIRST, read all files in the queries module to understand the structure, type definitions, registration, and existing style:
api/src/backend/api/attack_paths/queries/
├── __init__.py # Module exports
├── types.py # AttackPathsQueryDefinition, AttackPathsQueryParameterDefinition
├── registry.py # Query registry logic
└── {provider}.py # Provider-specific queries (e.g., aws.py)
DO NOT use generic templates. Match the exact style of existing queries in the file.
This is the most important step. Every node label, property, and relationship in the query must exist in the Cartography schema for the pinned version. Do not guess or rely on memory.
Check api/pyproject.toml for the Cartography dependency, then fetch the schema:
grep cartography api/pyproject.toml
Build the schema URL (ALWAYS use the specific tag, not master/main):
# Git dependency (prowler-cloud/[email protected]):