Manages MongoDB Atlas Stream Processing (ASP) workflows. Handles workspace provisioning, data source/sink connections, processor lifecycle operations, debugging diagnostics, and tier sizing. Supports Kafka, Atlas clusters, S3, HTTPS, and Lambda integrations for streaming data workloads and event processing. NOT for general MongoDB queries or Atlas cluster management. Requires MongoDB MCP Server with Atlas API credentials.
Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.
This skill requires the MongoDB MCP Server connected with:
- Atlas API credentials (apiClientId and apiClientSecret)
- The 4 tools: atlas-streams-discover, atlas-streams-build, atlas-streams-manage, atlas-streams-teardown.
All operations require an Atlas project ID. If unknown, call atlas-list-projects first to find your project ID.
If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.
| Action | Use when |
|---|---|
| list-workspaces | See all workspaces in a project |
| inspect-workspace | Review workspace config, state, region |
| list-connections | See all connections in a workspace |
| inspect-connection | Check connection state, config, health |
| list-processors | See all processors in a workspace |
| inspect-processor | Check processor state, pipeline, config |
| diagnose-processor | Full health report: state, stats, errors |
| get-networking | PrivateLink and VPC peering details. Optional: cloudProvider + region to get Atlas account details for PrivateLink setup |
Pagination (all list actions): limit (1-100, default 20), pageNum (default 1).
Response format: responseFormat — "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
| Resource | Key parameters |
|---|---|
| workspace | cloudProvider, region, tier (default SP10), includeSampleData |
| connection | connectionName, connectionType (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), connectionConfig |
| processor | processorName, pipeline (must start with $source, end with $merge/$emit), dlq, autoStart |
| privatelink | privateLinkConfig (project-level, not tied to a specific workspace) |
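To illustrate the processor constraints above (pipeline must start with $source and end with $merge/$emit, DLQ recommended), a minimal processor payload might look like the following sketch. The connection names, database, collection, and topic are hypothetical placeholders — substitute your own:

```json
{
  "processorName": "orders_to_atlas",
  "pipeline": [
    { "$source": { "connectionName": "kafkaProd", "topic": "orders" } },
    { "$match": { "status": "completed" } },
    { "$merge": { "into": { "connectionName": "atlasProd", "db": "sales", "coll": "completed_orders" } } }
  ],
  "dlq": { "connectionName": "atlasProd", "db": "sales", "coll": "orders_dlq" },
  "autoStart": false
}
```

With autoStart false, the processor is created in the CREATED state and does not bill until explicitly started via atlas-streams-manage.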
Field mapping — only fill fields for the selected resource type:
- workspace → projectId, workspaceName, cloudProvider, region, tier, includeSampleData. Leave empty: all connection and processor fields.
- connection → projectId, workspaceName, connectionName, connectionType, connectionConfig. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
- processor → projectId, workspaceName, processorName, pipeline, dlq (recommended), autoStart (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
- privatelink → projectId, privateLinkConfig. Note: PrivateLink is project-level, not workspace-level. workspaceName is not required — omit it. Leave empty: all connection and processor fields.

| Action | Notes |
|---|---|
| start-processor | Begins billing. Optional tier override, resumeFromCheckpoint |
| stop-processor | Stops billing. Retains state 45 days |
| modify-processor | Processor must be stopped first. Change pipeline, DLQ, or name |
| update-workspace | Change tier or region |
| update-connection | Update config (networking is immutable — must delete and recreate) |
| accept-peering / reject-peering | VPC peering management |
Field mapping — always fill projectId, workspaceName, then by action:
- "start-processor" → resourceName. Optional: tier, resumeFromCheckpoint, startAtOperationTime (ISO 8601 timestamp to resume from a specific point)
- "stop-processor" → resourceName
- "modify-processor" → resourceName. At least one of: pipeline, dlq, newName
- "update-workspace" → newRegion or newTier
- "update-connection" → resourceName, connectionConfig. Exception: networking config (e.g., PrivateLink) cannot be modified after creation — delete and recreate.
- "accept-peering" → peeringId, requesterAccountId, requesterVpcId
- "reject-peering" → peeringId

State pre-checks:
- start-processor → errors if processor is already STARTED
- stop-processor → no-ops if already STOPPED or CREATED (not an error)
- modify-processor → errors if processor is STARTED (must stop first)

Processor states: CREATED → STARTED (via start) → STOPPED (via stop). Can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
Teardown safety checks:
| Resource | Safety behavior |
|---|---|
| processor | Auto-stops before deleting |
| connection | Blocks if referenced by a running processor |
| workspace | Cascading delete of all connections and processors |
| privatelink / peering | Removes networking resources |
Field mapping — always fill projectId, resource, then:
- resource: "workspace" → workspaceName
- resource: "connection" or "processor" → workspaceName, resourceName
- resource: "privatelink" or "peering" → resourceName (the ID). These are project-level resources, not tied to a specific workspace.

Before deleting a workspace, inspect it first:
1. atlas-streams-discover → inspect-workspace — get connection/processor counts
2. atlas-streams-teardown — after presenting those counts and getting user confirmation

You MUST call search-knowledge before composing any processor pipeline. This is not optional.
- Use it to verify exact field names, e.g., prefix vs path for S3 $emit.
- Search with dataSources: [{"name": "devcenter"}] for working pipelines, e.g. "Atlas Stream Processing tumbling window example".

Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with example_processors/README.md for the full pattern catalog.
Key quickstarts:
| Quickstart | Pattern |
|---|---|
| 00_hello_world.json | Inline $source.documents with $match (zero infra, ephemeral) |
| 01_changestream_basic.json | Change stream → tumbling window → $merge to Atlas |
| 03_kafka_to_mongo.json | Kafka source → tumbling window rollup → $merge to Atlas |
| 04_mongo_to_mongo.json | Chained processors: rollup → archive to separate collection |
| 05_kafka_tail.json | Real-time Kafka topic monitoring (sinkless, like tail -f) |
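The hello-world pattern has roughly the following shape — an inline documents source feeding a $match, with no sink stage. This is a hedged sketch of the pattern, not the repo file verbatim; being sinkless, it only runs ephemerally via sp.process() in mongosh, never as a deployed processor:

```json
[
  { "$source": { "documents": [
    { "greeting": "hello", "n": 1 },
    { "greeting": "world", "n": 2 }
  ] } },
  { "$match": { "n": { "$gt": 1 } } }
]
```

Because the source is inline, this validates pipeline syntax with zero infrastructure — no workspace connections required.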
Invalid constructs — these are NOT valid in streaming pipelines:
- $$NOW, $$ROOT, $$CURRENT — NOT available in stream processing. NEVER use these. Use the document's own timestamp field or _stream_meta metadata for event time instead of $$NOW.
- Https connection as $source — HTTPS is for $https enrichment or sink only, NOT as a data source
- Kafka $source without topic — topic field is required
- Missing sink — a sink stage ($merge, $emit, $https, or $externalFunction async) is required for deployed processors (sinkless only works via sp.process())
- Lambda as $emit target — Lambda uses $externalFunction (mid-pipeline enrichment), not $emit
- $validate with validationAction: "error" — crashes processor; use "dlq" instead
Required fields by stage:
- $source (change stream): include fullDocument: "updateLookup" to get the full document content
- $source (Kinesis): use stream (NOT streamName or topic)
- $emit (Kinesis): MUST include partitionKey
- $emit (S3): use path (NOT prefix)
- $https: must include connectionName, path, method, as, onError: "dlq"
- $externalFunction: must include connectionName, functionName, execution, as, onError: "dlq"
- $validate: must include validator with $jsonSchema and validationAction: "dlq"
- $lookup: include parallelism setting (e.g., parallelism: 2) for concurrent I/O

See references/pipeline-patterns.md for stage field examples with JSON syntax.
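Combining two of these rules — a change-stream $source with fullDocument lookup and a $validate routing bad documents to the DLQ — a pipeline might be sketched as follows (connection names, namespaces, and the schema are hypothetical; exact $source config nesting per references/pipeline-patterns.md):

```json
[
  { "$source": {
      "connectionName": "atlasProd",
      "db": "sales", "coll": "orders",
      "config": { "fullDocument": "updateLookup" }
  } },
  { "$validate": {
      "validator": { "$jsonSchema": {
        "required": ["fullDocument.amount"],
        "properties": { "fullDocument": { "bsonType": "object" } }
      } },
      "validationAction": "dlq"
  } },
  { "$merge": { "into": { "connectionName": "atlasProd", "db": "sales", "coll": "orders_validated" } } }
]
```

Note validationAction is "dlq", not "error" — the latter crashes the processor on the first invalid document.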
SchemaRegistry connection: connectionType must be "SchemaRegistry" (not "Kafka"). Schema type values are case-sensitive (use lowercase avro, not AVRO). See references/connection-configs.md for required fields and auth types.
Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these — let the tool collect them.
Auto-normalization:
- bootstrapServers array → auto-converted to comma-separated string
- schemaRegistryUrls string → auto-wrapped in array
- dbRoleToExecute → defaults to {role: "readWriteAnyDatabase", type: "BUILT_IN"} for Cluster connections

Workspace creation: includeSampleData defaults to true, which auto-creates the sample_stream_solar connection.
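For example, a Kafka connection payload can pass bootstrapServers as an array and let the tool normalize it; the broker hostnames below are placeholders, and sensitive fields (credentials) are deliberately omitted because the build tool collects them via MCP elicitation (see references/connection-configs.md for the authoritative schema):

```json
{
  "connectionName": "kafkaProd",
  "connectionType": "Kafka",
  "connectionConfig": {
    "bootstrapServers": ["broker-1.example.com:9092", "broker-2.example.com:9092"]
  }
}
```

The array form is auto-converted to the comma-separated string the API expects, so either shape is accepted.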
Region naming: The region field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic dataProcessRegion error.
| Provider | Cloud region | Streams region value |
|---|---|---|
| AWS | us-east-1 | VIRGINIA_USA |
| AWS | us-east-2 | OHIO_USA |
| AWS | eu-west-1 | DUBLIN_IRL |
| GCP | us-central1 | US_CENTRAL1 |
| GCP | europe-west1 | EUROPE_WEST1 |
| Azure | eastus | eastus |
| Azure | westeurope | westeurope |
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with atlas-streams-discover → inspect-workspace and check dataProcessRegion.region.
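Putting the region mapping together with the workspace parameters, a dev workspace creation payload for atlas-streams-build might look like this sketch (the project ID and workspace name are placeholders; note the Atlas-specific region name, not the AWS one):

```json
{
  "resource": "workspace",
  "projectId": "<your-project-id>",
  "workspaceName": "dev-stream-ws",
  "cloudProvider": "AWS",
  "region": "VIRGINIA_USA",
  "tier": "SP10",
  "includeSampleData": true
}
```

Passing "us-east-1" here instead of "VIRGINIA_USA" would produce the cryptic dataProcessRegion error described above.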
Know what each connection type can do before creating pipelines:
| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
|---|---|---|---|---|
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | ❌ | Source MUST include topic field |
| Sample Stream | ✅ Sample data | ❌ Not valid | ❌ | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | ❌ | Sink only - use path, format, compression. Supports AWS PrivateLink. |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: execution: "async" required. Mid-pipeline: execution: "sync" or "async" |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | ❌ | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only - used by Kafka connections for Avro schemas |
Common connection usage mistakes to avoid:
- $externalFunction as sink with execution: "sync" → must use execution: "async" for sink stage
- $merge with Kafka → use $emit for Kafka sinks

See references/connection-configs.md for detailed connection configuration schemas by type.
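The two corrected sink shapes can be sketched as follows — each is the final stage of its own pipeline (never both in one, per the single-sink rule), and the connection, topic, function, and field names are hypothetical:

```json
{
  "kafka_sink_stage": {
    "$emit": { "connectionName": "kafkaProd", "topic": "orders.enriched" }
  },
  "lambda_sink_stage": {
    "$externalFunction": {
      "connectionName": "lambdaFn",
      "functionName": "process-orders",
      "execution": "async",
      "as": "lambdaResult",
      "onError": "dlq"
    }
  }
}
```

Note execution: "async" on the Lambda sink — "sync" is only valid when $externalFunction sits mid-pipeline for enrichment.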
1. atlas-streams-discover → list-workspaces (check existing)
2. atlas-streams-build → resource: "workspace" (region near data, SP10 for dev)
3. atlas-streams-build → resource: "connection" (for each source/sink/enrichment)
4. atlas-streams-discover → list-connections + inspect-connection for each — verify names match targets, present summary to user
5. search-knowledge to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
6. atlas-streams-build → resource: "processor" (with DLQ configured)
7. atlas-streams-manage → start-processor (warn about billing)

Incremental pipeline development (recommended): See references/development-workflow.md for the full 5-phase lifecycle.
1. Start with a bare $source → $merge pipeline (validate connectivity)
2. Add $match stages (validate filtering)
3. Add $addFields / $project transforms (validate reshaping)

Modify a processor pipeline:
1. atlas-streams-manage → action: "stop-processor" — processor MUST be stopped first
2. atlas-streams-manage → action: "modify-processor" — provide new pipeline
3. atlas-streams-manage → action: "start-processor" — restart

Debug a failing processor:
1. atlas-streams-discover → diagnose-processor — one-shot health report. Always call this first.
2. If a Kafka source sits on idle partitions, add partitionIdleTimeout to the Kafka $source (e.g., {"size": 30, "unit": "second"})
3. Inspect dead-lettered documents with find on the DLQ collection
See references/output-diagnostics.md for the full pattern table.

CRITICAL: A single pipeline can only have ONE terminal sink ($merge or $emit). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
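A hedged sketch of the chained-processor pattern for a "write to Atlas AND emit to Kafka" request — processor 1 lands data in an intermediate Atlas collection, and processor 2 change-streams that collection out to Kafka (all connection names, namespaces, and topics are placeholders; see references/pipeline-patterns.md for the authoritative version):

```json
{
  "processor_1_pipeline": [
    { "$source": { "connectionName": "kafkaProd", "topic": "orders" } },
    { "$merge": { "into": { "connectionName": "atlasProd", "db": "sales", "coll": "orders_enriched" } } }
  ],
  "processor_2_pipeline": [
    { "$source": { "connectionName": "atlasProd", "db": "sales", "coll": "orders_enriched" } },
    { "$emit": { "connectionName": "kafkaProd", "topic": "orders.enriched" } }
  ]
}
```

Each pipeline has exactly one terminal sink; the intermediate collection doubles as a durable audit trail between the two stages.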
See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.
See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.
See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.
Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
- stop-processor stops billing; stopped processors retain state for 45 days at no charge
- Test pipelines with sp.process() in mongosh — runs pipelines ephemerally without deploying a processor
- See references/sizing-and-parallelism.md for tier pricing and cost optimization strategies
- atlas-streams-teardown and atlas-streams-manage require user confirmation — do not bypass
- Before atlas-streams-teardown for a workspace, you MUST first inspect the workspace with atlas-streams-discover to count connections and processors, then present this information to the user before requesting confirmation
- resumeFromCheckpoint: false drops all window state — warn user first

| File | Read when... |
|---|---|
| references/pipeline-patterns.md | Building or modifying processor pipelines |
| references/connection-configs.md | Creating connections (type-specific schemas) |
| references/development-workflow.md | Following lifecycle management or debugging decision trees |
| references/output-diagnostics.md | Processor output is unexpected (zero, low, or wrong) |
| references/sizing-and-parallelism.md | Choosing tiers, tuning parallelism, or optimizing cost |