Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Last active November 2, 2021 04:45
Show Gist options
  • Save Swoorup/009832a386a4d0a3a6bf8d446845332f to your computer and use it in GitHub Desktop.
Save Swoorup/009832a386a4d0a3a6bf8d446845332f to your computer and use it in GitHub Desktop.
Type safe Message Broker/routing using MapK
import cats.Id
import cats.effect.{ IO, Resource }
import cats.effect.unsafe.IORuntime
import cats.implicits.*
import com.codedx.util.MapK
import fs2.Stream
import fs2.concurrent.Topic
import scala.collection.mutable.ListBuffer
import scala.concurrent.duration.*
implicit val ioRuntime: IORuntime = IORuntime.global
// The message broking interface
trait MessageBroker[F[_], K[_]]:
def push[T](key: K[T], value: T): F[Unit]
def subscribe[T](key: K[T]): Stream[F, T]
def subscribeMultiple[T <: Tuple: Tuple.IsMappedBy[K]](keys: T): Stream[F, Tuple.Union[Tuple.InverseMap[T, K]]]
// =========================================================================
// Sample API
// =========================================================================
/** Contains 2 message keys with their associated values.
* CustomerMessageKey.Order -> OrderRecord,
* CustomerMessageKey.Person -> PersonRecord
*/
enum CustomerMessageKey[T](val name: String):
case Order extends CustomerMessageKey[OrderRecord]("order")
case Person extends CustomerMessageKey[PersonRecord]("person")
case class OrderRecord(num: Long = 0, qty: Int = 100, price: Double = 5.0)
case class PersonRecord(name: String = "David")
trait CustomerMessageBroker[F[_]] extends MessageBroker[F, CustomerMessageKey]
// =========================================================================
// Implementation
// =========================================================================
object CustomerMessageBroker {
def apply(): IO[CustomerMessageBroker[IO]] =
for {
personTopic <- Topic[IO, PersonRecord]
orderTopic <- Topic[IO, OrderRecord]
mapK = MapK
.empty[CustomerMessageKey, Topic[IO, *]]
.updated(CustomerMessageKey.Order, orderTopic)
.updated(CustomerMessageKey.Person, personTopic)
} yield new CustomerMessageBroker[IO] {
def push[T](key: CustomerMessageKey[T], value: T): IO[Unit] = mapK(key).publish1(value) *> IO.unit
def subscribe[T](key: CustomerMessageKey[T]): Stream[IO, T] = mapK(key).subscribe(100)
def subscribeMultiple[T <: Tuple: Tuple.IsMappedBy[CustomerMessageKey]](
keys: T
): Stream[IO, Tuple.Union[Tuple.InverseMap[T, CustomerMessageKey]]] = {
val buffers = keys.toList.map(key => mapK(key.asInstanceOf[CustomerMessageKey[?]]))
val mergedStream = Stream.emits(buffers.map(_.subscribe(100))).parJoin(8)
mergedStream.asInstanceOf[Stream[IO, Tuple.Union[Tuple.InverseMap[T, CustomerMessageKey]]]]
}
}
}
@main def hello: Unit =
(for {
broker <- Resource.eval(CustomerMessageBroker())
// subscribe to `OrderRecord` only
_ <- broker
.subscribe(CustomerMessageKey.Order)
.evalMap(order => IO.println(s"Subcriber #1: => ${order}"))
.compile
.drain
.background
// subscribe to either `PersonRecord` or `OrderRecord`
_ <- broker
.subscribeMultiple((CustomerMessageKey.Order, CustomerMessageKey.Person))
.evalMap(msg => IO.println(s"Subcriber #2: ${msg}")) // inferred as `OrderRecord | (PersonRecord | Nothing)`
.compile
.drain
.background
_ <- Stream
.awakeEvery[IO](1.second)
.evalScan(0)((counter, _) => IO(counter + 1))
.evalMap(c =>
broker.push(CustomerMessageKey.Order, OrderRecord(c, c * 10))
*> broker.push(CustomerMessageKey.Person, PersonRecord())
)
.compile
.drain
.background
} yield ()).use(_ => IO.readLine).unsafeRunSync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment