Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import time
- import machine
- import network
- import esp
- import usocket
- try:
- from typing import Tuple, Callable, List, Optional
- from builtins import const
- except:
- pass
# Passes '*' and esp.LOG_VERBOSE to esp.osdebug() at import time.
# NOTE(review): presumably turns on verbose OS-level debug output for every log tag - confirm against the esp module docs of the target port.
- esp.osdebug('*', esp.LOG_VERBOSE)
class logging(object):
    """Minimal logging namespace that prints level-prefixed lines to stdout."""

    class Logger(object):
        """Prints '[LEVEL] message' lines.

        The message is built with ``fmt % args``. If that formatting fails
        (for example ``fmt`` contains a literal '%' and no arguments were
        given), the raw text is printed instead so that logging never raises.
        """

        def __init__(self, tag:str):
            # The tag is stored on the instance; it is not part of the output.
            self.tag = tag

        def _emit(self, level:str, fmt:str, args:tuple) -> None:
            "Format one message and print it with the given level prefix."
            try:
                message = fmt % args
            except (TypeError, ValueError):
                # Malformed format string or mismatching arguments: fall back
                # to the raw text (plus the arguments, when there are any).
                message = fmt if not args else '%s %r' % (fmt, args)
            print('[%s] ' % level, end='')
            print(message)

        def debug(self, fmt:str, *args):
            "Print a message prefixed with '[DEBUG] '."
            self._emit('DEBUG', fmt, args)

        def info(self, fmt:str, *args):
            "Print a message prefixed with '[INFO] '."
            self._emit('INFO', fmt, args)

        def warn(self, fmt:str, *args):
            "Print a message prefixed with '[WARN] '."
            self._emit('WARN', fmt, args)

        def error(self, fmt:str, *args):
            "Print a message prefixed with '[ERROR] '."
            self._emit('ERROR', fmt, args)
class GSMError(Exception):
    """Error raised by the GSM driver; carries a single text message."""

    def __init__(self, message:str):
        # Exactly one message argument is required; it is handed to Exception.
        super(GSMError, self).__init__(message)
class GSM(object):
    """Controls u-blox GSM Module

    Talks to the module with AT commands over UART 2 and, once the network
    is activated, hands the UART over to a network.PPP object.

    Responses are read as lines framed by CR LF ... CR LF. Lines starting
    with '+QIURC: ' are treated as unsolicited result codes (URCs) and are
    filtered out of the normal response stream.
    """

    CR = const(0x0d)
    LF = const(0x0a)

    SOCKET_TCP = const(0)
    SOCKET_UDP = const(1)

    MAX_CONNECT_ID = const(12)
    MAX_SOCKET_DATA_SIZE = const(1460)

    def __init__(self):
        "Create the logger and the UART (port 2, TX=17, RX=16, 115200 baud)."
        self.__l = logging.Logger('GSM')
        self.__uart = machine.UART(2, tx=17, rx=16, baudrate=115200)
        # Pending URCs as (kind, connect_id) tuples. Starts as an empty list
        # so that a URC arriving before reset() is called can be queued.
        self.__urcs = [] #type: List[Tuple[str, int]]
        self.__ppp = None #type: Optional[network.PPP]

    def initialize(self) -> None:
        "Initialize I/O ports and peripherals to communicate with the module."
        self.__l.debug('initialize')
        self.__uart.init(baudrate=115200, timeout=30)

    def reset(self) -> bool:
        """Turn on or reset the module and wait until the LTE communication gets available.

        Returns True once 'AT+CPIN?' is answered with 'OK'. Returns False when
        one of the setup commands is not acknowledged, or when 'AT+CPIN?'
        produces no response line at all within one second.
        """
        self.__urcs = []

        # NOTE(review): '~+++' is presumably sent to leave a previous data/PPP
        # mode before issuing AT commands - confirm against the module manual.
        self.write_command(b'~+++')

        setup_steps = (
            (b'AT', "The module did not respond."),          # Check if the module can accept commands.
            (b'ATZ', "Failed to reset the module."),         # Reset module.
        )
        for command, failure_message in setup_steps:
            if not self.write_command_wait(command, b'OK'):
                self.__l.info(failure_message)
                return False

        time.sleep_ms(100)

        if not self.write_command_wait(b'ATE0', b'OK'):  # Disable command echo
            self.__l.info("Failed to disable command echo.")
            return False
        if not self.write_command_wait(b'AT+CFUN=1', b'OK'):  # Enable RF.
            self.__l.info("Failed to enable RF.")
            return False

        buffer = bytearray(1024)
        self.__l.info('Waiting SIM goes active...')
        while True:
            result, responses = self.execute_command(b'AT+CPIN?', buffer, timeout=1000)
            self.__l.info('AT+CPIN result={0}, response={1}'.format(result, len(responses)))
            if not responses:
                # Nothing came back at all: give up instead of polling forever.
                return False
            if result:
                return True

    def get_IMEI(self) -> Optional[str]:
        "Gets International Mobile Equipment Identity (IMEI)"
        response = self.execute_command_single_response(b'AT+GSN')
        return str(response, 'utf-8') if response is not None else None

    def get_IMSI(self) -> Optional[str]:
        "Gets International Mobile Subscriber Identity (IMSI)"
        response = self.execute_command_single_response(b'AT+CIMI')
        return str(response, 'utf-8') if response is not None else None

    def get_phone_number(self) -> Optional[str]:
        "Gets phone number (subscriber number): the text following '+CNUM:'."
        response = self.execute_command_single_response(b'AT+CNUM', b'+CNUM:')
        return str(response[6:], 'utf-8') if response is not None else None

    def get_RSSI(self) -> Optional[Tuple[int,int]]:
        """Gets received signal strength indication (RSSI)

        Returns the two integers reported after '+CSQ:' as (rssi, ber), or
        None when the command fails or the reply is not two integers.
        """
        response = self.execute_command_single_response(b'AT+CSQ', b'+CSQ:')
        if response is None:
            return None
        try:
            rssi, ber = str(response[5:], 'utf-8').split(',')
            return (int(rssi), int(ber))
        except ValueError:
            return None

    def activate(self, access_point:str, user:str, password:str, timeout:int=None) -> bool:
        """Wait for network registration, open a PDP context and enter PPP mode.

        Polls 'AT+CGREG?' until the status field is '1' or '5' (registered),
        then runs the activation commands and constructs the network.PPP
        object on the UART.

        Returns True on success and False when one of the activation commands
        is not acknowledged. Raises GSMError when the registration status
        cannot be read, is malformed, or is '4'.
        """
        self.__l.info("Activating network...")

        while True:
            # Read network registration status.
            response = self.execute_command_single_response(b'AT+CGREG?', b'+CGREG:', timeout)
            if response is None:
                raise GSMError('Failed to get registration status.')
            s = str(response, 'utf-8')
            self.__l.debug('AT+CGREG?:%s', s)
            fields = s.split(',')
            if len(fields) < 2:
                raise GSMError('Malformed registration status.')
            stat = fields[1].strip()
            if stat == '4':  # Unknown.
                raise GSMError('Invalid registration status.')
            if stat == '1' or stat == '5':  # Registered.
                break
            # Any other status: not registered yet. Pause briefly, then poll again.
            time.sleep_ms(1)

        define_context = bytes('AT+CGDCONT=1,"IP","{0}"'.format(access_point), 'utf-8')
        steps = (
            (b'AT&D0', b'OK'),                     # No action
            (b'AT+CMEE=2', b'OK'),                 # Enable verbose error
            (define_context, b'OK'),               # Define a PDP context
            (b'AT+CGACT=1', b'OK'),                # Activate a PDP context
            (b'AT+CGACT?', b'OK'),
            (b'AT+CGDATA="PPP",1', b'CONNECT'),    # Enter to PPP mode
        )
        for command, expected in steps:
            if not self.write_command_wait(command, expected, timeout):
                self.__l.info('Command was not acknowledged: %s', command)
                return False

        # Construct PPP
        self.__ppp = network.PPP(self.__uart, use_peer_dns=True, auth_method="PAP", user_name=user, password=password)
        return True

    def write(self, s:bytes) -> None:
        "Write raw bytes to the UART."
        # Log the payload as a %s argument: it is arbitrary data, so it must
        # neither be decoded as UTF-8 nor used as the format string itself.
        self.__l.debug('<- %s', s)
        self.__uart.write(s)

    def read(self, length:int) -> bytes:
        "Read up to `length` bytes from the UART; returns what UART.read() returns."
        return self.__uart.read(length)

    def write_command(self, command:bytes) -> None:
        "Send a command followed by a carriage return."
        self.__l.debug('<- %s', command)
        self.__uart.write(command)
        self.__uart.write(b'\r')

    def write_command_wait(self, command:bytes, expected_response:bytes, timeout:int=None) -> bool:
        "Send a command and report whether a line starting with `expected_response` arrived."
        self.write_command(command)
        return self.wait_response(expected_response, timeout=timeout) is not None

    def read_response_into(self, buffer:bytearray, offset:int=0, timeout:int=None) -> Optional[int]:
        """Read the next non-URC response line into `buffer` starting at `offset`.

        Lines starting with '+QIURC: ' are consumed here. A '"closed",<id>'
        URC is queued for later processing; other URCs are dropped.

        Returns the line length, or None on timeout.
        """
        mv = memoryview(buffer)
        while True:
            length = self.__read_response_into(buffer=buffer, offset=offset, timeout=timeout)
            # The line was stored at `offset`, so it must be inspected there.
            is_urc = length is not None and length >= 8 and mv[offset:offset+8] == b"+QIURC: "
            if not is_urc:
                return length

            if length > 17 and mv[offset+8:offset+16] == b'"closed"':
                try:
                    connect_id = int(str(mv[offset+17:offset+length], 'utf-8'))
                except ValueError:
                    # Unparsable connect id: ignore this URC and keep reading.
                    self.__l.warn('Malformed URC: %s', bytes(mv[offset:offset+length]))
                    continue
                self.__l.info("Connection {0} closed".format(connect_id))
                self.__urcs.append( ("closed", connect_id) )

    def __read_response_into(self, buffer:bytearray, offset:int=0, timeout:int=None) -> Optional[int]:
        """Read one CR LF <data> CR LF framed line into `buffer` at `offset`.

        Data that does not fit into the buffer is discarded. Returns the
        number of bytes stored, or None when `timeout` (milliseconds) elapses.

        States: 0 = waiting for CR, 1 = CR seen, waiting for LF,
        2 = reading data, 3 = buffer full, discarding until CR,
        4 = trailing CR seen, waiting for the final LF.
        """
        buffer_length = len(buffer)
        response_length = 0
        state = 0
        start_time_ms = time.ticks_ms()
        cb = bytearray(1)
        while True:
            n = self.__uart.readinto(cb) #type: Optional[int]
            if not n:
                # No byte arrived: readinto() reports this as None (or 0).
                # cb still holds the previous byte, so it must not be processed.
                if timeout is not None and time.ticks_diff(time.ticks_ms(), start_time_ms) >= timeout:
                    return None
                time.sleep_ms(1)
                continue
            c = cb[0]

            if state == 0:
                if c == GSM.CR:
                    state = 1
            elif state == 1:
                if c == GSM.LF:
                    state = 2
                elif c != GSM.CR:
                    # Not a line start after all; a repeated CR stays in state 1.
                    response_length = 0
                    state = 0
            elif state == 2:
                if c == GSM.CR:
                    # An empty line: maybe there is another corresponding CR-LF
                    # followed by actual response data, so return to state 1.
                    state = 4 if response_length > 0 else 1
                elif offset + response_length < buffer_length:
                    buffer[offset+response_length] = c
                    response_length += 1
                    if offset + response_length == buffer_length:
                        state = 3
                else:
                    # No room left (offset already at the end of the buffer).
                    state = 3
            elif state == 3:
                if c == GSM.CR:
                    state = 4
            elif state == 4:
                if c == GSM.LF:
                    return response_length

    def __process_remaining_urcs(self, timeout:int=None):
        "Handle the queued URCs: close every connection reported as closed."
        # Detach the queue first: closing a socket reads responses, which may
        # queue further URCs while this loop is still running.
        pending, self.__urcs = self.__urcs, []
        # NOTE(review): socket_close is not defined in this class; it is looked
        # up dynamically so that its absence does not raise here.
        close = getattr(self, 'socket_close', None)
        for urc_type, urc_params in pending:
            if urc_type != 'closed':
                continue
            if close is None:
                self.__l.warn('socket_close is not available; connection %d is left as is', urc_params)
            else:
                close(urc_params, timeout=timeout)

    def wait_response(self, expected_response:bytes, max_response_size:int=1024, timeout:int=None) -> Optional[bytes]:
        """Read lines until one starts with `expected_response`.

        Returns that line, or None on timeout.
        """
        self.__l.debug('wait_response: target=%s', expected_response)
        response = bytearray(max_response_size)
        expected_length = len(expected_response)
        while True:
            length = self.read_response_into(response, timeout=timeout)
            if length is None:
                return None
            self.__l.debug("wait_response: response=%s", response[:length])
            if length >= expected_length and response[:expected_length] == expected_response:
                return response[:length]

    def wait_response_into(self, expected_response:bytes, response_buffer:bytearray, timeout:int=None) -> Optional[memoryview]:
        """Like wait_response(), but reads into the caller's buffer.

        Returns a memoryview over the matching line inside `response_buffer`,
        or None on timeout.
        """
        self.__l.debug('wait_response_into: target=%s', expected_response)
        expected_length = len(expected_response)
        mv = memoryview(response_buffer)
        while True:
            length = self.read_response_into(response_buffer, timeout=timeout)
            if length is None:
                return None
            # Logged as bytes: the line is not guaranteed to be valid UTF-8.
            self.__l.debug("wait_response_into: response=%s", bytes(mv[:length]))
            if length >= expected_length and mv[:expected_length] == expected_response:
                return mv[:length]

    def wait_prompt(self, expected_prompt:bytes, timeout:int=None) -> bool:
        """Consume UART input until the byte sequence `expected_prompt` is seen.

        Returns True when the prompt was found and False when `timeout`
        (milliseconds) elapsed first. With timeout=None it waits indefinitely.
        An empty prompt is considered found immediately.
        """
        prompt_length = len(expected_prompt)
        if prompt_length == 0:
            return True
        index = 0
        start_time_ms = time.ticks_ms()
        cb = bytearray(1)
        while True:
            if not self.__uart.readinto(cb):
                # Nothing received (None or 0): check the deadline and retry.
                if timeout is not None and time.ticks_diff(time.ticks_ms(), start_time_ms) > timeout:
                    return False
                time.sleep_ms(1)
                continue
            c = cb[0]
            if expected_prompt[index] == c:
                index += 1
            else:
                # Restart the match; the mismatching byte may itself be the
                # first byte of the prompt.
                index = 1 if expected_prompt[0] == c else 0
            if index == prompt_length:
                return True

    def execute_command(self, command:bytes, response_buffer:bytearray, index:int=0, expected_response_predicate:Callable[[memoryview],bool]=None, expected_response_list:List[bytes]=(b'OK',), timeout:int=None) -> Tuple[bool, List[memoryview]]:
        """Send a command and collect response lines until a final one arrives.

        Lines are stored back to back in `response_buffer` starting at
        `index`. A line is final when `expected_response_predicate` accepts
        it; without a predicate, when it equals an entry of
        `expected_response_list`.

        Returns (True, lines) when a final line was seen and (False, lines)
        on timeout; `lines` are memoryviews into `response_buffer`.
        Raises ValueError when neither a predicate nor a list is given.
        """
        if expected_response_predicate is None:
            if expected_response_list is None:
                raise ValueError('expected_response_predicate or expected_response_list is required')
            expected_response_predicate = lambda mv: mv in expected_response_list

        self.write_command(command)
        responses = []
        mv = memoryview(response_buffer)
        while True:
            length = self.read_response_into(response_buffer, index, timeout=timeout)
            if length is None:
                return (False, responses)
            response = mv[index:index+length]
            responses.append(response)
            if expected_response_predicate(response):
                return (True, responses)
            index += length

    def execute_command_single_response(self, command:bytes, starts_with:bytes=None, timeout:int=None) -> Optional[bytes]:
        """Run a command and return one of its response lines as bytes.

        With `starts_with`, the first line beginning with it is returned;
        otherwise the first non-empty line. Returns None when the command does
        not complete with 'OK' or no line qualifies.
        """
        buffer = bytearray(1024)
        result, responses = self.execute_command(command, buffer, timeout=timeout)
        if not result:
            return None
        prefix_length = len(starts_with) if starts_with is not None else 0
        for response in responses:
            if prefix_length == 0:
                matched = len(response) > 0
            else:
                matched = len(response) >= prefix_length and response[:prefix_length] == starts_with
            if matched:
                # Copy out of the local buffer before returning.
                response = bytes(response)
                self.__l.debug('-> %s', response)
                return response
        return None
# Bring the module up and start a PPP session at import time.
g = GSM()
g.initialize()
# Stop with a clear error instead of continuing with a module that is not ready.
if not g.reset():
    raise GSMError('Failed to reset the module.')
# activate() leaves the PPP object unset when it returns False, so check it
# before touching the PPP object below.
if not g.activate('soracom.io', 'sora', 'sora'):
    raise GSMError('Failed to activate the network.')
# NOTE(review): this reaches into a double-underscore attribute from outside
# the class; it relies on the interpreter not mangling private names - confirm
# for the target MicroPython build.
g.__ppp.active(True)
# Check IP address of the interface
# g.__ppp.ifconfig()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement