This code works out the location of the buckets and also the storage class
//imports & doc omitted for brevity. See repo for full source file.
//https://github.com/polleyg/gcp-dataflow-copy-bigquery/blob/master/src/main/java/org/polleyg/BQTableCopyPipeline.java
public class BQTableCopyPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(BQTableCopyPipeline.class);
    private static final String DEFAULT_NUM_WORKERS = "1";
    private static final String DEFAULT_MAX_WORKERS = "3";
    private static final String DEFAULT_TYPE_WORKERS = "n1-standard-1";
    private static final String DEFAULT_ZONE = "australia-southeast1-a";
    private static final String DEFAULT_WRITE_DISPOSITION = "truncate";
    private static final String DEFAULT_DETECT_SCHEMA = "true";
    public static void main(String[] args) throws Exception {
        new BQTableCopyPipeline().copy(args);
    }
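    /**
     * Loads config.yaml from the classpath, applies the project and runner
     * from it to the Dataflow pipeline options, and kicks off one pipeline
     * per configured copy.
     */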
    private void copy(final String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        Config config = mapper.readValue(
                new File(getClass().getClassLoader().getResource("config.yaml").getFile()),
                new TypeReference<Config>() {
                });
        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .as(DataflowPipelineOptions.class);
        options.setProject(config.project);
        options.setRunner(GCPHelpers.getRunnerClass(config.runner));
        LOG.info("Project set to '{}' and runner set to '{}'", config.project, config.runner);
        config.copies.forEach(copy -> setupAndRunPipeline(options, copy));
    }
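    /**
     * Pulls the per-copy settings out of the config map, falling back to the
     * defaults above for anything not specified, and applies them to the
     * pipeline options.
     */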
    private void setupAndRunPipeline(final DataflowPipelineOptions options,
                                     final Map<String, String> copy) {
        LOG.info("Running a copy for '{}'", copy);
        String sourceTable = checkNotNull(copy.get("source"), "Source table cannot be null");
        String targetTable = checkNotNull(copy.get("target"), "Target table cannot be null");
        int numWorkers = Integer.parseInt(copy.getOrDefault("numWorkers", DEFAULT_NUM_WORKERS));
        int maxNumWorkers = Integer.parseInt(copy.getOrDefault("maxNumWorkers", DEFAULT_MAX_WORKERS));
        boolean detectSchema = Boolean.parseBoolean(copy.getOrDefault("detectSchema", DEFAULT_DETECT_SCHEMA));
        String zone = copy.getOrDefault("zone", DEFAULT_ZONE);
        String worker = copy.getOrDefault("workerMachineType", DEFAULT_TYPE_WORKERS);
        WriteDisposition writeDisposition = GCPHelpers.getWriteDisposition(
                copy.getOrDefault("writeDisposition", DEFAULT_WRITE_DISPOSITION));
        String targetDatasetLocation = copy.getOrDefault("targetDatasetLocation", null);
        options.setNumWorkers(numWorkers);
        options.setMaxNumWorkers(maxNumWorkers);
        options.setZone(zone);
        options.setWorkerMachineType(worker);
        TableSchema schema = null; //no schema is permitted
        if (detectSchema) {
            schema = GCPHelpers.getTableSchema(sourceTable);
        }
        runPipeline(options, schema, sourceTable, targetTable, targetDatasetLocation, writeDisposition);
    }
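    /**
     * Works out the locations of the source and target datasets, creates the
     * GCS buckets that BigQuery exports to and imports from in those same
     * locations, then runs the read/write pipeline.
     */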
    private void runPipeline(final DataflowPipelineOptions options,
                             final TableSchema schema,
                             final String sourceTable,
                             final String targetTable,
                             final String targetDatasetLocation,
                             final WriteDisposition writeDisposition) {
        String targetLocation = getTargetDatasetLocation(targetTable, targetDatasetLocation);
        String sourceLocation = GCPHelpers.getDatasetLocation(sourceTable);
        String exportBucket = format("%s_df_bqcopy_export_%s", options.getProject(), sourceLocation);
        String importBucket = format("%s_df_bqcopy_import_%s", options.getProject(), targetLocation);
        handleBucketCreation(exportBucket, sourceLocation);
        handleBucketCreation(importBucket, targetLocation);
        options.setTempLocation(format("gs://%s/tmp", exportBucket));
        options.setStagingLocation(format("gs://%s/jars", exportBucket));
        options.setJobName(format("bq-table-copy-%s-to-%s-%d", sourceLocation, targetLocation, currentTimeMillis()));
        LOG.info("Running Dataflow pipeline with options '{}'", options);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<TableRow> rows = pipeline.apply(format("Read: %s", sourceTable),
                BigQueryIO.readTableRows().from(sourceTable));
        if (schema != null) {
            rows.apply(format("Write: %s", targetTable), BigQueryIO.writeTableRows()
                    .to(targetTable)
                    .withCreateDisposition(CREATE_IF_NEEDED)
                    .withWriteDisposition(writeDisposition)
                    .withSchema(schema)
                    .withCustomGcsTempLocation(StaticValueProvider.of(format("gs://%s", importBucket))));
        } else {
            rows.apply(format("Write: %s", targetTable), BigQueryIO.writeTableRows()
                    .to(targetTable)
                    .withCreateDisposition(CREATE_NEVER)
                    .withWriteDisposition(writeDisposition)
                    .withCustomGcsTempLocation(StaticValueProvider.of(format("gs://%s", importBucket))));
        }
        pipeline.run();
    }
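    /**
     * Creates the GCS bucket, treating a 409 (the bucket already exists) as
     * success and anything else as fatal.
     */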
    private void handleBucketCreation(final String name,
                                      final String location) {
        try {
            GCPHelpers.createGCSBucket(name, location);
        } catch (StorageException e) {
            if (e.getCode() != HttpStatus.SC_CONFLICT) { // 409 == bucket already exists. That's ok.
                throw new IllegalStateException(e);
            }
        }
    }
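    /**
     * Resolves the location of the target dataset. If no location was given
     * in the config, the dataset must already exist and its location is
     * looked up; otherwise the dataset is created in the given location.
     */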
    private String getTargetDatasetLocation(final String targetTable,
                                            final String targetDatasetLocation) {
        String location;
        if (targetDatasetLocation == null) {
            //target dataset/table should already exist in this case
            try {
                location = GCPHelpers.getDatasetLocation(targetTable);
            } catch (RuntimeException e) {
                throw new IllegalStateException("'targetDatasetLocation' wasn't specified in config, but it looks" +
                        " like the target dataset doesn't exist.", e);
            }
        } else {
            //otherwise, try and create it for the user
            location = targetDatasetLocation;
            try {
                GCPHelpers.createBQDataset(targetTable, targetDatasetLocation);
            } catch (BigQueryException e) {
                if (e.getCode() == HttpStatus.SC_CONFLICT) { // 409 == dataset already exists
                    throw new IllegalStateException(
                            format("'targetDatasetLocation' specified in config, but the dataset '%s' already exists",
                                    targetTable));
                } else {
                    throw new IllegalStateException(e);
                }
            }
        }
        assert location != null;
        return location;
    }
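    /**
     * Shape of config.yaml: the GCP project, the pipeline runner, and the
     * list of table copies to perform.
     */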
    private static class Config {
        @JsonProperty
        public List<Map<String, String>> copies;
        @JsonProperty
        public String project;
        @JsonProperty
        public String runner;
    }
}
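For reference, here's a minimal sketch of what a config.yaml for this pipeline could look like. The top-level keys and the per-copy keys are the ones the code above actually reads; the project IDs and table names are placeholders, and the runner string is whatever GCPHelpers.getRunnerClass() accepts (assumed here to be "dataflow"). Only source and target are required per copy; everything else falls back to the DEFAULT_* constants in the class.

    project: my-gcp-project            # placeholder project ID
    runner: dataflow                   # resolved via GCPHelpers.getRunnerClass()
    copies:
      - source: my-gcp-project:source_dataset.my_table
        target: my-gcp-project:target_dataset.my_table
        targetDatasetLocation: US      # omit if the target dataset already exists
      - source: my-gcp-project:source_dataset.other_table
        target: my-gcp-project:target_dataset.other_table
        numWorkers: "2"                # optional overrides of the defaults
        writeDisposition: truncate

Note that each entry in copies runs as its own Dataflow job, with a job name derived from the source and target locations plus a timestamp.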