Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import apache_beam as beam
- import re
- import json
class JsonFileSource(beam.io.filebasedsource.FileBasedSource):
    """A Beam source for JSON that emits all top-level json objects.

    Note that the beginning of a top-level json object is assumed to begin on
    a new line with no leading spaces, possibly preceded by a square bracket
    ([) or comma (,); and possibly with spaces between the bracket/comma and
    the beginning of the object. The end of a top-level object is assumed to
    be a line that starts with a closing brace (}).

    A custom source is necessary to enable parallelization of processing for
    the elements. The existing TextFileSource emits lines, but the Meteorite
    Landing data is a giant json array, where elements span multiple lines.
    """

    # Matches a line that opens a top-level object: an optional "[" or ","
    # (followed by optional spaces) and then "{", all anchored at column 0.
    JSON_OBJECT_START = re.compile(r'^([\[,] *)?{')

    def read_records(self, file_name, offset_range_tracker):
        """Return an iterator over the parsed top-level objects of a file.

        Args:
            file_name: name of the file to read, passed to ``open_file``.
            offset_range_tracker: tracker for the byte range to read; each
                object's starting offset is claimed through ``try_claim``.

        Returns:
            A generator yielding one ``json.loads`` result per object.
        """
        return self.obj_iterator(file_name, offset_range_tracker)

    @staticmethod
    def _iterable_gcs(f):
        """Create a generator for a not-quite-filelike object.

        FileBasedSource.open_file returns an object that doesn't implement
        the file interface completely, so we need this utility function in
        order to iterate over lines, while keeping the .tell() accurate.

        Lines are yielded as text: if ``readline()`` returns bytes (as a
        binary-mode file object does), they are decoded as UTF-8 so the
        text regex and string operations in ``obj_iterator`` apply. Decoding
        does not move the file position.
        """
        while True:
            line = f.readline()
            if not line:
                break
            if isinstance(line, bytes):
                line = line.decode('utf-8')
            yield line

    def obj_iterator(self, file_name, offset_range_tracker):
        """Yield each top-level json object found in the tracked range.

        Scans line by line for the start of an object, claims the offset of
        that line with the range tracker, then gathers lines up to and
        including the first one that starts with "}" and parses them.

        Args:
            file_name: name of the file to read, passed to ``open_file``.
            offset_range_tracker: supplies ``start_position()`` and decides,
                via ``try_claim``, whether an object belongs to this reader.

        Yields:
            The ``json.loads`` result for each complete object.

        Raises:
            ValueError: propagated from ``json.loads`` if the gathered lines
                are not valid JSON.
        """
        with self.open_file(file_name) as f:
            f.seek(offset_range_tracker.start_position() or 0)
            iterable_f = self._iterable_gcs(f)
            while True:
                # Offset of the line about to be read; this is the position
                # claimed if that line turns out to open an object.
                current_pos = f.tell()
                # First, look for the start of an object.
                # If we hit the end of the range allotted to us without
                # finding an element, stop.
                for line in iterable_f:
                    if self.JSON_OBJECT_START.match(line):
                        if not offset_range_tracker.try_claim(current_pos):
                            # Plain return ends the generator. Raising
                            # StopIteration here would be converted into a
                            # RuntimeError under PEP 479.
                            return
                        # Drop the array punctuation preceding the "{".
                        content = [line.lstrip('[, ')]
                        break
                    current_pos = f.tell()
                else:
                    # We ran off the end of the file without finding a new
                    # object. This means we're done.
                    return
                # We're in an object. Collect its contents and emit it.
                # A truncated object (no closing line before EOF) is dropped:
                # the next search pass finds nothing and returns.
                for line in iterable_f:
                    if line.startswith('}'):
                        # The closing line may carry array punctuation
                        # ("}," or "}]"); strip it so json.loads sees exactly
                        # one object.
                        content.append(line.rstrip(' \t\r\n,]'))
                        yield json.loads(''.join(content))
                        break
                    content.append(line)
Add Comment
Please, Sign In to add comment