Skip to content

Instantly share code, notes, and snippets.

@geofmureithi-zz
Last active May 11, 2020 10:38
Show Gist options
  • Save geofmureithi-zz/1106d416f52bc4fe096717fcec4ac860 to your computer and use it in GitHub Desktop.
Save geofmureithi-zz/1106d416f52bc4fe096717fcec4ac860 to your computer and use it in GitHub Desktop.
Actix + Postgres Notification
pub mod notify {
use actix::prelude::*;
use serde::{Deserialize, Serialize};
use futures::{
channel::mpsc, future, stream, StreamExt, TryStreamExt, FutureExt, TryFutureExt
};
use tokio_postgres::{
tls::NoTls, AsyncMessage, Config, Notification
};
// Allows listen via tokio_postgres
pub struct DbNotifier<T>
where
T : Message + Send + From<PgMessage> + 'static,
T::Result: Send
{
config: Config,
channel: String,
addr: Recipient<T>
}
#[derive(Message)]
#[rtype(result = "()")]
pub struct PgMessage(pub Notification);
impl <T> DbNotifier<T>
where
T : Message + Send + From<PgMessage> + 'static,
T::Result: Send
{
pub fn new(cfg: &Config, channel: &str, recipient: Recipient<T>) -> Self {
let config = cfg.clone();
DbNotifier{
config,
channel: channel.to_owned(),
addr: recipient
}
}
}
impl <T> Actor for DbNotifier<T>
where
T : Message + Send + From<PgMessage>,
T::Result: Send
{
type Context = Context<Self>;
fn started(&mut self, _ctx: &mut Self::Context){
let config = self.config.clone();
let (tx, mut rx) = mpsc::unbounded();
let addr = self.addr.clone();
let channel = self.channel.clone();
actix::spawn(async move {
let (client, mut connection) = config.connect(NoTls).await.unwrap();
let stream = stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!(e));
let connection = stream.forward(tx).map(|r| r.unwrap());
actix::spawn(connection);
let listen = format!("LISTEN {:?};", channel);
client
.batch_execute(&listen)
.await
.unwrap();
while let Some(AsyncMessage::Notification(item)) = rx.next().await {
let _res = addr.send(PgMessage(item).into()).await.unwrap();
}
drop(client);
});
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment