Advertisement
Guest User

Untitled

a guest
Oct 21st, 2019
91
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.90 KB | None | 0 0
  1. import json
  2. import pyarrow
  3. import apache_beam as beam
  4. from datetime import datetime
  5. from toolz.dicttoolz import update_in
  6.  
  7.  
  8. class NullDict(dict):
  9. def __missing__(self, key):
  10. return None
  11.  
  12.  
  13. inputs_pattern = '*.json'
  14. outputs_prefix = 'outputs/json/part'
  15.  
  16. parquet_schema = pyarrow.schema([
  17. ("name", pyarrow.string()),
  18. ("birthdate", pyarrow.timestamp("s")),
  19. ("value", pyarrow.int16()),
  20. ])
  21.  
  22. with beam.Pipeline() as pipeline:
  23. (
  24. pipeline
  25. | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
  26. | 'Parse JSON' >> beam.Map(json.loads)
  27. | 'Parse Date' >> beam.Map(lambda doc: update_in(
  28. doc,
  29. keys=["birthdate"],
  30. func=lambda dt: datetime.strptime(dt, "%Y-%m-%d"),
  31. factory=NullDict
  32. ))
  33. | 'Write results' >> beam.io.parquetio.WriteToParquet(
  34. outputs_prefix,
  35. parquet_schema
  36. )
  37. )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement