Skip to content

Instantly share code, notes, and snippets.

@gabfssilva
Created October 11, 2018 22:56
Show Gist options
  • Save gabfssilva/e695440decb7d65329dc547b9d0404bb to your computer and use it in GitHub Desktop.
Save gabfssilva/e695440decb7d65329dc547b9d0404bb to your computer and use it in GitHub Desktop.
import java.util.UUID
import akka.Done
import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives
import akka.stream.{ActorMaterializer, DelayOverflowStrategy}
import akka.stream.alpakka.sqs.scaladsl.{SqsAckSink, SqsSource}
import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.client.builder.AwsClientBuilder.EndpointConfiguration
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sqs.{AmazonSQSAsync, AmazonSQSAsyncClientBuilder}
import com.amazonaws.services.sqs.model.{CreateQueueRequest, CreateQueueResult, SendMessageRequest, SendMessageResult}
import akka.pattern._
import akka.stream.alpakka.sqs.{MessageAction, SqsSourceSettings}
import akka.stream.scaladsl.RestartSource
import akka.util.Timeout
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder
import com.amazonaws.services.sns.model.{PublishRequest, PublishResult}
import com.amazonaws.services.sns.util.Topics
import scala.concurrent.duration._
import scala.collection.mutable
import scala.concurrent.{ExecutionContext, Future, Promise}
case class ScalaAsyncHandler[I <: AmazonWebServiceRequest, O]() extends AsyncHandler[I, O] {
private val promise: Promise[O] = Promise[O]()
val future: Future[O] = promise.future
override def onError(exception: Exception): Unit = promise.failure(exception)
override def onSuccess(request: I, result: O): Unit = promise.success(result)
}
object ScalaAsyncHandler {
implicit class AmazonSQSAsyncImplicits(sqs: AmazonSQSAsync) {
def createQueueScala(name: String): Future[CreateQueueResult] = {
val p = ScalaAsyncHandler[CreateQueueRequest, CreateQueueResult]()
sqs.createQueueAsync(name, p)
p.future
}
def sendMessageScala(queueUrl: String, message: String): Future[SendMessageResult] = {
val p = ScalaAsyncHandler[SendMessageRequest, SendMessageResult]()
sqs.sendMessageAsync(queueUrl, message, p)
p.future
}
}
}
import ScalaAsyncHandler._
case class Authorize(id: Long,
promise: Promise[String])
case class Authorized(id: Long)
class PaymentActor(notifier: PaymentNotifier) extends Actor {
val authorizations: mutable.Map[Long, Promise[String]] = mutable.Map()
override def receive: Receive = {
case Authorize(id, promise) =>
authorizations.put(id, promise)
notifier.authorize(id)
println(s"Authorize($id, $promise) done")
case Authorized(id) =>
authorizations.remove(id) match {
case None =>
println(s"ups, no order id found with this number $id")
case Some(p) =>
p.success("successful")
}
sender() ! Done
println(s"Authorized($id) done")
}
}
class PaymentNotifier(sqs: AmazonSQSAsync)(implicit ec: ExecutionContext) {
val queueUrl: Future[CreateQueueResult] = sqs.createQueueScala("payment-notification")
def authorize(id: Long): Future[SendMessageResult] = {
for {
url <- queueUrl
result <- sqs.sendMessageScala(url.getQueueUrl, id.toString)
} yield result
}
}
object SynchronousPaymentEndpoint extends App with Directives {
implicit val sys: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = sys.dispatcher
implicit val timeout = Timeout(10 second)
implicit val sqs = AmazonSQSAsyncClientBuilder
.standard()
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9324", ""))
.build()
implicit val sns = AmazonSNSAsyncClientBuilder
.standard()
.withEndpointConfiguration(new EndpointConfiguration("http://localhost:9911", ""))
.build()
val paymentActor = sys.actorOf(Props(new PaymentActor(new PaymentNotifier(sqs))))
val instanceId = UUID.randomUUID().toString.replace("-", "")
val paymentResponseTopic = sns.createTopic("payment-response").getTopicArn
private val paymentResponseQueue: String = sqs.createQueue(s"payment-response-$instanceId").getQueueUrl
private val paymentNotificationQueue: String = sqs.createQueue(s"payment-notification").getQueueUrl
RestartSource
.onFailuresWithBackoff(1 second, 1 seconds, 0.1)(
() =>
SqsSource(paymentResponseQueue, SqsSourceSettings.Defaults.withWaitTimeSeconds(0))
.mapAsync(10) { req =>
paymentActor ? Authorized(req.getBody.toInt) map { _ => req -> MessageAction.Delete }
}
).runWith(SqsAckSink(paymentResponseQueue))
RestartSource
.onFailuresWithBackoff(1 second, 1 seconds, 0.1)(
() =>
SqsSource(paymentNotificationQueue, SqsSourceSettings.Defaults.withWaitTimeSeconds(0))
.delay(4 seconds, DelayOverflowStrategy.backpressure)
.mapAsync(10) { req =>
sqs.sendMessageScala(paymentResponseQueue, req.getBody) map { _ => req -> MessageAction.Delete }
}
)
.runWith(SqsAckSink(paymentNotificationQueue))
Topics.subscribeQueue(sns, sqs, paymentResponseTopic, paymentResponseQueue)
val routes = path("authorize" / LongNumber) { id =>
get {
val promise = Promise[String]()
paymentActor ! Authorize(id, promise)
complete(promise.future)
}
}
Http().bindAndHandle(routes, "localhost", 8080).onComplete(println)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment