Skip to content

Instantly share code, notes, and snippets.

@AlexanderFabisch
Last active May 5, 2024 19:42
Show Gist options
  • Save AlexanderFabisch/a69534fe425d305d7550eb526825cf2f to your computer and use it in GitHub Desktop.
Save AlexanderFabisch/a69534fe425d305d7550eb526825cf2f to your computer and use it in GitHub Desktop.

Setup

Install dmpbbo

git clone https://github.com/stulp/dmpbbo.git
cd dmpbbo
./install_dependencies.sh
make build
cd ..
export PYTHONPATH=$PYTHONPATH:dmpbbo

Train DMPs for DMPBBO

python train_dmpbbo.py

Run Experiments

python benchmark_execution_of_dmp.py

Evaluation

Create LaTeX table

python convert_results_table.py

Create plot

python plot_benchmark_results.py
import time
import os
import numpy as np
import pandas as pd
import tqdm
from movement_primitives.data import generate_minimum_jerk
from movement_primitives.dmp import DMP
from dmpbbo.dmps.Dmp import Dmp
from dmpbbo.dmps.Trajectory import Trajectory
from dmpbbo.functionapproximators.FunctionApproximatorRBFN import FunctionApproximatorRBFN
def setup_dmpbbo_dmp(T, X, dt, n_dims, n_weights_per_dim, dmp_type):
    """Train a dmpbbo DMP on a demonstrated trajectory.

    Parameters
    ----------
    T, X
        Time stamps and positions of the demonstration; both are passed
        unchanged to ``Trajectory``.
    dt, n_dims
        Unused here. They are accepted so that all setup functions can be
        called with the same positional arguments.
    n_weights_per_dim
        First argument of every ``FunctionApproximatorRBFN``.
    dmp_type
        Forwarded to ``Dmp.from_traj`` as ``dmp_type``.

    Returns
    -------
    The DMP returned by ``Dmp.from_traj``.
    """
    demonstration = Trajectory(T, X)
    # One function approximator per trajectory dimension.
    # NOTE(review): 0.7 is presumably the RBF intersection height -- confirm
    # against the dmpbbo documentation.
    approximators = []
    for _ in range(demonstration.dim):
        approximators.append(FunctionApproximatorRBFN(n_weights_per_dim, 0.7))
    return Dmp.from_traj(demonstration, approximators, dmp_type=dmp_type)
def execute_dmpbbo_dmp(dmp, dt, n_time_steps):
    """Integrate a dmpbbo DMP step by step and collect the positions.

    Parameters
    ----------
    dmp
        Object providing ``integrate_start``, ``integrate_step`` and
        ``states_as_pos_vel_acc``.
    dt
        Step size handed to ``integrate_step``.
    n_time_steps
        Total number of positions to produce, including the initial one.
        The initial position is always returned, even for values below 1.

    Returns
    -------
    list
        Position component of every visited state, in order.
    """
    state, state_rate = dmp.integrate_start()
    position, _, _ = dmp.states_as_pos_vel_acc(state, state_rate)
    positions = [position]
    # The start state already counts as the first step.
    for _ in range(n_time_steps - 1):
        state, state_rate = dmp.integrate_step(dt, state)
        position, _, _ = dmp.states_as_pos_vel_acc(state, state_rate)
        positions.append(position)
    return positions
def setup_movement_primitives_dmp(T, X, dt, n_dims, n_weights_per_dim):
    """Create a movement_primitives DMP and fit it to a demonstration.

    Parameters
    ----------
    T, X
        Time stamps and positions of the demonstration, passed to
        ``DMP.imitate``.
    dt
        Step size of the DMP; the internal integration step ``int_dt`` is
        set to a tenth of it.
    n_dims
        Number of dimensions of the DMP.
    n_weights_per_dim
        Number of weights per dimension.

    Returns
    -------
    The fitted ``DMP`` instance.

    Notes
    -----
    ``execution_time`` is not a parameter: it is read from the module-level
    variable of the same name, which must be defined before this is called.
    """
    constructor_kwargs = {
        "n_dims": n_dims,
        "execution_time": execution_time,  # module-level global
        "dt": dt,
        "n_weights_per_dim": n_weights_per_dim,
        "int_dt": 0.1 * dt,
    }
    fitted = DMP(**constructor_kwargs)
    fitted.imitate(T, X)
    return fitted
def execute_movement_primitives_dmp(dmp, dt, n_time_steps, step_function):
    """Step a movement_primitives DMP and collect the positions.

    Parameters
    ----------
    dmp
        Object providing ``start_y``, ``start_yd`` and ``step``.
    dt
        Unused here. It is accepted so that all execute functions can be
        called with the same positional arguments.
    n_time_steps
        Total number of positions to produce, including the start position.
        The start position is always returned, even for values below 1.
    step_function
        Forwarded to ``dmp.step`` as ``step_function``.

    Returns
    -------
    list
        The start position followed by the position after every step.
    """
    position, velocity = dmp.start_y, dmp.start_yd
    positions = [position]
    # The start position already counts as the first step.
    for _ in range(n_time_steps - 1):
        position, velocity = dmp.step(
            position, velocity, step_function=step_function)
        positions.append(position)
    return positions
# Number of timed setup/execution runs per configuration.
n_repetitions = 100

# Every configuration maps a display name to the callables that build and run
# a DMP, plus the extra keyword arguments forwarded to each of them.
dmpbbo_configs = {
    "dmpbbo (Kulvicius 2012)": dict(
        setup_dmp=setup_dmpbbo_dmp,
        execute_dmp=execute_dmpbbo_dmp,
        # Other dmp_type names: IJSPEERT_2002_MOVEMENT, COUNTDOWN_2013
        setup_dmp_params=dict(dmp_type="KULVICIUS_2012_JOINING"),
        execute_dmp_params=dict(),
    ),
}

# The movement_primitives variants differ only in the step function used.
movement_primitives_configs = {
    f"movement_primitives ({step_function})": dict(
        setup_dmp=setup_movement_primitives_dmp,
        execute_dmp=execute_movement_primitives_dmp,
        setup_dmp_params=dict(),
        execute_dmp_params=dict(step_function=step_function),
    )
    for step_function in ("euler-cython", "euler", "rk4-cython", "rk4")
}

# Configurations that are actually benchmarked. dmpbbo is currently left out;
# enable it with configs.update(dmpbbo_configs).
configs = dict(movement_primitives_configs)

# Default problem size; the benchmark sweep rebinds n_dims and
# n_weights_per_dim while it runs.
n_weights_per_dim = 10
n_dims = 5
dt = 0.001  # [s]
execution_time = 1.0
# Run the sweep only when no result file exists, so that earlier
# measurements are never overwritten.
if os.path.exists("benchmark_results.csv"):
    print("Result file exists, not overwriting")
else:
    results = []
    # A sweep over dt ([0.01, 0.001]) is disabled; the module-level dt is used.
    for n_dims in tqdm.tqdm([3, 6, 15, 50], leave=False, desc="n_dims"):
        for n_weights_per_dim in tqdm.tqdm(
                [10, 30, 60], leave=False, desc="n_weights_per_dim"):
            # Demonstration: minimum jerk trajectory from all zeros to all
            # ones. Only the positions are needed for training.
            X, _, _ = generate_minimum_jerk(
                np.zeros(n_dims), np.ones(n_dims),
                execution_time=execution_time, dt=dt)
            T = np.arange(0, execution_time + dt, dt)
            n_time_steps = int(execution_time / dt) + 1

            for name, config in tqdm.tqdm(
                    configs.items(), leave=False, desc="Configurations"):
                setup_dmp = config["setup_dmp"]
                execute_dmp = config["execute_dmp"]
                setup_dmp_params = config["setup_dmp_params"]
                execute_dmp_params = config["execute_dmp_params"]

                for _ in tqdm.tqdm(
                        range(n_repetitions), leave=False, desc="Repetitions"):
                    # Only the setup call sits between the two clock reads.
                    start = time.perf_counter()
                    dmp = setup_dmp(
                        T, X, dt, n_dims, n_weights_per_dim,
                        **setup_dmp_params)
                    setup_time = time.perf_counter() - start

                    # Only the execution call sits between the clock reads.
                    start = time.perf_counter()
                    execute_dmp(dmp, dt, n_time_steps, **execute_dmp_params)
                    execution_time_ = time.perf_counter() - start

                    # One row per measured function.
                    results += [
                        [name, n_dims, n_weights_per_dim, dt,
                         "Setup", setup_time],
                        [name, n_dims, n_weights_per_dim, dt,
                         "Execution", execution_time_],
                    ]

    column_names = [
        "Implementation", "#dim.", "#weights/dim.", "dt [s]", "Function",
        "Time [s]",
    ]
    df = pd.DataFrame(data=results, columns=column_names)
    df.to_csv("benchmark_results.csv")
    print("Done.")
import os
def latex_table_rows(lines, expected_repetitions=100):
    """Convert data rows of ``result_table.csv`` into LaTeX table lines.

    Each input line must have exactly eleven comma-separated fields:
    implementation, dimensions, weights per dimension, count, mean, standard
    deviation and five further statistics that are ignored. The
    implementation has the form ``"<library> (<variant>)"``.

    The generated table has five columns: library, variant, dimensions,
    weights per dimension and ``mean $\\pm$ std``. Repeated library/variant
    and dimension cells are left empty. A ``\\midrule`` precedes every new
    implementation and a ``\\cmidrule{3-5}`` precedes every new dimension
    within an implementation.

    Parameters
    ----------
    lines : iterable of str
        Data rows without the header lines.
    expected_repetitions : int, optional
        Required value of the count field of every row.

    Returns
    -------
    list of str
        LaTeX lines in output order.

    Raises
    ------
    ValueError
        If a row does not have eleven fields, if its implementation does not
        contain exactly one ``" ("``, or if its count differs from
        ``expected_repetitions``.
    """
    rows = []
    last_implementation = None
    last_dimensions = None
    for line in lines:
        (implementation, dimensions, weights, repetitions, mean, std,
         _, _, _, _, _) = line.split(",")
        if int(float(repetitions)) != expected_repetitions:
            raise ValueError(
                f"expected {expected_repetitions} repetitions, "
                f"got {repetitions!r} in line {line!r}")

        # Underscores must be escaped in LaTeX text mode.
        implementation = implementation.replace("_", "\\_")

        if implementation == last_implementation:
            # Library and variant cells stay empty for repeated rows.
            name_cells = "&"
            if dimensions == last_dimensions:
                dimension_cell = ""
            else:
                rows.append("\\cmidrule{3-5}")
                dimension_cell = int(float(dimensions))
        else:
            rows.append("\\midrule")
            last_implementation = implementation
            library, variant = implementation.split(" (")
            name_cells = library + " & " + variant[:-1]  # drop trailing ")"
            # The first row of an implementation always shows its dimension,
            # even if it equals the one of the previous implementation's
            # last row.
            dimension_cell = int(float(dimensions))
        last_dimensions = dimensions

        rows.append(
            f"{name_cells} & {dimension_cell} & {int(float(weights))} & "
            f"{float(mean):.4f} $\\pm$ {float(std):.4f} \\\\")
    return rows


if __name__ == "__main__":
    with open("result_table.csv", "r", encoding="utf-8") as f:
        table = f.read()
    # Text mode already translates line endings to "\n", so splitting on
    # os.linesep would fail on Windows. The first three lines are the CSV
    # header; blank lines (e.g. after the final newline) carry no data.
    data_lines = [line for line in table.splitlines()[3:] if line.strip()]
    for row in latex_table_rows(data_lines):
        print(row)
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# Merge the timing results of the Python and C++ benchmarks into one frame
# with a fresh running index and plot-friendly column names.
df_python = pd.read_csv("benchmark_results_python.csv", index_col=0)
df_cpp = pd.read_csv("benchmark_results_cpp.csv", index_col=0)
df = pd.concat([df_python,
                df_cpp])
df.index = range(len(df))
df.columns = ["Implementation", "Dimensions", "Weights per dimension", "dt [s]", "Function", "Time [s]"]

# Keep only measurements taken with dt = 0.001 s. where() blanks all other
# rows and dropna() removes them.
df = df.where(
    (df["dt [s]"] == 0.001)
    #& (df["Implementation"] != "dmpbbo (Countdown 2012)")
    #& (df["Implementation"] != "dmpbbo (Ijspeert 2002)")
).dropna()
print(df)

# One bar plot per measured function and per varied feature, with the other
# feature fixed to the given value.
for fixed_feature, value, varying_feature in [("Weights per dimension", 60, "Dimensions"), ("Dimensions", 15, "Weights per dimension")]:
    for function in ["Setup", "Execution"]:
        fig = plt.figure(figsize=(8, 4))
        sns.set_style("ticks")
        sns.set_context("paper")
        plt.title(f"{function} of DMP ({value} {fixed_feature.lower()})")
        data = df.where(
            (df["Function"] == function)
            & (df[fixed_feature] == value)
            & (df["Implementation"] != "dmpbbo (Kulvicius 2012)")
        ).dropna()
        # Integer dtype so that the legend shows 3 instead of 3.0.
        data = data.astype({"Dimensions": int, "Weights per dimension": int})
        ax = sns.barplot(data=data, y="Implementation", x="Time [s]", hue=varying_feature, errorbar=None, orient="h", gap=0.05, palette="muted")
        sns.despine(offset=10, trim=True, left=True)
        #plt.yscale("symlog")
        sns.move_legend(ax, "lower right")
        plt.tight_layout()
        plt.savefig(f"timing_{function.lower()}_{varying_feature.split(' ')[0].lower()}.pdf")
        # The figure is on disk; release it instead of keeping it open.
        plt.close(fig)

# Summary statistics per implementation and problem size. Only the execution
# timings are exported; the function is named explicitly so the table does
# not depend on the loop variable above.
table_function = "Execution"
table = df.where((df["Function"] == table_function) & (df["dt [s]"] == 0.001)
                 )[["Implementation", "Dimensions", "Weights per dimension", "Time [s]"]].groupby(
    ["Implementation", "Dimensions", "Weights per dimension"]).describe()
table.to_csv("result_table.csv")
import tqdm
import numpy as np
from movement_primitives.data import generate_minimum_jerk
import dmpbbo.json_for_cpp as json_for_cpp
from dmpbbo.dmps.Dmp import Dmp
from dmpbbo.dmps.Trajectory import Trajectory
from dmpbbo.functionapproximators.FunctionApproximatorRBFN import FunctionApproximatorRBFN
# Sampling of the demonstrated trajectory.
dt = 0.001
execution_time = 1.0

# Train one dmpbbo DMP per problem size and export it as JSON for the C++
# implementation.
dims_progress = tqdm.tqdm([3, 6, 15, 50], leave=False, desc="n_dims")
for n_dims in dims_progress:
    weights_progress = tqdm.tqdm(
        [10, 30, 60], leave=False, desc="n_weights_per_dim")
    for n_weights_per_dim in weights_progress:
        # Demonstration: minimum jerk trajectory from all zeros to all ones.
        # Only the positions are used.
        X, _, _ = generate_minimum_jerk(
            np.zeros(n_dims), np.ones(n_dims),
            execution_time=execution_time, dt=dt)
        T = np.arange(0, execution_time + dt, dt)
        traj = Trajectory(T, X)

        # One function approximator per trajectory dimension.
        # NOTE(review): 0.7 is presumably the RBF intersection height --
        # confirm against the dmpbbo documentation.
        function_apps = []
        for _ in range(traj.dim):
            function_apps.append(
                FunctionApproximatorRBFN(n_weights_per_dim, 0.7))

        # No dmp_type is passed, so the default of Dmp.from_traj applies.
        dmp = Dmp.from_traj(traj, function_apps)

        filename = f"dmp_{n_dims}_{n_weights_per_dim}.json"
        json_for_cpp.savejson_for_cpp(filename, dmp)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment