Skip to content

Instantly share code, notes, and snippets.

@lukestanley
Created July 17, 2023 16:24
Show Gist options
  • Save lukestanley/9a15c5fef5681aed2ac8cc8818e545c7 to your computer and use it in GitHub Desktop.
Save lukestanley/9a15c5fef5681aed2ac8cc8818e545c7 to your computer and use it in GitHub Desktop.
An Android Termux Python script that moves local images and videos to a Nextcloud instance, then deletes the local copies to save space. Made after frustration with the Nextcloud Android app.
import os
from webdav3.client import Client
from tenacity import retry, stop_after_attempt, wait_fixed
import time
# Number of processed files to accumulate before the local copies are
# verified against the server and deleted.
BATCH_SIZE = 100
# Local paths that have been handled (uploaded or already on the server) and
# are waiting for delete_local_uploaded_files() to verify and remove them.
uploaded_files = []

# Get the password from an environment variable so it is not stored in the
# script itself.
webdav_password = os.getenv('webdav_password')
# Validate explicitly instead of with `assert`: assertions are stripped when
# Python runs with -O, and a bare AssertionError gives the user no hint.
if not webdav_password or len(webdav_password) <= 1:
    raise SystemExit(
        "Set the 'webdav_password' environment variable to the WebDAV "
        "password (at least 2 characters) before running this script."
    )

# Configure the client
client_options = {
    'webdav_hostname': 'https://example.com',
    'webdav_root': '/remote.php/dav/',
    'webdav_login': 'user',
    'webdav_password': webdav_password,
}
client = Client(client_options)

# Define your local and remote directories:
local_directory = "/data/data/com.termux/files/home/storage/dcim/Camera/"
# Joined by plain string concatenation elsewhere, so keep the trailing slash.
remote_directory = 'files/user/InstantUpload/Camera/'
def yield_files_sorted_by_time(local_directory):
    """Yield paths of regular files in *local_directory*, newest first.

    Args:
        local_directory: Directory to list (not recursive).

    Yields:
        Full paths (``os.path.join(local_directory, name)``) ordered by
        modification time, most recently modified first. Paths containing
        the substring ``'thumbnails'`` are skipped.

    Entries that are not regular files are skipped, because the rest of the
    script removes local copies with ``os.remove``, which cannot delete
    directories. Entries that disappear between the directory listing and
    the mtime lookup are skipped instead of aborting the whole scan.
    """
    candidates = []
    for name in os.listdir(local_directory):
        path = os.path.join(local_directory, name)
        if 'thumbnails' in path:
            continue
        if not os.path.isfile(path):
            continue
        try:
            modified = os.path.getmtime(path)
        except OSError:
            # The file vanished (or became unreadable) after listdir().
            continue
        candidates.append((modified, path))

    # Sort on the mtime only so ties keep their listing order.
    candidates.sort(key=lambda item: item[0], reverse=True)
    for _, path in candidates:
        yield path
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def check_remote_file_exists(client, remote_file_path):
    """Ask the WebDAV client whether *remote_file_path* exists.

    Args:
        client: Object exposing a ``check(path)`` method.
        remote_file_path: Remote path to look up.

    Returns:
        Whatever ``client.check`` returns for the path.

    The call is wrapped by the retry decorator above (three attempts with a
    fixed two-second wait between them).
    """
    exists = client.check(remote_file_path)
    return exists
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def upload_file(file):
    """Upload one local file to ``remote_directory`` unless it is already there.

    Args:
        file: Path of the local file to upload.

    Side effects:
        Appends the local path to the module-level ``uploaded_files`` list,
        both when the file was uploaded now and when it already existed on
        the server, so ``delete_local_uploaded_files`` can verify it later.
        Nothing is appended if the existence check or the upload raises.

    The whole function is wrapped by the retry decorator above (three
    attempts, two seconds apart).
    """
    local_file_path = file
    file_name = os.path.basename(local_file_path)
    remote_file_path = remote_directory + file_name

    if not check_remote_file_exists(client, remote_file_path):
        print(f"Starting upload of file: {file_name}")
        # Perform the transfer through the WebDAV client. Calling
        # upload_file() here with three arguments would recurse into this
        # one-argument function and raise TypeError on every new file.
        client.upload_sync(remote_path=remote_file_path, local_path=local_file_path)
        print(f"Finished uploading file: {file_name}")
    else:
        print(f"Already exists on the server:{file_name}")

    uploaded_files.append(local_file_path)
def local_file_size(local_file_path):
    """Return the size in bytes of the file at *local_file_path*."""
    file_stat = os.stat(local_file_path)
    return file_stat.st_size
@retry(stop=stop_after_attempt(3), wait=wait_fixed(2))
def remote_file_size(client, remote_file_path):
    """Return the ``'size'`` value the client reports for a remote path.

    Args:
        client: Object exposing an ``info(path)`` method whose result can be
            indexed with ``'size'``.
        remote_file_path: Remote path to query.

    Returns:
        The reported size converted with ``int()``.

    The call is wrapped by the retry decorator above (three attempts with a
    fixed two-second wait between them).
    """
    reported_size = client.info(remote_file_path)['size']
    return int(reported_size)
def upload_recent_files():
    """Upload every file in ``local_directory`` that is missing on the server.

    Files are visited in the order produced by
    ``yield_files_sorted_by_time``. Each handled path, whether uploaded now
    or already present remotely, is appended as one line to
    ``uploaded_files.txt`` in the current working directory.
    """
    for local_file_path in yield_files_sorted_by_time(local_directory):
        file_name = os.path.basename(local_file_path)
        remote_file_path = remote_directory + file_name

        if not check_remote_file_exists(client, remote_file_path):
            print(f"Starting upload of file: {file_name}")
            # Transfer through the WebDAV client; upload_file() accepts only
            # a single path, so calling it with three arguments raises
            # TypeError.
            client.upload_sync(remote_path=remote_file_path, local_path=local_file_path)
            print(f"Finished uploading file: {file_name}")
        else:
            print(f"Already exists on the server:{file_name}")

        # Re-opened in append mode per file so the record reaches disk even
        # if a later upload fails.
        with open("uploaded_files.txt", "a") as f:
            f.write(local_file_path + "\n")
def delete_local_uploaded_files():
    """Delete local files from ``uploaded_files`` once the server copy is verified.

    A local file is removed only when a remote file with the same base name
    exists under ``remote_directory`` and both sizes are equal. Files that
    are missing locally, missing remotely, or differ in size are reported
    and left alone. The ``uploaded_files`` list is emptied at the end.
    """
    for path in uploaded_files:
        # Nothing to verify or delete if the local file is already gone.
        if not os.path.exists(path):
            print(f"Local file: {path} does not exist. Skipping.")
            continue

        remote_path = remote_directory + os.path.basename(path)

        # Without a remote copy there is nothing to compare against.
        if not check_remote_file_exists(client, remote_path):
            print(f"Remote file: {remote_path} does not exist.")
            print("It may not have uploaded properly. Skipping deletion.")
            continue

        local_bytes = local_file_size(path)
        remote_bytes = remote_file_size(client, remote_path)
        if local_bytes != remote_bytes:
            print(f"File sizes do not match for file: {path}. Skipping deletion.")
            continue

        os.remove(path)
        print(f"Local file: {path} successfully deleted.")

    uploaded_files.clear()
# Main loop: repeatedly scan the local directory, upload what is found and
# delete verified local copies in batches of BATCH_SIZE files.
while True:
    # BATCH_SIZE is deliberately not re-assigned here, so the constant
    # defined at the top of the file is the single place to tune it.

    # Snapshot the listing so deletions made during this pass cannot affect
    # the iteration.
    all_files = list(yield_files_sorted_by_time(local_directory))
    upload_count = 0
    uploaded_files = []

    # Upload files and delete them in batches
    for file in all_files:
        upload_file(file)
        upload_count += 1
        # ">=" rather than "==" so a counter that overshoots still flushes.
        if upload_count >= BATCH_SIZE:
            delete_local_uploaded_files()
            upload_count = 0

    # Flush the final, partially filled batch.
    if uploaded_files:
        delete_local_uploaded_files()

    # Wait before running again
    time.sleep(20)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment