-
-
Save dangthaison91/4ad50870600b8c9705406ec495122218 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ObservableBuffer.swift | |
// | |
// Created by Daniel Tartaglia | |
// Copyright © 2017 Daniel Tartaglia. MIT License. | |
extension Observable { | |
/// collects elements from the source sequence until the boundary sequence fires. Then it emits the elements as an array and begins collecting again. | |
func buffer<U>(_ boundary: Observable<U>) -> Observable<[E]> { | |
return Observable<[E]>.create { observer in | |
var buffer: [E] = [] | |
let lock = NSRecursiveLock() | |
let boundaryDisposable = boundary.subscribe { event in | |
lock.lock(); defer { lock.unlock() } | |
switch event { | |
case .next: | |
observer.onNext(buffer) | |
buffer = [] | |
default: | |
break | |
} | |
} | |
let disposable = self.subscribe { event in | |
lock.lock(); defer { lock.unlock() } | |
switch event { | |
case .next(let element): | |
buffer.append(element) | |
case .completed: | |
observer.onNext(buffer) | |
observer.onCompleted() | |
case .error(let error): | |
observer.onError(error) | |
buffer = [] | |
} | |
} | |
return Disposables.create([disposable, boundaryDisposable]) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment