Created
May 13, 2015 23:27
-
-
Save kazimuth/a326f8e66ad9032a93b1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import math | |
import lib601.util as util | |
sonarMax = 1.5 | |
class DynamicRobotMaze: | |
ROBOT_DIAMETER = 0.44 | |
ROBOT_RADIUS = ROBOT_DIAMETER / 2 | |
INITIAL_BELIEF = .3 | |
FILLED_THRESHOLD = .9 | |
CORRECT_LIKELIHOOD = .8 | |
INCORRECT_LIKELIHOOD = 1 - CORRECT_LIKELIHOOD | |
def __init__(self, height, width, x0, y0, x1, y1): | |
self.width = width | |
self.height = height | |
self.x0,self.x1 = x0,x1 | |
self.y0,self.y1 = y0,y1 | |
# Probability of being filled | |
self.grid = [[DynamicRobotMaze.INITIAL_BELIEF | |
for c in xrange(width)] | |
for r in xrange(height)] | |
self.dirty = False | |
self.dangerCells = [] | |
self.occupiedCell = None | |
# Compute cells to check for intersection | |
cellW = (self.x1-self.x0)/self.width | |
cellH = (self.y1-self.y0)/self.height | |
xIntRadius = int(math.ceil(DynamicRobotMaze.ROBOT_RADIUS / cellW)) | |
yIntRadius = int(math.ceil(DynamicRobotMaze.ROBOT_RADIUS / cellH)) | |
origin = util.Point(0,0) | |
for x in range(-xIntRadius, xIntRadius+1): | |
for y in range(-yIntRadius, yIntRadius+1): | |
if util.Point(x*cellW,y*cellH).distance(origin) <= DynamicRobotMaze.ROBOT_RADIUS: | |
self.dangerCells.append((x,y)) | |
print "danger cells:", self.dangerCells | |
def pointToIndices(self, point): | |
ix = int(math.floor((point.x-self.x0)*self.width/(self.x1-self.x0))) | |
iix = min(max(0,ix),self.width-1) | |
iy = int(math.floor((point.y-self.y0)*self.height/(self.y1-self.y0))) | |
iiy = min(max(0,iy),self.height-1) | |
return ((self.height-1-iiy,iix)) | |
def indicesToPoint(self, (r,c)): | |
x = self.x0 + (c+0.5)*(self.x1-self.x0)/self.width | |
y = self.y0 + (self.height-r-0.5)*(self.y1-self.y0)/self.height | |
return util.Point(x,y) | |
def isClear(self,(r,c)): | |
if not (0 <= r < self.height and 0 <= c < self.width): | |
return False | |
return self.grid[r][c] < DynamicRobotMaze.FILLED_THRESHOLD | |
def isPassable(self,(r,c)): | |
if (r,c) == self.occupiedCell: | |
return True | |
for (dr, dc) in self.dangerCells: | |
if not self.isClear((r+dr, c+dc)): | |
return False | |
return True | |
def occupy(self, point): | |
self.occupiedCell = self.pointToIndices(point) | |
""" | |
(r,c) = self.pointToIndices(point) | |
for (dr, dc) in self.dangerCells: | |
self.sonarPass((r+dr, c+dc)) | |
""" | |
def probOccupied(self,(r,c)): | |
if not (0 <= r < self.height and 0 <= c < self.width): | |
return 1 | |
return self.grid[r][c] | |
def sonarHit(self,(r,c)): | |
# Bayesian update | |
wasPassable = self.isPassable((r,c)) | |
pObsGF = DynamicRobotMaze.CORRECT_LIKELIHOOD * self.grid[r][c] | |
pObsGE = DynamicRobotMaze.INCORRECT_LIKELIHOOD * (1-self.grid[r][c]) | |
self.grid[r][c] = pObsGF / (pObsGF + pObsGE) | |
if self.isPassable((r,c)) != wasPassable: | |
self.dirty = True | |
def sonarPass(self,(r,c)): | |
# Bayesian update | |
wasPassable = self.isPassable((r,c)) | |
pObsGF = DynamicRobotMaze.CORRECT_LIKELIHOOD * self.grid[r][c] | |
pObsGE = DynamicRobotMaze.INCORRECT_LIKELIHOOD * (1-self.grid[r][c]) | |
self.grid[r][c] = pObsGE / (pObsGF + pObsGE) | |
if self.isPassable((r,c)) != wasPassable: | |
self.dirty = True | |
def isModified(self): | |
return self.dirty | |
def resetModified(self): | |
self.dirty = False |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import noiseModel | |
import maze | |
reload(maze) | |
import search | |
reload(search) | |
import checkoff | |
reload(checkoff) | |
from mazeGraphics import * | |
import lib601.util as util | |
import lib601.sonarDist as sonarDist | |
import lib601.markov as markov | |
from soar.io import io | |
import soar.outputs.simulator as sim | |
import time | |
import math | |
import random | |
###### SETUP | |
NOISE_ON = False | |
sl13World = [0.15, util.Point(7.0, 1.0), (-0.5, 8.5, -0.5, 6.5)] | |
bigFrustrationWorld = [0.2, util.Point(7.0, 1.0), (-0.5, 8.5, -0.5, 8.5)] | |
frustrationWorld = [0.15, util.Point(3.5, 0.5), (-0.5, 5.5, -0.5, 5.5)] | |
raceWorld = [0.1, util.Point(2.0, 5.5), (-0.5, 5.5, -0.5, 8.5)] | |
bigPlanWorld = [0.25, util.Point(3.0, 1.0), (-0.5, 10.5, -0.5, 10.5)] | |
realRobotWorld = [0.1, util.Point(1.5,0.0), (-2.0, 6.0, -2.0, 6.0)] | |
robotRaceWorld = [0.1, util.Point(3.0,0.0), (-2.0, 6.0, -2.0, 6.0)] | |
#THE_WORLD = sl13World | |
THE_WORLD = robotRaceWorld | |
(gridSquareSize, goalPoint, (xMin, xMax, yMin, yMax)) = THE_WORLD | |
# Movement stuff | |
DISTANCE_THRESHOLD = .15 | |
ANGLE_THRESHOLD = math.pi / 16 | |
ANGLE_GAIN = 1 | |
DIST_GAIN = .5 | |
ALIGNED_THRESHOLD = math.pi / 16 | |
def float_approx_eq(a, b, threshold): | |
return abs(a-b) < threshold | |
# graphics options | |
show_heatmap = False | |
show_passable = True | |
keep_old_windows = False | |
def fixAnglePlusMinusPi(theta): | |
return (theta + math.pi) % (2*math.pi) - math.pi | |
###### SOAR CONTROL | |
# this function is called when the brain is (re)loaded | |
def setup(): | |
#initialize robot's internal map | |
width = int((xMax-xMin)/gridSquareSize) | |
height = int((yMax-yMin)/gridSquareSize) | |
robot.map = maze.DynamicRobotMaze(height,width,xMin,yMin,xMax,yMax) | |
robot.goalIndices = robot.map.pointToIndices(goalPoint) | |
sim.SONAR_VARIANCE = (lambda mean: 0.001) if NOISE_ON else (lambda mean: 0) #sonars are accurate to about 1 mm | |
robot.plan = None | |
robot.successors = search.makeMazeSuccessors(robot.map) | |
#initialize graphics | |
robot.windows = [] | |
if show_heatmap: | |
robot.windows.append(HeatMapWindow(robot.map)) | |
robot.windows.append(PathWindow(robot.map, show_passable)) | |
for w in robot.windows: | |
w.redrawWorld() | |
w.render() | |
# this function is called when the start button is pushed | |
def brainStart(): | |
robot.done = False | |
robot.count = 0 | |
#checkoff.getData(globals()) | |
# this function is called 10 times per second | |
def step(): | |
global inp | |
robot.count += 1 | |
inp = io.SensorInput() #io.SensorInput(cheat=True) | |
# discretize sonar readings | |
# each element in discreteSonars is a tuple (d, cells) | |
# d is the distance measured by the sonar | |
# cells is a list of grid cells (r,c) between the sonar and the point d meters away | |
discreteSonars = [] | |
for (sonarPose, distance) in zip(sonarDist.sonarPoses,inp.sonars): | |
if distance > 1.5: | |
continue | |
if NOISE_ON: | |
distance = noiseModel.noisify(distance, gridSquareSize) | |
sensorIndices = robot.map.pointToIndices(inp.odometry.transformPose(sonarPose).point()) | |
hitIndices = robot.map.pointToIndices(sonarDist.sonarHit(distance, sonarPose, inp.odometry)) | |
ray = util.lineIndices(sensorIndices, hitIndices) | |
discreteSonars.append((distance, ray)) | |
# figure out where robot is | |
currentPoint = inp.odometry.point() | |
startCell = robot.map.pointToIndices(currentPoint) | |
# if necessary, make new plan | |
needsReplan = robot.plan is None | |
if not needsReplan: | |
for index in robot.plan: | |
if not robot.map.isPassable(index): | |
needsReplan = True | |
break | |
if needsReplan: | |
print 'REPLANNING' | |
robot.map.resetModified() | |
robot.plan = search.ucSearch(robot.successors, | |
robot.map.pointToIndices(currentPoint), | |
lambda x: x == robot.goalIndices, | |
lambda x: robot.map.indicesToPoint(x).distance(goalPoint)) | |
if robot.plan == None: | |
print "I'm LOST AND ALONE" | |
io.setRotational(5) | |
robot.map.dirty = True | |
return | |
#raise Exception("No possible plan!") | |
robot.pointPlan = map(robot.map.indicesToPoint, robot.plan) | |
robot.targetIndex = 0 | |
# graphics (draw robot's plan, robot's location, goalPoint) | |
# do not change this block | |
for w in robot.windows: | |
w.redrawWorld() | |
robot.windows[-1].markCells(robot.plan,'blue') | |
robot.windows[-1].markCell(robot.plan[robot.targetIndex], 'purple') | |
robot.windows[-1].markCell(robot.map.pointToIndices(currentPoint),'gold') | |
robot.windows[-1].markCell(robot.map.pointToIndices(goalPoint),'green') | |
# update map | |
for (d,cells) in discreteSonars: | |
robot.map.sonarHit(cells[-1]) | |
for cell in cells[:-1]: | |
robot.map.sonarPass(cell) | |
robot.map.occupy(currentPoint) | |
# if we are within 0.1m of the goal point, we are done! | |
if currentPoint.distance(goalPoint) <= 0.1: | |
io.Action(fvel=0,rvel=0).execute() | |
raise Exception('YAY!') | |
#raise Exception('Goal Reached!\n\n%s' % checkoff.generate_code(globals())) | |
# Move around | |
dest = robot.pointPlan[robot.targetIndex] | |
distance = currentPoint.distance(dest) | |
if distance < DISTANCE_THRESHOLD: | |
print "Hit point!" | |
robot.targetIndex += 1 | |
for _ in xrange(robot.targetIndex, len(robot.plan)-1): | |
prevToCur = robot.pointPlan[robot.targetIndex-1].angleTo(robot.pointPlan[robot.targetIndex]) | |
curToNext = robot.pointPlan[robot.targetIndex].angleTo(robot.pointPlan[robot.targetIndex+1]) | |
if float_approx_eq(prevToCur, curToNext, ALIGNED_THRESHOLD): | |
robot.targetIndex += 1 | |
print "Straight line, skipping ahead" | |
else: | |
break | |
if robot.targetIndex >= len(robot.pointPlan): | |
# We straightlined past the end of the plan | |
robot.targetIndex -= 1 | |
dest = robot.pointPlan[robot.targetIndex] | |
distance = currentPoint.distance(dest) | |
currentAngle = inp.odometry.theta | |
angleTo = currentPoint.angleTo(dest) | |
angleDiff = fixAnglePlusMinusPi(angleTo - currentAngle) | |
if abs(angleDiff) < ANGLE_THRESHOLD: | |
# We're close enough angle-wise to start moving forward | |
io.setForward(DIST_GAIN * distance) | |
else: | |
# We need to turn, no forward yet | |
io.setForward(0) | |
io.setRotational(angleDiff * ANGLE_GAIN) | |
# render the drawing windows | |
# do not change this block | |
for w in robot.windows: | |
w.render() | |
# called when the stop button is pushed | |
def brainStop(): | |
stopTime = time.time() | |
print 'Total steps:', robot.count | |
#print 'Elapsed time in seconds:', stopTime - robot.startTime | |
def shutdown(): | |
if not keep_old_windows: | |
for w in robot.windows: | |
w.window.destroy() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment