Skip to content

Instantly share code, notes, and snippets.

View ghostbuster91's full-sized avatar

Kasper Kondzielski ghostbuster91

View GitHub Profile
// Polls the repository on a fixed delay and, for every poll result that differs from the
// previous one, emits only the withdrawals that were absent from that previous result.
Observable
  .intervalWithFixedDelay(config.interval)
  .mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc(config.maxBatchSize))
  .distinctUntilChanged
  // The state is (previous poll result, withdrawals that are new in this poll).
  // The whole previous poll has to be carried along: if only the emitted diff were kept,
  // the next poll would be compared against that diff and already-seen withdrawals would
  // be emitted again. `filterNot` is used instead of `takeWhile` because the result is
  // ordered ascending, so already-seen entries come first and would end a `takeWhile`
  // before any new entry is reached.
  .scan((List.empty[Withdrawal], List.empty[Withdrawal])) {
    case ((previous, _), fetched) =>
      (fetched, fetched.filterNot(previous.contains))
  }
  // Drop the bookkeeping half of the state; downstream only sees the new withdrawals.
  .map { case (_, fresh) => fresh }
// Signature excerpt (presumably quoted from the Observable API — confirm against the library).
// `seed` is a by-name parameter of the state type `S`; `op` combines the current state with
// an element of type `A` (presumably the source's element type) into the next state.
// The result is an `Observable[S]`, i.e. it carries states, not the source elements.
def scan[S](seed: => S)(op: (S, A) => S): Observable[S]
// Signature excerpt (presumably quoted from the Observable API — confirm against the library).
// Requires an implicit `Eq[AA]` instance, where `AA` is `A` or a supertype of it, and
// returns an `Observable[AA]`. Callers therefore need an `Eq` instance in scope for the
// element type they use it on.
def distinctUntilChanged[AA >: A](implicit A: Eq[AA]): Observable[AA]
// Polls the repository on a fixed delay, flattens every fetched list into its individual
// elements and regroups them with `bufferTimeAndCounted`, configured by
// `config.timeWindow` and `config.batchSize`.
Observable
  .intervalWithFixedDelay(config.interval)
  .mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc())
  .flatMap(fetched => Observable.fromIterable(fetched))
  .bufferTimeAndCounted(config.timeWindow, config.batchSize)
// Polls the repository on a fixed delay and turns every fetched list into a stream of
// its individual elements.
Observable
  .intervalWithFixedDelay(config.interval)
  .mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc())
  .flatMap(fetched => Observable.fromIterable(fetched))
// Polls the repository on a fixed delay and groups every fetched list by the
// `currency` of its elements.
Observable
  .intervalWithFixedDelay(config.interval)
  .mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc())
  .map(_.groupBy(withdrawal => withdrawal.currency))
/** Builds an observable that, on a fixed delay of `config.interval`, evaluates
  * `repository.findAllWaitingOrderedBySerialIdAsc()` and emits its result.
  */
def startExecutor = {
  val ticks = Observable.intervalWithFixedDelay(config.interval)
  ticks.mapEval(_ => repository.findAllWaitingOrderedBySerialIdAsc())
}
/** Executor wired with a [[WithdrawalRepository]] and an [[ExternalWithdrawSystem]].
  *
  * @param repository     source of withdrawals
  * @param externalSystem system the withdrawals are handed to
  */
class WithdrawalBatchingExecutor(
    repository: WithdrawalRepository,
    externalSystem: ExternalWithdrawSystem
) {

  /** Not implemented yet: `???` throws `NotImplementedError` when this is called. */
  def startExecutor = ???
}
/** Abstraction over the external system that withdrawals are submitted to. */
trait ExternalWithdrawSystem {

  /** Submits the given withdrawals to the external system.
    *
    * @param withdrawals the withdrawals to execute
    * @return a `Task` that yields `Unit`
    */
  // NOTE(review): the element type was written as `Withdraw`; every other snippet uses
  // `Withdrawal`, so it is aligned here — confirm that no separate `Withdraw` type exists.
  def executeWithdrawals(withdrawals: List[Withdrawal]): Task[Unit]
}
/** Read access to stored withdrawals. */
trait WithdrawalRepository {

  /** Looks up withdrawals; judging by the name, those in a waiting state ordered by
    * ascending serial id (the ordering and filtering are up to the implementation).
    *
    * @return a `Task` yielding the list of matching withdrawals
    */
  def findAllWaitingOrderedBySerialIdAsc(): Task[List[Withdrawal]]
}