Last active
May 9, 2023 17:33
-
-
Save JEnoch/9c0690d6af83d94bbeeef8a26c38fa7a to your computer and use it in GitHub Desktop.
A Zenoh Python script that queries a time-series Storage (typically InfluxDB) for past publications and replay them
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# | |
# A Zenoh Python script that queries a time-series Storage (typically InfluxDB) | |
# for past publications and replay them | |
# | |
import sys | |
import time, datetime | |
import argparse | |
import json | |
import zenoh | |
from zenoh import config, QueryTarget | |
# --- Command line argument parsing --- --- --- --- --- --- | |
parser = argparse.ArgumentParser( | |
prog='z_replay', | |
description='zenoh replay example') | |
parser.add_argument('--mode', '-m', dest='mode', | |
choices=['peer', 'client'], | |
type=str, | |
help='The zenoh session mode.') | |
parser.add_argument('--connect', '-e', dest='connect', | |
metavar='ENDPOINT', | |
action='append', | |
type=str, | |
help='Endpoints to connect to.') | |
parser.add_argument('--listen', '-l', dest='listen', | |
metavar='ENDPOINT', | |
action='append', | |
type=str, | |
help='Endpoints to listen on.') | |
parser.add_argument('--selector', '-s', dest='selector', | |
default='demo/example/**', | |
type=str, | |
help='The selection of resources to query.') | |
parser.add_argument('--target', '-t', dest='target', | |
choices=['ALL', 'BEST_MATCHING', 'ALL_COMPLETE', 'NONE'], | |
default='BEST_MATCHING', | |
type=str, | |
help='The target queryables of the query.') | |
parser.add_argument('--config', '-c', dest='config', | |
metavar='FILE', | |
type=str, | |
help='A configuration file.') | |
parser.add_argument('--time-filter', dest='time_filter', | |
default='[now(-5m)..]', | |
type=str, | |
help='A time filter used for query (e.g.: "[now(-1h)..]")') | |
parser.add_argument('--replay-prefix', dest='replay_prefix', | |
default='replay/', | |
type=str, | |
help='A prefix added to each key expression when re-publishing') | |
parser.add_argument('--time-scale', dest='time_scale', | |
default=1.0, | |
type=float, | |
help='The time scale (i.e. multiplier of time interval between each re-publication)') | |
args = parser.parse_args() | |
conf = zenoh.Config.from_file( | |
args.config) if args.config is not None else zenoh.Config() | |
if args.mode is not None: | |
conf.insert_json5(zenoh.config.MODE_KEY, json.dumps(args.mode)) | |
if args.connect is not None: | |
conf.insert_json5(zenoh.config.CONNECT_KEY, json.dumps(args.connect)) | |
if args.listen is not None: | |
conf.insert_json5(zenoh.config.LISTEN_KEY, json.dumps(args.listen)) | |
selector = args.selector | |
target = { | |
'ALL': QueryTarget.ALL(), | |
'BEST_MATCHING': QueryTarget.BEST_MATCHING(), | |
'ALL_COMPLETE': QueryTarget.ALL_COMPLETE(), | |
}.get(args.target) | |
# Zenoh code --- --- --- --- --- --- --- --- --- --- --- | |
# initiate logging | |
zenoh.init_logger() | |
print("Opening session...") | |
session = zenoh.open(conf) | |
# Get past publications from a storage with the selector and the time filter | |
query = selector + "?_time=" + args.time_filter | |
print("Query on '{}'...".format(query)) | |
replies = session.get(query, zenoh.Queue(), target=target) | |
# Sort replies by timestamp | |
sorted_replies = sorted(replies.receiver, key=lambda reply: reply.ok.timestamp) | |
# If no reply, exit | |
if sorted_replies.__len__() == 0: | |
print("No publications found - nothing to replay.") | |
exit(0) | |
# Get first and last timestamps | |
first_ts = sorted_replies[0].ok.timestamp.seconds_since_unix_epoch | |
last_ts = sorted_replies[-1].ok.timestamp.seconds_since_unix_epoch | |
print("Replay {} publications made between {} and {} ".format( | |
sorted_replies.__len__(), | |
datetime.datetime.utcfromtimestamp(first_ts), | |
datetime.datetime.utcfromtimestamp(last_ts))) | |
print(" Initial duration: {:.1f} seconds => with time-scale={}, new duration: {:.1f} seconds".format( | |
last_ts - first_ts, | |
args.time_scale, | |
(last_ts - first_ts) * args.time_scale)) | |
# Iterate throught the replies, re-publishing each and sleeping between each | |
previous_ts = first_ts | |
for reply in sorted_replies: | |
# sleep the interval between this reply and the previous reply, multiplied by time_scale | |
current_ts = reply.ok.timestamp.seconds_since_unix_epoch | |
time.sleep((current_ts - previous_ts) * args.time_scale) | |
# re-publish, adding the replay_prefix to the key expression | |
replay_ke = args.replay_prefix + reply.ok.key_expr.__str__() | |
print(" - [{}] Replay publication from '{}' to '{}'".format( | |
datetime.datetime.utcfromtimestamp(current_ts), | |
reply.ok.key_expr, | |
replay_ke)) | |
session.put(replay_ke, reply.ok.payload, encoding=reply.ok.encoding) | |
# update previous_ts | |
previous_ts = current_ts | |
session.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment