Created
December 5, 2017 05:40
-
-
Save joost-de-vries/e2940e022112efafce1a370b56048229 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.{BufferedWriter, File, FileOutputStream, OutputStreamWriter} | |
import java.nio.ByteBuffer | |
import java.nio.charset.Charset | |
import java.util.{HashMap => JHMap, Map => JMap} | |
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._ | |
import akka.actor.ActorSystem | |
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient | |
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._ | |
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest} | |
import core.dynamodb.DynamodbAkka._ | |
import core.dynamodb.DynamodbQuerySpout.{Item, Key, num, par, pay, scanRequest} | |
import play.api.Logger | |
import swave.core.Spout | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.control.NonFatal | |
/** Side-effecting steps used while moving journal entries out of, and back into, DynamoDB. */
object Actions {

  /**
    * Builds a pass-through step that dumps the payload text of every event into `dir`.
    *
    * The file is named `<par>_<num>.json`, its parent directory is created on demand and the text is
    * written through a UTF-8 encoder. The event is handed back unchanged.
    *
    * @param dir directory that receives one file per event
    */
  def toLocalFile(dir: File): AkkaEvent => AkkaEvent = { event =>
    val AkkaEvent(partitionKey, number, PersMessage(_, _, _, payload)) = event
    Logger.debug(s"$partitionKey $number: ${payload.pay}")

    val target = new File(dir, s"${partitionKey}_$number.json")
    target.getParentFile.mkdirs()

    val out = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(target), Charset.forName("UTF-8").newEncoder()))
    try out.write(payload.pay)
    finally out.close() // always release the file handle, even when the write fails

    event
  }

  /**
    * Stores `event` in `tableName` with a single put request.
    *
    * @return a future that completes with the same `event` once the put has succeeded
    */
  def writeEvent(tableName: String)(event: TableEntry)(implicit client: DynamoClient,
                                                       actorSystem: ActorSystem): Future[TableEntry] = {
    import actorSystem.dispatcher
    val putRequest = new PutItemRequest().withTableName(tableName).withItem(event.toItem)
    for (_ <- client.single(putRequest)) yield event
  }

  /**
    * Unfold step that pages through a scan of `tableName`.
    *
    * The unfold state is interpreted as follows:
    *  - `None`: nothing has been requested yet, so the scan starts without an exclusive start key;
    *  - `Some(null)`: the previous page carried no last evaluated key, so an empty final element is emitted;
    *  - `Some(key)`: the scan continues after `key`.
    *
    * Failed requests are logged and the failure is passed on unchanged.
    *
    * @param attributes names of the attributes to project in every scan request
    */
  def queryAll(tableName: String, attributes: Set[String] = Set(par, num, pay, seq))(
      implicit client: DynamoClient,
      executionContext: ExecutionContext): Option[Key] => Future[Spout.Unfolding[Option[Key], Seq[Item]]] = {
    case Some(null) => //scalastyle:ignore null
      Logger.debug("no more")
      Future.successful(Spout.Unfolding.EmitFinal(Seq.empty))

    case Some(resumeKey) =>
      Logger.debug("more")
      client
        .single(scanRequest(tableName = tableName, key = resumeKey, attributes = attributes))
        .map { page =>
          Logger.debug(s"received more ${page.getItems.size}")
          Spout.Unfolding.Emit(elem = page.getItems.asScala, next = Some(page.getLastEvaluatedKey))
        }
        .recoverWith(logAndFail("more failed"))

    case None =>
      Logger.debug("initial")
      client
        .single(scanRequest(tableName, key = null, attributes = attributes)) //scalastyle:ignore null
        .map { page =>
          // a null key is deliberately wrapped in Some: it is the "no more pages" state handled above
          val following = Some(page.getLastEvaluatedKey)
          Logger.debug(s"received initial")
          Spout.Unfolding.Emit(elem = page.getItems.asScala, next = following)
        }
        .recoverWith(logAndFail("initial failed"))
  }

  /** Logs a non-fatal failure under `message` and re-raises it as a failed future. */
  private def logAndFail(message: String): PartialFunction[Throwable, Future[Nothing]] = {
    case NonFatal(cause) =>
      Logger.error(message, cause)
      Future.failed(cause)
  }
}
import java.io.ByteArrayOutputStream | |
import java.nio.ByteBuffer | |
import java.util.zip.Deflater | |
import java.util.{HashMap => JHMap, Map => JMap} | |
import akka.persistence.serialization.MessageFormats | |
import akka.persistence.serialization.MessageFormats.PersistentMessage | |
import akka.protobuf.{ByteString => ProtoByteString} | |
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient | |
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._ | |
import akka.util.ByteString | |
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest} | |
import core.dynamodb.DynamodbAkka.{fromDynamodb, toPersistentMessage} | |
import play.api.Logger | |
import swave.core.Spout | |
import Actions.queryAll | |
import core.SerializerUtil | |
import core.dynamodb.DynamodbQuerySpout.{Item, num, par, pay} | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.Try | |
import scala.util.control.NonFatal | |
/** Factory for a spout that streams every entry of a DynamoDB table by paging through scan requests. */
object DynamodbQuerySpout extends DynamodbAkka {

  /**
    * Streams all entries of `tableName`, one scan page at a time.
    *
    * Paging is driven by `queryAll`, which finishes the unfold once a page comes back without a last
    * evaluated key. Empty pages are skipped instead of ending the stream: DynamoDB may return a page
    * without items that still carries a last evaluated key, and stopping there would silently drop the
    * remaining entries.
    *
    * @param tableName name of the table to scan
    * @return a spout of the table rows converted with `fromDynamodb`
    */
  def apply(tableName: String)(implicit client: DynamoClient, executionContext: ExecutionContext): Spout[TableEntry] = {
    val initialState: Option[Key] = None
    Spout
      .unfoldAsync(initialState) { queryAll(tableName) }
      .map(_.toIterator)
      // only the missing last evaluated key marks the end of the scan, never an empty page
      .filter(_.nonEmpty)
      .flattenConcat(parallelism = 1)
      // .onElement(e => Logger.debug(s"received $e"))
      .map(item => fromDynamodb(item))
  }
}
/** Closed set of row kinds that can be read from and written to the table. */
sealed trait TableEntry {

  /** Renders this entry as the attribute map used for a DynamoDB put request. */
  def toItem: Item
}
/**
  * Payload section of a persisted message.
  *
  * @param pay             payload text (stored compressed inside the persistent message)
  * @param serializerId    serializer id recorded with the payload
  * @param payloadManifest manifest recorded with the payload, as UTF-8 text
  */
case class Payload(
    pay: String,
    serializerId: Int,
    payloadManifest: String
)
/**
  * Plain Scala view of the fields that are copied to and from a protobuf `PersistentMessage`.
  *
  * @param persistenceId persistence id of the message
  * @param seqNumber     sequence number of the message
  * @param writerUUID    writer uuid of the message
  * @param payload       payload section of the message
  */
case class PersMessage(
    persistenceId: String,
    seqNumber: Long,
    writerUUID: String,
    payload: Payload
)
/**
  * Table row that carries a serialized persistent message.
  *
  * @param par               value of the `par` attribute
  * @param num               value of the `num` attribute
  * @param persistentMessage message stored in the binary `pay` attribute
  */
case class AkkaEvent(par: String, num: Long, persistentMessage: PersMessage) extends TableEntry {

  /** Builds the attribute map: `par` as string, `num` as number, `pay` as the protobuf-encoded message bytes. */
  override def toItem: Item = {
    val serialized = toPersistentMessage(this).toByteArray
    val attributes: Item = new JHMap
    // the attribute names are qualified because the case class fields shadow the imported constants
    attributes.put(DynamodbAkka.par, new AttributeValue().withS(par))
    attributes.put(DynamodbAkka.num, new AttributeValue().withN(num.toString))
    attributes.put(pay, new AttributeValue().withB(ByteBuffer.wrap(serialized)))
    attributes
  }
}
/**
  * Table row without a payload; `fromDynamodb` creates it for partition keys starting with `scv-SH`.
  *
  * @param par value of the `par` attribute
  * @param num value of the `num` attribute
  * @param seq value of the `seq` attribute
  */
case class SHEvent(par: String, num: Long, seq: Long) extends TableEntry {

  /** Builds the attribute map: `par` as string, `num` and `seq` as numbers. */
  override def toItem: Item = {
    val attributes: Item = new JHMap
    Seq(
      DynamodbAkka.par -> new AttributeValue().withS(par),
      DynamodbAkka.num -> new AttributeValue().withN(num.toString),
      DynamodbAkka.seq -> new AttributeValue().withN(seq.toString)
    ).foreach { case (name, value) => attributes.put(name, value) }
    attributes
  }
}
/**
  * Table row without a payload; `fromDynamodb` creates it for partition keys starting with `scv-SL`.
  *
  * @param par value of the `par` attribute
  * @param num value of the `num` attribute
  * @param seq value of the `seq` attribute
  */
case class SLEvent(par: String, num: Long, seq: Long) extends TableEntry {

  /** Builds the attribute map: `par` as string, `num` and `seq` as numbers. */
  override def toItem: Item = {
    val attributes: Item = new JHMap
    Seq(
      DynamodbAkka.par -> new AttributeValue().withS(par),
      DynamodbAkka.num -> new AttributeValue().withN(num.toString),
      DynamodbAkka.seq -> new AttributeValue().withN(seq.toString)
    ).foreach { case (name, value) => attributes.put(name, value) }
    attributes
  }
}
/** Shared type aliases, attribute names and request construction for the table. */
trait DynamodbAkka {

  /** Attribute map as used by the DynamoDB client for keys. */
  type Key = JMap[String, AttributeValue]

  /** A table row; it has the same shape as a [[Key]]. */
  type Item = Key

  // names of the attributes that are read from and written to the table
  val par: String = "par"
  val num: String = "num"
  val pay: String = "pay"
  val seq: String = "seq"

  /**
    * Creates a scan request for `tableName` that starts after `key` and projects `attributes`.
    *
    * @param key        exclusive start key; `queryAll` passes `null` for the first page
    * @param attributes attribute names, joined with commas into the projection expression
    */
  def scanRequest(tableName: String, key: Key, attributes: Set[String]): ScanRequest = {
    val projection = attributes.mkString(",")
    new ScanRequest()
      .withTableName(tableName)
      .withExclusiveStartKey(key)
      .withProjectionExpression(projection)
  }
}
/** Conversions between DynamoDB items, [[TableEntry]] values and protobuf `PersistentMessage`s. */
object DynamodbAkka extends DynamodbAkka {

  /** Decodes the bytes of a protobuf byte string as UTF-8 text. */
  def protoToString(byteString: ProtoByteString): String = {
    val bytes = byteString.toByteArray
    new String(bytes, "UTF-8")
  }

  /**
    * Unzips the bytes of a protobuf byte string with `SerializerUtil.unzip` and decodes the result as
    * UTF-8 text.
    *
    * @throws IllegalArgumentException when unzipping fails; the message shows the raw bytes as text and
    *                                  the underlying failure is attached as the cause
    */
  def protoUncompressToString(byteString: ProtoByteString): String = {
    val compressedBytes = byteString.toByteArray
    val bytes =
      try SerializerUtil.unzip(compressedBytes)
      catch {
        // keep the original failure as the cause so the real reason is not lost
        case NonFatal(cause) =>
          throw new IllegalArgumentException(
            s"expected compressed bytes received uncompressed ${new String(compressedBytes, "UTF-8")}",
            cause)
      }
    new String(bytes, "UTF-8")
  }

  /**
    * Converts a table row into the matching [[TableEntry]].
    *
    * A row with a `pay` attribute becomes an [[AkkaEvent]]; its payload is parsed as a protobuf
    * `PersistentMessage` whose payload bytes are unzipped. A row without `pay` becomes an [[SHEvent]]
    * or [[SLEvent]], depending on the prefix of its partition key.
    *
    * @throws IllegalArgumentException for a row without `pay` whose partition key starts with neither
    *                                  `scv-SH` nor `scv-SL`, or when the payload cannot be unzipped
    */
  def fromDynamodb(item: Item): TableEntry = {
    // every kind of row carries these two attributes
    val partitionKey = item.get(par).getS
    val numVal = item.get(num).getN.toLong

    Option(item.get(pay)) match {
      case Some(payAttribute) =>
        val persistMessageBytes = ByteString(payAttribute.getB).toArray
        val persistentMessage = MessageFormats.PersistentMessage.parseFrom(persistMessageBytes)
        val protoPayload = persistentMessage.getPayload
        AkkaEvent(
          par = partitionKey,
          num = numVal,
          persistentMessage = PersMessage(
            persistenceId = persistentMessage.getPersistenceId,
            seqNumber = persistentMessage.getSequenceNr,
            writerUUID = persistentMessage.getWriterUuid,
            Payload(
              pay = protoUncompressToString(protoPayload.getPayload),
              serializerId = protoPayload.getSerializerId,
              payloadManifest = protoToString(protoPayload.getPayloadManifest)
            )
          )
        )

      case None =>
        val seqVal = item.get(seq).getN.toLong
        if (partitionKey.startsWith("scv-SH")) {
          SHEvent(par = partitionKey, num = numVal, seq = seqVal)
        } else if (partitionKey.startsWith("scv-SL")) {
          SLEvent(par = partitionKey, num = numVal, seq = seqVal)
        } else {
          throw new IllegalArgumentException(s"unexpected value $item")
        }
    }
  }

  /** Builds the protobuf `PersistentMessage` for `event`; the payload text is stored zipped. */
  def toPersistentMessage(event: AkkaEvent): PersistentMessage = {
    // example of a resulting message:
    // payload {
    // serializerId: 90020004
    // payload: "{\"event\":{\"meta\":{\"recordTime\":\"2017-07-13T00:04:22.966Z\",\"id\":\"b06fc5e3-8a8a-4b1c-aa96-0e28fa6e3add\",
    // \"onBehalfOf\":\"Samsung\",\"eventType\":\"TerminalEvent\",\"version\":1},\"terminalParty\":\"Ect\",\"containerId\":{\"value\":\"HDMU6439395\"},
    // \"terminal\":\"EctDelta\"}}"
    // payloadManifest: "1.0"
    // }
    // sequenceNr: 144
    // persistenceId: "Ect-HDMU6439395"
    // writerUuid: "ece610b9-38ef-4463-929a-3a05c70d55bc"
    persistentMessageBuilder(event).build()
  }

  /** Fills a `PersistentMessage` builder with the persistence id, payload, sequence number and writer uuid. */
  private def persistentMessageBuilder(event: AkkaEvent) = {
    // sequenceNr: 144
    // persistenceId: "Ect-HDMU6439395"
    // writerUuid: "ece610b9-38ef-4463-929a-3a05c70d55bc"
    // payload
    val message = event.persistentMessage
    val builder = MessageFormats.PersistentMessage.newBuilder
    builder.setPersistenceId(message.persistenceId)
    builder.setPayload(persistentPayloadBuilder(message.payload))
    builder.setSequenceNr(message.seqNumber)
    builder.setWriterUuid(message.writerUUID)
    builder
  }

  /** Fills a `PersistentPayload` builder with the manifest, the zipped UTF-8 payload text and the serializer id. */
  private def persistentPayloadBuilder(payload: Payload) = {
    // payload {
    // serializerId: 90020004
    // payload: "{\"event\":{\"meta\":{\"recordTime\":\"2017-07-13T00:04:22.966Z\",\"id\":\"b06fc5e3-8a8a-4b1c-aa96-0e28fa6e3add\",
    // \"onBehalfOf\":\"Samsung\",\"eventType\":\"TerminalEvent\",\"version\":1},\"terminalParty\":\"Ect\",
    // \"containerId\":{\"value\":\"HDMU6439395\"},\"terminal\":\"EctDelta\"}}"
    // payloadManifest: "1.0"
    // }
    val builder = MessageFormats.PersistentPayload.newBuilder()
    builder.setPayloadManifest(ProtoByteString.copyFromUtf8(payload.payloadManifest))
    // the payload text is zipped before it is stored; protoUncompressToString reverses this on read
    val compressedPayBytes = SerializerUtil.zip(payload.pay.getBytes("UTF-8"))
    builder.setPayload(ProtoByteString.copyFrom(compressedPayBytes))
    builder.setSerializerId(payload.serializerId)
    builder
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment