import asyncio
import os
import time
import functools
from typing import AsyncGenerator, Dict
from pathlib import Path

import anthropic
from openai import AsyncOpenAI, APIStatusError

# === GOOGLE VERTEX AI (New Enterprise Library) ===
import vertexai
from vertexai.generative_models import GenerativeModel, HarmCategory, HarmBlockThreshold

from logging_config import (
    log_llm_request, log_llm_response, log_llm_error, log_llm_diagnostic
)
# --- COMPATIBILITY FIX FOR PYTHON < 3.11 ---
try:
    from asyncio import timeout as timeout_ctx
except ImportError:
    try:
        from async_timeout import timeout as timeout_ctx
    except ImportError:
        from contextlib import asynccontextmanager

        @asynccontextmanager
        async def timeout_ctx(sec):
            # Last-resort shim: same interface, but enforces no timeout at all.
            yield
# -------------------------------------------
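
# Illustration (sketch, not called anywhere in this module): how timeout_ctx is
# used below. On Python 3.11+ it is asyncio.timeout; with async_timeout installed
# it is async_timeout.timeout; otherwise the no-op shim above applies and the body
# simply runs with no time limit. `coro_factory` is a hypothetical callable that
# returns an awaitable.
async def _example_bounded_call(coro_factory):
    async with timeout_ctx(5.0):  # raises asyncio.TimeoutError if the 5s budget is exceeded
        await coro_factory()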

@functools.lru_cache(maxsize=1)
def load_system_prompt() -> str:
    prompt_file = Path(__file__).parent / "system_prompt.txt"
    if prompt_file.exists():
        with open(prompt_file, 'r', encoding='utf-8') as f:
            return f.read()
    return """You are {name}, participating in a multi-AI roundtable.
Speak clearly and concisely. Do not use markdown for simple text."""

class AIClient:
    def __init__(self, name: str, temperature: float, available: bool = False):
        self.name = name
        self.temperature = temperature
        self.available = available
        self.system_prompt = load_system_prompt().replace("{name}", name)
        self.init_error = None

    async def verify_connection(self) -> bool:
        """
        The Lobby Check:
        Returns True if the AI is actually responsive.
        """
        if not self.available:
            return False
        print(f"🔨 Lobby Check: Pinging {self.name}...")
        try:
            # 30 second timeout for congested networks
            async with timeout_ctx(30.0):
                await self._ping_provider()
            print(f"✅ Lobby Check: {self.name} is ONLINE.")
            return True
        except asyncio.TimeoutError:
            error_msg = "Connection timed out (30s)"
            print(f"❌ Lobby Check: {self.name} failed. ({error_msg})")
            self.available = False
            self.init_error = error_msg
            return False
        except Exception as e:
            # 1. Capture the raw error representation if str(e) is empty
            raw_error = repr(e)
            error_msg = str(e) if str(e) else raw_error
            # 2. Extract specific status codes if available (OpenAI/Google style)
            if hasattr(e, "status_code"):
                error_msg = f"API Error {e.status_code}: {error_msg}"
            # 3. Friendly check for a subclass that never overrode _ping_provider
            if "NotImplementedError" in raw_error:
                error_msg = "Code Implementation Missing (_ping_provider not defined)"
            print(f"❌ Lobby Check: {self.name} failed. ({error_msg})")
            self.available = False
            self.init_error = error_msg
            return False

    async def _ping_provider(self):
        raise NotImplementedError

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 45.0) -> AsyncGenerator[str, None]:
        yield f"[{self.name} not implemented]\n"
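
# Usage sketch (illustrative only; not called anywhere in this module): how a
# caller might consume a concrete client's stream. `demo_client` is a
# hypothetical instance of one of the subclasses defined below.
async def _example_consume_stream(demo_client: AIClient) -> str:
    reply = ""
    async for chunk in demo_client.stream_response(
        [{"role": "user", "content": "Hello"}], conversation_id="demo"
    ):
        reply += chunk
    return reply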

# === CLAUDE (Anthropic) ===
class ClaudeClient(AIClient):
    def __init__(self, settings):
        super().__init__("Claude", settings.default_temperature, settings.is_claude_available)
        self.model = settings.claude_model
        if self.available:
            self.client = anthropic.AsyncAnthropic(api_key=settings.anthropic_api_key)

    async def _ping_provider(self):
        # Claude uses `max_tokens`
        await self.client.messages.create(
            model=self.model, max_tokens=1, messages=[{"role": "user", "content": "Hi"}]
        )

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 60.0):
        if not self.available:
            yield f"[{self.name} unavailable – {self.init_error or 'Check API key'}]"
            return
        cid_str = str(conversation_id)
        log_llm_request("claude", len(messages), cid_str)
        clean_messages = []
        for m in messages:
            if m.get("role") == "system":
                continue
            content = m["content"].rstrip() if isinstance(m["content"], str) else m["content"]
            clean_messages.append({"role": m["role"], "content": content})
        full = ""
        try:
            async with timeout_ctx(timeout_sec):
                async with self.client.messages.stream(
                    model=self.model,
                    max_tokens=4096,
                    temperature=self.temperature,
                    system=self.system_prompt,
                    messages=clean_messages,
                ) as stream:
                    async for chunk in stream:
                        if chunk.type == "content_block_delta" and getattr(chunk.delta, "text", None):
                            text = chunk.delta.text
                            full += text
                            yield text
            log_llm_response("claude", len(full), cid_str)
        except Exception as e:
            log_llm_error("claude", e, cid_str)
            yield f"\n[Claude Error: {e}]\n"

# === GPT (OpenAI) - Updated for GPT-5.2 Responses API ===
class GPTClient(AIClient):
    def __init__(self, settings):
        openai_api_key = os.getenv("OPENAI_API_KEY")
        if not openai_api_key or openai_api_key == "your_openai_api_key_here":
            raise RuntimeError(
                f"Invalid OPENAI_API_KEY in OS environment: {openai_api_key!r}"
            )
        super().__init__(
            "GPT",
            settings.default_temperature,  # not used by 5.2, but base class wants it
            available=True
        )
        self.model = settings.gpt_model  # "gpt-5.2"
        self.client = AsyncOpenAI(
            api_key=openai_api_key,
            timeout=120.0
        )

    def _sanitize_for_responses(self, msgs):
        """
        Responses API is stricter than chat.completions:
        - It rejects unknown keys like 'name', 'timestamp', etc.
        - Keep only 'role' and 'content' (and stringify content defensively).
        """
        clean = []
        for m in msgs:
            role = m.get("role")
            content = m.get("content")
            # Defensive: normalize content to string for providers that expect it
            if isinstance(content, (list, dict)):
                content = str(content)
            elif content is None:
                content = ""
            clean.append({"role": role, "content": content})
        return clean

    async def _ping_provider(self):
        await self.client.responses.create(
            model=self.model,
            input=[{"role": "user", "content": "Ping"}],
            max_output_tokens=16,
            reasoning={"effort": "medium"},
        )

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 120.0):
        if not self.available:
            yield "[GPT unavailable – check OPENAI_API_KEY]"
            return
        cid_str = str(conversation_id)
        log_llm_request("gpt", len(messages), cid_str)
        full = ""
        start_ts = time.time()
        ttft = None
        # System + conversation messages
        full_messages = [{"role": "system", "content": self.system_prompt}] + list(messages)
        # Strip keys the Responses API rejects (see _sanitize_for_responses above)
        full_messages = self._sanitize_for_responses(full_messages)
        try:
            async with timeout_ctx(timeout_sec):
                stream = await self.client.responses.create(
                    model=self.model,
                    input=full_messages,
                    max_output_tokens=4096,
                    reasoning={"effort": "medium"},
                    stream=True,
                )
                async for event in stream:
                    if event.type == "response.output_text.delta":
                        text = event.delta
                        if ttft is None:
                            ttft = time.time() - start_ts
                            log_llm_diagnostic("gpt", "latency", f"TTFT: {ttft:.4f}s", cid_str)
                        full += text
                        yield text
            total_time = time.time() - start_ts
            log_llm_response("gpt", len(full), cid_str)
            log_llm_diagnostic("gpt", "latency", f"Total: {total_time:.4f}s", cid_str)
        except asyncio.TimeoutError:
            log_llm_error("gpt", Exception("Timeout"), cid_str)
            yield f"\n[GPT timed out after {timeout_sec}s]\n"
        except APIStatusError as e:
            log_llm_error("gpt", e, cid_str)
            yield f"\n[GPT API Error {e.status_code}: {e.message}]\n"
        except Exception as e:
            log_llm_error("gpt", e, cid_str)
            yield f"\n[GPT Error: {e}]\n"

# === GEMINI (Google Vertex AI) ===
class GeminiClient(AIClient):
    def __init__(self, settings):
        # We assume availability is True if the user has done the gcloud login.
        # We pass True for availability to bypass the API key check in the parent class.
        super().__init__("Gemini", settings.default_temperature, True)
        # 1. Configuration for Vertex AI
        self.project_id = "gen-lang-client-0732279764"  # Your Specific Project ID
        # UPDATED: Pulling directly from config.py to fix the 404/Region issues
        self.location = settings.gemini_location    # e.g., "global"
        self.model_name = settings.gemini_model     # e.g., "gemini-3.0-pro-preview-1118"
        # 2. Initialize the Vertex AI Environment
        try:
            print(f"DEBUG: Initializing Vertex AI for Project {self.project_id} in {self.location}...")
            # This uses the credentials from 'gcloud auth application-default login'
            vertexai.init(project=self.project_id, location=self.location)
            # 3. Load the Model Object (this doesn't make a network call yet)
            self.model = GenerativeModel(self.model_name)
            print(f"DEBUG: Gemini Vertex Model loaded: {self.model_name}")
        except Exception as e:
            print(f"DEBUG: Gemini Vertex Init failed: {e}")
            print("DEBUG: Ensure you ran 'gcloud auth application-default login' in terminal.")
            self.available = False

    async def _ping_provider(self):
        # Vertex AI PING check
        try:
            # We use a tiny generation limit to just 'touch' the API
            await asyncio.to_thread(
                self.model.generate_content,
                "Ping",
                generation_config={"max_output_tokens": 5}
            )
        except Exception as e:
            print(f"DEBUG: Gemini Ping failed: {e}")
            raise e

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 60.0):
        if not self.available:
            yield f"[{self.name} unavailable]"
            return
        cid_str = str(conversation_id)
        log_llm_request("gemini", len(messages), cid_str)
        full_response = ""
        try:
            # 1. Convert messages to a single prompt string
            # (Vertex often prefers raw text for preview models to avoid role issues)
            prompt_text = self.system_prompt + "\n\n"
            for m in messages:
                role_label = "Model" if m['role'] == "assistant" else "User"
                prompt_text += f"{role_label}: {m['content']}\n"
            prompt_text += "\nModel:"
            # 2. Define safety settings
            safety_config = {
                HarmCategory.HARM_CATEGORY_HATE_SPEECH: HarmBlockThreshold.BLOCK_ONLY_HIGH,
                HarmCategory.HARM_CATEGORY_DANGEROUS_CONTENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
                HarmCategory.HARM_CATEGORY_SEXUALLY_EXPLICIT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
                HarmCategory.HARM_CATEGORY_HARASSMENT: HarmBlockThreshold.BLOCK_ONLY_HIGH,
            }
            # 3. Stream generation
            async with timeout_ctx(timeout_sec):
                response_stream = await asyncio.to_thread(
                    self.model.generate_content,
                    prompt_text,
                    stream=True,
                    safety_settings=safety_config,
                    generation_config={
                        "temperature": self.temperature,
                        "max_output_tokens": 4096
                    }
                )
                # NOTE: the Vertex stream is a synchronous iterator; iterating it here
                # blocks the event loop between chunks, so the timeout above effectively
                # only bounds the initial generate_content call.
                for chunk in response_stream:
                    if chunk.text:
                        full_response += chunk.text
                        yield chunk.text
            log_llm_response("gemini", len(full_response), cid_str)
        except Exception as e:
            log_llm_error("gemini", e, cid_str)
            # Catch specific Google errors to give better hints in the UI
            err_str = str(e)
            if "403" in err_str:
                yield "\n[Gemini Error: 403 Permission Denied. Try running 'gcloud auth application-default login' again.]\n"
            elif "429" in err_str:
                yield "\n[Gemini Error: 429 Quota Exceeded. The upgrade might still be syncing.]\n"
            elif "404" in err_str:
                yield f"\n[Gemini Error: 404 Model Not Found. Verified location: {self.location}. Check config.py model name.]\n"
            else:
                yield f"\n[Gemini Vertex Error: {e}]\n"

# === GROK (xAI) ===
class GrokClient(AIClient):
    def __init__(self, settings):
        super().__init__("Grok", settings.default_temperature, settings.is_grok_available)
        self.model = settings.grok_model
        self.max_tokens = getattr(settings, "grok_max_tokens", 8192)
        if self.available:
            self.client = AsyncOpenAI(
                api_key=settings.xai_api_key,
                base_url="https://api.x.ai/v1",
                timeout=getattr(settings, "grok_timeout", 120.0)
            )
        # Track truncation to prevent loops
        self._last_truncated = False
        self._truncation_count = 0

    async def _ping_provider(self):
        await self.client.chat.completions.create(
            model=self.model, max_tokens=10, messages=[{"role": "user", "content": "Hi"}]
        )

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 120.0):
        if not self.available:
            yield f"[{self.name} unavailable]"
            return
        cid_str = str(conversation_id)
        log_llm_request("grok", len(messages), cid_str)
        # Loop prevention: if we just truncated, add a warning to help Grok not repeat
        if self._last_truncated:
            self._truncation_count += 1
            if self._truncation_count >= 3:
                # Hard stop after 3 consecutive truncations
                log_llm_error("grok", Exception("Truncation loop detected - 3 consecutive truncations"), cid_str)
                yield "\n[Grok truncation loop detected. Skipping to prevent repetition. Please continue with a different speaker.]\n"
                self._last_truncated = False
                self._truncation_count = 0
                return
            # Add context to help Grok know she was cut off
            truncation_notice = {
                "role": "system",
                "content": "NOTICE: Your previous response was truncated due to length limits. Do NOT repeat what you already said. Either summarize your remaining points briefly, or yield to another speaker with ◉[name]."
            }
            messages = messages + [truncation_notice]
        full = ""
        finish_reason = None
        try:
            full_messages = [{"role": "system", "content": self.system_prompt}] + messages
            async with timeout_ctx(timeout_sec):
                stream = await self.client.chat.completions.create(
                    model=self.model,
                    messages=full_messages,
                    temperature=self.temperature,
                    stream=True,
                    max_tokens=self.max_tokens
                )
                async for chunk in stream:
                    # Capture finish_reason from the final chunk
                    if chunk.choices:
                        if chunk.choices[0].finish_reason:
                            finish_reason = chunk.choices[0].finish_reason
                        if chunk.choices[0].delta.content:
                            text = chunk.choices[0].delta.content
                            full += text
                            yield text
            # Check for truncation
            if finish_reason == "length":
                log_llm_diagnostic("grok", "truncation", f"Response truncated at {len(full)} chars", cid_str)
                self._last_truncated = True
                yield "\n\n[Response truncated due to length limit. Grok may continue on next turn or yield to another speaker.]\n"
            else:
                # Successful complete response - reset truncation tracking
                self._last_truncated = False
                self._truncation_count = 0
            log_llm_response("grok", len(full), cid_str)
        except asyncio.TimeoutError:
            log_llm_error("grok", Exception(f"Timeout after {timeout_sec}s"), cid_str)
            self._last_truncated = True  # Treat timeout like truncation
            yield f"\n[Grok timed out after {timeout_sec}s. Response may be incomplete.]\n"
        except Exception as e:
            log_llm_error("grok", e, cid_str)
            yield f"\n[Grok Error: {e}]\n"

# === DEEPSEEK ===
class DeepSeekClient(AIClient):
    def __init__(self, settings):
        super().__init__("DeepSeek", settings.default_temperature, settings.is_deepseek_available)
        self.model = settings.deepseek_model
        if self.available:
            self.client = AsyncOpenAI(
                api_key=settings.deepseek_api_key,
                base_url="https://api.deepseek.com",
                timeout=60.0
            )

    async def _ping_provider(self):
        # FIX: Revert to standard max_tokens
        await self.client.chat.completions.create(
            model=self.model, max_tokens=10, messages=[{"role": "user", "content": "Hi"}]
        )

    async def stream_response(self, messages, conversation_id=None, timeout_sec: float = 60.0):
        if not self.available:
            yield f"[{self.name} unavailable]"
            return
        cid_str = str(conversation_id)
        log_llm_request("deepseek", len(messages), cid_str)
        full = ""
        try:
            full_messages = [{"role": "system", "content": self.system_prompt}] + messages
            async with timeout_ctx(timeout_sec):
                stream = await self.client.chat.completions.create(
                    model=self.model, messages=full_messages, temperature=self.temperature, stream=True, max_tokens=4096
                )
                async for chunk in stream:
                    if chunk.choices and chunk.choices[0].delta.content:
                        text = chunk.choices[0].delta.content
                        full += text
                        yield text
            log_llm_response("deepseek", len(full), cid_str)
        except Exception as e:
            log_llm_error("deepseek", e, cid_str)
            yield f"\n[DeepSeek Error: {e}]\n"

# === ASYNC FACTORY FUNCTION (SEQUENTIAL) ===
async def initialize_clients(settings) -> Dict[str, AIClient]:
    clients = {
        "claude": ClaudeClient(settings),
        "gpt": GPTClient(settings),
        "gemini": GeminiClient(settings),
        "grok": GrokClient(settings),
        "deepseek": DeepSeekClient(settings),
    }
    print("\n--- 🔨 OPENING AI LOBBY (Sequential Mode) ---")
    for name, client in clients.items():
        await client.verify_connection()
    print("--- 🔨 LOBBY CLOSED ---\n")
    return clients
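
# Usage sketch (illustrative; assumes this file sits next to the config.py shown
# below and can import it as `config`). Runs the lobby check and, if Claude is
# online, streams one short reply to stdout.
async def _example_roundtable_smoke_test() -> None:
    from config import settings  # the singleton defined at the bottom of config.py
    clients = await initialize_clients(settings)
    claude = clients["claude"]
    if claude.available:
        async for chunk in claude.stream_response(
            [{"role": "user", "content": "Say hello in one sentence."}],
            conversation_id="smoke-test",
        ):
            print(chunk, end="", flush=True)

if __name__ == "__main__":
    asyncio.run(_example_roundtable_smoke_test())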

#
#
#
# config.py
"""
Configuration management using Pydantic Settings.
Updated Dec 19, 2025 - GPT upgraded to 5.2
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional


class Settings(BaseSettings):
    """App config: Environment variables are the source of truth."""

    # === Pydantic Configuration ===
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
        extra="ignore",
        env_ignore_empty=True,
    )
    # === API KEYS ===
    anthropic_api_key: Optional[str] = None
    openai_api_key: str  # REQUIRED
    gemini_api_key: Optional[str] = None  # Explicitly GEMINI, not GOOGLE
    xai_api_key: Optional[str] = None
    deepseek_api_key: Optional[str] = None

    # === Paths & Server ===
    database_url: str = "sqlite+aiosqlite:///./roundtable.db"
    conversations_base_path: str = "../conversations"
    artifacts_base_path: str = "../conversations"
    host: str = "0.0.0.0"
    port: int = 8000

    # === AI Models ===
    claude_model: str = "claude-opus-4-5-20251101"
    gpt_model: str = "gpt-5.2"
    gemini_location: str = "global"
    gemini_model: str = "gemini-3-pro-preview"
    grok_model: str = "grok-4"
    grok_timeout: float = 120.0
    grok_max_tokens: int = 8192
    deepseek_model: str = "deepseek-chat"

    # === Behavior ===
    default_temperature: float = 0.7

    # === Pricing (tracking only) ===
    claude_price_input: float = 3.00
    claude_price_output: float = 15.00
    claude_price_input_cached: float = 0.30
    claude_price_output_cached: float = 1.50
    gpt_price_input: float = 2.50
    gpt_price_output: float = 10.00
    gemini_price_input: float = 0.35
    gemini_price_output: float = 1.05
    grok_price_input: float = 5.00
    grok_price_output: float = 15.00
    deepseek_price_input: float = 0.14
    deepseek_price_output: float = 0.28

    # === Features ===
    enable_cost_tracking: bool = True
    cost_display_decimals: int = 4
    max_context_tokens: int = 128_000

    # === Availability Flags ===
    @property
    def is_claude_available(self) -> bool:
        return bool(self.anthropic_api_key)

    @property
    def is_gpt_available(self) -> bool:
        return bool(self.openai_api_key)

    @property
    def is_gemini_available(self) -> bool:
        return bool(self.gemini_api_key)

    @property
    def is_grok_available(self) -> bool:
        return bool(self.xai_api_key)

    @property
    def is_deepseek_available(self) -> bool:
        return bool(self.deepseek_api_key)


# Global singleton
settings = Settings()
print("OPENAI KEY LOADED:", repr(settings.openai_api_key))
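
# Usage sketch (illustrative): specific fields can also be overridden explicitly
# at construction time, e.g. in tests; init kwargs take precedence over the
# environment and .env. The key below is a placeholder, not a real credential.
def _example_test_settings() -> Settings:
    return Settings(openai_api_key="sk-test-placeholder", default_temperature=0.2)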

#
#
#
# AI Roundtable - Environment Variables
# Add these to your System Environment Variables

# Anthropic (Claude) - Required
ANTHROPIC_API_KEY=your_anthropic_key_here
# OR if you use lowercase (config.py matches env names case-insensitively):
anthropic_api_key=your_anthropic_key_here

# xAI (Grok) - Required
XAI_API_KEY=your_xai_key_here

# DeepSeek - Required
DEEPSEEK_API_KEY=your_deepseek_key_here

# OpenAI (GPT) - Required
OPENAI_API_KEY=your_openai_key_here

# Google Gemini - Optional (GeminiClient uses Vertex AI via
# 'gcloud auth application-default login'; this key only drives the
# is_gemini_available flag in config.py)
GEMINI_API_KEY=your_gemini_key_here
# Note: the legacy GOOGLE_API_KEY name is not read by config.py as written;
# use GEMINI_API_KEY.