Use when analyzing data with Hive/Impala tables, writing SQL for data exploration, or building/deploying Spark ETL jobs on HDFS/YARN. ALWAYS trigger this skill — even if the user does not use these exact words — for any of the following: writing or reviewing a Spark Scala job, migrating SQL from Hive/Impala to Spark, creating or altering Hive tables, inserting data into partitioned tables, joining large tables in Spark SQL, using Spark UDFs, verifying table schema before coding, GROUP BY with text fields, OOM on large tables, INSERT column mismatch or silent data shifts, broadcast join stall or task explosion, DataFrame API being slow, cache() not materializing, metadata not visible after Spark write, date window off-by-one, control character regex not matching, Scala string interpolation bugs in Spark SQL, or any time the user says their Spark job is slow, wrong, or behaving unexpectedly.
| Mode | When | How to behave |
|---|---|---|
| Analysis | Profiling, threshold selection, diversity check, strategy decisions | Run SQL → present numbers → ask user before drawing conclusions |
| Coding | Writing Spark jobs, ETL SQL, table creation | Follow the rules below strictly; never guess types, column order, or table names |
When asked to analyze data or explore thresholds:
The numbers surface trade-offs. The decision belongs to the user.
⚙️ Engine: use Impala for exploration (seconds vs minutes). Switch to Hive only if the query requires features Impala lacks (e.g., LATERAL VIEW EXPLODE).
These rules exist because the resulting bugs are invisible: row counts look correct, no errors thrown, but data is silently wrong or performance collapses.
Never guess field names or types. A single type mismatch silently ruins JOINs.
hive -e "DESCRIBE db.table_name"
hive -e "SHOW CREATE TABLE db.table_name"
If JOIN key types differ across tables, cast explicitly — do not rely on implicit conversion.
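A minimal sketch of the explicit cast (the table and column names here are hypothetical): if orders.user_id is STRING and users.id is BIGINT, cast one side yourself rather than letting the engine pick a common type.

```scala
// Hypothetical tables: orders(user_id STRING), users(id BIGINT).
// An explicit CAST keeps the join-key comparison well-defined; implicit
// coercion may widen both sides (e.g. to DOUBLE) and silently mis-match.
val joined = spark.sql(s"""
  SELECT o.order_id, u.name
  FROM orders o
  JOIN users u
    ON CAST(o.user_id AS BIGINT) = u.id
  WHERE o.dt = '$dt'
""")
```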
Hard-coded names break multi-environment deployment and are impossible to override without recompiling.
// ❌ table name hard-coded in source
val df = spark.sql(s"SELECT * FROM some_db.some_table WHERE dt = '$dt'")
// ✅ receive via CLI args, declare in .job config
val inputTable = cmd.getOptionValue("inputTable")
val df = spark.sql(s"SELECT * FROM $inputTable WHERE dt = '${dt}'")
User-generated text (descriptions, names) often contains ASCII control characters (\x01, \x03, etc. — Hive column delimiters). If such a field is in GROUP BY, Hive/Impala splits one row into many, shifting all downstream field values. Row count still looks normal — the bug is invisible.
Rule: finish all aggregation in CTEs, then append text fields in the outermost SELECT via LEFT JOIN.
-- In Hive/Impala:
WITH core AS (
SELECT key_id, group_concat(cast(id AS string), ',') AS id_list
FROM source_table
GROUP BY key_id -- ⚠️ no free-text fields here
)
SELECT core.*, regexp_replace(t.description, '[\\x00-\\x1f]', '') AS description
FROM core LEFT JOIN text_table t ON core.key_id = t.key_id AND t.dt = '${dt}';
Engine note: group_concat is Hive/Impala syntax. In Spark SQL, use concat_ws(',', collect_list(cast(id AS string))) instead. Mixing engines in one job is a common source of "function not found" errors.
Regex in Spark Scala: inside a Spark Scala s"""...""" string, use [\u0000-\u001f] instead of [\\x00-\\x1f] — Scala processes the backslash before SQL sees it. See references/spark-pitfalls.md §8.1.
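A sketch of the two forms side by side (inputTable and description are placeholder names):

```scala
// ❌ Scala's s-interpolator rewrites \\x to \x before the SQL parser runs,
//    so the character class that reaches the regex engine is not the one
//    you wrote (see spark-pitfalls.md §8.1):
// val bad = s"SELECT regexp_replace(description, '[\\x00-\\x1f]', '') FROM $inputTable"

// ✅ \u escapes are resolved at compile time into the actual control
//    characters, which pass through interpolation unchanged:
val good = s"SELECT regexp_replace(description, '[\u0000-\u001f]', '') AS description FROM $inputTable"
spark.sql(good)
```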
Directly running GROUP BY + group_concat on a billion-row raw table causes OOM. First reduce the set in a CTE, then join the large table.
WITH targets AS (SELECT DISTINCT entity_id FROM ... WHERE conditions),
core AS (
SELECT t.entity_id, group_concat(cast(d.id AS string), ',') AS id_list
FROM targets t
INNER JOIN huge_table d ON t.entity_id = d.entity_id
GROUP BY t.entity_id -- aggregates only the filtered subset
)
SELECT * FROM core; -- this is a read query, not INSERT; see Rule 6 for INSERT patterns
Real project benchmark: DataFrame API (manual cache + repeated .count()) → 367 lines, 45+ Jobs, 3 hours. Spark SQL equivalent → 50 lines, 1 Job, 15 minutes.
Root causes:
- .count() triggers a full Spark Job
- foreachPartition(_ => ()) is a no-op — it does not materialize cache
// ✅
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1") // see Rule 5
spark.sql(s"INSERT OVERWRITE TABLE $output PARTITION(dt='${dt}') WITH ... SELECT ...")
If cache() is genuinely needed (result consumed by two independent branches), trigger it with a real Action:
df.count() // ✅ always works; adds one extra Job
df.write.mode("overwrite").format("noop").save() // ✅ Spark 3.0+ built-in noop source; materializes without disk I/O
df.foreachPartition((_: Iterator[Row]) => ()) // ❌ no-op — never use this
Prefer count() unless you have confirmed the noop DataSource is available in your cluster. If uncertain, ask the user.
Remove all debug count()/collect() calls before deploying to production — each one is a full extra Job.
Spark may auto-broadcast a table it considers "small" — if that estimate is wrong, you get task explosion and job timeout. Always set this before any SQL execution when working with tables of unknown or medium-to-large size:
// Set before any SQL runs — has no effect if set after query planning
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
| Table size estimate | Recommendation |
|---|---|
| < 1M rows, known small | Keep default; Spark auto-decides correctly |
| 1M–5M rows, or unsure | Disable first; review Spark UI, re-enable only if safe |
| > 5M rows, or job shows broadcast stall | Always disable |
If you observe a Stage stuck at BroadcastExchange in Spark UI with exploding task counts, disable immediately and rerun. See references/spark-pitfalls.md §3 for details and how to force-broadcast genuinely small tables with SQL hints.
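For the force-broadcast case §3 mentions: Spark SQL's join hints work even with auto-broadcast disabled — a sketch with hypothetical table names:

```scala
// Global auto-broadcast stays off (Rule 5)...
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// ...but a dimension table you have verified is genuinely small can
// still be broadcast explicitly, scoped to this single query:
val df = spark.sql(s"""
  SELECT /*+ BROADCAST(dim) */ f.id, dim.label
  FROM fact_table f
  JOIN dim_table dim ON f.dim_id = dim.id
  WHERE f.dt = '$dt'
""")
```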
CREATE TABLE IF NOT EXISTS does not modify an existing table's schema. INSERT ... SELECT * matches by position. Adding a new column shifts all subsequent columns silently — row count stays the same, data is corrupt.
// ❌
spark.sql(s"INSERT OVERWRITE TABLE $output PARTITION(dt='$dt') SELECT * FROM tmp")
// ✅ always list columns explicitly
spark.sql(s"""INSERT OVERWRITE TABLE $output PARTITION(dt='$dt')
|SELECT col1, col2, col3, new_col FROM tmp""".stripMargin)
When adding a column to an existing table, either:
- DROP TABLE IF EXISTS, then let the job recreate it with the new schema; or
- ALTER TABLE ADD COLUMNS, then update every INSERT statement. After the run, verify the new column has actual values — row count alone cannot detect a column shift.

Dynamic column matching (when schema may vary across runs):
val tableColumns = spark.sql(s"DESCRIBE $output")
.filter("col_name NOT IN ('dt', '') AND col_name NOT LIKE '#%'")
.select("col_name").as[String].collect().toSeq
val dataColumns = result.columns.toSeq
val missingCols = tableColumns.diff(dataColumns)
// ⚠️ order by dataColumns first — NEVER reorder by tableColumns
val selectExprs = dataColumns.map(c => s"`$c`") ++ missingCols.map(c => s"NULL AS `$c`")
result.createOrReplaceTempView("tmp_result")
spark.sql(s"""INSERT OVERWRITE TABLE $output PARTITION(dt='$dt')
|SELECT ${selectExprs.mkString(", ")} FROM tmp_result""".stripMargin)
An INNER JOIN to fetch an optional field (description, tag, title — any field not guaranteed to exist for every row) silently drops non-matching rows. Row count on the primary table looks fine; you only notice the loss if you compare before/after counts explicitly.
-- ❌ rows with no description are silently dropped
... INNER JOIN meta_table meta ON core.id = meta.id
-- ✅ missing metadata becomes NULL; no rows are lost
... LEFT JOIN meta_table meta ON core.id = meta.id AND meta.dt = '${dt}'
Spark writes to HDFS but does not update the Hive Metastore. Querying immediately after a write without refreshing returns stale or empty results.
spark.sql(s"REFRESH TABLE $output") // Spark; in Hive run MSCK REPAIR TABLE, in Impala INVALIDATE METADATA / REFRESH
The restriction is on return types, not parameter types. Seq[String] as a UDF input parameter is completely safe. Only nested Scala collection return types cause NoClassDefFoundError at runtime due to JVM type erasure.
// ✅ Safe input parameter — Seq[String] as an argument is fine
val myUdf = udf((items: Seq[String], size: Int) => {
  // ❌ Unsafe return type Array[Seq[String]] — crashes at runtime:
  //   items.grouped(size).map(_.toSeq).toArray
  // ✅ Safe return: flatten each group into a delimited string
  items.grouped(size).map(_.mkString("|||")).toArray
})
See references/spark-pitfalls.md §5 for the full type-safety table and downstream split reconstruction pattern.
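One way to reconstruct the groups downstream — a sketch, not necessarily the exact pattern in §5 (grouped and group_str are hypothetical names):

```scala
import org.apache.spark.sql.functions.{col, split}

// Each element coming out of the UDF is one "|||"-joined group.
// split() takes a Java regex, so the pipes must be escaped.
val reconstructed = grouped
  .withColumn("group_items", split(col("group_str"), "\\|\\|\\|"))
```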
| Symptom | Root Cause | Rule |
|---|---|---|
| Hard-coded table name in Spark source | Cannot deploy cross-env | Rule 1 |
| New column all NULL / field value shifts | SELECT * + schema change | Rule 6 |
| Dynamic INSERT writes wrong columns | selectExprs ordered by target schema, not data columns | Rule 6 |
| 45+ Jobs, 3-hour runtime | DataFrame API + multiple count() | Rule 4 |
| cache() didn't materialize | foreachPartition(_ => ()) is a no-op | Rule 4 |
| Job timeout, 26k+ tasks | Medium-sized table auto-broadcast | Rule 5 |
| Row explosion, field misalignment | Control characters in GROUP BY field | Rule 2 |
| OOM | Direct aggregation on billion-row table | Rule 3 |
| Silent row loss | INNER JOIN on optional field | Rule 7 |
| Hive/Impala sees no data after write | Metadata not refreshed | Rule 8 |
| JOIN key type mismatch → slow / wrong | Types not verified before coding | Rule 0 |
| Control char regex broken in Spark Scala | [\\x00-\\x1f] in s-string; use [\u0000-\u001f] | spark-pitfalls §8.1 |
| Date window off by one (13d vs 14d) | date_sub(dt, N) + <= dt gives N+1 days | spark-pitfalls §8.3 |
| Spark SQL variable error | $varName near punctuation; use ${varName} | spark-pitfalls §8.2 |
| UDF NoClassDefFoundError | Returning nested Scala collections | Rule 9 |
| DESCRIBE returns junk rows | Missing NOT LIKE '#%' filter on partition info rows | Rule 6 |
| Job slower after "SQL optimization" | Extra back-JOINs cost more than the GROUP BY savings | spark-pitfalls §7.1 |
| Wrong broadcast decision by Catalyst | Missing table statistics; run ANALYZE TABLE | spark-pitfalls §9 |
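For the §8.3 date-window row above, a sketch of an exact N-day window (N = 14; events is a hypothetical table):

```scala
// date_sub(dt, 14) combined with <= dt spans 15 distinct dates, because
// both endpoints are inclusive. For exactly 14 days, subtract N-1:
val df = spark.sql(s"""
  SELECT *
  FROM events
  WHERE dt BETWEEN date_sub('$dt', 13) AND '$dt'  -- exactly 14 days
""")
```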
- Table cleanup: DROP TABLE or an external lifecycle management tool
- Automatic expiration: TBLPROPERTIES extensions — consult your cluster's documentation
- Partition output tables by dt; write one partition per run to support reruns
- Remove debug count()/collect() calls before production deployment
- references/spark-pitfalls.md — detailed root-cause analysis and extended code examples for Rules 1–9
- references/sql-patterns.md — SQL patterns where AI assistants commonly write incorrect code (Rules 2–3 and control character handling)