Skip to content

Instantly share code, notes, and snippets.

@adrianbumbas
Created February 20, 2016 08:56
Show Gist options
  • Save adrianbumbas/c1eb4ff2f3ba17e153de to your computer and use it in GitHub Desktop.
Save adrianbumbas/c1eb4ff2f3ba17e153de to your computer and use it in GitHub Desktop.
Reactive consuming a web socket using RxJava, Java8 and Tyrus client
import org.glassfish.tyrus.ext.client.java8.SessionBuilder;
import rx.Observable;
import javax.websocket.Session;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.concurrent.CountDownLatch;
public class WebSocketClientReactive {
private static CountDownLatch messageLatch = new CountDownLatch(1);
public static void main(String[] args) throws Exception {
Observable<Object> observable = Observable.create(subscriber -> {
try {
final Session[] session = new Session[1];
//this is an echo WSS
new SessionBuilder().uri(new URI("wss://sandbox.kaazing.net/echo"))
.onOpen((session1, endpointConfig) -> {
System.out.println("Connection open");
session[0] = session1;
try {
session[0].getBasicRemote().sendText("hello");
} catch (IOException e) {
e.printStackTrace();
}
})
.messageHandler(String.class, (t) -> {
try {
Thread.sleep(100);
session[0].getBasicRemote().sendText("hello");
} catch (IOException | InterruptedException e) {
e.printStackTrace();
}
subscriber.onNext(t);
})
.onError((session1, throwable) -> {
System.out.println("Aborted, the error is: " + throwable.getMessage());
messageLatch.countDown();
})
.connectAsync();
} catch (URISyntaxException e) {
e.printStackTrace();
}
});
//subscribe
observable.subscribe(o -> System.out.println("Received message: " + o));
//waits until you hit Ctrl+C
messageLatch.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment