Skip to content

Instantly share code, notes, and snippets.

@ha1f
Last active April 25, 2020 08:08
Show Gist options
  • Save ha1f/ac9d2a7d3b216720faa78d036cd13401 to your computer and use it in GitHub Desktop.
Save ha1f/ac9d2a7d3b216720faa78d036cd13401 to your computer and use it in GitHub Desktop.
// Disposable
/// Namespace of factory functions that build `Disposable` values.
enum Disposables {
    /// Builds a disposable backed by `NopDisposable`, whose `dispose()` has an empty body.
    static func create() -> Disposable {
        return NopDisposable()
    }

    /// Builds a disposable by wrapping `dispose` in a `ClosureDisposable`.
    ///
    /// - Parameter dispose: The closure handed to `ClosureDisposable`.
    static func create(_ dispose: @escaping () -> Void) -> Disposable {
        let closureBacked = ClosureDisposable(dispose)
        return closureBacked
    }
}
/// Abstraction for anything that exposes a `dispose()` operation.
///
/// In this file it is the return type of `subscribe`, and it is implemented by
/// `ClosureDisposable`, `NopDisposable`, `CompositeDisposable` and `MapSink`.
protocol Disposable {
/// Performs this value's disposal work; what that means is up to the conforming type.
func dispose()
}
/// A `Disposable` whose disposal work is a caller-supplied closure.
struct ClosureDisposable: Disposable {
    /// The closure to run when `dispose()` is called.
    private let _dispose: () -> Void

    /// - Parameter dispose: Closure invoked on every call to `dispose()`.
    init(_ dispose: @escaping () -> Void) {
        _dispose = dispose
    }

    /// Runs the stored closure.
    func dispose() {
        // The parentheses matter: a bare `_dispose` merely names the closure
        // and evaluates to an unused value, so nothing would ever run.
        _dispose()
    }
}
/// A `Disposable` with nothing to clean up.
struct NopDisposable: Disposable {
    /// Intentionally does nothing.
    func dispose() {
        // No-op.
    }
}
/// A `Disposable` that groups several disposables and disposes them together.
struct CompositeDisposable: Disposable {
    /// The grouped disposables, kept in the order they were supplied.
    private let _disposables: [Disposable]

    /// - Parameter disposables: The disposables to group.
    init(_ disposables: [Disposable]) {
        _disposables = disposables
    }

    /// Calls `dispose()` on each grouped disposable, in array order.
    func dispose() {
        for member in _disposables {
            member.dispose()
        }
    }
}
/// The kinds of event that can be passed to an observer's `on(event:)`.
enum Event<Element> {
/// Carries one element.
case next(Element)
/// Carries no payload; `MapSink` stops forwarding after passing this on.
case completed
/// Carries an `Error`; `MapSink` stops forwarding after passing this on.
case error(Error)
}
// Observable
/// A type that observers can subscribe to.
protocol ObservableType {
/// The element type the subscribing observer must accept.
associatedtype Element
/// Attaches `observer` and returns a `Disposable` for the resulting subscription.
func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.Element == Element
/// Returns this value typed as the `Observable<Element>` class.
func asObservable() -> Observable<Element>
}
extension ObservableType {
    /// Convenience subscription that only supplies a handler for `.next` events.
    ///
    /// - Parameter onNextHandler: Handler passed to `Observer(onNext:)`.
    /// - Returns: The `Disposable` returned by `subscribe(_:)`.
    func subscribe(onNext onNextHandler: @escaping (Element) -> Void) -> Disposable {
        // Wrap the handler in an `Observer` and subscribe it in one step.
        return subscribe(Observer(onNext: onNextHandler))
    }
}
/// Base class.
///
/// Subclasses are expected to override `subscribe(_:)`; the implementation
/// here traps with `fatalError`.
class Observable<Element>: ObservableType {
    init() { }

    /// Placeholder that always traps; see the subclasses for real behavior.
    func subscribe<O>(_ observer: O) -> Disposable where O: ObserverType, O.Element == Element {
        fatalError("We have to overwrite to implement.")
    }

    /// Returns `self`, since this already is an `Observable<Element>`.
    func asObservable() -> Observable<Element> {
        return self
    }
}
/// Observable that sends one stored element followed by `.completed` to each subscriber.
final class Just<Element>: Observable<Element> {
    /// The element sent on subscription.
    private let _element: Element

    /// - Parameter element: The element to send to every subscriber.
    init(_ element: Element) {
        _element = element
        super.init()
    }

    /// Sends `.next(_element)` then `.completed` to `observer` before returning.
    ///
    /// - Returns: A disposable from `Disposables.create()`.
    override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.Element == Element {
        // Both events are delivered during this call, in this order.
        let events: [Event<Element>] = [.next(_element), .completed]
        for event in events {
            observer.on(event: event)
        }
        return Disposables.create()
    }
}
/// Observer placed between a source of `T` and a downstream observer of `U`.
///
/// Each `.next` element is transformed with `_map` before being forwarded.
/// `.error` and `.completed` are forwarded unchanged. Nothing is forwarded
/// once one of those two events has gone through or `dispose()` has been called.
private final class MapSink<T, U>: ObserverType, Disposable {
    /// Transformation applied to each incoming element.
    private let _map: (T) -> U
    /// Type-erased downstream observer.
    private let _observer: AnyObserver<U>

    /// Lock-guarded view of `_isDisposed`; every read and write takes `_isDisposedLock`.
    private var isDisposed: Bool {
        get {
            _isDisposedLock.lock()
            defer {
                _isDisposedLock.unlock()
            }
            return _isDisposed
        }
        set {
            _isDisposedLock.lock()
            defer {
                _isDisposedLock.unlock()
            }
            _isDisposed = newValue
        }
    }
    /// Backing storage; access it only through `isDisposed`.
    private var _isDisposed: Bool = false
    private let _isDisposedLock = NSLock()

    /// - Parameters:
    ///   - map: Transformation applied to every `.next` element.
    ///   - observer: Downstream observer that receives the transformed events.
    init<O: ObserverType>(_ map: @escaping (T) -> U, observer: O) where O.Element == U {
        _map = map
        _observer = observer.asObserver()
    }

    /// Forwards `event` downstream, mapping `.next` payloads, unless already disposed.
    func on(event: Event<T>) {
        guard !isDisposed else {
            return
        }
        switch event {
        case let .next(element):
            _observer.on(event: .next(_map(element)))
        case let .error(error):
            _observer.on(event: .error(error))
            // Mark as disposed after forwarding so later events are dropped.
            isDisposed = true
        case .completed:
            _observer.on(event: .completed)
            isDisposed = true
        }
    }

    /// Stops any further forwarding to the downstream observer.
    func dispose() {
        // Use the locked accessor rather than writing `_isDisposed` directly,
        // so this write is serialized with the reads in `on(event:)`.
        isDisposed = true
    }
}
/// Observable that applies a transformation to the elements of a source observable.
final class Map<T, U>: Observable<U> {
    /// The observable whose events are transformed.
    private let _source: Observable<T>
    /// The transformation handed to each `MapSink`.
    private let _map: (T) -> U

    /// - Parameters:
    ///   - source: The observable to subscribe to.
    ///   - map: Transformation from `T` to `U`.
    init(_ source: Observable<T>, map: @escaping (T) -> U) {
        _source = source
        _map = map
        super.init()
    }

    /// Subscribes a new `MapSink` wrapping `observer` to the source.
    ///
    /// - Returns: A `CompositeDisposable` holding the sink and the source subscription.
    override func subscribe<O>(_ observer: O) -> Disposable where Element == O.Element, O : ObserverType {
        let mappingSink = MapSink(_map, observer: observer)
        let upstream = _source.subscribe(mappingSink)
        // Order is kept: the sink is disposed before the upstream subscription.
        let parts: [Disposable] = [mappingSink, upstream]
        return CompositeDisposable(parts)
    }
}
extension ObservableType {
    /// Creates a `Just` observable for `element`.
    static func just(_ element: Element) -> Observable<Element> {
        return Just(element)
    }

    /// Creates a `Map` observable that applies the given closure to this observable's elements.
    ///
    /// - Parameter transform: Closure converting each `Element` into a `U`.
    func map<U>(_ transform: @escaping (Element) -> U) -> Observable<U> {
        let source = asObservable()
        return Map(source, map: transform)
    }
}
/// Closure-backed `ObserverType`: each kind of event goes to its own optional handler.
struct Observer<Element>: ObserverType {
    /// Handler for `.next` events; when `nil`, those events are ignored.
    let onNext: ((Element) -> Void)?
    /// Handler for `.completed`; when `nil`, the event is ignored.
    let onCompleted: (() -> Void)?
    /// Handler for `.error` events; when `nil`, those events are ignored.
    let onError: ((Error) -> Void)?

    /// Creates an observer from individual handlers, all defaulting to `nil`.
    init(onNext: ((Element) -> Void)? = nil,
         onCompleted: (() -> Void)? = nil,
         onError: ((Error) -> Void)? = nil) {
        self.onNext = onNext
        self.onCompleted = onCompleted
        self.onError = onError
    }

    /// Creates an observer whose handlers forward to another observer's
    /// `onNext` / `onCompleted` / `onError` methods.
    init<T: ObserverType>(_ observer: T) where T.Element == Element {
        self.init(onNext: observer.onNext,
                  onCompleted: observer.onCompleted,
                  onError: observer.onError)
    }

    /// Routes `event` to the matching handler, if one was supplied.
    func on(event: Event<Element>) {
        switch event {
        case let .next(element):
            if let handler = onNext {
                handler(element)
            }
        case .completed:
            if let handler = onCompleted {
                handler()
            }
        case let .error(error):
            if let handler = onError {
                handler(error)
            }
        }
    }
}
/// A type that can receive `Event<Element>` values.
protocol ObserverType {
/// The element type carried by the events this observer accepts.
associatedtype Element
/// Delivers one event to this observer.
func on(event: Event<Element>)
}
extension ObserverType {
    /// Shorthand for `on(event: .next(element))`.
    func onNext(_ element: Element) {
        on(event: .next(element))
    }

    /// Shorthand for `on(event: .completed)`.
    func onCompleted() {
        on(event: .completed)
    }

    /// Shorthand for `on(event: .error(error))`.
    func onError(_ error: Error) {
        on(event: .error(error))
    }
}
/// Type-erased observer: hides the concrete `ObserverType` behind a stored closure.
struct AnyObserver<Element>: ObserverType {
    /// The wrapped observer's `on(event:)`.
    private let _onEvent: (Event<Element>) -> Void

    /// - Parameter observer: The observer whose `on(event:)` receives every event.
    init<T: ObserverType>(_ observer: T) where T.Element == Element {
        _onEvent = observer.on(event:)
    }

    /// Passes `event` to the wrapped observer.
    func on(event: Event<Element>) {
        _onEvent(event)
    }
}
extension ObserverType {
    /// Wraps this observer in an `AnyObserver`.
    func asObserver() -> AnyObserver<Element> {
        AnyObserver(self)
    }
}
// Demo: send 40 through a doubling `map` and print each resulting element.
// The returned disposable is deliberately discarded.
_ = Observable
    .just(40)
    .map { value in value * 2 }
    .subscribe(onNext: { doubled in
        print("fsa", doubled)
    })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment