Last active
December 30, 2016 07:36
Reactive programming implemented in Ruby (self-implemented version)
=begin
RxObservable.create( -> (x) { x.on_next(1) } )
  .subscribe( -> (x) { p x })
=end
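require 'set'

# Repeatedly invokes every registered callback on a background thread.
class Scheduler
  def initialize
    # Build the set before starting the thread so `run` never sees nil.
    @subscriber = Set.new
    @thread = Thread.new { run }
  end

  def run
    loop do
      # Iterate over a snapshot so `add` may be called from another thread.
      @subscriber.to_a.each do |observer|
        observer.call
      end
    end
  end

  def add(observer)
    @subscriber.add(observer)
  end
end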
$scheduler = Scheduler.new

class RxObservable
  # Accepts either an existing Observer or a callable that receives a new one.
  def initialize(source)
    if source.is_a?(Observer)
      @observer = source
    else
      @observer = Observer.new
      # Let the callable push its values into the new observer.
      source.call(@observer)
    end
  end

  # Registers fn with the scheduler; each run hands it the next buffered value.
  def subscribe(fn)
    $scheduler.add -> { fn.call(@observer.next) }
  end

  def self.create(source)
    RxObservable.new(source)
  end
end
class Observer < Enumerator
  def initialize
    @buffer = []
    super do |yielder|
      # Spin until a value is buffered, then yield it to the caller of `next`.
      loop do
        yielder << @buffer.shift unless @buffer.empty?
      end
    end
  end

  def on_next(value)
    @buffer << value
  end
end
subject = Observer.new
subject.on_next 1
subject.on_next 2

RxObservable.create(subject)
  .subscribe(->(x) { p "Got value #{x}" })

subject.on_next 3

# Keep the main thread alive and emit a value every second.
loop do
  subject.on_next 4
  sleep 1
end
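
The Observer above busy-waits inside its Enumerator block whenever the buffer is empty, and the Scheduler spins in the same way. One possible alternative is sketched below, assuming Ruby's built-in thread-safe `Queue` is an acceptable replacement for the array buffer. The names `QueueObserver` and `QueueObservable` are hypothetical and do not appear in the gist; this is an illustration of the idea, not the author's design.

```ruby
# Hypothetical variant, not part of the original gist.
# A Queue blocks the consumer until a value arrives, so nothing spins.
class QueueObserver
  def initialize
    @queue = Queue.new # thread-safe FIFO provided by Ruby core
  end

  # Buffer a value for later delivery.
  def on_next(value)
    @queue << value
  end

  # Block until a value is available, then return it.
  def next
    @queue.pop
  end
end

class QueueObservable
  def initialize(observer)
    @observer = observer
  end

  # Deliver each value to fn on a dedicated thread and return that thread,
  # so the caller can stop delivery with Thread#kill.
  def subscribe(fn)
    Thread.new do
      loop { fn.call(@observer.next) }
    end
  end
end
```

Usage mirrors the example above: create a `QueueObserver`, push values with `on_next`, and pass a lambda to `QueueObservable.new(subject).subscribe`. The trade-off in this sketch is one thread per subscription instead of a single shared scheduler thread.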