Advertisement
Guest User

Untitled

a guest
Jul 19th, 2019
141
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 12.56 KB | None | 0 0
  1. import time
  2. import machine
  3. import network
  4. import esp
  5. import usocket
  6.  
  7. try:
  8. from typing import Tuple, Callable, List, Optional
  9. from builtins import const
  10. except:
  11. pass
  12.  
  13. esp.osdebug('*', esp.LOG_VERBOSE)
  14.  
  15. class logging(object):
  16. class Logger(object):
  17. def __init__(self, tag:str):
  18. self.tag = tag
  19. def debug(self, fmt:str, *args):
  20. print('[DEBUG] ', end='')
  21. print(fmt % args)
  22. def info(self, fmt:str, *args):
  23. print('[INFO] ', end='')
  24. print(fmt % args)
  25. def warn(self, fmt:str, *args):
  26. print('[WARN] ', end='')
  27. print(fmt % args)
  28. def error(self, fmt:str, *args):
  29. print('[ERROR] ', end='')
  30. print(fmt % args)
  31.  
  32. class GSMError(Exception):
  33. def __init__(self, message:str):
  34. super().__init__(message)
  35.  
  36. class GSM(object):
  37. "Controls u-blox GSM Module"
  38. CR = const(0x0d)
  39. LF = const(0x0a)
  40.  
  41. SOCKET_TCP = const(0)
  42. SOCKET_UDP = const(1)
  43.  
  44. MAX_CONNECT_ID = const(12)
  45. MAX_SOCKET_DATA_SIZE = const(1460)
  46.  
  47. def __init__(self):
  48. self.__l = logging.Logger('GSM')
  49.  
  50.  
  51. self.__uart = machine.UART(2, tx=17, rx=16, baudrate=115200)
  52. self.__urcs = None #type: Optional[List[bytes]]
  53. self.__ppp = None #type: network.PPP
  54.  
  55. def initialize(self) -> None:
  56. "Initialize I/O ports and peripherals to communicate with the module."
  57. self.__l.debug('initialize')
  58.  
  59. self.__uart.init(baudrate=115200, timeout=30)
  60.  
  61. def reset(self) -> bool:
  62. "Turn on or reset the module and wait until the LTE commucation gets available."
  63. self.__urcs = []
  64.  
  65. self.write_command(b'~+++')
  66. if not self.write_command_wait(b'AT', b'OK'): # Check if the module can accept commands.
  67. self.__l.info("The module did not respond.")
  68. return False
  69. if not self.write_command_wait(b'ATZ', b'OK'): # Reset module.
  70. self.__l.info("Failed to reset the module.")
  71. return False
  72. time.sleep_ms(100)
  73. if not self.write_command_wait(b'ATE0', b'OK'): # Disable command echo
  74. self.__l.info("Failed to disable command echo.")
  75. return False
  76. if not self.write_command_wait(b'AT+CFUN=1', b'OK'): # Enable RF.
  77. self.__l.info("Failed to enable RF.")
  78. return False
  79.  
  80. buffer = bytearray(1024)
  81.  
  82. self.__l.info('Waiting SIM goes active...')
  83. while True:
  84. result, responses = self.execute_command(b'AT+CPIN?', buffer, timeout=1000)
  85. self.__l.info('AT+CPIN result={0}, response={1}'.format(result, len(responses)))
  86. if len(responses) == 0: return False
  87. if result:
  88.  
  89. return True
  90.  
  91. def get_IMEI(self) -> Optional[str]:
  92. "Gets International Mobile Equipment Identity (IMEI)"
  93. response = self.execute_command_single_response(b'AT+GSN')
  94. return str(response, 'utf-8') if response is not None else None
  95.  
  96. def get_IMSI(self) -> Optional[str]:
  97. "Gets International Mobile Subscriber Identity (IMSI)"
  98. response = self.execute_command_single_response(b'AT+CIMI')
  99. return str(response, 'utf-8') if response is not None else None
  100.  
  101. def get_phone_number(self) -> Optional[str]:
  102. "Gets phone number (subscriber number)"
  103. response = self.execute_command_single_response(b'AT+CNUM', b'+CNUM:')
  104. return str(response[6:], 'utf-8') if response is not None else None
  105.  
  106. def get_RSSI(self) -> Optional[Tuple[int,int]]:
  107. "Gets received signal strength indication (RSSI)"
  108. response = self.execute_command_single_response(b'AT+CSQ', b'+CSQ:')
  109. if response is None:
  110. return None
  111. try:
  112. s = str(response[5:], 'utf-8')
  113. rssi, ber = s.split(',', 2)
  114. return (int(rssi), int(ber))
  115. except ValueError:
  116. return None
  117.  
  118. def activate(self, access_point:str, user:str, password:str, timeout:int=None) -> bool:
  119. self.__l.info("Activating network...")
  120. while True:
  121. # Read network registration status.
  122. response = self.execute_command_single_response(b'AT+CGREG?', b'+CGREG:', timeout)
  123. if response is None:
  124. raise GSMError('Failed to get registration status.')
  125. s = str(response, 'utf-8')
  126. self.__l.debug('AT+CGREG?:%s', s)
  127. n, stat = s.split(',')[:2]
  128. if stat == '4': # Not registered and not searching (0), or unknown (4).
  129. raise GSMError('Invalid registration status.')
  130. elif stat == '0':
  131. time.sleep_ms(1)
  132. elif stat == '1' or stat == '5': # Registered.
  133. break
  134. # No action
  135. if not self.write_command_wait(b'AT&D0', b'OK', timeout):
  136. return False
  137.  
  138. # Enable verbose error
  139. if not self.write_command_wait(b'AT+CMEE=2', b'OK', timeout):
  140. return False
  141.  
  142. # Define a PDP context
  143. command = bytes('AT+CGDCONT=1,"IP","{0}"'.format(access_point), 'utf-8')
  144. if not self.write_command_wait(command, b'OK', timeout):
  145. return False
  146.  
  147. # Activate a PDP context
  148. if not self.write_command_wait(b'AT+CGACT=1', b'OK', timeout):
  149. return False
  150. if not self.write_command_wait(b'AT+CGACT?', b'OK', timeout):
  151. return False
  152.  
  153. # Enter to PPP mode
  154. if not self.write_command_wait(b'AT+CGDATA="PPP",1', b'CONNECT', timeout):
  155. return False
  156.  
  157. # Construct PPP
  158. self.__ppp = network.PPP(self.__uart, use_peer_dns=True, auth_method="PAP", user_name=user, password=password)
  159.  
  160. return True
  161.  
  162.  
  163. def write(self, s:bytes) -> None:
  164. self.__l.debug('<- ' + str(s, 'utf-8'))
  165. self.__uart.write(s)
  166.  
  167. def read(self, length:int) -> bytes:
  168. return self.__uart.read(length)
  169.  
  170. def write_command(self, command:bytes) -> None:
  171. self.__l.debug('<- %s', command)
  172. self.__uart.write(command)
  173. self.__uart.write(b'\r')
  174.  
  175. def write_command_wait(self, command:bytes, expected_response:bytes, timeout:int=None) -> bool:
  176. self.write_command(command)
  177. return self.wait_response(expected_response, timeout=timeout) is not None
  178.  
  179.  
  180. def read_response_into(self, buffer:bytearray, offset:int=0, timeout:int=None) -> int:
  181. while True:
  182. length = self.__read_response_into(buffer=buffer, offset=offset, timeout=timeout)
  183. mv = memoryview(buffer)
  184. if length is not None and length >= 8 and mv[0:8] == b"+QIURC: ":
  185. #self.__l.info("URC: {0}".format(str(mv[:length], 'utf-8')))
  186. if length > 17 and mv[8:16] == b'"closed"':
  187. connect_id = int(str(mv[17:length], 'utf-8'))
  188. self.__l.info("Connection {0} closed".format(connect_id))
  189. self.__urcs.append( ("closed", connect_id) )
  190. continue
  191.  
  192. return length
  193.  
  194.  
  195. def __read_response_into(self, buffer:bytearray, offset:int=0, timeout:int=None) -> Optional[int]:
  196. buffer_length = len(buffer)
  197. response_length = 0
  198. state = 0
  199. start_time_ms = time.ticks_ms()
  200. cb = bytearray(1)
  201. while True:
  202. n = self.__uart.readinto(cb) #type: int
  203. if n == 0:
  204. if timeout is not None and (time.ticks_ms()-start_time_ms) >= timeout:
  205. return None
  206. time.sleep_ms(1)
  207. continue
  208. c = cb[0]
  209.  
  210. #self.__l.debug('S:%d R:%c', state, c)
  211. if state == 0 and c == GSM.CR:
  212. state = 1
  213. elif state == 1 and c == GSM.LF:
  214. state = 2
  215. elif state == 1 and c == GSM.CR:
  216. state = 1
  217. elif state == 1 and c != GSM.LF:
  218. response_length = 0
  219. state = 0
  220. elif state == 2 and c == GSM.CR:
  221. if response_length == 0:
  222. state = 1 # Maybe there is another corresponding CR-LF followed by actual response data. So we have to return to state 1.
  223. else:
  224. state = 4
  225. elif state == 2 and c != GSM.CR:
  226. buffer[offset+response_length] = c
  227. response_length += 1
  228. if offset+response_length == buffer_length:
  229. state = 3
  230. elif state == 3 and c == GSM.CR:
  231. state = 4
  232. elif state == 4 and c == GSM.LF:
  233. return response_length
  234.  
  235. def __process_remaining_urcs(self, timeout:int=None):
  236. for urc_type, urc_params in self.__urcs:
  237. if urc_type == 'closed':
  238. self.socket_close(urc_params, timeout=timeout)
  239. self.__urcs.clear()
  240.  
  241. def wait_response(self, expected_response:bytes, max_response_size:int=1024, timeout:int=None) -> bytes:
  242. self.__l.debug('wait_response: target=%s', expected_response)
  243. response = bytearray(max_response_size)
  244. expected_length = len(expected_response)
  245. while True:
  246. length = self.read_response_into(response, timeout=timeout)
  247. if length is None: return None
  248. self.__l.debug("wait_response: response=%s", response[:length])
  249. if length >= expected_length and response[:expected_length] == expected_response:
  250. return response[:length]
  251.  
  252. def wait_response_into(self, expected_response:bytes, response_buffer:bytearray, timeout:int=None) -> memoryview:
  253. self.__l.debug('wait_response_into: target=%s', expected_response)
  254. expected_length = len(expected_response)
  255. mv = memoryview(response_buffer)
  256. while True:
  257. length = self.read_response_into(response_buffer, timeout=timeout)
  258. if length is None: return None
  259. self.__l.debug("wait_response_into: response=%s", str(mv[:length], 'utf-8'))
  260. if length >= expected_length and mv[:expected_length] == expected_response:
  261. return mv[:length]
  262.  
  263. def wait_prompt(self, expected_prompt:bytes, timeout:int=None) -> bool:
  264. prompt_length = len(expected_prompt)
  265. index = 0
  266. start_time_ms = time.ticks_ms()
  267.  
  268. while True:
  269. c = self.__uart.readchar()
  270. if c < 0:
  271. if time.ticks_ms() - start_time_ms > timeout:
  272. return False
  273. asyncio.sleep_ms(1)
  274. continue
  275. if expected_prompt[index] == c:
  276. index += 1
  277. if index == prompt_length:
  278. return True
  279. else:
  280. index = 0
  281.  
  282. def execute_command(self, command:bytes, response_buffer:bytearray, index:int=0, expected_response_predicate:Callable[[memoryview],bool]=None, expected_response_list:List[bytes]=[b'OK'], timeout:int=None) -> Tuple[bool, List[memoryview]]:
  283. assert expected_response_predicate is not None or expected_response_list is not None
  284. if expected_response_predicate is None:
  285. expected_response_predicate = lambda mv: mv in expected_response_list
  286. self.write_command(command)
  287. buffer_length = len(response_buffer)
  288. responses = []
  289. mv = memoryview(response_buffer)
  290. while True:
  291. length = self.read_response_into(response_buffer, index, timeout=timeout)
  292. if length is None:
  293. return (False, responses)
  294. response = mv[index:index+length]
  295. responses.append(response)
  296. if expected_response_predicate(response):
  297. return (True, responses)
  298. index += length
  299.  
  300. def execute_command_single_response(self, command:bytes, starts_with:bytes=None, timeout:int=None) -> bytes:
  301. buffer = bytearray(1024)
  302. result, responses = self.execute_command(command, buffer, timeout=timeout)
  303. if not result: return None
  304. starts_with_length = len(starts_with) if starts_with is not None else 0
  305.  
  306. for response in responses:
  307. if starts_with_length == 0 and len(response) > 0:
  308. response = bytes(response)
  309. self.__l.debug('-> %s', response)
  310. return response
  311. if starts_with_length > 0 and len(response) >= starts_with_length and response[:starts_with_length] == starts_with:
  312. response = bytes(response)
  313. self.__l.debug('-> %s', response)
  314. return response
  315. return None
  316.  
  317. g = GSM()
  318. g.initialize()
  319. g.reset()
  320. g.activate('soracom.io', 'sora', 'sora')
  321. g.__ppp.active(True)
  322.  
  323. # Check IP address of the interface
  324. # g.__ppp.ifconfig()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement