Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from fastapi import FastAPI
- from mcp.server.fastmcp import FastMCP, Context
- import argparse
- import asyncio
- import uvicorn
# ASGI application object; the MCP SSE app is mounted onto it further down.
app = FastAPI()
# FastMCP server instance, constructed with the name "sse_log_test".
mcp = FastMCP("sse_log_test")
@mcp.tool()
async def log_test(ctx: Context):
    """Log testing tool."""
    # NOTE(review): the docstring above is presumably published by FastMCP as
    # the tool description, so its wording is kept unchanged -- confirm before
    # editing it.
    #
    # Sends two "info"-level log messages through the request's session,
    # separated by a two-second pause, then returns a small status dict.
    session = ctx.session

    await session.send_log_message("info", "Initial log message")
    # Yield to the event loop for two seconds before the follow-up message.
    await asyncio.sleep(2)
    await session.send_log_message("info", "Delayed log message")

    return {"status": "Logging completed"}
# Serve the MCP server's SSE application from the root path of the FastAPI app.
app.mount(path="/", app=mcp.sse_app())
def main():
    """Parse the command-line options and serve ``app`` with uvicorn.

    Options:
        --port: TCP port to listen on (int, default 8888).
        --host: Address to bind to (str, default "0.0.0.0").
    """
    cli = argparse.ArgumentParser(description="Run Simple SSE Logger server")
    cli.add_argument(
        "--port",
        type=int,
        default=8888,
        help="Port to run the server on",
    )
    cli.add_argument(
        "--host",
        type=str,
        default="0.0.0.0",
        help="Host to bind the server to",
    )
    options = cli.parse_args()

    # Hand control to uvicorn with the parsed host and port.
    uvicorn.run(app, host=options.host, port=options.port)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement