Created
January 11, 2019 20:11
-
-
Save spencergibb/fd5d344cfaa9db7fccdd7ed1db1d758e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example.demorsocket.channel; | |
import io.micrometer.core.instrument.MeterRegistry; | |
import io.micrometer.core.instrument.Tag; | |
import io.rsocket.AbstractRSocket; | |
import io.rsocket.ConnectionSetupPayload; | |
import io.rsocket.Payload; | |
import io.rsocket.RSocket; | |
import io.rsocket.RSocketFactory; | |
import io.rsocket.SocketAcceptor; | |
import io.rsocket.micrometer.MicrometerRSocketInterceptor; | |
import io.rsocket.transport.netty.client.TcpClientTransport; | |
import io.rsocket.transport.netty.server.CloseableChannel; | |
import io.rsocket.transport.netty.server.TcpServerTransport; | |
import io.rsocket.util.DefaultPayload; | |
import lombok.extern.log4j.Log4j2; | |
import org.reactivestreams.Publisher; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.boot.SpringApplication; | |
import org.springframework.boot.autoconfigure.SpringBootApplication; | |
import org.springframework.boot.context.event.ApplicationReadyEvent; | |
import org.springframework.context.ApplicationListener; | |
import org.springframework.core.Ordered; | |
import org.springframework.stereotype.Component; | |
import org.springframework.util.StringUtils; | |
import reactor.core.publisher.Flux; | |
import reactor.core.publisher.Mono; | |
import java.time.Duration; | |
@SpringBootApplication | |
public class PingPong { | |
public static void main(String[] args) { | |
SpringApplication.run(PingPong.class, args); | |
} | |
static String reply(String in) { | |
switch (in.toLowerCase()) { | |
case "ping": return "pong"; | |
case "pong": return "ping"; | |
default: throw new IllegalArgumentException("Value must be ping or pong, not " + in); | |
} | |
} | |
} | |
@Log4j2 | |
@Component | |
class Ping implements Ordered, ApplicationListener<ApplicationReadyEvent> { | |
@Autowired | |
MeterRegistry meterRegistry; | |
@Override | |
public int getOrder() { | |
return Ordered.LOWEST_PRECEDENCE; | |
} | |
@Override | |
public void onApplicationEvent(ApplicationReadyEvent event) { | |
log.info("Starting Ping"); | |
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "ping")); | |
RSocketFactory.connect() | |
.addClientPlugin(interceptor) | |
.transport(TcpClientTransport.create(7002)) | |
.start() | |
.flatMapMany(socket -> | |
socket.requestChannel( | |
Flux.interval(Duration.ofSeconds(1)) | |
.map(i -> DefaultPayload.create("ping", "pong")) | |
).map(Payload::getDataUtf8) | |
.doOnNext(str -> log.info("received " + str + " in Ping")) | |
// .take(10) | |
.doFinally(signal -> socket.dispose()) | |
) | |
.then() | |
.block(); | |
} | |
} | |
@Log4j2 | |
@Component | |
class Proxy implements SocketAcceptor, Ordered, ApplicationListener<ApplicationReadyEvent> { | |
@Autowired | |
MeterRegistry meterRegistry; | |
private Mono<RSocket> rsocketClient; | |
private MicrometerRSocketInterceptor interceptor; | |
@Override | |
public int getOrder() { | |
return 0; | |
} | |
@Override | |
public void onApplicationEvent(ApplicationReadyEvent event) { | |
log.info("Starting Proxy"); | |
interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "proxy")); | |
TcpServerTransport transport = TcpServerTransport.create(7002); | |
Mono<CloseableChannel> rsocketServer = RSocketFactory.receive() | |
.addServerPlugin(interceptor) | |
.acceptor(this) | |
.transport(transport) | |
.start(); | |
rsocketServer.subscribe(); | |
rsocketClient = null; | |
} | |
@Override | |
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { | |
log.info("entering proxy accept"); | |
if (!setup.hasMetadata()) { // and metadata is routing | |
return Mono.error(new IllegalStateException("No routing metadata")); | |
} | |
String metadata = setup.getMetadataUtf8(); | |
if (StringUtils.isEmpty(metadata)) { | |
return Mono.error(new IllegalStateException("No routing metadata")); | |
} | |
if (!"pong".equalsIgnoreCase(metadata)) { // service discovery does not have entry | |
return Mono.error(new IllegalStateException("Unknown service " + metadata)); | |
} | |
if (rsocketClient == null) { // if not connected previously, initialize connection | |
rsocketClient = RSocketFactory.connect() | |
.addClientPlugin(interceptor) | |
.transport(TcpClientTransport.create(7001)) // get values from service discovery | |
.start(); | |
} | |
AbstractRSocket proxy = new AbstractRSocket() { | |
@Override | |
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { | |
log.info("entering proxy requestChannel"); | |
return rsocketClient.flatMapMany(rSocket -> rSocket.requestChannel(payloads)); | |
/*return Flux.from(payloads) | |
.map(Payload::getDataUtf8) // wouldn't normally do this in a proxy | |
.doOnNext(str -> log.info("received " + str + " in Proxy")) | |
.map(DefaultPayload::create);*/ | |
} | |
}; | |
return Mono.just(proxy); | |
} | |
} | |
@Log4j2 | |
@Component | |
class Pong implements SocketAcceptor, Ordered, ApplicationListener<ApplicationReadyEvent> { | |
@Autowired | |
MeterRegistry meterRegistry; | |
@Override | |
public int getOrder() { | |
return Ordered.HIGHEST_PRECEDENCE; | |
} | |
@Override | |
public void onApplicationEvent(ApplicationReadyEvent event) { | |
log.info("Starting Pong"); | |
MicrometerRSocketInterceptor interceptor = new MicrometerRSocketInterceptor(meterRegistry, Tag.of("component", "pong")); | |
TcpServerTransport transport = TcpServerTransport.create(7001); | |
RSocketFactory.receive() | |
.addServerPlugin(interceptor) | |
.acceptor(this) | |
.transport(transport) | |
.start() | |
.subscribe(); | |
} | |
@Override | |
public Mono<RSocket> accept(ConnectionSetupPayload setup, RSocket sendingSocket) { | |
AbstractRSocket pong = new AbstractRSocket() { | |
@Override | |
public Flux<Payload> requestChannel(Publisher<Payload> payloads) { | |
return Flux.from(payloads) | |
.map(Payload::getDataUtf8) | |
.doOnNext(str -> log.info("received " + str + " in Pong")) | |
.map(PingPong::reply) | |
.map(DefaultPayload::create); | |
} | |
}; | |
return Mono.just(pong); | |
} | |
} | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment