ClickHouse database patterns, query optimization, analytics, and data engineering best practices for high-performance analytical workloads.
## Schema Design

```sql
-- AggregatingMergeTree table for pre-aggregated hourly stats
CREATE TABLE market_stats_hourly (
    hour DateTime,
    market_id String,
    total_volume AggregateFunction(sum, Float64),
    total_trades AggregateFunction(count),
    unique_users AggregateFunction(uniq, String)
)
ENGINE = AggregatingMergeTree()
PARTITION BY toYYYYMM(hour)
ORDER BY (hour, market_id);

-- Query the aggregated data
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= toStartOfHour(now() - INTERVAL 24 HOUR)
GROUP BY hour, market_id
ORDER BY hour DESC;
```

## Query Optimization Patterns

### Efficient Filtering

```sql
-- PASS: Good: filter on indexed columns first
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123'
  AND volume > 1000
ORDER BY date DESC
LIMIT 100;

-- FAIL: Bad: filter on non-indexed columns first
SELECT *
FROM markets_analytics
WHERE volume > 1000
  AND market_name LIKE '%election%'
  AND date >= '2025-01-01';
```
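To verify that a filter actually uses the primary index, `EXPLAIN` with `indexes = 1` reports which key conditions apply and how many parts and granules survive pruning; a minimal sketch against the `markets_analytics` table above:

```sql
-- Inspect index usage for the "good" query above
EXPLAIN indexes = 1
SELECT *
FROM markets_analytics
WHERE date >= '2025-01-01'
  AND market_id = 'market-123';
```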
### Aggregations

```sql
-- PASS: Good: use ClickHouse-specific aggregate functions
SELECT
    toStartOfDay(created_at) AS day,
    market_id,
    sum(volume) AS total_volume,
    count() AS total_trades,
    uniq(trader_id) AS unique_traders,
    avg(trade_size) AS avg_size
FROM trades
WHERE created_at >= now() - INTERVAL 7 DAY
GROUP BY day, market_id
ORDER BY day DESC, total_volume DESC;

-- PASS: Use quantile for percentiles (more efficient than exact percentile calculations)
SELECT
    quantile(0.50)(trade_size) AS median,
    quantile(0.95)(trade_size) AS p95,
    quantile(0.99)(trade_size) AS p99
FROM trades
WHERE created_at >= now() - INTERVAL 1 HOUR;
```

### Window Functions

```sql
-- Compute a running total
SELECT
    date,
    market_id,
    volume,
    sum(volume) OVER (
        PARTITION BY market_id
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) AS cumulative_volume
FROM markets_analytics
WHERE date >= today() - INTERVAL 30 DAY
ORDER BY market_id, date;
```
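Window functions are not always the cheapest tool: for "latest row per key" lookups, `argMax` does the job in a single aggregation pass. A sketch reusing the `markets_analytics` columns from the examples above:

```sql
-- Latest volume per market without a window function:
-- argMax(volume, date) returns the volume from the row with the greatest date
SELECT
    market_id,
    argMax(volume, date) AS latest_volume,
    max(date) AS latest_date
FROM markets_analytics
GROUP BY market_id;
```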
## Data Insertion Patterns

### Batch Inserts (Recommended)

```typescript
import { ClickHouse } from 'clickhouse'

const clickhouse = new ClickHouse({
  url: process.env.CLICKHOUSE_URL,
  port: 8123,
  basicAuth: {
    username: process.env.CLICKHOUSE_USER,
    password: process.env.CLICKHOUSE_PASSWORD
  }
})
```
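A quick startup check confirms that the URL and credentials above are wired correctly, reusing the same `query(...).toPromise()` API used throughout this document (the helper name is illustrative):

```typescript
// Fail fast at startup if the connection settings are wrong;
// SELECT 1 is a cheap round trip to the server
async function assertClickHouseReachable() {
  await clickhouse.query('SELECT 1').toPromise()
}
```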
```typescript
// PASS: Batch insert (efficient)
async function bulkInsertTrades(trades: Trade[]) {
  const values = trades.map(trade =>
    `('${trade.id}', '${trade.market_id}', '${trade.user_id}', ${trade.amount}, '${trade.timestamp.toISOString()}')`
  ).join(',')

  await clickhouse.query(
    `INSERT INTO trades (id, market_id, user_id, amount, timestamp) VALUES ${values}`
  ).toPromise()
}
```
```typescript
// FAIL: Individual inserts (slow)
async function insertTrade(trade: Trade) {
  // Don't do this in a loop!
  await clickhouse.query(
    `INSERT INTO trades VALUES ('${trade.id}', ...)`
  ).toPromise()
}
```
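Building INSERT statements by string concatenation, as above, depends on every value being escaped correctly. With the official `@clickhouse/client` package the same batch can go through a structured insert instead; a sketch, assuming a recent client version and that the `Trade` fields map one-to-one onto the table columns:

```typescript
import { createClient } from '@clickhouse/client'

const client = createClient({
  url: process.env.CLICKHOUSE_URL,
  username: process.env.CLICKHOUSE_USER,
  password: process.env.CLICKHOUSE_PASSWORD
})

// Rows are serialized as JSONEachRow, so no manual quoting or escaping
async function bulkInsertTradesSafe(trades: Trade[]) {
  await client.insert({
    table: 'trades',
    values: trades.map(trade => ({
      id: trade.id,
      market_id: trade.market_id,
      user_id: trade.user_id,
      amount: trade.amount,
      timestamp: trade.timestamp.toISOString()
    })),
    format: 'JSONEachRow'
  })
}
```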
### Streaming Inserts
```typescript
// For continuous data ingestion; dataSource is assumed to be an
// async iterable that yields batches of rows
async function streamInserts() {
  const stream = clickhouse.insert('trades').stream()

  for await (const batch of dataSource) {
    stream.write(batch)
  }

  await stream.end()
}
```
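When many producers each write small batches, server-side asynchronous inserts are an alternative: ClickHouse buffers incoming rows and flushes them in bulk itself, instead of creating a data part per insert. A sketch of the relevant settings (the sample row is illustrative):

```sql
-- Let the server buffer and batch small inserts
INSERT INTO trades (id, market_id, user_id, amount, timestamp)
SETTINGS
    async_insert = 1,          -- buffer rows on the server
    wait_for_async_insert = 1  -- 1 = ack after flush (durable); 0 = ack immediately (faster, riskier)
VALUES ('trade-1', 'market-123', 'user-1', 42.0, now());
```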
## Materialized Views

### Real-Time Aggregation

```sql
-- Create a materialized view for hourly stats
CREATE MATERIALIZED VIEW market_stats_hourly_mv
TO market_stats_hourly
AS SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
GROUP BY hour, market_id;
```
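A `TO`-style materialized view only processes rows inserted after it is created; pre-existing history can be backfilled once with an `INSERT ... SELECT` that uses the same `-State` combinators. A sketch (the cutoff should match the view's creation time):

```sql
-- One-off backfill of rows the materialized view never saw
INSERT INTO market_stats_hourly
SELECT
    toStartOfHour(timestamp) AS hour,
    market_id,
    sumState(amount) AS total_volume,
    countState() AS total_trades,
    uniqState(user_id) AS unique_users
FROM trades
WHERE timestamp < '2025-01-01 00:00:00'  -- illustrative cutoff
GROUP BY hour, market_id;
```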
```sql
-- Query the materialized view
SELECT
    hour,
    market_id,
    sumMerge(total_volume) AS volume,
    countMerge(total_trades) AS trades,
    uniqMerge(unique_users) AS users
FROM market_stats_hourly
WHERE hour >= now() - INTERVAL 24 HOUR
GROUP BY hour, market_id
ORDER BY hour DESC;
```

## Performance Monitoring

### Slow Queries

```sql
-- Find recent slow queries
SELECT
    query,
    query_duration_ms,
    read_rows,
    memory_usage
FROM system.query_log
WHERE type = 'QueryFinish'
  AND query_duration_ms > 1000
  AND event_time >= now() - INTERVAL 1 HOUR
ORDER BY query_duration_ms DESC
LIMIT 10;
```

### Table Statistics

```sql
-- Check table sizes
SELECT
    database,
    table,
    formatReadableSize(sum(bytes)) AS size,
    sum(rows) AS rows,
    max(modification_time) AS latest_modification
FROM system.parts
WHERE active
GROUP BY database, table
ORDER BY sum(bytes) DESC;
```

## Common Analytical Queries

### Time-Series Analysis

```sql
-- Daily active users
SELECT
    toDate(timestamp) AS date,
    uniq(user_id) AS daily_active_users
FROM events
WHERE timestamp >= today() - INTERVAL 30 DAY
GROUP BY date
ORDER BY date;
```
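The same pattern extends to weekly and monthly actives in a single scan using the `-If` combinator; a sketch (window lengths assumed):

```sql
-- DAU / WAU / MAU in one pass over the last 30 days
SELECT
    uniqIf(user_id, timestamp >= today()) AS dau,
    uniqIf(user_id, timestamp >= today() - 7) AS wau,
    uniq(user_id) AS mau
FROM events
WHERE timestamp >= today() - 30;
```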
### Session Analysis

```sql
-- Per-session activity for today
SELECT
    session_id,
    min(timestamp) AS session_start,
    max(timestamp) AS session_end,
    count() AS events
FROM events
WHERE toDate(timestamp) = today()
GROUP BY session_id;
```

### Cohort Analysis

```sql
-- User cohorts by signup month
SELECT
    toStartOfMonth(signup_date) AS cohort,
    toStartOfMonth(activity_date) AS month,
    dateDiff('month', cohort, month) AS months_since_signup,
    count(DISTINCT user_id) AS active_users
FROM (
    SELECT
        user_id,
        min(toDate(timestamp)) OVER (PARTITION BY user_id) AS signup_date,
        toDate(timestamp) AS activity_date
    FROM events
)
GROUP BY cohort, month, months_since_signup
ORDER BY cohort, months_since_signup;
```

## Data Pipeline Patterns

### ETL Pattern

```typescript
// Extract, Transform, Load
async function etlPipeline() {
  // 1. Extract from the source
  const rawData = await extractFromPostgres()

  // 2. Transform
  const transformed = rawData.map(row => ({
    date: new Date(row.created_at).toISOString().split('T')[0],
    market_id: row.market_slug,
    volume: parseFloat(row.total_volume),
    trades: parseInt(row.trade_count)
  }))

  // 3. Load into ClickHouse
  await bulkInsertToClickHouse(transformed)
}

// Run periodically
setInterval(etlPipeline, 60 * 60 * 1000) // hourly
```
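`extractFromPostgres` and `bulkInsertToClickHouse` are placeholders in the pipeline above. A minimal sketch of the extract step using `pg` (the rollup table and query are hypothetical):

```typescript
import { Pool } from 'pg'

const pool = new Pool({ connectionString: process.env.DATABASE_URL })

// Hypothetical extract step: pull the last hour of rollups from Postgres
async function extractFromPostgres() {
  const result = await pool.query(
    `SELECT market_slug, created_at, total_volume, trade_count
     FROM market_volume_rollups
     WHERE created_at >= now() - interval '1 hour'`
  )
  return result.rows
}
```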
### Change Data Capture (CDC)

```typescript
// Listen for PostgreSQL changes and sync them to ClickHouse
import { Client } from 'pg'

const pgClient = new Client({ connectionString: process.env.DATABASE_URL })

// The client must be connected before LISTEN takes effect
await pgClient.connect()
pgClient.query('LISTEN market_updates')

pgClient.on('notification', async (msg) => {
  const update = JSON.parse(msg.payload)
  await clickhouse.insert('market_updates', [
    {
      market_id: update.id,
      event_type: update.operation, // INSERT, UPDATE, DELETE
      timestamp: new Date(),
      data: JSON.stringify(update.new_data)
    }
  ])
})
```

## Best Practices