// Source: GitHub Gist by @joost-de-vries (gist id e2940e022112efafce1a370b56048229), created December 5, 2017 05:40.
import java.io.{BufferedWriter, File, FileOutputStream, OutputStreamWriter}
import java.nio.ByteBuffer
import java.nio.charset.Charset
import java.util.{HashMap => JHMap, Map => JMap}
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._
import akka.actor.ActorSystem
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, PutItemRequest}
import core.dynamodb.DynamodbAkka._
import core.dynamodb.DynamodbQuerySpout.{Item, Key, num, par, pay, scanRequest}
import play.api.Logger
import swave.core.Spout
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.control.NonFatal
/** Side-effecting building blocks for moving table entries between DynamoDB and the local file system. */
object Actions {

  /**
    * Creates a pass-through step that writes the payload text of each event to
    * `<dir>/<par>_<num>.json` through a UTF-8 encoder.
    *
    * @param dir directory that receives the files; it is created (with its ancestors) when missing
    * @return a function that writes the file as a side effect and returns the very same event
    */
  def toLocalFile(dir: File): AkkaEvent => AkkaEvent = { event =>
    val AkkaEvent(partitionKey, number, PersMessage(_, _, _, payload)) = event
    Logger.debug(s"$partitionKey $number: ${payload.pay}")
    val target = new File(dir, s"${partitionKey}_$number.json")
    target.getParentFile.mkdirs()
    val out = new BufferedWriter(
      new OutputStreamWriter(new FileOutputStream(target), Charset.forName("UTF-8").newEncoder()))
    // The writer is closed whether or not the write succeeds.
    try out.write(payload.pay)
    finally out.close()
    event
  }

  /**
    * Stores a single entry in the given table with a `PutItemRequest`.
    *
    * @param tableName   name of the destination table
    * @param event       entry to store; its `toItem` representation is what gets written
    * @param client      DynamoDB client used to issue the request
    * @param actorSystem supplies the dispatcher on which the result is mapped
    * @return the entry that was passed in, once the put has completed
    */
  def writeEvent(tableName: String)(event: TableEntry)(implicit client: DynamoClient,
                                                       actorSystem: ActorSystem): Future[TableEntry] = {
    val item = event.toItem
    val request = new PutItemRequest().withTableName(tableName).withItem(item)
    client.single(request).map(_ => event)(actorSystem.dispatcher)
  }

  /**
    * Builds the state transition used to unfold a complete table scan page by page.
    *
    * The state is the key to continue from:
    *  - `None`: nothing scanned yet, so the scan starts without an exclusive start key;
    *  - `Some(null)`: the previous page reported no last evaluated key, so a final empty page is emitted;
    *  - `Some(key)`: the scan continues after `key`.
    *
    * @param tableName  table to scan
    * @param attributes attribute names requested from the scan
    * @return a function from the current state to the next unfolding step
    */
  def queryAll(tableName: String, attributes: Set[String] = Set(par, num, pay, seq))(
      implicit client: DynamoClient,
      executionContext: ExecutionContext): Option[Key] => Future[Spout.Unfolding[Option[Key], Seq[Item]]] = {
    type Step = Spout.Unfolding[Option[Key], Seq[Item]]

    // Logs a non-fatal scan failure under `message` and passes the same failure on.
    def reportFailure(message: String): PartialFunction[Throwable, Future[Step]] = {
      case NonFatal(cause) =>
        Logger.error(message, cause)
        Future.failed(cause)
    }

    val step: Option[Key] => Future[Step] = {
      case None => // first page
        Logger.debug("initial")
        val firstScan = client.single(scanRequest(tableName, key = null, attributes = attributes)) //scalastyle:ignore null
        firstScan
          .map[Step] { page =>
            // A null last evaluated key is wrapped as Some(null); the next step treats it as "done".
            val following = Some(page.getLastEvaluatedKey)
            Logger.debug(s"received initial")
            Spout.Unfolding.Emit(elem = page.getItems.asScala, next = following)
          }
          .recoverWith(reportFailure("initial failed"))

      case Some(null) => //scalastyle:ignore null
        // previous page was the last one
        Logger.debug("no more")
        Future.successful(Spout.Unfolding.EmitFinal(Seq.empty))

      case Some(resumeAfter) => // follow-up page
        Logger.debug("more")
        val nextScan = client.single(scanRequest(tableName = tableName, key = resumeAfter, attributes = attributes))
        nextScan
          .map[Step] { page =>
            Logger.debug(s"received more ${page.getItems.size}")
            Spout.Unfolding.Emit(elem = page.getItems.asScala, next = Some(page.getLastEvaluatedKey))
          }
          .recoverWith(reportFailure("more failed"))
    }
    step
  }
}
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.util.zip.Deflater
import java.util.{HashMap => JHMap, Map => JMap}
import akka.persistence.serialization.MessageFormats
import akka.persistence.serialization.MessageFormats.PersistentMessage
import akka.protobuf.{ByteString => ProtoByteString}
import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._
import akka.util.ByteString
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, ScanRequest}
import core.dynamodb.DynamodbAkka.{fromDynamodb, toPersistentMessage}
import play.api.Logger
import swave.core.Spout
import Actions.queryAll
import core.SerializerUtil
import core.dynamodb.DynamodbQuerySpout.{Item, num, par, pay}
import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Try
import scala.util.control.NonFatal
/** Exposes a full scan of a DynamoDB table as a swave `Spout` of [[TableEntry]] values. */
object DynamodbQuerySpout extends DynamodbAkka {

  /**
    * Scans `tableName` page by page (see `Actions.queryAll`) and emits every item converted
    * with `fromDynamodb`.
    *
    * @param tableName        table to scan
    * @param client           DynamoDB client used for the scan requests
    * @param executionContext context on which the scan results are processed
    * @return a spout of the table's entries, in the order the pages are returned
    */
  def apply(tableName: String)(implicit client: DynamoClient, executionContext: ExecutionContext): Spout[TableEntry] = {
    val initialState: Option[Key] = None
    Spout
      .unfoldAsync(initialState) { queryAll(tableName) }
      .map(_.toIterator)
      // Skip empty pages rather than stopping at the first one: a scan page may contain no items
      // while the scan still reports a last evaluated key, and only a missing key marks the end.
      // Completion is driven by the final step emitted by `queryAll`, whose empty page is dropped here too.
      .filter(_.nonEmpty)
      .flattenConcat(parallelism = 1)
      //      .onElement(e => Logger.debug(s"received $e"))
      .map(item => fromDynamodb(item))
  }
}
/** Closed family of entries that can be read from, and written back to, the DynamoDB table. */
sealed trait TableEntry {

  /** The DynamoDB item (attribute name to attribute value) representing this entry. */
  def toItem: Item
}
/**
  * Payload section of a persisted message.
  *
  * @param pay             payload content as text
  * @param serializerId    numeric id stored in the serialized payload's `serializerId` field
  * @param payloadManifest manifest text stored in the serialized payload's `payloadManifest` field
  */
case class Payload(
    pay: String,
    serializerId: Int,
    payloadManifest: String
)
/**
  * Plain Scala view of a persisted message.
  *
  * @param persistenceId persistence id of the message
  * @param seqNumber     sequence number of the message
  * @param writerUUID    writer UUID of the message
  * @param payload       the message payload
  */
case class PersMessage(
    persistenceId: String,
    seqNumber: Long,
    writerUUID: String,
    payload: Payload
)
/**
  * Table entry that carries a persisted message.
  *
  * @param par               value of the `par` attribute
  * @param num               value of the `num` attribute
  * @param persistentMessage message stored, in serialized form, in the `pay` attribute
  */
case class AkkaEvent(par: String, num: Long, persistentMessage: PersMessage) extends TableEntry {

  /** Builds the item: `par` as a string, `num` as a number and the serialized message as binary `pay`. */
  override def toItem: Item = {
    val serialized = ByteBuffer.wrap(toPersistentMessage(this).toByteArray)
    val attributes: Item = new JHMap
    Seq(
      DynamodbAkka.par -> new AttributeValue().withS(par),
      DynamodbAkka.num -> new AttributeValue().withN(num.toString),
      pay -> new AttributeValue().withB(serialized)
    ).foreach { case (name, value) => attributes.put(name, value) }
    attributes
  }
}
/**
  * Table entry without a payload, made of the `par`, `num` and `seq` attributes.
  * `fromDynamodb` produces it for partition keys starting with "scv-SH".
  *
  * @param par value of the `par` attribute
  * @param num value of the `num` attribute
  * @param seq value of the `seq` attribute
  */
case class SHEvent(par: String, num: Long, seq: Long) extends TableEntry {

  /** Builds the item: `par` as a string, `num` and `seq` as numbers. */
  override def toItem: Item = {
    val attributes: Item = new JHMap
    Seq(
      DynamodbAkka.par -> new AttributeValue().withS(par),
      DynamodbAkka.num -> new AttributeValue().withN(num.toString),
      DynamodbAkka.seq -> new AttributeValue().withN(seq.toString)
    ).foreach { case (name, value) => attributes.put(name, value) }
    attributes
  }
}
/**
  * Table entry without a payload, made of the `par`, `num` and `seq` attributes.
  * `fromDynamodb` produces it for partition keys starting with "scv-SL".
  *
  * @param par value of the `par` attribute
  * @param num value of the `num` attribute
  * @param seq value of the `seq` attribute
  */
case class SLEvent(par: String, num: Long, seq: Long) extends TableEntry {

  /** Builds the item: `par` as a string, `num` and `seq` as numbers. */
  override def toItem: Item = {
    val attributes: Item = new JHMap
    Seq(
      DynamodbAkka.par -> new AttributeValue().withS(par),
      DynamodbAkka.num -> new AttributeValue().withN(num.toString),
      DynamodbAkka.seq -> new AttributeValue().withN(seq.toString)
    ).foreach { case (name, value) => attributes.put(name, value) }
    attributes
  }
}
/** Type aliases, attribute names and scan-request construction shared by the DynamoDB helpers. */
trait DynamodbAkka {

  /** A DynamoDB key: attribute name to attribute value. */
  type Key = JMap[String, AttributeValue]

  /** A DynamoDB item; it has the same representation as a [[Key]]. */
  type Item = Key

  // Names of the table attributes used by the entries.
  val par: String = "par"
  val num: String = "num"
  val pay: String = "pay"
  val seq: String = "seq"

  /**
    * Creates a scan request for one page of a table.
    *
    * @param tableName  table to scan
    * @param key        exclusive start key to continue from; callers pass `null` for the first page
    * @param attributes attribute names, joined with commas into the projection expression
    * @return the configured request
    */
  def scanRequest(tableName: String, key: Key, attributes: Set[String]): ScanRequest = {
    val projection = attributes.mkString(",")
    val request = new ScanRequest().withTableName(tableName)
    request
      .withExclusiveStartKey(key)
      .withProjectionExpression(projection)
  }
}
/** Conversions between DynamoDB items, the [[TableEntry]] model and Akka's protobuf `PersistentMessage`. */
object DynamodbAkka extends DynamodbAkka {

  /** Decodes the bytes of a protobuf byte string as UTF-8 text. */
  def protoToString(byteString: ProtoByteString): String = {
    val bytes = byteString.toByteArray
    new String(bytes, "UTF-8")
  }

  /**
    * Unzips the bytes of a protobuf byte string with `SerializerUtil.unzip` and decodes the result as UTF-8 text.
    *
    * @throws IllegalArgumentException when unzipping fails; the underlying failure is attached as the cause
    */
  def protoUncompressToString(byteString: ProtoByteString): String = {
    val compressedBytes = byteString.toByteArray
    val bytes =
      try SerializerUtil.unzip(compressedBytes)
      catch {
        // Keep the original failure as the cause so the reason for the unzip error is not lost.
        case NonFatal(cause) =>
          throw new IllegalArgumentException(
            s"expected compressed bytes received uncompressed ${new String(compressedBytes, "UTF-8")}",
            cause)
      }
    new String(bytes, "UTF-8")
  }

  /**
    * Converts a DynamoDB item into a [[TableEntry]].
    *
    * Items with a `pay` attribute become an [[AkkaEvent]]; the binary attribute is parsed as a protobuf
    * `PersistentMessage` whose payload is unzipped. Items without `pay` become an [[SHEvent]] or an
    * [[SLEvent]] depending on the prefix of the partition key.
    *
    * @param item the item to convert; it is not modified
    * @throws IllegalArgumentException when a required attribute is missing, when the payload cannot be
    *                                  unzipped, or when an item without `pay` has an unknown key prefix
    */
  def fromDynamodb(item: Item): TableEntry = {
    val partitionKey = requiredAttribute(item, par).getS
    val numVal = requiredAttribute(item, num).getN.toLong
    Option(item.get(pay)) match {
      case Some(payAttribute) =>
        // ByteString(ByteBuffer) copies with a relative read that advances the buffer's position.
        // Reading from a duplicate leaves the buffer held by `item` untouched, so the same item
        // can be converted more than once.
        val persistMessageBytes = ByteString(payAttribute.getB.duplicate()).toArray
        val persistentMessage = MessageFormats.PersistentMessage.parseFrom(persistMessageBytes)
        val payloadString = protoUncompressToString(persistentMessage.getPayload.getPayload)
        AkkaEvent(
          par = partitionKey,
          num = numVal,
          persistentMessage = PersMessage(
            persistenceId = persistentMessage.getPersistenceId,
            seqNumber = persistentMessage.getSequenceNr,
            writerUUID = persistentMessage.getWriterUuid,
            payload = Payload(
              pay = payloadString,
              serializerId = persistentMessage.getPayload.getSerializerId,
              payloadManifest = protoToString(persistentMessage.getPayload.getPayloadManifest)
            )
          )
        )
      case None =>
        val seqVal = requiredAttribute(item, seq).getN.toLong
        if (partitionKey.startsWith("scv-SH")) {
          SHEvent(par = partitionKey, num = numVal, seq = seqVal)
        } else if (partitionKey.startsWith("scv-SL")) {
          SLEvent(par = partitionKey, num = numVal, seq = seqVal)
        } else {
          throw new IllegalArgumentException(s"unexpected value $item")
        }
    }
  }

  /** Looks up an attribute that must be present, failing with a descriptive error instead of a null dereference. */
  private def requiredAttribute(item: Item, name: String): AttributeValue =
    Option(item.get(name)).getOrElse(
      throw new IllegalArgumentException(s"unexpected value $item: missing attribute '$name'"))

  /** Serializes an event's message into the protobuf `PersistentMessage` format. */
  def toPersistentMessage(event: AkkaEvent): PersistentMessage = {
    //    payload {
    //      serializerId: 90020004
    //      payload: "{\"event\":{\"meta\":{\"recordTime\":\"2017-07-13T00:04:22.966Z\",\"id\":\"b06fc5e3-8a8a-4b1c-aa96-0e28fa6e3add\",
    // \"onBehalfOf\":\"Samsung\",\"eventType\":\"TerminalEvent\",\"version\":1},\"terminalParty\":\"Ect\",\"containerId\":{\"value\":\"HDMU6439395\"},
    // \"terminal\":\"EctDelta\"}}"
    //      payloadManifest: "1.0"
    //    }
    //    sequenceNr: 144
    //    persistenceId: "Ect-HDMU6439395"
    //    writerUuid: "ece610b9-38ef-4463-929a-3a05c70d55bc"
    persistentMessageBuilder(event).build()
  }

  /** Fills a `PersistentMessage` builder with the persistence id, payload, sequence number and writer UUID. */
  private def persistentMessageBuilder(event: AkkaEvent) = {
    //    sequenceNr: 144
    //    persistenceId: "Ect-HDMU6439395"
    //    writerUuid: "ece610b9-38ef-4463-929a-3a05c70d55bc"
    //    payload
    val builder = MessageFormats.PersistentMessage.newBuilder
    builder.setPersistenceId(event.persistentMessage.persistenceId)
    builder.setPayload(persistentPayloadBuilder(event.persistentMessage.payload))
    builder.setSequenceNr(event.persistentMessage.seqNumber)
    builder.setWriterUuid(event.persistentMessage.writerUUID)
    builder
  }

  /** Fills a `PersistentPayload` builder; the payload text is UTF-8 encoded and zipped with `SerializerUtil.zip`. */
  private def persistentPayloadBuilder(payload: Payload) = {
    //    payload {
    //      serializerId: 90020004
    //      payload: "{\"event\":{\"meta\":{\"recordTime\":\"2017-07-13T00:04:22.966Z\",\"id\":\"b06fc5e3-8a8a-4b1c-aa96-0e28fa6e3add\",
    // \"onBehalfOf\":\"Samsung\",\"eventType\":\"TerminalEvent\",\"version\":1},\"terminalParty\":\"Ect\",
    // \"containerId\":{\"value\":\"HDMU6439395\"},\"terminal\":\"EctDelta\"}}"
    //      payloadManifest: "1.0"
    //    }
    val builder = MessageFormats.PersistentPayload.newBuilder()
    builder.setPayloadManifest(ProtoByteString.copyFromUtf8(payload.payloadManifest))
    val payBytes = payload.pay.getBytes("UTF-8")
    val compressedPayBytes = SerializerUtil.zip(payBytes)
    val pbByteString = ProtoByteString.copyFrom(compressedPayBytes)
    builder.setPayload(pbByteString)
    builder.setSerializerId(payload.serializerId)
    builder
  }
}
// (GitHub page footer, not part of the source: "Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment")