Created
August 18, 2012 16:29
-
-
Save therealadam/3388161 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Simple Queue | |
# ------------ | |
# | |
# A riff on bitly's HTTP-based simple queue[1] | |
# | |
# [1] https://github.com/bitly/simplehttp/blob/master/simplequeue/simplequeue.c | |
# | |
# Is it any good? | |
# =============== | |
# | |
# As an exercise, yes! | |
# | |
# Usage | |
# ===== | |
# | |
# ``` | |
# $ ruby simplequeue.rb | |
# $ nc localhost 4242 | |
# PUT 123 | |
# OK | |
# PUT 456 | |
# OK | |
# GET | |
# OK 123 | |
# | |
# GET | |
# OK 456 | |
# | |
# STATS | |
# | |
# puts: 2 | |
# gets: 2 | |
# OK | |
# ``` | |
# | |
# What does it do? | |
# ================ | |
# | |
# - Enqueue via `PUT <whatever>` | |
# - Dequeue via `GET` | |
# - Enqueue multiple via `MPUT <separator> <whatever> <sep> <whatever> ...` | |
# (TODO) | |
# - Dequeue multiple via `MGET <num> <separator>` (TODO) | |
# - Dump the entire queue via `DUMP` (TODO) | |
# - Exit on `EXIT` (TODO) | |
# - Fetch stats via `STATS` | |
# - Tracks put/get operations | |
# - Tracks queue depth and high water mark (TODO) | |
# - Tracks queue size in bytes (TODO) | |
# - Overflows the in-memory queue to an on-disk log (TODO) | |
# - Tracks overflow size (TODO) | |
# - Flush overflow log on SIGHUP (TODO) | |
# | |
# What's a good song to listen to right now? | |
# ========================================== | |
# | |
# "Good Vibrations". Did you realize it's the same song, structurally and | |
# thematically, as "Whole Lotta Love"? But the former is way better! | |
# | |
require 'celluloid/io' | |
class QueueStats | |
include Celluloid | |
def initialize | |
@gets = 0 | |
@puts = 0 | |
end | |
def incr_gets | |
@gets += 1 | |
end | |
def incr_puts | |
@puts += 1 | |
end | |
def dump | |
%{ | |
puts: #{@puts} | |
gets: #{@gets} | |
} | |
end | |
end | |
class SimpleQueue | |
include Celluloid::IO | |
def initialize(queue, host, port) | |
puts "*** Starting queue server on #{host}:#{port}" | |
@queue = queue | |
@stats = QueueStats.new | |
@server = TCPServer.new(host, port) | |
run! | |
end | |
def finalize | |
@server.close if @server | |
end | |
def run | |
loop { handle_connection!(@server.accept) } | |
end | |
def handle_connection(socket) | |
_, port, host = socket.peeraddr | |
puts "*** Received connection from #{host}:#{port}" | |
loop do | |
message = socket.readpartial(4096) | |
puts "*** Message: #{message}" | |
case message | |
when /^GET/ | |
puts "*** GET" | |
obj = @queue.pop | |
socket.write("OK #{obj}\n") | |
@stats.incr_gets! | |
when /^PUT/ | |
body = message[4..-1] | |
puts "*** PUT #{body}" | |
@queue.push(body) | |
socket.write("OK\n") | |
@stats.incr_puts! | |
when /^STATS/ | |
puts "*** STATS" | |
socket.write(@stats.dump) | |
puts @stats.dump | |
socket.write("OK\n") | |
else | |
puts "*** Unknown operation!" | |
end | |
end | |
rescue EOFError | |
puts "*** #{host}:#{port} disconnected" | |
end | |
end | |
SimpleQueue.new(Queue.new, '0.0.0.0', 4242).join |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment