Last active
January 14, 2019 15:14
-
-
Save Slakah/bf1fd23d3165ec9fb0b0ecc83d8f6ce2 to your computer and use it in GitHub Desktop.
Scala kinesis consumer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env amm | |
| import $ivy.{ | |
| `software.amazon.awssdk:apache-client:2.2.0`, | |
| `software.amazon.awssdk:netty-nio-client:2.2.0`, | |
| `software.amazon.awssdk:sts:2.2.0`, | |
| `software.amazon.awssdk:kinesis:2.2.0`, | |
| `org.slf4j:slf4j-nop:1.7.25`, | |
| `io.monix::monix:3.0.0-RC2`, | |
| `org.typelevel::cats-core:1.5.0`, | |
| `org.typelevel::cats-effect:1.1.0` | |
| } | |
| import scala.annotation.tailrec | |
| import scala.concurrent._, duration._ | |
| import cats.data.{NonEmptyList => NEL} | |
| import cats.implicits._ | |
| import monix.reactive.Observable | |
| import software.amazon.awssdk.services.kinesis._ | |
| import software.amazon.awssdk.services.kinesis.model._ | |
| import ammonite.ops._ | |
| /** | |
| * TODO: | |
| * - Handle shards being added/deleted. | |
| * - Stop blocking. | |
| * - Handle throttling. | |
| * - Command line shard iterator type. | |
| * - Error handling. | |
| * - Command line args. | |
| * - cats-effect/monix refactor. | |
| */ | |
| @main | |
| def run(streamName: String): Unit = { | |
| val kinesis = KinesisClient.create() | |
/** Observable of record batches for `streamName`, starting from the latest position of each shard. */
def getRecordsObs(streamName: String): Observable[List[Record]] = {
  val batches = zipGetRecordsIter(getLatestShardIterators(streamName))
  // Each batch is delayed to prevent rate limiting.
  Observable.fromIteratorUnsafe(batches).delayOnNext(2.second)
}
/**
 * Request a `LATEST` shard iterator for every shard of `streamName`.
 *
 * Throws an `Exception` when no shard iterator could be obtained (the stream
 * reported no shards).
 */
def getLatestShardIterators(streamName: String): NEL[String] = {
  // One blocking `getShardIterator` call per shard.
  def latestIteratorFor(shard: Shard): String = blocking {
    val request = GetShardIteratorRequest
      .builder()
      .streamName(streamName)
      .shardId(shard.shardId())
      .shardIteratorType(ShardIteratorType.LATEST)
      .build()
    kinesis.getShardIterator(request).shardIterator()
  }

  NEL.fromList(getKinesisShards(streamName).map(latestIteratorFor)) match {
    case Some(iterators) => iterators
    case None => throw new Exception(s"Unable to get shard iterators for $streamName")
  }
}
/**
 * Collect every shard of `streamName` by paging through `describeStream`.
 *
 * Paging continues while the description reports more shards and the last
 * page was non-empty; each page starts after the last shard already seen.
 */
def getKinesisShards(streamName: String): List[Shard] = blocking {
  import scala.collection.JavaConverters._

  // Describe the stream starting after `startAfter`; `null` requests the first page.
  def describeFrom(startAfter: String) = {
    val request = DescribeStreamRequest
      .builder()
      .streamName(streamName)
      .exclusiveStartShardId(startAfter)
      .build()
    kinesis.describeStream(request).streamDescription()
  }

  @tailrec
  def collect(seen: List[Shard]): List[Shard] = {
    val description = describeFrom(seen.lastOption.map(_.shardId()).orNull)
    val page = description.shards().asScala.toList
    val soFar = seen ::: page
    if (description.hasMoreShards && page.nonEmpty) collect(soFar) else soFar
  }

  collect(Nil)
}
/** Build one records iterator per shard iterator and merge them into a single iterator of batches. */
def zipGetRecordsIter(shardIterators: NEL[String]): Iterator[List[Record]] = {
  val perShard = shardIterators.map(shardIterator => getRecordsIter(shardIterator))
  traverseZipConcat(perShard)
}
/**
 * Zip a list of iterators together, and concat the resultant lists.
 *
 * Element `n` of the result is the concatenation, in list order, of element
 * `n` of every input iterator; the result ends when any input is exhausted.
 */
def traverseZipConcat[A](iters: NEL[Iterator[List[A]]]): Iterator[List[A]] =
  iters.tail.foldLeft(iters.head) { (merged, next) =>
    merged.zip(next).map { case (left, right) => left ::: right }
  }
/**
 * Produce an iterator of kinesis records for the supplied shard iterator.
 *
 * Every `next()` issues one `getRecords` call with the current shard iterator
 * and then advances to the one returned in the response. Once a response
 * carries no next shard iterator, all later elements are empty lists.
 */
def getRecordsIter(shardIterator: String) = {
  import scala.collection.JavaConverters._
  // Mutable cursor; `None` means the shard has been closed.
  var cursor: Option[String] = Some(shardIterator)
  Iterator.continually {
    cursor.fold(List.empty[Record]) { current =>
      val request = GetRecordsRequest.builder().shardIterator(current).build()
      val response = kinesis.getRecords(request)
      cursor = Option(response.nextShardIterator())
      response.records().asScala.toList
    }
  }
}
/**
 * Render a record as a single line of text.
 *
 * The payload is decoded as a UTF-8 string. If decoding throws, a small JSON
 * object naming the error class and message is returned instead. Newlines are
 * stripped from the result so that each record prints on exactly one line.
 *
 * @param record the Kinesis record to render
 * @return the decoded payload, or a JSON error description, without newlines
 */
def showRecord(record: Record): String = {
  import scala.util.{Failure, Success, Try}

  // Escape a value for embedding inside a double-quoted JSON string, so that
  // quotes, backslashes and control characters cannot break the error object.
  def jsonEscape(raw: String): String = {
    val out = new StringBuilder(raw.length)
    raw.foreach {
      case '"' => out.append('\\').append('"')
      case '\\' => out.append('\\').append('\\')
      case '\n' => out.append('\\').append('n')
      case '\r' => out.append('\\').append('r')
      case '\t' => out.append('\\').append('t')
      case c if c < ' ' => out.append('\\').append(f"u${c.toInt}%04x")
      case c => out.append(c)
    }
    out.toString
  }

  val rendered = Try(record.data.asUtf8String) match {
    case Success(text) => text
    case Failure(err) =>
      // A missing message is still rendered as the text "null".
      val message = Option(err.getMessage).getOrElse("null")
      s"""{"errorClass": "${err.getClass.getTypeName}", "message": "${jsonEscape(message)}"}"""
  }
  rendered.replace("\n", "")
}
import monix.execution.Scheduler.Implicits.global
// Print every record of every batch on its own line.
val fut = getRecordsObs(streamName)
  .foreach(_.foreach(record => println(showRecord(record))))
// Wait indefinitely for the stream; whenever the wait ends (including by an
// exception from the stream), close the Kinesis client instead of leaking it.
try Await.result(fut, Duration.Inf)
finally kinesis.close()
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment