Skip to content

Instantly share code, notes, and snippets.

@Swoorup
Last active November 1, 2021 15:47
Show Gist options
  • Save Swoorup/cf9590be68850e4bfa89392c5c8f519d to your computer and use it in GitHub Desktop.
Save Swoorup/cf9590be68850e4bfa89392c5c8f519d to your computer and use it in GitHub Desktop.
Type safe message router/broker
import cats.implicits.*
import cats.Id
import scala.collection.mutable.ListBuffer
import fs2.Stream
// The message broking interface
trait MessageBroker[F[_], Router[_]]:
def push[MsgKey: ValueOf](value: Router[MsgKey]): F[Unit]
def subscribe[MsgKey: ValueOf]: Stream[F, Router[MsgKey]]
// =========================================================================
/** Example of usage.
* Contains 2 message types:
* "order" -> OrderRecord,
* "person" -> PersonRecord
*/
case class OrderRecord(qty: Int = 100, price: Double = 5.0)
case class PersonRecord(name: String = "David")
type MsgKeyOrder = "order"
type MsgKeyPerson = "person"
/// Type based mssage payload router
type CustomerMsgRouter[MsgKey] = MsgKey match
case MsgKeyOrder => OrderRecord
case MsgKeyPerson => PersonRecord
/// In memory example
object CustomerMessageBroker {
def apply(): MessageBroker[Id, CustomerMsgRouter] = new MessageBroker[Id, CustomerMsgRouter] {
// handle all the grunt work.
private val personBuffer = ListBuffer[PersonRecord]()
private val orderBuffer = ListBuffer[OrderRecord]()
private def getRoutedBuffer[MsgKey: ValueOf]: ListBuffer[CustomerMsgRouter[MsgKey]] = {
val msgType = valueOf[MsgKey]
msgType match
case "order" => orderBuffer.asInstanceOf[ListBuffer[CustomerMsgRouter[MsgKey]]]
case "person" => personBuffer.asInstanceOf[ListBuffer[CustomerMsgRouter[MsgKey]]]
}
def push[MsgKey: ValueOf](value: CustomerMsgRouter[MsgKey]): Id[Unit] = {
val buffer = getRoutedBuffer[MsgKey]
buffer.addOne(value)
}
def subscribe[MsgKey: ValueOf]: Stream[Id, CustomerMsgRouter[MsgKey]] = {
val buffer = getRoutedBuffer[MsgKey]
Stream.emits(buffer.toList)
}
}
}
@main def hello: Unit = {
val broker = CustomerMessageBroker()
broker.push[MsgKeyPerson](PersonRecord())
broker.push[MsgKeyOrder](OrderRecord())
// broker.push[MsgKeyPerson](OrderRecord()) // Compiler Error!!!
println(s"Orders: ${broker.subscribe[MsgKeyOrder].compile.toList}")
println(s"Persons: ${broker.subscribe[MsgKeyPerson].compile.toList}")
// Outputs:
// Orders: List(OrderRecord(100,5.0))
// Persons: List(PersonRecord(David))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment