Validate a completed PySpark to Snowpark Connect (SCOS) migration by running the migrated workload end-to-end with synthetic data. Use when: verifying migration correctness, smoke-testing migrated code, checking SCOS runtime compatibility. Triggers: validate migration, verify scos, test migration, smoke test, run migrated workload.
Run the migrated _scos workload by importing its real functions/classes and executing them with synthetic data.
[snowpark-connect] Intent Detection: After user indicates validation intent (validate, verify, check, test, review migration).
$ARGUMENTS - Path to migrated _scos script, notebook, or directory.

Derived names used throughout:
- <workload> = basename of $ARGUMENTS (e.g., if $ARGUMENTS is /path/to/my_pipeline_scos.py, then <workload> = my_pipeline_scos)
- <workload>_test/ = test directory created alongside the workload

Create <workload>_test/ and add a single entrypoint.py file that triggers the main execution flow.

Mock Category A sources (spark.table()) by checking if tables exist and creating them if permitted. Mock Category B sources (spark.read.* with cloud/local paths) by uploading synthetic files to a Snowflake stage and replacing paths in the workload copy (see Phases 1.3 and 2.1).

Import the workload's real code (e.g., from modeling_library import model, load_data) and call it. Do NOT rewrite or duplicate workload logic — no independent test cases like "Test window functions" or "Test joins".

# Check 1: uv installed
uv --version || echo "PREREQ_FAIL: uv not installed"
# Check 2: Snowflake connection
uv run --project <SKILL_DIRECTORY> \
python -c "from snowflake import snowpark_connect; spark = snowpark_connect.init_spark_session(); print('OK')" \
|| echo "PREREQ_FAIL: Snowflake connection failed"
# Check 3 (notebook workloads only): jupyter nbconvert
uv run --project <SKILL_DIRECTORY> \
jupyter nbconvert --version \
|| echo "PREREQ_FAIL: jupyter nbconvert not installed"
You MUST perform the phases below in order.
test -e "$ARGUMENTS" || echo "ABORT: Migrated workload not found"
Find all external data access in the workload: spark.read.*, spark.table(), spark.sql("SELECT ... FROM ..."), boto3/S3.
For directory workloads, scan ALL .py files in the directory — not just the main entrypoint. Data reads may occur in any module (e.g., a loader.py or data_access.py).
For single .py files, search the source directly. For .ipynb notebooks, search within the source arrays of code cells in the notebook JSON.
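A minimal sketch of the notebook scan described above, assuming the standard .ipynb JSON layout; the regex set and helper name are illustrative, not part of the skill:

```python
import json
import re

# External data access patterns to flag; illustrative, extend as needed.
READ_PATTERN = re.compile(
    r"spark\.read\.\w+\(|spark\.table\(|spark\.sql\(|boto3\."
)

def find_reads_in_notebook(nb_json: str) -> list[str]:
    """Return lines from code cells that contain external data access."""
    nb = json.loads(nb_json)
    hits = []
    for cell in nb.get("cells", []):
        if cell.get("cell_type") != "code":
            continue  # markdown cells cannot read data
        for line in cell.get("source", []):
            if READ_PATTERN.search(line):
                hits.append(line.strip())
    return hits

# Example: a tiny two-cell notebook with one code cell.
nb = json.dumps({
    "cells": [
        {"cell_type": "markdown", "source": ["# notes\n"]},
        {"cell_type": "code", "source": [
            "df = spark.read.csv('s3://bucket/events/')\n",
            "ref = spark.table('db.schema.users')\n",
            "x = 1\n",
        ]},
    ]
})
print(find_reads_in_notebook(nb))
# → ["df = spark.read.csv('s3://bucket/events/')", "ref = spark.table('db.schema.users')"]
```

The same helper works for plain .py files by wrapping the file's lines in a single synthetic code cell, so one scanner covers both workload shapes.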
Classify each data source into one of two categories:
Category A — Table-mockable (handled via Snowflake tables):
- spark.table("table_name") calls
- spark.sql("SELECT ... FROM table_name") calls

For each, determine:
- Fully qualified table name (database.schema.table_name)

Category B — Stage-mockable (requires Snowflake stage with synthetic files):
- spark.read.csv("s3://..."), spark.read.parquet("gs://..."), spark.read.json("/mnt/..."), or any spark.read.* call with an external cloud path (s3://, s3a://, gs://, abfs://, wasb://, adl://) or a local/mounted path
- spark.read.format(...).load("path") variants

For each file read, capture:
- Format: csv, parquet, json, text, or format(...).load(...)
- Path (e.g., s3://analytics-lake/raw/events/2024/)
- Read options (e.g., header=True, inferSchema=True, delimiter=",")

If any Category B (stage-mockable) reads were found, you MUST pause and alert the user:
⚠️ External cloud reads detected in the migrated workload:
1. spark.read.<method>("<path>")
2. ...
These paths reference external cloud storage (S3, GCS, Azure Blob, etc.).
Snowflake recommends creating an external stage that points to these
cloud locations for production use.
For validation, I need a Snowflake stage to upload synthetic test data.
Do you already have an external stage for these locations?
- If YES: provide the stage name and I'll upload mock data files to it.
- If NO: I'll create an internal stage (SCOS_VALIDATION_<workload>)
and upload synthetic files there for testing.
Wait for the user to respond before proceeding.
- If the user provides an existing stage: use it as <STAGE_NAME>. Skip stage creation in Phase 1.3.1 (stage already exists). Proceed to Phase 1.3.2 to generate and upload mock data.
- If the user has no existing stage: use SCOS_VALIDATION_<workload> as <STAGE_NAME>. Create it as an internal stage in Phase 1.3.1.
- If the user requests a specific new stage name: use it as <STAGE_NAME>. Create it in Phase 1.3.1.

Use a single stage for ALL mock files. Do NOT create multiple stages — store all synthetic data files in one stage, organized by subpath (e.g., @<STAGE_NAME>/events/, @<STAGE_NAME>/users/).
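A sketch of the Category B path rewrite applied to the workload copy, assuming <STAGE_NAME> has already been resolved; the stage name, path map entries, and helper name are hypothetical examples, and the subpaths follow the one-stage-many-subpaths rule:

```python
# Assumption: stage resolved during the Category B dialogue above.
STAGE_NAME = "SCOS_VALIDATION_my_pipeline_scos"

# Map each original external path to a subpath in the single validation stage.
PATH_MAP = {
    "s3://analytics-lake/raw/events/2024/": f"@{STAGE_NAME}/events/",
    "gs://warehouse/users.parquet": f"@{STAGE_NAME}/users/users.parquet",
}

def rewrite_paths(source: str) -> str:
    """Replace external cloud paths with stage paths in a workload copy."""
    for old, new in PATH_MAP.items():
        source = source.replace(old, new)
    return source

code = 'df = spark.read.csv("s3://analytics-lake/raw/events/2024/", header=True)'
print(rewrite_paths(code))
# → df = spark.read.csv("@SCOS_VALIDATION_my_pipeline_scos/events/", header=True)
```

Plain string replacement on exact captured paths is deliberately conservative: it only touches the literals found in Phase 1, so it cannot accidentally rewrite unrelated strings the way a broad regex could.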
If any Category A (table-mockable) reads were found, you MUST pause and verify table existence in Snowflake before proceeding:
Verify each table via SHOW TABLES or SELECT COUNT(*):

uv run --project <SKILL_DIRECTORY> python -c "
from snowflake.snowpark import Session
session = Session.builder.config('connection_name', 'default').create()