Skip to content

Instantly share code, notes, and snippets.

@romanroibu
Last active May 28, 2020 11:35
Show Gist options
  • Save romanroibu/960c0f8b30d2b42177291a4339eece4d to your computer and use it in GitHub Desktop.
Save romanroibu/960c0f8b30d2b42177291a4339eece4d to your computer and use it in GitHub Desktop.
Display the source blob
Display the rendered blob
Raw
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"import collections\n",
"import sys\n",
"import time\n",
"\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"\n",
"from IPython.display import display, clear_output\n",
"\n",
"def update(*args):\n",
"    \"\"\"Replace the current cell output with a rich display of ``args``.\n",
"\n",
"    The old output is cleared with ``wait=True`` and then every positional\n",
"    argument is forwarded unchanged to ``display``.\n",
"    \"\"\"\n",
"    clear_output(wait=True)\n",
"    display(*args)"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [],
"source": [
"# Directory that is put on `sys.path` so the project-local modules imported\n",
"# further down can be found. Override it with the PUPIL_SHARED_MODULES\n",
"# environment variable; the fallback is the path this notebook was written against.\n",
"import os\n",
"\n",
"PUPIL_SHARED_MODULES = os.environ.get(\n",
"    \"PUPIL_SHARED_MODULES\", \"/Users/rom/work/pupil/pupil_src/shared_modules/\"\n",
")\n",
"\n",
"# Guard against stacking duplicate entries when the cell is re-run.\n",
"if PUPIL_SHARED_MODULES not in sys.path:\n",
"    sys.path.append(PUPIL_SHARED_MODULES)"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"import file_methods as fm\n",
"import zmq_tools"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Load recording"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Visualize calibration timestamps"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"def load_recording_messages(rec_dir):\n",
"    \"\"\"Load a recording's notifications and 3d pupil data, ordered by time.\n",
"\n",
"    Reads the \"notify\" and \"pupil\" pldata files of ``rec_dir``, drops every\n",
"    pupil message whose \"method\" does not contain \"3d\", keeps only messages\n",
"    whose timestamp lies between the first and the last notification timestamp\n",
"    (both inclusive), and returns what is left as a list sorted by \"timestamp\".\n",
"    \"\"\"\n",
"    notifications = fm.load_pldata_file(rec_dir, \"notify\")\n",
"    pupil_data = fm.load_pldata_file(rec_dir, \"pupil\")\n",
"\n",
"    # Time window spanned by the recorded notifications\n",
"    start, stop = notifications.timestamps[[0, -1]]\n",
"\n",
"    def keep(msg):\n",
"        # Non-pupil messages always pass the 3d filter; pupil messages must be 3d\n",
"        if msg[\"topic\"].startswith(\"pupil\") and \"3d\" not in msg[\"method\"]:\n",
"            return False\n",
"        return start <= msg[\"timestamp\"] <= stop\n",
"\n",
"    selected = [m for m in (*notifications.data, *pupil_data.data) if keep(m)]\n",
"    selected.sort(key=lambda m: m[\"timestamp\"])\n",
"    return selected"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"# \"hmd_gazer_4\" is a relative recording directory -- presumably resolved against\n",
"# the kernel's working directory (TODO confirm where the recording lives).\n",
"messages = load_recording_messages(\"hmd_gazer_4\")"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"def visualize_calibration_timestamps(messages):\n",
"    \"\"\"Scatter-plot every message's timestamp against its topic.\n",
"\n",
"    Each distinct topic gets its own row on the y axis; rows follow the\n",
"    reverse-sorted topic names and are labelled with the topic strings.\n",
"\n",
"    Args:\n",
"        messages: iterable of mappings with \"timestamp\" and \"topic\" entries.\n",
"    \"\"\"\n",
"    timestamps = [m[\"timestamp\"] for m in messages]\n",
"    topics_all = [m[\"topic\"] for m in messages]\n",
"    topics = sorted(set(topics_all), reverse=True)\n",
"\n",
"    # Precomputed topic -> row lookup; avoids a linear `list.index` scan per message\n",
"    row_of = {topic: row for row, topic in enumerate(topics)}\n",
"    rows = [row_of[topic] for topic in topics_all]\n",
"\n",
"    plt.scatter(timestamps, rows)\n",
"    plt.yticks(range(len(topics)), topics)\n",
"    plt.xlabel(\"timestamp\")\n",
"    plt.ylabel(\"topic\")"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"scrolled": false
},
"outputs": [
{
"data": {
"image/png": "\n",
"text/plain": [
"<Figure size 432x288 with 1 Axes>"
]
},
"metadata": {
"needs_background": "light"
},
"output_type": "display_data"
}
],
"source": [
"# Overview of which message topics occur at which timestamps in the loaded recording\n",
"visualize_calibration_timestamps(messages)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Replay recording"
]
},
{
"cell_type": "code",
"execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
"def enumerate_notifications(messages, replay_speed=2):\n",
"    \"\"\"Yield ``(idx, msg, timestamp)`` for each message, paced like the recording.\n",
"\n",
"    Before a message is yielded the generator sleeps for the gap between its\n",
"    timestamp and the previous message's timestamp, divided by ``replay_speed``.\n",
"    Negative gaps are treated as zero, and nothing is slept before the first\n",
"    message or after the last one.\n",
"\n",
"    Args:\n",
"        messages: iterable of messages exposing ``_deep_copy_dict()``; each copy\n",
"            must contain a \"timestamp\" entry.\n",
"        replay_speed: playback speed-up factor (must be > 0). Increasing it too\n",
"            much might drop packages.\n",
"\n",
"    Yields:\n",
"        Tuples ``(idx, msg, new_ts)`` where ``msg`` is the deep-copied message.\n",
"\n",
"    Raises:\n",
"        ValueError: if ``replay_speed`` is not positive (raised on first iteration).\n",
"    \"\"\"\n",
"    if replay_speed <= 0:\n",
"        raise ValueError(f\"replay_speed must be positive, got {replay_speed!r}\")\n",
"\n",
"    last_ts = None\n",
"\n",
"    for idx, msg in enumerate(messages):\n",
"        msg = msg._deep_copy_dict()\n",
"        new_ts = msg[\"timestamp\"]\n",
"\n",
"        # Wait *before* handing out the message, so that it is sent after the\n",
"        # recorded gap to its predecessor has elapsed (not one message early).\n",
"        if last_ts is not None:\n",
"            time_to_sleep = max(0.0, new_ts - last_ts) / replay_speed\n",
"            time.sleep(time_to_sleep)\n",
"        last_ts = new_ts\n",
"\n",
"        yield idx, msg, new_ts"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"class NetworkInterface:\n",
"    \"\"\"ZMQ connections used to replay a recording against a local service.\n",
"\n",
"    Attributes:\n",
"        ctx: the shared ``zmq.Context`` instance.\n",
"        req: REQ socket connected to ``tcp://localhost:<req_port>``.\n",
"        pub_msg_streamer: ``zmq_tools.Msg_Streamer`` on the reported PUB port.\n",
"        log_msg_receiver: ``zmq_tools.Msg_Receiver`` subscribed to \"logging.\".\n",
"        calibration_msg_receiver: ``zmq_tools.Msg_Receiver`` subscribed to\n",
"            \"notify.calibration.\".\n",
"    \"\"\"\n",
"\n",
"    def __init__(self, req_port: int = 50020):\n",
"        \"\"\"Connect to ``localhost:req_port`` and set up streamer and receivers.\n",
"\n",
"        The SUB and PUB ports are requested over the REQ socket; both requests\n",
"        wait for a reply from the remote side.\n",
"        \"\"\"\n",
"        zmq = zmq_tools.zmq\n",
"        addr = \"tcp://localhost:{}\"\n",
"\n",
"        self.ctx = zmq.Context.instance()\n",
"\n",
"        self.req = self.ctx.socket(zmq.REQ)\n",
"        self.req.connect(addr.format(req_port))\n",
"\n",
"        self.req.send_string(\"SUB_PORT\")\n",
"        sub_port = self.req.recv_string()\n",
"\n",
"        self.req.send_string(\"PUB_PORT\")\n",
"        pub_port = self.req.recv_string()\n",
"\n",
"        self.pub_msg_streamer = zmq_tools.Msg_Streamer(\n",
"            self.ctx,\n",
"            addr.format(pub_port),\n",
"        )\n",
"\n",
"        self.log_msg_receiver = zmq_tools.Msg_Receiver(\n",
"            self.ctx,\n",
"            addr.format(sub_port),\n",
"            topics=(\"logging.\",),\n",
"        )\n",
"\n",
"        self.calibration_msg_receiver = zmq_tools.Msg_Receiver(\n",
"            self.ctx,\n",
"            addr.format(sub_port),\n",
"            topics=(\"notify.calibration.\",)\n",
"        )\n",
"\n",
"    def sync_time(self, timestamp):\n",
"        \"\"\"Send ``\"T <timestamp>\"`` over the REQ socket and return the reply string.\"\"\"\n",
"        self.req.send_string(f\"T {timestamp}\")\n",
"        response_string = self.req.recv_string()\n",
"        return response_string\n",
"\n",
"    def send_start_plugin_notification(self, plugin_name: str):\n",
"        \"\"\"Publish a ``notify.start_plugin`` notification for ``plugin_name``.\"\"\"\n",
"        self.pub_msg_streamer.send(\n",
"            {\n",
"                \"topic\": \"notify.start_plugin\",\n",
"                \"subject\": \"start_plugin\",\n",
"                # Use the requested plugin; this used to be hard-coded to\n",
"                # \"HMD3DChoreographyPlugin\", silently ignoring the argument.\n",
"                \"name\": plugin_name,\n",
"            }\n",
"        )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Service v1.23"
]
},
{
"cell_type": "code",
"execution_count": 10,
"metadata": {},
"outputs": [],
"source": [
"# replay_speed = 2 # increasing too much might drop packages\n",
"\n",
"# pub = zmq_tools.Msg_Streamer(ctx, addr.format(pub_port))\n",
"# logs = zmq_tools.Msg_Receiver(\n",
"# ctx,\n",
"# addr.format(sub_port),\n",
"# topics=(\"logging.\", \"notify.calibration.successful\"))\n",
"\n",
"# try:\n",
"# first_ts = messages[0][\"timestamp\"]\n",
" \n",
"# pub.send(\n",
"# {\n",
"# \"topic\": \"notify.start_plugin\",\n",
"# \"subject\": \"start_plugin\",\n",
"# \"name\": \"HMD_Calibration_3D\",\n",
"# }\n",
"# )\n",
" \n",
"# req.send_string(f\"T {first_ts}\")\n",
"# print(req.recv_string())\n",
"\n",
"# last_ts = first_ts\n",
"# for idx, msg in enumerate(messages):\n",
"# msg = msg._deep_copy_dict()\n",
"# new_ts = msg[\"timestamp\"]\n",
"# time_to_sleep = max(0.0, new_ts - last_ts)\n",
"# time_to_sleep /= replay_speed\n",
"# time.sleep(time_to_sleep)\n",
"# last_ts = new_ts\n",
"\n",
"# pub.send(msg)\n",
" \n",
"# if idx % 100 == 0:\n",
"# update(f\"({idx}) Sending {msg['topic']}\")\n",
"\n",
"# update(f\"(Finished) Sending {msg['topic']}\")\n",
" \n",
"# except KeyboardInterrupt:\n",
"# del pub\n",
"\n",
"# # Flush all recent logs\n",
"# while logs.new_data:\n",
"# topic, payload = logs.recv()\n",
"# print(f'[{payload[\"levelname\"]}] {payload[\"name\"]} - {payload[\"msg\"]}')\n",
"\n",
"# try:\n",
"# while True:\n",
"# topic, payload = logs.recv()\n",
"# if topic == \"notify.calibration.successful\":\n",
"# print(\"Calibration successful.\")\n",
"# break\n",
"# print(f'[{payload[\"levelname\"]}] {payload[\"name\"]} - {payload[\"msg\"]}')\n",
"# except KeyboardInterrupt:\n",
"# pass\n",
"\n",
"# del logs"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Service v2.0"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [],
"source": [
"def send_notifications(net_api, choreo_plugin_name, messages):\n",
"    \"\"\"Replay recorded messages and check the calibration notifications sent back.\n",
"\n",
"    Starts the choreography plugin through ``net_api``, syncs time to the first\n",
"    message's timestamp and publishes every message in order. After each\n",
"    published \"notify.\" message the function blocks until the expected\n",
"    notification(s) arrive on ``net_api.calibration_msg_receiver`` and prints\n",
"    their topics.\n",
"\n",
"    Args:\n",
"        net_api: object providing ``pub_msg_streamer``, ``calibration_msg_receiver``,\n",
"            ``sync_time`` and ``send_start_plugin_notification``.\n",
"        choreo_plugin_name: name handed to ``send_start_plugin_notification``.\n",
"        messages: sized collection of recorded messages, replayed in order.\n",
"\n",
"    Raises:\n",
"        AssertionError: if a received notification has an unexpected topic.\n",
"    \"\"\"\n",
"\n",
"    def send_message(msg):\n",
"        net_api.pub_msg_streamer.send(msg)\n",
"\n",
"    def expect_recv(expected_topics: tuple):\n",
"        # Accept a single topic string as well as a tuple of alternatives\n",
"        if not isinstance(expected_topics, tuple):\n",
"            expected_topics = (expected_topics,)\n",
"        actual_topic, payload = net_api.calibration_msg_receiver.recv()\n",
"        assert actual_topic in expected_topics, f'Expected message with topics \"{expected_topics}\", but got \"{actual_topic}\"'\n",
"        return actual_topic\n",
"\n",
"    net_api.send_start_plugin_notification(choreo_plugin_name)\n",
"\n",
"    if len(messages) == 0:\n",
"        return\n",
"\n",
"    for idx, msg, ts in enumerate_notifications(messages):\n",
"        if idx == 0:\n",
"            response_string = net_api.sync_time(ts)\n",
"            print(f\"(i) {response_string}\")\n",
"\n",
"        send_message(msg)\n",
"\n",
"        topic = msg[\"topic\"]\n",
"        if topic.startswith(\"pupil.\"):\n",
"            pass\n",
"        elif topic.startswith(\"notify.\"):\n",
"            # NOTE(review): this waits for the sent notification to come back on\n",
"            # the calibration receiver; presumably a \"notify.\" topic outside\n",
"            # \"notify.calibration.\" would never arrive there and block -- confirm.\n",
"            expect_recv(topic)  # echo\n",
"            print(topic)\n",
"\n",
"            if topic == \"notify.calibration.should_start\":\n",
"                recv_topic = expect_recv(\"notify.calibration.started\")\n",
"                print(recv_topic)\n",
"\n",
"            elif topic == \"notify.calibration.add_ref_data\":\n",
"                pass\n",
"\n",
"            elif topic == \"notify.calibration.should_stop\":\n",
"                recv_topic = expect_recv(\"notify.calibration.stopped\")\n",
"                print(recv_topic)\n",
"\n",
"                recv_topic = expect_recv(\"notify.calibration.setup.v2\")\n",
"                print(recv_topic)\n",
"\n",
"                recv_topic = expect_recv((\"notify.calibration.successful\", \"notify.calibration.failed\"))\n",
"                print(recv_topic)\n",
"\n",
"                if recv_topic == \"notify.calibration.successful\":\n",
"                    # Keep the returned topic so the result notification is what\n",
"                    # gets printed (previously \"successful\" was printed twice).\n",
"                    recv_topic = expect_recv(\"notify.calibration.result.v2\")\n",
"                    print(recv_topic)\n",
"            else:\n",
"                print(f\"UNEXPECTED: {topic}\")\n",
"        else:\n",
"            print(f\"UNEXPECTED: {topic}\")\n",
"\n",
"    print(\"(i) Finished sending messages.\")"
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"(i) Timesync successful.\n",
"notify.calibration.should_start\n",
"notify.calibration.started\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.add_ref_data\n",
"notify.calibration.should_stop\n",
"notify.calibration.stopped\n",
"notify.calibration.setup.v2\n",
"notify.calibration.successful\n",
"notify.calibration.successful\n",
"(i) Finished sending messages.\n"
]
}
],
"source": [
"# Replays the loaded recording; `NetworkInterface()` connects to its default\n",
"# request port (tcp://localhost:50020), so a service must be listening there.\n",
"send_notifications(\n",
" net_api=NetworkInterface(),\n",
" choreo_plugin_name=\"HMD3DChoreographyPlugin\",\n",
"# choreo_plugin_name=\"CustomHMD3DChoreographyPlugin\",\n",
" messages=messages,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.10"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment