This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
async fn get_tile_file_name(x: i32, y: i32, z:i32) -> String { | |
let quad_key = Quadkey::tile_to_quadkey(x, y, z as usize); | |
let mut file_name = format!("./tiles/{}/{}.png", z, quad_key); | |
let path = Path::new(&file_name); | |
if !path.exists() { | |
let tile_quad_keys = get_tile_quad_keys(&quad_key).await; | |
if tile_quad_keys.is_empty() { | |
file_name = get_default_filename(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#[get("/density/<x>/<y>/<z>")] | |
async fn get_density_tile(x: i32, y: i32, z: i32) -> Result<NamedFile> { | |
let file_name = if !(1..=18).contains(&z) { | |
get_default_filename() | |
} else { | |
get_tile_file_name(x, y, z).await | |
}; | |
NamedFile::open(PathBuf::from(file_name)).await | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
fn make_rgba_tile(r: u8, g: u8, b: u8, a: u8) -> RgbaImage { | |
ImageBuffer::from_pixel(256, 256, Rgba([r, g, b, a])) | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::result::Result; | |
use sqlx::{SqlitePool, FromRow}; | |
#[derive(FromRow, Debug, PartialEq, Clone)] | |
struct LevelRange { | |
level_num: i64, | |
level_min: f64, | |
level_max: f64, | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@classmethod | |
def _trim_array(cls, array: np.ndarray) -> np.ndarray: | |
if array.shape[0] > 2: | |
return array[1:-1] | |
else: | |
return array | |
def to_trajectory(self) -> Trajectory: | |
lat = np.concatenate([self._trim_array(segment.lat) for segment in self.segments]) | |
lon = np.concatenate([self._trim_array(segment.lon) for segment in self.segments]) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def segment_to_trajectory(segment: list[LatLon], | |
dt: float, | |
t0: float) -> Trajectory: | |
lat = np.array([point.lat for point in segment]) | |
lon = np.array([point.lon for point in segment]) | |
time = np.zeros_like(lat) | |
seg_lens = vec_haversine(lat[1:], lon[1:], lat[:-1], lon[:-1]) | |
avg_speed = np.sum(seg_lens) / dt | |
time[0] = t0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def merge_trajectory(trajectory: Trajectory, | |
map_lat: np.ndarray, | |
map_lon: np.ndarray) -> list[list[LatLon]]: | |
segments: list[list[LatLon]] = [] | |
j = 0 | |
for i in range(trajectory.lat.shape[0]-1): | |
pt0 = LatLon(float(trajectory.lat[i]), float(trajectory.lon[i])) | |
pt1 = LatLon(float(trajectory.lat[i+1]), float(trajectory.lon[i+1])) | |
seg_len = pt0.haversine(*pt1.to_tuple()) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class CompoundTrajectory: | |
def __init__(self, trajectory: Trajectory, | |
map_lat: np.ndarray, | |
map_lon: np.ndarray): | |
self.segments: list[Trajectory] = [] | |
t0 = 0 | |
merged = merge_trajectory(trajectory, map_lat, map_lon) | |
for i, segment in enumerate(merged): | |
dt = trajectory.dt[i] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def main(): | |
config = get_config(tile_extract='./valhalla/custom_files/valhalla_tiles.tar', | |
verbose=True) | |
trips = get_all_trips() | |
for traj_id, vehicle_id, trip_id in trips: | |
print(f"Vehicle {vehicle_id}, trip {trip_id}, trajectory: {traj_id}") | |
trip_df = get_trip_signals(vehicle_id, trip_id) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def evolve_path(self: PredictedPath, | |
h0: int, | |
probability: float): | |
path = PredictedPath(probability=self.probability * probability, | |
step=self.step+1, | |
size=self.size) | |
path.array = self.array.copy() | |
path.array[self.step] = h0 | |
return path |