Skip to content

Instantly share code, notes, and snippets.

@oscarcs
Last active May 27, 2019 04:56
Show Gist options
  • Save oscarcs/3cd970c7654d67cd1436e82021af1804 to your computer and use it in GitHub Desktop.
Save oscarcs/3cd970c7654d67cd1436e82021af1804 to your computer and use it in GitHub Desktop.
from __future__ import print_function
from builtins import range
import itertools
import MalmoPython
import json
import logging
import math
import os
import random
import sys
import time
import re
import malmoutils
import cs765_utils
from cs765_utils import cuboid,entity
from drawing import SensoryVisualization
import numpy as np
malmoutils.fix_print()
import matplotlib.pyplot as plt
it = 0
class Agent(object):
newid = itertools.count()
def __init__(self):
self.host = MalmoPython.AgentHost()
malmoutils.parse_command_line(self.host)
self.index = next(Agent.newid) # first agent is 0, 2nd is 1...
self.queued_commands = [] # this commands are sent the next time that act() is called
def connect(self, client_pool):
self.client_pool = client_pool
self.client_pool.add( MalmoPython.ClientInfo('127.0.0.1',10000+self.index) )
def safeStartMission(self, mission, client_pool):
self.recording_spec = MalmoPython.MissionRecordSpec()
cs765_utils.safeStartMission(self.host, mission, client_pool, self.recording_spec, self.index, '' )
def is_mission_running(self):
return self.host.peekWorldState().is_mission_running
def pixels_as_arrays(self, pixels):
"""Takes pixel data and returns four arrays, for
RED, GREEN, BLUE, and DEPTH of each pixel. Values
are returned as floats between 0 and 1.
"""
arr = np.array(pixels,dtype=np.uint8).reshape(240,320,4)
r = arr[:,:,0].astype(np.float32)/255.0
g = arr[:,:,1].astype(np.float32)/255.0
b = arr[:,:,2].astype(np.float32)/255.0
d = arr[:,:,3].astype(np.float32)/255.0
return r,g,b,d
def act(self):
if self.index==0:
global it
it += 1
## when you override this method in a subclass, make sure to
## call the parent's class as I did below with `super().act()`
## so that queued commands are sent.
for command in self.queued_commands:
self.host.sendCommand(command)
self.queued_commands.clear()
sv = SensoryVisualization()
class SunWatcher(Agent):
def __init__(self):
super().__init__()
self.yaw = 0.0 # your current facing (yaw)
self.pitch = 0.0 # your current facing (pitch)
self.currentState = "orienting"
self.targetPitch = -36
self.pitchTolerance = 2
self.pitchChange = 0 # Track whether the object is rising/setting
def angle_to_time(self,angle,part_of_day='unspecified'):
""" Takes the angle of the celestial object in the sky
(at horizon = 0, at the top of the sky = 90), and estimates
the time of day.
Without knowing if the object is the sun or moon, and without
knowing if it is rising or setting our estimate can only be so
accurate.
If the part_of_day argument is specified, then
the estimate can be more accurate. For this argument a numerical
value is given that specifies the following.
0: between 0 and 6000 in minecraft time (dawn--midday)
1: between 6000 and 12000 in minecraft time (midday--dusk)
2: between 12000 and 18000 in minecraft time (dusk--midnight)
3: between 18000 and 24000 in minecraft time (midnight--dawn)
"""
estimate = 6000.0*(-self.pitch / 90.0)
if part_of_day == 'unspecified' :
print('Time (%f) is estimated to be one of the following:' %(self.correct_time))
print('\t %f' %(estimate))
print('\t %f' %(12000-estimate))
print('\t %f' %(12000+estimate))
print('\t %f' %(12000+(12000-estimate)))
else :
print('Time (%f) is estimated to be:' %(self.correct_time))
print('\t %f %s' %(estimate,'<---' if part_of_day==0 else ''))
print('\t %f %s' %(12000-estimate,'<---' if part_of_day==1 else ''))
print('\t %f %s' %(12000+estimate,'<---' if part_of_day==2 else ''))
print('\t %f %s' %(12000+(12000-estimate),'<---' if part_of_day==3 else ''))
def sense(self, pixels):
"""Simulate sensor output using an input image."""
width = len(pixels[0])
height = len(pixels)
maxP = pixels[0][0]
maxX = 0
maxY = 0
for y in range(height):
for x in range(width):
i = pixels[y][x]
if i > maxP:
maxP = i
maxX = x
maxY = y
if maxP < 0.8:
x_offset
else:
x_offset = (maxX - (width / 2)) / (width / 2)
y_offset = (maxY - (height / 2)) / (height / 2)
return x_offset * 0.2, y_offset * 0.2
def calc_velocities(self, left, right):
forwardVelocity = max(min(left, right), 0.04) # Don't let it stop completely.
angularVelocity = (left - right) * forwardVelocity
return angularVelocity, forwardVelocity
def act(self):
super().act()
world_state = self.host.getWorldState()
## gets current positional data and stores it in yaw and pitch
if len(world_state.observations) > 0 :
obvsText = world_state.observations[-1].text
data = json.loads(obvsText)
self.yaw = data.get(u'Yaw', 0)
self.yaw += 360.0*2
self.yaw = self.yaw % 360.0
# force yaw to lie between 0 and 360.
self.pitch = data.get(u'Pitch', 0)
self.correct_time = data.get(u'WorldTime',0)
self.correct_time = self.correct_time % 24000.0
# force correct time to lie between 0 and 24000.
if len(world_state.video_frames) >= 1:
pixels = world_state.video_frames[-1].pixels
red, green, blue, depth = self.pixels_as_arrays(pixels)
senses = (np.array(red) + np.array(depth)) / 2
sv.display_data(senses)
# sv.display_data(blue)
## NOTE: Most of your code will go here.
# print(self.pitch, self.yaw, self.currentState)
# 'forward'
# self.host.sendCommand('pitch -0.1')
if self.currentState == "orienting":
if self.pitch > (self.targetPitch + self.pitchTolerance):
self.host.sendCommand('pitch -0.1')
elif self.pitch < (self.targetPitch - self.pitchTolerance):
self.host.sendCommand('pitch 0.1')
else:
self.host.sendCommand('pitch 0')
self.currentState = "searching"
elif self.currentState == "searching":
left, right = self.sense(senses)
# x, y = self.calc_velocities(left, right)
y = left * 0.2
x = right * 0.2
# print('pitch ' + str(round(x, 3)), 'yaw ' + str(round(y, 3)))
if self.pitch > 0:
self.host.sendCommand('pitch -0.1')
else:
self.host.sendCommand('pitch ' + str(x))
self.host.sendCommand('turn ' + str(y))
########################################
#### INITIALISE TWO AGENTS
## the order here corresponds to the order of the AgentSections
agents = [SunWatcher()]
drawing_decorators = ''
drawing_decorators += cuboid(-5,10,-5,5,25,5, 'sand')
drawing_decorators += cuboid(-5,10,-5,-5,25,-5, 'water')
drawing_decorators += cuboid(-5,10,5,-5,25,5, 'water')
drawing_decorators += cuboid(5,10,5,5,25,5, 'water')
drawing_decorators += cuboid(5,10,-5,5,25,-5, 'water')
subst = {
'DrawingDecorators' : drawing_decorators,
'StartTime' : str(np.random.randint(24000)),
}
mission_xml = cs765_utils.load_mission_xml('./desert_island.xml',subst)
print(mission_xml)
my_mission = MalmoPython.MissionSpec(mission_xml, True)
client_pool = MalmoPython.ClientPool()
for agent in agents:
agent.connect(client_pool)
for agent in agents:
agent.safeStartMission(my_mission, client_pool)
cs765_utils.safeWaitForStart([agent.host for agent in agents])# agent_a.host, agent_b.host ])
## Main loop
while any([agent.is_mission_running() for agent in agents]):
time.sleep(0.1)
for agent in agents:
agent.act()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment