Created March 4, 2011 01:42
armstrjare/854007
Buffers pipelined Redis calls and flushes them at a configurable interval.
# Helper class for buffering pipelined redis calls
#
# Options:
#   - flush_at : The number of commands to keep in the buffer before flushing.
#                A value of 1 will flush after every call.
#                A value of 0 will *never* flush (must call flush! explicitly)
#
# Usage:
#   redis_pipeline = RedisPipelineBuffer.new(redis_client, :flush_at => 50)
#
#   redis_pipeline.hset(some_redis_key, some_field, some_value)
#   redis_pipeline.sadd(some_redis_key, some_value)
#   ... 50 times
#   # will auto flush to redis
#   ... some more times ...
#   redis_pipeline.flush! # to make sure everything in the buffer is flushed.
#
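class RedisPipelineBuffer
  # Class-wide defaults, merged with the options passed to #new.
  @@default_options = { :flush_at => 1000 }

  # Plain-Ruby class-level accessors (no ActiveSupport cattr_accessor needed).
  def self.default_options
    @@default_options
  end

  def self.default_options=(options)
    @@default_options = options
  end

  def initialize(redis, options = {})
    @redis = redis # Redis client instance to send commands to
    @queue = []    # Queue of unflushed commands (must be set per instance)
    @options = @@default_options.merge(options)
  end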
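
  # Flush the buffer
  # Returns: the number of commands sent through the pipeline
  def flush!
    len = @queue.length
    @redis.pipelined do |pipeline|
      # Newer redis-rb versions yield a pipeline object; older ones yield
      # nothing and queue the calls made on the client itself.
      target = pipeline || @redis
      @queue.each { |cmd| target.send(*cmd) }
    end unless len.zero?
    @queue.clear
    len
  end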
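
  # Catch any command sent to the buffer, and dispatch it to the Redis client only when
  # reaching the specified buffer limit
  def method_missing(*args)
    @queue << args
    flush_if_needed!
  end

  # Report a command as supported when the underlying client supports it.
  def respond_to_missing?(name, include_private = false)
    @redis.respond_to?(name, include_private) || super
  end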

  private

  def flush_if_needed!
    if @options[:flush_at] && @options[:flush_at] != 0
      flush! if @queue.length >= @options[:flush_at]
    end
  end
end
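
The buffering behaviour described in the header comment can be tried without a Redis server. The following is only a minimal sketch under stated assumptions: `FakeRedis` and `PipelineBufferSketch` are hypothetical names that do not appear in the gist, the fake client merely records commands per pipelined batch, and the sketch takes `flush_at` as a keyword argument rather than an options hash.

```ruby
# Hypothetical stand-in for a Redis client: it records commands in memory
# instead of sending them anywhere.
class FakeRedis
  attr_reader :batches

  def initialize
    @batches = [] # one array of recorded commands per pipelined call
    @current = nil
  end

  # Mimics the block form of a pipelined call by yielding itself.
  def pipelined
    @current = []
    yield self
    @batches << @current
  ensure
    @current = nil
  end

  def hset(*args)
    @current << [:hset, *args]
  end

  def sadd(*args)
    @current << [:sadd, *args]
  end
end

# Simplified, illustrative version of the buffering idea (not the gist's class).
class PipelineBufferSketch
  def initialize(redis, flush_at: 1000)
    @redis = redis
    @flush_at = flush_at
    @queue = []
  end

  # Sends every queued command in one pipelined batch and returns the count.
  def flush!
    len = @queue.length
    unless len.zero?
      @redis.pipelined do |pipeline|
        @queue.each { |cmd| pipeline.public_send(*cmd) }
      end
    end
    @queue.clear
    len
  end

  # Queues any unknown method call; flushes once flush_at commands are buffered.
  # A flush_at of 0 disables automatic flushing.
  def method_missing(name, *args)
    @queue << [name, *args]
    flush! if @flush_at.positive? && @queue.length >= @flush_at
  end

  def respond_to_missing?(name, include_private = false)
    @redis.respond_to?(name, include_private) || super
  end
end

fake = FakeRedis.new
buffer = PipelineBufferSketch.new(fake, flush_at: 3)

buffer.hset('user:1', 'name', 'Ada') # buffered
buffer.sadd('users', 'user:1')       # buffered
buffer.sadd('users', 'user:2')       # third command triggers an automatic flush
buffer.sadd('users', 'user:3')       # buffered again

flushed_automatically = fake.batches.length # batches sent so far
remaining = buffer.flush!                   # explicit flush of the leftover command
```

With `flush_at` set to 3, the first three commands should go out as one batch and the fourth should stay in the buffer until `flush!` is called explicitly, which is why the usage comment ends with a final `flush!`.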