Last active
June 18, 2023 00:21
-
-
Save mantasu/474d35930dd827ae13eff10d05c39132 to your computer and use it in GitHub Desktop.
Animated plotting on a separate process
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import abc | |
import time | |
import threading | |
import numpy as np | |
import multiprocessing | |
import matplotlib.pyplot as plt | |
from functools import partial | |
from matplotlib.axes import Axes | |
from matplotlib.figure import Figure | |
from matplotlib.artist import Artist | |
from typing import Any, Optional, List, Tuple | |
class MPPlotter(multiprocessing.Process, abc.ABC): | |
"""Multi-processor compatible plotter. | |
This class is the core part of a plotter that can plot and update | |
(animate) matplotlib plots on a separate process while taking the | |
data from another process. This is useful when the main process | |
generates the data but an animated plotting is required while the | |
data is being received. Since matplotlib cannot handle multiple | |
threads, this class provides a multi-process interface. | |
Note: | |
A child class must be implemented with 3 methods: | |
:meth:`init_plot` that initializes the plot (we want the plot | |
to be initialized on a separate process), :meth:`update_data` | |
that updates the incoming data, and :meth:`update_plot` that | |
updates the plot. | |
Args: | |
figpath (str, optional): The path to the file where the final | |
figure right after the animation finishes should be saved. | |
Should end with .png/.jpg etc. If not specified, no figure | |
is saved. Defaults to None. | |
fps (int, optional): Frames per second. I.e., the number of | |
times to render the plot per second. Adjusting this value | |
will have no effect when it takes more than a corresponding | |
fraction of the second to render the plot. However, if not | |
the full fraction is required to render, then, when fps is | |
increased, the plot animations will be smoother and, when | |
fps is decreased, system may work faster on by not | |
allocating too many resources for rendering. It is not | |
suggested to set it more than 24. Defaults to 20. | |
daemon (bool, optional): Whether to set the daemon flag. The | |
entire Python program exits when only daemon threads are | |
left. Defaults to True. | |
**kwargs: Additional keyword arguments to be passed to | |
:class:`multiprocessing.Process`. | |
""" | |
def __init__( | |
self, | |
figpath: Optional[str] = None, | |
fps: int = 20, | |
daemon: bool = True, | |
**kwargs | |
): | |
super().__init__(daemon=daemon, **kwargs) | |
# Assign attributes | |
self.figpath = figpath | |
self.interval = 1 / fps | |
self.terminated = threading.Event() | |
self.parent_pipe, self.child_pipe = multiprocessing.Pipe() | |
def feed(self, data: Optional[Any] = None): | |
"""Feeds the data from the another process. | |
This function feeds the data from the another process and passes | |
it to the process that will plot the animated plot. | |
Note: | |
When there is no more data, you must call ``feed()`` without | |
any arguments (or ``feed(data=None)``) to indicate the | |
process with the plot should terminate (though it will not | |
terminate until the user closes all figure windows). | |
Args: | |
data (Any, optional): The data to pass. Defaults to None. | |
""" | |
# Send the data through pipe | |
self.parent_pipe.send(data) | |
if data is None: | |
# Wait for the process to finish | |
self.parent_pipe.close() | |
self.join() | |
def _pipe_worker(self): | |
while True: | |
if not self.child_pipe.poll(): | |
# If waiting for the input | |
continue | |
if self.terminated.is_set(): | |
# If exited while piping | |
self.child_pipe.close() | |
break | |
# Get the data | |
data = self.child_pipe.recv() | |
if data is None: | |
# The pipe has finished | |
self.terminated.set() | |
break | |
else: | |
# Call the update method | |
self.update_data(data) | |
def run(self): | |
"""The overwritten Process method. | |
This is the method that starts running on a separate process | |
where the plot will be drawn. Note that this process will call | |
``init_figure()`` that should be implemented by the child class. | |
""" | |
# Init plot on *this* process | |
self.init_plot() | |
start_time = time.time() | |
# Launch new thread to deal with incoming data | |
t = threading.Thread(target=self._pipe_worker) | |
t.start() | |
# Start showing plot | |
plt.show(block=False) | |
plt.pause(0.1) | |
while True: | |
if self.terminated.is_set(): | |
# If pipe finished | |
self.update_plot() | |
if self.figpath is not None: | |
# Save if path for figure provided | |
plt.savefig(self.figpath, dpi=300) | |
break | |
elif not plt.get_fignums(): | |
# If closed, terminate | |
self.terminated.set() | |
break | |
elif time.time() - start_time > self.interval: | |
# Update falls within FPS | |
start_time = time.time() | |
self.update_plot() | |
while plt.get_fignums(): | |
# Wait till closed | |
plt.pause(0.1) | |
# Wait | |
t.join() | |
@abc.abstractmethod | |
def init_plot(self): | |
pass | |
@abc.abstractmethod | |
def update_data(self, data: Optional[Any]): | |
pass | |
@abc.abstractmethod | |
def update_plot(self): | |
pass | |
class BlitManager(): | |
"""Blit manager that doesn't redraw repetitive things. | |
This class updates the figure by not redrawing the elements of it | |
that were not changed, thus saving a lot of rendering time. For more | |
details about the implementation of it, visit | |
`Matplotlib tutorial <https://matplotlib.org/stable/tutorials/advanced/blitting.html#class-based-example>`_ | |
""" | |
def __init__(self, canvas, animated_artists=()): | |
self.canvas, self._artists, self._bg = canvas, [], None | |
self.cid = canvas.mpl_connect("draw_event", self.on_draw) | |
for a in animated_artists: | |
self.add_artist(a) | |
def on_draw(self, event): | |
if event is not None and event.canvas != self.canvas: | |
raise RuntimeError | |
self._bg = self.canvas.copy_from_bbox(self.canvas.figure.bbox) | |
self._draw_animated() | |
def add_artist(self, art): | |
if art.figure != self.canvas.figure: | |
raise RuntimeError | |
art.set_animated(True) | |
self._artists.append(art) | |
def _draw_animated(self): | |
for a in self._artists: | |
self.canvas.figure.draw_artist(a) | |
def update(self): | |
if self._bg is None: | |
self.on_draw(None) | |
else: | |
self.canvas.restore_region(self._bg) | |
self._draw_animated() | |
self.canvas.blit(self.canvas.figure.bbox) | |
self.canvas.flush_events() | |
class _RobotDynamicsPlotter(MPPlotter): | |
"""Partial child class of :class:`.MPPlotter`. | |
This class inherits :class:`.MPPlotter` but only to prepare for an | |
actual child class :class:`.RobotDynamicsPlotter`. This class only | |
serves as a helper class with custom initialization methods, i.e., | |
*figure* and *event_handler* initializers. | |
This basically sets up the overall figure on how it should look | |
and what should happen to it when some curves are hovered on. | |
Args: | |
title (str, optional): The title of the plot and the window | |
it is plotted on. Defaults to "Robot Dynamics". | |
figsize (Tuple[int, int], optional): The size of the whole | |
figure (width, height). Defaults to (10, 10). | |
max_timesteps (int, optional): The number of maximum | |
time-steps. If provided, the x axis will be set-up in | |
advance, leading curves to be growing in the x | |
direction. If not known, the plot will update itself | |
while never being blank in the x axis. Defaults to -1. | |
**kwargs: Additional keyword arguments taken by the parent class | |
:class:`.BlitManager`. | |
""" | |
DYNAMICS_LABELS = ["pos_diff", "rad_diff", "linear_vel", "angular_vel", "force_K", "torque_K"] | |
POSITION_COLORS = {'y': 'b', 'x': 'g', 'z': 'r', 'c': "y--"} | |
def __init__( | |
self, | |
title: str = "Robot Dynamics", | |
figsize: Tuple[int, int] = (10, 10), | |
max_timesteps: int = -1, | |
**kwargs | |
): | |
super().__init__(**kwargs) | |
# Pass attributes | |
self.title = title | |
self.figsize = figsize | |
self.max_timesteps = max_timesteps | |
def _update_annots(self, annot, circle, line, x, y): | |
# Update annotation | |
annot.xy = (x, y) | |
annot.set_text(f"{line.axes.get_ylabel()}: {y:.6f}\ntimestep: {x}") | |
# Update circle | |
yrange = line.axes.get_ylim()[1] - line.axes.get_ylim()[0] | |
circle.set_position((x, y - 0.03 * yrange)) | |
circle.set_color(line.get_color()) | |
def _hover(self, axs, annots, circles, lw, event): | |
if event.inaxes not in axs: | |
return | |
# Init current attrs as None, get default line width | |
curr_annot, curr_circle, curr_line = None, None, None | |
for line in event.inaxes.lines: | |
# Check if line is being hovered | |
cont, ind = line.contains(event) | |
if not cont: | |
continue | |
# Get x and y data from curve | |
xdata, ydata = line.get_data() | |
x, y = xdata[ind["ind"][0]], ydata[ind["ind"][0]] | |
# Get current axes index, update current circle & annot | |
index = axs.tolist().index(line.axes) | |
curr_annot, curr_circle = annots[index], circles[index] | |
self._update_annots(curr_annot, curr_circle, line, x, y) | |
# Set curr line, exit loop | |
curr_line = line | |
break | |
for annot in annots: | |
# Update visibility of every annotation | |
annot.set_visible(annot is curr_annot) | |
for circle in circles: | |
# Update visibility of every circle | |
circle.set_visible(circle is curr_circle) | |
for ax in axs: | |
for line in ax.get_lines(): | |
# Update highlighting (line width) of every curve | |
line.set_linewidth(lw * 1.5 if line is curr_line else lw) | |
# Redraw new things | |
self.update_plot() | |
def init_figure(self) -> Tuple[Figure, List[Axes]]: | |
"""Initializes the figure. | |
Initializes the figure, its sublots (axes) and returns them. It | |
is a very unique method that depends on the data and the way it | |
should be presented. | |
Returns: | |
Tuple[Figure, List[Axes]]: A created figure and axes objects | |
after calling :meth:`plt.subplots`. | |
""" | |
# Create subplots (figure + axes) | |
fig, axs = plt.subplots( | |
nrows=len(self.DYNAMICS_LABELS), | |
ncols=1, | |
sharex=True, | |
figsize=self.figsize, | |
) | |
# Set plot and window titles | |
fig.suptitle(self.title) | |
fig.canvas.manager.set_window_title(self.title) | |
for i, ylabel in enumerate(self.DYNAMICS_LABELS): | |
# Set y axis label name | |
axs[i].set_ylabel(ylabel) | |
if self.max_timesteps > 0: | |
# Set maximum number of timesteps (x) | |
axs[i].set_xlim(0, self.max_timesteps) | |
for label, color in self.POSITION_COLORS.items(): | |
# Init the animated line (curve) & append to list | |
axs[i].plot([], color, label=label, animated=True) | |
# Create the legend | |
axs[0].legend( | |
loc="upper center", | |
bbox_to_anchor=(0.5, 1.4), | |
ncol=len(self.POSITION_COLORS) | |
) | |
return fig, axs | |
def init_event_handler(self, fig: Figure, axs: List[Axes]) -> List[Artist]: | |
"""Initializes the event handler for mouse movement events. | |
This method creates some annotations that will be showed when | |
the mouse is moved through some subplot. Specifically, it makes | |
the hovered line and point to be highlighted and shows the | |
current x and y values in a box near the cursor. | |
Args: | |
fig (Figure): The figure to attach the mouse movement | |
handler to. | |
axs (List[Axes]): The list of axes objects that contain | |
lines which should be emphasized when hovered. | |
Returns: | |
List[Artist]: The list of artists created and to be passed | |
to :class:`.BlitManager` that will use them to check | |
which parts of the plot should be redrawn. | |
""" | |
# Int annotations and circles and get the default line width | |
annots, circles, lw = [], [], axs[0].get_lines()[0].get_linewidth() | |
for ax in axs: | |
# (x, y) annotation | |
annots.append(ax.annotate( | |
'', | |
xy=(0, 0), | |
xytext=(0, 30), | |
textcoords="offset points", | |
bbox=dict(boxstyle="round", fc='w', alpha=0.7), | |
arrowprops=dict(arrowstyle="->"), | |
zorder=3, | |
visible=False, | |
animated=True, | |
)) | |
# Circle to show at the cursor position on the line | |
circles.append(ax.annotate( | |
'•', | |
xy=(0, 0), | |
size=30, | |
ha="center", | |
va="center", | |
zorder=3, | |
visible=False, | |
animated=True, | |
)) | |
# Bind the motion notification to the hover handler | |
hover_handle = partial(self._hover, axs, annots, circles, lw) | |
fig.canvas.mpl_connect("motion_notify_event", hover_handle) | |
return annots + circles | |
class RobotDynamicsPlotter(_RobotDynamicsPlotter): | |
"""Multi-processor compatible robot dynamics plotter. | |
This class can plot and update (animate) matplotlib plot on a | |
separate process while taking the data from another process. This | |
is useful when the main process generates the data but an animated | |
plotting is required for that data. Since matplotlib cannot handle | |
multiple threads, this class provides a multi-process interface. | |
Args: | |
use_events (bool, optional): Whether to use event handlers for | |
mouse movements. If yes, this helps to emphasize and | |
annotate lines and points being hovered on. If no, less | |
resources will be used. Defaults to True. | |
title (str, optional): The title of the plot and the window | |
it is plotted on. Defaults to "Robot Dynamics". | |
figsize (Tuple[int, int], optional): The size of the whole | |
figure (width, height). Defaults to (10, 10). | |
max_timesteps (int, optional): The number of maximum | |
time-steps. If provided, the x axis will be set-up in | |
advance, leading curves to be growing in the x | |
direction. If not known, the plot will update itself | |
while never being blank in the x axis. Defaults to -1. | |
figpath (str, optional): The path to the file where the final | |
figure right after the animation finishes should be saved. | |
Should end with .png/.jpg etc. If not specified, no figure | |
is saved. Defaults to None. | |
fps (int, optional): Frames per second. I.e., the number of | |
times to render the plot per second. Adjusting this value | |
will have no effect when it takes more than a corresponding | |
fraction of the second to render the plot. However, if not | |
the full fraction is required to render, then, when fps is | |
increased, the plot animations will be smoother and, when | |
fps is decreased, system may work faster on by not | |
allocating too many resources for rendering. It is not | |
suggested to set it more than 24. Defaults to 20. | |
daemon (bool, optional): Whether to set the daemon flag. The | |
entire Python program exits when only daemon threads are | |
left. Defaults to True. | |
**kwargs: Additional keyword arguments to be passed to | |
:class:`multiprocessing.Process`. | |
""" | |
def __init__( | |
self, | |
use_events: bool = True, | |
title: str = "Robot Dynamics", | |
figsize: Tuple[int, int] = (10, 10), | |
max_timesteps: int = -1, | |
figpath: Optional[str] = None, | |
fps: int = 20, | |
daemon: bool = True, | |
**kwargs | |
): | |
super().__init__( | |
title=title, | |
figsize=figsize, | |
max_timesteps=max_timesteps, | |
figpath=figpath, | |
fps=fps, | |
daemon=daemon, | |
**kwargs | |
) | |
# Set some custom properties | |
self.use_events = use_events | |
self.prev_data_len = 0 | |
self.data = [] | |
# Pre-assign later-assigned attributes | |
self.bm: Optional[BlitManager] = None | |
self.fig: Optional[Figure] = None | |
self.axs: List[Axes] = [] | |
def init_plot(self): | |
"""Initializes the plot. | |
This method initializes the plot and assigns ``self.fig``, | |
``self.axs`` and ``self.bm`` attributes used during plot | |
updates. This method will be called by the method defined by | |
the parent class, i.e., :meth:`run` which will, therefore, | |
initialize this plot on a separate process. | |
""" | |
# Init the figure and retrieve artists | |
self.fig, self.axs = self.init_figure() | |
artists = [ln for ax in self.axs for ln in ax.get_lines()] | |
if self.use_events: | |
# Add artists from the event handler | |
artists += self.init_event_handler(self.fig, self.axs) | |
# Blit manager for non-repetitive redrawing | |
self.bm = BlitManager(self.fig.canvas, artists) | |
def update_data(self, dynamics_vectors: Optional[np.ndarray]): | |
"""Handles the received data from another process. | |
This method receives a new data increment and attaches it to | |
``self.data``. It also keeps track of the previous data length | |
for more robust updates. | |
Args: | |
dynamics_vectors (Optional[np.ndarray]): The data increment. | |
""" | |
if dynamics_vectors is None: | |
return | |
# Update the overall data history | |
self.prev_data_len = len(self.data) | |
self.data.append(dynamics_vectors) | |
def update_plot(self): | |
"""Updates the plot with the data currently possessed. | |
Checks if the data is actually new. If it is not new, then the | |
curves are not redrawn, only blit manager is called to update | |
the plot at places where the events have occurred. If it is new, | |
the data is set for each curve. | |
""" | |
if len(self.data) == self.prev_data_len: | |
# Don't update plot with same curves | |
self.bm.update() | |
return | |
# Reshape the whole data history for convenience | |
data = np.moveaxis(np.stack(self.data), 0, -1) | |
for i, vector_histories in enumerate(data): | |
for j, point_history in enumerate(vector_histories): | |
# Update the data of the current line (xs and ys) | |
xs, ys = range(len(point_history)), point_history | |
self.axs[i].get_lines()[j].set_data(xs, ys) | |
# Rescale the view | |
self.axs[i].relim() | |
self.axs[i].autoscale_view() | |
self.bm.update() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import json | |
import quaternion | |
import numpy as np | |
import multiprocessing as mp | |
import matplotlib.pyplot as plt | |
from typing import Any, List, Mapping | |
from mp_plotter import RobotDynamicsPlotter | |
LOG_PATH = "plan2control.py20230614200745.log" | |
def load_data(log_path: str = LOG_PATH) -> List[Mapping[str, Any]]: | |
"""Loads data from the log file. | |
Loads the data from the log file and converts to a list of | |
dictionary entries. | |
Args: | |
log_path (str, optional): The path ot the log file. Defaults to | |
LOG_PATH. | |
Returns: | |
List[Mapping[str, Any]]: A list of dictionary entries each | |
representing some specific robot dynamics information at | |
some timestep. | |
""" | |
return list(map(json.loads, open(log_path, 'r').readlines())) | |
def quatdiff4(quat_curr, quat_des): | |
# Compute the relative quaternion | |
rel_quat = quat_curr * quat_des.conj() | |
# Convert quaternion to axis-angle representation | |
axis_angle = quaternion.as_rotation_vector(rel_quat) | |
# Return axis-angle vector | |
return axis_angle | |
def compute_plot_vecs(entry: Mapping[str, Any]) -> np.ndarray: | |
"""Computes vectors to plot from dynamics dictionary. | |
Takes a robot dynamics dictionary at some time-step and converts it | |
to a plottable vector. I.e., each vector will be plotted on separate | |
subplots, each point in vector will be assigned to its own curve. | |
Args: | |
entry (Mapping[str, Any]): The robot dynamics data at some | |
timestep. | |
Returns: | |
np.ndarray: A converted vector of shape (6, 4) to be plotted on | |
6 separate subplots. | |
""" | |
plot_vecs = np.array([ | |
np.array(entry["robot"]["tip_state"]["position"]) - np.array(entry["des_traj"]), | |
quatdiff4( | |
quaternion.from_float_array(entry["robot"]["tip_state"]["orientation"]), | |
quaternion.from_float_array(entry["des_rot"]) | |
), | |
np.array(entry["robot"]["tip_state"]["linear_vel"]), | |
np.array(entry["robot"]["tip_state"]["angular_vel"]), | |
np.array(entry["robot"]["tip_state"]["force_K"]), | |
-np.array(entry["robot"]["tip_state"]["torque_K"]), | |
]) | |
norm = np.linalg.norm(plot_vecs, axis=1)[:, None] | |
plot_vecs = np.concatenate([plot_vecs, norm], axis=1) | |
return plot_vecs | |
def main(duration = 10.0): | |
"""Starts the main process. | |
The main function - starts data generation on the current process | |
and plotting on a separate process. | |
Args: | |
duration (float, optional): The time desired to *artificially* | |
generate the data (in seconds). Defaults to 10.0. | |
""" | |
# Get the data, compute interval | |
data = load_data() | |
interval = duration / len(data) | |
# Initialize the plotter and start | |
plotter = RobotDynamicsPlotter( | |
use_events=True, | |
title="robot Dynamics", | |
figsize=(10, 10), | |
max_timesteps=1000, | |
figpath=None, | |
fps=60 | |
) | |
plotter.start() | |
for data_i in data: | |
# Compute vectors to plot from data, feed them | |
plot_vecs = compute_plot_vecs(data_i) | |
plotter.feed(data=plot_vecs) | |
time.sleep(interval) | |
if not plotter.is_alive(): | |
# Break if not active | |
break | |
# Finish the process | |
plotter.feed() | |
if __name__ == '__main__': | |
if plt.get_backend() == "MacOSX": | |
mp.set_start_method("forkserver") | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment