Skip to content

Instantly share code, notes, and snippets.

@cherniag
Last active August 21, 2019 13:26
Show Gist options
  • Save cherniag/16a9d9a88ccc3a959d8c32833c2eb134 to your computer and use it in GitHub Desktop.
Save cherniag/16a9d9a88ccc3a959d8c32833c2eb134 to your computer and use it in GitHub Desktop.
Hystrix reactive command
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixObservableCommand;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.function.Predicate;
import reactor.core.publisher.Mono;
import rx.Observable;
import rx.RxReactiveStreams;
/**
 * Hystrix observable command that adapts a Reactor {@link Mono} producing an
 * {@code HttpResponse} so it can be executed under Hystrix and consumed again as a {@link Mono}.
 *
 * <p>All type parameters are unified on {@code HttpResponse}: the wrapped publisher, the command
 * result, the fallback value and the value returned from {@link #execute()} must agree, otherwise
 * {@code construct()} / {@code resumeWithFallback()} do not override the parent's methods.
 */
class AsyncCommand extends HystrixObservableCommand<HttpResponse> {

    /** The reactive call being wrapped by this command. */
    private final Mono<HttpResponse> adaptee;

    /**
     * Produces the fallback value. Declared with a wildcard so that a callable returning a subtype
     * of {@code HttpResponse} is accepted as well.
     */
    private final Callable<? extends HttpResponse> fallbackProvider;

    /** Consulted by {@link #shouldNotBeWrapped(Throwable)} for every failure. */
    private final Predicate<Throwable> skipFallbackPredicate;

    /**
     * @param adaptee Mono to run as the body of the command
     * @param commandGroupKey Used to group together multiple HystrixObservableCommand objects.
     *     The HystrixCommandGroupKey is used to represent a common relationship between commands.
     *     For example, a library or team name, the system all related commands interact with,
     *     common business purpose etc.
     * @param commandKey The HystrixCommandKey is used to associate a HystrixObservableCommand with
     *     HystrixCircuitBreaker, HystrixCommandMetrics and other objects.
     *     Can be used to override default properties
     * @param fallbackProvider Used to generate fallback in case of error
     * @param skipFallbackPredicate Used to decide whether to use the fallback or emit the thrown
     *     exception
     * @throws NullPointerException if {@code adaptee}, {@code fallbackProvider} or
     *     {@code skipFallbackPredicate} is {@code null}
     */
    AsyncCommand(Mono<HttpResponse> adaptee,
                 HystrixCommandGroupKey commandGroupKey,
                 HystrixCommandKey commandKey,
                 Callable<? extends HttpResponse> fallbackProvider,
                 Predicate<Throwable> skipFallbackPredicate) {
        super(Setter
                .withGroupKey(commandGroupKey)
                .andCommandKey(commandKey));
        // Fail at construction time instead of later, inside the reactive pipeline.
        this.adaptee = Objects.requireNonNull(adaptee, "adaptee");
        this.fallbackProvider = Objects.requireNonNull(fallbackProvider, "fallbackProvider");
        this.skipFallbackPredicate =
                Objects.requireNonNull(skipFallbackPredicate, "skipFallbackPredicate");
    }

    /**
     * Runs the command via {@code observe()} and exposes its result as a {@link Mono}.
     *
     * @return a Mono built from the command's observable through the Reactive Streams bridge
     */
    Mono<HttpResponse> execute() {
        return Mono.from(RxReactiveStreams.toPublisher(observe()));
    }

    /**
     * Supplies the fallback by invoking {@code fallbackProvider} when the observable is subscribed.
     */
    @Override
    protected Observable<HttpResponse> resumeWithFallback() {
        return Observable.fromCallable(fallbackProvider);
    }

    /**
     * Converts the wrapped {@link Mono} into the RxJava observable that Hystrix executes.
     */
    @Override
    protected Observable<HttpResponse> construct() {
        return RxReactiveStreams.toObservable(adaptee);
    }

    /**
     * Since there is no access to the exception in the fallback, we can skip the fallback and emit
     * the thrown exception, then process it later.
     *
     * @param t the failure raised by the command
     * @return the result of {@code skipFallbackPredicate} for {@code t}
     */
    @Override
    protected boolean shouldNotBeWrapped(Throwable t) {
        return skipFallbackPredicate.test(t);
    }
}
// Usage example: wrapping a non-blocking call with the HystrixCommands builder.
Mono<HttpResponse> command = ... // non blocking http call

// Group key is the general one; command key is command specific,
// for example service discovery instance id.
HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(groupName);
HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(commandName);
HystrixObservableCommand.Setter commandSetter =
        HystrixObservableCommand.Setter.withGroupKey(groupKey).andCommandKey(commandKey);

Mono<HttpResponse> wrapped = HystrixCommands
        .from(command)
        .setter(commandSetter)
        // On failure, log it and answer with an error response instead of propagating.
        .fallback(error -> {
            log.info("Can not get inventory from {}", commandName, error);
            return Mono.just(HttpResponse.error(error.getMessage(), commandName));
        })
        .commandName(commandName)
        .toMono();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment