from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse, StreamingResponse
import uvicorn
import httpx  # async HTTP client, used instead of requests
import numpy as np
import uuid
import time
import json
import asyncio
import datetime

# ----------------------
# CONFIG
# ----------------------
SEARXNG_URL = "http://localhost:8888/search"
EMBED_URL = "http://localhost:8081/v1/embeddings"
LLM_URL = "http://localhost:8080/v1/chat/completions"

EMBED_MODEL = "granite-embedding-125m-english-Q8_0"
LLM_MODEL = "Qwen3-1.7B-Q4_K_M"

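# NOTE (assumption): a stock SearXNG instance rejects format=json unless "json"
# is enabled under search.formats in its settings.yml. A minimal sketch of the
# relevant settings.yml fragment, assuming a default install on port 8888:
#
#   search:
#     formats:
#       - html
#       - json
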
# ----------------------
# HELPERS
# ----------------------
def clean_query(query: str) -> str:
    """Strip a trailing '?' and drop common filler words before searching."""
    query = query.strip().rstrip("?")
    stopwords = {
        "'s", "a", "about", "above", "after", "against", "again", "an",
        "and", "are", "as", "at", "be", "before", "below", "between", "description",
        "but", "by", "detail", "details", "did", "during", "do", "does", "explain",
        "for", "from", "further", "give", "has", "have", "he", "her", "help", "his", "him",
        "if", "in", "into", "is", "it", "me", "meant", "no", "noob", "not", "of", "on", "please",
        "or", "she", "simple", "such", "tell", "terms", "that", "the", "their",
        "then", "there", "these", "they", "this", "to", "through", "was",
        "what", "what's", "where", "when", "will", "why", "with", "write",
        "who", "who's", "you're", "understand"
    }
    return " ".join(w for w in query.split() if w.lower() not in stopwords)

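# Example (hypothetical input), illustrating the intended effect:
#   clean_query("What is the weather in Paris today?")  ->  "weather Paris today"
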
# Fetch an embedding from the local embeddings server (async via httpx).
async def get_embedding(text: str) -> np.ndarray:
    async with httpx.AsyncClient() as client:
        r = await client.post(EMBED_URL, json={"model": EMBED_MODEL, "input": text})
        r.raise_for_status()
        return np.array(r.json()["data"][0]["embedding"])

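# The parsing above assumes an OpenAI-style embeddings response, i.e. roughly:
#
#   {"data": [{"embedding": [0.01, -0.02, ...], "index": 0}], "model": "..."}
#
# If the server on port 8081 returns a different shape, adjust the indexing accordingly.
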
def cosine_similarity(a, b):
    # Cosine similarity: dot product normalized by the vector magnitudes.
    return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

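# Quick sanity check (illustrative values only): identical directions score 1.0,
# orthogonal directions score 0.0.
#   cosine_similarity(np.array([1.0, 0.0]), np.array([2.0, 0.0]))  ->  1.0
#   cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 3.0]))  ->  0.0
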
# The pipeline is an async generator so tokens can be streamed to the client in
# real time: web search -> rank by embedding similarity -> stream the LLM answer.
async def pipeline(user_query: str):
    clean_q = clean_query(user_query)

    async with httpx.AsyncClient() as client:
        # 1. Search results from SearXNG
        r = await client.get(SEARXNG_URL, params={"q": clean_q, "format": "json"})
        r.raise_for_status()
        results = r.json()["results"][:10]

        docs = [f"{res.get('title', '')}. {res.get('content', '')}" for res in results]

        # 2. Rank docs by cosine similarity to the query embedding;
        # document embeddings are fetched concurrently.
        q_emb = await get_embedding(clean_q)
        doc_embeddings = await asyncio.gather(*(get_embedding(d) for d in docs))

        scored_docs = [
            (cosine_similarity(q_emb, d_emb), d)
            for d, d_emb in zip(docs, doc_embeddings)
        ]
        top_docs = [d for _, d in sorted(scored_docs, key=lambda x: x[0], reverse=True)[:4]]

        # 3. LLM call, grounded on the top-ranked snippets and the current date.
        current_date = datetime.date.today().strftime("%Y-%m-%d")
        context = "\n\n".join(top_docs)
        prompt = (
            f"The current date is {current_date}. "
            "Answer the question based on the following web information. "
            "Answer as if you already possess the information:\n\n"
            f"{context}\n\nQuestion: {user_query}"
        )

        # Stream the completion from the LLM server (Server-Sent Events).
        async with client.stream("POST", LLM_URL, json={
            "model": LLM_MODEL,
            "messages": [{"role": "user", "content": prompt}],
            "max_tokens": 512,
            "temperature": 0.7,
            "stream": True,
        }, timeout=None) as r:  # no timeout: streams can be long-lived
            r.raise_for_status()
            # aiter_lines() buffers partial chunks and yields complete lines,
            # so SSE messages are never split across reads.
            async for line in r.aiter_lines():
                if not line.startswith("data: "):
                    continue
                payload = line[len("data: "):]
                if payload.strip() == "[DONE]":
                    yield "[DONE]"
                    return
                try:
                    data = json.loads(payload)
                except json.JSONDecodeError:
                    # Skip malformed SSE lines rather than killing the stream.
                    continue
                choices = data.get("choices", [])
                if choices:
                    token = choices[0].get("delta", {}).get("content")
                    if token:
                        yield token

    # Always terminate the stream, even if the LLM never sent [DONE].
    yield "[DONE]"

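# The streaming loop above assumes the server on port 8080 speaks the OpenAI
# chat-completions SSE dialect (e.g. llama.cpp's server), i.e. lines such as:
#
#   data: {"choices":[{"index":0,"delta":{"content":"Hel"},"finish_reason":null}], ...}
#   data: {"choices":[{"index":0,"delta":{},"finish_reason":"stop"}], ...}
#   data: [DONE]
#
# If your backend uses a different delta path, adjust the extraction accordingly.
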
# ----------------------
# OPENAI-COMPATIBLE API
# ----------------------
app = FastAPI()

@app.post("/v1/chat/completions")
async def chat_completions(request: Request):
    body = await request.json()
    messages = body.get("messages", [])
    user_query = messages[-1]["content"] if messages else ""
    stream = body.get("stream", False)

    if not stream:
        # Non-streaming: drain the pipeline generator and return one JSON response.
        full_answer_tokens = []
        async for token in pipeline(user_query):
            if token == '[DONE]':
                break
            full_answer_tokens.append(token)
        answer = "".join(full_answer_tokens)

        response = {
            "id": f"chatcmpl-{uuid.uuid4()}",
            "object": "chat.completion",
            "created": int(time.time()),
            "model": LLM_MODEL,
            "choices": [
                {
                    "index": 0,
                    "message": {"role": "assistant", "content": answer},
                    "finish_reason": "stop"
                }
            ],
            "usage": {
                # Rough word-count approximations; real token counts would
                # require tokenizing the prompt and completion.
                "prompt_tokens": 0,
                "completion_tokens": len(answer.split()),
                "total_tokens": len(answer.split())
            }
        }
        return JSONResponse(content=response)

    # Streaming response (Server-Sent Events, OpenAI-style chunks)
    async def event_generator():
        completion_id = f"chatcmpl-{uuid.uuid4()}"
        created = int(time.time())

        # Relay tokens from the pipeline as chat.completion.chunk events.
        async for token in pipeline(user_query):
            if token == '[DONE]':
                # Final chunk: empty delta with finish_reason "stop".
                done_chunk = {
                    "id": completion_id,
                    "object": "chat.completion.chunk",
                    "created": created,
                    "model": LLM_MODEL,
                    "choices": [
                        {
                            "index": 0,
                            "delta": {},
                            "finish_reason": "stop"
                        }
                    ]
                }
                yield f"data: {json.dumps(done_chunk)}\n\n"
                break

            chunk = {
                "id": completion_id,
                "object": "chat.completion.chunk",
                "created": created,
                "model": LLM_MODEL,
                "choices": [
                    {
                        "index": 0,
                        # Each token is sent as a delta; the client concatenates them.
                        "delta": {"content": token},
                        "finish_reason": None
                    }
                ]
            }
            yield f"data: {json.dumps(chunk)}\n\n"

        # Standard OpenAI stream terminator.
        yield "data: [DONE]\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")

# ----------------------
# RUN SERVER
# ----------------------
if __name__ == "__main__":
    # A single async worker is usually enough here, since all external calls
    # (SearXNG, embeddings, LLM) are non-blocking I/O.
    uvicorn.run(app, host="0.0.0.0", port=8000)
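
# ----------------------
# EXAMPLE CLIENT (sketch)
# ----------------------
# A minimal sketch of how a client could exercise this proxy once it is running
# on port 8000. The "model" field is accepted but ignored by this server, and
# the question text is hypothetical.
#
#   curl -N http://localhost:8000/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "Qwen3-1.7B-Q4_K_M",
#          "messages": [{"role": "user", "content": "What happened in tech news today?"}],
#          "stream": true}'
#
# With "stream": false the endpoint returns a single chat.completion JSON object instead.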