Skip to content

Instantly share code, notes, and snippets.

@dbousamra
Created April 28, 2015 22:04
Show Gist options
  • Select an option

  • Save dbousamra/f9de7155cb117ebc1a0e to your computer and use it in GitHub Desktop.

Select an option

Save dbousamra/f9de7155cb117ebc1a0e to your computer and use it in GitHub Desktop.
/**
 * Demo entry point: loads a snapshot through `DynamoDbSnapshotDao` and prints
 * either the error (`Left`) or the value (`Right`) that comes back.
 *
 * `creds` and `DynamoDbSnapshotDao` are not defined in this snippet and must be
 * provided by the surrounding project.
 */
object Example {

  /**
   * Runs the demo and blocks for at most 10 seconds waiting for the result.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val client = new AmazonDynamoDBAsyncClient(creds)
    val system = ActorSystem()

    // The dispatcher is needed in two places: it is handed explicitly to the DAO,
    // and it must be implicit in this scope because the `map` on the returned
    // Future below requires an ExecutionContext.
    // Open question from the original author: is it safe to hold this reference here?
    // NOTE(review): nothing in this method shuts the ActorSystem down afterwards.
    implicit val ec = system.dispatcher

    val snapshotDao = new DynamoDbSnapshotDao(client, ec)

    // Print whichever side of the Either the DAO produced.
    val f = snapshotDao.getSnapshot2().map {
      case Left(e)  => println(e)
      case Right(a) => println(a)
    }

    // Blocking is acceptable here only because this is the edge of the program.
    Await.result(f, 10.seconds)
  }
}
package com.cammy.dao
import java.util.concurrent.{Future => JFuture}
import com.amazonaws.AmazonWebServiceRequest
import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient
import com.amazonaws.services.dynamodbv2.document._
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec
import com.amazonaws.services.dynamodbv2.model.{GetItemRequest, GetItemResult}
import scala.concurrent.{ExecutionContext, Future, Promise}
/**
 * Root of the key-value operation hierarchy.
 *
 * Neither type parameter is used by any member. By the convention of the
 * subclasses below, `T` is the type of the operation's payload (`value`) and
 * `S` is the type of the result the operation produces.
 */
sealed trait KVOperation[T, S]

/** A [[KVOperation]] that targets DynamoDB. */
sealed trait DynamoDbOperation[T, S] extends KVOperation[T, S]

/** Put operation carrying the `Item` to store. */
case class DynamoDbPutOperation(value: Item) extends DynamoDbOperation[Item, Item]

/** Get operation carrying the request; its result type is `GetItemResult`. */
case class DynamoDbGetOperation(value: GetItemRequest) extends DynamoDbOperation[GetItemRequest, GetItemResult]

/** Delete operation carrying the primary key of the item to remove. */
case class DynamoDbDeleteOperation(value: PrimaryKey) extends DynamoDbOperation[PrimaryKey, DeleteItemOutcome]

/**
 * Query operation carrying the `QuerySpec` to run.
 *
 * The payload type parameter is `QuerySpec`, matching `value`, in line with
 * every other operation in this hierarchy.
 */
case class DynamoDbQueryOperation(value: QuerySpec) extends DynamoDbOperation[QuerySpec, Stream[Item]]

/** Pairs an execution context with the asynchronous DynamoDB client. */
case class AkkaDynamoClient(ec: ExecutionContext, client: AmazonDynamoDBAsyncClient)
/**
 * A deferred DynamoDB computation.
 *
 * Nothing happens until `run` is called with an execution context and a client;
 * the call then yields a `Future` holding either an `Exception` or a value of
 * type `A`.
 *
 * @param run the function that performs the work
 * @tparam A the type of a successful result
 */
case class DynamoDbAction[A](run: (ExecutionContext, AmazonDynamoDBAsyncClient) => Future[Either[Exception, A]]) {

  /**
   * Transforms the successful result of this action.
   *
   * A `Left` is passed through untouched; `f` is applied only to a `Right`.
   * The transformation runs on the execution context supplied to `run`.
   *
   * @param f function applied to the successful value
   * @tparam B the new result type
   * @return an action producing the transformed value
   */
  def map[B](f: A => B): DynamoDbAction[B] =
    DynamoDbAction { (ec, client) =>
      run(ec, client).map[Either[Exception, B]] {
        case Left(error)  => Left(error)
        case Right(value) => Right(f(value))
      }(ec)
    }

  // Disabled draft of flatMap, kept for reference. It does not type-check as
  // written: the Left branch yields a bare exception and the Right branch yields
  // a DynamoDbAction, whereas Future.flatMap needs a Future from both.
  // def flatMap[B](f: A => DynamoDbAction[B]): DynamoDbAction[B] =
  // DynamoDbAction((ec, client) => run(ec, client).flatMap { a =>
  // a match {
  // case Left(e) => e
  // case Right(a) => f(a)
  // }
  // }(ec))
}
/**
 * Constructors for [[DynamoDbAction]], the interpreter that turns a
 * [[DynamoDbOperation]] into an action, and helpers that adapt the AWS SDK's
 * callback-style async methods to Scala `Future`s.
 */
object DynamoDbAction {

  /**
   * Wraps a raw function as a [[DynamoDbAction]].
   *
   * @param f function that performs the work given an execution context and client
   * @tparam T the type of a successful result
   */
  def withDynamo[T](f: (ExecutionContext, AmazonDynamoDBAsyncClient) => Future[Either[Exception, T]]): DynamoDbAction[T] =
    DynamoDbAction(f)

  // Disabled drafts, kept for reference.
  //
  // def ok[A](value: A): DynamoDbAction[A] =
  // withDynamo(_ => Right(value))
  //
  // def fail[A](error: Exception): DynamoDbAction[A] =
  // withDynamo(_ => Left(error))
  // def store(item: Item): DynamoDbAction[Option[Item]] =
  // execute(DynamoDbPutOperation(item)).map(x => Some(x))

  /**
   * Builds an action that performs a `GetItem` request.
   *
   * @param req the request to send
   * @return an action yielding the `GetItemResult`, or the error in a `Left`
   */
  def fetch(req: GetItemRequest): DynamoDbAction[GetItemResult] =
    execute(DynamoDbGetOperation(req))

  // def delete(key: PrimaryKey): DynamoDbAction[Option[DeleteItemOutcome]] =
  // execute(DynamoDbDeleteOperation(key)).map(x => Some(x))
  //
  // def search(query: QuerySpec): DynamoDbAction[Stream[Item]] =
  // execute(DynamoDbQueryOperation(query))

  /**
   * Interprets an operation as a [[DynamoDbAction]].
   *
   * Only [[DynamoDbGetOperation]] is implemented. Every failure is reported as a
   * `Left` inside a successful `Future`:
   *  - an exception thrown synchronously while issuing the request,
   *  - an exception delivered asynchronously through the SDK's error callback,
   *  - an `UnsupportedOperationException` for operations not yet implemented.
   *
   * @param op the operation to run
   * @tparam T the operation's payload type
   * @tparam S the operation's result type
   * @return an action yielding the operation's result or the error
   */
  def execute[T, S](op: DynamoDbOperation[T, S]): DynamoDbAction[S] =
    DynamoDbAction.withDynamo { (ec, client) =>
      implicit val executionContext: ExecutionContext = ec
      try {
        op match {
          case DynamoDbGetOperation(req) =>
            wrapAsyncMethod(client.getItemAsync, req)
              .map(result => Right(result))
              // Without this, an error reported via onError would escape as a
              // failed Future instead of the Left the return type promises.
              .recover { case e: Exception => Left(e) }
          // case DynamoDbDeleteOperation(key) => Right(table.deleteItem(key))
          // case DynamoDbQueryOperation(querySpec) => Right(table.query(querySpec).iterator().asInstanceOf[java.util.Iterator[Item]].asScala.toStream)
          case _: DynamoDbPutOperation | _: DynamoDbDeleteOperation | _: DynamoDbQueryOperation =>
            // Listed explicitly (no wildcard) so the compiler still flags any
            // operation added to the sealed hierarchy later.
            Future.successful(Left(new UnsupportedOperationException(s"DynamoDB operation not implemented: $op")))
        }
      } catch {
        // Covers exceptions thrown while the request is being issued.
        case e: Exception => Future.successful(Left(e))
      }
    }

  // implicit def DynamoDbActionMonad: Monad[DynamoDbAction] =
  // new Monad[DynamoDbAction] {
  // def point[A](v: => A) = ok[A](v)
  // def bind[A, B](m: DynamoDbAction[A])(f: A => DynamoDbAction[B]) = m.flatMap(f)
  // }

  /** Builds an SDK handler that completes `p` with the result or the error. */
  private def promiseToAsyncHandler[Request <: AmazonWebServiceRequest, Result](p: Promise[Result]): AsyncHandler[Request, Result] =
    new AsyncHandler[Request, Result] {
      override def onError(exception: Exception): Unit = { p.failure(exception); () }
      override def onSuccess(request: Request, result: Result): Unit = { p.success(result); () }
    }

  /** Like `promiseToAsyncHandler`, for SDK calls whose result type is `Void`. */
  private def promiseToVoidAsyncHandler[Request <: AmazonWebServiceRequest](p: Promise[Unit]): AsyncHandler[Request, Void] =
    new AsyncHandler[Request, Void] {
      override def onError(exception: Exception): Unit = { p.failure(exception); () }
      override def onSuccess(request: Request, result: Void): Unit = { p.success(()); () }
    }

  /**
   * Adapts a callback-style SDK method to a Scala `Future`.
   *
   * The Java future returned by `f` is ignored; completion is driven entirely
   * by the handler callbacks.
   *
   * @param f       SDK method taking a request and a completion handler
   * @param request the request to send
   * @return a future completed with the result, or failed with the reported exception
   */
  def wrapAsyncMethod[Request <: AmazonWebServiceRequest, Result](f: (Request, AsyncHandler[Request, Result]) => JFuture[Result], request: Request): Future[Result] = {
    val p = Promise[Result]()
    f(request, promiseToAsyncHandler[Request, Result](p))
    p.future
  }

  /**
   * Adapts a callback-style SDK method with a `Void` result to a `Future[Unit]`.
   *
   * @param f       SDK method taking a request and a completion handler
   * @param request the request to send
   * @return a future completed with `()`, or failed with the reported exception
   */
  def wrapVoidAsyncMethod[Request <: AmazonWebServiceRequest](f: (Request, AsyncHandler[Request, Void]) => JFuture[Void], request: Request): Future[Unit] = {
    val p = Promise[Unit]()
    f(request, promiseToVoidAsyncHandler[Request](p))
    p.future
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment