Last active
May 11, 2020 10:38
-
-
Save geofmureithi-zz/1106d416f52bc4fe096717fcec4ac860 to your computer and use it in GitHub Desktop.
Actix + Postgres Notification
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
pub mod notify { | |
use actix::prelude::*; | |
use serde::{Deserialize, Serialize}; | |
use futures::{ | |
channel::mpsc, future, stream, StreamExt, TryStreamExt, FutureExt, TryFutureExt | |
}; | |
use tokio_postgres::{ | |
tls::NoTls, AsyncMessage, Config, Notification | |
}; | |
// Allows listen via tokio_postgres | |
pub struct DbNotifier<T> | |
where | |
T : Message + Send + From<PgMessage> + 'static, | |
T::Result: Send | |
{ | |
config: Config, | |
channel: String, | |
addr: Recipient<T> | |
} | |
#[derive(Message)] | |
#[rtype(result = "()")] | |
pub struct PgMessage(pub Notification); | |
impl <T> DbNotifier<T> | |
where | |
T : Message + Send + From<PgMessage> + 'static, | |
T::Result: Send | |
{ | |
pub fn new(cfg: &Config, channel: &str, recipient: Recipient<T>) -> Self { | |
let config = cfg.clone(); | |
DbNotifier{ | |
config, | |
channel: channel.to_owned(), | |
addr: recipient | |
} | |
} | |
} | |
impl <T> Actor for DbNotifier<T> | |
where | |
T : Message + Send + From<PgMessage>, | |
T::Result: Send | |
{ | |
type Context = Context<Self>; | |
fn started(&mut self, _ctx: &mut Self::Context){ | |
let config = self.config.clone(); | |
let (tx, mut rx) = mpsc::unbounded(); | |
let addr = self.addr.clone(); | |
let channel = self.channel.clone(); | |
actix::spawn(async move { | |
let (client, mut connection) = config.connect(NoTls).await.unwrap(); | |
let stream = stream::poll_fn(move |cx| connection.poll_message(cx)).map_err(|e| panic!(e)); | |
let connection = stream.forward(tx).map(|r| r.unwrap()); | |
actix::spawn(connection); | |
let listen = format!("LISTEN {:?};", channel); | |
client | |
.batch_execute(&listen) | |
.await | |
.unwrap(); | |
while let Some(AsyncMessage::Notification(item)) = rx.next().await { | |
let _res = addr.send(PgMessage(item).into()).await.unwrap(); | |
} | |
drop(client); | |
}); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment