Skip to content

Instantly share code, notes, and snippets.

@philsch
Last active February 1, 2019 15:40
Show Gist options
  • Save philsch/0f1946dd080a34bc5e365c27f924f365 to your computer and use it in GitHub Desktop.
Save philsch/0f1946dd080a34bc5e365c27f924f365 to your computer and use it in GitHub Desktop.
Blog post: How to update row keys in Google Cloud Bigtable (CellTransformDoFn)
class CellTransformDoFn(beam.DoFn):
    """Beam ``DoFn`` that rewrites each element's row key into a prefixed layout.

    The incoming key is split on ``_`` and is expected to look like
    ``<hash>_<date>_<time>_<ff>``. The emitted key is
    ``<prefix>_<date>_<time>_<ff>_<hash>``, where ``<prefix>`` is a number
    in 0-3 derived from the first character of ``<hash>``.
    """

    def __init__(self):
        super(CellTransformDoFn, self).__init__()
        # Counter named 'total' in this class's namespace; incremented once
        # per element handled by process().
        self.total_counter = Metrics.counter(self.__class__, 'total')

    def map_hash_to_new_prefix(self, old_hash):
        """Map the first character of a hex hash onto a prefix in 0-3.

        Our new row key should have a prefix 0 - 3, as 4 is our expected
        number of nodes in the Bigtable cluster.

        :param old_hash: hash string whose first character is a hex digit
            (either case).
        :return: integer value of that hex digit modulo 4.
        :raises IndexError: if ``old_hash`` is empty.
        :raises ValueError: if the first character is not a hex digit.
        """
        # 'a'..'f' are 10..15, so the hex value modulo 4 gives 2, 3, 0, 1, 2, 3
        # for the letters and digit % 4 for '0'..'9'.
        return int(old_hash[0], 16) % 4

    def create_row_key(self, cell):
        """Transform an existing row key to [0-3]_YYYYmmDD_HHMMSS_ff_<hash>.

        :param cell: mapping whose ``'key'`` entry holds the old row key,
            ``<hash>_<date>_<time>_<ff>``. Parts after the fourth are ignored.
        :return: the new row key string.
        :raises IndexError: if the old key has fewer than four ``_`` parts.
        :raises ValueError: if the hash does not start with a hex digit.
        """
        parts = cell.get('key').split('_')
        # Compute the prefix before touching the other parts so a bad hash is
        # reported ahead of a too-short key.
        prefix = self.map_hash_to_new_prefix(parts[0])
        return "{}_{}_{}_{}_{}".format(
            prefix,    # 0-3 prefix
            parts[1],  # date
            parts[2],  # time
            parts[3],  # sub-second part ("ff")
            parts[0],  # random hash
        )

    def process(self, element, *args, **kwargs):
        """Yield a copy of ``element`` whose ``'key'`` is the new row key.

        :param element: mapping with a ``'key'`` entry in the old layout.
        :return: generator yielding exactly one updated element.
        """
        import copy

        # Beam requires that a DoFn does not modify its input element, so the
        # new key is written to a shallow copy rather than to ``element``.
        updated = copy.copy(element)
        updated['key'] = self.create_row_key(element)
        self.total_counter.inc()
        yield updated
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment