Skip to content

Instantly share code, notes, and snippets.

@cmpute
Created May 16, 2021 03:18
Show Gist options
  • Save cmpute/60ca9eebd0ebf4ad017916d34a41f39b to your computer and use it in GitHub Desktop.
Save cmpute/60ca9eebd0ebf4ad017916d34a41f39b to your computer and use it in GitHub Desktop.
Load and parse INTERACTION dataset
import enum
from io import RawIOBase
import os.path as osp
import xml.etree.ElementTree as xml
from collections import namedtuple
from pathlib import Path
from matplotlib import pyplot as plt
from matplotlib import cm, colors as mcolors
from itertools import islice, filterfalse
import numpy as np
from numpy.core.numeric import count_nonzero, outer
from numpy.lib.arraysetops import isin
from numpy.ma.core import inner
from scipy.io import savemat, loadmat
from scipy.spatial.distance import cdist
from intervaltree import Interval, IntervalTree, node
# from shapely.geometry import Polygon, LineString
Point = namedtuple("Point", ['x', 'y'])
Way = namedtuple("Way", ['nodes', 'tags'])
Lanelet = namedtuple("Lanelet", ['left', 'right', 'regulations', 'tags'])
MultiPolygon = namedtuple("MultiPolygon", ['outer', 'tags'])
RegulatoryElement = namedtuple("RegulatoryElement", ['tags'])
roundabout_outer_ways = { # id of ways defining outer boundary of roundabouts
"DR_DEU_Roundabout_OF": [10024, 10075, 10038, 10103, 10044, 10023, 10083, 10019, 10105, 10110, 10005, 10085, 10007]
}
roundabout_inter_polygon = { # id of multipolygon defining inner boundary of roundabouts
"DR_DEU_Roundabout_OF": 40002
}
map_inward_lanelet = { # id of lanelet defining inward roads
"DR_DEU_Roundabout_OF": [
[30006, 30025, 30026, 30027, 30015, 30034],
[30031, 30033, 30039, 30043, 30000],
[30029, 30021, 30014, 30012, 30010, 30046, 30038]
]
}
map_outward_lanelet = { # id of lanelet defining outward roads
"DR_DEU_Roundabout_OF": [
[30003, 30009, 30011, 30013, 30020, 30028],
[30032, 30045, 30008, 30007, 30024, 30022],
[30019, 30044, 30041, 30035, 30037]
]
}
class LaneletMap:
def __init__(self, root_folder, map_id) -> None:
self.nodes = {}
self.ways = {}
self.relations = {}
self.parse(Path(root_folder, 'maps', map_id + '.osm_xy'))
self.map_id = map_id
self.root = root_folder
def parse(self, map_path):
'''
Load map info from osm_xy map
'''
assert map_path.suffix == ".osm_xy", "Only support map format in xy coordinate"
e = xml.parse(map_path).getroot()
for node in e.findall("node"):
point = Point(float(node.get('x')), float(node.get('y')))
self.nodes[int(node.get('id'))] = point
for way in e.findall('way'):
tags = {tag.get("k"): tag.get("v") for tag in way.findall("tag")}
nodes = [int(nd.get("ref")) for nd in way.findall("nd")]
self.ways[int(way.get('id'))] = Way(nodes, tags)
for relation in e.findall('relation'):
content = dict()
for member in relation.findall('member'):
role = member.get('role')
if role == 'left':
assert member.get('type') == 'way'
content['left'] = int(member.get('ref'))
elif role == 'right':
assert member.get('type') == 'way'
content['right'] = int(member.get('ref'))
elif role == 'regulatory_element':
assert member.get('type') == 'relation'
if 'regulations' not in content:
content['regulations'] = []
content['regulations'].append(int(member.get('ref')))
elif role == 'outer':
assert member.get('type') == 'way'
if 'outer' not in content:
content['outer'] = []
content['outer'].append(int(member.get('ref')))
content['tags'] = {tag.get("k"): tag.get("v") for tag in relation.findall("tag")}
rtype = content['tags'].pop('type')
rid = int(relation.get('id'))
if rtype == "lanelet":
self.relations[rid] = Lanelet(**content)
elif rtype == "regulatory_element":
self.relations[rid] = RegulatoryElement(**content)
elif rtype == "multipolygon":
self.relations[rid] = MultiPolygon(**content)
print("Loaded map with %d nodes, %d ways, %d relations" % (len(self.nodes), len(self.ways), len(self.relations)))
def _append_way_to_waypoints(self, points, wid):
if points:
if self.ways[wid].nodes[0] == points[-1]:
points.extend(self.ways[wid].nodes[1:])
elif self.ways[wid].nodes[-1] == points[-1]:
points.extend(self.ways[wid].nodes[-2::-1])
elif self.ways[wid].nodes[0] == points[0]:
points.reverse()
points.extend(self.ways[wid].nodes[1:])
elif self.ways[wid].nodes[-1] == points[0]:
points.reverse()
points.extend(self.ways[wid].nodes[-2::-1])
else:
raise ValueError("Need a adjacent way to append")
else:
points.extend(self.ways[wid].nodes)
def get_roundabout_area(self):
assert self.map_id in roundabout_outer_ways
def polygon_from_ways(ways):
points = []
for wid in ways:
self._append_way_to_waypoints(points, wid)
assert points[-1] == points[0]
points.pop()
return np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in points])
# fetch outline points
outer_poly = polygon_from_ways(roundabout_outer_ways[self.map_id])
inter_poly = polygon_from_ways(self.relations[roundabout_inter_polygon[self.map_id]].outer[::-1])
# other properties
center = (np.max(inter_poly, axis=0) + np.min(inter_poly, axis=0)) / 2
dim = np.mean((np.max(outer_poly, axis=0) - np.min(outer_poly, axis=0)) / 2)
return outer_poly, inter_poly, center, dim
def _get_roads(self, inward):
lanelet_list = map_inward_lanelet if inward else map_outward_lanelet
assert self.map_id in lanelet_list
result = []
for roads in lanelet_list[self.map_id]:
left = []
right = []
for lanelet in roads:
left.append(self.relations[lanelet].left)
right.append(self.relations[lanelet].right)
left_points = []
for way in left:
self._append_way_to_waypoints(left_points, way)
lbound = np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in left_points])
right_points = []
for way in right:
self._append_way_to_waypoints(right_points, way)
rbound = np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in right_points])
result.append((lbound, rbound))
return result
def get_inward_roads(self):
return self._get_roads(inward=True)
def get_outward_roads(self):
return self._get_roads(inward=False)
def interpolate_boundary(self, left, right, ratio=0.5, circular=False):
N = 2 * max(len(left), len(right))
if circular:
# close loop
if not np.allclose(left[0], left[-1]):
left = np.concatenate([left, left[[0],:]])
if not np.allclose(right[0], right[-1]):
right = np.concatenate([right, right[[0],:]])
N += 1 # overlap of the first and the last point
def interpolate_line(line, num):
d = line[1:,:] - line[:-1,:]
s = np.concatenate([[0], np.cumsum(np.hypot(d[:,0], d[:,1]))])
t = np.linspace(0, s[-1], num=num)
x = np.interp(t, s, line[:,0])
y = np.interp(t, s, line[:,1])
return np.array([x, y]).T
left_points = interpolate_line(left, N)
right_points = interpolate_line(right, N)
# align phase if circular
if circular:
# remove duplicate point
left_points = left_points[:-1,:]
right_points = right_points[:-1,:]
# calculate phase angle
left_center = (np.max(left_points, axis=0) + np.min(left_points, axis=0)) / 2
left_diff = left_points - left_center
left_angle = np.arctan2(left_diff[:,0], left_diff[:,1])
right_center = (np.max(right_points, axis=0) + np.min(right_points, axis=0)) / 2
right_diff = right_points - right_center
right_angle = np.arctan2(right_diff[:,0], right_diff[:,1])
# align direction to counter-clock wise
if np.sum(np.unwrap(np.diff(left_angle))) > 0:
left_points = left_points[::-1]
left_angle = left_angle[::-1]
if np.sum(np.unwrap(np.diff(right_angle))) > 0:
right_points = right_points[::-1]
right_angle = right_angle[::-1]
# find best match
imin = 0
dmin = np.linalg.norm(np.unwrap(left_angle - right_angle), ord=1)
for i in range(1, N):
left_aligned = np.concatenate([left_angle[i:], left_angle[:i]], axis=0)
dalign = np.linalg.norm(np.unwrap(left_aligned - right_angle), ord=1)
if dalign < dmin:
imin = i
dmin = dalign
left_points = np.concatenate([left_points[imin:], left_points[:imin]], axis=0)
interpolated = left_points * ratio + right_points * (1-ratio)
return interpolated
def load_tracks(self):
vehicle_tracks = {}
vehicle_dtype = [
('track_id', 'u4'),
('frame_id', 'u4'),
('timestamp_ms', 'u4'),
('agent_type', 'S3'),
('x', 'f4'), ('y', 'f4'),
('vx', 'f4'), ('vy', 'f4'),
('psi_rad', 'f4'),
('length', 'f4'), ('width', 'f4')
]
pedestrian_tracks = {}
pedestrian_dtype = vehicle_dtype[:-3]
for p in Path(self.root, "recorded_trackfiles", self.map_id).iterdir():
track_file_idx = int(p.stem[-3:])
if p.stem.startswith('vehicle'):
data = np.genfromtxt(p, dtype=vehicle_dtype, delimiter=",", skip_header=1)
else:
data = np.genfromtxt(p, dtype=pedestrian_dtype, delimiter=",", skip_header=1,
converters={0: lambda s: int(s[1:])})
assert np.max(data['frame_id']) < 10000
data['frame_id'] += track_file_idx * 10000
assert np.max(data['track_id']) < 10000
data['track_id'] += track_file_idx * 10000
if p.stem.startswith('vehicle'):
for vid in np.unique(data['track_id']):
vehicle_tracks[vid] = data[data['track_id'] == vid]
else:
for vid in np.unique(data['track_id']):
pedestrian_tracks[vid] = data[data['track_id'] == vid]
return vehicle_tracks, pedestrian_tracks
def generate_demonstrations(map_folder, map_id, N_prev=20, N_post=40):
'''
N_prev: Number of frames previous to the entry time of a vehicle
N_post: Number of frames next to the entry time of a vehicle
'''
# some parameters
min_angle = -0.8*np.pi
max_angle = 0.4*np.pi
# load map
map = LaneletMap(map_folder, map_id)
outer, inter, center, dim = map.get_roundabout_area()
vtracks, ptracks = map.load_tracks()
# create center lines and reference paths
inroads = [map.interpolate_boundary(left, right) for left, right in map.get_inward_roads()]
outroads = [map.interpolate_boundary(left, right) for left, right in map.get_outward_roads()]
ninroads = len(inroads)
rdcenter = map.interpolate_boundary(inter, outer, circular=True)
inpoint = [np.argmin(np.linalg.norm(rdcenter-road[-1], axis=1)) for road in inroads]
outpoint = [np.argmin(np.linalg.norm(rdcenter-road[0], axis=1)) for road in outroads]
reference_paths = {}
for i, iroad in enumerate(inroads):
for o, oroad in enumerate(outroads):
if inpoint[i] > outpoint[o]:
rdpart = np.concatenate([rdcenter[inpoint[i]:], rdcenter[:outpoint[o]+1]], axis=0)
else:
rdpart = rdcenter[inpoint[i]:outpoint[o]+1]
reference_paths[(i, o)] = np.concatenate([iroad, rdpart, oroad], axis=0)
dump_paths = {('r%d' % (k[0] * ninroads + k[1])): v for k, v in reference_paths.items()}
savemat("roads-%s.mat" % map_id, dump_paths)
inroad_indicator = np.concatenate([np.insert(iroad, 2, i, axis=1) for i, iroad in enumerate(inroads)], axis=0)
outroad_indicator = np.concatenate([np.insert(oroad, 2, o, axis=1) for o, oroad in enumerate(outroads)], axis=0)
# create timespan mapping
frame_tree = IntervalTree()
for vid, track in vtracks.items():
frame_tree[track['frame_id'][0]:track['frame_id'][-1]] = vid
# iterator over tracks
interactions = []
for vid, track in vtracks.items():
dx = track['x'] - center[0]
dy = track['y'] - center[1]
vangle = np.arctan2(dx, dy)
vd = np.hypot(dx, dy) - dim
# find starting and ending road TODO: some trajectory ends inside roundabout
dist_inroad = np.hypot(inroad_indicator[:, 0] - track['x'][0], inroad_indicator[:, 1] - track['y'][0])
inroad = inroad_indicator[np.argmin(dist_inroad), 2]
dist_outroad = np.hypot(outroad_indicator[:, 0] - track['x'][-1], outroad_indicator[:, 1] - track['y'][-1])
outroad = outroad_indicator[np.argmin(dist_outroad), 2]
# find entry to the roundabout
entry_points = np.argwhere((vd[:-1] > 0) & (vd[1:] < 0))
if len(entry_points) == 0:
continue
entry_pidx = entry_points[0][0]
entry_frame = track[entry_pidx]['frame_id']
# find interactions
for interval in frame_tree[track[entry_pidx]['frame_id']]:
vid2 = interval.data
if vid2 == vid: # skip self
continue
track2 = vtracks[vid2]
track2_t = track2[track2['frame_id'] == entry_frame]
dx = track2_t['x'] - center[0]
dy = track2_t['y'] - center[1]
if np.hypot(dx, dy) > dim: # skip the other vehicles outside roundabout
continue
dangle = np.unwrap(np.arctan2(dx, dy) - vangle[entry_pidx])
if dangle < min_angle or dangle > max_angle: # skip vehicles too far away
# continue
pass # disable this by now
# find starting and ending road
dist_inroad = np.hypot(inroad_indicator[:, 0] - track2['x'][0], inroad_indicator[:, 1] - track2['y'][0])
inroad2 = inroad_indicator[np.argmin(dist_inroad), 2]
dist_outroad = np.hypot(outroad_indicator[:, 0] - track2['x'][-1], outroad_indicator[:, 1] - track2['y'][-1])
outroad2 = outroad_indicator[np.argmin(dist_outroad), 2]
interactions.append((vid, vid2, entry_frame, inroad, outroad, inroad2, outroad2))
# dump data
result_dict = {}
for vid, vid2, entry_frame, inroad, outroad, inroad2, outroad2 in interactions:
cols = []
track1 = vtracks[vid]
track2 = vtracks[vid2]
start_frame = max(entry_frame - N_prev, track1['frame_id'][0], track2['frame_id'][0])
end_frame = min(entry_frame + N_post, track1['frame_id'][-1], track2['frame_id'][-1])
cols.append(list(range(start_frame, end_frame)))
track1 = track1[(start_frame <= track1['frame_id']) & (track1['frame_id'] < end_frame)]
cols.append(track1['x'])
cols.append(track1['y'])
cols.append(track1['vx'])
cols.append(track1['vy'])
cols.append(track1['psi_rad'])
track2 = track2[(start_frame <= track2['frame_id']) & (track2['frame_id'] < end_frame)]
cols.append(track2['x'])
cols.append(track2['y'])
cols.append(track2['vx'])
cols.append(track2['vy'])
cols.append(track2['psi_rad'])
path1 = inroad * ninroads + outroad
path2 = inroad2 * ninroads + outroad2
result_dict['t%d_%d_%d_%d_%d' % (entry_frame, vid, vid2, path1, path2)] = np.array(cols).T
print("Created %d demonstrations" % len(result_dict))
savemat("demos-%s.mat" % map_id, result_dict)
def visualize_demonstrations(map_folder, map_id, v1path=None, v2path=None, index=slice(None)):
data = loadmat("demos-%s.mat" % map_id)
roads = loadmat("roads-%s.mat" % map_id)
invalid_keys = [k for k in data if k.startswith('__')]
for k in invalid_keys:
data.pop(k)
map = LaneletMap(map_folder, map_id)
# draw roundabout
outer, inter, center, dim = map.get_roundabout_area()
fig, ax = plt.subplots(figsize=(10, 5))
outer = np.concatenate([outer, outer[[0],:]]) # close the loop
inter = np.concatenate([inter, inter[[0],:]])
ax.plot(outer[:,0], outer[:,1], 'k')
ax.plot(inter[:,0], inter[:,1], 'k')
ax.scatter([center[0]], [center[1]], c='k')
# ax.set_xlim([960, 1040])
# ax.set_ylim([970, 1030])
phi = np.linspace(-np.pi, np.pi)
ax.plot(center[0] + np.cos(phi)*dim, center[1] + np.sin(phi)*dim, '--', c='gray')
# draw in and out roads
for lbound, rbound in map.get_inward_roads():
ax.plot(lbound[:,0], lbound[:,1], 'k')
ax.plot(rbound[:,0], rbound[:,1], 'k')
for lbound, rbound in map.get_outward_roads():
ax.plot(lbound[:,0], lbound[:,1], 'k')
ax.plot(rbound[:,0], rbound[:,1], 'k')
# draw trajectories
index = slice(index, index+1) if isinstance(index, int) else index
demo_items = filterfalse(lambda t: t[0].startswith('__'), data.items())
if v1path is not None:
demo_items = filterfalse(lambda t: t[0].split('_')[3] != str(v1path), demo_items)
path_arr = roads['r%d' % int(v1path)]
ax.plot(path_arr[:,0], path_arr[:,1], 'k--')
if v2path is not None:
demo_items = filterfalse(lambda t: t[0].split('_')[4] != str(v2path), demo_items)
path_arr = roads['r%d' % int(v2path)]
ax.plot(path_arr[:,0], path_arr[:,1], 'k--')
for key, demo in islice(demo_items, index.start, index.stop, index.step):
h1 = ax.scatter(demo[:,1], demo[:,2], c=np.arange(len(demo)), cmap="winter", alpha=0.5, lw=0)
h2 = ax.scatter(demo[:,6], demo[:,7], c=np.arange(len(demo)), cmap="autumn", alpha=0.5, lw=0)
plt.colorbar(h1, label='Ego vehicle')
plt.colorbar(h2, label='Other vehicle', ticks=[])
plt.show()
if __name__ == "__main__":
# generate_demonstrations("D:/SVO/interaction-dataset-master", "DR_DEU_Roundabout_OF")
visualize_demonstrations("D:/SVO/interaction-dataset-master", "DR_DEU_Roundabout_OF", index=slice(500))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment