Skip to content

Instantly share code, notes, and snippets.

@renatocron
Created October 23, 2024 06:24
Show Gist options
  • Save renatocron/a02ee51f3ec06119d4ae34f21994f64e to your computer and use it in GitHub Desktop.
Save renatocron/a02ee51f3ec06119d4ae34f21994f64e to your computer and use it in GitHub Desktop.
# app.pl
use Mojolicious::Lite -signatures;
use Mojo::Redis;
use Mojo::JSON qw(encode_json decode_json);
use Mojo::IOLoop;
# Per-worker registry of open WebSocket transactions, keyed by the
# stringified transaction object. Declared ahead of setup_pubsub() so the
# variable is already in place when the PubSub callbacks that use it are
# registered.
my %connections;

# Redis client; errors emitted on it are written to the application log.
my $redis = Mojo::Redis->new('redis://127.0.0.1:6379');
$redis->on(
    error => sub ($emitter, $error) {
        app->log->error("Redis error: $error");
    }
);

# PubSub handle shared by the subscription helpers and the WebSocket route.
my $pubsub = $redis->pubsub;
setup_pubsub();
# Setup PubSub handlers
#
# Registers logging callbacks for the PubSub error/disconnect/reconnect events
# on the file-level $pubsub handle and then starts the channel subscription.
# Takes no arguments.
sub setup_pubsub {
    $pubsub->on(error => sub ($pubsub, $error) {
        app->log->error("PubSub error: $error");
    });
    $pubsub->on(disconnect => sub ($pubsub, $conn) {
        app->log->warn("Redis PubSub disconnected (Worker PID: $$)");
    });
    $pubsub->on(reconnect => sub ($pubsub, $conn) {
        # Log only. Listeners registered through listen() stay registered and
        # Mojo::Redis::PubSub re-issues SUBSCRIBE for them on the new
        # connection. Calling subscribe_to_channel() again here would stack a
        # second callback, and each message would then be broadcast once more
        # for every reconnect.
        app->log->info("Redis PubSub reconnected (Worker PID: $$)");
    });
    subscribe_to_channel();
}
# Subscribe this worker to the shared Redis channel and fan every published
# message out to the WebSocket clients stored in %connections.
#
# Safe to call more than once: the listener installed by an earlier call is
# removed first, so repeated calls never stack callbacks (which would deliver
# each message several times).
#
# Returns the listener callback, as listen() does.
sub subscribe_to_channel {
    state $listener;

    # Drop the listener from a previous call before installing a new one.
    $pubsub->unlisten('chat_channel' => $listener) if $listener;

    $listener = $pubsub->json('chat_channel')->listen('chat_channel' => sub ($pubsub, $data, $channel) {

        # Only a JSON object can carry the text/from fields read below.
        unless (ref $data eq 'HASH') {
            app->log->error("Error broadcasting message: unexpected payload on $channel");
            return;
        }

        my %payload = (
            type       => 'message',
            worker_pid => $$,
            text       => $data->{text},
            from       => $data->{from},
        );

        # Hand the structure over as {json => ...}: send() UTF-8 encodes plain
        # strings itself, so passing encode_json() output would encode
        # non-ASCII text twice. Each send gets its own eval so one broken
        # socket does not stop delivery to the remaining clients.
        for my $conn (values %connections) {
            eval { $conn->send({json => \%payload}); 1 }
                or app->log->error("Error broadcasting message: $@");
        }
    });

    return $listener;
}
# WebSocket route
#
# Every client transaction is stored in %connections so the PubSub listener
# can deliver messages to it. Incoming frames are expected to be JSON objects
# with "text" and "from" fields; they are published to the Redis channel
# instead of being echoed directly.
websocket '/chat' => sub ($c) {
    my $id = sprintf "%s", $c->tx;
    app->log->info("New WebSocket connection: $id (Worker PID: $$)");

    # Store connection
    $connections{$id} = $c->tx;

    # Welcome message. {json => ...} lets Mojolicious serialise and encode the
    # frame once; send() would UTF-8 encode an encode_json() string again.
    eval {
        $c->send({json => {
            type       => 'system',
            worker_pid => $$,
            text       => "Connected to worker PID: $$",
        }});
    };

    # Handle incoming WebSocket messages
    $c->on(message => sub ($c, $msg) {
        my $ok = eval {
            # Text frames arrive already decoded to characters, so they are
            # parsed with from_json (characters) rather than decode_json (bytes).
            my $data = Mojo::JSON::from_json($msg);
            die "message must be a JSON object\n" unless ref $data eq 'HASH';

            # Skip publishing if Redis is not ready
            $pubsub->notify('chat_channel' => {
                text => $data->{text},
                from => $data->{from},
            }) if $pubsub;
            1;
        };
        return if $ok;

        # Copy $@ right away; the eval below would overwrite it.
        my $error = $@;
        app->log->error("Error processing message: $error");
        eval {
            $c->send({json => {
                type       => 'system',
                worker_pid => $$,
                text       => "Error processing message",
            }});
        };
    });

    # Handle connection closed
    $c->on(finish => sub ($c, $code = 1000, $reason = '') {
        app->log->info("WebSocket closed: $id (Worker PID: $$)");
        delete $connections{$id};
    });
};
# Serve the chat page as a static file; a copy of index.html is embedded in
# the DATA section of this script.
get '/' => sub ($c) {
    my $reply = $c->reply;
    $reply->static('index.html');
};
# Configure hypnotoad: a single HTTP listener on port 8080 (all interfaces)
# and three workers.
my %hypnotoad_settings = (
    listen  => ['http://*:8080'],
    workers => 3,
);
app->config(hypnotoad => \%hypnotoad_settings);
# Server lifecycle logging and shutdown cleanup
#
# "before_server_start" is the server lifecycle hook Mojolicious emits; its
# callback receives the server first and the application second.
app->hook(before_server_start => sub ($server, $app) {
    $app->log->info("Server starting with PID $$");
});

# Mojolicious has no hook for server shutdown, so cleanup is attached to the
# event loop's "finish" event, which is emitted when the loop starts a
# graceful shutdown. The channel subscription is released through the PubSub
# API (Mojo::Redis itself does not provide a disconnect method).
Mojo::IOLoop->singleton->on(finish => sub ($loop) {
    app->log->info("Server stopping, unsubscribing from Redis (PID $$)");
    $pubsub->unlisten('chat_channel') if $pubsub;
});
app->start;
__DATA__
@@ index.html
<!DOCTYPE html>
<html>
<head>
<title>Mojo Chat</title>
<style>
/* Page shell: centered column, at most 800px wide. */
body {
max-width: 800px;
margin: 20px auto;
padding: 0 20px;
font-family: Arial, sans-serif;
}
/* Message log: fixed height, scrolls vertically when content overflows. */
#messages {
height: 400px;
overflow-y: auto;
border: 1px solid #ccc;
margin-bottom: 10px;
padding: 10px;
background: #f9f9f9;
}
/* Row styles; the script assigns one of these classes to each appended div. */
.system-message {
color: #666;
font-style: italic;
padding: 5px 0;
border-bottom: 1px solid #eee;
}
.chat-message {
margin: 5px 0;
padding: 5px 0;
border-bottom: 1px solid #eee;
}
/* Status badge pinned to the top-right corner of the viewport. */
.connection-status {
position: fixed;
top: 10px;
right: 10px;
padding: 5px 10px;
border-radius: 3px;
font-size: 14px;
}
/* Badge colors; updateConnectionStatus() adds exactly one of these classes. */
.connected {
background: #dff0d8;
color: #3c763d;
}
.disconnected {
background: #f2dede;
color: #a94442;
}
.connecting {
background: #fcf8e3;
color: #8a6d3b;
}
/* Input row: name field, message field and send button laid out with flexbox. */
.input-container {
display: flex;
gap: 10px;
margin-top: 10px;
}
input {
padding: 8px;
border: 1px solid #ccc;
border-radius: 4px;
}
/* Fixed-width name field; the message field takes the remaining width. */
#username {
width: 150px;
}
#message {
flex-grow: 1;
}
/* Send button, including its hover and disabled appearance. */
button {
padding: 8px 15px;
background: #337ab7;
color: white;
border: none;
border-radius: 4px;
cursor: pointer;
}
button:hover {
background: #286090;
}
button:disabled {
background: #cccccc;
cursor: not-allowed;
}
</style>
</head>
<body>
<!-- Status badge; text and class are set by updateConnectionStatus(). -->
<div id="connection-status" class="connection-status"></div>
<!-- Message log; rows are appended by appendMessage() / appendSystemMessage(). -->
<div id="messages"></div>
<!-- Input row: sender name, message text, and a button that calls sendMessage(). -->
<div class="input-container">
<input type="text" id="username" placeholder="Your name" value="User">
<input type="text" id="message" placeholder="Type a message...">
<button id="sendButton" onclick="sendMessage()">Send</button>
</div>
<script>
// Connection state shared by the functions below.
let ws = null,               // current WebSocket; null until connect() first runs
    reconnectAttempts = 0;   // attempts since the last successful open
const maxReconnectAttempts = 5;   // handleReconnect() stops retrying after this many
const reconnectDelay = 3000;      // base delay in ms, multiplied by the attempt number

// Page elements, looked up once when the script loads.
const [messages, messageInput, usernameInput, sendButton, connectionStatus] =
    ['messages', 'message', 'username', 'sendButton', 'connection-status']
        .map((id) => document.getElementById(id));
/**
 * Open the chat WebSocket unless one is already connecting or open.
 *
 * The socket scheme follows the page: wss:// when the page was loaded over
 * HTTPS, ws:// otherwise. Browsers refuse insecure ws:// connections from
 * secure pages, so a hard-coded ws:// breaks behind TLS.
 */
function connect() {
    if (ws && ws.readyState <= 1) return; // 0 = CONNECTING, 1 = OPEN
    updateConnectionStatus('connecting');
    const scheme = window.location.protocol === 'https:' ? 'wss' : 'ws';
    ws = new WebSocket(`${scheme}://${window.location.host}/chat`);
    setupWebSocket();
}
/**
 * Attach the open/message/close/error handlers to the current socket (`ws`).
 */
function setupWebSocket() {
    // Connected: reset the retry counter and unlock the inputs.
    const handleOpen = () => {
        console.log('WebSocket connected');
        updateConnectionStatus('connected');
        reconnectAttempts = 0;
        enableInterface(true);
    };

    // Incoming frame: parse as JSON and render it; report anything that fails.
    const handleMessage = (event) => {
        try {
            appendMessage(JSON.parse(event.data));
        } catch (e) {
            console.error('Error processing message:', e);
            appendSystemMessage('Error processing message');
        }
    };

    // Closed: lock the inputs and schedule a reconnect attempt.
    const handleClose = () => {
        console.log('WebSocket closed');
        updateConnectionStatus('disconnected');
        enableInterface(false);
        handleReconnect();
    };

    const handleError = (error) => {
        console.error('WebSocket error:', error);
        appendSystemMessage('Connection error occurred');
    };

    ws.onopen = handleOpen;
    ws.onmessage = handleMessage;
    ws.onclose = handleClose;
    ws.onerror = handleError;
}
/**
 * Schedule the next reconnect attempt, or give up once maxReconnectAttempts
 * has been reached. The wait grows linearly: reconnectDelay times the attempt
 * number.
 */
function handleReconnect() {
    if (reconnectAttempts >= maxReconnectAttempts) {
        appendSystemMessage('Maximum reconnection attempts reached. Please refresh the page.');
        return;
    }
    reconnectAttempts += 1;
    const waitMs = reconnectDelay * reconnectAttempts;
    appendSystemMessage(`Reconnecting in ${waitMs/1000} seconds... (Attempt ${reconnectAttempts}/${maxReconnectAttempts})`);
    setTimeout(connect, waitMs);
}
/**
 * Show the given connection state in the status badge.
 *
 * @param {string} status - 'connected', 'disconnected' or 'connecting'; also
 *                          used as the badge's modifier CSS class.
 */
function updateConnectionStatus(status) {
    // Pick the label for this status out of the lookup table.
    const { [status]: label } = {
        'connected': 'Connected',
        'disconnected': 'Disconnected',
        'connecting': 'Connecting...'
    };
    connectionStatus.textContent = label;
    connectionStatus.className = 'connection-status ' + status;
}
/**
 * Enable or disable the message field, the name field and the send button.
 *
 * @param {boolean} enabled - true to unlock the controls, false to lock them.
 */
function enableInterface(enabled) {
    const locked = !enabled;
    for (const control of [messageInput, usernameInput, sendButton]) {
        control.disabled = locked;
    }
}
/**
 * Render one server message in the log and scroll to the bottom.
 *
 * @param {object} data - parsed message; `type === 'system'` is shown as a
 *                        system row, anything else as a chat row built from
 *                        `worker_pid`, `from` and `text`.
 */
function appendMessage(data) {
    const isSystem = data.type === 'system';
    const row = document.createElement('div');
    row.className = isSystem ? 'system-message' : 'chat-message';
    row.textContent = isSystem
        ? `${data.text}`
        : `[Worker ${data.worker_pid}] ${data.from}: ${data.text}`;
    messages.appendChild(row);
    messages.scrollTop = messages.scrollHeight;
}
/**
 * Append a locally generated system notice to the log and scroll to the bottom.
 *
 * @param {string} text - notice text, inserted as plain text.
 */
function appendSystemMessage(text) {
    const row = document.createElement('div');
    Object.assign(row, { className: 'system-message', textContent: text });
    messages.appendChild(row);
    messages.scrollTop = messages.scrollHeight;
}
/**
 * Send the typed message over the socket and clear the input.
 * Does nothing when the input is empty or the socket is not open.
 */
function sendMessage() {
    const socketOpen = ws && ws.readyState === WebSocket.OPEN;
    if (!messageInput.value || !socketOpen) {
        return;
    }
    try {
        const payload = {
            text: messageInput.value,
            from: usernameInput.value || 'Anonymous'   // fall back when the name field is blank
        };
        ws.send(JSON.stringify(payload));
        messageInput.value = '';
    } catch (e) {
        console.error('Error sending message:', e);
        appendSystemMessage('Error sending message');
    }
}
// Send on Enter (Shift+Enter is left alone). "keydown" replaces the
// deprecated "keypress" event; the isComposing check keeps the Enter that
// confirms an IME composition from sending a half-typed message.
messageInput.addEventListener('keydown', (e) => {
    if (e.key !== 'Enter' || e.shiftKey || e.isComposing) return;
    e.preventDefault();
    sendMessage();
});
// Initial connection
connect();
</script>
</body>
</html>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment