SHARE
TWEET

Untitled

a guest Oct 21st, 2019 71 Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
  1. import json
  2. import pyarrow
  3. import apache_beam as beam
  4. from datetime import datetime
  5. from toolz.dicttoolz import update_in
  6.  
  7.  
  8. class NullDict(dict):
  9.     def __missing__(self, key):
  10.         return None
  11.  
  12.  
  13. inputs_pattern = '*.json'
  14. outputs_prefix = 'outputs/json/part'
  15.  
  16. parquet_schema = pyarrow.schema([
  17.     ("name", pyarrow.string()),
  18.     ("birthdate", pyarrow.timestamp("s")),
  19.     ("value", pyarrow.int16()),
  20. ])
  21.  
  22. with beam.Pipeline() as pipeline:
  23.     (
  24.         pipeline
  25.         | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
  26.         | 'Parse JSON' >> beam.Map(json.loads)
  27.         | 'Parse Date' >> beam.Map(lambda doc: update_in(
  28.             doc,
  29.             keys=["birthdate"],
  30.             func=lambda dt: datetime.strptime(dt, "%Y-%m-%d"),
  31.             factory=NullDict
  32.         ))
  33.         | 'Write results' >> beam.io.parquetio.WriteToParquet(
  34.             outputs_prefix,
  35.             parquet_schema
  36.         )
  37.     )
RAW Paste Data
We use cookies for various purposes including analytics. By continuing to use Pastebin, you agree to our use of cookies as described in the Cookies Policy. OK, I Understand
 
Top