Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- #!/usr/bin/env python3
- import boto3
- from datetime import datetime
- import logging
- import socket
- from time import time, sleep
- logger = logging.getLogger(__name__)
- log_format = '%(asctime)s %(name)s %(levelname)5s: %(message)s'
- def main():
- from logging import DEBUG, INFO, Formatter
- logging.getLogger('').setLevel(DEBUG)
- logging.getLogger('botocore').setLevel(INFO)
- sh = logging.StreamHandler()
- sh.setFormatter(Formatter(log_format))
- sh.setLevel(DEBUG)
- logging.getLogger('').addHandler(sh)
- ah = AWSHandler()
- ah.setFormatter(Formatter(log_format))
- ah.setLevel(DEBUG)
- logging.getLogger('').addHandler(ah)
- try:
- for i in range(1000):
- logger.debug('debug %s', i)
- logger.info('info %s', i)
- if i % 20 == 0:
- ah.flush()
- sleep(.3)
- finally:
- ah.flush()
- class AWSHandler (logging.Handler):
- def __init__(self, session=None, logs_region=None, s3_region=None):
- super().__init__()
- session = session or boto3
- self.logs_client = session.client('logs', region_name=logs_region or 'eu-central-1')
- self.s3_client = session.client('s3', region_name=s3_region or 'eu-west-1')
- self.stream_name = '{date}-{host}'.format(
- date=datetime.utcnow().strftime('%Y%m%dT%H%M%SZ'),
- host=socket.getfqdn())
- self.logs_client.create_log_stream(
- logGroupName='demo',
- logStreamName=self.stream_name)
- self.seq_token = None
- self.recursion_guard = False
- self.archive_content = b''
- self.messages_to_flush = []
- def emit(self, record):
- if self.recursion_guard:
- return
- self.recursion_guard = True
- try:
- msg = self.format(record)
- self.messages_to_flush.append(msg)
- r = self.logs_client.put_log_events(
- logGroupName='demo',
- logStreamName=self.stream_name,
- logEvents=[
- {
- 'timestamp': int(time() * 1000),
- 'message': str(msg),
- },
- ],
- # sequenceToken cannot be None, it must be really missing for the first message...
- **({'sequenceToken': self.seq_token} if self.seq_token else {}))
- if r.get('rejectedLogEventsInfo'):
- raise Exception('Rejected: {!r}'.format(r))
- self.seq_token = r['nextSequenceToken']
- finally:
- self.recursion_guard = False
- def flush(self):
- if not self.messages_to_flush:
- return
- self.recursion_guard = True
- try:
- data = self.archive_content
- data += ''.join(msg + '\n' for msg in self.messages_to_flush).encode()
- self.s3_client.put_object(
- #ACL='private',
- ACL='public-read',
- Body=data,
- Bucket='example-bucket',
- ContentType='text/plain',
- Key='test/{name}.log'.format(name=self.stream_name),
- StorageClass='STANDARD_IA',
- )
- self.archive_content = data
- self.messages_to_flush = []
- finally:
- self.recursion_guard = False
- if __name__ == '__main__':
- main()
Add Comment
Please, Sign In to add comment