Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import serial
- import threading
- import time
- __author__ = 'SteinarrHrafn'
- """
- Example usage:
- from MoteinoBeta import MoteinoNetwork
- class MyNetwork(MoteinoNetwork):
- def __init__(self):
- MoteinoNetwork.__init__(self, port='COM50', baudrate=9600)
- def recieve(self, diction): # overwrite this to respond
- print diction
- mynetwork = MyNetwork()
- mynetwork.add_device('TestDevice', 0, "int Command;int Something;")
- mynetwork.start_listening()
- mynetwork.send({'Send2': 'TestDevice', 'Command': 123, 'Something': 456})
- """
- def dprint(s):
- if True:
- print s
- def _hexprints(n):
- if n > 255:
- raise ValueError('n too big for _hexprints')
- if n > 15:
- return hex(n)[2:]
- else:
- return '0' + hex(n)[2:]
- def _hex2dec(s):
- if len(s) > 2:
- raise ValueError('s too long for _hex2dec')
- else:
- # debug_print("S: " + s)
- return int(s, base=16)
- class Byte:
- """
- A class describing the byte datatype
- """
- NofBytes = 1
- def __init__(self):
- pass
- @staticmethod
- def hexprints(i=0):
- if i > 2**8:
- raise Exception("Of stor tala til ad komast fyrir i byte")
- return _hexprints(i)
- @staticmethod
- def hex2dec(s):
- return _hex2dec(s)
- class Int:
- """
- A class describing the int datatype ATTENTION! this is actually unsigned int
- """
- NofBytes = 2
- def __init__(self):
- pass
- @staticmethod
- def hexprints(i=0):
- if i > 2**16:
- raise Exception("Of stor tala til ad komast fyrir i int")
- return _hexprints(i % 256) + _hexprints(i/2**8)
- @staticmethod
- def hex2dec(s):
- return _hex2dec(s[2:4])*2**8 + _hex2dec(s[:2])
- class Array:
- """
- A class to describe the array datatype, requires the subtype to be defined
- """
- def __init__(self, subtype, n):
- self.SubType = subtype
- self.N = n
- self.NofBytes = subtype.NofBytes*n
- def hexprints(self, l=list()):
- returner = str()
- while len(l) < self.N:
- l.append(0)
- for L in l:
- returner += self.SubType.hexprints(L)
- return returner
- def hex2dec(self, s):
- returner = list()
- for i in range(self.N):
- returner.append(self.SubType.hex2dec(s[:self.SubType.NofBytes*2]))
- s = s[self.SubType.NofBytes:]
- return returner
- # a dictionary of known datatypes to more easily call them
- types = {
- 'byte': Byte,
- 'int': Int,
- }
- class Struct:
- """
- This is a class for parsing a struct through a serial port
- example:
- mystruct = Struct( "int a;"
- "int b;")
- send_this_2_serial = mystruct.encode({'a': 1, 'b': 2])
- incoming = mystruct.decode(str_from_serial)
- """
- def __init__(self, structstring):
- self.Parts = list()
- self.NofBytes = 0
- lines = structstring.rstrip(';').split(';') # remove the last ';' and split by the other ones
- for line in lines:
- temp = line.split() # split by whitespaces
- if not len(temp) == 2: # each line should always contain 2 words
- raise ValueError(temp)
- if '[' in temp[1]: # if we are dealing with an array
- ttemp = temp[1].split('[')
- self.Parts.append((Array(types[temp[0]], int(ttemp[1][:-1])), ttemp[0]))
- else:
- self.Parts.append((types[temp[0]], temp[1]))
- def encode(self, values_dict):
- """
- This function will encode the struct into a HEX string.
- Not all values of the struct must be contained in values_dict,
- those that are not present will be assumed to be 0
- :param values_dict: dict
- :return: str
- """
- returner = str()
- for (Type, Name) in self.Parts:
- if Name in values_dict:
- returner += Type.hexprints(values_dict[Name])
- else:
- returner += Type.hexprints() # hexprints() assumes value is 0
- return returner
- def decode(self, s):
- """
- This function will decode the struct recieved as a HEX string and return
- a dict with the corresponding values.
- The input string must be sufficiently long to contain the entire struct
- :param s: str
- :return: dict
- """
- returner = dict()
- # debug_print("S: \"" + s + "\"")
- # debug_print("Parts: " + str(self.Parts))
- for (Type, Name) in self.Parts:
- returner[Name] = Type.hex2dec(s[:2*Type.NofBytes])
- s = s[2*Type.NofBytes:]
- return returner
- class Device:
- def __init__(self, parent, _id, structstring):
- self.ID = _id
- self.Struct = Struct(structstring)
- self.LastSent = dict()
- self.Parent = parent
- def send2radio(self, diction, expect_response=False):
- """
- :param diction: dict
- :param expect_response: bool
- :return:
- """
- self.Parent.ResponseExpected = expect_response
- self.Parent.send2radio(send2id=self.ID, payload=self.Struct.encode(diction))
- self.LastSent = diction
- def send2parent(self, payload):
- """
- :param payload: string
- :return: None
- """
- d = self.Struct.decode(payload)
- d['SenderID'] = self.ID
- d['Sender'] = self.Parent.inv_IDs[self.ID]
- self.Parent.recieve(d)
- def get_status(self):
- self.send2radio({'Command': self.Parent.Commands['Status']}, expect_response=True)
- class Send2Parent(threading.Thread):
- """
- This is the thread that interprets the struct recieved by the moteino network
- and runs the recieve function.
- """
- def __init__(self, parent, incoming):
- threading.Thread.__init__(self)
- self.Incoming = incoming
- self.Parent = parent
- def run(self):
- # debug_print("Recieved from BaseMoteino: " + self.Incoming)
- # The first byte from the hex string is the sender ID.
- # We use that to get a pointer to the sender (an instance of the Device class)
- sender = self.Parent.devices[self.Parent.inv_IDs[_hex2dec(self.Incoming[:2])]]
- # Information about ACK/no ACK is sent with: "FFF0" for no ACK and "FFF1" for ACK
- if self.Incoming[2:5] == "FFF":
- # print("ACK info recieved")
- if self.Incoming[5] == "0":
- self.Parent.NoACK({'Event': 'MoteinoNoAck',
- 'LastSent': dict(sender.LastSent)})
- if self.Parent.RadioIsBusy:
- self.Parent.RadioIsBusy = False
- else:
- # We recieved an ack and are very happy about it
- if self.Parent.ResponseExpected:
- self.Parent.RadioIsBusy = True
- else:
- self.Parent.RadioIsBusy = False
- else:
- sender.send2parent(self.Incoming[2:])
- class ListeningThread(threading.Thread):
- """
- A thread that listens to the Serial port. When something (that ends with a newline) is recieved
- the thread will start up the Send2Parent thread and go back to listening to the Serial port
- """
- def __init__(self, parent):
- threading.Thread.__init__(self)
- self.Parent = parent
- self.Listen2 = parent.Serial
- def run(self):
- while True:
- incoming = self.Listen2.readline().rstrip('\n') # nota [:-1]?
- fire = Send2Parent(self.Parent, incoming)
- fire.start()
- class MoteinoNetwork:
- """
- This is the class that user should inteface with for example:
- class MyNetwork(MoteinoNetwork):
- def __init__(self):
- MoteinoNetwork.__init__(self, port='COM50', baudrate=9600)
- def recieve(self, diction):
- print "We just recieved:"
- print diction
- print "from the moteino network"
- mynetwork = MyNetwork()
- mynetwork.add_device('TestDevice', 0, "int Command;int Something;")
- mynetwork.start_listening()
- mynetwork.send({'Send2': 'TestDevice', 'Command': 123, 'Something': 456})
- """
- def __init__(self,
- port,
- baudrate=115200,
- parity=serial.PARITY_NONE,
- stopbits=serial.STOPBITS_ONE,
- bytesize=serial.EIGHTBITS):
- self.Serial = serial.Serial(port=port, baudrate=baudrate, parity=parity, stopbits=stopbits, bytesize=bytesize)
- self.SerialLock = threading.Lock()
- self.RadioIsBusy = False
- self.ResponseExpected = False
- self.devices = dict()
- self.IDs = dict()
- self.inv_IDs = dict()
- def _wait_for_radio(self, max_wait=500):
- counter = 1
- # dprint("Waiting for radio....")
- t = time.time()
- while self.RadioIsBusy:
- counter += 1
- time.sleep(0.01)
- if counter > (max_wait/10):
- break
- dprint("I waited for radio for " + str((time.time() - t)*1000) + " ms")
- def send2radio(self, send2id, payload):
- with self.SerialLock:
- # debug_print("Waiting for radio....")
- self._wait_for_radio()
- self.Serial.write(_hexprints(send2id) + payload + '\n')
- self.RadioIsBusy = True
- # debug_print("Sending2Radio: " + _hexprints(send2id) + payload + '\n')
- def add_device(self, name, _id, structstring):
- self.IDs[name] = _id
- self.inv_IDs[_id] = name
- self.devices[name] = Device(parent=self,
- _id=_id,
- structstring=structstring)
- def recieve(self, diction):
- print diction
- def send(self, diction, expect_response=False):
- """
- This function should be called from top level script to send someting.
- Input parameter diction is a dict that contains what should be sent.
- The structure of diction depends on what device will recieve but diction
- must contain the key 'Send2'
- :param diction: dict
- :param expect_response: bool
- :return: Nothing
- """
- if 'Send2' not in diction:
- raise ValueError("Parameter diction di not contain a key \'Send2\'")
- self.devices[diction['Send2']].send2radio(diction, expect_response=expect_response)
- def start_listening(self): # starts a thread that listens to the serial port
- serial_listening_thread = ListeningThread(self)
- serial_listening_thread.start()
- def get_all_statuses(self):
- for d in self.devices.values():
- d.get_status()
Add Comment
Please, Sign In to add comment