Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import asyncio
- import json
- import zlib
- import zmq
- import zmq.asyncio
# Endpoint the subscriber socket connects to.
SOCKET_URI = "tcp://eddn.edcd.io:9500"

# Schema references whose messages are processed. The commented-out entries
# are other schema URLs that can be enabled by uncommenting them.
ALLOWED_SCHEMA = (
    # "https://eddn.edcd.io/schemas/blackmarket/1",
    # "https://eddn.edcd.io/schemas/commodity/3",
    "https://eddn.edcd.io/schemas/journal/1",
    # "https://eddn.edcd.io/schemas/outfitting/2",
    # "https://eddn.edcd.io/schemas/shipyard/2",
)

# Values of a message's "event" field that get reported.
ALLOWED_EVENTS = ("FSDJump", "Scan")
# Shared asyncio-aware ZeroMQ context used to create the subscriber socket.
ctx = zmq.asyncio.Context.instance()


async def main():
    """Subscribe to the feed at SOCKET_URI and print selected journal events.

    Every received frame is zlib-decompressed and parsed as JSON. A message is
    reported when its ``$schemaRef`` is listed in ALLOWED_SCHEMA and its
    ``event`` is listed in ALLOWED_EVENTS; the report is one line of the form
    ``<StarSystem> -> <BodyName>``, with ``<None>`` standing in for a missing
    body name.

    An error raised while handling a single message is printed and the loop
    moves on to the next message. The coroutine never returns normally; it
    ends when it is cancelled or when receiving itself fails, and the socket
    is closed in either case.
    """
    sock = ctx.socket(zmq.SUB)
    try:
        sock.connect(SOCKET_URI)
        sock.subscribe(b"")  # empty prefix: no topic filtering
        while True:
            # Receiving stays outside the per-message guard so that a failing
            # socket ends the loop instead of printing errors forever.
            frames = await sock.recv_multipart()
            try:
                # Exactly one frame is expected; anything else is reported
                # like any other malformed message rather than ending the loop.
                raw, = frames
                envelope = json.loads(zlib.decompress(raw))

                # Look fields up by key: relying on the order of the JSON
                # object's members would break if the sender reordered them.
                if envelope["$schemaRef"] not in ALLOWED_SCHEMA:
                    continue
                msg = envelope["message"]
                if msg["event"] not in ALLOWED_EVENTS:
                    continue

                sys_id64 = msg["SystemAddress"]
                body_id = msg.get("BodyID") or 0
                # NOTE(review): body_id64 is computed but not used yet. It is
                # kept because the SystemAddress lookup above also causes
                # messages without that field to be reported as errors.
                body_id64 = sys_id64 + (body_id << 55)

                sys_name = msg["StarSystem"]
                body_name = msg.get("BodyName") or "<None>"
                print(f"{sys_name:<40} -> {body_name:<40}")
            except Exception as exc:
                # Best effort: one bad message must not stop the listener.
                print(exc)
    finally:
        # Reached on cancellation or on a receive error; the original close()
        # call sat after the infinite loop and could never run.
        sock.close()
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs main() on it, cancels any
    # remaining tasks and closes the loop on exit. The previous
    # asyncio.get_event_loop() call is deprecated when no loop is running and
    # left the loop unclosed.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the listener; exit quietly.
        pass
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement