Last active
August 29, 2015 14:22
Using RxJava to implement downtime alarms
/**
 * As long as new Observations are added to _lastSeen, we
 * will emit CLEARED events. If the interval passes without any
 * observation, a stream of BREACHED events (at most 10) will be
 * emitted until we see more data coming in.
 **/
PublishSubject<Observation> lastSeen = PublishSubject.create();
// dump incoming events into _lastSeen.onNext()
_lastSeen = new SerializedObserver<>(lastSeen);
Observable<Observation> share = lastSeen.share();
share
    // for each observation emit an OK state downstream
    .doOnNext(obs -> emitState(clear(obs)))
    // this only passes an event through if no newer one arrives within the interval
    .debounce(alarm.interval(), TimeUnit.SECONDS)
    .flatMap(obs ->
        // this maps the breach event to a repeating observable
        // of BREACHED events, either 10 of them or until we hear
        // from the main stream again
        Observable.timer(0, alarm.interval(), TimeUnit.SECONDS)
            .map(i -> breached(obs))
            .take(10)
            .takeUntil(share)
    )
    .subscribe(state -> {
        LOG.info("DownTime alarm triggered: {}", state);
        this.emitState(state);
    });
}
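
To make the expected timeline of the pipeline above easier to reason about, here is a minimal plain-Java sketch that simulates it without RxJava. It is not the gist author's code: the class `DowntimeTimeline`, the method `simulate`, and the `endTime` cutoff are hypothetical names introduced for illustration. It assumes observation times are given in ascending order, in seconds, and that an observation arriving at exactly the same instant as a breach tick suppresses that tick, which is a simplification of how the real schedulers would race.

```java
import java.util.ArrayList;
import java.util.List;

public class DowntimeTimeline {

    /**
     * Hypothetical helper: returns "time:STATE" entries in the order they would be emitted.
     * Each observation yields CLEARED immediately. If no newer observation arrives within
     * `interval` seconds, BREACHED is emitted every `interval` seconds, at most
     * `maxBreaches` times, until the next observation or until `endTime`.
     */
    static List<String> simulate(long[] observationTimes, long interval,
                                 int maxBreaches, long endTime) {
        List<String> events = new ArrayList<>();
        for (int i = 0; i < observationTimes.length; i++) {
            long seen = observationTimes[i];
            events.add(seen + ":CLEARED");

            // Time of the next observation, which plays the role of takeUntil(share).
            long next = (i + 1 < observationTimes.length)
                    ? observationTimes[i + 1]
                    : Long.MAX_VALUE;

            // The first tick fires one interval after the observation (the debounce delay),
            // then repeats once per interval, capped like take(10) in the original.
            for (int k = 1; k <= maxBreaches; k++) {
                long tick = seen + k * interval;
                if (tick >= next || tick > endTime) {
                    break;
                }
                events.add(tick + ":BREACHED");
            }
        }
        return events;
    }

    public static void main(String[] args) {
        // Observations at t=0, 3 and 20 seconds, alarm interval of 5 seconds, watched until t=30.
        System.out.println(simulate(new long[] {0, 3, 20}, 5, 10, 30));
        // prints [0:CLEARED, 3:CLEARED, 8:BREACHED, 13:BREACHED, 18:BREACHED, 20:CLEARED, 25:BREACHED, 30:BREACHED]
    }
}
```

Note that the observation at t=0 never breaches because the one at t=3 arrives inside the interval, and that a source which stays silent produces only the capped number of BREACHED events before the alarm goes quiet.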