Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
import json
from datetime import datetime

import apache_beam as beam
import pyarrow
from toolz.dicttoolz import update_in
class NullDict(dict):
    """A ``dict`` whose subscript lookups of absent keys yield ``None``.

    ``d[key]`` returns ``None`` instead of raising ``KeyError`` when *key*
    is not present. The key is not inserted by the lookup.
    """

    def __missing__(self, key):
        # Invoked by dict.__getitem__ only when `key` is absent.
        return None
# Glob of newline-delimited JSON input files and the output path prefix.
inputs_pattern = '*.json'
outputs_prefix = 'outputs/json/part'

# Schema handed to WriteToParquet for the output files.
parquet_schema = pyarrow.schema([
    ("name", pyarrow.string()),
    ("birthdate", pyarrow.timestamp("s")),
    ("value", pyarrow.int16()),
])


def _parse_birthdate(raw):
    """Convert a ``YYYY-MM-DD`` string to a ``datetime``; pass ``None`` through.

    ``update_in`` calls its ``func`` with ``None`` (its ``default``) when the
    key is absent, and a JSON ``null`` also decodes to ``None``. Handing that
    to ``strptime`` raises ``TypeError``, so ``None`` is returned unchanged.

    Raises:
        ValueError: if *raw* is a string that does not match ``%Y-%m-%d``.
    """
    if raw is None:
        return None
    return datetime.strptime(raw, "%Y-%m-%d")


with beam.Pipeline() as pipeline:
    (
        pipeline
        | 'Read lines' >> beam.io.ReadFromText(inputs_pattern)
        | 'Parse JSON' >> beam.Map(json.loads)
        # Rebuild each record as a NullDict (absent keys read as None) with
        # "birthdate" converted from text to a datetime.
        | 'Parse Date' >> beam.Map(lambda doc: update_in(
            doc,
            keys=["birthdate"],
            func=_parse_birthdate,
            factory=NullDict,
        ))
        | 'Write results' >> beam.io.parquetio.WriteToParquet(
            outputs_prefix,
            parquet_schema
        )
    )
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement