Last active
January 2, 2025 09:46
-
-
Save cquest/c8573c34d8865198928c228c35079c4b to your computer and use it in GitHub Desktop.
Picture deduplication script for Panoramax
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/env python3 | |
""" | |
Picture deduplication script for Panoramax | |
written by cquest, shared under WTFPL | |
""" | |
import os, sys, argparse, json, subprocess | |
from math import radians, cos, sin, asin, sqrt, degrees, atan2 | |
import requests, tomllib | |
def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance in meters between two points
    on the earth (specified in decimal degrees).

    Mind the argument order: for each point the longitude comes first,
    then the latitude.

    Args:
        lon1: longitude of the first point, in decimal degrees.
        lat1: latitude of the first point, in decimal degrees.
        lon2: longitude of the second point, in decimal degrees.
        lat2: latitude of the second point, in decimal degrees.

    Returns:
        The distance in meters, computed on a sphere of radius 6371 km.
    """
    # convert decimal degrees to radians
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula
    dlon = lon2 - lon1
    dlat = lat2 - lat1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    # Floating point rounding may push `a` marginally outside [0, 1] (for
    # instance with near-antipodal points), which would make sqrt()/asin()
    # raise ValueError: clamp it to the valid domain first.
    a = min(1.0, max(0.0, a))
    c = 2 * asin(sqrt(a))
    r = 6371  # Radius of earth in kilometers. Use 3956 for miles.
    # kilometers -> meters
    return c * r * 1000
def bearing(lat1, lon1, lat2, lon2):
    """
    Return the bearing, in degrees, of the direction leading from the
    first point to the second one.

    Both points are given in decimal degrees, latitude first. The result
    is normalized to [0, 360): 0 when the second point is due north of
    the first one, 90 when it is due east.
    """
    # Work in radians
    phi1, lam1, phi2, lam2 = (radians(v) for v in (lat1, lon1, lat2, lon2))
    delta_lam = lam2 - lam1
    east = sin(delta_lam) * cos(phi2)
    north = cos(phi1) * sin(phi2) - sin(phi1) * cos(phi2) * cos(delta_lam)
    # atan2 gives a signed angle: shift by a full turn and wrap it so the
    # value returned is never negative
    return (degrees(atan2(east, north)) + 360) % 360
# Command-line interface. Every option is declared with nargs=1, so a value
# given on the command line is stored as a one-element list; only the
# default of --distance is a bare integer.
parser = argparse.ArgumentParser(
    prog='dedup_seq',
    description='Deduplicates sequences of geolocated pictures',
    epilog='written by cquest, shared under WTFPL',
)
for flag, options in (
    ('--distance', dict(type=int, default=5,
                        help='Minimal distance in meters between 2 pictures')),
    ('--action', dict(type=str, choices=['hide', 'delete'],
                      help='Action to apply')),
    ('--api-url', dict(type=str,
                       help='Set API to query, default from ~/.config/geovisio/config.toml')),
    ('--token', dict(type=str,
                     help='Set TOKEN to use for API auth, default from ~/.config/geovisio/config.toml')),
    ('--uuid', dict(help='List of sequence UUIDs to analyze (comma separated)')),
    ('--input', dict(help='Directory containing pictures to analyse')),
    ('--output', dict(help='Directory to store hardlinked pictures to keep')),
):
    parser.add_argument(flag, nargs=1, **options)
args = parser.parse_args()
try: | |
# read default instance / auth token from panoramax_cli config file | |
with open(os.path.expanduser("~/.config/geovisio/config.toml"), "rb") as f: | |
config = tomllib.load(f) | |
TOKEN=config['instances'][0]['jwt_token'] | |
except: | |
config = None | |
# Work out which API endpoint (and, when an action must be applied, which
# token) to use: command-line options win over the config file.
if args.uuid:
    if args.api_url:
        # nargs=1 options are one-element lists: unwrap the value, otherwise
        # the list's repr ends up inside the request URLs
        API = args.api_url[0]
    elif config:
        API = config['instances'][0]['url']
    else:
        raise AssertionError("missing --api-url")
    if args.action:
        if args.token:
            # same unwrapping, here to get a valid "Bearer <token>" header
            TOKEN = args.token[0]
        elif config:
            TOKEN = config['instances'][0]['jwt_token']
        else:
            raise AssertionError("missing --token to apply action")
elif args.input is None:
    # neither a remote sequence nor a local directory to work on
    raise AssertionError("missing --uuid or --input")
# Minimal distance (in meters) under which two pictures are duplicates.
# --distance is a one-element list when given on the command line (nargs=1)
# but the bare integer default otherwise: handle both explicitly.
if isinstance(args.distance, (list, tuple)):
    DEDUP_DIST = args.distance[0]
else:
    DEDUP_DIST = args.distance
if args.output:
    # Make sure the output directory exists. An already existing directory
    # is fine; any other failure (permissions, path is a regular file...)
    # is reported right away instead of being silently ignored.
    os.makedirs(args.output[0], exist_ok=True)
pictures = []
nb_dups = 0
if args.uuid:
    # --- Remote mode: pictures of sequences hosted on the API ---
    headers = {"Authorization": "Bearer %s" % TOKEN} if TOKEN else None
    # query list of pictures from the sequences
    for SEQ_ID in args.uuid[0].split(','):
        seq = requests.get('%s/api/collections/%s/items' % (API, SEQ_ID), headers=headers)
        # surface HTTP errors as such rather than as a KeyError on 'features'
        seq.raise_for_status()
        items = json.loads(seq.text)
        pictures.extend(items['features'])
    print('Pictures to check:', len(pictures), file=sys.stderr)
    # The first picture is always kept; each following one is compared to
    # the previous pictures that are still kept.
    for p1 in range(1, len(pictures)):
        # passed to haversine() as (lon, lat) -- assumes GeoJSON coordinate order
        lonlat1 = pictures[p1]['geometry']['coordinates']
        query = '%s/api/collections/%s/items/%s' % (API, pictures[p1]['collection'], pictures[p1]['id'])
        for p2 in range(p1):
            if pictures[p2]['geometry'] is None:
                # already flagged as duplicate: not a reference anymore
                continue
            lonlat2 = pictures[p2]['geometry']['coordinates']
            dist = haversine(lonlat1[0], lonlat1[1], lonlat2[0], lonlat2[1])
            if dist < DEDUP_DIST:
                # a None geometry marks the picture as a duplicate
                pictures[p1]['geometry'] = None
                nb_dups += 1
                if args.action:
                    if args.action[0] == 'delete':
                        requests.delete(query, headers=headers)
                    elif args.action[0] == 'hide':
                        # no need to hide what is already hidden
                        if pictures[p1]['properties']['geovisio:status'] != 'hidden':
                            requests.patch(query, {'visible': 'false'}, headers=headers)
                    else:
                        raise AssertionError("unknown action")
                break
        # In 'hide' mode, a kept picture which is currently hidden is made
        # visible again.
        if (args.action and pictures[p1]['geometry'] is not None
                and args.action[0] == 'hide'
                and pictures[p1]['properties']['geovisio:status'] == 'hidden'):
            requests.patch(query, {'visible': 'true'}, headers=headers)
elif args.input:
    # --- Local mode: pictures found (recursively) in a directory ---
    # The command is an argument list, not a shell string, so that a path
    # containing spaces or shell metacharacters reaches exiftool untouched.
    exiftool = subprocess.run(
        ['exiftool', '-r', '-j', '-c', '%+.7f', args.input[0]],
        capture_output=True)
    # an empty output is not valid JSON: treat it as "no picture"
    pictures = json.loads(exiftool.stdout) if exiftool.stdout.strip() else []
    pics = sorted(pictures, key=lambda x: x['SourceFile'])
    print('Pictures to check:', len(pictures), file=sys.stderr)
    if args.output and pics:
        # the first picture is always kept
        os.link(pics[0]['SourceFile'],
                os.path.join(args.output[0], os.path.basename(pics[0]['SourceFile'])))
    for p1 in range(1, len(pics)):
        if 'GPSLatitude' not in pics[p1]:
            # no position: cannot be compared (and is not linked either)
            continue
        lat1, lon1 = (float(pics[p1]['GPSLatitude']), float(pics[p1]['GPSLongitude']))
        # compare with the previous pictures, closest in file order first
        for p2 in reversed(range(p1)):
            # skip pictures without position or already flagged as duplicate
            if pics[p2].get('GPSLatitude') is None:
                continue
            lat2 = float(pics[p2]['GPSLatitude'])
            lon2 = float(pics[p2]['GPSLongitude'])
            # haversine() takes the longitude first, then the latitude
            dist = haversine(lon1, lat1, lon2, lat2)
            if dist < DEDUP_DIST:
                print("dupe", pics[p1]['SourceFile'], dist)
                nb_dups += 1
                # a None latitude marks the picture as a duplicate
                pics[p1]['GPSLatitude'] = None
                break
        if args.output and pics[p1]['GPSLatitude'] is not None:
            os.link(pics[p1]['SourceFile'],
                    os.path.join(args.output[0], os.path.basename(pics[p1]['SourceFile'])))
# Final report, written to stderr like the "Pictures to check" counter
if nb_dups <= 0:
    print("no duplicate found", file=sys.stderr)
else:
    print(nb_dups, 'duplicates found', file=sys.stderr)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment