Advertisement
Guest User

Untitled

a guest
Oct 21st, 2019
98
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
Python 19.37 KB | None | 0 0
  1. #! /usr/bin/env python3
  2.  
  3. """This script captures F1 2019 telemetry packets (sent over UDP) and stores them into SQLite3 database files.
  4.  
  5. One database file will contain all packets from one session.
  6.  
  7. From UDP packet to database entry
  8. ---------------------------------
  9.  
  10. The data flow of UDP packets into the database is managed by 2 threads.
  11.  
  12. PacketReceiver thread:
  13.  
  14.  (1) The PacketReceiver thread does a select() to wait on incoming packets in the UDP socket.
  15.  (2) When woken up with the notification that a UDP packet is available for reading, it is actually read from the socket.
  16.  (3) The receiver thread calls the recorder_thread.record_packet() method with a TimestampedPacket containing
  17.      the reception timestamp and the packet just read.
  18.  (4) The recorder_thread.record_packet() method locks its packet queue, inserts the packet there,
  19.      then unlocks the queue. Note that this method is only called from within the receiver thread!
  20.  (5) repeat from (1).
  21.  
  22. PacketRecorder thread:
  23.  
  24.  (1) The PacketRecorder thread sleeps for a given period, then wakes up.
  25.  (2) It locks its packet queue, moves the queue's packets to a local variable, empties the packet queue,
  26.      then unlocks the packet queue.
  27.  (3) The packets just moved out of the queue are passed to the 'process_incoming_packets' method.
  28.  (4) The 'process_incoming_packets' method inspects the packet headers, and converts the packet data
  29.      into SessionPacket instances that are suitable for inserting into the database.
  30.      In the process, it collects packets from the same session. After collecting all
  31.      available packets from the same session, it passes them on to the
  32.      '_process_same_session_packets' method.
  33.  (5) The '_process_same_session_packets' method makes sure that the appropriate SQLite database file
  34.      is opened (i.e., the one with matching sessionUID), then writes the packets into the 'packets' table.
  35.  
  36. By decoupling the packet capture and database writing in different threads, we minimize the risk of
  37. dropping UDP packets. This risk is real because SQLite3 database commits can take a considerable time.
  38. """
  39.  
  40. import argparse
  41. import sys
  42. import time
  43. import socket
  44. import sqlite3
  45. import threading
  46. import logging
  47. import ctypes
  48. import selectors
  49.  
  50. from collections import namedtuple
  51.  
  52. from .threading_utils import WaitConsoleThread, Barrier
  53. from ..packets import PacketHeader, PacketID, HeaderFieldsToPacketType, unpack_udp_packet
  54.  
  55. # The type used by the PacketReceiverThread to represent incoming telemetry packets, with timestamp.
  56. TimestampedPacket = namedtuple('TimestampedPacket', 'timestamp, packet')
  57.  
  58. # The type used by the PacketRecorderThread to represent incoming telemetry packets for storage in the SQLite3 database.
  59. SessionPacket = namedtuple('SessionPacket', 'timestamp, packetFormat, gameMajorVersion, gameMinorVersion, packetVersion, packetId, sessionUID, sessionTime, frameIdentifier, playerCarIndex, packet')
  60.  
  61.  
  62. class PacketRecorder:
  63.     """The PacketRecorder records incoming packets to SQLite3 database files.
  64.  
  65.    A single SQLite3 file stores packets from a single session.
  66.    Whenever a new session starts, any open file is closed, and a new database file is created.
  67.    """
  68.  
  69.     # The SQLite3 query that creates the 'packets' table in the database file.
  70.     _create_packets_table_query = """
  71.        CREATE TABLE packets (
  72.            pkt_id            INTEGER  PRIMARY KEY, -- Alias for SQLite3's 'rowid'.
  73.            timestamp         REAL     NOT NULL,    -- The POSIX time right after capturing the telemetry packet.
  74.            packetFormat      INTEGER  NOT NULL,    -- Header field: packet format.
  75.            gameMajorVersion  INTEGER  NOT NULL,    -- Header field: game major version.
  76.            gameMinorVersion  INTEGER  NOT NULL,    -- Header field: game minor version.
  77.            packetVersion     INTEGER  NOT NULL,    -- Header field: packet version.
  78.            packetId          INTEGER  NOT NULL,    -- Header field: packet type ('packetId' is a bit of a misnomer).
  79.            sessionUID        CHAR(16) NOT NULL,    -- Header field: unique session id as hex string.
  80.            sessionTime       REAL     NOT NULL,    -- Header field: session time.
  81.            frameIdentifier   INTEGER  NOT NULL,    -- Header field: frame identifier.
  82.            playerCarIndex    INTEGER  NOT NULL,    -- Header field: player car index.
  83.            packet            BLOB     NOT NULL     -- The packet itself
  84.        );
  85.        """
  86.  
  87.     # The SQLite3 query that inserts packet data into the 'packets' table of an open database file.
  88.     _insert_packets_query = """
  89.        INSERT INTO packets(
  90.            timestamp,
  91.            packetFormat, gameMajorVersion, gameMinorVersion, packetVersion, packetId, sessionUID,
  92.            sessionTime, frameIdentifier, playerCarIndex,
  93.            packet) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
  94.        """
  95.  
  96.     def __init__(self):
  97.         self._conn = None
  98.         self._cursor = None
  99.         self._filename = None
  100.         self._sessionUID = None
  101.  
  102.     def close(self):
  103.         """Make sure that no database remains open."""
  104.         if self._conn is not None:
  105.             self._close_database()
  106.  
  107.     def _open_database(self, sessionUID: str):
  108.         """Open SQLite3 database file and make sure it has the correct schema."""
  109.         assert self._conn is None
  110.         filename = "F1_2019_{:s}.sqlite3".format(sessionUID)
  111.         logging.info("Opening file {!r}.".format(filename))
  112.         conn = sqlite3.connect(filename)
  113.         cursor = conn.cursor()
  114.  
  115.         # Get rid of indentation and superfluous newlines in the 'CREATE TABLE' command.
  116.         query = "".join(line[8:] + "\n" for line in PacketRecorder._create_packets_table_query.split("\n")[1:-1])
  117.  
  118.         # Try to execute the 'CREATE TABLE' statement. If it already exists, this will raise an exception.
  119.         try:
  120.             cursor.execute(query)
  121.         except sqlite3.OperationalError:
  122.             logging.info("    (Appending to existing file.)")
  123.         else:
  124.             logging.info("    (Created new file.)")
  125.  
  126.         self._conn = conn
  127.         self._cursor = cursor
  128.         self._filename = filename
  129.         self._sessionUID = sessionUID
  130.  
  131.     def _close_database(self):
  132.         """Close SQLite3 database file."""
  133.         assert self._conn is not None
  134.         logging.info("Closing file {!r}.".format(self._filename))
  135.         self._cursor.close()
  136.         self._cursor = None
  137.         self._conn.close()
  138.         self._conn = None
  139.         self._filename = None
  140.         self._sessionUID = None
  141.  
  142.     def _insert_and_commit_same_session_packets(self, same_session_packets):
  143.         """Insert session packets to database and commit."""
  144.         assert self._conn is not None
  145.         self._cursor.executemany(PacketRecorder._insert_packets_query, same_session_packets)
  146.         self._conn.commit()
  147.  
  148.     def _process_same_session_packets(self, same_session_packets):
  149.         """Insert packets from the same session into the 'packets' table of the appropriate database file.
  150.  
  151.        Precondition: all packets in 'same_session_packets' are from the same session (identical 'sessionUID' field).
  152.  
  153.        We need to handle four different cases:
  154.  
  155.        (1) 'same_session_packets' is empty:
  156.  
  157.            --> return (no-op).
  158.  
  159.        (2) A database file is currently open, but it stores packets with a different session UID:
  160.  
  161.            --> Close database;
  162.            --> Open database with correct session UID;
  163.            --> Insert 'same_session_packets'.
  164.  
  165.        (3) No database file is currently open:
  166.  
  167.            --> Open database with correct session UID;
  168.            --> Insert 'same_session_packets'.
  169.  
  170.        (4) A database is currently open, with correct session UID:
  171.  
  172.            --> Insert 'same_session_packets'.
  173.        """
  174.  
  175.         if not same_session_packets:
  176.             # Nothing to insert.
  177.             return
  178.  
  179.         if self._conn is not None and self._sessionUID != same_session_packets[0].sessionUID:
  180.             # Close database if it's recording a different session.
  181.             self._close_database()
  182.  
  183.         if self._conn is None:
  184.             # Open database with the correct sessionID.
  185.             self._open_database(same_session_packets[0].sessionUID)
  186.  
  187.         # Write packets.
  188.         self._insert_and_commit_same_session_packets(same_session_packets)
  189.  
  190.     def process_incoming_packets(self, timestamped_packets):
  191.         """Process incoming packets by recording them into the correct database file.
  192.  
  193.        The incoming 'timestamped_packets' is a list of timestamped raw UDP packets.
  194.  
  195.        We process them to a variable 'same_session_packets', which is a list of consecutive
  196.        packets having the same 'sessionUID' field. In this list, each packet is a 11-element tuple
  197.        that can be inserted into the 'packets' table of the database.
  198.  
  199.        The 'same_session_packets' are then passed on to the '_process_same_session_packets'
  200.        method that writes them into the appropriate database file.
  201.        """
  202.  
  203.         t1 = time.monotonic()
  204.  
  205.         # Invariant to be guaranteed: all packets in 'same_session_packets' have the same 'sessionUID' field.
  206.         same_session_packets = []
  207.  
  208.         for (timestamp, packet) in timestamped_packets:
  209.  
  210.             if len(packet) < ctypes.sizeof(PacketHeader):
  211.                 logging.error("Dropped bad packet of size {} (too short).".format(len(packet)))
  212.                 continue
  213.  
  214.             header = PacketHeader.from_buffer_copy(packet)
  215.  
  216.             packet_type_tuple = (header.packetFormat, header.packetVersion, header.packetId)
  217.  
  218.             packet_type = HeaderFieldsToPacketType.get(packet_type_tuple)
  219.             if packet_type is None:
  220.                 logging.error("Dropped unrecognized packet (format, version, id) = {!r}.".format(packet_type_tuple))
  221.                 continue
  222.  
  223.             if len(packet) != ctypes.sizeof(packet_type):
  224.                 logging.error("Dropped packet with unexpected size; "
  225.                               "(format, version, id) = {!r} packet, size = {}, expected {}.".format(
  226.                                   packet_type_tuple, len(packet), ctypes.sizeof(packet_type)))
  227.                 continue
  228.  
  229.             if header.packetId == PacketID.EVENT:  # Log Event packets
  230.                 event_packet = unpack_udp_packet(packet)
  231.                 logging.info("Recording event packet: {}".format(event_packet.eventStringCode.decode()))
  232.  
  233.             # NOTE: the sessionUID is not reliable at the start of a session (in F1 2018, need to check for F1 2019).
  234.             # See: http://forums.codemasters.com/discussion/138130/bug-f1-2018-pc-v1-0-4-udp-telemetry-bad-session-uid-in-first-few-packets-of-a-session
  235.  
  236.             # Create an INSERT-able tuple for the data in this packet.
  237.             #
  238.             # Note that we convert the sessionUID to a 16-digit hex string here.
  239.             # SQLite3 can store 64-bit numbers, but only signed ones.
  240.             # To prevent any issues, we represent the sessionUID as a 16-digit hex string instead.
  241.  
  242.             session_packet = SessionPacket(
  243.                 timestamp,
  244.                 header.packetFormat, header.gameMajorVersion, header.gameMinorVersion,
  245.                 header.packetVersion, header.packetId, "{:016x}".format(header.sessionUID),
  246.                 header.sessionTime, header.frameIdentifier, header.playerCarIndex,
  247.                 packet
  248.             )
  249.  
  250.             if len(same_session_packets) > 0 and same_session_packets[0].sessionUID != session_packet.sessionUID:
  251.                 # Write 'same_session_packets' collected so far to the correct session database, then forget about them.
  252.                 self._process_same_session_packets(same_session_packets)
  253.                 same_session_packets.clear()
  254.  
  255.             same_session_packets.append(session_packet)
  256.  
  257.         # Write 'same_session_packets' to the correct session database, then forget about them.
  258.         # The 'same_session_packets.clear()' is not strictly necessary here, because 'same_session_packets' is about to
  259.         #   go out of scope; but we make it explicit for clarity.
  260.  
  261.         self._process_same_session_packets(same_session_packets)
  262.         same_session_packets.clear()
  263.  
  264.         t2 = time.monotonic()
  265.  
  266.         duration = (t2 - t1)
  267.  
  268.         logging.info("Recorded {} packets in {:.3f} ms.".format(len(timestamped_packets), duration * 1000.0))
  269.  
  270.     def no_packets_received(self, age: float) -> None:
  271.         """No packets were received for a considerable time. If a database file is open, close it."""
  272.         if self._conn is None:
  273.             logging.info("No packets to record for {:.3f} seconds.".format(age))
  274.         else:
  275.             logging.info("No packets to record for {:.3f} seconds; closing file due to inactivity.".format(age))
  276.             self._close_database()
  277.  
  278.  
  279. class PacketRecorderThread(threading.Thread):
  280.     """The PacketRecorderThread writes telemetry data to SQLite3 files."""
  281.  
  282.     def __init__(self, record_interval):
  283.         super().__init__(name='recorder')
  284.         self._record_interval = record_interval
  285.         self._packets = []
  286.         self._packets_lock = threading.Lock()
  287.         self._socketpair = socket.socketpair()
  288.  
  289.     def close(self):
  290.         for sock in self._socketpair:
  291.             sock.close()
  292.  
  293.     def run(self):
  294.         """Receive incoming packets and hand them over the the PacketRecorder.
  295.  
  296.        This method runs in its own thread.
  297.        """
  298.  
  299.         selector = selectors.DefaultSelector()
  300.         key_socketpair = selector.register(self._socketpair[0], selectors.EVENT_READ)
  301.  
  302.         recorder = PacketRecorder()
  303.  
  304.         packets = []
  305.  
  306.         logging.info("Recorder thread started.")
  307.  
  308.         quitflag = False
  309.         inactivity_timer = time.time()
  310.         while not quitflag:
  311.  
  312.             # Calculate the timeout value that will bring us in sync with the next period.
  313.             timeout = (-time.time()) % self._record_interval
  314.             # If the timeout interval is too short, increase its length by 1 period.
  315.             if timeout < 0.5 * self._record_interval:
  316.                 timeout += self._record_interval
  317.  
  318.             for (key, events) in selector.select(timeout):
  319.                 if key == key_socketpair:
  320.                     quitflag = True
  321.  
  322.             # Swap packets, so the 'record_packet' method can be called uninhibited as soon as possible.
  323.             with self._packets_lock:
  324.                 (packets, self._packets) = (self._packets, packets)
  325.  
  326.             if len(packets) != 0:
  327.                 inactivity_timer = packets[-1].timestamp
  328.                 recorder.process_incoming_packets(packets)
  329.                 packets.clear()
  330.             else:
  331.                 t_now = time.time()
  332.                 age = t_now - inactivity_timer
  333.                 recorder.no_packets_received(age)
  334.                 inactivity_timer = t_now
  335.  
  336.         recorder.close()
  337.  
  338.         selector.close()
  339.  
  340.         logging.info("Recorder thread stopped.")
  341.  
  342.     def request_quit(self):
  343.         """Request termination of the PacketRecorderThread.
  344.  
  345.        Called from the main thread to request that we quit.
  346.        """
  347.         self._socketpair[1].send(b'\x00')
  348.  
  349.     def record_packet(self, timestamped_packet):
  350.         """Called from the receiver thread for every UDP packet received."""
  351.         with self._packets_lock:
  352.             self._packets.append(timestamped_packet)
  353.  
  354.  
  355. class PacketReceiverThread(threading.Thread):
  356.     """The PacketReceiverThread receives incoming telemetry packets via the network and passes them to the PacketRecorderThread for storage."""
  357.  
  358.     def __init__(self, udp_port, recorder_thread):
  359.         super().__init__(name='receiver')
  360.         self._udp_port = udp_port
  361.         self._recorder_thread = recorder_thread
  362.         self._socketpair = socket.socketpair()
  363.  
  364.     def close(self):
  365.         for sock in self._socketpair:
  366.             sock.close()
  367.  
  368.     def run(self):
  369.         """Receive incoming packets and hand them over to the PacketRecorderThread.
  370.  
  371.        This method runs in its own thread.
  372.        """
  373.  
  374.         udp_socket = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
  375.  
  376.         # Allow multiple receiving endpoints.
  377.         if sys.platform in ['darwin']:
  378.             udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  379.         elif sys.platform in ['linux', 'win32']:
  380.             udp_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  381.  
  382.         # Accept UDP packets from any host.
  383.         address = ('', self._udp_port)
  384.         udp_socket.bind(address)
  385.  
  386.         selector = selectors.DefaultSelector()
  387.  
  388.         key_udp_socket = selector.register(udp_socket, selectors.EVENT_READ)
  389.         key_socketpair = selector.register(self._socketpair[0], selectors.EVENT_READ)
  390.  
  391.         logging.info("Receiver thread started, reading UDP packets from port {}.".format(self._udp_port))
  392.  
  393.         quitflag = False
  394.         while not quitflag:
  395.             for (key, events) in selector.select():
  396.                 timestamp = time.time()
  397.                 if key == key_udp_socket:
  398.                     # All telemetry UDP packets fit in 2048 bytes with room to spare.
  399.                     packet = udp_socket.recv(2048)
  400.                     timestamped_packet = TimestampedPacket(timestamp, packet)
  401.                     self._recorder_thread.record_packet(timestamped_packet)
  402.                 elif key == key_socketpair:
  403.                     quitflag = True
  404.  
  405.         selector.close()
  406.         udp_socket.close()
  407.         for sock in self._socketpair:
  408.             sock.close()
  409.  
  410.         logging.info("Receiver thread stopped.")
  411.  
  412.     def request_quit(self):
  413.         """Request termination of the PacketReceiverThread.
  414.  
  415.        Called from the main thread to request that we quit.
  416.        """
  417.         self._socketpair[1].send(b'\x00')
  418.  
  419.  
  420. def main():
  421.     """Record incoming telemetry data until the user presses enter."""
  422.  
  423.     # Configure logging.
  424.  
  425.     logging.basicConfig(level=logging.DEBUG, format="%(asctime)-23s | %(threadName)-10s | %(levelname)-5s | %(message)s")
  426.     logging.Formatter.default_msec_format = '%s.%03d'
  427.  
  428.     # Parse command line arguments.
  429.  
  430.     parser = argparse.ArgumentParser(description="Record F1 2019 telemetry data to SQLite3 files.")
  431.  
  432.     parser.add_argument("-p", "--port", default=20777, type=int, help="UDP port to listen to (default: 20777)", dest='port')
  433.     parser.add_argument("-i", "--interval", default=1.0, type=float, help="interval for writing incoming data to SQLite3 file, in seconds (default: 1.0)", dest='interval')
  434.  
  435.     args = parser.parse_args()
  436.  
  437.     # Start recorder thread first, then receiver thread.
  438.  
  439.     quit_barrier = Barrier()
  440.  
  441.     recorder_thread = PacketRecorderThread(args.interval)
  442.     recorder_thread.start()
  443.  
  444.     receiver_thread = PacketReceiverThread(args.port, recorder_thread)
  445.     receiver_thread.start()
  446.  
  447.     wait_console_thread = WaitConsoleThread(quit_barrier)
  448.     wait_console_thread.start()
  449.  
  450.     # Recorder, receiver, and wait_console threads are now active. Run until we're asked to quit.
  451.  
  452.     quit_barrier.wait()
  453.  
  454.     # Stop threads.
  455.  
  456.     wait_console_thread.request_quit()
  457.     wait_console_thread.join()
  458.     wait_console_thread.close()
  459.  
  460.     receiver_thread.request_quit()
  461.     receiver_thread.join()
  462.     receiver_thread.close()
  463.  
  464.     recorder_thread.request_quit()
  465.     recorder_thread.join()
  466.     recorder_thread.close()
  467.  
  468.     # All done.
  469.  
  470.     logging.info("All done.")
  471.  
  472.  
# Start recording only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement