public static void main(String[] args) throws IOException {
    // Build the credential scope list from Dataflow's scopes.
    Set<String> scopeList = new HashSet<>();
    scopeList.addAll(DataflowScopes.all());

    GoogleCredentials credential = null;
    try {
        String curDir = Paths.get(".").toAbsolutePath().normalize().toString();
        FileInputStream credFile = new FileInputStream(curDir + "/secrete.json");
        credential = GoogleCredentials.fromStream(credFile).createScoped(scopeList);
    } catch (Exception ex) {
        System.out.println("Failed to read the credential file: " + ex);
    }

    GCSToPubSubOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(GCSToPubSubOptions.class);
    options.setGcpCredential(credential);
    options.setStreaming(true);

    Pipeline pipeline = Pipeline.create(options);

    // Continuously watch the bucket for new gzipped log files.
    String sourcePath = "gs://bucket-test";
    PCollection<String> lines = pipeline.apply("readDataFromGCS",
            TextIO.read().from(sourcePath + "/fileprefix*.log.gz")
                    .watchForNewFiles(Duration.standardMinutes(2), Watch.Growth.never()));

    // Parse each JSON line and keep only events with a user id and a non-zero "le" value.
    PCollection<KV<String, Map<String, String>>> filter_event =
            lines.apply("ParseAndFilterFn", ParDo.of(new ParseAndFilterFn()));

    // Fixed 3-minute windows, firing once 2 minutes after the first element in a pane.
    PCollection<KV<String, Map<String, String>>> minute_window_events = filter_event.apply("MinuteFixwindow",
            Window.<KV<String, Map<String, String>>>into(FixedWindows.of(Duration.standardMinutes(3)))
                    .triggering(AfterProcessingTime
                            .pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(2)))
                    .discardingFiredPanes()
                    .withAllowedLateness(Duration.standardMinutes(1)));

    // Per user, keep the event with the largest "le", format it as a message, and publish it to Pub/Sub.
    minute_window_events.apply("GroupByUserId", Combine.perKey(new MaxFn()))
            .apply("AssembleUserMsg", MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                    .via((KV<String, Map<String, String>> kv) ->
                            KV.of(String.format("uid:%s,le:%s,ts:%s", kv.getKey(), kv.getValue().get("le"), kv.getValue().get("ts")),
                                    kv.getValue().get("ts"))))
            .apply("ConvertToPubSubMsg", ParDo.of(new Stamp(null)))
            .apply(new PubSubSink());

    // Execute the pipeline and wait until it finishes running.
    pipeline.run().waitUntilFinish();
}
private static class PubSubSink extends PTransform<PCollection<PubsubMessage>, PDone> {
    @Override
    public PDone expand(PCollection<PubsubMessage> input) {
        // Publish to TOPIC, using "timestamp" and "id" as the timestamp/id attributes and 5 shards.
        PubsubUnboundedSink sink = new PubsubUnboundedSink(
                PubsubJsonClient.FACTORY,
                ValueProvider.StaticValueProvider.of(TOPIC),
                "timestamp",
                "id",
                5);
        return input.apply(sink);
    }
}
private static class SimpleStamp extends DoFn<KV<String, String>, PubsubMessage> {
    @ProcessElement
    public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
        out.output(new PubsubMessage(elem.getKey().getBytes(StandardCharsets.UTF_8), new HashMap<>()));
    }
}
private static class Stamp extends DoFn<KV<String, String>, PubsubMessage> {
    private final Map<String, String> attributes;

    private Stamp() {
        this(ImmutableMap.of());
    }

    private Stamp(Map<String, String> attributes) {
        this.attributes = attributes;
    }

    @ProcessElement
    public void processElement(@Element KV<String, String> elem, OutputReceiver<PubsubMessage> out) {
        // The element value is the "ts" field in epoch seconds (see ParseAndFilterFn),
        // so convert it to milliseconds for the Joda Instant.
        out.outputWithTimestamp(new PubsubMessage(elem.getKey().getBytes(StandardCharsets.UTF_8), attributes),
                new Instant(Long.parseLong(elem.getValue()) * 1000L));
    }
}
static class ParseAndFilterFn extends DoFn<String, KV<String, Map<String, String>>> {
    // Expects JSON lines with "uid" (string), "le" (number), and "ts" (ISO-8601 timestamp) fields.
    // "gson" is a shared Gson instance defined outside this snippet (see the sketch at the end).
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<KV<String, Map<String, String>>> out) {
        try {
            element = element.trim();
            HashMap m = gson.fromJson(element, HashMap.class);
            String ts = String.valueOf(m.get("ts"));
            String uid = (String) m.getOrDefault("uid", "");
            // Gson parses JSON numbers into Doubles here; treat a missing "le" as 0.
            double leValue = ((Number) m.getOrDefault("le", 0.0)).doubleValue();
            if (!uid.isEmpty() && leValue != 0) {
                // Keep only events with a user id and a non-zero "le"; store the timestamp as epoch seconds.
                Instant instant = Instant.parse(ts);
                Map<String, String> map = new HashMap<>();
                map.put("uid", uid);
                map.put("ts", String.valueOf(instant.getMillis() / 1000));
                map.put("le", Double.toString(leValue));
                out.output(KV.of(uid, map));
            }
        } catch (Exception e) {
            System.out.println("ParseAndFilterFn: failed to parse JSON line: " + e);
        }
    }
}
static class MaxFn extends Combine.CombineFn<Map<String, String>, Map<String, String>, Map<String, String>> {
    @Override
    public Map<String, String> createAccumulator() {
        return new HashMap<>();
    }

    @Override
    public Map<String, String> addInput(Map<String, String> mutableAccumulator, Map<String, String> input) {
        int le = (int) Float.parseFloat(input.get("le"));
        if (le > (int) Float.parseFloat(mutableAccumulator.getOrDefault("le", "0"))) {
            mutableAccumulator.put("le", input.get("le"));
            mutableAccumulator.put("ts", input.get("ts"));
        }
        return mutableAccumulator;
    }

    @Override
    public Map<String, String> mergeAccumulators(Iterable<Map<String, String>> accumulators) {
        HashMap<String, String> m = new HashMap<>();
        for (Map<String, String> next : accumulators) {
            String le = m.getOrDefault("le", "0");
            String next_le = next.getOrDefault("le", "0");
            if ((int) Float.parseFloat(le) < (int) Float.parseFloat(next_le)) {
                m.put("le", next.get("le"));
                m.put("ts", next.get("ts"));
            }
        }
        return m;
    }

    @Override
    public Map<String, String> extractOutput(Map<String, String> accumulator) {
        return accumulator;
    }
}
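
// --- Sketch of pieces referenced above but not included in this gist ---
// The snippet uses GCSToPubSubOptions, TOPIC, and gson without defining them. The
// definitions below are a minimal, assumed sketch (project and topic names are
// placeholders), not the original author's code; they would live in the same
// enclosing class and import section as main().
//
// import com.google.gson.Gson;
// import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
// import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
// import org.apache.beam.sdk.options.StreamingOptions;

// Options interface: needs setGcpCredential() (from GcpOptions) and setStreaming()
// (from StreamingOptions), as used in main(); add bucket/topic options as needed.
public interface GCSToPubSubOptions extends GcpOptions, StreamingOptions {
}

// Pub/Sub topic consumed by PubSubSink ("my-project"/"my-topic" are placeholders).
static final PubsubClient.TopicPath TOPIC =
        PubsubClient.topicPathFromName("my-project", "my-topic");

// Shared Gson instance used by ParseAndFilterFn.
static final Gson gson = new Gson();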