Last active
November 1, 2021 15:47
-
-
Save Swoorup/cf9590be68850e4bfa89392c5c8f519d to your computer and use it in GitHub Desktop.
Type-safe message router/broker
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.implicits.* | |
import cats.Id | |
import scala.collection.mutable.ListBuffer | |
import fs2.Stream | |
/** The message-broking interface.
  *
  * Messages are addressed by a key *type* rather than a key value: each
  * operation takes a `MsgKey` type argument, and `Router[MsgKey]` is the
  * payload type associated with that key.
  *
  * @tparam F      type constructor wrapping the result of `push` and
  *                parameterising the `Stream` returned by `subscribe`
  * @tparam Router type-level mapping from a message key type to its payload type
  */
trait MessageBroker[F[_], Router[_]] {

  /** Publishes one payload under the key `MsgKey`.
    *
    * @tparam MsgKey message key type; the `ValueOf` bound makes its single
    *                runtime value available to implementations
    * @param value   payload whose type is selected by `Router[MsgKey]`
    */
  def push[MsgKey: ValueOf](value: Router[MsgKey]): F[Unit]

  /** Returns a stream of the payloads associated with the key `MsgKey`.
    *
    * @tparam MsgKey message key type; the `ValueOf` bound makes its single
    *                runtime value available to implementations
    */
  def subscribe[MsgKey: ValueOf]: Stream[F, Router[MsgKey]]
}
// ========================================================================= | |
/* Example of usage.
 *
 * Two message keys are defined, each bound to its own payload type:
 *   "order"  -> OrderRecord
 *   "person" -> PersonRecord
 */

/** Literal singleton type used as the key for order messages. */
type MsgKeyOrder = "order"

/** Literal singleton type used as the key for person messages. */
type MsgKeyPerson = "person"

/** Payload carried by "order" messages. */
case class OrderRecord(qty: Int = 100, price: Double = 5.0)

/** Payload carried by "person" messages. */
case class PersonRecord(name: String = "David")

/** Type-based message payload router.
  *
  * A match type that resolves a message key type to the payload type
  * carried under that key.
  */
type CustomerMsgRouter[MsgKey] = MsgKey match {
  case MsgKeyOrder  => OrderRecord
  case MsgKeyPerson => PersonRecord
}
/** In-memory example broker: one mutable buffer per message key. */
object CustomerMessageBroker {

  /** Creates a broker with its own, initially empty, buffers.
    *
    * The buffers are plain `ListBuffer`s with no synchronisation in this
    * code, so the instance is intended for single-threaded demo use.
    *
    * @return a `MessageBroker` running in `Id` and routed by `CustomerMsgRouter`
    */
  def apply(): MessageBroker[Id, CustomerMsgRouter] = new MessageBroker[Id, CustomerMsgRouter] {
    // One backing buffer per supported message key.
    private val personBuffer = ListBuffer[PersonRecord]()
    private val orderBuffer = ListBuffer[OrderRecord]()

    /** Selects the buffer that stores payloads for `MsgKey`.
      *
      * @throws IllegalArgumentException if `MsgKey` is a key this broker has
      *                                  no buffer for
      */
    private def getRoutedBuffer[MsgKey: ValueOf]: ListBuffer[CustomerMsgRouter[MsgKey]] = {
      val msgType = valueOf[MsgKey]
      val routed: ListBuffer[?] = msgType match {
        case "order"  => orderBuffer
        case "person" => personBuffer
        case other =>
          // Any type with a ValueOf instance can be passed as MsgKey, so fail
          // with a descriptive error instead of an opaque MatchError.
          throw new IllegalArgumentException(
            s"CustomerMessageBroker has no buffer for message key: $other"
          )
      }
      // The compiler cannot reduce CustomerMsgRouter[MsgKey] from a runtime
      // match on the key value, so the element type is re-asserted here. The
      // cast is sound because each key is paired above with the buffer of the
      // payload type that CustomerMsgRouter assigns to it.
      routed.asInstanceOf[ListBuffer[CustomerMsgRouter[MsgKey]]]
    }

    /** Appends `value` to the buffer selected by `MsgKey`. */
    def push[MsgKey: ValueOf](value: CustomerMsgRouter[MsgKey]): Id[Unit] = {
      getRoutedBuffer[MsgKey].addOne(value)
      () // addOne returns the buffer; make the Unit result explicit
    }

    /** Emits a snapshot of everything pushed so far under `MsgKey`.
      *
      * The buffer is copied to a `List` when this method is called, so
      * payloads pushed afterwards are not seen by the returned stream.
      */
    def subscribe[MsgKey: ValueOf]: Stream[Id, CustomerMsgRouter[MsgKey]] =
      Stream.emits(getRoutedBuffer[MsgKey].toList)
  }
}
/** Demo entry point: pushes one record per message key, then prints what
  * each subscription yields.
  */
@main def hello: Unit =
  val broker = CustomerMessageBroker()

  broker.push[MsgKeyPerson](PersonRecord())
  broker.push[MsgKeyOrder](OrderRecord())
  // broker.push[MsgKeyPerson](OrderRecord()) // Compiler Error!!! (payload type does not match the key)

  val orders = broker.subscribe[MsgKeyOrder].compile.toList
  println(s"Orders: $orders")

  val persons = broker.subscribe[MsgKeyPerson].compile.toList
  println(s"Persons: $persons")

  // Outputs:
  // Orders: List(OrderRecord(100,5.0))
  // Persons: List(PersonRecord(David))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment