Skip to content

Instantly share code, notes, and snippets.

@Slakah
Last active January 14, 2019 15:14
Show Gist options
  • Select an option

  • Save Slakah/bf1fd23d3165ec9fb0b0ecc83d8f6ce2 to your computer and use it in GitHub Desktop.

Select an option

Save Slakah/bf1fd23d3165ec9fb0b0ecc83d8f6ce2 to your computer and use it in GitHub Desktop.
Scala kinesis consumer
#!/usr/bin/env amm
import $ivy.{
`software.amazon.awssdk:apache-client:2.2.0`,
`software.amazon.awssdk:netty-nio-client:2.2.0`,
`software.amazon.awssdk:sts:2.2.0`,
`software.amazon.awssdk:kinesis:2.2.0`,
`org.slf4j:slf4j-nop:1.7.25`,
`io.monix::monix:3.0.0-RC2`,
`org.typelevel::cats-core:1.5.0`,
`org.typelevel::cats-effect:1.1.0`
}
import scala.annotation.tailrec
import scala.concurrent._, duration._
import cats.data.{NonEmptyList => NEL}
import cats.implicits._
import monix.reactive.Observable
import software.amazon.awssdk.services.kinesis._
import software.amazon.awssdk.services.kinesis.model._
import ammonite.ops._
/**
* TODO:
* - Handle shards being added/deleted.
* - Stop blocking.
* - Handle throttling.
* - Command line shard iterator type.
* - Error handling.
* - Command line args.
* - cats-effect/monix refactor.
*/
@main
def run(streamName: String): Unit = {
val kinesis = KinesisClient.create()
/**
  * Build an observable of record batches read from the shards of `streamName`.
  *
  * Shard iterators are resolved eagerly when this method is called; the records
  * themselves are fetched lazily as the observable pulls from the underlying iterator.
  */
def getRecordsObs(streamName: String): Observable[List[Record]] = {
  val recordBatches = zipGetRecordsIter(getLatestShardIterators(streamName))
  // delay to prevent rate limiting
  Observable.fromIteratorUnsafe(recordBatches).delayOnNext(2.second)
}
/**
  * Request a `LATEST` shard iterator for every shard of `streamName`.
  *
  * @throws Exception when the stream yields no shards, so no iterator could be obtained.
  */
def getLatestShardIterators(streamName: String): NEL[String] = {
  // One blocking GetShardIterator call per shard.
  def latestIteratorFor(shard: Shard): String = blocking {
    val request = GetShardIteratorRequest
      .builder()
      .streamName(streamName)
      .shardId(shard.shardId())
      .shardIteratorType(ShardIteratorType.LATEST)
      .build()
    kinesis.getShardIterator(request).shardIterator()
  }

  getKinesisShards(streamName).map(latestIteratorFor) match {
    case first :: rest => NEL(first, rest)
    case Nil           => throw new Exception(s"Unable to get shard iterators for $streamName")
  }
}
/**
  * List all shards of `streamName` by paging through `describeStream`.
  *
  * Each follow-up request uses the id of the last shard received so far as the
  * exclusive start; paging stops once the service reports no more shards or
  * returns an empty page.
  */
def getKinesisShards(streamName: String): List[Shard] = blocking {
  import scala.collection.JavaConverters._

  @tailrec
  def fetchPages(collected: List[Shard], startAfter: Option[String]): List[Shard] = {
    val request = DescribeStreamRequest
      .builder()
      .streamName(streamName)
      .exclusiveStartShardId(startAfter.orNull) // null means "start from the first shard"
      .build()
    val description = kinesis.describeStream(request).streamDescription()
    val page = description.shards().asScala.toList
    val morePages: Boolean = description.hasMoreShards
    val soFar = collected ::: page
    if (!morePages || page.isEmpty) soFar
    else fetchPages(soFar, page.lastOption.map(_.shardId()))
  }

  fetchPages(List.empty, None)
}
/** Turn each shard iterator into a record iterator and merge them into one iterator of combined batches. */
def zipGetRecordsIter(
  shardIterators: NEL[String]
): Iterator[List[Record]] = {
  val perShard = shardIterators.map(shardIter => getRecordsIter(shardIter))
  traverseZipConcat(perShard)
}
/**
  * Zip a non-empty list of iterators together, and concatenate the lists produced at each step.
  *
  * The n-th element of the result is the concatenation, in order, of the n-th
  * element of every input iterator.
  */
def traverseZipConcat[A](
  iters: NEL[Iterator[List[A]]]
): Iterator[List[A]] = {
  // Pairwise combine: advance both iterators in lock step and join their batches.
  def zipConcat(left: Iterator[List[A]], right: Iterator[List[A]]): Iterator[List[A]] =
    left.zip(right).map(pair => pair._1 ::: pair._2)

  iters.tail.foldLeft(iters.head)(zipConcat)
}
/**
  * Produce an endless iterator of Kinesis record batches, starting from the supplied shard iterator.
  *
  * Every `next()` issues one `getRecords` call and remembers the follow-up shard
  * iterator from the response. Once the response carries no next shard iterator,
  * all further batches are empty.
  */
def getRecordsIter(shardIterator: String) = {
  import scala.collection.JavaConverters._
  // Position in the shard; None once the response had no next shard iterator.
  var cursor: Option[String] = Some(shardIterator)

  // Fetch one batch at `current` and advance the cursor.
  def fetchBatch(current: String): List[Record] = {
    val request = GetRecordsRequest
      .builder()
      .shardIterator(current)
      .build()
    val response = kinesis.getRecords(request)
    cursor = Option(response.nextShardIterator())
    response.records().asScala.toList
  }

  // An empty batch stands in for a closed shard.
  Iterator.continually(cursor.fold(List.empty[Record])(fetchBatch))
}
/**
  * Render a record's payload as a single line of text.
  *
  * The payload is decoded as UTF-8; if decoding throws, a small JSON-like error
  * description is produced instead. Newline characters are removed from the result.
  */
def showRecord(record: Record): String = {
  val rendered = scala.util.Try(record.data.asUtf8String) match {
    case scala.util.Success(text) => text
    case scala.util.Failure(err) =>
      s"""{"errorClass": "${err.getClass.getTypeName}", "message": "${err.getMessage}"}"""
  }
  rendered.replaceAllLiterally("\n", "")
}
import monix.execution.Scheduler.Implicits.global
// Subscribe to the record stream, print every record on its own line, and block
// the script until the stream completes or fails. The Kinesis client is closed
// on the way out so it is not leaked when the stream ends or an error is thrown.
try {
  val fut = getRecordsObs(streamName)
    .foreach(_.foreach(record => println(showRecord(record))))
  Await.result(fut, Duration.Inf)
} finally {
  kinesis.close()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment