Last active
April 3, 2023 15:05
-
-
Save mrakitin/0aca1499d96b54f3ffbbcbcb085e9a2d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pprint | |
import uuid | |
import cv2 | |
import matplotlib.pyplot as plt | |
from bloptools.gp.utils import get_beam_stats | |
from bluesky_kafka import RemoteDispatcher | |
from bluesky_kafka.consume import BasicConsumer | |
from nslsii.kafka_utils import _read_bluesky_kafka_config_file | |
# plt.ion() | |
def print_kafka_messages(beamline_acronym):
    """Poll the ``<beamline_acronym>.test`` Kafka topic and print beam statistics.

    A ``BasicConsumer`` is created with a unique consumer group id and the
    connection settings read from ``/etc/bluesky/kafka.yml``.  Every message it
    delivers is handed to the nested ``print_message`` handler, and the call
    ends in ``consumer.start_polling()``.

    Parameters
    ----------
    beamline_acronym : str
        Prefix used to build both the topic name and the consumer group id.
    """
    # fig, ax = plt.subplots(1, 1)
    print(f"Listening for Kafka messages for {beamline_acronym}")

    # from databroker import Broker
    # db = Broker.named(beamline_acronym)

    def print_message(msg, *args, **kwargs):
        """Print an "optimization" message together with stats of a fresh camera frame.

        Always returns ``True``.
        NOTE(review): the return value presumably tells the consumer to keep
        polling -- confirm against the installed bluesky_kafka version.
        """
        if "optimization" in args[1]:
            print(
                f"{datetime.datetime.now().isoformat()}: {type(msg) = } {msg = }\n{args = }\n{kwargs = }"
            )

            video_stream_url = "http://10.68.57.34/mjpg/video.mjpg"
            cap = cv2.VideoCapture(video_stream_url)
            try:
                frame_ok, frame = cap.read()
            finally:
                # A new capture is opened for every handled message, so it
                # must be released here or stream handles pile up.
                cap.release()

            if not frame_ok or frame is None:
                # A failed grab yields no frame; skip the statistics for this
                # message instead of crashing the polling loop.
                print(f"Could not read a frame from {video_stream_url}")
                return True

            # Collapse the last axis of the frame into a single 2-D image.
            img = frame.sum(axis=-1)
            stats = get_beam_stats(img)

            print(f"{msg = }")
            print(f"{stats = }")
            print(f"{img.shape = }")

            # ax.cla()
            # ax.imshow(img)
            # fig.canvas.draw_idle()

        return True

    # this consumer should not be in a group with other consumers
    # so generate a unique consumer group id for it
    unique_group_id = f"echo-{beamline_acronym}-{str(uuid.uuid4())[:8]}"

    kafka_config = _read_bluesky_kafka_config_file(
        config_file_path="/etc/bluesky/kafka.yml"
    )

    # import _thread
    # def consumer_func():
    consumer = BasicConsumer(
        topics=[f"{beamline_acronym}.test"],
        bootstrap_servers=kafka_config["bootstrap_servers"],
        group_id=unique_group_id,
        consumer_config=kafka_config["runengine_producer_config"],
        process_message=print_message,
    )
    consumer.start_polling()

    # _thread.start_new_thread(consumer_func, ())
    # consumer_func()
    # plt.show()
if __name__ == "__main__":
    import sys

    # The beamline acronym is the one required command-line argument; exit
    # with a usage message rather than an IndexError traceback if it is absent.
    if len(sys.argv) < 2:
        script_name = sys.argv[0] if sys.argv else "<script>"
        sys.exit(f"usage: {script_name} <beamline_acronym>")

    print_kafka_messages(sys.argv[1])
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import numpy as np | |
from bluesky_kafka.produce import BasicProducer | |
from nslsii.kafka_utils import _read_bluesky_kafka_config_file | |
if __name__ == "__main__":
    # Connection settings come from the shared bluesky Kafka config file.
    kafka_config = _read_bluesky_kafka_config_file(
        config_file_path="/etc/bluesky/kafka.yml"
    )

    # Each run publishes under its own random message key.
    basic_producer = BasicProducer(
        topic="tes.test",
        bootstrap_servers=kafka_config["bootstrap_servers"],
        key=str(uuid.uuid4()),
        producer_config=kafka_config["runengine_producer_config"],
    )

    # Send two trial payloads: a plain string, then a small numpy array.
    data = np.array([1, 2, 3])
    for payload in ("test", data):
        basic_producer.produce(payload)
def f(*args, **kwargs):
    """Publish the arguments of a callback invocation as an "optimization" message.

    The received arguments are printed, ``kwargs["obj"]`` (when present) is
    replaced by its ``name`` attribute, and everything is wrapped in a dict
    carrying ``"optimization": True`` and the current time.  That dict is
    printed, sent through the module-level ``basic_producer`` and returned.

    Returns
    -------
    dict
        The message that was produced.
    """
    # Imported here because this file never defines the ``ttime`` alias the
    # timestamp originally relied on.
    import time

    print(f"{args = }\n{kwargs = }")

    if "obj" in kwargs:
        # presumably the object itself cannot be serialized by the producer,
        # so only its name is forwarded -- verify against the subscriber API
        kwargs["obj"] = kwargs["obj"].name

    msg = {
        "optimization": True,
        "time": time.time(),
        "motor_args": args,
        "motor_kwargs": kwargs,
    }
    print(f"{msg = }")
    basic_producer.produce(msg)
    return msg
# Register the callback on the ``ush`` and ``dsh`` components of ``kbv`` and
# then ``kbh``, in that order.
for mirror in (kbv, kbh):
    for component_name in ("ush", "dsh"):
        getattr(mirror, component_name).subscribe(f)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pprint | |
import uuid | |
import nslsii | |
from bluesky_kafka import RemoteDispatcher | |
def print_kafka_messages(beamline_acronym):
    """Print every document received on a beamline's run-engine documents topic.

    A ``RemoteDispatcher`` is subscribed to
    ``<beamline_acronym>.bluesky.runengine.documents`` using the settings in
    ``/etc/bluesky/kafka.yml``, a printing callback is attached, and the
    dispatcher is started.

    Parameters
    ----------
    beamline_acronym : str
        Prefix used to build both the topic name and the consumer group id.
    """

    def echo_document(name, doc):
        """Write a timestamped, pretty-printed dump of one document to stdout."""
        timestamp = datetime.datetime.now().isoformat()
        formatted = pprint.pformat(doc)
        print(f"{timestamp} document: {name}\ncontents: {formatted}\n")

    kafka_config = nslsii._read_bluesky_kafka_config_file(
        config_file_path="/etc/bluesky/kafka.yml"
    )

    # This consumer must not share a group with any other consumer, so its
    # group id gets a random eight-character suffix.
    group_suffix = str(uuid.uuid4())[:8]
    unique_group_id = f"echo-{beamline_acronym}-{group_suffix}"

    documents_topic = f"{beamline_acronym}.bluesky.runengine.documents"
    servers = ",".join(kafka_config["bootstrap_servers"])
    dispatcher = RemoteDispatcher(
        topics=[documents_topic],
        bootstrap_servers=servers,
        group_id=unique_group_id,
        consumer_config=kafka_config["runengine_producer_config"],
    )
    dispatcher.subscribe(echo_document)
    dispatcher.start()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import pprint | |
import uuid | |
from bluesky_kafka import RemoteDispatcher | |
from nslsii.kafka_utils import _read_bluesky_kafka_config_file | |
def print_kafka_messages(beamline_acronym):
    """Print each "stop" document for a beamline along with the run's data table.

    A ``RemoteDispatcher`` is subscribed to
    ``<beamline_acronym>.bluesky.runengine.documents`` using the settings in
    ``/etc/bluesky/kafka.yml``.  For every "stop" document the callback prints
    the document and then the table of the run it refers to, looked up in the
    databroker catalog named after the beamline.

    Parameters
    ----------
    beamline_acronym : str
        Names the databroker catalog and prefixes the topic and group id.
    """
    print(f"Listening for Kafka messages for {beamline_acronym}")

    from databroker import Broker

    db = Broker.named(beamline_acronym)

    def report_stop(name, doc):
        """Ignore everything except "stop" documents; print those and their run table."""
        if name != "stop":
            return
        timestamp = datetime.datetime.now().isoformat()
        formatted = pprint.pformat(doc)
        print(f"{timestamp} document: {name}\ncontents: {formatted}\n")
        # The stop document refers to its run through the "run_start" key.
        run = db[doc["run_start"]]
        print(run.table())

    kafka_config = _read_bluesky_kafka_config_file(
        config_file_path="/etc/bluesky/kafka.yml"
    )

    # This consumer must not share a group with any other consumer, so its
    # group id gets a random eight-character suffix.
    group_suffix = str(uuid.uuid4())[:8]
    unique_group_id = f"echo-{beamline_acronym}-{group_suffix}"

    documents_topic = f"{beamline_acronym}.bluesky.runengine.documents"
    servers = ",".join(kafka_config["bootstrap_servers"])
    dispatcher = RemoteDispatcher(
        topics=[documents_topic],
        bootstrap_servers=servers,
        group_id=unique_group_id,
        consumer_config=kafka_config["runengine_producer_config"],
    )
    dispatcher.subscribe(report_stop)
    dispatcher.start()
if __name__ == "__main__":
    import sys

    # The beamline acronym is the one required command-line argument; exit
    # with a usage message rather than an IndexError traceback if it is absent.
    if len(sys.argv) < 2:
        script_name = sys.argv[0] if sys.argv else "<script>"
        sys.exit(f"usage: {script_name} <beamline_acronym>")

    print_kafka_messages(sys.argv[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment