Created
January 13, 2023 20:19
-
-
Save soujiro32167/8c913df449702bfbf24ab0c7514cee01 to your computer and use it in GitHub Desktop.
Doobie postgres listen + notify CDC
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.{IO, IOApp, Resource} | |
import doobie.postgres.PHC | |
import doobie.{ConnectionIO, HC, LogHandler, Transactor} | |
import fs2.Pipe | |
import org.postgresql.PGNotification | |
import scala.concurrent.duration.* | |
import cats.syntax.all.* | |
import doobie.implicits.* | |
import fs2.Stream | |
object pgnotify extends IOApp.Simple { | |
val xa = Transactor.fromDriverManager[IO]( | |
"org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres" | |
) | |
/** A resource that listens on a channel and unlistens when we're done. */ | |
def channel(name: String): Resource[ConnectionIO, Unit] = | |
Resource.make(PHC.pgListen(name) *> HC.commit)(_ => PHC.pgUnlisten(name) *> HC.commit) | |
/** | |
* Stream of PGNotifications on the specified channel, polling at the specified | |
* rate. Note that this stream, when run, will commit the current transaction. | |
*/ | |
def notificationStream( | |
channelName: String, | |
pollingInterval: FiniteDuration | |
): Stream[IO, PGNotification] = { | |
val inner: Pipe[ConnectionIO, FiniteDuration, PGNotification] = ticks => for { | |
_ <- Stream.resource(channel(channelName)) | |
_ <- ticks | |
ns <- Stream.eval(PHC.pgGetNotifications <* HC.commit) | |
n <- Stream.emits(ns) | |
} yield n | |
Stream.awakeEvery[IO](pollingInterval).through(inner.transact(xa)) | |
} | |
val create = | |
sql""" | |
create table if not exists t ( | |
state text | |
) | |
""".update.run | |
val insert = sql"insert into t (state) values ('b')".updateWithLogHandler(LogHandler.jdkLogHandler).run | |
val update = | |
sql""" | |
update t set state = state || '1' where state = 'b' | |
returning (select pg_notify('state_change', 'old: ' || old.state || ', new: ' || t.state) from t as old where state = 'b' limit 1) | |
""".updateWithLogHandler(LogHandler.jdkLogHandler).withGeneratedKeys[Unit]("1").compile.last | |
val app = for { | |
f <- notificationStream("state_change", 500 millis) | |
.map(n => show"${n.getPID} ${n.getName} ${n.getParameter}") | |
.take(1) | |
.evalTap(IO.println) | |
.compile | |
.drain | |
.start | |
_ <- (create *> insert *> update).transact(xa) | |
_ <- f.join | |
} yield () | |
override def run: IO[Unit] = | |
app.void | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment