Last active
November 2, 2021 04:45
-
-
Save Swoorup/009832a386a4d0a3a6bf8d446845332f to your computer and use it in GitHub Desktop.
Type safe Message Broker/routing using MapK
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.Id | |
import cats.effect.{ IO, Resource } | |
import cats.effect.unsafe.IORuntime | |
import cats.implicits.* | |
import com.codedx.util.MapK | |
import fs2.Stream | |
import fs2.concurrent.Topic | |
import scala.collection.mutable.ListBuffer | |
import scala.concurrent.duration.* | |
implicit val ioRuntime: IORuntime = IORuntime.global | |
// The message broking interface | |
trait MessageBroker[F[_], K[_]]: | |
def push[T](key: K[T], value: T): F[Unit] | |
def subscribe[T](key: K[T]): Stream[F, T] | |
def subscribeMultiple[T <: Tuple: Tuple.IsMappedBy[K]](keys: T): Stream[F, Tuple.Union[Tuple.InverseMap[T, K]]] | |
// ========================================================================= | |
// Sample API | |
// ========================================================================= | |
/** Contains 2 message keys with their associated values. | |
* CustomerMessageKey.Order -> OrderRecord, | |
* CustomerMessageKey.Person -> PersonRecord | |
*/ | |
enum CustomerMessageKey[T](val name: String): | |
case Order extends CustomerMessageKey[OrderRecord]("order") | |
case Person extends CustomerMessageKey[PersonRecord]("person") | |
case class OrderRecord(num: Long = 0, qty: Int = 100, price: Double = 5.0) | |
case class PersonRecord(name: String = "David") | |
trait CustomerMessageBroker[F[_]] extends MessageBroker[F, CustomerMessageKey] | |
// ========================================================================= | |
// Implementation | |
// ========================================================================= | |
object CustomerMessageBroker { | |
def apply(): IO[CustomerMessageBroker[IO]] = | |
for { | |
personTopic <- Topic[IO, PersonRecord] | |
orderTopic <- Topic[IO, OrderRecord] | |
mapK = MapK | |
.empty[CustomerMessageKey, Topic[IO, *]] | |
.updated(CustomerMessageKey.Order, orderTopic) | |
.updated(CustomerMessageKey.Person, personTopic) | |
} yield new CustomerMessageBroker[IO] { | |
def push[T](key: CustomerMessageKey[T], value: T): IO[Unit] = mapK(key).publish1(value) *> IO.unit | |
def subscribe[T](key: CustomerMessageKey[T]): Stream[IO, T] = mapK(key).subscribe(100) | |
def subscribeMultiple[T <: Tuple: Tuple.IsMappedBy[CustomerMessageKey]]( | |
keys: T | |
): Stream[IO, Tuple.Union[Tuple.InverseMap[T, CustomerMessageKey]]] = { | |
val buffers = keys.toList.map(key => mapK(key.asInstanceOf[CustomerMessageKey[?]])) | |
val mergedStream = Stream.emits(buffers.map(_.subscribe(100))).parJoin(8) | |
mergedStream.asInstanceOf[Stream[IO, Tuple.Union[Tuple.InverseMap[T, CustomerMessageKey]]]] | |
} | |
} | |
} | |
@main def hello: Unit = | |
(for { | |
broker <- Resource.eval(CustomerMessageBroker()) | |
// subscribe to `OrderRecord` only | |
_ <- broker | |
.subscribe(CustomerMessageKey.Order) | |
.evalMap(order => IO.println(s"Subcriber #1: => ${order}")) | |
.compile | |
.drain | |
.background | |
// subscribe to either `PersonRecord` or `OrderRecord` | |
_ <- broker | |
.subscribeMultiple((CustomerMessageKey.Order, CustomerMessageKey.Person)) | |
.evalMap(msg => IO.println(s"Subcriber #2: ${msg}")) // inferred as `OrderRecord | (PersonRecord | Nothing)` | |
.compile | |
.drain | |
.background | |
_ <- Stream | |
.awakeEvery[IO](1.second) | |
.evalScan(0)((counter, _) => IO(counter + 1)) | |
.evalMap(c => | |
broker.push(CustomerMessageKey.Order, OrderRecord(c, c * 10)) | |
*> broker.push(CustomerMessageKey.Person, PersonRecord()) | |
) | |
.compile | |
.drain | |
.background | |
} yield ()).use(_ => IO.readLine).unsafeRunSync() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment