Last active
June 26, 2022 18:33
-
-
Save petered/da8ba1018a79e795119baaacc514da4d to your computer and use it in GitHub Desktop.
Syncs data from a DJI SD Card to your local file system, renaming files using modified time
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shutil | |
import sys | |
from argparse import ArgumentParser | |
from datetime import datetime | |
from typing import Optional, Sequence | |
def modified_timestamp_to_filename(timestamp: float) -> str: | |
return datetime.fromtimestamp(timestamp).strftime('dji_%Y-%m-%d_%H-%M-%S') | |
def get_dest_filepath(src_path: str, src_root_dir: str, dest_root_dir: str) -> str: | |
src_root_dir = src_root_dir.rstrip(os.sep) + os.sep | |
assert src_path.startswith(src_root_dir), f"File {src_path} was not in root dir {src_root_dir}" | |
src_rel_folder, src_filename = os.path.split(src_path[len(src_root_dir):]) | |
src_name, ext = os.path.splitext(src_filename) | |
src_order_number = src_name.split('_', 1)[1] # 'DJI_0215' -> '0215' | |
timestamp = os.path.getmtime(src_path) | |
new_filename = f'{modified_timestamp_to_filename(timestamp)}_{src_order_number}{ext.lower()}' | |
return os.path.join(dest_root_dir, src_rel_folder, new_filename) | |
def iter_filepaths_in_directory_recursive(directory, allowed_extensions: Optional[Sequence[str]]): | |
allowed_extensions = tuple(e.lower() for e in allowed_extensions) | |
yield from (os.path.join(dp, f) for dp, dn, filenames in os.walk(directory) | |
for f in filenames if allowed_extensions is None or any(f.lower().endswith(e) for e in allowed_extensions)) | |
def copy_creating_dir_if_needed(src_path: str, dest_path: str): | |
parent, _ = os.path.split(dest_path) | |
if not os.path.exists(parent): | |
os.makedirs(parent) | |
shutil.copyfile(src_path, dest_path) | |
def get_recursive_directory_contents_string(directory: str, indent_level=0, indent=' ', max_entries: Optional[int] = None) -> str: | |
lines = [] | |
this_indent = indent * indent_level | |
for i, f in enumerate(os.listdir(directory)): | |
if max_entries is not None and i >= max_entries: | |
lines.append(this_indent + '...') | |
break | |
lines.append(this_indent + f) | |
fpath = os.path.join(directory, f) | |
if os.path.isdir(fpath): | |
lines.append(get_recursive_directory_contents_string(fpath, indent_level=indent_level + 1, max_entries=max_entries)) | |
return '\n'.join(lines) | |
def sync_data_from_dji_sd_card( | |
drone_sd_card_dcim_folder='/Volumes/Untitled/DCIM', # DCIM folder on SD card (this path worked for me on Mac) | |
media_subfolder_name='100MEDIA', # This subfolder should exist if you have any recorded data on drone | |
pano_subfolder_name='PANORAMA', # This subfolder may exist if you've taken panos | |
destination_folder='~/dji_data', # Place to put the data (does not need to exist yet) | |
extensions_to_copy=('.mp4', '.jpg'), # File types to copy (here to avoid copying html files, etc) | |
overwrite: bool = False, # Overwrite existing files on machine | |
check_byte_sizes=True, # Check that, for existing files, file-size matches source. If not, overwrite. | |
include_pano_source_images=True, # Copy panorama source images too (note - does not copy generated panos, as those only exist on phone) | |
verbose: bool = True # Prints a lot. | |
): | |
""" | |
Syncs data from DJI SD card to a local destination folder, renaming files using modified date (GMT). | |
Does not re-copy files that already exist at the destination (unless override is selected or file-size does not match) | |
This is useful because DJI's original file names are just sequence numbers like DJI_0214 - which will reset once card is reformatted. | |
Files and folders are renamed using the modified date and original sequence number on the drone. E.g. | |
/Volumes/Untitled/DCIM/100MEDIA/DJI_0214.JPG -> /Users/peter/dji_data/dji_2022-06-15_20-50-18_0214.jpg | |
/Volumes/Untitled/DCIM/100MEDIA/DJI_0215.MP4 -> /Users/peter/dji_data/dji_2022-06-16_12-36-51_0215.mp4 | |
^ Sequence Number ^ Date ^ Time ^ Sequence Number | |
Panorama source files (not - not the panos themselves, which only exist on the phone) will bave their folders renamed, e.g. | |
/Volumes/Untitled/DCIM/PANORAMA/100_0050/DJI_0015.JPG -> /Users/peter/dji_data/PANORAMA/dji_2022-04-09_10-03-28_0050/DJI_0015.JPG | |
/Volumes/Untitled/DCIM/PANORAMA/100_0050/DJI_0016.JPG -> /Users/peter/dji_data/PANORAMA/dji_2022-04-09_10-03-28_0050/DJI_0016.JPG | |
""" | |
assert os.path.isdir(drone_sd_card_dcim_folder), f"SDCard DCIM folder '{drone_sd_card_dcim_folder}' does not exist" | |
src_media_folder = os.path.join(drone_sd_card_dcim_folder, media_subfolder_name) | |
assert os.path.isdir(src_media_folder), f"Did not find folder {media_subfolder_name} in DCIM folder" | |
if verbose: | |
print(f'Data found in source DCIM folder {drone_sd_card_dcim_folder}:') | |
print(get_recursive_directory_contents_string(drone_sd_card_dcim_folder, max_entries=3, indent_level=1)) | |
destination_folder = os.path.expanduser(destination_folder) | |
# Find media files in DCIM/100MEDIA and give them new names using modified date | |
src_paths = [os.path.join(src_media_folder, f) for f in os.listdir(src_media_folder) if not f.startswith('.') and any(f.lower().endswith(e) for e in extensions_to_copy)] | |
src_path_to_new_path = {src_path: get_dest_filepath(src_path=src_path, src_root_dir=src_media_folder, dest_root_dir=destination_folder) | |
for src_path in src_paths} | |
# Find pano-source photos in DCIM/PANORAMA and give each folder a new name using modified date | |
if include_pano_source_images: | |
pano_folder_path = os.path.join(drone_sd_card_dcim_folder, pano_subfolder_name) | |
if os.path.exists(pano_folder_path): | |
pano_dirs = [ppath for f in os.listdir(pano_folder_path) if os.path.isdir(ppath := os.path.join(pano_folder_path, f))] | |
for src_pano_dir_path in pano_dirs: | |
pano_dir_order_number = src_pano_dir_path.split('_')[-1] | |
new_pano_dir_name = modified_timestamp_to_filename(os.path.getmtime(src_pano_dir_path)) + '_' + pano_dir_order_number | |
for filename in os.listdir(src_pano_dir_path): | |
src_path_to_new_path[os.path.join(src_pano_dir_path, filename)] = os.path.join(destination_folder, pano_subfolder_name, new_pano_dir_name, filename) | |
# Filter to only copy when destination file does not exist. TODO: Maybe check file size match here too | |
src_path_to_size = {src_path: os.path.getsize(src_path) for src_path in src_path_to_new_path} | |
src_to_dest_to_copy = {src: dest for src, dest in src_path_to_new_path.items() if | |
overwrite or not os.path.exists(dest) or (check_byte_sizes and src_path_to_size[src] != os.path.getsize(dest))} | |
# Get file size data and prompt user to confirm copy | |
size_to_be_copied = sum(src_path_to_size[src] for src in src_to_dest_to_copy) | |
if verbose: | |
print('Files to be copied: ') | |
print(' ' + '\n '.join(f'{i}: {src} -> {dest} ' for i, (src, dest) in enumerate(src_to_dest_to_copy.items()))) | |
response = input( | |
f"{len(src_to_dest_to_copy)}/{len(src_path_to_new_path)} files ({size_to_be_copied:,} bytes) in {src_media_folder} will be copied to {destination_folder}.\n Type 'copy' to copy >>") | |
# Do the actual copying. | |
if response.strip(' ') == 'copy': | |
print('Copying...') | |
data_copied = 0 | |
for i, src_path in enumerate(sorted(src_to_dest_to_copy), start=1): | |
dest_path = src_to_dest_to_copy[src_path] | |
print( | |
f'Copied {i}/{len(src_to_dest_to_copy)} files ({data_copied / size_to_be_copied:.1%} of data). Next: {src_path} -> {dest_path} ({src_path_to_size[src_path]:,} B)') | |
copy_creating_dir_if_needed(src_path, dest_path) | |
data_copied += src_path_to_size[src_path] | |
print('Done copying') | |
if verbose: | |
print(f'Dest folder {destination_folder} now contains:') | |
print(get_recursive_directory_contents_string(destination_folder, max_entries=3, indent_level=1)) | |
else: | |
print("You didn't type 'copy'") | |
if __name__ == '__main__': | |
# See docstring for sync_data_from_dji_sd_card function above | |
parser = ArgumentParser() | |
parser.add_argument('-s', '--src_dcim', help='DCIM folder from SD Card', default='/Volumes/Untitled/DCIM') | |
parser.add_argument('-d', '--dest', help='Destination folder to save data', default='~/dji_data') | |
parser.add_argument('-o', '--overwrite', help='Overwrite existing destination files from source', action='store_true') | |
args = parser.parse_args(sys.argv[1:]) | |
sync_data_from_dji_sd_card(drone_sd_card_dcim_folder=args.src_dcim, destination_folder=args.dest, overwrite=args.overwrite) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment