Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import requests
- import json
- import sys
- import os
- from ddgs import DDGS
- def validate_path(path, working_directory):
- abs_work_dir = os.path.abspath(working_directory)
- abs_full_path = os.path.abspath(os.path.join(working_directory, path))
- if not (abs_full_path == abs_work_dir or abs_full_path.startswith(abs_work_dir + os.sep)):
- return None, f"Error: Path '{path}' is outside the working directory."
- return abs_full_path, None
- class Search:
- def run(self, **kwargs):
- arguments = kwargs.get('arguments', {})
- working_directory = kwargs.get('working_directory', '.')
- query = arguments.get('value', '')
- max_results = int(arguments.get('max_results', 5))
- backend = arguments.get('backend', 'auto')
- timeout = float(arguments.get('timeout', 30.0))
- save_to = arguments.get('save_to')
- print(f"> search {query}")
- try:
- with DDGS() as ddgs:
- results = list(ddgs.text(query, max_results=max_results, backend=backend))
- result_str = str(results)
- if save_to:
- valid_path, error = validate_path(save_to, working_directory)
- if error:
- return error
- with open(valid_path, 'w') as f:
- f.write(result_str)
- return f"Search results saved to {save_to}"
- return result_str
- except Exception as e:
- return f"Search error: {e}"
- def spec(self):
- return {
- "type": "function",
- "function": {
- "name": "search",
- "description": "Perform a web search using DuckDuckGo and add the results to the chat context.",
- "parameters": {
- "type": "object",
- "properties": {
- "value": {"type": "string", "description": "The search query."},
- "max_results": {"type": "integer", "description": "Maximum hits per query.", "default": 5},
- "backend": {"type": "string", "description": "DDG backend.", "default": "auto"},
- "timeout": {"type": "number", "description": "Seconds to wait for the search.", "default": 30.0},
- "save_to": {"type": "string", "description": "Optional file path to save results (relative to working directory)."}
- },
- "required": ["value"],
- "additionalProperties": False,
- },
- },
- }
- class Fetch:
- def run(self, **kwargs):
- arguments = kwargs.get('arguments', {})
- working_directory = kwargs.get('working_directory', '.')
- url = arguments.get('url', None)
- save_to = arguments.get('save_to')
- if url:
- try:
- resp = requests.get(url, timeout=30)
- resp.raise_for_status()
- content = resp.text
- if save_to:
- valid_path, error = validate_path(save_to, working_directory)
- if error:
- return error
- with open(valid_path, 'w') as f:
- f.write(content)
- return f"Content saved to {save_to}"
- return content
- except Exception as e:
- return f"Failed to read the response: {e}"
- else:
- return f"no url provided"
- def spec(self):
- return {
- "type": "function",
- "function": {
- "name": "fetch",
- "description": "download a url into context",
- "parameters": {
- "type": "object",
- "properties": {
- "url": {
- "type": "string",
- "description": "url to fetch"
- },
- "save_to": {
- "type": "string",
- "description": "Optional file path to save content (relative to working directory)."
- }
- },
- "required": ["url"]
- }
- }
- }
- class FileIO:
- def run(self, **kwargs):
- arguments = kwargs.get('arguments', {})
- working_directory = kwargs.get('working_directory', '.')
- action = arguments.get('action')
- path = arguments.get('path')
- content = arguments.get('content', '')
- valid_path, error = validate_path(path, working_directory)
- if error:
- return error
- try:
- if action == 'read':
- with open(valid_path, 'r') as f:
- return f.read()
- elif action == 'write':
- with open(valid_path, 'w') as f:
- f.write(content)
- return f"Successfully wrote to {path}"
- elif action == 'append':
- with open(valid_path, 'a') as f:
- f.write(content)
- return f"Successfully appended to {path}"
- elif action == 'list':
- return str(os.listdir(valid_path))
- else:
- return f"Unknown action: {action}"
- except Exception as e:
- return f"Error: {e}"
- def spec(self):
- return {
- "type": "function",
- "function": {
- "name": "fileio",
- "description": "Perform file operations: read, write, append, list.",
- "parameters": {
- "type": "object",
- "properties": {
- "action": {
- "type": "string",
- "enum": ["read", "write", "append", "list"],
- "description": "The action to perform."
- },
- "path": {
- "type": "string",
- "description": "The file or directory path (relative)."
- },
- "content": {
- "type": "string",
- "description": "Content to write (for write or append)."
- }
- },
- "required": ["action", "path"]
- }
- }
- }
- class ToolManager:
- def __init__(self, working_directory="."):
- self.tools = {}
- self.working_directory = working_directory
- def add(self, tool):
- try:
- self.tools[tool.spec()["function"]["name"]] = tool
- except Exception as e:
- print(f"Error loading tool: {e}")
- def spec(self):
- return [tool.spec() for tool in self.tools.values()]
- def get(self, name):
- return self.tools[name] if name in self.tools else None
- def normalize_llama_url(url):
- url = url.rstrip('/')
- if not url.endswith('/v1/chat/completions'):
- url += '/v1/chat/completions'
- return url
- def parse_args():
- args = sys.argv[1:]
- config = {
- "prompt_file": None,
- "system_prompt": None,
- "working_dir": ".",
- "llama_server": "http://localhost:8080",
- "exit": False
- }
- i = 0
- while i < len(args):
- if args[i] == "--prompt_file" and i + 1 < len(args):
- config["prompt_file"] = args[i+1]
- i += 2
- elif args[i] == "--system_prompt" and i + 1 < len(args):
- config["system_prompt"] = args[i+1]
- i += 2
- elif args[i] == "--working_dir" and i + 1 < len(args):
- config["working_dir"] = args[i+1]
- i += 2
- elif args[i] == "--llama_server" and i + 1 < len(args):
- config["llama_server"] = args[i+1]
- i += 2
- elif args[i] == "--exit":
- config["exit"] = True
- i += 1
- else:
- i += 1
- return config
- def main():
- config = parse_args()
- llama_server = normalize_llama_url(config["llama_server"])
- tools = ToolManager(working_directory=config["working_dir"])
- tools.add(Search())
- tools.add(Fetch())
- tools.add(FileIO())
- payload = {
- "messages": [
- {"role": "system", "content": "You are a helpful assistant."}
- ],
- "tools": tools.spec(),
- "tool_choice": "auto",
- "temperature": 0
- }
- if config["system_prompt"]:
- try:
- with open(config["system_prompt"], "r") as f:
- payload["messages"][0]["content"] = f.read()
- except Exception as e:
- print(f"Error reading system prompt file: {e}")
- headers = {
- "Authorization": "Bearer dummy",
- "Content-Type": "application/json"
- }
- user_turn = True
- prompt_file_content = None
- prompt_file_used = False
- if config["prompt_file"]:
- try:
- with open(config["prompt_file"], "r") as f:
- prompt_file_content = f.read()
- except Exception as e:
- print(f"Error reading prompt file: {e}")
- while True:
- if user_turn:
- user_turn = True
- if prompt_file_content is not None:
- user_message = prompt_file_content
- prompt_file_content = None
- prompt_file_used = True
- user_turn = False
- else:
- if config["exit"] and prompt_file_used:
- break
- user_message = input(">")
- if user_message in ["exit", "quit"]:
- break
- payload["messages"].append({"role": "user", "content": user_message})
- else:
- user_turn = True
- response = requests.post(llama_server, headers=headers, json=payload).json()
- if "choices" not in response:
- print(f"{response}")
- break
- message = response["choices"][0]["message"]
- payload["messages"].append(message)
- if "content" in message:
- print(f"{message['role']}: {message['content']}")
- if "reasoning_content" in message:
- print(f"reasoning> {message['reasoning_content']}")
- if "tool_calls" in message:
- print("\n=== Tool use ===")
- for tool_call in message["tool_calls"]:
- tool_response = {"role": "tool", "content": ""}
- tool = tools.get(tool_call["function"]["name"])
- if tool:
- arguments = json.loads(tool_call["function"]["arguments"])
- tool_response["content"] = tool.run(
- arguments=arguments,
- working_directory=tools.working_directory
- )
- payload["messages"].append(tool_response)
- user_turn = False
- if __name__ == "__main__":
- main()
Advertisement
Add Comment
Please, Sign In to add comment