Last active
December 21, 2016 19:24
-
-
Save benjchristensen/e771fa58ce723e517ccdfbdd8a6bb4d8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lithium; | |
import org.reactivestreams.Publisher; | |
//import com.fasterxml.jackson.dataformat.cbor.CBORFactory; | |
import io.reactivesocket.AbstractReactiveSocket; | |
import io.reactivesocket.ConnectionSetupPayload; | |
import io.reactivesocket.Payload; | |
import io.reactivesocket.ReactiveSocket; | |
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; | |
import io.reactivesocket.server.ReactiveSocketServer; | |
import io.reactivesocket.transport.tcp.server.TcpTransportServer; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
public class AggregatorServer { | |
// private static final CBORFactory cborFactory = new CBORFactory(); | |
public static void main(String[] args) { | |
ReactiveSocketServer | |
.create(TcpTransportServer.create(4500)) | |
.start((ConnectionSetupPayload setup, ReactiveSocket reactiveSocket) -> { | |
System.out.println("Received new connection to AggregatorServer"); | |
subscribeToClientsForMetrics(reactiveSocket); | |
// | |
return createAcceptor(); | |
}) | |
.awaitShutdown(); | |
} | |
private static void subscribeToClientsForMetrics(ReactiveSocket reactiveSocket) { | |
Flowable | |
.fromPublisher(reactiveSocket.requestStream(new PayloadImpl("getMetrics".getBytes()))) | |
.takeUntil(reactiveSocket.onClose()) | |
.doOnNext(payload -> System.out.println("metrics output ==> " + new String(payload.getData().array()))) | |
.doOnSubscribe(s -> System.out.println("sending getMetrics request")) | |
.doOnComplete(() -> System.out.println("completed getMetrics request")) | |
.doOnError(t -> System.out.println("Error sending getMetrics request: " + t.getMessage())) | |
.subscribe(); // kick off async and let it finish when the socket closes | |
} | |
private static DisabledLeaseAcceptingSocket createAcceptor() { | |
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { | |
public Publisher<Payload> requestStream(Payload payload) { | |
System.out.println("got requestStream on aggregator"); | |
// this will be clients subscribing to this system | |
return super.requestStream(payload); | |
} | |
}); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package lithium; | |
import java.io.IOException; | |
import java.net.InetSocketAddress; | |
import org.reactivestreams.Publisher; | |
//import com.fasterxml.jackson.core.JsonToken; | |
//import com.fasterxml.jackson.dataformat.cbor.CBORFactory; | |
//import com.fasterxml.jackson.dataformat.cbor.CBORParser; | |
import io.reactivesocket.AbstractReactiveSocket; | |
import io.reactivesocket.Payload; | |
import io.reactivesocket.ReactiveSocket; | |
import io.reactivesocket.client.KeepAliveProvider; | |
import io.reactivesocket.client.ReactiveSocketClient; | |
import io.reactivesocket.client.ReactiveSocketClient.SocketAcceptor; | |
import io.reactivesocket.client.SetupProvider; | |
import io.reactivesocket.lease.DisabledLeaseAcceptingSocket; | |
import io.reactivesocket.lease.LeaseEnforcingSocket; | |
import io.reactivesocket.transport.tcp.client.TcpTransportClient; | |
import io.reactivesocket.util.PayloadImpl; | |
import io.reactivex.Flowable; | |
import io.reactivex.Single; | |
public class GatewayMock { | |
// private static final CBORFactory cborFactory = new CBORFactory(); | |
public static void main(String... args) { | |
InetSocketAddress host = InetSocketAddress.createUnresolved("localhost", 4500); | |
new LithiumGatewayMock().attachToInsightsStreamAggregator(host).blockingGet(); | |
// hold open ... since this is a client there is no await | |
while (true) { | |
try { | |
Thread.sleep(100000); | |
} catch (InterruptedException e) { | |
} | |
} | |
} | |
private Single<? extends ReactiveSocket> | |
attachToInsightsStreamAggregator(InetSocketAddress host) { | |
System.out.println("Connecting to host: " + host); | |
ReactiveSocketClient insightsStreamAggregatorClient = ReactiveSocketClient.createDuplex( | |
TcpTransportClient.create(host), | |
new GatewayInsightsAcceptor(), | |
SetupProvider | |
//.keepAlive(KeepAliveProvider.from(10000, Flowable.rangeLong(1, Long.MAX_VALUE))) | |
.keepAlive(KeepAliveProvider.never()) | |
.disableLease() | |
.dataMimeType("application/cbor")); | |
return Single | |
.fromPublisher(insightsStreamAggregatorClient.connect()) | |
.doOnSuccess(s -> System.out.println("Successfully connected to aggregator")) | |
.doOnError(t -> System.out.println("Error connecting: " + t.getMessage())); | |
} | |
/** | |
* Handler that responds to subscriptions from the InsightsStreamAggregator to deliver metrics and logs. | |
*/ | |
private static class GatewayInsightsAcceptor implements SocketAcceptor { | |
@Override | |
public LeaseEnforcingSocket accept(ReactiveSocket reactiveSocket) { | |
System.out.println("the mock got a socket"); | |
// TODO use leasing, or restrict with a counter the number of concurrent subscriptions | |
// TODO to protect Gateway | |
return new DisabledLeaseAcceptingSocket(new AbstractReactiveSocket() { | |
@Override | |
public Publisher<Payload> requestStream(Payload request) { | |
System.out.println("Got a subscription to the GatewayMock"); | |
// try { | |
// CBORParser parser = cborFactory.createParser(request.getData().array()); | |
// JsonToken nextToken = null; | |
// while ((nextToken = parser.nextToken()) != null) { | |
// System.out.println("got request: " + nextToken.asString()); | |
// } | |
// | |
// } catch (IOException e) { | |
// e.printStackTrace(); | |
// } | |
return Flowable.just(new PayloadImpl("hello".getBytes())); | |
} | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment