Skip to content

Instantly share code, notes, and snippets.

@samidalouche
Last active May 16, 2017 18:08
Show Gist options
  • Save samidalouche/4eaeb891dcd9c51ad987c1f87386b6d2 to your computer and use it in GitHub Desktop.
Save samidalouche/4eaeb891dcd9c51ad987c1f87386b6d2 to your computer and use it in GitHub Desktop.
Subscription doesn't get notified of new elements when reconnecting too early on EventStore
package esplayground
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import eventstore.Content.Json
import eventstore.{EventData, EventNumber, EventStoreExtension, EventStream, WriteEvents}
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
object ListenForChanges extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val connection = EventStoreExtension(system).connection
val streamId = EventStream.Id("mystream")
def listenForChanges(): Future[Done] = {
val publisher = connection.streamPublisher(
streamId = streamId,
fromNumberExclusive = Some(EventNumber(999998)),
infinite = true
)
Source.fromPublisher(publisher)
.map(_.number.value)
.runWith(Sink.foreach(x => println(s"Received: ${x}")))
}
val done = for {
_ <- listenForChanges()
} yield Done
def shutdown(): Future[Unit] = {
for {
_ <- Future.successful(())
_ <- Http().shutdownAllConnectionPools()
_ = materializer.shutdown()
_ <- system.terminate()
} yield ()
}
val terminated = done.andThen {
case _ => shutdown()
}
Await.result(terminated, Duration.Inf)
}
package esplayground
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import eventstore.Content.Json
import eventstore.{EventData, EventStoreExtension, EventStream, WriteEvents}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
object PushData extends App {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val connection = EventStoreExtension(system).connection
val streamId = EventStream.Id("mystream")
def write(): Future[Done] = {
Source(1 to 1000000)
.grouped(1000)
.mapAsync(1) { events =>
val eventDatas = events.map { i => EventData("my-event", data = Json(s"""{whatever:"${i}"}""")) }
val writeCommand = WriteEvents(
streamId = streamId,
events = eventDatas.toList
)
connection.apply(writeCommand)
}
.runWith(Sink.ignore)
}
val done = for {
_ <- write()
} yield Done
def shutdown(): Future[Unit] = {
for {
_ <- Future.successful(())
_ <- Http().shutdownAllConnectionPools()
_ = materializer.shutdown()
_ <- system.terminate()
} yield ()
}
val terminated = done.andThen {
case _ => shutdown()
}
Await.result(terminated, Duration.Inf)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment