Skip to content

Instantly share code, notes, and snippets.

@frant-hartm
Created December 1, 2022 10:08
Show Gist options
  • Save frant-hartm/33cacf141a02e4020dc38673889e6e03 to your computer and use it in GitHub Desktop.
Save frant-hartm/33cacf141a02e4020dc38673889e6e03 to your computer and use it in GitHub Desktop.
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.jet.JetService;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.aggregate.AggregateOperations;
import com.hazelcast.jet.pipeline.BatchStage;
import com.hazelcast.jet.pipeline.Pipeline;
import com.hazelcast.jet.pipeline.Sinks;
import com.hazelcast.jet.pipeline.file.FileFormat;
import com.hazelcast.jet.pipeline.file.FileSources;
import java.io.File;
import java.util.concurrent.atomic.AtomicLong;
import static com.hazelcast.jet.aggregate.AggregateOperations.summingLong;
/**
 * Hazelcast Jet batch job that totals the calories carried by each elf and logs
 * (1) the largest single total and (2) the sum of the three largest totals.
 *
 * <p>The input file lists one calorie value per line; each elf's inventory is
 * separated from the next by one or more blank lines. (The layout matches the
 * Advent of Code 2022 day 1 puzzle — presumably that is the intended input.)
 *
 * <p>The file is read as a single string and split into inventories with a pure
 * function, so the last inventory is counted even when the file does not end
 * with a blank line, and the result does not depend on line ordering between
 * pipeline stages.
 */
public class Calories {

    /** Directory holding the input file, resolved against the working directory. */
    private static final String INPUT_DIRECTORY = "src/main/resources/day/1/";

    /** Glob selecting the input file inside {@link #INPUT_DIRECTORY}. */
    private static final String INPUT_GLOB = "input";

    /**
     * Inventory separator on newline-normalized text: a line break followed by
     * one or more lines that are empty or contain only horizontal whitespace.
     */
    private static final String BLANK_LINES = "\\n(?:\\h*\\n)+";

    /**
     * Runs the job and waits for it to finish.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        HazelcastInstance hz = Hazelcast.bootstrappedInstance();
        try {
            JetService jet = hz.getJet();
            jet.newJob(buildPipeline()).join();
        } finally {
            // Shut down even when the job fails; otherwise the instance keeps running.
            hz.shutdown();
        }
    }

    /**
     * Builds the pipeline: read file -> per-elf totals -> two independent aggregations,
     * each writing one log line.
     *
     * @return the assembled pipeline, ready to be submitted as a job
     */
    private static Pipeline buildPipeline() {
        Pipeline p = Pipeline.create();

        BatchStage<Long> caloriesByElfs =
                p.readFrom(
                        FileSources.files(new File(INPUT_DIRECTORY).getAbsolutePath())
                                .glob(INPUT_GLOB)
                                // Whole file as one item, so inventories can be split locally.
                                .format(FileFormat.text())
                                .build()
                ).setLocalParallelism(1)
                        .flatMap(text -> Traversers.traverseArray(splitInventories(text)))
                        // A file starting with blank lines yields an empty leading chunk.
                        .filter(inventory -> !inventory.isBlank())
                        .map(Calories::sumCalories);

        caloriesByElfs
                .aggregate(AggregateOperations.maxBy(Long::compare))
                .writeTo(Sinks.logger(calories -> "Elf carrying most calories carries " + calories + "."));

        caloriesByElfs
                .aggregate(AggregateOperations.topN(3, Long::compareTo))
                .flatMap(Traversers::traverseIterable)
                .aggregate(summingLong(l -> l))
                .writeTo(Sinks.logger(calories -> "3 elfs carrying most calories carry " + calories + "."));

        return p;
    }

    /**
     * Splits the file content into one chunk per elf.
     *
     * @param text full content of the input file
     * @return the inventories in file order; a chunk may still contain blank lines
     *         at its edges, which {@link #sumCalories(String)} ignores
     */
    private static String[] splitInventories(String text) {
        // Normalize CRLF / CR first so the separator regex only has to deal with '\n'.
        String normalized = text.replace("\r\n", "\n").replace('\r', '\n');
        return normalized.split(BLANK_LINES);
    }

    /**
     * Sums the calorie values of a single inventory.
     *
     * @param inventory one calorie value per line; blank lines and surrounding
     *                  whitespace are ignored
     * @return the total calories of the inventory
     * @throws NumberFormatException if a non-blank line is not a valid {@code long}
     */
    private static long sumCalories(String inventory) {
        return inventory.lines()
                .map(String::strip)
                .filter(line -> !line.isEmpty())
                .mapToLong(Long::parseLong)
                .sum();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment