Pastebin launched a little side project called VERYVIRAL.com, check it out ;-) Want more features on Pastebin? Sign Up, it's FREE!
Guest

BufferingSMTPHandler

By: a guest on Dec 5th, 2011  |  syntax: Python  |  size: 6.46 KB  |  views: 232  |  expires: Never
download  |  raw  |  embed  |  report abuse  |  print
Text below is selected. Please press Ctrl+C to copy to your clipboard. (⌘+C on Mac)
  1. import datetime, logging, smtplib, threading, time, Queue
  2.  
  3. class BufferingSMTPHandler(logging.Handler):
  4.     """Set up a buffering SMTP logging handler."""
  5.    
  6.     # Configurable parameters
  7.     _POLL_INTERVAL = 5       # Interval between checks for sets of new records.
  8.     _POLL_DURATION_MAX = 10  # If a record is available, max time to continue
  9.     #                          polling for more records.
  10.     _SEND_INTERVAL = 2 * 60  # Interval between sends.
  11.    
  12.     # Setup class environment
  13.     _Q = Queue.Queue()
  14.     _LOCK = threading.Lock()
  15.     _LAST_SEND_TIME = float('-inf')
  16.    
  17.     def state(self):
  18.         """Return a dict containing keys and values providing information about
  19.        the state of the handler."""
  20.        
  21.         #time_now = time.time()
  22.         time_now = datetime.datetime.utcnow()
  23.        
  24.         # Calculate time since last email
  25.         if self._LAST_SEND_TIME != float('-inf'):
  26.             time_of_last =  datetime.datetime.utcfromtimestamp(self._LAST_SEND_TIME)
  27.             time_since_last = time_now - time_of_last
  28.         else:
  29.             time_since_last = '(none sent yet)'
  30.            
  31.         # Time to next earliest possible email
  32.         if self._LAST_SEND_TIME != float('-inf'):
  33.             time_of_next = datetime.datetime.utcfromtimestamp(self._LAST_SEND_TIME+self._SEND_INTERVAL)
  34.             time_of_next = max(time_now, time_of_next)
  35.             time_until_next = time_of_next - time_now
  36.         else:
  37.             time_until_next = time_now - time_now
  38.        
  39.         return {'Total number of unprocessed errors': self._Q.qsize() + self._q.qsize(),
  40.                 'Intervals': 'Poll: {}s, Send: {}s'.format(self._POLL_INTERVAL, self._SEND_INTERVAL),
  41.                 'Poll duration max': '{}s'.format(self._POLL_DURATION_MAX),
  42.                 'Time since last email': time_since_last,
  43.                 'Time to next earliest possible email': 'at least {}'.format(time_until_next),
  44.                     # This simplification doesn't account for _POLL_INTERVAL and _POLL_DURATION_MAX, etc.
  45.                 'Recipients': self._header['toaddrs_str'],
  46.                 }
  47.    
  48.     def __init__(self, fromaddr, toaddrs, subject):
  49.        
  50.         # Setup instance environment
  51.         self._active = True
  52.         self._q = Queue.Queue()  # this is different from self._Q
  53.        
  54.         # Construct email header
  55.         self._header = {'fromaddr': fromaddr,
  56.                         'toaddrs': toaddrs,
  57.                         'toaddrs_str': ','.join(toaddrs),
  58.                         'subject': subject,
  59.                         }
  60.         self._header['header'] = 'From: {fromaddr}\r\nTo: {toaddrs_str}\r\nSubject: {subject}\r\n\r\n'.format(**self._header)
  61.        
  62.         # Start main buffer-processor thread
  63.         thread_name = '{}Thread'.format(self.__class__.__name__)
  64.         # Note: The class is intentionally not inherited from threading.Thread,
  65.         #       as doing so was found to result in the target thread not being
  66.         #       named correctly, possibly due to a namespace collision.
  67.         thread = threading.Thread(target=self.run, name=thread_name)
  68.         thread.daemon = True
  69.         thread.start()
  70.        
  71.         super(BufferingSMTPHandler, self).__init__()
  72.    
  73.     def close(self):
  74.         """Process some remaining records."""
  75.        
  76.         super(BufferingSMTPHandler, self).close()
  77.        
  78.         self._active = False
  79.         self._POLL_DURATION_MAX = min(0.25, self._POLL_DURATION_MAX)
  80.             # no need to set self.__class__._POLL_DURATION_MAX
  81.         self._process_recordset()
  82.    
  83.     def emit(self, record):
  84.         """Queue a record into the class queue so it can be emitted
  85.        collectively."""
  86.        
  87.         # This method can be called by various threads.
  88.         self._Q.put(self.format(record))
  89.  
  90.     def run(self):
  91.         """Periodically flush the buffer."""
  92.        
  93.         while self._active:
  94.             with self._LOCK: # protects _LAST_SEND_TIME and _q
  95.                 next_send_time = self._LAST_SEND_TIME + self._SEND_INTERVAL
  96.                 if time.time() > next_send_time:
  97.                     self._process_recordset()
  98.                     sleep_time = self._POLL_INTERVAL
  99.                 else:
  100.                     # assert (next_send_time != -inf)
  101.                     sleep_time = max(next_send_time - time.time(), 0)
  102.             time.sleep(sleep_time)
  103.                
  104.     def _process_recordset(self):
  105.         """Process a set of records buffered in class queue."""
  106.        
  107.         try:
  108.             self._move_recordset_from_Q_to_q()
  109.             if not self._q.empty():
  110.                 self._send_records_from_q()
  111.                 self.__class__._LAST_SEND_TIME = time.time()
  112.         except (KeyboardInterrupt, SystemExit):
  113.             pass
  114.        
  115.     def _move_recordset_from_Q_to_q(self):
  116.         """Move a set of records from class queue to instance queue."""
  117.        
  118.         deadline = time.time() + self._POLL_DURATION_MAX
  119.         while time.time() < deadline:
  120.             try:
  121.                 self._q.put(self._Q.get_nowait())
  122.                 self._Q.task_done()
  123.             except Queue.Empty:
  124.                 if self._q.empty():
  125.                     break
  126.                 time.sleep(0.1)
  127.    
  128.     def _send_records_from_q(self):
  129.         """Send records that are in instance queue."""
  130.        
  131.         records = []
  132.         try:
  133.            
  134.             # Get formatted records from instance queue
  135.             while True:
  136.                 records.append(self._q.get_nowait())
  137.                 self._q.task_done()
  138.                
  139.         except (Queue.Empty, KeyboardInterrupt, SystemExit):
  140.             pass
  141.         finally:
  142.            
  143.             # Send formatted records from instance queue
  144.             if records:
  145.                
  146.                 body =  'Included messages: {}\r\n'.format(len(records))
  147.                
  148.                 num_pending_messages = self._Q.qsize() + self._q.qsize()
  149.                 if num_pending_messages > 0:
  150.                     body += 'Pending messages:  {}\r\n'.format(num_pending_messages)
  151.                
  152.                 # Add main content of message body
  153.                 body += '\r\n'
  154.                 body += '\r\n\r\n'.join(records)
  155.                 msg = self._header['header'] + body
  156.                
  157.                 smtp = smtplib.SMTP()
  158.                 smtp.connect()
  159.                 smtp.sendmail(self._header['fromaddr'], self._header['toaddrs'], msg)
  160.                 smtp.quit()
  161.  
  162.