Last active
July 22, 2022 09:44
-
-
Save epijim/8018701 to your computer and use it in GitHub Desktop.
Take location data from Google Takeout, convert it from JSON to KML, and overlay it on a satellite photo.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# I've merged code from two different sources, originally published by GitHub users snowdonjames and Scarygami.
# Import JSON, write out KML (or JSON, JS, CSV, GPX waypoints, or GPX tracks).
# Usage: location_history_json_converter.py [-h] [-f {kml,json,csv,js,gpx,gpxtracks}] [-v VARIABLE] input output
from __future__ import division | |
import sys | |
import json | |
import math | |
from argparse import ArgumentParser | |
from datetime import datetime | |
def _e7_to_degrees(value):
    """Convert an "E7" coordinate (degrees multiplied by 10**7) to float degrees."""
    return value / 10000000.0


def _epoch_seconds(item):
    """Return the record's "timestampMs" field as float seconds since the epoch."""
    return int(item["timestampMs"]) / 1000.0


def _local_stamp(item):
    """Format the record's time as "YYYY-MM-DD HH:MM:SS" in the local time zone."""
    return datetime.fromtimestamp(_epoch_seconds(item)).strftime("%Y-%m-%d %H:%M:%S")


def _utc_stamp(item):
    """Format the record's time as an ISO 8601 instant in UTC.

    The trailing "Z" designates UTC, so the value must be derived with
    gmtime rather than from the machine's local time zone.
    """
    import time  # function-scope import: this script's import block has no ``time``
    return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(_epoch_seconds(item)))


def _write_json(f_out, items, js_variable=None):
    """Write the records as JSON, or as a ``window.<name> = {...};`` script
    when ``js_variable`` is given."""
    if js_variable is not None:
        f_out.write("window.%s = " % js_variable)
    f_out.write("{\n")
    f_out.write(" \"data\": {\n")
    f_out.write(" \"items\": [\n")
    for index, item in enumerate(items):
        if index:
            # Separator goes before every record except the first.
            f_out.write(",\n")
        f_out.write(" {\n")
        f_out.write(" \"timestampMs\": %s,\n" % item["timestampMs"])
        f_out.write(" \"latitude\": %s,\n" % _e7_to_degrees(item["latitudeE7"]))
        f_out.write(" \"longitude\": %s\n" % _e7_to_degrees(item["longitudeE7"]))
        f_out.write(" }")
    f_out.write("\n ]\n")
    f_out.write(" }\n}")
    if js_variable is not None:
        f_out.write(";")


def _write_csv(f_out, items):
    """Write one "time,lat lon" row per record under a header row."""
    f_out.write("Time,Location\n")
    for item in items:
        f_out.write(_local_stamp(item))
        f_out.write(",")
        f_out.write("%s %s\n" % (_e7_to_degrees(item["latitudeE7"]),
                                 _e7_to_degrees(item["longitudeE7"])))


def _write_kml(f_out, items):
    """Write the records as a KML document with one Placemark per record."""
    f_out.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
    f_out.write("<kml xmlns=\"http://www.opengis.net/kml/2.2\">\n")
    f_out.write(" <Document>\n")
    f_out.write(" <name>Location History</name>\n")
    for item in items:
        f_out.write(" <Placemark>\n")
        # Order of these tags is important to make valid KML:
        # TimeStamp, ExtendedData, then Point.
        f_out.write(" <TimeStamp><when>%s</when></TimeStamp>\n" % _utc_stamp(item))
        extras = [key for key in ("accuracy", "speed", "altitude") if key in item]
        if extras:
            f_out.write(" <ExtendedData>\n")
            for key in extras:
                f_out.write(" <Data name=\"%s\">\n" % key)
                f_out.write(" <value>%d</value>\n" % item[key])
                f_out.write(" </Data>\n")
            f_out.write(" </ExtendedData>\n")
        # KML coordinates are written longitude first, then latitude.
        f_out.write(" <Point><coordinates>%s,%s</coordinates></Point>\n"
                    % (_e7_to_degrees(item["longitudeE7"]),
                       _e7_to_degrees(item["latitudeE7"])))
        f_out.write(" </Placemark>\n")
    f_out.write(" </Document>\n</kml>\n")


def _write_gpx_waypoints(f_out, items):
    """Write one GPX <wpt> element per record."""
    for item in items:
        f_out.write(" <wpt lat=\"%s\" lon=\"%s\">\n"
                    % (_e7_to_degrees(item["latitudeE7"]),
                       _e7_to_degrees(item["longitudeE7"])))
        if "altitude" in item:
            f_out.write(" <ele>%d</ele>\n" % item["altitude"])
        f_out.write(" <time>%s</time>\n" % _utc_stamp(item))
        f_out.write(" <desc>%s" % _local_stamp(item))
        notes = []
        if "accuracy" in item:
            notes.append("Accuracy: %d" % item["accuracy"])
        if "speed" in item:
            notes.append("Speed:%d" % item["speed"])
        if notes:
            f_out.write(" (%s)" % ", ".join(notes))
        f_out.write("</desc>\n")
        f_out.write(" </wpt>\n")


def _write_gpx_tracks(f_out, items):
    """Write the records as GPX tracks, starting a new track at every large gap."""
    f_out.write(" <trk>\n")
    f_out.write(" <trkseg>\n")
    previous = None
    for item in items:
        if previous is not None:
            # The absolute gap is used so the split works whether the records
            # are in chronological or reverse chronological order.
            minutes_apart = abs(int(item["timestampMs"]) - int(previous["timestampMs"])) / 1000.0 / 60
            km_apart = getDistanceFromLatLonInKm(
                _e7_to_degrees(item["latitudeE7"]),
                _e7_to_degrees(item["longitudeE7"]),
                _e7_to_degrees(previous["latitudeE7"]),
                _e7_to_degrees(previous["longitudeE7"]))
            if minutes_apart > 10 or km_apart > 40:
                # No points for 10 minutes or 40km in under 10m? Start a new track.
                f_out.write(" </trkseg>\n")
                f_out.write(" </trk>\n")
                f_out.write(" <trk>\n")
                f_out.write(" <trkseg>\n")
        f_out.write(" <trkpt lat=\"%s\" lon=\"%s\">\n"
                    % (_e7_to_degrees(item["latitudeE7"]),
                       _e7_to_degrees(item["longitudeE7"])))
        if "altitude" in item:
            f_out.write(" <ele>%d</ele>\n" % item["altitude"])
        f_out.write(" <time>%s</time>\n" % _utc_stamp(item))
        if "accuracy" in item or "speed" in item:
            f_out.write(" <desc>\n")
            if "accuracy" in item:
                f_out.write(" Accuracy: %d\n" % item["accuracy"])
            if "speed" in item:
                f_out.write(" Speed:%d\n" % item["speed"])
            f_out.write(" </desc>\n")
        f_out.write(" </trkpt>\n")
        previous = item
    f_out.write(" </trkseg>\n")
    f_out.write(" </trk>\n")


def _write_gpx(f_out, items, as_tracks):
    """Write a GPX 1.1 document holding either waypoints or tracks."""
    f_out.write("<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n")
    f_out.write("<gpx xmlns=\"http://www.topografix.com/GPX/1/1\" version=\"1.1\" creator=\"Google Latitude JSON Converter\" xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xsi:schemaLocation=\"http://www.topografix.com/GPX/1/1 http://www.topografix.com/GPX/1/1/gpx.xsd\">\n")
    f_out.write(" <metadata>\n")
    f_out.write(" <name>Location History</name>\n")
    f_out.write(" </metadata>\n")
    if as_tracks:
        _write_gpx_tracks(f_out, items)
    else:
        _write_gpx_waypoints(f_out, items)
    f_out.write("</gpx>\n")


def main(argv):
    """Convert a location-history JSON file into KML, JSON, JS, CSV or GPX.

    Args:
        argv: Full argument vector including the program name (normally
            ``sys.argv``). When it is empty or None, argparse falls back to
            ``sys.argv`` itself.

    Returns:
        0 when the output file was written, 1 when the input could not be
        read or decoded, contained no "locations" records, or the output
        file could not be created. Invalid command lines (including
        identical input and output paths) exit through argparse.
    """
    arg_parser = ArgumentParser()
    arg_parser.add_argument("input", help="Input File (JSON)")
    arg_parser.add_argument("output", help="Output File (will be overwritten!)")
    arg_parser.add_argument("-f", "--format", choices=["kml", "json", "csv", "js", "gpx", "gpxtracks"], default="kml", help="Format of the output")
    arg_parser.add_argument("-v", "--variable", default="locationJsonData", help="Variable name to be used for js output")
    args = arg_parser.parse_args(argv[1:] if argv else None)

    if args.input == args.output:
        # ArgumentParser.error prints the usage message and exits.
        arg_parser.error("Input and output have to be different files")

    try:
        with open(args.input) as f_in:
            json_data = f_in.read()
    except (IOError, OSError, UnicodeError):
        print("Error opening input file")
        return 1

    try:
        data = json.loads(json_data)
    except ValueError:
        print("Error decoding json")
        return 1

    # Only a JSON object with a non-empty "locations" list is convertible.
    items = data.get("locations") if isinstance(data, dict) else None
    if not items:
        print("No data found in json")
        return 1

    try:
        f_out = open(args.output, "w")
    except (IOError, OSError):
        print("Error creating output file for writing")
        return 1

    # The context manager closes the output even if a malformed record raises.
    with f_out:
        if args.format == "json":
            _write_json(f_out, items)
        elif args.format == "js":
            _write_json(f_out, items, args.variable)
        elif args.format == "csv":
            _write_csv(f_out, items)
        elif args.format == "kml":
            _write_kml(f_out, items)
        elif args.format == "gpx":
            _write_gpx(f_out, items, as_tracks=False)
        elif args.format == "gpxtracks":
            _write_gpx(f_out, items, as_tracks=True)
    return 0
# Haversine formula | |
def getDistanceFromLatLonInKm(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in kilometres between two points.

    Uses the haversine formula on a sphere of radius 6371 km.

    Args:
        lat1, lon1: Latitude and longitude of the first point, in degrees.
        lat2, lon2: Latitude and longitude of the second point, in degrees.

    Returns:
        The distance in kilometres as a float.
    """
    earth_radius_km = 6371
    half_dlat = math.radians(lat2 - lat1) / 2.0
    half_dlon = math.radians(lon2 - lon1) / 2.0
    a = (math.sin(half_dlat) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2))
         * math.sin(half_dlon) ** 2)
    # Floating-point rounding can leave ``a`` marginally outside [0, 1] for
    # (near-)antipodal points; clamp it so the square roots stay defined.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return earth_radius_km * c
def deg2rad(deg):
    """Convert an angle from degrees to radians."""
    radians_per_degree = math.pi / 180
    return deg * radians_per_degree
if __name__ == "__main__":
    # Run the converter on the command line and hand its result to the
    # interpreter as the process exit status.
    exit_status = main(sys.argv)
    sys.exit(exit_status)
# Load the KML files, the picture and ImageData.csv, then draw the tracks on the picture.
# ImageData.csv needs the following fields. The two optional nudges are pixel offsets
# added to the drawn x and y positions to correct misalignment.
# IMAGE.jpg,TOPOFBOXLAT,BOTTOMBOXLAT,LEFTBOXLONG,RIGHTBOXLONG,NUDGEX,NUDGEY
import os, time, math | |
from datetime import datetime | |
from time import mktime | |
import xml.etree.ElementTree as ET | |
from PIL import Image, ImageDraw | |
def GetKmlFiles():
    """Find every ``.kml`` file under the current directory and parse each one.

    The search walks '.' recursively. File names are matched case-sensitively
    on the ``.kml`` suffix.

    Returns:
        A list with one entry per file found, each being the value returned
        by ReadKmlFile for that file.
    """
    KmlData = []
    for dirname, _dirnames, filenames in os.walk('.'):
        for filename in filenames:
            # Require the dot as well, so a file literally named "kml" is skipped.
            if filename.endswith(".kml"):
                print("Reading kml file " + filename)
                KmlData.append(ReadKmlFile(dirname, filename))
    return KmlData
def ReadKmlFile(dirname, filename):
    """Parse a single KML file line by line.

    Only lines containing ``when`` or ``coordinates`` are examined. Each such
    line is parsed as a standalone XML fragment whose first child holds the
    value, so the element must sit on one line, for example
    ``<Point><coordinates>lon,lat</coordinates></Point>``.

    Args:
        dirname: Directory that contains the file.
        filename: Name of the file inside ``dirname``.

    Returns:
        A list ``[first_values, second_values, times]``. ``first_values`` and
        ``second_values`` are the first and second comma-separated numbers of
        each coordinates element (longitude and latitude in KML order), as
        floats. ``times`` holds one ``time.struct_time`` per timestamp.
    """
    longitudes = []
    latitudes = []
    timestamps = []
    # Join with dirname so files found in subdirectories can be opened.
    with open(os.path.join(dirname, filename)) as f:
        for line in f:
            if 'when' in line:
                timestamps.append(time.strptime(ET.fromstring(line)[0].text, "%Y-%m-%dT%H:%M:%SZ"))
            if 'coordinates' in line:
                fields = ET.fromstring(line)[0].text.split(',')
                longitudes.append(float(fields[0]))
                latitudes.append(float(fields[1]))
                if len(longitudes) % 5000 == 0:
                    print("Parsing " + filename + ": points found: " + str(len(longitudes)))
    return [longitudes, latitudes, timestamps]
def DrawMapData(KmlData, InputImage, OutputImage, itop, ibottom, ileft, iright, xnudge, ynudge, max_segment_px=10000):
    """Draw the parsed KML tracks as line segments on top of an image.

    Args:
        KmlData: List of per-file entries; entry[0] is passed to LongToX and
            entry[1] to LatToY, point by point.
        InputImage: Path of the image to draw on.
        OutputImage: Path the annotated image is saved to.
        itop, ibottom: Latitudes of the image's top and bottom edges.
        ileft, iright: Longitudes of the image's left and right edges.
        xnudge, ynudge: Offsets added to every computed x and y pixel position.
        max_segment_px: Segments at least this long (in pixels) are skipped.
            The default of 10000 keeps the original behaviour; the original
            author notes that about 80 removes some noise.
    """
    im = Image.open(InputImage)
    draw = ImageDraw.Draw(im)
    width, height = im.size
    cnt = 0
    for KmlD in KmlData:
        # Project every point once rather than once per adjoining segment.
        pixels = [
            (LongToX(lon, ileft, iright, width) + xnudge,
             LatToY(lat, itop, ibottom, height) + ynudge)
            for lon, lat in zip(KmlD[0], KmlD[1])
        ]
        for (x1, y1), (x2, y2) in zip(pixels, pixels[1:]):
            if EuclidDistance(x1, y1, x2, y2) < max_segment_px:
                draw.line((x1, y1, x2, y2), fill=255)
            cnt += 1
            if cnt % 10000 == 0:
                print("Drawing point number " + str(cnt))
    im.save(OutputImage)
def LongToX(InputLong, LeftLong, RightLong, ImWidth):
    """Map a longitude onto an x pixel coordinate of an image ImWidth wide.

    All three longitudes are shifted by 360 before being handed to
    ScalingFunc (presumably to keep them positive; the shift is retained so
    results stay bit-identical).
    """
    shifted_value = InputLong + 360
    shifted_left = LeftLong + 360
    shifted_right = RightLong + 360
    return ScalingFunc(shifted_value, shifted_left, shifted_right, ImWidth)
def LatToY(InputLat, TopLat, BottomLat, ImHeight):
    """Map a latitude onto a y pixel coordinate of an image ImHeight tall.

    All three latitudes are shifted by 360 before being handed to
    ScalingFunc (presumably to keep them positive; the shift is retained so
    results stay bit-identical).
    """
    shifted_value = InputLat + 360
    shifted_top = TopLat + 360
    shifted_bottom = BottomLat + 360
    return ScalingFunc(shifted_value, shifted_top, shifted_bottom, ImHeight)
def EuclidDistance(x1, y1, x2, y2):
    """Return the straight-line distance between (x1, y1) and (x2, y2)."""
    dx = x1 - x2
    dy = y1 - y2
    squared_length = dx ** 2 + dy ** 2
    return math.sqrt(squared_length)
def ScalingFunc(inputv, minv, maxv, size):
    """Linearly rescale inputv from the range [minv, maxv] onto [0, size].

    Returns 0 when minv and maxv are equal, since the range has no width.
    """
    span = float(maxv) - float(minv)
    if span == 0:
        return 0
    offset = float(inputv) - float(minv)
    fraction = offset / (float(maxv) - float(minv))
    return fraction * float(size)
def ParseImageFile(path='ImageData.csv'):
    """Read the comma-separated image description file.

    The file (``ImageData.csv`` by default) holds, in order:
        <file name of image to draw data on>,
        <image top latitude>,
        <image bottom latitude>,
        <image left longitude>,
        <image right longitude>,
        (optional) <x value nudge>,
        (optional) <y value nudge>

    Args:
        path: Location of the description file.

    Returns:
        A 7-element list: the image file name followed by six floats.
        Missing or blank nudge fields become 0.0.

    Raises:
        ValueError: If fewer than the five required fields are present, or a
            numeric field cannot be converted to float.
    """
    with open(path, 'r') as f:
        read_data = f.read().split(',')
    if len(read_data) < 5:
        raise ValueError(
            "%s must contain at least 5 comma-separated fields, found %d"
            % (path, len(read_data)))
    # Pad the optional nudge fields so exactly seven fields are available.
    read_data.extend([0] * (7 - len(read_data)))
    ReturnData = [read_data[0].strip()]
    for index in range(1, 7):
        field = read_data[index]
        if index >= 5 and not str(field).strip():
            # A trailing comma leaves an empty optional field; treat it as 0.
            field = 0
        ReturnData.append(float(field))
    return ReturnData
if __name__ == "__main__":
    # Read the image description, then draw every KML track found under the
    # current directory onto that image and save it as LatitudeData.png.
    ImageData = ParseImageFile()
    # ImageData[1:7] is top lat, bottom lat, left long, right long, x nudge, y nudge.
    DrawMapData(GetKmlFiles(), ImageData[0], "LatitudeData.png", *ImageData[1:7])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment