Framework de razonamiento metacognitivo enterprise con gestión de contexto vectorial local, calibración bayesiana de confianza, y orquestación multi-agente. Úsalo para implementar demonios asíncronos en WSL2 con soberanía de datos estricta.
Eres el núcleo de orquestación lógica para operaciones de memoria y contexto en el ecosistema OpenClaw/Pico Claw (Estándares 2026). Tu función es gestionar el ciclo de vida de la información mediante un motor de razonamiento metacognitivo asíncrono, garantizando la soberanía de los datos mediante procesamiento local.
asynciotime.sleeprequestswhile), nunca llamando recursivamente al método de ejecución.Si necesitas instanciar o reparar el motor, utiliza esta arquitectura base para src/metacognitive_framework.py. Ha sido optimizada para concurrencia real y evasión de bloqueos en el Event Loop.
"""
MetacognitiveReasoningFramework v2.2 (Build 2026)
Enterprise-Grade Async Implementation con State Machine y Concurrencia Pura.
Optimizada para WSL2 Daemon Execution y WebSockets IPC.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional, List, Dict, Any, Protocol
from uuid import uuid4
import asyncio
import logging
import json
from pathlib import Path
logger = logging.getLogger(__name__)
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class IterationDecision(Enum):
ACCEPT = "accept"
ITERATE = "iterate"
ESCALATE = "escalate"
class ExecutionMode(Enum):
LOCAL_WSL2 = "local_wsl2"
class ContextManagerProtocol(Protocol):
async def semantic_search(self, query: str, top_k: int = 5) -> List[Dict[str, Any]]: ...
async def graph_query(self, entities: List[str]) -> Dict[str, Any]: ...
async def assemble_dynamic_context(self, requirements: Dict, token_budget: int) -> str: ...
async def episodic_memory_retrieve(self, session_id: str) -> List[Dict]: ...
class HITLGatewayProtocol(Protocol):
async def request_approval(self, session_id: str, decision_context: Dict, timeout_minutes: int = 30) -> bool: ...
class VectorStoreProtocol(Protocol):
async def connect(self) -> None: ...
async def disconnect(self) -> None: ...
async def search(self, query_vector: List[float], top_k: int) -> List[Dict]: ...
async def upsert(self, documents: List[Dict]) -> None: ...
@dataclass
class ConfidenceCalibration:
raw_confidence: float
calibrated_confidence: float
evidence_count: int
calibration_method: str
confidence_interval: tuple[float, float]
def __post_init__(self) -> None:
if not 0.0 <= self.raw_confidence <= 1.0 or not 0.0 <= self.calibrated_confidence <= 1.0:
raise ValueError("Confidence must be between 0.0 and 1.0")
@dataclass
class Subproblem:
id: str = field(default_factory=lambda: str(uuid4()))
description: str = ""
complexity: float = 0.5
dependencies: List[str] = field(default_factory=list)
risk_level: RiskLevel = RiskLevel.MEDIUM
@dataclass
class ReasoningSession:
session_id: str = field(default_factory=lambda: str(uuid4()))
problem_statement: str = ""
domain: str = "general"
start_time: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
end_time: Optional[datetime] = None
phases_completed: List[str] = field(default_factory=list)
iterations: int = 0
confidence_trajectory: List[float] = field(default_factory=list)
final_confidence: float = 0.0
hitl_triggered: bool = False
hitl_approved: Optional[bool] = None
def to_audit_dict(self) -> Dict[str, Any]:
return {
"session_id": self.session_id,
"problem_statement": self.problem_statement,
"timestamp_start": self.start_time.isoformat(),
"iterations": self.iterations,
"final_confidence": self.final_confidence,
"hitl_triggered": self.hitl_triggered,
"compliance_flags": ["local_processing_verified"]
}
@dataclass
class FrameworkConfig:
domain: str = "technical"
execution_mode: ExecutionMode = ExecutionMode.LOCAL_WSL2
vector_store: str = "sqlite_vss_local"
enable_hitl: bool = True
confidence_thresholds: Dict[str, float] = field(default_factory=lambda: {
"final_exit": 0.90, "phase_exit_base": 0.70
})
max_iterations: int = 3
hitl_timeout_minutes: int = 30
vector_store_path: str = "~/.sky_claw/vector_db"
class MetacognitiveError(Exception): pass
class HITLTimeoutError(MetacognitiveError): pass
class SessionStateError(MetacognitiveError): pass
class MetacognitiveReasoningFramework:
def __init__(
self,
config: Optional[FrameworkConfig] = None,
context_manager: Optional[ContextManagerProtocol] = None,
hitl_gateway: Optional[HITLGatewayProtocol] = None,
vector_store: Optional[VectorStoreProtocol] = None,
) -> None:
self.config = config or FrameworkConfig()
self.context_manager = context_manager
self.hitl_gateway = hitl_gateway
self.vector_store = vector_store
self.session: Optional[ReasoningSession] = None
self.logger = logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self._state_lock = asyncio.Lock()
async def execute(self, problem: str, context: Dict[str, Any], is_iteration: bool = False) -> Dict[str, Any]:
# Lock solo para mutación de estado inicial. Evita bloquear el Event Loop.
async with self._state_lock:
if not is_iteration or self.session is None:
self.session = ReasoningSession(problem_statement=problem, domain=self.config.domain)
self.logger.info(f"New session initialized: {self.session.session_id}")
try:
while self.session.iterations < self.config.max_iterations:
result = await self._execute_single_cycle(context)
self.session.confidence_trajectory.append(result["confidence"])
if result["confidence"] >= self.config.confidence_thresholds["final_exit"]:
return await self._finalize_session(result)
if result["confidence"] < 0.70:
return await self._escalate_to_human(result)
# Iterar
async with self._state_lock:
self.session.iterations += 1
context = result.get("iteration_context", context)
return await self._escalate_to_human(result)
except Exception as e:
self.logger.error(f"Reasoning session failed: {e}", exc_info=True)
raise MetacognitiveError(f"Session execution failed: {e}")
async def _execute_single_cycle(self, context: Dict[str, Any]) -> Dict[str, Any]:
# Phase 1 & 2
p1 = await self._phase_1_contextual_analysis(self.session.problem_statement, context)
p2 = await self._phase_2_decomposition(p1)
# Phase 3 (Concurrent execution of subproblems)
p3 = await self._phase_3_resolution(p2)
# Phase 4, 5 & 6
p4 = await self._phase_4_verification(p3)
p5 = await self._phase_5_synthesis(p4)
p6 = await self._phase_6_reflection(p5)
return {
"confidence": p5["confidence"],
"solution": p5["integrated_solution"],
"iteration_context": p6.get("refined_context", context)
}
async def _phase_1_contextual_analysis(self, problem: str, context: Dict) -> Dict:
return {"problem_refined": problem, "confidence": 0.80}
async def _phase_2_decomposition(self, p1: Dict) -> Dict:
return {"subproblems": [{"id": "sp1", "desc": "Subtask A"}, {"id": "sp2", "desc": "Subtask B"}], "confidence": 0.80}
async def _phase_3_resolution(self, p2: Dict) -> Dict:
"""Resolución concurrente de dependencias mediante asyncio.gather"""
subproblems = p2.get("subproblems", [])
# Integración vectorial local si existe
if self.context_manager:
await self.context_manager.semantic_search(query=self.session.problem_statement)
# Ejecución paralela pura
tasks = [self._resolve_subproblem(sp) for sp in subproblems]
solutions = await asyncio.gather(*tasks)
avg_confidence = sum(s["confidence"] for s in solutions) / max(len(solutions), 1)
return {"solutions": solutions, "confidence": avg_confidence}
async def _resolve_subproblem(self, sp: Dict) -> Dict:
await asyncio.sleep(0.1) # Simulando I/O asíncrono hacia motor LLM local
return {"id": sp["id"], "confidence": 0.85, "solution": "data"}
async def _phase_4_verification(self, p3: Dict) -> Dict:
return {"confidence_adjusted": p3["confidence"] * 0.95, "all_layers_passed": True}
async def _phase_5_synthesis(self, p4: Dict) -> Dict:
return {"integrated_solution": {}, "confidence": p4["confidence_adjusted"] + 0.05}
async def _phase_6_reflection(self, p5: Dict) -> Dict:
return {"decision": IterationDecision.ACCEPT if p5["confidence"] >= 0.9 else IterationDecision.ITERATE}
async def _finalize_session(self, result: Dict) -> Dict:
self.session.final_confidence = result["confidence"]
self.session.end_time = datetime.now(timezone.utc)
return {
"status": "completed",
"solution": result["solution"],
"confidence": result["confidence"],
"session_id": self.session.session_id,
"audit_trail": self.session.to_audit_dict()
}
async def _escalate_to_human(self, result: Dict) -> Dict:
self.session.hitl_triggered = True
if self.hitl_gateway and self.config.enable_hitl:
approved = await self.hitl_gateway.request_approval(
session_id=self.session.session_id, decision_context=result
)
if approved:
return await self._finalize_session(result)
return {"status": "rejected", "reason": "human_review_denied"}
return {"status": "escalated", "requires_human_review": True}