Sample partitioned input from a time series DB
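# Setup sketch (added, not in the original gist): the classes below
# reference a few names that must come from elsewhere. `PartInput` is
# assumed to be Bytewax's partitioned input base class (the import path
# below is an assumption; check your Bytewax version), and the three
# constants are deployment-specific placeholders.
import uuid

import btrdb  # btrdb-python client

from bytewax.inputs import PartInput  # assumed import path

BTRDB_ENDPOINT = "localhost:4410"  # placeholder; point at your BTrDB server
SELECTOR_QUERY = "SELECT uuid FROM streams"  # placeholder metadata query
SELECTOR_COLS = ["uuid"]  # placeholder columns to pull UUIDs from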
class BTRDBUUIDInput(PartInput):
    def __init__(self, start_ts_ns, end_ts_ns, batch_size_ns):
        self.start_ts_ns = start_ts_ns
        self.end_ts_ns = end_ts_ns
        self.batch_size_ns = batch_size_ns

    def list_parts(self):
        # Each UUID is its own partition. The resume state is the time
        # offset in the stream for that UUID.
        conn = btrdb.connect(BTRDB_ENDPOINT)
        rows = conn.query(SELECTOR_QUERY)
        uuids = set()
        for row in rows:
            for col in SELECTOR_COLS:
                uuids.add(str(row[col]))
        # I'm not totally sure this is right here, but the goal is to
        # get a flat set of all stream UUIDs to use in the
        # builder. Bytewax will divvy them up.
        #
        # This idea will only work if the set of UUIDs isn't "huge"
        # and is fixed across the execution of the entire dataflow,
        # even across restarts. Which from your comment it sounds like
        # it is.
        return uuids

    def build_part(self, for_uuid, resume_start_ts_ns):
        conn = btrdb.connect(BTRDB_ENDPOINT)
        stream = conn.stream_from_uuid(uuid.UUID(for_uuid))
        cursor_ts_ns = resume_start_ts_ns or self.start_ts_ns
        while cursor_ts_ns < self.end_ts_ns:
            batch_end_ts_ns = cursor_ts_ns + self.batch_size_ns
            # This yields out entire data batches as the single item.
            # Can map into a data frame later. Unless it's not
            # pickleable, then do it here.
            batch = stream.values(cursor_ts_ns, batch_end_ts_ns)
            # Since we're storing the cursor position in time, this
            # gives us correct rewind semantics.
            yield batch_end_ts_ns, batch
            cursor_ts_ns = batch_end_ts_ns
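
# Usage sketch (an assumption, not part of the original gist): wiring
# the UUID-partitioned input into a dataflow, assuming a Bytewax-style
# `Dataflow.input(step_id, input)` API. Reading one day of data in
# one-hour batches per stream.
from bytewax.dataflow import Dataflow

HOUR_NS = 60 * 60 * 1_000_000_000

flow = Dataflow()
flow.input(
    "btrdb",
    BTRDBUUIDInput(
        start_ts_ns=0,
        end_ts_ns=24 * HOUR_NS,
        batch_size_ns=HOUR_NS,
    ),
)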
class BTRDBTimeInput(PartInput):
    def __init__(self, start_ts_ns, end_ts_ns, batch_size_ns):
        self.start_ts_ns = start_ts_ns
        self.end_ts_ns = end_ts_ns
        self.batch_size_ns = batch_size_ns

    def list_parts(self):
        # The beginning of each time batch is its own partition. The
        # resume state is the index of the UUID we're currently reading
        # that time batch for.
        parts = set()
        cursor_ts_ns = self.start_ts_ns
        while cursor_ts_ns < self.end_ts_ns:
            parts.add(str(cursor_ts_ns))
            cursor_ts_ns += self.batch_size_ns
        return parts

    def build_part(self, cursor_ts_ns, resume_i):
        cursor_ts_ns = int(cursor_ts_ns)
        resume_i = resume_i or 0
        conn = btrdb.connect(BTRDB_ENDPOINT)
        # Re-query the full set of stream UUIDs; resuming is only
        # correct if the sorted list is stable across restarts.
        rows = conn.query(SELECTOR_QUERY)
        uuids = []
        for row in rows:
            for col in SELECTOR_COLS:
                uuids.append(row[col])
        uuids.sort()
        for i, stream_uuid in enumerate(uuids[resume_i:], start=resume_i):
            stream = conn.stream_from_uuid(stream_uuid)
            batch_end_ts_ns = cursor_ts_ns + self.batch_size_ns
            # This yields out entire data batches as the single item.
            # Can map into a data frame later. Unless it's not
            # pickleable, then do it here.
            batch = stream.values(cursor_ts_ns, batch_end_ts_ns)
            # Since we're storing how far we are in the sorted list of
            # all the UUIDs, we can resume from that point.
            yield i + 1, batch
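
# Design note (added commentary): BTRDBUUIDInput parallelizes across
# streams and keeps a per-stream time cursor, so it suits a modest,
# fixed set of UUIDs read over a long time range. BTRDBTimeInput
# parallelizes across time batches instead, at the cost of re-running
# SELECTOR_QUERY in every partition builder. The two classes share a
# constructor, so swapping them in the usage sketch above is a
# one-line change:
#
#     flow.input("btrdb", BTRDBTimeInput(0, 24 * HOUR_NS, HOUR_NS))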