-
-
Save bogdanRada/e59a9747da7a63320687e7dc69b4eb70 to your computer and use it in GitHub Desktop.
Experimenting with forking and unix sockets in Ruby
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env ruby | |
require 'base64' | |
require 'socket' | |
require 'fileutils' | |
require 'securerandom' | |
# UnixSocketForker is an experiment of inter-process communication using | |
# plain unix sockets to communicate between forked processes and the | |
# parent process. This can also be done via IO.pipe. In this experiment, | |
# the jobs are simply random arrays whose sums are calculated in the forked | |
# worker processes. | |
class UnixSocketForker | |
UNIX_SOCKET_FILE = '/tmp/sock' | |
NUM_PROCESSES = 5 | |
def initialize | |
@jobs = [] | |
@job_mutex = Mutex.new | |
start_server | |
spawn_workers | |
end | |
def submit(job) | |
@job_mutex.synchronize do | |
@jobs << job | |
end | |
end | |
private | |
def spawn_workers | |
pids = NUM_PROCESSES.times.map do | |
fork do | |
socket = UNIXSocket.new(UNIX_SOCKET_FILE) | |
while job = socket.gets | |
ary = decode_job(job.chomp) | |
socket.puts("Process #{$$} executed sum: #{ary.inject(:+)}") | |
end | |
socket.close | |
end | |
end | |
at_exit { pids.each { |pid| Process.wait(pid) } } | |
end | |
def start_server | |
FileUtils.rm(UNIX_SOCKET_FILE) if File.exists?(UNIX_SOCKET_FILE) | |
@server = UNIXServer.new(UNIX_SOCKET_FILE) | |
@read_sockets = [@server] | |
@write_sockets = [] | |
Thread.new do | |
loop do | |
readables, writeables, _ = IO.select(@read_sockets, @write_sockets) | |
handle_readables(readables) | |
handle_writeables(writeables) | |
end | |
end | |
end | |
def handle_readables(sockets) | |
sockets.each do |socket| | |
if socket == @server | |
conn = socket.accept | |
@read_sockets << conn | |
@write_sockets << conn | |
else | |
puts socket.gets | |
end | |
end | |
end | |
def handle_writeables(sockets) | |
sockets.each do |socket| | |
@job_mutex.synchronize do | |
unless @jobs.empty? | |
socket.puts(encode_job(@jobs.shift)) | |
end | |
end | |
end | |
end | |
def encode_job(job) | |
# remove silly newlines injected by Ruby's base64 library | |
Base64.encode64(Marshal.dump(job)).delete("\n") | |
end | |
def decode_job(job) | |
Marshal.load(Base64.decode64(job)) | |
end | |
end | |
if $0 == __FILE__ | |
# Create forker and submit jobs. | |
forker = UnixSocketForker.new | |
100.times do | |
job = (1..rand(1_000_000)).to_a | |
forker.submit(job) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment