Last active
August 21, 2019 13:26
-
-
Save cherniag/16a9d9a88ccc3a959d8c32833c2eb134 to your computer and use it in GitHub Desktop.
Hystrix reactive command
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.netflix.hystrix.HystrixCommandGroupKey; | |
import com.netflix.hystrix.HystrixCommandKey; | |
import com.netflix.hystrix.HystrixObservableCommand; | |
import reactor.core.publisher.Mono; | |
import rx.Observable; | |
import rx.RxReactiveStreams; | |
class AsyncCommand extends HystrixObservableCommand<SourceAwareRestResponse> { | |
private final Mono<HttpResponse> adaptee; | |
private final Callable<T> fallbackProvider; | |
private final Predicate<Throwable> skipFallbackPredicate; | |
/** | |
* @param adaptee Mono | |
* @param commandGroupKey Used to group together multiple HystrixObservableCommand objects. | |
* * The HystrixCommandGroupKey is used to represent a common relationship between commands. For example, a library or team name, the system all related commands interace with, | |
* * common business purpose etc. | |
* @param commandKey The HystrixCommandKey is used to associate a HystrixObservableCommand with HystrixCircuitBreaker, HystrixCommandMetrics and other objects. | |
* Can be used to override override default properties | |
* @param fallbackProvider Used to generate fallback in case of error | |
* @param skipFallbackPredicate Used to decide should we use fallback or emit thrown exception | |
*/ | |
AsyncCommand(Mono<HttpResponse> adaptee, | |
HystrixCommandGroupKey commandGroupKey, | |
HystrixCommandKey commandKey, | |
Callable<T> fallbackProvider, | |
Predicate<Throwable> skipFallbackPredicate) { | |
super(Setter | |
.withGroupKey(commandGroupKey) | |
.andCommandKey(commandKey)); | |
this.adaptee = adaptee; | |
this.fallbackProvider = fallbackProvider; | |
this.skipFallbackPredicate = skipFallbackPredicate; | |
} | |
Mono<HttpResponse> execute() { | |
return Mono.from(RxReactiveStreams.toPublisher(observe())); | |
} | |
@Override | |
protected Observable<SourceAwareRestResponse> resumeWithFallback() { | |
return Observable.fromCallable(fallbackProvider); | |
} | |
@Override | |
protected Observable<HttpResponse> construct() { | |
return RxReactiveStreams.toObservable(adaptee); | |
} | |
/** | |
* Since there is no access to exception in fallback, we can skip fallback and emit thrown exception then process it later | |
*/ | |
@Override | |
protected boolean shouldNotBeWrapped(Throwable t) { | |
return skipFallbackPredicate.test(t); | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Mono<HttpResponse> command = ... // non blocking http call | |
Mono<HttpResponse> wrapped = HystrixCommands | |
.from(command) | |
.setter( | |
HystrixObservableCommand.Setter | |
.withGroupKey(HystrixCommandGroupKey.Factory.asKey(groupName)) // general | |
.andCommandKey(HystrixCommandKey.Factory.asKey(commandName)) // command specific, for example service discovery instance id | |
) | |
.fallback((e) -> { | |
log.info("Can not get inventory from {}", commandName, e); | |
return Mono.just(HttpResponse.error(e.getMessage(), commandName)); | |
}) | |
.commandName(commandName) | |
.toMono(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment