Last active
December 21, 2015 21:39
-
-
Save jhamrick/6369410 to your computer and use it in GitHub Desktop.
Convert "towers of blocks" CPO files to SSO files.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Convert 'tower of blocks' CPO files to SSO files.""" | |
# Standard | |
import os | |
import re | |
import colorsys | |
import sys | |
import pickle | |
# External | |
from path import path | |
import numpy as np | |
import pandac.PandaModules | |
from libpanda import Vec3, Vec4 | |
from optparse import OptionParser | |
# Project | |
from scenesim.objects.sso import SSO | |
from scenesim.objects.gso import GSO | |
from scenesim.objects.pso import RBSO | |
import cogphysics.scrap.nodetools as nt | |
def load_state(node): | |
"""Load state property dicts from NodePath `node`.""" | |
# strip away cpo### prefix from the NodePath name if it exists, | |
# because it is meaningless | |
name = node.getName() | |
matches = re.match(r"cpo[0-9]+-(block[0-9])", name) | |
if matches: | |
name = matches.group(1) | |
# standard scenesim object properties | |
sso_prop = { | |
'pos': node.getPos(), | |
'quat': node.getQuat(), | |
'scale': node.getScale(), | |
'name': name, | |
} | |
# graphics state object properties | |
model = node.getTag('model') | |
if model != '': | |
model = path(model).splitpath()[1] | |
gso_prop = { | |
'color': node.getColor(), | |
'model': model, | |
} | |
else: | |
gso_prop = None | |
# rigid body state object properties | |
pnode = node.getPythonTag('pnode') | |
if pnode is not None: | |
pstate = pnode.state | |
rbso_prop = { | |
'friction': pstate['friction'], | |
'restitution': pstate['elasticity'], | |
'shape': pstate['shape'].capitalize(), | |
'linear_velocity': Vec3(*pstate['linvel']), | |
'angular_velocity': Vec3(*pstate['angvel']), | |
'mass': pstate['mass'], | |
} | |
if not (pstate['force'] == np.zeros(3)).all(): | |
raise RuntimeError("force is: %s" % pstate['force']) | |
if not (pstate['damping'] == np.zeros(1)).all(): | |
raise RuntimeError("damping is: %s" % pstate['damping']) | |
else: | |
rbso_prop = None | |
return sso_prop, gso_prop, rbso_prop | |
def make_sso(oldnode, name=None): | |
"""Convert an old-type cpo node (i.e. that uses nodetools and OdeNode) | |
to a new scenesim object. | |
""" | |
# extract all the graphics/physics nodes, and their partial | |
# ordering, using the old nodetools library | |
nodes, porder = nt._get_nodetree(oldnode) | |
# set the name of the root node | |
if name: | |
nodes[porder.index(0)].setName(name) | |
ssos = [] | |
orders = [] | |
for i, node in enumerate(nodes): | |
# extract the property dictionaries | |
sso_prop, gso_prop, pso_prop = load_state(node) | |
# if there is physics and graphics, then we create a physics | |
# node and then a graphics node underneath it | |
if pso_prop and gso_prop: | |
props = sso_prop.copy() | |
props.update(pso_prop) | |
name = props['name'] | |
props['name'] = name + "-physics" | |
ssos.append(RBSO(props=props)) | |
orders.append(porder[i]) | |
props = gso_prop.copy() | |
props['name'] = name + "-graphics" | |
ssos.append(GSO(props=props)) | |
orders.append(len(orders)) | |
# there is only a graphics node | |
elif gso_prop: | |
props = sso_prop.copy() | |
props.update(gso_prop) | |
ssos.append(GSO(props=props)) | |
orders.append(porder[i]) | |
# there is only a physics node (but this should never be the | |
# case, I think) | |
elif pso_prop: | |
raise NotImplementedError | |
# no physics or graphics -- this is just a basic NodePath | |
else: | |
ssos.append(SSO(props=sso_prop)) | |
orders.append(porder[i]) | |
# construct the hierarchy of nodes | |
sso = SSO.build_tree(ssos, orders) | |
return sso | |
###################################################################### | |
# Functions for updating SSO properties | |
def _block_physics(pso): | |
"""Physics settings for tower blocks.""" | |
density = 170.0 | |
volume = np.prod(pso.getScale()) | |
mass = density * volume | |
pso.set_mass(mass) | |
pso.set_friction(0.8 ** 0.5) | |
pso.set_restitution(0) | |
def _floor(sso, rso): | |
"""Settings for round wooden floor SSO.""" | |
gsos = sso.descendants(type_=GSO) | |
assert len(gsos) == 1 | |
gso = gsos[0] | |
gso.set_model("wood_floor.egg") | |
gso.setColor((0.45, 0.3, 0.1, 1.0)) | |
sso.set_friction(0.8 ** 0.5) | |
sso.set_restitution(0) | |
sso.setScale((10, 10, 1)) | |
def _original(sso, rso): | |
"""Settings for original-type towers (random colors).""" | |
psos = sso.descendants(type_=RBSO) | |
gsos = sso.descendants(type_=GSO) | |
for pso, gso in zip(psos, gsos): | |
r, g, b = colorsys.hsv_to_rgb(rso.rand(), 1, 1) | |
color = Vec4(r, g, b, 1) | |
gso.setColor(color) | |
gso.set_model("block.egg") | |
_block_physics(pso) | |
def _mass_plastic_stone(sso, rso): | |
"""Settings for prediction mass towers (plastic/stone blocks).""" | |
lcscale = 0.15 | |
hcscale = 0.1 | |
clip = lambda x: np.clip(x, 0, 1) | |
psos = sso.descendants(type_=RBSO) | |
gsos = sso.descendants(type_=GSO) | |
bitstr = [int(x) for x in sso.getName().split("_")[-1]] | |
for gso, pso, blocktype in zip(gsos, psos, bitstr): | |
r = rso.randn()*lcscale/2.0 | |
if blocktype == 0: | |
color = Vec4( | |
clip(.3 + r), | |
clip(1. + rso.randn()*lcscale/2. + r), | |
clip(.65 + rso.randn()*lcscale/2. + r), | |
1) | |
elif blocktype == 1: | |
color = Vec4( | |
clip(.65 + rso.randn()*hcscale/2. + r), | |
clip(.65 + r), | |
clip(.65 + rso.randn()*hcscale/2. + r), | |
1) | |
gso.setColor(color) | |
gso.set_model("block.egg") | |
_block_physics(pso) | |
def _mass_red_yellow(sso, rso): | |
"""Settings for RY inference mass towers (red/yellow blocks).""" | |
psos = sso.descendants(type_=RBSO) | |
gsos = sso.descendants(type_=GSO) | |
bitstr = [int(x) for x in sso.getName().split("_")[-1]] | |
for gso, pso, blocktype in zip(gsos, psos, bitstr): | |
if blocktype == 0: | |
model = "red_block.egg" | |
color = Vec4(1, 0, 0, 1) | |
elif blocktype == 1: | |
model = "yellow_block.egg" | |
color = Vec4(1, 1, 0, 1) | |
gso.setColor(color) | |
gso.set_model(model) | |
_block_physics(pso) | |
def _mass_colors(sso, rso): | |
"""Settings for different colored inference mass towers.""" | |
psos = sso.descendants(type_=RBSO) | |
gsos = sso.descendants(type_=GSO) | |
bitstr = [int(x) for x in sso.getName().split("_")[-1]] | |
for gso, pso, blocktype in zip(gsos, psos, bitstr): | |
if blocktype == 0: | |
color = Vec4(1, 0, 0, 1) | |
elif blocktype == 1: | |
color = Vec4(1, 1, 0, 1) | |
gso.setColor(color) | |
gso.set_model("block.egg") | |
_block_physics(pso) | |
# Handlers corresponding to different sets of stimuli. All of these | |
# take arguments sso (scenesim object) and rso (random state object) | |
HANDLERS = { | |
'floors': _floor, | |
'stability-all': _original, | |
'stability-example-stable': _original, | |
'stability-example-stable-F': _original, | |
'stability-example-unstable': _original, | |
'stability-example-unstable-F': _original, | |
'stability-original': _original, | |
'stability-sameheight': _original, | |
'stability-unstable': _original, | |
'mass-oneshot-training-F': _original, | |
'mass-learning-training': _original, | |
'mass-all': _mass_colors, | |
'mass-oneshot-F': _mass_colors, | |
'mass-oneshot-example-F': _mass_colors, | |
'mass-example': _mass_red_yellow, | |
'mass-learning': _mass_red_yellow, | |
'mass-prediction-stability': _mass_plastic_stone, | |
'mass-prediction-direction': _mass_plastic_stone, | |
} | |
def convert(old_pth, new_pth, rso=None): | |
old_pth = path(old_pth) | |
new_pth = path(new_pth) | |
if rso is None: | |
rso = np.random | |
# find the set name from the path | |
setname = old_pth.splitpath()[1] | |
handler = HANDLERS[setname] | |
newset = os.path.join(new_pth, setname) | |
# create the new directory, if it doesn't alreadye exist | |
if not newset.exists(): | |
newset.makedirs() | |
# do something sensible with the name conversion table | |
tbl_pth = os.path.join(old_pth, "name_table.pkl") | |
name_table = {} | |
if tbl_pth.exists(): | |
with open(tbl_pth, "r") as fh: | |
tbl = dict(pickle.load(fh)) | |
for oldname, newname in tbl.iteritems(): | |
if oldname.startswith("stability"): | |
matches = re.match(r"stability([0-9]+)", oldname) | |
num = matches.group(1) | |
bitstr = "0"*10 | |
elif oldname.startswith("mass-tower"): | |
matches = re.match(r"mass-tower_([0-9]+)_([01]+)", oldname) | |
num = matches.group(1) | |
bitstr = matches.group(2) | |
name_table[newname] = "tower_%s_%s" % (num, bitstr) | |
# lookup the list of cpos to conver | |
cpo_pths = [x for x in old_pth.listdir() if x.endswith(".cpo")] | |
cpo_pths = [x for x in cpo_pths if x != "floor.cpo"] | |
for cpo_pth in cpo_pths: | |
# load the old node | |
with open(cpo_pth, 'r') as fh: | |
oldnode = nt.load_node(fh) | |
# convert it to an sso | |
name = cpo_pth.splitpath()[1] | |
name = name_table.get(name, name) | |
if name.endswith(".cpo"): | |
name = name[:-4] | |
sso = make_sso(oldnode, name) | |
# apply settings | |
if name == "floor": | |
_floor(sso, rso) | |
else: | |
handler(sso, rso) | |
# save it | |
sso_pth = os.path.join(newset, "%s.cpo" % name) | |
print cpo_pth | |
print "--> %s" % sso_pth | |
sso.save_tree(sso_pth) | |
if __name__ == "__main__": | |
usage = ("usage: %prog [options] old_path new_path") | |
parser = OptionParser(usage=usage) | |
parser.add_option( | |
"-s", "--seed", dest="seed", action="store", type="int", | |
default=12345, help="random seed", metavar="SEED") | |
(options, args) = parser.parse_args() | |
if len(args) != 2: | |
print "Arguments old_path and new_path not given!" | |
sys.exit(1) | |
print "Using random seed of %d" % options.seed | |
rso = np.random.RandomState(options.seed) | |
convert(old_pth=args[0], new_pth=args[1], rso=rso) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment