|
import os
import time
from pathlib import Path

import numpy as np
import pandas as pd
import tqdm
from dmpbbo.dmps.Dmp import Dmp
from dmpbbo.dmps.Trajectory import Trajectory
from dmpbbo.functionapproximators.FunctionApproximatorRBFN import FunctionApproximatorRBFN
from movement_primitives.data import generate_minimum_jerk
from movement_primitives.dmp import DMP
|
|
|
|
|
def setup_dmpbbo_dmp(T, X, dt, n_dims, n_weights_per_dim, dmp_type):
    """Build a dmpbbo DMP trained on the demonstration (T, X).

    Parameters
    ----------
    T, X
        Demonstration data, handed unchanged to ``Trajectory(T, X)``.
    dt, n_dims
        Unused here; accepted so that every ``setup_dmp`` callable of the
        benchmark can be invoked with the same positional arguments.
    n_weights_per_dim
        First argument of each ``FunctionApproximatorRBFN``.
    dmp_type
        Forwarded to ``Dmp.from_traj``.

    Returns
    -------
    The object returned by ``Dmp.from_traj``.
    """
    traj = Trajectory(T, X)
    # One function approximator per trajectory dimension (taken from the
    # trajectory itself, not from n_dims).
    # NOTE(review): 0.7 is presumably the RBFN intersection height - confirm
    # against the dmpbbo documentation.
    function_apps = [
        FunctionApproximatorRBFN(n_weights_per_dim, 0.7)
        for _ in range(traj.dim)
    ]
    return Dmp.from_traj(traj, function_apps, dmp_type=dmp_type)
|
|
|
|
|
def execute_dmpbbo_dmp(dmp, dt, n_time_steps):
    """Integrate a dmpbbo DMP step by step and collect the positions.

    Parameters
    ----------
    dmp
        Object providing ``integrate_start()``, ``integrate_step(dt, x)`` and
        ``states_as_pos_vel_acc(x, xd)``.
    dt
        Step size forwarded to every ``integrate_step`` call.
    n_time_steps
        Requested number of samples, including the initial state.

    Returns
    -------
    list
        Positions in integration order. The initial position is always
        recorded, so the list has ``max(n_time_steps, 1)`` entries.
    """
    x, xd = dmp.integrate_start()
    # Only the position is recorded; velocity and acceleration are discarded.
    y, _, _ = dmp.states_as_pos_vel_acc(x, xd)
    Y = [y]
    for _ in range(1, n_time_steps):
        x, xd = dmp.integrate_step(dt, x)
        y, _, _ = dmp.states_as_pos_vel_acc(x, xd)
        Y.append(y)
    return Y
|
|
|
|
|
def setup_movement_primitives_dmp(T, X, dt, n_dims, n_weights_per_dim):
    """Build a movement_primitives DMP and fit it to the demonstration (T, X).

    Parameters
    ----------
    T, X
        Demonstration data, handed unchanged to ``dmp.imitate(T, X)``.
    dt
        Step size of the DMP; the internal integration step ``int_dt`` is set
        to a tenth of it.
    n_dims
        Passed to the ``DMP`` constructor as ``n_dims``.
    n_weights_per_dim
        Passed to the ``DMP`` constructor as ``n_weights_per_dim``.

    Returns
    -------
    DMP
        The DMP after ``imitate`` has been called.
    """
    # NOTE(review): execution_time is not a parameter; it is read from the
    # module-level variable of the same name, which must be defined before
    # this function is called.
    dmp = DMP(
        n_dims=n_dims,
        execution_time=execution_time,
        dt=dt,
        n_weights_per_dim=n_weights_per_dim,
        int_dt=0.1 * dt,
    )
    dmp.imitate(T, X)
    return dmp
|
|
|
|
|
def execute_movement_primitives_dmp(dmp, dt, n_time_steps, step_function):
    """Step a movement_primitives DMP open loop and collect the positions.

    Parameters
    ----------
    dmp
        Object providing ``start_y``, ``start_yd`` and
        ``step(y, yd, step_function=...)``.
    dt
        Unused here; accepted so that every ``execute_dmp`` callable of the
        benchmark can be invoked with the same positional arguments.
    n_time_steps
        Requested number of samples, including the start state.
    step_function
        Forwarded to every ``dmp.step`` call as ``step_function``.

    Returns
    -------
    list
        Positions in stepping order. The start position is always recorded,
        so the list has ``max(n_time_steps, 1)`` entries.
    """
    y, yd = dmp.start_y, dmp.start_yd
    Y = [y]
    for _ in range(1, n_time_steps):
        # Feed the previous output back in as the current state.
        y, yd = dmp.step(y, yd, step_function=step_function)
        Y.append(y)
    return Y
|
|
|
|
|
# Number of timed setup/execution runs per benchmark configuration.
n_repetitions = 100
|
|
|
|
|
# Benchmark configurations for dmpbbo. Each entry names the callables used to
# build and to run the DMP, plus extra keyword arguments for each of them.
dmpbbo_configs = {
    "dmpbbo (Kulvicius 2012)": {
        "setup_dmp": setup_dmpbbo_dmp,
        "execute_dmp": execute_dmpbbo_dmp,
        # Alternatives noted by the original author:
        # IJSPEERT_2002_MOVEMENT, COUNTDOWN_2013
        "setup_dmp_params": {"dmp_type": "KULVICIUS_2012_JOINING"},
        "execute_dmp_params": {},
    },
}
|
# Benchmark configurations for movement_primitives: one entry per step
# function, identical apart from the ``step_function`` passed at execution.
# Entry order follows the tuple below.
movement_primitives_configs = {
    f"movement_primitives ({step_function})": {
        "setup_dmp": setup_movement_primitives_dmp,
        "execute_dmp": execute_movement_primitives_dmp,
        "setup_dmp_params": {},
        "execute_dmp_params": {"step_function": step_function},
    }
    for step_function in ("euler-cython", "euler", "rk4-cython", "rk4")
}
|
# Configurations that are actually benchmarked, in insertion order.
configs = {}
# The dmpbbo configurations are currently disabled; uncomment to include them.
#configs.update(dmpbbo_configs)
configs.update(movement_primitives_configs)
|
|
|
# Default problem parameters. n_weights_per_dim and n_dims are rebound by the
# benchmark loops below; dt and execution_time stay fixed.
n_weights_per_dim = 10
n_dims = 5
dt = 0.001
execution_time = 1.0
|
|
|
# Run the benchmark only when no result file exists yet, so that earlier
# measurements are never overwritten.
results_path = Path("benchmark_results.csv")
if not results_path.exists():
    results = []
    # To sweep the step size as well, nest the loops below inside:
    #for dt in tqdm.tqdm([0.01, 0.001], leave=False, desc="dt"):
    for n_dims in tqdm.tqdm([3, 6, 15, 50], leave=False, desc="n_dims"):
        for n_weights_per_dim in tqdm.tqdm([10, 30, 60], leave=False, desc="n_weights_per_dim"):
            # Demonstration: minimum-jerk trajectory from all zeros to all ones.
            X, Xd, Xdd = generate_minimum_jerk(
                np.zeros(n_dims), np.ones(n_dims),
                execution_time=execution_time, dt=dt)
            T = np.arange(0, execution_time + dt, dt)
            # Loop invariant for all configurations and repetitions. Rounding
            # (not truncating) guards against a quotient that lands just
            # below an integer because of floating-point error.
            n_time_steps = round(execution_time / dt) + 1

            for name, config in tqdm.tqdm(configs.items(), leave=False, desc="Configurations"):
                setup_dmp = config["setup_dmp"]
                execute_dmp = config["execute_dmp"]
                setup_dmp_params = config["setup_dmp_params"]
                execute_dmp_params = config["execute_dmp_params"]

                for _ in tqdm.tqdm(range(n_repetitions), leave=False, desc="Repetitions"):
                    # Time the construction and fitting of the DMP.
                    start = time.perf_counter()
                    dmp = setup_dmp(T, X, dt, n_dims, n_weights_per_dim, **setup_dmp_params)
                    stop = time.perf_counter()
                    setup_time = stop - start

                    # Time the open-loop execution of the fitted DMP.
                    start = time.perf_counter()
                    Y = execute_dmp(dmp, dt, n_time_steps, **execute_dmp_params)
                    stop = time.perf_counter()
                    # Trailing underscore: must not clobber the global
                    # execution_time (the duration of the movement).
                    execution_time_ = stop - start

                    # One row per measured phase.
                    results.append([name, n_dims, n_weights_per_dim, dt, "Setup", setup_time])
                    results.append([name, n_dims, n_weights_per_dim, dt, "Execution", execution_time_])

    df = pd.DataFrame(
        data=results,
        columns=["Implementation", "#dim.", "#weights/dim.", "dt [s]", "Function", "Time [s]"])
    df.to_csv(results_path)
    print("Done.")
else:
    print("Result file exists, not overwriting")