Advertisement
Guest User

Untitled

a guest
Sep 15th, 2014
257
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import udp2p, os, logging, pickle, threading, hashlib
  2.  
  3. class IncomingUpdate:
  4.     def __init__(self,mtime,txt):
  5.         self.modified = mtime
  6.         self.body = txt
  7.  
  8. class UpdateRequest:
  9.     def __init__(self,mtime):
  10.         self.modified = mtime
  11.  
  12. class PersistentTextNode:
  13.     filename = "text.txt"
  14.     MAGIC = b"PICKLE"
  15.     LOG_FORMAT = '[{name:>10}]{levelname:>8}: {message}'
  16.     def __init__(self):
  17.         self.last_checked = -1
  18.         self.known_text = None
  19.         self.log = logging.getLogger("persistext")
  20.         self.log_handler = logging.StreamHandler()
  21.         self.log_formatter = logging.Formatter(self.LOG_FORMAT,style='{')
  22.         self.log_handler.setFormatter(self.log_formatter)
  23.         self.log.addHandler(self.log_handler)
  24.         self.log.setLevel(logging.INFO)
  25.         self.on_load_update = udp2p.Event()
  26.         self.on_receive_update = udp2p.Event()
  27.         self.on_received_pickle = udp2p.Event()
  28.         self.on_need_update = udp2p.Event()
  29.         self.on_outgoing_update = udp2p.Event()
  30.         self.on_merge = udp2p.Event()
  31.         self.on_store_update = udp2p.Event()
  32.         self.send_datagram = udp2p.Event()
  33.         self.on_received_pickle.handlers.add(self.handle_pickle)
  34.         self.on_receive_update.handlers.add(self.store_update)
  35.         self.on_need_update.handlers.add(self.send_object)
  36.         self.on_outgoing_update.handlers.add(self.send_object)
  37.     def load_update(self):
  38.         # get modified time of text file.
  39.         original_hash = None
  40.         if self.known_text is not None:
  41.             n = hashlib.md5()
  42.             n.update(self.known_text)
  43.             original_hash = n.digest()
  44.         with open(self.filename,'rb') as f:
  45.             self.known_text = f.read()
  46.         n = hashlib.md5()
  47.         n.update(self.known_text)
  48.         if n.digest()!=original_hash:
  49.             self.last_checked = os.path.getmtime(self.filename)
  50.             self.log.info("Successfully updated text ({} @ {})".format(self.last_checked,self.known_text[:20]))
  51.             self.on_load_update(IncomingUpdate(self.last_checked,self.known_text))
  52.     def store_update(self,incoming_update):
  53.         with open(self.filename,'wb') as f:
  54.             f.write(incoming_update.body)
  55.         os.utime(self.filename,(os.path.getatime(self.filename),incoming_update.modified))
  56.         self.log.info("Successfully stored text ({} @ {})".format(incoming_update.modified,incoming_update.body[:20]))
  57.         self.on_store_update(self,incoming_update)
  58.     def receive_update(self,incoming_update):
  59.         self.known_text = incoming_update.body
  60.         self.last_checked = incoming_update.modified
  61.         self.log.info("Received update to {}".format(incoming_update.modified))
  62.         self.on_receive_update(incoming_update)
  63.     def handle_datagram(self,dgram):
  64.         if dgram.data[:len(self.MAGIC)]==self.MAGIC:
  65.             d = pickle.loads(dgram.data[len(self.MAGIC):])
  66.             self.on_received_pickle(d,dgram)
  67.     def handle_pickle(self,obj,dgram):
  68.         self.log.info("Received {} object. ( 0x{:X} )".format(type(obj).__name__,id(obj)))
  69.         if isinstance(obj,IncomingUpdate):
  70.             if obj.modified>self.last_checked:
  71.                 self.receive_update(obj)
  72.             elif obj.modified<self.last_checked:
  73.                 if self.should_merge(obj,dgram):
  74.                     self.log.info("Attempting merge to {}".format(obj.modified))
  75.                     try:
  76.                         self.merge(obj)
  77.                     except Exception as e:
  78.                         self.log.exception("Error while merging",exc_info=e)
  79.                 else:
  80.                     self.log.info("Not merging to {}".format(obj.modified))
  81.         elif isinstance(obj,UpdateRequest):
  82.             # the other person was offline
  83.             if obj.modified<self.last_checked:
  84.                 self.send_update()
  85.             elif obj.modified>self.last_checked:
  86.                 self.request_update()
  87.     def should_merge(self,update,dgram):
  88.         return False
  89.     def merge(self,update):
  90.         # merge the previous update
  91.         # with current info.
  92.         assert False
  93.         self.on_merge(update)
  94.     def send_object(self,obj):
  95.         bts = pickle.dumps(obj)
  96.         self.log.info("Sending {} object 0x{:X}".format(type(obj).__name__,id(obj)))
  97.         self.send_datagram(udp2p.Datagram(self.MAGIC + bts))
  98.     def send_update(self):
  99.         update = IncomingUpdate(self.last_checked,self.known_text)
  100.         self.log.info("Sending update {}".format(update.modified))
  101.         self.on_outgoing_update(update)
  102.     def request_update(self):
  103.         request = UpdateRequest(self.last_checked)
  104.         self.log.info("Requesting update since {}".format(request.modified))
  105.         self.on_need_update(request)
  106.     def listen_to(self,udpnode):
  107.         udpnode.on_complete_datagram.handlers.add(self.handle_datagram)
  108.         self.send_datagram.handlers.add(udpnode.send)
  109.  
  110. class FileWatcher(threading.Thread):
  111.     FREQUENCY = 5#(seconds)
  112.     def __init__(self,filename):
  113.         threading.Thread.__init__(self,daemon=True)
  114.         self.last_checked = -1
  115.         self.timer = threading.Event()
  116.         self.filename = filename
  117.         self.on_update = udp2p.Event()
  118.     def run(self):
  119.         while True:
  120.             mt = os.path.getmtime(self.filename)
  121.             if mt >self.last_checked:
  122.                 self.last_checked = mt
  123.                 self.on_update()
  124.             self.timer.wait(self.FREQUENCY)
  125.  
  126. if __name__=="__main__":
  127.     FILENAME = 'text.txt'
  128.     MCAST = '224.0.0.0'
  129.     PORTNMBR = 500
  130.     udpnode = udp2p.UDP2P(MCAST,PORTNMBR)
  131.     persistent_text = PersistentTextNode()
  132.     persistent_text.filename = FILENAME
  133.     persistent_text.listen_to(udpnode)
  134.     persistent_text.on_load_update.handlers.add(persistent_text.send_object)
  135.     fw = FileWatcher(persistent_text.filename)
  136.     fw.on_update.handlers.add(persistent_text.load_update)
  137.     udpnode.start()
  138.     fw.start()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement