Skip to content

Instantly share code, notes, and snippets.

@cquest
Last active January 2, 2025 09:46
Show Gist options
  • Save cquest/c8573c34d8865198928c228c35079c4b to your computer and use it in GitHub Desktop.
Save cquest/c8573c34d8865198928c228c35079c4b to your computer and use it in GitHub Desktop.
Picture deduplication script for Panoramax
#! /usr/bin/env python3
"""
Picture deduplication script for Panoramax
written by cquest, shared under WTFPL
"""
import os, sys, argparse, json, subprocess
from math import radians, cos, sin, asin, sqrt, degrees, atan2
import requests, tomllib
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance in meters between two points
    on the earth (specified in decimal degrees).

    Note the argument order: longitude comes first, then latitude, for
    both points.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Floating point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt()/asin() raise a
    # "math domain error"; clamp it back into the valid range.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    r = 6371  # Radius of earth in kilometers. Use 3956 for miles.
    # r is in kilometers, so scale the result to meters
    return c * r * 1000
def bearing(lat1, lon1, lat2, lon2):
    """
    Compute the initial bearing, in degrees within [0, 360), when heading
    from the first point towards the second one.

    Inputs are decimal degrees, latitude first then longitude for each
    point.
    """
    phi1, lam1, phi2, lam2 = map(radians, (lat1, lon1, lat2, lon2))
    delta_lam = lam2 - lam1
    east = sin(delta_lam) * cos(phi2)
    north = cos(phi1) * sin(phi2) - sin(phi1) * cos(phi2) * cos(delta_lam)
    # atan2 may return a negative angle: shift it, then wrap into [0, 360)
    return (degrees(atan2(east, north)) + 360) % 360
# Location of the panoramax_cli config file holding default instance / token
CONFIG_PATH = "~/.config/geovisio/config.toml"
# Seconds to wait on the API before giving up instead of hanging forever
REQUEST_TIMEOUT = 60


def _build_parser():
    """Build the command line parser of the deduplication script."""
    parser = argparse.ArgumentParser(
        prog='dedup_seq',
        description='Deduplicates sequences of geolocated pictures',
        epilog='written by cquest, shared under WTFPL')
    # No nargs=1 on purpose: it wraps each value in a one-element list, which
    # used to leak into the API URL and the Authorization header.
    parser.add_argument('--distance', type=int, default=5, help='Minimal distance in meters between 2 pictures')
    parser.add_argument('--action', type=str, choices=['hide','delete'], help='Action to apply')
    parser.add_argument('--api-url', type=str, help='Set API to query, default from ~/.config/geovisio/config.toml')
    parser.add_argument('--token', type=str, help='Set TOKEN to use for API auth, default from ~/.config/geovisio/config.toml')
    parser.add_argument('--uuid', help='List of sequence UUIDs to analyze (comma separated)')
    parser.add_argument('--input', help='Directory containing pictures to analyse')
    parser.add_argument('--output', help='Directory to store hardlinked pictures to keep')
    return parser


def _load_default_instance():
    """Return the first instance of the panoramax_cli config file, or None."""
    try:
        with open(os.path.expanduser(CONFIG_PATH), "rb") as f:
            config = tomllib.load(f)
        instance = config['instances'][0]
    except (OSError, tomllib.TOMLDecodeError, KeyError, IndexError, TypeError):
        # the config file is optional: missing or unusable means "no default"
        return None
    return instance if isinstance(instance, dict) else None


def _duplicate_distance(position, kept, min_dist):
    """Return the distance to a kept position closer than min_dist, else None.

    Positions are (lon, lat) tuples. The most recently kept ones are checked
    first, as in a sequence they are the most likely to be close.
    """
    lon, lat = position
    for kept_lon, kept_lat in reversed(kept):
        dist = haversine(lon, lat, kept_lon, kept_lat)
        if dist < min_dist:
            return dist
    return None


def _fetch_pictures(api_url, sequence_ids, headers):
    """Return the concatenated picture features of the given sequences."""
    pictures = []
    for seq_id in sequence_ids:
        seq = requests.get('%s/api/collections/%s/items' % (api_url, seq_id),
                           headers=headers, timeout=REQUEST_TIMEOUT)
        # fail with the HTTP error rather than with an obscure JSON/Key error
        seq.raise_for_status()
        pictures.extend(json.loads(seq.text)['features'])
    return pictures


def _warn_on_failure(resp, verb, url):
    """Report a rejected API call on stderr without aborting the run."""
    if not resp.ok:
        print('%s %s failed: HTTP %s' % (verb, url, resp.status_code), file=sys.stderr)


def _apply_action(action, query, headers, picture, is_dup):
    """Delete/hide a duplicate, or show again a kept picture that is hidden."""
    if action == 'delete':
        if is_dup:
            resp = requests.delete(query, headers=headers, timeout=REQUEST_TIMEOUT)
            _warn_on_failure(resp, 'DELETE', query)
    elif action == 'hide':
        hidden = picture['properties']['geovisio:status'] == 'hidden'
        # only call the API when the visibility actually has to change
        if is_dup != hidden:
            visible = 'false' if is_dup else 'true'
            resp = requests.patch(query, {'visible': visible},
                                  headers=headers, timeout=REQUEST_TIMEOUT)
            _warn_on_failure(resp, 'PATCH', query)


def _dedup_remote(pictures, api_url, headers, action, min_dist):
    """Count (and optionally hide/delete) duplicates among API pictures."""
    nb_dups = 0
    kept = []  # (lon, lat) of the pictures retained so far
    for idx, pic in enumerate(pictures):
        geometry = pic.get('geometry')
        if geometry is None:
            # no position: can neither be compared nor be a duplicate
            continue
        # coordinates are indexed as [lon, lat, ...], matching haversine()
        position = (geometry['coordinates'][0], geometry['coordinates'][1])
        if idx == 0:
            # the first picture is always kept and never touched through the API
            kept.append(position)
            continue
        is_dup = _duplicate_distance(position, kept, min_dist) is not None
        if is_dup:
            nb_dups += 1
        else:
            kept.append(position)
        if action:
            query = '%s/api/collections/%s/items/%s' % (api_url, pic['collection'], pic['id'])
            _apply_action(action, query, headers, pic, is_dup)
    return nb_dups


def _read_exif(directory):
    """Return exiftool's JSON metadata for the pictures under directory."""
    # argument list instead of a shell string: safe with spaces/metacharacters
    exiftool = subprocess.run(['exiftool', '-r', '-j', '-c', '%+.7f', directory],
                              capture_output=True)
    if not exiftool.stdout.strip():
        # exiftool outputs nothing at all when it finds no file
        return []
    return json.loads(exiftool.stdout)


def _gps_position(pic):
    """Return the (lon, lat) floats of an exiftool record, or None."""
    lat, lon = pic.get('GPSLatitude'), pic.get('GPSLongitude')
    if lat is None or lon is None:
        return None
    try:
        return float(lon), float(lat)
    except (TypeError, ValueError):
        return None


def _link_into(source, output_dir):
    """Hardlink source into output_dir, tolerating links from a previous run."""
    target = os.path.join(output_dir, os.path.basename(source))
    try:
        os.link(source, target)
    except FileExistsError:
        if not os.path.samefile(source, target):
            # same file name coming from another directory: do not hide it
            print('cannot link %s: %s already exists' % (source, target), file=sys.stderr)


def _dedup_local(pics, output_dir, min_dist):
    """Count duplicates among exiftool records, hardlinking the kept ones."""
    nb_dups = 0
    kept = []  # (lon, lat) of the pictures retained so far
    for idx, pic in enumerate(pics):
        position = _gps_position(pic)
        if idx == 0:
            # the first picture is always kept, even without GPS position
            if position is not None:
                kept.append(position)
            if output_dir:
                _link_into(pic['SourceFile'], output_dir)
            continue
        if position is None:
            continue
        dist = _duplicate_distance(position, kept, min_dist)
        if dist is not None:
            print("dupe", pic['SourceFile'], dist)
            nb_dups += 1
            continue
        kept.append(position)
        if output_dir:
            _link_into(pic['SourceFile'], output_dir)
    return nb_dups


def main():
    """Parse the command line, deduplicate pictures and print a summary."""
    parser = _build_parser()
    args = parser.parse_args()

    api_url = token = None
    if args.uuid:
        # command line first, then defaults from the panoramax_cli config file
        instance = _load_default_instance() or {}
        api_url = args.api_url or instance.get('url')
        if not api_url:
            parser.error("missing --api-url")
        token = args.token or instance.get('jwt_token')
        if args.action and not token:
            parser.error("missing --token to apply action")
    elif not args.input:
        parser.error("missing --uuid or --input")

    if args.output:
        os.makedirs(args.output, exist_ok=True)

    if args.uuid:
        headers = {"Authorization": "Bearer %s" % token} if token else None
        # query list of pictures from the sequences
        sequence_ids = [s.strip() for s in args.uuid.split(',') if s.strip()]
        pictures = _fetch_pictures(api_url, sequence_ids, headers)
        print('Pictures to check:',len(pictures), file=sys.stderr)
        nb_dups = _dedup_remote(pictures, api_url, headers, args.action, args.distance)
    else:
        pictures = _read_exif(args.input)
        # file name order is used as the capture order of the pictures
        pics = sorted(pictures, key=lambda x:x['SourceFile'])
        print('Pictures to check:',len(pictures), file=sys.stderr)
        nb_dups = _dedup_local(pics, args.output, args.distance)

    if nb_dups>0:
        print(nb_dups,'duplicates found', file=sys.stderr)
    else:
        print("no duplicate found", file=sys.stderr)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment