Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
#!/usr/bin/env python3
"""
mode01_tester.py
Minimal script to test OBD-II Mode 01 (live data) using an ELM327.
"""
import time
import sys

# Default serial device; used as the default for -p/--port.
# Change to your port (e.g., COM6, /dev/ttyUSB0).
PORT = "/dev/rfcomm0"
# Protocol will determine baud; do not set baud manually here.
BAUD = None
# Default connection timeout in seconds; used as the default for --timeout.
TIMEOUT = 1.0


# Formula helpers for OBD_PIDS. parse_pid_response() inspects each
# formula's positional-argument count to decide whether to pass one data
# byte (A) or two (A, B), so the arity of every helper is significant.

def _raw_byte(A):
    """Return the data byte unchanged."""
    return A


def _offset_celsius(A):
    """Convert a data byte to a temperature by removing the 40 degree offset."""
    return A - 40


def _percent_of_byte(A):
    """Scale a data byte (0-255) to a percentage."""
    return A * 100 / 255


def _engine_rpm(A, B):
    """Combine two data bytes into quarter-rpm units and convert to rpm."""
    return ((A * 256) + B) / 4


def _maf_rate(A, B):
    """Combine two data bytes into hundredths of g/s and convert to g/s."""
    return ((A * 256) + B) / 100


def _fuel_system_status(A, B):
    """Describe both status bytes as text (no numeric decoding)."""
    return f"Status A: {A}, Status B: {B}"


def _low_seven_bits(A):
    """Return the low seven bits of the byte (number of stored DTCs)."""
    return A & 0x7F


# Request string -> (display name, formula, unit).
# NOTE(review): the last three keys ("1303", "1501", "1503") do not follow
# the "01xx" pattern of the other entries -- confirm they are intended.
OBD_PIDS = {
    "010C": ("Engine RPM", _engine_rpm, "rpm"),
    "010D": ("Vehicle Speed", _raw_byte, "km/h"),
    "0105": ("Coolant Temp", _offset_celsius, "°C"),
    "0111": ("Throttle Position", _percent_of_byte, "%"),
    "0104": ("Engine Load", _percent_of_byte, "%"),
    "010F": ("Intake Air Temp", _offset_celsius, "°C"),
    "0110": ("MAF Rate", _maf_rate, "g/s"),
    "012F": ("Fuel Level", _percent_of_byte, "%"),
    "1303": ("Fuel System Status", _fuel_system_status, ""),
    "1501": ("DTC Count", _low_seven_bits, "count"),
    "1503": ("Pending DTC Count", _low_seven_bits, "count"),
}
- def _decode_obd_value(val):
- """Decode common python-obd returned values into readable strings.
- Handles bytes/bytearray (decodes to ASCII/UTF-8), lists/tuples of bytes
- or ints (joins sensible representations), and falls back to str().
- """
- if val is None:
- return "NO DATA"
- # Helper to produce hex representation for bytes-like
- def _hex(b):
- try:
- return b.hex().upper()
- except Exception:
- try:
- return bytes(b).hex().upper()
- except Exception:
- return None
- try:
- # If it's an object that wraps a value (many python-obd result types), unwrap common attrs
- if not isinstance(val, (str, bytes, bytearray, list, tuple, int, float, memoryview)):
- for attr in ('value', 'raw', 'data', 'payload'):
- if hasattr(val, attr):
- try:
- inner = getattr(val, attr)
- # recurse
- return _decode_obd_value(inner)
- except Exception:
- pass
- # bytes/bytearray/memoryview -> try decode and also show hex
- if isinstance(val, (bytes, bytearray, memoryview)):
- try:
- b = bytes(val)
- except Exception:
- b = val
- # try textual decode
- try:
- text = b.decode('utf-8', errors='replace')
- except Exception:
- try:
- text = b.decode('latin-1', errors='replace')
- except Exception:
- text = None
- hexs = _hex(b)
- # prefer printable text, but show hex in parenthesis for clarity
- if text:
- text_clean = ''.join(ch for ch in text if ch.isprintable()).strip()
- if text_clean:
- return f"{text_clean} (hex:{hexs})" if hexs else text_clean
- # fallback: show hex only
- return f"hex:{hexs}" if hexs else repr(val)
- # list/tuple often contains ints (bytes) or mixed items
- if isinstance(val, (list, tuple)):
- # if all ints in 0-255, treat as byte array
- if all(isinstance(x, int) and 0 <= x <= 255 for x in val):
- b = bytes(val)
- hexs = _hex(b)
- try:
- text = b.decode('utf-8', errors='replace')
- text_clean = ''.join(ch for ch in text if ch.isprintable()).strip()
- if text_clean:
- return f"{text_clean} (hex:{hexs})"
- except Exception:
- pass
- return f"hex:{hexs}"
- # otherwise join items sensibly
- parts = []
- for item in val:
- try:
- parts.append(_decode_obd_value(item))
- except Exception:
- parts.append(str(item))
- return ','.join(parts)
- # primitive types
- if isinstance(val, str):
- return val
- if isinstance(val, (int, float)):
- return str(val)
- # fallback to repr
- try:
- return repr(val)
- except Exception:
- return '<undecodable>'
- except Exception:
- try:
- return repr(val)
- except Exception:
- return '<undecodable>'
# Note: the raw serial helpers and AT command fallbacks have been removed.
# This script now uses only the `python-obd` library for all communication.
def parse_pid_response(resp, pid):
    """Parse a Mode 01 response ("41 xx A B") for the requested *pid*.

    Args:
        resp: Response text made of whitespace-separated hex tokens. Extra
            tokens before the "41 xx" pair are skipped.
        pid: Request string used as a key of ``OBD_PIDS`` (e.g. "010C").
            Its last two characters must match the PID byte in the reply.

    Returns:
        ``"<name>: <value> <unit>"`` with numeric values formatted to one
        decimal place, or ``None`` when *resp* is empty, *pid* is unknown,
        no matching "41 xx" pair with a data byte is present, or a data
        byte is not valid hex.
    """
    if not resp:
        return None
    entry = OBD_PIDS.get(pid)
    if entry is None:
        return None
    name, func, unit = entry

    tokens = resp.upper().split()
    wanted = pid[2:].upper()
    # Look for the "41 <pid>" pair rather than the first bare "41": a data
    # byte or a reply to a different PID must not be decoded with this
    # PID's formula.
    for idx in range(len(tokens) - 2):
        if tokens[idx] == "41" and tokens[idx + 1] == wanted:
            break
    else:
        return None

    # The table mixes one-byte and two-byte formulas; the formula's arity
    # says which one this is.
    needs_b = func.__code__.co_argcount == 2
    try:
        A = int(tokens[idx + 2], 16)
        if needs_b:
            # A missing second byte is treated as zero.
            B = int(tokens[idx + 3], 16) if len(tokens) > idx + 3 else 0
    except ValueError:
        return None

    value = func(A, B) if needs_b else func(A)
    if isinstance(value, (int, float)):
        return f"{name}: {value:.1f} {unit}"
    # Some formulas return descriptive text, which has no ".1f" form.
    return f"{name}: {value} {unit}".rstrip()
# Protocol used when --protocol is not given (see the --protocol help text).
_DEFAULT_PROTOCOL = "3"

# ELM327 protocol numbers tried for each --discover mode.
# 1=SAE J1850 PWM, 3=ISO 9141-2, 4=ISO 14230-4 KWP (5 baud init),
# 5=ISO 14230-4 KWP (fast init), 6/7/8=ISO 15765-4 CAN variants.
# '1' is not a KWP protocol; it stays at the end of the 'kwp' set only so
# vehicles that connected with the previous candidate list still do.
_DISCOVER_PROTOCOLS = {
    'kwp': ['3', '4', '5', '1'],
    'uds': ['6', '7', '8'],
    'all': ['6', '1', '5', '4', '3', '7', '8'],
}

_HEX_DIGITS = "0123456789abcdefABCDEF"

# Mode 01 commands queried by --live, looked up by name on obd.commands.
_LIVE_PID_NAMES = ("RPM", "SPEED", "COOLANT_TEMP", "THROTTLE_POS", "ENGINE_LOAD", "FUEL_LEVEL")


def _build_arg_parser():
    """Build the command-line parser for the scanner."""
    import argparse

    p = argparse.ArgumentParser(description="OBD scanner using python-obd")
    p.add_argument("-p", "--port", required=False, default=PORT, help="serial port (e.g. /dev/rfcomm0)")
    p.add_argument("--timeout", type=float, default=TIMEOUT, help="connection timeout")
    # default=None lets an explicit --protocol be told apart from the default.
    p.add_argument("--protocol", type=str, default=None, help="diagnostic protocol to request (e.g. '6' for ISO15765-4). Default is KWP-like '3'")
    p.add_argument("--scan-protos", action="store_true", help="try multiple common protocols until one connects")
    p.add_argument("--discover", choices=['none', 'kwp', 'uds', 'all'], default='kwp', help="discovery mode to run: 'kwp' tries KWP protocol variants; 'uds' tries UDS/CAN variants; 'all' tries both. Default: kwp")
    p.add_argument("--protocols", type=str, default="6,1,2,3,5", help="comma-separated list of protocols to try when --scan-protos is used (eg '6,1,2,3,5')")
    p.add_argument("--dry-run", action="store_true", help="don't open real port")
    p.add_argument("--no-log", action="store_true", help="disable logging")
    p.add_argument("--live", action="store_true", help="query common PIDs after DTC scan")
    p.add_argument("--probe-vin", action="store_true", help="query Mode 09 VIN and basic vehicle info after connect")
    p.add_argument("--did", type=str, default=None, help="optional hex DID to probe using ReadDataByIdentifier (0x22). Example: --did F190")
    p.add_argument("--did-discover", type=str, default=None, help="discover DIDs by range (e.g. 1000-10FF) or comma list (e.g. F190,1001). Scans using 0x22 reads.")
    p.add_argument("--did-delay", type=float, default=0.1, help="delay seconds between DID probes (default 0.1)")
    p.add_argument("--did-out", type=str, default=None, help="optional file to write discovered DIDs")
    p.add_argument("--did-header", type=str, default=None, help="optional header (CAN ID) to use for DID probes, e.g. 7E0")
    p.add_argument("--did-try-headers", action="store_true", help="try a small set of common headers (7E0,7E1,7E8,7DF)")
    p.add_argument("--did-preset", type=str, default=None, help="use a curated preset list of DIDs (e.g. 'bmw')")
    return p


def _patch_pyserial():
    """Best-effort: expose names python-obd expects on the top-level serial module.

    Some pyserial installs only provide SerialException, the parity/stop-bit
    constants, Serial or serial_for_url in submodules. Missing names are
    copied up; existing ones are never overwritten. Any failure is ignored
    because the later ``import obd`` reports a genuinely broken install.
    """
    try:
        import serial
    except Exception:
        # serial might not be installed yet; obd import will surface that error
        return
    try:
        from serial import serialutil
    except Exception:
        serialutil = None
    try:
        # POSIX backend; not importable on every platform.
        from serial import serialposix
    except Exception:
        serialposix = None

    if serialutil is not None:
        names = ('SerialException',
                 'PARITY_NONE', 'PARITY_EVEN', 'PARITY_ODD', 'PARITY_MARK', 'PARITY_SPACE',
                 'STOPBITS_ONE', 'STOPBITS_ONE_POINT_FIVE', 'STOPBITS_TWO',
                 'FIVEBITS', 'SIXBITS', 'SEVENBITS', 'EIGHTBITS')
        for attr in names:
            if hasattr(serialutil, attr) and not hasattr(serial, attr):
                setattr(serial, attr, getattr(serialutil, attr))

    if serialposix is not None and hasattr(serialposix, 'Serial'):
        if not hasattr(serial, 'Serial'):
            serial.Serial = serialposix.Serial
        if not hasattr(serial, 'serial_for_url'):
            def _serial_for_url(url, **kwargs):
                # For simple filesystem device paths (e.g. /dev/rfcomm0),
                # use the POSIX Serial class directly.
                return serialposix.Serial(url, **kwargs)
            serial.serial_for_url = _serial_for_url
    if (not hasattr(serial, 'serial_for_url') and serialutil is not None
            and hasattr(serialutil, 'serial_for_url')):
        serial.serial_for_url = serialutil.serial_for_url


def _protocol_candidates(args):
    """Return the ordered list of protocol ids to try for this invocation."""
    if args.scan_protos:
        listed = [item.strip() for item in args.protocols.split(',') if item.strip()]
        return listed or [args.protocol or _DEFAULT_PROTOCOL]
    candidates = list(_DISCOVER_PROTOCOLS.get(args.discover, ()))
    if args.protocol:
        # An explicitly requested protocol is tried first instead of being
        # silently replaced by the discovery set.
        candidates = [args.protocol] + [c for c in candidates if c != args.protocol]
    return candidates or [_DEFAULT_PROTOCOL]


def _open_connection(obd_mod, port, timeout, candidates, max_attempts=3):
    """Try each protocol up to *max_attempts* times.

    Returns ``(connection, protocol)``. On failure the protocol is ``None``
    and the connection is the last (closed, unconnected) object, or ``None``
    when the last attempt could not even create one.
    """
    con = None
    for proto in candidates:
        for attempt in range(1, max_attempts + 1):
            try:
                # Do not pass baudrate; let the protocol/adapter negotiation decide
                print(f"[INFO] Trying protocol '{proto}' (attempt {attempt}) ...")
                con = obd_mod.OBD(portstr=port, fast=False, timeout=timeout, protocol=proto)
            except Exception as e:
                print(f"[ERROR] Could not open OBD connection (proto={proto} attempt {attempt}): {e}")
                con = None
            if con is not None and con.is_connected():
                print(f"[OK] OBD connected with protocol {proto} (attempt {attempt}).")
                return con, proto
            if con is not None:
                try:
                    con.close()
                except Exception:
                    pass
            print(f"[WARN] OBD not connected (proto={proto} attempt {attempt}). Retrying in 2s...")
            time.sleep(2.0)
    return con, None


def _report_dtcs(con, cmds):
    """Query GET_DTC and print whatever the library returns."""
    print("[SCAN] Querying DTCs via python-obd ...")
    try:
        resp = con.query(cmds.GET_DTC)
    except Exception as e:
        print(f"[ERROR] GET_DTC query failed: {e}")
        resp = None
    if resp is None or resp.is_null():
        print("[OK] No DTCs or GET_DTC not supported/returned no data.")
        return
    val = getattr(resp, 'value', None)
    if val is None:
        print("[OK] GET_DTC returned no value.")
    elif isinstance(val, (list, tuple)):
        if not val:
            print("[OK] No DTCs reported.")
        else:
            print("[RESULT] DTCs found:")
            for entry in val:
                print(" ", entry)
    else:
        print("[RESULT] ", val)


def _probe_vehicle_info(con, cmds):
    """Query VIN and the calibration/ECU-name commands this python-obd build has."""
    print("\n[PROBE] Querying Mode 09 (VIN/calibration IDs) ...")
    vin_cmd = getattr(cmds, 'VIN', None)
    if vin_cmd is None:
        print("VIN command not available in this python-obd build.")
    else:
        try:
            r = con.query(vin_cmd)
            if r is None or r.is_null():
                print("VIN: NO DATA")
            else:
                print("VIN:", _decode_obd_value(r.value))
        except Exception as e:
            print("VIN query failed:", e)
    # Only names actually present in obd.commands are queried.
    for name in ('CAL_ID', 'CALIBRATION_ID', 'CVN', 'ECU_NAME'):
        c = getattr(cmds, name, None)
        if c is None:
            continue
        try:
            r = con.query(c)
            print(f"{name}:", _decode_obd_value(r.value) if r and not r.is_null() else "NO DATA")
        except Exception as e:
            print(f"{name} query failed:", e)


def _normalize_did(text):
    """Return *text* as upper-case hex zero-padded to 4 digits, or None if invalid.

    Accepts an optional 0x/0X prefix. Leading zeros are significant ("0010"
    stays "0010"); inputs longer than 4 digits are only padded to an even
    length.
    """
    did = text.strip()
    if did[:2].lower() == '0x':
        did = did[2:]
    if not did or any(ch not in _HEX_DIGITS for ch in did):
        return None
    did = did.upper()
    if len(did) <= 4:
        return did.zfill(4)
    return did.zfill(len(did) + len(did) % 2)


