Skip to content

Instantly share code, notes, and snippets.

@huntc
Last active May 25, 2020 01:32
Show Gist options
  • Save huntc/e1170f174d18459c6b9c0cde8439dd77 to your computer and use it in GitHub Desktop.
Save huntc/e1170f174d18459c6b9c0cde8439dd77 to your computer and use it in GitHub Desktop.
An example of sourcing and persisting state in Streambed
// Loads the most recently saved dashboards snapshot from storage, using a
// codec built from the secret registered under DashboardsEvents.EventsKey.
// When storage yields no snapshot, an empty one (no elements, no offset)
// is substituted.
val storageLoader = {
  val snapshotCodec =
    LatestDashboardsEvents.Codec(principal.getSecret(DashboardsEvents.EventsKey))

  storage
    .load(snapshotCodec, latestDashboardsEventsId)
    .map(_.getOrElse(LatestDashboardsEvents(List.empty, None)))
}
/**
  * Streams decoded dashboard events, paired with their offsets, from the
  * durable queue topic `DashboardsEvents.EventsTopic`.
  *
  * @param offset where to start reading from, if anywhere in particular;
  *               leading records carrying exactly this offset are skipped
  *               (presumably because the snapshot already reflects them -
  *               confirm against the durable queue's offset semantics)
  * @param finite passed straight through to `durableQueue.source`
  */
def tailFromOffset(
    offset: Option[Long],
    finite: Boolean): Source[(DashboardsEvents.Event, Long), NotUsed] = {
  val records =
    durableQueue.source(DashboardsEvents.EventsTopic, offset, finite)

  records
    .dropWhile(record => offset.contains(record.offset))
    .via(DashboardsEvents.tailer(principal.getSecret))
}
/**
  * Folds one event into the "latest dashboards" snapshot.
  *
  *  - A removal drops any retained update for the same dashboard id.
  *  - An update replaces any retained update for the same dashboard id,
  *    is placed at the head of the list, and the list is then capped at
  *    `maxDashboards` entries.
  *
  * In both cases the resulting snapshot records `offset` as its position.
  *
  * @param latestDashboardsEvents the snapshot to evolve
  * @param event                  the event being applied
  * @param offset                 the offset at which `event` was read
  * @return the evolved snapshot
  */
def nextState(latestDashboardsEvents: LatestDashboardsEvents,
              event: DashboardsEvents.Event,
              offset: Long): LatestDashboardsEvents = {
  // Retained elements minus any update that refers to the given dashboard.
  def withoutDashboard[Id](target: Id) =
    latestDashboardsEvents.elements.filterNot {
      case DashboardsEvents.DashboardUpdated(id, _, _) =>
        id == target
      case _: DashboardsEvents.DashboardRemoved =>
        false // Should never happen
    }

  val retained = event match {
    case removed: DashboardsEvents.DashboardRemoved =>
      withoutDashboard(removed.id)
    case updated: DashboardsEvents.DashboardUpdated =>
      (updated :: withoutDashboard(updated.id)).take(maxDashboards)
  }

  LatestDashboardsEvents(retained, Some(offset))
}
/**
  * Persists the given snapshot under `latestDashboardsEventsId` when
  * `saveSnapshots` is enabled; otherwise returns `noopSaver` without
  * touching storage.
  *
  * @param latestDashboardsEvents the snapshot to persist
  * @return the future returned by `storage.save`, or `noopSaver`
  */
def storageSaver(
    latestDashboardsEvents: LatestDashboardsEvents): Future[Done] =
  if (!saveSnapshots) {
    noopSaver
  } else {
    val snapshotCodec =
      LatestDashboardsEvents.Codec(principal.getSecret(DashboardsEvents.EventsKey))
    storage.save(snapshotCodec, latestDashboardsEventsId, latestDashboardsEvents)
  }
// Builds the outgoing event stream in three phases, in this order:
//   1. the events held in the loaded snapshot, each wrapped as a StoredEvent;
//   2. a single StoredEventsDone marker carrying the snapshot's offset;
//   3. events tailed from the durable queue, each wrapped as a StreamedEvent.
// While tailing, the evolving snapshot is handed to storageSaver as a side
// effect.
Source
.fromFuture(storageLoader)
.flatMapConcat { initialState =>
// The snapshot's element list is reversed before being emitted. nextState
// prepends each new update, so this presumably yields oldest-first order.
Source(initialState.elements.reverse)
.map(StoredEvent.apply)
.concat(
tailFromOffset(initialState.offset, finite)
// Carry (current snapshot, event that produced it). The seed pairs the
// loaded snapshot with no event; Akka's scan emits that seed downstream
// before any folded value.
.scan((initialState, Option.empty[DashboardsEvents.Event])) {
case ((state, _), (element, offset)) =>
val newState = nextState(state, element, offset)
(newState, Some(element))
}
// Side channel: every snapshot emitted by scan (including the seed) is
// passed to storageSaver. The returned Future is deliberately discarded,
// so save completion and failure are not observed here.
// NOTE(review): Akka's wireTap may skip elements for the tap when its
// sink cannot keep up - confirm that missing some saves is acceptable.
.wireTap(
Sink.foreach[(LatestDashboardsEvents,
Option[DashboardsEvents.Event])] {
case (latestDashboardsEvents, _) =>
val _ = storageSaver(latestDashboardsEvents)
}
)
// Drop the seed tuple (it has no event) and expose each remaining event
// together with the offset recorded in the snapshot it produced.
.collect {
case (state, Some(e)) =>
StreamedEvent(e, state.offset)
}
// Emitted ahead of the tailed events, i.e. right after the stored events,
// to mark the end of the snapshot replay.
.prepend(Source.single[SnapshotEvent[DashboardsEvents.Event]](
StoredEventsDone(initialState.offset, Option.empty)))
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment