Advertisement
Guest User

Untitled

a guest
Aug 21st, 2019
160
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 1.70 KB | None | 0 0
  1. import io
  2. import csv
  3. import zlib
  4. import boto3
  5. from tqdm.auto import tqdm
  6.  
  7. s3 = boto3.resource('s3')
  8.  
  9. def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
  10. """
  11. Lets you use an iterable (e.g. a generator) that yields bytestrings as a read-only
  12. input stream.
  13.  
  14. The stream implements Python 3's newer I/O API (available in Python 2's io module).
  15. For efficiency, the stream is buffered.
  16. """
  17. class IterStream(io.RawIOBase):
  18. def __init__(self):
  19. self.leftover = None
  20. def readable(self):
  21. return True
  22. def readinto(self, b):
  23. try:
  24. l = len(b) # We're supposed to return at most this much
  25. chunk = self.leftover or next(iterable)
  26. output, self.leftover = chunk[:l], chunk[l:]
  27. b[:len(output)] = output
  28. return len(output)
  29. except StopIteration:
  30. return 0 # indicate EOF
  31. return io.BufferedReader(IterStream(), buffer_size=buffer_size)
  32.  
  33. def stream_s3_obj(obj):
  34. if 'deflate' in obj.key:
  35. d = zlib.decompressobj()
  36. byte_stream = iterable_to_stream(d.decompress(chunk) for chunk in obj.get()['Body'].iter_chunks())
  37. else:
  38. byte_stream = iterable_to_stream(obj.get()['Body'].iter_chunks())
  39. return io.TextIOWrapper(byte_stream, encoding='utf-8', newline='\n')
  40.  
  41. obj = s3.Object('wp-dw-datasets', 'wp_identity_newsletters/000000_0.deflate')
  42.  
  43. unique_newsletters = set()
  44. for row in tqdm(csv.reader(stream_s3_obj(obj), delimiter='\t')):
  45. name = row[2]
  46. if name not in unique_newsletters:
  47. unique_newsletters.add(name)
  48. print(f"added {name}. {len(unique_newsletters)} unique newsletters found thus far")
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement