View difference between Paste ID: 4w2PZZgL and 4RVCT48Y
SHOW: | | - or go back to the newest paste.
1
import udp2p, os, logging, pickle, threading, hashlib
2
3
class IncomingUpdate:
4
	def __init__(self,mtime,txt):
5
		self.modified = mtime
6
		self.body = txt
7
8
class UpdateRequest:
9
	def __init__(self,mtime):
10
		self.modified = mtime
11
12
class PersistentTextNode:
13
	filename = "text.txt"
14
	MAGIC = b"PICKLE"
15
	LOG_FORMAT = '[{name:>10}]{levelname:>8}: {message}'
16
	def __init__(self):
17
		self.last_checked = -1
18-
		self.known_text = ""
18+
		self.known_text = None
19
		self.log = logging.getLogger("persistext")
20
		self.log_handler = logging.StreamHandler()
21
		self.log_formatter = logging.Formatter(self.LOG_FORMAT,style='{')
22
		self.log_handler.setFormatter(self.log_formatter)
23
		self.log.addHandler(self.log_handler)
24
		self.log.setLevel(logging.INFO)
25
		self.on_load_update = udp2p.Event()
26
		self.on_receive_update = udp2p.Event()
27
		self.on_received_pickle = udp2p.Event()
28
		self.on_need_update = udp2p.Event()
29
		self.on_outgoing_update = udp2p.Event()
30
		self.on_merge = udp2p.Event()
31
		self.on_store_update = udp2p.Event()
32
		self.send_datagram = udp2p.Event()
33
		self.on_received_pickle.handlers.add(self.handle_pickle)
34
		self.on_receive_update.handlers.add(self.store_update)
35
		self.on_need_update.handlers.add(self.send_object)
36
		self.on_outgoing_update.handlers.add(self.send_object)
37
	def load_update(self):
38
		# get modified time of text file.
39
		original_hash = None
40
		if self.known_text is not None:
41-
		original_hash = n.digest()
41+
			n = hashlib.md5()
42-
		with open(self.filename,'r') as f:
42+
			n.update(self.known_text)
43
			original_hash = n.digest()
44
		with open(self.filename,'rb') as f:
45
			self.known_text = f.read()
46
		n = hashlib.md5()
47
		n.update(self.known_text)
48-
			self.log.info("Successfully updated text ({} @ {:20})".format(self.last_checked,self.known_text))
48+
49
			self.last_checked = os.path.getmtime(self.filename)
50
			self.log.info("Successfully updated text ({} @ {})".format(self.last_checked,self.known_text[:20]))
51-
		with open(self.filename,'w') as f:
51+
52
	def store_update(self,incoming_update):
53
		with open(self.filename,'wb') as f:
54-
		self.log.info("Successfully stored text ({:20X} @ {:20})".format(incoming_update.modified,incoming_update.body))
54+
55
		os.utime(self.filename,(os.path.getatime(self.filename),incoming_update.modified))
56
		self.log.info("Successfully stored text ({} @ {})".format(incoming_update.modified,incoming_update.body[:20]))
57
		self.on_store_update(self,incoming_update)
58
	def receive_update(self,incoming_update):
59
		self.known_text = incoming_update.body
60
		self.last_checked = incoming_update.modified
61
		self.log.info("Received update to {}".format(incoming_update.modified))
62
		self.on_receive_update(incoming_update)
63
	def handle_datagram(self,dgram):
64
		if dgram.data[:len(self.MAGIC)]==self.MAGIC:
65
			d = pickle.loads(dgram.data[len(self.MAGIC):])
66
			self.on_received_pickle(d,dgram)
67
	def handle_pickle(self,obj,dgram):
68
		self.log.info("Received {} object. ( 0x{:X} )".format(type(obj).__name__,id(obj)))
69
		if isinstance(obj,IncomingUpdate):
70
			if obj.modified>self.last_checked:
71
				self.receive_update(obj)
72
			elif obj.modified<self.last_checked:
73
				if self.should_merge(obj,dgram):
74
					self.log.info("Attempting merge to {}".format(obj.modified))
75
					try:
76
						self.merge(obj)
77
					except Exception as e:
78
						self.log.exception("Error while merging",exc_info=e)
79
				else:
80
					self.log.info("Not merging to {}".format(obj.modified))
81
		elif isinstance(obj,UpdateRequest):
82
			# the other person was offline
83
			if obj.modified<self.last_checked:
84
				self.send_update()
85
			elif obj.modified>self.last_checked:
86
				self.request_update()
87
	def should_merge(self,update,dgram):
88
		return False
89
	def merge(self,update):
90
		# merge the previous update
91
		# with current info.
92
		assert False
93
		self.on_merge(update)
94
	def send_object(self,obj):
95
		bts = pickle.dumps(obj)
96
		self.log.info("Sending {} object 0x{:X}".format(type(obj).__name__,id(obj)))
97
		self.send_datagram(udp2p.Datagram(self.MAGIC + bts))
98
	def send_update(self):
99
		update = IncomingUpdate(self.last_checked,self.known_text)
100
		self.log.info("Sending update {}".format(update.modified))
101
		self.on_outgoing_update(update)
102
	def request_update(self):
103
		request = UpdateRequest(self.last_checked)
104
		self.log.info("Requesting update since {}".format(request.modified))
105
		self.on_need_update(request)
106
	def listen_to(self,udpnode):
107
		udpnode.on_complete_datagram.handlers.add(self.handle_datagram)
108
		self.send_datagram.handlers.add(udpnode.send)
109
110
class FileWatcher(threading.Thread):
111
	FREQUENCY = 5#(seconds)
112
	def __init__(self,filename):
113
		threading.Thread.__init__(self,daemon=True)
114
		self.last_checked = -1
115
		self.timer = threading.Event()
116
		self.filename = filename
117
		self.on_update = udp2p.Event()
118
	def run(self):
119
		while True:
120
			mt = os.path.getmtime(self.filename)
121
			if mt >self.last_checked:
122
				self.last_checked = mt
123
				self.on_update()
124
			self.timer.wait(self.FREQUENCY)
125
126
if __name__=="__main__":
127
	FILENAME = 'text.txt'
128
	MCAST = '224.0.0.0'
129
	PORTNMBR = 500
130
	udpnode = udp2p.UDP2P(MCAST,PORTNMBR)
131
	persistent_text = PersistentTextNode()
132
	persistent_text.filename = FILENAME
133
	persistent_text.listen_to(udpnode)
134-
	fw.on_update.handlers.add(persistent_text.send_update)
134+
	persistent_text.on_load_update.handlers.add(persistent_text.send_object)
135
	fw = FileWatcher(persistent_text.filename)
136
	fw.on_update.handlers.add(persistent_text.load_update)
137
	udpnode.start()
138
	fw.start()