Guest User

Untitled

a guest
Sep 7th, 2025
17
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.92 KB | None | 0 0
  1. from pushover import init, Client
  2. import obd
  3. import time
  4. import serial
  5. import requests
  6. import subprocess
  7.  
  8. # Flask endpoint
  9. api_endpoint = "PLACEHOLDER"
  10.  
  11. # Define functions
  12. def dmm_to_dd(latitude_dmm, longitude_dmm):
  13. # Extracting degrees and decimal minutes for latitude
  14. lat_sign = -1 if latitude_dmm < 0 else 1
  15. lat_degrees = abs(int(latitude_dmm)) // 100 # Extracting degrees
  16. lat_minutes = abs(latitude_dmm) - lat_degrees * 100 # Extracting decimal minutes
  17.  
  18. # Extracting degrees and decimal minutes for longitude
  19. lon_sign = -1 if longitude_dmm < 0 else 1
  20. lon_degrees = abs(int(longitude_dmm)) // 100 # Extracting degrees
  21. lon_minutes = abs(longitude_dmm) - lon_degrees * 100 # Extracting decimal minutes
  22.  
  23. # Converting to Decimal Degrees (DD)
  24. latitude_dd = lat_sign * (lat_degrees + lat_minutes / 60)
  25. longitude_dd = lon_sign * (lon_degrees + lon_minutes / 60)
  26.  
  27. return latitude_dd, longitude_dd
  28.  
  29. def send_at_command(gps, command, timeout=1):
  30. gps.write((command + '\r\n').encode())
  31. time.sleep(timeout)
  32. return gps.read_all().decode()
  33.  
  34. # Initialize OBD connection
  35. connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
  36. #connection = obd.OBD("/dev/pts/2")
  37.  
  38. # Initialize GPS serial connection
  39. gps = serial.Serial('/dev/serial/by-id/usb-SimTech__Incorporated_SimTech__Incorporated_0123456789ABCDEF-if02-port0', baudrate=115200, timeout=1)
  40. gpsresponse = send_at_command(gps, 'AT+CGPS=1')
  41.  
  42. # Initialize pushover notifications
  43. init("PLACEHOLDER")
  44. previouslyrunning = False
  45. previous_dtc_count = 9999
  46.  
  47.  
  48. while True:
  49. try:
  50. # Gather GPS data
  51. gpsresponse = send_at_command(gps, 'AT+CGPSINFO')
  52.  
  53. noheader = gpsresponse.split(':')
  54. gpsdata = noheader[1].split(',')
  55.  
  56. # Format GPS data
  57. gps_latitude_dmm = gpsdata[0]
  58. if gpsdata[1] == "S":
  59. gps_latitude_dmm = "-" + str(gps_latitude_dmm)
  60.  
  61. gps_longitude_dmm = gpsdata[2]
  62. if gpsdata[3] == "W":
  63. gps_longitude_dmm = "-" + str(gps_longitude_dmm)
  64.  
  65. # Set GPS data metrics
  66. latitude_metric, longitude_metric = dmm_to_dd(float(gps_latitude_dmm), float(gps_longitude_dmm))
  67.  
  68. gps_data = {
  69. "longitude": longitude_metric,
  70. "latitude": latitude_metric,
  71. }
  72.  
  73. # Print GPS formatted data
  74. print(f"GPS Longitude: {longitude_metric}, "
  75. f"GPS Latitude: {latitude_metric}")
  76.  
  77. if connection.status() == obd.OBDStatus.CAR_CONNECTED:
  78.  
  79. # Query OBD metrics and set metric variables
  80. rpm_response = connection.query(obd.commands.RPM)
  81. speed_response = connection.query(obd.commands.SPEED)
  82. throttle_position_response = connection.query(obd.commands.THROTTLE_POS)
  83. coolant_temp_response = connection.query(obd.commands.COOLANT_TEMP)
  84. timing_advance_response = connection.query( obd.commands.TIMING_ADVANCE)
  85. catalyst_temp_response = connection.query(obd.commands.CATALYST_TEMP_B1S1)
  86. intake_temp_response = connection.query(obd.commands.INTAKE_TEMP)
  87. fuel_level_response = connection.query(obd.commands.FUEL_LEVEL)
  88. dtc_response = connection.query(obd.commands.GET_DTC)
  89.  
  90. # Detect Ignition OFF
  91. try:
  92. rpm_metric = rpm_response.value.magnitude
  93. except AttributeError:
  94. rpm_metric = None
  95. print("Ignition OFF Detected")
  96.  
  97. if rpm_metric is None:
  98. connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
  99.  
  100. speed_metric = speed_response.value.to("mph").magnitude
  101. throttle_position_metric = throttle_position_response.value.magnitude
  102. coolant_temp_metric = coolant_temp_response.value.magnitude
  103. timing_advance_metric = timing_advance_response.value.magnitude
  104. catalyst_temp_metric = catalyst_temp_response.value.magnitude
  105. intake_temp_metric = intake_temp_response.value.magnitude
  106. fuel_level_metric = fuel_level_response.value.magnitude
  107. dtc_response_metric = dtc_response.value
  108.  
  109. metrics_data = {
  110. "rpm": rpm_metric,
  111. "speed": speed_metric,
  112. "throttle_position": throttle_position_metric,
  113. "coolant_temp": coolant_temp_metric,
  114. "timing_advance": timing_advance_metric,
  115. "catalyst_temp": catalyst_temp_metric,
  116. "intake_temp": intake_temp_metric,
  117. "fuel_level": fuel_level_metric,
  118. }
  119.  
  120. post_data = {"gps_data": gps_data, "metrics_data": metrics_data, "dtc_codes": dtc_response_metric}
  121.  
  122. #Print to terminal ##Fuel Level: {fuel_level_metric} %,
  123. print(f"Engine RPM: {rpm_metric} RPM, Speed: {speed_metric} MPH, "
  124. f"Throttle Position: {throttle_position_metric}%, Coolant Temperature: {coolant_temp_metric} C, "
  125. f"Timing Advance: {timing_advance_metric} degrees, Catalyst Temperature: {catalyst_temp_metric} C, "
  126. f"Intake Air Temperature: {intake_temp_metric} C, "
  127. f"DTC Codes: {dtc_response_metric}")
  128.  
  129. #Send engine start push notification
  130. if rpm_metric > 0 and previouslyrunning == False:
  131. previouslyrunning = True
  132. print("Engine Started!")
  133. Client("").send_message("Engine Started!", title="AutoPi Telemetry")
  134. elif rpm_metric == 0 and previouslyrunning == True:
  135. previouslyrunning = False
  136.  
  137. #Send DTC code push notification if present
  138. if len(dtc_response_metric) != 0 and previous_dtc_count != len(dtc_response_metric):
  139. counter = 0
  140. for element in dtc_response_metric:
  141. Client("u2cgnp8ugt94pqznpn4z7wfdaocpjq").send_message("DTC #"+str(counter+1)+": "+dtc_response_metric[counter], title="AutoPi Telemetry")
  142. print("DTC #"+str(counter+1)+": "+dtc_response_metric[counter])
  143. counter = counter + 1
  144. previous_dtc_count = len(dtc_response_metric)
  145.  
  146. else:
  147. print("Ignition is OFF, skipping OBD metrics and trying again")
  148. connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
  149. #connection = obd.OBD("/dev/pts/2")
  150.  
  151. post_data = {"gps_data": gps_data}
  152.  
  153. # Send metrics
  154. response = requests.post(api_endpoint, json=post_data)
  155.  
  156. if response.status_code == 200:
  157. print("Metrics sent successfully")
  158. else:
  159. print("Failed to send metrics:", response.text)
  160.  
  161.  
  162. except Exception as e:
  163. print(f"Error: {e}")
  164.  
  165. time.sleep(1)
  166.  
Advertisement
Add Comment
Please, Sign In to add comment