Skip to content

Instantly share code, notes, and snippets.

@EricCousineau-TRI
Created January 18, 2019 17:18
Show Gist options
  • Select an option

  • Save EricCousineau-TRI/403b9608f2f5b9d930ddee77d908c76c to your computer and use it in GitHub Desktop.

Select an option

Save EricCousineau-TRI/403b9608f2f5b9d930ddee77d908c76c to your computer and use it in GitHub Desktop.
Snippets for Jupyter + Director Teleop with pydrake
import numpy as np
from director import affordancemanager
from director import lcmUtils
from director import objectmodel as om
from director.thirdparty import transformations
from my_lcm_types import object_detection_t, object_detection_array_t
def quat_to_rot(q):
# Director does not have this conversion...
return transformations.quaternion_matrix(q)[:3, :3]
def rot_to_quat(R):
X = np.eye(4)
X[:3, :3] = R
q_wxyz = transformations.quaternion_from_matrix(X, isprecise=True)
return q_wxyz
# See `director.affordanceitems` for more types.
DEFAULT_DESC = dict(
classname='BoxAffordanceItem',
# (xyz, q_wxyz)
pose=((0, 0, 0), (1, 0, 0, 0)),
# `grasp_frame` for WSG.
Dimensions=[0.03, 0.146, 0.116],
)
# TODO(eric.cousineau): Consider using `affordancecollection` to manage this
# stuff, if it simplifies things.
class LcmMarkerServer(object):
"""Handles marker commands and publishes status."""
def __init__(
self, view, period_sec=0.1,
pub_channel="MARKER_STATUS", sub_channel="MARKER_COMMAND"):
self._manager = affordancemanager.AffordanceObjectModelManager(view)
self._obj_list = []
self._pub_channel = pub_channel
self._sub = lcmUtils.addSubscriber(
sub_channel, object_detection_array_t, self._sub_callback)
self._sub.setNotifyAllMessagesEnabled(True)
def add(self, desc):
obj = self._manager.newAffordanceFromDescription(desc)
obj.setProperty("Alpha", 0.3)
frame = obj.getChildFrame()
frame.setProperty("Edit", True)
assert frame.onTransformModifiedCallback is None
frame.onTransformModifiedCallback = self._frame_callback
self._obj_list.append(obj)
return obj
def reset(self):
print("Reset markers")
for obj in self._obj_list:
self._manager.removeAffordance(obj)
self._obj_list = []
om.removeFromObjectModel(om.getOrCreateContainer("affordances"))
def _frame_callback(self, _):
# Publish all when any frame is moved.
msg = object_detection_array_t()
for obj in self._obj_list:
obj_msg = object_detection_t()
obj_msg.header.frame_name = "world"
obj_msg.model_name = obj.getProperty("Name")
xyz, q_wxyz = obj.getPose()
obj_msg.translation[:] = xyz
R = np.array(quat_to_rot(q_wxyz), order='F')
obj_msg.rotation[:] = R.flat
msg.detections.append(obj_msg)
msg.num_detections = len(msg.detections)
lcmUtils.publish(self._pub_channel, msg)
def _is_mode(self, msg, mode):
if len(msg.detections) != 1:
return False
return msg.detections[0].header.frame_name == mode
def _sub_callback(self, msg):
if self._is_mode(msg, "RESET"):
self.reset()
return
for obj_msg in msg.detections:
for obj in self._obj_list:
if obj.getProperty("Name") == obj_msg.model_name:
break
else:
print("New marker: {}".format(obj_msg.model_name))
desc = dict(DEFAULT_DESC)
desc.update(Name=obj_msg.model_name)
obj = self.add(desc)
xyz = obj_msg.translation
R = np.array(obj_msg.rotation, order='F').reshape((3, 3))
q_wxyz = rot_to_quat(R)
obj.repositionFromDescription(dict(pose=(xyz, q_wxyz)))
print("Update marker: {}".format(obj_msg.model_name))
if __name__ == "__main__":
markers = LcmMarkerServer(view)
"""
Jupyter widgets to interface with `pydrake` stuff.
"""
from contextlib import contextmanager
from functools import partial
from IPython.display import clear_output, display
from ipywidgets.widgets.interaction import show_inline_matplotlib_plots
import ipywidgets as widgets
import numpy as np
from pydrake.multibody.tree import JointIndex
from pydrake.solvers.mathematicalprogram import SolutionResult
from pydrake.multibody.inverse_kinematics import InverseKinematics
from pydrake.math import RigidTransform, RotationMatrix
from my_lcm_types import object_detection_t, object_detection_array_t
from my_stuff.lcm_util import LcmMessagePoll # Just gets latest message
# Adapted from `ipywidets.widgets.interaction`
@contextmanager
def interactive_update(out):
with out:
clear_output(wait=True)
yield
show_inline_matplotlib_plots()
class PlantJointSlider(object):
"""
Example:
slider = PlantJointSlider(plant, update, q=q)
display(slider.widget)
"""
def __init__(
self, plant, update_callback, q=None, continuous_update=False,
num_steps=100):
self._update_callback = update_callback
nq = plant.num_positions()
if q is None:
q = np.zeros(nq)
self._q = np.array(q)
assert self._q.shape == (nq,)
joints = []
for i in range(plant.num_joints()):
joint = plant.get_joint(JointIndex(i))
if joint.num_positions() == 0:
continue
assert joint.num_positions() == 1
joints.append(joint)
joints = sorted(joints, key=lambda x: x.position_start())
assert len(joints) == nq
def joint_callback(i, value):
self._q[i] = value["new"]
self._update()
# Setup widgets
joint_widgets = []
rows = []
for i in range(len(q)):
joint = joints[i]
step = (joint.upper_limits() - joint.lower_limits()) / num_steps
joint_widget = widgets.FloatSlider(
value=self._q[i],
min=joint.lower_limits(),
max=joint.upper_limits(),
step=step,
continuous_update=continuous_update,
)
joint_widget.observe(partial(joint_callback, i), "value")
joint_widgets.append(joint_widget)
name = (
plant.GetModelInstanceName(joint.model_instance()) + "::" +
joint.name())
label = widgets.Label(name, layout={"width": "300px"})
rows.append(widgets.HBox([label, joint_widget]))
self._joint_widgets = joint_widgets
self.widget = widgets.VBox(
rows,
layout=widgets.Layout(min_width="800px"),
)
self._update()
def _update(self):
self._update_callback(self.q)
@property
def q(self):
return np.copy(self._q)
@q.setter
def q(self, q):
self._q[:] = q
for i, joint_widget in enumerate(self._joint_widgets):
joint_widget.value = self._q[i]
self._update()
class TrajectorySlider(object):
def __init__(self, traj, update, continuous_update=False, **kwargs):
self._traj = traj
self._update = update
widgets.interact(
self._callback,
t=widgets.FloatSlider(
value=self._traj.start_time(),
min=self._traj.start_time(),
max=self._traj.end_time(),
description="t [s]",
continuous_update=continuous_update,
**kwargs
),
)
def _callback(self, t):
x = self._traj.value(t)
self._update(x)
def AddPoseConstraint(plant, ik, W, F, X_WF, pos_tol=0., th_tol=0.):
"""Simplified position and orientation IK constraints."""
p_WF = X_WF.translation()
if pos_tol is not None:
ik.AddPositionConstraint(
frameB=F, p_BQ=np.zeros(3),
frameA=W, p_AQ_lower=p_WF - pos_tol, p_AQ_upper=p_WF + pos_tol)
if th_tol is not None:
ik.AddOrientationConstraint(
frameBbar=F, R_BbarB=RotationMatrix.Identity(),
frameAbar=W, R_AbarA=X_WF.rotation(),
theta_bound=th_tol)
def FormulateMarkerIk(plant, X_WF_map, **kwargs):
"""Simple IK formulation, for use with markers."""
ik = InverseKinematics(plant)
W = plant.world_frame()
for F, X_WF in X_WF_map.items():
AddPoseConstraint(plant, ik, W, F, X_WF, **kwargs)
return ik.prog(), ik.q()
class PlantIkWithMarkers(object):
"""
Semi-modular IK for use with markers.
@param plant MBP instance
@param markers Marker client. Main interface is `LcmMarkerClient`.
@param frames List of Frame's from plant.
@param set_q(q) Called when a solution is found (or undo/redo used).
@param get_q0() Called to get initial guess + marker poses.
@param formulate_ik(plant, X_WF_map) Called to formulate IK, given plant
and map of (Frame, X_WF) where X_WF is a RigidTransform. Returns tuple,
(prog, q_vars).
"""
def __init__(
self, plant, markers, frames,
set_q, get_q0, formulate_ik=FormulateMarkerIk):
frame_map = {F.name(): F for F in frames}
frame_names = [F.name() for F in frames]
context = plant.CreateDefaultContext()
def reset(frames):
plant.SetPositions(context, get_q0())
for name in frame_names:
F = frame_map[name]
X_WF = plant.CalcRelativeTransform(context, W, F)
self._X_WF_map[F] = X_WF
markers.set_X_WF(name, X_WF)
# State
self._cur_frame_names = frame_names
self._q_history = SimpleHistory(get_q0())
self._X_WF_map = {}
# - Initilialize.
W = plant.world_frame()
reset(frame_names)
def with_w_out(f):
# Decorates function to ensure output goes to `w_out`.
def wrapped(*args, **kwargs):
with interactive_update(w_out):
return f(*args, **kwargs)
return wrapped
@with_w_out
def on_reset(_):
print("Reset: {}".format(self._cur_frame_names))
reset(self._cur_frame_names)
@with_w_out
def on_cur_frames(c):
self._cur_frame_names = c["new"]
@with_w_out
def on_solve(_):
for F in frames:
X_WF = markers.get_X_WF(F.name())
if X_WF:
self._X_WF_map[F] = X_WF
prog, q = formulate_ik(plant, self._X_WF_map)
prog.SetInitialGuess(q, get_q0())
if prog.Solve() == SolutionResult.kSolutionFound:
print("Success")
q_sol = prog.GetSolution(q)
print("q = np.{}".format(repr(q_sol)))
self._q_history.add(q_sol)
set_q(q_sol)
else:
print("Solution Failed")
@with_w_out
def on_undo(_):
print("Undo")
q = self._q_history.undo()
print("q = np.{}".format(repr(q)))
set_q(q)
@with_w_out
def on_redo(_):
print("Redo")
q = self._q_history.redo()
print("q = np.{}".format(repr(q)))
set_q(q)
w_title = widgets.HTML("<b>Inverse Kinematics</b>")
w_markers_label = widgets.Label("Selected Markers")
w_cur_frames = widgets.SelectMultiple(
value=self._cur_frame_names, options=self._cur_frame_names)
w_cur_frames.observe(on_cur_frames, "value")
w_reset = widgets.Button(description="Reset Selected Markers")
w_reset.on_click(on_reset)
w_solve = widgets.Button(description="Solve")
w_solve.on_click(on_solve)
w_undo = widgets.Button(description="Undo")
w_undo.on_click(on_undo)
w_redo = widgets.Button(description="Redo")
w_redo.on_click(on_redo)
w_out = widgets.Output(layout={"min_width": "700px"})
self.widgets = widgets.VBox([
w_title,
widgets.HBox([
widgets.VBox([
w_markers_label,
w_cur_frames,
widgets.HBox([w_reset]),
widgets.HBox([], layout={"height": "10px"}),
widgets.HBox([w_solve, w_undo, w_redo]),
]),
w_out,
]),
])
class LcmMarkerClient(object):
"""Thin client to communicate with Director affordance markers with LCM."""
def __init__(self, lcm):
self._lcm = lcm
self._poll = LcmMessagePoll(
lcm, "MARKER_STATUS", object_detection_array_t, latch=True)
self.reset()
def __del__(self):
self.reset()
def reset(self):
# Clear out markers.
msg = object_detection_array_t()
obj_msg = object_detection_t()
obj_msg.header.frame_name = "RESET"
msg.detections.append(obj_msg)
msg.num_detections = len(msg.detections)
self._lcm.publish("MARKER_COMMAND", msg.encode())
def get_X_WF(self, name):
msg = self._poll.get()
if msg is None:
return None
for obj_msg in msg.detections:
if obj_msg.model_name == name:
break
else:
return None
R = np.array(obj_msg.rotation, order='F').reshape((3, 3))
return RigidTransform(RotationMatrix(R), obj_msg.translation)
def set_X_WF(self, name, X_WF):
X_WF = RigidTransform(X_WF)
msg = object_detection_array_t()
obj_msg = object_detection_t()
obj_msg.header.frame_name = "world"
obj_msg.model_name = name
obj_msg.rotation = X_WF.rotation().matrix().flatten().tolist()
obj_msg.translation = X_WF.translation()
msg.detections.append(obj_msg)
msg.num_detections = len(msg.detections)
self._lcm.publish("MARKER_COMMAND", msg.encode())
class SimpleHistory(object):
"""Keeps track of redos + undos."""
def __init__(self, value):
# There should always be one valid value.
self._history = [value]
self._cur_index = 0
def add(self, value):
redo_index = self._cur_index + 1
if redo_index != len(self._history):
del self._history[redo_index:] # Discard redos.
self._cur_index = len(self._history) - 1
self._history.append(value)
self._cur_index += 1
return value
def undo(self):
if self._cur_index > 0:
self._cur_index -= 1
return self._history[self._cur_index]
def redo(self):
if self._cur_index + 1 < len(self._history):
self._cur_index += 1
return self._history[self._cur_index]
package my_lcm_types;
// A message for indicating a series of object detections.
struct object_detection_array_t {
// Number of detections.
int8_t num_detections;
// Array of detections.
object_detection_t detections[num_detections];
}
package my_lcm_types;
// A message for indicating an object detection using perception.
struct object_detection_t {
robotlocomotion.header_t header;
// Model which was detected.
string model_name;
// On a scale of [0, 1], 0 being worst.
float confidence;
// Column-major rotation matrix.
float rotation[9];
float translation[3];
}
@EricCousineau-TRI
Copy link
Author

Example:

image

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment