Skip to content

Instantly share code, notes, and snippets.

View danielkec's full-sized avatar
🚀

Daniel Kec danielkec

🚀
View GitHub Profile
@ApplicationScoped
public class MsgProcessingBean {
private final EmittingPublisher<String> emittingPublisher = new EmittingPublisher<>();
private SseBroadcaster sseBroadcaster;
@Outgoing("multiplyVariants")
public Publisher<String> preparePublisher() {
// Create a new publisher that this::process emits into
return ReactiveStreams
.fromPublisher(emittingPublisher)
@danielkec
danielkec / ma1.java
Last active March 20, 2020 13:36
messaging-article-1
/**
 * Builds the stream bound, via {@code @Outgoing}, to the "publisher-payload" channel.
 *
 * @return a publisher builder over the integers 1, 2 and 3
 */
@Outgoing("publisher-payload")
public PublisherBuilder<Integer> streamOfMessages() {
    final PublisherBuilder<Integer> payloads = ReactiveStreams.of(1, 2, 3);
    return payloads;
}
/**
 * Takes a message from "publisher-payload", converts its integer payload to text
 * and wraps the text in a brand-new message for "wrapped-message".
 *
 * <p>The returned message is built from the payload alone; nothing else from the
 * incoming message is handed over to it.
 *
 * @param message incoming message carrying an integer payload
 * @return a new message whose payload is the decimal string form of the integer
 */
@Incoming("publisher-payload")
@Outgoing("wrapped-message")
public Message<String> rewrapMessageManually(Message<Integer> message) {
    final Integer number = message.getPayload();
    final String text = Integer.toString(number);
    return Message.of(text);
}
@ApplicationScoped
@Connector("example-connector")
public class ExampleConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
@Override
public PublisherBuilder<? extends Message<?>> getPublisherBuilder(Config config) {
return ReactiveStreams.of("foo", "bar")
.map(Message::of);
}
mp.messaging.incoming.from-connector-channel.connector=example-connector
mp.messaging.incoming.to-connector-channel.connector=example-connector
/**
 * Publishes the integers 1, 2 and 3 into the "from-connector-channel" channel.
 *
 * <p>NOTE(review): the channel name suggests data coming <em>from</em> a connector,
 * yet this method is the one publishing into it - confirm the intended channel
 * name against the mp.messaging connector configuration.
 *
 * @return a publisher emitting 1, 2, 3
 */
@Outgoing("from-connector-channel")
public Publisher<Integer> produceInts() {
    final Flowable<Integer> ints = Flowable.just(1, 2, 3);
    return ints;
}
> Connector says: 1
> Connector says: 2
> Connector says: 3
/**
 * Receives a value from the "to-connector-channel" channel and prints it to
 * standard output.
 *
 * @param value the integer delivered on the channel
 */
@Incoming("to-connector-channel")
public void consumeInt(Integer value) {
    final String line = "Consuming Integer: " + value;
    System.out.println(line);
}
> Consuming Integer: 1
> Consuming Integer: 2
> Consuming Integer: 3
// Build a message together with the callback that runs on acknowledgment
Message<String> message = Message.of(
        "payload!",
        () -> {
            System.out.println("Executed when acked!");
            // The ack callback reports an already-completed stage
            return CompletableFuture.<Void>completedFuture(null);
        });

// Trigger the acknowledgment callback supplied above
message.ack();
/**
 * Publishes a single "test-data" message into the "test-channel" channel.
 *
 * <p>The message carries an ack callback that prints a line to standard output
 * and returns an already-completed stage.
 *
 * @return a Reactive Streams publisher emitting the one message
 */
@Outgoing("test-channel")
public Publisher<Message<String>> produceMessage() {
    final Message<String> ackReporting = Message.of("test-data", () -> {
        System.out.println("Message acked!");
        return CompletableFuture.completedStage(null);
    });
    return ReactiveStreams.of(ackReporting).buildRs();
}
@Incoming("test-channel")
@Acknowledgment(Acknowledgment.Strategy.POST_PROCESSING)
// Accumulator updated from inside the stream
AtomicInteger sum = new AtomicInteger();

// Take the first three strings, parse each one and add it to the accumulator;
// the total is printed once the stream run has finished.
ReactiveStreams.of("1", "2", "3", "4", "5")
        .limit(3)
        .map(text -> Integer.parseInt(text))
        .forEach(number -> sum.addAndGet(number))
        .run()
        .whenComplete((ignoredResult, failure) -> System.out.println("Sum: " + sum.get()));
> Sum: 6
// Accumulator updated from inside the stream
AtomicInteger sum = new AtomicInteger();

// RxJava source: five strings, each parsed to an integer
Flowable<Integer> flowable = Flowable
        .just("1", "2", "3", "4", "5")
        .map(text -> Integer.parseInt(text));

// Wrap the Flowable, keep only the first three items and add them up;
// the total is printed once the stream run has finished.
ReactiveStreams.fromPublisher(flowable)
        .limit(3)
        .forEach(number -> sum.addAndGet(number))
        .run()
        .whenComplete((ignoredResult, failure) -> System.out.println("Sum: " + sum.get()));