Guest User

Untitled

a guest
May 25th, 2026
27
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 11.02 KB | None | 0 0
  1. import requests
  2. import json
  3. import sys
  4. import os
  5. from ddgs import DDGS
  6.  
  7. def validate_path(path, working_directory):
  8.     abs_work_dir = os.path.abspath(working_directory)
  9.     abs_full_path = os.path.abspath(os.path.join(working_directory, path))
  10.     if not (abs_full_path == abs_work_dir or abs_full_path.startswith(abs_work_dir + os.sep)):
  11.         return None, f"Error: Path '{path}' is outside the working directory."
  12.     return abs_full_path, None
  13.  
  14. class Search:
  15.     def run(self, **kwargs):
  16.         arguments = kwargs.get('arguments', {})
  17.         working_directory = kwargs.get('working_directory', '.')
  18.         query = arguments.get('value', '')
  19.         max_results = int(arguments.get('max_results', 5))
  20.         backend = arguments.get('backend', 'auto')
  21.         timeout = float(arguments.get('timeout', 30.0))
  22.         save_to = arguments.get('save_to')
  23.  
  24.         print(f"> search {query}")
  25.  
  26.         try:
  27.             with DDGS() as ddgs:
  28.                 results = list(ddgs.text(query, max_results=max_results, backend=backend))
  29.             result_str = str(results)
  30.            
  31.             if save_to:
  32.                 valid_path, error = validate_path(save_to, working_directory)
  33.                 if error:
  34.                     return error
  35.                 with open(valid_path, 'w') as f:
  36.                     f.write(result_str)
  37.                 return f"Search results saved to {save_to}"
  38.            
  39.             return result_str
  40.         except Exception as e:
  41.             return f"Search error: {e}"
  42.  
  43.     def spec(self):
  44.         return {
  45.             "type": "function",
  46.             "function": {
  47.                 "name": "search",
  48.                 "description": "Perform a web search using DuckDuckGo and add the results to the chat context.",
  49.                 "parameters": {
  50.                     "type": "object",
  51.                     "properties": {
  52.                         "value": {"type": "string", "description": "The search query."},
  53.                         "max_results": {"type": "integer", "description": "Maximum hits per query.", "default": 5},
  54.                         "backend": {"type": "string", "description": "DDG backend.", "default": "auto"},
  55.                         "timeout": {"type": "number", "description": "Seconds to wait for the search.", "default": 30.0},
  56.                         "save_to": {"type": "string", "description": "Optional file path to save results (relative to working directory)."}
  57.                     },
  58.                     "required": ["value"],
  59.                     "additionalProperties": False,
  60.                 },
  61.             },
  62.         }
  63.  
  64. class Fetch:
  65.     def run(self, **kwargs):
  66.         arguments = kwargs.get('arguments', {})
  67.         working_directory = kwargs.get('working_directory', '.')
  68.         url = arguments.get('url', None)
  69.         save_to = arguments.get('save_to')
  70.  
  71.         if url:
  72.             try:
  73.                 resp = requests.get(url, timeout=30)
  74.                 resp.raise_for_status()
  75.                 content = resp.text
  76.                
  77.                 if save_to:
  78.                     valid_path, error = validate_path(save_to, working_directory)
  79.                     if error:
  80.                         return error
  81.                     with open(valid_path, 'w') as f:
  82.                         f.write(content)
  83.                     return f"Content saved to {save_to}"
  84.                
  85.                 return content
  86.             except Exception as e:
  87.                 return f"Failed to read the response: {e}"
  88.         else:
  89.             return f"no url provided"
  90.  
  91.     def spec(self):
  92.         return {
  93.             "type": "function",
  94.             "function": {
  95.                 "name": "fetch",
  96.                 "description": "download a url into context",
  97.                 "parameters": {
  98.                     "type": "object",
  99.                     "properties": {
  100.                         "url": {
  101.                             "type": "string",
  102.                             "description": "url to fetch"
  103.                         },
  104.                         "save_to": {
  105.                             "type": "string",
  106.                             "description": "Optional file path to save content (relative to working directory)."
  107.                         }
  108.                     },
  109.                     "required": ["url"]
  110.                 }
  111.             }
  112.         }
  113.  
  114. class FileIO:
  115.     def run(self, **kwargs):
  116.         arguments = kwargs.get('arguments', {})
  117.         working_directory = kwargs.get('working_directory', '.')
  118.         action = arguments.get('action')
  119.         path = arguments.get('path')
  120.         content = arguments.get('content', '')
  121.  
  122.         valid_path, error = validate_path(path, working_directory)
  123.         if error:
  124.             return error
  125.  
  126.         try:
  127.             if action == 'read':
  128.                 with open(valid_path, 'r') as f:
  129.                     return f.read()
  130.             elif action == 'write':
  131.                 with open(valid_path, 'w') as f:
  132.                     f.write(content)
  133.                 return f"Successfully wrote to {path}"
  134.             elif action == 'append':
  135.                 with open(valid_path, 'a') as f:
  136.                     f.write(content)
  137.                 return f"Successfully appended to {path}"
  138.             elif action == 'list':
  139.                 return str(os.listdir(valid_path))
  140.             else:
  141.                 return f"Unknown action: {action}"
  142.         except Exception as e:
  143.             return f"Error: {e}"
  144.  
  145.     def spec(self):
  146.         return {
  147.             "type": "function",
  148.             "function": {
  149.                 "name": "fileio",
  150.                 "description": "Perform file operations: read, write, append, list.",
  151.                 "parameters": {
  152.                     "type": "object",
  153.                     "properties": {
  154.                         "action": {
  155.                             "type": "string",
  156.                             "enum": ["read", "write", "append", "list"],
  157.                             "description": "The action to perform."
  158.                         },
  159.                         "path": {
  160.                             "type": "string",
  161.                             "description": "The file or directory path (relative)."
  162.                         },
  163.                         "content": {
  164.                             "type": "string",
  165.                             "description": "Content to write (for write or append)."
  166.                         }
  167.                     },
  168.                     "required": ["action", "path"]
  169.                 }
  170.             }
  171.         }
  172.  
  173. class ToolManager:
  174.     def __init__(self, working_directory="."):
  175.         self.tools = {}
  176.         self.working_directory = working_directory
  177.  
  178.     def add(self, tool):
  179.         try:
  180.             self.tools[tool.spec()["function"]["name"]] = tool
  181.         except Exception as e:
  182.             print(f"Error loading tool: {e}")
  183.  
  184.     def spec(self):
  185.         return [tool.spec() for tool in self.tools.values()]
  186.  
  187.     def get(self, name):
  188.         return self.tools[name] if name in self.tools else None
  189.  
  190. def normalize_llama_url(url):
  191.     url = url.rstrip('/')
  192.     if not url.endswith('/v1/chat/completions'):
  193.         url += '/v1/chat/completions'
  194.     return url
  195.  
  196. def parse_args():
  197.     args = sys.argv[1:]
  198.     config = {
  199.         "prompt_file": None,
  200.         "system_prompt": None,
  201.         "working_dir": ".",
  202.         "llama_server": "http://localhost:8080",
  203.         "exit": False
  204.     }
  205.     i = 0
  206.     while i < len(args):
  207.         if args[i] == "--prompt_file" and i + 1 < len(args):
  208.             config["prompt_file"] = args[i+1]
  209.             i += 2
  210.         elif args[i] == "--system_prompt" and i + 1 < len(args):
  211.             config["system_prompt"] = args[i+1]
  212.             i += 2
  213.         elif args[i] == "--working_dir" and i + 1 < len(args):
  214.             config["working_dir"] = args[i+1]
  215.             i += 2
  216.         elif args[i] == "--llama_server" and i + 1 < len(args):
  217.             config["llama_server"] = args[i+1]
  218.             i += 2
  219.         elif args[i] == "--exit":
  220.             config["exit"] = True
  221.             i += 1
  222.         else:
  223.             i += 1
  224.     return config
  225.  
  226. def main():
  227.     config = parse_args()
  228.  
  229.     llama_server = normalize_llama_url(config["llama_server"])
  230.  
  231.     tools = ToolManager(working_directory=config["working_dir"])
  232.     tools.add(Search())
  233.     tools.add(Fetch())
  234.     tools.add(FileIO())
  235.  
  236.     payload = {
  237.         "messages": [
  238.             {"role": "system", "content": "You are a helpful assistant."}
  239.         ],
  240.         "tools": tools.spec(),
  241.         "tool_choice": "auto",
  242.         "temperature": 0
  243.     }
  244.  
  245.     if config["system_prompt"]:
  246.         try:
  247.             with open(config["system_prompt"], "r") as f:
  248.                 payload["messages"][0]["content"] = f.read()
  249.         except Exception as e:
  250.             print(f"Error reading system prompt file: {e}")
  251.  
  252.     headers = {
  253.         "Authorization": "Bearer dummy",
  254.         "Content-Type": "application/json"
  255.     }
  256.  
  257.     user_turn = True
  258.     prompt_file_content = None
  259.     prompt_file_used = False
  260.     if config["prompt_file"]:
  261.         try:
  262.             with open(config["prompt_file"], "r") as f:
  263.                 prompt_file_content = f.read()
  264.         except Exception as e:
  265.             print(f"Error reading prompt file: {e}")
  266.  
  267.     while True:
  268.         if user_turn:
  269.             user_turn = True
  270.             if prompt_file_content is not None:
  271.                 user_message = prompt_file_content
  272.                 prompt_file_content = None
  273.                 prompt_file_used = True
  274.                 user_turn = False
  275.             else:
  276.                 if config["exit"] and prompt_file_used:
  277.                     break
  278.                 user_message = input(">")
  279.                 if user_message in ["exit", "quit"]:
  280.                     break
  281.             payload["messages"].append({"role": "user", "content": user_message})
  282.         else:
  283.             user_turn = True
  284.  
  285.         response = requests.post(llama_server, headers=headers, json=payload).json()
  286.         if "choices" not in response:
  287.             print(f"{response}")
  288.             break
  289.  
  290.         message = response["choices"][0]["message"]
  291.         payload["messages"].append(message)
  292.  
  293.         if "content" in message:
  294.             print(f"{message['role']}: {message['content']}")
  295.         if "reasoning_content" in message:
  296.             print(f"reasoning> {message['reasoning_content']}")
  297.  
  298.         if "tool_calls" in message:
  299.             print("\n=== Tool use ===")
  300.             for tool_call in message["tool_calls"]:
  301.                 tool_response = {"role": "tool", "content": ""}
  302.                 tool = tools.get(tool_call["function"]["name"])
  303.                 if tool:
  304.                     arguments = json.loads(tool_call["function"]["arguments"])
  305.                     tool_response["content"] = tool.run(
  306.                         arguments=arguments,
  307.                         working_directory=tools.working_directory
  308.                     )
  309.                 payload["messages"].append(tool_response)
  310.                 user_turn = False
  311.  
  312. if __name__ == "__main__":
  313.     main()
Advertisement
Add Comment
Please, Sign In to add comment