Guest User

wifi_BLE_token

a guest
Apr 7th, 2025
11
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import sys
  2. import dbus, dbus.mainloop.glib
  3. from gi.repository import GLib
  4. import subprocess, json, re, time
  5. import os
  6. import requests
  7. import ssl
  8. import paho.mqtt.client as mqtt
  9.  
  10. from example_advertisement import Advertisement, register_ad_cb, register_ad_error_cb
  11. from example_gatt_server import Service, Characteristic, register_app_cb, register_app_error_cb
  12.  
  13. BLUEZ_SERVICE_NAME = 'org.bluez'
  14. DBUS_OM_IFACE = 'org.freedesktop.DBus.ObjectManager'
  15. LE_ADVERTISING_MANAGER_IFACE = 'org.bluez.LEAdvertisingManager1'
  16. GATT_MANAGER_IFACE = 'org.bluez.GattManager1'
  17. GATT_CHRC_IFACE = 'org.bluez.GattCharacteristic1'
  18.  
  19. UART_SERVICE_UUID = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'
  20. UART_RX_CHARACTERISTIC_UUID = '6e400002-b5a3-f393-e0a9-e50e24dcca9e'
  21. UART_TX_CHARACTERISTIC_UUID = '6e400003-b5a3-f393-e0a9-e50e24dcca9e'
  22. LOCAL_NAME = 'rpi-gatt-server'
  23. mainloop = None
  24.  
  25. current_ssid = ""
  26. current_password = ""
  27. user_token = ""
  28.  
  29. ##################################################################
  30. # Wi-Fi and Certificate Routines
  31. ##################################################################
  32.  
  33. def scan_wifi():
  34. print("[DEBUG] Starting Wi-Fi scan...")
  35. try:
  36. result = subprocess.check_output(["sudo", "iwlist", "wlan0", "scan"]).decode()
  37. networks = []
  38. cells = result.split("Cell ")
  39. for cell in cells[1:]:
  40. ssid_match = re.search(r'ESSID:"(.*?)"', cell)
  41. mac_match = re.search(r'Address: ([\da-fA-F:]{17})', cell)
  42. if ssid_match and mac_match:
  43. ssid = ssid_match.group(1)
  44. if ssid:
  45. networks.append({"ssid": ssid, "mac": mac_match.group(1)})
  46. print(f"[DEBUG] Found {len(networks)} Wi-Fi networks.")
  47. return networks
  48. except Exception as e:
  49. print("[DEBUG] Wi-Fi scan failed:", e)
  50. return []
  51.  
  52. def get_default_gateway():
  53. print("[DEBUG] Getting default gateway...")
  54. try:
  55. output = subprocess.check_output(["ip", "route"]).decode()
  56. for line in output.splitlines():
  57. if line.startswith("default"):
  58. gw = line.split()[2]
  59. print(f"[DEBUG] Default gateway: {gw}")
  60. return gw
  61. except Exception as e:
  62. print("[DEBUG] Failed to get gateway:", e)
  63. return "192.168.1.1"
  64.  
  65. def check_wifi_connection():
  66. print("[DEBUG] Checking if Pi can ping 8.8.8.8...")
  67. try:
  68. subprocess.check_output(["ping", "-c", "1", "8.8.8.8"])
  69. print("[DEBUG] Internet connectivity is up.")
  70. return True
  71. except subprocess.CalledProcessError:
  72. print("[DEBUG] Internet is NOT reachable.")
  73. return False
  74.  
  75. ##################################################################
  76. # BLE -> Wi-Fi + Cert + AWS IoT
  77. ##################################################################
  78.  
  79. def connect_to_wifi(ssid, password, token):
  80. global current_ssid, current_password, user_token
  81. print(f"[DEBUG] Connecting to SSID: {ssid}")
  82. try:
  83. subprocess.run(["sudo", "nmcli", "device", "disconnect", "wlan0"], capture_output=True)
  84. print("[DEBUG] Disconnected previous connection if any.")
  85. result = subprocess.run(
  86. ["sudo", "nmcli", "device", "wifi", "connect", ssid, "password", password],
  87. capture_output=True, text=True
  88. )
  89. if result.returncode != 0:
  90. print("[DEBUG] nmcli error:", result.stderr)
  91. TxCharacteristic.instance.send_tx("Wi-Fi connection error")
  92. return False
  93.  
  94. for i in range(15):
  95. status = subprocess.run(["nmcli", "-t", "-f", "DEVICE,STATE", "device"], capture_output=True, text=True).stdout
  96. if "connected" in status:
  97. current_ssid, current_password, user_token = ssid, password, token
  98. TxCharacteristic.instance.send_tx(f"[DEBUG] Connected to {ssid}")
  99. print("[DEBUG] Connection successful. Fetching certificates...")
  100. if download_device_certificates():
  101. print("[DEBUG] Certificates downloaded. Connecting to AWS IoT...")
  102. connect_to_aws_iot()
  103. return True
  104. time.sleep(1)
  105. TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi connection timed out")
  106. return False
  107. except Exception as e:
  108. print("[DEBUG] Exception during Wi-Fi connect:", e)
  109. TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi error")
  110. return False
  111.  
  112. def download_device_certificates():
  113. print("[DEBUG] Starting certificate download with user_token...")
  114.  
  115. url = "https://api.naptick.com/user/user-service/iot/certificates"
  116. headers = {
  117. "Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VySUQiOiIzMTEwNTJlYTgxZjZmMzM2ZjQ2ZjlmY2JkMjRiYzRkNzFhNWI0MjJkMTkwOTIzNGZjMDlhMGRiNWUwNzQ3YzAzIiwiaWF0IjoxNzQzMDY4Njc0LCJleHAiOjE3NDU2NjA2NzR9.4juUOuYY_zocVMsF1YDtgGwR3bvZT3o5CnlaleKgcKk",
  118. "Content-Type": "application/json"
  119. }
  120. payload = {"deviceId": "SOMNUS20"} # Or whichever ID you want
  121.  
  122. save_dir = os.path.expanduser("~/Desktop/certificates")
  123. os.makedirs(save_dir, exist_ok=True)
  124. print(f"[DEBUG] Certificate files will be saved to: {save_dir}")
  125.  
  126. try:
  127. print(f"[DEBUG] Sending POST request to {url}...")
  128. response = requests.post(url, json=payload, headers=headers)
  129. print(f"[DEBUG] Certificate fetch status code: {response.status_code}")
  130.  
  131. if response.status_code != 200:
  132. print("[DEBUG] Non-200 response. Failed to fetch certs.")
  133. TxCharacteristic.instance.send_tx("[DEBUG] Cert fetch failed")
  134. return False
  135.  
  136. data = response.json()
  137. print(f"[DEBUG] Certificate response: {data}")
  138.  
  139. certificate_url = data.get("data", {}).get("certificate_url")
  140. private_url = data.get("data", {}).get("private_url")
  141.  
  142. if not certificate_url or not private_url:
  143. print("[DEBUG] One or both URLs are missing in API response.")
  144. TxCharacteristic.instance.send_tx("[DEBUG] Invalid cert URL response")
  145. return False
  146.  
  147. def download_file(file_url, file_name):
  148. path = os.path.join(save_dir, file_name)
  149. print(f"[DEBUG] Downloading {file_name} from {file_url}")
  150. try:
  151. with requests.get(file_url, stream=True) as r:
  152. r.raise_for_status()
  153. with open(path, 'wb') as f:
  154. for chunk in r.iter_content(8192):
  155. f.write(chunk)
  156. print(f"[DEBUG] Saved: {path}")
  157. except Exception as e:
  158. print(f"[DEBUG] Failed to download {file_name}:", e)
  159. TxCharacteristic.instance.send_tx(f"[DEBUG] Failed to download {file_name}")
  160. raise
  161.  
  162. download_file(certificate_url, "certificate.pem")
  163. download_file(private_url, "private.key")
  164.  
  165. TxCharacteristic.instance.send_tx("[DEBUG] Certificates downloaded successfully")
  166. return True
  167.  
  168. except Exception as e:
  169. print("[DEBUG] Exception in cert download:", e)
  170. TxCharacteristic.instance.send_tx("[DEBUG] Cert download error")
  171. return False
  172.  
  173. ##################################################################
  174. # AWS IoT Connection + Receiving Messages
  175. ##################################################################
  176.  
  177. def connect_to_aws_iot():
  178. print("[DEBUG] Starting AWS IoT MQTT connection...")
  179. cert_path = os.path.expanduser("~/Desktop/certificates/certificate.pem")
  180. key_path = os.path.expanduser("~/Desktop/certificates/private.key")
  181. ca_path = "/home/somnus/Desktop/certificates/AmazonRootCA1.pem"
  182. endpoint = "a2w3ko3hrweita-ats.iot.ap-south-1.amazonaws.com" # Replace with your actual endpoint from AWS
  183.  
  184. # Updated so we subscribe to "SOMNUS20"
  185. subscribe_topic = "SOMNUS20"
  186.  
  187. def on_connect(client, userdata, flags, rc):
  188. print(f"[DEBUG] AWS IoT connected with result code {rc}")
  189. if rc == 0:
  190. TxCharacteristic.instance.send_tx("[DEBUG] AWS IoT connected")
  191. # Subscribe to SOMNUS20
  192. client.subscribe(subscribe_topic, qos=1)
  193. print(f"[DEBUG] Subscribed to topic: {subscribe_topic}")
  194. else:
  195. TxCharacteristic.instance.send_tx(f"[DEBUG] AWS IoT connection failed: {rc}")
  196.  
  197. def on_disconnect(client, userdata, rc):
  198. print(f"[DEBUG] AWS IoT disconnected with result code {rc}")
  199.  
  200. def on_message(client, userdata, msg):
  201. print(f"[DEBUG] Received MQTT message on {msg.topic}")
  202. payload_str = msg.payload.decode('utf-8', errors='replace')
  203. print(f"[DEBUG] Raw message payload: {payload_str}")
  204. try:
  205. data = json.loads(payload_str)
  206. print(f"[DEBUG] JSON object: {data}")
  207. # Send a quick BLE message that we received something
  208. TxCharacteristic.instance.send_tx("[DEBUG] Received AWS msg!")
  209. except json.JSONDecodeError:
  210. print("[DEBUG] Payload not valid JSON")
  211.  
  212. client = mqtt.Client()
  213. client.on_connect = on_connect
  214. client.on_disconnect = on_disconnect
  215. client.on_message = on_message
  216.  
  217. try:
  218. for path in [cert_path, key_path, ca_path]:
  219. if not os.path.exists(path):
  220. print(f"[DEBUG] Missing cert file: {path}")
  221. TxCharacteristic.instance.send_tx(f"[DEBUG] Missing file: {path}")
  222. return
  223.  
  224. client.tls_set(
  225. ca_certs=ca_path,
  226. certfile=cert_path,
  227. keyfile=key_path,
  228. tls_version=ssl.PROTOCOL_TLSv1_2
  229. )
  230. client.tls_insecure_set(False)
  231.  
  232. client.connect(endpoint, 8883, keepalive=60)
  233. client.loop_start()
  234. time.sleep(5)
  235. # We'll keep the loop running so we can receive messages from AWS
  236.  
  237. print("[DEBUG] AWS IoT loop started. Listening for inbound messages...")
  238. except Exception as e:
  239. print("[DEBUG] AWS IoT connection error:", e)
  240. TxCharacteristic.instance.send_tx("[DEBUG] AWS IoT connect error")
  241.  
  242. ##################################################################
  243. # BLE Characteristics
  244. ##################################################################
  245.  
  246. class TxCharacteristic(Characteristic):
  247. instance = None
  248. def __init__(self, bus, index, service):
  249. super().__init__(bus, index, UART_TX_CHARACTERISTIC_UUID, ['notify'], service)
  250. self.notifying = False
  251. TxCharacteristic.instance = self
  252.  
  253. def send_tx(self, msg):
  254. if not self.notifying:
  255. return
  256. try:
  257. encoded = msg.encode('utf-8')
  258. chunks = [encoded[i:i+20] for i in range(0, len(encoded), 20)]
  259. for chunk in chunks:
  260. value = [dbus.Byte(b) for b in chunk]
  261. self.PropertiesChanged(GATT_CHRC_IFACE, {'Value': value}, [])
  262. except Exception as e:
  263. print("[DEBUG] TX send error:", e)
  264.  
  265. def StartNotify(self):
  266. print("[DEBUG] TX notifications started")
  267. self.notifying = True
  268.  
  269. def StopNotify(self):
  270. print("[DEBUG] TX notifications stopped")
  271. self.notifying = False
  272.  
  273. class RxCharacteristic(Characteristic):
  274. def __init__(self, bus, index, service, tx_char):
  275. super().__init__(bus, index, UART_RX_CHARACTERISTIC_UUID, ['write'], service)
  276. self.tx_char = tx_char
  277.  
  278. def WriteValue(self, value, options):
  279. global current_ssid, current_password, user_token
  280. msg = bytearray(value).decode().strip()
  281. print(f"[DEBUG] Received from phone: {msg}")
  282. try:
  283. data = json.loads(msg)
  284. except Exception as e:
  285. print("[DEBUG] JSON parse error:", e)
  286. self.tx_char.send_tx("Bad JSON format")
  287. return
  288.  
  289. action = data.get("action")
  290. if action == "SCAN":
  291. self.tx_char.send_tx("[DEBUG] SCAN command received")
  292. networks = scan_wifi()
  293. self.tx_char.send_tx(json.dumps(networks))
  294. elif action == "CONNECT_WIFI":
  295. ssid = data.get("ssid")
  296. password = data.get("password")
  297. token = data.get("user_token")
  298. if ssid and password and token:
  299. self.tx_char.send_tx(f"[DEBUG] Connecting to {ssid}")
  300. connect_to_wifi(ssid, password, token)
  301. else:
  302. self.tx_char.send_tx("Missing fields in JSON")
  303. else:
  304. self.tx_char.send_tx("Unknown action")
  305.  
  306. ##################################################################
  307. # BLE Service / App / Advertisement
  308. ##################################################################
  309.  
  310. class UartService(Service):
  311. def __init__(self, bus, index):
  312. super().__init__(bus, index, UART_SERVICE_UUID, True)
  313. self.tx = TxCharacteristic(bus, 0, self)
  314. self.rx = RxCharacteristic(bus, 1, self, self.tx)
  315. self.add_characteristic(self.tx)
  316. self.add_characteristic(self.rx)
  317.  
  318. class Application(dbus.service.Object):
  319. def __init__(self, bus):
  320. self.path = '/'
  321. self.services = []
  322. super().__init__(bus, self.path)
  323.  
  324. def get_path(self):
  325. return dbus.ObjectPath(self.path)
  326.  
  327. def add_service(self, service):
  328. self.services.append(service)
  329.  
  330. @dbus.service.method(DBUS_OM_IFACE, out_signature='a{oa{sa{sv}}}')
  331. def GetManagedObjects(self):
  332. result = {}
  333. for service in self.services:
  334. result[service.get_path()] = service.get_properties()
  335. for chrc in service.get_characteristics():
  336. result[chrc.get_path()] = chrc.get_properties()
  337. return result
  338.  
  339. class UartApplication(Application):
  340. def __init__(self, bus):
  341. super().__init__(bus)
  342. self.add_service(UartService(bus, 0))
  343.  
  344. class UartAdvertisement(Advertisement):
  345. def __init__(self, bus, index):
  346. super().__init__(bus, index, 'peripheral')
  347. self.add_service_uuid(UART_SERVICE_UUID)
  348. self.add_local_name(LOCAL_NAME)
  349. self.include_tx_power = True
  350.  
  351. def find_adapter(bus):
  352. remote_om = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, '/'), DBUS_OM_IFACE)
  353. objects = remote_om.GetManagedObjects()
  354. for o, props in objects.items():
  355. if LE_ADVERTISING_MANAGER_IFACE in props and GATT_MANAGER_IFACE in props:
  356. print(f"[DEBUG] Found BLE adapter: {o}")
  357. return o
  358. print("[DEBUG] No BLE adapter found.")
  359. return None
  360.  
  361. ##################################################################
  362. # MAIN
  363. ##################################################################
  364.  
  365. def main():
  366. global mainloop
  367. dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
  368. bus = dbus.SystemBus()
  369.  
  370. adapter = find_adapter(bus)
  371. if not adapter:
  372. return
  373.  
  374. service_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), GATT_MANAGER_IFACE)
  375. ad_manager = dbus.Interface(bus.get_object(BLUEZ_SERVICE_NAME, adapter), LE_ADVERTISING_MANAGER_IFACE)
  376.  
  377. app = UartApplication(bus)
  378. adv = UartAdvertisement(bus, 0)
  379. mainloop = GLib.MainLoop()
  380.  
  381. print("[DEBUG] Registering GATT app and BLE advertisement...")
  382. service_manager.RegisterApplication(app.get_path(), {},
  383. reply_handler=register_app_cb,
  384. error_handler=register_app_error_cb)
  385. ad_manager.RegisterAdvertisement(adv.get_path(), {},
  386. reply_handler=register_ad_cb,
  387. error_handler=register_ad_error_cb)
  388.  
  389. def check_and_reconnect():
  390. if not check_wifi_connection():
  391. TxCharacteristic.instance.send_tx("[DEBUG] Wi-Fi lost. Reconnecting...")
  392. connect_to_wifi(current_ssid, current_password, user_token)
  393. return True
  394.  
  395. GLib.timeout_add_seconds(60, check_and_reconnect)
  396.  
  397. print("[DEBUG] Entering mainloop. Ready to receive BLE commands.")
  398. try:
  399. mainloop.run()
  400. except KeyboardInterrupt:
  401. print("[DEBUG] Interrupted. Releasing BLE advertisement...")
  402. adv.Release()
  403.  
  404. if __name__ == '__main__':
  405. main()
  406.  
  407.  
  408.  
Add Comment
Please, Sign In to add comment