Skip to content

Instantly share code, notes, and snippets.

@richzw
Created November 5, 2019 04:36
Show Gist options
  • Save richzw/10a9ad17a0fdad744601bf2617a16860 to your computer and use it in GitHub Desktop.
Save richzw/10a9ad17a0fdad744601bf2617a16860 to your computer and use it in GitHub Desktop.
/**
 * Builds and runs a streaming pipeline that watches a GCS location for gzipped log files,
 * parses and filters each line, keeps one record per user per fixed window (chosen by
 * {@code MaxFn}), and publishes the result to Pub/Sub.
 *
 * <p>Credentials are loaded from {@code secrete.json} in the current working directory.
 * Loading is best-effort: on failure the cause is logged and the pipeline options receive a
 * {@code null} credential, exactly as before.
 *
 * @param args command-line arguments forwarded to {@code PipelineOptionsFactory}
 * @throws IOException declared for callers; credential read failures are caught and logged
 */
public static void main(String[] args) throws IOException {
    // Create the scope list with Dataflow's scopes.
    Set<String> scopeList = new HashSet<>(DataflowScopes.all());

    GoogleCredentials credential = null;
    try {
        String curDir = Paths.get(".").toAbsolutePath().normalize().toString();
        // try-with-resources: the key file handle is released even if parsing fails.
        try (FileInputStream credFile = new FileInputStream(curDir + "/secrete.json")) {
            credential = GoogleCredentials.fromStream(credFile).createScoped(scopeList);
        }
    } catch (Exception ex) {
        // Deliberately non-fatal, but report the cause so a missing/invalid key file is diagnosable.
        System.out.println("Catch exception on read credential file: " + ex);
    }

    GCSToPubSubOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(GCSToPubSubOptions.class);
    options.setGcpCredential(credential);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);
    String sourcePath = "gs://bucket-test";

    // Poll the file pattern every 2 minutes; Watch.Growth.never() means the watch never terminates.
    PCollection<String> lines = pipeline.apply("readDataFromGCS",
            TextIO.read().from(sourcePath + "/fileprefix*.log.gz")
                    .watchForNewFiles(Duration.standardMinutes(2), Watch.Growth.never()));

    PCollection<KV<String, Map<String, String>>> filteredEvents =
            lines.apply("ParseAndFilterFn", ParDo.of(new ParseAndFilterFn()));

    // 3-minute fixed windows, fired 2 minutes (processing time) after the first element of a pane.
    PCollection<KV<String, Map<String, String>>> windowedEvents = filteredEvents.apply("MinuteFixwindow",
            Window.<KV<String, Map<String, String>>>into(FixedWindows.of(Duration.standardMinutes(3)))
                    .triggering(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(2)))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.standardMinutes(1))
    );

    windowedEvents.apply("GroupByUserId", Combine.perKey(new MaxFn()))
            // Key becomes the outgoing message text; value carries "ts" for the Stamp step.
            .apply("AssembleUserMsg", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via((KV<String, Map<String, String>> kv) ->
                            KV.of(String.format("uid:%s,le:%s,ts:%s", kv.getKey(), kv.getValue().get("le"), kv.getValue().get("ts")),
                                    kv.getValue().get("ts"))))
            .apply("ConvertToPubSubMsg", ParDo.of(new Stamp(null)))
            .apply(new PubSubSink());

    // Execute the pipeline and block until it finishes running.
    pipeline.run().waitUntilFinish();
}
/**
 * Terminal transform that hands every {@code PubsubMessage} to a {@code PubsubUnboundedSink}
 * publishing to {@code TOPIC} through the JSON client factory.
 */
private static class PubSubSink extends PTransform<PCollection<PubsubMessage>, PDone> {
    @Override
    public PDone expand(PCollection<PubsubMessage> input) {
        // NOTE(review): the trailing arguments are presumably the timestamp attribute name,
        // the record-id attribute name and the shard count; confirm against the Beam version in use.
        return input.apply(
                new PubsubUnboundedSink(
                        PubsubJsonClient.FACTORY,
                        ValueProvider.StaticValueProvider.of(TOPIC),
                        "timestamp",
                        "id",
                        5));
    }
}
/**
 * Converts a key/value pair into a {@code PubsubMessage} whose payload is the UTF-8 bytes of
 * the key and whose attribute map is a fresh, empty {@code HashMap}. The value is ignored.
 */
private static class SimpleStamp extends DoFn<KV<String, String>, PubsubMessage> {
    @ProcessElement
    public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
        byte[] payload = elem.getKey().getBytes(StandardCharsets.UTF_8);
        PubsubMessage message = new PubsubMessage(payload, new HashMap<>());
        out.output(message);
    }
}
/**
 * Converts a key/value pair into a {@code PubsubMessage} and emits it with an explicit
 * element timestamp.
 *
 * <p>The key becomes the UTF-8 payload. The value is interpreted as epoch <em>seconds</em> in
 * decimal text, which is the form {@code ParseAndFilterFn} stores under {@code "ts"}.
 */
private static class Stamp extends DoFn<KV<String, String>, PubsubMessage> {
    /** Attributes attached to every message; passed to {@code PubsubMessage} as-is (may be null). */
    private final Map<String, String> attributes;

    private Stamp() {
        this(ImmutableMap.of());
    }

    private Stamp(Map<String, String> attributes) {
        this.attributes = attributes;
    }

    /**
     * Emits one message per element, timestamped from the element's value.
     *
     * @throws NumberFormatException if the value is not a decimal integer
     */
    @ProcessElement
    public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
        // Instant(Object) would try to parse the String as ISO-8601 text; the value is epoch
        // seconds, so parse it numerically and convert to the milliseconds Instant(long) expects.
        long epochSeconds = Long.parseLong(elem.getValue());
        Instant eventTime = new Instant(epochSeconds * 1000L);
        // NOTE(review): outputWithTimestamp may reject timestamps earlier than the input
        // element's timestamp unless allowed skew is configured; verify against real data.
        out.outputWithTimestamp(
                new PubsubMessage(elem.getKey().getBytes(StandardCharsets.UTF_8), attributes),
                eventTime);
    }
}
/**
 * Parses one JSON log line and emits {@code KV(uid, {uid, ts, le})} for records that have a
 * non-empty {@code uid} and a non-zero {@code le}.
 *
 * <p>In the emitted map, {@code ts} is the parsed instant in epoch seconds and {@code le} is
 * the {@code Double.toString} form of the parsed number. Lines that fail to parse are logged
 * and dropped.
 */
static class ParseAndFilterFn extends DoFn<String, KV<String, Map<String, String>>> {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<KV<String, Map<String, String>>> out) {
        try {
            Map<?, ?> fields = gson.fromJson(element.trim(), HashMap.class);
            String ts = String.valueOf(fields.get("ts"));

            Object rawUid = fields.get("uid");
            String uid = rawUid == null ? "" : (String) rawUid;

            // Compare numerically: Double.toString never yields "0" (zero is "0.0"), so the
            // former string comparison against "0" let every record through. A missing "le"
            // is treated as zero instead of triggering a ClassCastException.
            Object rawLe = fields.get("le");
            double le = rawLe == null ? 0.0 : (Double) rawLe;

            if (!uid.isEmpty() && le != 0.0) {
                Instant instant = Instant.parse(ts);
                Map<String, String> map = new HashMap<>();
                map.put("uid", uid);
                map.put("ts", String.valueOf(instant.getMillis() / 1000));
                map.put("le", Double.toString(le));
                out.output(KV.of(uid, map));
            }
        } catch (Exception e) {
            // Best-effort parsing: a malformed line must not fail the whole bundle.
            System.out.println("Parse json exception of ParseAndFilterFn:" + e.toString());
        }
    }
}
/**
 * Combine function that keeps the {@code "le"} / {@code "ts"} pair of the input with the
 * largest {@code "le"} value. The accumulator and the output are both a map holding only
 * those two keys.
 *
 * <p>The first input carrying an {@code "le"} is always accepted, so keys whose values are
 * all zero or negative still produce a populated result rather than an empty map. Values are
 * compared as doubles; on ties the earlier value is kept.
 */
static class MaxFn extends Combine.CombineFn<Map<String, String>, Map<String, String>, Map<String, String>> {
    @Override
    public Map<String, String> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public Map<String, String> addInput(Map<String, String> mutableAccumulator, Map<String, String> input) {
        keepGreater(mutableAccumulator, input);
        return mutableAccumulator;
    }

    @Override
    public Map<String, String> mergeAccumulators(Iterable<Map<String, String>> accumulators) {
        HashMap<String, String> merged = new HashMap<>();
        for (Map<String, String> next : accumulators) {
            // Empty accumulators have no "le" and are skipped by keepGreater.
            keepGreater(merged, next);
        }
        return merged;
    }

    @Override
    public Map<String, String> extractOutput(Map<String, String> accumulator) {
        return accumulator;
    }

    /**
     * Copies {@code "le"} and {@code "ts"} from {@code candidate} into {@code target} when the
     * target has no {@code "le"} yet or the candidate's {@code "le"} is strictly greater.
     * Candidates without an {@code "le"} entry are ignored.
     */
    private static void keepGreater(Map<String, String> target, Map<String, String> candidate) {
        String candidateLe = candidate.get("le");
        if (candidateLe == null) {
            return;
        }
        String currentLe = target.get("le");
        // Full double comparison: the former float parse + int cast lost precision and
        // treated values differing only in the fraction as equal.
        if (currentLe == null || Double.parseDouble(candidateLe) > Double.parseDouble(currentLe)) {
            target.put("le", candidateLe);
            target.put("ts", candidate.get("ts"));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment