Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
- import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
- import com.google.api.client.http.HttpTransport;
- import com.google.api.client.json.JsonFactory;
- import com.google.api.client.json.JsonFactory;
- import com.google.api.client.json.jackson2.JacksonFactory;
- import com.google.api.services.bigquery.Bigquery;
- import com.google.api.services.bigquery.BigqueryScopes;
- import com.google.api.client.util.Data;
- import com.google.api.services.bigquery.model.*;
- /* your class starts here */
- private String projectId = ""; /* fill in the project id here */
- private String query = ""; /* enter your query here */
- private Bigquery bigQuery;
- private Job insert;
- private TableDataList tableDataList;
- private Iterator<TableRow> rowsIterator;
- private List<TableRow> rows;
- private long maxResults = 100000L; /* max number of rows in a page */
- /* run query */
- public void open() throws Exception {
- HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
- JsonFactory jsonFactory = new JacksonFactory();
- GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
- if (credential.createScopedRequired())
- credential = credential.createScoped(BigqueryScopes.all());
- bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();
- JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
- JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
- Job job = new Job().setConfiguration(jobConfig);
- insert = bigQuery.jobs().insert(projectId, job).execute();
- JobReference jobReference = insert.getJobReference();
- while (true) {
- Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
- String state = poll.getStatus().getState();
- if ("DONE".equals(state)) {
- ErrorProto errorResult = poll.getStatus().getErrorResult();
- if (errorResult != null)
- throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
- break;
- }
- Thread.sleep(10000);
- }
- tableDataList = getPage();
- rows = tableDataList.getRows();
- rowsIterator = rows != null ? rows.iterator() : null;
- }
- /* read data row by row */
- public /* your data object here */ read() throws Exception {
- if (rowsIterator == null) return null;
- if (!rowsIterator.hasNext()) {
- String pageToken = tableDataList.getPageToken();
- if (pageToken == null) return null;
- tableDataList = getPage(pageToken);
- rows = tableDataList.getRows();
- if (rows == null) return null;
- rowsIterator = rows.iterator();
- }
- TableRow row = rowsIterator.next();
- for (TableCell cell : row.getF()) {
- Object value = cell.getV();
- /* extract the data here */
- }
- /* return the data */
- }
- private TableDataList getPage() throws IOException {
- return getPage(null);
- }
- private TableDataList getPage(String pageToken) throws IOException {
- TableReference sourceTable = insert
- .getConfiguration()
- .getQuery()
- .getDestinationTable();
- if (sourceTable == null)
- throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
- return bigQuery.tabledata()
- .list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
- .setPageToken(pageToken)
- .setMaxResults(maxResults)
- .execute();
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement