Last active
January 20, 2021 01:39
-
-
Save IronSavior/d2cbb6e4f492150d989d to your computer and use it in GitHub Desktop.
Composable Enumerators in Ruby
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# @author Erik Elmore
# This is getting a little out of hand... :dizzy_face:
# Mixin that prints trace output for the demonstration below.
module Status
  # Prints one trace line of the form "Class#method(arg, ...) => extra".
  #
  # @param method_name [Symbol, String] name of the method being traced
  # @param args [Array] values shown inside the parentheses, joined with ', '
  # @param extra [Object, nil] optional trailing detail; omitted when nil or false
  # @return [nil]
  def status( method_name, args = [], extra = nil )
    # Interpolate instead of using String#% so that an Array `extra` is shown
    # whole rather than being splatted into the format arguments.
    suffix = extra ? " => #{extra}" : nil
    puts '%s#%s(%s)%s' % [self.class, method_name, args.join(', '), suffix]
  end
end
# Enumerator which is composed of an Enumerable and an external iterator block. | |
# This allows compatibility with ruby 1.9 | |
class EnumWithIterator | |
include Enumerable | |
def initialize( enum, &iter ) | |
raise ArgumentError, 'The given enum is not Enumerable' unless enum.respond_to? :each | |
raise ArgumentError, 'Block required but not given' unless block_given? | |
raise ArgumentError, 'Iterator block must accept at least one param.' unless iter.arity.abs > 0 | |
@enum = enum | |
@iter = iter | |
end | |
def each( *args ) | |
return enum_for __callee__, *args unless block_given? | |
@iter.call(@enum, *args){ |yielded| yield yielded } | |
end | |
# @return true when the built-in Enumerator class allows args at call-time. (expect false for ruby 1.9) | |
def self.native_support? | |
@native_support ||= Enumerator.new{}.method(:each).arity.abs > 0 | |
end | |
# Constructs either a native Enumerator (for ruby 2+) or an EnumWithIterator (ruby 1.9) | |
def self.build( enum, &iter ) | |
return new enum, &iter unless native_support? | |
raise ArgumentError, 'The given enum is not Enumerable' unless enum.respond_to? :each | |
raise ArgumentError, 'Iterator block is required' unless block_given? | |
raise ArgumentError, 'Iterator block must accept at least one param.' unless iter.arity.abs > 0 | |
Enumerator.new do |y, *args| | |
iter.call(enum, *args){ |yielded| y.yield yielded } | |
end | |
end | |
def self.build_chain( *args ) | |
args.reduce{ |enum, iter| build enum, &iter } | |
end | |
end | |
# Alternative entry point: wraps any Enumerable with an external iterator, e.g.
#   (1..3).with_iterator { |src, *args, &blk| src.each(&blk) }
# NOTE(review): this reopens the core Enumerable module for every object in the process.
module Enumerable
  # @yield [src, *args, &blk] iterator block handed to EnumWithIterator.build
  # @return [Enumerator, EnumWithIterator] composed enumerator over self
  def with_iterator( &iterator )
    source = self
    EnumWithIterator.build(source, &iterator)
  end
end
# Mockup for an object which fetches fixed-size batches of messages for as long
# as its source supplies them (forever, given an infinite source).
class Fetcher
  include Status

  # @param batch_size [Integer] number of messages per fetched batch
  def initialize( batch_size = 10 )
    @batch_size = batch_size
  end

  # Yields the source's messages in Arrays of @batch_size (the last batch may be
  # shorter if src is finite).
  # @param src [Enumerable] message source
  # @param args [Array] call-time args; only shown in trace output
  # @return [Enumerator] when no block is given, consistent with RandomBatchFetcher#each
  def each( src, *args )
    # Without this, a blockless call printed 'start' and then raised LocalJumpError.
    return enum_for(__callee__, src, *args) unless block_given?
    status __callee__, args, 'start'
    src.each_slice(@batch_size){ |batch|
      status __callee__, args, batch.join(', ')
      yield batch
    }
  end
end
# Same as Fetcher, but each fetch yields a random number of messages from zero
# up to and including the max batch size. Simulates a message queue that may
# give less than a full batch (or nothing) per fetch.
class RandomBatchFetcher
  include Status

  # @param max_batch_size [Integer] largest batch a single fetch may return
  def initialize( max_batch_size = 10 )
    @max_batch_size = max_batch_size
  end

  # Yields Arrays of random size drawn from src until src is exhausted.
  # @param src [Enumerable] message source
  # @param args [Array] call-time args; only shown in trace output
  # @return [Enumerator] when no block is given
  def each( src, *args, &blk )
    enum = Enumerator.new do |y|
      status __callee__, args, 'start'
      # Separate external cursor; reassigning `src` here would make a re-run of
      # this enumerator wrap the previous run's cursor.
      cursor = src.enum_for :each
      loop do
        cursor.peek # raises StopIteration (ending `loop`) once src is drained
        # Inclusive range so a full batch is possible (rand(n) tops out at n - 1,
        # and rand(0) returns a Float).
        size = rand(0..@max_batch_size)
        batch = Array[]
        begin
          batch << cursor.next while batch.size < size
        rescue StopIteration
          # src ran dry mid-fetch: deliver the partial batch instead of dropping it.
        end
        status __callee__, args, batch.join(', ')
        y.yield batch
      end
    end
    return enum unless block_given?
    enum.each(&blk)
  end
end
# Keeps a list of messages that were fetched but never handled. Shows a single
# object supplying two different iterators to the same pipeline.
class Tracker
  include Status

  # @return [Array] messages recorded by register_fetch and not yet removed by msg_handled
  def unhandled
    @unhandled ||= []
  end

  # Iterator: empties the unhandled list, then passes every batch from src
  # through, recording the batch's messages as unhandled before yielding it.
  def register_fetch( src, *args )
    status __callee__, args, 'start'
    pending = unhandled
    pending.clear
    src.each(*args) do |batch|
      pending.concat(batch)
      yield batch
    end
  end

  # Iterator: passes each message from src through and, once the consumer's
  # block returns, removes it from the unhandled list. On exit (normal or not)
  # prints whatever is still unhandled.
  def msg_handled( src, *args )
    status __callee__, args, 'start'
    src.each(*args) do |msg|
      yield msg
      unhandled.delete(msg)
    end
  ensure
    leftovers = unhandled
    puts '** Unhandled messages: [%s]' % leftovers.join(', ') unless leftovers.empty?
  end
end
# Stops iteration after yielding a maximum number of messages.
class LimitBreak
  include Status

  # @param max [Integer] most items to pass through; 0 or less passes nothing
  def initialize( max )
    @max = max
  end

  # Passes items from src through until @max have been yielded, then breaks out
  # of src's iteration.
  # @return [nil] when the limit stops iteration, otherwise whatever src.each returns
  def each( src, *args )
    status __callee__, args, 'start'
    if @max <= 0
      # Previously one item was still pulled and yielded before the limit check ran.
      status __callee__, args, 'Breaking, limit reached (%s)' % @max
      return nil
    end
    n = 0
    src.each(*args){ |item|
      yield item
      if (n += 1) >= @max
        status __callee__, args, 'Breaking, limit reached (%s)' % @max
        break
      end
    }
  end
end
# Receives batches and yields individual messages. Demonstrates iterator which is just a bare Proc. | |
batch_splitter = proc{ |src, *args, &blk| | |
src.each(*args){ |batch| | |
batch.each{ |msg| blk.call msg } | |
} | |
} | |
# Consumes every message the given enum provides, printing one line per message.
# The call-time args (some: 'args') are passed to enum.each.
def demo( enum )
  handler = proc { |msg| puts ' --> Handled %s' % msg }
  enum.each(some: 'args', &handler)
end
# Build a composed enum and run the demo!
# Some messages are expected to go unhandled to show that functionality.
integers = 1.upto(Float::INFINITY)
tracker = Tracker.new
stages = [
  integers,                          # infinite message source
  Fetcher.new.method(:each),         # fetched in batches (default size 10)
  tracker.method(:register_fetch),   # recorded as unhandled
  batch_splitter,                    # split into single messages
  LimitBreak.new(17).method(:each),  # stop after 17 messages
  tracker.method(:msg_handled)       # removed from unhandled once the demo handles them
]
enum = EnumWithIterator.build_chain(*stages)
demo enum
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment