Last active: June 24, 2024 10:29
Diving deeper and experimenting with concurrency and parallelism in Ruby for I/O-bound operations.
#!/usr/bin/ruby
# copyright - 2024.03.12 motebaya
require "async"
require "async/barrier"
require "async/http/internet"
require "benchmark"
require "httparty"
require "concurrent"
require "parallel"

# based on https://www.linkedin.com/advice/0/whats-difference-between-concurrent-parallel-programming
# - async -> concurrency != parallelism -> same event loop -> single thread.
#   best for I/O-bound operations.
# - sync -> parallelism == concurrency -> no shared event loop -> different threads run independently.
#   best for CPU-bound operations.
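# To make the I/O-bound case above concrete, here is a minimal stdlib-only sketch
# (no async gem; the method name and the 0.5s delay are illustrative, not part of
# the original script): overlapping sleeps -- a stand-in for slow I/O -- finish in
# roughly the time of one sleep, while sequential sleeps add up.
require "benchmark" # already required above; require is idempotent
def io_overlap_sketch
  sequential = Benchmark.realtime do
    2.times { sleep 0.5 } # each sleep blocks the next -> ~1.0s total
  end
  threaded = Benchmark.realtime do
    # the sleeps overlap across threads -> ~0.5s total
    2.times.map { Thread.new { sleep 0.5 } }.each(&:join)
  end
  [sequential, threaded]
end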
def get_http(index, client, queue)
  request = client.get("https://httpbin.org/delay/1.6")
  queue.push(" request ok: #{index}")
  # the response must be closed after each request,
  # otherwise async-http logs a drain warning.
  request&.close
end
# w_pipe (write pipe) is used for IPC when called from a forked process.
# https://github.com/jnunemaker/httparty
def sync_http(index, w_pipe = nil)
  HTTParty.get("https://httpbin.org/delay/1.6")
  if w_pipe.nil?
    " sync request ok: #{index}"
  else
    w_pipe.puts(" sync fork request ok: #{index}")
  end
end
# At first glance there seems to be little difference between using `async-http`
# (async) and `httparty` (sync) here.
# https://socketry.github.io/async/guides/asynchronous-tasks/index.html
# https://socketry.github.io/async-http/#multiple-requests
# using `HTTParty`:
# - the function is called asynchronously, but the HTTP request itself is synchronous.
#   that means each request blocks its task and must complete before the next one starts.
# using `async-http`:
# - each HTTP request is non-blocking and can overlap in execution with other tasks.
#   and even though the requests are made async, the barrier ensures the program
#   waits for all requests to complete before the next step.
# The difference isn't dramatic here, but overall async should be more efficient
# for I/O-bound work.
def async_test
  Async do
    results = Concurrent::Array.new
    client = Async::HTTP::Internet.new
    barrier = Async::Barrier.new
    3.times do |i|
      barrier.async do
        # get_http pushes its own result into `results`, so don't push the
        # return value as well (that would append garbage to the array).
        get_http(i + 1, client, results)
      end
    end
    barrier.wait
    puts results.join
  ensure
    client&.close
  end
end
# based on the module example:
# - https://github.com/socketry/async-http?tab=readme-ov-file#multiple-requests
# we can make multiple requests using a barrier.
# it's like `Promise.all` in javascript: it waits for all tasks and ensures
# they are finished before continuing.
# - https://thoughtbot.com/blog/my-adventure-with-async-ruby
#
# but, based on this article, we can also use `async` directly without a barrier.
# both work and run at the same time, so where's the difference? LOL
# - https://brunosutic.com/blog/async-ruby#async-http
#
# well, the difference when using `Async` directly is:
# - the program doesn't wait for all tasks to complete before proceeding to the next step.
# - each task is scheduled to run asynchronously, and the program continues to the
#   next step without waiting for the completion of all tasks.
def async_another_test
  Async do
    results = Concurrent::Array.new
    barrier = Async::Barrier.new
    3.times do |i|
      barrier.async do
        results << sync_http(i + 1)
      end
    end
    barrier.wait
    # note: a `puts "hello world"` placed here would print immediately when using
    # `task.async` directly, whereas with a barrier it only prints after all
    # tasks are completed.
    puts results.join
  end
end
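# The barrier-vs-no-wait behavior described above can be sketched with plain
# threads (a rough analogy, not the async gem; the method name is illustrative):
# `join` plays the role of `barrier.wait`, and anything logged before the joins
# runs while the tasks are still in flight, like the "hello world" note above.
def barrier_analogy_sketch
  log = Queue.new
  threads = 3.times.map do |i|
    Thread.new do
      sleep 0.1
      log << "task #{i} done"
    end
  end
  log << "hello world (no wait)" # runs before the tasks complete
  threads.each(&:join)           # like `barrier.wait`: block until all finish
  log << "all tasks finished"
  events = []
  events << log.pop until log.empty?
  events
end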
# a normal function call in a loop: no concurrency, no parallelism.
def sync_normal_test
  results = Concurrent::Array.new
  3.times do |i|
    results << sync_http(i + 1)
  end
  puts results.join
end
# https://docs.ruby-lang.org/en/3.0/Thread.html
def sync_thread_test
  threads = Concurrent::Array.new
  3.times do |i|
    threads << Thread.new do
      sync_http(i + 1)
    end
  end
  puts threads.map(&:value)
end
# `fork` is a core method; per the official docs it creates a child process that
# runs in true parallelism. each process gets its own PID (process identifier)
# and its own memory space. that last part is the tricky bit: since the processes
# are independent of each other, they need IPC (inter-process communication) to
# share results/data. it's not as lightweight as `Thread`, which shares its
# memory space with the other threads and is faster to spawn.
# https://docs.ruby-lang.org/en/3.0/Process.html#method-c-fork
# https://docs.ruby-lang.org/en/3.0/Process.html#method-c-waitall
def sync_fork_test
  r_pipe, w_pipe = IO.pipe
  3.times do |i|
    fork do
      r_pipe.close
      sync_http(i + 1, w_pipe)
      exit!
    end
  end
  w_pipe.close # close the parent's write end so eof? can be reached
  results = Concurrent::Array.new
  results << r_pipe.gets.chomp until r_pipe.eof?
  Process.waitall
  puts results
end
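# The "own memory space" point above can be demonstrated directly (a minimal
# POSIX-only sketch since it uses fork; the method name is illustrative):
# the child's mutation of `counter` never reaches the parent, so the value
# has to travel back through the pipe.
def fork_isolation_sketch
  counter = 0
  r, w = IO.pipe
  pid = fork do
    r.close
    counter += 1   # mutates the child's copy only
    w.puts(counter)
    w.close
    exit!
  end
  w.close
  child_value = r.gets.to_i # the result arrives via IPC, not shared memory
  r.close
  Process.wait(pid)
  [child_value, counter]
end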
# https://github.com/ruby-concurrency/concurrent-ruby
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Future.html
def sync_concurrent_test
  results = Concurrent::Array.new
  3.times do |i|
    results << Concurrent::Promise.execute do
      sync_http(i + 1)
    end
  end
  puts results.map(&:value)
end
# Another method: the docs say `Future` is deprecated and replaced by `Promises`.
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/Promises.html
def sync_http_promise(index)
  Concurrent::Promises.future do
    HTTParty.get("https://httpbin.org/delay/1.6")
    " sync request ok: #{index}"
  end
end
def sync_concurrent_promise_test
  tasks = Concurrent::Array.new
  3.times do |i|
    tasks << sync_http_promise(i + 1)
  end
  # zip combines the futures into one that resolves when all of them do.
  tasks = Concurrent::Promises.zip(*tasks)
  puts tasks.value
end
# similar to python's `concurrent.futures.ThreadPoolExecutor`, and more
# configurable: we can set the min/max number of threads too.
# https://ruby-concurrency.github.io/concurrent-ruby/master/Concurrent/ThreadPoolExecutor.html
def sync_concurrent_threadpool_test
  executor = Concurrent::ThreadPoolExecutor.new(
    min_threads: 5,
    max_threads: 5,
    max_queue: 10,
    fallback_policy: :caller_runs
  )
  results = Concurrent::Array.new
  3.times do |i|
    executor.post do
      results << sync_http(i + 1)
    end
  end
  executor.shutdown
  executor.wait_for_termination
  puts results
end
# simple API for parallelism based on threads.
# https://github.com/grosser/parallel
def sync_parallel_test
  # Parallel.map already collects the block results, so there's no need
  # for a separate shared array.
  results = Parallel.map(1..3, in_threads: 3) do |i|
    sync_http(i)
  end
  puts results
end
# scan this file for the functions ending in "test" and benchmark each one
# in its own thread.
threads = []
File.foreach(File.expand_path(__FILE__)) do |line|
  reg = /^def\s+([^\s(]+)/.match(line)
  next if reg.nil? || !reg[1].end_with?("test")
  threads << Thread.new do
    puts " Running: #{reg[1].upcase}"
    benchmark = Benchmark.measure { send(reg[1]) }
    " Benchmark function: #{reg[1].upcase} finished at: #{benchmark.real}\n"
  end
end
result = threads.map(&:value)
puts result.join
# async_test
# async_another_test
# sync_normal_test
# sync_thread_test
# sync_fork_test
# sync_concurrent_test
# sync_concurrent_promise_test
# sync_concurrent_threadpool_test
# sync_parallel_test
average concurrency time: ~3 seconds
Having a powerful machine to run a thread pool across multiple cores is great, but for I/O-bound work like HTTP requests, it's useless if the network and the server are slow.