Skip to content

Instantly share code, notes, and snippets.

@yccheok
Created November 14, 2017 20:44
Show Gist options
  • Select an option

  • Save yccheok/16c07dfff9abf4881014bba8d35f531e to your computer and use it in GitHub Desktop.

Select an option

Save yccheok/16c07dfff9abf4881014bba8d35f531e to your computer and use it in GitHub Desktop.
package org.yccheok.jstock.engine.websocket;
import android.util.Log;
import com.google.gson.Gson;
import org.yccheok.jstock.engine.Code;
import org.yccheok.jstock.engine.Subject;
import org.yccheok.jstock.network.Utils;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import io.socket.client.IO;
import io.socket.client.Socket;
import io.socket.emitter.Emitter;
/**
* Created by yccheok on 10/11/2017.
*/
public class IEXWebSocketStockServer extends Subject<WebSocketStockServer, WebSocketStockResponse> implements WebSocketStockServer {
public IEXWebSocketStockServer(){
}
private Socket initSocket() {
final Socket socket;
try {
socket = IO.socket(org.yccheok.jstock.network.Utils.getURL(Utils.Type.IEX_WEB_SOCKET));
final Gson gson = new Gson();
socket.on("message", new Emitter.Listener() {
@Override
public void call(Object... args) {
for (Object arg : args) {
String string = arg.toString();
try {
WebSocketStockResponse webSocketStockResponse = gson.fromJson(string, WebSocketStockResponse.class);
IEXWebSocketStockServer.this.notify(
IEXWebSocketStockServer.this,
webSocketStockResponse
);
} catch (Exception e) {
Log.e(TAG, "", e);
}
}
}
}).on("connect", new Emitter.Listener() {
@Override
public void call(Object... args) {
synchronized (connectedMonitor) {
connected = true;
if (false == subscribes.isEmpty()) {
final String string = IEXWebSocketStockServer.toString(subscribes);
// Server side cleanup?
socket.emit("unsubscribe", string);
socket.emit("subscribe", string);
subscribed.addAll(subscribes);
subscribes.clear();
}
if (false == unsubscribes.isEmpty()) {
socket.emit("unsubscribe", IEXWebSocketStockServer.toString(unsubscribes));
subscribed.removeAll(unsubscribes);
unsubscribes.clear();
}
}
}
}).on("disconnect", new Emitter.Listener() {
@Override
public void call(Object... args) {
synchronized (connectedMonitor) {
connected = false;
subscribed.clear();
}
}
});
} catch (URISyntaxException e) {
Log.e(TAG, "", e);
return null;
}
return socket;
}
@Override
public void connect() {
synchronized (connectedMonitor) {
if (connected) {
return;
}
if (this.socket == null) {
this.socket = initSocket();
}
socket.connect();
}
}
@Override
public void disconnect() {
synchronized (connectedMonitor) {
//if (false == connected) {
// return;
//}
socket.disconnect();
}
}
@Override
public void subscribe(List<Code> codes) {
if (codes.isEmpty()) {
return;
}
synchronized (connectedMonitor) {
if (false == connected) {
subscribes.addAll(codes);
unsubscribes.removeAll(codes);
return;
}
Set<Code> notYetSubscribedCodes = toNotYetSubscribedCodes(codes);
if (false == notYetSubscribedCodes.isEmpty()) {
final String string = IEXWebSocketStockServer.toString(notYetSubscribedCodes);
// Server side cleanup?
socket.emit("unsubscribe", string);
socket.emit("subscribe", string);
subscribed.addAll(notYetSubscribedCodes);
}
}
}
@Override
public void unsubscribe(List<Code> codes) {
if (codes.isEmpty()) {
return;
}
synchronized (connectedMonitor) {
if (false == connected) {
subscribes.removeAll(codes);
unsubscribes.addAll(codes);
return;
}
socket.emit("unsubscribe", toString(
new HashSet<>(codes)
));
subscribed.removeAll(codes);
}
}
private Set<Code> toNotYetSubscribedCodes(List<Code> codes) {
Set<Code> s = new HashSet<>(codes);
s.removeAll(subscribed);
return s;
}
private static String toString(Set<Code> codes) {
if (codes.isEmpty()) {
throw new java.lang.IllegalArgumentException();
}
StringBuilder stringBuilder = new StringBuilder();
for (Code code : codes) {
stringBuilder.append(org.yccheok.jstock.engine.Utils.toIEXFormat(code)).append(",");
}
String result = stringBuilder.toString();
if (result.endsWith(",")) {
return result.substring(0, result.length()-1);
}
return result;
}
private Socket socket;
private final Object connectedMonitor = new Object();
private volatile boolean connected = false;
// We have connectedMonitor as thread monitor.
private final Set<Code> subscribes = new HashSet<>();
private final Set<Code> unsubscribes = new HashSet<>();
private final Set<Code> subscribed = new HashSet<>();
private static final String TAG = "IEXWebSocketStockServer";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment