Skip to content

Instantly share code, notes, and snippets.

@YoungOG
Created June 7, 2023 19:54
Show Gist options
  • Save YoungOG/f211abd10d5f04e1770a8bfbb306a3b8 to your computer and use it in GitHub Desktop.
Save YoungOG/f211abd10d5f04e1770a8bfbb306a3b8 to your computer and use it in GitHub Desktop.
package net.minelucky.luckyapi.shared.redis.channels;
import com.google.gson.JsonObject;
import lombok.NoArgsConstructor;
/**
 * Base class for messages whose payload lives in a Gson {@link JsonObject}.
 *
 * <p>Provides the shared {@link #data} object and an {@link #encode()} implementation; the
 * {@code decode} method declared by {@link Message} is left for subclasses to implement.
 */
@NoArgsConstructor
public abstract class AbstractMessage implements Message<JsonObject> {

    /** Backing JSON payload; starts out as an empty object. */
    protected JsonObject data = new JsonObject();

    /**
     * Stamps the payload with the runtime class of this message and hands it back.
     *
     * @return the {@link #data} instance held by this message (not a copy), with its
     *     {@code "class"} property set to the canonical class name
     */
    @Override
    public JsonObject encode() {
        // NOTE(review): getCanonicalName() is null for anonymous/local classes and uses '.'
        // rather than '$' for nested classes - confirm that is what consumers of "class" expect.
        final String canonicalName = getClass().getCanonicalName();
        this.data.addProperty("class", canonicalName);
        return data;
    }
}
package net.minelucky.luckyapi.shared.redis;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker annotation for listener methods; retained at runtime so it can be read reflectively.
 *
 * <p>{@code BakedListener.create} records whether a {@code @Listener} method also carries this
 * annotation, and {@code BakedListener.dispatch} submits such methods to its executor instead of
 * invoking them on the dispatching thread.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Async {
}
package net.minelucky.luckyapi.shared.redis;
import java.lang.reflect.Method;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Pairs a listener object with the subset of its {@link Listener}-annotated methods that accept
 * one particular message type, and invokes those methods when a matching message is dispatched.
 *
 * <p>NOTE(review): the three-argument constructor creates a new single-thread executor for every
 * instance and nothing in this class shuts it down; callers that care about thread lifetime
 * should supply their own executor through the four-argument constructor.
 */
public final class BakedListener {

    private final Object instance;
    private final Class<?> messageType;
    private final Set<BakedMethod> methods;
    private final ExecutorService service;

    /**
     * Creates a listener whose {@link Async} methods run on a dedicated single-thread executor.
     *
     * @param instance object the methods are invoked on
     * @param messageType parameter type shared by all {@code methods}
     * @param methods methods to invoke for matching messages
     */
    public BakedListener(Object instance, Class<?> messageType, Set<BakedMethod> methods) {
        this(instance, messageType, methods, Executors.newSingleThreadExecutor());
    }

    /**
     * Creates a listener whose {@link Async} methods run on the given executor.
     *
     * @param instance object the methods are invoked on
     * @param messageType parameter type shared by all {@code methods}
     * @param methods methods to invoke for matching messages
     * @param executorService executor that runs methods flagged as asynchronous
     */
    public BakedListener(Object instance, Class<?> messageType, Set<BakedMethod> methods, ExecutorService executorService) {
        this.instance = instance;
        this.methods = methods;
        this.messageType = messageType;
        this.service = executorService;
    }

    /**
     * Invokes every baked method with {@code message} if the message is an instance of this
     * listener's message type; otherwise does nothing.
     *
     * <p>Asynchronous methods are submitted to the executor, the rest run on the calling thread.
     * A failure in one method is printed and does not prevent the remaining methods from running.
     *
     * @param message message to deliver; {@code null} is ignored
     */
    public void dispatch(Object message) {
        if (message == null || !messageType.isAssignableFrom(message.getClass())) {
            return;
        }
        for (BakedMethod method : methods) {
            if (method.isAsync()) {
                // Submit a Runnable that handles its own failures: passing the reflective call
                // directly would make this a Callable whose exception is captured in the
                // returned Future, which nobody inspects.
                service.submit(() -> invoke(method, message));
            } else {
                invoke(method, message);
            }
        }
    }

    /** Reflectively calls one listener method, printing (not propagating) any failure. */
    private void invoke(BakedMethod method, Object message) {
        try {
            method.getMethod().invoke(this.instance, message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Scans the methods declared directly on {@code instance}'s class and builds one
     * {@code BakedListener} per distinct message type.
     *
     * <p>A method is picked up only if it is annotated with {@link Listener} and takes exactly
     * one parameter; that parameter's type is the message type. Annotated methods with any other
     * parameter count are skipped. Inherited methods are not considered because only
     * {@code getDeclaredMethods()} is consulted.
     *
     * @param instance listener object to scan
     * @return the listeners found, possibly empty
     */
    public static List<BakedListener> create(Object instance) {
        final Map<Class<?>, Set<BakedMethod>> createdListeners = new HashMap<>();
        for (Method method : instance.getClass().getDeclaredMethods()) {
            if (!method.isAnnotationPresent(Listener.class) || method.getParameterCount() != 1) {
                continue;
            }
            final Class<?> messageType = method.getParameterTypes()[0];
            final Set<BakedMethod> bakedMethods = createdListeners.computeIfAbsent(messageType, (c) -> new HashSet<>());
            // Listener methods may be non-public.
            method.setAccessible(true);
            bakedMethods.add(new BakedMethod(method, method.isAnnotationPresent(Async.class)));
        }
        final List<BakedListener> value = new ArrayList<>();
        for (Map.Entry<Class<?>, Set<BakedMethod>> bakedMethods : createdListeners.entrySet()) {
            value.add(new BakedListener(instance, bakedMethods.getKey(), bakedMethods.getValue()));
        }
        return value;
    }

    /** @return the message type this listener accepts */
    public Class<?> getMessageType() {
        return messageType;
    }

    /** @return the methods invoked for matching messages (the internal set, not a copy) */
    public Set<BakedMethod> getMethods() {
        return methods;
    }
}
package net.minelucky.luckyapi.shared.redis;
import java.lang.reflect.Method;
/**
 * Immutable pairing of a reflective {@link Method} with a flag saying whether it should be
 * invoked asynchronously.
 */
public final class BakedMethod {

    // Both fields are assigned once in the constructor and never change afterwards.
    private final Method method;
    private final boolean async;

    /**
     * @param method the method to invoke
     * @param async {@code true} if the method should be invoked asynchronously
     */
    public BakedMethod(Method method, boolean async) {
        this.method = method;
        this.async = async;
    }

    /** @return {@code true} if the method was flagged for asynchronous invocation */
    public boolean isAsync() {
        return async;
    }

    /** @return the wrapped method */
    public Method getMethod() {
        return method;
    }
}
package net.minelucky.luckyapi.shared.redis;
import com.google.gson.Gson;
import net.minelucky.luckyapi.shared.redis.gson.GsonChannel;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * A named messaging channel that can send messages and have listener objects registered on it.
 *
 * <p>Instances are normally obtained through {@link #create(String, String)} and the resulting
 * {@link Builder}, which produces a {@link GsonChannel}.
 */
public interface Channel {

    /** Fluent builder for a {@link GsonChannel}. */
    class Builder {
        private final String host;
        private final String channelName;
        private JedisPool callPool, subscriberPool;
        private Gson gson = new Gson();

        Builder(String name, String host) {
            this.channelName = name;
            this.host = host;
        }

        /**
         * Replaces the default {@link Gson} instance used by the channel.
         *
         * @param gson Gson instance to use
         * @return this builder
         */
        public Builder withGson(Gson gson) {
            this.gson = gson;
            return this;
        }

        /**
         * Supplies the pools the channel should use instead of creating its own from the host.
         *
         * @param call pool used for publishing
         * @param subscriber pool used for the subscription
         * @return this builder
         */
        public Builder withJedisPools(JedisPool call, JedisPool subscriber) {
            this.callPool = call;
            this.subscriberPool = subscriber;
            return this;
        }

        /**
         * Builds the channel on the given executor. Any pool that was not supplied through
         * {@link #withJedisPools(JedisPool, JedisPool)} is created from the host and remembered
         * by this builder.
         *
         * @param service executor handed to the channel
         * @return the new channel
         */
        public Channel build(ExecutorService service) {
            if (this.callPool == null) {
                this.callPool = new JedisPool(host);
            }
            if (this.subscriberPool == null) {
                this.subscriberPool = new JedisPool(host);
            }
            return new GsonChannel(this.gson, this.channelName, this.callPool, this.subscriberPool, service);
        }

        /**
         * Builds the channel with a default executor.
         *
         * <p>When no pools were configured this uses the host-based {@link GsonChannel}
         * constructor. When pools were configured they are honoured: the channel is built
         * through {@link #build(ExecutorService)} with a fixed thread pool sized to the number
         * of available processors, the same executor the host-based constructor creates.
         *
         * @return the new channel
         */
        public Channel build() {
            if (this.callPool == null && this.subscriberPool == null) {
                return new GsonChannel(this.gson, this.channelName, this.host);
            }
            // Previously the configured pools were silently dropped on this path.
            return build(Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
        }
    }

    /**
     * Starts building a channel.
     *
     * @param name channel name
     * @param host Redis host used when the builder has to create its own pools
     * @return a new builder
     */
    static Builder create(String name, String host) {
        return new Builder(name, host);
    }

    /** @return the channel name */
    String name();

    /**
     * Sends a message on this channel; how the message is encoded is up to the implementation.
     *
     * @param message message to send
     */
    void send(Object message);

    /**
     * Registers a listener object with this channel; implementations decide what, if anything,
     * is done with it.
     *
     * @param listener listener object
     */
    void register(Object listener);

    /** Releases the resources held by this channel. */
    void close();
}
package net.minelucky.luckyapi.shared.redis.gson;
import com.google.gson.Gson;
import net.minelucky.luckyapi.shared.redis.Channel;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * {@link Channel} that publishes messages over Redis pub/sub as
 * {@code <class name>|<Gson JSON>} and feeds incoming messages to a
 * {@link GsonMessageDispatcher}.
 *
 * <p>The constructor submits a long-running subscription task to the executor, so that task
 * occupies one executor thread for the lifetime of the channel; {@link #send(Object)} uses the
 * same executor.
 *
 * <p>NOTE(review): {@link #close()} does not close either Jedis pool, including the pools the
 * host-based constructor creates itself - confirm who is expected to own them.
 */
public final class GsonChannel implements Channel {

    /** Pause after a failed subscription attempt, so an unreachable server is not hammered. */
    private static final long RESUBSCRIBE_DELAY_MILLIS = 1000L;

    private final Gson gson;
    private final String name;
    private final GsonMessageDispatcher dispatcher;
    private final JedisPool callPool;
    private final ExecutorService executorService;

    /**
     * Creates a channel with its own pools for {@code host} and a fixed thread pool sized to the
     * number of available processors.
     *
     * @param gson Gson instance used to encode and decode messages
     * @param name Redis channel name
     * @param host Redis host
     */
    public GsonChannel(Gson gson, String name, String host) {
        this(gson, name, new JedisPool(host), new JedisPool(host), Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
    }

    /**
     * Creates a channel on the given pools and executor and starts the subscription task.
     *
     * @param gson Gson instance used to encode and decode messages
     * @param name Redis channel name
     * @param callPool pool used for publishing
     * @param subscriberPool pool the blocking subscription connection is taken from
     * @param service executor running the subscription task and all sends
     */
    public GsonChannel(Gson gson, String name, JedisPool callPool, JedisPool subscriberPool, ExecutorService service) {
        this.gson = gson;
        this.name = name;
        this.callPool = callPool;
        this.dispatcher = new GsonMessageDispatcher(this.gson);
        this.executorService = service;
        executorService.submit(() -> subscribeLoop(subscriberPool));
    }

    /**
     * Keeps the dispatcher subscribed until the executor is shut down. {@code subscribe} blocks
     * while the subscription is alive; when it returns or fails, a new attempt is made.
     */
    private void subscribeLoop(JedisPool subscriberPool) {
        while (!executorService.isShutdown()) {
            try (Jedis jedis = subscriberPool.getResource()) {
                jedis.subscribe(dispatcher, name);
            } catch (Exception e) {
                e.printStackTrace();
                // Without a pause a connection failure would spin this loop at full speed.
                if (!pauseBeforeResubscribe()) {
                    return;
                }
            }
        }
    }

    /** Sleeps for the retry delay; returns {@code false} if the thread was interrupted. */
    private static boolean pauseBeforeResubscribe() {
        try {
            Thread.sleep(RESUBSCRIBE_DELAY_MILLIS);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Publishes {@code message} asynchronously on the executor. The wire format is the message's
     * class name, a {@code |}, and its Gson JSON. Publish failures are printed; they are not
     * reported back to the caller.
     *
     * @param message message to publish
     */
    @Override
    public void send(Object message) {
        executorService.submit(() -> {
            try (Jedis jedis = callPool.getResource()) {
                jedis.publish(this.name, message.getClass().getName() + "|" + this.gson.toJson(message));
            } catch (Exception e) {
                // The Future returned by submit() is discarded, so report the failure here.
                e.printStackTrace();
            }
        });
    }

    /**
     * Registers the {@code @Listener} methods of {@code listener} with the dispatcher.
     *
     * @param listener listener object
     */
    @Override
    public void register(Object listener) {
        this.dispatcher.addListener(listener);
    }

    /**
     * Shuts the executor down (which also ends the subscription loop) and unsubscribes the
     * dispatcher if it currently holds a subscription.
     */
    @Override
    public void close() {
        this.executorService.shutdown();
        // unsubscribe() fails when the dispatcher is not attached to a connection, e.g. when
        // close() is called before the first subscription succeeded.
        if (this.dispatcher.isSubscribed()) {
            this.dispatcher.unsubscribe();
        }
    }
}
package net.minelucky.luckyapi.shared.redis.gson;
import com.google.gson.Gson;
import net.minelucky.luckyapi.shared.redis.BakedListener;
import net.minelucky.luckyapi.shared.redis.MessageDispatcher;
import net.minelucky.luckyapi.shared.redis.server.Server;
import redis.clients.jedis.JedisPubSub;
import java.lang.reflect.Type;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Jedis pub/sub callback that decodes incoming messages with Gson and forwards them to the
 * registered {@link BakedListener}s.
 *
 * <p>Two payload shapes are understood: {@code <class name>|<json>}, decoded into the named
 * class, and plain JSON, decoded as a {@link Server}.
 */
public final class GsonMessageDispatcher extends JedisPubSub implements MessageDispatcher {

    /** Separates the class-name header from the JSON body. */
    private static final char TYPE_SEPARATOR = '|';

    private final Gson gson;
    private final Set<BakedListener> listeners = Collections.newSetFromMap(new ConcurrentHashMap<>());

    GsonMessageDispatcher(Gson gson) {
        this.gson = gson;
    }

    /**
     * Decodes one pub/sub message and dispatches the result.
     *
     * <p>Only the first {@code |} is treated as the header separator, so JSON bodies may contain
     * that character. No exception leaves this method: it is called from inside the Jedis
     * subscribe loop, and an escaping exception would tear the subscription down.
     *
     * @param channel channel the message arrived on (unused)
     * @param message raw payload
     */
    @Override
    public void onMessage(String channel, String message) {
        try {
            final int separator = typeSeparatorIndex(message);
            if (separator < 0) {
                // plain json message
                this.dispatch(gson.fromJson(message, Server.class));
                return;
            }
            final String typeName = message.substring(0, separator);
            final String payload = message.substring(separator + 1);
            try {
                // NOTE(review): the class name comes straight off the wire; anything that can
                // publish to this channel chooses which class gets loaded and instantiated.
                Class<?> messageType = Class.forName(typeName);
                this.dispatch(gson.fromJson(payload, (Type) messageType));
            } catch (ClassNotFoundException e) {
                System.out.println("Failed to find messageType match for class(" + e.getMessage() + "): " + typeName);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Locates the header separator of a typed message.
     *
     * @return the index of the first {@code |}, or {@code -1} when there is none or when the
     *     text before it contains <code>{</code> or {@code "} - characters that cannot occur in
     *     a class name, meaning the {@code |} belongs to a plain JSON document
     */
    private static int typeSeparatorIndex(String message) {
        final int separator = message.indexOf(TYPE_SEPARATOR);
        for (int i = 0; i < separator; i++) {
            final char c = message.charAt(i);
            if (c == '{' || c == '"') {
                return -1;
            }
        }
        return separator;
    }

    /**
     * Hands {@code message} to every listener whose message type it is an instance of.
     *
     * @param message decoded message; {@code null} (which Gson returns for empty or
     *     {@code null} JSON) is ignored
     */
    @Override
    public void dispatch(Object message) {
        if (message == null) {
            return;
        }
        for (BakedListener listener : listeners) {
            if (listener.getMessageType().isAssignableFrom(message.getClass())) {
                listener.dispatch(message);
            }
        }
    }

    /** Bakes the {@code @Listener} methods of {@code listener} and adds them to this dispatcher. */
    void addListener(Object listener) {
        this.listeners.addAll(BakedListener.create(listener));
    }
}
package net.minelucky.luckyapi.shared.redis;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marks a method as a message listener; retained at runtime so it can be read reflectively.
 *
 * <p>{@code BakedListener.create} picks up methods carrying this annotation when they take
 * exactly one parameter, and uses that parameter's type as the message type.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Listener {
}
package net.minelucky.luckyapi.shared.redis.channels;
import com.google.gson.JsonObject;
/**
 * Contract for a message that can be turned into an encoded form and read back from JSON.
 *
 * @param <R> type produced by {@link #encode()}
 */
public interface Message<R> {

    /**
     * Produces the encoded form of this message.
     *
     * @return the encoded message
     */
    R encode();

    /**
     * Reads this message from a JSON object.
     *
     * @param message JSON to read from - presumably the counterpart of what {@link #encode()}
     *     produced; confirm against implementations
     */
    void decode(JsonObject message);
}
package net.minelucky.luckyapi.shared.redis;
/**
 * Receives an already-decoded message object and routes it onward; what "routing" means is
 * defined by each implementation.
 */
public interface MessageDispatcher {
/**
 * Dispatches one decoded message.
 *
 * @param message the decoded message object
 */
void dispatch(Object message);
}
package net.minelucky.luckyapi.shared.redis.server;
import java.util.List;
// AVAILABLE
/**
 * Write-side extension of {@link Server}: declares setters for the server's display name,
 * player counts, heartbeat, whitelist and readiness. No setter is declared for the name.
 */
public interface MutableServer extends Server {

    /** @param displayName new display name */
    void setDisplayName(String displayName);

    /** @param maxPlayerCount new maximum player count */
    void setMaxPlayerCount(int maxPlayerCount);

    /** @param playerCount new player count */
    void setPlayerCount(int playerCount);

    /** @param isReady new ready flag */
    void setReady(boolean isReady);

    /** @param whitelisted new whitelisted flag */
    void setWhitelisted(boolean whitelisted);

    /** @param whitelist new whitelist entries */
    void setWhitelist(List<String> whitelist);

    /** @param lastHeartbeat new heartbeat timestamp - presumably epoch milliseconds; confirm */
    void setLastHeartbeat(long lastHeartbeat);
}
package net.minelucky.luckyapi.shared.redis.packet;
import com.google.gson.Gson;
import net.minelucky.luckyapi.shared.redis.Channel;
import net.minelucky.luckyapi.shared.redis.Request;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * {@link Channel} for request/reply traffic over Redis pub/sub. Only {@link Request} messages
 * are sent; each is remembered by the {@link PacketMessageDispatcher} before it is published as
 * {@code <class name>|<Gson JSON>}.
 *
 * <p>The constructor submits a long-running subscription task to the executor, so that task
 * occupies one executor thread until the channel is closed.
 */
public final class PacketChannel implements Channel {

    /** Pause after a failed subscription attempt, so an unreachable server is not hammered. */
    private static final long RESUBSCRIBE_DELAY_MILLIS = 1000L;

    private final Gson gson;
    private final String name;
    private final PacketMessageDispatcher dispatcher;
    private final JedisPool callPool;
    private final JedisPool subscriberPool;
    private final ExecutorService executorService;
    /** True when this channel created the executor and must therefore shut it down. */
    private final boolean ownsExecutor;
    /** Set by {@link #close()}; tells the subscription loop to stop. */
    private volatile boolean closed;

    /**
     * Creates a channel with its own fixed thread pool sized to the number of available
     * processors. That executor is shut down by {@link #close()}.
     *
     * @param gson Gson instance used to encode and decode messages
     * @param name Redis channel name
     * @param host Redis host
     */
    public PacketChannel(Gson gson, String name, String host) {
        this(gson, name, host, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()), true);
    }

    /**
     * Creates a channel that runs on a caller-supplied executor. The executor is not shut down
     * by {@link #close()}.
     *
     * @param gson Gson instance used to encode and decode messages
     * @param name Redis channel name
     * @param host Redis host
     * @param service executor running the subscription task and all sends
     */
    public PacketChannel(Gson gson, String name, String host, ExecutorService service) {
        this(gson, name, host, service, false);
    }

    private PacketChannel(Gson gson, String name, String host, ExecutorService service, boolean ownsExecutor) {
        this.gson = gson;
        this.name = name;
        this.callPool = new JedisPool(host);
        this.subscriberPool = new JedisPool(host);
        this.dispatcher = new PacketMessageDispatcher(this.gson);
        this.executorService = service;
        this.ownsExecutor = ownsExecutor;
        executorService.submit(() -> subscribeLoop());
    }

    /**
     * Keeps the dispatcher subscribed until the channel is closed or the executor is shut down.
     * {@code subscribe} blocks while the subscription is alive; when it returns or fails, a new
     * attempt is made.
     */
    private void subscribeLoop() {
        while (!closed && !executorService.isShutdown()) {
            try (Jedis jedis = subscriberPool.getResource()) {
                jedis.subscribe(dispatcher, name);
            } catch (Exception e) {
                if (closed) {
                    // The pools are closed by close(); failing here is the expected way out.
                    return;
                }
                // Previously an exception ended this task silently and the channel never
                // received another message.
                e.printStackTrace();
                if (!pauseBeforeResubscribe()) {
                    return;
                }
            }
        }
    }

    /** Sleeps for the retry delay; returns {@code false} if the thread was interrupted. */
    private static boolean pauseBeforeResubscribe() {
        try {
            Thread.sleep(RESUBSCRIBE_DELAY_MILLIS);
            return true;
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    @Override
    public String name() {
        return name;
    }

    /**
     * Publishes a {@link Request} asynchronously on the executor after registering it with the
     * dispatcher so a reply can be matched to it. Messages that are not {@code Request}s are
     * ignored. Failures are printed; they are not reported back to the caller.
     *
     * @param message message to publish
     */
    @Override
    public void send(Object message) {
        if (!(message instanceof Request)) {
            return;
        }
        final Request<?> request = (Request<?>) message;
        executorService.submit(() -> {
            try (Jedis jedis = callPool.getResource()) {
                dispatcher.addReply(request);
                jedis.publish(this.name, message.getClass().getName() + "|" + this.gson.toJson(message));
            } catch (Exception e) {
                // The Future returned by submit() is discarded, so report the failure here.
                e.printStackTrace();
            }
        });
    }

    /** Does nothing: this channel does not support listener registration. */
    @Override
    public void register(Object listener) {
        // NO-OP
    }

    /**
     * Stops the subscription loop, shuts down the executor if this channel created it,
     * unsubscribes the dispatcher if it currently holds a subscription, and closes both pools.
     */
    @Override
    public void close() {
        this.closed = true;
        if (this.ownsExecutor) {
            this.executorService.shutdown();
        }
        try {
            // unsubscribe() fails when the dispatcher is not attached to a connection.
            if (this.dispatcher.isSubscribed()) {
                this.dispatcher.unsubscribe();
            }
        } finally {
            this.subscriberPool.close();
            this.callPool.close();
        }
    }
}
package net.minelucky.luckyapi.shared.redis.packet;
import com.google.gson.Gson;
import com.google.gson.JsonElement;
import net.minelucky.luckyapi.shared.redis.MessageDispatcher;
import net.minelucky.luckyapi.shared.redis.Request;
import redis.clients.jedis.JedisPubSub;
import java.lang.reflect.Type;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
 * Jedis pub/sub callback that decodes incoming messages with Gson and hands each one to the
 * oldest pending {@link Request} that is waiting for a response of exactly that class.
 *
 * <p>Two payload shapes are understood: {@code <class name>|<json>}, decoded into the named
 * class, and plain JSON, decoded as a {@link JsonElement}.
 */
public final class PacketMessageDispatcher extends JedisPubSub implements MessageDispatcher {

    /** Separates the class-name header from the JSON body. */
    private static final char TYPE_SEPARATOR = '|';

    private final Gson gson;
    /** Pending requests in arrival order, keyed by the response class they expect. */
    private final Map<Class<?>, Queue<Request<?>>> awaitingReplies = new ConcurrentHashMap<>();

    PacketMessageDispatcher(Gson gson) {
        this.gson = gson;
    }

    /**
     * Decodes one pub/sub message and dispatches the result.
     *
     * <p>Only the first {@code |} is treated as the header separator, so JSON bodies may contain
     * that character. Failures are printed instead of being silently dropped, and no exception
     * leaves this method: it is called from inside the Jedis subscribe loop, and an escaping
     * exception would tear the subscription down.
     *
     * @param channel channel the message arrived on (unused)
     * @param message raw payload
     */
    @Override
    public void onMessage(String channel, String message) {
        try {
            final int separator = typeSeparatorIndex(message);
            if (separator < 0) {
                // plain json message
                this.dispatch(gson.fromJson(message, JsonElement.class));
                return;
            }
            final String typeName = message.substring(0, separator);
            final String payload = message.substring(separator + 1);
            try {
                // NOTE(review): the class name comes straight off the wire; anything that can
                // publish to this channel chooses which class gets loaded and instantiated.
                Class<?> messagetype = Class.forName(typeName);
                this.dispatch(gson.fromJson(payload, (Type) messagetype));
            } catch (ClassNotFoundException e) {
                System.out.println("Failed to find messageType match for class(" + e.getMessage() + "): " + typeName);
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    /**
     * Locates the header separator of a typed message.
     *
     * @return the index of the first {@code |}, or {@code -1} when there is none or when the
     *     text before it contains <code>{</code> or {@code "} - characters that cannot occur in
     *     a class name, meaning the {@code |} belongs to a plain JSON document
     */
    private static int typeSeparatorIndex(String message) {
        final int separator = message.indexOf(TYPE_SEPARATOR);
        for (int i = 0; i < separator; i++) {
            final char c = message.charAt(i);
            if (c == '{' || c == '"') {
                return -1;
            }
        }
        return separator;
    }

    /**
     * Delivers {@code message} to the oldest pending request registered for the message's exact
     * runtime class, removing that request from the queue. Does nothing when no request is
     * waiting.
     *
     * @param message decoded message; {@code null} (which Gson returns for empty or
     *     {@code null} JSON) is ignored
     */
    @Override
    public void dispatch(Object message) {
        if (message == null) {
            return;
        }
        Queue<Request<?>> replies = awaitingReplies.get(message.getClass());
        if (replies != null) {
            Request<?> reply = replies.poll();
            if (reply != null) {
                reply.reply(message);
            }
        }
    }

    /** Queues {@code request} under the response type it reports via {@code responseType()}. */
    void addReply(Request<?> request) {
        Queue<Request<?>> replyQueue = awaitingReplies.computeIfAbsent(request.responseType(), (clazz) -> new ConcurrentLinkedQueue<>());
        replyQueue.offer(request);
    }
}
package net.minelucky.luckyapi.shared.redis.server.redis;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import net.minelucky.luckyapi.shared.redis.server.MutableServer;
import java.util.List;
/**
 * Plain mutable {@link MutableServer} implementation. Lombok generates the getters, the setters
 * and a constructor taking every field.
 */
@AllArgsConstructor
@Setter
@Getter
public final class RedisServer implements MutableServer {

    /** Largest heartbeat age, in milliseconds, at which the server still counts as online. */
    private static final long ONLINE_WINDOW_MILLIS = 5000L;

    // Declaration order matters: the generated all-args constructor takes its parameters in
    // exactly this order.
    private String name;
    private String displayName;
    private int playerCount;
    private int maxPlayerCount;
    private boolean whitelisted;
    private boolean isReady;
    private long lastHeartbeat;
    private List<String> whitelist;

    /**
     * @return {@code true} when the last heartbeat is at most five seconds older than the
     *     current system time
     */
    @Override
    public boolean isOnline() {
        final long heartbeatAge = System.currentTimeMillis() - this.lastHeartbeat;
        return heartbeatAge <= ONLINE_WINDOW_MILLIS;
    }

    /** @return the stored heartbeat timestamp, compared against {@code System.currentTimeMillis()} */
    @Override
    public long lastHeartbeat() {
        return lastHeartbeat;
    }
}
package net.minelucky.luckyapi.shared.redis;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
/**
 * A message that expects a response of type {@code R}.
 *
 * <p>{@code PacketMessageDispatcher} queues requests under {@link #responseType()} and calls
 * {@link #reply(Object)} when a message of that class arrives.
 *
 * @param <R> response type
 */
public interface Request<R> {

    /** @return the class of the response this request waits for */
    Class<R> responseType();

    /**
     * @return the identifier of this request
     */
    long id();

    /**
     * @param id identifier to assign
     * @return a request carrying the identifier - presumably this instance, for chaining
     */
    Request<R> withIdentifier(long id);

    /**
     * @param consumer callback for the response
     * @return a request with the callback attached - presumably this instance, for chaining
     */
    Request<R> onReply(Consumer<R> consumer);

    /**
     * Supplies the response to this request.
     *
     * @param response the decoded response object
     */
    void reply(Object response);

    /** @return whether a reply has been received - exact semantics are up to implementations */
    boolean hasReplied();

    /**
     * @param runnable action for the timeout case
     * @param time timeout length
     * @param unit unit of {@code time}
     * @return a request with the timeout attached - presumably this instance, for chaining
     */
    Request<R> withTimeout(Runnable runnable, long time, TimeUnit unit);

    /**
     * Convenience overload that uses a timeout of five seconds.
     *
     * @param runnable action for the timeout case
     * @return whatever {@link #withTimeout(Runnable, long, TimeUnit)} returns
     */
    default Request<R> withTimeout(Runnable runnable) {
        final long defaultSeconds = 5L;
        return withTimeout(runnable, defaultSeconds, TimeUnit.SECONDS);
    }
}
package net.minelucky.luckyapi.shared.redis.server;
import java.util.List;
/**
 * Read-only view of a server's published state: identity, player counts, whitelist, readiness
 * and heartbeat.
 */
public interface Server {

    /** @return the server name */
    String getName();

    /** @return the display name */
    String getDisplayName();

    /** @return the current player count */
    int getPlayerCount();

    /** @return the maximum player count */
    int getMaxPlayerCount();

    /** @return whether the server is whitelisted */
    boolean isWhitelisted();

    /** @return the whitelist entries */
    List<String> getWhitelist();

    /** @return whether the server reports itself ready */
    boolean isReady();

    /** @return whether the server is considered online; the rule is up to the implementation */
    boolean isOnline();

    /** @return the timestamp of the last heartbeat - presumably epoch milliseconds; confirm */
    long lastHeartbeat();
}
package net.minelucky.luckyapi.shared.redis.server.gson;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.google.gson.*;
import net.minelucky.luckyapi.shared.redis.server.Server;
import net.minelucky.luckyapi.shared.redis.server.redis.RedisServer;
import java.lang.reflect.Type;
import java.util.List;
/**
 * Gson adapter that writes a {@link Server} as a flat JSON object and reads it back as a
 * {@link RedisServer}.
 *
 * <p>The heartbeat is not part of the JSON: a deserialized server gets the current system time
 * as its last heartbeat.
 */
public final class ServerTypeAdapter implements JsonSerializer<Server>, JsonDeserializer<Server> {

    private final Gson gson = new Gson();
    private final Type whitelistType = new TypeToken<List<String>>() {
    }.getType();

    /**
     * Builds a {@link RedisServer} from a JSON object.
     *
     * @param obj JSON to read
     * @param type requested type (unused)
     * @param ctx deserialization context (unused)
     * @return the server, or {@code null} when {@code obj} is not a JSON object
     * @throws JsonParseException when a required field ({@code name}, {@code displayName},
     *     {@code playerCount}, {@code maxPlayers}, {@code whitelisted}, {@code isReady}) is
     *     missing or JSON {@code null}
     */
    @Override
    public Server deserialize(JsonElement obj, Type type, JsonDeserializationContext ctx) throws JsonParseException {
        if (!obj.isJsonObject()) {
            return null;
        }
        final JsonObject server = obj.getAsJsonObject();
        final boolean whitelisted = required(server, "whitelisted").getAsBoolean();
        return new RedisServer(
                required(server, "name").getAsString(),
                required(server, "displayName").getAsString(),
                required(server, "playerCount").getAsInt(),
                required(server, "maxPlayers").getAsInt(),
                whitelisted,
                required(server, "isReady").getAsBoolean(),
                // Receiving the JSON counts as a heartbeat.
                System.currentTimeMillis(),
                readWhitelist(server, whitelisted)
        );
    }

    /**
     * Fetches a mandatory field, failing with the declared exception type and the field name
     * instead of a bare NullPointerException.
     */
    private static JsonElement required(JsonObject server, String field) {
        final JsonElement value = server.get(field);
        if (value == null || value.isJsonNull()) {
            throw new JsonParseException("Server JSON is missing required field '" + field + "'");
        }
        return value;
    }

    /**
     * Reads the whitelist entries. They are only read for whitelisted servers; in every other
     * case, including a JSON {@code null} whitelist, an empty mutable list is returned.
     */
    private List<String> readWhitelist(JsonObject server, boolean whitelisted) {
        if (whitelisted && server.has("whitelist")) {
            final List<String> entries = gson.fromJson(server.get("whitelist"), whitelistType);
            if (entries != null) {
                return entries;
            }
        }
        return Lists.newArrayList();
    }

    /**
     * Writes the server's counts, names and flags; the whitelist entries are written only when
     * the server is whitelisted.
     *
     * @param server server to write
     * @param type declared type (unused)
     * @param cxt serialization context (unused)
     * @return the JSON object
     */
    @Override
    public JsonElement serialize(Server server, Type type, JsonSerializationContext cxt) {
        JsonObject data = new JsonObject();
        data.addProperty("playerCount", server.getPlayerCount());
        data.addProperty("name", server.getName());
        data.addProperty("displayName", server.getDisplayName());
        data.addProperty("maxPlayers", server.getMaxPlayerCount());
        data.addProperty("whitelisted", server.isWhitelisted());
        data.addProperty("isReady", server.isReady());
        if (server.isWhitelisted()) {
            data.add("whitelist", this.gson.toJsonTree(server.getWhitelist(), whitelistType));
        }
        return data;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment