@davidselassie · Created March 7, 2023 01:01
Sample partitioned input from a time series DB (BTrDB). Two variants: one partitions by stream UUID and resumes from a time offset; the other partitions by time batch and resumes from an index into the sorted UUID list.
import btrdb

# `PartInput` comes from bytewax; the exact import path may depend on
# your bytewax version.
from bytewax.inputs import PartInput
from uuid import UUID

# Assumed to be defined elsewhere: the BTrDB endpoint, a metadata query
# selecting the streams of interest, and the columns of that query that
# hold stream UUIDs.
BTRDB_ENDPOINT = "..."
SELECTOR_QUERY = "..."
SELECTOR_COLS = ["..."]


class BTRDBUUIDInput(PartInput):
    def __init__(self, start_ts_ns, end_ts_ns, batch_size_ns):
        self.start_ts_ns = start_ts_ns
        self.end_ts_ns = end_ts_ns
        self.batch_size_ns = batch_size_ns

    def list_parts(self):
        # Each UUID is its own partition. The resume state is the time
        # offset in the stream for that UUID.
        conn = btrdb.connect(BTRDB_ENDPOINT)
        rows = conn.query(SELECTOR_QUERY)
        uuids = set()
        for row in rows:
            for col in SELECTOR_COLS:
                uuids.add(str(row[col]))
        # I'm not totally sure this is right here, but the goal is to
        # get a flat set of all stream UUIDs to use in the builder.
        # Bytewax will divvy them up.
        #
        # This idea will only work if the set of UUIDs isn't "huge" and
        # is fixed across the execution of the entire dataflow, even
        # across restarts. Which from your comment it sounds like it is.
        return uuids

    def build_part(self, for_uuid, resume_start_ts_ns):
        conn = btrdb.connect(BTRDB_ENDPOINT)
        stream = conn.stream_from_uuid(UUID(for_uuid))
        cursor_ts_ns = resume_start_ts_ns or self.start_ts_ns
        while cursor_ts_ns < self.end_ts_ns:
            batch_end_ts_ns = cursor_ts_ns + self.batch_size_ns
            # This yields out each entire data batch as a single item.
            # It can be mapped into a data frame later, unless it's not
            # pickleable, in which case do it here.
            batch = stream.values(cursor_ts_ns, batch_end_ts_ns)
            # Since we're storing the cursor position in time, this
            # gives us correct rewind semantics.
            yield batch_end_ts_ns, batch
            cursor_ts_ns = batch_end_ts_ns
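
# A minimal sketch of hooking the UUID-partitioned input into a
# dataflow, assuming a bytewax-style `Dataflow.input(step_id, source)`
# operator (the exact name and signature vary across bytewax versions).
# The timestamps and batch size below are illustrative only.
#
# from bytewax.dataflow import Dataflow
#
# flow = Dataflow()
# flow.input(
#     "btrdb_by_uuid",
#     BTRDBUUIDInput(
#         start_ts_ns=1_672_531_200_000_000_000,  # 2023-01-01T00:00:00Z
#         end_ts_ns=1_672_534_800_000_000_000,  # one hour later
#         batch_size_ns=60_000_000_000,  # one minute of points per batch
#     ),
# )
#
# Each emitted item is one whole batch of points for one stream, so a
# downstream `flow.map(...)` step can turn each batch into a data frame.
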
class BTRDBTimeInput(PartInput):
    def __init__(self, start_ts_ns, end_ts_ns, batch_size_ns):
        self.start_ts_ns = start_ts_ns
        self.end_ts_ns = end_ts_ns
        self.batch_size_ns = batch_size_ns

    def list_parts(self):
        # The beginning of each time batch is its own partition. The
        # resume state is the index of the UUID we're currently reading
        # that time batch for.
        parts = set()
        cursor_ts_ns = self.start_ts_ns
        while cursor_ts_ns < self.end_ts_ns:
            parts.add(str(cursor_ts_ns))
            cursor_ts_ns += self.batch_size_ns
        return parts

    def build_part(self, cursor_ts_ns, resume_i):
        cursor_ts_ns = int(cursor_ts_ns)
        conn = btrdb.connect(BTRDB_ENDPOINT)
        # Re-derive the full, sorted list of stream UUIDs so that the
        # resume index is stable across restarts.
        rows = conn.query(SELECTOR_QUERY)
        uuids = []
        for row in rows:
            for col in SELECTOR_COLS:
                uuids.append(row[col])
        uuids.sort()
        resume_i = resume_i or 0
        batch_end_ts_ns = cursor_ts_ns + self.batch_size_ns
        # Start enumerating at the resume index so the yielded resume
        # state is always an index into the full sorted list.
        for i, uuid in enumerate(uuids[resume_i:], start=resume_i):
            stream = conn.stream_from_uuid(uuid)
            # This yields out each entire data batch as a single item.
            # It can be mapped into a data frame later, unless it's not
            # pickleable, in which case do it here.
            batch = stream.values(cursor_ts_ns, batch_end_ts_ns)
            # Since we're storing how far we are in the sorted list of
            # all the UUIDs, we can resume from that point.
            yield i + 1, batch
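
# A quick sanity check of the time-batch partition naming: list_parts
# for this class only does timestamp arithmetic, so it can be exercised
# without a BTrDB connection. The values here are illustrative only.
if __name__ == "__main__":
    inp = BTRDBTimeInput(
        start_ts_ns=0,
        end_ts_ns=3_000_000_000,
        batch_size_ns=1_000_000_000,
    )
    # One partition per one-second batch, named by its start timestamp.
    assert inp.list_parts() == {"0", "1000000000", "2000000000"}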