Skip to content

Instantly share code, notes, and snippets.

@mantasu
Last active June 18, 2023 00:21
Show Gist options
  • Save mantasu/474d35930dd827ae13eff10d05c39132 to your computer and use it in GitHub Desktop.
Save mantasu/474d35930dd827ae13eff10d05c39132 to your computer and use it in GitHub Desktop.
Animated plotting on a separate process
import abc
import time
import threading
import numpy as np
import multiprocessing
import matplotlib.pyplot as plt
from functools import partial
from matplotlib.axes import Axes
from matplotlib.figure import Figure
from matplotlib.artist import Artist
from typing import Any, Optional, List, Tuple
class MPPlotter(multiprocessing.Process, abc.ABC):
    """Multi-processor compatible plotter.

    This class is the core part of a plotter that can plot and update
    (animate) matplotlib plots on a separate process while taking the
    data from another process. This is useful when the main process
    generates the data but an animated plotting is required while the
    data is being received. Since matplotlib cannot handle multiple
    threads, this class provides a multi-process interface.

    Note:
        A child class must be implemented with 3 methods:
        :meth:`init_plot` that initializes the plot (we want the plot
        to be initialized on a separate process), :meth:`update_data`
        that updates the incoming data, and :meth:`update_plot` that
        updates the plot.

    Note:
        The ``terminated`` event is process-local: it is dropped when
        the object is pickled (``spawn``/``forkserver`` start methods)
        and re-created on the receiving side, because
        :class:`threading.Event` objects cannot be pickled.

    Args:
        figpath (str, optional): The path to the file where the final
            figure right after the animation finishes should be saved.
            Should end with .png/.jpg etc. If not specified, no figure
            is saved. Defaults to None.
        fps (int, optional): Frames per second. I.e., the number of
            times to render the plot per second. Adjusting this value
            will have no effect when it takes more than a corresponding
            fraction of the second to render the plot. However, if not
            the full fraction is required to render, then, when fps is
            increased, the plot animations will be smoother and, when
            fps is decreased, system may work faster by not
            allocating too many resources for rendering. It is not
            suggested to set it more than 24. Defaults to 20.
        daemon (bool, optional): Whether to set the daemon flag of the
            process. Defaults to True.
        **kwargs: Additional keyword arguments to be passed to
            :class:`multiprocessing.Process`.
    """

    # Seconds the pipe worker blocks in ``poll`` before re-checking the
    # termination flag (keeps the thread responsive without spinning).
    _POLL_TIMEOUT = 0.05

    # Seconds the render loop sleeps when no frame is due yet, so that
    # it does not monopolise the CPU (and the GIL) between frames.
    _IDLE_SLEEP = 0.001

    def __init__(
        self,
        figpath: Optional[str] = None,
        fps: int = 20,
        daemon: bool = True,
        **kwargs
    ):
        super().__init__(daemon=daemon, **kwargs)

        # Assign attributes
        self.figpath = figpath
        self.interval = 1 / fps
        self.terminated = threading.Event()
        self.parent_pipe, self.child_pipe = multiprocessing.Pipe()

    def __getstate__(self):
        """Returns the picklable state (everything but the event)."""
        state = self.__dict__.copy()
        # threading.Event wraps a lock and cannot be pickled
        state.pop("terminated", None)
        return state

    def __setstate__(self, state):
        """Restores the state and creates a fresh termination event."""
        self.__dict__.update(state)
        self.terminated = threading.Event()

    def feed(self, data: Optional[Any] = None):
        """Feeds the data from another process.

        This function feeds the data from another process and passes
        it to the process that will plot the animated plot.

        Note:
            When there is no more data, you must call ``feed()`` without
            any arguments (or ``feed(data=None)``) to indicate the
            process with the plot should terminate (though it will not
            terminate until the user closes all figure windows).

        Args:
            data (Any, optional): The data to pass. Defaults to None.
        """
        # Send the data through pipe
        self.parent_pipe.send(data)

        if data is None:
            # Wait for the process to finish
            self.parent_pipe.close()
            self.join()

    def _pipe_worker(self):
        """Receives data from the pipe until terminated.

        Runs on a helper thread. Every received object is forwarded
        to :meth:`update_data`; a received ``None`` (or a broken pipe)
        sets the ``terminated`` event. The loop also exits as soon as
        ``terminated`` is set from elsewhere, e.g., by :meth:`run`
        when all figure windows are closed.
        """
        while not self.terminated.is_set():
            try:
                if not self.child_pipe.poll(self._POLL_TIMEOUT):
                    # Nothing arrived yet, re-check termination flag
                    continue

                # Get the data
                data = self.child_pipe.recv()
            except (EOFError, OSError):
                # The feeding end vanished without a ``None`` sentinel
                self.terminated.set()
                break

            if data is None:
                # The pipe has finished
                self.terminated.set()
                break

            # Call the update method
            self.update_data(data)

        self.child_pipe.close()

    def run(self):
        """The overwritten Process method.

        This is the method that starts running on a separate process
        where the plot will be drawn. Note that this process will call
        ``init_plot()`` that should be implemented by the child class.
        """
        # Init plot on *this* process
        self.init_plot()
        last_render = time.monotonic()

        # Launch new thread to deal with incoming data
        worker = threading.Thread(target=self._pipe_worker)
        worker.start()

        # Start showing plot
        plt.show(block=False)
        plt.pause(0.1)

        while True:
            if self.terminated.is_set():
                # If pipe finished, render the final state
                self.update_plot()

                if self.figpath is not None:
                    # Save if path for figure provided
                    plt.savefig(self.figpath, dpi=300)

                break
            elif not plt.get_fignums():
                # If closed, terminate (also stops the pipe worker)
                self.terminated.set()
                break
            elif time.monotonic() - last_render > self.interval:
                # Update falls within FPS
                last_render = time.monotonic()
                self.update_plot()
            else:
                # No frame is due yet, yield instead of spinning
                time.sleep(self._IDLE_SLEEP)

        while plt.get_fignums():
            # Wait till closed
            plt.pause(0.1)

        # Wait for the pipe worker to exit
        worker.join()

    @abc.abstractmethod
    def init_plot(self):
        """Creates the figure; called once by :meth:`run`."""
        pass

    @abc.abstractmethod
    def update_data(self, data: Optional[Any]):
        """Stores one received data item; called by the pipe worker."""
        pass

    @abc.abstractmethod
    def update_plot(self):
        """Redraws the plot; called from the loop in :meth:`run`."""
        pass
class BlitManager:
    """Blit manager that doesn't redraw repetitive things.

    Keeps a cached copy of the figure background and, on every update,
    restores it and redraws only the registered (animated) artists on
    top of it. For more details about the implementation, visit the
    `Matplotlib tutorial <https://matplotlib.org/stable/tutorials/advanced/blitting.html#class-based-example>`_

    Args:
        canvas: The canvas to work with. It is asked for
            ``mpl_connect``, ``copy_from_bbox``, ``restore_region``,
            ``blit``, ``flush_events`` and a ``figure`` attribute.
        animated_artists (iterable, optional): The artists to manage.
            Defaults to ().
    """

    def __init__(self, canvas, animated_artists=()):
        self.canvas = canvas
        self._artists = []
        self._bg = None

        # Re-capture the background whenever a full draw happens
        self.cid = canvas.mpl_connect("draw_event", self.on_draw)

        for artist in animated_artists:
            self.add_artist(artist)

    def on_draw(self, event):
        """Caches the background and draws the managed artists.

        Args:
            event: The draw event, or None when called manually.

        Raises:
            RuntimeError: If the event belongs to a different canvas.
        """
        foreign_event = event is not None and event.canvas != self.canvas

        if foreign_event:
            raise RuntimeError

        canvas = self.canvas
        self._bg = canvas.copy_from_bbox(canvas.figure.bbox)
        self._draw_animated()

    def add_artist(self, art):
        """Marks an artist as animated and starts managing it.

        Args:
            art: The artist to manage.

        Raises:
            RuntimeError: If the artist is on a different figure.
        """
        if art.figure != self.canvas.figure:
            raise RuntimeError

        art.set_animated(True)
        self._artists.append(art)

    def _draw_animated(self):
        """Draws every managed artist, in registration order."""
        for artist in self._artists:
            self.canvas.figure.draw_artist(artist)

    def update(self):
        """Refreshes the screen with the current artist states."""
        canvas = self.canvas

        if self._bg is not None:
            # Background is cached: restore it and paint artists on top
            canvas.restore_region(self._bg)
            self._draw_animated()
            canvas.blit(canvas.figure.bbox)
        else:
            # No background yet: capture one (this draws artists too)
            self.on_draw(None)

        canvas.flush_events()
class _RobotDynamicsPlotter(MPPlotter):
    """Partial child class of :class:`.MPPlotter`.

    This class inherits :class:`.MPPlotter` but only to prepare for an
    actual child class :class:`.RobotDynamicsPlotter`. It is a helper
    that provides the *figure* initializer and the *event handler*
    initializer, i.e., it defines how the figure looks and what
    happens when some curve is hovered on.

    Args:
        title (str, optional): The title of the plot and the window
            it is plotted on. Defaults to "Robot Dynamics".
        figsize (Tuple[int, int], optional): The size of the whole
            figure (width, height). Defaults to (10, 10).
        max_timesteps (int, optional): The number of maximum
            time-steps. If positive, the x axis limits are set in
            advance to ``(0, max_timesteps)``. Defaults to -1.
        **kwargs: Additional keyword arguments forwarded to the parent
            class :class:`.MPPlotter`.
    """

    # One subplot is created per label (used as the y axis label)
    DYNAMICS_LABELS = ["pos_diff", "rad_diff", "linear_vel", "angular_vel", "force_K", "torque_K"]

    # Legend label -> matplotlib format string; one curve per entry is
    # created in every subplot, in this (insertion) order.
    # NOTE(review): labels are ordered y, x, z - confirm this matches
    # the component order of the vectors that are fed in.
    POSITION_COLORS = {'y': 'b', 'x': 'g', 'z': 'r', 'c': "y--"}

    def __init__(
        self,
        title: str = "Robot Dynamics",
        figsize: Tuple[int, int] = (10, 10),
        max_timesteps: int = -1,
        **kwargs
    ):
        super().__init__(**kwargs)

        # Keep the figure configuration for ``init_figure``
        self.title = title
        self.figsize = figsize
        self.max_timesteps = max_timesteps

    def _update_annots(self, annot, circle, line, x, y):
        """Moves the annotation box and the marker to point (x, y)."""
        # Text box: axis label with the value, then the timestep
        annot.xy = (x, y)
        annot.set_text(f"{line.axes.get_ylabel()}: {y:.6f}\ntimestep: {x}")

        # Marker: shifted down by 3% of the visible y range and
        # colored like the hovered curve
        y_low, y_high = line.axes.get_ylim()
        circle.set_position((x, y - 0.03 * (y_high - y_low)))
        circle.set_color(line.get_color())

    def _hover(self, axs, annots, circles, lw, event):
        """Handles a mouse motion event.

        Highlights the first curve of the hovered subplot that contains
        the event, shows its annotation and marker, hides all the other
        ones and redraws the plot.

        Args:
            axs: The array of axes the handler is responsible for.
            annots: The annotation boxes, one per axes.
            circles: The marker annotations, one per axes.
            lw: The default line width of the curves.
            event: The mouse motion event.
        """
        if event.inaxes not in axs:
            return

        # Nothing is considered hovered until a curve claims the event
        hit_annot = hit_circle = hit_line = None

        for candidate in event.inaxes.lines:
            is_inside, details = candidate.contains(event)

            if is_inside:
                # Take the first data point reported under the cursor
                xdata, ydata = candidate.get_data()
                point = details["ind"][0]

                # Pick the annotation and marker of the curve's subplot
                slot = axs.tolist().index(candidate.axes)
                hit_annot, hit_circle = annots[slot], circles[slot]
                self._update_annots(
                    hit_annot, hit_circle, candidate, xdata[point], ydata[point]
                )

                # Only the first matching curve is handled
                hit_line = candidate
                break

        # Show only the artists that belong to the hovered curve
        for annot in annots:
            annot.set_visible(annot is hit_annot)

        for circle in circles:
            circle.set_visible(circle is hit_circle)

        # Thicken the hovered curve, reset the width of all the others
        for ax in axs:
            for curve in ax.get_lines():
                curve.set_linewidth(lw * 1.5 if curve is hit_line else lw)

        # Redraw new things
        self.update_plot()

    def init_figure(self) -> Tuple[Figure, List[Axes]]:
        """Initializes the figure.

        Creates one subplot per entry of ``DYNAMICS_LABELS`` (stacked
        vertically, sharing the x axis) and, in each of them, one empty
        animated curve per entry of ``POSITION_COLORS``.

        Returns:
            Tuple[Figure, List[Axes]]: A created figure and axes objects
            after calling :meth:`plt.subplots`.
        """
        fig, axs = plt.subplots(
            nrows=len(self.DYNAMICS_LABELS),
            ncols=1,
            sharex=True,
            figsize=self.figsize,
        )

        # The same title is used for the plot and for the window
        fig.suptitle(self.title)
        fig.canvas.manager.set_window_title(self.title)

        for ax, ylabel in zip(axs, self.DYNAMICS_LABELS):
            ax.set_ylabel(ylabel)

            if self.max_timesteps > 0:
                # Fix the x range in advance
                ax.set_xlim(0, self.max_timesteps)

            for label, fmt in self.POSITION_COLORS.items():
                # Empty animated curve, data is set during the updates
                ax.plot([], fmt, label=label, animated=True)

        # A single legend, placed above the first subplot
        axs[0].legend(
            loc="upper center",
            bbox_to_anchor=(0.5, 1.4),
            ncol=len(self.POSITION_COLORS)
        )

        return fig, axs

    def init_event_handler(self, fig: Figure, axs: List[Axes]) -> List[Artist]:
        """Initializes the event handler for mouse movement events.

        Creates, for every subplot, a hidden annotation box and a hidden
        marker, and connects :meth:`_hover` to the mouse motion events
        of the figure canvas.

        Args:
            fig (Figure): The figure to attach the mouse movement
                handler to.
            axs (List[Axes]): The list of axes objects that contain
                lines which should be emphasized when hovered.

        Returns:
            List[Artist]: The created artists (all annotation boxes
            followed by all markers), to be passed to
            :class:`.BlitManager`.
        """
        # Default line width, taken from the first curve
        default_lw = axs[0].get_lines()[0].get_linewidth()

        # Options shared by both kinds of artists: hidden until hovered
        hidden = dict(zorder=3, visible=False, animated=True)
        annots, circles = [], []

        for ax in axs:
            # Text box with an arrow for the (x, y) values
            box = ax.annotate(
                '',
                xy=(0, 0),
                xytext=(0, 30),
                textcoords="offset points",
                bbox=dict(boxstyle="round", fc='w', alpha=0.7),
                arrowprops=dict(arrowstyle="->"),
                **hidden,
            )
            annots.append(box)

            # Bullet to show at the hovered point of the curve
            bullet = ax.annotate(
                '•',
                xy=(0, 0),
                size=30,
                ha="center",
                va="center",
                **hidden,
            )
            circles.append(bullet)

        # Bind the motion notification to the hover handler
        on_motion = partial(self._hover, axs, annots, circles, default_lw)
        fig.canvas.mpl_connect("motion_notify_event", on_motion)

        return annots + circles
class RobotDynamicsPlotter(_RobotDynamicsPlotter):
    """Multi-processor compatible robot dynamics plotter.

    This class can plot and update (animate) matplotlib plot on a
    separate process while taking the data from another process. This
    is useful when the main process generates the data but an animated
    plotting is required for that data. Since matplotlib cannot handle
    multiple threads, this class provides a multi-process interface.

    Args:
        use_events (bool, optional): Whether to use event handlers for
            mouse movements. If yes, this helps to emphasize and
            annotate lines and points being hovered on. If no, less
            resources will be used. Defaults to True.
        title (str, optional): The title of the plot and the window
            it is plotted on. Defaults to "Robot Dynamics".
        figsize (Tuple[int, int], optional): The size of the whole
            figure (width, height). Defaults to (10, 10).
        max_timesteps (int, optional): The number of maximum
            time-steps. If positive, the x axis limits are set in
            advance to ``(0, max_timesteps)``. Defaults to -1.
        figpath (str, optional): The path to the file where the final
            figure right after the animation finishes should be saved.
            Should end with .png/.jpg etc. If not specified, no figure
            is saved. Defaults to None.
        fps (int, optional): Frames per second. I.e., the number of
            times to render the plot per second. Adjusting this value
            will have no effect when it takes more than a corresponding
            fraction of the second to render the plot. However, if not
            the full fraction is required to render, then, when fps is
            increased, the plot animations will be smoother and, when
            fps is decreased, system may work faster by not
            allocating too many resources for rendering. It is not
            suggested to set it more than 24. Defaults to 20.
        daemon (bool, optional): Whether to set the daemon flag of the
            process. Defaults to True.
        **kwargs: Additional keyword arguments to be passed to
            :class:`multiprocessing.Process`.
    """

    def __init__(
        self,
        use_events: bool = True,
        title: str = "Robot Dynamics",
        figsize: Tuple[int, int] = (10, 10),
        max_timesteps: int = -1,
        figpath: Optional[str] = None,
        fps: int = 20,
        daemon: bool = True,
        **kwargs
    ):
        super().__init__(
            title=title,
            figsize=figsize,
            max_timesteps=max_timesteps,
            figpath=figpath,
            fps=fps,
            daemon=daemon,
            **kwargs
        )

        # Set some custom properties
        self.use_events = use_events
        # Number of entries of ``self.data`` already drawn as curves
        self.prev_data_len = 0
        # History of all received data increments
        self.data = []

        # Pre-assign later-assigned attributes
        self.bm: Optional[BlitManager] = None
        self.fig: Optional[Figure] = None
        self.axs: List[Axes] = []

    def init_plot(self):
        """Initializes the plot.

        This method initializes the plot and assigns ``self.fig``,
        ``self.axs`` and ``self.bm`` attributes used during plot
        updates. It is meant to be called by :meth:`run` of the parent
        class so that the plot is created on the separate process.
        """
        # Init the figure and retrieve artists
        self.fig, self.axs = self.init_figure()
        artists = [ln for ax in self.axs for ln in ax.get_lines()]

        if self.use_events:
            # Add artists from the event handler
            artists += self.init_event_handler(self.fig, self.axs)

        # Blit manager for non-repetitive redrawing
        self.bm = BlitManager(self.fig.canvas, artists)

    def update_data(self, dynamics_vectors: Optional[np.ndarray]):
        """Handles the received data from another process.

        This method receives a new data increment and appends it to
        ``self.data``. ``None`` increments are ignored.

        Args:
            dynamics_vectors (Optional[np.ndarray]): The data increment.
        """
        if dynamics_vectors is None:
            return

        # Update the overall data history
        self.data.append(dynamics_vectors)

    def update_plot(self):
        """Updates the plot with the data currently possessed.

        Checks if any data arrived since the last time the curves were
        set. If not, the curves are left as they are and only the blit
        manager is called (to refresh what the event handlers changed).
        Otherwise, the whole history is set on the curves, the views
        are rescaled and the number of drawn entries is remembered in
        ``self.prev_data_len``.
        """
        # Snapshot the length once: the data is appended from another
        # thread, so the list may grow while this method is running
        num_entries = len(self.data)

        if num_entries == self.prev_data_len:
            # Don't update plot with same curves
            self.bm.update()
            return

        # Stack the history and move the time axis to the end so that
        # iterating yields one array of curve histories per subplot
        history = np.moveaxis(np.stack(self.data[:num_entries]), 0, -1)

        for i, vector_histories in enumerate(history):
            lines = self.axs[i].get_lines()

            for j, point_history in enumerate(vector_histories):
                # Update the data of the current line (xs and ys)
                xs, ys = range(len(point_history)), point_history
                lines[j].set_data(xs, ys)

            # Rescale the view
            self.axs[i].relim()
            self.axs[i].autoscale_view()

        # Remember how much of the history is on the curves now
        self.prev_data_len = num_entries
        self.bm.update()
import time
import json
import quaternion
import numpy as np
import multiprocessing as mp
import matplotlib.pyplot as plt
from typing import Any, List, Mapping
from mp_plotter import RobotDynamicsPlotter
LOG_PATH = "plan2control.py20230614200745.log"


def load_data(log_path: str = LOG_PATH) -> List[Mapping[str, Any]]:
    """Loads data from the log file.

    Reads the log file line by line and parses every non-blank line as
    a JSON document.

    Args:
        log_path (str, optional): The path to the log file. Defaults to
            LOG_PATH.

    Returns:
        List[Mapping[str, Any]]: A list of parsed entries, one per
        non-blank line of the file, in file order.

    Raises:
        OSError: If the file cannot be opened.
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    # The context manager closes the file even when parsing fails
    with open(log_path, 'r') as log_file:
        # Blank lines (e.g., a trailing newline) carry no entry
        return [json.loads(line) for line in log_file if line.strip()]
def quatdiff4(quat_curr, quat_des):
    """Computes the rotation vector between two orientations.

    Multiplies the current quaternion by the conjugate of the desired
    one and converts the product with
    ``quaternion.as_rotation_vector``.

    Args:
        quat_curr: The current orientation (presumably a
            ``quaternion.quaternion`` object - confirm with callers).
        quat_des: The desired orientation; must provide ``conj()``.

    Returns:
        The axis-angle (rotation vector) form of the relative rotation.
    """
    return quaternion.as_rotation_vector(quat_curr * quat_des.conj())
def compute_plot_vecs(entry: Mapping[str, Any]) -> np.ndarray:
    """Computes vectors to plot from dynamics dictionary.

    Takes a robot dynamics dictionary at some time-step and converts it
    to plottable vectors: one row per subplot, one column per curve.
    The rows are, in order: the position error, the orientation error
    (as a rotation vector), the linear velocity, the angular velocity,
    the force and the negated torque. The Euclidean norm of every row
    is appended to it as the last column.

    Args:
        entry (Mapping[str, Any]): The robot dynamics data at some
            timestep. Keys read: ``entry["robot"]["tip_state"]``
            (``position``, ``orientation``, ``linear_vel``,
            ``angular_vel``, ``force_K``, ``torque_K``), ``des_traj``
            and ``des_rot``.

    Returns:
        np.ndarray: The converted vectors; of shape (6, 4) assuming
        every quantity is a 3-vector (not checked here).
    """
    tip_state = entry["robot"]["tip_state"]

    # Errors of the tip with respect to the desired trajectory
    pos_diff = np.array(tip_state["position"]) - np.array(entry["des_traj"])
    rad_diff = quatdiff4(
        quaternion.from_float_array(tip_state["orientation"]),
        quaternion.from_float_array(entry["des_rot"]),
    )

    rows = [
        pos_diff,
        rad_diff,
        np.array(tip_state["linear_vel"]),
        np.array(tip_state["angular_vel"]),
        np.array(tip_state["force_K"]),
        # The torque is plotted with the opposite sign
        -np.array(tip_state["torque_K"]),
    ]
    vecs = np.array(rows)

    # Append the magnitude of every row as an extra column
    magnitudes = np.linalg.norm(vecs, axis=1, keepdims=True)

    return np.hstack([vecs, magnitudes])
def main(duration: float = 10.0):
    """Starts the main process.

    The main function - starts data generation on the current process
    and plotting on a separate process. The entries of the log are fed
    to the plotter one by one, evenly spread over ``duration``.

    Args:
        duration (float, optional): The time desired to *artificially*
            generate the data (in seconds). Defaults to 10.0.
    """
    # Get the data
    data = load_data()

    if not data:
        # Nothing to animate; also avoids dividing by zero below
        return

    # Pause between two consecutive entries
    interval = duration / len(data)

    # Initialize the plotter and start
    plotter = RobotDynamicsPlotter(
        use_events=True,
        title="robot Dynamics",
        figsize=(10, 10),
        max_timesteps=1000,
        figpath=None,
        fps=60
    )
    plotter.start()

    for entry in data:
        # Compute vectors to plot from data, feed them
        plotter.feed(data=compute_plot_vecs(entry))
        time.sleep(interval)

        if not plotter.is_alive():
            # Break if the plotting process is gone
            break

    # Signal the end of data and wait for the plotting process
    plotter.feed()
if __name__ == "__main__":
    backend = plt.get_backend()

    if backend == "MacOSX":
        # With the MacOSX backend, start child processes via forkserver
        mp.set_start_method("forkserver")

    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment