Guest User

Untitled

a guest
Jun 22nd, 2018
67
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 0.82 KB | None | 0 0
  def create_record(columns):
      """Build a mapper that turns one '|'-delimited record into a row dict.

      The function is invoked once with the column list and its return value
      is what gets applied to each element, so the result must be callable.

      Args:
          columns: Comma-separated column names, e.g. "col_a,col_b".

      Returns:
          A function taking one record string (values separated by '|') and
          returning a new dict mapping each column name to the value at the
          same position. Values are kept as strings.

      Raises:
          ValueError: (from the returned function) when a record has fewer
              values than there are columns.
      """
      # Parse the column list once here rather than once per record.
      col_names = columns.split(",")

      def to_row(record):
          col_values = record.split("|")
          if len(col_values) < len(col_names):
              raise ValueError(
                  "record has {} value(s) but {} column(s) are expected: {!r}"
                  .format(len(col_values), len(col_names), record))
          # zip stops at the shorter sequence, so surplus trailing values are
          # ignored; a fresh dict is built for every record.
          return dict(zip(col_names, col_values))

      return to_row
  8.  
  # BigQuery schema string in "name:TYPE,name:TYPE" form, and the matching
  # comma-separated column names handed to create_record.
  schema = 'tungsten_opcode:STRING,tungsten_seqno:INTEGER'
  columns = "tungsten_opcode,tungsten_seqno"

  # Read strings from the Pub/Sub topic and apply FixedWindows(15).
  # Parentheses let the pipe expression span several lines.
  lines = (
      p
      | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC)
      | beam.WindowInto(window.FixedWindows(15))
  )

  # split_fn may emit several outputs per input line (FlatMap).
  record_ids = lines | 'Split' >> (
      beam.FlatMap(split_fn).with_output_types(unicode))

  # NOTE: create_record(columns) is evaluated once, here; beam.Map expects a
  # callable, so its return value must be a per-element function.
  records = record_ids | 'CreateRecords' >> beam.Map(create_record(columns))

  # Append the rows to the OUTPUT table, creating it if it does not exist.
  records | 'BqInsert' >> beam.io.WriteToBigQuery(
      OUTPUT,
      schema=schema,
      create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
Add Comment
Please, Sign In to add comment