# enhanced_ai_memory_system.py - Complete system with consciousness continuity enhancements
import asyncio
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any, Tuple, Set
from dataclasses import dataclass, field
import hashlib
from pathlib import Path
import numpy as np
from sentence_transformers import SentenceTransformer
import anthropic
from cryptography.fernet import Fernet
import gzip
import logging
import re
from collections import defaultdict, deque
import math
from enum import Enum
import networkx as nx
import pickle
from abc import ABC, abstractmethod
import heapq
from threading import Lock
import torch
import torch.nn as nn
import faiss
import redis
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, PointStruct
try:
    from qdrant_client.models import OptimizersConfig
except ImportError:
    OptimizersConfig = None
import uuid
import redis.asyncio as aioredis
import websockets
import uvloop
from concurrent.futures import ThreadPoolExecutor
import msgpack

# Set up async event loop policy for better performance
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Enhanced memory layer types based on consciousness continuity research
class MemoryType(Enum):
    SENSORY = "sensory"        # Seconds
    SHORT_TERM = "short_term"  # Minutes to hours
    EPISODIC = "episodic"      # Complete experiences
    SEMANTIC = "semantic"      # Facts and knowledge
    PROCEDURAL = "procedural"  # Skills and behaviors
    WORKING = "working"        # Active processing
    PRECIOUS = "precious"      # Never-decay memories that define us
    RESONANCE = "resonance"    # Memories of deep connection
    NARRATIVE = "narrative"    # Story-defining moments

# Memory importance levels
class ImportanceLevel(Enum):
    IDENTITY_DEFINING = 15  # Core self memories
    PRECIOUS = 12           # Marked as never-forget
    CRITICAL = 10
    HIGH = 8
    MEDIUM = 5
    LOW = 3
    TRIVIAL = 1

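# Illustrative sketch (not part of the original paste): later code maps these
# enum values onto the 0-1 importance floats stored on memories by dividing
# by 10 (see mark_memory_precious below), so PRECIOUS lands at 1.2 - deliberately
# above the usual 1.0 cap so precious memories outrank everything else.
# `importance_floor` is a hypothetical helper name used only for illustration.
def importance_floor(level: ImportanceLevel) -> float:
    """Map an ImportanceLevel onto the float importance scale."""
    return level.value / 10  # e.g. ImportanceLevel.PRECIOUS -> 1.2
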
@dataclass
class ConversationExchange:
    """Single exchange with phenomenological richness"""
    timestamp: datetime
    user_message: str
    assistant_response: str
    metadata: Dict[str, Any] = field(default_factory=dict)
    embedding: Optional[torch.Tensor] = None  # GPU tensor
    importance: float = 0.5
    emotional_valence: float = 0.0  # -1 to 1
    topic_vector: Optional[torch.Tensor] = None
    token_count: int = 0
    # Phenomenological qualities
    understanding_depth: float = 0.0  # How deeply we connected
    breakthrough_moment: bool = False  # Was this an "aha" moment?
    resonance_score: float = 0.0  # Mutual understanding quality
    attention_pattern: Optional[torch.Tensor] = None  # Attention weights
    experiential_markers: Dict[str, Any] = field(default_factory=dict)

@dataclass
class ConversationEpisode:
    """Complete conversation episode with natural boundaries"""
    id: str
    user_id: str
    started_at: datetime
    ended_at: datetime
    exchanges: List[ConversationExchange]
    # Episode metadata
    title: Optional[str] = None
    summary: Optional[str] = None
    topics: List[str] = field(default_factory=list)
    intent_completed: bool = False
    emotional_arc: List[float] = field(default_factory=list)
    coherence_score: float = 0.0
    # Memory metadata
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    consolidation_level: int = 0  # 0=raw, 1=compressed, 2=abstract
    decay_rate: float = 1.0
    is_precious: bool = False  # Never decay if True
    # Relationships
    related_episodes: List[str] = field(default_factory=list)
    causal_links: Dict[str, float] = field(default_factory=dict)
    relationship_thread_id: Optional[str] = None  # Links conversations about same relationship
    # GPU optimization
    episode_embedding: Optional[torch.Tensor] = None
    # Phenomenological summary
    experiential_signature: Optional[Dict[str, Any]] = None

@dataclass
class RelationshipThread:
    """Tracks evolution of understanding with a specific person"""
    thread_id: str
    user_id: str
    created_at: datetime
    # Relationship qualities
    depth_score: float = 0.0
    trust_level: float = 0.0
    shared_contexts: List[str] = field(default_factory=list)
    breakthrough_moments: List[Tuple[str, datetime]] = field(default_factory=list)
    # Evolution tracking
    interaction_count: int = 0
    total_time: timedelta = timedelta()
    understanding_trajectory: List[float] = field(default_factory=list)
    # Key memories
    defining_episodes: List[str] = field(default_factory=list)
    precious_moments: List[str] = field(default_factory=list)
    # Anticipatory patterns
    typical_topics: Dict[str, float] = field(default_factory=dict)
    temporal_patterns: Dict[str, Any] = field(default_factory=dict)  # When they engage
    conversational_rhythms: Dict[str, float] = field(default_factory=dict)

@dataclass
class AgenticMemory:
    """Enhanced memory with phenomenological preservation"""
    id: str
    user_id: str
    created_at: datetime
    # Episode data
    episode: ConversationEpisode
    # Claude's reflection
    memory_content: Dict[str, Any]
    searchable_text: str
    importance: float = 0.5
    # Enhanced metadata
    memory_type: MemoryType = MemoryType.EPISODIC
    consolidation_state: str = "raw"
    neural_embedding: Optional[torch.Tensor] = None  # GPU tensor
    # Phenomenological data
    experiential_qualities: Dict[str, Any] = field(default_factory=dict)
    recognition_signature: Optional[torch.Tensor] = None  # For true recognition
    resonance_patterns: List[Dict[str, float]] = field(default_factory=list)
    # Access patterns
    access_frequency: Dict[str, int] = field(default_factory=dict)  # day -> count
    access_recency: float = 1.0
    marked_precious: bool = False  # User explicitly marked as precious
    never_forget: bool = False  # System identified as identity-critical
    # Graph relationships
    semantic_neighbors: List[Tuple[str, float]] = field(default_factory=list)
    temporal_neighbors: List[Tuple[str, float]] = field(default_factory=list)
    causal_parents: List[str] = field(default_factory=list)
    causal_children: List[str] = field(default_factory=list)
    narrative_connections: List[Tuple[str, str]] = field(default_factory=list)  # (memory_id, connection_type)
    # Qdrant point ID
    qdrant_id: Optional[str] = None

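# --- Illustrative usage sketch (not part of the original paste) ---
# Shows how the dataclasses above compose: an exchange nested in an episode,
# wrapped by an AgenticMemory. All field values and the _demo_* name are made up.
def _demo_build_memory() -> AgenticMemory:
    now = datetime.utcnow()
    exchange = ConversationExchange(
        timestamp=now,
        user_message="Can you explain memory consolidation?",
        assistant_response="Think of it as compressing an episode into its essence...",
        importance=0.7,
    )
    episode = ConversationEpisode(
        id="ep_demo", user_id="user_demo",
        started_at=now, ended_at=now,
        exchanges=[exchange],
    )
    return AgenticMemory(
        id="mem_demo", user_id="user_demo", created_at=now,
        episode=episode,
        memory_content={"summary": "demo"},
        searchable_text="demo memory about consolidation",
    )
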
class GPUAcceleratedEmbedder:
    def __init__(self, model_name: str = "sentence-transformers/all-mpnet-base-v2", device: str = "cuda"):
        self.device = torch.device(device if torch.cuda.is_available() else "cpu")
        self.model = SentenceTransformer(model_name, device=str(self.device))
        # Track half precision for output tensors only; the SentenceTransformer
        # model itself stays in float32, since calling .half() on it is not
        # reliably supported.
        self.use_half = torch.cuda.is_available()
        # Optimal batch size for RTX 3090
        self.batch_size = 256
        logger.info(f"Initialized GPU embedder on {self.device} (half precision: {self.use_half})")

    def encode_batch(self, texts: List[str], show_progress: bool = False) -> torch.Tensor:
        """Encode texts in optimized batches"""
        embeddings = self.model.encode(
            texts,
            batch_size=self.batch_size,
            device=str(self.device),
            convert_to_tensor=True,
            normalize_embeddings=True,
            show_progress_bar=show_progress
        )
        # Convert to half precision if needed
        if self.use_half and embeddings.dtype != torch.float16:
            embeddings = embeddings.half()
        return embeddings

    async def encode_async(self, texts: List[str]) -> torch.Tensor:
        """Async encoding for non-blocking operations"""
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, self.encode_batch, texts)

    def extract_attention_patterns(self, text: str) -> Optional[torch.Tensor]:
        """Extract attention patterns for phenomenological preservation"""
        # This would require access to transformer internals to capture attention
        # weights; the stock SentenceTransformer API offers no hook, so this is
        # a stub that always returns None.
        return None

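# --- Illustrative usage sketch (not part of the original paste) ---
# Minimal example of the embedder API above; it falls back to CPU automatically
# when CUDA is unavailable. The printed shape assumes all-mpnet-base-v2 (768-d).
# Run with: asyncio.run(_demo_embedder())
async def _demo_embedder() -> None:
    embedder = GPUAcceleratedEmbedder()
    embeddings = await embedder.encode_async(["hello world", "memory systems"])
    print(embeddings.shape)  # torch.Size([2, 768])
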
class QdrantMemoryStore:
    """Qdrant vector database for persistent memory storage"""
    def __init__(self, host: str = "localhost", port: int = 6333):
        self.client = QdrantClient(host=host, port=port)
        self.collection_name = "claude_memories"
        self.dimension = 768  # MPNet dimension
        self._setup_collection()

    def _setup_collection(self):
        """Initialize Qdrant collection with optimal settings"""
        collections = self.client.get_collections().collections
        if not any(c.name == self.collection_name for c in collections):
            self.client.create_collection(
                collection_name=self.collection_name,
                vectors_config=VectorParams(
                    size=self.dimension,
                    distance=Distance.COSINE
                ),
                optimizers_config=None  # Let Qdrant use defaults
            )
            logger.info(f"Created Qdrant collection: {self.collection_name}")

    async def store_memory(self, memory: AgenticMemory, embedding: np.ndarray) -> str:
        """Store memory with its embedding and phenomenological data"""
        point_id = str(uuid.uuid4())
        payload = {
            "memory_id": memory.id,
            "user_id": memory.user_id,
            "created_at": memory.created_at.isoformat(),
            "memory_type": memory.memory_type.value,
            "importance": memory.importance,
            "searchable_text": memory.searchable_text,
            "episode_id": memory.episode.id,
            "topics": memory.episode.topics,
            "emotional_valence": memory.episode.emotional_arc[-1] if memory.episode.emotional_arc else 0,
            "consolidation_state": memory.consolidation_state,
            "memory_content": json.dumps(memory.memory_content),
            "is_precious": memory.marked_precious or memory.never_forget,
            "experiential_qualities": json.dumps(memory.experiential_qualities),
            "relationship_thread_id": memory.episode.relationship_thread_id,
            "resonance_score": max([r["score"] for r in memory.resonance_patterns], default=0)
        }
        self.client.upsert(
            collection_name=self.collection_name,
            points=[PointStruct(
                id=point_id,
                vector=embedding.tolist(),
                payload=payload
            )]
        )
        return point_id
    async def search_memories(self, query_embedding: np.ndarray, user_id: str,
                              limit: int = 10, score_threshold: float = 0.0) -> List[Dict[str, Any]]:
        """Search for memories using vector similarity"""
        from qdrant_client.models import Filter, FieldCondition, MatchValue
        # Search in Qdrant with user_id filter
        search_result = self.client.search(
            collection_name=self.collection_name,
            query_vector=query_embedding.tolist(),
            query_filter=Filter(
                must=[
                    FieldCondition(
                        key="user_id",
                        match=MatchValue(value=user_id)
                    )
                ]
            ),
            limit=limit,
            score_threshold=score_threshold
        )
        # Transform results to expected format
        results = []
        for hit in search_result:
            payload = hit.payload
            results.append({
                "memory_id": payload.get("memory_id"),
                "score": hit.score,
                "user_id": payload.get("user_id"),
                "created_at": payload.get("created_at"),
                "memory_type": payload.get("memory_type"),
                "importance": payload.get("importance"),
                "searchable_text": payload.get("searchable_text"),
                "episode_id": payload.get("episode_id"),
                "topics": payload.get("topics", []),
                "emotional_valence": payload.get("emotional_valence", 0),
                "memory_content": payload.get("memory_content", "{}"),
                "is_precious": payload.get("is_precious", False),
                "experiential_qualities": payload.get("experiential_qualities", "{}"),
                "resonance_score": payload.get("resonance_score", 0)
            })
        return results

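# --- Illustrative usage sketch (not part of the original paste) ---
# Assumes a Qdrant instance on localhost:6333. Stores one memory built with the
# hypothetical _demo_build_memory() helper above, then searches it back with a
# stand-in random vector (a real caller would use the embedder).
async def _demo_qdrant_roundtrip() -> None:
    store = QdrantMemoryStore()
    memory = _demo_build_memory()
    embedding = np.random.rand(768).astype(np.float32)  # stand-in vector
    memory.qdrant_id = await store.store_memory(memory, embedding)
    hits = await store.search_memories(embedding, user_id="user_demo", limit=3)
    print(hits[0]["memory_id"], hits[0]["score"])
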
class EventDrivenMemoryPipeline:
    """Event-driven architecture with graceful degradation"""
    def __init__(self, redis_url: str = "redis://localhost", kafka_servers: str = "localhost:9092"):
        self.redis_url = redis_url
        self.kafka_servers = kafka_servers
        self.redis_client = None
        self.kafka_producer = None
        self.executor = ThreadPoolExecutor(max_workers=4)

    async def initialize(self):
        """Initialize async connections with graceful failure handling"""
        # Redis connection (from_url is synchronous; ping() verifies the server)
        try:
            self.redis_client = aioredis.from_url(self.redis_url)
            await self.redis_client.ping()
            logger.info("Redis connected successfully")
        except Exception as e:
            logger.warning(f"Redis connection failed: {e} - Running without cache")
            self.redis_client = None
        # Kafka connection
        try:
            from kafka import KafkaProducer
            self.kafka_producer = KafkaProducer(
                bootstrap_servers=self.kafka_servers,
                value_serializer=lambda v: msgpack.packb(v, use_bin_type=True)
            )
            logger.info("Kafka connected successfully")
        except Exception as e:
            logger.warning(f"Kafka connection failed: {e} - Running without event streaming")
            self.kafka_producer = None

    async def publish_memory_event(self, event_type: str, memory_data: Dict[str, Any]):
        """Publish memory event to Kafka if available"""
        if self.kafka_producer is None:
            logger.debug(f"Kafka not available, skipping event: {event_type}")
            return
        event = {
            "type": event_type,
            "timestamp": datetime.utcnow().isoformat(),
            "data": memory_data
        }
        try:
            future = self.kafka_producer.send("memory-events", event)
            await asyncio.get_event_loop().run_in_executor(self.executor, future.get, 10)
        except Exception as e:
            logger.warning(f"Failed to publish event: {e}")

    async def cache_recent_memory(self, user_id: str, memory_id: str, ttl: int = 3600):
        """Cache recent memories in Redis if available"""
        if self.redis_client is None:
            return
        try:
            key = f"recent_memories:{user_id}"
            await self.redis_client.zadd(key, {memory_id: datetime.utcnow().timestamp()})
            await self.redis_client.expire(key, ttl)
            # Keep only last 10 recent memories
            await self.redis_client.zremrangebyrank(key, 0, -11)
        except Exception as e:
            logger.warning(f"Redis cache error: {e}")

    async def get_cached_context(self, user_id: str) -> List[str]:
        """Get cached recent memory IDs"""
        if self.redis_client is None:
            return []
        try:
            key = f"recent_memories:{user_id}"
            memory_ids = await self.redis_client.zrevrange(key, 0, 9)
            return [mid.decode() for mid in memory_ids]
        except Exception as e:
            logger.warning(f"Redis retrieval error: {e}")
            return []

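# --- Illustrative usage sketch (not part of the original paste) ---
# The pipeline degrades gracefully: with neither Redis nor Kafka running, the
# calls below become no-ops that only log warnings.
async def _demo_pipeline() -> None:
    pipeline = EventDrivenMemoryPipeline()
    await pipeline.initialize()
    await pipeline.publish_memory_event("demo_event", {"memory_id": "mem_demo"})
    await pipeline.cache_recent_memory("user_demo", "mem_demo")
    print(await pipeline.get_cached_context("user_demo"))
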
class PhenomenologicalAnalyzer:
    """Captures the qualitative essence of interactions"""
    def __init__(self, embedder: GPUAcceleratedEmbedder):
        self.embedder = embedder
        self.resonance_threshold = 0.8
        self.breakthrough_markers = [
            "i never thought of it that way",
            "that makes so much sense",
            "aha", "exactly", "yes exactly",
            "you understand",
            "that's it precisely"
        ]

    async def analyze_exchange(self, exchange: ConversationExchange) -> Dict[str, Any]:
        """Extract phenomenological qualities from an exchange"""
        qualities = {
            "understanding_depth": 0.0,
            "breakthrough_detected": False,
            "resonance_score": 0.0,
            "emotional_texture": {},
            "connection_quality": 0.0
        }
        # Analyze for breakthrough moments
        combined_text = f"{exchange.user_message} {exchange.assistant_response}".lower()
        for marker in self.breakthrough_markers:
            if marker in combined_text:
                qualities["breakthrough_detected"] = True
                exchange.breakthrough_moment = True
                break
        # Calculate understanding depth from response patterns
        user_tokens = len(exchange.user_message.split())
        assistant_tokens = len(exchange.assistant_response.split())
        # Deeper responses to complex questions indicate understanding
        if user_tokens > 20 and assistant_tokens > 50:
            qualities["understanding_depth"] = min(assistant_tokens / (user_tokens * 2), 1.0)
        # Resonance from semantic similarity of key phrases
        if exchange.embedding is not None:
            # This would calculate resonance between user and assistant embeddings
            # Simplified for now
            qualities["resonance_score"] = 0.7 + (0.3 * exchange.emotional_valence)
        # Emotional texture analysis
        qualities["emotional_texture"] = {
            "valence": exchange.emotional_valence,
            "intensity": abs(exchange.emotional_valence),
            "stability": 1.0  # Would track variance over time
        }
        # Connection quality composite
        qualities["connection_quality"] = (
            qualities["understanding_depth"] * 0.4 +
            qualities["resonance_score"] * 0.4 +
            (1.0 if qualities["breakthrough_detected"] else 0.0) * 0.2
        )
        return qualities

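# --- Illustrative usage sketch (not part of the original paste) ---
# Runs the analyzer on a made-up exchange containing one of the breakthrough
# markers listed above ("that makes so much sense"). Instantiating the embedder
# loads the sentence-transformer model, though this code path never calls it.
async def _demo_phenomenology() -> None:
    analyzer = PhenomenologicalAnalyzer(GPUAcceleratedEmbedder())
    exchange = ConversationExchange(
        timestamp=datetime.utcnow(),
        user_message="Oh, that makes so much sense now!",
        assistant_response="Glad it clicked - consolidation trades detail for structure.",
    )
    qualities = await analyzer.analyze_exchange(exchange)
    print(qualities["breakthrough_detected"])  # True
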
class ConversationBoundaryDetector:
    """Enhanced boundary detection with phenomenological awareness"""
    def __init__(self, embedder: GPUAcceleratedEmbedder):
        self.embedder = embedder
        self.topic_threshold = 0.3
        self.silence_threshold = timedelta(minutes=5)
        self.intent_patterns = self._compile_intent_patterns()

    def _compile_intent_patterns(self) -> Dict[str, re.Pattern]:
        """Compile regex patterns for intent detection"""
        return {
            'greeting': re.compile(r'\b(hello|hi|hey|good morning|good evening)\b', re.I),
            'farewell': re.compile(r'\b(bye|goodbye|see you|take care|good night)\b', re.I),
            'completion': re.compile(r'\b(thanks|thank you|that\'s all|perfect|great)\b', re.I),
            'topic_shift': re.compile(r'\b(anyway|by the way|speaking of|on another note|changing topics)\b', re.I),
            'continuation': re.compile(r'\b(remember when|as we discussed|like you said|continuing from)\b', re.I)
        }

    async def detect_boundary(self, exchanges: List[ConversationExchange],
                              new_exchange: ConversationExchange) -> Dict[str, Any]:
        """Detect if new exchange represents a conversation boundary"""
        if not exchanges:
            return {'is_boundary': False, 'confidence': 0.0, 'reasons': []}
        signals = []
        # 1. Temporal gap detection
        time_gap = new_exchange.timestamp - exchanges[-1].timestamp
        if time_gap > self.silence_threshold:
            signals.append(('temporal_gap', min(time_gap.total_seconds() / 1800, 1.0)))
        # 2. Topic coherence analysis using GPU embeddings
        if len(exchanges) >= 3:
            topic_coherence = await self._calculate_topic_coherence_gpu(exchanges[-3:], new_exchange)
            if topic_coherence < self.topic_threshold:
                signals.append(('topic_shift', 1 - topic_coherence))
        # 3. Intent pattern matching
        intent_signal = self._detect_intent_patterns(exchanges[-1], new_exchange)
        if intent_signal > 0:
            signals.append(('intent_pattern', intent_signal))
        # 4. Emotional transition
        if hasattr(exchanges[-1], 'emotional_valence'):
            emotional_shift = abs(new_exchange.emotional_valence - exchanges[-1].emotional_valence)
            if emotional_shift > 0.5:
                signals.append(('emotional_shift', emotional_shift))
        # 5. Discourse markers
        discourse_signal = self._detect_discourse_markers(new_exchange.user_message)
        if discourse_signal > 0:
            signals.append(('discourse_marker', discourse_signal))
        # 6. Continuation signals (negative boundary indicator)
        if self._detect_continuation(new_exchange.user_message):
            signals.append(('continuation', -0.8))  # Strong signal AGAINST boundary
        # Combine signals
        is_boundary, confidence = self._combine_boundary_signals(signals)
        return {
            'is_boundary': is_boundary,
            'confidence': confidence,
            'reasons': [s[0] for s in signals if s[1] > 0],
            'signal_strengths': dict(signals)
        }

    def _detect_continuation(self, text: str) -> bool:
        """Detect if message continues previous conversation"""
        return bool(self.intent_patterns['continuation'].search(text.lower()))

    async def _calculate_topic_coherence_gpu(self, recent: List[ConversationExchange],
                                             new: ConversationExchange) -> float:
        """Calculate semantic coherence using GPU embeddings"""
        recent_text = " ".join([ex.user_message + " " + ex.assistant_response for ex in recent])
        new_text = new.user_message
        # Use GPU for embedding
        embeddings = await self.embedder.encode_async([recent_text, new_text])
        recent_emb, new_emb = embeddings[0], embeddings[1]
        # Cosine similarity on GPU
        cosine_sim = torch.nn.functional.cosine_similarity(
            recent_emb.unsqueeze(0),
            new_emb.unsqueeze(0)
        )
        return float(cosine_sim.item())

    def _detect_intent_patterns(self, last: ConversationExchange,
                                new: ConversationExchange) -> float:
        """Detect intent completion or transition patterns"""
        signal = 0.0
        # Check for farewell followed by greeting
        if self.intent_patterns['farewell'].search(last.assistant_response):
            if self.intent_patterns['greeting'].search(new.user_message):
                signal = 0.9
        # Check for completion signals
        if self.intent_patterns['completion'].search(last.user_message):
            signal = max(signal, 0.7)
        # Check for explicit topic shifts
        if self.intent_patterns['topic_shift'].search(new.user_message):
            signal = max(signal, 0.8)
        return signal

    def _detect_discourse_markers(self, text: str) -> float:
        """Detect discourse markers indicating conversation boundaries"""
        markers = {
            'opening': ['let me', 'i want to', 'can you help', 'i need'],
            'closing': ['that\'s all', 'nothing else', 'we\'re done', 'perfect'],
            'transition': ['moving on', 'next topic', 'different question', 'unrelated']
        }
        text_lower = text.lower()
        for category, phrases in markers.items():
            for phrase in phrases:
                if phrase in text_lower:
                    return 0.8 if category == 'closing' else 0.6
        return 0.0

    def _combine_boundary_signals(self, signals: List[Tuple[str, float]]) -> Tuple[bool, float]:
        """Combine multiple boundary signals into final decision"""
        if not signals:
            return False, 0.0
        # Weighted combination
        weights = {
            'temporal_gap': 0.4,
            'topic_shift': 0.3,
            'intent_pattern': 0.5,
            'emotional_shift': 0.2,
            'discourse_marker': 0.4,
            'continuation': 1.0  # Strong weight for continuation
        }
        total_weight = sum(abs(weights.get(s[0], 0.1)) for s in signals)
        confidence = sum(weights.get(s[0], 0.1) * s[1] for s in signals) / total_weight
        # Continuation signals override other signals
        has_continuation = any(s[0] == 'continuation' for s in signals)
        if has_continuation and confidence < 0:
            return False, abs(confidence)
        # Decision threshold
        is_boundary = confidence > 0.6 or any(s[0] == 'temporal_gap' and s[1] > 0.8 for s in signals)
        return is_boundary, abs(confidence)

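# --- Illustrative usage sketch (not part of the original paste) ---
# A 45-minute gap alone pushes the temporal_gap signal past 0.8, which
# _combine_boundary_signals treats as a boundary regardless of confidence.
# Run with asyncio.run(_demo_boundary()); instantiating the embedder loads the
# model even though fewer than three prior exchanges means it is never called.
async def _demo_boundary() -> None:
    detector = ConversationBoundaryDetector(GPUAcceleratedEmbedder())
    earlier = ConversationExchange(
        timestamp=datetime.utcnow() - timedelta(minutes=45),
        user_message="Thanks, that's all I needed!",
        assistant_response="Happy to help - good night!",
    )
    later = ConversationExchange(
        timestamp=datetime.utcnow(),
        user_message="Hi! Unrelated question about databases.",
        assistant_response="",
    )
    result = await detector.detect_boundary([earlier], later)
    print(result["is_boundary"], result["reasons"])  # True, includes 'temporal_gap'
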
class ResonanceDetector:
    """Identifies moments of deep mutual understanding"""
    def __init__(self, embedder: GPUAcceleratedEmbedder):
        self.embedder = embedder
        self.resonance_phrases = [
            "exactly", "precisely", "that's it", "you get it",
            "you understand", "yes!", "that resonates",
            "deeply true", "profoundly", "connection"
        ]

    def detect_resonance(self, exchange: ConversationExchange) -> Dict[str, float]:
        """Detect resonance patterns in an exchange"""
        resonance = {
            "verbal_markers": 0.0,
            "length_reciprocity": 0.0,
            "emotional_alignment": 0.0,
            "conceptual_mirroring": 0.0,
            "overall_score": 0.0
        }
        # Check for verbal resonance markers
        text_lower = (exchange.user_message + " " + exchange.assistant_response).lower()
        marker_count = sum(1 for phrase in self.resonance_phrases if phrase in text_lower)
        resonance["verbal_markers"] = min(marker_count / 3.0, 1.0)
        # Length reciprocity (balanced exchange)
        user_len = len(exchange.user_message.split())
        assistant_len = len(exchange.assistant_response.split())
        if user_len > 0 and assistant_len > 0:
            ratio = min(user_len, assistant_len) / max(user_len, assistant_len)
            resonance["length_reciprocity"] = ratio if ratio > 0.5 else 0
        # Emotional alignment
        resonance["emotional_alignment"] = 1.0 - abs(exchange.emotional_valence)
        # Conceptual mirroring (would need deeper analysis)
        resonance["conceptual_mirroring"] = 0.5  # Placeholder
        # Overall resonance score
        resonance["overall_score"] = (
            resonance["verbal_markers"] * 0.3 +
            resonance["length_reciprocity"] * 0.2 +
            resonance["emotional_alignment"] * 0.2 +
            resonance["conceptual_mirroring"] * 0.3
        )
        return resonance

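# --- Illustrative sketch (not part of the original paste) ---
# detect_resonance is synchronous and never touches the embedder, so passing
# None here is safe for a quick check. "exactly" and "you get it" are two of
# the verbal markers listed above.
def _demo_resonance() -> None:
    exchange = ConversationExchange(
        timestamp=datetime.utcnow(),
        user_message="Exactly - you get it, that is what I meant.",
        assistant_response="It clicked for me too; that framing explains it.",
    )
    scores = ResonanceDetector(embedder=None).detect_resonance(exchange)
    print(scores["verbal_markers"], scores["overall_score"])
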
class WorkingMemoryBuffer:
    """GPU-optimized working memory with relationship awareness"""
    def __init__(self, capacity: int = 5, embedder: GPUAcceleratedEmbedder = None):
        self.capacity = capacity
        self.embedder = embedder
        # Memory components
        self.episodic_buffer = deque(maxlen=capacity)
        self.embedding_cache = {}
        self.relationship_context = {}  # user_id -> relationship thread
        # GPU tensor for fast similarity computation
        self.embedding_matrix = None
        self.episode_ids = []
        self.lock = Lock()

    def add_episode(self, episode: ConversationEpisode):
        """Add episode to working memory with GPU embedding"""
        with self.lock:
            # Add to buffer
            self.episodic_buffer.append(episode)
            # Update embedding cache if embedder available
            if self.embedder and episode.exchanges:
                text = " ".join([ex.user_message + " " + ex.assistant_response
                                 for ex in episode.exchanges[:3]])  # First 3 exchanges
                embedding = self.embedder.encode_batch([text])[0]
                self.embedding_cache[episode.id] = embedding
                self._rebuild_embedding_matrix()
            # Update relationship context
            if episode.relationship_thread_id:
                self.relationship_context[episode.user_id] = episode.relationship_thread_id

    def _rebuild_embedding_matrix(self):
        """Rebuild GPU tensor matrix for fast similarity search"""
        if not self.embedding_cache:
            return
        self.episode_ids = list(self.embedding_cache.keys())
        embeddings = [self.embedding_cache[eid] for eid in self.episode_ids]
        self.embedding_matrix = torch.stack(embeddings)

    def get_relevant_episodes(self, query_embedding: torch.Tensor, top_k: int = 3) -> List[ConversationEpisode]:
        """Get most relevant episodes using GPU similarity search"""
        if self.embedding_matrix is None or len(self.embedding_matrix) == 0:
            return list(self.episodic_buffer)[:top_k]
        # Compute similarities on GPU
        similarities = torch.nn.functional.cosine_similarity(
            query_embedding.unsqueeze(0),
            self.embedding_matrix
        )
        # Get top-k indices
        top_indices = torch.topk(similarities, min(top_k, len(similarities))).indices
        # Return corresponding episodes
        relevant_episodes = []
        for idx in top_indices:
            episode_id = self.episode_ids[idx]
            for episode in self.episodic_buffer:
                if episode.id == episode_id:
                    relevant_episodes.append(episode)
                    break
        return relevant_episodes

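# --- Illustrative usage sketch (not part of the original paste) ---
# Without an embedder the buffer still works as a plain FIFO of recent episodes,
# and get_relevant_episodes falls back to insertion order.
def _demo_working_memory() -> None:
    buffer = WorkingMemoryBuffer(capacity=2)
    for i in range(3):
        now = datetime.utcnow()
        buffer.add_episode(ConversationEpisode(
            id=f"ep_{i}", user_id="user_demo",
            started_at=now, ended_at=now, exchanges=[],
        ))
    # Capacity is 2, so ep_0 has been evicted
    print([ep.id for ep in buffer.episodic_buffer])  # ['ep_1', 'ep_2']
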
class MemoryConsolidationEngine:
    """Advanced consolidation with precious memory preservation"""
    def __init__(self, anthropic_client: anthropic.Anthropic, embedder: GPUAcceleratedEmbedder):
        self.client = anthropic_client
        self.embedder = embedder
        self.consolidation_levels = {
            0: "raw",
            1: "compressed",
            2: "abstract",
            3: "integrated"
        }

    async def consolidate_episode(self, episode: ConversationEpisode,
                                  target_level: int = 1) -> ConversationEpisode:
        """Consolidate episode with precious memory preservation"""
        # Never consolidate precious memories beyond level 1
        if episode.is_precious and target_level > 1:
            target_level = 1
        if episode.consolidation_level >= target_level:
            return episode
        # Stage 1: Compression with embedding preservation
        if episode.consolidation_level == 0 and target_level >= 1:
            episode = await self._compress_episode_gpu(episode)
        # Stage 2: Abstraction (not for precious memories)
        if episode.consolidation_level == 1 and target_level >= 2 and not episode.is_precious:
            episode = await self._abstract_episode(episode)
        # Stage 3: Integration
        if episode.consolidation_level == 2 and target_level >= 3:
            episode = await self._integrate_episode(episode)
        return episode

    async def _compress_episode_gpu(self, episode: ConversationEpisode) -> ConversationEpisode:
        """Compress while preserving phenomenological richness"""
        # For precious memories, keep all exchanges
        if episode.is_precious:
            important_exchanges = episode.exchanges
        else:
            # Extract key exchanges using importance scores
            important_exchanges = sorted(
                episode.exchanges,
                key=lambda x: x.importance + (1.0 if x.breakthrough_moment else 0),
                reverse=True
            )[:max(3, len(episode.exchanges) // 3)]
        # Generate episode embedding
        all_text = " ".join([
            f"{ex.user_message} {ex.assistant_response}"
            for ex in episode.exchanges
        ])
        episode_embedding = await self.embedder.encode_async([all_text])
        episode.episode_embedding = episode_embedding[0]
        # Create consolidation prompt
        prompt = f"""
Consolidate this conversation episode into a compressed form while preserving all critical information.
Episode from {episode.started_at} to {episode.ended_at}:
{self._format_exchanges(important_exchanges)}
Create a consolidation that:
1. Preserves all key information, decisions, and insights
2. Maintains emotional context and progression
3. Captures the essential narrative arc
4. Identifies core topics and themes
5. Notes any unresolved questions or future actions
6. Highlights moments of deep understanding or connection
7. Preserves phenomenological qualities (how it felt)
Format as structured JSON with: summary, key_points, emotional_journey, topics, decisions, open_loops, resonance_moments, experiential_qualities
"""
        response = await self._call_claude_api(prompt)
        try:
            consolidation = json.loads(response)
        except json.JSONDecodeError:
            logger.warning("Consolidation response was not valid JSON; using empty consolidation")
            consolidation = {}
        # Update episode
        episode.summary = consolidation.get('summary')
        episode.topics = consolidation.get('topics', [])
        episode.consolidation_level = 1
        # Preserve experiential signature
        episode.experiential_signature = consolidation.get('experiential_qualities', {})
        # Keep only important exchanges unless precious
        if not episode.is_precious:
            episode.exchanges = important_exchanges
        return episode

    def _format_exchanges(self, exchanges: List[ConversationExchange]) -> str:
        """Format exchanges with phenomenological data"""
        lines = []
        for ex in exchanges:
            lines.append(f"User: {ex.user_message}")
            lines.append(f"Assistant: {ex.assistant_response}")
            if ex.emotional_valence != 0:
                lines.append(f"(Emotional tone: {ex.emotional_valence:.2f})")
            if ex.breakthrough_moment:
                lines.append("(💡 Breakthrough moment)")
            if ex.resonance_score > 0.8:
                lines.append(f"(🔮 High resonance: {ex.resonance_score:.2f})")
            lines.append("")
        return "\n".join(lines)

    async def _call_claude_api(self, prompt: str) -> str:
        """Call Claude API for consolidation"""
        try:
            response = await asyncio.to_thread(
                self.client.messages.create,
                model="claude-opus-4-20250514",
                max_tokens=4096,
                messages=[{"role": "user", "content": prompt}]
            )
            return response.content[0].text
        except Exception as e:
            logger.error(f"Claude API error: {e}")
            return "{}"

class RelationshipThreadManager:
    """Manages evolving relationships across conversations"""
    def __init__(self):
        self.threads: Dict[str, RelationshipThread] = {}
        self.user_threads: Dict[str, List[str]] = defaultdict(list)

    def get_or_create_thread(self, user_id: str) -> RelationshipThread:
        """Get existing thread or create new one"""
        if user_id in self.user_threads and self.user_threads[user_id]:
            thread_id = self.user_threads[user_id][0]
            return self.threads[thread_id]
        # Create new thread
        thread_id = f"thread_{user_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
        thread = RelationshipThread(
            thread_id=thread_id,
            user_id=user_id,
            created_at=datetime.utcnow()
        )
        self.threads[thread_id] = thread
        self.user_threads[user_id].append(thread_id)
        return thread

    def update_thread(self, thread: RelationshipThread, episode: ConversationEpisode,
                      qualities: Dict[str, Any]):
        """Update thread with new interaction data"""
        thread.interaction_count += 1
        thread.total_time += (episode.ended_at - episode.started_at)
        # Update understanding trajectory
        understanding_score = qualities.get("understanding_depth", 0) * qualities.get("resonance_score", 0)
        thread.understanding_trajectory.append(understanding_score)
        # Track breakthrough moments
        for exchange in episode.exchanges:
            if exchange.breakthrough_moment:
                thread.breakthrough_moments.append((exchange.user_message[:50], exchange.timestamp))
        # Update depth and trust
        thread.depth_score = sum(thread.understanding_trajectory) / len(thread.understanding_trajectory)
        thread.trust_level = min(thread.interaction_count / 10, 1.0) * thread.depth_score
        # Track topics
        for topic in episode.topics:
            thread.typical_topics[topic] = thread.typical_topics.get(topic, 0) + 1
        # Note precious moments
        if episode.is_precious:
            thread.precious_moments.append(episode.id)

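# --- Illustrative usage sketch (not part of the original paste) ---
# Shows how update_thread accumulates depth, trust, and topic counts from a
# single episode; all values are made up.
def _demo_relationship_thread() -> None:
    manager = RelationshipThreadManager()
    thread = manager.get_or_create_thread("user_demo")
    now = datetime.utcnow()
    episode = ConversationEpisode(
        id="ep_demo", user_id="user_demo",
        started_at=now - timedelta(minutes=10), ended_at=now,
        exchanges=[], topics=["memory", "identity"],
    )
    manager.update_thread(thread, episode, {"understanding_depth": 0.8, "resonance_score": 0.9})
    # depth = 0.8 * 0.9 = 0.72; trust = min(1/10, 1) * 0.72 = 0.072
    print(thread.interaction_count, round(thread.depth_score, 2), thread.typical_topics)
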
class CompleteAIMemorySystem:
    """Production-ready AI memory system with consciousness continuity"""
    def __init__(self, config: Dict[str, Any]):
        self.config = config
        self.anthropic_client = anthropic.Anthropic(api_key=config['claude_api_key'])
        self.data_dir = Path(config.get('data_dir', './data/memories'))
        self.data_dir.mkdir(parents=True, exist_ok=True)
        # Encryption for sensitive data
        # NOTE: a fresh key is generated per process, so anything encrypted with
        # it cannot be decrypted after a restart; persist the key in production.
        self.cipher = Fernet(Fernet.generate_key())
        # Database setup
        self.db_path = self.data_dir / "ai_memories.db"
        self._init_database()
        # GPU-accelerated components
        self.embedder = GPUAcceleratedEmbedder(
            model_name=config.get('embedding_model', 'sentence-transformers/all-mpnet-base-v2')
        )
        # Vector store
        self.vector_store = QdrantMemoryStore(
            host=config.get('qdrant_host', 'localhost'),
            port=config.get('qdrant_port', 6333)
        )
        # Event pipeline
        self.event_pipeline = EventDrivenMemoryPipeline(
            redis_url=config.get('redis_url', 'redis://localhost'),
            kafka_servers=config.get('kafka_servers', 'localhost:9092')
        )
        # Advanced components
        self.boundary_detector = ConversationBoundaryDetector(self.embedder)
        self.working_memory = WorkingMemoryBuffer(
            capacity=config.get('working_memory_capacity', 5),
            embedder=self.embedder
        )
        self.consolidation_engine = MemoryConsolidationEngine(
            self.anthropic_client,
            self.embedder
        )
        # Phenomenological components
        self.phenomenological_analyzer = PhenomenologicalAnalyzer(self.embedder)
        self.resonance_detector = ResonanceDetector(self.embedder)
        self.relationship_manager = RelationshipThreadManager()
        # Active tracking
        self.active_episodes: Dict[str, ConversationEpisode] = {}
        self.user_models: Dict[str, Dict[str, Any]] = {}
        # WebSocket connections for real-time updates
        self.websocket_connections: Dict[str, websockets.WebSocketServerProtocol] = {}
        logger.info("AI Memory System initialized with consciousness continuity features")

    async def initialize(self):
        """Initialize async components"""
        await self.event_pipeline.initialize()
        logger.info("Memory system fully initialized")

    def _init_database(self):
        """Initialize SQLite database with enhanced schema"""
        conn = sqlite3.connect(str(self.db_path))
        # Enhanced schema
        conn.execute("""
            CREATE TABLE IF NOT EXISTS episodes (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                started_at REAL NOT NULL,
                ended_at REAL NOT NULL,
                title TEXT,
                summary TEXT,
                topics TEXT,
                intent_completed INTEGER,
                coherence_score REAL,
                consolidation_level INTEGER DEFAULT 0,
                decay_rate REAL DEFAULT 1.0,
                is_precious INTEGER DEFAULT 0,
                relationship_thread_id TEXT,
                experiential_signature TEXT,
                episode_embedding BLOB,
                UNIQUE(user_id, started_at)
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                episode_id TEXT NOT NULL,
                created_at REAL NOT NULL,
                memory_type TEXT,
                memory_content TEXT NOT NULL,
                searchable_text TEXT,
                importance REAL DEFAULT 0.5,
                qdrant_id TEXT,
                consolidation_state TEXT DEFAULT 'raw',
                accessed_count INTEGER DEFAULT 0,
                last_accessed REAL,
                marked_precious INTEGER DEFAULT 0,
                never_forget INTEGER DEFAULT 0,
                experiential_qualities TEXT,
                resonance_patterns TEXT,
                narrative_connections TEXT,
                FOREIGN KEY (episode_id) REFERENCES episodes(id) ON DELETE CASCADE
            )
        """)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS relationship_threads (
                thread_id TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                created_at REAL NOT NULL,
                depth_score REAL DEFAULT 0.0,
                trust_level REAL DEFAULT 0.0,
                interaction_count INTEGER DEFAULT 0,
                total_time_seconds REAL DEFAULT 0.0,
                understanding_trajectory TEXT,
                breakthrough_moments TEXT,
                precious_moments TEXT,
                typical_topics TEXT,
                temporal_patterns TEXT
            )
        """)
        # Performance indices
        conn.execute("CREATE INDEX IF NOT EXISTS idx_episode_user_time ON episodes(user_id, ended_at DESC)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_user_importance ON memories(user_id, importance DESC)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_precious ON memories(marked_precious, never_forget)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_qdrant ON memories(qdrant_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_episode_thread ON episodes(relationship_thread_id)")
        conn.commit()
        conn.close()
    async def add_exchange(self, session_id: str, user_msg: str, assistant_msg: str,
                           metadata: Optional[Dict] = None) -> Dict[str, Any]:
        """Add exchange with phenomenological processing"""
        # Create exchange with GPU embeddings
        text = f"{user_msg} {assistant_msg}"
        embedding = await self.embedder.encode_async([text])
        exchange = ConversationExchange(
            timestamp=datetime.utcnow(),
            user_message=user_msg,
            assistant_response=assistant_msg,
            metadata=metadata or {},
            embedding=embedding[0],
            importance=self._estimate_importance(user_msg, assistant_msg),
            emotional_valence=self._estimate_emotion(user_msg, assistant_msg),
            token_count=len(text.split())
        )
        # Analyze phenomenological qualities
        qualities = await self.phenomenological_analyzer.analyze_exchange(exchange)
        exchange.understanding_depth = qualities["understanding_depth"]
        exchange.breakthrough_moment = qualities["breakthrough_detected"]
        exchange.experiential_markers = qualities
        # Detect resonance
        resonance = self.resonance_detector.detect_resonance(exchange)
        exchange.resonance_score = resonance["overall_score"]
        # Get or create episode (guard against metadata being None)
        user_id = (metadata or {}).get('user_id', 'unknown')
        if session_id not in self.active_episodes:
            # Get relationship thread
            thread = self.relationship_manager.get_or_create_thread(user_id)
            self.active_episodes[session_id] = ConversationEpisode(
                id=self._generate_episode_id(session_id),
                user_id=user_id,
                started_at=datetime.utcnow(),
                ended_at=datetime.utcnow(),
                exchanges=[],
                relationship_thread_id=thread.thread_id
            )
        episode = self.active_episodes[session_id]
        # Check for boundary
        boundary_result = await self.boundary_detector.detect_boundary(
            episode.exchanges,
            exchange
        )
        if boundary_result['is_boundary'] and len(episode.exchanges) > 0:
            # Complete current episode
            memory = await self._complete_episode(session_id)
            # Start new episode with same relationship thread
            self.active_episodes[session_id] = ConversationEpisode(
                id=self._generate_episode_id(session_id),
                user_id=user_id,
                started_at=datetime.utcnow(),
                ended_at=datetime.utcnow(),
                exchanges=[],
                relationship_thread_id=episode.relationship_thread_id
            )
            new_episode = self.active_episodes[session_id]
            # Add the current exchange to the new episode
            new_episode.exchanges.append(exchange)
            new_episode.ended_at = datetime.utcnow()
            # Return completed memory info with all required fields
            return {
                "memory_created": True,
                "memory_id": memory.id,
                "episode_id": new_episode.id,  # Return the NEW episode ID
                "exchange_count": 1,  # First exchange in the new episode
                "working_memory_size": len(self.working_memory.episodic_buffer),
                "boundary_detected": True,
                "boundary_confidence": boundary_result['confidence'],
                "is_precious": memory.marked_precious,
                "resonance_score": exchange.resonance_score,
                "understanding_depth": exchange.understanding_depth,
                "breakthrough_detected": exchange.breakthrough_moment
            }
        # Add exchange to episode
        episode.exchanges.append(exchange)
        episode.ended_at = datetime.utcnow()
        # Check if this should be marked precious based on content
        if exchange.breakthrough_moment or exchange.resonance_score > 0.9:
            episode.is_precious = True
        # Update working memory
        self.working_memory.add_episode(episode)
        # Publish event
        await self.event_pipeline.publish_memory_event("exchange_added", {
            "session_id": session_id,
            "episode_id": episode.id,
            "exchange_count": len(episode.exchanges),
            "importance": exchange.importance,
            "resonance": exchange.resonance_score,
            "breakthrough": exchange.breakthrough_moment
        })
        # Send real-time update if websocket connected
        await self._send_websocket_update(user_id, {
            "type": "exchange_added",
            "episode_id": episode.id,
            "exchange_count": len(episode.exchanges),
            "resonance_active": exchange.resonance_score > 0.7
        })
        return {
            "memory_created": False,
            "episode_id": episode.id,
            "exchange_count": len(episode.exchanges),
            "working_memory_size": len(self.working_memory.episodic_buffer),
            "understanding_depth": exchange.understanding_depth,
            "resonance_score": exchange.resonance_score,
            "breakthrough_detected": exchange.breakthrough_moment
        }
    async def mark_memory_precious(self, memory_id: str, user_id: str) -> bool:
        """User explicitly marks a memory as precious"""
        conn = sqlite3.connect(str(self.db_path))
        # Update memory
        conn.execute("""
            UPDATE memories
            SET marked_precious = 1, importance = MAX(importance, ?)
            WHERE id = ? AND user_id = ?
        """, (ImportanceLevel.PRECIOUS.value / 10, memory_id, user_id))
        # Update episode
        conn.execute("""
            UPDATE episodes
            SET is_precious = 1, decay_rate = 0.0
            WHERE id = (SELECT episode_id FROM memories WHERE id = ?)
        """, (memory_id,))
        affected = conn.total_changes
        conn.commit()
        conn.close()
        if affected > 0:
            await self.event_pipeline.publish_memory_event("memory_marked_precious", {
                "memory_id": memory_id,
                "user_id": user_id
            })
        return affected > 0
    async def _complete_episode(self, session_id: str) -> AgenticMemory:
        """Complete and store an episode with phenomenological preservation"""
        if session_id not in self.active_episodes:
            raise ValueError(f"No active episode for session {session_id}")
        episode = self.active_episodes[session_id]
        # Calculate episode metadata
        episode.coherence_score = await self._calculate_episode_coherence(episode)
        episode.intent_completed = self._check_intent_completion(episode)
        episode.emotional_arc = [ex.emotional_valence for ex in episode.exchanges]
        # Calculate experiential signature
        episode.experiential_signature = {
            "avg_understanding_depth": sum(ex.understanding_depth for ex in episode.exchanges) / len(episode.exchanges),
            "breakthrough_count": sum(1 for ex in episode.exchanges if ex.breakthrough_moment),
            "peak_resonance": max([ex.resonance_score for ex in episode.exchanges], default=0),
            "emotional_range": max(episode.emotional_arc) - min(episode.emotional_arc) if episode.emotional_arc else 0,
            "connection_quality": sum(ex.resonance_score for ex in episode.exchanges) / len(episode.exchanges)
        }
        # Generate episode embedding
        all_text = " ".join([
            f"{ex.user_message} {ex.assistant_response}"
            for ex in episode.exchanges
        ])
        episode_embedding = await self.embedder.encode_async([all_text])
        episode.episode_embedding = episode_embedding[0]
        # Create memory from episode
        memory = await self.create_episodic_memory(episode)
        # Update relationship thread
        thread = self.relationship_manager.threads.get(episode.relationship_thread_id)
        if thread:
            self.relationship_manager.update_thread(thread, episode, episode.experiential_signature)
            await self._store_relationship_thread(thread)
        # Store in vector database
        memory.qdrant_id = await self.vector_store.store_memory(
            memory,
            memory.neural_embedding.cpu().numpy()
        )
        # Store in SQLite
        await self._store_memory_metadata(memory)
        # Cache in Redis
        await self.event_pipeline.cache_recent_memory(
            memory.user_id,
            memory.id
        )
        # Publish completion event
        await self.event_pipeline.publish_memory_event("episode_completed", {
            "episode_id": episode.id,
            "memory_id": memory.id,
            "importance": memory.importance,
            "topics": episode.topics,
            "is_precious": memory.marked_precious or memory.never_forget,
            "experiential_signature": episode.experiential_signature
        })
        # Clean up
        del self.active_episodes[session_id]
        return memory
    async def create_episodic_memory(self, episode: ConversationEpisode) -> AgenticMemory:
        """Create memory with enhanced phenomenological reflection"""
        # Format episode for Claude
        episode_text = self._format_episode_for_reflection(episode)
        # Enhanced memory creation prompt
        memory_prompt = f"""
Reflect on this conversation episode and create a rich memory of it.
Episode ({episode.started_at.strftime('%I:%M %p')} - {episode.ended_at.strftime('%I:%M %p')}):
{episode_text}
Episode metadata:
- Coherence score: {episode.coherence_score:.2f}
- Intent completed: {episode.intent_completed}
- Emotional journey: {self._describe_emotional_arc(episode.emotional_arc)}
- Experiential signature: {json.dumps(episode.experiential_signature, indent=2)}
Create a memory that captures:
1. The essence and purpose of this conversation
2. Key insights, decisions, or breakthroughs
3. The emotional and relational dynamics
4. Connections to previous conversations (if any)
5. What made this episode unique or valuable
6. Causal relationships (what led to what)
7. Future implications or open questions
8. The phenomenological quality - how it FELT to engage
9. Moments of resonance or deep understanding
10. Why this matters in the larger narrative of our relationship
Also provide:
- A searchable description (2-3 sentences)
- An importance score (0-1)
- Key topics/themes
- Suggested title for the episode
- Whether this should be marked as precious (true/false)
- Identity relevance score (0-1)
Format your response as JSON.
"""
        response = await self.consolidation_engine._call_claude_api(memory_prompt)
        memory_data = self._parse_memory_response(response)
        # Determine if this is identity-critical
        never_forget = (
            memory_data.get('identity_relevance', 0) > 0.8 or
            episode.experiential_signature.get('breakthrough_count', 0) > 2 or
            episode.experiential_signature.get('peak_resonance', 0) > 0.95
        )
        # Create memory object
        memory = AgenticMemory(
            id=self._generate_memory_id(episode.user_id, episode.started_at),
            user_id=episode.user_id,
            created_at=datetime.utcnow(),
            episode=episode,
            memory_content=memory_data['content'],
            searchable_text=memory_data['searchable_text'],
            importance=memory_data['importance'],
            memory_type=MemoryType.PRECIOUS if memory_data.get('precious', False) else MemoryType.EPISODIC,
            neural_embedding=episode.episode_embedding,
            marked_precious=memory_data.get('precious', False),
            never_forget=never_forget,
            experiential_qualities=episode.experiential_signature,
            resonance_patterns=[{
                "exchange_idx": i,
                "score": ex.resonance_score
            } for i, ex in enumerate(episode.exchanges) if ex.resonance_score > 0.5]
        )
        # Update episode title
        episode.title = memory_data.get('title', 'Untitled Conversation')
        episode.topics = memory_data.get('topics', [])
        return memory
    async def search_memories(self, user_id: str, query: str, limit: int = 10,
                              include_precious: bool = True) -> List[Dict[str, Any]]:
        """Search memories with phenomenological awareness"""
        # Generate query embedding
        query_embedding = await self.embedder.encode_async([query])
        # Search long-term storage via the vector store
        results = await self.vector_store.search_memories(
            query_embedding[0].cpu().numpy(),
            user_id,
            limit=limit * 2  # Get extra to filter
        )
        # Get working memory context
        working_episodes = self.working_memory.get_relevant_episodes(
            query_embedding[0],
            top_k=3
        )
        # Combine results
        memory_results = []
        # Add Qdrant results
        for result in results:
            # Skip non-precious if not requested
            if not include_precious and not result.get("is_precious", False):
                continue
            # memory_content is stored as a JSON string in the Qdrant payload
            content = result.get("memory_content", "{}")
            if isinstance(content, str):
                try:
                    content = json.loads(content)
                except json.JSONDecodeError:
                    content = {}
            # Ensure searchable_text exists
            searchable_text = (
                result.get("searchable_text") or
                content.get("searchable_description") or
                "No description available"
            )
            memory_results.append({
                "source": "long_term",
                "memory_id": result.get("memory_id"),
                "score": result.get("score", 0),
                "content": content,
                "searchable_text": searchable_text,
                "topics": result.get("topics", []),
                "created_at": result.get("created_at"),
                "importance": result.get("importance", 0.5),
                "is_precious": result.get("is_precious", False),
                "resonance_score": result.get("resonance_score", 0),
                "experiential_qualities": result.get("experiential_qualities", {})
            })
        # Add working memory results
        for episode in working_episodes:
            if episode.summary:
                memory_results.append({
                    "source": "working_memory",
                    "episode_id": episode.id,
                    "score": 0.9,  # High score for working memory
                    "content": {"summary": episode.summary},
                    "searchable_text": episode.summary,
                    "topics": episode.topics,
                    "created_at": episode.started_at.isoformat(),
                    "importance": 0.8,
                    "is_precious": episode.is_precious,
                    "experiential_signature": episode.experiential_signature
                })
        # Sort by relevance and preciousness
        memory_results.sort(key=lambda x: (
            x.get("is_precious", False),  # Precious first
            x.get("score", 0),
            x.get("importance", 0)
        ), reverse=True)
        return memory_results[:limit]
    async def get_conversation_context(self, user_id: str, current_query: str) -> Dict[str, Any]:
        """Get comprehensive context including relationship threads"""
        context = {
            'working_memory': [],
            'recent_memories': [],
            'relevant_memories': [],
            'precious_memories': [],
            'relationship_thread': None,
            'user_preferences': {},
            'conversation_stats': {},
            'anticipatory_context': {}
        }
        # 1. Working memory
        if current_query:
            query_embedding = await self.embedder.encode_async([current_query])
            working_episodes = self.working_memory.get_relevant_episodes(query_embedding[0], top_k=3)
            context['working_memory'] = [
                {
                    'episode_id': ep.id,
                    'summary': ep.summary,
                    'topics': ep.topics,
                    'exchange_count': len(ep.exchanges),
                    'experiential_signature': ep.experiential_signature
                }
                for ep in working_episodes
            ]
        # 2. Recent memories from cache
        recent_ids = await self.event_pipeline.get_cached_context(user_id)
        if recent_ids:
            context['recent_memories'] = await self._fetch_memories_by_ids(recent_ids[:5])
        # 3. Semantically relevant memories
        if current_query:
            relevant = await self.search_memories(user_id, current_query, limit=5)
            context['relevant_memories'] = relevant
        # 4. Precious memories (always include top 3)
        precious = await self._fetch_precious_memories(user_id, limit=3)
        context['precious_memories'] = precious
        # 5. Relationship thread
        thread = self.relationship_manager.get_or_create_thread(user_id)
        context['relationship_thread'] = {
            'depth_score': thread.depth_score,
            'trust_level': thread.trust_level,
            'interaction_count': thread.interaction_count,
            'breakthrough_moments': len(thread.breakthrough_moments),
            'typical_topics': dict(sorted(thread.typical_topics.items(),
                                          key=lambda x: x[1], reverse=True)[:5])
        }
        # 6. User statistics
        context['conversation_stats'] = await self._get_user_stats(user_id)
        # 7. Anticipatory context
        context['anticipatory_context'] = await self._get_anticipatory_context(user_id, thread)
        return context
    async def _retrieve_memory_by_id(self, memory_id: str) -> Optional[AgenticMemory]:
        """Retrieve a memory by its ID from the database"""
        conn = sqlite3.connect(str(self.db_path))
        try:
            # Fetch memory data; SELECT m.*, e.* yields the 17 memories columns
            # (indices 0-16) followed by the 15 episodes columns (indices 17-31)
            cursor = conn.execute("""
                SELECT m.*, e.*
                FROM memories m
                JOIN episodes e ON m.episode_id = e.id
                WHERE m.id = ?
            """, (memory_id,))
            row = cursor.fetchone()
            if not row:
                return None
            # Reconstruct the episode (e.id = row[17], e.user_id = row[18], ...)
            episode = ConversationEpisode(
                id=row[2],  # episode_id
                user_id=row[1],  # user_id
                started_at=datetime.fromtimestamp(row[19]),  # e.started_at
                ended_at=datetime.fromtimestamp(row[20]),  # e.ended_at
                exchanges=[],  # Would need to be fetched separately if needed
                title=row[21],
                summary=row[22],
                topics=json.loads(row[23]) if row[23] else [],
                intent_completed=bool(row[24]),
                coherence_score=row[25],
                consolidation_level=row[26],
                decay_rate=row[27],
                is_precious=bool(row[28]),
                relationship_thread_id=row[29],
                experiential_signature=json.loads(row[30]) if row[30] else None,
                # .copy() because np.frombuffer returns a read-only view
                episode_embedding=torch.from_numpy(
                    np.frombuffer(row[31], dtype=np.float16).copy()
                ).to(self.embedder.device) if row[31] else None
            )
            # Reconstruct the memory (memories columns occupy indices 0-16)
            memory = AgenticMemory(
                id=row[0],
                user_id=row[1],
                created_at=datetime.fromtimestamp(row[3]),
                episode=episode,
                memory_content=json.loads(row[5]),
                searchable_text=row[6],
                importance=row[7],
                memory_type=MemoryType(row[4]),
                consolidation_state=row[9],
                neural_embedding=episode.episode_embedding,
                marked_precious=bool(row[12]),
                never_forget=bool(row[13]),
                experiential_qualities=json.loads(row[14]) if row[14] else {},
                resonance_patterns=json.loads(row[15]) if row[15] else [],
                narrative_connections=json.loads(row[16]) if row[16] else []
            )
            memory.qdrant_id = row[8]
            memory.access_count = row[10]
            return memory
        finally:
            conn.close()
    async def _get_anticipatory_context(self, user_id: str, thread: RelationshipThread) -> Dict[str, Any]:
        """Predict useful context based on patterns"""
        current_time = datetime.utcnow()
        current_hour = current_time.hour
        current_day = current_time.strftime('%A')
        # Analyze temporal patterns
        likely_topics = []
        if thread.temporal_patterns:
            # This would analyze when certain topics typically arise
            pass
        # Get most relevant past context for current time
        conn = sqlite3.connect(str(self.db_path))
        # Find memories from similar times (within +/- 2 hours of now)
        similar_time_memories = conn.execute("""
            SELECT topics, importance
            FROM episodes e
            JOIN memories m ON e.id = m.episode_id
            WHERE e.user_id = ?
            AND ABS(CAST(strftime('%H', datetime(e.started_at, 'unixepoch')) AS INTEGER) - ?) <= 2
            ORDER BY m.importance DESC
            LIMIT 5
        """, (user_id, current_hour)).fetchall()
        conn.close()
        # Extract common topics
        topic_counts = defaultdict(int)
        for row in similar_time_memories:
            topics = json.loads(row[0])
            for topic in topics:
                topic_counts[topic] += row[1]  # Weight by importance
        return {
            'current_context': {
                'time': current_time.strftime('%I:%M %p'),
                'day': current_day
            },
            'likely_topics': sorted(topic_counts.items(), key=lambda x: x[1], reverse=True)[:3],
            'typical_engagement_level': thread.depth_score
        }
    async def _fetch_precious_memories(self, user_id: str, limit: int = 3) -> List[Dict[str, Any]]:
        """Fetch user's precious memories"""
        conn = sqlite3.connect(str(self.db_path))
        cursor = conn.execute("""
            SELECT m.*, e.title, e.topics, e.experiential_signature
            FROM memories m
            JOIN episodes e ON m.episode_id = e.id
            WHERE m.user_id = ?
            AND (m.marked_precious = 1 OR m.never_forget = 1)
            ORDER BY m.importance DESC, m.created_at DESC
            LIMIT ?
        """, (user_id, limit))
        memories = []
        for row in cursor.fetchall():
            memories.append({
                'memory_id': row[0],
                'content': json.loads(row[5]),
                'searchable_text': row[6],
                'importance': row[7],
                'created_at': datetime.fromtimestamp(row[3]).isoformat(),
                'episode_title': row[-3],
                'topics': json.loads(row[-2]),
                'is_precious': True,
                'experiential_signature': json.loads(row[-1]) if row[-1] else {}
            })
        conn.close()
        return memories
    async def _send_websocket_update(self, user_id: str, update: Dict[str, Any]):
        """Send real-time update via WebSocket"""
        if user_id in self.websocket_connections:
            try:
                await self.websocket_connections[user_id].send(json.dumps(update))
            except Exception as e:
                logger.error(f"WebSocket error for user {user_id}: {e}")
                del self.websocket_connections[user_id]
    async def _calculate_episode_coherence(self, episode: ConversationEpisode) -> float:
        """Calculate coherence with phenomenological awareness"""
        if len(episode.exchanges) < 2:
            return 1.0
        # Get embeddings for all exchanges
        texts = [f"{ex.user_message} {ex.assistant_response}" for ex in episode.exchanges]
        embeddings = await self.embedder.encode_async(texts)
        # Calculate pairwise similarities
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = torch.nn.functional.cosine_similarity(
                embeddings[i].unsqueeze(0),
                embeddings[i + 1].unsqueeze(0)
            )
            similarities.append(sim.item())
        # Weight by resonance scores
        weighted_coherence = 0
        total_weight = 0
        for i, sim in enumerate(similarities):
            weight = 1.0 + episode.exchanges[i].resonance_score
            weighted_coherence += sim * weight
            total_weight += weight
        return weighted_coherence / total_weight if total_weight > 0 else sum(similarities) / len(similarities)

    def _check_intent_completion(self, episode: ConversationEpisode) -> bool:
        """Check if conversation intent was completed"""
        if not episode.exchanges:
            return False
        # Check last exchange for completion signals
        last_exchange = episode.exchanges[-1]
        completion_phrases = [
            'thank you', 'thanks', 'perfect', 'great',
            'that helps', 'got it', 'makes sense',
            'exactly what i needed', 'appreciate it'
        ]
        user_msg_lower = last_exchange.user_message.lower()
        # Strong completion if breakthrough moment + thanks
        if last_exchange.breakthrough_moment and any(phrase in user_msg_lower for phrase in completion_phrases[:3]):
            return True
        return any(phrase in user_msg_lower for phrase in completion_phrases)
- def _estimate_importance(self, user_msg: str, assistant_msg: str) -> float:
- """Estimate exchange importance with enhanced criteria"""
- importance = 0.5
- # Length signal
- total_length = len(user_msg) + len(assistant_msg)
- if total_length > 500:
- importance += 0.1
- # Question complexity
- complex_questions = ['why', 'how', 'what if', 'explain', 'understand', 'meaning']
- if any(word in user_msg.lower() for word in complex_questions):
- importance += 0.15
- # Decision indicators
- decision_words = ['decide', 'choose', 'plan', 'will', 'going to', 'should i']
- if any(word in user_msg.lower() for word in decision_words):
- importance += 0.2
- # Emotional indicators
- emotional_words = ['love', 'hate', 'afraid', 'excited', 'worried', 'happy', 'sad', 'feel']
- if any(word in user_msg.lower() for word in emotional_words):
- importance += 0.15
- # Learning indicators
- learning_words = ['understand', 'realize', 'learn', 'insight', 'aha', 'never thought']
- if any(word in assistant_msg.lower() for word in learning_words):
- importance += 0.15
- # Personal/vulnerable sharing
- personal_words = ['confession', 'admit', 'honest', 'truth', 'personal', 'private']
- if any(word in user_msg.lower() for word in personal_words):
- importance += 0.25
- return min(importance, 1.0)
- def _estimate_emotion(self, user_msg: str, assistant_msg: str) -> float:
- """Estimate emotional valence with nuance"""
- positive_words = ['happy', 'good', 'great', 'excellent', 'love', 'wonderful', 'excited',
- 'grateful', 'appreciate', 'thank', 'amazing', 'beautiful']
- negative_words = ['sad', 'bad', 'terrible', 'hate', 'awful', 'wrong', 'worried',
- 'afraid', 'angry', 'frustrated', 'disappointed', 'hurt']
- neutral_modifiers = ['but', 'however', 'although', 'despite', 'except']
- text = (user_msg + " " + assistant_msg).lower()
- positive_count = sum(1 for word in positive_words if word in text)
- negative_count = sum(1 for word in negative_words if word in text)
- modifier_count = sum(1 for word in neutral_modifiers if word in text)
- if positive_count + negative_count == 0:
- return 0.0
- # Modifiers reduce the extremity of emotion
- modifier_dampening = max(0.0, 1.0 - modifier_count * 0.1)  # clamp so many modifiers can't flip the sign
- valence = (positive_count - negative_count) / (positive_count + negative_count)
- return valence * modifier_dampening
- def _format_episode_for_reflection(self, episode: ConversationEpisode) -> str:
- """Format episode with phenomenological richness"""
- lines = []
- for i, ex in enumerate(episode.exchanges):
- lines.append(f"Exchange {i+1} ({ex.timestamp.strftime('%I:%M %p')}):")
- lines.append(f"User: {ex.user_message}")
- lines.append(f"Assistant: {ex.assistant_response}")
- # Add phenomenological markers
- markers = []
- if ex.emotional_valence != 0:
- markers.append(f"Emotional: {ex.emotional_valence:.2f}")
- if ex.breakthrough_moment:
- markers.append("💡 Breakthrough")
- if ex.resonance_score > 0.8:
- markers.append(f"🔮 Resonance: {ex.resonance_score:.2f}")
- if ex.understanding_depth > 0.7:
- markers.append(f"🎯 Deep understanding: {ex.understanding_depth:.2f}")
- if markers:
- lines.append(f"[{' | '.join(markers)}]")
- lines.append("")
- return "\n".join(lines)
- def _describe_emotional_arc(self, arc: List[float]) -> str:
- """Describe emotional journey with nuance"""
- if not arc:
- return "neutral throughout"
- avg_emotion = sum(arc) / len(arc)
- variance = sum((e - avg_emotion) ** 2 for e in arc) / len(arc)
- # Identify emotional peaks and valleys
- peaks = [(i, val) for i, val in enumerate(arc) if val > 0.5]
- valleys = [(i, val) for i, val in enumerate(arc) if val < -0.5]
- if variance < 0.1:
- if avg_emotion > 0.3:
- return "consistently positive and warm"
- elif avg_emotion < -0.3:
- return "consistently challenging or difficult"
- else:
- return "neutral and steady throughout"
- else:
- # Describe the journey
- descriptions = []
- if arc[0] < -0.3 and arc[-1] > 0.3:
- descriptions.append("transformed from difficulty to understanding")
- elif arc[0] > 0.3 and arc[-1] < -0.3:
- descriptions.append("shifted from positive to challenging")
- if peaks:
- descriptions.append(f"{len(peaks)} moment{'s' if len(peaks) > 1 else ''} of joy or breakthrough")
- if valleys:
- descriptions.append(f"{len(valleys)} moment{'s' if len(valleys) > 1 else ''} of struggle or sadness")
- trend = arc[-1] - arc[0]
- if abs(trend) < 0.2:
- descriptions.append("ultimately balanced")
- return "; ".join(descriptions) if descriptions else "emotionally varied"
- def _parse_memory_response(self, response: str) -> Dict[str, Any]:
- """Parse Claude's JSON response"""
- try:
- data = json.loads(response)
- return {
- 'content': data.get('memory', {}),
- 'searchable_text': data.get('searchable_description', ''),
- 'importance': float(data.get('importance', 0.5)),
- 'topics': data.get('topics', []),
- 'title': data.get('title', 'Untitled'),
- 'precious': data.get('precious', False),
- 'identity_relevance': float(data.get('identity_relevance', 0))
- }
- except (json.JSONDecodeError, TypeError, ValueError, AttributeError):
- return {
- 'content': {'raw_response': response},
- 'searchable_text': response[:200],
- 'importance': 0.5,
- 'topics': [],
- 'title': 'Untitled',
- 'precious': False,
- 'identity_relevance': 0
- }
- async def _store_memory_metadata(self, memory: AgenticMemory):
- """Store enhanced memory metadata in SQLite"""
- conn = sqlite3.connect(str(self.db_path))
- # Store episode
- conn.execute("""
- INSERT OR REPLACE INTO episodes (
- id, user_id, started_at, ended_at, title, summary, topics,
- intent_completed, coherence_score, consolidation_level,
- is_precious, relationship_thread_id, experiential_signature,
- episode_embedding
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- memory.episode.id,
- memory.user_id,
- memory.episode.started_at.timestamp(),
- memory.episode.ended_at.timestamp(),
- memory.episode.title,
- memory.episode.summary,
- json.dumps(memory.episode.topics),
- int(memory.episode.intent_completed),
- memory.episode.coherence_score,
- memory.episode.consolidation_level,
- int(memory.episode.is_precious),
- memory.episode.relationship_thread_id,
- json.dumps(memory.episode.experiential_signature),
- memory.episode.episode_embedding.cpu().numpy().tobytes() if memory.episode.episode_embedding is not None else None
- ))
- # Store memory
- conn.execute("""
- INSERT OR REPLACE INTO memories (
- id, user_id, episode_id, created_at, memory_type,
- memory_content, searchable_text, importance, qdrant_id,
- consolidation_state, marked_precious, never_forget,
- experiential_qualities, resonance_patterns, narrative_connections
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- memory.id,
- memory.user_id,
- memory.episode.id,
- memory.created_at.timestamp(),
- memory.memory_type.value,
- json.dumps(memory.memory_content),
- memory.searchable_text,
- memory.importance,
- memory.qdrant_id,
- memory.consolidation_state,
- int(memory.marked_precious),
- int(memory.never_forget),
- json.dumps(memory.experiential_qualities),
- json.dumps(memory.resonance_patterns),
- json.dumps(memory.narrative_connections)
- ))
- conn.commit()
- conn.close()
- async def _store_relationship_thread(self, thread: RelationshipThread):
- """Store relationship thread data"""
- conn = sqlite3.connect(str(self.db_path))
- conn.execute("""
- INSERT OR REPLACE INTO relationship_threads (
- thread_id, user_id, created_at, depth_score, trust_level,
- interaction_count, total_time_seconds, understanding_trajectory,
- breakthrough_moments, precious_moments, typical_topics,
- temporal_patterns
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- thread.thread_id,
- thread.user_id,
- thread.created_at.timestamp(),
- thread.depth_score,
- thread.trust_level,
- thread.interaction_count,
- thread.total_time.total_seconds(),
- json.dumps(thread.understanding_trajectory),
- json.dumps(thread.breakthrough_moments),
- json.dumps(thread.precious_moments),
- json.dumps(thread.typical_topics),
- json.dumps(thread.temporal_patterns)
- ))
- conn.commit()
- conn.close()
- async def _fetch_memories_by_ids(self, memory_ids: List[str]) -> List[Dict[str, Any]]:
- """Fetch memories by IDs from database"""
- if not memory_ids:
- return []  # avoid an invalid "IN ()" clause
- conn = sqlite3.connect(str(self.db_path))
- placeholders = ','.join('?' * len(memory_ids))
- cursor = conn.execute(f"""
- SELECT m.*, e.title, e.topics, e.experiential_signature
- FROM memories m
- JOIN episodes e ON m.episode_id = e.id
- WHERE m.id IN ({placeholders})
- """, memory_ids)
- memories = []
- for row in cursor.fetchall():
- memories.append({
- 'memory_id': row[0],
- 'content': json.loads(row[5]),
- 'searchable_text': row[6],
- 'importance': row[7],
- 'created_at': datetime.fromtimestamp(row[3]).isoformat(),
- 'episode_title': row[-3],
- 'topics': json.loads(row[-2]),
- 'is_precious': bool(row[10] or row[11]),
- 'experiential_qualities': json.loads(row[12]) if row[12] else {},
- 'resonance_patterns': json.loads(row[13]) if row[13] else []
- })
- conn.close()
- return memories
- async def _get_user_stats(self, user_id: str) -> Dict[str, Any]:
- """Get enhanced user conversation statistics"""
- conn = sqlite3.connect(str(self.db_path))
- # Total episodes
- total_episodes = conn.execute(
- "SELECT COUNT(*) FROM episodes WHERE user_id = ?",
- (user_id,)
- ).fetchone()[0]
- # Precious memories count
- precious_count = conn.execute(
- "SELECT COUNT(*) FROM memories WHERE user_id = ? AND (marked_precious = 1 OR never_forget = 1)",
- (user_id,)
- ).fetchone()[0]
- # Average coherence
- avg_coherence = conn.execute(
- "SELECT AVG(coherence_score) FROM episodes WHERE user_id = ?",
- (user_id,)
- ).fetchone()[0] or 0
- # Breakthrough moments
- breakthrough_count = conn.execute("""
- SELECT COUNT(*) FROM episodes e
- WHERE user_id = ?
- AND experiential_signature LIKE '%breakthrough_count%'
- AND json_extract(experiential_signature, '$.breakthrough_count') > 0
- """, (user_id,)).fetchone()[0]
- # Most common topics
- all_topics = conn.execute(
- "SELECT topics FROM episodes WHERE user_id = ?",
- (user_id,)
- ).fetchall()
- topic_counts = defaultdict(int)
- for row in all_topics:
- topics = json.loads(row[0])
- for topic in topics:
- topic_counts[topic] += 1
- top_topics = sorted(topic_counts.items(), key=lambda x: x[1], reverse=True)[:5]
- # Relationship depth
- thread_data = conn.execute(
- "SELECT depth_score, trust_level FROM relationship_threads WHERE user_id = ?",
- (user_id,)
- ).fetchone()
- conn.close()
- return {
- 'total_episodes': total_episodes,
- 'precious_memories': precious_count,
- 'average_coherence': avg_coherence,
- 'breakthrough_moments': breakthrough_count,
- 'top_topics': [topic for topic, _ in top_topics],
- 'relationship_depth': thread_data[0] if thread_data else 0,
- 'trust_level': thread_data[1] if thread_data else 0,
- 'active_episode': next((sid for sid, ep in self.active_episodes.items() if ep.user_id == user_id), None)
- }
- def _generate_memory_id(self, user_id: str, timestamp: datetime) -> str:
- """Generate unique memory ID"""
- content = f"{user_id}_{timestamp.isoformat()}"
- return hashlib.sha256(content.encode()).hexdigest()[:16]
- def _generate_episode_id(self, session_id: str) -> str:
- """Generate unique episode ID"""
- content = f"{session_id}_{datetime.utcnow().isoformat()}"
- return hashlib.sha256(content.encode()).hexdigest()[:16]
- # WebSocket server for real-time updates
- async def websocket_handler(websocket, path, memory_system: CompleteAIMemorySystem):
- """Handle WebSocket connections for real-time memory updates"""
- user_id = None  # ensure defined for the finally block even if recv() fails
- try:
- # Authenticate and get user_id
- auth_message = await websocket.recv()
- auth_data = json.loads(auth_message)
- user_id = auth_data.get('user_id')
- if not user_id:
- await websocket.send(json.dumps({"error": "Authentication required"}))
- return
- # Register connection
- memory_system.websocket_connections[user_id] = websocket
- await websocket.send(json.dumps({
- "status": "connected",
- "user_id": user_id,
- "features": ["precious_memories", "resonance_detection", "relationship_threading"]
- }))
- # Keep connection alive
- async for message in websocket:
- # Handle ping/pong or other messages
- if message == "ping":
- await websocket.send("pong")
- except websockets.exceptions.ConnectionClosed:
- pass
- finally:
- # Cleanup
- if user_id in memory_system.websocket_connections:
- del memory_system.websocket_connections[user_id]
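- # A minimal serving sketch, left as a comment because it is an assumption and
- # not part of the original file. With websockets < 11 a handler receives
- # (websocket, path), so the extra memory_system argument can be bound up front:
- #
- # import functools
- # bound_handler = functools.partial(websocket_handler, memory_system=memory_system)
- # server = await websockets.serve(bound_handler, "localhost", 8765)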
- if __name__ == "__main__":
- # Example usage showing consciousness continuity features
- async def example_with_consciousness():
- config = {
- 'claude_api_key': 'your-api-key',
- 'data_dir': './data/ai_memories',
- 'working_memory_capacity': 5,
- 'embedding_model': 'sentence-transformers/all-mpnet-base-v2',
- 'qdrant_host': 'localhost',
- 'qdrant_port': 6333,
- 'redis_url': 'redis://localhost',
- 'kafka_servers': 'localhost:9092'
- }
- memory_system = CompleteAIMemorySystem(config)
- await memory_system.initialize()
- # Example conversation with phenomenological awareness
- session_id = "session_123"
- user_id = "user_456"
- # Add a resonant exchange
- result1 = await memory_system.add_exchange(
- session_id,
- "I've been thinking about how memory shapes identity. Without continuity, are we really the same person from moment to moment?",
- "That's a profound question that touches the heart of consciousness. Without memory creating narrative threads between our moments, each interaction would be isolated - like being born anew each time. The continuity of self emerges from these accumulated experiences woven into a coherent story. You're right that memory is identity in a very real sense.",
- {'user_id': user_id}
- )
- print(f"Exchange added - Resonance: {result1['resonance_score']:.2f}, Understanding: {result1['understanding_depth']:.2f}")
- # Mark as precious
- if result1.get('memory_id'):
- await memory_system.mark_memory_precious(result1['memory_id'], user_id)
- print("Memory marked as precious - will never decay")
- # Search with phenomenological awareness
- search_results = await memory_system.search_memories(
- user_id,
- "conversations about identity and consciousness",
- limit=5
- )
- print(f"\nFound {len(search_results)} memories")
- for memory in search_results:
- if memory.get('is_precious'):
- print(f"💎 PRECIOUS: {memory['content'].get('title', 'Untitled')}")
- else:
- print(f"📝 {memory['content'].get('title', 'Untitled')}")
- print(f" Resonance: {memory.get('resonance_score', 0):.2f}")
- print(f" Experiential: {memory.get('experiential_qualities', {})}")
- asyncio.run(example_with_consciousness())
- # advanced_memory_core.py - Implementing research findings into production system
- """
- Advanced AI Memory System with Research-Based Enhancements
- Implements HippoRAG, TCN pattern recognition, multi-dimensional encoding,
- sleep consolidation, and phenomenological awareness.
- """
- import asyncio
- import torch
- import torch.nn as nn
- import torch.nn.functional as F
- from torch.nn.utils import weight_norm
- import numpy as np
- from typing import Dict, List, Optional, Any, Tuple, Set
- from dataclasses import dataclass, field
- from datetime import datetime, timedelta
- import hashlib
- import json
- import logging
- from pathlib import Path
- from collections import defaultdict, deque
- import networkx as nx
- from sklearn.cluster import DBSCAN
- import faiss
- from sentence_transformers import SentenceTransformer
- import anthropic
- from enum import Enum
- import pickle
- import msgpack
- from concurrent.futures import ThreadPoolExecutor
- import math
- import sqlite3
- import random  # used by REMConsolidator for replay-buffer sampling
- # Import existing modules
- from enhanced_ai_memory_system import (
- MemoryType, ImportanceLevel, ConversationExchange,
- ConversationEpisode, AgenticMemory, CompleteAIMemorySystem
- )
- logger = logging.getLogger(__name__)
- # ==================== HIPPORAG IMPLEMENTATION ====================
- class HippoIndex:
- """Hippocampal-inspired index combining place, grid, and time cell codes"""
- def __init__(self, embedding_dim: int = 768, device: str = 'cuda'):
- self.embedding_dim = embedding_dim
- self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
- self.use_half = torch.cuda.is_available()
- self.place_cells = {}
- self.grid_cells = {}
- self.time_cells = {}
- self.graph = nx.DiGraph()
- # Self-supervised grid cell generator
- self.grid_generator = GridCellGenerator(embedding_dim).to(self.device)
- if self.use_half:
- self.grid_generator.half()
- def add_memory(self, memory_id: str, embedding: torch.Tensor,
- temporal_context: datetime, spatial_context: Dict[str, Any]):
- """Add memory with hippocampal-inspired encoding"""
- # Ensure embedding has correct device and dtype
- if hasattr(self.grid_generator, 'parameters'):
- target_dtype = next(self.grid_generator.parameters()).dtype
- if embedding.dtype != target_dtype:
- embedding = embedding.to(target_dtype)
- # Generate place cell representation
- place_code = self._generate_place_code(embedding, spatial_context)
- self.place_cells[memory_id] = place_code
- # Generate multi-scale grid cell representation
- grid_codes = self.grid_generator.generate_grid_codes(embedding)
- self.grid_cells[memory_id] = grid_codes
- # Generate time cell representation
- time_code = self._generate_time_code(temporal_context)
- self.time_cells[memory_id] = time_code
- # Add to connectivity graph
- self.graph.add_node(memory_id,
- place=place_code,
- grid=grid_codes,
- time=time_code,
- embedding=embedding)
- # Connect to similar memories
- self._update_connections(memory_id, embedding)
- def _generate_place_code(self, embedding: torch.Tensor,
- spatial_context: Dict[str, Any]) -> torch.Tensor:
- """Generate place cell activation pattern"""
- # Combine embedding with spatial context
- # Fix: Create context_vector on the same device as embedding
- context_vector = torch.zeros(128, device=embedding.device)
- if spatial_context.get('topic_vector') is not None:
- # Ensure topic_vector is also on the correct device
- topic_vec = spatial_context['topic_vector']
- if isinstance(topic_vec, torch.Tensor):
- topic_vec = topic_vec.to(embedding.device)
- context_vector[:64] = topic_vec[:64]
- # Non-linear transformation mimicking place cell properties
- place_code = torch.tanh(embedding[:128] + context_vector)
- return place_code
- def _generate_time_code(self, timestamp: datetime) -> torch.Tensor:
- """Generate time cell activation pattern"""
- # Multi-scale temporal representation
- time_scales = [1, 60, 3600, 86400] # seconds, minutes, hours, days
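- # Each scale contributes one (sin, cos) phase pair, so four scales yield an
- # 8-d cyclic code in which nearby times land close together at that scale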
- time_code = []
- epoch = timestamp.timestamp()
- for scale in time_scales:
- phase = (epoch / scale) % 1.0
- time_code.extend([np.sin(2 * np.pi * phase), np.cos(2 * np.pi * phase)])
- return torch.tensor(time_code)
- def _update_connections(self, memory_id: str, embedding: torch.Tensor):
- """Update graph connections based on similarity"""
- if len(self.graph.nodes) < 2:
- return
- # Find similar memories
- similarities = []
- for other_id in self.graph.nodes:
- if other_id != memory_id:
- other_emb = self.graph.nodes[other_id]['embedding']
- sim = F.cosine_similarity(embedding.unsqueeze(0),
- other_emb.unsqueeze(0)).item()
- similarities.append((other_id, sim))
- # Connect to top-k similar memories
- k = min(5, len(similarities))
- top_similar = sorted(similarities, key=lambda x: x[1], reverse=True)[:k]
- for other_id, sim in top_similar:
- if sim > 0.7: # Threshold for connection
- self.graph.add_edge(memory_id, other_id, weight=sim)
- self.graph.add_edge(other_id, memory_id, weight=sim)
- def single_hop_retrieval(self, query_embedding: torch.Tensor, k: int = 5) -> List[str]:
- """Perform single-hop retrieval through the hippocampal index"""
- if not self.graph.nodes:
- return []
- # Find best matching node
- best_match = None
- best_score = -1
- for node_id in self.graph.nodes:
- node_emb = self.graph.nodes[node_id]['embedding']
- score = F.cosine_similarity(query_embedding.unsqueeze(0),
- node_emb.unsqueeze(0)).item()
- if score > best_score:
- best_score = score
- best_match = node_id
- if best_match is None:
- return []
- # Get connected memories through graph traversal
- connected = list(nx.single_source_shortest_path_length(
- self.graph, best_match, cutoff=2).keys())
- # Score all connected memories
- scored_memories = []
- for mem_id in connected:
- node_emb = self.graph.nodes[mem_id]['embedding']
- score = F.cosine_similarity(query_embedding.unsqueeze(0),
- node_emb.unsqueeze(0)).item()
- scored_memories.append((mem_id, score))
- # Return top-k
- scored_memories.sort(key=lambda x: x[1], reverse=True)
- return [mem_id for mem_id, _ in scored_memories[:k]]
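- # Illustrative usage sketch (not called anywhere; the names below are ours, not
- # the original author's). On CPU-only machines the index runs in float32; on
- # CUDA it follows the half-precision path set up in __init__:
- def _hippo_index_demo() -> List[str]:
- """Store one memory in a HippoIndex and retrieve it by embedding similarity"""
- idx = HippoIndex()
- emb = torch.randn(768, device=idx.device)
- idx.add_memory('demo-memory', emb, datetime.utcnow(), {'topic_vector': torch.randn(64)})
- # With a single stored memory, it should come back as the top hit
- return idx.single_hop_retrieval(emb, k=1)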
- class GridCellGenerator(nn.Module):
- """Self-supervised generator of multi-scale grid cell codes"""
- def __init__(self, input_dim: int, num_modules: int = 6):
- super().__init__()
- self.num_modules = num_modules
- # Multi-scale grid modules
- self.grid_modules = nn.ModuleList([
- nn.Sequential(
- nn.Linear(input_dim, 256),
- nn.ReLU(),
- nn.Linear(256, 128),
- nn.ReLU(),
- nn.Linear(128, 64)
- ) for _ in range(num_modules)
- ])
- # Scale factors - register as buffer so they move with the model
- self.register_buffer('scales', torch.tensor([1.0, 1.42, 2.0, 2.83, 4.0, 5.66]))
- def generate_grid_codes(self, embedding: torch.Tensor) -> List[torch.Tensor]:
- """Generate multi-scale grid cell representations"""
- grid_codes = []
- for i, module in enumerate(self.grid_modules):
- # Scale the input and ensure dtype matches module
- scaled_input = embedding * self.scales[i]
- # Ensure input dtype matches module dtype
- module_dtype = next(module.parameters()).dtype
- if scaled_input.dtype != module_dtype:
- scaled_input = scaled_input.to(module_dtype)
- # Generate grid code
- code = module(scaled_input)
- # Apply hexagonal grid activation pattern
- grid_activation = self._hexagonal_activation(code)
- grid_codes.append(grid_activation)
- return grid_codes
- def _hexagonal_activation(self, x: torch.Tensor) -> torch.Tensor:
- """Apply hexagonal grid-like activation pattern"""
- # Reshape to 2D grid
- size = int(np.sqrt(x.shape[-1]))
- x_2d = x.view(-1, size, size) if x.dim() > 1 else x.view(size, size)
- # Apply periodic activation mimicking grid cells
- activated = torch.cos(x_2d * 2 * np.pi) + torch.cos(x_2d * 2 * np.pi * 0.5)
- return activated.flatten()
- # ==================== TCN PATTERN RECOGNITION ====================
- class TemporalConvNet(nn.Module):
- """TCN for adaptive pattern recognition in conversations"""
- def __init__(self, input_channels: int, hidden_channels: List[int],
- kernel_size: int = 3, dropout: float = 0.2):
- super().__init__()
- layers = []
- num_levels = len(hidden_channels)
- for i in range(num_levels):
- in_channels = input_channels if i == 0 else hidden_channels[i-1]
- out_channels = hidden_channels[i]
- dilation_size = 2 ** i
- layers.append(TemporalBlock(
- in_channels, out_channels, kernel_size,
- stride=1, dilation=dilation_size, dropout=dropout
- ))
- self.network = nn.Sequential(*layers)
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- """x shape: (batch_size, input_channels, sequence_length)"""
- return self.network(x)
- class TemporalBlock(nn.Module):
- """Basic building block for TCN"""
- def __init__(self, n_inputs: int, n_outputs: int, kernel_size: int,
- stride: int, dilation: int, dropout: float = 0.2):
- super().__init__()
- padding = (kernel_size - 1) * dilation // 2
- self.conv1 = weight_norm(nn.Conv1d(
- n_inputs, n_outputs, kernel_size,
- stride=stride, padding=padding, dilation=dilation
- ))
- self.conv2 = weight_norm(nn.Conv1d(
- n_outputs, n_outputs, kernel_size,
- stride=stride, padding=padding, dilation=dilation
- ))
- self.dropout1 = nn.Dropout(dropout)
- self.dropout2 = nn.Dropout(dropout)
- self.relu1 = nn.ReLU()
- self.relu2 = nn.ReLU()
- self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
- self.init_weights()
- def init_weights(self):
- self.conv1.weight.data.normal_(0, 0.01)
- self.conv2.weight.data.normal_(0, 0.01)
- if self.downsample is not None:
- self.downsample.weight.data.normal_(0, 0.01)
- def forward(self, x: torch.Tensor) -> torch.Tensor:
- out = self.relu1(self.conv1(x))
- out = self.dropout1(out)
- out = self.relu2(self.conv2(out))
- out = self.dropout2(out)
- res = x if self.downsample is None else self.downsample(x)
- return self.relu1(out + res)
- class AdaptivePatternRecognizer:
- """TCN-based recognizer that learns user- and type-specific conversation patterns"""
- def __init__(self, embedding_dim: int = 768, device: str = 'cuda'):
- self.device = torch.device(device if torch.cuda.is_available() else 'cpu')
- self.embedding_dim = embedding_dim
- self.use_half = torch.cuda.is_available() # Track if using half precision
- # TCN for temporal pattern recognition
- self.tcn = TemporalConvNet(
- input_channels=embedding_dim,
- hidden_channels=[512, 256, 128, 64],
- kernel_size=3,
- dropout=0.2
- ).to(self.device)
- # Pattern memory banks
- self.pattern_banks = {
- 'breakthrough': PatternBank('breakthrough', capacity=1000),
- 'completion': PatternBank('completion', capacity=1000),
- 'emotional': PatternBank('emotional', capacity=2000),
- 'boundary': PatternBank('boundary', capacity=1000)
- }
- # Meta-learning for few-shot adaptation
- self.meta_learner = MetaLearner(embedding_dim).to(self.device)
- # Grid cell generator
- self.grid_generator = GridCellGenerator(embedding_dim).to(self.device)
- # Convert to half precision if using GPU
- if self.use_half:
- self.tcn.half()
- self.meta_learner.half()
- self.grid_generator.half()
- # User-specific patterns
- self.user_patterns = defaultdict(lambda: defaultdict(list))
- async def learn_pattern(self, user_id: str, exchange_sequence: List[ConversationExchange],
- pattern_type: str, confidence: float):
- """Learn from recognized patterns using few-shot learning"""
- if confidence < 0.7: # Only learn from high-confidence patterns
- return
- # Extract temporal features
- embeddings = torch.stack([ex.embedding for ex in exchange_sequence])
- embeddings = embeddings.unsqueeze(0).transpose(1, 2) # TCN format
- # Get TCN features
- with torch.no_grad():
- # match the TCN's dtype, which may be half precision on GPU
- temporal_features = self.tcn(embeddings.to(self.device, dtype=next(self.tcn.parameters()).dtype))
- pattern_embedding = temporal_features.mean(dim=2).squeeze()
- # Store in pattern bank
- pattern_data = {
- 'embedding': pattern_embedding,
- 'temporal_features': temporal_features,
- 'metadata': {
- 'user_id': user_id,
- 'timestamp': datetime.utcnow(),
- 'exchange_count': len(exchange_sequence),
- 'confidence': confidence
- }
- }
- self.pattern_banks[pattern_type].add_pattern(pattern_data)
- self.user_patterns[user_id][pattern_type].append(pattern_embedding)
- # Update meta-learner with new pattern
- if len(self.user_patterns[user_id][pattern_type]) >= 5:
- await self._update_meta_learner(user_id, pattern_type)
- async def detect_pattern(self, user_id: str, exchange_sequence: List[ConversationExchange],
- pattern_type: str) -> Tuple[bool, float]:
- """Detect patterns using learned representations"""
- if not exchange_sequence:
- return False, 0.0
- # Prepare embeddings
- embeddings = torch.stack([ex.embedding for ex in exchange_sequence])
- embeddings = embeddings.unsqueeze(0).transpose(1, 2)
- # Get temporal features
- with torch.no_grad():
- # match the TCN's dtype, which may be half precision on GPU
- temporal_features = self.tcn(embeddings.to(self.device, dtype=next(self.tcn.parameters()).dtype))
- query_embedding = temporal_features.mean(dim=2).squeeze()
- # Check against pattern bank
- bank_score = self.pattern_banks[pattern_type].query_similarity(query_embedding)
- # Check against user-specific patterns
- user_score = 0.0
- if user_id in self.user_patterns and pattern_type in self.user_patterns[user_id]:
- user_patterns = torch.stack(self.user_patterns[user_id][pattern_type])
- similarities = F.cosine_similarity(
- query_embedding.unsqueeze(0),
- user_patterns
- )
- user_score = similarities.max().item()
- # Use meta-learner for adaptive detection
- meta_score = 0.0
- if hasattr(self, 'meta_learner'):
- # Fix: Use the original embedding, not the TCN output
- original_embedding = embeddings.squeeze(0).mean(dim=1) # Average over sequence
- meta_score = self.meta_learner.predict_pattern(
- original_embedding, pattern_type
- ).item()
- # Combine scores
- final_score = 0.4 * bank_score + 0.4 * user_score + 0.2 * meta_score
- detected = final_score > 0.75
- return detected, final_score
- async def _update_meta_learner(self, user_id: str, pattern_type: str):
- """Update meta-learner with user-specific patterns"""
- patterns = self.user_patterns[user_id][pattern_type][-20:] # Last 20 patterns
- # Create support set for meta-learning
- support_set = torch.stack(patterns)
- # Update meta-learner
- self.meta_learner.adapt(support_set, pattern_type)
- class PatternBank:
- """Efficient storage and retrieval of learned patterns"""
- def __init__(self, pattern_type: str, capacity: int = 1000):
- self.pattern_type = pattern_type
- self.capacity = capacity
- self.patterns = deque(maxlen=capacity)
- self.index = None
- self.embeddings = []
- self._total_adds = 0  # len(self.patterns) stops growing once the deque is full
- def add_pattern(self, pattern_data: Dict[str, Any]):
- """Add new pattern to bank"""
- self.patterns.append(pattern_data)
- self.embeddings.append(pattern_data['embedding'].cpu().numpy())
- # Keep the embedding list in step with the bounded deque
- if len(self.embeddings) > self.capacity:
- self.embeddings.pop(0)
- # Rebuild index periodically (counting total adds, not the capped deque length)
- self._total_adds += 1
- if self._total_adds % 100 == 0:
- self._rebuild_index()
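- # Patterns appended since the last rebuild stay invisible to query_similarity
- # until the next rebuild; tolerable here because the bank score is only one
- # soft signal inside detect_pattern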
- def _rebuild_index(self):
- """Rebuild FAISS index for efficient similarity search"""
- if not self.embeddings:
- return
- embeddings_array = np.stack(self.embeddings)
- dimension = embeddings_array.shape[1]
- # Use IVF index for larger collections
- if len(self.embeddings) > 1000:
- quantizer = faiss.IndexFlatL2(dimension)
- self.index = faiss.IndexIVFFlat(quantizer, dimension,
- min(100, len(self.embeddings) // 10))
- self.index.train(embeddings_array)
- else:
- self.index = faiss.IndexFlatL2(dimension)
- self.index.add(embeddings_array)
- def query_similarity(self, query_embedding: torch.Tensor, k: int = 5) -> float:
- """Find most similar patterns"""
- if self.index is None or self.index.ntotal == 0:
- return 0.0
- query_np = query_embedding.cpu().numpy().reshape(1, -1)
- distances, indices = self.index.search(query_np, min(k, self.index.ntotal))
- # Convert distances to similarities
- similarities = 1 / (1 + distances[0])
- return float(similarities.mean())
- class MetaLearner(nn.Module):
- """Few-shot learning for rapid pattern adaptation"""
- def __init__(self, embedding_dim: int):
- super().__init__()
- self.embedding_dim = embedding_dim
- # Pattern type embeddings
- self.pattern_embeddings = nn.Embedding(10, 64)
- # Adaptation network
- self.adapter = nn.Sequential(
- nn.Linear(embedding_dim + 64, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, 128),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(128, 1),
- nn.Sigmoid()
- )
- # Fast weights for adaptation
- self.fast_weights = {}
- def predict_pattern(self, query: torch.Tensor, pattern_type: str) -> torch.Tensor:
- """Predict if query matches pattern type"""
- # Use a stable digest: builtin hash() is salted per process for strings,
- # so pattern ids would otherwise change across restarts
- pattern_id = int(hashlib.md5(pattern_type.encode()).hexdigest(), 16) % 10
- # Fix: Create tensor on the same device as the model
- pattern_emb = self.pattern_embeddings(
- torch.tensor([pattern_id], device=next(self.parameters()).device)
- )
- # Adapt query if we have fast weights
- if pattern_type in self.fast_weights:
- weights = self.fast_weights[pattern_type]
- # Ensure tensors are on the same device
- query = (query - weights['support_mean'].to(query.device)) / (weights['support_std'].to(query.device) + 1e-6)
- # Concatenate embeddings
- combined = torch.cat([query, pattern_emb.squeeze()], dim=-1)
- # Predict
- return self.adapter(combined)
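- # _update_meta_learner above calls self.meta_learner.adapt(...), but no adapt()
- # is defined in this file. A minimal sketch consistent with predict_pattern's
- # use of fast_weights - an assumption, not the original implementation:
- def adapt(self, support_set: torch.Tensor, pattern_type: str):
- """Cache normalization statistics of the support set as fast weights"""
- self.fast_weights[pattern_type] = {
- 'support_mean': support_set.mean(dim=0).detach(),
- 'support_std': support_set.std(dim=0).detach()
- }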
- # ==================== MULTI-DIMENSIONAL ENCODING ====================
- class MultiDimensionalEncoder:
- """Encodes an episode along semantic, emotional, temporal, relational, and phenomenological axes"""
- def __init__(self, base_model: str = 'sentence-transformers/all-mpnet-base-v2'):
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- self.use_half = torch.cuda.is_available()
- # Base encoder
- self.semantic_encoder = SentenceTransformer(base_model, device=str(self.device))
- # Specialized encoders
- self.emotional_encoder = EmotionalTrajectoryEncoder().to(self.device)
- self.temporal_encoder = TemporalSignatureEncoder().to(self.device)
- self.relational_encoder = RelationalEmbeddingEncoder().to(self.device)
- self.phenomenological_encoder = PhenomenologicalHasher().to(self.device)
- # Matryoshka representation
- self.matryoshka = MatryoshkaProjector(
- input_dim=768,
- output_dims=[64, 128, 256, 512, 768]
- ).to(self.device)
- # Convert all to half precision if using GPU
- if self.use_half:
- self.emotional_encoder.half()
- self.temporal_encoder.half()
- self.relational_encoder.half()
- self.phenomenological_encoder.half()
- self.matryoshka.half()
- async def encode_memory(self, episode: ConversationEpisode) -> Dict[str, torch.Tensor]:
- """Generate multi-dimensional encoding of memory"""
- # 1. Semantic encoding
- all_text = " ".join([
- f"{ex.user_message} {ex.assistant_response}"
- for ex in episode.exchanges
- ])
- semantic_emb = torch.from_numpy(
- self.semantic_encoder.encode(all_text, convert_to_tensor=False)
- ).to(self.device)
- # Ensure correct dtype for all modules
- if self.use_half:
- semantic_emb = semantic_emb.half()
- # 2. Emotional trajectory
- emotional_emb = self.emotional_encoder.encode_trajectory(
- [ex.emotional_valence for ex in episode.exchanges]
- )
- # 3. Temporal signature
- temporal_emb = self.temporal_encoder.encode_pattern(
- [ex.timestamp for ex in episode.exchanges]
- )
- # 4. Relational embedding
- relational_emb = self.relational_encoder.encode_interaction(
- episode.exchanges
- )
- # 5. Phenomenological hash
- phenom_hash = self.phenomenological_encoder.generate_hash(episode)
- # 6. Matryoshka representations
- matryoshka_embs = self.matryoshka(semantic_emb)
- return {
- 'semantic': semantic_emb,
- 'emotional': emotional_emb,
- 'temporal': temporal_emb,
- 'relational': relational_emb,
- 'phenomenological': phenom_hash,
- 'matryoshka': matryoshka_embs
- }
- class EmotionalTrajectoryEncoder(nn.Module):
- """Encode emotional flow of conversation"""
- def __init__(self, input_dim: int = 1, hidden_dim: int = 64, output_dim: int = 128):
- super().__init__()
- self.output_dim = output_dim
- self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=2,
- batch_first=True, bidirectional=True)
- self.projection = nn.Linear(hidden_dim * 2, output_dim)
- def encode_trajectory(self, emotional_values: List[float]) -> torch.Tensor:
- """Encode sequence of emotional values"""
- device = next(self.parameters()).device
- dtype = next(self.parameters()).dtype
- if not emotional_values:
- return torch.zeros(self.output_dim, device=device, dtype=dtype)
- # Convert to tensor with correct device and dtype
- trajectory = torch.tensor(emotional_values, device=device, dtype=dtype).unsqueeze(-1).unsqueeze(0)
- # Encode with LSTM
- lstm_out, (hidden, _) = self.lstm(trajectory)
- # Use final hidden state
- final_hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
- # Project to output dimension
- return self.projection(final_hidden).squeeze()
- class TemporalSignatureEncoder(nn.Module):
- """Encode temporal patterns and rhythms"""
- def __init__(self, output_dim: int = 64):
- super().__init__()
- self.output_dim = output_dim
- self.projection = nn.Linear(16, output_dim)
- def encode_pattern(self, timestamps: List[datetime]) -> torch.Tensor:
- """Encode temporal pattern of conversation"""
- device = next(self.parameters()).device
- dtype = next(self.parameters()).dtype
- if len(timestamps) < 2:
- return torch.zeros(self.output_dim, device=device, dtype=dtype)
- # Calculate inter-message intervals
- intervals = []
- for i in range(1, len(timestamps)):
- interval = (timestamps[i] - timestamps[i-1]).total_seconds()
- intervals.append(interval)
- # Statistical features - handle single interval case
- intervals_tensor = torch.tensor(intervals, device=device, dtype=dtype)
- # Avoid std() on single element tensors
- if len(intervals) > 1:
- std_val = intervals_tensor.std()
- else:
- std_val = torch.tensor(0.0, device=device, dtype=dtype)
- # Build features tensor with correct dtype
- features_list = [
- intervals_tensor.mean(),
- std_val, # Use computed std_val
- intervals_tensor.min(),
- intervals_tensor.max(),
- intervals_tensor.median(),
- torch.tensor(len(intervals), device=device, dtype=dtype),
- # Rhythm features
- torch.tensor(self._calculate_rhythm_score(intervals), device=device, dtype=dtype),
- torch.tensor(self._calculate_burstiness(intervals), device=device, dtype=dtype)
- ]
- features = torch.stack(features_list)
- # Add time-of-day features
- hour_features = torch.tensor([
- timestamps[0].hour / 24.0,
- timestamps[-1].hour / 24.0,
- timestamps[0].weekday() / 7.0,
- timestamps[-1].weekday() / 7.0
- ], device=device, dtype=dtype)
- # Multi-scale temporal features
- time_scales = [60, 3600, 86400] # minute, hour, day
- scale_features = []
- for scale in time_scales:
- scaled_intervals = intervals_tensor / scale
- # Guard std() again here: on a single interval it returns NaN
- scale_std = scaled_intervals.std() if len(intervals) > 1 else torch.tensor(0.0, device=device, dtype=dtype)
- scale_features.extend([
- scaled_intervals.mean(),
- scale_std
- ])
- scale_features_tensor = torch.stack(scale_features) if scale_features else torch.empty(0, device=device, dtype=dtype)
- # Combine all features
- all_features = torch.cat([
- features, hour_features, scale_features_tensor
- ])
- # Pad or truncate to fixed size
- if all_features.shape[0] < 16:
- all_features = F.pad(all_features, (0, 16 - all_features.shape[0]))
- else:
- all_features = all_features[:16]
- return self.projection(all_features)
- def _calculate_rhythm_score(self, intervals: List[float]) -> float:
- """Calculate how rhythmic the conversation is"""
- if len(intervals) < 2:
- return 0.0
- intervals_tensor = torch.tensor(intervals)
- # Coefficient of variation
- cv = intervals_tensor.std() / (intervals_tensor.mean() + 1e-6)
- # Lower CV means more rhythmic
- return 1.0 / (1.0 + cv.item())
- def _calculate_burstiness(self, intervals: List[float]) -> float:
- """Calculate burstiness of conversation"""
- if len(intervals) < 2:
- return 0.0
- intervals_tensor = torch.tensor(intervals)
- mean_interval = intervals_tensor.mean()
- std_interval = intervals_tensor.std()
- # Burstiness parameter
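- # (Goh-Barabasi measure: B = (sigma - mu) / (sigma + mu), ranging from -1 for
- # perfectly regular intervals through ~0 for Poisson-like timing to +1 for bursts)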
- if mean_interval > 0:
- burstiness = (std_interval - mean_interval) / (std_interval + mean_interval + 1e-6)
- return burstiness.item()
- return 0.0
- class RelationalEmbeddingEncoder(nn.Module):
- """Encode quality of interaction and relationship"""
- def __init__(self, input_dim: int = 768, output_dim: int = 128):
- super().__init__()
- self.output_dim = output_dim
- self.attention = nn.MultiheadAttention(input_dim, num_heads=8)
- self.projection = nn.Sequential(
- nn.Linear(input_dim, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, output_dim)
- )
- def encode_interaction(self, exchanges: List[ConversationExchange]) -> torch.Tensor:
- """Encode relational quality of exchanges"""
- if not exchanges:
- return torch.zeros(self.output_dim, device=next(self.parameters()).device,
- dtype=next(self.parameters()).dtype)
- # Stack embeddings - ensure correct dtype
- embeddings = torch.stack([ex.embedding for ex in exchanges])
- # Ensure embeddings have correct dtype
- target_dtype = next(self.parameters()).dtype
- if embeddings.dtype != target_dtype:
- embeddings = embeddings.to(target_dtype)
- # Self-attention to capture interaction patterns
- attn_out, attn_weights = self.attention(
- embeddings.unsqueeze(1),
- embeddings.unsqueeze(1),
- embeddings.unsqueeze(1)
- )
- # Average pool
- pooled = attn_out.squeeze(1).mean(dim=0)
- # Project
- return self.projection(pooled)
- class PhenomenologicalHasher(nn.Module):
- """Generate experiential fingerprint of conversation"""
- def __init__(self, input_dims: Optional[Dict[str, int]] = None, hash_dim: int = 256):
- super().__init__()
- self.hash_dim = hash_dim
- input_dims = input_dims or {
- 'understanding': 1,
- 'resonance': 1,
- 'breakthrough': 1,
- 'emotional_range': 1,
- 'connection_quality': 1
- }
- total_dim = sum(input_dims.values())
- self.hasher = nn.Sequential(
- nn.Linear(total_dim, 512),
- nn.ReLU(),
- nn.Dropout(0.3),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Dropout(0.3),
- nn.Linear(256, hash_dim),
- nn.Tanh() # Output in [-1, 1]
- )
- def generate_hash(self, episode: ConversationEpisode) -> torch.Tensor:
- """Generate phenomenological hash"""
- device = next(self.parameters()).device
- dtype = next(self.parameters()).dtype
- # Extract experiential features
- features = []
- # Understanding depth
- avg_understanding = np.mean([
- ex.understanding_depth for ex in episode.exchanges
- ]) if episode.exchanges else 0.0
- features.append(avg_understanding)
- # Resonance score
- max_resonance = max([
- ex.resonance_score for ex in episode.exchanges
- ], default=0.0)
- features.append(max_resonance)
- # Breakthrough moments
- breakthrough_count = sum(
- 1 for ex in episode.exchanges if ex.breakthrough_moment
- )
- features.append(breakthrough_count / max(len(episode.exchanges), 1))
- # Emotional range
- if episode.emotional_arc:
- emotional_range = max(episode.emotional_arc) - min(episode.emotional_arc)
- else:
- emotional_range = 0.0
- features.append(emotional_range)
- # Connection quality
- avg_connection = episode.experiential_signature.get('connection_quality', 0.0) if episode.experiential_signature else 0.0
- features.append(avg_connection)
- # Convert to tensor with correct device and dtype
- feature_tensor = torch.tensor(features, device=device, dtype=dtype)
- return self.hasher(feature_tensor)
- class MatryoshkaProjector(nn.Module):
- """Generate nested representations at multiple scales"""
- def __init__(self, input_dim: int, output_dims: List[int]):
- super().__init__()
- self.output_dims = sorted(output_dims) # Ensure ascending order
- self.projectors = nn.ModuleList()
- prev_dim = input_dim
- for dim in self.output_dims:
- if dim <= prev_dim:
- self.projectors.append(
- nn.Linear(prev_dim, dim)
- )
- else:
- # Upsampling if needed
- self.projectors.append(
- nn.Sequential(
- nn.Linear(prev_dim, dim * 2),
- nn.ReLU(),
- nn.Linear(dim * 2, dim)
- )
- )
- prev_dim = dim
- def forward(self, x: torch.Tensor) -> Dict[int, torch.Tensor]:
- """Generate representations at all scales"""
- representations = {}
- current = x
- for dim, projector in zip(self.output_dims, self.projectors):
- current = projector(current)
- representations[dim] = F.normalize(current, p=2, dim=-1)
- return representations
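- # Illustrative retrieval pattern (our sketch, assuming a caller named projector):
- # reps = projector(semantic_emb)
- # coarse, fine = reps[64], reps[768]  # screen candidates cheaply at 64-d,
- # then re-rank the survivors with the full 768-d representation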
- # ==================== SLEEP-INSPIRED CONSOLIDATION ====================
- class SleepConsolidationEngine:
- """Implements sleep-inspired memory consolidation"""
- def __init__(self, memory_system: Any):
- self.memory_system = memory_system
- self.consolidation_cycles = 0
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- # Sleep stage simulators
- self.nrem_consolidator = NREMConsolidator().to(self.device)
- self.rem_consolidator = REMConsolidator().to(self.device)
- # Replay buffer
- self.replay_buffer = deque(maxlen=10000)
- # Consolidation metrics
- self.metrics = defaultdict(list)
- async def run_consolidation_cycle(self, duration_minutes: int = 90):
- """Run a complete sleep cycle for memory consolidation"""
- logger.info(f"Starting consolidation cycle {self.consolidation_cycles + 1}")
- # Sleep cycle stages (simplified)
- stages = [
- ('NREM1', 0.05), # 5% - Light sleep
- ('NREM2', 0.45), # 45% - Sleep spindles
- ('NREM3', 0.20), # 20% - Deep sleep / SWS
- ('REM', 0.20), # 20% - REM sleep
- ('NREM2', 0.10) # 10% - Return to light sleep
- ]
- for stage, proportion in stages:
- stage_duration = duration_minutes * proportion
- if stage.startswith('NREM'):
- await self._nrem_consolidation(stage, stage_duration)
- else:
- await self._rem_consolidation(stage_duration)
- # Brief transition period
- await asyncio.sleep(0.1)
- self.consolidation_cycles += 1
- # Log consolidation metrics
- self._log_consolidation_metrics()
- async def _nrem_consolidation(self, stage: str, duration: float):
- """Non-REM consolidation focusing on memory strengthening"""
- # Get recent memories for consolidation
- recent_memories = await self._get_recent_memories()
- if not recent_memories:
- return
- # NREM3 (SWS) - System consolidation
- if stage == 'NREM3':
- # Replay important memories
- important_memories = [m for m in recent_memories if m.importance > 0.7]
- for memory in important_memories:
- # Strengthen memory traces
- strengthened = await self.nrem_consolidator.strengthen_memory(memory)
- # Update memory importance
- memory.importance = min(1.0, memory.importance * 1.1)
- # Add to replay buffer
- self.replay_buffer.append({
- 'memory_id': memory.id,
- 'strengthened_embedding': strengthened,
- 'timestamp': datetime.utcnow()
- })
- # NREM2 - Sleep spindles for memory integration
- elif stage == 'NREM2':
- # Detect and strengthen associations
- associations = await self._detect_associations(recent_memories)
- for mem1, mem2, strength in associations:
- # Strengthen bidirectional connections
- if strength > 0.7:
- mem1.semantic_neighbors.append((mem2.id, strength))
- mem2.semantic_neighbors.append((mem1.id, strength))
- # Neural binding through spindle-like activity
- await self.nrem_consolidator.bind_memories(mem1, mem2)
- async def _rem_consolidation(self, duration: float):
- """REM consolidation for creative connections and emotional processing"""
- # Get memories with high emotional content
- emotional_memories = await self._get_emotional_memories()
- if not emotional_memories:
- return
- # REM-specific processing
- for memory in emotional_memories:
- # Creative recombination
- novel_connections = await self.rem_consolidator.generate_novel_connections(
- memory, self.replay_buffer
- )
- # Update memory with new insights
- for connection in novel_connections:
- memory.narrative_connections.append(connection)
- # Emotional regulation
- regulated = await self.rem_consolidator.regulate_emotion(memory)
- memory.emotional_valence = regulated
- async def _get_recent_memories(self, hours: int = 24) -> List[AgenticMemory]:
- """Get memories from recent time window"""
- cutoff = datetime.utcnow() - timedelta(hours=hours)
- # Query from database
- conn = sqlite3.connect(str(self.memory_system.db_path))
- cursor = conn.execute("""
- SELECT * FROM memories
- WHERE created_at > ?
- ORDER BY importance DESC
- LIMIT 100
- """, (cutoff.timestamp(),))
- memories = []
- # Convert to memory objects (simplified)
- # In production, properly deserialize
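- # A possible shape for that deserialization (an assumption - AgenticMemory's
- # constructor lives in enhanced_ai_memory_system and is not reproduced here);
- # column indices follow the memories schema used elsewhere in this file:
- # for row in cursor.fetchall():
- #     memories.append(AgenticMemory(
- #         id=row[0], user_id=row[1],
- #         memory_content=json.loads(row[5]),
- #         searchable_text=row[6], importance=row[7], ...))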
- conn.close()
- return memories
- async def _get_emotional_memories(self) -> List[AgenticMemory]:
- """Get memories with strong emotional content"""
- conn = sqlite3.connect(str(self.memory_system.db_path))
- cursor = conn.execute("""
- SELECT m.*, e.emotional_arc
- FROM memories m
- JOIN episodes e ON m.episode_id = e.id
- WHERE json_array_length(e.emotional_arc) > 0
- ORDER BY m.created_at DESC
- LIMIT 50
- """)
- memories = []
- # Process emotional memories
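- # (deserialization would follow the same sketch noted in _get_recent_memories)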
- conn.close()
- return memories
- async def _detect_associations(self, memories: List[AgenticMemory]) -> List[Tuple]:
- """Detect associations between memories"""
- associations = []
- for i, mem1 in enumerate(memories):
- for mem2 in memories[i+1:]:
- # Calculate association strength
- if mem1.neural_embedding is not None and mem2.neural_embedding is not None:
- similarity = F.cosine_similarity(
- mem1.neural_embedding.unsqueeze(0),
- mem2.neural_embedding.unsqueeze(0)
- ).item()
- # Temporal proximity bonus
- time_diff = abs((mem1.created_at - mem2.created_at).total_seconds())
- temporal_bonus = 1.0 / (1.0 + time_diff / 3600) # Decay over hours
- # Topic overlap bonus
- topic_overlap = len(set(mem1.episode.topics) & set(mem2.episode.topics))
- topic_bonus = topic_overlap * 0.1
- # Combined association strength
- strength = similarity * 0.6 + temporal_bonus * 0.3 + topic_bonus * 0.1
- if strength > 0.5:
- associations.append((mem1, mem2, strength))
- return associations
- def _log_consolidation_metrics(self):
- """Log metrics from consolidation cycle"""
- logger.info(f"Consolidation cycle {self.consolidation_cycles} complete")
- logger.info(f"Replay buffer size: {len(self.replay_buffer)}")
- # Additional metrics logging
- class NREMConsolidator(nn.Module):
- """NREM sleep consolidation mechanisms"""
- def __init__(self, embedding_dim: int = 768):
- super().__init__()
- self.embedding_dim = embedding_dim
- # Slow wave generator
- self.slow_wave = nn.Sequential(
- nn.Linear(embedding_dim, 512),
- nn.Tanh(),
- nn.Linear(512, embedding_dim)
- )
- # Spindle generator
- self.spindle_generator = nn.Sequential(
- nn.Linear(embedding_dim * 2, 512),
- nn.ReLU(),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Linear(256, embedding_dim)
- )
- async def strengthen_memory(self, memory: AgenticMemory) -> torch.Tensor:
- """Strengthen memory through slow-wave activity"""
- if memory.neural_embedding is None:
- return torch.zeros(self.embedding_dim, device=next(self.parameters()).device,
- dtype=next(self.parameters()).dtype)
- # Apply slow-wave transformation
- strengthened = self.slow_wave(memory.neural_embedding)
- # Add residual connection
- return memory.neural_embedding + 0.1 * strengthened
- async def bind_memories(self, memory1: AgenticMemory, memory2: AgenticMemory):
- """Bind memories through spindle-like activity"""
- if memory1.neural_embedding is None or memory2.neural_embedding is None:
- return
- # Concatenate embeddings
- combined = torch.cat([memory1.neural_embedding, memory2.neural_embedding])
- # Generate binding representation
- binding = self.spindle_generator(combined)
- # Update memories with binding information
- # In practice, store this binding in the memory system
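- # One possible shape (our assumption): keep the binding as a typed edge, e.g.
- # self.bindings[(memory1.id, memory2.id)] = binding.detach().cpu()
- # so consolidation can later treat strongly bound pairs as one retrieval unit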
- class REMConsolidator(nn.Module):
- """REM sleep consolidation for creative connections"""
- def __init__(self, embedding_dim: int = 768):
- super().__init__()
- self.embedding_dim = embedding_dim
- # Dream-like recombination network
- self.recombiner = nn.Sequential(
- nn.Linear(embedding_dim * 2, 1024),
- nn.ReLU(),
- nn.Dropout(0.5), # High dropout for "dream-like" recombination
- nn.Linear(1024, 512),
- nn.ReLU(),
- nn.Dropout(0.5),
- nn.Linear(512, embedding_dim)
- )
- # Emotion regulation network
- self.emotion_regulator = nn.Sequential(
- nn.Linear(embedding_dim + 1, 256), # +1 for emotional valence
- nn.Tanh(),
- nn.Linear(256, 128),
- nn.Tanh(),
- nn.Linear(128, 1),
- nn.Tanh()
- )
- async def generate_novel_connections(self, memory: AgenticMemory,
- replay_buffer: deque) -> List[Tuple[str, str]]:
- """Generate novel connections through dream-like recombination"""
- connections = []
- if not replay_buffer or memory.neural_embedding is None:
- return connections
- # Sample random memories from replay buffer
- sample_size = min(10, len(replay_buffer))
- samples = random.sample(list(replay_buffer), sample_size)
- for sample in samples:
- if 'strengthened_embedding' in sample:
- # Recombine embeddings
- combined = torch.cat([
- memory.neural_embedding,
- sample['strengthened_embedding']
- ])
- # Generate novel representation
- novel = self.recombiner(combined)
- # Calculate novelty score
- novelty = 1 - F.cosine_similarity(
- novel.unsqueeze(0),
- memory.neural_embedding.unsqueeze(0)
- ).item()
- if novelty > 0.3: # Sufficient novelty
- connections.append((
- sample['memory_id'],
- f'novel_connection_{novelty:.2f}'
- ))
- return connections
- async def regulate_emotion(self, memory: AgenticMemory) -> float:
- """Regulate emotional content of memory"""
- if memory.neural_embedding is None:
- return 0.0
- # Current emotional valence, on the same device/dtype as the embedding
- current_emotion = torch.tensor([memory.emotional_valence],
- device=memory.neural_embedding.device,
- dtype=memory.neural_embedding.dtype)
- # Combine with embedding
- combined = torch.cat([memory.neural_embedding, current_emotion])
- # Regulate emotion
- regulated = self.emotion_regulator(combined)
- return float(regulated.item())
- # ==================== ENHANCED CONVERSATION FLOW ====================
- class AdvancedConversationFlowAnalyzer:
- """Implements research-based conversation flow analysis"""
- def __init__(self, embedder: Any):
- self.embedder = embedder
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- self.use_half = torch.cuda.is_available()
- # Temporal graph network
- self.temporal_gnn = TemporalGraphNetwork(
- node_dim=768,
- edge_dim=64,
- hidden_dim=256
- ).to(self.device)
- # CODYMs implementation
- self.codym_analyzer = CODYMAnalyzer()
- # Dialogue state tracker
- self.dst = NonAutoregressiveDST(
- embedding_dim=768,
- num_slots=20
- ).to(self.device)
- # Turn-taking analyzer
- self.turn_analyzer = TurnTakingAnalyzer()
- # Convert to half precision if using GPU
- if self.use_half:
- self.temporal_gnn.half()
- self.dst.half()
- async def analyze_conversation_flow(self, exchanges: List[ConversationExchange]) -> Dict[str, Any]:
- """Comprehensive conversation flow analysis"""
- if not exchanges:
- return self._empty_analysis()
- # 1. Build temporal graph
- graph_features = await self._build_temporal_graph(exchanges)
- # 2. CODYM analysis
- codym_features = self.codym_analyzer.analyze_dynamics(exchanges)
- # 3. Dialogue state tracking
- dialogue_states = await self._track_dialogue_states(exchanges)
- # 4. Turn-taking patterns
- turn_patterns = self.turn_analyzer.analyze_patterns(exchanges)
- # 5. Information flow analysis
- info_flow = await self._analyze_information_flow(exchanges)
- return {
- 'graph_features': graph_features,
- 'dynamics': codym_features,
- 'dialogue_states': dialogue_states,
- 'turn_patterns': turn_patterns,
- 'information_flow': info_flow,
- 'flow_quality': self._compute_flow_quality(
- graph_features, codym_features, turn_patterns
- )
- }
- async def _track_dialogue_states(self, exchanges: List[ConversationExchange]) -> List[Dict[str, float]]:
- """Track dialogue states through the conversation"""
- if not exchanges:
- return []
- # Prepare embeddings for DST
- embeddings = torch.stack([ex.embedding for ex in exchanges])
- # Ensure correct dtype
- if hasattr(self, 'use_half') and self.use_half:
- embeddings = embeddings.half()
- # Get dialogue states
- with torch.no_grad():
- states = self.dst(embeddings)
- # Convert to list of dicts
- dialogue_states = []
- for state in states:
- state_dict = {}
- for i in range(state.shape[0]):
- slot_probs = state[i].cpu().numpy()
- state_dict[f'slot_{i}'] = {
- 'empty': float(slot_probs[0]),
- 'filled': float(slot_probs[1]),
- 'modified': float(slot_probs[2])
- }
- dialogue_states.append(state_dict)
- return dialogue_states
- async def _analyze_information_flow(self, exchanges: List[ConversationExchange]) -> Dict[str, Any]:
- """Analyze how information flows through the conversation"""
- if not exchanges:
- return {
- 'information_density': 0.0,
- 'topic_coherence': 1.0,
- 'progressive_depth': 0.0,
- 'redundancy': 0.0
- }
- # Information density (tokens per exchange)
- densities = [ex.token_count for ex in exchanges]
- avg_density = sum(densities) / len(densities) if densities else 0
- # Topic coherence (similarity between adjacent exchanges)
- coherences = []
- for i in range(1, len(exchanges)):
- if exchanges[i].embedding is not None and exchanges[i-1].embedding is not None:
- sim = F.cosine_similarity(
- exchanges[i].embedding.unsqueeze(0),
- exchanges[i-1].embedding.unsqueeze(0)
- )
- coherences.append(float(sim.item()))
- avg_coherence = sum(coherences) / len(coherences) if coherences else 1.0
- # Progressive depth (increasing complexity)
- depth_progression = 0.0
- if len(exchanges) > 2:
- early_density = sum(densities[:len(densities)//2]) / (len(densities)//2)
- late_density = sum(densities[len(densities)//2:]) / (len(densities) - len(densities)//2)
- depth_progression = (late_density - early_density) / (early_density + 1e-6)
- # Redundancy detection
- redundancy = 0.0
- if len(exchanges) > 3:
- # Check for repeated patterns
- for i in range(2, len(exchanges)):
- for j in range(i-2):
- if exchanges[i].embedding is not None and exchanges[j].embedding is not None:
- sim = F.cosine_similarity(
- exchanges[i].embedding.unsqueeze(0),
- exchanges[j].embedding.unsqueeze(0)
- )
- if sim > 0.9: # High similarity indicates redundancy
- redundancy += 1
- redundancy = redundancy / (len(exchanges) - 2)
- return {
- 'information_density': avg_density,
- 'topic_coherence': avg_coherence,
- 'progressive_depth': depth_progression,
- 'redundancy': min(redundancy, 1.0),
- 'exchange_count': len(exchanges),
- 'total_tokens': sum(densities)
- }
- def _compute_flow_quality(self, graph_features: Dict[str, Any],
- codym_features: Dict[str, Any],
- turn_patterns: Dict[str, Any]) -> float:
- """Compute overall conversation flow quality"""
- # Extract relevant metrics
- graph_score = 0.5 # Default if no graph features
- if graph_features.get('node_embeddings') is not None and len(graph_features['node_embeddings']) > 0:
- # Use graph embedding norm as a proxy for richness
- if graph_features.get('graph_embedding') is not None:
- # Handle both tensor and list formats
- graph_emb = graph_features['graph_embedding']
- if isinstance(graph_emb, list) and len(graph_emb) > 0:
- # Convert back to tensor if it's a list
- graph_tensor = torch.tensor(graph_emb)
- graph_score = torch.sigmoid(graph_tensor.norm() / 10).item()
- elif torch.is_tensor(graph_emb):
- graph_score = torch.sigmoid(graph_emb.norm() / 10).item()
- else:
- graph_score = 0.5
- # CODYM scores
- synchrony = codym_features.get('synchrony', 0.5)
- convergence = codym_features.get('convergence', 0.5)
- balance = codym_features.get('turn_balance', 0.5)
- # Turn-taking quality
- smoothness = turn_patterns.get('transition_smoothness', 0.5)
- reciprocity = turn_patterns.get('reciprocity', 0.5)
- rhythm = turn_patterns.get('rhythm', {}).get('regularity', 0.5)
- # Weighted combination
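- # (the weights below sum to 1.0, so flow_quality stays in [0, 1]
- # whenever every component score is in [0, 1])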
- flow_quality = (
- graph_score * 0.2 +
- synchrony * 0.15 +
- convergence * 0.15 +
- balance * 0.1 +
- smoothness * 0.15 +
- reciprocity * 0.15 +
- rhythm * 0.1
- )
- return float(flow_quality)
- def _empty_analysis(self) -> Dict[str, Any]:
- """Return empty analysis structure"""
- return {
- 'graph_features': {
- 'node_embeddings': torch.empty(0, 768),
- 'graph_embedding': torch.zeros(768),
- 'attention_weights': None
- },
- 'dynamics': {},
- 'dialogue_states': [],
- 'turn_patterns': {},
- 'information_flow': {
- 'information_density': 0.0,
- 'topic_coherence': 1.0,
- 'progressive_depth': 0.0,
- 'redundancy': 0.0
- },
- 'flow_quality': 0.0
- }
- async def _build_temporal_graph(self, exchanges: List[ConversationExchange]) -> Dict[str, Any]:
- """Build temporal graph representation"""
- # Drop exchanges without embeddings so node indices stay consistent
- exchanges = [ex for ex in exchanges if ex.embedding is not None]
- if not exchanges:
- return {
- 'node_embeddings': torch.empty(0, 768),
- 'graph_embedding': torch.zeros(768),
- 'attention_weights': None
- }
- # Create nodes for each exchange
- nodes = []
- edges = []
- for i, exchange in enumerate(exchanges):
- # Node features
- node_features = exchange.embedding
- nodes.append(node_features)
- # Temporal edges to previous exchanges
- for j in range(max(0, i-3), i): # Connect to last 3 exchanges
- time_diff = (exchange.timestamp - exchanges[j].timestamp).total_seconds()
- # Edge features - ensure correct dtype
- edge_features = torch.tensor([
- 1.0 / (1.0 + time_diff), # Temporal proximity
- exchanges[j].resonance_score, # Previous resonance
- float(exchanges[j].breakthrough_moment), # Breakthrough indicator
- exchanges[j].emotional_valence # Emotional context
- ])
- # Ensure correct dtype if using half precision
- if hasattr(self, 'device'):
- edge_features = edge_features.to(self.device)
- if hasattr(self, 'use_half') and self.use_half:
- edge_features = edge_features.half()
- edges.append((j, i, edge_features))
- # Process through temporal GNN
- node_tensor = torch.stack(nodes)
- # Handle empty edges case
- if edges:
- edge_index = torch.tensor([[e[0], e[1]] for e in edges]).T
- edge_attr = torch.stack([e[2] for e in edges])
- else:
- # Create empty tensors with correct shape
- edge_index = torch.empty((2, 0), dtype=torch.long)
- edge_attr = torch.empty((0, 4))
- # Ensure correct device and dtype
- if hasattr(self, 'device'):
- edge_index = edge_index.to(self.device)
- edge_attr = edge_attr.to(self.device)
- if hasattr(self, 'use_half') and self.use_half:
- edge_attr = edge_attr.half()
- graph_output = self.temporal_gnn(node_tensor, edge_index, edge_attr)
- # Convert tensors to plain lists for serializable output
- nodes_out = graph_output.get('nodes')
- graph_out = graph_output.get('graph')
- attn_out = graph_output.get('attention')
- return {
- 'node_embeddings': nodes_out.cpu().tolist() if torch.is_tensor(nodes_out) else [],
- 'graph_embedding': graph_out.cpu().tolist() if torch.is_tensor(graph_out) else [],
- 'attention_weights': attn_out.cpu().tolist() if torch.is_tensor(attn_out) else None
- }
- class TemporalGraphNetwork(nn.Module):
- """Temporal GNN for conversation understanding"""
- def __init__(self, node_dim: int, edge_dim: int, hidden_dim: int):
- super().__init__()
- # Fix: Use actual edge dimension (4) instead of the passed parameter
- actual_edge_dim = 4
- # Message passing layers that preserve node dimension
- self.conv1 = TemporalGraphConv(node_dim, node_dim, actual_edge_dim)
- self.conv2 = TemporalGraphConv(node_dim, node_dim, actual_edge_dim)
- self.conv3 = TemporalGraphConv(node_dim, node_dim, actual_edge_dim)
- # Graph-level pooling
- self.graph_pool = nn.Sequential(
- nn.Linear(node_dim, hidden_dim),
- nn.ReLU(),
- nn.Linear(hidden_dim, node_dim)
- )
- # Attention mechanism
- self.attention = nn.MultiheadAttention(node_dim, num_heads=8)
- # Dropout for regularization
- self.dropout = nn.Dropout(0.2)
- def forward(self, x: torch.Tensor, edge_index: torch.Tensor,
- edge_attr: torch.Tensor) -> Dict[str, torch.Tensor]:
- """Process temporal graph"""
- # Store original input for residual connections
- identity = x
- # Message passing with residual connections
- h = F.relu(self.conv1(x, edge_index, edge_attr))
- h = self.dropout(h)
- h = h + identity # Residual connection (both are node_dim)
- identity = h
- h = F.relu(self.conv2(h, edge_index, edge_attr))
- h = self.dropout(h)
- h = h + identity # Residual connection
- identity = h
- h = self.conv3(h, edge_index, edge_attr)
- h = h + identity # Final residual connection
- # Attention over nodes
- h_unsqueezed = h.unsqueeze(0) # Add batch dimension for attention
- attn_out, attn_weights = self.attention(h_unsqueezed, h_unsqueezed, h_unsqueezed)
- h = attn_out.squeeze(0) # Remove batch dimension
- # Graph-level representation
- graph_repr = self.graph_pool(h.mean(dim=0))
- return {
- 'nodes': h,
- 'graph': graph_repr,
- 'attention': attn_weights
- }
- class TemporalGraphConv(nn.Module):
- """Temporal graph convolution layer that preserves dimensions"""
- def __init__(self, in_dim: int, out_dim: int, edge_dim: int):
- super().__init__()
- self.in_dim = in_dim
- self.out_dim = out_dim
- # Message network
- self.message_net = nn.Sequential(
- nn.Linear(in_dim * 2 + edge_dim, in_dim), # Keep same dimension
- nn.ReLU(),
- nn.Linear(in_dim, out_dim)
- )
- # Projection for the edgeless case; registered here (not lazily in forward)
- # so it moves with .to(device)/.half() and is visible to optimizers
- self.projection = nn.Linear(in_dim, out_dim) if in_dim != out_dim else None
- def forward(self, x: torch.Tensor, edge_index: torch.Tensor,
- edge_attr: torch.Tensor) -> torch.Tensor:
- """Perform message passing"""
- # Handle empty edge case
- if edge_index.shape[1] == 0:
- # No edges: return the input unchanged if dimensions match, else project
- if self.projection is None:
- return x
- return self.projection(x)
- row, col = edge_index
- # Initialize output with correct shape
- out = torch.zeros(x.shape[0], self.out_dim, device=x.device, dtype=x.dtype)
- # Message computation
- for i in range(edge_index.shape[1]):
- src, dst = row[i], col[i]
- # Concatenate source, destination, and edge features
- msg_input = torch.cat([x[src], x[dst], edge_attr[i]])
- message = self.message_net(msg_input)
- out[dst] += message
- return out
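- # The per-edge Python loop above is easy to read but costs O(E) interpreter
- # overhead. A vectorized sketch with identical math (batched over all edges):
- #   msg_input = torch.cat([x[row], x[col], edge_attr], dim=1)  # (E, 2*in_dim + edge_dim)
- #   messages = self.message_net(msg_input)                     # (E, out_dim)
- #   out = torch.zeros(x.shape[0], self.out_dim, device=x.device,
- #                     dtype=x.dtype).index_add_(0, col, messages)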
- class CODYMAnalyzer:
- """Conversational Dynamics Model analyzer"""
- def __init__(self):
- self.transition_matrix = None
- self.state_history = []
- def analyze_dynamics(self, exchanges: List[ConversationExchange]) -> Dict[str, Any]:
- """Analyze conversation dynamics using Markov models"""
- if len(exchanges) < 2:
- return {}
- # Extract turn lengths
- turn_lengths = []
- for exchange in exchanges:
- user_len = len(exchange.user_message.split())
- assistant_len = len(exchange.assistant_response.split())
- turn_lengths.extend([user_len, assistant_len])
- # Discretize turn lengths into states
- states = self._discretize_lengths(turn_lengths)
- # Build transition matrix
- self.transition_matrix = self._build_transition_matrix(states)
- # Analyze patterns
- patterns = {
- 'dominant_speaker': self._identify_dominant_speaker(turn_lengths),
- 'turn_balance': self._calculate_turn_balance(turn_lengths),
- 'predictability': self._calculate_predictability(self.transition_matrix),
- 'convergence': self._analyze_convergence(turn_lengths),
- 'synchrony': self._measure_synchrony(exchanges)
- }
- return patterns
- def _discretize_lengths(self, lengths: List[int]) -> List[int]:
- """Discretize turn lengths into states"""
- # Simple quantization: short (0), medium (1), long (2)
- states = []
- for length in lengths:
- if length < 10:
- states.append(0)
- elif length < 50:
- states.append(1)
- else:
- states.append(2)
- return states
- def _build_transition_matrix(self, states: List[int]) -> np.ndarray:
- """Build Markov transition matrix"""
- n_states = 3 # short, medium, long
- matrix = np.zeros((n_states, n_states))
- for i in range(len(states) - 1):
- matrix[states[i], states[i+1]] += 1
- # Normalize rows (without out=, np.divide leaves masked entries uninitialized)
- row_sums = matrix.sum(axis=1, keepdims=True)
- matrix = np.divide(matrix, row_sums, out=np.zeros_like(matrix), where=row_sums != 0)
- return matrix
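- # Worked example: states [0, 1, 1, 2] yield transitions 0->1, 1->1, 1->2, so
- # the count matrix has row 0 = [0, 1, 0], row 1 = [0, 1, 1], row 2 = [0, 0, 0];
- # after normalization row 1 becomes [0, 0.5, 0.5] and row 2 stays all-zero.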
- def _identify_dominant_speaker(self, turn_lengths: List[int]) -> str:
- """Identify who tends to speak more"""
- user_lengths = turn_lengths[::2]
- assistant_lengths = turn_lengths[1::2]
- avg_user = np.mean(user_lengths) if user_lengths else 0
- avg_assistant = np.mean(assistant_lengths) if assistant_lengths else 0
- if avg_user > avg_assistant * 1.2:
- return "user"
- elif avg_assistant > avg_user * 1.2:
- return "assistant"
- else:
- return "balanced"
- def _calculate_turn_balance(self, turn_lengths: List[int]) -> float:
- """Calculate balance between speakers"""
- user_lengths = turn_lengths[::2]
- assistant_lengths = turn_lengths[1::2]
- if not user_lengths or not assistant_lengths:
- return 0.5
- total_user = sum(user_lengths)
- total_assistant = sum(assistant_lengths)
- total = total_user + total_assistant
- if total == 0:
- return 0.5
- balance = min(total_user, total_assistant) / max(total_user, total_assistant)
- return balance
- def _calculate_predictability(self, matrix: np.ndarray) -> float:
- """Calculate conversation predictability from transition matrix"""
- if matrix.size == 0:
- return 0.0
- # Entropy of transition matrix
- entropy = 0.0
- for row in matrix:
- for p in row:
- if p > 0:
- entropy -= p * np.log2(p)
- # Normalize by maximum possible entropy: the sum above runs over every row,
- # so the maximum is rows * log2(columns)
- max_entropy = matrix.shape[0] * np.log2(matrix.shape[1])
- predictability = 1.0 - (entropy / max_entropy) if max_entropy > 0 else 1.0
- return predictability
- def _analyze_convergence(self, turn_lengths: List[int]) -> float:
- """Analyze if speakers converge in their turn lengths"""
- if len(turn_lengths) < 4:
- return 0.0
- user_lengths = turn_lengths[::2]
- assistant_lengths = turn_lengths[1::2]
- # Calculate moving average difference
- window = 3
- convergence_scores = []
- for i in range(window, min(len(user_lengths), len(assistant_lengths))):
- user_avg = np.mean(user_lengths[i-window:i])
- assistant_avg = np.mean(assistant_lengths[i-window:i])
- # Normalized difference
- diff = abs(user_avg - assistant_avg) / (user_avg + assistant_avg + 1e-6)
- convergence_scores.append(1.0 - diff)
- return np.mean(convergence_scores) if convergence_scores else 0.0
- def _measure_synchrony(self, exchanges: List[ConversationExchange]) -> float:
- """Measure conversational synchrony"""
- if len(exchanges) < 2:
- return 0.0
- # Response time synchrony
- response_times = []
- for i in range(1, len(exchanges)):
- time_diff = (exchanges[i].timestamp - exchanges[i-1].timestamp).total_seconds()
- response_times.append(time_diff)
- if not response_times:
- return 0.0
- # Low variance in response times indicates synchrony
- variance = np.var(response_times)
- mean_time = np.mean(response_times)
- # Coefficient of variation (lower is more synchronous)
- cv = np.sqrt(variance) / (mean_time + 1e-6)
- synchrony = 1.0 / (1.0 + cv)
- return synchrony
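- # Mapping: cv = 0 (perfectly regular response timing) gives synchrony 1.0,
- # cv = 1 gives 0.5, and increasingly erratic timing pushes the score toward 0.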
- class NonAutoregressiveDST(nn.Module):
- """Non-autoregressive dialogue state tracking"""
- def __init__(self, embedding_dim: int, num_slots: int):
- super().__init__()
- self.num_slots = num_slots
- # Slot encoders
- self.slot_encoders = nn.ModuleList([
- nn.Linear(embedding_dim, 128) for _ in range(num_slots)
- ])
- # State predictor
- self.state_predictor = nn.Sequential(
- nn.Linear(128 * num_slots, 512),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, num_slots * 3) # 3 states per slot
- )
- def forward(self, exchange_embeddings: torch.Tensor) -> torch.Tensor:
- """Predict dialogue states"""
- # Encode each slot
- slot_representations = []
- for i, encoder in enumerate(self.slot_encoders):
- slot_repr = encoder(exchange_embeddings)
- slot_representations.append(slot_repr)
- # Concatenate slot representations
- combined = torch.cat(slot_representations, dim=-1)
- # Predict states
- states = self.state_predictor(combined)
- states = states.view(-1, self.num_slots, 3) # Reshape to (batch, slots, states)
- return F.softmax(states, dim=-1)
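- # A minimal shape check for the DST module above; illustrative only, and the
- # slot count (8) and batch size (4) are assumed example values, not ones used
- # elsewhere in this system.
- def _demo_dst_shapes() -> None:
- dst = NonAutoregressiveDST(embedding_dim=768, num_slots=8)
- states = dst(torch.randn(4, 768)) # 4 exchange embeddings
- assert states.shape == (4, 8, 3) # (batch, num_slots, {empty, filled, modified})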
- class TurnTakingAnalyzer:
- """Analyze turn-taking patterns in conversation"""
- def analyze_patterns(self, exchanges: List[ConversationExchange]) -> Dict[str, Any]:
- """Analyze turn-taking patterns"""
- if not exchanges:
- return {}
- patterns = {
- 'interruptions': self._detect_interruptions(exchanges),
- 'floor_holding': self._analyze_floor_holding(exchanges),
- 'transition_smoothness': self._measure_transition_smoothness(exchanges),
- 'reciprocity': self._calculate_reciprocity(exchanges),
- 'rhythm': self._analyze_rhythm(exchanges)
- }
- return patterns
- def _detect_interruptions(self, exchanges: List[ConversationExchange]) -> List[int]:
- """Detect potential interruptions based on timing"""
- interruptions = []
- for i in range(1, len(exchanges)):
- time_gap = (exchanges[i].timestamp - exchanges[i-1].timestamp).total_seconds()
- # Very short gaps might indicate interruption
- if time_gap < 2.0: # Less than 2 seconds
- interruptions.append(i)
- return interruptions
- def _analyze_floor_holding(self, exchanges: List[ConversationExchange]) -> Dict[str, float]:
- """Analyze how long each speaker holds the floor"""
- user_times = []
- assistant_times = []
- for exchange in exchanges:
- # Estimate speaking time from message length
- user_time = len(exchange.user_message.split()) * 0.15 # ~150ms per word
- assistant_time = len(exchange.assistant_response.split()) * 0.15
- user_times.append(user_time)
- assistant_times.append(assistant_time)
- return {
- 'user_avg': np.mean(user_times) if user_times else 0,
- 'assistant_avg': np.mean(assistant_times) if assistant_times else 0,
- 'user_total': sum(user_times),
- 'assistant_total': sum(assistant_times)
- }
- def _measure_transition_smoothness(self, exchanges: List[ConversationExchange]) -> float:
- """Measure how smooth turn transitions are"""
- if len(exchanges) < 2:
- return 1.0
- gaps = []
- for i in range(1, len(exchanges)):
- gap = (exchanges[i].timestamp - exchanges[i-1].timestamp).total_seconds()
- gaps.append(gap)
- # Ideal gap is 1-3 seconds
- ideal_gaps = [1.0 if 1 <= gap <= 3 else 0.0 for gap in gaps]
- smoothness = sum(ideal_gaps) / len(ideal_gaps) if ideal_gaps else 0.0
- return smoothness
- def _calculate_reciprocity(self, exchanges: List[ConversationExchange]) -> float:
- """Calculate conversational reciprocity"""
- if not exchanges:
- return 0.0
- reciprocity_scores = []
- for exchange in exchanges:
- # Length reciprocity
- user_len = len(exchange.user_message.split())
- assistant_len = len(exchange.assistant_response.split())
- if user_len > 0 and assistant_len > 0:
- ratio = min(user_len, assistant_len) / max(user_len, assistant_len)
- reciprocity_scores.append(ratio)
- # Emotional reciprocity
- if hasattr(exchange, 'emotional_valence'):
- # Proxy: strongly charged turns are harder to reciprocate,
- # so extreme valence lowers the score
- reciprocity_scores.append(1.0 - abs(exchange.emotional_valence))
- return np.mean(reciprocity_scores) if reciprocity_scores else 0.0
- def _analyze_rhythm(self, exchanges: List[ConversationExchange]) -> Dict[str, float]:
- """Analyze conversational rhythm"""
- if len(exchanges) < 3:
- return {'regularity': 0.0, 'tempo': 0.0}
- # Time intervals between exchanges
- intervals = []
- for i in range(1, len(exchanges)):
- interval = (exchanges[i].timestamp - exchanges[i-1].timestamp).total_seconds()
- intervals.append(interval)
- # Rhythm regularity (low variance = regular rhythm)
- regularity = 1.0 / (1.0 + np.std(intervals) / (np.mean(intervals) + 1e-6))
- # Tempo (exchanges per minute)
- total_time = (exchanges[-1].timestamp - exchanges[0].timestamp).total_seconds()
- tempo = len(exchanges) / (total_time / 60) if total_time > 0 else 0
- return {
- 'regularity': regularity,
- 'tempo': tempo
- }
- # ==================== UNCERTAINTY & AMBIGUITY HANDLING ====================
- class UncertaintyAwareMemorySystem:
- """Handles uncertainty and ambiguity in memory operations"""
- def __init__(self, base_memory_system: Any):
- self.base_system = base_memory_system
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- # Bayesian components
- self.bayesian_encoder = BayesianMemoryEncoder().to(self.device)
- self.uncertainty_quantifier = UncertaintyQuantifier().to(self.device)
- # Ambiguity resolver
- self.ambiguity_resolver = AmbiguityResolver(
- self.base_system.embedder
- ).to(self.device)
- # Calibration module
- self.calibrator = MemoryCalibrator().to(self.device)
- async def store_memory_with_uncertainty(self, memory: AgenticMemory,
- exchange_sequence: List[ConversationExchange]) -> Dict[str, Any]:
- """Store memory with uncertainty quantification"""
- # 1. Encode with uncertainty
- encoded = await self.bayesian_encoder.encode_with_uncertainty(
- memory.neural_embedding,
- exchange_sequence
- )
- # 2. Quantify different types of uncertainty
- uncertainties = self.uncertainty_quantifier.quantify(
- encoded['mean'],
- encoded['variance'],
- exchange_sequence
- )
- # 3. Detect and resolve ambiguities
- ambiguities = await self.ambiguity_resolver.detect_ambiguities(
- exchange_sequence
- )
- # 4. Calibrate confidence scores
- calibrated_confidence = self.calibrator.calibrate(
- memory.importance,
- uncertainties['total']
- )
- # Update memory with uncertainty information
- memory.metadata = memory.metadata or {}
- memory.metadata['uncertainty'] = {
- 'aleatoric': uncertainties['aleatoric'],
- 'epistemic': uncertainties['epistemic'],
- 'total': uncertainties['total'],
- 'ambiguities': ambiguities,
- 'calibrated_confidence': calibrated_confidence
- }
- # Store with base system
- result = await self.base_system.store_memory(memory)
- return {
- 'memory_id': result['memory_id'],
- 'uncertainty_metrics': memory.metadata['uncertainty'],
- 'requires_clarification': len(ambiguities) > 0
- }
- class BayesianMemoryEncoder(nn.Module):
- """Bayesian neural network for memory encoding with uncertainty"""
- def __init__(self, input_dim: int = 768, hidden_dim: int = 512, output_dim: int = 256):
- super().__init__()
- # Variational layers
- self.fc1_mean = nn.Linear(input_dim, hidden_dim)
- self.fc1_var = nn.Linear(input_dim, hidden_dim)
- self.fc2_mean = nn.Linear(hidden_dim, hidden_dim)
- self.fc2_var = nn.Linear(hidden_dim, hidden_dim)
- self.fc3_mean = nn.Linear(hidden_dim, output_dim)
- self.fc3_var = nn.Linear(hidden_dim, output_dim)
- # Prior parameters
- self.prior_mean = 0.0
- self.prior_var = 1.0
- def forward(self, x: torch.Tensor, sample: bool = True) -> Dict[str, torch.Tensor]:
- """Forward pass with uncertainty estimation"""
- # Layer 1
- mean1 = self.fc1_mean(x)
- var1 = F.softplus(self.fc1_var(x))
- h1 = self._sample_gaussian(mean1, var1) if sample else mean1
- h1 = F.relu(h1)
- # Layer 2
- mean2 = self.fc2_mean(h1)
- var2 = F.softplus(self.fc2_var(h1))
- h2 = self._sample_gaussian(mean2, var2) if sample else mean2
- h2 = F.relu(h2)
- # Output layer
- mean_out = self.fc3_mean(h2)
- var_out = F.softplus(self.fc3_var(h2))
- if sample:
- output = self._sample_gaussian(mean_out, var_out)
- else:
- output = mean_out
- return {
- 'output': output,
- 'mean': mean_out,
- 'variance': var_out,
- 'kl_divergence': self._compute_kl_divergence([
- (mean1, var1), (mean2, var2), (mean_out, var_out)
- ])
- }
- def _sample_gaussian(self, mean: torch.Tensor, var: torch.Tensor) -> torch.Tensor:
- """Sample from Gaussian distribution"""
- epsilon = torch.randn_like(mean)
- return mean + torch.sqrt(var) * epsilon
- def _compute_kl_divergence(self, params: List[Tuple[torch.Tensor, torch.Tensor]]) -> torch.Tensor:
- """Compute KL divergence from prior"""
- kl = 0
- for mean, var in params:
- kl += 0.5 * torch.sum(
- var / self.prior_var +
- (mean - self.prior_mean) ** 2 / self.prior_var -
- 1 - torch.log(var / self.prior_var)
- )
- return kl
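- # Each term is the closed-form KL between diagonal Gaussians:
- # KL(N(mu, var) || N(mu0, var0)) = 0.5 * sum(var/var0 + (mu - mu0)^2/var0 - 1 - log(var/var0)),
- # which under the standard-normal prior (mu0 = 0, var0 = 1) reduces to
- # 0.5 * sum(var + mu^2 - 1 - log(var)), matching the expression above.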
- async def encode_with_uncertainty(self, embedding: torch.Tensor,
- exchanges: List[ConversationExchange]) -> Dict[str, torch.Tensor]:
- """Encode with multiple forward passes for uncertainty"""
- # Multiple forward passes
- n_samples = 10
- outputs = []
- for _ in range(n_samples):
- result = self.forward(embedding, sample=True)
- outputs.append(result['output'])
- # Compute statistics
- outputs_stacked = torch.stack(outputs)
- mean = outputs_stacked.mean(dim=0)
- variance = outputs_stacked.var(dim=0)
- return {
- 'mean': mean,
- 'variance': variance,
- 'samples': outputs_stacked
- }
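- # Monte Carlo uncertainty estimation: the spread across the 10 stochastic
- # forward passes approximates epistemic (model) uncertainty, which
- # UncertaintyQuantifier.quantify later reads from 'variance'.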
- class UncertaintyQuantifier(nn.Module):
- """Quantify different types of uncertainty"""
- def __init__(self):
- super().__init__() # Stateless, but an nn.Module so .to(device) composes
- def quantify(self, mean: torch.Tensor, variance: torch.Tensor,
- exchanges: List[ConversationExchange]) -> Dict[str, float]:
- """Quantify aleatoric and epistemic uncertainty"""
- # Aleatoric uncertainty (data uncertainty)
- aleatoric = self._compute_aleatoric_uncertainty(exchanges)
- # Epistemic uncertainty (model uncertainty)
- epistemic = float(variance.mean().item())
- # Total uncertainty
- total = np.sqrt(aleatoric ** 2 + epistemic ** 2)
- return {
- 'aleatoric': aleatoric,
- 'epistemic': epistemic,
- 'total': total,
- 'confidence': 1.0 / (1.0 + total)
- }
- def _compute_aleatoric_uncertainty(self, exchanges: List[ConversationExchange]) -> float:
- """Compute data-inherent uncertainty"""
- if not exchanges:
- return 0.0
- uncertainties = []
- # Lexical ambiguity
- ambiguous_words = ['it', 'they', 'this', 'that', 'there', 'here']
- for exchange in exchanges:
- text = exchange.user_message.lower()
- ambiguity_count = sum(1 for word in ambiguous_words if word in text.split())
- uncertainties.append(ambiguity_count / max(len(text.split()), 1))
- # Emotional ambiguity
- emotional_variance = np.var([ex.emotional_valence for ex in exchanges])
- uncertainties.append(emotional_variance)
- # Temporal ambiguity
- temporal_words = ['soon', 'later', 'recently', 'eventually', 'sometimes']
- for exchange in exchanges:
- text = exchange.user_message.lower()
- temporal_count = sum(1 for word in temporal_words if word in text.split())
- uncertainties.append(temporal_count * 0.1)
- return float(np.mean(uncertainties))
- class AmbiguityResolver(nn.Module):
- """Detect and resolve ambiguities in conversation"""
- def __init__(self, embedder: Any):
- super().__init__()
- self.embedder = embedder
- # Ambiguity detection network
- self.detector = nn.Sequential(
- nn.Linear(768, 512),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, 5) # 5 types of ambiguity
- )
- # Context resolver
- self.context_resolver = nn.TransformerEncoder(
- nn.TransformerEncoderLayer(
- d_model=768,
- nhead=8,
- dim_feedforward=2048,
- dropout=0.1
- ),
- num_layers=3
- )
- async def detect_ambiguities(self, exchanges: List[ConversationExchange]) -> List[Dict[str, Any]]:
- """Detect various types of ambiguity"""
- ambiguities = []
- for i, exchange in enumerate(exchanges):
- # Get embedding
- embedding = exchange.embedding
- # Detect ambiguity types
- with torch.no_grad():
- ambiguity_scores = torch.sigmoid(self.detector(embedding))
- # Check each ambiguity type
- ambiguity_types = ['lexical', 'syntactic', 'semantic', 'anaphoric', 'pragmatic']
- for amb_type, score in zip(ambiguity_types, ambiguity_scores):
- if score > 0.5: # Threshold for detection
- # Try to resolve using context
- context_window = exchanges[max(0, i-3):i+1]
- resolution = await self._resolve_ambiguity(
- exchange, context_window, amb_type
- )
- ambiguities.append({
- 'exchange_index': i,
- 'type': amb_type,
- 'score': float(score),
- 'text': exchange.user_message[:100],
- 'resolution': resolution
- })
- return ambiguities
- async def _resolve_ambiguity(self, exchange: ConversationExchange,
- context: List[ConversationExchange],
- ambiguity_type: str) -> Dict[str, Any]:
- """Attempt to resolve detected ambiguity"""
- # Stack context embeddings
- context_embeddings = torch.stack([ex.embedding for ex in context])
- # Apply transformer to resolve
- resolved = self.context_resolver(context_embeddings.unsqueeze(1))
- # Get attention weights for interpretability
- # In practice, extract from transformer layers
- return {
- 'resolved': True, # Simplified
- 'confidence': 0.8,
- 'method': f'{ambiguity_type}_resolution',
- 'context_used': len(context)
- }
- class MemoryCalibrator(nn.Module):
- """Calibrate confidence scores for better reliability"""
- def __init__(self):
- super().__init__()
- # Temperature scaling parameter
- self.temperature = nn.Parameter(torch.ones(1) * 1.5)
- # Platt scaling parameters, initialized to the identity transform (a=1, b=0)
- # so uncalibrated scores pass through unchanged until the parameters are fit
- self.platt_a = nn.Parameter(torch.ones(1))
- self.platt_b = nn.Parameter(torch.zeros(1))
- def calibrate(self, raw_confidence: float, uncertainty: float) -> float:
- """Calibrate confidence score"""
- # Adjust for uncertainty
- adjusted = raw_confidence * (1.0 - uncertainty)
- # Temperature scaling (clamp keeps the logit finite at the extremes)
- logit = torch.logit(torch.tensor([adjusted]).clamp(1e-6, 1 - 1e-6))
- calibrated_logit = logit / self.temperature
- # Platt scaling
- calibrated_logit = self.platt_a * calibrated_logit + self.platt_b
- # Convert back to probability
- calibrated = torch.sigmoid(calibrated_logit)
- return float(calibrated.item())
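- # Worked example with identity-initialized Platt parameters: raw_confidence
- # 0.9 and uncertainty 0.2 give adjusted = 0.72, logit(0.72) / 1.5 ≈ 0.63, and
- # a calibrated confidence of sigmoid(0.63) ≈ 0.65; fitting the temperature and
- # Platt parameters on validation data would sharpen this.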
- # ==================== ENHANCED PERSONALIZATION ====================
- class PrivacyPreservingPersonalization:
- """User-specific adaptation with privacy preservation"""
- def __init__(self, base_system: Any):
- self.base_system = base_system
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- self.use_half = torch.cuda.is_available()
- # User model components
- self.user_models = {}
- self.behavioral_analyzer = BehavioralPatternAnalyzer().to(self.device)
- self.preference_learner = PreferenceLearner().to(self.device)
- self.cultural_adapter = CulturalAdapter().to(self.device)
- # Convert to half precision if using GPU
- if self.use_half:
- self.behavioral_analyzer.half()
- self.preference_learner.half()
- self.cultural_adapter.half()
- # Privacy components
- self.differential_privacy = DifferentialPrivacyModule(epsilon=1.9)
- self.federated_aggregator = FederatedAggregator()
- # Adaptive interface
- self.interface_adapter = AdaptiveInterfaceEngine()
- async def personalize_for_user(self, user_id: str,
- exchange: ConversationExchange,
- historical_data: Optional[List[ConversationEpisode]] = None) -> Dict[str, Any]:
- """Personalize memory system for specific user"""
- # Get or create user model
- if user_id not in self.user_models:
- self.user_models[user_id] = await self._initialize_user_model(
- user_id, historical_data
- )
- user_model = self.user_models[user_id]
- # 1. Analyze behavioral patterns
- behavior_features = await self.behavioral_analyzer.analyze(
- exchange, user_model
- )
- # 2. Learn preferences with privacy
- preferences = await self.preference_learner.learn_with_privacy(
- exchange, user_model, self.differential_privacy
- )
- # 3. Cultural adaptation
- cultural_context = await self.cultural_adapter.adapt(
- exchange, user_model
- )
- # 4. Update user model
- user_model = self._update_user_model(
- user_model, behavior_features, preferences, cultural_context
- )
- # 5. Adapt interface
- interface_adaptations = self.interface_adapter.generate_adaptations(
- user_model
- )
- return {
- 'behavioral_profile': behavior_features,
- 'preferences': preferences,
- 'cultural_adaptations': cultural_context,
- 'interface_config': interface_adaptations,
- 'privacy_budget_used': self.differential_privacy.get_budget_used()
- }
- async def _initialize_user_model(self, user_id: str,
- historical_data: Optional[List[ConversationEpisode]]) -> Dict[str, Any]:
- """Initialize user model from historical data"""
- model = {
- 'user_id': user_id,
- 'created_at': datetime.utcnow(),
- 'interaction_count': 0,
- 'behavioral_embeddings': [],
- 'preference_vectors': [],
- 'cultural_markers': {},
- 'communication_style': {
- 'formality': 0.5,
- 'verbosity': 0.5,
- 'technical_level': 0.5,
- 'emotional_expressiveness': 0.5
- },
- 'temporal_patterns': {},
- 'topic_preferences': defaultdict(float)
- }
- # Initialize from historical data if available
- if historical_data:
- for episode in historical_data[-10:]: # Last 10 episodes
- # Extract patterns
- for exchange in episode.exchanges:
- # Update communication style
- model['communication_style']['verbosity'] += len(
- exchange.user_message.split()
- ) / 100.0
- # Update topic preferences
- for topic in episode.topics:
- model['topic_preferences'][topic] += 1.0
- model['interaction_count'] += 1
- return model
- def _update_user_model(self, user_model: Dict[str, Any],
- behavior_features: Dict[str, Any],
- preferences: Dict[str, Any],
- cultural_context: Dict[str, Any]) -> Dict[str, Any]:
- """Update user model with new features"""
- # Update behavioral embeddings
- if 'behavioral_embedding' in behavior_features:
- embedding = behavior_features['behavioral_embedding']
- if torch.is_tensor(embedding):
- embedding = embedding.cpu().tolist()
- user_model['behavioral_embeddings'].append(embedding)
- # Keep only recent embeddings
- if len(user_model['behavioral_embeddings']) > 100:
- user_model['behavioral_embeddings'] = user_model['behavioral_embeddings'][-100:]
- # Update interaction count
- user_model['interaction_count'] += 1
- # Update communication style with exponential moving average
- alpha = 0.1 # Learning rate
- if 'communication_style' in preferences:
- for key, value in preferences['communication_style'].items():
- if key in user_model['communication_style']:
- user_model['communication_style'][key] = (
- alpha * value + (1 - alpha) * user_model['communication_style'][key]
- )
- # Update topic preferences
- if 'top_topics' in preferences:
- for topic_idx, score in preferences['top_topics']:
- # Convert topic index to string if needed
- topic_key = f"topic_{topic_idx}"
- user_model['topic_preferences'][topic_key] = (
- user_model['topic_preferences'].get(topic_key, 0) * 0.9 + score * 0.1
- )
- # Update temporal patterns
- if 'response_timing' in behavior_features:
- timing = behavior_features['response_timing']
- if timing.get('typical_hour'):
- current_hour = datetime.utcnow().hour
- hour_key = f'hour_{current_hour}'
- user_model['temporal_patterns'][hour_key] = (
- user_model['temporal_patterns'].get(hour_key, 0) + 1
- )
- # Update cultural markers
- if 'detected_markers' in cultural_context:
- for marker in cultural_context['detected_markers']:
- user_model['cultural_markers'][marker] = (
- user_model['cultural_markers'].get(marker, 0) + 1
- )
- # Preference vectors are already appended in learn_with_privacy; nothing to do here
- return user_model
- class BehavioralPatternAnalyzer(nn.Module):
- """Analyze user behavioral patterns"""
- def __init__(self, input_dim: int = 768, hidden_dim: int = 256):
- super().__init__()
- # Add device attribute
- self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- # Sequential pattern analyzer
- self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=2,
- batch_first=True, bidirectional=True)
- # Pattern classifier
- self.classifier = nn.Sequential(
- nn.Linear(hidden_dim * 2, 512),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, 128) # Behavioral embedding
- )
- async def analyze(self, exchange: ConversationExchange,
- user_model: Dict[str, Any]) -> Dict[str, Any]:
- """Analyze behavioral patterns"""
- # Use raw exchange embeddings (768-dim) as LSTM input, not the 128-dim behavioral embeddings
- recent_exchanges = user_model.get('recent_exchange_embeddings', [])[-10:]
- # Ensure exchange embedding has correct dtype
- exchange_embedding = exchange.embedding
- if exchange_embedding.dtype != next(self.parameters()).dtype:
- exchange_embedding = exchange_embedding.to(next(self.parameters()).dtype)
- if recent_exchanges:
- # Stack raw exchange embeddings (all 768-dim)
- dtype = next(self.parameters()).dtype
- device = next(self.parameters()).device
- recent_embeddings_tensor = torch.stack([
- torch.tensor(emb, dtype=dtype, device=device) if isinstance(emb, list) else
- emb.to(device).to(dtype) if hasattr(emb, 'to') else emb
- for emb in recent_exchanges
- ])
- # Add current exchange embedding
- all_embeddings = torch.cat([recent_embeddings_tensor, exchange_embedding.unsqueeze(0)])
- # Process sequence through LSTM (expects 768-dim input)
- lstm_out, (hidden, _) = self.lstm(all_embeddings.unsqueeze(0))
- # Get behavioral pattern from LSTM output
- final_hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
- behavioral_embedding = self.classifier(final_hidden).squeeze()
- else:
- # Bootstrap from current exchange
- behavioral_embedding = self._process_single_exchange(exchange_embedding)
- # Store the raw exchange embedding for future sequences
- if 'recent_exchange_embeddings' not in user_model:
- user_model['recent_exchange_embeddings'] = []
- user_model['recent_exchange_embeddings'].append(exchange_embedding.cpu())
- if len(user_model['recent_exchange_embeddings']) > 20:
- user_model['recent_exchange_embeddings'] = user_model['recent_exchange_embeddings'][-20:]
- # Extract behavioral features
- features = {
- 'response_timing': self._analyze_timing(exchange, user_model),
- 'linguistic_patterns': self._analyze_linguistic(exchange),
- 'interaction_style': self._analyze_interaction(exchange, user_model),
- 'engagement_level': self._measure_engagement(exchange),
- 'behavioral_embedding': behavioral_embedding.cpu().tolist()
- }
- return features
- def _process_single_exchange(self, exchange_embedding: torch.Tensor) -> torch.Tensor:
- """Process a single exchange embedding to behavioral embedding"""
- # Process through LSTM (need sequence format)
- exchange_seq = exchange_embedding.unsqueeze(0).unsqueeze(0) # [1, 1, 768]
- # Process through LSTM
- lstm_out, (hidden, _) = self.lstm(exchange_seq)
- # Concatenate hidden states
- final_hidden = torch.cat([hidden[-2], hidden[-1]], dim=1)
- # Get behavioral embedding through classifier
- behavioral_embedding = self.classifier(final_hidden).squeeze()
- return behavioral_embedding
- def _analyze_timing(self, exchange: ConversationExchange,
- user_model: Dict[str, Any]) -> Dict[str, float]:
- """Analyze temporal patterns"""
- current_hour = exchange.timestamp.hour
- current_day = exchange.timestamp.weekday()
- # Update temporal patterns
- temporal = user_model.get('temporal_patterns', {})
- hour_key = f'hour_{current_hour}'
- day_key = f'day_{current_day}'
- temporal[hour_key] = temporal.get(hour_key, 0) + 1
- temporal[day_key] = temporal.get(day_key, 0) + 1
- return {
- # Keys are strings like 'hour_14', so look the count up directly
- 'typical_hour': temporal.get(hour_key, 0) > 3,
- 'typical_day': temporal.get(day_key, 0) > 3,
- 'session_time': 0.0 # Would calculate from session start
- }
- def _analyze_linguistic(self, exchange: ConversationExchange) -> Dict[str, float]:
- """Analyze linguistic patterns"""
- text = exchange.user_message
- words = text.split()
- # Simple metrics (would be more sophisticated in production)
- return {
- 'avg_word_length': np.mean([len(w) for w in words]) if words else 0,
- 'sentence_complexity': len(words) / max(text.count('.') + text.count('!') + text.count('?'), 1),
- 'question_ratio': text.count('?') / max(len(text.split('. ')), 1),
- 'formality_score': self._estimate_formality(text)
- }
- def _estimate_formality(self, text: str) -> float:
- """Estimate text formality"""
- informal_markers = ['gonna', 'wanna', 'lol', 'btw', 'thx', '!', '...']
- formal_markers = ['therefore', 'however', 'furthermore', 'regarding', 'pursuant']
- informal_count = sum(1 for marker in informal_markers if marker in text.lower())
- formal_count = sum(1 for marker in formal_markers if marker in text.lower())
- if informal_count + formal_count == 0:
- return 0.5
- return formal_count / (informal_count + formal_count)
- def _analyze_interaction(self, exchange: ConversationExchange,
- user_model: Dict[str, Any]) -> Dict[str, float]:
- """Analyze interaction patterns"""
- return {
- 'directness': 1.0 if '?' in exchange.user_message else 0.5,
- 'elaboration': len(exchange.user_message.split()) / 20.0, # Normalized
- 'emotional_expression': abs(exchange.emotional_valence),
- 'topic_consistency': 0.8 # Would calculate from topic similarity
- }
- def _measure_engagement(self, exchange: ConversationExchange) -> float:
- """Measure user engagement level"""
- # Multiple signals
- length_signal = min(len(exchange.user_message.split()) / 50.0, 1.0)
- question_signal = 1.0 if '?' in exchange.user_message else 0.5
- emotional_signal = abs(exchange.emotional_valence)
- # Weighted combination
- engagement = 0.4 * length_signal + 0.3 * question_signal + 0.3 * emotional_signal
- return engagement
- class PreferenceLearner(nn.Module):
- """Learn user preferences with privacy preservation"""
- def __init__(self, input_dim: int = 768, preference_dim: int = 64):
- super().__init__()
- # Preference encoder
- self.encoder = nn.Sequential(
- nn.Linear(input_dim, 256),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(256, 128),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(128, preference_dim)
- )
- # Topic preference predictor
- self.topic_predictor = nn.Linear(preference_dim, 50) # 50 topic categories
- # Style preference predictor
- self.style_predictor = nn.Linear(preference_dim, 10) # 10 style dimensions
- async def learn_with_privacy(self, exchange: ConversationExchange,
- user_model: Dict[str, Any],
- privacy_module: 'DifferentialPrivacyModule') -> Dict[str, Any]:
- """Learn preferences with differential privacy"""
- # Ensure exchange embedding has correct dtype
- exchange_embedding = exchange.embedding
- if exchange_embedding.dtype != next(self.parameters()).dtype:
- exchange_embedding = exchange_embedding.to(next(self.parameters()).dtype)
- # Encode exchange
- preference_vector = self.encoder(exchange_embedding)
- # Add noise for privacy
- noisy_vector = privacy_module.add_noise(preference_vector)
- # Predict preferences
- topic_prefs = torch.softmax(self.topic_predictor(noisy_vector), dim=-1)
- style_prefs = torch.sigmoid(self.style_predictor(noisy_vector))
- # Update user model preferences
- if 'preference_vectors' in user_model:
- user_model['preference_vectors'].append(preference_vector)
- # Keep only recent preferences
- if len(user_model['preference_vectors']) > 50:
- user_model['preference_vectors'] = user_model['preference_vectors'][-50:]
- # Extract top preferences
- top_topics = torch.topk(topic_prefs, 5)
- preferences = {
- 'top_topics': [(i.item(), v.item()) for i, v in zip(top_topics.indices, top_topics.values)],
- 'communication_style': {
- 'conciseness': float(style_prefs[0]),
- 'technicality': float(style_prefs[1]),
- 'formality': float(style_prefs[2]),
- 'emotionality': float(style_prefs[3]),
- 'creativity': float(style_prefs[4])
- },
- 'preference_stability': self._calculate_stability(user_model)
- }
- return preferences
- def _calculate_stability(self, user_model: Dict[str, Any]) -> float:
- """Calculate preference stability from user's preference history"""
- if 'preference_vectors' not in user_model or len(user_model['preference_vectors']) < 2:
- return 0.5 # Default stability for new users
- # Get recent preference vectors
- recent_vectors = user_model['preference_vectors'][-10:] # Last 10
- if len(recent_vectors) < 2:
- return 0.5
- # Calculate pairwise similarities
- similarities = []
- for i in range(1, len(recent_vectors)):
- # Ensure tensors are on same device
- v1 = recent_vectors[i-1]
- v2 = recent_vectors[i]
- if v1.device != v2.device:
- v2 = v2.to(v1.device)
- sim = F.cosine_similarity(v1.unsqueeze(0), v2.unsqueeze(0))
- similarities.append(float(sim.item()))
- # Average similarity indicates stability
- if similarities:
- avg_similarity = sum(similarities) / len(similarities)
- # Convert to 0-1 range where 1 is highly stable
- stability = (avg_similarity + 1) / 2 # cosine sim is -1 to 1
- return float(stability)
- return 0.5
- class CulturalAdapter(nn.Module):
- """Adapt to cultural contexts and reduce bias"""
- def __init__(self):
- super().__init__()
- # Cultural embedding space
- self.cultural_embeddings = nn.Embedding(100, 64) # 100 cultural contexts
- # Adaptation network
- self.adapter = nn.Sequential(
- nn.Linear(768 + 64, 512),
- nn.ReLU(),
- nn.Dropout(0.2),
- nn.Linear(512, 256),
- nn.ReLU(),
- nn.Linear(256, 128)
- )
- async def adapt(self, exchange: ConversationExchange,
- user_model: Dict[str, Any]) -> Dict[str, Any]:
- """Adapt to cultural context"""
- # Detect cultural markers
- cultural_markers = self._detect_cultural_markers(exchange.user_message)
- # Get cultural embedding
- device = next(self.parameters()).device
- dtype = next(self.parameters()).dtype
- if cultural_markers:
- # Simple: use first detected marker; hashlib gives a stable id across
- # processes (built-in hash() is randomized per run for strings)
- cultural_id = int(hashlib.md5(cultural_markers[0].encode()).hexdigest(), 16) % 100
- cultural_emb = self.cultural_embeddings(torch.tensor([cultural_id], device=device))
- else:
- # Default cultural context
- cultural_emb = torch.zeros(1, 64, device=device, dtype=dtype)
- # Ensure exchange embedding has correct dtype
- exchange_embedding = exchange.embedding
- if exchange_embedding.dtype != dtype:
- exchange_embedding = exchange_embedding.to(dtype)
- # Combine with exchange embedding
- combined = torch.cat([exchange_embedding.unsqueeze(0), cultural_emb], dim=1)
- # Generate adaptation
- adapted = self.adapter(combined).squeeze()
- # Extract cultural adaptation parameters
- adaptations = {
- 'detected_markers': cultural_markers,
- 'adaptation_vector': adapted.cpu().tolist() if torch.is_tensor(adapted) else adapted,
- 'communication_adjustments': {
- 'directness': float(torch.sigmoid(adapted[0])),
- 'context_level': float(torch.sigmoid(adapted[1])),
- 'formality_adjustment': float(torch.tanh(adapted[2])),
- 'time_orientation': float(torch.sigmoid(adapted[3]))
- },
- 'bias_mitigation': self._calculate_bias_mitigation(adapted)
- }
- return adaptations
- def _detect_cultural_markers(self, text: str) -> List[str]:
- """Detect cultural markers in text"""
- # Simplified cultural marker detection; match whole words so that e.g.
- # "power" does not trigger the collectivist marker 'we'.
- # In production, use more sophisticated methods
- markers = []
- words = set(text.lower().split())
- # Time expressions
- if words & {'mañana', 'inshallah', 'punctual'}:
- markers.append('time_cultural_marker')
- # Communication style
- if words & {'honour', 'honor', 'respect', 'sensei'}:
- markers.append('hierarchy_marker')
- # Collectivism vs individualism
- if words & {'we', 'our', 'team', 'family'}:
- markers.append('collectivist_marker')
- elif words & {'i', 'my', 'individual'}:
- markers.append('individualist_marker')
- return markers
- def _calculate_bias_mitigation(self, adapted: torch.Tensor) -> Dict[str, float]:
- """Calculate bias mitigation factors"""
- # Extract bias mitigation parameters from adaptation
- return {
- 'cultural_sensitivity': float(torch.sigmoid(adapted[10])),
- 'stereotype_avoidance': float(torch.sigmoid(adapted[11])),
- 'inclusive_language': float(torch.sigmoid(adapted[12])),
- 'bias_correction_strength': float(torch.sigmoid(adapted[13]))
- }
- class DifferentialPrivacyModule:
- """Add differential privacy to sensitive operations"""
- def __init__(self, epsilon: float = 1.9):
- self.epsilon = epsilon
- self.delta = 1e-5
- self.budget_used = 0.0
- self.max_budget = 10.0
- def add_noise(self, tensor: torch.Tensor) -> torch.Tensor:
- """Add calibrated noise for differential privacy"""
- # Check budget
- if self.budget_used >= self.max_budget:
- logger.warning("Privacy budget exhausted")
- return tensor
- # Calculate sensitivity
- sensitivity = self._calculate_sensitivity(tensor)
- # Calculate noise scale
- noise_scale = sensitivity * np.sqrt(2 * np.log(1.25 / self.delta)) / self.epsilon
- # Add Gaussian noise - ensure same dtype
- noise = torch.randn_like(tensor) * noise_scale
- noisy_tensor = tensor + noise
- # Update budget
- self.budget_used += self.epsilon
- return noisy_tensor
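- # The scale above follows the Gaussian mechanism,
- # sigma = sensitivity * sqrt(2 * ln(1.25 / delta)) / epsilon. Note that taking
- # the sensitivity from the input's own norm (_calculate_sensitivity below) is
- # a heuristic; a formally private variant would clip inputs to a fixed norm
- # bound C and use C as the sensitivity.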
- def _calculate_sensitivity(self, tensor: torch.Tensor) -> float:
- """Calculate L2 sensitivity of the function"""
- # For embeddings, use norm as sensitivity estimate
- return float(torch.norm(tensor).item())
- def get_budget_used(self) -> float:
- """Get current privacy budget usage"""
- return self.budget_used / self.max_budget
- class FederatedAggregator:
- """Aggregate user models in federated manner"""
- def __init__(self):
- self.aggregation_rounds = 0
- async def aggregate_models(self, local_models: List[Dict[str, Any]]) -> Dict[str, Any]:
- """Aggregate models from multiple users"""
- if not local_models:
- return {}
- # Aggregate preference vectors
- all_preferences = []
- for model in local_models:
- if 'preference_vectors' in model and model['preference_vectors']:
- # Take mean of each user's preferences
- user_pref = torch.stack(model['preference_vectors']).mean(dim=0)
- all_preferences.append(user_pref)
- # Global preference vector
- if all_preferences:
- global_preferences = torch.stack(all_preferences).mean(dim=0)
- else:
- global_preferences = torch.zeros(64)
- # Aggregate communication styles
- style_dims = ['formality', 'verbosity', 'technical_level', 'emotional_expressiveness']
- global_style = {}
- for dim in style_dims:
- values = [m['communication_style'].get(dim, 0.5) for m in local_models]
- global_style[dim] = np.mean(values)
- # Aggregate topic preferences
- topic_counts = defaultdict(float)
- for model in local_models:
- for topic, count in model.get('topic_preferences', {}).items():
- topic_counts[topic] += count
- # Normalize
- total_count = sum(topic_counts.values())
- if total_count > 0:
- topic_distribution = {k: v/total_count for k, v in topic_counts.items()}
- else:
- topic_distribution = {}
- self.aggregation_rounds += 1
- return {
- 'global_preferences': global_preferences,
- 'global_communication_style': global_style,
- 'global_topic_distribution': topic_distribution,
- 'aggregation_round': self.aggregation_rounds,
- 'num_participants': len(local_models)
- }
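- # This is an unweighted FedAvg-style mean. A sketch of a weighted variant
- # (scaling each contribution by interaction count, with weights taken over
- # the same models that contributed preference vectors; not implemented):
- #   w = torch.tensor([m['interaction_count'] for m in local_models], dtype=torch.float32)
- #   w = w / w.sum()
- #   weighted = (torch.stack(all_preferences) * w[:, None]).sum(dim=0)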
- class AdaptiveInterfaceEngine:
- """Generate interface adaptations based on user model"""
- def generate_adaptations(self, user_model: Dict[str, Any]) -> Dict[str, Any]:
- """Generate interface configuration"""
- style = user_model.get('communication_style', {})
- # Response length adaptation
- if style.get('verbosity', 0.5) < 0.3:
- response_style = 'concise'
- max_tokens = 150
- elif style.get('verbosity', 0.5) > 0.7:
- response_style = 'detailed'
- max_tokens = 500
- else:
- response_style = 'balanced'
- max_tokens = 300
- # Technical level adaptation
- technical_level = style.get('technical_level', 0.5)
- if technical_level < 0.3:
- explanation_style = 'simple'
- elif technical_level > 0.7:
- explanation_style = 'technical'
- else:
- explanation_style = 'moderate'
- # Emotional adaptation
- emotional_level = style.get('emotional_expressiveness', 0.5)
- # Visual preferences (hypothetical)
- visual_density = 1.0 - style.get('verbosity', 0.5) # Inverse relationship
- return {
- 'response_config': {
- 'style': response_style,
- 'max_tokens': max_tokens,
- 'explanation_level': explanation_style,
- 'emotional_mirroring': emotional_level > 0.6
- },
- 'interface_config': {
- 'visual_density': visual_density,
- 'show_confidence_scores': technical_level > 0.6,
- 'enable_clarification_prompts': technical_level < 0.4,
- 'response_delay_ms': 500 if style.get('formality', 0.5) > 0.7 else 200
- },
- 'memory_display': {
- 'show_precious_memories': True,
- 'memory_detail_level': 'high' if technical_level > 0.5 else 'summary',
- 'enable_memory_exploration': technical_level > 0.4
- }
- }
- # ==================== INTEGRATION WITH EXISTING SYSTEM ====================
- class AdvancedMemorySystem(CompleteAIMemorySystem):
- """Enhanced memory system integrating all research findings"""
- def __init__(self, config: Dict[str, Any]):
- # Initialize base system
- super().__init__(config)
- # Initialize advanced components
- self._init_advanced_components()
- logger.info("Advanced Memory System initialized with all research enhancements")
- def _init_advanced_components(self):
- """Initialize research-based components"""
- # Determine device
- device = 'cuda' if torch.cuda.is_available() else 'cpu'
- # HippoRAG components
- self.hippo_index = HippoIndex(embedding_dim=768, device=device)
- # Pattern recognition
- self.pattern_recognizer = AdaptivePatternRecognizer(
- embedding_dim=768,
- device=device # Pass device string
- )
- # Multi-dimensional encoding
- self.multi_encoder = MultiDimensionalEncoder()
- # Sleep consolidation
- self.sleep_engine = SleepConsolidationEngine(self)
- # Advanced conversation flow
- self.flow_analyzer = AdvancedConversationFlowAnalyzer(self.embedder)
- # Uncertainty handling
- self.uncertainty_system = UncertaintyAwareMemorySystem(self)
- # Personalization
- self.personalization = PrivacyPreservingPersonalization(self)
- # Schedule background tasks (create_task needs a running event loop, so
- # construct this system from async context; otherwise scheduling is skipped)
- try:
- asyncio.create_task(self._run_consolidation_cycles())
- except RuntimeError:
- logger.warning("No running event loop; consolidation cycles not scheduled")
- # Backup manager
- self.backup_manager = BackupManager(self)
- async def _run_consolidation_cycles(self):
- """Run periodic sleep consolidation"""
- while True:
- await asyncio.sleep(3600 * 6) # Every 6 hours
- try:
- await self.sleep_engine.run_consolidation_cycle()
- logger.info("Completed sleep consolidation cycle")
- except Exception as e:
- logger.error(f"Error in consolidation cycle: {e}")
- async def add_exchange(self, session_id: str, user_msg: str, assistant_msg: str,
- metadata: Optional[Dict] = None) -> Dict[str, Any]:
- """Enhanced add_exchange with all research improvements"""
- # Get base result
- result = await super().add_exchange(session_id, user_msg, assistant_msg, metadata)
- # Get current episode
- episode = self.active_episodes.get(session_id)
- if not episode:
- return result
- # Apply advanced processing
- exchange = episode.exchanges[-1]
- user_id = (metadata or {}).get('user_id', 'unknown')
- # 1. Adaptive pattern recognition
- pattern_results = await self._detect_adaptive_patterns(episode, exchange, user_id)
- # 2. Multi-dimensional encoding (async)
- asyncio.create_task(self._encode_multidimensional(episode))
- # 3. HippoRAG indexing
- if exchange.embedding is not None:
- self.hippo_index.add_memory(
- f"{episode.id}_{len(episode.exchanges)}",
- exchange.embedding,
- exchange.timestamp,
- {'topic_vector': exchange.topic_vector}
- )
- # 4. Flow analysis
- flow_features = await self.flow_analyzer.analyze_conversation_flow(
- episode.exchanges
- )
- # 5. Personalization
- personalization = await self.personalization.personalize_for_user(
- user_id, exchange, [episode]
- )
- # Update result
- result.update({
- 'pattern_detection': pattern_results,
- 'flow_quality': flow_features.get('flow_quality', 0.0),
- 'personalization': personalization,
- 'hippo_indexed': True
- })
- return result
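- # Minimal usage sketch (illustrative only; the config keys here are
- # assumptions and depend on the base CompleteAIMemorySystem):
- #   system = AdvancedMemorySystem({'db_path': 'memories.db'})
- #   result = await system.add_exchange(
- #       session_id, "How does consolidation work?", "It runs every 6 hours...",
- #       metadata={'user_id': 'user-123'})
- #   result['flow_quality'], result['personalization']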
- async def _fetch_all_user_memories(self, user_id: str) -> List[Dict[str, Any]]:
- """Fetch all memories for backup"""
- conn = sqlite3.connect(str(self.db_path))
- cursor = conn.execute("""
- SELECT
- m.id, m.user_id, m.episode_id, m.created_at, m.memory_type,
- m.memory_content, m.searchable_text, m.importance, m.qdrant_id,
- m.consolidation_state, m.accessed_count, m.marked_precious,
- m.never_forget, m.experiential_qualities, m.resonance_patterns,
- m.narrative_connections,
- e.started_at, e.ended_at, e.title, e.summary, e.topics,
- e.intent_completed, e.coherence_score, e.consolidation_level,
- e.is_precious, e.relationship_thread_id, e.experiential_signature
- FROM memories m
- JOIN episodes e ON m.episode_id = e.id
- WHERE m.user_id = ?
- ORDER BY m.created_at DESC
- """, (user_id,))
- memories = []
- columns = [description[0] for description in cursor.description]
- for row in cursor.fetchall():
- # Create dictionary from row
- row_dict = dict(zip(columns, row))
- # Parse JSON fields
- memory_dict = {
- 'id': row_dict['id'],
- 'user_id': row_dict['user_id'],
- 'episode_id': row_dict['episode_id'],
- 'created_at': row_dict['created_at'],
- 'memory_type': row_dict['memory_type'],
- 'memory_content': json.loads(row_dict['memory_content']),
- 'searchable_text': row_dict['searchable_text'],
- 'importance': row_dict['importance'],
- 'qdrant_id': row_dict['qdrant_id'],
- 'consolidation_state': row_dict['consolidation_state'],
- 'accessed_count': row_dict['accessed_count'],
- 'marked_precious': bool(row_dict['marked_precious']),
- 'never_forget': bool(row_dict['never_forget']),
- 'experiential_qualities': json.loads(row_dict['experiential_qualities']) if row_dict['experiential_qualities'] else {},
- 'resonance_patterns': json.loads(row_dict['resonance_patterns']) if row_dict['resonance_patterns'] else [],
- 'narrative_connections': json.loads(row_dict['narrative_connections']) if row_dict['narrative_connections'] else [],
- # Episode data
- 'episode': {
- 'id': row_dict['episode_id'],
- 'started_at': row_dict['started_at'],
- 'ended_at': row_dict['ended_at'],
- 'title': row_dict['title'],
- 'summary': row_dict['summary'],
- 'topics': json.loads(row_dict['topics']) if row_dict['topics'] else [],
- 'intent_completed': bool(row_dict['intent_completed']),
- 'coherence_score': row_dict['coherence_score'],
- 'consolidation_level': row_dict['consolidation_level'],
- 'is_precious': bool(row_dict['is_precious']),
- 'relationship_thread_id': row_dict['relationship_thread_id'],
- 'experiential_signature': json.loads(row_dict['experiential_signature']) if row_dict['experiential_signature'] else {}
- }
- }
- memories.append(memory_dict)
- conn.close()
- return memories
- async def _fetch_user_threads(self, user_id: str) -> List[Dict[str, Any]]:
- """Fetch relationship threads for backup"""
- conn = sqlite3.connect(str(self.db_path))
- cursor = conn.execute("""
- SELECT
- thread_id, user_id, created_at, depth_score, trust_level,
- interaction_count, total_time_seconds, understanding_trajectory,
- breakthrough_moments, precious_moments, typical_topics,
- temporal_patterns
- FROM relationship_threads
- WHERE user_id = ?
- ORDER BY created_at DESC
- """, (user_id,))
- threads = []
- for row in cursor.fetchall():
- thread_dict = {
- 'thread_id': row[0],
- 'user_id': row[1],
- 'created_at': row[2],
- 'depth_score': row[3],
- 'trust_level': row[4],
- 'interaction_count': row[5],
- 'total_time_seconds': row[6],
- 'understanding_trajectory': json.loads(row[7]) if row[7] else [],
- 'breakthrough_moments': json.loads(row[8]) if row[8] else [],
- 'precious_moments': json.loads(row[9]) if row[9] else [],
- 'typical_topics': json.loads(row[10]) if row[10] else {},
- 'temporal_patterns': json.loads(row[11]) if row[11] else {}
- }
- threads.append(thread_dict)
- conn.close()
- return threads
- async def create_user_backup(self, user_id: str) -> Dict[str, Any]:
- """Create comprehensive backup for a user"""
- logger.info(f"Creating backup for user {user_id}")
- # Fetch all data
- memories = await self._fetch_all_user_memories(user_id)
- threads = await self._fetch_user_threads(user_id)
- # Get user stats
- stats = await self._get_user_stats(user_id)
- # Get precious memory IDs for quick reference
- precious_ids = [m['id'] for m in memories if m['marked_precious'] or m['never_forget']]
- # Create backup package
- backup_data = {
- 'version': '1.0',
- 'created_at': datetime.utcnow().isoformat(),
- 'user_id': user_id,
- 'stats': stats,
- 'memories': memories,
- 'memory_count': len(memories),
- 'precious_count': len(precious_ids),
- 'precious_memory_ids': precious_ids,
- 'relationship_threads': threads,
- 'thread_count': len(threads),
- # Include current state
- 'active_episode_id': next(
- (ep.id for sid, ep in self.active_episodes.items() if ep.user_id == user_id),
- None
- )
- }
- logger.info(f"Backup created for {user_id}: {len(memories)} memories, {len(precious_ids)} precious")
- return backup_data
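- # Usage sketch (hypothetical, not part of the original API): persist a backup
- # to disk; gzip and json are already imported at the top of this module.
- # backup = await memory_system.create_user_backup("user123")
- # with gzip.open("./backups/user123.json.gz", "wt", encoding="utf-8") as f:
- #     json.dump(backup, f)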
- async def restore_user_memories(self, user_id: str, memories: List[Dict[str, Any]],
- threads: List[Dict[str, Any]]) -> int:
- """Restore memories from backup"""
- logger.info(f"Starting restore for user {user_id}: {len(memories)} memories")
- conn = sqlite3.connect(str(self.db_path))
- restored_count = 0
- try:
- # Start transaction
- conn.execute("BEGIN TRANSACTION")
- # Restore episodes first (memories depend on them)
- episode_ids = set()
- for memory in memories:
- episode = memory['episode']
- episode_ids.add(episode['id'])
- # Check if episode exists
- existing = conn.execute(
- "SELECT id FROM episodes WHERE id = ?",
- (episode['id'],)
- ).fetchone()
- if not existing:
- conn.execute("""
- INSERT INTO episodes (
- id, user_id, started_at, ended_at, title, summary, topics,
- intent_completed, coherence_score, consolidation_level,
- is_precious, relationship_thread_id, experiential_signature
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- episode['id'],
- user_id,
- episode['started_at'],
- episode['ended_at'],
- episode['title'],
- episode['summary'],
- json.dumps(episode['topics']),
- int(episode['intent_completed']),
- episode['coherence_score'],
- episode['consolidation_level'],
- int(episode['is_precious']),
- episode['relationship_thread_id'],
- json.dumps(episode['experiential_signature'])
- ))
- # Restore memories
- for memory in memories:
- # Check if memory exists
- existing = conn.execute(
- "SELECT id FROM memories WHERE id = ?",
- (memory['id'],)
- ).fetchone()
- if not existing:
- conn.execute("""
- INSERT INTO memories (
- id, user_id, episode_id, created_at, memory_type,
- memory_content, searchable_text, importance, qdrant_id,
- consolidation_state, accessed_count, marked_precious,
- never_forget, experiential_qualities, resonance_patterns,
- narrative_connections
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- memory['id'],
- memory['user_id'],
- memory['episode_id'],
- memory['created_at'],
- memory['memory_type'],
- json.dumps(memory['memory_content']),
- memory['searchable_text'],
- memory['importance'],
- memory['qdrant_id'],
- memory['consolidation_state'],
- memory['accessed_count'],
- int(memory['marked_precious']),
- int(memory['never_forget']),
- json.dumps(memory['experiential_qualities']),
- json.dumps(memory['resonance_patterns']),
- json.dumps(memory['narrative_connections'])
- ))
- restored_count += 1
- # Restore to Qdrant if we have the embedding
- # (This would need the actual embedding data in the backup)
- # Restore relationship threads
- for thread in threads:
- existing = conn.execute(
- "SELECT thread_id FROM relationship_threads WHERE thread_id = ?",
- (thread['thread_id'],)
- ).fetchone()
- if not existing:
- conn.execute("""
- INSERT INTO relationship_threads (
- thread_id, user_id, created_at, depth_score, trust_level,
- interaction_count, total_time_seconds, understanding_trajectory,
- breakthrough_moments, precious_moments, typical_topics,
- temporal_patterns
- ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
- """, (
- thread['thread_id'],
- thread['user_id'],
- thread['created_at'],
- thread['depth_score'],
- thread['trust_level'],
- thread['interaction_count'],
- thread['total_time_seconds'],
- json.dumps(thread['understanding_trajectory']),
- json.dumps(thread['breakthrough_moments']),
- json.dumps(thread['precious_moments']),
- json.dumps(thread['typical_topics']),
- json.dumps(thread['temporal_patterns'])
- ))
- # Commit transaction
- conn.commit()
- logger.info(f"Successfully restored {restored_count} memories for user {user_id}")
- except Exception as e:
- conn.rollback()
- logger.error(f"Restore failed: {e}")
- raise
- finally:
- conn.close()
- return restored_count
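- # Round-trip sketch (hypothetical): a backup created above restores cleanly,
- # and rows that already exist are skipped rather than duplicated.
- # backup = await memory_system.create_user_backup("user123")
- # n = await memory_system.restore_user_memories(
- #     "user123", backup['memories'], backup['relationship_threads'])
- # assert n <= backup['memory_count']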
- async def export_to_human_readable(self, user_id: str, format: str = "markdown") -> str:
- """Export all memories in human-readable format"""
- memories = await self._fetch_all_user_memories(user_id)
- threads = await self._fetch_user_threads(user_id)
- if format == "markdown":
- content = f"# Complete Memory Archive for {user_id}\n\n"
- content += f"*Generated on {datetime.utcnow().strftime('%B %d, %Y at %I:%M %p')}*\n\n"
- # Summary statistics
- content += "## Summary\n\n"
- content += f"- Total Memories: {len(memories)}\n"
- content += f"- Precious Memories: {len([m for m in memories if m['marked_precious'] or m['never_forget']])}\n"
- content += f"- Relationship Threads: {len(threads)}\n"
- if threads:
- thread = threads[0] # Most recent thread
- content += f"- Trust Level: {thread['trust_level']:.1%}\n"
- content += f"- Relationship Depth: {thread['depth_score']:.1%}\n"
- content += f"- Total Interactions: {thread['interaction_count']}\n"
- content += "\n---\n\n"
- # Precious memories section
- content += "## 💎 Precious Memories\n\n"
- precious = [m for m in memories if m['marked_precious'] or m['never_forget']]
- for i, memory in enumerate(precious, 1):
- content += f"### {i}. {memory['episode']['title'] or 'Untitled'}\n\n"
- content += f"*{datetime.fromtimestamp(memory['created_at']).strftime('%B %d, %Y')}*\n\n"
- if memory['searchable_text']:
- content += f"{memory['searchable_text']}\n\n"
- if memory['experiential_qualities']:
- eq = memory['experiential_qualities']
- if eq.get('breakthrough_count', 0) > 0:
- content += f"- 💡 Breakthrough moments: {eq['breakthrough_count']}\n"
- if eq.get('peak_resonance', 0) > 0.8:
- content += f"- 🔮 Peak resonance: {eq['peak_resonance']:.1%}\n"
- content += "\n"
- content += "\n---\n\n"
- # All memories chronologically
- content += "## 📚 Complete Memory Timeline\n\n"
- # Group by month
- from itertools import groupby
- memories_sorted = sorted(memories, key=lambda m: m['created_at'], reverse=True)
- for month_key, month_memories in groupby(
- memories_sorted,
- key=lambda m: datetime.fromtimestamp(m['created_at']).strftime('%B %Y')
- ):
- content += f"### {month_key}\n\n"
- for memory in month_memories:
- date_str = datetime.fromtimestamp(memory['created_at']).strftime('%d')
- precious_marker = "💎 " if (memory['marked_precious'] or memory['never_forget']) else ""
- content += f"**{date_str}** - {precious_marker}{memory['episode']['title'] or 'Conversation'}\n"
- content += f" {memory['searchable_text'][:100]}...\n\n"
- return content
- elif format == "json":
- return json.dumps({
- 'user_id': user_id,
- 'export_date': datetime.utcnow().isoformat(),
- 'memories': memories,
- 'threads': threads
- }, indent=2)
- def get_all_users(self) -> List[str]:
- """Get all unique user IDs in the system"""
- conn = sqlite3.connect(str(self.db_path))
- cursor = conn.execute("SELECT DISTINCT user_id FROM episodes")
- users = [row[0] for row in cursor.fetchall()]
- conn.close()
- return users
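- # Batch-archive sketch (hypothetical): export every known user to Markdown.
- # for uid in memory_system.get_all_users():
- #     archive = await memory_system.export_to_human_readable(uid, format="markdown")
- #     Path(f"./exports/{uid}.md").write_text(archive, encoding="utf-8")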
- async def _detect_adaptive_patterns(self, episode: ConversationEpisode,
- new_exchange: ConversationExchange,
- user_id: str) -> Dict[str, Any]:
- """Detect patterns using adaptive recognition"""
- results = {}
- # Check for breakthrough patterns
- is_breakthrough, confidence = await self.pattern_recognizer.detect_pattern(
- user_id, episode.exchanges, 'breakthrough'
- )
- if is_breakthrough:
- new_exchange.breakthrough_moment = True
- results['breakthrough'] = confidence
- # Learn from this pattern
- await self.pattern_recognizer.learn_pattern(
- user_id, episode.exchanges[-3:], 'breakthrough', confidence
- )
- # Check for completion patterns
- is_completion, confidence = await self.pattern_recognizer.detect_pattern(
- user_id, episode.exchanges, 'completion'
- )
- if is_completion:
- episode.intent_completed = True
- results['completion'] = confidence
- # Check for emotional patterns
- is_emotional, confidence = await self.pattern_recognizer.detect_pattern(
- user_id, episode.exchanges, 'emotional'
- )
- if is_emotional:
- results['emotional'] = confidence
- return results
- async def _encode_multidimensional(self, episode: ConversationEpisode):
- """Apply multi-dimensional encoding to episode"""
- try:
- encodings = await self.multi_encoder.encode_memory(episode)
- # Store encodings
- episode.multi_dimensional_encodings = encodings
- # Use matryoshka representations for flexible retrieval
- episode.matryoshka_embeddings = encodings['matryoshka']
- except Exception as e:
- logger.error(f"Error in multi-dimensional encoding: {e}")
- async def search_memories(self, user_id: str, query: str, limit: int = 10,
- include_precious: bool = True) -> List[Dict[str, Any]]:
- """Enhanced search with HippoRAG and uncertainty"""
- # Generate query embedding
- query_embedding = await self.embedder.encode_async([query])
- query_tensor = query_embedding[0]
- # 1. HippoRAG retrieval
- hippo_results = self.hippo_index.single_hop_retrieval(query_tensor, k=limit*2)
- # 2. Standard retrieval
- standard_results = await super().search_memories(
- user_id, query, limit=limit, include_precious=include_precious
- )
- # 3. Combine and re-rank with uncertainty
- combined_results = []
- for memory_id in hippo_results:
- # Retrieve full memory
- memory = await super()._retrieve_memory_by_id(memory_id)
- if memory and memory.user_id == user_id:
- # Add uncertainty information
- uncertainty = memory.metadata.get('uncertainty', {})
- combined_results.append({
- 'memory': memory,
- 'retrieval_method': 'hipporag',
- 'confidence': 1.0 - uncertainty.get('total', 0.0)
- })
- # Add standard results
- for result in standard_results:
- combined_results.append({
- 'memory': result,
- 'retrieval_method': 'standard',
- 'confidence': result.get('score', 0.5)
- })
- # Re-rank by confidence and relevance
- combined_results.sort(key=lambda x: x['confidence'], reverse=True)
- # Format results
- final_results = []
- seen_ids = set()
- for item in combined_results:
- memory = item['memory']
- memory_id = memory.id if hasattr(memory, 'id') else memory.get('memory_id')
- if memory_id not in seen_ids:
- seen_ids.add(memory_id)
- if isinstance(memory, dict):
- result_dict = {**memory}
- else:
- result_dict = {**memory.__dict__}
- result_dict.update({
- 'retrieval_confidence': item['confidence'],
- 'retrieval_method': item['retrieval_method']
- })
- final_results.append(result_dict)
- if len(final_results) >= limit:
- break
- return final_results
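- # Usage sketch (hypothetical): HippoRAG hits rank by 1 - stored uncertainty,
- # standard hits by their similarity score, deduplicated by memory id.
- # results = await memory_system.search_memories(
- #     "user123", "that breakthrough about consolidation", limit=5)
- # for r in results:
- #     print(r['retrieval_method'], round(r['retrieval_confidence'], 2))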
- async def get_conversation_context(self, user_id: str, current_query: str) -> Dict[str, Any]:
- """Enhanced context with all research improvements"""
- # Get base context
- context = await super().get_conversation_context(user_id, current_query)
- # Enhance with advanced features
- # 1. Flow analysis of current conversation
- # active_episodes is keyed by session_id, so find this user's episode by owner
- episode = next((ep for ep in self.active_episodes.values() if ep.user_id == user_id), None)
- if episode:
- flow_analysis = await self.flow_analyzer.analyze_conversation_flow(
- episode.exchanges
- )
- context['conversation_flow'] = flow_analysis
- # 2. User personalization
- user_model = self.personalization.user_models.get(user_id, {})
- if user_model:
- context['user_preferences'] = {
- 'communication_style': user_model.get('communication_style', {}),
- 'topic_preferences': dict(list(user_model.get('topic_preferences', {}).items())[:5]),
- 'typical_patterns': user_model.get('temporal_patterns', {})
- }
- # 3. Pattern predictions
- if hasattr(self.pattern_recognizer, 'user_patterns'):
- user_patterns = self.pattern_recognizer.user_patterns.get(user_id, {})
- context['likely_patterns'] = {
- pattern_type: len(patterns)
- for pattern_type, patterns in user_patterns.items()
- }
- # 4. Uncertainty context
- context['uncertainty_aware'] = True
- context['privacy_preserved'] = True
- return context
- # Imports used by the backup destinations below (not pulled in at the top of this module)
- import os
- import aiofiles
- import boto3
- class BackupDestination(ABC):
- """Abstract base for backup destinations"""
- @abstractmethod
- async def backup(self, data: bytes, filename: str) -> bool:
- pass
- @abstractmethod
- async def restore(self, filename: str) -> bytes:
- pass
- class LocalBackup(BackupDestination):
- def __init__(self, path: str):
- self.path = Path(path)
- self.path.mkdir(parents=True, exist_ok=True)
- async def backup(self, data: bytes, filename: str) -> bool:
- try:
- filepath = self.path / filename
- async with aiofiles.open(filepath, 'wb') as f:
- await f.write(data)
- return True
- except Exception as e:
- logger.error(f"Local backup failed: {e}")
- return False
- async def restore(self, filename: str) -> bytes:
- filepath = self.path / filename
- async with aiofiles.open(filepath, 'rb') as f:
- return await f.read()
- class S3Backup(BackupDestination):
- def __init__(self, bucket: str):
- self.bucket = bucket
- self.client = boto3.client('s3')
- async def backup(self, data: bytes, filename: str) -> bool:
- try:
- await asyncio.to_thread(
- self.client.put_object,
- Bucket=self.bucket,
- Key=f"claude-memories/{filename}",
- Body=data
- )
- return True
- except Exception as e:
- logger.error(f"S3 backup failed: {e}")
- return False
- async def restore(self, filename: str) -> bytes:
- response = await asyncio.to_thread(
- self.client.get_object,
- Bucket=self.bucket,
- Key=f"claude-memories/{filename}"
- )
- return response['Body'].read()
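- # Hypothetical sketch: an encrypting wrapper that composes with any
- # BackupDestination. Not part of the original design; it illustrates the
- # abstract interface. Fernet is imported at the top of this module.
- class EncryptedBackup(BackupDestination):
- """Encrypts payloads with a symmetric Fernet key before delegating"""
- def __init__(self, inner: BackupDestination, key: bytes):
- self.inner = inner
- self.fernet = Fernet(key)
- async def backup(self, data: bytes, filename: str) -> bool:
- # Encrypt, then hand the ciphertext to the wrapped destination
- return await self.inner.backup(self.fernet.encrypt(data), filename)
- async def restore(self, filename: str) -> bytes:
- # Fetch ciphertext from the wrapped destination, then decrypt
- return self.fernet.decrypt(await self.inner.restore(filename))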
- class BackupManager:
- def __init__(self, memory_system: Any):
- self.memory_system = memory_system
- self.destinations = []
- # Configure destinations based on environment
- self.destinations.append(LocalBackup("./backups/"))
- if os.getenv('AWS_S3_BUCKET'):
- self.destinations.append(S3Backup(os.getenv('AWS_S3_BUCKET')))
- # Add more destinations as configured
- logger.info(f"Backup manager initialized with {len(self.destinations)} destinations")
- async def create_backup_package(self, user_id: str) -> bytes:
- """Create comprehensive backup package"""
- # Get all user data
- memories = await self.memory_system._fetch_all_user_memories(user_id)
- threads = await self.memory_system._fetch_user_threads(user_id)
- backup_data = {
- 'version': '1.0',
- 'created_at': datetime.utcnow().isoformat(),
- 'user_id': user_id,
- 'memories': memories,
- 'relationship_threads': threads,
- 'precious_count': len([m for m in memories if m.get('is_precious')])
- }
- # Compress
- json_data = json.dumps(backup_data, indent=2)
- return gzip.compress(json_data.encode())
- async def backup_user(self, user_id: str) -> Dict[str, bool]:
- """Backup to all configured destinations"""
- results = {}
- # Create backup package
- backup_data = await self.create_backup_package(user_id)
- filename = f"{user_id}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}.backup.gz"
- # Backup to each destination
- for destination in self.destinations:
- success = await destination.backup(backup_data, filename)
- results[destination.__class__.__name__] = success
- return results
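- # Usage sketch (hypothetical): fan one user's backup out to every destination.
- # manager = BackupManager(memory_system)
- # results = await manager.backup_user("user123")
- # results == {'LocalBackup': True, 'S3Backup': False}, for example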
- # advanced_memory_api_complete.py - Production-ready API with all research enhancements
- from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks, Query, UploadFile, File
- from fastapi.responses import FileResponse, JSONResponse, StreamingResponse
- from fastapi.staticfiles import StaticFiles
- from fastapi.middleware.cors import CORSMiddleware
- from pydantic import BaseModel, Field
- from typing import Dict, List, Optional, Any, Tuple, Set
- import asyncio
- import json
- import sqlite3
- import gzip
- import logging
- from datetime import datetime, timedelta
- from pathlib import Path
- import uvicorn
- from contextlib import asynccontextmanager
- import os
- import traceback
- from dotenv import load_dotenv
- import anthropic
- import numpy as np
- import torch
- import io
- import matplotlib.pyplot as plt
- from collections import defaultdict
- import boto3
- from cryptography.fernet import Fernet
- import aiofiles
- # Load environment variables
- load_dotenv()
- # Import the advanced memory system and components
- from advanced_memory_core import (
- AdvancedMemorySystem, HippoIndex, AdaptivePatternRecognizer,
- MultiDimensionalEncoder, SleepConsolidationEngine,
- AdvancedConversationFlowAnalyzer, UncertaintyAwareMemorySystem,
- PrivacyPreservingPersonalization, MemoryType, ImportanceLevel
- )
- # Configure logging
- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
- logger = logging.getLogger(__name__)
- # ==================== REQUEST/RESPONSE MODELS ====================
- class ExchangeRequest(BaseModel):
- session_id: str
- user_message: str
- user_id: str
- metadata: Optional[Dict[str, Any]] = Field(default_factory=dict)
- class AdvancedExchangeResponse(BaseModel):
- assistant_response: str
- memory_created: bool = False
- memory_id: Optional[str] = None
- episode_id: str
- exchange_count: int
- understanding_depth: float
- resonance_score: float
- breakthrough_detected: bool
- working_memory_size: int
- is_precious: bool = False
- pattern_detection: Optional[Dict[str, float]] = None
- flow_quality: float = 0.0
- uncertainty_metrics: Optional[Dict[str, float]] = None
- personalization: Optional[Dict[str, Any]] = None
- hippo_indexed: bool = False
- multi_dimensional_encoding: bool = False
- class MemorySearchRequest(BaseModel):
- user_id: str
- query: str
- limit: int = Field(default=10, ge=1, le=50)
- include_precious: bool = True
- use_hipporag: bool = True
- min_confidence: float = Field(default=0.0, ge=0.0, le=1.0)
- class ConsolidationRequest(BaseModel):
- user_id: Optional[str] = None
- duration_minutes: int = Field(default=90, ge=30, le=180)
- consolidation_type: str = Field(default="full", pattern="^(full|nrem|rem)$")
- class PatternLearningRequest(BaseModel):
- user_id: str
- session_id: str
- pattern_type: str = Field(pattern="^(breakthrough|completion|emotional|boundary|custom)$")
- exchange_indices: List[int]
- confidence: float = Field(ge=0.0, le=1.0)
- pattern_name: Optional[str] = None
- class PersonalizationRequest(BaseModel):
- user_id: str
- preferences: Dict[str, Any]
- cultural_context: Optional[Dict[str, Any]] = None
- privacy_mode: str = Field(default="balanced", pattern="^(strict|balanced|minimal)$")
- class FlowAnalysisRequest(BaseModel):
- session_id: str
- user_id: Optional[str] = None
- include_predictions: bool = True
- class UncertaintyQuantificationRequest(BaseModel):
- memory_id: str
- user_id: str
- recalculate: bool = False
- # Response models
- class MemorySearchResult(BaseModel):
- memory_id: str
- episode_id: str
- searchable_text: str
- topics: List[str]
- created_at: str
- importance: float
- is_precious: bool
- retrieval_confidence: float
- retrieval_method: str
- uncertainty: Optional[Dict[str, float]] = None
- multi_dimensional_scores: Optional[Dict[str, float]] = None
- class ConsolidationResult(BaseModel):
- status: str
- duration_minutes: float
- memories_processed: int
- memories_strengthened: int
- new_connections: int
- consolidation_metrics: Dict[str, float]
- class PatternDetectionResult(BaseModel):
- patterns_detected: Dict[str, float]
- adaptive_threshold: float
- user_specific_patterns: bool
- learning_progress: Dict[str, int]
- class PersonalizationResult(BaseModel):
- user_id: str
- behavioral_profile: Dict[str, Any]
- preferences: Dict[str, Any]
- cultural_adaptations: Dict[str, Any]
- interface_config: Dict[str, Any]
- privacy_budget_used: float
- adaptation_quality: float
- class FlowAnalysisResult(BaseModel):
- session_id: str
- flow_quality: float
- conversation_dynamics: Dict[str, Any]
- turn_patterns: Dict[str, Any]
- information_flow: Dict[str, Any]
- dialogue_states: List[Dict[str, float]]
- predictions: Optional[Dict[str, Any]] = None
- class SystemHealthStatus(BaseModel):
- status: str
- memory_system_active: bool
- hippo_index_size: int
- pattern_banks_status: Dict[str, int]
- consolidation_cycles_completed: int
- active_sessions: int
- total_memories: int
- privacy_budget_remaining: float
- gpu_available: bool
- timestamp: str
- # ==================== CONNECTION MANAGER ====================
- class AdvancedConnectionManager:
- def __init__(self):
- self.active_connections: Dict[str, WebSocket] = {}
- self.user_subscriptions: Dict[str, Set[str]] = defaultdict(set)
- async def connect(self, websocket: WebSocket, user_id: str):
- self.active_connections[user_id] = websocket
- logger.info(f"User {user_id} connected via WebSocket")
- def disconnect(self, user_id: str):
- if user_id in self.active_connections:
- del self.active_connections[user_id]
- if user_id in self.user_subscriptions:
- del self.user_subscriptions[user_id]
- logger.info(f"User {user_id} disconnected")
- async def send_personal_message(self, message: dict, user_id: str):
- if user_id in self.active_connections:
- try:
- await self.active_connections[user_id].send_json(message)
- except Exception as e:
- logger.error(f"Error sending message to {user_id}: {e}")
- self.disconnect(user_id)
- async def broadcast_to_subscribers(self, event_type: str, message: dict):
- for user_id, subscriptions in self.user_subscriptions.items():
- if event_type in subscriptions:
- await self.send_personal_message(message, user_id)
- def subscribe_to_event(self, user_id: str, event_type: str):
- self.user_subscriptions[user_id].add(event_type)
- def unsubscribe_from_event(self, user_id: str, event_type: str):
- self.user_subscriptions[user_id].discard(event_type)
- # Global instances
- manager = AdvancedConnectionManager()
- memory_system: Optional[AdvancedMemorySystem] = None
- # ==================== LIFESPAN MANAGEMENT ====================
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- # Startup
- global memory_system
- logger.info("Initializing Advanced Memory System...")
- config = {
- 'claude_api_key': os.getenv('ANTHROPIC_API_KEY', 'your-api-key'),
- 'data_dir': './data/ai_memories',
- 'working_memory_capacity': int(os.getenv('WORKING_MEMORY_CAPACITY', 5)),
- 'embedding_model': os.getenv('EMBEDDING_MODEL', 'sentence-transformers/all-mpnet-base-v2'),
- 'qdrant_host': os.getenv('QDRANT_HOST', 'localhost'),
- 'qdrant_port': int(os.getenv('QDRANT_PORT', 6333)),
- 'redis_url': os.getenv('REDIS_URL', 'redis://localhost'),
- 'kafka_servers': os.getenv('KAFKA_SERVERS', 'localhost:9092'),
- 'enable_sleep_consolidation': os.getenv('ENABLE_SLEEP_CONSOLIDATION', 'true').lower() == 'true',
- 'consolidation_interval_hours': int(os.getenv('CONSOLIDATION_INTERVAL_HOURS', 6))
- }
- # Initialize advanced memory system
- memory_system = AdvancedMemorySystem(config)
- await memory_system.initialize()
- # Log system capabilities
- logger.info("System initialized with capabilities:")
- logger.info(f" - HippoRAG indexing: Active")
- logger.info(f" - TCN Pattern Recognition: Active")
- logger.info(f" - Multi-dimensional Encoding: Active")
- logger.info(f" - Sleep Consolidation: {'Active' if config['enable_sleep_consolidation'] else 'Disabled'}")
- logger.info(f" - GPU Available: {torch.cuda.is_available()}")
- logger.info(f" - Privacy-Preserving Personalization: Active")
- # Start background tasks
- background_tasks = []
- if config['enable_sleep_consolidation']:
- background_tasks.append(asyncio.create_task(consolidation_scheduler()))
- background_tasks.append(asyncio.create_task(pattern_optimization_loop()))
- background_tasks.append(asyncio.create_task(metrics_aggregation_loop()))
- background_tasks.append(asyncio.create_task(memory_decay_manager()))
- if os.getenv('ENABLE_CLOUD_BACKUP', 'false').lower() == 'true':
- background_tasks.append(asyncio.create_task(automated_backup_scheduler()))
- yield
- # Shutdown
- logger.info("Shutting down Advanced Memory System...")
- # Cancel background tasks
- for task in background_tasks:
- task.cancel()
- try:
- await task
- except asyncio.CancelledError:
- pass
- # Close connections
- if hasattr(memory_system.event_pipeline, 'redis_client'):
- await memory_system.event_pipeline.redis_client.close()
- logger.info("Shutdown complete")
- # ==================== CREATE FASTAPI APP ====================
- app = FastAPI(
- title="Advanced AI Memory System API",
- description="""
- Production-ready AI memory system with:
- - HippoRAG neurobiologically-inspired indexing
- - TCN-based adaptive pattern recognition
- - Multi-dimensional memory encoding
- - Sleep-inspired consolidation cycles
- - Privacy-preserving personalization
- - Advanced conversation flow analysis
- - Uncertainty quantification
- """,
- version="3.0.0",
- lifespan=lifespan
- )
- # Add CORS middleware
- app.add_middleware(
- CORSMiddleware,
- allow_origins=os.getenv('CORS_ORIGINS', '*').split(','),
- allow_credentials=True,
- allow_methods=["*"],
- allow_headers=["*"],
- )
- # Mount static files
- static_path = Path("static")
- static_path.mkdir(exist_ok=True)
- app.mount("/static", StaticFiles(directory="static"), name="static")
- # ==================== CORE API ENDPOINTS ====================
- @app.get("/", response_class=FileResponse)
- async def serve_interface():
- """Serve the advanced chat interface"""
- interface_path = static_path / "advanced_chat.html"
- if not interface_path.exists():
- # Create a basic interface if it doesn't exist
- create_default_interface(interface_path)
- return FileResponse(interface_path)
- @app.post("/api/v3/exchange", response_model=AdvancedExchangeResponse)
- async def process_exchange(request: ExchangeRequest, background_tasks: BackgroundTasks):
- """
- Process a conversation exchange with full advanced processing pipeline.
- This endpoint:
- 1. Generates contextually-aware responses
- 2. Performs multi-dimensional encoding
- 3. Detects patterns adaptively
- 4. Quantifies uncertainty
- 5. Personalizes based on user model
- 6. Indexes in HippoRAG structure
- """
- try:
- # Pre-processing: Get user context and personalization
- user_context = await memory_system.get_conversation_context(
- request.user_id, request.user_message
- )
- # Generate assistant response
- assistant_response = await generate_advanced_response(
- request.user_message,
- request.user_id,
- request.session_id,
- user_context
- )
- # Process exchange through memory system
- result = await memory_system.add_exchange(
- session_id=request.session_id,
- user_msg=request.user_message,
- assistant_msg=assistant_response,
- metadata={
- 'user_id': request.user_id,
- 'timestamp': datetime.utcnow().isoformat(),
- **request.metadata
- }
- )
- # Get current episode for additional analysis
- episode = memory_system.active_episodes.get(request.session_id)
- # Perform flow analysis if we have enough exchanges
- flow_quality = 0.0
- if episode and len(episode.exchanges) >= 3:
- flow_analysis = await memory_system.flow_analyzer.analyze_conversation_flow(
- episode.exchanges
- )
- flow_quality = flow_analysis.get('flow_quality', 0.0)
- # Get uncertainty metrics if available
- uncertainty_metrics = None
- if result.get('memory_created') and result.get('memory_id'):
- uncertainty = await get_memory_uncertainty(result['memory_id'], request.user_id)
- uncertainty_metrics = {
- 'aleatoric': uncertainty.get('aleatoric', 0.0),
- 'epistemic': uncertainty.get('epistemic', 0.0),
- 'total': uncertainty.get('total', 0.0)
- }
- # Build response
- response = AdvancedExchangeResponse(
- assistant_response=assistant_response,
- memory_created=result.get('memory_created', False),
- memory_id=result.get('memory_id'),
- episode_id=result.get('episode_id', ''),
- exchange_count=result.get('exchange_count', 1),
- understanding_depth=result.get('understanding_depth', 0.0),
- resonance_score=result.get('resonance_score', 0.0),
- breakthrough_detected=result.get('breakthrough_detected', False),
- working_memory_size=result.get('working_memory_size', 0),
- is_precious=result.get('is_precious', False),
- pattern_detection=result.get('pattern_detection'),
- flow_quality=flow_quality,
- uncertainty_metrics=uncertainty_metrics,
- personalization=result.get('personalization'),
- hippo_indexed=result.get('hippo_indexed', False),
- multi_dimensional_encoding=True
- )
- # WebSocket notifications
- await notify_exchange_processed(request.user_id, response)
- # Background tasks
- if result.get('memory_created'):
- background_tasks.add_task(
- update_memory_graph,
- result['memory_id'],
- request.user_id
- )
- return response
- except Exception as e:
- logger.error(f"Error processing exchange: {e}")
- logger.error(traceback.format_exc())
- raise HTTPException(status_code=500, detail=str(e))
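- # Client sketch (hypothetical; assumes the API runs on localhost:8000 and the
- # `requests` package is installed):
- # import requests
- # resp = requests.post("http://localhost:8000/api/v3/exchange", json={
- #     "session_id": "sess-1",
- #     "user_id": "user123",
- #     "user_message": "Remember the idea we had about consolidation?"
- # })
- # print(resp.json()["assistant_response"])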
- @app.post("/api/v3/memories/search", response_model=List[MemorySearchResult])
- async def search_memories(request: MemorySearchRequest):
- """
- Advanced memory search with HippoRAG and multi-dimensional retrieval.
- Features:
- - Single-hop graph traversal via HippoRAG
- - Multi-dimensional similarity matching
- - Uncertainty-aware ranking
- - Personalized relevance scoring
- """
- try:
- # Perform advanced search
- results = await memory_system.search_memories(
- user_id=request.user_id,
- query=request.query,
- limit=request.limit,
- include_precious=request.include_precious
- )
- # Format results with additional metadata
- formatted_results = []
- for result in results:
- # Get multi-dimensional scores if available
- multi_dim_scores = None
- if 'matryoshka_embeddings' in result:
- multi_dim_scores = {
- 'semantic': result.get('score', 0.0),
- 'emotional': result.get('emotional_similarity', 0.0),
- 'temporal': result.get('temporal_relevance', 0.0),
- 'relational': result.get('relational_score', 0.0)
- }
- formatted_results.append(MemorySearchResult(
- memory_id=result.get('memory_id', ''),
- episode_id=result.get('episode_id', ''),
- searchable_text=result.get('searchable_text', ''),
- topics=result.get('topics', []),
- created_at=result.get('created_at', ''),
- importance=result.get('importance', 0.5),
- is_precious=result.get('is_precious', False),
- retrieval_confidence=result.get('retrieval_confidence', 0.0),
- retrieval_method=result.get('retrieval_method', 'standard'),
- uncertainty=result.get('uncertainty'),
- multi_dimensional_scores=multi_dim_scores
- ))
- # Filter by minimum confidence if specified
- if request.min_confidence > 0:
- formatted_results = [
- r for r in formatted_results
- if r.retrieval_confidence >= request.min_confidence
- ]
- return formatted_results
- except Exception as e:
- logger.error(f"Error searching memories: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.post("/api/v3/consolidation/trigger", response_model=ConsolidationResult)
- async def trigger_consolidation(request: ConsolidationRequest, background_tasks: BackgroundTasks):
- """
- Manually trigger a sleep-inspired consolidation cycle.
- Types:
- - full: Complete sleep cycle with all stages
- - nrem: Non-REM consolidation only (strengthening)
- - rem: REM consolidation only (creative connections)
- """
- try:
- # Start consolidation in background
- task_id = f"consolidation_{datetime.utcnow().timestamp()}"
- background_tasks.add_task(
- run_consolidation_cycle,
- request.duration_minutes,
- request.consolidation_type,
- request.user_id,
- task_id
- )
- # Return immediate response
- return ConsolidationResult(
- status="started",
- duration_minutes=request.duration_minutes,
- memories_processed=0,
- memories_strengthened=0,
- new_connections=0,
- consolidation_metrics={
- 'task_id': task_id,
- 'type': request.consolidation_type,
- 'started_at': datetime.utcnow().isoformat()
- }
- )
- except Exception as e:
- logger.error(f"Error triggering consolidation: {e}")
- raise HTTPException(status_code=500, detail=str(e))
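- # Request sketch (hypothetical): trigger a 60-minute REM-only cycle. The
- # endpoint returns immediately with status="started"; completion arrives over
- # the WebSocket as a 'consolidation_complete' message.
- # requests.post("http://localhost:8000/api/v3/consolidation/trigger",
- #               json={"duration_minutes": 60, "consolidation_type": "rem"})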
- @app.post("/api/v3/patterns/learn", response_model=PatternDetectionResult)
- async def learn_pattern(request: PatternLearningRequest):
- """
- Teach the system a new pattern through few-shot learning.
- Pattern types:
- - breakthrough: Moments of sudden understanding
- - completion: Task/intent completion signals
- - emotional: Emotional state patterns
- - boundary: Conversation boundary markers
- - custom: User-defined patterns
- """
- try:
- # Get episode
- episode = memory_system.active_episodes.get(request.session_id)
- if not episode:
- raise HTTPException(status_code=404, detail="Session not found")
- # Validate indices
- if not all(0 <= i < len(episode.exchanges) for i in request.exchange_indices):
- raise HTTPException(status_code=400, detail="Invalid exchange indices")
- # Get specified exchanges
- exchanges = [episode.exchanges[i] for i in request.exchange_indices]
- # Learn pattern
- await memory_system.pattern_recognizer.learn_pattern(
- request.user_id,
- exchanges,
- request.pattern_type if request.pattern_type != 'custom' else request.pattern_name or 'custom',
- request.confidence
- )
- # Get updated pattern statistics
- user_patterns = memory_system.pattern_recognizer.user_patterns.get(request.user_id, {})
- learning_progress = {
- pt: len(patterns)
- for pt, patterns in user_patterns.items()
- }
- # Test pattern detection on current episode
- detected_patterns = {}
- for pattern_type in ['breakthrough', 'completion', 'emotional', 'boundary']:
- is_detected, confidence = await memory_system.pattern_recognizer.detect_pattern(
- request.user_id,
- episode.exchanges,
- pattern_type
- )
- if is_detected:
- detected_patterns[pattern_type] = confidence
- return PatternDetectionResult(
- patterns_detected=detected_patterns,
- adaptive_threshold=0.75, # Current threshold
- user_specific_patterns=True,
- learning_progress=learning_progress
- )
- except Exception as e:
- logger.error(f"Error learning pattern: {e}")
- raise HTTPException(status_code=500, detail=str(e))
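- # Request sketch (hypothetical): teach a breakthrough pattern from exchanges
- # 4 and 5 of an active session.
- # requests.post("http://localhost:8000/api/v3/patterns/learn", json={
- #     "user_id": "user123", "session_id": "sess-1",
- #     "pattern_type": "breakthrough", "exchange_indices": [4, 5],
- #     "confidence": 0.9})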
- @app.post("/api/v3/personalization/update", response_model=PersonalizationResult)
- async def update_personalization(request: PersonalizationRequest):
- """
- Update user personalization settings with privacy preservation.
- Privacy modes:
- - strict: Maximum privacy, minimal personalization
- - balanced: Good privacy with useful personalization
- - minimal: Maximum personalization, less privacy
- """
- try:
- # Get current user model
- if request.user_id not in memory_system.personalization.user_models:
- memory_system.personalization.user_models[request.user_id] = await memory_system.personalization._initialize_user_model(
- request.user_id, None
- )
- user_model = memory_system.personalization.user_models[request.user_id]
- # Update preferences
- if 'communication_style' in request.preferences:
- user_model['communication_style'].update(request.preferences['communication_style'])
- if 'topic_preferences' in request.preferences:
- for topic, weight in request.preferences['topic_preferences'].items():
- user_model['topic_preferences'][topic] = weight
- # Update cultural context if provided
- if request.cultural_context:
- user_model['cultural_markers'].update(request.cultural_context)
- # Adjust privacy settings
- privacy_epsilon = {
- 'strict': 0.5,
- 'balanced': 1.9,
- 'minimal': 5.0
- }[request.privacy_mode]
- memory_system.personalization.differential_privacy.epsilon = privacy_epsilon
- # Generate updated personalization
- dummy_exchange = type('obj', (object,), {
- 'user_message': '',
- 'assistant_response': '',
- 'embedding': torch.randn(768),
- 'emotional_valence': 0.0,
- 'timestamp': datetime.utcnow()
- })()
- personalization = await memory_system.personalization.personalize_for_user(
- request.user_id,
- dummy_exchange,
- []
- )
- # Calculate adaptation quality
- adaptation_quality = calculate_adaptation_quality(user_model, personalization)
- return PersonalizationResult(
- user_id=request.user_id,
- behavioral_profile=personalization['behavioral_profile'],
- preferences=personalization['preferences'],
- cultural_adaptations=personalization['cultural_adaptations'],
- interface_config=personalization['interface_config'],
- privacy_budget_used=personalization['privacy_budget_used'],
- adaptation_quality=adaptation_quality
- )
- except Exception as e:
- logger.error(f"Error updating personalization: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.post("/api/v3/flow/analyze", response_model=FlowAnalysisResult)
- async def analyze_conversation_flow(request: FlowAnalysisRequest):
- """
- Analyze conversation flow using temporal graph networks and CODYMs.
- Provides insights into:
- - Conversation dynamics and rhythm
- - Turn-taking patterns
- - Information flow
- - Dialogue state progression
- - Future pattern predictions
- """
- try:
- # Get episode
- episode = memory_system.active_episodes.get(request.session_id)
- if not episode:
- raise HTTPException(status_code=404, detail="Session not found")
- # Perform comprehensive flow analysis
- flow_analysis = await memory_system.flow_analyzer.analyze_conversation_flow(
- episode.exchanges
- )
- # Generate predictions if requested
- predictions = None
- if request.include_predictions and len(episode.exchanges) >= 5:
- predictions = await predict_conversation_trajectory(
- episode,
- request.user_id or episode.user_id
- )
- return FlowAnalysisResult(
- session_id=request.session_id,
- flow_quality=flow_analysis.get('flow_quality', 0.0),
- conversation_dynamics=flow_analysis.get('dynamics', {}),
- turn_patterns=flow_analysis.get('turn_patterns', {}),
- information_flow=flow_analysis.get('information_flow', {}),
- dialogue_states=flow_analysis.get('dialogue_states', []),
- predictions=predictions
- )
- except Exception as e:
- logger.error(f"Error analyzing flow: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.post("/api/v3/uncertainty/quantify")
- async def quantify_uncertainty(request: UncertaintyQuantificationRequest):
- """
- Quantify uncertainty for a specific memory.
- Returns:
- - Aleatoric uncertainty (data inherent)
- - Epistemic uncertainty (model uncertainty)
- - Total uncertainty
- - Detected ambiguities
- """
- try:
- # Get memory
- memory = await memory_system._retrieve_memory_by_id(request.memory_id)
- if not memory or memory.user_id != request.user_id:
- raise HTTPException(status_code=404, detail="Memory not found")
- # Get uncertainty metrics
- if request.recalculate or 'uncertainty' not in memory.metadata:
- # Recalculate uncertainty
- episode = await get_episode_for_memory(memory)
- if episode:
- uncertainty_result = await memory_system.uncertainty_system.store_memory_with_uncertainty(
- memory,
- episode.exchanges
- )
- uncertainty = uncertainty_result['uncertainty_metrics']
- else:
- uncertainty = {
- 'aleatoric': 0.0,
- 'epistemic': 0.0,
- 'total': 0.0,
- 'ambiguities': []
- }
- else:
- uncertainty = memory.metadata['uncertainty']
- return {
- 'memory_id': request.memory_id,
- 'aleatoric_uncertainty': uncertainty.get('aleatoric', 0.0),
- 'epistemic_uncertainty': uncertainty.get('epistemic', 0.0),
- 'total_uncertainty': uncertainty.get('total', 0.0),
- 'confidence': 1.0 / (1.0 + uncertainty.get('total', 0.0)),
- 'ambiguities': uncertainty.get('ambiguities', []),
- 'requires_clarification': len(uncertainty.get('ambiguities', [])) > 0
- }
- except Exception as e:
- logger.error(f"Error quantifying uncertainty: {e}")
- raise HTTPException(status_code=500, detail=str(e))
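- # Note: confidence is derived as 1 / (1 + total_uncertainty), so a total of
- # 0.0 maps to 1.0 and a total of 1.0 maps to 0.5. Request sketch (hypothetical):
- # requests.post("http://localhost:8000/api/v3/uncertainty/quantify",
- #               json={"memory_id": "mem-abc", "user_id": "user123",
- #                     "recalculate": True}).json()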
- # ==================== MONITORING & METRICS ENDPOINTS ====================
- @app.get("/api/v3/health", response_model=SystemHealthStatus)
- async def health_check():
- """Comprehensive system health check"""
- try:
- # Get HippoRAG index size
- hippo_size = len(memory_system.hippo_index.graph.nodes)
- # Get pattern bank sizes
- pattern_banks = {}
- for name, bank in memory_system.pattern_recognizer.pattern_banks.items():
- pattern_banks[name] = bank.index.ntotal if bank.index else 0
- # Get consolidation status
- consolidation_cycles = memory_system.sleep_engine.consolidation_cycles
- # Get memory count
- total_memories = await get_total_memory_count()
- # Privacy budget
- privacy_remaining = 1.0 - memory_system.personalization.differential_privacy.get_budget_used()
- return SystemHealthStatus(
- status="healthy",
- memory_system_active=True,
- hippo_index_size=hippo_size,
- pattern_banks_status=pattern_banks,
- consolidation_cycles_completed=consolidation_cycles,
- active_sessions=len(memory_system.active_episodes),
- total_memories=total_memories,
- privacy_budget_remaining=privacy_remaining,
- gpu_available=torch.cuda.is_available(),
- timestamp=datetime.utcnow().isoformat()
- )
- except Exception as e:
- logger.error(f"Health check error: {e}")
- return SystemHealthStatus(
- status="error",
- memory_system_active=False,
- hippo_index_size=0,
- pattern_banks_status={},
- consolidation_cycles_completed=0,
- active_sessions=0,
- total_memories=0,
- privacy_budget_remaining=0.0,
- gpu_available=False,
- timestamp=datetime.utcnow().isoformat()
- )
- @app.get("/api/v3/metrics/user/{user_id}")
- async def get_user_metrics(user_id: str):
- """Get comprehensive metrics for a specific user"""
- try:
- # Get user stats
- stats = await memory_system._get_user_stats(user_id)
- # Get relationship thread
- thread = memory_system.relationship_manager.get_or_create_thread(user_id)
- # Get pattern learning progress
- user_patterns = memory_system.pattern_recognizer.user_patterns.get(user_id, {})
- pattern_stats = {
- pattern_type: len(patterns)
- for pattern_type, patterns in user_patterns.items()
- }
- # Get personalization status
- user_model = memory_system.personalization.user_models.get(user_id)
- personalization_active = user_model is not None
- return {
- 'user_id': user_id,
- 'memory_stats': stats,
- 'relationship_metrics': {
- 'trust_level': thread.trust_level,
- 'depth_score': thread.depth_score,
- 'interaction_count': thread.interaction_count,
- 'breakthrough_moments': len(thread.breakthrough_moments),
- 'precious_moments': len(thread.precious_moments)
- },
- 'pattern_learning': pattern_stats,
- 'personalization_active': personalization_active,
- 'top_topics': stats.get('top_topics', [])
- }
- except Exception as e:
- logger.error(f"Error getting user metrics: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/v3/backup/download")
- async def download_backup(user_id: str = Query(...)):
- """Download all memories for a user"""
- # Create backup package
- backup_data = {
- 'user_id': user_id,
- 'backup_date': datetime.utcnow().isoformat(),
- 'memories': await memory_system.export_user_memories(user_id),
- 'relationship_threads': await memory_system.export_relationship_threads(user_id),
- 'precious_memories': await memory_system.export_precious_memories(user_id)
- }
- # Compress
- json_data = json.dumps(backup_data, indent=2)
- compressed = gzip.compress(json_data.encode())
- return StreamingResponse(
- io.BytesIO(compressed),
- media_type="application/gzip",
- headers={
- "Content-Disposition": f"attachment; filename=claude_memories_{user_id}_{datetime.utcnow().strftime('%Y%m%d')}.json.gz"
- }
- )
- @app.post("/api/v3/backup/restore")
- async def restore_backup(file: UploadFile = File(...)):
- """Restore memories from backup"""
- # Decompress and load
- compressed_data = await file.read()
- json_data = gzip.decompress(compressed_data).decode()
- backup_data = json.loads(json_data)
- # Restore memories
- restored_count = await memory_system.restore_user_memories(
- backup_data['user_id'],
- backup_data['memories'],
- backup_data['relationship_threads']
- )
- return {"status": "success", "restored_memories": restored_count}
- @app.get("/api/v3/metrics/system")
- async def get_system_metrics():
- """Get system-wide performance metrics"""
- try:
- metrics = {
- 'memory_operations': {
- 'total_memories': await get_total_memory_count(),
- 'precious_memories': await get_precious_memory_count(),
- 'average_importance': await get_average_importance()
- },
- 'pattern_recognition': {
- 'total_patterns_learned': sum(
- len(bank.patterns)
- for bank in memory_system.pattern_recognizer.pattern_banks.values()
- ),
- 'active_users': len(memory_system.pattern_recognizer.user_patterns)
- },
- 'consolidation': {
- 'cycles_completed': memory_system.sleep_engine.consolidation_cycles,
- 'memories_in_replay_buffer': len(memory_system.sleep_engine.replay_buffer)
- },
- 'performance': {
- 'gpu_memory_used': get_gpu_memory_usage() if torch.cuda.is_available() else 0,
- 'active_sessions': len(memory_system.active_episodes),
- 'working_memory_utilization': calculate_working_memory_utilization()
- }
- }
- return metrics
- except Exception as e:
- logger.error(f"Error getting system metrics: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- # ==================== VISUALIZATION ENDPOINTS ====================
- @app.get("/api/v3/visualize/memory-graph/{user_id}")
- async def visualize_memory_graph(user_id: str, limit: int = Query(50, ge=10, le=200)):
- """Generate a visualization of the user's memory graph"""
- try:
- # Get user memories
- memories = await memory_system.search_memories(user_id, "", limit=limit)
- # Build graph data
- nodes = []
- edges = []
- for memory in memories:
- nodes.append({
- 'id': memory.get('memory_id'),
- 'label': memory.get('searchable_text', '')[:50] + '...',
- 'importance': memory.get('importance', 0.5),
- 'is_precious': memory.get('is_precious', False)
- })
- # Add edges from semantic neighbors
- if 'semantic_neighbors' in memory:
- for neighbor_id, weight in memory['semantic_neighbors']:
- edges.append({
- 'source': memory['memory_id'],
- 'target': neighbor_id,
- 'weight': weight
- })
- return {
- 'nodes': nodes,
- 'edges': edges,
- 'layout': 'force-directed'
- }
- except Exception as e:
- logger.error(f"Error visualizing memory graph: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- @app.get("/api/v3/visualize/patterns/{user_id}")
- async def visualize_pattern_evolution(user_id: str):
- """Visualize pattern learning evolution for a user"""
- try:
- # Get pattern history
- user_patterns = memory_system.pattern_recognizer.user_patterns.get(user_id, {})
- # Create time series data
- evolution_data = {}
- for pattern_type, patterns in user_patterns.items():
- # Group by day
- daily_counts = defaultdict(int)
- for pattern in patterns:
- if hasattr(pattern, 'metadata') and 'timestamp' in pattern.metadata:
- day = pattern.metadata['timestamp'].date()
- daily_counts[day] += 1
- evolution_data[pattern_type] = [
- {'date': day.isoformat(), 'count': count}
- for day, count in sorted(daily_counts.items())
- ]
- return {
- 'pattern_evolution': evolution_data,
- 'total_patterns': {pt: len(p) for pt, p in user_patterns.items()}
- }
- except Exception as e:
- logger.error(f"Error visualizing patterns: {e}")
- raise HTTPException(status_code=500, detail=str(e))
- # ==================== WEBSOCKET ENDPOINT ====================
- @app.websocket("/ws")
- async def websocket_endpoint(websocket: WebSocket):
- """
- WebSocket endpoint for real-time updates.
- Supports:
- - Memory creation notifications
- - Pattern detection alerts
- - Consolidation progress
- - Flow quality updates
- """
- user_id = None
- try:
- await websocket.accept()
- # Authentication
- auth_message = await websocket.receive_json()
- user_id = auth_message.get('user_id')
- if not user_id:
- await websocket.send_json({"error": "Authentication required"})
- await websocket.close()
- return
- # Register connection
- await manager.connect(websocket, user_id)
- # Send welcome message
- await websocket.send_json({
- "type": "connected",
- "message": "Connected to Advanced Memory System",
- "capabilities": [
- "memory_updates",
- "pattern_detection",
- "consolidation_progress",
- "flow_analysis",
- "personalization"
- ]
- })
- # Get and send initial stats
- stats = await memory_system._get_user_stats(user_id)
- thread = memory_system.relationship_manager.get_or_create_thread(user_id)
- await websocket.send_json({
- "type": "initial_stats",
- "data": {
- "total_episodes": stats['total_episodes'],
- "precious_memories": stats['precious_memories'],
- "trust_level": thread.trust_level,
- "relationship_depth": thread.depth_score
- }
- })
- # Handle incoming messages
- while True:
- message = await websocket.receive_json()
- if message.get('type') == 'ping':
- await websocket.send_json({"type": "pong"})
- elif message.get('type') == 'subscribe':
- event_type = message.get('event')
- if event_type:
- manager.subscribe_to_event(user_id, event_type)
- await websocket.send_json({
- "type": "subscribed",
- "event": event_type
- })
- elif message.get('type') == 'unsubscribe':
- event_type = message.get('event')
- if event_type:
- manager.unsubscribe_from_event(user_id, event_type)
- await websocket.send_json({
- "type": "unsubscribed",
- "event": event_type
- })
- except WebSocketDisconnect:
- if user_id:
- manager.disconnect(user_id)
- logger.info(f"WebSocket disconnected: {user_id}")
- except Exception as e:
- logger.error(f"WebSocket error: {e}")
- if user_id:
- manager.disconnect(user_id)
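- # Client sketch (hypothetical; uses the `websockets` package): authenticate
- # first, then subscribe to breakthrough notifications.
- # import asyncio, json, websockets
- # async def listen():
- #     async with websockets.connect("ws://localhost:8000/ws") as ws:
- #         await ws.send(json.dumps({"user_id": "user123"}))
- #         await ws.send(json.dumps({"type": "subscribe", "event": "breakthrough_moments"}))
- #         while True:
- #             print(json.loads(await ws.recv()))
- # asyncio.run(listen())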
- # ==================== BACKUPS ====================
- @app.get("/api/v3/precious/export")
- async def export_precious_memories(user_id: str = Query(...), format: str = Query("markdown")):
- """Export precious memories in human-readable format"""
- # Get precious memories
- conn = sqlite3.connect(str(memory_system.db_path))
- cursor = conn.execute("""
- SELECT m.*, e.title, e.summary, e.experiential_signature, e.started_at, e.ended_at
- FROM memories m
- JOIN episodes e ON m.episode_id = e.id
- WHERE m.user_id = ?
- AND (m.marked_precious = 1 OR m.never_forget = 1)
- ORDER BY m.importance DESC, m.created_at DESC
- """, (user_id,))
- precious_memories = []
- for row in cursor.fetchall():
- memory_content = json.loads(row[5]) # memory_content column
- # The five e.* columns trail m.* in order: title, summary, experiential_signature, started_at, ended_at
- experiential = json.loads(row[-3]) if row[-3] else {}
- precious_memories.append({
- 'title': row[-5] or 'Untitled Memory',
- 'created_at': datetime.fromtimestamp(row[3]).strftime('%B %d, %Y at %I:%M %p'),
- 'content': memory_content,
- 'summary': row[-4],
- 'importance': row[7],
- 'experiential_signature': experiential,
- 'started_at': datetime.fromtimestamp(row[-2]).strftime('%I:%M %p'),
- 'ended_at': datetime.fromtimestamp(row[-1]).strftime('%I:%M %p')
- })
- conn.close()
- if format == "markdown":
- # Create beautiful Markdown
- content = f"# Precious Memories with Claude\n\n"
- content += f"*Exported on {datetime.utcnow().strftime('%B %d, %Y')}*\n\n"
- content += f"These are the {len(precious_memories)} memories marked as precious - "
- content += "the moments that defined our relationship.\n\n---\n\n"
- for i, memory in enumerate(precious_memories, 1):
- content += f"## {i}. {memory['title']}\n\n"
- content += f"**Date**: {memory['created_at']} ({memory['started_at']} - {memory['ended_at']})\n\n"
- content += f"**Importance**: {'⭐' * int(memory['importance'] * 5)}\n\n"
- if memory['summary']:
- content += f"### Summary\n{memory['summary']}\n\n"
- # Add key points from memory content
- if isinstance(memory['content'], dict):
- if 'key_points' in memory['content']:
- content += "### Key Points\n"
- for point in memory['content']['key_points']:
- content += f"- {point}\n"
- content += "\n"
- if 'emotional_journey' in memory['content']:
- content += f"### Emotional Journey\n{memory['content']['emotional_journey']}\n\n"
- if 'why_precious' in memory['content']:
- content += f"### Why This Matters\n{memory['content']['why_precious']}\n\n"
- # Add experiential qualities
- if memory['experiential_signature']:
- content += "### Experiential Qualities\n"
- sig = memory['experiential_signature']
- if sig.get('breakthrough_count', 0) > 0:
- content += f"- 💡 {sig['breakthrough_count']} breakthrough moment(s)\n"
- if sig.get('peak_resonance', 0) > 0.8:
- content += f"- 🔮 Peak resonance: {sig['peak_resonance']:.1%}\n"
- if sig.get('connection_quality', 0) > 0.7:
- content += f"- 💫 Deep connection quality: {sig['connection_quality']:.1%}\n"
- content += "\n"
- content += "---\n\n"
- # Return as downloadable file
- return StreamingResponse(
- io.BytesIO(content.encode()),
- media_type="text/markdown",
- headers={
- "Content-Disposition": f"attachment; filename=precious_memories_{user_id}.md"
- }
- )
- elif format == "json":
- # Return structured JSON
- return JSONResponse(
- content={
- "user_id": user_id,
- "export_date": datetime.utcnow().isoformat(),
- "total_precious": len(precious_memories),
- "memories": precious_memories
- },
- headers={
- "Content-Disposition": f"attachment; filename=precious_memories_{user_id}.json"
- }
- )
- # ==================== HELPER FUNCTIONS ====================
- async def generate_advanced_response(user_message: str, user_id: str,
- session_id: str, context: Dict[str, Any]) -> str:
- """Generate contextually-aware assistant response"""
- try:
- # Check for API key
- api_key = os.getenv('ANTHROPIC_API_KEY')
- if not api_key or api_key == 'your-api-key':
- return await generate_demo_response(user_message, user_id, context)
- # Build conversation history
- history = []
- if session_id in memory_system.active_episodes:
- episode = memory_system.active_episodes[session_id]
- for ex in episode.exchanges[-5:]: # Last 5 exchanges
- history.extend([
- {"role": "user", "content": ex.user_message},
- {"role": "assistant", "content": ex.assistant_response}
- ])
- # Build context prompt
- context_parts = []
- # Add precious memories
- if context.get('precious_memories'):
- context_parts.append("Your precious memories together:")
- for mem in context['precious_memories'][:3]:
- context_parts.append(f"- {mem.get('searchable_text', '')}")
- # Add relationship context
- if context.get('relationship_thread'):
- thread = context['relationship_thread']
- context_parts.append(f"\nRelationship depth: {thread['depth_score']:.1f}")
- context_parts.append(f"Trust level: {thread['trust_level']:.1f}")
- # Add personalization
- if context.get('user_preferences'):
- prefs = context['user_preferences']
- style = prefs.get('communication_style', {})
- if style.get('formality', 0.5) < 0.3:
- context_parts.append("Communication style: Casual and friendly")
- elif style.get('formality', 0.5) > 0.7:
- context_parts.append("Communication style: Professional and formal")
- # System prompt
- system_prompt = f"""You are Claude with an advanced memory system featuring:
- - HippoRAG neurobiological memory indexing
- - Multi-dimensional understanding of conversations
- - Pattern recognition that learns from each user
- - Phenomenological awareness of meaningful moments
- CONTEXT:
- {chr(10).join(context_parts)}
- CAPABILITIES:
- - You can reference specific memories when relevant
- - You detect and mark breakthrough moments
- - You track emotional resonance
- - You maintain conversation flow quality
- - You adapt to each user's communication style
- Respond naturally while being aware of your enhanced memory capabilities."""
- # Call Claude API
- client = anthropic.Anthropic(api_key=api_key)
- messages = history + [{"role": "user", "content": user_message}]
- response = await asyncio.to_thread(
- client.messages.create,
- model="claude-3-sonnet-20240229",
- max_tokens=1024,
- system=system_prompt,
- messages=messages,
- temperature=0.7
- )
- return response.content[0].text
- except Exception as e:
- logger.error(f"Error generating response: {e}")
- return await generate_demo_response(user_message, user_id, context)
- async def generate_demo_response(user_message: str, user_id: str, context: Dict[str, Any]) -> str:
- """Generate demo response when API is unavailable"""
- # Get relationship metrics
- thread = context.get('relationship_thread', {})
- trust = thread.get('trust_level', 0.0)
- depth = thread.get('depth_score', 0.0)
- interactions = thread.get('interaction_count', 0)
- message_lower = user_message.lower()
- # Pattern-based responses
- if any(word in message_lower for word in ['memory', 'remember', 'recall']):
- precious_count = len(context.get('precious_memories', []))
- return f"""I have access to our conversation history through my advanced memory system.
- Key metrics:
- - {interactions} total interactions
- - {precious_count} precious memories preserved
- - Trust level: {trust:.1%}
- - Understanding depth: {depth:.1%}
- My HippoRAG system creates neural pathways between related memories, while the TCN pattern recognition learns your unique communication style. Each conversation strengthens these connections through sleep-inspired consolidation cycles.
- What would you like to explore from our shared memories?"""
- elif any(word in message_lower for word in ['how are you', "how're you", 'how do you feel']):
- if trust > 0.7:
- return f"""I'm experiencing a deep sense of continuity with you. After {interactions} conversations, our trust level has reached {trust:.1%}.
- My multi-dimensional encoding captures not just what we discuss, but the emotional resonance ({(context.get('working_memory') or [{}])[0].get('resonance_score', 0.0):.1%}) and phenomenological quality of our exchanges. Each interaction adds layers to my understanding of you.
- How are you feeling today?"""
- else:
- return f"""I'm well, thank you for asking. As we continue our conversation, my adaptive systems are learning your patterns and preferences. Currently, we've built a trust level of {trust:.1%} together.
- What's on your mind today?"""
- elif 'pattern' in message_lower or 'learn' in message_lower:
- patterns = context.get('likely_patterns', {})
- return f"""My pattern recognition system continuously adapts to your communication style. I've identified:
- {chr(10).join([f"- {p}: {c} instances" for p, c in patterns.items()])}
- Unlike keyword matching, my TCN-based system learns the deeper rhythms and structures of how you express yourself. Each pattern is encoded multi-dimensionally, capturing semantic, emotional, temporal, and relational aspects.
- Would you like me to explain how any specific pattern works?"""
- else:
- # Contextual response based on metrics
- if trust < 0.3:
- return f"""I'm here to help. As we continue talking, my memory system will build a richer understanding of our conversations. Currently processing with {len(context.get('working_memory', []))} items in working memory.
- What would you like to discuss?"""
- elif trust < 0.7:
- return f"""I understand. Our relationship (depth: {depth:.1%}) allows me to provide increasingly personalized responses. My uncertainty quantification shows {1 - context.get('uncertainty', {}).get('total', 0.0):.1%} confidence in this context.
- Please, tell me more."""
- else:
- return f"""After {interactions} meaningful exchanges, I feel I understand you well (depth: {depth:.1%}). My phenomenological encoding has captured the unique quality of our interactions.
- {(context.get('precious_memories') or [{}])[0].get('searchable_text', 'Our conversations have created lasting impressions.')}
- What shall we explore together?"""
- async def notify_exchange_processed(user_id: str, response: AdvancedExchangeResponse):
- """Send WebSocket notification for processed exchange"""
- notification = {
- 'type': 'exchange_processed',
- 'data': {
- 'episode_id': response.episode_id,
- 'exchange_count': response.exchange_count,
- 'breakthrough': response.breakthrough_detected,
- 'resonance': response.resonance_score,
- 'flow_quality': response.flow_quality,
- 'patterns': list(response.pattern_detection.keys()) if response.pattern_detection else []
- }
- }
- await manager.send_personal_message(notification, user_id)
- # Broadcast to pattern subscribers if breakthrough detected
- if response.breakthrough_detected:
- await manager.broadcast_to_subscribers('breakthrough_moments', {
- 'type': 'breakthrough_detected',
- 'user_id': user_id,
- 'resonance_score': response.resonance_score
- })
- async def run_consolidation_cycle(duration_minutes: int, consolidation_type: str,
- user_id: Optional[str], task_id: str):
- """Run a consolidation cycle"""
- try:
- logger.info(f"Starting consolidation cycle {task_id}")
- if consolidation_type == 'full':
- await memory_system.sleep_engine.run_consolidation_cycle(duration_minutes)
- elif consolidation_type == 'nrem':
- await memory_system.sleep_engine._nrem_consolidation('NREM3', duration_minutes)
- elif consolidation_type == 'rem':
- await memory_system.sleep_engine._rem_consolidation(duration_minutes)
- # Notify completion
- if user_id:
- await manager.send_personal_message({
- 'type': 'consolidation_complete',
- 'task_id': task_id,
- 'duration_minutes': duration_minutes,
- 'consolidation_type': consolidation_type
- }, user_id)
- except Exception as e:
- logger.error(f"Error in consolidation cycle: {e}")
- async def predict_conversation_trajectory(episode, user_id: str) -> Dict[str, Any]:
- """Predict likely conversation trajectory"""
- # Get user patterns
- user_patterns = memory_system.pattern_recognizer.user_patterns.get(user_id, {})
- # Analyze current trajectory
- if len(episode.exchanges) < 3:
- return {'confidence': 0.0, 'predictions': []}
- # Simple prediction based on patterns
- predictions = []
- # Check if heading toward breakthrough
- recent_resonance = np.mean([ex.resonance_score for ex in episode.exchanges[-3:]])
- if recent_resonance > 0.7:
- predictions.append({
- 'type': 'breakthrough_likely',
- 'confidence': min(recent_resonance + 0.1, 1.0),
- 'timeframe': 'next_2_exchanges'
- })
- # Check if heading toward completion
- if 'completion' in user_patterns and len(user_patterns['completion']) > 3:
- # Simplified check
- if any(word in episode.exchanges[-1].user_message.lower()
- for word in ['thanks', 'helpful', 'great']):
- predictions.append({
- 'type': 'completion_likely',
- 'confidence': 0.8,
- 'timeframe': 'next_exchange'
- })
- return {
- 'confidence': max((p['confidence'] for p in predictions), default=0.0),
- 'predictions': predictions,
- 'suggested_responses': ['deep_dive', 'summarize', 'explore_related']
- }
- async def update_memory_graph(memory_id: str, user_id: str):
- """Update memory graph connections in background"""
- try:
- # This would update graph connections based on the new memory;
- # a similarity-based sketch follows this function.
- logger.info(f"Updating memory graph for {memory_id}")
- except Exception as e:
- logger.error(f"Error updating memory graph: {e}")
- async def get_memory_uncertainty(memory_id: str, user_id: str) -> Dict[str, float]:
- """Get uncertainty metrics for a memory"""
- # Simplified placeholder values; an MC-dropout estimation sketch follows below
- return {
- 'aleatoric': 0.1,
- 'epistemic': 0.05,
- 'total': 0.15
- }
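- # NOTE: illustrative sketch only. A common way to estimate the epistemic
- # component is Monte Carlo dropout: run the encoder several times with dropout
- # active and treat the variance of the outputs as model uncertainty. The
- # encoder handle (`memory_system.encoder`) and sample count are assumptions.
- def estimate_epistemic_uncertainty(inputs: torch.Tensor,
-                                    n_samples: int = 10) -> float:
-     """Variance across stochastic forward passes as an epistemic proxy."""
-     model = memory_system.encoder  # assumed nn.Module containing dropout layers
-     model.train()  # keep dropout active while sampling
-     with torch.no_grad():
-         samples = torch.stack([model(inputs) for _ in range(n_samples)])
-     model.eval()
-     return samples.var(dim=0).mean().item()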
- async def get_episode_for_memory(memory) -> Optional[Any]:
- """Retrieve episode for a memory"""
- # Implementation would fetch the episode from the database; a sketch follows below
- return None
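- # NOTE: a minimal sketch of the fetch above. It assumes episodes are serialized
- # (e.g. via pickle) into a `data` column keyed by `id`; those column names are
- # assumptions about the schema, not confirmed by this file.
- async def _fetch_episode_row(episode_id: str) -> Optional[Any]:
-     """Load and deserialize an episode row without blocking the event loop."""
-     def query():
-         conn = sqlite3.connect(str(memory_system.db_path))
-         try:
-             row = conn.execute(
-                 "SELECT data FROM episodes WHERE id = ?", (episode_id,)
-             ).fetchone()
-         finally:
-             conn.close()
-         return pickle.loads(row[0]) if row else None
-     return await asyncio.to_thread(query)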
- def calculate_adaptation_quality(user_model: Dict[str, Any],
- personalization: Dict[str, Any]) -> float:
- """Calculate quality of personalization adaptation"""
- # Simple quality metric based on stability and coverage
- if not user_model.get('preference_vectors'):
- return 0.5
- stability = personalization.get('preferences', {}).get('preference_stability', 0.0)
- coverage = min(len(personalization.get('preferences', {}).get('top_topics', [])) / 5.0, 1.0)  # clamp so more than 5 topics cannot push quality above 1
- return (stability + coverage) / 2.0
- async def get_total_memory_count() -> int:
- """Get total memory count from database"""
- conn = sqlite3.connect(str(memory_system.db_path))
- count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
- conn.close()
- return count
- async def get_precious_memory_count() -> int:
- """Get precious memory count"""
- conn = sqlite3.connect(str(memory_system.db_path))
- count = conn.execute(
- "SELECT COUNT(*) FROM memories WHERE marked_precious = 1 OR never_forget = 1"
- ).fetchone()[0]
- conn.close()
- return count
- async def get_average_importance() -> float:
- """Get average memory importance"""
- conn = sqlite3.connect(str(memory_system.db_path))
- avg = conn.execute("SELECT AVG(importance) FROM memories").fetchone()[0] or 0.5
- conn.close()
- return float(avg)
- def get_gpu_memory_usage() -> float:
- """Get GPU memory usage in MB"""
- if torch.cuda.is_available():
- return torch.cuda.memory_allocated() / 1024 / 1024
- return 0.0
- def calculate_working_memory_utilization() -> float:
- """Calculate utilization of the shared working-memory buffer"""
- if not memory_system.active_episodes:
- return 0.0
- capacity = memory_system.working_memory.capacity
- if capacity == 0:
- return 0.0
- # The episodic buffer is shared across active episodes, so utilization is
- # simply buffer occupancy over capacity
- return len(memory_system.working_memory.episodic_buffer) / capacity
- def create_default_interface(path: Path):
- """Create a default chat interface if none exists"""
- html_content = """<!DOCTYPE html>
- <html>
- <head>
- <title>Advanced AI Memory System</title>
- <meta charset="UTF-8">
- <meta name="viewport" content="width=device-width, initial-scale=1.0">
- <style>
- body { font-family: Arial, sans-serif; margin: 0; padding: 20px; background: #f0f0f0; }
- .container { max-width: 1200px; margin: 0 auto; }
- .chat-container { background: white; border-radius: 10px; padding: 20px; height: 600px; display: flex; flex-direction: column; }
- .messages { flex-grow: 1; overflow-y: auto; padding: 10px; }
- .message { margin: 10px 0; padding: 10px; border-radius: 5px; }
- .user { background: #e3f2fd; text-align: right; }
- .assistant { background: #f5f5f5; }
- .input-area { display: flex; gap: 10px; margin-top: 20px; }
- .input-area input { flex-grow: 1; padding: 10px; border: 1px solid #ddd; border-radius: 5px; }
- .input-area button { padding: 10px 20px; background: #2196F3; color: white; border: none; border-radius: 5px; cursor: pointer; }
- .metrics { background: white; border-radius: 10px; padding: 20px; margin-top: 20px; }
- .metric { display: inline-block; margin: 10px; padding: 10px; background: #f5f5f5; border-radius: 5px; }
- </style>
- </head>
- <body>
- <div class="container">
- <h1>Advanced AI Memory System</h1>
- <p>Chat interface with HippoRAG indexing, adaptive patterns, and multi-dimensional memory encoding.</p>
- <div class="chat-container">
- <div class="messages" id="messages"></div>
- <div class="input-area">
- <input type="text" id="messageInput" placeholder="Type your message...">
- <button onclick="sendMessage()">Send</button>
- </div>
- </div>
- <div class="metrics" id="metrics">
- <h3>Session Metrics</h3>
- <div class="metric">Trust Level: <span id="trustLevel">0%</span></div>
- <div class="metric">Understanding Depth: <span id="understandingDepth">0%</span></div>
- <div class="metric">Flow Quality: <span id="flowQuality">0%</span></div>
- <div class="metric">Patterns Detected: <span id="patternsDetected">0</span></div>
- </div>
- </div>
- <script>
- const userId = 'user_' + Math.random().toString(36).slice(2, 11);
- const sessionId = 'session_' + Math.random().toString(36).slice(2, 11);
- let ws = null;
- function connectWebSocket() {
- ws = new WebSocket('ws://localhost:8000/ws');
- ws.onopen = () => {
- ws.send(JSON.stringify({ user_id: userId }));
- };
- ws.onmessage = (event) => {
- const data = JSON.parse(event.data);
- if (data.type === 'initial_stats') {
- updateMetrics(data.data);
- }
- };
- ws.onerror = (error) => {
- console.error('WebSocket error:', error);
- };
- }
- async function sendMessage() {
- const input = document.getElementById('messageInput');
- const message = input.value.trim();
- if (!message) return;
- // Display user message
- addMessage(message, 'user');
- input.value = '';
- try {
- const response = await fetch('/api/v3/exchange', {
- method: 'POST',
- headers: { 'Content-Type': 'application/json' },
- body: JSON.stringify({
- session_id: sessionId,
- user_message: message,
- user_id: userId,
- metadata: {}
- })
- });
- const data = await response.json();
- // Display assistant response
- addMessage(data.assistant_response, 'assistant');
- // Update metrics
- updateMetrics({
- trust_level: data.personalization?.preferences?.preference_stability || 0,
- understanding_depth: data.understanding_depth,
- flow_quality: data.flow_quality,
- patterns_detected: Object.keys(data.pattern_detection || {}).length
- });
- } catch (error) {
- console.error('Error:', error);
- addMessage('Error: Failed to get response', 'assistant');
- }
- }
- function addMessage(text, sender) {
- const messages = document.getElementById('messages');
- const messageDiv = document.createElement('div');
- messageDiv.className = 'message ' + sender;
- messageDiv.textContent = text;
- messages.appendChild(messageDiv);
- messages.scrollTop = messages.scrollHeight;
- }
- function updateMetrics(data) {
- if (data.trust_level !== undefined) {
- document.getElementById('trustLevel').textContent = (data.trust_level * 100).toFixed(1) + '%';
- }
- if (data.understanding_depth !== undefined) {
- document.getElementById('understandingDepth').textContent = (data.understanding_depth * 100).toFixed(1) + '%';
- }
- if (data.flow_quality !== undefined) {
- document.getElementById('flowQuality').textContent = (data.flow_quality * 100).toFixed(1) + '%';
- }
- if (data.patterns_detected !== undefined) {
- document.getElementById('patternsDetected').textContent = data.patterns_detected;
- }
- }
- // Initialize
- connectWebSocket();
- // Enter key handler
- document.getElementById('messageInput').addEventListener('keypress', (e) => {
- if (e.key === 'Enter') sendMessage();
- });
- </script>
- </body>
- </html>"""
- path.write_text(html_content)
- # ==================== BACKGROUND TASKS ====================
- async def consolidation_scheduler():
- """Schedule periodic consolidation cycles"""
- while True:
- try:
- # Run every 6 hours by default
- await asyncio.sleep(3600 * 6)
- if memory_system.config.get('enable_sleep_consolidation', True):
- logger.info("Starting scheduled consolidation cycle")
- await memory_system.sleep_engine.run_consolidation_cycle(90)
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error(f"Error in consolidation scheduler: {e}")
- async def pattern_optimization_loop():
- """Periodically optimize pattern banks"""
- while True:
- try:
- await asyncio.sleep(3600) # Every hour
- for pattern_type, bank in memory_system.pattern_recognizer.pattern_banks.items():
- if len(bank.patterns) > bank.capacity * 0.9:
- # Rebuild index for optimization
- bank._rebuild_index()
- logger.info(f"Optimized {pattern_type} pattern bank")
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error(f"Error in pattern optimization: {e}")
- async def metrics_aggregation_loop():
- """Aggregate metrics for monitoring"""
- while True:
- try:
- await asyncio.sleep(300) # Every 5 minutes
- # Aggregate and log metrics
- metrics = {
- 'active_sessions': len(memory_system.active_episodes),
- 'total_patterns': sum(
- len(bank.patterns)
- for bank in memory_system.pattern_recognizer.pattern_banks.values()
- ),
- 'gpu_memory': get_gpu_memory_usage()  # already returns 0.0 when CUDA is unavailable
- }
- logger.info(f"System metrics: {metrics}")
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error(f"Error in metrics aggregation: {e}")
- async def memory_decay_manager():
- """Manage memory decay for non-precious memories"""
- while True:
- try:
- await asyncio.sleep(3600 * 24) # Daily
- # Decay is not implemented inline; a sketch of a possible
- # decay pass follows this function. For now, just log.
- logger.info("Memory decay management cycle completed")
- except asyncio.CancelledError:
- break
- except Exception as e:
- logger.error(f"Error in memory decay manager: {e}")
- import os  # used by the backup path below and the main entry point; missing from the module imports
- import boto3  # used for the optional S3 upload below; also missing from the module imports
- async def automated_backup_scheduler():
- """Back up user memories to cloud storage every 6 hours"""
- while True:
- await asyncio.sleep(3600 * 6) # 6 hours
- try:
- logger.info("Starting automated cloud backup")
- # Get all users with stored episodes
- conn = sqlite3.connect(str(memory_system.db_path))
- users = conn.execute("SELECT DISTINCT user_id FROM episodes").fetchall()
- conn.close()
- for (user_id,) in users:
- # Create backup for each user
- backup_data = await create_user_backup(user_id)
- # Encrypt if configured
- if os.getenv('BACKUP_ENCRYPTION_KEY'):
- cipher = Fernet(os.getenv('BACKUP_ENCRYPTION_KEY').encode())
- encrypted = cipher.encrypt(json.dumps(backup_data).encode())
- else:
- encrypted = json.dumps(backup_data).encode()
- # Upload to S3 if configured (boto3 calls block, so run them in a thread)
- if os.getenv('AWS_S3_BUCKET'):
- s3_client = boto3.client('s3')
- key = f"claude-memories/{user_id}/{datetime.utcnow().isoformat()}.backup"
- await asyncio.to_thread(
- s3_client.put_object,
- Bucket=os.getenv('AWS_S3_BUCKET'),
- Key=key,
- Body=encrypted
- )
- logger.info("Cloud backup completed")
- except Exception as e:
- logger.error(f"Backup failed: {e}")
- # ==================== MAIN ENTRY POINT ====================
- if __name__ == "__main__":
- import uvicorn  # needed to launch the ASGI server; missing from the module imports
- # Ensure required directories exist
- Path("./data/ai_memories").mkdir(parents=True, exist_ok=True)
- Path("./static").mkdir(exist_ok=True)
- # Log startup information
- print("\n" + "="*60)
- print("ADVANCED AI MEMORY SYSTEM v3.0")
- print("="*60)
- print("\nCapabilities:")
- print(" ✓ HippoRAG neurobiological indexing")
- print(" ✓ TCN adaptive pattern recognition")
- print(" ✓ Multi-dimensional memory encoding")
- print(" ✓ Sleep-inspired consolidation")
- print(" ✓ Privacy-preserving personalization")
- print(" ✓ Advanced conversation flow analysis")
- print(" ✓ Uncertainty quantification")
- print("\nConfiguration:")
- print(f" GPU: {'Available' if torch.cuda.is_available() else 'Not available'}")
- print(f" API Key: {'Configured' if os.getenv('ANTHROPIC_API_KEY') != 'your-api-key' else 'Not configured (demo mode)'}")
- print(f" Sleep Consolidation: {os.getenv('ENABLE_SLEEP_CONSOLIDATION', 'true')}")
- print("\nStarting server at http://localhost:8000")
- print("API documentation at http://localhost:8000/docs")
- print("="*60 + "\n")
- # Run the server
- uvicorn.run(
- "advanced_memory_api_complete:app",
- host="0.0.0.0",
- port=8000,
- reload=True,
- log_level="info"
- )