Skip to content

Instantly share code, notes, and snippets.

@pbrumblay
Created May 25, 2018 17:14
Show Gist options
  • Save pbrumblay/1e6bdedb6e77131370452ab38c151bd2 to your computer and use it in GitHub Desktop.
Uses the low-level com.google.api.services.bigquery.Bigquery client to create a load job that truncates and reloads a single table partition.
package com.fearlesstg;
import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
import com.google.api.client.http.HttpTransport;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.json.JsonFactory;
import com.google.api.client.json.jackson.JacksonFactory;
import com.google.api.services.bigquery.Bigquery;
import com.google.api.services.bigquery.BigqueryScopes;
import com.google.api.services.bigquery.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class ManualLoad {
  private static final Logger LOG = LoggerFactory.getLogger(ManualLoad.class);

  /** Number of job-insert attempts before giving up. */
  private static final int MAX_INSERT_ATTEMPTS = 3;

  /**
   * Creates a BigQuery load job that truncates and reloads a single day partition
   * of an existing table from a newline-delimited JSON file in GCS.
   *
   * <p>Usage: {@code ManualLoad <project> <dataset> <table> <gcsUri> <jobId> [partitionDate]}
   * where {@code partitionDate} is a {@code YYYYMMDD} partition decorator. It defaults to
   * {@code 20180303}, the value this class originally hard-coded, so existing invocations
   * behave unchanged.
   */
  public static void main(String[] args) {
    String projectId = args[0];
    String datasetId = args[1];
    String tableId = args[2];
    String gcsUri = args[3];
    String jobId = args[4];
    // Optional sixth argument generalizes the previously hard-coded partition date.
    String partitionDate = args.length > 5 ? args[5] : "20180303";

    Job job = buildLoadJob(projectId, datasetId, tableId, gcsUri, jobId, partitionDate);
    JobReference jobRef = job.getJobReference();

    // The original code logged "will retry" but returned after a single attempt.
    // Actually retry the insert a bounded number of times.
    for (int attempt = 1; attempt <= MAX_INSERT_ATTEMPTS; attempt++) {
      try {
        Bigquery client = createAuthorizedClient();
        client.jobs().insert(jobRef.getProjectId(), job).execute();
        LOG.info("Started BigQuery job: {}.\n{}", jobRef,
            formatBqStatusCommand(jobRef.getProjectId(), jobRef.getJobId()));
        return; // SUCCEEDED
      } catch (IOException e) {
        LOG.info("Failed to insert job {} (attempt {} of {}), will retry:",
            jobRef, attempt, MAX_INSERT_ATTEMPTS, e);
      }
    }
    LOG.error("Giving up on job {} after {} failed insert attempts.", jobId, MAX_INSERT_ATTEMPTS);
  }

  /**
   * Builds the load-job request targeting a single day partition.
   *
   * <p>The {@code "$" + partitionDate} table decorator addresses one partition, so
   * {@code WRITE_TRUNCATE} replaces only that partition; {@code CREATE_NEVER} requires
   * the table to already exist with a matching schema.
   */
  private static Job buildLoadJob(String projectId, String datasetId, String tableId,
      String gcsUri, String jobId, String partitionDate) {
    // Schema must match the existing destination table.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("PARTITION_DATE").setType("DATE"));
    fields.add(new TableFieldSchema().setName("TEXT_FIELD").setType("STRING"));
    fields.add(new TableFieldSchema().setName("INT_FIELD").setType("INTEGER"));
    fields.add(new TableFieldSchema().setName("LoadDate").setType("DATE"));
    fields.add(new TableFieldSchema().setName("RecordSource").setType("STRING"));
    TableSchema tableSchema = new TableSchema().setFields(fields);

    TableReference tableRef = new TableReference()
        .setProjectId(projectId)
        .setDatasetId(datasetId)
        .setTableId(tableId + "$" + partitionDate);

    List<String> gcsUris = new ArrayList<>();
    gcsUris.add(gcsUri);

    JobConfigurationLoad loadConfig = new JobConfigurationLoad()
        .setDestinationTable(tableRef)
        .setSchema(tableSchema)
        .setSourceUris(gcsUris)
        .setWriteDisposition("WRITE_TRUNCATE")
        .setCreateDisposition("CREATE_NEVER")
        .setSourceFormat("NEWLINE_DELIMITED_JSON")
        .setTimePartitioning(new TimePartitioning().setField("PARTITION_DATE").setType("DAY"));

    JobReference jobRef =
        new JobReference().setProjectId(projectId).setJobId(jobId).setLocation("US");
    return new Job()
        .setJobReference(jobRef)
        .setConfiguration(new JobConfiguration().setLoad(loadConfig));
  }

  /** Formats the bq CLI command that shows this job's status. */
  private static String formatBqStatusCommand(String projectId, String jobId) {
    return String.format("bq show -j --format=prettyjson --project_id=%s %s",
        projectId, jobId);
  }

  // [START get_service]
  /**
   * Builds a Bigquery client authorized via Application Default Credentials,
   * scoped to the BigQuery scopes when the credential requires explicit scoping.
   *
   * @throws IOException if application default credentials cannot be loaded
   */
  private static Bigquery createAuthorizedClient() throws IOException {
    Collection<String> scopes = BigqueryScopes.all();
    HttpTransport transport = new NetHttpTransport();
    JsonFactory jsonFactory = new JacksonFactory();
    GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
    if (credential.createScopedRequired()) {
      credential = credential.createScoped(scopes);
    }
    return new Bigquery.Builder(transport, jsonFactory, credential)
        .setApplicationName("bigqueryioproblems-manualload")
        .build();
  }
  // [END get_service]
}
@pbrumblay
Copy link
Author

pbrumblay commented May 25, 2018

java -cp <jarfile> com.fearlesstg.ManualLoad <project> <dataset> <table> <gcs json file> <job id>

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment