Created
December 4, 2018 12:28
-
-
Save vasily-kirichenko/c39926d7ea11b79a61055c7b54071036 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.actor.Scheduler | |
import akka.actor.typed.{ActorRef, _} | |
import akka.actor.typed.scaladsl.AskPattern._ | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.scaladsl.adapter._ | |
import akka.cluster.Cluster | |
import akka.cluster.ddata.{LWWMap, LWWMapKey, ReplicatedData} | |
import akka.cluster.ddata.typed.scaladsl.{Replicator, _} | |
import akka.stream._ | |
import akka.util.Timeout | |
import com.typesafe.config.ConfigFactory | |
import org.joda.time.DateTime | |
import org.slf4j.LoggerFactory | |
import scala.collection.JavaConverters._ | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
case class Hashes | |
( | |
md5: Seq[Byte], | |
sha1: Seq[Byte], | |
sha256: Seq[Byte] | |
) | |
case class FileMeta | |
( | |
hashes: Hashes, | |
size: Long, | |
categories: Seq[Int], | |
created: DateTime | |
) extends ReplicatedData { | |
override type T = FileMeta | |
override def merge(that: FileMeta) = that | |
} | |
trait FileMetaStorage { | |
def add(bundleUrn: String, meta: FileMeta): Future[Unit] | |
def get(bundleUrn: String): Future[Option[FileMeta]] | |
} | |
class DDataFileMetaStorage()(implicit replicator: ActorRef[Replicator.Command], ctx: ExecutionContext, scheduler: Scheduler, node: Cluster) | |
extends FileMetaStorage { | |
private val Key = LWWMapKey[String, FileMeta]("file-meta") | |
private implicit val timeout = Timeout(5.seconds) | |
override def add(bundleUrn: String, meta: FileMeta): Future[Unit] = { | |
(replicator ? ((ref: ActorRef[Replicator.UpdateResponse[_]]) => | |
Replicator.Update(Key, LWWMap.empty[String, FileMeta], Replicator.WriteLocal, ref, Some(meta)) { | |
m => m + (bundleUrn -> meta) | |
})) | |
.map(_ => ()) | |
} | |
override def get(bundleUrn: String): Future[Option[FileMeta]] = { | |
(replicator ? ((ref: ActorRef[Replicator.GetResponse[_]]) => Replicator.Get(Key, Replicator.ReadLocal, ref))) | |
.map { | |
case [email protected](Key, _) => | |
resp.get(Key).get(bundleUrn) | |
case _ => | |
//sys.log.error(failure.toString) | |
None | |
} | |
} | |
} | |
object Main { | |
def main(args: Array[String]): Unit = { | |
val log = LoggerFactory.getLogger(getClass) | |
val config = ConfigFactory.load() | |
log.info(s"Akka cluster listening ${config.getString("akka.remote.netty.tcp.hostname")}:${config.getString("akka.remote.netty.tcp.port")}, " + | |
s"seeds: ${config.getList("akka.cluster.seed-nodes").unwrapped().asScala.toList}") | |
implicit val system = ActorSystem(Behaviors.empty[NotUsed], "ddata-test", config) | |
implicit val untypedSystem = system.toUntyped | |
implicit val ctx = system.executionContext | |
implicit val scheduler = system.scheduler | |
implicit val cluster = Cluster(untypedSystem) | |
implicit val replicator = DistributedData(system).replicator | |
val fileMetaStorage = new DDataFileMetaStorage | |
if (config.getBoolean("generate-data")) { | |
for (n <- 1 to 1000000) { | |
val md5 = Array.fill(16) { | |
0: Byte | |
} | |
Random.nextBytes(md5) | |
val sha1 = Array.fill(20) { | |
0: Byte | |
} | |
Random.nextBytes(sha1) | |
val sha256 = Array.fill(32) { | |
0: Byte | |
} | |
Random.nextBytes(sha256) | |
val cats = Seq.fill(3)(Random.nextInt()) | |
val meta = FileMeta(Hashes(md5, sha1, sha256), Random.nextLong(), cats, DateTime.now()) | |
fileMetaStorage.add(Random.nextString(50), meta) | |
if (n % 1000 == 0) println(s"$n added") | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment