Example of the quadcopter flying
# %%
import csv
import numpy as np
from task import Task
from agents.agent import DDPG
from plot_functions import plot_results
from collections import defaultdict
import copy
import matplotlib.pyplot as plt
from agents.ou_noise import OUNoise

## Modify the values below to give the quadcopter a different starting position.
file_output = 'data.txt'  # file name for saved results

plt.close('all')
# Run a test episode with the agent and log the trajectory to a CSV file.
def run_test_episode(agent: DDPG, task: Task, file_output):
    print('\nRunning test episode ...')

    labels = ['time', 'x', 'y', 'z', 'phi', 'theta', 'psi', 'x_velocity',
              'y_velocity', 'z_velocity', 'phi_velocity', 'theta_velocity',
              'psi_velocity', 'rotor_speed1', 'rotor_speed2', 'rotor_speed3',
              'rotor_speed4', 'reward']
    results = {x: [] for x in labels}

    # Temporarily replace the agent's noise with a zero OU process so the test
    # episode runs without exploration noise.
    aux_noise = copy.copy(agent.noise)
    agent.noise = OUNoise(agent.action_size, 0.0, 0.0, 0.0)

    state = agent.reset_episode()  # start a new episode
    rewards_lists = defaultdict(list)
    print('state', state)
    print('state.shape', state.shape)

    # Run the simulation, and save the results.
    with open(file_output, 'w') as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(labels)
        while True:
            rotor_speeds = agent.act(state)
            # Override the agent's action with fixed rotor speeds for this example flight.
            rotor_speeds = [405] * 4
            rotor_speeds = [640, 605, 605, 605]
            next_state, reward, done, new_rewards = task.step(rotor_speeds)
            for key, value in new_rewards.items():
                rewards_lists[key].append(value)
            to_write = ([task.sim.time] + list(task.sim.pose) + list(task.sim.v)
                        + list(task.sim.angular_v) + list(rotor_speeds) + [reward])
            for ii in range(len(labels)):
                results[labels[ii]].append(to_write[ii])
            writer.writerow(to_write)
            state = next_state
            if done:
                break

    # Restore noise
    agent.noise = copy.copy(aux_noise)

    print('Finished test episode!\n')
    return results, rewards_lists
# %% Parameters
# Ornstein-Uhlenbeck exploration noise (mu is in rotor-speed units).
exploration_mu = 405
exploration_theta = 0.15 * 3
exploration_sigma = 0.2 * 5

# DDPG hyperparameters.
buffer_size = 100000
batch_size = 64
gamma = 0.99
tau = 0.01  # 0.001
actor_learning_rate = 0.0001 * 0.10
critic_learning_rate = 0.001 * 0.10

num_episodes = 35  # 1000
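
# %% Illustration (not part of the original gist): how the exploration parameters
# above typically drive an Ornstein-Uhlenbeck process such as agents.ou_noise.OUNoise.
# Each sample drifts back towards the mean mu at rate theta and is perturbed by
# Gaussian noise of scale sigma. The class below is a simplified stand-in written
# for illustration only; the real OUNoise interface may differ.
class OUNoiseSketch:
    def __init__(self, size, mu, theta, sigma):
        self.size, self.mu, self.theta, self.sigma = size, mu, theta, sigma
        self.state = np.ones(size) * mu  # start at the long-run mean

    def sample(self):
        # Mean-reverting random walk: dx = theta * (mu - x) + sigma * N(0, 1)
        dx = self.theta * (self.mu - self.state) + self.sigma * np.random.randn(self.size)
        self.state = self.state + dx
        return self.state

# Example: noise added to the 4 rotor speeds during training. With theta = sigma = 0,
# as set in run_test_episode, the samples stay fixed at mu.
# noise_sketch = OUNoiseSketch(4, exploration_mu, exploration_theta, exploration_sigma)
# print(noise_sketch.sample())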
# %% Training with agent
print('\n\nStart training...')
num_episodes_to_plot = max(100, num_episodes / 5)

target_pos = np.array([0.0, 0.0, 10.0])
init_pose = np.array([0.0, 0.0, 5.0, 0.0, 0.0, 0.0])
init_velocities = np.array([0.0, 0.0, 0.0])

task = Task(init_pose=init_pose,
            init_velocities=init_velocities,
            target_pos=target_pos)
agent = DDPG(task,
             exploration_mu=exploration_mu,
             exploration_theta=exploration_theta,
             exploration_sigma=exploration_sigma,
             buffer_size=buffer_size,
             batch_size=batch_size,
             gamma=gamma,
             tau=tau,
             actor_learning_rate=actor_learning_rate,
             critic_learning_rate=critic_learning_rate
             )

# Baseline: run a test episode with the still-untrained agent and plot the flight.
results, rewards_lists = run_test_episode(agent, task, file_output)
plot_results(results, target_pos, 'Run without training', rewards_lists)
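
# %% Illustration (not part of the original gist): a minimal sketch of the training
# loop that would follow the 'Start training...' message above. It is left commented
# out because the agent.step(action, reward, next_state, done) call is an assumed
# interface for agents.agent.DDPG (only act() and reset_episode() are used above);
# adjust it to the actual agent API before running.
#
# rewards_per_episode = []
# for i_episode in range(1, num_episodes + 1):
#     state = agent.reset_episode()
#     total_reward = 0.0
#     while True:
#         rotor_speeds = agent.act(state)                         # actor output plus OU exploration noise
#         next_state, reward, done, _ = task.step(rotor_speeds)   # advance the quadcopter simulation
#         agent.step(rotor_speeds, reward, next_state, done)      # store experience and learn (assumed signature)
#         state = next_state
#         total_reward += reward
#         if done:
#             break
#     rewards_per_episode.append(total_reward)
#     print('Episode {:4d}: total reward = {:8.3f}'.format(i_episode, total_reward))
#
# After training, the test episode and plots can be repeated, e.g.:
# results, rewards_lists = run_test_episode(agent, task, file_output)
# plot_results(results, target_pos, 'Run after training', rewards_lists)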