/*
This represents a simple durable task which does some work in the background.
The task is automatically saved after every successful transition.
It is modeled as a workflow with the following transitions:
1. Initial -> Start -> Started
   Accept the user request and start preparation.
2. Started -> Prepared -> Running
   On activation, Started schedules a one-off timer to delay the actual execution of Prepare,
   so that the previous user request can finish and return control to the caller (Orleans is RPC, so this is a hack).
3. Running -> Executed -> Running
   Running -> Completed -> Completed
   While running, it slices the work and sends a message to itself after every slice to checkpoint the state
   (i.e. the last cursor position in the batch). Once it completes the batch, it fires Completed.
There are also 2 superstates: Active and Inactive. On becoming Active, a durable reminder is registered to keep the actor alive
in case of restarts or crashes; it is unregistered in OnUnbecome. Started, Running, and Failed are Active states.
When Failed, the actor needs to periodically notify interested parties so they can resolve the problem. Any failure switches the actor to Failed.
*/
public class Task : Actor<TaskData>
{
    ...
    [Behavior] void Active()
    {
        this.OnBecome(() => Reminders.Register("keepalive", due: TimeSpan.FromMinutes(1), period: TimeSpan.FromMinutes(1)));
        this.OnUnbecome(() => Reminders.Unregister("keepalive"));
        this.OnReceive<Completed>(_ => this.Become(Completed));
        this.OnReceive<Cancel>(_ => this.Become(Completed));
        this.OnReceive<Failed>(async x =>
        {
            Data.LastError = x.Exception;
            await this.Become(Failed);
        });
        this.OnReminder("keepalive", () => {});
    }
    [Behavior] void Inactive()
    {
        this.OnReminder(id => Reminders.Unregister(id));
    }
    [Behavior] void Initial()
    {
        this.Super(Inactive);
        this.OnReceive<Start>(async x =>
        {
            Data.Command = x.Command;
            Data.Initiator = x.UserId;
            await this.Become(Started);
        });
    }
    [Behavior] void Started()
    {
        this.Super(Active);
        this.OnActivate(() => Timers.Register("prepare", TimeSpan.FromSeconds(1), () => Try(Prepare)));
        this.OnReceive<Prepared>(async x =>
        {
            // change some state
            await this.Become(Running);
        });
        async Task Prepare()
        {
            // do some preparation
            await this.Fire(new Prepared {...});
        }
    }
    [Behavior] public void Running()
    {
        this.Super(Active);
        this.OnBecome(() => Notify(TaskStatus.Running));
        this.OnActivate(() => Timers.Register("execute", TimeSpan.FromSeconds(1), TimeSpan.FromMilliseconds(1), () => Try(Execute)));
        this.OnDeactivate(() => Timers.Unregister("execute"));
        this.OnReceive<Executed>(async x =>
        {
            // change some state
            await Save(); // checkpoint
            await Notify(TaskStatus.Running);
        });
        async Task Execute()
        {
            bool completed = ...;
            if (completed)
            {
                await this.Fire(new Completed());
                return;
            }
            // do work, loop
            await this.Fire(new Executed {lastSlice});
        }
    }
    [Behavior] new void Failed()
    {
        this.Super(Active);
        this.OnBecome(() => Notify(TaskStatus.Failed));
        this.OnActivate(() => Timers.Register("err", TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), () => Try(NotifyError)));
        this.OnDeactivate(() => Timers.Unregister("err"));
        this.OnReceive<Retry>(_ => this.Become(Running));
        this.OnReceive<Scream>(_ => Notify(TaskStatus.Failed));
        Task NotifyError() => this.Fire(new Scream());
    }
    [Behavior] new void Completed()
    {
        this.Super(Inactive);
        this.OnBecome(() => Notify(TaskStatus.Completed));
    }
    async Task Notify(TaskStatus status) { /* notify interested parties */ }
}
> In Akka, keep-alive reminders are not necessary. On the contrary, to have an actor GC'ed automatically after some period of inactivity, you need to call Context.SetReceiveTimeout(timeout) and then handle the ReceiveTimeout message.
Same in Orleans. I can set an infinite GC period to keep it in memory forever, but this is more about node crashes and restarts: in such cases someone needs to respawn the task. This could be solved with dedicated task storage and a node bootstrapper script. ReminderService in Orleans is built in and gives you a simple and robust way to resurrect actors in case of node failures/restarts (think durable supervisor).
> Building a custom Become version which persists the current state and behavior on become switch, and recreates it during the actor's PreStart call.
I see. Basically, a fallback to pattern matching, with no support for transition/activation callbacks.
PersistentFSM doesn't have hooks like OnBecome/OnUnbecome (AFAIK).
Having those is really handy, speaking from experience ))
> It doesn't expose an async/await API. However, you can reach it by wrapping your async lambdas with RunTask(Func<Task>): it freezes the current actor's mailbox, making it work effectively like a non-reentrant actor.
I don't allow switching behaviors from a timer (i.e. interleaving). You need to send a message to Self to become when running from a timer callback, and FSM actors should be non-reentrant; otherwise it's impossible to make any guarantees about transitions, the order of callbacks, and state mutations.
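To illustrate the rule about timers, here is a minimal, dependency-free sketch. It is not the gist's actual implementation: `SketchActor`, `OnTimerTick`, `Tell` and the string-based behaviors are hypothetical names invented for this example. The timer callback never switches behavior itself; it only posts a message to the actor's own mailbox, and the mailbox is drained one message at a time, so transitions cannot interleave.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal actor: one mailbox, one message handled at a time.
public sealed class SketchActor
{
    public sealed class Prepared { }

    private readonly object gate = new object();
    private readonly Queue<object> mailbox = new Queue<object>();
    private bool draining; // true while some thread is processing the mailbox

    public string Behavior { get; private set; } = "Started";
    public List<string> Log { get; } = new List<string>();

    // Stand-in for a timer callback: it does not call Become directly,
    // it only sends a message to self.
    public void OnTimerTick() => Tell(new Prepared());

    public void Tell(object message)
    {
        lock (gate)
        {
            mailbox.Enqueue(message);
            if (draining) return; // another caller is already draining: wait in the queue
            draining = true;
        }

        while (true)
        {
            object next;
            lock (gate)
            {
                if (mailbox.Count == 0) { draining = false; return; }
                next = mailbox.Dequeue();
            }

            try { Receive(next); }
            catch (Exception e) { Log.Add("failed: " + e.Message); }
        }
    }

    // Runs for one message at a time, so a transition is never interrupted.
    private void Receive(object message)
    {
        if (Behavior == "Started" && message is Prepared)
            Become("Running");
        else
            Log.Add("unhandled " + message.GetType().Name + " in " + Behavior);
    }

    private void Become(string next)
    {
        Log.Add(Behavior + " -> " + next);
        Behavior = next;
    }
}
```

Calling `OnTimerTick()` twice on a fresh instance performs the Started -> Running transition once; the second `Prepared` arrives in Running and is only logged as unhandled. A real actor runtime would also persist state and dispatch to registered handlers, which this sketch leaves out.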
In Akka, keep-alive reminders are not necessary. On the contrary, to have an actor GC'ed automatically after some period of inactivity, you need to call `Context.SetReceiveTimeout(timeout)` and then handle the `ReceiveTimeout` message.

Regarding building the FSM, I see two ways here:

1. Building a custom `Become` version which persists the current state and behavior on become switch, and recreates it during the actor's `PreStart` call.

2. Using PersistentFSM. PersistentFSM is a special type of actor having an `FSM`-like API but with all state transitions being event-sourced. I'm not a great fan of its API, but we're still working on making it more approachable. Two things to note here:

   - It doesn't expose an async/await API. However, you can reach it by wrapping your async lambdas with `RunTask(Func<Task>)`: it freezes the current actor's mailbox, making it work effectively like a non-reentrant actor.