Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import udp2p, os, logging, pickle, threading, hashlib
- class IncomingUpdate:
- def __init__(self,mtime,txt):
- self.modified = mtime
- self.body = txt
- class UpdateRequest:
- def __init__(self,mtime):
- self.modified = mtime
- class PersistentTextNode:
- filename = "text.txt"
- MAGIC = b"PICKLE"
- LOG_FORMAT = '[{name:>10}]{levelname:>8}: {message}'
- def __init__(self):
- self.last_checked = -1
- self.known_text = None
- self.log = logging.getLogger("persistext")
- self.log_handler = logging.StreamHandler()
- self.log_formatter = logging.Formatter(self.LOG_FORMAT,style='{')
- self.log_handler.setFormatter(self.log_formatter)
- self.log.addHandler(self.log_handler)
- self.log.setLevel(logging.INFO)
- self.on_load_update = udp2p.Event()
- self.on_receive_update = udp2p.Event()
- self.on_received_pickle = udp2p.Event()
- self.on_need_update = udp2p.Event()
- self.on_outgoing_update = udp2p.Event()
- self.on_merge = udp2p.Event()
- self.on_store_update = udp2p.Event()
- self.send_datagram = udp2p.Event()
- self.on_received_pickle.handlers.add(self.handle_pickle)
- self.on_receive_update.handlers.add(self.store_update)
- self.on_need_update.handlers.add(self.send_object)
- self.on_outgoing_update.handlers.add(self.send_object)
- def load_update(self):
- # get modified time of text file.
- original_hash = None
- if self.known_text is not None:
- n = hashlib.md5()
- n.update(self.known_text)
- original_hash = n.digest()
- with open(self.filename,'rb') as f:
- self.known_text = f.read()
- n = hashlib.md5()
- n.update(self.known_text)
- if n.digest()!=original_hash:
- self.last_checked = os.path.getmtime(self.filename)
- self.log.info("Successfully updated text ({} @ {})".format(self.last_checked,self.known_text[:20]))
- self.on_load_update(IncomingUpdate(self.last_checked,self.known_text))
- def store_update(self,incoming_update):
- with open(self.filename,'wb') as f:
- f.write(incoming_update.body)
- os.utime(self.filename,(os.path.getatime(self.filename),incoming_update.modified))
- self.log.info("Successfully stored text ({} @ {})".format(incoming_update.modified,incoming_update.body[:20]))
- self.on_store_update(self,incoming_update)
- def receive_update(self,incoming_update):
- self.known_text = incoming_update.body
- self.last_checked = incoming_update.modified
- self.log.info("Received update to {}".format(incoming_update.modified))
- self.on_receive_update(incoming_update)
- def handle_datagram(self,dgram):
- if dgram.data[:len(self.MAGIC)]==self.MAGIC:
- d = pickle.loads(dgram.data[len(self.MAGIC):])
- self.on_received_pickle(d,dgram)
- def handle_pickle(self,obj,dgram):
- self.log.info("Received {} object. ( 0x{:X} )".format(type(obj).__name__,id(obj)))
- if isinstance(obj,IncomingUpdate):
- if obj.modified>self.last_checked:
- self.receive_update(obj)
- elif obj.modified<self.last_checked:
- if self.should_merge(obj,dgram):
- self.log.info("Attempting merge to {}".format(obj.modified))
- try:
- self.merge(obj)
- except Exception as e:
- self.log.exception("Error while merging",exc_info=e)
- else:
- self.log.info("Not merging to {}".format(obj.modified))
- elif isinstance(obj,UpdateRequest):
- # the other person was offline
- if obj.modified<self.last_checked:
- self.send_update()
- elif obj.modified>self.last_checked:
- self.request_update()
- def should_merge(self,update,dgram):
- return False
- def merge(self,update):
- # merge the previous update
- # with current info.
- assert False
- self.on_merge(update)
- def send_object(self,obj):
- bts = pickle.dumps(obj)
- self.log.info("Sending {} object 0x{:X}".format(type(obj).__name__,id(obj)))
- self.send_datagram(udp2p.Datagram(self.MAGIC + bts))
- def send_update(self):
- update = IncomingUpdate(self.last_checked,self.known_text)
- self.log.info("Sending update {}".format(update.modified))
- self.on_outgoing_update(update)
- def request_update(self):
- request = UpdateRequest(self.last_checked)
- self.log.info("Requesting update since {}".format(request.modified))
- self.on_need_update(request)
- def listen_to(self,udpnode):
- udpnode.on_complete_datagram.handlers.add(self.handle_datagram)
- self.send_datagram.handlers.add(udpnode.send)
- class FileWatcher(threading.Thread):
- FREQUENCY = 5#(seconds)
- def __init__(self,filename):
- threading.Thread.__init__(self,daemon=True)
- self.last_checked = -1
- self.timer = threading.Event()
- self.filename = filename
- self.on_update = udp2p.Event()
- def run(self):
- while True:
- mt = os.path.getmtime(self.filename)
- if mt >self.last_checked:
- self.last_checked = mt
- self.on_update()
- self.timer.wait(self.FREQUENCY)
- if __name__=="__main__":
- FILENAME = 'text.txt'
- MCAST = '224.0.0.0'
- PORTNMBR = 500
- udpnode = udp2p.UDP2P(MCAST,PORTNMBR)
- persistent_text = PersistentTextNode()
- persistent_text.filename = FILENAME
- persistent_text.listen_to(udpnode)
- persistent_text.on_load_update.handlers.add(persistent_text.send_object)
- fw = FileWatcher(persistent_text.filename)
- fw.on_update.handlers.add(persistent_text.load_update)
- udpnode.start()
- fw.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement