Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- """
- VTTGrabber - Working Version with Driving + Fixed TAB
- """
- import asyncio
- import json
- import evdev
- from evdev import ecodes
- import websockets
- import subprocess
- import sys
- import time
- import signal
- import urllib.request
# evdev device nodes of the secondary mouse and keyboard. VTTGrabber opens and
# grabs both of them in setup_input_devices().
SECONDARY_MOUSE = "/dev/input/by-id/usb-30fa_USB_OPTICAL_MOUSE-event-mouse"
SECONDARY_KBD = "/dev/input/by-id/usb-Usb_KeyBoard_Usb_KeyBoard-event-kbd"

# Browser-level DevTools WebSocket endpoint.
# NOTE(review): not referenced by the visible code, which resolves the page
# WebSocket URL from http://localhost:9222/json instead.
CDP_URL = "ws://localhost:9222/devtools/browser"

# Bounds, in pixels, used to clamp the virtual pointer and to pick its
# starting position (the centre of the screen).
SCREEN_WIDTH = 1920
SCREEN_HEIGHT = 1080
class VTTGrabber:
    """Forward a grabbed secondary mouse and keyboard to a Chromium page via CDP.

    The two evdev devices named by ``SECONDARY_MOUSE`` and ``SECONDARY_KBD``
    are grabbed exclusively, and their events are translated into Chrome
    DevTools Protocol ``Input.dispatchMouseEvent`` / ``Input.dispatchKeyEvent``
    messages sent over a WebSocket to the first ``page`` target listed at
    ``http://localhost:9222/json``. A small DOM element is injected into the
    page to visualise the virtual pointer position.
    """

    # CDP modifier bit for Shift (Alt=1, Ctrl=2, Meta=4, Shift=8).
    _SHIFT_MODIFIER = 8

    def __init__(self):
        """Initialise state; no device or network access happens here."""
        self.mouse = None
        self.keyboard = None
        self.ws = None
        self.target_id = None
        # The virtual pointer starts in the centre of the screen.
        self.mouse_x = SCREEN_WIDTH // 2
        self.mouse_y = SCREEN_HEIGHT // 2
        self.running = True
        self.scroll_accumulator = 0
        self.msg_counter = 0
        # Strong reference to the background reader task: the event loop only
        # keeps weak references, so an unreferenced task may be collected.
        self._drain_task = None

    async def find_chromium_target(self):
        """Return the WebSocket debugger URL of the first ``page`` target.

        Queries ``http://localhost:9222/json`` and stores the chosen target's
        id in ``self.target_id``. Returns ``None`` when the request fails or
        no page target is listed; the failure is printed, not raised.
        """
        try:
            req = urllib.request.Request(
                "http://localhost:9222/json",
                headers={"Host": "localhost:9222"},
            )
            with urllib.request.urlopen(req, timeout=5) as response:
                targets = json.loads(response.read().decode())
            for target in targets:
                if target["type"] == "page":
                    self.target_id = target["id"]
                    print(f"Found target: {target.get('title', 'Unknown')}")
                    return target["webSocketDebuggerUrl"]
        except Exception as e:
            # Best-effort discovery: the caller turns None into ConnectionError.
            print(f"Error: {e}")
        return None

    async def connect_cdp(self):
        """Open the CDP WebSocket and start draining incoming messages.

        Raises:
            ConnectionError: if no Chromium page target could be found.
        """
        ws_url = await self.find_chromium_target()
        if not ws_url:
            raise ConnectionError("No Chromium target")
        self.ws = await websockets.connect(ws_url, ping_interval=None)
        print("Connected to CDP")
        # The Input domain has no "enable" method (sending it only yields an
        # error reply), so only the Runtime domain is enabled here.
        await self.send_cdp("Runtime.enable", {})
        self._drain_task = asyncio.create_task(self._drain_ws())

    async def _drain_ws(self):
        """Read and discard CDP replies, printing any that carry an ``error``.

        Polls with a short timeout so the loop notices ``self.running`` being
        cleared. Stops on the first receive or decode failure.
        """
        while self.running and self.ws:
            try:
                msg = await asyncio.wait_for(self.ws.recv(), timeout=0.1)
                data = json.loads(msg)
                if "error" in data:
                    print(f"CDP Error: {data['error']}")
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                # Report why draining stopped instead of ending silently.
                if self.running:
                    print(f"CDP connection closed: {e}")
                break

    async def send_cdp(self, method, params):
        """Send one CDP command with a fresh incrementing ``id``.

        Does nothing when no WebSocket is connected. Send failures are printed
        and otherwise ignored.
        """
        if not self.ws:
            return
        self.msg_counter += 1
        msg = {"id": self.msg_counter, "method": method, "params": params}
        try:
            await self.ws.send(json.dumps(msg))
        except Exception as e:
            print(f"CDP error: {e}")

    async def setup_input_devices(self):
        """Open and exclusively grab the secondary mouse and keyboard.

        Exits the process with status 1 if the devices cannot be opened or
        grabbed.
        """
        try:
            self.mouse = evdev.InputDevice(SECONDARY_MOUSE)
            self.keyboard = evdev.InputDevice(SECONDARY_KBD)
            print(f"Mouse: {self.mouse.name}")
            print(f"Keyboard: {self.keyboard.name}")
            self.mouse.grab()
            self.keyboard.grab()
            print("Devices grabbed")
        except PermissionError:
            print("Run with sudo")
            sys.exit(1)
        except Exception as e:
            print(f"Error: {e}")
            sys.exit(1)

    async def inject_cursor(self):
        """Add the ``#vtt-cursor`` marker element to the page if it is missing.

        The marker starts at the current virtual pointer position.
        """
        css = (
            "position:fixed;width:20px;height:20px;background:#ff0055;"
            "border:2px solid white;border-radius:50%;pointer-events:none;"
            "z-index:999999;left:0;top:0;"
            f"transform:translate({self.mouse_x}px,{self.mouse_y}px);"
        )
        cursor_script = (
            "(function() {\n"
            "  if (document.getElementById('vtt-cursor')) return;\n"
            "  const c = document.createElement('div');\n"
            "  c.id = 'vtt-cursor';\n"
            f"  c.style.cssText = '{css}';\n"
            "  document.body.appendChild(c);\n"
            "})();"
        )
        await self.send_cdp("Runtime.evaluate", {"expression": cursor_script})

    async def update_cursor_position(self, x, y):
        """Move the ``#vtt-cursor`` marker to pixel position ``(x, y)``."""
        # Wrapped in a function scope: a top-level `const` would persist
        # between Runtime.evaluate calls and make later calls fail with a
        # redeclaration error.
        move_script = (
            "(function(){const c=document.getElementById('vtt-cursor');"
            f"if(c)c.style.transform='translate({x}px,{y}px)';}})();"
        )
        await self.send_cdp("Runtime.evaluate", {"expression": move_script})

    async def _handle_motion(self, code, delta):
        """Apply a REL_X / REL_Y delta, clamp to the screen, and report the move."""
        if code == ecodes.REL_X:
            self.mouse_x = max(0, min(SCREEN_WIDTH - 1, self.mouse_x + delta))
        else:
            self.mouse_y = max(0, min(SCREEN_HEIGHT - 1, self.mouse_y + delta))
        await self.send_cdp("Input.dispatchMouseEvent", {
            "type": "mouseMoved", "x": self.mouse_x, "y": self.mouse_y,
            "modifiers": 0,
        })
        await self.update_cursor_position(self.mouse_x, self.mouse_y)

    async def _handle_wheel(self, delta):
        """Accumulate wheel ticks and send a ``mouseWheel`` event per whole tick."""
        self.scroll_accumulator += delta
        if abs(self.scroll_accumulator) < 1:
            return
        # evdev reports "wheel away from the user" as positive, whereas CDP's
        # deltaY follows the DOM convention (positive scrolls down), so the
        # sign is flipped. One tick is mapped to 100 px.
        await self.send_cdp("Input.dispatchMouseEvent", {
            "type": "mouseWheel", "x": self.mouse_x, "y": self.mouse_y,
            "deltaX": 0, "deltaY": -int(self.scroll_accumulator * 100),
            "modifiers": 0,
        })
        self.scroll_accumulator = 0

    async def route_mouse(self):
        """Translate mouse events into CDP mouse events until stopped.

        Relative X/Y motion moves the clamped virtual pointer, wheel ticks
        become ``mouseWheel`` events, and left/right/middle buttons become
        ``mousePressed`` / ``mouseReleased`` events.
        """
        print("Mouse routing started")
        button_names = {
            ecodes.BTN_LEFT: "left",
            ecodes.BTN_RIGHT: "right",
            ecodes.BTN_MIDDLE: "middle",
        }
        async for event in self.mouse.async_read_loop():
            if not self.running:
                break
            if event.type == ecodes.EV_REL:
                # Wheel and motion are both EV_REL, so they are told apart by
                # code; testing the type alone would swallow the wheel events.
                if event.code == ecodes.REL_WHEEL:
                    await self._handle_wheel(event.value)
                elif event.code in (ecodes.REL_X, ecodes.REL_Y):
                    await self._handle_motion(event.code, event.value)
            elif event.type == ecodes.EV_KEY and event.code in button_names:
                action = "mousePressed" if event.value == 1 else "mouseReleased"
                await self.send_cdp("Input.dispatchMouseEvent", {
                    "type": action, "x": self.mouse_x, "y": self.mouse_y,
                    "button": button_names[event.code], "clickCount": 1,
                    "modifiers": 0,
                })

    @staticmethod
    def _build_key_map():
        """Return ``{evdev keycode: (base_key, code, shifted_key)}``."""
        key_map = {}
        for ch in "abcdefghijklmnopqrstuvwxyz":
            upper = ch.upper()
            key_map[getattr(ecodes, f"KEY_{upper}")] = (ch, f"Key{upper}", upper)
        for digit, symbol in zip("1234567890", "!@#$%^&*()"):
            key_map[getattr(ecodes, f"KEY_{digit}")] = (
                digit, f"Digit{digit}", symbol,
            )
        key_map.update({
            ecodes.KEY_SPACE: (" ", "Space", " "),
            ecodes.KEY_ENTER: ("Enter", "Enter", "Enter"),
            ecodes.KEY_BACKSPACE: ("Backspace", "Backspace", "Backspace"),
            ecodes.KEY_ESC: ("Escape", "Escape", "Escape"),
            ecodes.KEY_LEFT: ("ArrowLeft", "ArrowLeft", "ArrowLeft"),
            ecodes.KEY_RIGHT: ("ArrowRight", "ArrowRight", "ArrowRight"),
            ecodes.KEY_UP: ("ArrowUp", "ArrowUp", "ArrowUp"),
            ecodes.KEY_DOWN: ("ArrowDown", "ArrowDown", "ArrowDown"),
            ecodes.KEY_MINUS: ("-", "Minus", "_"),
            ecodes.KEY_EQUAL: ("=", "Equal", "+"),
            ecodes.KEY_LEFTBRACE: ("[", "BracketLeft", "{"),
            ecodes.KEY_RIGHTBRACE: ("]", "BracketRight", "}"),
            ecodes.KEY_BACKSLASH: ("\\", "Backslash", "|"),
            ecodes.KEY_SEMICOLON: (";", "Semicolon", ":"),
            ecodes.KEY_APOSTROPHE: ("'", "Quote", '"'),
            ecodes.KEY_GRAVE: ("`", "Backquote", "~"),
            ecodes.KEY_COMMA: (",", "Comma", "<"),
            ecodes.KEY_DOT: (".", "Period", ">"),
            ecodes.KEY_SLASH: ("/", "Slash", "?"),
        })
        return key_map

    async def route_keyboard(self):
        """Translate keyboard events into CDP key events until stopped.

        Mapped keys are sent as ``keyDown`` on press and ``keyUp`` on release;
        auto-repeat events are ignored. Shift selects the shifted ``key``
        value and sets the Shift modifier bit. Tab is delivered as a synthetic
        DOM ``keydown`` on the focused element instead of a CDP key event.
        """
        print("Keyboard routing started")
        key_map = self._build_key_map()
        shift_codes = (ecodes.KEY_LEFTSHIFT, ecodes.KEY_RIGHTSHIFT)
        shifts_held = set()
        # keycode -> `key` value sent on keyDown, so the matching keyUp names
        # the same key even if Shift changed in between.
        keys_down = {}
        tab_script = """
        (function() {
            const evt = new KeyboardEvent('keydown', {
                key: 'Tab',
                code: 'Tab',
                keyCode: 9,
                which: 9,
                bubbles: true,
                cancelable: true
            });
            (document.activeElement || document.body).dispatchEvent(evt);
        })();
        """
        async for event in self.keyboard.async_read_loop():
            if not self.running:
                break
            if event.type != ecodes.EV_KEY:
                continue
            keycode = event.code
            value = event.value

            # Track each Shift key separately so releasing one while the
            # other is still held keeps the shifted state.
            if keycode in shift_codes:
                if value > 0:
                    shifts_held.add(keycode)
                else:
                    shifts_held.discard(keycode)
                continue

            # Handle TAB via JavaScript injection (cycles Foundry elements)
            if keycode == ecodes.KEY_TAB and value == 1:
                await self.send_cdp("Runtime.evaluate", {"expression": tab_script})
                continue

            if keycode not in key_map:
                continue
            base_key, code, shifted_key = key_map[keycode]
            shift_pressed = bool(shifts_held)
            modifiers = self._SHIFT_MODIFIER if shift_pressed else 0
            if value == 1:  # Key down
                key = shifted_key if shift_pressed else base_key
                keys_down[keycode] = key
                await self.send_cdp("Input.dispatchKeyEvent", {
                    "type": "keyDown", "key": key, "code": code,
                    "modifiers": modifiers,
                })
            elif value == 0:  # Key up
                fallback = shifted_key if shift_pressed else base_key
                key = keys_down.pop(keycode, fallback)
                await self.send_cdp("Input.dispatchKeyEvent", {
                    "type": "keyUp", "key": key, "code": code,
                    "modifiers": modifiers,
                })

    async def run(self):
        """Grab the devices, connect to Chromium, and route input until stopped."""
        print("VTTGrabber starting...")
        await self.setup_input_devices()
        await self.connect_cdp()
        print("Connected!")
        await self.inject_cursor()
        await asyncio.gather(self.route_mouse(), self.route_keyboard())

    def cleanup(self):
        """Stop the routing loops and release both device grabs.

        Safe to call more than once, or before the devices were opened.
        """
        self.running = False
        for device in (self.mouse, self.keyboard):
            if not device:
                continue
            try:
                device.ungrab()
            except OSError:
                # Best-effort: the device may be ungrabbed or unplugged.
                pass
        print("Released devices")
if __name__ == "__main__":
    # Script entry point: run the grabber until it finishes, fails, or is
    # interrupted, and always release the grabbed input devices on the way out.
    grabber = VTTGrabber()

    def signal_handler(sig, frame):
        """Turn SIGINT into a normal interpreter exit with status 0."""
        # Device release happens in the `finally` below, so it runs exactly
        # once on every exit path.
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)
    exit_code = 0
    try:
        asyncio.run(grabber.run())
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Error: {e}")
        exit_code = 1
    finally:
        # Also reached when SystemExit propagates (Ctrl+C handler, or
        # sys.exit(1) from device setup), so grabs are never left behind.
        grabber.cleanup()
    sys.exit(exit_code)
Advertisement