Skip to content

Instantly share code, notes, and snippets.

@adjam
Created October 28, 2016 14:27
Show Gist options
  • Save adjam/38526e3703e2dea39b25d51d0785b1d3 to your computer and use it in GitHub Desktop.
Save adjam/38526e3703e2dea39b25d51d0785b1d3 to your computer and use it in GitHub Desktop.
require 'fiber'
module Spofford
# A single step of a fiber-driven pipeline.
#
# A stage pulls values from its +source+ (any object responding to +resume+),
# drops the ones rejected by its filter, and hands the transformed remainder
# to whoever calls #resume on it. The default filter accepts everything and
# the default transformer is the identity; subclasses may override #filter /
# #transform or pre-set @filter / @transformer before calling +super+.
class Stage
  attr_accessor :source, :name

  # @param options [Hash] attribute values; every key is assigned through
  #   its +key=+ writer (e.g. +:source+, +:name+)
  # @param block accepted so subclasses can forward their block via +super+;
  #   not used by Stage itself
  def initialize(options = {}, &block)
    # `||=` keeps a callable that a subclass assigned before calling super.
    @transformer ||= method(:transform)
    @filter ||= method(:filter)
    @fiber_delegate = Fiber.new do
      process
      # Signal exhaustion explicitly; otherwise the return value of #process
      # would come out of #resume looking like one more pipeline value.
      raise StopIteration
    end
    options.each { |key, value| send("#{key}=", value) }
  end

  # Chains +other+ after this stage.
  #
  # @param other [#source=] the downstream stage
  # @return [Object] +other+, so chains can be written left to right
  def |(other)
    other.source = self
    other
  end

  # Produces the next value emitted by this stage.
  #
  # @return [Object] the next value that passed the filter, transformed
  # @raise [StopIteration] once the stage has finished
  def resume
    raise StopIteration unless @fiber_delegate.alive?

    @fiber_delegate.resume
  end

  # Pulls values from the source until it raises StopIteration.
  #
  # Kernel#loop rescues that StopIteration, so +nil+ and +false+ are ordinary
  # values here instead of being mistaken for the end of the stream.
  def process
    loop { handle_value(input) }
  end

  # @return [Object] the next value produced by the source
  def input
    source.resume
  end

  # Hands +value+ to the caller of #resume. Calls Fiber.yield, so it is meant
  # to run inside this stage's fiber.
  def output(value)
    Fiber.yield(value)
  end

  # Filters and transforms one value, emitting it when the filter accepts it.
  #
  # @raise [StopIteration] when +value+ is the StopIteration class itself,
  #   which is treated as an in-band end-of-stream marker
  def handle_value(value)
    # Identity check: does not depend on how +value+ implements ==.
    raise StopIteration if value.equal?(StopIteration)

    output(@transformer.call(value)) if @filter.call(value)
  end

  # Default transformer: returns +value+ unchanged.
  def transform(value)
    value
  end

  # Default filter: accepts every value.
  def filter(value)
    true
  end

  # @return [String] the stage name (empty when no name was set)
  def inspect
    name.to_s
  end

  # @return [Array] this stage followed by the path of its source, if any
  def path
    return [self] if source.nil?

    [self, source.path].flatten
  end
end
# Stage whose accept/reject decision is made by the block given to +new+.
class Filter < Stage
  # @param options [Hash] passed on to Stage#initialize
  # @param block predicate called with each value; when no block is given,
  #   @filter stays nil and Stage#initialize installs its default
  def initialize(options = {}, &block)
    # Must be assigned before super: Stage#initialize only fills in a
    # filter when none has been set yet.
    @filter = block
    super(options, &block)
  end
end
# Stage whose value mapping is performed by the block given to +new+.
class Transformer < Stage
  # @param options [Hash] passed on to Stage#initialize; a missing (or
  #   falsy) +:name+ entry is filled in with "transformer" on this same hash
  # @param block mapping called with each value; when no block is given,
  #   @transformer stays nil and Stage#initialize installs its default
  def initialize(options = {}, &block)
    options[:name] = "transformer" unless options[:name]
    # Must be assigned before super: Stage#initialize only fills in a
    # transformer when none has been set yet.
    @transformer = block
    super(options, &block)
  end
end
# Collects stages in declaration order. Its public methods are intended to
# be used as a small DSL (Pipeline.setup evaluates its block against a
# Builder instance).
class Builder
  # @return [Array] the stages declared so far, in order
  attr_reader :stages

  def initialize
    @stages = []
  end

  # Appends a Filter built from +block+.
  #
  # @param options [Hash] stage options; +:name+ defaults to "filter-N",
  #   where N is the 1-based position of the new stage
  # @return [Array] the list of stages
  def filter(options = {}, &block)
    check_options(:filter, options)
    @stages << Filter.new(options, &block)
  end

  # Appends a Transformer built from +block+.
  #
  # @param options [Hash] stage options; +:name+ defaults to "transform-N",
  #   where N is the 1-based position of the new stage
  # @return [Array] the list of stages
  def transform(options = {}, &block)
    check_options(:transform, options)
    @stages << Transformer.new(options, &block)
  end

  # Appends a filter that rejects +nil+ values.
  def nonnil
    filter(name: "Filter blanks") { |x| !x.nil? }
  end

  # Appends a transformer that calls +upcase+ on every value.
  def upcase
    transform(name: "Upper case", &:upcase)
  end

  private

  # Fills in a default +:name+ on +options+ (in place) when none is set.
  #
  # @param type [#to_s] kind of stage, used as the prefix of the default name
  # @param options [Hash] the options hash that will be given to the stage
  # @return [Hash] +options+
  def check_options(type, options = {})
    options[:name] ||= "#{type}-#{@stages.length + 1}"
    options
  end
end
# Head of a chain of stages: feeds values from an enumerable into the first
# stage and drives the chain from the last one.
class Pipeline
  # @return [#next] the external iterator the pipeline reads from
  attr_reader :enumerable

  # @param enumerable [#next, #each] the input; see #enumerable=
  def initialize(enumerable)
    self.enumerable = enumerable
    @delegate_fiber = Fiber.new do
      # Kernel#loop ends when #next raises StopIteration, so nil and false
      # are passed along as ordinary values instead of ending the stream.
      loop { Fiber.yield(@enumerable.next) }
      # Make exhaustion visible to the resumer instead of returning a value.
      raise StopIteration
    end
    @last_stage = self
  end

  # Appends +next_stage+ to the end of the chain.
  #
  # @param next_stage [#source=] the stage to append
  # @return [Pipeline] self, so several stages can be chained with +|+
  def |(next_stage)
    if @last_stage.equal?(self)
      next_stage.source = self
    else
      @last_stage | next_stage
    end
    @last_stage = next_stage
    self
  end

  # Sets the input. Objects that respond to +next+ are used as they are;
  # anything else is converted with +each.lazy+.
  def enumerable=(something)
    @enumerable = something.respond_to?(:next) ? something : something.each.lazy
  end

  # @return [Object] the next value of the input
  # @raise [StopIteration] when the input is exhausted, on every later call too
  def resume
    raise StopIteration unless @delegate_fiber.alive?

    @delegate_fiber.resume
  end

  def name
    "(start)"
  end

  # The pipeline is the head of the chain, so it has no source.
  def source
    nil
  end

  # The pipeline contributes no entries of its own to a stage path.
  def path
    []
  end

  # @return [String] the stage names in processing order, joined by arrows
  def debug
    names = @last_stage.path.reverse.map(&:name)
    [" (input)", *names, "(output)"].join(" -> ")
  end

  # Pulls every value through the chain, yielding each one to the block.
  #
  # @yield [value] each value emitted by the last stage
  # @return [nil, Enumerator] an Enumerator over the results when called
  #   without a block
  def run
    return enum_for(:run) unless block_given?

    # Kernel#loop stops quietly when resume raises StopIteration.
    loop { yield @last_stage.resume }
    nil
  end

  # Builds a pipeline from the stages declared in +block+, which is evaluated
  # against a Builder. The pipeline starts with an empty input; assign the
  # real one through #enumerable= before running it.
  #
  # @return [Pipeline]
  def self.setup(&block)
    builder = Builder.new
    builder.instance_eval(&block)
    builder.stages.each_with_object(new([])) { |stage, pipeline| pipeline | stage }
  end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment