Skip to content

Instantly share code, notes, and snippets.

@spencergibb
Created January 11, 2019 20:11
Show Gist options
  • Save spencergibb/fd5d344cfaa9db7fccdd7ed1db1d758e to your computer and use it in GitHub Desktop.
Save spencergibb/fd5d344cfaa9db7fccdd7ed1db1d758e to your computer and use it in GitHub Desktop.
package com.example.demorsocket.channel;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.rsocket.AbstractRSocket;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.SocketAcceptor;
import io.rsocket.micrometer.MicrometerRSocketInterceptor;
import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.server.CloseableChannel;
import io.rsocket.transport.netty.server.TcpServerTransport;
import io.rsocket.util.DefaultPayload;
import java.time.Duration;
import java.util.Locale;
import lombok.extern.log4j.Log4j2;
import org.reactivestreams.Publisher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
 * Spring Boot entry point of the ping/pong RSocket demo.
 *
 * <p>Also hosts {@link #reply(String)}, the shared rule that maps an incoming
 * word to the word that should be sent back.
 */
@SpringBootApplication
public class PingPong {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, forwarded unchanged to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication.run(PingPong.class, args);
    }

    /**
     * Returns the counterpart of the given word: {@code "ping"} yields
     * {@code "pong"} and {@code "pong"} yields {@code "ping"}. Matching is
     * case-insensitive.
     *
     * @param in the received word; must not be {@code null}
     * @return the word to answer with, always lower-case
     * @throws IllegalArgumentException if {@code in} is neither ping nor pong
     * @throws NullPointerException if {@code in} is {@code null}
     */
    static String reply(String in) {
        // Locale.ROOT keeps the match independent of the JVM's default locale;
        // with a Turkish default, "PING".toLowerCase() yields "pıng" (dotless i)
        // and a valid word would be rejected.
        switch (in.toLowerCase(Locale.ROOT)) {
            case "ping":
                return "pong";
            case "pong":
                return "ping";
            default:
                throw new IllegalArgumentException("Value must be ping or pong, not " + in);
        }
    }
}
/**
 * Client side of the demo. When the application is ready it connects to the
 * RSocket server on local TCP port 7002, opens a request-channel that sends a
 * "ping" payload every second, and logs every reply it receives.
 */
@Log4j2
@Component
class Ping implements Ordered, ApplicationListener<ApplicationReadyEvent> {

    /** Registry that receives the RSocket metrics tagged {@code component=ping}. */
    @Autowired
    MeterRegistry meterRegistry;

    /**
     * Orders this listener last among ordered listeners.
     * NOTE(review): presumably so the servers on 7001/7002 are listening
     * before the client connects - confirm.
     */
    @Override
    public int getOrder() {
        return Ordered.LOWEST_PRECEDENCE;
    }

    /**
     * Connects to port 7002 and runs the ping channel.
     *
     * <p>This call blocks the invoking thread until the channel terminates;
     * the one-second interval never completes on its own, so in practice it
     * only returns when the connection fails or is closed.
     *
     * @param event the application-ready notification (not inspected)
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        log.info("Starting Ping");
        MicrometerRSocketInterceptor interceptor =
                new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "ping"));
        RSocketFactory.connect()
                .addClientPlugin(interceptor)
                // Proxy.accept (port 7002) rejects connections whose SETUP frame
                // carries no routing metadata, so announce the target service here.
                .setupPayload(DefaultPayload.create("", "pong"))
                .transport(TcpClientTransport.create(7002))
                .start()
                .flatMapMany(socket ->
                        socket.requestChannel(
                                        Flux.interval(Duration.ofSeconds(1))
                                                // data = "ping", metadata = "pong"
                                                .map(i -> DefaultPayload.create("ping", "pong")))
                                .map(Payload::getDataUtf8)
                                .doOnNext(str -> log.info("received {} in Ping", str))
                                // .take(10)   // enable to stop after ten replies
                                // release the connection however the channel ends
                                .doFinally(signal -> socket.dispose()))
                .then()
                .block();
    }
}
@Log4j2
@Component
class Proxy implements SocketAcceptor, Ordered, ApplicationListener<ApplicationReadyEvent> {
@Autowired
MeterRegistry meterRegistry;
private Mono<RSocket> rsocketClient;
private MicrometerRSocketInterceptor interceptor;
@Override
public int getOrder() {
return 0;
}
@Override
public void onApplicationEvent(ApplicationReadyEvent event) {
log.info("Starting Proxy");
interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "proxy"));
TcpServerTransport transport = TcpServerTransport.create(7002);
Mono<CloseableChannel> rsocketServer = RSocketFactory.receive()
.addServerPlugin(interceptor)
.acceptor(this)
.transport(transport)
.start();
rsocketServer.subscribe();
rsocketClient = null;
}
@Override
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
log.info("entering proxy accept");
if (!setup.hasMetadata()) { // and metadata is routing
return Mono.error(new IllegalStateException("No routing metadata"));
}
String metadata = setup.getMetadataUtf8();
if (StringUtils.isEmpty(metadata)) {
return Mono.error(new IllegalStateException("No routing metadata"));
}
if (!"pong".equalsIgnoreCase(metadata)) { // service discovery does not have entry
return Mono.error(new IllegalStateException("Unknown service " + metadata));
}
if (rsocketClient == null) { // if not connected previously, initialize connection
rsocketClient = RSocketFactory.connect()
.addClientPlugin(interceptor)
.transport(TcpClientTransport.create(7001)) // get values from service discovery
.start();
}
AbstractRSocket proxy = new AbstractRSocket() {
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
log.info("entering proxy requestChannel");
return rsocketClient.flatMapMany(rSocket -> rSocket.requestChannel(payloads));
/*return Flux.from(payloads)
.map(Payload::getDataUtf8) // wouldn't normally do this in a proxy
.doOnNext(str -> log.info("received " + str + " in Proxy"))
.map(DefaultPayload::create);*/
}
};
return Mono.just(proxy);
}
}
/**
 * Answering side of the demo. Serves RSocket connections on TCP port 7001 and
 * replies to every word received on a request-channel with its counterpart,
 * as computed by {@code PingPong.reply}.
 */
@Log4j2
@Component
class Pong implements SocketAcceptor, Ordered, ApplicationListener<ApplicationReadyEvent> {

    /** Registry that receives the RSocket metrics tagged {@code component=pong}. */
    @Autowired
    MeterRegistry meterRegistry;

    /** Orders this listener first among ordered listeners. */
    @Override
    public int getOrder() {
        return Ordered.HIGHEST_PRECEDENCE;
    }

    /**
     * Starts the RSocket server on port 7001 with this object as acceptor.
     *
     * @param event the application-ready notification (not inspected)
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        log.info("Starting Pong");
        Tag componentTag = Tag.of("component", "pong");
        MicrometerRSocketInterceptor metrics = new MicrometerRSocketInterceptor(meterRegistry, componentTag);
        TcpServerTransport listenOn = TcpServerTransport.create(7001);
        Mono<CloseableChannel> server = RSocketFactory.receive()
                .addServerPlugin(metrics)
                .acceptor(this)
                .transport(listenOn)
                .start();
        // nothing binds until the Mono is subscribed
        server.subscribe();
    }

    /**
     * Accepts every connection and hands back a socket that only implements
     * request-channel; neither argument is inspected.
     *
     * @param setup the connection setup payload (ignored)
     * @param sendingSocket the socket for talking back to the caller (ignored)
     * @return a {@code Mono} emitting the answering socket
     */
    @Override
    public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) {
        RSocket responder = new AbstractRSocket() {
            @Override
            public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
                return answer(payloads);
            }
        };
        return Mono.just(responder);
    }

    /**
     * Turns a stream of incoming payloads into the stream of replies: decode
     * the data as UTF-8, log it, map it through {@code PingPong.reply}, and
     * wrap the result in a new payload.
     */
    private Flux<Payload> answer(Publisher<Payload> incoming) {
        Flux<String> words = Flux.from(incoming).map(Payload::getDataUtf8);
        Flux<String> logged = words.doOnNext(word -> log.info("received " + word + " in Pong"));
        Flux<String> replies = logged.map(PingPong::reply);
        return replies.map(DefaultPayload::create);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment