SHOW:
|
|
- or go back to the newest paste.
1 | import udp2p, os, logging, pickle, threading, hashlib | |
2 | ||
3 | class IncomingUpdate: | |
4 | def __init__(self,mtime,txt): | |
5 | self.modified = mtime | |
6 | self.body = txt | |
7 | ||
8 | class UpdateRequest: | |
9 | def __init__(self,mtime): | |
10 | self.modified = mtime | |
11 | ||
12 | class PersistentTextNode: | |
13 | filename = "text.txt" | |
14 | MAGIC = b"PICKLE" | |
15 | LOG_FORMAT = '[{name:>10}]{levelname:>8}: {message}' | |
16 | def __init__(self): | |
17 | self.last_checked = -1 | |
18 | - | self.known_text = "" |
18 | + | self.known_text = None |
19 | self.log = logging.getLogger("persistext") | |
20 | self.log_handler = logging.StreamHandler() | |
21 | self.log_formatter = logging.Formatter(self.LOG_FORMAT,style='{') | |
22 | self.log_handler.setFormatter(self.log_formatter) | |
23 | self.log.addHandler(self.log_handler) | |
24 | self.log.setLevel(logging.INFO) | |
25 | self.on_load_update = udp2p.Event() | |
26 | self.on_receive_update = udp2p.Event() | |
27 | self.on_received_pickle = udp2p.Event() | |
28 | self.on_need_update = udp2p.Event() | |
29 | self.on_outgoing_update = udp2p.Event() | |
30 | self.on_merge = udp2p.Event() | |
31 | self.on_store_update = udp2p.Event() | |
32 | self.send_datagram = udp2p.Event() | |
33 | self.on_received_pickle.handlers.add(self.handle_pickle) | |
34 | self.on_receive_update.handlers.add(self.store_update) | |
35 | self.on_need_update.handlers.add(self.send_object) | |
36 | self.on_outgoing_update.handlers.add(self.send_object) | |
37 | def load_update(self): | |
38 | # get modified time of text file. | |
39 | original_hash = None | |
40 | if self.known_text is not None: | |
41 | - | original_hash = n.digest() |
41 | + | n = hashlib.md5() |
42 | - | with open(self.filename,'r') as f: |
42 | + | n.update(self.known_text) |
43 | original_hash = n.digest() | |
44 | with open(self.filename,'rb') as f: | |
45 | self.known_text = f.read() | |
46 | n = hashlib.md5() | |
47 | n.update(self.known_text) | |
48 | - | self.log.info("Successfully updated text ({} @ {:20})".format(self.last_checked,self.known_text)) |
48 | + | |
49 | self.last_checked = os.path.getmtime(self.filename) | |
50 | self.log.info("Successfully updated text ({} @ {})".format(self.last_checked,self.known_text[:20])) | |
51 | - | with open(self.filename,'w') as f: |
51 | + | |
52 | def store_update(self,incoming_update): | |
53 | with open(self.filename,'wb') as f: | |
54 | - | self.log.info("Successfully stored text ({:20X} @ {:20})".format(incoming_update.modified,incoming_update.body)) |
54 | + | |
55 | os.utime(self.filename,(os.path.getatime(self.filename),incoming_update.modified)) | |
56 | self.log.info("Successfully stored text ({} @ {})".format(incoming_update.modified,incoming_update.body[:20])) | |
57 | self.on_store_update(self,incoming_update) | |
58 | def receive_update(self,incoming_update): | |
59 | self.known_text = incoming_update.body | |
60 | self.last_checked = incoming_update.modified | |
61 | self.log.info("Received update to {}".format(incoming_update.modified)) | |
62 | self.on_receive_update(incoming_update) | |
63 | def handle_datagram(self,dgram): | |
64 | if dgram.data[:len(self.MAGIC)]==self.MAGIC: | |
65 | d = pickle.loads(dgram.data[len(self.MAGIC):]) | |
66 | self.on_received_pickle(d,dgram) | |
67 | def handle_pickle(self,obj,dgram): | |
68 | self.log.info("Received {} object. ( 0x{:X} )".format(type(obj).__name__,id(obj))) | |
69 | if isinstance(obj,IncomingUpdate): | |
70 | if obj.modified>self.last_checked: | |
71 | self.receive_update(obj) | |
72 | elif obj.modified<self.last_checked: | |
73 | if self.should_merge(obj,dgram): | |
74 | self.log.info("Attempting merge to {}".format(obj.modified)) | |
75 | try: | |
76 | self.merge(obj) | |
77 | except Exception as e: | |
78 | self.log.exception("Error while merging",exc_info=e) | |
79 | else: | |
80 | self.log.info("Not merging to {}".format(obj.modified)) | |
81 | elif isinstance(obj,UpdateRequest): | |
82 | # the other person was offline | |
83 | if obj.modified<self.last_checked: | |
84 | self.send_update() | |
85 | elif obj.modified>self.last_checked: | |
86 | self.request_update() | |
87 | def should_merge(self,update,dgram): | |
88 | return False | |
89 | def merge(self,update): | |
90 | # merge the previous update | |
91 | # with current info. | |
92 | assert False | |
93 | self.on_merge(update) | |
94 | def send_object(self,obj): | |
95 | bts = pickle.dumps(obj) | |
96 | self.log.info("Sending {} object 0x{:X}".format(type(obj).__name__,id(obj))) | |
97 | self.send_datagram(udp2p.Datagram(self.MAGIC + bts)) | |
98 | def send_update(self): | |
99 | update = IncomingUpdate(self.last_checked,self.known_text) | |
100 | self.log.info("Sending update {}".format(update.modified)) | |
101 | self.on_outgoing_update(update) | |
102 | def request_update(self): | |
103 | request = UpdateRequest(self.last_checked) | |
104 | self.log.info("Requesting update since {}".format(request.modified)) | |
105 | self.on_need_update(request) | |
106 | def listen_to(self,udpnode): | |
107 | udpnode.on_complete_datagram.handlers.add(self.handle_datagram) | |
108 | self.send_datagram.handlers.add(udpnode.send) | |
109 | ||
110 | class FileWatcher(threading.Thread): | |
111 | FREQUENCY = 5#(seconds) | |
112 | def __init__(self,filename): | |
113 | threading.Thread.__init__(self,daemon=True) | |
114 | self.last_checked = -1 | |
115 | self.timer = threading.Event() | |
116 | self.filename = filename | |
117 | self.on_update = udp2p.Event() | |
118 | def run(self): | |
119 | while True: | |
120 | mt = os.path.getmtime(self.filename) | |
121 | if mt >self.last_checked: | |
122 | self.last_checked = mt | |
123 | self.on_update() | |
124 | self.timer.wait(self.FREQUENCY) | |
125 | ||
126 | if __name__=="__main__": | |
127 | FILENAME = 'text.txt' | |
128 | MCAST = '224.0.0.0' | |
129 | PORTNMBR = 500 | |
130 | udpnode = udp2p.UDP2P(MCAST,PORTNMBR) | |
131 | persistent_text = PersistentTextNode() | |
132 | persistent_text.filename = FILENAME | |
133 | persistent_text.listen_to(udpnode) | |
134 | - | fw.on_update.handlers.add(persistent_text.send_update) |
134 | + | persistent_text.on_load_update.handlers.add(persistent_text.send_object) |
135 | fw = FileWatcher(persistent_text.filename) | |
136 | fw.on_update.handlers.add(persistent_text.load_update) | |
137 | udpnode.start() | |
138 | fw.start() |