Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
def create_record(columns):
    """Build a mapper that converts one pipe-delimited record into a dict.

    The function is called once with the column list and returns a
    callable, so it can be used directly as
    ``beam.Map(create_record(columns))``.

    Args:
        columns: Comma-separated column names, e.g.
            ``"tungsten_opcode,tungsten_seqno"``.

    Returns:
        A one-argument function. Given a record string whose fields are
        separated by ``|``, it returns a new dict mapping each column name
        to the field at the same position. Values are kept as strings.
        Fields beyond the last column are ignored; columns without a
        corresponding field are left out of the dict.
    """
    # Split the column list once, not once per record.
    col_names = columns.split(",")

    def _to_row(record):
        # A fresh dict per record: nothing is shared between elements.
        return dict(zip(col_names, record.split("|")))

    return _to_row
# BigQuery table schema as a "name:TYPE,name:TYPE" string; passed as the
# ``schema`` argument of WriteToBigQuery.
schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'
# Column names in the same order as the fields of each incoming record.
columns = "tungsten_opcode,tungsten_seqno"
# Read string messages from the Pub/Sub topic and assign them to fixed
# windows of size 15. The chain is parenthesised so that it can span
# several lines; a trailing ``|`` on its own is a syntax error.
lines = (
    p
    | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC)
    | beam.WindowInto(window.FixedWindows(15))
)

# Expand each message into individual records via split_fn.
record_ids = lines | 'Split' >> (
    beam.FlatMap(split_fn).with_output_types(unicode))

# NOTE: create_record(columns) is evaluated once, here, while the pipeline
# is being built; beam.Map needs the value it returns to be a callable that
# is applied to each element.
records = record_ids | 'CreateRecords' >> beam.Map(create_record(columns))

# Append the rows to the BigQuery table, creating the table if needed.
records | 'BqInsert' >> beam.io.WriteToBigQuery(
    OUTPUT,
    schema=schema,
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
Add Comment
Please, Sign In to add comment