-
-
Save antonpus/4dc24a81929a6ed22b147f5d047f3bbb to your computer and use it in GitHub Desktop.
Rx based state machine
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.experiments.rx; | |
import java.util.HashMap; | |
import java.util.Map; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.Observable; | |
import rx.Observable.OnSubscribe; | |
import rx.Subscriber; | |
import rx.functions.Action1; | |
import rx.functions.Action2; | |
import rx.subjects.PublishSubject; | |
public class StateMachine<T, E> implements Action1<E> { | |
private static final Logger LOG = LoggerFactory.getLogger(StateMachine.class); | |
public static class State<T, E> { | |
private String name; | |
private Action2<T, State<T, E>> enter; | |
private Action2<T, State<T, E>> exit; | |
private Map<E, State<T, E>> transitions = new HashMap<E, State<T, E>>(); | |
public State(String name) { | |
this.name = name; | |
} | |
public State<T, E> onEnter(Action2<T, State<T, E>> func) { | |
this.enter = func; | |
return this; | |
} | |
public State<T, E> onExit(Action2<T, State<T, E>> func) { | |
this.exit = func; | |
return this; | |
} | |
public void enter(T context) { | |
enter.call(context, this); | |
} | |
public void exit(T context) { | |
exit.call(context, this); | |
} | |
public State<T, E> transition(E event, State<T, E> state) { | |
transitions.put(event, state); | |
return this; | |
} | |
public State<T, E> next(E event) { | |
return transitions.get(event); | |
} | |
public String toString() { | |
return name; | |
} | |
} | |
private volatile State<T, E> state; | |
private final T context; | |
private final PublishSubject<E> events = PublishSubject.create(); | |
protected StateMachine(T context, State<T, E> initial) { | |
this.state = initial; | |
this.context = context; | |
} | |
public Observable<Void> connect() { | |
return Observable.create(new OnSubscribe<Void>() { | |
@Override | |
public void call(Subscriber<? super Void> sub) { | |
state.enter(context); | |
sub.add(events.collect(context, new Action2<T, E>() { | |
@Override | |
public void call(T context, E event) { | |
final State<T, E> next = state.next(event); | |
if (next != null) { | |
state.exit(context); | |
state = next; | |
next.enter(context); | |
} | |
else { | |
LOG.info("Invalid event : " + event); | |
} | |
} | |
}) | |
.subscribe()); | |
} | |
}); | |
} | |
@Override | |
public void call(E event) { | |
events.onNext(event); | |
} | |
public State<T, E> getState() { | |
return state; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.netflix.experiments.rx; | |
import org.junit.BeforeClass; | |
import org.junit.Test; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import rx.functions.Action2; | |
import com.netflix.experiments.rx.StateMachine.State; | |
public class StateMachineTest { | |
private static final Logger LOG = LoggerFactory.getLogger(StateMachineTest.class); | |
public static enum Event { | |
IDLE, | |
CONNECT, | |
CONNECTED, | |
FAILED, | |
UNQUARANTINE, | |
REMOVE | |
} | |
public static Action2<SomeContext, State<SomeContext, Event>> log(final String text) { | |
return new Action2<SomeContext, State<SomeContext, Event>>() { | |
@Override | |
public void call(SomeContext t1, State<SomeContext, Event> state) { | |
LOG.info("" + t1 + ":" + state + ":" + text); | |
} | |
}; | |
} | |
public static class SomeContext { | |
@Override | |
public String toString() { | |
return "Foo []"; | |
} | |
} | |
public static State<SomeContext, Event> IDLE = new State<SomeContext, Event>("IDLE"); | |
public static State<SomeContext, Event> CONNECTING = new State<SomeContext, Event>("CONNECTING"); | |
public static State<SomeContext, Event> CONNECTED = new State<SomeContext, Event>("CONNECTED"); | |
public static State<SomeContext, Event> QUARANTINED = new State<SomeContext, Event>("QUARANTINED"); | |
public static State<SomeContext, Event> REMOVED = new State<SomeContext, Event>("REMOVED"); | |
@BeforeClass | |
public static void beforeClass() { | |
IDLE | |
.onEnter(log("enter")) | |
.onExit(log("exit")) | |
.transition(Event.CONNECT, CONNECTING) | |
.transition(Event.REMOVE, REMOVED); | |
CONNECTING | |
.onEnter(log("enter")) | |
.onExit(log("exit")) | |
.transition(Event.CONNECTED, CONNECTED) | |
.transition(Event.FAILED, QUARANTINED) | |
.transition(Event.REMOVE, REMOVED); | |
CONNECTED | |
.onEnter(log("enter")) | |
.onExit(log("exit")) | |
.transition(Event.IDLE, IDLE) | |
.transition(Event.FAILED, QUARANTINED) | |
.transition(Event.REMOVE, REMOVED); | |
QUARANTINED | |
.onEnter(log("enter")) | |
.onExit(log("exit")) | |
.transition(Event.IDLE, IDLE) | |
.transition(Event.REMOVE, REMOVED); | |
REMOVED | |
.onEnter(log("enter")) | |
.onExit(log("exit")) | |
.transition(Event.CONNECT, CONNECTING); | |
} | |
@Test | |
public void test() { | |
StateMachine<SomeContext, Event> sm = new StateMachine<SomeContext, Event>(new SomeContext(), IDLE); | |
sm.connect().subscribe(); | |
sm.call(Event.CONNECT); | |
sm.call(Event.CONNECTED); | |
sm.call(Event.FAILED); | |
sm.call(Event.REMOVE); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment