This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| async fn get_tile_file_name(x: i32, y: i32, z:i32) -> String { | |
| let quad_key = Quadkey::tile_to_quadkey(x, y, z as usize); | |
| let mut file_name = format!("./tiles/{}/{}.png", z, quad_key); | |
| let path = Path::new(&file_name); | |
| if !path.exists() { | |
| let tile_quad_keys = get_tile_quad_keys(&quad_key).await; | |
| if tile_quad_keys.is_empty() { | |
| file_name = get_default_filename(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #[get("/density/<x>/<y>/<z>")] | |
| async fn get_density_tile(x: i32, y: i32, z: i32) -> Result<NamedFile> { | |
| let file_name = if !(1..=18).contains(&z) { | |
| get_default_filename() | |
| } else { | |
| get_tile_file_name(x, y, z).await | |
| }; | |
| NamedFile::open(PathBuf::from(file_name)).await | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| fn make_rgba_tile(r: u8, g: u8, b: u8, a: u8) -> RgbaImage { | |
| ImageBuffer::from_pixel(256, 256, Rgba([r, g, b, a])) | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::result::Result; | |
| use sqlx::{SqlitePool, FromRow}; | |
| #[derive(FromRow, Debug, PartialEq, Clone)] | |
| struct LevelRange { | |
| level_num: i64, | |
| level_min: f64, | |
| level_max: f64, | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| @classmethod | |
| def _trim_array(cls, array: np.ndarray) -> np.ndarray: | |
| if array.shape[0] > 2: | |
| return array[1:-1] | |
| else: | |
| return array | |
| def to_trajectory(self) -> Trajectory: | |
| lat = np.concatenate([self._trim_array(segment.lat) for segment in self.segments]) | |
| lon = np.concatenate([self._trim_array(segment.lon) for segment in self.segments]) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def segment_to_trajectory(segment: list[LatLon], | |
| dt: float, | |
| t0: float) -> Trajectory: | |
| lat = np.array([point.lat for point in segment]) | |
| lon = np.array([point.lon for point in segment]) | |
| time = np.zeros_like(lat) | |
| seg_lens = vec_haversine(lat[1:], lon[1:], lat[:-1], lon[:-1]) | |
| avg_speed = np.sum(seg_lens) / dt | |
| time[0] = t0 |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def merge_trajectory(trajectory: Trajectory, | |
| map_lat: np.ndarray, | |
| map_lon: np.ndarray) -> list[list[LatLon]]: | |
| segments: list[list[LatLon]] = [] | |
| j = 0 | |
| for i in range(trajectory.lat.shape[0]-1): | |
| pt0 = LatLon(float(trajectory.lat[i]), float(trajectory.lon[i])) | |
| pt1 = LatLon(float(trajectory.lat[i+1]), float(trajectory.lon[i+1])) | |
| seg_len = pt0.haversine(*pt1.to_tuple()) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class CompoundTrajectory: | |
| def __init__(self, trajectory: Trajectory, | |
| map_lat: np.ndarray, | |
| map_lon: np.ndarray): | |
| self.segments: list[Trajectory] = [] | |
| t0 = 0 | |
| merged = merge_trajectory(trajectory, map_lat, map_lon) | |
| for i, segment in enumerate(merged): | |
| dt = trajectory.dt[i] |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| def main(): | |
| config = get_config(tile_extract='./valhalla/custom_files/valhalla_tiles.tar', | |
| verbose=True) | |
| trips = get_all_trips() | |
| for traj_id, vehicle_id, trip_id in trips: | |
| print(f"Vehicle {vehicle_id}, trip {trip_id}, trajectory: {traj_id}") | |
| trip_df = get_trip_signals(vehicle_id, trip_id) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def evolve_path(self: PredictedPath,
                h0: int,
                probability: float):
    """Return a new path that extends this one by a single step.

    The new ``PredictedPath`` keeps the same ``size``, advances ``step`` by
    one, and multiplies the current probability by ``probability``. Its array
    is a copy of this path's array with ``h0`` written at index ``self.step``;
    this path itself is left unmodified.

    :param h0: Value stored at position ``self.step`` of the copied array.
    :param probability: Factor applied to ``self.probability``.
    :return: The newly created ``PredictedPath``.
    """
    evolved = PredictedPath(probability=self.probability * probability,
                            step=self.step + 1,
                            size=self.size)
    # Work on a copy so the parent path's history is not altered.
    history = self.array.copy()
    history[self.step] = h0
    evolved.array = history
    return evolved