Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import asyncio
- import warnings
- from fastmcp import Client
# Silence DeprecationWarning noise whose message mentions httpx or pydantic.
warnings.filterwarnings("ignore", ".*httpx.*", DeprecationWarning)
warnings.filterwarnings("ignore", ".*pydantic.*", DeprecationWarning)

# MCP server table in the standard "mcpServers" layout; add further servers
# as extra entries next to "context7".
config = {"mcpServers": {"context7": {"url": "https://mcp.context7.com/mcp"}}}

# Single module-level client shared by every helper below.
client = Client(config)
async def resolve_library_id_async(library_name: str):
    """Look up the Context7 library ID that matches a library name.

    Opens the shared module-level ``client`` for the duration of the call and
    invokes the ``resolve-library-id`` tool, passing ``library_name`` as the
    tool's ``libraryName`` argument.

    Args:
        library_name: Name of the library to resolve.

    Returns:
        Whatever ``client.call_tool`` returns for the request.
    """
    arguments = {"libraryName": library_name}
    async with client:
        return await client.call_tool("resolve-library-id", arguments)
async def get_library_docs_async(library_id: str, topic: str = "", tokens: int = 10000):
    """Fetch documentation for a library through the ``get-library-docs`` tool.

    Opens the shared module-level ``client`` for the duration of the call.

    Args:
        library_id: Library identifier, sent as ``context7CompatibleLibraryID``.
            It is coerced with ``str()`` so non-string IDs are accepted.
        topic: Optional topic to focus the documentation on. ``None`` is sent
            as an empty string, the same as omitting the argument.
        tokens: Value forwarded as the tool's ``tokens`` argument.

    Returns:
        Whatever ``client.call_tool`` returns for the request.
    """
    params = {
        "context7CompatibleLibraryID": str(library_id),
        # Callers sometimes pass None for "no topic"; the tool gets "" instead.
        "topic": "" if topic is None else topic,
        "tokens": tokens,
    }
    async with client:
        return await client.call_tool("get-library-docs", params)
def resolve_library_id(library_name: str):
    """Resolve a library name to a library ID from synchronous or async code.

    Args:
        library_name: Name of the library to resolve.

    Returns:
        When no event loop is running in the current thread, the result of
        ``resolve_library_id_async`` (the call blocks until it completes).
        When called from inside a running event loop, an ``asyncio.Task``
        wrapping the coroutine; the caller must ``await`` it to get the result.
    """
    try:
        # get_running_loop() raises RuntimeError instead of creating a loop
        # or failing on "no current event loop", unlike get_event_loop().
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Plain synchronous caller: run the coroutine to completion here.
        return asyncio.run(resolve_library_id_async(library_name))
    # Blocking inside a running loop is not possible, so schedule the work
    # on that loop and hand the task back to the caller.
    return loop.create_task(resolve_library_id_async(library_name))
def get_library_docs(library_id: str, topic: str = "", tokens: int = 10000):
    """Fetch library documentation from synchronous or async code.

    Args:
        library_id: Library identifier passed to ``get_library_docs_async``.
        topic: Optional topic to focus the documentation on.
        tokens: Value forwarded as the tool's ``tokens`` argument.

    Returns:
        When no event loop is running in the current thread, the result of
        ``get_library_docs_async`` (the call blocks until it completes).
        When called from inside a running event loop, an ``asyncio.Task``
        wrapping the coroutine; the caller must ``await`` it to get the result.
    """
    try:
        # get_running_loop() raises RuntimeError instead of creating a loop
        # or failing on "no current event loop", unlike get_event_loop().
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Plain synchronous caller: run the coroutine to completion here.
        return asyncio.run(get_library_docs_async(library_id, topic, tokens))
    # Blocking inside a running loop is not possible, so schedule the work
    # on that loop and hand the task back to the caller.
    return loop.create_task(get_library_docs_async(library_id, topic, tokens))
Advertisement
Add Comment
Please, Sign In to add comment