Last active
April 10, 2020 18:58
-
-
Save jbarotin/18fb9d6b164b3c0a79d5b91f9da32df1 to your computer and use it in GitHub Desktop.
"Full" implementation of "Scheduling messages to self" example Akka 2.6.4
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package s4e.scheduler; | |
import akka.actor.typed.ActorRef; | |
import akka.actor.typed.ActorSystem; | |
import akka.actor.typed.Behavior; | |
import akka.actor.typed.Terminated; | |
import akka.actor.typed.javadsl.AbstractBehavior; | |
import akka.actor.typed.javadsl.ActorContext; | |
import akka.actor.typed.javadsl.Behaviors; | |
import akka.actor.typed.javadsl.Receive; | |
import akka.actor.typed.javadsl.TimerScheduler; | |
import jdocs.akka.typed.IntroTest; | |
import java.io.IOException; | |
import java.time.Duration; | |
import java.util.ArrayList; | |
import java.util.Collections; | |
import java.util.List; | |
import java.util.Objects; | |
class TimerBot extends AbstractBehavior<Buncher.Batch> { | |
public static Behavior<Buncher.Batch> create() { | |
return Behaviors.setup(context -> new TimerBot(context)); | |
} | |
private TimerBot(ActorContext<Buncher.Batch> context) { | |
super(context); | |
} | |
@Override | |
public Receive<Buncher.Batch> createReceive() { | |
return newReceiveBuilder().onMessage(Buncher.Batch.class, this::onGreeted).build(); | |
} | |
private Behavior<Buncher.Batch> onGreeted(Buncher.Batch message) { | |
getContext().getLog().info("TimerBot Hello"); | |
return this; | |
} | |
} | |
class Buncher { | |
public interface Command {} | |
public static final class Batch { | |
private final List<Command> messages; | |
public Batch(List<Command> messages) { | |
this.messages = Collections.unmodifiableList(messages); | |
} | |
public List<Command> getMessages() { | |
return messages; | |
} | |
// #timer | |
@Override | |
public boolean equals(Object o) { | |
if (this == o) return true; | |
if (o == null || getClass() != o.getClass()) return false; | |
Batch batch = (Batch) o; | |
return Objects.equals(messages, batch.messages); | |
} | |
@Override | |
public int hashCode() { | |
return Objects.hash(messages); | |
} // #timer | |
} | |
public static final class ExcitingMessage implements Command { | |
public final String message; | |
public ExcitingMessage(String message) { | |
this.message = message; | |
} | |
} | |
private static final Object TIMER_KEY = new Object(); | |
private enum Timeout implements Command { | |
INSTANCE | |
} | |
public static Behavior<Command> create(ActorRef<Batch> target, Duration after, int maxSize) { | |
return Behaviors.withTimers(timers -> new Buncher(timers, target, after, maxSize).idle()); | |
} | |
private final TimerScheduler<Command> timers; | |
private final ActorRef<Batch> target; | |
private final Duration after; | |
private final int maxSize; | |
private Buncher(TimerScheduler<Command> timers, ActorRef<Batch> target, Duration after, int maxSize) { | |
this.timers = timers; | |
this.target = target; | |
this.after = after; | |
this.maxSize = maxSize; | |
} | |
private Behavior<Command> idle() { | |
return Behaviors.receive(Command.class) | |
.onMessage(Command.class, this::onIdleCommand) | |
.build(); | |
} | |
private Behavior<Command> onIdleCommand(Command message) { | |
timers.startSingleTimer(TIMER_KEY, Timeout.INSTANCE, after); | |
return Behaviors.setup(context -> new Active(context, message)); | |
} | |
private class Active extends AbstractBehavior<Command> { | |
private final List<Command> buffer = new ArrayList<>(); | |
Active(ActorContext<Command> context, Command firstCommand) { | |
super(context); | |
buffer.add(firstCommand); | |
} | |
@Override | |
public Receive<Command> createReceive() { | |
return newReceiveBuilder() | |
.onMessage(Timeout.class, message -> onTimeout()) | |
.onMessage(Command.class, this::onCommand) | |
.build(); | |
} | |
private Behavior<Command> onTimeout() { | |
target.tell(new Batch(buffer)); | |
return idle(); // switch to idle | |
} | |
private Behavior<Command> onCommand(Command message) { | |
buffer.add(message); | |
if (buffer.size() == maxSize) { | |
timers.cancel(TIMER_KEY); | |
target.tell(new Batch(buffer)); | |
return idle(); // switch to idle | |
} else { | |
return this; // stay Active | |
} | |
} | |
} | |
} | |
public class ExampleTimer { | |
public static Behavior<Void> create() { | |
return Behaviors.setup( | |
context -> { | |
ActorRef<Buncher.Batch> timerBot = context.spawn(TimerBot.create(), "timerBot"); | |
ActorRef<Buncher.Command> buncher = context.spawn(Buncher.create(timerBot,Duration.ofSeconds(60), 10), "buncher"); | |
context.watch(buncher); | |
buncher.tell(new Buncher.ExcitingMessage("toto1")); | |
buncher.tell(new Buncher.ExcitingMessage("toto2")); | |
buncher.tell(new Buncher.ExcitingMessage("toto3")); | |
return Behaviors.receive(Void.class) | |
.onSignal(Terminated.class, sig -> Behaviors.stopped()) | |
.build(); | |
}); | |
} | |
public static void main(String[] args) { | |
ActorSystem.create(ExampleTimer.create(), "ExampleTimer"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
It displays "TimerBot Hello" after 60s in only 169 lines of code :)