Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import datetime, logging, smtplib, threading, time, Queue
- class BufferingSMTPHandler(logging.Handler):
- """Set up a buffering SMTP logging handler."""
- # Configurable parameters
- _POLL_INTERVAL = 5 # Interval between checks for sets of new records.
- _POLL_DURATION_MAX = 10 # If a record is available, max time to continue
- # polling for more records.
- _SEND_INTERVAL = 2 * 60 # Interval between sends.
- # Setup class environment
- _Q = Queue.Queue()
- _LOCK = threading.Lock()
- _LAST_SEND_TIME = float('-inf')
- def state(self):
- """Return a dict containing keys and values providing information about
- the state of the handler."""
- #time_now = time.time()
- time_now = datetime.datetime.utcnow()
- # Calculate time since last email
- if self._LAST_SEND_TIME != float('-inf'):
- time_of_last = datetime.datetime.utcfromtimestamp(self._LAST_SEND_TIME)
- time_since_last = time_now - time_of_last
- else:
- time_since_last = '(none sent yet)'
- # Time to next earliest possible email
- if self._LAST_SEND_TIME != float('-inf'):
- time_of_next = datetime.datetime.utcfromtimestamp(self._LAST_SEND_TIME+self._SEND_INTERVAL)
- time_of_next = max(time_now, time_of_next)
- time_until_next = time_of_next - time_now
- else:
- time_until_next = time_now - time_now
- return {'Total number of unprocessed errors': self._Q.qsize() + self._q.qsize(),
- 'Intervals': 'Poll: {}s, Send: {}s'.format(self._POLL_INTERVAL, self._SEND_INTERVAL),
- 'Poll duration max': '{}s'.format(self._POLL_DURATION_MAX),
- 'Time since last email': time_since_last,
- 'Time to next earliest possible email': 'at least {}'.format(time_until_next),
- # This simplification doesn't account for _POLL_INTERVAL and _POLL_DURATION_MAX, etc.
- 'Recipients': self._header['toaddrs_str'],
- }
- def __init__(self, fromaddr, toaddrs, subject):
- # Setup instance environment
- self._active = True
- self._q = Queue.Queue() # this is different from self._Q
- # Construct email header
- self._header = {'fromaddr': fromaddr,
- 'toaddrs': toaddrs,
- 'toaddrs_str': ','.join(toaddrs),
- 'subject': subject,
- }
- self._header['header'] = 'From: {fromaddr}\r\nTo: {toaddrs_str}\r\nSubject: {subject}\r\n\r\n'.format(**self._header)
- # Start main buffer-processor thread
- thread_name = '{}Thread'.format(self.__class__.__name__)
- # Note: The class is intentionally not inherited from threading.Thread,
- # as doing so was found to result in the target thread not being
- # named correctly, possibly due to a namespace collision.
- thread = threading.Thread(target=self.run, name=thread_name)
- thread.daemon = True
- thread.start()
- super(BufferingSMTPHandler, self).__init__()
- def close(self):
- """Process some remaining records."""
- super(BufferingSMTPHandler, self).close()
- self._active = False
- self._POLL_DURATION_MAX = min(0.25, self._POLL_DURATION_MAX)
- # no need to set self.__class__._POLL_DURATION_MAX
- self._process_recordset()
- def emit(self, record):
- """Queue a record into the class queue so it can be emitted
- collectively."""
- # This method can be called by various threads.
- self._Q.put(self.format(record))
- def run(self):
- """Periodically flush the buffer."""
- while self._active:
- with self._LOCK: # protects _LAST_SEND_TIME and _q
- next_send_time = self._LAST_SEND_TIME + self._SEND_INTERVAL
- if time.time() > next_send_time:
- self._process_recordset()
- sleep_time = self._POLL_INTERVAL
- else:
- # assert (next_send_time != -inf)
- sleep_time = max(next_send_time - time.time(), 0)
- time.sleep(sleep_time)
- def _process_recordset(self):
- """Process a set of records buffered in class queue."""
- try:
- self._move_recordset_from_Q_to_q()
- if not self._q.empty():
- self._send_records_from_q()
- self.__class__._LAST_SEND_TIME = time.time()
- except (KeyboardInterrupt, SystemExit):
- pass
- def _move_recordset_from_Q_to_q(self):
- """Move a set of records from class queue to instance queue."""
- deadline = time.time() + self._POLL_DURATION_MAX
- while time.time() < deadline:
- try:
- self._q.put(self._Q.get_nowait())
- self._Q.task_done()
- except Queue.Empty:
- if self._q.empty():
- break
- time.sleep(0.1)
- def _send_records_from_q(self):
- """Send records that are in instance queue."""
- records = []
- try:
- # Get formatted records from instance queue
- while True:
- records.append(self._q.get_nowait())
- self._q.task_done()
- except (Queue.Empty, KeyboardInterrupt, SystemExit):
- pass
- finally:
- # Send formatted records from instance queue
- if records:
- body = 'Included messages: {}\r\n'.format(len(records))
- num_pending_messages = self._Q.qsize() + self._q.qsize()
- if num_pending_messages > 0:
- body += 'Pending messages: {}\r\n'.format(num_pending_messages)
- # Add main content of message body
- body += '\r\n'
- body += '\r\n\r\n'.join(records)
- msg = self._header['header'] + body
- smtp = smtplib.SMTP()
- smtp.connect()
- smtp.sendmail(self._header['fromaddr'], self._header['toaddrs'], msg)
- smtp.quit()
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement