- Mutiny provides two main classes: Uni and Multi.2 Uni represents an asynchronous action or operation.
- It can emit a single item or failure, if the represented action fails. Multi represents a stream of items.
- It can convey multiple items, as well as a terminal failure or completion event
Uni<UserProfile> uni = users.getUserByName(name);
return uni
.onItem().transform(user -> user.name)
.onFailure().recoverWithItem("anonymous");
- Multi can emit 0, 1, n, or an infinite number of items.
- It can also emit a failure, which is a terminal event. Finally, when there are no more items to emit,
- Multi emits the completion event. As a result, the API is slightly different.
- Let’s now imagine we need all the users.
Multi<UserProfile> users = this.users.getAllUsers();
return users
.onItem().transform(user -> user.name);
Multi<UserProfile> multi = users.getAllUsers();
multi
.onItem().transform(user -> user.name.toLowerCase())
.select().where(name -> name.startsWith("l"))
.collect().asList()
.subscribe().with(
list -> System.out.println("User names starting with `l`" + list)
);
Uni
and Multi
are lazy evaluation. Need to subscribe with @Blocking
Uni<UserProfile> uni = users.getUserByName("leia");
Multi<UserProfile> multi = users.getAllUsers();
uni.subscribe().with(
user -> System.out.println("User is " + user.name),
failure -> System.out.println("D'oh! " + failure)
);
multi.subscribe().with(
user -> System.out.println("User is " + user.name),
failure -> System.out.println("D'oh! " + failure),
() -> System.out.println("No more user")
);
in Quarkus http and messaging takes care of subscibing for you.
Backpressure a.k.a. onOverflow group
public Multi<Product> getRecommendations() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
.onOverflow().drop()
.onItem().transformToUniAndConcatenate(
x -> products.getRecommendedProduct());
}
Once you have a Uni or Multi instance, it’s natural to observe the events emitted by these instances. For each type of event, an invoke method is called when it sees a matching event; onSubscribe/onCancellation/onItem/onFailure/onCompletion Groups
multi
.onSubscribe().invoke(sub -> System.out.println("Subscribed!"))
.onCancellation().invoke(() -> System.out.println("Cancelled"))
.onItem().invoke(s -> System.out.println("Item: " + s))
.onFailure().invoke(f -> System.out.println("Failure: " + f))
.onCompletion().invoke(() -> System.out.println("Completed!"))
.subscribe().with(
item -> System.out.println("Received: " + item)
);
transform
method is similar tomap
method of streams api.
Multi<String> transformed = multi
.onItem().transform(String::toUpperCase)
.onFailure().transform(MyBusinessException::new);
Unlike
invoke
,transform
produces a new event. It invokes the passed function and sends the result to the downstream subscriber. The synchronous nature oftransform
is important. After receiving the event,transform
calls the transformation logic and emits the result downstream. If the transformation logic takes a long time to complete, transform waits until the logic terminates. So, use transform when the transformation is fast enough.
products.getAllProducts()
.onItem().transform(p -> captializeAllFirstLetter(p.name))
.onItem().transform(ProductModel::new);
uni
.onItem().transformToUni(item -> callMyRemoteService(item))
.subscribe().with(s -> System.out.println("Received: " + s));
uni
.onItem().transformToMulti(s -> getAMulti(s))
.subscribe().with(
s -> System.out.println("Received item: " + s),
() -> System.out.println("Done!")
);
transform to Multi or Uni to keep things on IO thread and execute asynchronously
users.getUserByName(username)
.onItem().transformToMulti(user -> orders.getOrderForUser(user));
users.getAllUsers().onItem().transformToMultiAndConcatenate(user -> orders.getOrderForUser(user));
users.getAllUsers().onItem().transformToMultiAndMerge(user -> orders.getOrderForUser(user));
users.createUser(name)
.onItem().transform(id -> "New User " + name + " inserted")
.onFailure().recoverWithItem(
failure -> "User not inserted: " + failure.getMessage());
While a failure is an event, it’s a terminal one. That’s not a problem if you are dealing with Uni; you won’t get the item, just the failure. So, the Uni would replace the failure with the fallback item. With Multi, you won’t get any more items after the failure. The recovery emits the fallback item followed by the completion event. Another common possibility is to retry.
atMost
: max retry attempts to subscribe to upstreamwithBackOff
delay between attempts.
users.createUser(name)
.onItem().transform(id -> "New User " + name + " inserted")
.onFailure().retry()
.withBackOff(Duration.ofSeconds(3))
.atMost(3);
Refer The
onFailure
group methods which contains a lot more possibilities.
Uni<UserProfile> uni1 = users.getRandomUser();
Uni<Product> uni2 = products.getRecommendedProduct();
Uni.combine().all().unis(uni1, uni2).asTuple()
.onItem().transform(tuple -> "Hello " + tuple.getItem1().name +
", we recommend you " + tuple.getItem2().name);
Multi<UserProfile> u = Multi.createFrom().ticks()
.every(Duration.ofSeconds(1)).onOverflow().drop()
.onItem().transformToUniAndConcatenate(x -> users.getRandomUser());
Multi<Product> p = Multi.createFrom().ticks()
.every(Duration.ofSeconds(1)).onOverflow().drop()
.onItem().transformToUniAndConcatenate(x -> products.getRecommendedProduct());
Multi.createBy().combining()
.streams(u, p).asTuple().onItem()
.transform(tuple -> "Hello " + tuple.getItem1().name + ", we recommend you " + tuple.getItem2().name);
select.where
andselect.when
(select.when
is async )
getAllOrders()
.select().where(order -> order.products.size() > 3);
getAllOrders()
.select().when(order ->
users.getUserByName(username)
.onItem().transform(u -> u.name.equalsIgnoreCase(username))
);
orders.getAllOrders()
.onItem().transformToIterable(order -> order.products)
.select().distinct();
Finally, when dealing with bounded Multi, you may want to accumulate the items into a list or a collection. The resulting structure is emitted when the Multi completes.
Uni<List<Product>> ulp = getAllOrderedProducts().collect().asList();
here getAllOrderedProducts returns a Multi which is stored and converted to a list for consumption in one go.
Uni<List<String>> itemsAsList = multi.collect().asList();
Uni<Map<String, String>> itemsAsMap = multi.collect().asMap(item ->
getKeyForItem(item));
Uni<Long> count = multi.collect().with(Collectors.counting());
The
transformToUni
&transformToMulti
operations are generally calledflatMap
in traditionalreactive programming libraries
.transformToMultiAndConcatenate
is calledconcatMap
in traditionalreactive programming libraries
.transformToMultiAndMerge
is generally namedflatMap
.