Last active
February 4, 2020 00:52
-
-
Save jakehawken/98d06ea5217333b48aa80dfb772a451f to your computer and use it in GitHub Desktop.
StreamPubSub: A bare-bones data stream implementation.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// The receiving half of a stream: keeps a list of registered callbacks and
/// hands each emitted state to them.
///
/// LEEO stands for "loading, empty, error, & offline".
///
/// Instances are created only inside this file (the initializer is
/// `fileprivate`); `StreamPublisher` builds one and exposes it as `subscriber`.
class StreamSubscriber<DataModel, LEEOModel> {

    /// Handler invoked with every emitted state.
    typealias Callback = (State) -> Void

    /// Teardown work executed by `cancelStream()`.
    typealias CancelBlock = () -> Void

    /// A single value delivered through the stream.
    enum State {
        /// A payload of the stream's data type.
        case newData(DataModel)
        /// A payload of the stream's LEEO type.
        case leeo(LEEOModel)
    }

    private let callbackList: SinglyLinkedList<Callback>
    private var cancelAction: CancelBlock?

    /// The state most recently emitted, or `nil` before the first emission.
    /// It is written after the callbacks have run, so a callback that reads
    /// it still sees the previous state.
    private(set) var lastState: State?

    fileprivate init() {
        // `SinglyLinkedList` requires an initial value, so the list is seeded
        // with a callback that does nothing.
        callbackList = SinglyLinkedList(firstValue: { _ in })
    }

    /// Registers `callback` to be invoked on every subsequent emission.
    func addCallback(callback: @escaping Callback) {
        callbackList.insert(value: callback)
    }

    /// Asks the callback list to drop everything after its root node.
    func removeAllCallbacks() {
        callbackList.trimToRoot()
    }

    /// Runs the installed cancel block, if there is one. The block is not
    /// cleared afterwards, so calling this again runs it again.
    func cancelStream() {
        guard let action = cancelAction else { return }
        action()
    }

    /// Stores the block that `cancelStream()` will run, replacing any earlier one.
    fileprivate func setCancelBlock(cancelBlock: @escaping CancelBlock) {
        cancelAction = cancelBlock
    }

    /// Delivers `newState` to every callback in list order, then records it
    /// as `lastState`.
    fileprivate func emitNewState(_ newState: State) {
        // Deferred so the assignment happens only after all callbacks ran.
        defer { lastState = newState }
        callbackList.forEach { handler in
            handler(newState)
        }
    }
}
/// The producing half of a stream. It creates its own `StreamSubscriber`,
/// exposes it through `subscriber`, and forwards states and the cancel
/// action to it.
class StreamPublisher<DataModel, LEEOModel> {

    /// Teardown work to run when the subscriber cancels the stream.
    typealias CancelBlock = () -> Void

    /// The subscriber fed by this publisher; hand this to consumers.
    let subscriber: StreamSubscriber<DataModel, LEEOModel>

    init() {
        subscriber = StreamSubscriber()
    }

    /// Pushes `state` to the subscriber, which relays it to its callbacks.
    func publishNewState(_ newState: StreamSubscriber<DataModel, LEEOModel>.State) {
        subscriber.emitNewState(newState)
    }

    /// Installs the block the subscriber runs from `cancelStream()`.
    func setCancelAction(cancelBlock action: @escaping CancelBlock) {
        subscriber.setCancelBlock(cancelBlock: action)
    }
}
/// One link in a forward-only chain. Each node stores a value and a strong
/// reference to the node that follows it (or `nil` at the end of the chain).
class SinglyLinkedListNode<T> {

    /// The value stored in this node.
    let data: T

    /// The following node, or `nil` when this node is last.
    var next: SinglyLinkedListNode<T>?

    init(data: T) {
        self.data = data
    }

    /// Appends a new node holding `value` after the last node reachable from here.
    func insert(value: T) {
        var last: SinglyLinkedListNode<T> = self
        while let following = last.next {
            last = following
        }
        last.next = SinglyLinkedListNode(data: value)
    }

    /// Calls `doBlock` with this node's value and then with the value of each
    /// node after it, in chain order.
    func forEachFromHere(doBlock: (T) -> Void) {
        var cursor: SinglyLinkedListNode<T>? = self
        while let node = cursor {
            doBlock(node.data)
            // `next` is read only after the block returns, so any relinking
            // done by the block affects where the walk goes.
            cursor = node.next
        }
    }

    /// Returns the last node reachable from here (this node if it has no successor).
    func findTerminalNode() -> SinglyLinkedListNode<T> {
        var candidate: SinglyLinkedListNode<T> = self
        while let following = candidate.next {
            candidate = following
        }
        return candidate
    }

    /// Detaches every node after this one and also clears the link of each
    /// detached node, so none of them still points at another.
    func removeAllChildren() {
        var pending = next
        next = nil
        while let node = pending {
            pending = node.next
            node.next = nil
        }
    }
}
/// A forward-only list that always contains at least one element: the root
/// node created from `firstValue`. A tail reference is kept so that `insert`
/// does not have to walk the chain.
class SinglyLinkedList<T> {

    /// The first node; it is never removed.
    let rootNode: SinglyLinkedListNode<T>

    /// The last node reachable from `rootNode`. Invariant: following `next`
    /// links from `rootNode` always ends at this node.
    private(set) var tailNode: SinglyLinkedListNode<T>

    /// Creates a list whose only element is `firstValue`.
    init(firstValue: T) {
        let first = SinglyLinkedListNode(data: firstValue)
        self.rootNode = first
        self.tailNode = first
    }

    /// Appends `value` to the end of the list.
    func insert(value: T) {
        let newNode = SinglyLinkedListNode(data: value)
        tailNode.next = newNode
        tailNode = newNode
    }

    /// Calls `doBlock` with every value, root first, in insertion order.
    func forEach(doBlock: (T)->()) {
        rootNode.forEachFromHere(doBlock: doBlock)
    }

    /// Removes every element except the root.
    func trimToRoot() {
        rootNode.removeAllChildren()
        // The old tail has just been detached from the root. Without this
        // reset, the next `insert` would link its node onto that orphan and
        // `forEach` would never reach it.
        tailNode = rootNode
    }
}
/*
// Example usage:
import FirebaseDatabase
extension DatabaseReference {
    func streamSubscriber<T>(expectedDataType: T.Type) -> StreamSubscriber<T, GetDataError> {
        let publisher = StreamPublisher<T, GetDataError>()
        let cancelRef = observe(.value) { (snapshot) in
            if let newValue = snapshot.value as? T {
                publisher.publishNewState(.newData(newValue))
            }
        }
        publisher.setCancelAction { [weak self] in
            self?.removeObserver(withHandle: cancelRef)
        }
        return publisher.subscriber
    }
}
*/
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment