Created
April 28, 2015 22:04
-
-
Save dbousamra/f9de7155cb117ebc1a0e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Small demo: reads a snapshot through `DynamoDbSnapshotDao` and prints the outcome. */
object Example {

  /**
   * Builds the async DynamoDB client and an actor system, runs one snapshot
   * lookup, prints either the error or the value, and blocks for at most ten
   * seconds waiting for that to finish.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val client = new AmazonDynamoDBAsyncClient(creds)
    val system = ActorSystem()

    // `Future#map` below needs an implicit ExecutionContext, so the actor
    // system's dispatcher is made implicit here. The same dispatcher is also
    // handed to the DAO explicitly, so both run their callbacks on one pool.
    // NOTE(review): the ActorSystem is never shut down in this block, so its
    // threads may keep the JVM alive after `main` returns - confirm and stop
    // it with whatever shutdown call the Akka version in use provides.
    implicit val ec = system.dispatcher

    val snapshotDao = new DynamoDbSnapshotDao(client, ec)

    // Pattern-matching function literal instead of `x => x match { ... }`.
    val f = snapshotDao.getSnapshot2().map {
      case Left(e)  => println(e)
      case Right(a) => println(a)
    }

    // Blocking is acceptable here only because this is the edge of the program.
    Await.result(f, 10.seconds)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package com.cammy.dao | |
| import java.util.concurrent.{Future => JFuture} | |
| import com.amazonaws.AmazonWebServiceRequest | |
| import com.amazonaws.handlers.AsyncHandler | |
| import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClient | |
| import com.amazonaws.services.dynamodbv2.document._ | |
| import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec | |
| import com.amazonaws.services.dynamodbv2.model.{GetItemRequest, GetItemResult} | |
| import scala.concurrent.{ExecutionContext, Future, Promise} | |
/**
 * Root of the operation ADT.
 *
 * Neither type parameter is used by a member; they only tag the operation.
 * `DynamoDbAction.execute` turns a `DynamoDbOperation[T, S]` into a
 * `DynamoDbAction[S]`, so `S` is the result type. `T` presumably names the
 * input type carried in `value` - see the note on [[DynamoDbQueryOperation]].
 */
sealed trait KVOperation[T, S]

/** An operation that targets DynamoDB. */
sealed trait DynamoDbOperation[T, S] extends KVOperation[T, S]

/** Store `value`; tagged as producing an `Item`. */
final case class DynamoDbPutOperation(value: Item) extends DynamoDbOperation[Item, Item]

/** Fetch the item described by the request; tagged as producing a `GetItemResult`. */
final case class DynamoDbGetOperation(value: GetItemRequest) extends DynamoDbOperation[GetItemRequest, GetItemResult]

/** Delete the item with the given primary key; tagged as producing a `DeleteItemOutcome`. */
final case class DynamoDbDeleteOperation(value: PrimaryKey) extends DynamoDbOperation[PrimaryKey, DeleteItemOutcome]

/**
 * Run a query; tagged as producing a stream of items.
 *
 * NOTE(review): the first type argument is `PrimaryKey` although `value` is a
 * `QuerySpec`; every other operation uses the type of `value` there. This
 * looks like a copy/paste slip, but it is left untouched because changing it
 * would alter the public type - confirm before fixing.
 */
final case class DynamoDbQueryOperation(value: QuerySpec) extends DynamoDbOperation[PrimaryKey, Stream[Item]]

/** Pairs an execution context with an async DynamoDB client. */
final case class AkkaDynamoClient(ec: ExecutionContext, client: AmazonDynamoDBAsyncClient)
/**
 * A deferred DynamoDB computation.
 *
 * Nothing happens until `run` is applied to an execution context and a client;
 * it then yields a future holding either an exception or a value of type `A`.
 *
 * @param run the function that performs the work once its dependencies are supplied
 */
case class DynamoDbAction[A](run: (ExecutionContext, AmazonDynamoDBAsyncClient) => Future[Either[Exception, A]]) {

  /**
   * Transforms the successful value with `f`; a `Left` is passed through unchanged.
   *
   * @param f function applied to the value inside a `Right`
   * @return a new action that runs this one and then maps its result
   */
  def map[B](f: A => B): DynamoDbAction[B] =
    DynamoDbAction { (ec, client) =>
      // The mapping callback runs on the same context the action is run with.
      implicit val executor: ExecutionContext = ec
      run(ec, client).map {
        case Left(error)  => Left(error)
        case Right(value) => Right(f(value))
      }
    }

  //  def flatMap[B](f: A => DynamoDbAction[B]): DynamoDbAction[B] =
  //    DynamoDbAction((ec, client) => run(ec, client).flatMap { a =>
  //      a match {
  //        case Left(e) => e
  //        case Right(a) => f(a)
  //      }
  //    }(ec))
}
/**
 * Constructors for [[DynamoDbAction]] plus the plumbing that adapts the AWS
 * SDK's callback-style async methods to Scala futures.
 */
object DynamoDbAction {

  /**
   * Wraps a function of (execution context, client) as a [[DynamoDbAction]].
   *
   * @param f the function to run once an execution context and a client are supplied
   */
  def withDynamo[T](f: (ExecutionContext, AmazonDynamoDBAsyncClient) => Future[Either[Exception, T]]): DynamoDbAction[T] =
    DynamoDbAction(f)

  //
  //  def ok[A](value: A): DynamoDbAction[A] =
  //    withDynamo(_ => Right(value))
  //
  //  def fail[A](error: Exception): DynamoDbAction[A] =
  //    withDynamo(_ => Left(error))

  //  def store(item: Item): DynamoDbAction[Option[Item]] =
  //    execute(DynamoDbPutOperation(item)).map(x => Some(x))

  /** Action that performs a `GetItem` call for `req`. */
  def fetch(req: GetItemRequest): DynamoDbAction[GetItemResult] =
    execute(DynamoDbGetOperation(req))

  //  def delete(key: PrimaryKey): DynamoDbAction[Option[DeleteItemOutcome]] =
  //    execute(DynamoDbDeleteOperation(key)).map(x => Some(x))
  //
  //  def search(query: QuerySpec): DynamoDbAction[Stream[Item]] =
  //    execute(DynamoDbQueryOperation(query))

  /**
   * Interprets an operation as an action.
   *
   * Only [[DynamoDbGetOperation]] is handled. Any other operation falls
   * through the match; the resulting `scala.MatchError` (a `RuntimeException`)
   * is caught below and reported as a `Left`.
   *
   * Every `Exception` ends up in the `Left` of the result, whether it is
   * thrown synchronously while issuing the request or delivered later through
   * the SDK's async handler. The returned future therefore does not fail for
   * those errors.
   *
   * @param op the operation to run
   * @return an action yielding the operation's result or the exception raised
   */
  def execute[T, S](op: DynamoDbOperation[T, S]): DynamoDbAction[S] =
    DynamoDbAction.withDynamo { (ec, client) =>
      implicit val executionContext: ExecutionContext = ec

      val attempt: Future[Either[Exception, S]] =
        try {
          op match {
            case DynamoDbGetOperation(req) => wrapAsyncMethod(client.getItemAsync, req).map(x => Right(x))
            //          case DynamoDbDeleteOperation(key)       => Right(table.deleteItem(key))
            //          case DynamoDbQueryOperation(querySpec)  => Right(table.query(querySpec).iterator().asInstanceOf[java.util.Iterator[Item]].asScala.toStream)
          }
        } catch {
          // The value already exists, so there is no need to schedule a task for it.
          case e: Exception => Future.successful(Left(e))
        }

      // The try/catch above only sees exceptions thrown while the request is
      // being issued. Errors reported via AsyncHandler.onError fail the future
      // instead, so fold those into Left as well.
      attempt.recover { case e: Exception => Left(e) }
    }

  //  implicit def DynamoDbActionMonad: Monad[DynamoDbAction] =
  //    new Monad[DynamoDbAction] {
  //      def point[A](v: => A) = ok[A](v)
  //      def bind[A, B](m: DynamoDbAction[A])(f: A => DynamoDbAction[B]) = m.flatMap(f)
  //    }

  /** Handler that completes `p` with the call's result, or fails it with the reported exception. */
  private def promiseToAsyncHandler[Request <: AmazonWebServiceRequest, Result](p: Promise[Result]): AsyncHandler[Request, Result] =
    new AsyncHandler[Request, Result] {
      override def onError(exception: Exception): Unit = { p.failure(exception); () }
      override def onSuccess(request: Request, result: Result): Unit = { p.success(result); () }
    }

  /** Like [[promiseToAsyncHandler]], for calls returning `Void`: success completes `p` with unit. */
  private def promiseToVoidAsyncHandler[Request <: AmazonWebServiceRequest](p: Promise[Unit]): AsyncHandler[Request, Void] =
    new AsyncHandler[Request, Void] {
      override def onError(exception: Exception): Unit = { p.failure(exception); () }
      override def onSuccess(request: Request, result: Void): Unit = { p.success(()); () }
    }

  /**
   * Invokes a callback-style async method and exposes its outcome as a Scala future.
   *
   * The Java future returned by `f` is ignored; completion is driven solely by
   * the handler passed to `f`.
   *
   * @param f       the async method, taking the request and a completion handler
   * @param request the request to send
   * @return a future completed by the handler's `onSuccess` or `onError`
   */
  def wrapAsyncMethod[Request <: AmazonWebServiceRequest, Result](f: (Request, AsyncHandler[Request, Result]) => JFuture[Result], request: Request): Future[Result] = {
    val p = Promise[Result]()
    f(request, promiseToAsyncHandler(p))
    p.future
  }

  /**
   * Variant of [[wrapAsyncMethod]] for async methods whose result type is `Void`.
   *
   * @param f       the async method, taking the request and a completion handler
   * @param request the request to send
   * @return a future of unit completed by the handler's `onSuccess` or `onError`
   */
  def wrapVoidAsyncMethod[Request <: AmazonWebServiceRequest](f: (Request, AsyncHandler[Request, Void]) => JFuture[Void], request: Request): Future[Unit] = {
    val p = Promise[Unit]()
    f(request, promiseToVoidAsyncHandler(p))
    p.future
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment