QCML-specific 100-question research ideation swarm using geometric observables framework
Project-specific wrapper around /research-ideation for the QCML geometric observatory project.
This skill runs a 100-question research ideation swarm against the QCML codebase. It:
- Loads questions from `research/ideation/questions.yaml`
- Checks existing `ideation_XXXX.yaml` files to resume where it left off
- Writes results to `research/ideation/`

output_dir: "research/ideation"
num_knights: 6
test_bar: "smoke"
smoke_crises: ["2008_gfc", "2020_covid", "2022_rates", "2023_svb"]
keep_threshold: 0.3 # minimum Cohen's d
| File | Purpose |
|---|---|
| `qcml_geometry/core.py` | QCMLGeometry class — Hilbert space embedding, metric tensor, Berry curvature |
| `qcml_geometry/observables.py` | BaseRegimeDetector ABC + all detector implementations |
| `qcml_geometry/__init__.py` | All detector exports |
| `experiments/data_loader.py` | fetch_data(), create_feature_matrix(), ALL_CRISES dict |
| `experiments/evaluation.py` | compute_cohens_d_with_ci(), cliffs_delta(), compute_effect_sizes() |
| `experiments/baselines.py` | All baseline detector implementations |
| `experiments/regime_comparison.py` | Full comparison pipeline with causal preprocessing |
| `qcml_geometry/fusion.py` | Fusion methods (regime-adaptive, hierarchical, etc.) |
New detectors MUST follow this pattern:
from qcml_geometry.observables import BaseRegimeDetector
import numpy as np
class MyNewDetector(BaseRegimeDetector):
    """One-line description.

    Detailed description of the observable and its theoretical basis.

    Args:
        window: Trailing window size (number of past samples) for computation.
        hilbert_dim: Dimension of Hilbert space (default 4).
    """

    def __init__(self, window: int = 60, hilbert_dim: int = 4, **kwargs):
        self.window = window
        self.hilbert_dim = hilbert_dim

    @property
    def name(self) -> str:
        return "My New Observable"

    def fit(self, X: np.ndarray, **kwargs) -> 'MyNewDetector':
        """Fit on pre-crisis data only (causal).

        Args:
            X: 2-D feature array, shape (n_samples, n_features) — TODO confirm
               against create_feature_matrix() output.

        Returns:
            self, allowing fit(...).compute_regime_scores(...) chaining.
        """
        # Store training statistics for normalization; the epsilon guards
        # against division by zero on constant features.
        self._train_mean = np.mean(X, axis=0)
        self._train_std = np.std(X, axis=0) + 1e-8
        return self

    def compute_regime_scores(self, X: np.ndarray) -> np.ndarray:
        """Produce a 1-D regime score time series.

        CRITICAL: Must be causal — the score at time t may use only data
        strictly before t (here: a fixed-length trailing window). No future
        data. Higher scores = more anomalous / crisis-like.
        """
        scores = np.zeros(len(X))
        # The first `window` entries remain 0.0 — warm-up period with
        # insufficient history to form a full trailing window.
        for t in range(self.window, len(X)):
            window_data = X[t - self.window:t]  # past only; excludes X[t]
            scores[t] = self._compute_observable(window_data)
        return scores

    def _compute_observable(self, window_data: np.ndarray) -> float:
        """Compute the observable value for a single trailing window.

        Template hook: subclasses/implementations replace this body.
        """
        # Raise instead of silently returning None (the original `pass`):
        # assigning None into the float `scores` array would otherwise fail
        # with a confusing TypeError inside compute_regime_scores.
        raise NotImplementedError("Implement the observable computation here")
from experiments.data_loader import fetch_data, create_feature_matrix, ALL_CRISES
from experiments.evaluation import compute_cohens_d_with_ci, cliffs_delta

# Fetch market data
df = fetch_data(['SPY', 'DIA'], '2005-01-01', '2025-12-31')

# Create feature matrix (returns, realized vol, etc.)
X = create_feature_matrix(df, symbols=['SPY', 'DIA'])
# Date index aligned with the score series (was previously undefined below).
dates = X.index

# Crisis definitions
crisis = ALL_CRISES['2008_gfc']
# {'start': '2008-09-01', 'end': '2009-03-31', 'label': 'GFC 2008'}

# Split scores into crisis vs normal periods.
# `scores` is the 1-D output of detector.compute_regime_scores(X).
crisis_mask = (dates >= crisis['start']) & (dates <= crisis['end'])
crisis_scores = scores[crisis_mask]
normal_scores = scores[~crisis_mask]

# Cohen's d with bootstrap CI
d, ci_lo, ci_hi = compute_cohens_d_with_ci(
    crisis_scores, normal_scores,
    n_bootstrap=1000,  # use 1000 for smoke test speed
    seed=42
)
Workers running empirical smoke tests should follow this pattern:
import json
import numpy as np
from pathlib import Path

# Setup
SMOKE_CRISES = ['2008_gfc', '2020_covid', '2022_rates', '2023_svb']
KEEP_THRESHOLD = 0.3  # minimum Cohen's d for a detector to survive the smoke test

from experiments.data_loader import fetch_data, create_feature_matrix, ALL_CRISES
from experiments.evaluation import compute_cohens_d_with_ci

# Fetch data
df = fetch_data(['SPY', 'DIA'], '2005-01-01', '2025-12-31')
X = create_feature_matrix(df, symbols=['SPY', 'DIA'])
dates = X.index if hasattr(X, 'index') else None
if dates is None:
    # Fail fast with a clear message; otherwise the `dates < crisis['start']`
    # comparison below would raise a cryptic TypeError on None.
    raise ValueError("create_feature_matrix must return a date-indexed frame")

# Instantiate detector
detector = MyNewDetector(window=60)

# For each crisis: fit on pre-crisis data only (causal), then score the full series
results = {}
for crisis_key in SMOKE_CRISES:
    crisis = ALL_CRISES[crisis_key]

    # Fit on data strictly before the crisis start (no look-ahead)
    pre_crisis_mask = dates < crisis['start']
    detector.fit(X[pre_crisis_mask].values)

    # Score full series
    scores = detector.compute_regime_scores(X.values)

    # Evaluate: effect size of crisis-period scores vs everything else
    crisis_mask = (dates >= crisis['start']) & (dates <= crisis['end'])
    d, ci_lo, ci_hi = compute_cohens_d_with_ci(
        scores[crisis_mask], scores[~crisis_mask],
        n_bootstrap=1000, seed=42
    )
    results[crisis_key] = {'d': float(d), 'ci_lo': float(ci_lo), 'ci_hi': float(ci_hi)}

# Verdict
d_values = [r['d'] for r in results.values()]
median_d = float(np.median(d_values))
max_d = float(np.max(d_values))
passes = max_d > KEEP_THRESHOLD  # keep if ANY crisis shows signal

# Save (floats/bools above are native Python types, so this is JSON-safe)
output = {
    'detector': detector.name,
    'crises_tested': SMOKE_CRISES,
    'results': results,
    'median_d': median_d,
    'max_d': max_d,
    'passes_threshold': passes,
}
Path('smoke_results.json').write_text(json.dumps(output, indent=2))
The 100 questions span 10 themes:
Current top performers (from Paper 1; 36 methods × 17 crises):
Dead signals (d < 0.05):
Key insight: QCML methods show |ρ| = 0.132 correlation with the baselines (i.e., highly orthogonal signals), which makes fusion valuable.