Skip to content

Instantly share code, notes, and snippets.

@elct9620
Last active December 30, 2016 07:36
Show Gist options
  • Save elct9620/bc0a6ec00ade8ad1c3a0dd2f004a1697 to your computer and use it in GitHub Desktop.
Save elct9620/bc0a6ec00ade8ad1c3a0dd2f004a1697 to your computer and use it in GitHub Desktop.
Reactive Programming implemented in Ruby (self-implemented version)
=begin
Usage:
RxObservable.create( -> (x) { x.on_next(1) } )
.subscribe( -> (x) { p x })
=end
require 'set'
# Runs registered callables over and over on a dedicated background thread.
#
# Subscribers are kept in a Set, so adding the same object twice has no
# additional effect.
class Scheduler
  def initialize
    # All state must exist before the worker thread starts; otherwise #run
    # could observe an uninitialised (nil) subscriber set.
    @lock = Mutex.new
    @ready = ConditionVariable.new
    @subscriber = Set.new
    @thread = Thread.new { run }
  end

  # Worker loop: repeatedly invokes +call+ on every registered subscriber.
  # Never returns.
  def run
    loop do
      pending_subscribers.each(&:call)
    end
  end

  # Registers a subscriber.
  #
  # @param observer [#call] object invoked (with no arguments) on each pass
  # @return [Set] the subscriber set, as Set#add returns
  def add(observer)
    @lock.synchronize do
      @subscriber.add(observer)
      # Wake the worker if it is parked waiting for its first subscriber.
      @ready.signal
      @subscriber
    end
  end

  private

  # Waits until at least one subscriber exists, then returns a snapshot of the
  # set. Iterating over a copy lets #add run concurrently without mutating a
  # collection that is being iterated.
  def pending_subscribers
    @lock.synchronize do
      @ready.wait(@lock) while @subscriber.empty?
      @subscriber.to_a
    end
  end
end
# Global Scheduler instance; RxObservable#subscribe registers its jobs here.
$scheduler = Scheduler.new
# Wraps an Observer and hands its values to subscribers via the global
# $scheduler.
class RxObservable
  # @param observer [Observer, #call] either an existing Observer to wrap, or
  #   a callable that receives a newly created Observer and may push values
  #   into it (e.g. with +on_next+).
  def initialize(observer)
    if observer.is_a?(Observer)
      @observer = observer
    else
      @observer = Observer.new
      # Invoke the producer that was passed in. The previous code called an
      # undefined local `fn` here and raised NameError.
      observer.call(@observer)
    end
  end

  # Registers a job with $scheduler. Each time that job is called it takes
  # the next value from the wrapped observer and passes it to +fn+.
  #
  # @param fn [#call] callable receiving one value
  # @return whatever $scheduler.add returns
  def subscribe(fn)
    $scheduler.add -> { fn.call(@observer.next) }
  end

  # Convenience constructor; see #initialize for the accepted argument.
  def self.create(observer)
    new(observer)
  end
end
# An Enumerator fed by #on_next: values pushed in come back out, in order,
# through the usual Enumerator interface (e.g. #next).
class Observer < Enumerator
  def initialize
    # Queue is thread-safe and its #pop blocks while empty, so a consumer
    # waiting for a value sleeps instead of spinning in a busy loop.
    @buffer = Queue.new
    super do |yielder|
      loop { yielder << @buffer.pop }
    end
  end

  # Appends a value to be yielded later.
  #
  # @param value [Object] any value, including nil
  # @return [Queue] the internal buffer
  def on_next(value)
    @buffer << value
  end
end
# Demo: queue two values, subscribe a printer, queue a third value, then
# keep emitting 4 once per second forever.
source = Observer.new
[1, 2].each { |value| source.on_next(value) }

printer = ->(x) { p "Got value #{x}" }
RxObservable.create(source).subscribe(printer)

source.on_next(3)

loop do
  source.on_next(4)
  sleep(1)
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment