-
-
Save charmby/a93c77bf29f33d2be5c1635e40ddc8de to your computer and use it in GitHub Desktop.
Parallel SCAN per node on a Redis Cluster as an Akka Stream Source
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed
import akka.stream.scaladsl.Source
import redis.clients.jedis.{Jedis, JedisCluster, ScanParams}
import scala.collection.JavaConverters._
import scala.concurrent.Future
import scala.util.control.NonFatal
/**
 * Exposes a cluster-wide Redis SCAN as an Akka Streams source.
 *
 * Every node whose replication info contains `role:master` is scanned with its own
 * cursor, and up to `maxNodeParallelism` node scans are merged at once. All Jedis calls
 * are wrapped in `Future`s that run on the execution context taken from `blockingEC`.
 *
 * @param maxNodeParallelism breadth passed to `flatMapMerge`, i.e. how many nodes are
 *                           scanned concurrently
 * @param jedisCluster       cluster client used to look up the per-node connection pools
 * @param blockingEC         wrapper whose `value` becomes the implicit execution context
 *                           for every `Future` created here
 */
class RedisStream(maxNodeParallelism: Int)
                 (implicit jedisCluster: JedisCluster,
                  blockingEC: RedisBlockingEC) {

  // The type is left to inference on purpose: `RedisBlockingEC` is declared outside this
  // file, and the member should keep exactly the type of `blockingEC.value`.
  implicit val ec = blockingEC.value

  /**
   * Streams the results of SCAN from every master node of the cluster.
   *
   * Nothing is requested from Redis until the source is materialised. Each
   * materialisation looks up the master nodes again, so a failed lookup is not cached
   * and a restarted stream sees the current set of masters.
   *
   * @param scanParams parameters forwarded unchanged to every `scan` call
   * @return a source emitting the scanned entries one by one; the interleaving across
   *         nodes is whatever `flatMapMerge` produces
   */
  def scan(scanParams: ScanParams): Source[String, NotUsed] = {
    // A cursor of "0" both starts an iteration and, when returned, marks its end.
    val startCursor = "0"

    // A `def`, not a `lazy val`: evaluated once per materialisation via `Source.lazily`.
    def masterNodes = Future(jedisCluster.getClusterNodes.asScala.values.toList)
      .flatMap { pools =>
        Future.traverse(pools) { pool =>
          Future {
            ScalaCloseable.withResource(pool.getResource()) { conn =>
              if (conn.info("replication").contains("role:master")) Some(pool)
              else None
            }
          }
        }
      }
      .map(_.flatten)

    Source.lazily(() => Source.fromFuture(masterNodes))
      .mapConcat(_.toList)
      .flatMapMerge(maxNodeParallelism, { node =>
        // State is the next cursor to request, or None once the node is exhausted.
        // The cursor stays the string Redis returned instead of being parsed to Long,
        // so a cursor that does not fit a signed Long cannot fail the stream.
        Source.unfoldAsync[Option[String], List[String]](Some(startCursor)) {
          case None => Future.successful(None)
          case Some(cursor) =>
            Future {
              // Borrow a connection for one page only and return it straight away.
              ScalaCloseable.withResource(node.getResource()) { conn =>
                conn.scan(cursor, scanParams)
              }
            }.map { page =>
              val keys = page.getResult.asScala.toList
              val returnedCursor = page.getStringCursor
              val nextCursor: Option[String] =
                if (returnedCursor == startCursor) None else Some(returnedCursor)
              // Always emit this page; termination happens on the following pull.
              Some(nextCursor -> keys)
            }
        }
      })
      .mapConcat(_.toList)
      .mapMaterializedValue(_ => NotUsed)
  }
}
/** Loan-pattern helper for `AutoCloseable` resources. */
object ScalaCloseable {

  /**
   * Acquires a resource, applies `f` to it and closes it afterwards.
   *
   * - If acquiring the resource throws, that exception propagates and nothing is closed.
   * - If the acquired resource is `null`, `f` is still applied and no close is attempted.
   * - If `f` throws, the resource is closed and the exception from `f` is rethrown; a
   *   non-fatal failure of `close()` is attached to it as a suppressed exception instead
   *   of replacing it.
   * - If only `close()` throws, that exception propagates.
   *
   * @param a by-name expression producing the resource; evaluated exactly once
   * @param f function applied to the resource
   * @tparam A resource type
   * @tparam B result type of `f`
   * @return the value returned by `f`
   */
  def withResource[A >: Null <: AutoCloseable, B](a: => A)(f: A => B): B = {
    // Acquired outside the try: if acquisition itself throws there is nothing to close.
    val resource: A = a
    val result =
      try f(resource)
      catch {
        case bodyError: Throwable =>
          if (resource != null) {
            try resource.close()
            catch {
              // Keep the body's failure as the primary error. The identity guard avoids
              // addSuppressed rejecting an exception that is suppressed onto itself.
              case NonFatal(closeError) if closeError ne bodyError =>
                bodyError.addSuppressed(closeError)
            }
          }
          throw bodyError
      }
    if (resource != null) resource.close()
    result
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment