Created
March 8, 2017 17:03
-
-
Save douglaz/e06c1b6dbe95a14407866165af50ad1e to your computer and use it in GitHub Desktop.
Partial example of an Akka FSM with well defined messages and states
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object PeriodDirectorActor { | |
// Events received from other actors or generated internally | |
object Events { | |
sealed abstract trait EventType | |
// Public events | |
case class Start(conf: FullConfiguration) extends EventType | |
case object Stop extends EventType | |
// Called by StreamReaders | |
case class ReaderWorkAvailable(work: CartView) extends EventType | |
case class ReaderWork(event: CartView) extends EventType | |
case object ReaderWorkUnavailable extends EventType // the requested work or the one previously advertised as available is unavailable now | |
case object ReaderStopped extends EventType | |
// Called by Workers | |
case object WorkerIsReadyForWork extends EventType | |
case object WorkRequiresCommit extends EventType | |
case object WorkFinished extends EventType | |
// Internal Events | |
case class ConsumerCreated(consumer: ConsumerConnectorApi, streams: List[KafkaByteIteratorApi]) extends EventType | |
case class ConsumerFailed(t: Throwable) extends EventType | |
case object CommitGraceTimeout extends EventType | |
case object RunningTimeout extends EventType | |
case object CommitSuccess extends EventType | |
case class CommitFailure(t: Throwable) extends EventType | |
} | |
// Possible states | |
object State { | |
sealed abstract trait StateType | |
case object Idle extends StateType | |
case object Starting extends StateType | |
case object Running extends StateType | |
case object WaitingCommit extends StateType | |
case object Stopping extends StateType | |
} | |
// Our internal data representation | |
object Data { | |
sealed abstract trait DataType | |
case object Uninitialized extends DataType | |
case class Initializing(conf: FullConfiguration) extends DataType | |
object ReaderState { | |
sealed abstract trait StateType | |
case object WithoutWork extends StateType | |
case class WithWork(cart: CartView) extends StateType | |
case object ReadingWork extends StateType | |
case object Stopping extends StateType | |
} | |
object WorkerState { | |
sealed abstract trait StateType | |
case object Idle extends StateType | |
case object ReadyToWork extends StateType | |
case class WithWork(cart: CartView) extends StateType | |
case class WaitingCommit(cart: CartView) extends StateType | |
} | |
type ReadersStateMap = Map[ActorRef, ReaderState.StateType] | |
type WorkersStateMap = Map[ActorRef, WorkerState.StateType] | |
case class Initialized(consumer: ConsumerConnectorApi, | |
committed: Boolean, | |
conf: FullConfiguration, | |
readers: ReadersStateMap, | |
workers: WorkersStateMap) extends DataType | |
} | |
def props(period: Period, programManager: ActorRef) = | |
Props(new PeriodDirectorActor(period, programManager)) | |
} | |
class PeriodDirectorActor(period: Period, programManager: ActorRef) extends LoggableFSM[State.StateType, Data.DataType] | |
with ActorLogging | |
with BlockingIOExecutionContext | |
with LogUnhandledEvents[State.StateType, Data.DataType] { | |
startWith(State.Idle, Data.Uninitialized) | |
when(State.Idle) { | |
case Event(Events.WorkerIsReadyForWork, _) => | |
log.debug(s"Received WorkerIsReadyForWork probably sent from a worker before dying, ignoring...") | |
lstay | |
case Event(Events.Start(conf), Data.Uninitialized) => | |
Telemetry.directorTryingStart(period) | |
log.info(s"Starting for period $period") | |
asyncCreateConsumer() | |
goto(State.Starting) using Data.Initializing(conf) | |
} | |
// This is just a snippet. The actual file is 500+ lines of code | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a coordinator that read some data from Kafka readers and distribute it to workers. It also implements a sort of 2-phase commit.