Skip to content

Instantly share code, notes, and snippets.

@inkel
Last active August 29, 2015 13:57
Show Gist options
  • Save inkel/9559042 to your computer and use it in GitHub Desktop.
Save inkel/9559042 to your computer and use it in GitHub Desktop.
#! /usr/bin/env ruby
require "redic"
require "clap"
require "pry"
$verbose = 0
$timeout = 10_000_000
action, *args = Clap.run ARGV, {
"-v" => -> { $verbose += 1 },
"-t" => ->(ms) { $timeout = ms }
}
abort "Usage: #{$0} <action> ip:port [...]" if action.nil? or args.nil? or args.empty?
module UI
def err msg
$stderr.puts msg
end
def abort msg, backtrace=[]
# err "\033[1;31m#{msg}\033[0m"
err msg
backtrace.each { |line| err line } if $verbose > 1
exit 1
end
def info *args
$stdout.puts args.join(" ")
end
def log *args
$stdout.puts args.join(" ") if $verbose > 0
end
def debug *args
$stdout.puts args.join(" ") if $verbose > 1
end
extend self
end
class Node
[:id, :ip_port, :flags, :master_id, :ping, :pong, :config, :state, :slots].each do |a|
attr a
end
def initialize(ip_port)
@ip_port = ip_port
load_info!
end
def cluster_enabled?
call("INFO", "cluster").include?("cluster_enabled:1")
end
def only_node?
call("CLUSTER", "INFO").include?("cluster_known_nodes:1")
end
def empty?
call("INFO", "keyspace").strip == "# Keyspace"
end
def load_info!
call("CLUSTER", "NODES").split("\n").each do |line|
parts = line.split
next unless parts[2].include?("myself")
set_info!(*parts)
end
end
def set_info!(id, ip_port, flags, master_id, ping, pong, config, state, *slots)
@id = id
@flags = flags.split(",")
@master_id = master_id
@ping = ping
@pong = pong
@config = config
@state = state
@slots = slots
@ip_port = ip_port unless flags.include?("myself")
end
def client
@client ||= Redic.new("redis://#{@ip_port}")
end
def call(*args)
UI.debug ">", *args
client.call(*args)
end
def dead?
%w{ disconnected fail noaddr }.any? do |flag|
flags.include?(flag)
end
end
def alive?
p [ip_port, flags, state]
!dead?
end
def to_s
"#{@id} [#{@ip_port}]"
end
end
class Cluster
SLOTS = 16384
def initialize(addrs)
@addrs = Array(addrs)
end
def nodes
@nodes ||= @addrs.map { |addr| Node.new(addr) }
end
def allocate_slots(node, slots)
UI.log "Allocating #{slots.size} slots (#{slots.first}..#{slots.last}) in node #{node}"
UI.debug "> CLUSTER ADDSLOTS #{slots.first}..#{slots.last}"
res = node.client.call("CLUSTER", "ADDSLOTS", *slots)
UI.abort res.message if res.is_a?(RuntimeError)
end
def add_node(node)
default = nodes.first
UI.log "Joining node #{node} to node #{default}"
ip, port = node.ip_port.split(":")
res = default.call("CLUSTER", "MEET", ip, port)
UI.abort res.message if res.is_a?(RuntimeError)
end
def remove_node(node)
default = nodes.first
UI.log "Removing node #{node} from cluster"
res = default.call("CLUSTER", "FORGET", node.id)
UI.abort res.message if res.is_a?(RuntimeError)
end
def call(*args)
nodes.sample.call("CLUSTER", "NODES").split("\n").each do |line|
_, ip_port, _ = line.split
node = Node.new(ip_port)
UI.info "#{node}: #{args.join(' ')}"
res = node.call(*args)
UI.info res
UI.info "--"
end
end
def create!
nodes.each do |node|
raise ArgumentError, "Redis Server at #{node.ip_port} not running in cluster mode" unless node.cluster_enabled?
raise ArgumentError, "Redis Server at #{node.ip_port} already exists in a cluster" unless node.only_node?
raise ArgumentError, "Redis Server at #{node.ip_port} is not empty" unless node.empty?
end
UI.log "Allocating #{SLOTS} slots in #{nodes.length} nodes"
available_slots = 0.upto(SLOTS - 1).each_slice((SLOTS.to_f / nodes.length).ceil)
nodes.each do |node|
slots = available_slots.next.to_a
allocate_slots(node, slots)
end
nodes.each { |node| add_node node }
end
def info
{}.tap do |info|
nodes.sample.call("CLUSTER", "INFO").split.each do |line|
stat, val = line.split(":")
info[stat.to_sym] = if stat == "cluster_state"
val
else
val.to_i
end
end
end
end
end
begin
case action
when "create"
cluster = Cluster.new(args)
cluster.create!
when "add"
cluster = Cluster.new(args.shift)
args.each do |addr|
cluster.add_node(Node.new(addr))
end
when "remove"
cluster = Cluster.new(args.shift)
args.each do |addr|
cluster.remove_node(Node.new(addr))
end
when "call"
cluster = Cluster.new(args.shift)
cluster.call(*args)
end
rescue => ex
UI.abort ex.message, ex.backtrace
end
#! /usr/bin/env bash
for port in $*
do
./redis-server --cluster-enabled yes \
--port $port \
--cluster-config-file nodes.$port.conf \
--logfile /dev/null \
--daemonize yes
done
watch "redis-cli -p $1 ping > /dev/null && redis-cli -p $1 cluster info && redis-cli -p $1 cluster nodes"
kill $(pgrep -f ./redis-server)
for port in $*; do rm nodes.$port.conf; done
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment