Guest User

Untitled

a guest
Jan 18th, 2018
103
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.56 KB | None | 0 0
  1. import apache_beam as beam
  2. import re
  3. import json
  4.  
  5.  
  6. class JsonFileSource(beam.io.filebasedsource.FileBasedSource):
  7. """A Beam source for JSON that emits all top-level json objects.
  8.  
  9. Note that the beginning of a top-level json object is assumed to begin on a
  10. new line with no leading spaces, possibly preceded by a square bracket ([)
  11. or comma (,); and possibly with spaces between the bracket/comma and the
  12. beginning of the object.
  13.  
  14. A custom source is necessary to enable parallelization of processing for
  15. the elements. The existing TextFileSource emits lines, but the Meteorite
  16. Landing data is a giant json array, where elements span multiple lines.
  17. """
  18. JSON_OBJECT_START = re.compile(r'^([\[,] *)?{')
  19.  
  20. def read_records(self, file_name, offset_range_tracker):
  21. return self.obj_iterator(file_name, offset_range_tracker)
  22.  
  23. @staticmethod
  24. def _iterable_gcs(f):
  25. """Create a generator for a not-quite-filelike object.
  26.  
  27. FileBasedSource.open_file returns an object that doesn't implement the
  28. file interface completely, so we need this utility function in order to
  29. iterate over lines, while keeping the .tell() accurate.
  30. """
  31. while True:
  32. line = f.readline()
  33. if not line:
  34. break
  35. yield line
  36.  
  37. def obj_iterator(self, file_name, offset_range_tracker):
  38. with self.open_file(file_name) as f:
  39. f.seek(offset_range_tracker.start_position() or 0)
  40.  
  41. iterable_f = self._iterable_gcs(f)
  42.  
  43. while True:
  44. current_pos = f.tell()
  45. # First, look for the start of an object
  46. # If we hit the end of the range allotted to us without finding
  47. # an element, stop.
  48. for line in iterable_f:
  49. if self.JSON_OBJECT_START.match(line):
  50. if not offset_range_tracker.try_claim(current_pos):
  51. raise StopIteration()
  52. content = [line.lstrip('[, ')]
  53. break
  54. current_pos = f.tell()
  55. else:
  56. # We ran off the end of the file without finding a new
  57. # object. This means we're done.
  58. raise StopIteration()
  59.  
  60. # We're in an object. Collect its contents and emit it.
  61. for line in iterable_f:
  62. content.append(line)
  63. if line.startswith('}'):
  64. yield json.loads(''.join(content))
  65. break
Add Comment
Please, Sign In to add comment