Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- from pushover import init, Client
- import obd
- import time
- import serial
- import requests
- import subprocess
- # Flask endpoint
- api_endpoint = "PLACEHOLDER"
- # Define functions
- def dmm_to_dd(latitude_dmm, longitude_dmm):
- # Extracting degrees and decimal minutes for latitude
- lat_sign = -1 if latitude_dmm < 0 else 1
- lat_degrees = abs(int(latitude_dmm)) // 100 # Extracting degrees
- lat_minutes = abs(latitude_dmm) - lat_degrees * 100 # Extracting decimal minutes
- # Extracting degrees and decimal minutes for longitude
- lon_sign = -1 if longitude_dmm < 0 else 1
- lon_degrees = abs(int(longitude_dmm)) // 100 # Extracting degrees
- lon_minutes = abs(longitude_dmm) - lon_degrees * 100 # Extracting decimal minutes
- # Converting to Decimal Degrees (DD)
- latitude_dd = lat_sign * (lat_degrees + lat_minutes / 60)
- longitude_dd = lon_sign * (lon_degrees + lon_minutes / 60)
- return latitude_dd, longitude_dd
- def send_at_command(gps, command, timeout=1):
- gps.write((command + '\r\n').encode())
- time.sleep(timeout)
- return gps.read_all().decode()
- # Initialize OBD connection
- connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
- #connection = obd.OBD("/dev/pts/2")
- # Initialize GPS serial connection
- gps = serial.Serial('/dev/serial/by-id/usb-SimTech__Incorporated_SimTech__Incorporated_0123456789ABCDEF-if02-port0', baudrate=115200, timeout=1)
- gpsresponse = send_at_command(gps, 'AT+CGPS=1')
- # Initialize pushover notifications
- init("PLACEHOLDER")
- previouslyrunning = False
- previous_dtc_count = 9999
- while True:
- try:
- # Gather GPS data
- gpsresponse = send_at_command(gps, 'AT+CGPSINFO')
- noheader = gpsresponse.split(':')
- gpsdata = noheader[1].split(',')
- # Format GPS data
- gps_latitude_dmm = gpsdata[0]
- if gpsdata[1] == "S":
- gps_latitude_dmm = "-" + str(gps_latitude_dmm)
- gps_longitude_dmm = gpsdata[2]
- if gpsdata[3] == "W":
- gps_longitude_dmm = "-" + str(gps_longitude_dmm)
- # Set GPS data metrics
- latitude_metric, longitude_metric = dmm_to_dd(float(gps_latitude_dmm), float(gps_longitude_dmm))
- gps_data = {
- "longitude": longitude_metric,
- "latitude": latitude_metric,
- }
- # Print GPS formatted data
- print(f"GPS Longitude: {longitude_metric}, "
- f"GPS Latitude: {latitude_metric}")
- if connection.status() == obd.OBDStatus.CAR_CONNECTED:
- # Query OBD metrics and set metric variables
- rpm_response = connection.query(obd.commands.RPM)
- speed_response = connection.query(obd.commands.SPEED)
- throttle_position_response = connection.query(obd.commands.THROTTLE_POS)
- coolant_temp_response = connection.query(obd.commands.COOLANT_TEMP)
- timing_advance_response = connection.query( obd.commands.TIMING_ADVANCE)
- catalyst_temp_response = connection.query(obd.commands.CATALYST_TEMP_B1S1)
- intake_temp_response = connection.query(obd.commands.INTAKE_TEMP)
- fuel_level_response = connection.query(obd.commands.FUEL_LEVEL)
- dtc_response = connection.query(obd.commands.GET_DTC)
- # Detect Ignition OFF
- try:
- rpm_metric = rpm_response.value.magnitude
- except AttributeError:
- rpm_metric = None
- print("Ignition OFF Detected")
- if rpm_metric is None:
- connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
- speed_metric = speed_response.value.to("mph").magnitude
- throttle_position_metric = throttle_position_response.value.magnitude
- coolant_temp_metric = coolant_temp_response.value.magnitude
- timing_advance_metric = timing_advance_response.value.magnitude
- catalyst_temp_metric = catalyst_temp_response.value.magnitude
- intake_temp_metric = intake_temp_response.value.magnitude
- fuel_level_metric = fuel_level_response.value.magnitude
- dtc_response_metric = dtc_response.value
- metrics_data = {
- "rpm": rpm_metric,
- "speed": speed_metric,
- "throttle_position": throttle_position_metric,
- "coolant_temp": coolant_temp_metric,
- "timing_advance": timing_advance_metric,
- "catalyst_temp": catalyst_temp_metric,
- "intake_temp": intake_temp_metric,
- "fuel_level": fuel_level_metric,
- }
- post_data = {"gps_data": gps_data, "metrics_data": metrics_data, "dtc_codes": dtc_response_metric}
- #Print to terminal ##Fuel Level: {fuel_level_metric} %,
- print(f"Engine RPM: {rpm_metric} RPM, Speed: {speed_metric} MPH, "
- f"Throttle Position: {throttle_position_metric}%, Coolant Temperature: {coolant_temp_metric} C, "
- f"Timing Advance: {timing_advance_metric} degrees, Catalyst Temperature: {catalyst_temp_metric} C, "
- f"Intake Air Temperature: {intake_temp_metric} C, "
- f"DTC Codes: {dtc_response_metric}")
- #Send engine start push notification
- if rpm_metric > 0 and previouslyrunning == False:
- previouslyrunning = True
- print("Engine Started!")
- Client("").send_message("Engine Started!", title="AutoPi Telemetry")
- elif rpm_metric == 0 and previouslyrunning == True:
- previouslyrunning = False
- #Send DTC code push notification if present
- if len(dtc_response_metric) != 0 and previous_dtc_count != len(dtc_response_metric):
- counter = 0
- for element in dtc_response_metric:
- Client("u2cgnp8ugt94pqznpn4z7wfdaocpjq").send_message("DTC #"+str(counter+1)+": "+dtc_response_metric[counter], title="AutoPi Telemetry")
- print("DTC #"+str(counter+1)+": "+dtc_response_metric[counter])
- counter = counter + 1
- previous_dtc_count = len(dtc_response_metric)
- else:
- print("Ignition is OFF, skipping OBD metrics and trying again")
- connection = obd.OBD(portstr="/dev/serial/by-id/usb-1a86_USB_Serial-if00-port0")
- #connection = obd.OBD("/dev/pts/2")
- post_data = {"gps_data": gps_data}
- # Send metrics
- response = requests.post(api_endpoint, json=post_data)
- if response.status_code == 200:
- print("Metrics sent successfully")
- else:
- print("Failed to send metrics:", response.text)
- except Exception as e:
- print(f"Error: {e}")
- time.sleep(1)
Advertisement
Add Comment
Please, Sign In to add comment