Skip to content

Instantly share code, notes, and snippets.

@samuelorji
Last active June 5, 2019 16:35
Show Gist options
  • Save samuelorji/6bf985dd638f8d0ad1d8c87ce7edb604 to your computer and use it in GitHub Desktop.
Save samuelorji/6bf985dd638f8d0ad1d8c87ce7edb604 to your computer and use it in GitHub Desktop.
/**
 * Demo entry point that wires a Redis-backed retry queue to a messaging service.
 *
 * On start-up it:
 *   1. creates an actor system and a [[RedisDbT]] pointing at `redisHost:redisPort`;
 *   2. starts a `MessagingService` actor that uses that Redis instance;
 *   3. sends ten sample `SendMessageRequest`s to the messaging service;
 *   4. builds a worker plus a scheduler for the `queueName` queue and hands the
 *      scheduler to a `QueueManager` actor.
 *
 * NOTE(review): this object extends `App`, so every statement below runs in
 * declaration order when the program starts; keep definitions above their first use.
 */
object Run extends App with JsonHelper {
  import scala.util.control.NonFatal

  val queueName: String = "Support"
  val redisHost: String = "localhost"
  val redisPort: Int = 6379

  implicit val system: ActorSystem = ActorSystem()
  // Implicit timeout picked up by the `?` (ask) call inside `workerFunc`.
  implicit val timeout: Timeout = FiniteDuration(5, "seconds")

  val redis = new RedisDbT() {
    override val host: String = redisHost
    override val port: Int = redisPort
    override val timeout: FiniteDuration = FiniteDuration(5, "seconds")
    override implicit val _system: ActorSystem = system
  }

  val redisClient: ActorRef = redis.getRedisInstance

  val messagingService: ActorRef = system.actorOf(Props(new MessagingService() {
    override def getRedisClient: RedisDbT = redis
  }))

  /*
   This is simulating sending 10 messages to the messaging service
   */
  (1 to 10).foreach(x => messagingService ! SendMessageRequest(x.toString, x.toString, true, queueName))

  /**
   * Worker function supplied when creating the worker.
   *
   * `element` is the raw string handed to the worker (the original author notes it
   * is the value read from Redis). It is decoded into a
   * `QueueElement[SendMessageRequest]`, the request is re-sent to the messaging
   * service with `enqueue = false`, and, if delivery is not confirmed, the element
   * is put back on its queue with `numRetry` decremented while `numRetry >= 1`.
   */
  val workerFunc: String => Unit = element => {
    try {
      val queuedData = element.parseJson.convertTo[QueueElement[SendMessageRequest]]

      // `numRetry` is decremented on every re-enqueue, so it is the remaining budget.
      def shouldRetry(retriesLeft: Int): Boolean = retriesLeft >= 1

      def handleQueuedElement(retriesLeft: Int): Unit =
        if (shouldRetry(retriesLeft)) {
          println(s"Data is ${queuedData.data} and number of retries remaining is ${queuedData.numRetry}")
          redisClient ! EnqueueElementRequest(
            queuedData.data.queueName,
            queuedData.copy(numRetry = retriesLeft - 1).toJson.toString()
          )
        } else {
          // LOG Error: the user cannot be reached and the retry budget is exhausted.
          println(s"Not enqueueing ${queuedData.data} again, since it has no retries remaining (numRetry = ${queuedData.numRetry})")
        }

      (messagingService ? queuedData.data.copy(enqueue = false)).onComplete {
        case Success(SendMessageResponse(true)) =>
          // Yaay...our message was delivered, on to other things
          ()
        case Success(SendMessageResponse(false)) =>
          handleQueuedElement(queuedData.numRetry)
        case Success(other) =>
          // The ask reply is untyped; without this case an unexpected reply would
          // raise a MatchError inside the callback and the element would be lost.
          println(s"Unexpected response received : $other")
          handleQueuedElement(queuedData.numRetry)
        case Failure(ex) =>
          // Covers ask timeouts and any other failed reply; delivery is unconfirmed.
          println(s"Error received : ${ex.getMessage}")
          handleQueuedElement(queuedData.numRetry)
      }
    } catch {
      // Parsing/deserialisation failures are not ClassCastExceptions, so catch every
      // non-fatal error here to keep a malformed element from escaping the worker.
      case NonFatal(ex) =>
        println(s"Error received : ${ex.getMessage}")
    }
  }

  val worker = Worker.createWorker(workerFunc)

  val supportScheduler = Scheduler.createScheduler(
    worker = worker,
    redis = redis,
    maxNumDeq = 3,
    queueName = queueName,
    delay = FiniteDuration(10, "seconds")
  )

  val queueManagers = system.actorOf(QueueManager.createSchedulers(List(supportScheduler)))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment