Skip to content

Instantly share code, notes, and snippets.

@jbarotin
Last active April 10, 2020 18:58
Show Gist options
  • Save jbarotin/18fb9d6b164b3c0a79d5b91f9da32df1 to your computer and use it in GitHub Desktop.
Save jbarotin/18fb9d6b164b3c0a79d5b91f9da32df1 to your computer and use it in GitHub Desktop.
"Full" implementation of "Scheduling messages to self" example Akka 2.6.4
package s4e.scheduler;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorSystem;
import akka.actor.typed.Behavior;
import akka.actor.typed.Terminated;
import akka.actor.typed.javadsl.AbstractBehavior;
import akka.actor.typed.javadsl.ActorContext;
import akka.actor.typed.javadsl.Behaviors;
import akka.actor.typed.javadsl.Receive;
import akka.actor.typed.javadsl.TimerScheduler;
import jdocs.akka.typed.IntroTest;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
/**
 * Receiving end of the example: an actor that accepts {@link Buncher.Batch} messages and
 * writes a fixed log line for each one it gets.
 */
class TimerBot extends AbstractBehavior<Buncher.Batch> {

    /** Creates the behavior; the actor context is supplied by {@code Behaviors.setup}. */
    public static Behavior<Buncher.Batch> create() {
        return Behaviors.setup(TimerBot::new);
    }

    private TimerBot(ActorContext<Buncher.Batch> context) {
        super(context);
    }

    /**
     * Builds the message handler: every {@link Buncher.Batch} is logged and the actor keeps
     * this same behavior for the next message.
     */
    @Override
    public Receive<Buncher.Batch> createReceive() {
        return newReceiveBuilder()
            .onMessage(
                Buncher.Batch.class,
                batch -> {
                    // The batch content is not inspected; only its arrival is reported.
                    getContext().getLog().info("TimerBot Hello");
                    return this;
                })
            .build();
    }
}
/**
 * Buffers incoming {@link Command}s and forwards them to {@code target} as one {@link Batch}.
 *
 * <p>The actor alternates between two states:
 * <ul>
 *   <li><b>idle</b> - nothing buffered; the first command starts a single-shot timer and
 *       switches to active;</li>
 *   <li><b>active</b> - commands are appended to a buffer until either the timer fires or the
 *       buffer holds {@code maxSize} commands; the buffer is then sent and the actor goes
 *       back to idle.</li>
 * </ul>
 */
class Buncher {

    /** Marker interface for every message this actor accepts. */
    public interface Command {}

    /** Immutable group of commands delivered to the target actor. */
    public static final class Batch {
        private final List<Command> messages;

        /**
         * @param messages commands to include; the list is copied, so later changes to the
         *     caller's list do not affect this batch
         */
        public Batch(List<Command> messages) {
            // unmodifiableList alone is only a read-only VIEW of the caller's list; copy first
            // so the message really is immutable once it has been sent to another actor.
            this.messages = Collections.unmodifiableList(new ArrayList<>(messages));
        }

        /** @return the commands of this batch as an unmodifiable list */
        public List<Command> getMessages() {
            return messages;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            Batch batch = (Batch) o;
            return Objects.equals(messages, batch.messages);
        }

        @Override
        public int hashCode() {
            return Objects.hash(messages);
        }
    }

    /** Example payload command carrying a text message. */
    public static final class ExcitingMessage implements Command {
        public final String message;

        public ExcitingMessage(String message) {
            this.message = message;
        }
    }

    /** Key under which the single batch timer is registered, so it can be cancelled. */
    private static final Object TIMER_KEY = new Object();

    /** Internal message the timer sends to this actor when the batch window elapses. */
    private enum Timeout implements Command {
        INSTANCE
    }

    /**
     * Creates the buncher behavior, starting in the idle state.
     *
     * @param target actor that receives the batches
     * @param after maximum time between the first buffered command and the flush
     * @param maxSize number of buffered commands that triggers an immediate flush; values of
     *     one or less make every command its own batch
     * @return the initial (idle) behavior
     */
    public static Behavior<Command> create(ActorRef<Batch> target, Duration after, int maxSize) {
        return Behaviors.withTimers(timers -> new Buncher(timers, target, after, maxSize).idle());
    }

    private final TimerScheduler<Command> timers;
    private final ActorRef<Batch> target;
    private final Duration after;
    private final int maxSize;

    private Buncher(
            TimerScheduler<Command> timers, ActorRef<Batch> target, Duration after, int maxSize) {
        this.timers = timers;
        this.target = target;
        this.after = after;
        this.maxSize = maxSize;
    }

    /** Idle state: any command is handled by {@link #onIdleCommand(Command)}. */
    private Behavior<Command> idle() {
        return Behaviors.receive(Command.class)
            .onMessage(Command.class, this::onIdleCommand)
            .build();
    }

    /** Handles the first command of a new batch. */
    private Behavior<Command> onIdleCommand(Command message) {
        if (maxSize <= 1) {
            // The first command alone already reaches the cap: send it right away instead of
            // buffering, otherwise the batch could only ever grow past maxSize.
            target.tell(new Batch(Collections.singletonList(message)));
            return Behaviors.same();
        }
        timers.startSingleTimer(TIMER_KEY, Timeout.INSTANCE, after);
        return Behaviors.setup(context -> new Active(context, message));
    }

    /**
     * Active state: holds the buffer of the batch currently being collected. Deliberately a
     * non-static inner class because it uses the enclosing buncher's target, timers and
     * maxSize.
     */
    private class Active extends AbstractBehavior<Command> {
        private final List<Command> buffer = new ArrayList<>();

        Active(ActorContext<Command> context, Command firstCommand) {
            super(context);
            buffer.add(firstCommand);
        }

        @Override
        public Receive<Command> createReceive() {
            // Timeout is itself a Command, so its handler must be registered first.
            return newReceiveBuilder()
                .onMessage(Timeout.class, message -> onTimeout())
                .onMessage(Command.class, this::onCommand)
                .build();
        }

        /** Batch window elapsed: send whatever has been collected. */
        private Behavior<Command> onTimeout() {
            return flush();
        }

        /** Appends a command and flushes early once the buffer has reached the cap. */
        private Behavior<Command> onCommand(Command message) {
            buffer.add(message);
            // ">=" rather than "==": the buffer is seeded with one element before the first
            // check, so an equality test would never match for a small maxSize.
            if (buffer.size() >= maxSize) {
                timers.cancel(TIMER_KEY); // the pending timeout belongs to this batch
                return flush();
            }
            return this; // stay Active
        }

        /** Sends the buffered commands to the target and switches back to idle. */
        private Behavior<Command> flush() {
            target.tell(new Batch(buffer));
            return idle();
        }
    }
}
/**
 * Runnable wiring for the example: a guardian actor that spawns a {@link TimerBot} and a
 * {@link Buncher}, sends three messages to the buncher, and stops once the buncher terminates.
 */
public class ExampleTimer {

    /** @return the guardian behavior that sets up the example actors */
    public static Behavior<Void> create() {
        return Behaviors.setup(ExampleTimer::bootstrap);
    }

    /** Spawns the two child actors, feeds the buncher and returns the guardian's behavior. */
    private static Behavior<Void> bootstrap(ActorContext<Void> context) {
        ActorRef<Buncher.Batch> sink = context.spawn(TimerBot.create(), "timerBot");

        // Batches are flushed to the sink after 60 seconds or 10 buffered messages.
        Behavior<Buncher.Command> buncherBehavior =
            Buncher.create(sink, Duration.ofSeconds(60), 10);
        ActorRef<Buncher.Command> buncher = context.spawn(buncherBehavior, "buncher");
        context.watch(buncher);

        for (String text : new String[] {"toto1", "toto2", "toto3"}) {
            buncher.tell(new Buncher.ExcitingMessage(text));
        }

        // Only the buncher is watched, so a Terminated signal ends the guardian.
        return Behaviors.receive(Void.class)
            .onSignal(Terminated.class, terminated -> Behaviors.stopped())
            .build();
    }

    /** Starts an actor system named "ExampleTimer" with the guardian behavior. */
    public static void main(String[] args) {
        ActorSystem.create(create(), "ExampleTimer");
    }
}
@jbarotin
Copy link
Author

It displays "TimerBot Hello" after 60s in only 169 lines of code :)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment