Created
June 25, 2015 15:25
-
-
Save beders/03698bd891aa60ff69e8 to your computer and use it in GitHub Desktop.
Simple EventBusBridge
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.saffrontech.vertx; | |
import io.vertx.core.AsyncResult; | |
import io.vertx.core.Handler; | |
import io.vertx.core.MultiMap; | |
import io.vertx.core.Vertx; | |
import io.vertx.core.buffer.Buffer; | |
import io.vertx.core.eventbus.DeliveryOptions; | |
import io.vertx.core.eventbus.Message; | |
import io.vertx.core.http.WebSocket; | |
import io.vertx.core.json.JsonObject; | |
import java.net.URI; | |
import java.util.*; | |
import java.util.List; | |
import java.util.concurrent.ConcurrentHashMap; | |
import java.util.function.Function; | |
/** | |
* Simple event bus bridge using Vert.x websockets. | |
* Note: Only Message<JsonObject> and Message<String> are supported
* | |
* Created by beders on 6/23/15. | |
*/ | |
public class EventBusBridge { | |
Vertx vertx; | |
WebSocket webSocket; | |
long pingTimerID; | |
ConcurrentHashMap<String, List<Handler<Message<?>>>> handlers = new ConcurrentHashMap<>(); | |
ConcurrentHashMap<String, Handler<Message<?>>> replyHandlers = new ConcurrentHashMap<>(); | |
public static void connect(URI endPoint, Handler<EventBusBridge> onOpenHandler) { | |
new EventBusBridge(endPoint, onOpenHandler); | |
} | |
private EventBusBridge(URI endPoint, Handler<EventBusBridge> onOpenHandler) { | |
vertx = Vertx.vertx(); | |
vertx.createHttpClient().websocket(endPoint.getPort(), endPoint.getHost(), endPoint.getPath() + "/websocket", ws -> { | |
webSocket = ws; | |
onOpenHandler.handle(EventBusBridge.this); | |
ws.handler(buffer -> { | |
bufferReceived(buffer); | |
}); | |
ws.closeHandler(it -> { | |
if (pingTimerID != 0) { | |
vertx.cancelTimer(pingTimerID); | |
} | |
}); | |
sendPing(); | |
pingTimerID = vertx.setPeriodic(5000L, time -> { | |
sendPing(); | |
}); | |
}); | |
} | |
protected void sendPing() { | |
//System.out.println("Sending ping"); | |
JsonObject msg = new JsonObject().put("type","ping"); | |
try { | |
webSocket.writeMessage(Buffer.buffer(msg.toString())); | |
} catch (IllegalStateException ise) { | |
vertx.cancelTimer(pingTimerID); | |
} | |
} | |
public EventBusBridge send(String address, String message, Handler<Message<?>> replyHandler) { | |
Objects.requireNonNull(webSocket); | |
sendMessage("send", address, message, replyHandler); | |
return this; | |
} | |
public EventBusBridge publish(String address, String message, Handler<Message<?>> replyHandler) { | |
Objects.requireNonNull(webSocket); | |
sendMessage("publish", address, message, replyHandler); | |
return this; | |
} | |
public EventBusBridge send(String address, JsonObject message, Handler<Message<?>> replyHandler) { | |
Objects.requireNonNull(webSocket); | |
sendMessage("send", address, message, replyHandler); | |
return this; | |
} | |
public EventBusBridge publish(String address, JsonObject message, Handler<Message<?>> replyHandler) { | |
Objects.requireNonNull(webSocket); | |
sendMessage("publish", address, message, replyHandler); | |
return this; | |
} | |
public EventBusBridge registerHandler(String address, Handler<Message<?>> dataHandler) { | |
handlers.computeIfAbsent(address, key -> { | |
webSocket.writeMessage(Buffer.buffer(new JsonObject().put("type","register").put("address", address).toString())); | |
return Collections.synchronizedList(new ArrayList<Handler<Message<?>>>()); | |
}).add(dataHandler); | |
return this; | |
} | |
public EventBusBridge unregisterHandler(String address, Handler<Message<JsonObject>> dataHandler) { | |
List<Handler<Message<?>>> handlers = this.handlers.getOrDefault(address, Collections.emptyList()); | |
if (handlers.isEmpty()) { | |
String unregisterMsg = new JsonObject().put("type","unregister").put("address", address).toString(); | |
webSocket.writeMessage(Buffer.buffer(unregisterMsg)); | |
} | |
return this; | |
} | |
public void close() { | |
webSocket.close(); | |
} | |
private void sendMessage(String sendOrPublish, String address, Object message, Handler<Message<?>> replyHandler) { | |
JsonObject msg = new JsonObject().put("type", sendOrPublish).put("address", address).put("body", message); | |
if (replyHandler != null) { | |
String replyAddress = UUID.randomUUID().toString(); | |
replyHandlers.put(replyAddress, replyHandler); | |
msg.put("replyAddress", replyAddress); | |
} | |
webSocket.writeMessage(Buffer.buffer(msg.toString())); | |
} | |
protected void bufferReceived(Buffer buffer) { | |
//System.out.println("Buffer Received"); | |
//System.out.println(buffer.toString()); | |
JsonObject msg = new JsonObject(buffer.toString()); | |
String type = msg.getString("type"); | |
if ("err".equals(type)) { | |
// TODO invoke error handler | |
System.err.println("Error message from the event bus bridge:" + msg.toString()); | |
return; | |
} | |
String address = msg.getString("address"); | |
EventBusMessage result = new EventBusMessage(msg); | |
for (Handler<Message<?>> h : handlers.getOrDefault(address, Collections.emptyList())) { | |
h.handle(result); | |
} | |
if (replyHandlers.containsKey(address)) { | |
Handler<Message<?>> replyHandler = replyHandlers.remove(address); | |
replyHandler.handle(result); | |
} | |
} | |
class EventBusMessage<T> implements Message<T> { | |
String address; | |
String replyAddress; | |
T body; | |
EventBusMessage(JsonObject json) { | |
address = json.getString("address"); | |
replyAddress = json.getString("replyAddress", null); | |
body = (T)json.getValue("body"); | |
} | |
EventBusMessage(Message<T> result) { | |
address = result.address(); | |
replyAddress = result.replyAddress(); | |
body = result.body(); | |
} | |
@Override | |
public String address() { | |
return null; | |
} | |
@Override | |
public MultiMap headers() { | |
throw new UnsupportedOperationException("No headers supported"); | |
} | |
@Override | |
public T body() { | |
return body; | |
} | |
@Override | |
public String replyAddress() { | |
return replyAddress; | |
} | |
@Override | |
public void reply(Object message) { | |
this.reply(message, new DeliveryOptions(), (Handler)null); | |
} | |
@Override | |
public <R> void reply(Object message, Handler<AsyncResult<Message<R>>> replyHandler) { | |
this.reply(message, new DeliveryOptions(), replyHandler); | |
} | |
@Override | |
public void reply(Object message, DeliveryOptions options) { | |
this.reply(message, options, (Handler)null); | |
} | |
@Override | |
public <R> void reply(Object message, DeliveryOptions deliveryOptions, Handler<AsyncResult<Message<R>>> replyHandler) { | |
if (this.replyAddress != null) { | |
EventBusBridge.this.send(replyAddress, message.toString(), result -> { | |
replyHandler.handle(new AsyncResult<Message<R>>() { | |
@Override | |
public Message<R> result() { | |
return new EventBusMessage(result); | |
} | |
@Override | |
public Throwable cause() { | |
return null; | |
} | |
@Override | |
public boolean succeeded() { | |
return true; | |
} | |
@Override | |
public boolean failed() { | |
return false; | |
} | |
}); | |
}); | |
} | |
} | |
@Override | |
public void fail(int i, String s) { | |
throw new UnsupportedOperationException("Failure is not an option"); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
(only works with vert.x 3.0.0-milestone6 which had a writeBuffer method)