Created
October 23, 2024 06:24
-
-
Save renatocron/a02ee51f3ec06119d4ae34f21994f64e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# app.pl | |
use Mojolicious::Lite -signatures; | |
use Mojo::Redis; | |
use Mojo::JSON qw(encode_json decode_json); | |
use Mojo::IOLoop; | |
# Redis client used by this process. Errors emitted on the client object are
# written to the application log.
my $redis = Mojo::Redis->new('redis://127.0.0.1:6379');
$redis->on(error => sub ($emitter, $error) {
    app->log->error("Redis error: $error");
});

# WebSocket transactions currently open in this process, keyed by the
# stringified transaction object. Declared before setup_pubsub() is called
# because the subscription callback registered there closes over this hash.
my %connections;

# PubSub handle taken from the Redis client. Nothing here waits for a
# connection; the handlers and the channel subscription are registered
# immediately.
my $pubsub = $redis->pubsub;
setup_pubsub();
# Register the PubSub event handlers (error, disconnect, reconnect) on
# $pubsub and perform the initial channel subscription.
# Takes no arguments; returns whatever subscribe_to_channel() returns.
sub setup_pubsub {
    # Event name / handler pairs, registered in this order.
    my @handlers = (
        error => sub ($emitter, $message) {
            app->log->error("PubSub error: $message");
        },
        disconnect => sub ($emitter, $connection) {
            app->log->warn("Redis PubSub disconnected (Worker PID: $$)");
        },
        reconnect => sub ($emitter, $connection) {
            app->log->info("Redis PubSub reconnected (Worker PID: $$)");

            # Subscribe to the channel again on the new connection.
            subscribe_to_channel();
        },
    );

    while (my ($event, $handler) = splice @handlers, 0, 2) {
        $pubsub->on($event => $handler);
    }

    return subscribe_to_channel();
}
# Subscribe this process to 'chat_channel' (JSON payloads) and fan every
# published message out to the WebSocket connections held in %connections.
#
# Takes no arguments and returns the value handed back by listen().
# The function is also invoked from the PubSub "reconnect" handler, so it is
# written to be repeatable: the listener registered by the previous call is
# removed before a new one is added, which keeps a message from being
# broadcast once per accumulated listener.
sub subscribe_to_channel {
    # Listener from the previous call, if any. `state` is available because
    # Mojolicious::Lite enables the Perl 5.16 feature bundle.
    state $listener;

    $pubsub->unlisten('chat_channel' => $listener) if $listener;

    $listener = $pubsub->json('chat_channel')->listen('chat_channel' => sub ($emitter, $data, $channel) {
        # Encode once; the frame is identical for every recipient.
        my $payload = eval {
            encode_json({
                type       => 'message',
                worker_pid => $$,
                text       => $data->{text},
                from       => $data->{from},
            });
        };
        if (!defined $payload) {
            app->log->error("Error broadcasting message: $@");
            return;
        }

        for my $conn (values %connections) {
            # encode_json() returns UTF-8 bytes. Passing them as {text => ...}
            # sends them unchanged; a plain scalar would be treated as
            # characters and encoded a second time.
            # Each send is guarded separately so that one broken connection
            # does not prevent delivery to the remaining ones.
            eval { $conn->send({text => $payload}); 1 }
                or app->log->error("Error broadcasting message: $@");
        }
    });

    return $listener;
}
# WebSocket endpoint '/chat'.
# Registers the connection in %connections, sends a greeting, relays each
# incoming JSON message ({text, from}) to 'chat_channel' through $pubsub, and
# removes the connection from %connections when it is closed.
websocket '/chat' => sub ($c) {
    # The stringified transaction object serves as the connection key.
    my $id = sprintf "%s", $c->tx;
    app->log->info("New WebSocket connection: $id (Worker PID: $$)");

    # Store connection so the PubSub callback can broadcast to it.
    $connections{$id} = $c->tx;

    # Welcome message, best effort. The {json => ...} form lets Mojolicious
    # serialize and encode the frame exactly once.
    eval {
        $c->send({json => {
            type       => 'system',
            worker_pid => $$,
            text       => "Connected to worker PID: $$",
        }});
    };

    # Handle incoming WebSocket messages.
    $c->on(message => sub ($c, $msg) {
        my $ok = eval {
            # The "message" event delivers text frames as decoded characters,
            # so the character-based decoder is needed here; decode_json()
            # expects bytes and fails on non-ASCII input.
            my $data = Mojo::JSON::from_json($msg);
            die "payload is not a JSON object\n" unless ref $data eq 'HASH';

            # Skip publishing if the PubSub handle is not available.
            if ($pubsub) {
                $pubsub->notify('chat_channel' => {
                    text => $data->{text},
                    from => $data->{from},
                });
            }
            1;
        };
        return if $ok;

        # Capture the error before another eval can overwrite $@.
        my $error = $@;
        app->log->error("Error processing message: $error");

        # Tell the sender, best effort.
        eval {
            $c->send({json => {
                type       => 'system',
                worker_pid => $$,
                text       => "Error processing message",
            }});
        };
    });

    # Handle connection closed: forget the transaction.
    $c->on(finish => sub ($c, $code = 1000, $reason = '') {
        app->log->info("WebSocket closed: $id (Worker PID: $$)");
        delete $connections{$id};
    });
};
# Root route: reply with the static file 'index.html'.
get '/' => sub ($c) {
    return $c->reply->static('index.html');
};
# Settings stored under the 'hypnotoad' configuration key:
# the listen location and the number of workers.
my %hypnotoad_settings = (
    listen  => ['http://*:8080'],
    workers => 3,
);
app->config(hypnotoad => \%hypnotoad_settings);
# Log the process id when the server is about to start.
# "before_server_start" is a documented Mojolicious hook; its callback
# receives the server object first and the application second.
# NOTE(review): the previously used names "after_server_start" and
# "before_server_stop" are not in the documented hook list, so callbacks
# registered under them would never run - confirm against the installed version.
app->hook(before_server_start => sub ($server, $app) {
    $app->log->info("Server started with PID $$");
});

# Ensure clean shutdown: close the PubSub connection when the event loop
# begins a graceful shutdown (IOLoop "finish" event).
Mojo::IOLoop->singleton->on(finish => sub ($loop) {
    app->log->info("Server stopping, closing Redis connections");
    return unless $pubsub;

    # Best effort: disable automatic reconnects, then close the connection.
    # Cleanup problems are logged and never interrupt the shutdown.
    # NOTE(review): Mojo::Redis itself does not appear to offer disconnect();
    # the connection object held by the PubSub handle does - confirm.
    eval { $pubsub->reconnect_interval(-1)->connection->disconnect; 1 }
        or app->log->warn("Could not close Redis connection: $@");
});

app->start;
__DATA__ | |
@@ index.html | |
<!DOCTYPE html> | |
<html> | |
<head> | |
<title>Mojo Chat</title> | |
<style> | |
body { | |
max-width: 800px; | |
margin: 20px auto; | |
padding: 0 20px; | |
font-family: Arial, sans-serif; | |
} | |
#messages { | |
height: 400px; | |
overflow-y: auto; | |
border: 1px solid #ccc; | |
margin-bottom: 10px; | |
padding: 10px; | |
background: #f9f9f9; | |
} | |
.system-message { | |
color: #666; | |
font-style: italic; | |
padding: 5px 0; | |
border-bottom: 1px solid #eee; | |
} | |
.chat-message { | |
margin: 5px 0; | |
padding: 5px 0; | |
border-bottom: 1px solid #eee; | |
} | |
.connection-status { | |
position: fixed; | |
top: 10px; | |
right: 10px; | |
padding: 5px 10px; | |
border-radius: 3px; | |
font-size: 14px; | |
} | |
.connected { | |
background: #dff0d8; | |
color: #3c763d; | |
} | |
.disconnected { | |
background: #f2dede; | |
color: #a94442; | |
} | |
.connecting { | |
background: #fcf8e3; | |
color: #8a6d3b; | |
} | |
.input-container { | |
display: flex; | |
gap: 10px; | |
margin-top: 10px; | |
} | |
input { | |
padding: 8px; | |
border: 1px solid #ccc; | |
border-radius: 4px; | |
} | |
#username { | |
width: 150px; | |
} | |
#message { | |
flex-grow: 1; | |
} | |
button { | |
padding: 8px 15px; | |
background: #337ab7; | |
color: white; | |
border: none; | |
border-radius: 4px; | |
cursor: pointer; | |
} | |
button:hover { | |
background: #286090; | |
} | |
button:disabled { | |
background: #cccccc; | |
cursor: not-allowed; | |
} | |
</style> | |
</head> | |
<body> | |
<div id="connection-status" class="connection-status"></div> | |
<div id="messages"></div> | |
<div class="input-container"> | |
<input type="text" id="username" placeholder="Your name" value="User"> | |
<input type="text" id="message" placeholder="Type a message..."> | |
<button id="sendButton" onclick="sendMessage()">Send</button> | |
</div> | |
<script> | |
// Current WebSocket (null until the first connect) and reconnect bookkeeping.
let ws = null;
let reconnectAttempts = 0;
const maxReconnectAttempts = 5;
const reconnectDelay = 3000; // base delay in ms, passed to setTimeout after scaling

// Page elements used by the chat script, looked up once by id.
const [messages, messageInput, usernameInput, sendButton, connectionStatus] =
    ['messages', 'message', 'username', 'sendButton', 'connection-status']
        .map((id) => document.getElementById(id));
// Open the chat WebSocket unless one is already connecting or open.
// The socket scheme follows the page: wss: when the page was loaded over
// HTTPS (browsers block plain ws: from secure pages), ws: otherwise.
function connect() {
    if (ws && ws.readyState <= 1) return; // 0 = CONNECTING, 1 = OPEN
    updateConnectionStatus('connecting');
    const scheme = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
    ws = new WebSocket(`${scheme}//${window.location.host}/chat`);
    setupWebSocket();
}
// Install the open/message/close/error handlers on the socket held in `ws`.
function setupWebSocket() {
    // Connection established: reset the retry counter and unlock the inputs.
    const onopen = () => {
        console.log('WebSocket connected');
        updateConnectionStatus('connected');
        reconnectAttempts = 0;
        enableInterface(true);
    };

    // Incoming frame: parse it as JSON and render it; report failures in the log area.
    const onmessage = (event) => {
        try {
            appendMessage(JSON.parse(event.data));
        } catch (e) {
            console.error('Error processing message:', e);
            appendSystemMessage('Error processing message');
        }
    };

    // Connection lost: lock the inputs and schedule a reconnect attempt.
    const onclose = () => {
        console.log('WebSocket closed');
        updateConnectionStatus('disconnected');
        enableInterface(false);
        handleReconnect();
    };

    const onerror = (error) => {
        console.error('WebSocket error:', error);
        appendSystemMessage('Connection error occurred');
    };

    Object.assign(ws, { onopen, onmessage, onclose, onerror });
}
// Schedule the next reconnect attempt, or give up once the attempt limit is reached.
// The wait grows linearly: reconnectDelay multiplied by the attempt number.
function handleReconnect() {
    if (reconnectAttempts >= maxReconnectAttempts) {
        appendSystemMessage('Maximum reconnection attempts reached. Please refresh the page.');
        return;
    }

    reconnectAttempts += 1;
    const waitMs = reconnectDelay * reconnectAttempts;
    appendSystemMessage(`Reconnecting in ${waitMs / 1000} seconds... (Attempt ${reconnectAttempts}/${maxReconnectAttempts})`);
    setTimeout(connect, waitMs);
}
// Show the given status ('connected', 'disconnected' or 'connecting') in the
// status badge: sets its label and a CSS class named after the status.
function updateConnectionStatus(status) {
    const label = {
        'connected': 'Connected',
        'disconnected': 'Disconnected',
        'connecting': 'Connecting...'
    }[status];

    connectionStatus.textContent = label;
    connectionStatus.className = `connection-status ${status}`;
}
// Enable (truthy) or disable (falsy) the message box, the name box and the send button.
function enableInterface(enabled) {
    const disabled = !enabled;
    for (const control of [messageInput, usernameInput, sendButton]) {
        control.disabled = disabled;
    }
}
// Render one server message in the log and scroll to the bottom.
// data.type === 'system' is shown as a system line with data.text only;
// anything else is shown as a chat line with worker pid, sender and text.
function appendMessage(data) {
    const isSystem = data.type === 'system';
    const entry = document.createElement('div');

    entry.className = isSystem ? 'system-message' : 'chat-message';
    entry.textContent = isSystem
        ? `${data.text}`
        : `[Worker ${data.worker_pid}] ${data.from}: ${data.text}`;

    messages.appendChild(entry);
    messages.scrollTop = messages.scrollHeight;
}
// Append a locally generated system line with the given text and scroll to the bottom.
function appendSystemMessage(text) {
    const entry = Object.assign(document.createElement('div'), {
        className: 'system-message',
        textContent: text
    });
    messages.appendChild(entry);
    messages.scrollTop = messages.scrollHeight;
}
// Send the typed message as JSON ({text, from}) over the socket and clear the box.
// Does nothing when the box is empty or the socket is not open.
function sendMessage() {
    const canSend = messageInput.value && ws && ws.readyState === WebSocket.OPEN;
    if (!canSend) return;

    try {
        const payload = JSON.stringify({
            text: messageInput.value,
            from: usernameInput.value || 'Anonymous' // fallback name when the name box is empty
        });
        ws.send(payload);
        messageInput.value = '';
    } catch (e) {
        console.error('Error sending message:', e);
        appendSystemMessage('Error sending message');
    }
}
// Enter (without Shift) in the message box sends the message.
messageInput.addEventListener('keypress', (event) => {
    const isPlainEnter = event.key === 'Enter' && !event.shiftKey;
    if (!isPlainEnter) return;

    event.preventDefault();
    sendMessage();
});

// Initial connection
connect();
</script> | |
</body> | |
</html> |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment