Created
January 18, 2019 17:18
-
-
Save EricCousineau-TRI/403b9608f2f5b9d930ddee77d908c76c to your computer and use it in GitHub Desktop.
Snippets for Jupyter + Director Teleop with pydrake
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import numpy as np | |
| from director import affordancemanager | |
| from director import lcmUtils | |
| from director import objectmodel as om | |
| from director.thirdparty import transformations | |
| from my_lcm_types import object_detection_t, object_detection_array_t | |
| def quat_to_rot(q): | |
| # Director does not have this conversion... | |
| return transformations.quaternion_matrix(q)[:3, :3] | |
| def rot_to_quat(R): | |
| X = np.eye(4) | |
| X[:3, :3] = R | |
| q_wxyz = transformations.quaternion_from_matrix(X, isprecise=True) | |
| return q_wxyz | |
| # See `director.affordanceitems` for more types. | |
| DEFAULT_DESC = dict( | |
| classname='BoxAffordanceItem', | |
| # (xyz, q_wxyz) | |
| pose=((0, 0, 0), (1, 0, 0, 0)), | |
| # `grasp_frame` for WSG. | |
| Dimensions=[0.03, 0.146, 0.116], | |
| ) | |
| # TODO(eric.cousineau): Consider using `affordancecollection` to manage this | |
| # stuff, if it simplifies things. | |
| class LcmMarkerServer(object): | |
| """Handles marker commands and publishes status.""" | |
| def __init__( | |
| self, view, period_sec=0.1, | |
| pub_channel="MARKER_STATUS", sub_channel="MARKER_COMMAND"): | |
| self._manager = affordancemanager.AffordanceObjectModelManager(view) | |
| self._obj_list = [] | |
| self._pub_channel = pub_channel | |
| self._sub = lcmUtils.addSubscriber( | |
| sub_channel, object_detection_array_t, self._sub_callback) | |
| self._sub.setNotifyAllMessagesEnabled(True) | |
| def add(self, desc): | |
| obj = self._manager.newAffordanceFromDescription(desc) | |
| obj.setProperty("Alpha", 0.3) | |
| frame = obj.getChildFrame() | |
| frame.setProperty("Edit", True) | |
| assert frame.onTransformModifiedCallback is None | |
| frame.onTransformModifiedCallback = self._frame_callback | |
| self._obj_list.append(obj) | |
| return obj | |
| def reset(self): | |
| print("Reset markers") | |
| for obj in self._obj_list: | |
| self._manager.removeAffordance(obj) | |
| self._obj_list = [] | |
| om.removeFromObjectModel(om.getOrCreateContainer("affordances")) | |
| def _frame_callback(self, _): | |
| # Publish all when any frame is moved. | |
| msg = object_detection_array_t() | |
| for obj in self._obj_list: | |
| obj_msg = object_detection_t() | |
| obj_msg.header.frame_name = "world" | |
| obj_msg.model_name = obj.getProperty("Name") | |
| xyz, q_wxyz = obj.getPose() | |
| obj_msg.translation[:] = xyz | |
| R = np.array(quat_to_rot(q_wxyz), order='F') | |
| obj_msg.rotation[:] = R.flat | |
| msg.detections.append(obj_msg) | |
| msg.num_detections = len(msg.detections) | |
| lcmUtils.publish(self._pub_channel, msg) | |
| def _is_mode(self, msg, mode): | |
| if len(msg.detections) != 1: | |
| return False | |
| return msg.detections[0].header.frame_name == mode | |
| def _sub_callback(self, msg): | |
| if self._is_mode(msg, "RESET"): | |
| self.reset() | |
| return | |
| for obj_msg in msg.detections: | |
| for obj in self._obj_list: | |
| if obj.getProperty("Name") == obj_msg.model_name: | |
| break | |
| else: | |
| print("New marker: {}".format(obj_msg.model_name)) | |
| desc = dict(DEFAULT_DESC) | |
| desc.update(Name=obj_msg.model_name) | |
| obj = self.add(desc) | |
| xyz = obj_msg.translation | |
| R = np.array(obj_msg.rotation, order='F').reshape((3, 3)) | |
| q_wxyz = rot_to_quat(R) | |
| obj.repositionFromDescription(dict(pose=(xyz, q_wxyz))) | |
| print("Update marker: {}".format(obj_msg.model_name)) | |
| if __name__ == "__main__": | |
| markers = LcmMarkerServer(view) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """ | |
| Jupyter widgets to interface with `pydrake` stuff. | |
| """ | |
| from contextlib import contextmanager | |
| from functools import partial | |
| from IPython.display import clear_output, display | |
| from ipywidgets.widgets.interaction import show_inline_matplotlib_plots | |
| import ipywidgets as widgets | |
| import numpy as np | |
| from pydrake.multibody.tree import JointIndex | |
| from pydrake.solvers.mathematicalprogram import SolutionResult | |
| from pydrake.multibody.inverse_kinematics import InverseKinematics | |
| from pydrake.math import RigidTransform, RotationMatrix | |
| from my_lcm_types import object_detection_t, object_detection_array_t | |
| from my_stuff.lcm_util import LcmMessagePoll # Just gets latest message | |
| # Adapted from `ipywidets.widgets.interaction` | |
| @contextmanager | |
| def interactive_update(out): | |
| with out: | |
| clear_output(wait=True) | |
| yield | |
| show_inline_matplotlib_plots() | |
| class PlantJointSlider(object): | |
| """ | |
| Example: | |
| slider = PlantJointSlider(plant, update, q=q) | |
| display(slider.widget) | |
| """ | |
| def __init__( | |
| self, plant, update_callback, q=None, continuous_update=False, | |
| num_steps=100): | |
| self._update_callback = update_callback | |
| nq = plant.num_positions() | |
| if q is None: | |
| q = np.zeros(nq) | |
| self._q = np.array(q) | |
| assert self._q.shape == (nq,) | |
| joints = [] | |
| for i in range(plant.num_joints()): | |
| joint = plant.get_joint(JointIndex(i)) | |
| if joint.num_positions() == 0: | |
| continue | |
| assert joint.num_positions() == 1 | |
| joints.append(joint) | |
| joints = sorted(joints, key=lambda x: x.position_start()) | |
| assert len(joints) == nq | |
| def joint_callback(i, value): | |
| self._q[i] = value["new"] | |
| self._update() | |
| # Setup widgets | |
| joint_widgets = [] | |
| rows = [] | |
| for i in range(len(q)): | |
| joint = joints[i] | |
| step = (joint.upper_limits() - joint.lower_limits()) / num_steps | |
| joint_widget = widgets.FloatSlider( | |
| value=self._q[i], | |
| min=joint.lower_limits(), | |
| max=joint.upper_limits(), | |
| step=step, | |
| continuous_update=continuous_update, | |
| ) | |
| joint_widget.observe(partial(joint_callback, i), "value") | |
| joint_widgets.append(joint_widget) | |
| name = ( | |
| plant.GetModelInstanceName(joint.model_instance()) + "::" + | |
| joint.name()) | |
| label = widgets.Label(name, layout={"width": "300px"}) | |
| rows.append(widgets.HBox([label, joint_widget])) | |
| self._joint_widgets = joint_widgets | |
| self.widget = widgets.VBox( | |
| rows, | |
| layout=widgets.Layout(min_width="800px"), | |
| ) | |
| self._update() | |
| def _update(self): | |
| self._update_callback(self.q) | |
| @property | |
| def q(self): | |
| return np.copy(self._q) | |
| @q.setter | |
| def q(self, q): | |
| self._q[:] = q | |
| for i, joint_widget in enumerate(self._joint_widgets): | |
| joint_widget.value = self._q[i] | |
| self._update() | |
| class TrajectorySlider(object): | |
| def __init__(self, traj, update, continuous_update=False, **kwargs): | |
| self._traj = traj | |
| self._update = update | |
| widgets.interact( | |
| self._callback, | |
| t=widgets.FloatSlider( | |
| value=self._traj.start_time(), | |
| min=self._traj.start_time(), | |
| max=self._traj.end_time(), | |
| description="t [s]", | |
| continuous_update=continuous_update, | |
| **kwargs | |
| ), | |
| ) | |
| def _callback(self, t): | |
| x = self._traj.value(t) | |
| self._update(x) | |
| def AddPoseConstraint(plant, ik, W, F, X_WF, pos_tol=0., th_tol=0.): | |
| """Simplified position and orientation IK constraints.""" | |
| p_WF = X_WF.translation() | |
| if pos_tol is not None: | |
| ik.AddPositionConstraint( | |
| frameB=F, p_BQ=np.zeros(3), | |
| frameA=W, p_AQ_lower=p_WF - pos_tol, p_AQ_upper=p_WF + pos_tol) | |
| if th_tol is not None: | |
| ik.AddOrientationConstraint( | |
| frameBbar=F, R_BbarB=RotationMatrix.Identity(), | |
| frameAbar=W, R_AbarA=X_WF.rotation(), | |
| theta_bound=th_tol) | |
| def FormulateMarkerIk(plant, X_WF_map, **kwargs): | |
| """Simple IK formulation, for use with markers.""" | |
| ik = InverseKinematics(plant) | |
| W = plant.world_frame() | |
| for F, X_WF in X_WF_map.items(): | |
| AddPoseConstraint(plant, ik, W, F, X_WF, **kwargs) | |
| return ik.prog(), ik.q() | |
| class PlantIkWithMarkers(object): | |
| """ | |
| Semi-modular IK for use with markers. | |
| @param plant MBP instance | |
| @param markers Marker client. Main interface is `LcmMarkerClient`. | |
| @param frames List of Frame's from plant. | |
| @param set_q(q) Called when a solution is found (or undo/redo used). | |
| @param get_q0() Called to get initial guess + marker poses. | |
| @param formulate_ik(plant, X_WF_map) Called to formulate IK, given plant | |
| and map of (Frame, X_WF) where X_WF is a RigidTransform. Returns tuple, | |
| (prog, q_vars). | |
| """ | |
| def __init__( | |
| self, plant, markers, frames, | |
| set_q, get_q0, formulate_ik=FormulateMarkerIk): | |
| frame_map = {F.name(): F for F in frames} | |
| frame_names = [F.name() for F in frames] | |
| context = plant.CreateDefaultContext() | |
| def reset(frames): | |
| plant.SetPositions(context, get_q0()) | |
| for name in frame_names: | |
| F = frame_map[name] | |
| X_WF = plant.CalcRelativeTransform(context, W, F) | |
| self._X_WF_map[F] = X_WF | |
| markers.set_X_WF(name, X_WF) | |
| # State | |
| self._cur_frame_names = frame_names | |
| self._q_history = SimpleHistory(get_q0()) | |
| self._X_WF_map = {} | |
| # - Initilialize. | |
| W = plant.world_frame() | |
| reset(frame_names) | |
| def with_w_out(f): | |
| # Decorates function to ensure output goes to `w_out`. | |
| def wrapped(*args, **kwargs): | |
| with interactive_update(w_out): | |
| return f(*args, **kwargs) | |
| return wrapped | |
| @with_w_out | |
| def on_reset(_): | |
| print("Reset: {}".format(self._cur_frame_names)) | |
| reset(self._cur_frame_names) | |
| @with_w_out | |
| def on_cur_frames(c): | |
| self._cur_frame_names = c["new"] | |
| @with_w_out | |
| def on_solve(_): | |
| for F in frames: | |
| X_WF = markers.get_X_WF(F.name()) | |
| if X_WF: | |
| self._X_WF_map[F] = X_WF | |
| prog, q = formulate_ik(plant, self._X_WF_map) | |
| prog.SetInitialGuess(q, get_q0()) | |
| if prog.Solve() == SolutionResult.kSolutionFound: | |
| print("Success") | |
| q_sol = prog.GetSolution(q) | |
| print("q = np.{}".format(repr(q_sol))) | |
| self._q_history.add(q_sol) | |
| set_q(q_sol) | |
| else: | |
| print("Solution Failed") | |
| @with_w_out | |
| def on_undo(_): | |
| print("Undo") | |
| q = self._q_history.undo() | |
| print("q = np.{}".format(repr(q))) | |
| set_q(q) | |
| @with_w_out | |
| def on_redo(_): | |
| print("Redo") | |
| q = self._q_history.redo() | |
| print("q = np.{}".format(repr(q))) | |
| set_q(q) | |
| w_title = widgets.HTML("<b>Inverse Kinematics</b>") | |
| w_markers_label = widgets.Label("Selected Markers") | |
| w_cur_frames = widgets.SelectMultiple( | |
| value=self._cur_frame_names, options=self._cur_frame_names) | |
| w_cur_frames.observe(on_cur_frames, "value") | |
| w_reset = widgets.Button(description="Reset Selected Markers") | |
| w_reset.on_click(on_reset) | |
| w_solve = widgets.Button(description="Solve") | |
| w_solve.on_click(on_solve) | |
| w_undo = widgets.Button(description="Undo") | |
| w_undo.on_click(on_undo) | |
| w_redo = widgets.Button(description="Redo") | |
| w_redo.on_click(on_redo) | |
| w_out = widgets.Output(layout={"min_width": "700px"}) | |
| self.widgets = widgets.VBox([ | |
| w_title, | |
| widgets.HBox([ | |
| widgets.VBox([ | |
| w_markers_label, | |
| w_cur_frames, | |
| widgets.HBox([w_reset]), | |
| widgets.HBox([], layout={"height": "10px"}), | |
| widgets.HBox([w_solve, w_undo, w_redo]), | |
| ]), | |
| w_out, | |
| ]), | |
| ]) | |
| class LcmMarkerClient(object): | |
| """Thin client to communicate with Director affordance markers with LCM.""" | |
| def __init__(self, lcm): | |
| self._lcm = lcm | |
| self._poll = LcmMessagePoll( | |
| lcm, "MARKER_STATUS", object_detection_array_t, latch=True) | |
| self.reset() | |
| def __del__(self): | |
| self.reset() | |
| def reset(self): | |
| # Clear out markers. | |
| msg = object_detection_array_t() | |
| obj_msg = object_detection_t() | |
| obj_msg.header.frame_name = "RESET" | |
| msg.detections.append(obj_msg) | |
| msg.num_detections = len(msg.detections) | |
| self._lcm.publish("MARKER_COMMAND", msg.encode()) | |
| def get_X_WF(self, name): | |
| msg = self._poll.get() | |
| if msg is None: | |
| return None | |
| for obj_msg in msg.detections: | |
| if obj_msg.model_name == name: | |
| break | |
| else: | |
| return None | |
| R = np.array(obj_msg.rotation, order='F').reshape((3, 3)) | |
| return RigidTransform(RotationMatrix(R), obj_msg.translation) | |
| def set_X_WF(self, name, X_WF): | |
| X_WF = RigidTransform(X_WF) | |
| msg = object_detection_array_t() | |
| obj_msg = object_detection_t() | |
| obj_msg.header.frame_name = "world" | |
| obj_msg.model_name = name | |
| obj_msg.rotation = X_WF.rotation().matrix().flatten().tolist() | |
| obj_msg.translation = X_WF.translation() | |
| msg.detections.append(obj_msg) | |
| msg.num_detections = len(msg.detections) | |
| self._lcm.publish("MARKER_COMMAND", msg.encode()) | |
| class SimpleHistory(object): | |
| """Keeps track of redos + undos.""" | |
| def __init__(self, value): | |
| # There should always be one valid value. | |
| self._history = [value] | |
| self._cur_index = 0 | |
| def add(self, value): | |
| redo_index = self._cur_index + 1 | |
| if redo_index != len(self._history): | |
| del self._history[redo_index:] # Discard redos. | |
| self._cur_index = len(self._history) - 1 | |
| self._history.append(value) | |
| self._cur_index += 1 | |
| return value | |
| def undo(self): | |
| if self._cur_index > 0: | |
| self._cur_index -= 1 | |
| return self._history[self._cur_index] | |
| def redo(self): | |
| if self._cur_index + 1 < len(self._history): | |
| self._cur_index += 1 | |
| return self._history[self._cur_index] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package my_lcm_types; | |
| // A message for indicating a series of object detections. | |
| struct object_detection_array_t { | |
| // Number of detections. | |
| int8_t num_detections; | |
| // Array of detections. | |
| object_detection_t detections[num_detections]; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package my_lcm_types; | |
| // A message for indicating an object detection using perception. | |
| struct object_detection_t { | |
| robotlocomotion.header_t header; | |
| // Model which was detected. | |
| string model_name; | |
| // On a scale of [0, 1], 0 being worst. | |
| float confidence; | |
| // Column-major rotation matrix. | |
| float rotation[9]; | |
| float translation[3]; | |
| } |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Example: