Created
March 13, 2023 00:07
-
-
Save CEZERT/014605ce644b6d388b99eb038c24d388 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime as dt
import json
import os

import contextily as cx
import folium
import geopandas as gp
import mapclassify
import matplotlib.pyplot as plt
import pandas as pd
import shapely.geometry as sg
from matplotlib import cm
from matplotlib.lines import Line2D
# Root directory that is walked recursively for the yearly/monthly JSON files.
json_folder = "Historique des positions/"
def extract_activity(record):
    """Flatten one ``activitySegment`` timeline record into a tuple.

    Args:
        record: A dict holding an ``activitySegment`` entry with the keys
            ``activityType``, ``confidence``, ``duration`` (with
            ``startTimestamp`` / ``endTimestamp``), ``distance``,
            ``startLocation`` and ``endLocation`` (each with ``latitudeE7``
            and ``longitudeE7``).

    Returns:
        A tuple ``(activity_type, confidence, start_time, end_time, distance,
        start_lat, start_lon, end_lat, end_lon)``. Times are naive
        ``datetime`` objects; coordinates are the E7 integers divided by
        ``10 ** 7``.

    Raises:
        KeyError: If any of the keys listed above is missing.
    """
    segment = record['activitySegment']
    duration = segment['duration']
    start_location = segment['startLocation']
    end_location = segment['endLocation']

    # The trailing 'Z' is stripped before parsing, so the results carry no tzinfo.
    start_time = dt.datetime.fromisoformat(duration['startTimestamp'].rstrip('Z'))
    end_time = dt.datetime.fromisoformat(duration['endTimestamp'].rstrip('Z'))

    # The segment's 'activities' list is not part of the result, so it is not
    # read here; records lacking that key still parse.
    return (
        segment['activityType'],
        segment['confidence'],
        start_time,
        end_time,
        segment['distance'],
        start_location['latitudeE7'] / 10 ** 7,
        start_location['longitudeE7'] / 10 ** 7,
        end_location['latitudeE7'] / 10 ** 7,
        end_location['longitudeE7'] / 10 ** 7,
    )
def extract_placevisite(record):
    """Flatten one ``placeVisit`` timeline record into a tuple.

    Args:
        record: A dict holding a ``placeVisit`` entry with ``location`` and
            ``duration`` sub-dicts.

    Returns:
        A tuple ``(location_name, address, start_time, end_time, start_lat,
        start_lon)``. A missing ``name`` or ``address`` yields ``''``; a
        missing ``latitudeE7`` or ``longitudeE7`` yields ``0``. Times are
        naive ``datetime`` objects parsed after stripping a trailing ``'Z'``.
    """
    visit = record['placeVisit']
    location = visit['location']
    period = visit['duration']

    def parse_time(key):
        # Drop the trailing 'Z' before handing the string to fromisoformat.
        return dt.datetime.fromisoformat(period[key].rstrip('Z'))

    def degrees(key):
        # E7 integer -> decimal degrees; 0 stands in for an absent coordinate.
        if key in location:
            return location[key] / 10 ** 7
        return 0

    return (
        location.get('name', ''),
        location.get('address', ''),
        parse_time('startTimestamp'),
        parse_time('endTimestamp'),
        degrees('latitudeE7'),
        degrees('longitudeE7'),
    )
def parse_json_activities(json_file):
    """Load one timeline JSON file and tabulate its activity segments.

    Args:
        json_file: Path of a JSON file whose top-level object has a
            ``timelineObjects`` list.

    Returns:
        A ``pandas.DataFrame`` with one row per record that contains an
        ``activitySegment`` key, and the columns ``activity_type``,
        ``confidence``, ``start_time``, ``end_time``, ``distance``,
        ``start_lat``, ``start_lon``, ``end_lat``, ``end_lon``. The frame is
        empty (columns only) when no record qualifies.

    Raises:
        KeyError: If ``timelineObjects`` is missing from the file.
    """
    # JSON text is UTF-8 by specification (RFC 8259); do not depend on the
    # platform's default encoding.
    with open(json_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    activities = [
        extract_activity(record)
        for record in data['timelineObjects']
        if 'activitySegment' in record
    ]
    return pd.DataFrame(
        activities,
        columns=['activity_type', 'confidence', 'start_time', 'end_time', 'distance',
                 'start_lat', 'start_lon', 'end_lat', 'end_lon'],
    )
def parse_json_visites(folder):
    """Collect every place visit found in the JSON files under *folder*.

    The directory tree is walked recursively. Only files whose name ends in
    ``.json``, starts with ``'2'`` and has ``'_'`` as its fifth character
    (e.g. ``2023_MARCH.json``) are read.

    Args:
        folder: Root directory to walk.

    Returns:
        A ``geopandas.GeoDataFrame`` (CRS EPSG:4326) with the columns
        ``location_name``, ``address``, ``start_time``, ``end_time``,
        ``start_lat``, ``start_lon`` and a point geometry built from
        ``(start_lon, start_lat)``.

    Raises:
        KeyError: If a matching file has no ``timelineObjects`` key.
    """
    columns = ['location_name', 'address', 'start_time', 'end_time', 'start_lat', 'start_lon']
    place_visites = []
    for root, _dirs, files in os.walk(folder):
        for file in files:
            if not (file.endswith('.json') and file[0] == '2' and file[4] == '_'):
                continue
            # JSON text is UTF-8 by specification; do not depend on the
            # platform's default encoding.
            with open(os.path.join(root, file), 'r', encoding='utf-8') as f:
                data = json.load(f)
            for record in data['timelineObjects']:
                if 'placeVisit' in record:
                    place_visites.append(extract_placevisite(record))

    # Row layout is (name, address, start, end, lat, lon); Point takes (x=lon, y=lat).
    points = [sg.Point(visit[5], visit[4]) for visit in place_visites]
    return gp.GeoDataFrame(place_visites, columns=columns, geometry=points, crs=4326)
# Load every place visit found under json_folder as a GeoDataFrame.
gdf = parse_json_visites(json_folder)
# Country borders, reprojected to the same CRS as the visits.
# NOTE(review): allow_override is not a documented read_file parameter; it is
# forwarded to the I/O engine as an extra keyword — confirm it is intended.
world = gp.read_file('./TM_WORLD_BORDERS-0.3.shp', allow_override=True).to_crs('EPSG:4326')
# set_crs returns a new GeoDataFrame unless inplace=True, so keep the result
# instead of discarding it.
gdf = gdf.set_crs(epsg=4326, allow_override=True)
def compute_legend(df, ax, cmap, steps):
    """Add a date legend to *ax* with *steps* evenly spaced colour samples.

    For each fraction ``i / (steps - 1)`` a line handle coloured with
    ``cmap(fraction)`` is paired with the ``start_time`` quantile at that
    fraction, formatted as ``YYYY-MM-DD``.

    Args:
        df: Frame with a datetime ``start_time`` column.
        ax: Axes object on which ``legend`` is called.
        cmap: Callable mapping a float in ``[0, 1]`` to a colour.
        steps: Number of legend entries. ``steps == 1`` yields a single entry
            at fraction 0; ``steps <= 0`` yields an empty legend.
    """
    # max(..., 1) keeps steps == 1 from evaluating 0 / 0.
    last = max(steps - 1, 1)
    fractions = [i / last for i in range(steps)]
    handles = [Line2D([0], [0], color=cmap(fraction), lw=4) for fraction in fractions]
    labels = [df.start_time.quantile(fraction).strftime("%Y-%m-%d") for fraction in fractions]
    ax.legend(handles, labels, loc="lower right")
def seconds(timestamp):
    """Return *timestamp* as POSIX seconds (a float).

    The argument is converted with its ``to_pydatetime()`` method and the
    ``timestamp()`` of the resulting object is returned.
    """
    as_datetime = timestamp.to_pydatetime()
    return as_datetime.timestamp()
def calculate_pal(df2, cmap):
    """Map each row's ``start_time`` to a colour along *cmap*.

    Timestamps are rescaled linearly so the earliest maps to ``cmap(0.0)``
    and the latest to ``cmap(1.0)``.

    Args:
        df2: Frame with a datetime ``start_time`` column.
        cmap: Callable mapping a float in ``[0, 1]`` to a colour.

    Returns:
        A list with one colour per row of *df2*. Empty input gives ``[]``;
        when every timestamp is identical, every row gets ``cmap(0.0)``.
    """
    if len(df2) == 0:
        return []

    min_ts = seconds(df2.start_time.min())
    max_ts = seconds(df2.start_time.max())
    span = max_ts - min_ts
    if span == 0:
        # A single visit (or identical timestamps) would otherwise divide by zero.
        return [cmap(0.0)] * len(df2)

    return df2.start_time.apply(lambda ts: cmap((seconds(ts) - min_ts) / span)).tolist()
def draw_map(df, box):
    """Plot the visits of *df* that lie within *box*, coloured by start time.

    Args:
        df: GeoDataFrame of visits with a ``start_time`` column.
        box: Shapely geometry; only rows whose geometry is within it are
            drawn, and its bounds set the limits of the world-borders axes.

    Side effects:
        Creates matplotlib figures and calls ``plt.show()``.
    """
    box_only = df[df.geometry.within(box)]
    minx, miny, maxx, maxy = box.bounds

    # NOTE(review): the points below are plotted with ax=None, i.e. not on this
    # world-borders axes — confirm whether they were meant to share `base`.
    base = world.plot(color='white', edgecolor='silver', figsize=(16, 12))
    base.set_xlim(minx, maxx)
    base.set_ylim(miny, maxy)

    # matplotlib.cm.get_cmap was removed in Matplotlib 3.9; pyplot.get_cmap
    # is the equivalent that is still available.
    cmap = plt.get_cmap('viridis')
    pal = calculate_pal(box_only, cmap)
    ax = box_only.plot(ax=None, marker='o', color=pal, markersize=8)

    # Tiles are served in Web Mercator; passing the data's CRS makes
    # contextily warp them to the axes' coordinates.
    cx.add_basemap(ax, crs=box_only.crs, source=cx.providers.OpenTopoMap, zoom=8)
    compute_legend(box_only, ax, cmap, 5)
    plt.show()
# Draw the visits inside a fixed longitude/latitude bounding box.
draw_map(
    gdf,
    sg.box(minx=1.2, miny=43.75, maxx=1.3, maxy=43.80),
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment