Guest User

Untitled

a guest
Jan 6th, 2019
108
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 2.51 KB | None | 0 0
  1. package com.click.example;
  2.  
  3. import org.apache.beam.sdk.Pipeline;
  4. import org.apache.beam.sdk.options.Description;
  5. import org.apache.beam.sdk.options.PipelineOptions;
  6. import org.apache.beam.sdk.options.PipelineOptionsFactory;
  7. import org.apache.beam.sdk.options.Validation.Required;
  8. import org.apache.beam.sdk.values.KV;
  9. import org.apache.beam.sdk.values.PCollection;
  10. import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
  11. import org.apache.beam.sdk.testing.TestPipeline;
  12. import com.google.api.services.bigquery.model.TableRow;
  13. import org.apache.beam.sdk.transforms.DoFn;
  14. import org.apache.beam.sdk.transforms.ParDo;
  15. import org.apache.beam.sdk.transforms.Create;
  16. import org.apache.beam.sdk.io.jdbc.JdbcIO;
  17. import java.sql.PreparedStatement;
  18. import java.sql.SQLException;
  19. import java.sql.Statement;
  20. import org.apache.commons.dbcp2.BasicDataSource;
  21.  
  22. public class BikeTrip {
  23.  
  24.  
  25. public interface BikeTripOptions extends PipelineOptions {
  26.  
  27. }
  28.  
  29. static class BikeTripStatementSetter implements JdbcIO.PreparedStatementSetter<TableRow>
  30. {
  31. private static final long serialVersionUID = 1L;
  32.  
  33. public void setParameters(TableRow element, PreparedStatement query) throws Exception
  34. {
  35.  
  36. String trip_id = (String) element.get("trip_id");
  37. String subscriber_type = (String) element.get("subscriber_type");
  38. String start_station_name = (String) element.get("start_station_name");
  39. String end_station_name = (String) element.get("end_station_name");
  40.  
  41. query.setLong(1, Long.valueOf(trip_id));
  42. query.setString(2, subscriber_type);
  43. query.setString(3, start_station_name);
  44. query.setString(4, end_station_name);
  45.  
  46. }
  47. }
  48.  
  49. public static void main(String[] args) {
  50.  
  51. BikeTripOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(BikeTripOptions.class);
  52. Pipeline p = Pipeline.create(options);
  53.  
  54. p
  55. .apply(BigQueryIO.read().from("bigquery-public-data:austin_bikeshare.bikeshare_trips"))
  56. .apply(JdbcIO.<TableRow>write()
  57. .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", "jdbc:mysql://google/<DATABASE_NAME>?cloudSqlInstance=<PROJECT_ID>:<INSTANCE_LOCATION>:<INSTANCE_CONNECTION_NAME>&socketFactory=com.google.cloud.sql.mysql.SocketFactory&user=<MYSQL_USER_NAME>&password=<MYSQL_USER_PASSWORD>&useSSL=false")
  58. )
  59. .withStatement("insert into trips values(?,?,?,?)")
  60. .withPreparedStatementSetter(new BikeTripStatementSetter()));
  61. p.run().waitUntilFinish();
  62. }
  63.  
  64. }
Add Comment
Please, Sign In to add comment