- Mutiny provides two main classes: Uni and Multi.2 Uni represents an asynchronous action or operation.
- It can emit a single item or failure, if the represented action fails. Multi represents a stream of items.
- It can convey multiple items, as well as a terminal failure or completion event
Uni<UserProfile> uni = users.getUserByName(name);
return uni
.onItem().transform(user -> user.name)
.onFailure().recoverWithItem("anonymous");
- Multi can emit 0, 1, n, or an infinite number of items.
- It can also emit a failure, which is a terminal event. Finally, when there are no more items to emit,
- Multi emits the completion event. As a result, the API is slightly different.
- Let’s now imagine we need all the users.
Multi<UserProfile> users = this.users.getAllUsers();
return users
.onItem().transform(user -> user.name);
Multi<UserProfile> multi = users.getAllUsers();
multi
.onItem().transform(user -> user.name.toLowerCase())
.select().where(name -> name.startsWith("l"))
.collect().asList()
.subscribe().with(
list -> System.out.println("User names starting with `l`" + list)
);
Uni and Multi are lazy evaluation. Need to subscribe with @Blocking
Uni<UserProfile> uni = users.getUserByName("leia");
Multi<UserProfile> multi = users.getAllUsers();
uni.subscribe().with(
user -> System.out.println("User is " + user.name),
failure -> System.out.println("D'oh! " + failure)
);
multi.subscribe().with(
user -> System.out.println("User is " + user.name),
failure -> System.out.println("D'oh! " + failure),
() -> System.out.println("No more user")
);
in Quarkus http and messaging takes care of subscibing for you.
Backpressure a.k.a. onOverflow group
public Multi<Product> getRecommendations() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.onItem().transformToUniAndConcatenate(
x -> products.getRecommendedProduct());
}
Once you have a Uni or Multi instance, it’s natural to observe the events emitted by these instances. For each type of event, an invoke method is called when it sees a matching event; onSubscribe/onCancellation/onItem/onFailure/onCompletion Groups
multi
.onSubscribe().invoke(sub -> System.out.println("Subscribed!"))
.onCancellation().invoke(() -> System.out.println("Cancelled"))
.onItem().invoke(s -> System.out.println("Item: " + s))
.onFailure().invoke(f -> System.out.println("Failure: " + f))
.onCompletion().invoke(() -> System.out.println("Completed!"))
.subscribe().with(
item -> System.out.println("Received: " + item)
);
transformmethod is similar tomapmethod of streams api.
Multi<String> transformed = multi
.onItem().transform(String::toUpperCase)
.onFailure().transform(MyBusinessException::new);
Unlike
invoke,transformproduces a new event. It invokes the passed function and sends the result to the downstream subscriber. The synchronous nature oftransformis important. After receiving the event,transformcalls the transformation logic and emits the result downstream. If the transformation logic takes a long time to complete, transform waits until the logic terminates. So, use transform when the transformation is fast enough.
products.getAllProducts()
.onItem().transform(p -> captializeAllFirstLetter(p.name))
.onItem().transform(ProductModel::new);
uni
.onItem().transformToUni(item -> callMyRemoteService(item))
.subscribe().with(s -> System.out.println("Received: " + s));
uni
.onItem().transformToMulti(s -> getAMulti(s))
.subscribe().with(
s -> System.out.println("Received item: " + s),
() -> System.out.println("Done!")
);
transform to Multi or Uni to keep things on IO thread and execute asynchronously
users.getUserByName(username)
.onItem().transformToMulti(user -> orders.getOrderForUser(user));
users.getAllUsers().onItem().transformToMultiAndConcatenate(user -> orders.getOrderForUser(user));
users.getAllUsers().onItem().transformToMultiAndMerge(user -> orders.getOrderForUser(user));
users.createUser(name)
.onItem().transform(id -> "New User " + name + " inserted")
.onFailure().recoverWithItem(
failure -> "User not inserted: " + failure.getMessage());
While a failure is an event, it’s a terminal one. That’s not a problem if you are dealing with Uni; you won’t get the item, just the failure. So, the Uni would replace the failure with the fallback item. With Multi, you won’t get any more items after the failure. The recovery emits the fallback item followed by the completion event. Another common possibility is to retry.
atMost: max retry attempts to subscribe to upstreamwithBackOffdelay between attempts.
users.createUser(name)
.onItem().transform(id -> "New User " + name + " inserted")
.onFailure().retry()
.withBackOff(Duration.ofSeconds(3))
.atMost(3);
Refer The
onFailuregroup methods which contains a lot more possibilities.
Uni<UserProfile> uni1 = users.getRandomUser();
Uni<Product> uni2 = products.getRecommendedProduct();
Uni.combine().all().unis(uni1, uni2).asTuple()
.onItem().transform(tuple -> "Hello " + tuple.getItem1().name +
", we recommend you " + tuple.getItem2().name);
Multi<UserProfile> u = Multi.createFrom().ticks()
.every(Duration.ofSeconds(1)).onOverflow().drop()
.onItem().transformToUniAndConcatenate(x -> users.getRandomUser());
Multi<Product> p = Multi.createFrom().ticks()
.every(Duration.ofSeconds(1)).onOverflow().drop()
.onItem().transformToUniAndConcatenate(x -> products.getRecommendedProduct());
Multi.createBy().combining()
.streams(u, p).asTuple().onItem()
.transform(tuple -> "Hello " + tuple.getItem1().name + ", we recommend you " + tuple.getItem2().name);
select.whereandselect.when(select.whenis async )
getAllOrders()
.select().where(order -> order.products.size() > 3);
getAllOrders()
.select().when(order ->
users.getUserByName(username)
.onItem().transform(u -> u.name.equalsIgnoreCase(username))
);
orders.getAllOrders()
.onItem().transformToIterable(order -> order.products)
.select().distinct();
Finally, when dealing with bounded Multi, you may want to accumulate the items into a list or a collection. The resulting structure is emitted when the Multi completes.
Uni<List<Product>> ulp = getAllOrderedProducts().collect().asList();here getAllOrderedProducts returns a Multi which is stored and converted to a list for consumption in one go.
Uni<List<String>> itemsAsList = multi.collect().asList();
Uni<Map<String, String>> itemsAsMap = multi.collect().asMap(item ->
getKeyForItem(item));
Uni<Long> count = multi.collect().with(Collectors.counting());
The
transformToUni&transformToMultioperations are generally calledflatMapin traditionalreactive programming libraries.transformToMultiAndConcatenateis calledconcatMapin traditionalreactive programming libraries.transformToMultiAndMergeis generally namedflatMap.


