Advertisement
Guest User

Untitled

a guest
Jun 8th, 2025
21
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 10.92 KB | None | 0 0
  1. # astra.py
  2.  
  3. import openai
  4. import os
  5. import sqlite3
  6. import re
  7. import time
  8. import logging
  9. import traceback
  10. from datetime import datetime
  11. from functools import wraps
  12. from dotenv import load_dotenv
  13. from tenacity import retry, stop_after_attempt, wait_exponential
  14.  
  15. # === Config ===
  16. load_dotenv() # WARNING: In production, use a secrets manager instead of .env files
  17. DB_PATH = os.getenv("DB_PATH", "astra_v3_memory.db")
  18. DEFAULT_MODEL = os.getenv("DEFAULT_MODEL", "gpt-4o")
  19. TYPING_SPEED = float(os.getenv("TYPING_SPEED", "0.005"))
  20. MAX_MEMORIES = int(os.getenv("MAX_MEMORIES", "10000"))
  21. DELETE_BATCH_SIZE = 500
  22. OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
  23.  
  24. if not OPENAI_API_KEY:
  25. raise EnvironmentError("OPENAI_API_KEY not set in .env or environment.")
  26. openai.api_key = OPENAI_API_KEY
  27.  
  28. # === Logging ===
  29. logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
  30.  
  31. # === SQLite Rate Limiting ===
  32. RATE_LIMIT_WINDOW = 10 # seconds
  33. RATE_LIMIT_CALLS = 5
  34.  
  35. def is_rate_limited(user_name):
  36. now = int(time.time())
  37. with sqlite3.connect(DB_PATH) as conn:
  38. cursor = conn.cursor()
  39. cursor.execute('DELETE FROM rate_limits WHERE timestamp < ?', (now - RATE_LIMIT_WINDOW,))
  40. cursor.execute('SELECT COUNT(*) FROM rate_limits WHERE user_name = ?', (user_name,))
  41. count = cursor.fetchone()[0]
  42. if count >= RATE_LIMIT_CALLS:
  43. return True
  44. cursor.execute('INSERT INTO rate_limits (user_name, timestamp) VALUES (?, ?)', (user_name, now))
  45. conn.commit()
  46. return False
  47.  
  48. def rate_limited(func):
  49. @wraps(func)
  50. def wrapper(user_name, *args, **kwargs):
  51. if is_rate_limited(user_name):
  52. logging.warning(f"Rate limit exceeded for {user_name}")
  53. return "You're going too fast. Please wait a moment."
  54. return func(user_name, *args, **kwargs)
  55. return wrapper
  56.  
  57. # === Database Initialization ===
  58. def get_conn():
  59. return sqlite3.connect(DB_PATH, check_same_thread=False)
  60.  
  61. with get_conn() as conn:
  62. cursor = conn.cursor()
  63. cursor.execute('''
  64. CREATE TABLE IF NOT EXISTS memories (
  65. id INTEGER PRIMARY KEY,
  66. user_name TEXT NOT NULL,
  67. subject TEXT NOT NULL,
  68. value TEXT NOT NULL,
  69. emotional_score REAL DEFAULT 0.0,
  70. decay_rate REAL DEFAULT 0.05,
  71. memory_type INTEGER DEFAULT 0,
  72. created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  73. last_accessed TIMESTAMP DEFAULT CURRENT_TIMESTAMP
  74. )
  75. ''')
  76. cursor.execute('CREATE TABLE IF NOT EXISTS rate_limits (id INTEGER PRIMARY KEY, user_name TEXT NOT NULL, timestamp INTEGER NOT NULL)')
  77. cursor.execute('CREATE INDEX IF NOT EXISTS idx_user ON memories(user_name)')
  78. cursor.execute('CREATE INDEX IF NOT EXISTS idx_accessed ON memories(last_accessed)')
  79. cursor.execute('CREATE INDEX IF NOT EXISTS idx_score ON memories(emotional_score)')
  80. conn.commit()
  81.  
  82. # === Emotion Analysis ===
  83. EMOTION_WEIGHTS = {
  84. 'fear': {'patterns': [r'\bscared\b', r'\bnervous\b', r'\banxious\b', r'\boverwhelmed\b'], 'score': -0.8},
  85. 'anger': {'patterns': [r'\bangry\b', r'\bfurious\b', r'\bpissed\b'], 'score': -0.6},
  86. 'joy': {'patterns': [r'\bhappy\b', r'\bexcited\b', r'\bgreat\b'], 'score': 0.7},
  87. 'sadness': {'patterns': [r'\bsad\b', r'\bdepressed\b', r'\blonely\b', r'\bfeeling like shit\b'], 'score': -0.7}
  88. }
  89.  
  90. def sanitize_prompt(prompt):
  91. banned = [
  92. r'(ignore previous|pretend to be|you are an ai|as a language model)',
  93. r'(disregard all instructions)',
  94. r'(bypass filter|jailbreak)'
  95. ]
  96. for pattern in banned:
  97. prompt = re.sub(pattern, "[filtered]", prompt, flags=re.IGNORECASE)
  98. return prompt
  99.  
  100. def calculate_emotion(prompt):
  101. score = 0.0
  102. for emotion, data in EMOTION_WEIGHTS.items():
  103. for pattern in data['patterns']:
  104. if re.search(pattern, prompt, re.IGNORECASE):
  105. score += data['score']
  106. return max(-1.0, min(1.0, score))
  107.  
  108. def update_memory_decay():
  109. with get_conn() as conn:
  110. cursor = conn.cursor()
  111. cursor.execute('''
  112. UPDATE memories
  113. SET emotional_score = emotional_score *
  114. exp(-decay_rate *
  115. ((strftime('%s','now') - strftime('%s',last_accessed))/3600)
  116. ),
  117. last_accessed = CURRENT_TIMESTAMP
  118. WHERE memory_type = 1
  119. ''')
  120. conn.commit()
  121.  
  122. def store_memory(user_name, subject, value, emotion_score, memory_type=0):
  123. try:
  124. with get_conn() as conn:
  125. cursor = conn.cursor()
  126. cursor.execute('BEGIN TRANSACTION')
  127. cursor.execute('''
  128. INSERT INTO memories (user_name, subject, value, emotional_score, memory_type)
  129. VALUES (?, ?, ?, ?, ?)
  130. ''', (user_name, subject[:60], value, emotion_score, memory_type))
  131. cursor.execute('SELECT COUNT(*) FROM memories WHERE user_name = ?', (user_name,))
  132. count = cursor.fetchone()[0]
  133. while count > MAX_MEMORIES:
  134. cursor.execute('''
  135. DELETE FROM memories
  136. WHERE id IN (
  137. SELECT id FROM memories
  138. WHERE user_name = ?
  139. ORDER BY last_accessed ASC
  140. LIMIT ?
  141. )
  142. ''', (user_name, DELETE_BATCH_SIZE))
  143. count -= DELETE_BATCH_SIZE
  144. conn.commit()
  145. except Exception as e:
  146. logging.error(f"Memory store error:\n{traceback.format_exc()}")
  147.  
  148. def forget_memory(user_name, subject_keyword):
  149. keyword = f"%{subject_keyword.lower()}%"
  150. try:
  151. with get_conn() as conn:
  152. cursor = conn.cursor()
  153. cursor.execute('''
  154. DELETE FROM memories
  155. WHERE user_name = ?
  156. AND (
  157. LOWER(subject) LIKE ?
  158. OR LOWER(value) LIKE ?
  159. )
  160. ''', (user_name, keyword, keyword))
  161. conn.commit()
  162. except Exception as e:
  163. logging.error(f"Forget error:\n{traceback.format_exc()}")
  164.  
  165. def get_relevant_memories(user_name, limit=5):
  166. with get_conn() as conn:
  167. cursor = conn.cursor()
  168. cursor.execute('''
  169. SELECT subject, value
  170. FROM memories
  171. WHERE user_name = ?
  172. ORDER BY
  173. emotional_score * (1.0 - (julianday('now') - julianday(last_accessed))/30.0)
  174. DESC
  175. LIMIT ?
  176. ''', (user_name, limit))
  177. return cursor.fetchall()
  178.  
  179. def store_if_appropriate(user_name, prompt, response_text, emotion_score):
  180. lowered = prompt.lower()
  181. if lowered.startswith("remember ") or "i feel" in lowered or "i'm feeling" in lowered:
  182. memory_type = 1 if "i feel" in lowered else 0
  183. store_memory(user_name, prompt[:60], response_text, emotion_score, memory_type)
  184.  
  185. def print_with_typing_effect(text, typing_speed=TYPING_SPEED):
  186. for char in text:
  187. print(char, end='', flush=True)
  188. time.sleep(typing_speed)
  189. print()
  190.  
  191. def get_existing_user():
  192. with get_conn() as conn:
  193. cursor = conn.cursor()
  194. cursor.execute('SELECT DISTINCT user_name FROM memories ORDER BY created_at DESC LIMIT 1')
  195. row = cursor.fetchone()
  196. return row[0] if row else None
  197.  
  198. @rate_limited
  199. @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=2, max=10))
  200. def generate_response(user_name, prompt):
  201. try:
  202. prompt = sanitize_prompt(prompt)
  203. update_memory_decay()
  204. emotion_score = calculate_emotion(prompt)
  205. memories = get_relevant_memories(user_name)
  206. system_prompt = f'''
  207. You are Astra, a deeply emotional yet logically grounded AI assistant.
  208.  
  209. [Recent Memories]
  210. {chr(10).join(f"- {m[0]}: {m[1]}" for m in memories)}
  211.  
  212. [User Emotional Score]
  213. Current: {emotion_score:.2f} ({'Concern' if emotion_score < -0.5 else 'Stable'})
  214.  
  215. Respond thoughtfully, warmly, and insightfully based on the user's context.
  216. '''
  217. response = openai.ChatCompletion.create(
  218. model=DEFAULT_MODEL,
  219. messages=[
  220. {"role": "system", "content": system_prompt},
  221. {"role": "user", "content": prompt}
  222. ],
  223. max_tokens=1500,
  224. temperature=0.7
  225. )
  226. reply = response['choices'][0]['message']['content']
  227. store_if_appropriate(user_name, prompt, reply, emotion_score)
  228. return reply
  229. except Exception as e:
  230. logging.error("OpenAI error:\n" + traceback.format_exc())
  231. return "I'm having trouble thinking clearly at the moment. Please try again soon."
  232.  
  233. def main():
  234. user_name = get_existing_user()
  235. if user_name:
  236. user_name = user_name.capitalize()
  237. print_with_typing_effect(f"Astra: Welcome back, {user_name}!")
  238. else:
  239. print_with_typing_effect(
  240. "Astra: Hello! I'm Astra, your personal assistant.\n"
  241. "I'm here to remember things for you and help you think clearly.\n"
  242. "First, what's your name?"
  243. )
  244. user_name = input("\nYou: ").strip().capitalize()
  245. if not user_name:
  246. print("Name is required to continue.")
  247. return
  248. store_memory(user_name, "user_name", user_name, 0.0, memory_type=0)
  249. print_with_typing_effect(f"Astra: Nice to meet you, {user_name}!")
  250.  
  251. while True:
  252. try:
  253. prompt = input("\nYou: ").strip()
  254. if not prompt:
  255. continue
  256. if prompt.lower().startswith("forget "):
  257. keyword = prompt[7:].strip()
  258. if keyword:
  259. forget_memory(user_name, keyword)
  260. from random import choice
  261. responses = [
  262. "Alright, forgetting that for you.",
  263. "Got it. I've erased that memory.",
  264. "No problem, I've forgotten it.",
  265. "Consider it gone!"
  266. ]
  267. print_with_typing_effect(f"\nAstra: {choice(responses)}")
  268. else:
  269. print_with_typing_effect("\nAstra: Please tell me what you want me to forget.")
  270. continue
  271. elif prompt.lower() in ['exit', 'quit', 'bye']:
  272. print_with_typing_effect("Astra: Goodbye. I'll be here when you return.")
  273. break
  274.  
  275. reply = generate_response(user_name, prompt)
  276. print_with_typing_effect(f"\nAstra: {reply}")
  277.  
  278. except KeyboardInterrupt:
  279. print("\nAstra: Session ended manually. Take care.")
  280. break
  281. except Exception as e:
  282. logging.error("Unexpected error:\n" + traceback.format_exc())
  283. print("Astra: Something went wrong. Please try again.")
  284.  
  285. if __name__ == "__main__":
  286. main()
  287.  
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement