Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import sys
- import dbus, dbus.mainloop.glib
- from gi.repository import GLib
- import subprocess, json, re, time
- import os
- import requests
- import ssl
- import paho.mqtt.client as mqtt
- from example_advertisement import Advertisement, register_ad_cb, register_ad_error_cb
- from example_gatt_server import Service, Characteristic, register_app_cb, register_app_error_cb
# --- BlueZ D-Bus names ---------------------------------------------------
# Bus name of the BlueZ daemon and the D-Bus interfaces used by this script.
BLUEZ_SERVICE_NAME = "org.bluez"
DBUS_OM_IFACE = "org.freedesktop.DBus.ObjectManager"
LE_ADVERTISING_MANAGER_IFACE = "org.bluez.LEAdvertisingManager1"
GATT_MANAGER_IFACE = "org.bluez.GattManager1"
GATT_CHRC_IFACE = "org.bluez.GattCharacteristic1"

# --- UART-style GATT service ---------------------------------------------
# One service with two characteristics: RX is registered with the 'write'
# flag and TX with the 'notify' flag (see RxCharacteristic/TxCharacteristic).
UART_SERVICE_UUID = "6e400001-b5a3-f393-e0a9-e50e24dcca9e"
UART_RX_CHARACTERISTIC_UUID = "6e400002-b5a3-f393-e0a9-e50e24dcca9e"
UART_TX_CHARACTERISTIC_UUID = "6e400003-b5a3-f393-e0a9-e50e24dcca9e"

# Local name placed in the BLE advertisement.
LOCAL_NAME = "rpi-gatt-server"

# --- Mutable module state ------------------------------------------------
# GLib main loop; assigned in main().
mainloop = None

# Values from the last Wi-Fi join that reached the "connected" state;
# written by connect_to_wifi() and reused by the periodic reconnect check.
current_ssid = current_password = user_token = ""
- ##################################################################
- # Wi-Fi and Certificate Routines
- ##################################################################
def scan_wifi():
    """Scan for nearby Wi-Fi networks on ``wlan0``.

    Runs ``sudo iwlist wlan0 scan`` and keeps every cell that reports both a
    MAC address and a non-empty ESSID (cells with an empty ESSID are skipped).

    Returns:
        A list of ``{"ssid": <name>, "mac": <address>}`` dicts in scan order,
        or an empty list if the scan command fails for any reason.
    """
    print("[DEBUG] Starting Wi-Fi scan...")
    try:
        raw = subprocess.check_output(["sudo", "iwlist", "wlan0", "scan"])
        # SSIDs are chosen by whoever runs the access point and need not be
        # valid UTF-8. A strict decode would raise on the first such name and
        # the handler below would then discard the whole scan, so undecodable
        # bytes are replaced instead.
        result = raw.decode("utf-8", errors="replace")
        networks = []
        # Text before the first "Cell " marker is the header, not a network.
        for cell in result.split("Cell ")[1:]:
            ssid_match = re.search(r'ESSID:"(.*?)"', cell)
            mac_match = re.search(r'Address: ([\da-fA-F:]{17})', cell)
            if not (ssid_match and mac_match):
                continue
            ssid = ssid_match.group(1)
            if ssid:
                networks.append({"ssid": ssid, "mac": mac_match.group(1)})
        print(f"[DEBUG] Found {len(networks)} Wi-Fi networks.")
        return networks
    except Exception as e:
        # Best effort: the caller forwards whatever list it gets over BLE.
        print("[DEBUG] Wi-Fi scan failed:", e)
        return []
def get_default_gateway():
    """Return the IP address of the default gateway.

    Parses ``ip route`` output and returns the address that follows ``via``
    on the first ``default`` route that has one.

    Returns:
        The gateway address as a string, or ``"192.168.1.1"`` when the
        command fails or no default route names a gateway.
    """
    print("[DEBUG] Getting default gateway...")
    try:
        output = subprocess.check_output(["ip", "route"]).decode()
        for line in output.splitlines():
            if not line.startswith("default"):
                continue
            fields = line.split()
            # A default route is not always "default via <ip> ...": a route
            # like "default dev ppp0 ..." has no gateway address, so taking
            # a fixed position would return an interface name instead.
            if "via" in fields[:-1]:
                gw = fields[fields.index("via") + 1]
                print(f"[DEBUG] Default gateway: {gw}")
                return gw
    except Exception as e:
        print("[DEBUG] Failed to get gateway:", e)
    return "192.168.1.1"
def check_wifi_connection():
    """Report whether a single ping to 8.8.8.8 succeeds.

    Returns:
        True if ``ping -c 1 8.8.8.8`` exits successfully, False if it exits
        with an error, cannot be started, or does not finish in time.
    """
    print("[DEBUG] Checking if Pi can ping 8.8.8.8...")
    try:
        # The timeout is a backstop so a stuck ping cannot stall the caller
        # indefinitely.
        subprocess.check_output(["ping", "-c", "1", "8.8.8.8"], timeout=15)
    except (subprocess.CalledProcessError, subprocess.TimeoutExpired, OSError):
        # OSError covers a missing or non-executable ping binary. This
        # function is called from a periodic GLib callback, where an
        # escaping exception would prevent the callback from returning True.
        print("[DEBUG] Internet is NOT reachable.")
        return False
    print("[DEBUG] Internet connectivity is up.")
    return True
- ##################################################################
- # BLE -> Wi-Fi + Cert + AWS IoT
- ##################################################################
def connect_to_wifi(ssid, password, token):
    """Join a Wi-Fi network with nmcli, then fetch certs and connect to AWS IoT.

    Disconnects ``wlan0``, asks NetworkManager to connect to ``ssid``, and
    polls the device state once per second for up to 15 attempts. Once
    ``wlan0`` reports ``connected`` the credentials and token are stored in
    the module globals, certificates are downloaded and, if that succeeds,
    the AWS IoT connection is started. Status text is pushed over the BLE TX
    characteristic throughout.

    Args:
        ssid: Network name to join.
        password: Passphrase for the network.
        token: User token stored in ``user_token`` on success.

    Returns:
        True once ``wlan0`` is connected (regardless of whether the
        certificate download succeeded), False on any failure or timeout.
    """
    global current_ssid, current_password, user_token
    print(f"[DEBUG] Connecting to SSID: {ssid}")
    try:
        subprocess.run(["sudo", "nmcli", "device", "disconnect", "wlan0"], capture_output=True)
        print("[DEBUG] Disconnected previous connection if any.")
        # NOTE(review): the passphrase is passed as a command-line argument
        # and may be visible to other local users via the process list.
        result = subprocess.run(
            ["sudo", "nmcli", "device", "wifi", "connect", ssid, "password", password],
            capture_output=True, text=True,
        )
        if result.returncode != 0:
            print("[DEBUG] nmcli error:", result.stderr)
            TxCharacteristic.instance.send_tx("Wi-Fi connection error")
            return False
        for _ in range(15):
            status = subprocess.run(
                ["nmcli", "-t", "-f", "DEVICE,STATE", "device"],
                capture_output=True, text=True,
            ).stdout
            # Terse output is one "DEVICE:STATE" line per device. A plain
            # substring test for "connected" would also match
            # "disconnected" and any other connected interface (e.g. eth0),
            # so require wlan0 to be in exactly the "connected" state.
            wlan0_connected = any(
                line.partition(":")[::2] == ("wlan0", "connected")
                for line in status.splitlines()
            )
            if wlan0_connected:
                current_ssid, current_password, user_token = ssid, password, token
                TxCharacteristic.instance.send_tx(f"[DEBUG] Connected to {ssid}")
                print("[DEBUG] Connection successful. Fetching certificates...")
                if download_device_certificates():
                    print("[DEBUG] Certificates downloaded. Connecting to AWS IoT...")
                    connect_to_aws_iot()
                return True
            time.sleep(1)
        TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi connection timed out")
        return False
    except Exception as e:
        print("[DEBUG] Exception during Wi-Fi connect:", e)
        TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi error")
        return False
def download_device_certificates():
    """Fetch this device's certificate and private key from the backend.

    Posts the device ID to the certificate endpoint, authenticating with the
    module-level ``user_token`` (set by ``connect_to_wifi``), then downloads
    the two URLs named in the response into ``~/Desktop/certificates`` as
    ``certificate.pem`` and ``private.key``. Failures are reported over the
    BLE TX characteristic.

    Returns:
        True if both files were saved, False otherwise.
    """
    print("[DEBUG] Starting certificate download with user_token...")
    url = "https://api.naptick.com/user/user-service/iot/certificates"
    # (connect, read) timeouts in seconds. Without them a stalled server
    # would block this call, and the BLE main loop behind it, indefinitely.
    timeout = (10, 30)

    # Authenticate with the token supplied by the phone rather than a
    # credential embedded in the source, which would be shared by every
    # device and stop working once it expires.
    if not user_token:
        print("[DEBUG] No user token available; cannot request certificates.")
        TxCharacteristic.instance.send_tx("[DEBUG] Cert fetch failed")
        return False
    headers = {
        "Authorization": f"Bearer {user_token}",
        "Content-Type": "application/json",
    }
    payload = {"deviceId": "SOMNUS20"}  # Or whichever ID you want
    save_dir = os.path.expanduser("~/Desktop/certificates")
    os.makedirs(save_dir, exist_ok=True)
    print(f"[DEBUG] Certificate files will be saved to: {save_dir}")

    def download_file(file_url, file_name, private=False):
        """Stream file_url into save_dir/file_name; re-raise on failure."""
        path = os.path.join(save_dir, file_name)
        # Download to a side file and rename on success so a failed transfer
        # never leaves a truncated certificate or key under the final name.
        partial = path + ".part"
        print(f"[DEBUG] Downloading {file_name} from {file_url}")
        try:
            with requests.get(file_url, stream=True, timeout=timeout) as r:
                r.raise_for_status()
                with open(partial, 'wb') as f:
                    if private:
                        # Restrict the key to its owner before any data is
                        # written to it.
                        os.chmod(partial, 0o600)
                    for chunk in r.iter_content(8192):
                        f.write(chunk)
            os.replace(partial, path)
            print(f"[DEBUG] Saved: {path}")
        except Exception as e:
            print(f"[DEBUG] Failed to download {file_name}:", e)
            TxCharacteristic.instance.send_tx(f"[DEBUG] Failed to download {file_name}")
            try:
                os.remove(partial)
            except OSError:
                pass  # Nothing was written, or it is already gone.
            raise

    try:
        print(f"[DEBUG] Sending POST request to {url}...")
        response = requests.post(url, json=payload, headers=headers, timeout=timeout)
        print(f"[DEBUG] Certificate fetch status code: {response.status_code}")
        if response.status_code != 200:
            print("[DEBUG] Non-200 response. Failed to fetch certs.")
            TxCharacteristic.instance.send_tx("[DEBUG] Cert fetch failed")
            return False
        data = response.json()
        print(f"[DEBUG] Certificate response: {data}")
        certificate_url = data.get("data", {}).get("certificate_url")
        private_url = data.get("data", {}).get("private_url")
        if not certificate_url or not private_url:
            print("[DEBUG] One or both URLs are missing in API response.")
            TxCharacteristic.instance.send_tx("[DEBUG] Invalid cert URL response")
            return False
        download_file(certificate_url, "certificate.pem")
        download_file(private_url, "private.key", private=True)
        TxCharacteristic.instance.send_tx("[DEBUG] Certificates downloaded successfully")
        return True
    except Exception as e:
        print("[DEBUG] Exception in cert download:", e)
        TxCharacteristic.instance.send_tx("[DEBUG] Cert download error")
        return False
- ##################################################################
- # AWS IoT Connection + Receiving Messages
- ##################################################################
def connect_to_aws_iot():
    """Open a TLS MQTT connection to AWS IoT and subscribe to ``SOMNUS20``.

    Uses the certificate and key under ``~/Desktop/certificates`` plus the
    Amazon root CA, connects to the configured endpoint on port 8883 and
    starts paho's background network loop so inbound messages keep being
    received after this function returns. Any client started by an earlier
    call is shut down first. Status text is pushed over the BLE TX
    characteristic.

    Returns:
        None. Failures are logged and reported over BLE, not raised.
    """
    print("[DEBUG] Starting AWS IoT MQTT connection...")
    cert_path = os.path.expanduser("~/Desktop/certificates/certificate.pem")
    key_path = os.path.expanduser("~/Desktop/certificates/private.key")
    # NOTE(review): unlike the two paths above this one is tied to the
    # "somnus" account; confirm the script always runs as that user.
    ca_path = "/home/somnus/Desktop/certificates/AmazonRootCA1.pem"
    endpoint = "a2w3ko3hrweita-ats.iot.ap-south-1.amazonaws.com"  # Replace with your actual endpoint from AWS
    # Updated so we subscribe to "SOMNUS20"
    subscribe_topic = "SOMNUS20"

    # NOTE(review): after loop_start() these callbacks run on paho's network
    # thread, yet they emit D-Bus signals through send_tx(). Confirm that is
    # safe with this dbus-python setup, or marshal onto the GLib main loop.
    def on_connect(client, userdata, flags, rc):
        print(f"[DEBUG] AWS IoT connected with result code {rc}")
        if rc == 0:
            TxCharacteristic.instance.send_tx("[DEBUG] AWS IoT connected")
            # Subscribing here means the subscription is re-issued each time
            # on_connect fires.
            client.subscribe(subscribe_topic, qos=1)
            print(f"[DEBUG] Subscribed to topic: {subscribe_topic}")
        else:
            TxCharacteristic.instance.send_tx(f"[DEBUG] AWS IoT connection failed: {rc}")

    def on_disconnect(client, userdata, rc):
        print(f"[DEBUG] AWS IoT disconnected with result code {rc}")

    def on_message(client, userdata, msg):
        print(f"[DEBUG] Received MQTT message on {msg.topic}")
        payload_str = msg.payload.decode('utf-8', errors='replace')
        print(f"[DEBUG] Raw message payload: {payload_str}")
        try:
            data = json.loads(payload_str)
            print(f"[DEBUG] JSON object: {data}")
            # Send a quick BLE message that we received something
            TxCharacteristic.instance.send_tx("[DEBUG] Received AWS msg!")
        except json.JSONDecodeError:
            print("[DEBUG] Payload not valid JSON")

    # This function runs again after every Wi-Fi (re)connect. Without this
    # teardown each call would leave another client and network thread
    # running, each with its own subscription to the same topic.
    previous = getattr(connect_to_aws_iot, "_client", None)
    if previous is not None:
        connect_to_aws_iot._client = None
        try:
            previous.disconnect()
            previous.loop_stop()
        except Exception as e:
            print("[DEBUG] Failed to stop previous AWS IoT client:", e)

    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_disconnect = on_disconnect
    client.on_message = on_message
    try:
        for path in [cert_path, key_path, ca_path]:
            if not os.path.exists(path):
                print(f"[DEBUG] Missing cert file: {path}")
                TxCharacteristic.instance.send_tx(f"[DEBUG] Missing file: {path}")
                return
        client.tls_set(
            ca_certs=ca_path,
            certfile=cert_path,
            keyfile=key_path,
            tls_version=ssl.PROTOCOL_TLSv1_2,
        )
        client.tls_insecure_set(False)
        client.connect(endpoint, 8883, keepalive=60)
        client.loop_start()
        # Remember the running client so the next call can shut it down.
        connect_to_aws_iot._client = client
        time.sleep(5)
        # We'll keep the loop running so we can receive messages from AWS
        print("[DEBUG] AWS IoT loop started. Listening for inbound messages...")
    except Exception as e:
        print("[DEBUG] AWS IoT connection error:", e)
        TxCharacteristic.instance.send_tx("[DEBUG] AWS IoT connect error")
- ##################################################################
- # BLE Characteristics
- ##################################################################
class TxCharacteristic(Characteristic):
    """Notify-only characteristic used to push text to the connected phone."""

    # Most recently constructed instance. Module-level functions reach the
    # characteristic through this attribute to send status messages.
    instance = None

    def __init__(self, bus, index, service):
        flags = ['notify']
        super().__init__(bus, index, UART_TX_CHARACTERISTIC_UUID, flags, service)
        self.notifying = False
        TxCharacteristic.instance = self

    def send_tx(self, msg):
        """Send ``msg`` as UTF-8, split into notifications of at most 20 bytes.

        Does nothing unless notifications are currently enabled. Errors while
        sending are logged and swallowed.
        """
        if self.notifying:
            try:
                payload = msg.encode('utf-8')
                # 20 bytes per notification; presumably sized for the
                # default ATT MTU payload (TODO confirm).
                for start in range(0, len(payload), 20):
                    piece = payload[start:start + 20]
                    value = [dbus.Byte(b) for b in piece]
                    self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])
            except Exception as e:
                print("[DEBUG] TX send error:", e)

    def StartNotify(self):
        """Enable sending; ``send_tx`` is a no-op until this has been called."""
        print("[DEBUG] TX notifications started")
        self.notifying = True

    def StopNotify(self):
        """Disable sending; later ``send_tx`` calls are dropped."""
        print("[DEBUG] TX notifications stopped")
        self.notifying = False
class RxCharacteristic(Characteristic):
    """Write-only characteristic that receives JSON commands from the phone.

    Supported commands, selected by the ``"action"`` key:

    * ``SCAN`` - scan for Wi-Fi networks and send the list back as JSON.
    * ``CONNECT_WIFI`` - join the network given by ``ssid``/``password``
      and store ``user_token``.

    Replies are sent through the TX characteristic passed to the constructor.
    """

    def __init__(self, bus, index, service, tx_char):
        super().__init__(bus, index, UART_RX_CHARACTERISTIC_UUID, ['write'], service)
        self.tx_char = tx_char

    def WriteValue(self, value, options):
        """Handle one write from the phone.

        Args:
            value: Sequence of byte values holding a UTF-8 JSON object.
            options: Write options; unused here.
        """
        # NOTE(review): each write is parsed on its own; a command split
        # across several writes is not reassembled here.
        try:
            # Decode inside the try so a payload that is not valid UTF-8 is
            # answered with "Bad JSON format" instead of raising out of
            # this handler.
            msg = bytearray(value).decode().strip()
            print(f"[DEBUG] Received from phone: {msg}")
            data = json.loads(msg)
            # Valid JSON may still be a list, number or string, none of
            # which has the .get() used below.
            if not isinstance(data, dict):
                raise ValueError("top-level JSON value must be an object")
        except (TypeError, ValueError) as e:
            print("[DEBUG] JSON parse error:", e)
            self.tx_char.send_tx("Bad JSON format")
            return

        action = data.get("action")
        if action == "SCAN":
            self.tx_char.send_tx("[DEBUG] SCAN command received")
            networks = scan_wifi()
            self.tx_char.send_tx(json.dumps(networks))
        elif action == "CONNECT_WIFI":
            ssid = data.get("ssid")
            password = data.get("password")
            token = data.get("user_token")
            # The values end up as nmcli arguments, so insist on non-empty
            # strings rather than anything merely truthy.
            if all(isinstance(field, str) and field for field in (ssid, password, token)):
                self.tx_char.send_tx(f"[DEBUG] Connecting to {ssid}")
                connect_to_wifi(ssid, password, token)
            else:
                self.tx_char.send_tx("Missing fields in JSON")
        else:
            self.tx_char.send_tx("Unknown action")
- ##################################################################
- # BLE Service / App / Advertisement
- ##################################################################
class UartService(Service):
    """GATT service exposing the TX (index 0) and RX (index 1) characteristics."""

    def __init__(self, bus, index):
        super().__init__(bus, index, UART_SERVICE_UUID, True)
        tx_chrc = TxCharacteristic(bus, 0, self)
        # RX holds a reference to TX so it can answer incoming commands.
        rx_chrc = RxCharacteristic(bus, 1, self, tx_chrc)
        self.tx, self.rx = tx_chrc, rx_chrc
        for chrc in (tx_chrc, rx_chrc):
            self.add_characteristic(chrc)
class Application(dbus.service.Object):
    """D-Bus object at ``/`` that exposes the registered GATT services.

    NOTE(review): ``dbus.service`` is not imported explicitly by this file;
    presumably one of the example_* modules imports it - confirm.
    """

    def __init__(self, bus):
        self.services = []
        self.path = '/'
        dbus.service.Object.__init__(self, bus, self.path)

    def get_path(self):
        """Return this object's path as a ``dbus.ObjectPath``."""
        return dbus.ObjectPath(self.path)

    def add_service(self, service):
        """Add ``service`` to the set reported by ``GetManagedObjects``."""
        self.services.append(service)

    @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
    def GetManagedObjects(self):
        """Map each service and characteristic path to its properties."""
        managed = {}
        for svc in self.services:
            managed[svc.get_path()] = svc.get_properties()
            managed.update(
                (chrc.get_path(), chrc.get_properties())
                for chrc in svc.get_characteristics()
            )
        return managed
class UartApplication(Application):
    """Application containing a single ``UartService`` at index 0."""

    def __init__(self, bus):
        Application.__init__(self, bus)
        uart_service = UartService(bus, 0)
        self.add_service(uart_service)
class UartAdvertisement(Advertisement):
    """Peripheral advertisement carrying the UART service UUID and local name."""

    def __init__(self, bus, index):
        ad_type = 'peripheral'
        Advertisement.__init__(self, bus, index, ad_type)
        self.add_service_uuid(UART_SERVICE_UUID)
        self.add_local_name(LOCAL_NAME)
        self.include_tx_power = True
def find_adapter(bus):
    """Return the path of the first BlueZ object usable as a BLE peripheral.

    An object qualifies when it exposes both the LE advertising manager and
    the GATT manager interfaces.

    Args:
        bus: D-Bus connection on which BlueZ is reachable.

    Returns:
        The object path of the first matching object, or None if there is none.
    """
    root = bus.get_object(BLUEZ_SERVICE_NAME, '/')
    managed = dbus.Interface(root, DBUS_OM_IFACE).GetManagedObjects()
    required = (LE_ADVERTISING_MANAGER_IFACE, GATT_MANAGER_IFACE)
    for path, ifaces in managed.items():
        if all(name in ifaces for name in required):
            print(f"[DEBUG] Found BLE adapter: {path}")
            return path
    print("[DEBUG] No BLE adapter found.")
    return None
- ##################################################################
- # MAIN
- ##################################################################
def main():
    """Register the GATT application and advertisement, then run the main loop.

    Looks up a BLE-capable adapter, registers the UART application and its
    advertisement with BlueZ, installs a 60-second connectivity check, and
    runs the GLib main loop until interrupted. Returns early if no adapter
    is found.
    """
    global mainloop
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    adapter = find_adapter(bus)
    if not adapter:
        return
    service_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE)
    ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), LE_ADVERTISING_MANAGER_IFACE)
    app = UartApplication(bus)
    adv = UartAdvertisement(bus, 0)
    mainloop = GLib.MainLoop()
    print("[DEBUG] Registering GATT app and BLE advertisement...")
    service_manager.RegisterApplication(app.get_path(), {},
                                        reply_handler=register_app_cb,
                                        error_handler=register_app_error_cb)
    ad_manager.RegisterAdvertisement(adv.get_path(), {},
                                     reply_handler=register_ad_cb,
                                     error_handler=register_ad_error_cb)

    def check_and_reconnect():
        """Periodic check; always returns True so GLib keeps the timer."""
        if check_wifi_connection():
            return True
        if not current_ssid:
            # Nothing has been provisioned over BLE yet. Calling
            # connect_to_wifi() with empty credentials would only
            # disconnect wlan0 and then fail, every minute.
            print("[DEBUG] No stored Wi-Fi credentials; skipping reconnect.")
            return True
        TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi lost. Reconnecting...")
        connect_to_wifi(current_ssid, current_password, user_token)
        return True

    GLib.timeout_add_seconds(60, check_and_reconnect)
    print("[DEBUG] Entering mainloop. Ready to receive BLE commands.")
    try:
        mainloop.run()
    except KeyboardInterrupt:
        print("[DEBUG] Interrupted. Releasing BLE advertisement...")
        adv.Release()


if __name__ == '__main__':
    main()
Add Comment
Please, Sign In to add comment