Skip to content

Instantly share code, notes, and snippets.

@jberger
Created June 4, 2016 22:16
Show Gist options
  • Save jberger/df26d17143cb9baf78034d3ef7eb1d60 to your computer and use it in GitHub Desktop.
Save jberger/df26d17143cb9baf78034d3ef7eb1d60 to your computer and use it in GitHub Desktop.
package Mojolicious::Plugin::Multiplex;
use Mojo::Base 'Mojolicious::Plugin';
use Mojo::EventEmitter;
use Mojo::JSON;
# Register the multiplexing helpers on the application.
#
# Helpers installed:
#   multiplex.attach   - listens to "text" events of the current WebSocket and
#                        returns a Mojo::EventEmitter that emits "sub", "msg"
#                        and "uns" with ($channel, $payload). Returns nothing
#                        when the transaction is not a WebSocket.
#   multiplex.send     - sends "msg,$channel,$payload" to the client.
#   multiplex.pg.relay - bridges multiplexed channels to the pubsub object
#                        passed in; dies unless it isa Mojo::Pg::PubSub.
#   muliplex.pg.relay  - the original, misspelled name of the relay helper,
#                        kept as an alias so existing callers keep working.
sub register {
  my ($plugin, $app, $conf) = @_;

  $app->helper('multiplex.attach' => sub {
    # Any further arguments are accepted but ignored.
    my $c = shift;
    return unless $c->tx->is_websocket;

    my $events = Mojo::EventEmitter->new;
    $c->on(text => sub {
      my (undef, $bytes) = @_;

      # Wire format is "type,channel,payload"; the limit of 3 keeps any
      # commas inside the payload intact.
      my ($type, $channel, $payload) = split /,/, $bytes, 3;
      $type //= '';    # an empty frame yields no fields at all

      # The alternation is grouped so both anchors apply to every type;
      # ungrouped, "subscribe" or "xmsgx" would be accepted as valid.
      return $c->app->log->warn("unknown multiplex message type $type")
        unless $type =~ /\A(?:sub|msg|uns)\z/;

      $events->emit($type, $channel, $payload);
    });

    return $events;
  });

  $app->helper('multiplex.send' => sub {
    my ($c, $channel, $payload) = @_;
    $c->send("msg,$channel,$payload");
  });

  my $relay = sub {
    my ($c, $pubsub) = @_;
    die 'a Mojo::Pg::PubSub instance is required'
      unless eval { $pubsub->isa('Mojo::Pg::PubSub') };

    # attach returns nothing for non-WebSocket transactions, in which case
    # there is nothing to relay.
    my $events = $c->multiplex->attach or return;

    # channel name => value returned by $pubsub->listen, needed to unlisten.
    my %channels;

    $events->on(sub => sub {
      my (undef, $channel) = @_;
      return if exists $channels{$channel};    # already subscribed
      $channels{$channel} = $pubsub->listen($channel => sub {
        my (undef, $payload) = @_;
        $c->multiplex->send($channel => $payload);
      });
    });

    $events->on(msg => sub {
      my (undef, $channel, $payload) = @_;
      $pubsub->notify($channel => $payload);
    });

    $events->on(uns => sub {
      my (undef, $channel) = @_;
      return unless my $cb = delete $channels{$channel};
      $pubsub->unlisten($channel => $cb);
    });

    # Drop every remaining subscription once the connection finishes.
    $c->on(finish => sub {
      $pubsub->unlisten($_ => delete $channels{$_}) for keys %channels;
    });
  };

  $app->helper('multiplex.pg.relay' => $relay);
  $app->helper('muliplex.pg.relay'  => $relay);
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment