Column-oriented MsgPack result encoding for all query outputs on the platform. Row-based JSON is forbidden. Covers ColumnarResult struct, PaginationMeta struct, EncodeColumnar() and DecodeColumnar() functions, wire format shape, HTTP/NATS content type, cursor vs offset pagination meta, and integration points in List() and Analytics() repository methods. Used by both OLTP (List) and analytics (Analytics) execution paths. Keywords: MsgPack, columnar encoding, column-oriented, ColumnarResult, pagination meta, cursor, offset, encoding, decoding, HTTP, NATS, application/x-msgpack, query result, PostgreSQL 18.
SCOPE: DAL SERVICE ONLY — These patterns run INSIDE the data-access-layer service.
Entity services NEVER use pgxpool, AcquireForTenant, or execute SQL directly. Entity services use the /dal-service-patterns skill to communicate with DAL via NATS. If you are implementing an entity service (identity, incident-core, etc.), STOP and use /dal-service-patterns instead.
Used by: database-architect, code-generator (DAL service only)
All query results — OLTP and analytics — are encoded as column-oriented MsgPack. Never use:
- json.Marshal(rows) — row-based JSON
- []byte of [{"col": val}, ...] — row-based encoding

Column-oriented means values are grouped by column, not by row:
// Row-based (FORBIDDEN)
[
{"id": "uuid1", "status": "open", "priority": "high"},
{"id": "uuid2", "status": "closed", "priority": "low"}
]
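// Column-oriented MsgPack (REQUIRED) — rendered as JSON here for readability.
// On the wire, next_cursor and next_offset are omitted when empty/zero (omitempty).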
{
"columns": ["id", "status", "priority"],
"types": ["uuid", "text", "text"],
"data": {
"id": ["uuid1", "uuid2"],
"status": ["open", "closed"],
"priority": ["high", "low"]
},
"pagination": {
"next_cursor": "2026-03-25T10:00:00Z",
"next_offset": 0,
"has_more": true,
"total_returned": 2
}
}
// internal/encoding/columnar.go (inside data-access-layer service)
package encoding
import (
"fmt"
"github.com/vmihailenco/msgpack/v5"
)
// ColumnarResult is the standard wire format for all query responses.
// Values are grouped per column rather than per row: Data maps each column
// name to that column's values, one entry per result row.
type ColumnarResult struct {
// Columns lists the output column names in output order.
Columns []string `msgpack:"columns"`
// Types holds one best-effort type label per entry of Columns
// ("text", "int", "float", "bool" or "any" when produced by EncodeColumnar).
Types []string `msgpack:"types"`
// Data maps a column name to that column's values; index i of every slice
// belongs to row i. A value is nil when the row had no such key.
Data map[string][]any `msgpack:"data"`
// Pagination carries the continuation info for the next page.
Pagination PaginationMeta `msgpack:"pagination"`
}
// PaginationMeta carries cursor/offset continuation tokens.
// NextCursor and NextOffset are tagged omitempty, so an empty cursor or a
// zero offset is left out of the encoded payload entirely.
type PaginationMeta struct {
// NextCursor is the continuation token for cursor pagination; empty when unset.
NextCursor string `msgpack:"next_cursor,omitempty"` // cursor pagination
// NextOffset is the offset of the next page for offset pagination.
NextOffset int `msgpack:"next_offset,omitempty"` // offset pagination
// HasMore reports whether another page is expected after this one.
HasMore bool `msgpack:"has_more"`
// TotalReturned is the number of rows contained in this page.
TotalReturned int `msgpack:"total_returned"`
}
// EncodeColumnar packs []map[string]any into column-oriented MsgPack bytes.
//
// columns defines the output column order; keys present in rows but absent
// from columns are ignored. Cross-join dotted columns (e.g. "user.user_name")
// must be included in columns if they should appear in the output.
//
// Every column slice in the payload has exactly len(rows) entries; a row that
// lacks a column contributes nil. A name that appears more than once in
// columns is filled only once, so a duplicate can no longer produce a column
// that is longer than the others.
//
// It returns the encoded payload, or a wrapped error if MsgPack marshalling
// fails.
func EncodeColumnar(rows []map[string]any, columns []string, pagination PaginationMeta) ([]byte, error) {
	data := make(map[string][]any, len(columns))
	for _, col := range columns {
		if _, seen := data[col]; seen {
			// Duplicate column name: the slice is already complete.
			continue
		}
		// Non-nil even for zero rows, so an empty result still encodes an
		// empty array per column rather than nil.
		vals := make([]any, len(rows))
		for i, row := range rows {
			vals[i] = row[col] // nil if absent
		}
		data[col] = vals
	}

	result := ColumnarResult{
		Columns:    columns,
		Types:      inferTypes(rows, columns),
		Data:       data,
		Pagination: pagination,
	}

	b, err := msgpack.Marshal(result)
	if err != nil {
		return nil, fmt.Errorf("encoding columnar msgpack: %w", err)
	}
	return b, nil
}
// inferTypes returns a best-effort type label per column, in the same order
// as columns.
//
// For each column the label is derived from the first non-nil value found
// while scanning rows top to bottom, so a nullable column whose first row is
// NULL is still typed from a later row. Labels are "text" (string), "int"
// (any signed or unsigned integer kind), "float" (float32/float64) and "bool".
// A column with no rows, only nil values, or a value of any other Go type is
// labelled "any".
func inferTypes(rows []map[string]any, columns []string) []string {
	types := make([]string, len(columns))
	for i, col := range columns {
		types[i] = "any"
		for _, row := range rows {
			v := row[col]
			if v == nil {
				// Missing key or NULL: keep looking in later rows.
				continue
			}
			switch v.(type) {
			case string:
				types[i] = "text"
			case int, int8, int16, int32, int64,
				uint, uint8, uint16, uint32, uint64:
				types[i] = "int"
			case float32, float64:
				types[i] = "float"
			case bool:
				types[i] = "bool"
			}
			// The first non-nil value decides, even if it stays "any".
			break
		}
	}
	return types
}
// DecodeColumnar is the inverse of EncodeColumnar: it unmarshals a
// column-oriented MsgPack payload into a ColumnarResult.
//
// On a decode failure it returns the zero ColumnarResult together with a
// wrapped error.
func DecodeColumnar(data []byte) (ColumnarResult, error) {
	decoded := ColumnarResult{}
	err := msgpack.Unmarshal(data, &decoded)
	if err != nil {
		return ColumnarResult{}, fmt.Errorf("decoding columnar msgpack: %w", err)
	}
	return decoded, nil
}
// ToRows converts ColumnarResult back into []map[string]any for convenience
// (e.g. in test assertions or downstream processing).
//
// It returns nil when there are no columns or no data. Otherwise the number of
// rows equals the length of the first listed column, and every row contains a
// key for every entry of Columns. A column that is missing from Data, or that
// is shorter than the first column (possible in a malformed decoded payload),
// yields nil for the affected rows instead of an index-out-of-range panic.
func (r ColumnarResult) ToRows() []map[string]any {
	if len(r.Columns) == 0 || len(r.Data) == 0 {
		return nil
	}
	n := len(r.Data[r.Columns[0]])
	rows := make([]map[string]any, n)
	for i := range rows {
		row := make(map[string]any, len(r.Columns))
		for _, col := range r.Columns {
			var v any
			if vals := r.Data[col]; i < len(vals) {
				v = vals[i]
			}
			row[col] = v
		}
		rows[i] = row
	}
	return rows
}
// internal/encoding/pagination.go (inside data-access-layer service)
package encoding
import (
	"fmt"
	"time"
)
// BuildCursorMeta builds PaginationMeta for cursor-based paging and trims the
// page to limit.
//
// Pass all returned rows (before trimming to limit): HasMore is detected by
// len(rows) > limit, so the query should fetch limit+1 rows. The returned
// slice is rows cut down to at most limit entries. A negative limit is
// treated as 0 so the trim cannot panic.
//
// When HasMore is true, NextCursor is derived from the "created_at" value of
// the last row of the trimmed page:
//   - a time.Time is rendered in UTC as RFC 3339 with nanosecond precision
//     (e.g. "2026-03-25T10:00:00Z"),
//   - a missing or nil value leaves NextCursor empty,
//   - any other value is rendered with fmt's %v verb.
func BuildCursorMeta(rows []map[string]any, limit int) (PaginationMeta, []map[string]any) {
	if limit < 0 {
		limit = 0 // rows[:limit] would panic on a negative bound
	}
	hasMore := len(rows) > limit
	if hasMore {
		rows = rows[:limit]
	}
	meta := PaginationMeta{
		HasMore:       hasMore,
		TotalReturned: len(rows),
	}
	if hasMore && len(rows) > 0 {
		switch ts := rows[len(rows)-1]["created_at"].(type) {
		case nil:
			// Missing or NULL created_at: leave the cursor empty rather
			// than emitting the literal string "<nil>".
		case time.Time:
			// %v would print "2006-01-02 15:04:05 +0000 UTC"; the wire
			// format uses RFC 3339.
			meta.NextCursor = ts.UTC().Format(time.RFC3339Nano)
		default:
			meta.NextCursor = fmt.Sprintf("%v", ts)
		}
	}
	return meta, rows
}
// BuildOffsetMeta builds PaginationMeta for offset-based paging.
//
// rows is the page that was fetched, currentOffset the offset it was fetched
// at, and limit the requested page size. NextOffset is currentOffset plus the
// number of rows returned. HasMore is a heuristic: it is true when the page
// is full (at least limit rows), which cannot distinguish "exactly limit rows
// remained" from "more rows follow". A non-positive limit never reports
// HasMore, so an empty page with limit 0 does not send clients into an
// endless paging loop.
func BuildOffsetMeta(rows []map[string]any, currentOffset, limit int) PaginationMeta {
	returned := len(rows)
	return PaginationMeta{
		NextOffset:    currentOffset + returned,
		HasMore:       limit > 0 && returned >= limit,
		TotalReturned: returned,
	}
}
// HTTP handler: set correct Content-Type
// The header must be set before WriteHeader/Write, which send the headers.
w.Header().Set("Content-Type", "application/x-msgpack")
w.WriteHeader(http.StatusOK)
// NOTE(review): the error returned by Write is not checked in this fragment.
w.Write(msgpackBytes)
// NATS message: set header
// The same content type is attached as a NATS message header.
msg := nats.NewMsg(subject)
msg.Header.Set("Content-Type", "application/x-msgpack")
msg.Data = msgpackBytes
// NOTE(review): the error returned by PublishMsg is not checked in this fragment.
nc.PublishMsg(msg)
// Consumer decoding
// The payload is MsgPack, so it is decoded with DecodeColumnar rather than JSON;
// err must be checked before result is used (the check is not shown here).
result, err := encoding.DecodeColumnar(msg.Data)
Encoding happens inside the DAL service handler, after SQL execution:
// internal/handler/handler.go — HandleList encodes result after execution
// HandleList shows the recommended shape of the DAL list handler: the scanned
// rows are turned into a column-oriented MsgPack payload and sent as the reply
// to msg.
// NOTE(review): ctx, rows, compiled and req come from the elided
// parse/compile/execute step and are not shown in this excerpt.
func (h *Handler) HandleList(msg *nats.Msg) {
// ... parse, compile, execute ...
// Convert the result set into row maps plus the column list.
results, columns := scanToMaps(rows)
// Resolve cross-joins if needed
if len(compiled.CrossJoins) > 0 {
// NOTE(review): the error returned by Resolve is discarded; production code
// should check it. The meaning of the literal 4 is not visible here — TODO confirm.
results, _ = h.crossjoin.Resolve(ctx, results, compiled.CrossJoins, 4)
// Presumably adds the dotted cross-join column names to columns; EncodeColumnar
// only emits keys that are listed in columns.
columns = crossjoin.AppendCrossJoinColumns(columns, req.Query.Columns, compiled.CrossJoins)
}
// GOOD: Encode as MsgPack columnar
// BuildCursorMeta receives the untrimmed rows and returns them trimmed to the limit.
pagination, results := encoding.BuildCursorMeta(results, req.Query.Pagination.Limit)
// NOTE(review): the encode error is discarded; on failure msgpackBytes is nil
// and that nil payload is what gets sent as the reply.
msgpackBytes, _ := encoding.EncodeColumnar(results, columns, pagination)
// NOTE(review): the error returned by Respond is not checked.
msg.Respond(msgpackBytes)
}
// BAD: Returning row-based JSON from DAL
// Counter-example only — do not copy. It replies with one JSON object per row
// instead of the column-oriented MsgPack payload, and it also discards the
// Marshal error.
func (h *Handler) HandleList(msg *nats.Msg) {
jsonBytes, _ := json.Marshal(rows) // FORBIDDEN — must use EncodeColumnar
msg.Respond(jsonBytes)
}
Common mistakes:
- EncodeColumnar columns slice — EncodeColumnar skips keys not in columns; dotted keys like user.user_name must be appended before encoding.
- BuildCursorMeta with already-trimmed rows — pass the full limit+1 result rows so HasMore detection works correctly.
- Content-Type header — must be application/x-msgpack for both HTTP and NATS.
- json.Unmarshal on the client — the response is MsgPack; use DecodeColumnar or a MsgPack client library.