Tier-2 phase skill. Executes document parsing (AI_PARSE_DOCUMENT), AI_AGG aggregation for split documents, AI-ready content layer refresh, Semantic View creation, and pipeline verification. There are no reactive gates in this phase — all data decisions were made in the classify and extract phases.
| Parameter | Source |
|---|---|
| {db}, {schema}, {stage}, {warehouse} | confirm-environment |
| {warehouse_size_decision} | confirm-pipeline-config |
| {classification_distribution} | phase-classify |
| {extraction_count} | phase-extract |
| Parameter | Description |
|---|---|
| {pages_parsed} | Total pages parsed |
| {raw_content_rows} | Rows in CLINICAL_DOCUMENTS_RAW_CONTENT |
| {sv_created} | Whether Semantic View was created |
| {pipeline_summary} | Full pipeline summary |
If {warehouse_size_decision} = auto-resize:
ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = '3XLARGE';
CALL {db}.{schema}.CLINICAL_DOCUMENTS_PARSE_WITH_IMAGES_V2();
Parsing mode selection logic:
- complex_tables_flag = YES → mode: LAYOUT
- image_flag = YES → mode: LAYOUT, extract_images: true
- otherwise → mode: OCR

For documents with images, INJECT_IMAGE_DESCRIPTIONS UDF inserts AI-generated descriptions adjacent to image references in page content.
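Assuming the classify phase recorded these flags as columns on a per-document table (DOCUMENT_PARSE_FLAGS is a hypothetical name used here for illustration), the selection logic corresponds to a CASE expression along these lines:

```sql
-- Sketch only: DOCUMENT_PARSE_FLAGS stands in for wherever the classify
-- phase stored complex_tables_flag / image_flag per document.
SELECT
    DOCUMENT_RELATIVE_PATH,
    CASE
        WHEN COMPLEX_TABLES_FLAG = 'YES' THEN 'LAYOUT'
        WHEN IMAGE_FLAG = 'YES'          THEN 'LAYOUT'  -- also sets extract_images: true
        ELSE 'OCR'
    END AS PARSE_MODE
FROM {db}.{schema}.DOCUMENT_PARSE_FLAGS;
```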
Report: "{N} pages parsed from {M} documents"
If {warehouse_size_decision} = auto-resize:
ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = 'XLARGE';
For documents that were split in preprocessing:
CALL {db}.{schema}.CLASSIFY_AGGREGATED_DOCUMENTS();
Uses AI_AGG to classify across all pages of a split document.
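The AI_AGG pattern the procedure uses can be sketched as follows; the prompt wording is illustrative, while the table and columns are this pipeline's:

```sql
-- Aggregate all pages of each split document into one classification.
SELECT
    s.PARENT_DOCUMENT_RELATIVE_PATH,
    AI_AGG(
        s.PAGE_CONTENT,
        'Classify the clinical document these pages belong to. Return only the document type.'
    ) AS DOCUMENT_CLASSIFICATION
FROM {db}.{schema}.DOCS_PARSE_OUTPUT s
WHERE s.PARENT_DOCUMENT_RELATIVE_PATH IS NOT NULL
GROUP BY s.PARENT_DOCUMENT_RELATIVE_PATH;
```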
CALL {db}.{schema}.EXTRACT_DOCUMENT_TYPE_SPECIFIC_VALUES_WITH_AI_AGG();
Iterates through each document classification, builds extraction prompt via LISTAGG, runs AI_AGG over page content.
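The prompt-assembly step can be sketched like this; EXTRACTION_FIELD_CONFIG is a hypothetical name for the table holding the per-type field lists:

```sql
-- Build a comma-separated field list for one document type...
SELECT LISTAGG(FIELD_NAME, ', ') WITHIN GROUP (ORDER BY FIELD_NAME) AS FIELD_LIST
FROM {db}.{schema}.EXTRACTION_FIELD_CONFIG
WHERE DOCUMENT_CLASSIFICATION = 'DISCHARGE_SUMMARY';

-- ...then run AI_AGG over the document's page content with that list
-- interpolated into the task description.
```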
If {warehouse_size_decision} = auto-resize:
ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = 'MEDIUM';
IMPORTANT: Before running the refresh, verify prerequisite data exists:
SELECT 'parse_output' AS source, COUNT(*) AS rows FROM {db}.{schema}.DOCS_PARSE_OUTPUT
UNION ALL
SELECT 'classifications', COUNT(*) FROM {db}.{schema}.DOC_CLASSIFICATION_METADATA_ROWS
UNION ALL
SELECT 'extractions', COUNT(*) FROM {db}.{schema}.DOC_TYPE_SPECIFIC_VALUES_EXTRACT_OUTPUT;
If any returns 0, WARN the user and skip that data source rather than consuming the stream with empty results.
INSERT INTO {db}.{schema}.CLINICAL_DOCUMENTS_RAW_CONTENT (
DOCUMENT_RELATIVE_PATH, DOCUMENT_STAGE, PAGE_NUMBER_IN_PARENT,
DOCUMENT_CLASSIFICATION, PATIENT_NAME, MRN, PAGE_CONTENT,
DOC_TOTAL_PAGES, PRESIGNED_URL, STAGE_FILE_URL, URL_GENERATED_AT
)
-- Non-split documents
SELECT
s.DOCUMENT_RELATIVE_PATH, s.DOCUMENT_STAGE, s.PAGE_NUMBER_IN_PARENT,
dcm_cls.FIELD_VALUE AS DOCUMENT_CLASSIFICATION,
{coalesce_patient_fields} AS PATIENT_NAME,
{coalesce_mrn_fields} AS MRN,
CONCAT('[Page ', s.PAGE_NUMBER_IN_PARENT, ']' || CHR(10) || CHR(10), s.PAGE_CONTENT),
s.DOC_TOTAL_PAGES,
GET_PRESIGNED_URL(@{db}.{schema}.{stage}, s.DOCUMENT_RELATIVE_PATH, 604800),
CONCAT('snow://stage/', REPLACE(s.DOCUMENT_STAGE, '@', ''), '/', s.DOCUMENT_RELATIVE_PATH),
CURRENT_TIMESTAMP()
FROM {db}.{schema}.DOCS_PARSE_OUTPUT s
LEFT JOIN {db}.{schema}.DOC_CLASSIFICATION_METADATA_ROWS dcm_cls
ON s.DOCUMENT_RELATIVE_PATH = dcm_cls.DOCUMENT_RELATIVE_PATH
AND dcm_cls.FIELD_NAME = 'DOCUMENT_CLASSIFICATION'
{join_pivot_views_non_split}
WHERE s.PARENT_DOCUMENT_RELATIVE_PATH IS NULL
UNION ALL
-- Split documents (join via DOCUMENT_HIERARCHY)
SELECT
s.PARENT_DOCUMENT_RELATIVE_PATH, dh.PARENT_DOCUMENT_STAGE, s.PAGE_NUMBER_IN_PARENT,
dcm_cls.FIELD_VALUE, {coalesce_patient_fields}, {coalesce_mrn_fields},
CONCAT('[Page ', s.PAGE_NUMBER_IN_PARENT, ']' || CHR(10) || CHR(10), s.PAGE_CONTENT),
s.DOC_TOTAL_PAGES,
GET_PRESIGNED_URL(@{db}.{schema}.{stage}, s.PARENT_DOCUMENT_RELATIVE_PATH, 604800),
CONCAT('snow://stage/', REPLACE(dh.PARENT_DOCUMENT_STAGE, '@', ''), '/', s.PARENT_DOCUMENT_RELATIVE_PATH),
CURRENT_TIMESTAMP()
FROM {db}.{schema}.DOCS_PARSE_OUTPUT s
JOIN {db}.{schema}.DOCUMENT_HIERARCHY dh
ON s.DOCUMENT_RELATIVE_PATH = dh.DOCUMENT_RELATIVE_PATH
LEFT JOIN {db}.{schema}.DOC_CLASSIFICATION_METADATA_ROWS dcm_cls
ON s.PARENT_DOCUMENT_RELATIVE_PATH = dcm_cls.DOCUMENT_RELATIVE_PATH
AND dcm_cls.FIELD_NAME = 'DOCUMENT_CLASSIFICATION'
{join_pivot_views_split}
WHERE s.PARENT_DOCUMENT_RELATIVE_PATH IS NOT NULL;
EXECUTE TASK {db}.{schema}.REFRESH_RAW_CONTENT_TASK;
If the stream is consumed but RAW_CONTENT is empty:
CREATE OR REPLACE STREAM {db}.{schema}.DOCS_PARSE_OUTPUT_STREAM ON TABLE {db}.{schema}.DOCS_PARSE_OUTPUT;

The Semantic View cannot be created until pivot views contain data.
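To confirm whether the stream actually holds unconsumed rows before (or after) re-creating it, Snowflake's SYSTEM$STREAM_HAS_DATA system function applies:

```sql
-- Returns TRUE if the stream has change records to consume.
SELECT SYSTEM$STREAM_HAS_DATA('{db}.{schema}.DOCS_PARSE_OUTPUT_STREAM');
```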
-- Verify pivot views have data
SELECT COUNT(*) FROM {db}.{schema}.DISCHARGE_SUMMARY_V;
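Once the pivot views contain data, the Semantic View can be sketched along the following lines. The object name and the PAT_CNT metric are illustrative, not confirmed by the source; note that dimension/metric names come first and column expressions second:

```sql
CREATE OR REPLACE SEMANTIC VIEW {db}.{schema}.CLINICAL_DOCS_SV
  TABLES (
    DISCHARGE_SUMMARY_V AS {db}.{schema}.DISCHARGE_SUMMARY_V
      PRIMARY KEY (DOCUMENT_RELATIVE_PATH)
  )
  DIMENSIONS (
    DISCHARGE_SUMMARY_V.DS_MRN AS MRN          -- dimension name first, column second
  )
  METRICS (
    DISCHARGE_SUMMARY_V.PAT_CNT AS COUNT(DISTINCT MRN)
  );
```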
CRITICAL: GENERATE_DYNAMIC_OBJECTS() stored procedure CANNOT be created via snowflake_sql_execute — two patterns in the $$ body are incompatible (Constraints #16: EXECUTE IMMEDIATE...INTO :var → unexpected 'INTO', and #17: IDENTIFIER(var||'...') → unexpected 'v_fqn'). DO NOT attempt to CREATE the procedure. Instead, execute each step from the proc body individually:
Use dynamic_pipeline_setup.sql Steps 0-7b as templates. Replace :v_fqn with '{db}.{schema}'.
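A manually created pivot view might look like the sketch below. The column list is illustrative; the quoting of pivot output columns ("'MRN'" with embedded single quotes) follows Snowflake's PIVOT naming behavior:

```sql
CREATE OR REPLACE VIEW {db}.{schema}.DISCHARGE_SUMMARY_V AS
SELECT
    DOCUMENT_RELATIVE_PATH,
    "'MRN'"                 AS MRN,            -- pivot columns keep embedded quotes
    UPPER("'PATIENT_NAME'") AS PATIENT_NAME
FROM (
    SELECT DOCUMENT_RELATIVE_PATH, FIELD_NAME, FIELD_VALUE
    FROM {db}.{schema}.DOC_TYPE_SPECIFIC_VALUES_EXTRACT_OUTPUT
    WHERE DOCUMENT_CLASSIFICATION = 'DISCHARGE_SUMMARY'
)
PIVOT (MAX(FIELD_VALUE) FOR FIELD_NAME IN ('MRN', 'PATIENT_NAME'));
```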
When creating pivot views manually (not via GENERATE_DYNAMIC_OBJECTS):
- The PIVOT values are the single-quoted field names, e.g. 'MRN', 'PATIENT_NAME'.
- The resulting pivot columns are named "'MRN'" (double-quoted identifiers with embedded single quotes).
- Reference them as "'MRN'" AS MRN, UPPER("'PATIENT_NAME'") AS PATIENT_NAME.
- "MRN" or MRN → causes invalid identifier 'MRN'.

Semantic View DIMENSIONS use TABLE.DIMENSION_NAME AS ACTUAL_COLUMN_NAME (dimension name first, NOT the column!):

- Correct: DISCHARGE_SUMMARY_V.DS_MRN AS MRN
- Wrong: DISCHARGE_SUMMARY_V.MRN AS DS_MRN

METRICS use TABLE.METRIC_NAME AS AGGREGATE_EXPRESSION:

- Correct: DISCHARGE_SUMMARY_V.PAT_CNT AS COUNT(DISTINCT MRN)

Pipeline verification:

SELECT 'Documents' AS metric, COUNT(DISTINCT DOCUMENT_RELATIVE_PATH) AS value FROM {db}.{schema}.DOCUMENT_HIERARCHY
UNION ALL
SELECT 'Pages Parsed', COUNT(*) FROM {db}.{schema}.DOCS_PARSE_OUTPUT
UNION ALL
SELECT 'Classifications', COUNT(DISTINCT DOCUMENT_RELATIVE_PATH) FROM {db}.{schema}.DOC_CLASSIFICATION_METADATA_ROWS
UNION ALL
SELECT 'Extractions', COUNT(*) FROM {db}.{schema}.DOC_TYPE_SPECIFIC_VALUES_EXTRACT_OUTPUT
UNION ALL
SELECT 'AI-Ready Pages', COUNT(*) FROM {db}.{schema}.CLINICAL_DOCUMENTS_RAW_CONTENT;
Sample extracted data:
SELECT DOCUMENT_CLASSIFICATION, FIELD_NAME, FIELD_VALUE
FROM {db}.{schema}.DOC_TYPE_SPECIFIC_VALUES_EXTRACT_OUTPUT
ORDER BY DOCUMENT_RELATIVE_PATH, DOCUMENT_CLASSIFICATION, FIELD_NAME
LIMIT 20;
If {warehouse_size_decision} = auto-resize:
ALTER WAREHOUSE {warehouse} SET WAREHOUSE_SIZE = '{original_size}';
PHASE COMPLETE: parse-and-refresh
pages_parsed: {N}
raw_content_rows: {M}
sv_created: {true|false}
pipeline_summary:
Documents: {doc_count}
Pages Parsed: {page_count}
Classifications: {class_count}
Extractions: {extract_count}
AI-Ready Pages: {raw_count}
STOP HERE. Return to caller for "What next?" routing.