Advertisement
Not a member of Pastebin yet?
Sign up —
it unlocks many cool features!
- import io
- import csv
- import zlib
- import boto3
- from tqdm.auto import tqdm
# Shared S3 service resource used below to address bucket objects.
s3 = boto3.resource(service_name='s3')
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """Wrap an iterable of bytestrings in a buffered, read-only binary stream.

    Lets you use an iterable (e.g. a list or a generator) that yields
    bytes-like chunks as a file-like object, for example as the input of
    ``io.TextIOWrapper``. Empty chunks yielded by the iterable are skipped,
    so they are never mistaken for end-of-file.

    Args:
        iterable: Any iterable yielding bytes-like chunks.
        buffer_size: Buffer size, in bytes, of the returned ``io.BufferedReader``.

    Returns:
        An ``io.BufferedReader`` whose content is the concatenation of the chunks.
    """
    # iter() makes plain iterables (lists, tuples, ...) work, not only iterators.
    iterator = iter(iterable)

    class IterStream(io.RawIOBase):
        def __init__(self):
            super().__init__()
            self._chunk = b""  # chunk currently being consumed
            self._pos = 0      # read offset into self._chunk

        def readable(self):
            return True

        def readinto(self, b):
            # Advance past exhausted and empty chunks. Returning 0 here would
            # tell BufferedReader the stream hit EOF, so only do that once the
            # iterator is truly exhausted.
            while self._pos >= len(self._chunk):
                try:
                    self._chunk = next(iterator)
                except StopIteration:
                    return 0  # indicate EOF
                self._pos = 0
            # Copy at most len(b) bytes. Keep an offset instead of re-slicing the
            # remainder, so large chunks are not copied over and over.
            size = min(len(b), len(self._chunk) - self._pos)
            b[:size] = self._chunk[self._pos:self._pos + size]
            self._pos += size
            return size

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
def stream_s3_obj(obj):
    """Open an S3 object as a streaming UTF-8 text file.

    Objects whose key contains ``'deflate'`` are decompressed on the fly with
    ``zlib``. Everything else is streamed as-is.

    Args:
        obj: A boto3 ``s3.Object``. It is read through ``obj.get()['Body']``.

    Returns:
        An ``io.TextIOWrapper`` decoding the (decompressed) bytes as UTF-8.
        Lines are split on ``'\\n'`` only.
    """
    # NOTE(review): the underlying StreamingBody is not closed explicitly here.
    chunks = obj.get()['Body'].iter_chunks()
    if 'deflate' in obj.key:
        chunks = _inflate_chunks(chunks)
    byte_stream = iterable_to_stream(chunks)
    return io.TextIOWrapper(byte_stream, encoding='utf-8', newline='\n')


def _inflate_chunks(chunks):
    """Yield the non-empty zlib-decompressed pieces of *chunks*, plus the final flush."""
    decompressor = zlib.decompressobj()
    for chunk in chunks:
        data = decompressor.decompress(chunk)
        # decompress() may return b"" for some inputs. Drop those pieces so
        # downstream readers never see an empty read and take it for EOF.
        if data:
            yield data
    # Emit whatever output the decompressor still holds once input is exhausted.
    tail = decompressor.flush()
    if tail:
        yield tail
# Scan one tab-separated S3 object and report each distinct value of the
# newsletter-name column as it is first seen.
BUCKET = 'wp-dw-datasets'
KEY = 'wp_identity_newsletters/000000_0.deflate'
NAME_COLUMN = 2  # zero-based index of the newsletter name in each row

obj = s3.Object(BUCKET, KEY)
unique_newsletters = set()
skipped_rows = 0
# `with` closes the text stream (and the byte stream it wraps) even on error.
with stream_s3_obj(obj) as text_stream:
    for row in tqdm(csv.reader(text_stream, delimiter='\t')):
        if len(row) <= NAME_COLUMN:
            # Blank or truncated line: count it rather than crash with IndexError.
            skipped_rows += 1
            continue
        name = row[NAME_COLUMN]
        if name not in unique_newsletters:
            unique_newsletters.add(name)
            # tqdm.write prints above the progress bar instead of garbling it.
            tqdm.write(f"added {name}. {len(unique_newsletters)} unique newsletters found thus far")
if skipped_rows:
    tqdm.write(f"skipped {skipped_rows} rows with fewer than {NAME_COLUMN + 1} fields")
Advertisement
Add Comment
Please sign in to add a comment.
Advertisement