Created
February 27, 2021 07:47
-
-
Save sunaot/a987c33ad8af562fe0889ab33edce56b to your computer and use it in GitHub Desktop.
Ractor 使って Java Stream API のようなお手軽並列処理。map や filter の単位での並列化 version.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: true
require_relative "ractor_stream/version" | |
# [1,2,3,4].parallel.filter {|n| n.even?}.map {|n| n * 2}.each {|item| puts item}
class Enumerator
  # Builds a pipeline of +map+ / +filter+ stages over a source collection and
  # runs every stage with one Ractor per item.
  #
  # Stages are only recorded by #map and #filter; nothing runs until the
  # terminal #each is called. Stages run one after another: within a stage a
  # Ractor is spawned for every item first, and only then are the results
  # collected, in source order.
  #
  # Every block is handed to +Ractor.new+, so it is subject to Ractor's block
  # isolation rules (see the Ractor documentation).
  class RactorParallelStream
    # One recorded stage. +message+ is :map or :filter and +processor+ is the
    # user's block. +args+ and +keyword_args+ are never populated by this
    # class; they are kept so the Struct's shape stays unchanged.
    Consumer = Struct.new(:message, :processor, :args, :keyword_args)

    # @param enumerator [#map] source collection the stages are applied to
    def initialize(enumerator)
      @enumerator = enumerator
      @consumers = []
    end

    # Records a transformation stage.
    #
    # @yieldparam item one element of the previous stage's output
    # @return [self] the same stream, so calls can be chained
    def map(&block)
      @consumers << Consumer.new(:map, block)
      self
    end

    # Records a selection stage; items whose block result is truthy are kept.
    #
    # @yieldparam item one element of the previous stage's output
    # @return [self] the same stream, so calls can be chained
    def filter(&block)
      @consumers << Consumer.new(:filter, block)
      self
    end

    # Terminal operation: runs all recorded stages, then runs +block+ in a
    # fresh Ractor for each resulting item and waits for all of them.
    #
    # @yieldparam item one element of the final stage's output
    # @return [Array<Ractor>] the Ractors that ran +block+
    def each(&block)
      pipeline_call(@enumerator, @consumers)
        .map { |item| Ractor.new(item, &block) }
        .each { |ractor| result_of(ractor) }
    end

    private

    # Applies +consumers+ to +enumerator+ in order, feeding each stage's
    # output into the next. Returns +enumerator+ itself when there are no
    # stages.
    def pipeline_call(enumerator, consumers)
      consumers.inject(enumerator) do |stage_input, consumer|
        case consumer.message
        when :map
          parallel_map(stage_input, consumer)
        when :filter
          parallel_filter(stage_input, consumer)
        else
          # Fail loudly here rather than with a NoMethodError on nil later.
          raise ArgumentError, "unknown pipeline stage: #{consumer.message.inspect}"
        end
      end
    end

    # Spawns one Ractor per item and returns the block results in order.
    def parallel_map(enumerator, consumer)
      enumerator
        .map { |item| Ractor.new(item, &consumer.processor) }
        .map { |ractor| result_of(ractor) }
    end

    # Spawns one Ractor per item and returns the items whose block result was
    # truthy. select + map (not filter_map) so nil/false items survive.
    def parallel_filter(enumerator, consumer)
      enumerator
        .map { |item| [item, Ractor.new(item, &consumer.processor)] }
        .select { |_item, ractor| result_of(ractor) }
        .map(&:first)
    end

    # Waits for +ractor+ and returns its block's result. Newer Rubies
    # replaced Ractor#take with Ractor#value, so use whichever exists.
    def result_of(ractor)
      ractor.respond_to?(:value) ? ractor.value : ractor.take
    end
  end
end
# Mixin that adds an entry point for building a Ractor-backed parallel
# pipeline from the including object.
module ParallelStream
  # Error class namespaced under this module.
  class Error < StandardError; end

  # Wraps the receiver in an Enumerator::RactorParallelStream.
  # A block may still be passed for compatibility, but it is not used.
  #
  # @return [Enumerator::RactorParallelStream] stream whose source is +self+
  def parallel_stream
    Enumerator::RactorParallelStream.new(self)
  end
  alias_method :parallel, :parallel_stream
end
# Core extension: gives every Enumerator the #parallel_stream / #parallel
# entry points defined by ParallelStream.
class Enumerator
  include ParallelStream
end
# Core extension: gives every Array the #parallel_stream / #parallel
# entry points defined by ParallelStream.
class Array
  include ParallelStream
end
# Doubly recursive function used below purely as a CPU-bound workload.
#
# @param x [Integer]
# @param y [Integer]
# @param z [Integer]
# @return [Integer] +y+ when x <= y, otherwise the result of recursing on the
#   three rotated, decremented argument triples
def tarai(x, y, z)
  return y if x <= y

  tarai(
    tarai(x - 1, y, z),
    tarai(y - 1, z, x),
    tarai(z - 1, x, y)
  )
end
require 'benchmark'

# Compares three ways of running the same tarai(14, 7, 0) workload four times.
Benchmark.bm do |x|
  # sequential version
  x.report('seq') { 4.times { tarai(14, 7, 0) } }

  # parallel version: one hand-written Ractor per run
  x.report('par') do
    4.times
     .map { Ractor.new { tarai(14, 7, 0) } }
     # Newer Rubies replaced Ractor#take with Ractor#value; use whichever exists.
     .each { |ractor| ractor.respond_to?(:value) ? ractor.value : ractor.take }
  end

  # stream version: the same work expressed through the parallel stream API
  x.report('str') { 4.times.parallel.map { tarai(14, 7, 0) }.each {} }
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment