Multi-source data fusion using Dempster-Shafer theory. Combine signals from multiple independent sources to detect convergence events with mathematical confidence.
A system that ingests signals from multiple independent data sources (news, markets, social, sensors, APIs) and fuses them using Dempster-Shafer belief theory to detect when multiple signals converge on the same entity or event. When 3+ independent sources agree, you have high-confidence intelligence.
Source 1 (news) ──┐
Source 2 (market) ─┤
Source 3 (social) ─┼──→ Signal Normalizer → D-S Fusion → Convergence Detector → Alert
Source 4 (sensor) ─┤
Source N ──────────┘
Unlike simple averaging, D-S theory handles uncertainty and conflicting evidence:
// Dempster-Shafer combination rule
function dsFuse(belief1, belief2) {
// belief = { hypothesis: probability, uncertainty: probability }
const k = belief1.hypothesis * belief2.uncertainty +
belief1.uncertainty * belief2.hypothesis; // conflict
if (k >= 1) return { hypothesis: 0, uncertainty: 1, conflict: 1 }; // total conflict
return {
hypothesis: (belief1.hypothesis * belief2.hypothesis) / (1 - k),
uncertainty: (belief1.uncertainty * belief2.uncertainty) / (1 - k),
conflict: k
};
}
// Fuse multiple signals
function fuseSignals(signals) {
let fused = signals[0].belief;
for (let i = 1; i < signals.length; i++) {
fused = dsFuse(fused, signals[i].belief);
}
return fused;
}
// signal-fusion.js
const http = require('http');
const PORT = process.env.PORT || 5175;
// Signal storage — keyed by entity
const entitySignals = new Map();
// Ingest a signal from any source
function ingestSignal(signal) {
const { entity, source, type, confidence, data, timestamp } = signal;
if (!entitySignals.has(entity)) {
entitySignals.set(entity, { signals: [], lastFusion: null, alert: null });
}
const entry = entitySignals.get(entity);
entry.signals.push({
source,
type,
belief: { hypothesis: confidence, uncertainty: 1 - confidence },
data,
timestamp: timestamp || new Date().toISOString(),
expires: Date.now() + (24 * 60 * 60 * 1000) // 24h TTL
});
// Clean expired signals
entry.signals = entry.signals.filter(s => Date.now() < s.expires);
// Run fusion if 2+ sources
const uniqueSources = new Set(entry.signals.map(s => s.source));
if (uniqueSources.size >= 2) {
const fusionResult = fuseEntity(entity, entry.signals);
entry.lastFusion = fusionResult;
// Alert on convergence
if (fusionResult.confidence >= 0.7 && uniqueSources.size >= 3) {
entry.alert = {
entity,
confidence: fusionResult.confidence,
sources: [...uniqueSources],
sourceCount: uniqueSources.size,
timestamp: new Date().toISOString()
};
console.log(`[CONVERGENCE] ${entity}: ${fusionResult.confidence.toFixed(3)} from ${uniqueSources.size} sources`);
}
}
}
function fuseEntity(entity, signals) {
// Group by source, take latest from each
const bySource = {};
for (const s of signals) {
if (!bySource[s.source] || s.timestamp > bySource[s.source].timestamp) {
bySource[s.source] = s;
}
}
const sourceSignals = Object.values(bySource);
if (sourceSignals.length < 2) return { confidence: sourceSignals[0]?.belief.hypothesis || 0 };
const fused = fuseSignals(sourceSignals);
return {
confidence: fused.hypothesis,
uncertainty: fused.uncertainty,
conflict: fused.conflict,
sourceCount: sourceSignals.length,
sources: sourceSignals.map(s => ({ source: s.source, confidence: s.belief.hypothesis }))
};
}
// HTTP server
const server = http.createServer(async (req, res) => {
if (req.method === 'POST' && req.url === '/api/signal') {
let body = '';
req.on('data', c => body += c);
req.on('end', () => {
const signal = JSON.parse(body);
ingestSignal(signal);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ status: 'ingested', entity: signal.entity }));
});
return;
}
if (req.url === '/api/alerts') {
const alerts = [...entitySignals.values()]
.filter(e => e.alert)
.map(e => e.alert)
.sort((a, b) => b.confidence - a.confidence);
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(alerts));
return;
}
if (req.url?.startsWith('/api/entity/')) {
const entity = decodeURIComponent(req.url.split('/api/entity/')[1]);
const data = entitySignals.get(entity);
res.writeHead(data ? 200 : 404, { 'Content-Type': 'application/json' });
res.end(JSON.stringify(data || { error: 'not found' }));
return;
}
res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({
status: 'ok',
entities: entitySignals.size,
activeAlerts: [...entitySignals.values()].filter(e => e.alert).length
}));
});
server.listen(PORT, () => console.log(`[Signal Fusion] :${PORT}`));
# News signal
curl -X POST localhost:5175/api/signal -H 'Content-Type: application/json' \
-d '{"entity":"AAPL","source":"news","type":"sentiment","confidence":0.8,"data":{"headline":"Apple announces record earnings"}}'
# Market signal
curl -X POST localhost:5175/api/signal -H 'Content-Type: application/json' \
-d '{"entity":"AAPL","source":"market","type":"price_action","confidence":0.75,"data":{"change":"+3.2%"}}'
# Social signal
curl -X POST localhost:5175/api/signal -H 'Content-Type: application/json' \
-d '{"entity":"AAPL","source":"social","type":"volume","confidence":0.65,"data":{"mentions":15000}}'
# Check convergence
curl localhost:5175/api/alerts
CREATE TABLE external_signals (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity TEXT NOT NULL,
source TEXT NOT NULL,
signal_type TEXT,
confidence FLOAT,
data JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE TABLE convergence_alerts (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
entity TEXT NOT NULL,
fused_confidence FLOAT,
source_count INTEGER,
sources TEXT[],
data JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_signals_entity ON external_signals(entity);
CREATE INDEX idx_alerts_confidence ON convergence_alerts(fused_confidence DESC);
SKILL_COMPLETE