Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
# Build a BigQuery table schema (one plain field, one nested record, one
# repeated field) and write `output_data` to the table named by
# `known_args.output`.
# NOTE(review): `pipeline_args`, `known_args` and `output_data` are defined
# outside this snippet; `output_data` is presumably a PCollection of
# dict rows built from pipeline `p` -- confirm against the full script.
with beam.Pipeline(argv=pipeline_args) as p:
    from apache_beam.io.gcp.internal.clients import bigquery  # pylint: disable=wrong-import-order, wrong-import-position

    table_schema = bigquery.TableSchema()

    # A field that uses a standard type.
    alpha_schema = bigquery.TableFieldSchema()
    alpha_schema.name = 'alpha'
    alpha_schema.type = 'string'
    alpha_schema.mode = 'nullable'
    table_schema.fields.append(alpha_schema)

    # A nested (record) field:
    #   beta
    #   |-- delta
    #   |-- gamma
    beta_schema = bigquery.TableFieldSchema()
    beta_schema.name = 'beta'
    beta_schema.type = 'record'
    beta_schema.mode = 'nullable'

    delta = bigquery.TableFieldSchema()
    delta.name = 'delta'
    delta.type = 'integer'
    delta.mode = 'nullable'
    beta_schema.fields.append(delta)  # Nest `delta` under `beta`.

    gamma = bigquery.TableFieldSchema()
    gamma.name = 'gamma'
    gamma.type = 'integer'
    gamma.mode = 'nullable'
    beta_schema.fields.append(gamma)  # Nest `gamma` under `beta`.

    # Attach the record, with its nested fields, to the table schema.
    table_schema.fields.append(beta_schema)

    # A repeated field.
    children_schema = bigquery.TableFieldSchema()
    children_schema.name = 'children'
    children_schema.type = 'string'
    children_schema.mode = 'repeated'
    table_schema.fields.append(children_schema)

    # `beam.io.BigQuerySink` is deprecated (since Beam 2.11.0) in favour of
    # the `WriteToBigQuery` transform, which takes the same table, schema
    # and disposition arguments.
    # pylint: disable=expression-not-assigned
    output_data | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
        known_args.output,
        schema=table_schema,  # Pass the schema defined above.
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement