Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
from datetime import datetime
from queue import PriorityQueue
def generate_sid():
    """Yield an endless sequence of session ids: "1", "2", "3", ...

    Each id is the decimal string form of a counter that starts at 1 and
    grows by one for every value produced. Each call returns an independent
    generator with its own counter.
    """
    # NOTE(review): the ids are sequential. If they are handed to clients as
    # authentication tokens they would be guessable -- confirm how callers
    # use them.
    counter = 0
    while True:
        counter += 1
        yield str(counter)
class SessionManager:
    """Issue session ids and expire sessions after a period of inactivity.

    A session maps a session id (``sid``) to the caller-supplied ``id`` and
    the time the session was last created or successfully checked. Every
    creation or successful check also pushes a ``(timestamp, sid)`` entry
    onto a priority queue so that ``cleanup`` can visit the oldest activity
    first without scanning every session.

    All state below is stored on the class, so every ``SessionManager``
    instance shares the same sessions, queue and id generator.
    """

    # Idle time, in seconds, after which a session is considered expired.
    SESSION_TIMEOUT_SECONDS = 600

    # Shared (class-level) state; see the class docstring.
    __sid_generator = generate_sid()
    # sid -> (id, time of last activity)
    __sessions = {}
    # (time of activity, sid) entries, oldest first. A sid may appear several
    # times; only the entry matching the session's current timestamp matters.
    __sessions_queue = PriorityQueue()

    @classmethod
    def __is_expired(cls, last_seen, now):
        """Return True if more than the timeout elapsed between the two times."""
        # total_seconds() covers the whole interval. The ``seconds`` attribute
        # is only the sub-day component, so it would wrap around for idle
        # periods of a day or more and misreport negative intervals.
        return (now - last_seen).total_seconds() > cls.SESSION_TIMEOUT_SECONDS

    def createSession(self, id):
        """Register a new session for ``id`` and return its session id.

        Args:
            id: Value to associate with the session; it is returned unchanged
                by ``checkSession`` while the session is alive.

        Returns:
            The newly allocated session id (a string from ``generate_sid``).
        """
        sid = next(self.__sid_generator)
        now = datetime.now()
        self.__sessions[sid] = (id, now)
        self.__sessions_queue.put((now, sid))
        return sid

    def checkSession(self, sid: str):
        """Validate ``sid`` and refresh its idle timer.

        Args:
            sid: Session id previously returned by ``createSession``.

        Returns:
            The ``id`` the session was created with, or ``None`` if the
            session is unknown or has been idle longer than the timeout. An
            expired session is removed as a side effect.
        """
        session = self.__sessions.get(sid)
        if session is None:
            return None

        owner_id, last_seen = session
        now = datetime.now()
        if self.__is_expired(last_seen, now):
            del self.__sessions[sid]
            return None

        # Still alive: record the new activity time and queue a fresh entry.
        # The older queue entry for this sid is left in place and is ignored
        # by cleanup() because the stored timestamp is now newer.
        self.__sessions[sid] = (owner_id, now)
        self.__sessions_queue.put((now, sid))
        return owner_id

    def cleanup(self) -> None:
        """Remove every session that has been idle longer than the timeout.

        Queue entries are consumed oldest-first. Processing stops at the
        first entry that is still within the timeout, which is pushed back so
        it is seen again by a later call.
        """
        now = datetime.now()
        pending = self.__sessions_queue
        while not pending.empty():
            queued_at, sid = pending.get()
            if not self.__is_expired(queued_at, now):
                # Everything left in the queue is at least this recent.
                pending.put((queued_at, sid))
                return
            session = self.__sessions.get(sid)
            # The entry may be stale: the session could have been refreshed
            # (newer timestamp stored) or already removed. Only delete when
            # the session's own last-activity time has expired.
            if session is not None and self.__is_expired(session[1], now):
                del self.__sessions[sid]
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement