Skip to content

Instantly share code, notes, and snippets.

@pagetronic
Created October 16, 2025 14:35
Show Gist options
  • Save pagetronic/7f5961a2d94411759eee56549ae365d6 to your computer and use it in GitHub Desktop.
Save pagetronic/7f5961a2d94411759eee56549ae365d6 to your computer and use it in GitHub Desktop.
/*
* Copyright 2019 Laurent PAGE, Apache Licence 2.0
*/
package live.page.hubd.system.socket;
import com.mongodb.CursorType;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.CreateCollectionOptions;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.Sorts;
import jakarta.servlet.ServletContextEvent;
import jakarta.servlet.ServletContextListener;
import jakarta.servlet.annotation.WebListener;
import jakarta.websocket.CloseReason;
import jakarta.websocket.Session;
import live.page.hubd.system.Language;
import live.page.hubd.system.db.Db;
import live.page.hubd.system.json.Json;
import live.page.hubd.system.utils.Fx;
import live.page.hubd.system.utils.HubDThreadFactory;
import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* In a multi-server system, live websocket updates only work if every update is stored in a place shared by all servers.
* <p>
* Here we use a capped collection from MongoDB <a href="https://docs.mongodb.com/manual/core/capped-collections/">...</a>
* and a tailable cursor <a href="https://docs.mongodb.com/manual/core/tailable-cursors/">...</a>
* <p>
* Every push is delivered live.
* In JavaScript use: socket.follow('channel/tofollow', function (msg) {});
*/
@WebListener
public class SocketPusher implements ServletContextListener {
private static final String collectionName = "Pusher";
static MongoCursor<Json> pushCursor;
private static final ExecutorService executor = Executors.newSingleThreadExecutor(new HubDThreadFactory("SocketCapped"));
/**
* Send push message to all
*
* @param channel followed
* @param message to send
*/
public static void send(String channel, Json message) {
send(channel, List.of(), message);
}
/**
* Send push message to user
*
* @param channel followed
* @param message to send
*/
public static void send(String channel, String user, Json message) {
send(channel, user == null ? null : List.of(user), message);
}
/**
* Send push message to users
*
* @param channel followed
* @param message to send
*/
public static void send(String channel, List<String> users, Json message) {
if (message == null) {
return;
}
Json push = new Json("channel", channel).put("message", message).put("date", new Date());
if (users != null && !users.isEmpty()) {
push.put("users", users);
}
Db.getDb(collectionName).insertOne(push);
}
/**
* Create and Run Capped Collection
*
*/
private void makeCapped() {
executor.submit(() -> {
try {
Date last_date = new Date();
if (Db.getDb().listCollectionNames().filter(Filters.eq("name", collectionName)).first() == null) {
Fx.log("Collection " + collectionName + " creation");
MongoDatabase db = Db.getDb();
MongoCollection<Json> pushDb = db.getCollection(collectionName, Json.class);
CreateCollectionOptions options = new CreateCollectionOptions().capped(true).sizeInBytes(600 * 1024 * 1024);
db.createCollection(collectionName, options);
pushDb.createIndex(Sorts.descending("date"), new IndexOptions().name("date"));
pushDb.insertOne(new Json().put("date", new Date(last_date.getTime() - 1L)).put("seed", true));
Fx.log("Collection " + collectionName + " created");
}
while (!executor.isShutdown() && !executor.isTerminated()) {
pushCursor = Db.getDb(collectionName).find(Filters.gt("date", last_date)).sort(Sorts.ascending("$natural"))
.noCursorTimeout(true).cursorType(CursorType.TailableAwait).iterator();
while (pushCursor.hasNext() && !executor.isShutdown() && !executor.isTerminated()) {
try {
Json push = pushCursor.next();
if (push.containsKey("users")) {
pushToUser(push.getString("channel"), push.getList("users"), push.getJson("message"));
} else {
pushToAll(push.getString("channel"), push.getJson("message"), push.getList("excludes"));
}
last_date = push.getDate("date");
} catch (Exception ignored) {
}
}
pushCursor.close();
}
} catch (Exception e) {
if (Fx.IS_DEBUG) {
Fx.log(SocketPusher.class.getSimpleName() + " closed");
}
if (!executor.isShutdown() && !executor.isTerminated()) {
try {
Thread.sleep(5 * 1000);
makeCapped();
} catch (InterruptedException ignored) {
}
}
}
});
}
/**
* Create and Run Capped Collection
*
* @param sce not used
*/
@Override
public void contextInitialized(ServletContextEvent sce) {
makeCapped();
}
@Override
public void contextDestroyed(ServletContextEvent sce) {
try {
if (pushCursor != null) {
pushCursor.close();
}
} catch (Exception ignore) {
}
Fx.shutdownService(executor);
}
/**
* Push to all users
*
* @param message to push
* @param excludes users
*/
private void pushToAll(String channel, Json message, List<String> excludes) {
for (Entry<String, SocketSessions.SocketSession> entry : SocketSessions.getSessions().entrySet()) {
SocketSessions.SocketSession sessionData = entry.getValue();
if ((excludes == null || !excludes.contains(sessionData.getUser().getId())) && sessionData.getChannels().contains(channel)) {
try {
SocketMessage data = new SocketMessage(channel);
data.setMessage(message);
sessionData.getSession().getAsyncRemote().sendText(data.toString());
} catch (Exception e) {
if (Fx.IS_DEBUG) {
e.printStackTrace();
}
}
}
}
}
/**
* Push to a specifics users
*
* @param channel where push
* @param users to push
* @param message to push
*/
private void pushToUser(String channel, List<String> users, Json message) {
if (channel == null || message == null) {
return;
}
for (Entry<String, SocketSessions.SocketSession> entry : SocketSessions.getSessions().entrySet()) {
try {
SocketSessions.SocketSession datas = entry.getValue();
Session session = datas.getSession();
String user_id = (String) session.getUserProperties().get("user_id");
if (users.contains(user_id)) {
if (datas.getChannels().contains(channel)) {
SocketMessage data = new SocketMessage(channel);
if (message.get("notification") != null) {
Json notification = message.getJson("notification");
notification.put("title", Language.get(notification.getString("title"), session.getUserProperties().get("hl").toString()));
notification.put("message", Language.get(notification.getString("message"), session.getUserProperties().get("hl").toString()));
Json message_clone = message.clone();
message_clone.put("notification", notification);
data.setMessage(message_clone);
} else {
data.setMessage(message);
}
session.getAsyncRemote().sendText(data.toString());
}
if (message.getString("action", "").equals("logout")) {
datas.getSession().close(new CloseReason(() -> 401, "Session ended by other"));
}
}
} catch (Exception e) {
if (Fx.IS_DEBUG) {
e.printStackTrace();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment