@debu999
Last active July 5, 2024 13:41
Reactive Programming - Java with Mutiny
  • Mutiny provides two main classes: Uni and Multi.
  • Uni represents an asynchronous action or operation. It can emit a single item or, if the represented action fails, a failure.
  • Multi represents a stream of items. It can convey multiple items, as well as a terminal failure or completion event.
Uni<UserProfile> uni = users.getUserByName(name);
return uni
        .onItem().transform(user -> user.name)
        .onFailure().recoverWithItem("anonymous");
  • Multi can emit 0, 1, n, or an infinite number of items.
  • It can also emit a failure, which is a terminal event.
  • Finally, when there are no more items to emit, Multi emits the completion event. As a result, the API is slightly different from the Uni one.
  • Let’s now imagine we need all the users.
Multi<UserProfile> users = this.users.getAllUsers();
return users
        .onItem().transform(user -> user.name);
        
Multi<UserProfile> multi = users.getAllUsers();
multi
        .onItem().transform(user -> user.name.toLowerCase())
        .select().where(name -> name.startsWith("l"))
        .collect().asList()
        .subscribe().with(
                list -> System.out.println("User names starting with `l`: " + list)
);


Uni and Multi are lazily evaluated: no work starts until something subscribes to them.

Uni<UserProfile> uni = users.getUserByName("leia");
Multi<UserProfile> multi = users.getAllUsers();

uni.subscribe().with(
        user -> System.out.println("User is " + user.name),
        failure -> System.out.println("D'oh! " + failure)
);

multi.subscribe().with(
        user -> System.out.println("User is " + user.name),
        failure -> System.out.println("D'oh! " + failure),
        () -> System.out.println("No more user")
);
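
To make the laziness concrete, here is a minimal sketch, assuming the Mutiny library (io.smallrye.reactive:mutiny) is on the classpath. The class name LazyUniExample and the calls counter are hypothetical and exist only for illustration. The supplier passed to Uni.createFrom().item(...) does not run when the pipeline is built, only when something subscribes (here through await().indefinitely(), which subscribes and blocks the caller).

```java
import io.smallrye.mutiny.Uni;
import java.util.concurrent.atomic.AtomicInteger;

class LazyUniExample {
    // Counts how many times the (hypothetical) lookup actually ran.
    static final AtomicInteger calls = new AtomicInteger();

    // Building the Uni only describes the work; the supplier is not called yet.
    static Uni<String> lookup() {
        return Uni.createFrom().item(() -> {
            calls.incrementAndGet();
            return "leia";
        });
    }

    public static void main(String[] args) {
        Uni<String> uni = lookup().onItem().transform(String::toUpperCase);
        System.out.println("calls before subscription: " + calls.get());

        // await() subscribes and blocks the calling thread until the item arrives.
        String name = uni.await().indefinitely();
        System.out.println(name + ", calls after subscription: " + calls.get());
    }
}
```

Blocking with await() is convenient in a standalone program like this one, but it should not be used on an I/O thread.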

In Quarkus, the HTTP and messaging layers take care of subscribing for you.

Backpressure a.k.a. onOverflow group

public Multi<Product> getRecommendations() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(1))
            .onOverflow().drop()
            .onItem().transformToUniAndConcatenate(
                x -> products.getRecommendedProduct());
}

Once you have a Uni or Multi instance, it’s natural to observe the events these instances emit. Each type of event has its own group (onSubscribe, onCancellation, onItem, onFailure, onCompletion), and the invoke method of a group calls your callback whenever a matching event passes through.

multi
        .onSubscribe().invoke(sub -> System.out.println("Subscribed!"))
        .onCancellation().invoke(() -> System.out.println("Cancelled"))
        .onItem().invoke(s -> System.out.println("Item: " + s))
        .onFailure().invoke(f -> System.out.println("Failure: " + f))
        .onCompletion().invoke(() -> System.out.println("Completed!"))
        .subscribe().with(
                item -> System.out.println("Received: " + item)
);

The transform method is similar to the map method of the Java Streams API.

Multi<String> transformed = multi
        .onItem().transform(String::toUpperCase)
        .onFailure().transform(MyBusinessException::new);

Unlike invoke, transform produces a new event. It invokes the passed function and sends the result to the downstream subscriber. The synchronous nature of transform is important. After receiving the event, transform calls the transformation logic and emits the result downstream. If the transformation logic takes a long time to complete, transform waits until the logic terminates. So, use transform when the transformation is fast enough.

products.getAllProducts()
            .onItem().transform(p -> capitalizeAllFirstLetter(p.name))
            .onItem().transform(ProductModel::new);
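
The difference between invoke and transform can be checked with a small sketch, assuming Mutiny is on the classpath. The class name InvokeVsTransform and the observed list are hypothetical. invoke sees each item and passes it on unchanged, while transform replaces the item with the result of the function.

```java
import io.smallrye.mutiny.Multi;
import java.util.ArrayList;
import java.util.List;

class InvokeVsTransform {
    // Items as seen by the invoke callback, before any transformation.
    static final List<String> observed = new ArrayList<>();

    static List<String> run() {
        observed.clear();
        return Multi.createFrom().items("luke", "leia")
                // invoke: side effect only, the item flows downstream unchanged
                .onItem().invoke(item -> observed.add(item))
                // transform: the returned value becomes the new item
                .onItem().transform(String::toUpperCase)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println("downstream: " + run());
        System.out.println("observed by invoke: " + observed);
    }
}
```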

Chaining Asynchronous Actions

uni
    .onItem().transformToUni(item -> callMyRemoteService(item))
    .subscribe().with(s -> System.out.println("Received: " + s));

uni
    .onItem().transformToMulti(s -> getAMulti(s))
    .subscribe().with(
        s -> System.out.println("Received item: " + s),
        () -> System.out.println("Done!")
);

Use transformToUni or transformToMulti when the follow-up action is itself asynchronous, so that the chain stays on the I/O thread and executes asynchronously.

users.getUserByName(username)
            .onItem().transformToMulti(user -> orders.getOrderForUser(user));

Multi to Multi, preserving order: transformToMultiAndConcatenate (limited concurrency)

users.getAllUsers().onItem().transformToMultiAndConcatenate(user -> orders.getOrderForUser(user));

Multi to Multi, without ordering guarantees: transformToMultiAndMerge (highest concurrency)

users.getAllUsers().onItem().transformToMultiAndMerge(user -> orders.getOrderForUser(user));

Error Handling

users.createUser(name)
            .onItem().transform(id -> "New User " + name + " inserted")
            .onFailure().recoverWithItem(
                failure -> "User not inserted: " + failure.getMessage());

While a failure is an event, it’s a terminal one. That’s not a problem if you are dealing with Uni: you won’t get the item, just the failure, so the Uni replaces the failure with the fallback item. With Multi, you won’t get any more items after the failure; the recovery emits the fallback item followed by the completion event. Another common possibility is to retry, which resubscribes to the upstream: atMost sets the maximum number of retry attempts, and withBackOff configures the delay between attempts.

users.createUser(name)
            .onItem().transform(id -> "New User " + name + " inserted")
            .onFailure().retry()
            .withBackOff(Duration.ofSeconds(3))
            .atMost(3);
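
The recovery and retry behaviour described above can be sketched without any remote service, assuming Mutiny is on the classpath. The class FailureHandlingExample, its method names, and the attempts counter are hypothetical. The first method shows that a Multi stops at the failure and ends with the fallback item; the second shows a Uni that fails twice and succeeds on the third subscription. The back-off is left out to keep the sketch fast.

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class FailureHandlingExample {
    // Number of times the flaky operation below was attempted.
    static final AtomicInteger attempts = new AtomicInteger();

    // The failure raised on item 3 is terminal: item 4 is never emitted,
    // and the fallback item is followed by the completion event.
    static List<Integer> recoverOnMulti() {
        return Multi.createFrom().items(1, 2, 3, 4)
                .onItem().transform(i -> {
                    if (i == 3) {
                        throw new IllegalStateException("boom");
                    }
                    return i;
                })
                .onFailure().recoverWithItem(-1)
                .collect().asList()
                .await().indefinitely();
    }

    // Each retry resubscribes, so the supplier runs again on every attempt.
    static String retryOnUni() {
        attempts.set(0);
        return Uni.createFrom().item(() -> {
                    if (attempts.incrementAndGet() < 3) {
                        throw new IllegalStateException("not yet");
                    }
                    return "ok";
                })
                .onFailure().retry().atMost(3)
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(recoverOnMulti());
        System.out.println(retryOnUni() + " after " + attempts.get() + " attempts");
    }
}
```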

Refer to the onFailure group, which offers many more possibilities.

Combining Unis to build a result from two or more async operations

Uni<UserProfile> uni1 = users.getRandomUser();
Uni<Product> uni2 = products.getRecommendedProduct();
Uni.combine().all().unis(uni1, uni2).asTuple()
             .onItem().transform(tuple -> "Hello " + tuple.getItem1().name +
             ", we recommend you " + tuple.getItem2().name);

Combining Multis to build a result from two or more async operations

Multi<UserProfile> u = Multi.createFrom().ticks()
                            .every(Duration.ofSeconds(1)).onOverflow().drop()
                            .onItem().transformToUniAndConcatenate(x -> users.getRandomUser());
Multi<Product> p = Multi.createFrom().ticks()
                        .every(Duration.ofSeconds(1)).onOverflow().drop()
                        .onItem().transformToUniAndConcatenate(x -> products.getRecommendedProduct());

Multi.createBy().combining()
     .streams(u, p).asTuple().onItem()
     .transform(tuple -> "Hello " + tuple.getItem1().name + ", we recommend you " + tuple.getItem2().name);

When combining Multis, the resulting stream completes as soon as one of the combined Multis sends the completion event, since it is no longer possible to combine items after that.
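
A small sketch of that completion rule, assuming Mutiny is on the classpath; the class name CombineCompletionExample and the sample data are hypothetical. One stream has three items and the other has two, so only two tuples are produced before the combined stream completes.

```java
import io.smallrye.mutiny.Multi;
import java.util.List;

class CombineCompletionExample {
    static List<String> pairs() {
        Multi<String> names = Multi.createFrom().items("Luke", "Leia", "Obiwan");
        Multi<Integer> ranks = Multi.createFrom().items(1, 2);

        // Items are paired by position; "Obiwan" has no partner and is never emitted.
        return Multi.createBy().combining().streams(names, ranks).asTuple()
                .onItem().transform(t -> t.getItem1() + "#" + t.getItem2())
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(pairs());
    }
}
```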

Selecting Items

select().where takes a synchronous predicate; select().when takes an asynchronous one that returns a Uni<Boolean>.

getAllOrders()
            .select().where(order -> order.products.size() > 3);
getAllOrders()
            .select().when(order ->
                    users.getUserByName(username)
                        .onItem().transform(u -> u.name.equalsIgnoreCase(username))
            );

Removing duplicates with select

orders.getAllOrders()
            .onItem().transformToIterable(order -> order.products)
            .select().distinct();

Note that we can’t use distinct on unbounded streams, as it needs to keep in memory all the already seen items.

In addition to selection, Mutiny provides a skip group. It provides the opposite functionality, and so allows skipping items matching predicates and repetitions.
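
As an illustration of both groups on a bounded stream, here is a minimal sketch assuming Mutiny is on the classpath. The class name SelectAndSkipExample and the sample numbers are hypothetical. It contrasts select().distinct(), which remembers every item seen so far, with skip().repetitions(), which only drops consecutive duplicates, and skip().where(), which drops items matching a predicate.

```java
import io.smallrye.mutiny.Multi;
import java.util.List;

class SelectAndSkipExample {
    // distinct keeps the first occurrence of each item (needs to remember all of them).
    static List<Integer> distinct() {
        return Multi.createFrom().items(1, 1, 2, 3, 2, 3)
                .select().distinct()
                .collect().asList().await().indefinitely();
    }

    // repetitions only compares an item with the previous one.
    static List<Integer> withoutRepetitions() {
        return Multi.createFrom().items(1, 1, 2, 2, 1)
                .skip().repetitions()
                .collect().asList().await().indefinitely();
    }

    // skip().where is the mirror image of select().where.
    static List<Integer> withoutEvenNumbers() {
        return Multi.createFrom().items(1, 2, 3, 4)
                .skip().where(i -> i % 2 == 0)
                .collect().asList().await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(distinct());
        System.out.println(withoutRepetitions());
        System.out.println(withoutEvenNumbers());
    }
}
```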

Collecting Items

Finally, when dealing with a bounded Multi, you may want to accumulate the items into a list or a collection. The resulting structure is emitted when the Multi completes. For example, in Uni<List<Product>> ulp = getAllOrderedProducts().collect().asList(); the call to getAllOrderedProducts returns a Multi, and the resulting Uni emits the whole list in one go once that Multi completes.

The collect group provides other methods to aggregate items into maps, collections, or events, using your own collector:

Uni<List<String>> itemsAsList = multi.collect().asList();
Uni<Map<String, String>> itemsAsMap = multi.collect().asMap(item ->
    getKeyForItem(item));
Uni<Long> count = multi.collect().with(Collectors.counting());

The transformToUni and transformToMulti operations are generally called flatMap in traditional reactive programming libraries. More precisely, transformToMultiAndConcatenate corresponds to concatMap, and transformToMultiAndMerge to flatMap.
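
Mutiny's Multi also exposes shortcut methods under the traditional names. The sketch below, assuming Mutiny is on the classpath, compares the long form with the concatMap shortcut; the class name ConcatMapExample and the sample data are hypothetical. Only the concatenating variant is shown, because it is the one with a guaranteed output order.

```java
import io.smallrye.mutiny.Multi;
import java.util.List;

class ConcatMapExample {
    // Long form: each item becomes a Multi, and the inner streams are
    // concatenated in the order of the original items.
    static List<Integer> longForm() {
        return Multi.createFrom().items(1, 2, 3)
                .onItem().transformToMultiAndConcatenate(
                        i -> Multi.createFrom().items(i, i * 10))
                .collect().asList().await().indefinitely();
    }

    // Shortcut with the traditional name; same behavior as the long form.
    static List<Integer> shortcut() {
        return Multi.createFrom().items(1, 2, 3)
                .concatMap(i -> Multi.createFrom().items(i, i * 10))
                .collect().asList().await().indefinitely();
    }

    public static void main(String[] args) {
        System.out.println(longForm());
        System.out.println(shortcut());
    }
}
```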

@Channel("my-channel")
MutinyEmitter<Person> personEmitter;

public Uni<Void> send(Person p) {
    return personEmitter.send(p);
}
@Channel("my-second-channel")
MutinyEmitter<Person> messageEmitter;

public void sendMessage(Person p) {
    messageEmitter.send(
            Message.of(p,
                    () -> {
                        // Acknowledgment callback
                        return CompletableFuture.completedFuture(null);
                    },
                    failure -> {
                        // Negative-acknowledgment callback
                        return CompletableFuture.completedFuture(null);
                    })
    );
}

@Outgoing("my-channel")
Multi<Person> produceAStreamOfPersons() {
    return Multi.createFrom().items(
            new Person("Luke"),
            new Person("Leia"),
            new Person("Obiwan")
    );
}
// You can produce infinite streams every second, as demonstrated in Example 10-4.

Example 10-4. Generate infinite streams by using @Outgoing
@Outgoing("ticks")
Multi<Long> ticks() {
    return Multi.createFrom().ticks()
            .every(Duration.ofSeconds(1))
            .onOverflow().drop();
}

With Quarkus, it’s the same development model as classic RESTEasy, except that the RESTEasy Reactive variant is aware of the reactive engine and relies on it.

RESTEasy Reactive introduces the @NonBlocking annotation. This is one of the essential differences from classic RESTEasy: RESTEasy Reactive can dispatch requests on the I/O thread.

@GET
@Produces(MediaType.TEXT_PLAIN)
@NonBlocking
public String hello() {
    return "Hello RESTEasy Reactive";
}

Which thread runs a method depends on its return type and annotations:
  • A method returning an object, such as String in the previous example, is executed on a worker thread, unless it is annotated with @NonBlocking. In that case, the method uses an I/O thread.
  • A method returning Uni is executed on an I/O thread, unless it is annotated with @Blocking. In that case, the method uses a worker thread.
  • A method returning Multi is executed on an I/O thread, unless it is annotated with @Blocking. In that case, the method uses a worker thread.

There’s no need to use @NonBlocking when returning a Uni or a Multi: RESTEasy Reactive recognizes these types and automatically considers the method nonblocking.

Vert.x

Add the quarkus-vertx extension to the project, then inject the Mutiny variant of Vert.x: @Inject Vertx vertx; (import io.vertx.mutiny.core.Vertx).

Reactive filesystem access

Uni<String> uni = vertx.fileSystem().readFile(path)
        .onItem().transform(buffer -> buffer.toString("UTF-8"));

This Uni can be returned directly from a resource method; Quarkus subscribes to it and sends the result when it arrives.

@Path("/")
public class MutinyExampleResource {

    @Inject
    Vertx vertx;

    @GET
    @Path("/lorem")
    public Uni<String> getLoremIpsum() {
        return vertx.fileSystem().readFile("lorem.txt")
                .onItem().transform(buffer -> buffer.toString("UTF-8"));
    }

}

Most Quarkus APIs have reactive variants using Mutiny, such as the mailer service, database access viz. Hibernate Reactive, messaging, templating, gRPC, and so on. Besides, the Mutiny variant of Vert.x gives you access to a vast reactive ecosystem ranging from network protocols (DNS, TCP, UDP, HTTP), to messaging (Apache Kafka, AMQP, RabbitMQ, MQTT) via data accesses and web utilities.

Timeouts in the reactive world

vertx.fileSystem().readFile("slow.txt")
        .onItem().transform(buffer -> buffer.toString("UTF-8"))
        .ifNoItem().after(Duration.ofSeconds(1)).fail();

Exception Mapper

    @ServerExceptionMapper
    public Response mapFileSystemException(FileSystemException ex) {
        return Response.status(Response.Status.NOT_FOUND)
                .entity(ex.getMessage())
                .build();
    }

But when returning Multi, we need to ask ourselves: What envelope do we want? Do we want to stream bytes? Do we want to send a JSON array instead? Or maybe individual events using Server-Sent Events (SSE)?

Raw Streaming

@GET
@Path("/book")
@Produces(MediaType.TEXT_PLAIN)
public Multi<String> book() {
    return vertx.fileSystem().open("war-and-peace.txt",
                new OpenOptions().setRead(true))
            .onItem().transformToMulti(AsyncFile::toMulti)
            .onItem().transform(b -> b.toString("UTF-8"));
}

A client reading SSE, such as a JavaScript EventSource, can process the quotes one by one as they come.

    @GET
    @Path("/market")
    @Produces(MediaType.SERVER_SENT_EVENTS)  //this is for unbounded events.
    public Multi<Quote> market() {
        return market.getEventStream();
    }

Reactive Scores

Scores range from 50/100 (rather bad) to 100/100 (excellent!). The reactive score is available in the Dev UI at q/dev -> RESTEasy Reactive -> Endpoint scores.


Buffer is a convenient way to represent a bag of bytes in Vert.x.

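Sequential composition pattern: this guarantees order but does not run in parallel.

service.greeting("Leia", greeting1 -> {
    System.out.println("Leia: " + greeting1);
    // The second call starts only after the first callback has fired.
    service.greeting("Luke", greeting2 -> {
        System.out.println("Luke: " + greeting2);
    });
});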

Simplified parallel composition pattern

// Simplified on purpose: real Java lambdas cannot assign to local variables,
// so actual code needs thread-safe holders for the two results.
String resultForLeia = null;
String resultForLuke = null;
BiConsumer<String, String> continuation = ...;

service.greeting("Leia", greeting -> {
    resultForLeia = greeting;
    if (resultForLuke != null) {
        continuation.accept(resultForLeia, resultForLuke);
    }
});
service.greeting("Luke", greeting -> {
    resultForLuke = greeting;
    if (resultForLeia != null) {
        continuation.accept(resultForLeia, resultForLuke);
    }
});
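
A compilable version of the simplified parallel pattern needs thread-safe holders and a guard so the continuation runs exactly once. The sketch below uses only the JDK. The class ParallelCallbacks and its greeting method are hypothetical stand-ins for the service used above, implemented here with CompletableFuture.runAsync so that both calls really run concurrently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

class ParallelCallbacks {
    // Hypothetical callback-style service: invokes the callback from another thread.
    static void greeting(String name, Consumer<String> callback) {
        CompletableFuture.runAsync(() -> callback.accept("Hello " + name));
    }

    // Starts both calls at once and runs the continuation when both results are in.
    static void greetBoth(BiConsumer<String, String> continuation) {
        AtomicReference<String> leia = new AtomicReference<>();
        AtomicReference<String> luke = new AtomicReference<>();
        AtomicBoolean fired = new AtomicBoolean();

        // Whichever callback finishes last sees both results; the flag
        // prevents a double call if both callbacks observe both results.
        Runnable tryContinue = () -> {
            if (leia.get() != null && luke.get() != null
                    && fired.compareAndSet(false, true)) {
                continuation.accept(leia.get(), luke.get());
            }
        };

        greeting("Leia", g -> { leia.set(g); tryContinue.run(); });
        greeting("Luke", g -> { luke.set(g); tryContinue.run(); });
    }

    public static void main(String[] args) {
        CompletableFuture<String> done = new CompletableFuture<>();
        greetBoth((a, b) -> done.complete(a + " / " + b));
        System.out.println(done.join());
    }
}
```

The amount of bookkeeping needed here is the motivation for the Uni.combine() group shown earlier, which expresses the same composition declaratively.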
<dependency>
    <groupId>io.quarkus</groupId>
    <artifactId>quarkus-hibernate-reactive-panache</artifactId>
</dependency>

quarkus.datasource.db-kind=postgresql
%prod.quarkus.datasource.username=quarkus_test
%prod.quarkus.datasource.password=quarkus_test
%prod.quarkus.datasource.reactive.url=vertx-reactive:postgresql://localhost/quarkus_test

Now we have a reactive database client that knows when to run on the I/O thread.

public Multi<Customer> findAll() {
  return Customer.streamAll(Sort.by("name"));
}

Deleting a customer

return Panache
    .withTransaction(() -> Customer.deleteById(id))
    // deleteById emits true when a row was removed, false otherwise
    .map(deleted -> deleted
        ? Response.status(Response.Status.NO_CONTENT).build()
        : Response.status(Response.Status.NOT_FOUND).build());

Failing with a 404 when the customer is not found

Uni<Customer> customerUni = Customer.<Customer>findById(id)
    .onItem().ifNull().failWith(
        new WebApplicationException("Failed to find customer",
        Response.Status.NOT_FOUND)
    );
Uni<List<Order>> customerOrdersUni = orderService.getOrdersForCustomer(id);

Combining async data

return Uni.combine()
    .all().unis(customerUni, customerOrdersUni)
    .combinedWith((customer, orders) -> {
      customer.orders = orders;
      return customer;
    })
    .onItem().transform(customer -> Response.ok(customer).build());

With Transaction

return Panache
    .withTransaction(customer::persist)
    .replaceWith(Response.ok(customer).status(Response.Status.CREATED).build());

--------------
return Panache
    .withTransaction(
        () -> Customer.<Customer>findById(id)
            .onItem().ifNotNull().invoke(entity -> entity.name = customer.name)
    )

NoSQL, e.g. Redis

public Multi<Customer> allCustomers() {
  return reactiveRedisClient.keys("*")
      .onItem().transformToMulti(response -> {
        return Multi.createFrom().iterable(response).map(Response::toString);
      })
      .onItem().transformToUniAndMerge(key ->
          reactiveRedisClient.hgetall(key)
              .map(resp ->
                  constructCustomer(
                      Long.parseLong(
                          key.substring(CUSTOMER_HASH_PREFIX.length())),
                      resp)
              )
      );
}
public Uni<Void> deleteCustomer(Long id) {
  return reactiveRedisClient.hdel(Arrays.asList(CUSTOMER_HASH_PREFIX + id, "name"))
      .map(resp -> resp.toInteger() == 1 ? true : null)
      .onItem().ifNull().failWith(new NotFoundException())
      .onItem().ifNotNull().transformToUni(r -> Uni.createFrom().nullItem());
}