Created
May 16, 2021 03:18
-
-
Save cmpute/60ca9eebd0ebf4ad017916d34a41f39b to your computer and use it in GitHub Desktop.
Load and parse INTERACTION dataset
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import enum | |
from io import RawIOBase | |
import os.path as osp | |
import xml.etree.ElementTree as xml | |
from collections import namedtuple | |
from pathlib import Path | |
from matplotlib import pyplot as plt | |
from matplotlib import cm, colors as mcolors | |
from itertools import islice, filterfalse | |
import numpy as np | |
from numpy.core.numeric import count_nonzero, outer | |
from numpy.lib.arraysetops import isin | |
from numpy.ma.core import inner | |
from scipy.io import savemat, loadmat | |
from scipy.spatial.distance import cdist | |
from intervaltree import Interval, IntervalTree, node | |
# from shapely.geometry import Polygon, LineString | |
Point = namedtuple("Point", ['x', 'y']) | |
Way = namedtuple("Way", ['nodes', 'tags']) | |
Lanelet = namedtuple("Lanelet", ['left', 'right', 'regulations', 'tags']) | |
MultiPolygon = namedtuple("MultiPolygon", ['outer', 'tags']) | |
RegulatoryElement = namedtuple("RegulatoryElement", ['tags']) | |
roundabout_outer_ways = { # id of ways defining outer boundary of roundabouts | |
"DR_DEU_Roundabout_OF": [10024, 10075, 10038, 10103, 10044, 10023, 10083, 10019, 10105, 10110, 10005, 10085, 10007] | |
} | |
roundabout_inter_polygon = { # id of multipolygon defining inner boundary of roundabouts | |
"DR_DEU_Roundabout_OF": 40002 | |
} | |
map_inward_lanelet = { # id of lanelet defining inward roads | |
"DR_DEU_Roundabout_OF": [ | |
[30006, 30025, 30026, 30027, 30015, 30034], | |
[30031, 30033, 30039, 30043, 30000], | |
[30029, 30021, 30014, 30012, 30010, 30046, 30038] | |
] | |
} | |
map_outward_lanelet = { # id of lanelet defining outward roads | |
"DR_DEU_Roundabout_OF": [ | |
[30003, 30009, 30011, 30013, 30020, 30028], | |
[30032, 30045, 30008, 30007, 30024, 30022], | |
[30019, 30044, 30041, 30035, 30037] | |
] | |
} | |
class LaneletMap: | |
def __init__(self, root_folder, map_id) -> None: | |
self.nodes = {} | |
self.ways = {} | |
self.relations = {} | |
self.parse(Path(root_folder, 'maps', map_id + '.osm_xy')) | |
self.map_id = map_id | |
self.root = root_folder | |
def parse(self, map_path): | |
''' | |
Load map info from osm_xy map | |
''' | |
assert map_path.suffix == ".osm_xy", "Only support map format in xy coordinate" | |
e = xml.parse(map_path).getroot() | |
for node in e.findall("node"): | |
point = Point(float(node.get('x')), float(node.get('y'))) | |
self.nodes[int(node.get('id'))] = point | |
for way in e.findall('way'): | |
tags = {tag.get("k"): tag.get("v") for tag in way.findall("tag")} | |
nodes = [int(nd.get("ref")) for nd in way.findall("nd")] | |
self.ways[int(way.get('id'))] = Way(nodes, tags) | |
for relation in e.findall('relation'): | |
content = dict() | |
for member in relation.findall('member'): | |
role = member.get('role') | |
if role == 'left': | |
assert member.get('type') == 'way' | |
content['left'] = int(member.get('ref')) | |
elif role == 'right': | |
assert member.get('type') == 'way' | |
content['right'] = int(member.get('ref')) | |
elif role == 'regulatory_element': | |
assert member.get('type') == 'relation' | |
if 'regulations' not in content: | |
content['regulations'] = [] | |
content['regulations'].append(int(member.get('ref'))) | |
elif role == 'outer': | |
assert member.get('type') == 'way' | |
if 'outer' not in content: | |
content['outer'] = [] | |
content['outer'].append(int(member.get('ref'))) | |
content['tags'] = {tag.get("k"): tag.get("v") for tag in relation.findall("tag")} | |
rtype = content['tags'].pop('type') | |
rid = int(relation.get('id')) | |
if rtype == "lanelet": | |
self.relations[rid] = Lanelet(**content) | |
elif rtype == "regulatory_element": | |
self.relations[rid] = RegulatoryElement(**content) | |
elif rtype == "multipolygon": | |
self.relations[rid] = MultiPolygon(**content) | |
print("Loaded map with %d nodes, %d ways, %d relations" % (len(self.nodes), len(self.ways), len(self.relations))) | |
def _append_way_to_waypoints(self, points, wid): | |
if points: | |
if self.ways[wid].nodes[0] == points[-1]: | |
points.extend(self.ways[wid].nodes[1:]) | |
elif self.ways[wid].nodes[-1] == points[-1]: | |
points.extend(self.ways[wid].nodes[-2::-1]) | |
elif self.ways[wid].nodes[0] == points[0]: | |
points.reverse() | |
points.extend(self.ways[wid].nodes[1:]) | |
elif self.ways[wid].nodes[-1] == points[0]: | |
points.reverse() | |
points.extend(self.ways[wid].nodes[-2::-1]) | |
else: | |
raise ValueError("Need a adjacent way to append") | |
else: | |
points.extend(self.ways[wid].nodes) | |
def get_roundabout_area(self): | |
assert self.map_id in roundabout_outer_ways | |
def polygon_from_ways(ways): | |
points = [] | |
for wid in ways: | |
self._append_way_to_waypoints(points, wid) | |
assert points[-1] == points[0] | |
points.pop() | |
return np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in points]) | |
# fetch outline points | |
outer_poly = polygon_from_ways(roundabout_outer_ways[self.map_id]) | |
inter_poly = polygon_from_ways(self.relations[roundabout_inter_polygon[self.map_id]].outer[::-1]) | |
# other properties | |
center = (np.max(inter_poly, axis=0) + np.min(inter_poly, axis=0)) / 2 | |
dim = np.mean((np.max(outer_poly, axis=0) - np.min(outer_poly, axis=0)) / 2) | |
return outer_poly, inter_poly, center, dim | |
def _get_roads(self, inward): | |
lanelet_list = map_inward_lanelet if inward else map_outward_lanelet | |
assert self.map_id in lanelet_list | |
result = [] | |
for roads in lanelet_list[self.map_id]: | |
left = [] | |
right = [] | |
for lanelet in roads: | |
left.append(self.relations[lanelet].left) | |
right.append(self.relations[lanelet].right) | |
left_points = [] | |
for way in left: | |
self._append_way_to_waypoints(left_points, way) | |
lbound = np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in left_points]) | |
right_points = [] | |
for way in right: | |
self._append_way_to_waypoints(right_points, way) | |
rbound = np.array([(self.nodes[nid].x, self.nodes[nid].y) for nid in right_points]) | |
result.append((lbound, rbound)) | |
return result | |
def get_inward_roads(self): | |
return self._get_roads(inward=True) | |
def get_outward_roads(self): | |
return self._get_roads(inward=False) | |
def interpolate_boundary(self, left, right, ratio=0.5, circular=False): | |
N = 2 * max(len(left), len(right)) | |
if circular: | |
# close loop | |
if not np.allclose(left[0], left[-1]): | |
left = np.concatenate([left, left[[0],:]]) | |
if not np.allclose(right[0], right[-1]): | |
right = np.concatenate([right, right[[0],:]]) | |
N += 1 # overlap of the first and the last point | |
def interpolate_line(line, num): | |
d = line[1:,:] - line[:-1,:] | |
s = np.concatenate([[0], np.cumsum(np.hypot(d[:,0], d[:,1]))]) | |
t = np.linspace(0, s[-1], num=num) | |
x = np.interp(t, s, line[:,0]) | |
y = np.interp(t, s, line[:,1]) | |
return np.array([x, y]).T | |
left_points = interpolate_line(left, N) | |
right_points = interpolate_line(right, N) | |
# align phase if circular | |
if circular: | |
# remove duplicate point | |
left_points = left_points[:-1,:] | |
right_points = right_points[:-1,:] | |
# calculate phase angle | |
left_center = (np.max(left_points, axis=0) + np.min(left_points, axis=0)) / 2 | |
left_diff = left_points - left_center | |
left_angle = np.arctan2(left_diff[:,0], left_diff[:,1]) | |
right_center = (np.max(right_points, axis=0) + np.min(right_points, axis=0)) / 2 | |
right_diff = right_points - right_center | |
right_angle = np.arctan2(right_diff[:,0], right_diff[:,1]) | |
# align direction to counter-clock wise | |
if np.sum(np.unwrap(np.diff(left_angle))) > 0: | |
left_points = left_points[::-1] | |
left_angle = left_angle[::-1] | |
if np.sum(np.unwrap(np.diff(right_angle))) > 0: | |
right_points = right_points[::-1] | |
right_angle = right_angle[::-1] | |
# find best match | |
imin = 0 | |
dmin = np.linalg.norm(np.unwrap(left_angle - right_angle), ord=1) | |
for i in range(1, N): | |
left_aligned = np.concatenate([left_angle[i:], left_angle[:i]], axis=0) | |
dalign = np.linalg.norm(np.unwrap(left_aligned - right_angle), ord=1) | |
if dalign < dmin: | |
imin = i | |
dmin = dalign | |
left_points = np.concatenate([left_points[imin:], left_points[:imin]], axis=0) | |
interpolated = left_points * ratio + right_points * (1-ratio) | |
return interpolated | |
def load_tracks(self): | |
vehicle_tracks = {} | |
vehicle_dtype = [ | |
('track_id', 'u4'), | |
('frame_id', 'u4'), | |
('timestamp_ms', 'u4'), | |
('agent_type', 'S3'), | |
('x', 'f4'), ('y', 'f4'), | |
('vx', 'f4'), ('vy', 'f4'), | |
('psi_rad', 'f4'), | |
('length', 'f4'), ('width', 'f4') | |
] | |
pedestrian_tracks = {} | |
pedestrian_dtype = vehicle_dtype[:-3] | |
for p in Path(self.root, "recorded_trackfiles", self.map_id).iterdir(): | |
track_file_idx = int(p.stem[-3:]) | |
if p.stem.startswith('vehicle'): | |
data = np.genfromtxt(p, dtype=vehicle_dtype, delimiter=",", skip_header=1) | |
else: | |
data = np.genfromtxt(p, dtype=pedestrian_dtype, delimiter=",", skip_header=1, | |
converters={0: lambda s: int(s[1:])}) | |
assert np.max(data['frame_id']) < 10000 | |
data['frame_id'] += track_file_idx * 10000 | |
assert np.max(data['track_id']) < 10000 | |
data['track_id'] += track_file_idx * 10000 | |
if p.stem.startswith('vehicle'): | |
for vid in np.unique(data['track_id']): | |
vehicle_tracks[vid] = data[data['track_id'] == vid] | |
else: | |
for vid in np.unique(data['track_id']): | |
pedestrian_tracks[vid] = data[data['track_id'] == vid] | |
return vehicle_tracks, pedestrian_tracks | |
def generate_demonstrations(map_folder, map_id, N_prev=20, N_post=40): | |
''' | |
N_prev: Number of frames previous to the entry time of a vehicle | |
N_post: Number of frames next to the entry time of a vehicle | |
''' | |
# some parameters | |
min_angle = -0.8*np.pi | |
max_angle = 0.4*np.pi | |
# load map | |
map = LaneletMap(map_folder, map_id) | |
outer, inter, center, dim = map.get_roundabout_area() | |
vtracks, ptracks = map.load_tracks() | |
# create center lines and reference paths | |
inroads = [map.interpolate_boundary(left, right) for left, right in map.get_inward_roads()] | |
outroads = [map.interpolate_boundary(left, right) for left, right in map.get_outward_roads()] | |
ninroads = len(inroads) | |
rdcenter = map.interpolate_boundary(inter, outer, circular=True) | |
inpoint = [np.argmin(np.linalg.norm(rdcenter-road[-1], axis=1)) for road in inroads] | |
outpoint = [np.argmin(np.linalg.norm(rdcenter-road[0], axis=1)) for road in outroads] | |
reference_paths = {} | |
for i, iroad in enumerate(inroads): | |
for o, oroad in enumerate(outroads): | |
if inpoint[i] > outpoint[o]: | |
rdpart = np.concatenate([rdcenter[inpoint[i]:], rdcenter[:outpoint[o]+1]], axis=0) | |
else: | |
rdpart = rdcenter[inpoint[i]:outpoint[o]+1] | |
reference_paths[(i, o)] = np.concatenate([iroad, rdpart, oroad], axis=0) | |
dump_paths = {('r%d' % (k[0] * ninroads + k[1])): v for k, v in reference_paths.items()} | |
savemat("roads-%s.mat" % map_id, dump_paths) | |
inroad_indicator = np.concatenate([np.insert(iroad, 2, i, axis=1) for i, iroad in enumerate(inroads)], axis=0) | |
outroad_indicator = np.concatenate([np.insert(oroad, 2, o, axis=1) for o, oroad in enumerate(outroads)], axis=0) | |
# create timespan mapping | |
frame_tree = IntervalTree() | |
for vid, track in vtracks.items(): | |
frame_tree[track['frame_id'][0]:track['frame_id'][-1]] = vid | |
# iterator over tracks | |
interactions = [] | |
for vid, track in vtracks.items(): | |
dx = track['x'] - center[0] | |
dy = track['y'] - center[1] | |
vangle = np.arctan2(dx, dy) | |
vd = np.hypot(dx, dy) - dim | |
# find starting and ending road TODO: some trajectory ends inside roundabout | |
dist_inroad = np.hypot(inroad_indicator[:, 0] - track['x'][0], inroad_indicator[:, 1] - track['y'][0]) | |
inroad = inroad_indicator[np.argmin(dist_inroad), 2] | |
dist_outroad = np.hypot(outroad_indicator[:, 0] - track['x'][-1], outroad_indicator[:, 1] - track['y'][-1]) | |
outroad = outroad_indicator[np.argmin(dist_outroad), 2] | |
# find entry to the roundabout | |
entry_points = np.argwhere((vd[:-1] > 0) & (vd[1:] < 0)) | |
if len(entry_points) == 0: | |
continue | |
entry_pidx = entry_points[0][0] | |
entry_frame = track[entry_pidx]['frame_id'] | |
# find interactions | |
for interval in frame_tree[track[entry_pidx]['frame_id']]: | |
vid2 = interval.data | |
if vid2 == vid: # skip self | |
continue | |
track2 = vtracks[vid2] | |
track2_t = track2[track2['frame_id'] == entry_frame] | |
dx = track2_t['x'] - center[0] | |
dy = track2_t['y'] - center[1] | |
if np.hypot(dx, dy) > dim: # skip the other vehicles outside roundabout | |
continue | |
dangle = np.unwrap(np.arctan2(dx, dy) - vangle[entry_pidx]) | |
if dangle < min_angle or dangle > max_angle: # skip vehicles too far away | |
# continue | |
pass # disable this by now | |
# find starting and ending road | |
dist_inroad = np.hypot(inroad_indicator[:, 0] - track2['x'][0], inroad_indicator[:, 1] - track2['y'][0]) | |
inroad2 = inroad_indicator[np.argmin(dist_inroad), 2] | |
dist_outroad = np.hypot(outroad_indicator[:, 0] - track2['x'][-1], outroad_indicator[:, 1] - track2['y'][-1]) | |
outroad2 = outroad_indicator[np.argmin(dist_outroad), 2] | |
interactions.append((vid, vid2, entry_frame, inroad, outroad, inroad2, outroad2)) | |
# dump data | |
result_dict = {} | |
for vid, vid2, entry_frame, inroad, outroad, inroad2, outroad2 in interactions: | |
cols = [] | |
track1 = vtracks[vid] | |
track2 = vtracks[vid2] | |
start_frame = max(entry_frame - N_prev, track1['frame_id'][0], track2['frame_id'][0]) | |
end_frame = min(entry_frame + N_post, track1['frame_id'][-1], track2['frame_id'][-1]) | |
cols.append(list(range(start_frame, end_frame))) | |
track1 = track1[(start_frame <= track1['frame_id']) & (track1['frame_id'] < end_frame)] | |
cols.append(track1['x']) | |
cols.append(track1['y']) | |
cols.append(track1['vx']) | |
cols.append(track1['vy']) | |
cols.append(track1['psi_rad']) | |
track2 = track2[(start_frame <= track2['frame_id']) & (track2['frame_id'] < end_frame)] | |
cols.append(track2['x']) | |
cols.append(track2['y']) | |
cols.append(track2['vx']) | |
cols.append(track2['vy']) | |
cols.append(track2['psi_rad']) | |
path1 = inroad * ninroads + outroad | |
path2 = inroad2 * ninroads + outroad2 | |
result_dict['t%d_%d_%d_%d_%d' % (entry_frame, vid, vid2, path1, path2)] = np.array(cols).T | |
print("Created %d demonstrations" % len(result_dict)) | |
savemat("demos-%s.mat" % map_id, result_dict) | |
def visualize_demonstrations(map_folder, map_id, v1path=None, v2path=None, index=slice(None)): | |
data = loadmat("demos-%s.mat" % map_id) | |
roads = loadmat("roads-%s.mat" % map_id) | |
invalid_keys = [k for k in data if k.startswith('__')] | |
for k in invalid_keys: | |
data.pop(k) | |
map = LaneletMap(map_folder, map_id) | |
# draw roundabout | |
outer, inter, center, dim = map.get_roundabout_area() | |
fig, ax = plt.subplots(figsize=(10, 5)) | |
outer = np.concatenate([outer, outer[[0],:]]) # close the loop | |
inter = np.concatenate([inter, inter[[0],:]]) | |
ax.plot(outer[:,0], outer[:,1], 'k') | |
ax.plot(inter[:,0], inter[:,1], 'k') | |
ax.scatter([center[0]], [center[1]], c='k') | |
# ax.set_xlim([960, 1040]) | |
# ax.set_ylim([970, 1030]) | |
phi = np.linspace(-np.pi, np.pi) | |
ax.plot(center[0] + np.cos(phi)*dim, center[1] + np.sin(phi)*dim, '--', c='gray') | |
# draw in and out roads | |
for lbound, rbound in map.get_inward_roads(): | |
ax.plot(lbound[:,0], lbound[:,1], 'k') | |
ax.plot(rbound[:,0], rbound[:,1], 'k') | |
for lbound, rbound in map.get_outward_roads(): | |
ax.plot(lbound[:,0], lbound[:,1], 'k') | |
ax.plot(rbound[:,0], rbound[:,1], 'k') | |
# draw trajectories | |
index = slice(index, index+1) if isinstance(index, int) else index | |
demo_items = filterfalse(lambda t: t[0].startswith('__'), data.items()) | |
if v1path is not None: | |
demo_items = filterfalse(lambda t: t[0].split('_')[3] != str(v1path), demo_items) | |
path_arr = roads['r%d' % int(v1path)] | |
ax.plot(path_arr[:,0], path_arr[:,1], 'k--') | |
if v2path is not None: | |
demo_items = filterfalse(lambda t: t[0].split('_')[4] != str(v2path), demo_items) | |
path_arr = roads['r%d' % int(v2path)] | |
ax.plot(path_arr[:,0], path_arr[:,1], 'k--') | |
for key, demo in islice(demo_items, index.start, index.stop, index.step): | |
h1 = ax.scatter(demo[:,1], demo[:,2], c=np.arange(len(demo)), cmap="winter", alpha=0.5, lw=0) | |
h2 = ax.scatter(demo[:,6], demo[:,7], c=np.arange(len(demo)), cmap="autumn", alpha=0.5, lw=0) | |
plt.colorbar(h1, label='Ego vehicle') | |
plt.colorbar(h2, label='Other vehicle', ticks=[]) | |
plt.show() | |
if __name__ == "__main__": | |
# generate_demonstrations("D:/SVO/interaction-dataset-master", "DR_DEU_Roundabout_OF") | |
visualize_demonstrations("D:/SVO/interaction-dataset-master", "DR_DEU_Roundabout_OF", index=slice(500)) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment