Created
October 28, 2016 14:27
-
-
Save adjam/38526e3703e2dea39b25d51d0785b1d3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'fiber' | |
module Spofford | |
# One step in a fiber-driven pipeline.
#
# A stage pulls values from its +source+ (any object that responds to
# +resume+), drops the ones its filter rejects, and hands the transformed
# value to whoever called #resume. Subclasses customise behaviour either by
# overriding #transform / #filter or by assigning a callable to
# +@transformer+ / +@filter+ before calling +super+ from +initialize+.
class Stage
  attr_accessor :source, :name

  # @param options [Hash] attribute values; every key is assigned through the
  #   matching public writer (for example +:name+ or +:source+)
  # @param block [Proc] not used by Stage itself; accepted so subclasses can
  #   forward their block with a bare +super+
  def initialize(options = {}, &block)
    # `||=` keeps a callable that a subclass installed before calling super.
    @transformer ||= method(:transform)
    @filter ||= method(:filter)
    @fiber_delegate = Fiber.new { process }
    # public_send: option keys must not be able to reach private writers.
    options.each { |key, value| public_send("#{key}=", value) }
  end

  # Chains +other+ after this stage.
  #
  # @param other [#source=] the downstream stage
  # @return [Object] +other+, so several stages can be chained in a row
  def |(other)
    other.source = self
    other
  end

  # Produces the next value emitted by this stage.
  #
  # @return [Object] the next transformed value
  # @raise [StopIteration] when the stage has no more values
  def resume
    raise StopIteration unless @fiber_delegate.alive?

    @fiber_delegate.resume
  end

  # Body of the stage's fiber: keeps pulling input until upstream returns a
  # nil/false value, which is treated as "no more input".
  #
  # @raise [StopIteration] once upstream has run dry
  def process
    while (value = input)
      handle_value(value)
    end
    # Signal exhaustion explicitly. Without this the fiber would finish
    # normally and its return value (nil) would be handed to the consumer as
    # if it were one more data item.
    raise StopIteration
  end

  # @return [Object] the next value from the upstream source
  def input
    source.resume
  end

  # Suspends the stage's fiber, handing +value+ to the caller of #resume.
  def output(value)
    Fiber.yield(value)
  end

  # Filters, transforms and emits a single input value.
  #
  # @raise [StopIteration] when +value+ is the StopIteration class itself,
  #   which is accepted as an in-band end-of-stream marker
  def handle_value(value)
    raise StopIteration if value == StopIteration

    # The filter sees the raw value; only accepted values are transformed.
    output(@transformer.call(value)) if @filter.call(value)
  end

  # Default transformation: passes the value through unchanged.
  def transform(value)
    value
  end

  # Default filter: accepts every value.
  def filter(value)
    true
  end

  # @return [String, nil] the stage name
  def inspect
    name
  end

  # @return [Array] this stage followed by everything its source reports
  #   from its own #path
  def path
    upstream = source.nil? ? [] : source.path
    [self, upstream].flatten
  end
end
# A stage that keeps only the values for which the given block returns a
# truthy result. Without a block, Stage's default filter (accept everything)
# is used, because Stage#initialize only fills in +@filter+ when it is unset.
class Filter < Stage
  # @param options [Hash] stage attributes such as +:name+ and +:source+
  # @param block [Proc] predicate called with each input value
  def initialize(options = {}, &block)
    @filter = block
    super
    # Give unnamed filters a readable label, mirroring Transformer. This is
    # applied after super so the caller's hash is never modified and an
    # explicitly supplied name always wins.
    self.name ||= "filter"
  end
end
# A stage that replaces each value with the result of the given block.
# Without a block, Stage's default transform (identity) is used, because
# Stage#initialize only fills in +@transformer+ when it is unset.
class Transformer < Stage
  # @param options [Hash] stage attributes such as +:name+ and +:source+
  # @param block [Proc] called with each accepted value; its result is emitted
  def initialize(options = {}, &block)
    @transformer = block
    super
    # Default label for unnamed transformers. This is applied after super so
    # the caller's options hash is left untouched and a name supplied under
    # any key form is never overwritten.
    self.name ||= "transformer"
  end
end
# Collects stages in declaration order. Pipeline.setup evaluates its block
# against an instance of this class, so the public methods here form the
# vocabulary available inside that block.
class Builder
  # @return [Array] the stages declared so far, in order
  attr_reader :stages

  def initialize
    @stages = []
  end

  # Appends a Filter stage.
  #
  # @param options [Hash] stage options; +:name+ defaults to "filter-N",
  #   where N is the 1-based position of the new stage
  # @param block [Proc] predicate deciding which values are kept
  # @return [Array] the updated stage list
  def filter(options = {}, &block)
    check_options(:filter, options)
    @stages << Filter.new(options, &block)
  end

  # Appends a Transformer stage.
  #
  # @param options [Hash] stage options; +:name+ defaults to "transform-N",
  #   where N is the 1-based position of the new stage
  # @param block [Proc] mapping applied to every value
  # @return [Array] the updated stage list
  def transform(options = {}, &block)
    check_options(:transform, options)
    @stages << Transformer.new(options, &block)
  end

  # Appends a filter that rejects nil values.
  def nonnil
    filter(name: "Filter blanks") { |x| !x.nil? }
  end

  # Appends a transformer that calls +upcase+ on every value.
  def upcase
    transform(name: "Upper case") { |x| x.upcase }
  end

  private

  # Fills in a positional default name when the caller gave none.
  #
  # @param type [#to_s] kind of stage being added; used as the name prefix
  # @param options [Hash] the hash that will be passed to the stage; it is
  #   modified in place
  # @return [Hash] +options+
  def check_options(type, options = {})
    options[:name] ||= "#{type}-#{@stages.length + 1}"
    options
  end
end
# Head of a chain of stages: wraps an enumerable as the first +resume+-able
# source and keeps track of the last stage attached, from which results are
# pulled.
class Pipeline
  # @return [#next] the external iterator feeding the pipeline
  attr_reader :enumerable

  # @param enumerable [#next, #each] the data source; see #enumerable=
  def initialize(enumerable)
    self.enumerable = enumerable
    @delegate_fiber = Fiber.new do
      # Deliberately `while true` and not `loop`: `loop` rescues
      # StopIteration, but here the StopIteration raised by `next` at the end
      # of the data has to escape the fiber so that it reaches the consumer.
      # Every item is forwarded, including nil and false.
      while true
        Fiber.yield(@enumerable.next)
      end
    end
    @last_stage = self
  end

  # Appends +next_stage+ to the end of the chain.
  #
  # @param next_stage [#source=] the stage to attach
  # @return [Pipeline] self, so stages can be chained with repeated +|+
  def |(next_stage)
    if @last_stage.equal?(self)
      next_stage.source = self
    else
      @last_stage | next_stage
    end
    @last_stage = next_stage
    self
  end

  # Sets the data source. Objects that respond to +next+ are used as they
  # are; anything else is converted with +each.lazy+.
  def enumerable=(something)
    @enumerable = something.respond_to?(:next) ? something : something.each.lazy
  end

  # @return [Object] the next raw item from the data source
  # @raise [StopIteration] when the data source is exhausted, including on
  #   every later call
  def resume
    # Resuming a finished fiber raises FiberError; report exhaustion with
    # StopIteration instead, which is what #run stops on.
    raise StopIteration unless @delegate_fiber.alive?

    @delegate_fiber.resume
  end

  # @return [String] the fixed label of the pipeline head
  def name
    "(start)"
  end

  # The pipeline head has no upstream.
  def source
    nil
  end

  # The head contributes nothing to a stage path, so only attached stages
  # are listed.
  def path
    []
  end

  # @return [String] a one-line, input-to-output listing of the stage names
  def debug
    names = @last_stage.path.reverse.map(&:name)
    [" (input)", *names, "(output)"].join(" -> ")
  end

  # Pulls every value through the chain and yields it to the block.
  #
  # @yield [Object] each value produced by the last stage
  # @return [nil, Enumerator] nil when a block is given, otherwise an
  #   Enumerator over the results
  def run
    return to_enum(:run) unless block_given?

    # `loop` ends quietly on the StopIteration that marks end of stream.
    loop { yield @last_stage.resume }
    nil
  end

  # Builds a pipeline from a block evaluated against a Builder. The result
  # starts with an empty data source; presumably the caller assigns the real
  # one through #enumerable= before running it.
  #
  # @return [Pipeline] a pipeline with the declared stages attached in order
  def self.setup(&block)
    builder = Builder.new
    builder.instance_eval(&block) if block
    builder.stages.reduce(new([])) { |pipeline, stage| pipeline | stage }
  end
end
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment