Last active
November 12, 2018 16:23
-
-
Save Redchards/d7c12de1cf7590326356ffec01fa2e4f to your computer and use it in GitHub Desktop.
Retarded Q-learning
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# OH NO ! IT'S RETARDED !! | |
''' | |
1, 1 [[ 0.94235238 0.4 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[-0.4 -1.35506596] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[-1.13457664 -1.85922768] | |
[-1.373824 -1.89873836] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.784 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.67546624] | |
[-0.4 -1.26544384] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.784 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.64 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0.4 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.64 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.4 ] | |
[ 0. -0.8704 ] | |
[ 0. 2.1724221 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[-0.68224 -1.60657824] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.9899223 ] | |
[-1.84627302 -2.31450877]] | |
1, 2 [[ 0. 3.19539843] | |
[ 0. 0. ] | |
[-0.784 -1.26705434] | |
[ 0. -0.99921636] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[-3.14717287 -4.47898616] | |
[-5.13272036 -5.45997029] | |
[ 0.4 1.0549504 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.4 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.64 ] | |
[ 0. -1.2967306 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.4 ] | |
[ 0. -0.64 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.64 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.784 ] | |
[ 0.4 0.92224 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.4 ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[ 0. -0.9899223 ] | |
[ 0. 3.14102508] | |
[ 0. 0. ] | |
[ 0. -0.928 ] | |
[-1.60660251 -1.71887104] | |
[ 0. 0. ] | |
[ 0. 0. ] | |
[-1.15264 -3.41924685] | |
[-4.99526105 -5.41065665]] | |
''' | |
import rospy | |
import math | |
import random #used for the random choice of a strategy | |
from std_msgs.msg import Int16,Float32,Bool,Float32MultiArray,Int16MultiArray | |
from nav_msgs.msg import Odometry | |
from fastsim.srv import * | |
from subsomption.msg import Channel | |
from sensor_msgs.msg import LaserScan | |
import sys | |
import numpy as np | |
import numpy.random as npr | |
channel=[] | |
lasers=LaserScan() | |
radar = [] | |
odom = Odometry() | |
bumper_l=0 | |
bumper_r=0 | |
goalx = 300 | |
goaly = 480 | |
def s_id(S): | |
state_id = 0 | |
state_id += int(S[0]) | |
state_id += int(S[1]) << 1 | |
state_id += int(S[2]) << 2 | |
state_id += int(S[3]) << 3 | |
return state_id | |
#------------------------------------------- | |
class CallBack_module_cl(object): | |
def __init__(self, num): | |
print "Creating callback for "+str(num) | |
self.num = num | |
def __call__(self, data): | |
return callback_module(self.num,data) | |
#------------------------------------------- | |
def callback_module(n, data): | |
channel[n]=data | |
#rospy.loginfo(rospy.get_caller_id()+" n=%d Activated: %d speed_l: %f speed_r: %f",n,data.activated, data.speed_left, data.speed_right) | |
#------------------------------------------- | |
def callback_right_bumper(data): | |
global bumper_r | |
bumper_r=data.data | |
#rospy.loginfo(rospy.get_caller_id()+"Right bumper %d",data.data) | |
#------------------------------------------- | |
def callback_left_bumper(data): | |
global bumper_l | |
bumper_l=data.data | |
#rospy.loginfo(rospy.get_caller_id()+"Left bumper %d",data.data) | |
#------------------------------------------- | |
def callback_lasers(data): | |
global lasers | |
lasers=data | |
#------------------------------------------- | |
def callback_radar(data): | |
global radar | |
radar=data.data | |
#------------------------------------------- | |
def callback_odom(data): | |
global odom | |
odom=data | |
# ajouter un call back radar avec le bon type de donnees | |
#------------------------------------------- | |
# nbCh is the number of behavioral modules (channels) in competition | |
# gatingType sets the gating algorithm to be used ['random','qlearning'] | |
#------------------------------------------- | |
def strategy_gating(nbCh,gatingType): | |
rospy.init_node('strategy_gating', anonymous=True) | |
v=Channel() | |
v.activated=False | |
v.speed_left=0 | |
v.speed_right=0 | |
# Parameters of State building | |
th_neglectedWall = 65 | |
angleLMin = 0 | |
angleLMax = 55 | |
angleFMin=56 | |
angleFMax=143 | |
angleRMin=144 | |
angleRMax=199 | |
alpha = 0.4 | |
beta = 8 | |
pouyou = 0.9 | |
# The node publishes movement orders for simu_fastsim : | |
pub_l = rospy.Publisher('/simu_fastsim/speed_left', Float32 , queue_size=10) | |
pub_r = rospy.Publisher('/simu_fastsim/speed_right', Float32 , queue_size=10) | |
# The node receives order suggestions by the behavioral modules (channels): | |
for n in range(nbCh): | |
rospy.Subscriber("/navigation_strategies/channel"+str(n), Channel, CallBack_module_cl(n)) | |
# If necessary, the node receives sensory information from simu_fastsim: | |
rospy.Subscriber("/simu_fastsim/laser_scan", LaserScan, callback_lasers) | |
rospy.Subscriber("/simu_fastsim/radars", Int16MultiArray, callback_radar) | |
rospy.Subscriber("/simu_fastsim/odom", Odometry, callback_odom) | |
rospy.Subscriber("/simu_fastsim/right_bumper", Bool, callback_right_bumper) | |
rospy.Subscriber("/simu_fastsim/left_bumper", Bool, callback_left_bumper) | |
# Targetted operating frequency of the node: | |
r = rospy.Rate(10) # 10hz | |
# Q-learning related stuff | |
# definition of states at time t and t-1 | |
S_t = '' | |
S_tm1 = '' | |
Q = {} | |
# start time and timing related things | |
startT = rospy.get_time() | |
rospy.loginfo("Start time"+str(startT)) | |
trial = 0 | |
nbTrials = 10 | |
trialDuration = np.zeros((nbTrials)) | |
collisions = np.zeros((nbTrials)) | |
choice = -1 | |
rew = 0 | |
i2strat = ['wall follower','guidance'] | |
persist_step = 0 | |
q_learning_step = 40 | |
q_values = np.zeros((64, 2)) | |
choice_selection_strategy = 1 | |
q_value_update_strategy = 2 | |
# Main loop: | |
while (not rospy.is_shutdown()) and (trial <nbTrials): | |
print(trial) | |
speed_l=0 | |
speed_r=0 | |
# processing of the sensory data : | |
#------------------------------------------------ | |
# 1) has the robot found the reward ? | |
#rospy.loginfo("pose: "+str(odom.pose.pose.position.x)+", "+str(odom.pose.pose.position.y)) | |
dist2goal = math.sqrt((odom.pose.pose.position.x-goalx)**2+(odom.pose.pose.position.y-goaly)**2) | |
#rospy.loginfo(dist2goal) | |
# if so, teleport it: | |
if (dist2goal<30): | |
rospy.wait_for_service('simu_fastsim/teleport') | |
try: | |
# teleport robot | |
teleport = rospy.ServiceProxy('simu_fastsim/teleport', Teleport) | |
x = 300 #20+random.randrange(520) | |
y = 40 | |
th = random.randrange(360)/2*math.pi | |
resp1 = teleport(x, y, th) | |
# store information about the duration of the finishing trial: | |
currT = rospy.get_time() | |
trialDuration[trial] = currT - startT | |
startT = currT | |
rospy.loginfo("Trial "+str(trial)+" duration:"+str(trialDuration[trial])) | |
trial +=1 | |
rew = 1 | |
odom.pose.pose.position.x = x | |
odom.pose.pose.position.y = y | |
except rospy.ServiceException, e: | |
print "Service call failed: %s"%e | |
# 2) has the robot bumped into a wall ? | |
#rospy.loginfo("BUMPERS "+str(bumper_r)+' '+str(bumper_l)) | |
if bumper_r or bumper_l: | |
collisions[trial] += 1 | |
rew = -1 | |
#rospy.loginfo("BING! A wall...") | |
# 3) build the state, that will be used by learning, from the sensory data | |
#rospy.loginfo("Nb laser scans="+str(len(lasers.ranges))) | |
if len(lasers.ranges) == 200: | |
S_tm1 = S_t | |
S_t = '' | |
# determine if obstacle on the left: | |
wall='0' | |
for l in lasers.ranges[angleLMin:angleLMax]: | |
if l < th_neglectedWall: | |
wall ='1' | |
S_t += wall | |
# determine if obstacle in front: | |
wall='0' | |
for l in lasers.ranges[angleFMin:angleFMax]: | |
#rospy.loginfo("front:"+str(l)) | |
if l < th_neglectedWall: | |
wall ='1' | |
S_t += wall | |
# determine if obstacle in front: | |
wall='0' | |
for l in lasers.ranges[angleRMin:angleRMax]: | |
if l < th_neglectedWall: | |
wall ='1' | |
S_t += wall | |
# check if we are receiving radar measurements | |
if radar != 0: | |
radar_list = [] | |
for i in range(len(radar)): | |
radar_list.append(radar[i]) | |
#rospy.loginfo(str(radar_list)) | |
print(radar_list) | |
S_t += str(radar_list[0]) | |
#rospy.loginfo("S(t)="+S_t+" ; S(t-1)="+S_tm1) | |
# The chosen gating strategy is to be coded here: | |
#------------------------------------------------ | |
if gatingType=='random': | |
choice = random.randrange(nbCh) | |
#choice = 1 | |
rospy.loginfo("Module actif: "+i2strat[choice]) | |
speed_l=channel[choice].speed_left | |
speed_r=channel[choice].speed_right | |
#------------------------------------------------ | |
elif gatingType=='randomPersist': | |
# a choice is made every 5 steps | |
if persist_step < 20: | |
persist_step += 1 | |
else: | |
persist_step = 0 | |
choice = random.randrange(nbCh) | |
speed_l = channel[choice].speed_left | |
speed_r = channel[choice].speed_right | |
rospy.loginfo("Module actif: "+i2strat[choice]) | |
rospy.loginfo("randomPersist: arbitrage a coder.") | |
pass | |
#------------------------------------------------ | |
elif gatingType=='guidance': | |
choice = 1 | |
rospy.loginfo("Module actif: "+i2strat[choice]) | |
speed_l=channel[choice].speed_left | |
speed_r=channel[choice].speed_right | |
#------------------------------------------------ | |
elif gatingType=='wallFollower': | |
choice = 1 | |
rospy.loginfo("Module actif: "+i2strat[choice]) | |
speed_l=channel[choice].speed_left | |
speed_r=channel[choice].speed_right | |
#------------------------------------------------ | |
elif gatingType=='qlearning': | |
print(S_t) | |
no_state = (S_t == '' or S_tm1 == '') | |
sensor_percept = (S_tm1 == S_t) | |
if choice == -1: | |
choice = 0 | |
if q_learning_step < 20 or no_state: | |
q_learning_step += 1 | |
if ((q_learning_step >= 20 and choice_selection_strategy == 1) or (sensor_percept and choice_selection_strategy == 2)) and not no_state: | |
norm_factor = sum([np.exp(beta * q_values[s_id(S_t), a]) for a in range(2)]) | |
probs = [np.exp(beta * q_values[s_id(S_t), a]) / norm_factor for a in range(2)] | |
print(probs) | |
draw = np.random.uniform() | |
choice = 0 if draw < probs[0] else 1 | |
if ((q_learning_step >= 20 and q_value_update_strategy == 1) or (not rew == 0 and q_value_update_strategy == 2)) and not no_state: | |
print(S_t) | |
print("Strategy : ", choice) | |
delta = rew + pouyou * q_values[s_id(S_t), :].max() - q_values[s_id(S_tm1), choice] | |
q_values[s_id(S_tm1), choice] = q_values[s_id(S_tm1), choice] + alpha * delta | |
print(q_values) | |
rew = 0 | |
if q_learning_step >= 20: | |
q_learning_step = 0 | |
speed_l=channel[choice].speed_left | |
speed_r=channel[choice].speed_right | |
rospy.loginfo("qlearning: arbitrage a coder.") | |
pass | |
#------------------------------------------------ | |
else: | |
rospy.loginfo(gatingType+' unknown.') | |
exit() | |
#for i in range(nbCh): | |
# channel[i]=v | |
pub_l.publish(speed_l) | |
pub_r.publish(speed_r) | |
r.sleep() | |
# Log files opening | |
print(trialDuration) | |
print(collisions) | |
logDuration = open('DureesEssais_a_'+str(alpha)+'_b_'+str(beta)+'_g_'+str(pouyou)+'_'+str(startT),'w') | |
logCollision = open('Collisions_a_'+str(alpha)+'_b_'+str(beta)+'_g_'+str(pouyou)+'_'+str(startT),'w') | |
for i in range(nbTrials): | |
rospy.loginfo('T = '+str(trialDuration[i])) | |
logDuration.write(str(i)+' '+str(trialDuration[i])+'\n') | |
logCollision.write(str(i) + ' ' + str(collisions[i]) + '\n') | |
logDuration.close() | |
logCollision.close() | |
#------------------------------------------- | |
if __name__ == '__main__': | |
nbch=int(sys.argv[1]) | |
gatingType=sys.argv[2] | |
v=Channel() | |
v.activated=False | |
v.speed_left=0 | |
v.speed_right=0 | |
channel=[v for i in range(nbch)] | |
try: | |
strategy_gating(nbch,gatingType) | |
except rospy.ROSInterruptException: pass |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment