This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def insert_dead_reckon(locations: List[DeadReckon]) -> None: | |
db = get_eved_db() | |
lat = np.array([p.latitude for p in locations]) | |
lon = np.array([p.longitude for p in locations]) | |
t = [p.t for p in locations] | |
signal_ids = [p.signal_id for p in locations] | |
x = np.append([0.0], | |
vec_haversine(lat[:-1], lon[:-1], |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main(): | |
traj_df = load_trajectories() | |
traj_ids = traj_df["traj_id"].to_numpy() | |
update_schema() | |
for traj_id in tqdm(traj_ids): | |
polyline_str = load_polyline(int(traj_id)) | |
if polyline_str is not None and len(polyline_str): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def compute_gps_segments(unique_df: pd.DataFrame, | |
polyline: np.ndarray) -> List[GpsSegment]: | |
traj = Trajectory(lat=unique_df["match_latitude"].to_numpy(), | |
lon=unique_df["match_longitude"].to_numpy(), | |
time=unique_df["time_stamp"].to_numpy() / 1000.0, | |
ids=unique_df["signal_id"].to_numpy()) | |
map_tree = traj.merge(map_lat=polyline[:, 0], # Latitude | |
map_lon=polyline[:, 1]) # Longitude |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def delta_location(lat: float, | |
lon: float, | |
bearing: float, | |
meters: float) -> Tuple[float, float]: | |
""" | |
Calculates a destination location from a starting location, a bearing and a | |
distance in meters. | |
:param lat: Start latitude | |
:param lon: Start longitude | |
:param bearing: Bearing (North is zero degrees, measured clockwise) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def decode_polyline(encoded: str, | |
order: str = "lonlat") -> List[List]: | |
""" | |
Code drawn from https://valhalla.github.io/valhalla/decoding/ | |
:param order: Coordinate order: 'lonlat' (default) or 'latlon' | |
:param encoded: String-encoded polyline | |
:return: Decoded polyline as a list of [lat, lon] coordinates | |
""" | |
inv = 1.0 / 1e6 | |
decoded = [] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@dataclass
class DeadReckon:
    """A single dead-reckoned location sample.

    Field order is part of the interface: instances may be built
    positionally as ``DeadReckon(signal_id, t, latitude, longitude)``.
    """
    # Identifier of the signal this sample is associated with.
    signal_id: int
    # Timestamp of the sample.
    # NOTE(review): units are not visible here (seconds vs. ms) - confirm
    # against the code that populates this field.
    t: float
    # Latitude of the sample (presumably decimal degrees - confirm).
    latitude: float
    # Longitude of the sample (presumably decimal degrees - confirm).
    longitude: float
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class GpsSegment(PointList): | |
def __init__(self, | |
point: RefPoint): | |
super().__init__() | |
self.append(point) | |
def __repr__(self): | |
return f"GpsSegment({self.points})" | |
def append_edge(self, edge: MapEdge) -> None: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def dead_reckon(gps_segment: GpsSegment, | |
distances: np.ndarray, | |
gps_ids: np.ndarray, | |
timestamps: np.ndarray) -> List[DeadReckon]: | |
locations: List[DeadReckon] = [ | |
DeadReckon(signal_id = int(gps_ids[0]), | |
latitude=gps_segment.points[0].lat, | |
longitude=gps_segment.points[0].lon, | |
t=float(timestamps[0])) | |
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import folium
from folium.raster_layers import TileLayer

# Base map: CartoDB Positron tiles, canvas rendering preferred, centred on
# the given (latitude, longitude) pair at zoom level 13.
html_map = folium.Map(prefer_canvas=True,
                      tiles="cartodbpositron",
                      location=(42.274569, -83.733228),
                      zoom_start=13)

# Overlay layer whose tiles are fetched from a server on the loopback
# address, port 8000.
# NOTE(review): the URL template orders the placeholders as {x}/{y}/{z}
# rather than the more common {z}/{x}/{y}; this must match the route the
# tile server exposes - confirm against the server code.
tile_layer = TileLayer(tiles="http://127.0.0.1:8000/density/{x}/{y}/{z}",
                       overlay=True,
                       attr="(C) JPF")
tile_layer.add_to(html_map)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn paint_tile(pixels: Vec<TilePixel>, | |
color_grad: Gradient, | |
range: LevelRange) -> RgbaImage { | |
let mut bitmap = make_rgba_tile(0, 0, 0, 0); | |
if range.width() > 0.0 { | |
for pixel in pixels { | |
let intensity = (pixel.get_c() - range.min()).ln() / range.width().ln(); | |
let rgba = color_grad.at(intensity); | |
let pix = Rgba::from(rgba.to_rgba8()); |
NewerOlder