Skip to content

Instantly share code, notes, and snippets.

@mocobeta
Created May 5, 2014 03:03
Show Gist options
  • Save mocobeta/ebb1e4512f637a19ca7a to your computer and use it in GitHub Desktop.
Save mocobeta/ebb1e4512f637a19ca7a to your computer and use it in GitHub Desktop.
Redis : SET のシャーディング
import binascii
import random
# Default for the expected total number of elements in one sharded set;
# used as the default ``total_elements`` argument of the shard_* helpers.
TOTAL_ELEMENTS = 2000
# Default for the maximum number of elements stored in a single shard;
# used as the default ``shard_size`` argument of the shard_* helpers.
SHARD_SIZE = 512
def shard_key(base, key, total_elements, shard_size):
    """Return the name of the shard (``'<base>:<shard_id>'``) for *key*.

    Based on listing 9-7 of the book "Redis Nyumon".

    Integer keys (an ``int`` or a string of decimal digits) are mapped by
    integer division with *shard_size*.  Any other string is hashed with
    CRC32 over its UTF-8 encoding and reduced modulo the shard count,
    which is ``2 * total_elements // shard_size``.

    Args:
        base: Prefix of the shard name.
        key: ``int`` or ``str`` identifying the element.
        total_elements: Expected total number of elements.
        shard_size: Maximum number of elements to store in one shard.

    Returns:
        The shard name as a string.

    Raises:
        ZeroDivisionError: If *shard_size* is 0, or if a non-numeric key is
            given while ``2 * total_elements // shard_size`` is 0.
    """
    # str.isdecimal() (unlike str.isdigit()) is true only for characters
    # that int() can parse.  For example superscript two (U+00B2) passes
    # isdigit() but makes int() raise ValueError; such keys must be hashed.
    if isinstance(key, int) or key.isdecimal():
        shard_id = int(str(key), 10) // shard_size
    else:
        shards = 2 * total_elements // shard_size
        shard_id = binascii.crc32(key.encode('utf-8')) % shards
    return '%s:%s' % (base, shard_id)
def shard_sadd(conn, base, member,
               total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SADD.

    Adapted from listing 9-10 of the book "Redis Nyumon", changed so that
    several members can be added in one call.

    Args:
        conn: Redis client object providing ``sadd`` and ``pipeline``.
        base: Base name of the sharded set.
        member: A single member, or a ``list`` / ``set`` / ``range`` of
            members to add.
        total_elements: Expected total number of elements.
        shard_size: Maximum number of elements to store in one shard.

    Returns:
        For a single member, the reply of ``conn.sadd``.  For a collection,
        the sum of the per-member SADD replies; 0 for an empty collection.
    """
    if isinstance(member, (list, set, range)):
        pipe = conn.pipeline()
        for m in member:
            # The 'x' prefix makes the key non-numeric, so shard_key always
            # takes its CRC32 branch.
            shard = shard_key(base, 'x' + str(m), total_elements, shard_size)
            pipe.sadd(shard, m)
        # Summing (instead of taking the last reply) reports every member
        # that was added and does not fail on an empty collection.
        return sum(pipe.execute())
    shard = shard_key(base, 'x' + str(member), total_elements, shard_size)
    return conn.sadd(shard, member)
def shard_srem(conn, base, value,
               total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SREM.

    Args:
        conn: Redis client object providing ``srem`` and ``pipeline``.
        base: Base name of the sharded set.
        value: A single member, or a ``list`` / ``set`` / ``range`` of
            members to remove.
        total_elements: Expected total number of elements.
        shard_size: Maximum number of elements to store in one shard.

    Returns:
        For a single member, the reply of ``conn.srem``.  For a collection,
        the sum of the per-member SREM replies; 0 for an empty collection.
    """
    if isinstance(value, (list, set, range)):
        pipe = conn.pipeline()
        for v in value:
            # The 'x' prefix makes the key non-numeric, so shard_key always
            # takes its CRC32 branch.
            shard = shard_key(base, 'x' + str(v), total_elements, shard_size)
            pipe.srem(shard, v)
        # Summing (instead of taking the last reply) reports every member
        # that was removed and does not fail on an empty collection.
        return sum(pipe.execute())
    shard = shard_key(base, 'x' + str(value), total_elements, shard_size)
    return conn.srem(shard, value)
def shard_sismember(conn, base, value,
                    total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SISMEMBER.

    Looks up the shard that *value* maps to and returns the reply of
    ``conn.sismember`` for that shard.
    """
    # Prefixing with 'x' yields a non-numeric key for shard_key.
    hashed_key = 'x' + str(value)
    target = shard_key(base, hashed_key, total_elements, shard_size)
    return conn.sismember(target, value)
def shard_scard(conn, base,
                total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SCARD.

    Issues one ``conn.scard`` per shard and returns the sum of the replies.
    """
    shard_count = 2 * total_elements // shard_size
    return sum(
        conn.scard('%s:%s' % (base, index))
        for index in range(shard_count)
    )
def shard_smove(conn, src, dest, member,
                total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SMOVE.

    Moves *member* from its shard under *src* to its shard under *dest*
    and returns the reply of ``conn.smove``.
    """
    # The same non-numeric key is used to locate the shard on both sides.
    hashed_key = 'x' + str(member)
    from_shard, to_shard = (
        shard_key(name, hashed_key, total_elements, shard_size)
        for name in (src, dest)
    )
    return conn.smove(from_shard, to_shard, member)
def shard_spop(conn, base,
               total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SPOP.

    Visits the shards in random order and pops from the first one that
    yields a member.  Probing only a single random shard would report an
    empty set whenever that particular shard is empty, even though other
    shards still hold members.

    Args:
        conn: Redis client object providing ``spop``.
        base: Base name of the sharded set.
        total_elements: Expected total number of elements.
        shard_size: Maximum number of elements to store in one shard.

    Returns:
        The popped member, or ``None`` when every shard replied ``None``
        (or when the computed shard count is 0).
    """
    shards = 2 * total_elements // shard_size
    # random.sample over the full range gives a random permutation of the
    # shard numbers, so each shard is tried at most once.
    for shard_num in random.sample(range(shards), shards):
        popped = conn.spop('%s:%s' % (base, shard_num))
        if popped is not None:
            return popped
    return None
def shard_smembers(conn, base,
                   total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SMEMBERS.

    Builds the list of all shard names for *base* and returns the reply of
    ``conn.sunion`` over them.
    """
    shard_count = 2 * total_elements // shard_size
    shard_names = []
    for index in range(shard_count):
        shard_names.append('%s:%s' % (base, index))
    return conn.sunion(shard_names)
def _store_common(conn, method, dest, keys, total_elements, shard_size):
    """Shared implementation of the sharded SxxxSTORE functions.

    For every shard number, queues ``method(dest_shard, *src_shards)`` on a
    pipeline, where the destination and all sources use the same shard
    number, then executes the pipeline.

    Args:
        conn: Redis client object providing ``pipeline``.
        method: Name of the pipeline method to call, e.g. ``'sinterstore'``.
        dest: Base name of the destination sharded set.
        keys: Iterable of base names of the source sharded sets.
        total_elements: Expected total number of elements.
        shard_size: Maximum number of elements to store in one shard.

    Returns:
        The sum of the per-shard replies; 0 when the computed shard count
        is 0.
    """
    shards = 2 * total_elements // shard_size
    pipe = conn.pipeline()
    store = getattr(pipe, method)
    for shard_num in range(shards):
        dest_shard = '%s:%s' % (dest, shard_num)
        src_shards = ['%s:%s' % (key, shard_num) for key in keys]
        store(dest_shard, *src_shards)
    # Summing (instead of taking the last reply) accounts for every shard
    # and does not fail when there are no shards at all.
    return sum(pipe.execute())
def shard_sinterstore(conn, dest, keys,
                      total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SINTERSTORE.

    Delegates to ``_store_common`` with the ``'sinterstore'`` method and
    returns its result.
    """
    return _store_common(
        conn=conn,
        method='sinterstore',
        dest=dest,
        keys=keys,
        total_elements=total_elements,
        shard_size=shard_size,
    )
def shard_sunionstore(conn, dest, keys,
                      total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SUNIONSTORE.

    Delegates to ``_store_common`` with the ``'sunionstore'`` method and
    returns its result.
    """
    return _store_common(
        conn=conn,
        method='sunionstore',
        dest=dest,
        keys=keys,
        total_elements=total_elements,
        shard_size=shard_size,
    )
def shard_sdiffstore(conn, dest, keys,
                     total_elements=TOTAL_ELEMENTS, shard_size=SHARD_SIZE):
    """Sharded version of SDIFFSTORE.

    Delegates to ``_store_common`` with the ``'sdiffstore'`` method and
    returns its result.
    """
    return _store_common(
        conn=conn,
        method='sdiffstore',
        dest=dest,
        keys=keys,
        total_elements=total_elements,
        shard_size=shard_size,
    )
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment