Running arbitrary jobs in parallel on Google Dataflow

This pipeline lists the files in a GCS bucket, requests one n1-standard-1 worker per file (capped at a 90-CPU quota), and has each worker pull its file down with gsutil, run arbitrary work on it locally, and push the results back to GCS.
package demo.pipeline;

import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.UUID;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Create;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.util.GcsUtil;
import com.google.cloud.dataflow.sdk.util.GcsUtil.GcsUtilFactory;
import com.google.cloud.dataflow.sdk.util.gcsfs.GcsPath;

@SuppressWarnings("serial")
public class StarterPipeline2 {
  private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline2.class);
  private static final File TEMP_PATH = new File("/dataflow/logs/taskrunner/harness/");
  private static final int QUOTA = 90; // CPUs

  public static void main(String[] args) {
    PipelineOptionsFactory.Builder opts = PipelineOptionsFactory.fromArgs(args)
        .withValidation();
    GcsUtil gcsUtil = new GcsUtilFactory().create(opts.create());
    try {
      // List files in the bucket.
      List<GcsPath> bucketListing = gcsUtil.expand(GcsPath.fromUri("gs://my-bucket/*"));
      // GcsPath cannot be serialized directly. Make a list of Strings.
      List<String> files = new LinkedList<String>();
      for (GcsPath path : bucketListing) {
        files.add(path.toString());
      }
      // Use one worker per file, up to the quota.
      DataflowPipelineOptions dfopts = opts.as(DataflowPipelineOptions.class);
      dfopts.setNumWorkers(Math.min(files.size(), QUOTA));
      dfopts.setWorkerMachineType("n1-standard-1");
      Pipeline p = Pipeline.create(dfopts);
      p.apply(Create.of(files).withCoder(StringUtf8Coder.of()))
          .apply(MapElements.via(new Processor()))
          .setCoder(VoidCoder.of());
      p.run();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /** Returns a unique scratch directory on the worker's local disk. */
  static File getTempFile() {
    String uuid = UUID.randomUUID().toString();
    return new File(TEMP_PATH, uuid);
  }

  /** Runs a shell command on the worker and logs its exit code. */
  static void exec(String cmd) throws InterruptedException, IOException {
    LOG.info("Running: " + cmd);
    Process proc = new ProcessBuilder().inheritIO().command(cmd.split(" ")).start();
    try {
      int exitVal = proc.waitFor();
      LOG.info("Command (" + cmd + ") exited with code " + exitVal);
    } catch (InterruptedException e) {
      LOG.error("Command (" + cmd + ") failed");
      throw e;
    }
  }

  /** Per-element work: copies one file down, processes it, and copies the results back. */
  static class Processor extends SimpleFunction<String, Void> {
    @Override
    public Void apply(String filePath) {
      try {
        File tempDir = StarterPipeline2.getTempFile();
        tempDir.mkdirs();
        // Copy file to <temp>/input. Could also use GcsUtil.open to get a SeekableByteChannel.
        File inputDir = new File(tempDir, "input");
        inputDir.mkdirs();
        String cmd = String.format("gsutil -m cp %s %s/", filePath, inputDir.getPath());
        StarterPipeline2.exec(cmd);
        File outputDir = new File(tempDir, "output");
        outputDir.mkdirs();
        //
        // Do work here on the file in <temp>/input, writing to <temp>/output
        //
        cmd = String.format("gsutil -m cp %s/* gs://my-bucket/output/", outputDir.getPath());
        StarterPipeline2.exec(cmd);
      } catch (IOException | InterruptedException e) {
        e.printStackTrace();
      }
      return null;
    }
  }
}
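As a hypothetical illustration of the "Do work here" step, any binary staged on the worker can be driven through the exec helper; my-tool below is a placeholder, not a real program. Note that exec splits the command on spaces and runs it without a shell, so pipes and redirection are unavailable (the upload glob works because gsutil performs its own wildcard expansion).

// Sketch only: "my-tool" is a hypothetical program that reads files from
// <temp>/input and writes its results into <temp>/output for the upload step.
String work = String.format("my-tool --in %s --out %s", inputDir.getPath(), outputDir.getPath());
StarterPipeline2.exec(work);

With the Dataflow SDK 1.x Maven setup, launching looks something like the following; the project ID and staging bucket are placeholders:

mvn compile exec:java -Dexec.mainClass=demo.pipeline.StarterPipeline2 \
    -Dexec.args="--project=<your-project-id> --stagingLocation=gs://my-bucket/staging --runner=BlockingDataflowPipelineRunner"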
Context: http://stackoverflow.com/questions/33046724/parallel-file-processing-using-cloud-services