def _did_targets(spec, preset):
    """Expand a preset, a hex range "A-B", or a comma list into normalized DIDs."""
    if preset:
        if preset.lower() == 'bmw':
            # The preset covers the whole F100-F1FF block.
            return [format(x, '04X') for x in range(0xF100, 0xF1FF + 1)]
        print(f"Unknown DID preset: {preset}")
        return []
    spec = (spec or '').strip()
    if '-' in spec:
        low, high = spec.split('-', 1)
        try:
            start, end = int(low, 16), int(high, 16)
        except ValueError as e:
            print("Invalid DID range:", e)
            return []
        if start > end:
            start, end = end, start
        # '04X' keeps the DID two bytes wide (0x0010 -> "0010", not "10").
        return [format(x, '04X') for x in range(start, end + 1)]
    targets = []
    for part in spec.split(','):
        if not part.strip():
            continue
        did = _normalize_did(part)
        if did is None:
            print(f"Skipping invalid DID: {part.strip()}")
            continue
        targets.append(did)
    return targets


def _did_headers(args):
    """Return the header strings to try for DID probes; [None] means library default."""
    if args.did_header:
        names = [args.did_header.strip()]
    elif args.did_try_headers:
        names = ['7E0', '7E1', '7E8', '7DF']
    else:
        return [None]
    valid = []
    for name in names:
        if name and all(ch in _HEX_DIGITS for ch in name):
            valid.append(name.upper())
        else:
            print(f"Ignoring invalid header: {name!r}")
    return valid or [None]


def _decode_did_reply(messages):
    """python-obd decoder: hex dump of each message payload, or None if empty."""
    dumps = []
    for message in messages:
        data = getattr(message, 'data', None)
        if data:
            dumps.append(bytes(data).hex().upper())
    # None makes the response report is_null(), i.e. "no data".
    return ' | '.join(dumps) if dumps else None


def _make_did_command(command_cls, did, header=None):
    """Build a ReadDataByIdentifier (0x22) command for *did*.

    The command is sent to the adapter as ASCII hex (b"22F190"), and the
    expected byte count is 0 so python-obd does not trim or pad the reply.
    *header* is an ASCII hex string or None for the library default.
    """
    raw = ('22' + did).encode('ascii')
    name = f"DID_{did}"
    if header is not None:
        try:
            return command_cls(name, name, raw, 0, _decode_did_reply, header=header.encode('ascii'))
        except TypeError:
            # python-obd build whose OBDCommand has no 'header' parameter.
            pass
    return command_cls(name, name, raw, 0, _decode_did_reply)


def _probe_single_did(con, obd_mod, did_arg):
    """Send one ReadDataByIdentifier request for --did and print the reply."""
    did = _normalize_did(did_arg)
    if did is None:
        print(f"DID probe setup failed: invalid hex DID {did_arg!r}")
        return
    print(f"\n[DID] Sending ReadDataByIdentifier (0x22) => 22{did}")
    try:
        cmd = _make_did_command(obd_mod.OBDCommand, did)
    except Exception as e:
        print(f"DID probe setup failed: {e}")
        return
    try:
        # force=True: custom commands are not in the supported-command set,
        # so python-obd would otherwise refuse to send them.
        r = con.query(cmd, force=True)
    except Exception as e:
        print(f"DID probe failed: {e}")
        return
    if r is None or r.is_null():
        print(f"DID {did}: NO DATA or not supported")
    else:
        print(f"DID {did}: {r.value}")


def _discover_dids(con, obd_mod, args):
    """Probe every requested DID, trying each header until one answers."""
    label = args.did_discover or f"preset {args.did_preset}"
    print(f"\n[DID-DISCOVER] Starting DID discovery: {label}")
    targets = _did_targets(args.did_discover, args.did_preset)
    headers = _did_headers(args)
    delay = max(0.0, args.did_delay)
    discovered = []
    for did in targets:
        for header in headers:
            try:
                cmd = _make_did_command(obd_mod.OBDCommand, did, header)
                r = con.query(cmd, force=True)
            except Exception as e:
                # show error and try next header
                print(f"DID {did} probe error (header={header}): {e}")
                continue
            if r is not None and not r.is_null():
                print(f"DID {did} (header={header or 'default'}): {r.value}")
                discovered.append((did, r.value))
                break
        time.sleep(delay)
    if args.did_out:
        try:
            with open(args.did_out, 'w', encoding='utf-8') as f:
                for did, val in discovered:
                    f.write(f"{did},{val}\n")
            print(f"Wrote {len(discovered)} discovered DIDs to {args.did_out}")
        except OSError as e:
            print("Failed to write DID output:", e)
    print("[DID-DISCOVER] Finished")


def _query_live_pids(obd_mod, con, cmds, port, timeout, protocol):
    """Query the common Mode 01 PIDs; return the (possibly re-opened) connection."""
    print("\n[MODE 01] Querying common PIDs...")
    for name in _LIVE_PID_NAMES:
        c = getattr(cmds, name, None)
        if c is None:
            print(f"[SKIP] {name} not available in obd.commands")
            continue
        if not con.is_connected():
            print(f"[WARN] Connection lost before querying {name}; attempting reconnect...")
            # Release the port first so the new connection can open it.
            try:
                con.close()
            except Exception:
                pass
            fresh, failure = None, None
            for _ in range(2):
                try:
                    fresh = obd_mod.OBD(port, fast=False, timeout=timeout, protocol=protocol)
                    break
                except Exception as e:
                    failure = e
            if fresh is None:
                print(f"[ERROR] reconnect failed: {failure}")
                break
            con = fresh
        try:
            r = con.query(c)
            if r is None or r.is_null():
                print(f"{name}: NO DATA")
            else:
                print(f"{name}: {r.value}")
        except Exception as e:
            print(f"{name}: query failed: {e}")
    return con


def main():
    """Command-line entry point: connect, read DTCs, then run the optional probes.

    Steps, in order: parse arguments; with --dry-run print the plan and
    return; import python-obd; connect by trying each candidate protocol;
    query DTCs; then, as requested, probe VIN/calibration IDs (--probe-vin),
    a single DID (--did), a DID range/list/preset (--did-discover,
    --did-preset) and the common live PIDs (--live). The connection is
    closed on the way out, including when a probe is interrupted.
    """
    global LOG_ENABLED
    args = _build_arg_parser().parse_args()
    if args.no_log:
        # Nothing in this function reads the flag; it is set for any other
        # code that consults the module-level name.
        LOG_ENABLED = False

    # Dry-run: just print intended actions
    if args.dry_run:
        print("[DRY-RUN] Would open OBD connection on", args.port)
        if args.live:
            print("[DRY-RUN] Would query live PIDs: RPM, SPEED, COOLANT_TEMP, THROTTLE_POS, ENGINE_LOAD")
        return

    _patch_pyserial()
    try:
        import obd
        from obd import commands as cmds
    except Exception as e:
        # Broad on purpose: a missing or broken dependency of obd must end
        # with the install hint, not a traceback.
        print(f"[ERROR] could not import 'obd' library: {e}\nInstall it with: conda activate obd-scraper && pip install obd")
        return

    port = args.port
    timeout = args.timeout
    # If RFCOMM, give the link a short moment to settle before opening.
    if port and ("rfcomm" in str(port) or "/dev/rf" in str(port)):
        print("[INFO] RFCOMM port detected; waiting briefly for the link to settle...")
        time.sleep(2.0)
    print(f"[INFO] Opening OBD connection on {port} (protocol-selected baud) ...")

    con, active = _open_connection(obd, port, timeout, _protocol_candidates(args))
    # Protocol reused if --live has to reconnect.
    protocol = active or args.protocol or _DEFAULT_PROTOCOL
    if con is None or not con.is_connected():
        print("[ERROR] Unable to establish OBD connection after retries. Check pairing, port, and adapter baud settings.")
        if con is None:
            # No connection object exists, so there is nothing to query.
            return
        # Otherwise continue: the library returns null responses for an
        # unconnected object, and --live attempts a reconnect.

    try:
        _report_dtcs(con, cmds)
        if args.probe_vin:
            _probe_vehicle_info(con, cmds)
        if args.did:
            _probe_single_did(con, obd, args.did)
        if args.did_discover or args.did_preset:
            _discover_dids(con, obd, args)
        if args.live:
            con = _query_live_pids(obd, con, cmds, port, timeout, protocol)
    finally:
        try:
            con.close()
        except Exception:
            pass
    print("\n[COMPLETE] OBD scan finished.")
# Run the scanner only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Advertisement
Add Comment
Please, Sign In to add comment