Skip to content

Instantly share code, notes, and snippets.

@richzw
Created September 4, 2019 12:10
Show Gist options
  • Save richzw/6ad504646bbf461968df6a7fbfbda49f to your computer and use it in GitHub Desktop.
Save richzw/6ad504646bbf461968df6a7fbfbda49f to your computer and use it in GitHub Desktop.
class RedisSink(object):
def __init__(self, host='localhost', port=6379, password=None, field=None):
self._host = host
self._port = port
self._field = field
self._password = password
self._client = None
self._pool = None
def _connect(self):
self._pool = redis.ConnectionPool(host=self._host, port=self._port, password=self._password, decode_responses=True, max_connections=2)
self._client = redis.Redis(connection_pool=self._pool)
def write(self, dstype, values):
if dstype == REDIS_HSET:
self.write_hset(values)
elif dstype == REDIS_SET:
self.write_set(values)
else:
logging.ERROR("Redis data structure operation is not implemented yet!!!")
def write_hset(self, obj):
if self._client is None:
self._connect()
for k, v in obj.iteritems():
self._client.hmset('home_' + k + '_status', dict(zip(self._field, v)))
def write_set(self, obj):
if self._client is None:
self._connect()
for k, v in obj.iteritems():
self._client.sadd('home_' + k + '_logindates', *v)
def __enter__(self):
if self._client is None:
self._connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._pool is not None:
self._pool.disconnect()
class WriteToRedis(beam.PTransform):
def __init__(self, host='localhost', port=6379, dstype=None, field=None, password=None, batch_size=100):
self._host = host
self._port = port
self._field = field
self._dstype = dstype
self._password = password
self._batch_size = batch_size
def expand(self, pcoll):
logging.info("Write to redis...")
return pcoll \
| beam.ParDo(WriteRedisFn(self._host, self._port, self._field, self._dstype, self._password, self._batch_size))
class WriteRedisFn(beam.DoFn):
def __init__(self, host='localhost', port=6379, field=None, dstype=None, password=None, batch_size=100):
self._host = host
self._port = port
self._field = field
self._dstype = dstype
self._password = password
self._batch_size = batch_size
self._batch = {}
def finish_bundle(self):
self._flush()
def process(self, element):
self._batch[element['field']] = element['value']
if len(self._batch) >= self._batch_size:
self._flush()
def _flush(self):
if len(self._batch) == 0:
return
with RedisSink(self._host, self._port, self._password, self._field) as sink:
sink.write(self._dstype, self._batch)
self._batch = {}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment