Last active
September 25, 2022 14:11
-
-
Save JonasBernard/33e98fd44a23a46e5e83ada0000bb3cb to your computer and use it in GitHub Desktop.
Gtk-application to recursively move all files that are older than a given date to another location.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from enum import Enum | |
import shutil | |
import datetime | |
import threading | |
import time | |
import errno | |
import uuid | |
import gi | |
import os | |
gi.require_version("Gtk", "3.0") | |
from gi.repository import Gio, Gtk, GLib | |
class Log(Enum):
    """Severity of a message written to the backup log."""

    # Regular informational message.
    NORMAL = 0
    # Failure message; ProcessDialog.add_log renders these in red.
    ERROR = 1
class ProcessDialog(Gtk.Dialog):
    """Dialog that shows a scrolling message log above a progress bar.

    The dialog has a single "Close" button (response ``CANCEL``). All methods
    touch GTK widgets and must therefore be called from the GTK main loop
    (for example through ``GLib.idle_add``).
    """

    def __init__(self, parent):
        """Build the dialog as a transient child of *parent*."""
        Gtk.Dialog.__init__(self, title="Backup im process", transient_for=parent, flags=0)
        self.add_buttons(
            Gtk.STOCK_CLOSE, Gtk.ResponseType.CANCEL,
        )
        self.set_default_size(500, 300)

        # Log list inside a scrolled window (row 0), progress bar below (row 1).
        self.scoll = Gtk.ScrolledWindow(hexpand=True, vexpand=True)
        self.log = Gtk.ListBox()
        self.log.set_selection_mode(Gtk.SelectionMode.NONE)
        self.grid = Gtk.Grid()
        self.scoll.add(self.log)
        self.grid.attach(self.scoll, 0, 0, 1, 1)
        self.progressbar = Gtk.ProgressBar()
        self.grid.attach(self.progressbar, 0, 1, 1, 1)

        self.add_log("Logging content will appear here...")

        box = self.get_content_area()
        box.add(self.grid)

    def set_progress(self, fraction):
        """Set the progress bar to *fraction* (0.0 .. 1.0)."""
        self.progressbar.set_fraction(fraction)

    def stop_progressbar(self):
        """Reset the progress bar to empty."""
        self.progressbar.set_fraction(0.0)

    def start_progressbar(self):
        """Pulse the progress bar once to signal that work has started."""
        self.progressbar.pulse()

    def add_log(self, string, type=Log.NORMAL):
        """Prepend a time-stamped message to the log.

        The message is also set as the progress bar text and printed to
        stdout. Messages of type ``Log.ERROR`` are rendered in red.

        Args:
            string: The message text (plain text, not markup).
            type: A ``Log`` member selecting the rendering style.
        """
        self.progressbar.set_text(string)
        print(string)

        style = "foreground=\"red\"" if type == Log.ERROR else ""
        # The message usually contains file names; characters such as "&" or
        # "<" would otherwise make the Pango markup invalid and the label
        # would not show the message.
        escaped = GLib.markup_escape_text(str(string), -1)

        row = Gtk.ListBoxRow()
        label = Gtk.Label(xalign=0)
        label.set_markup("<span {0}>{1}: {2}</span>".format(style, time.time(), escaped))
        row.add(label)
        self.log.prepend(row)  # newest entry on top
        self.show_all()
class MainWindow(Gtk.Window):
    """Main window: pick a source folder, a destination folder and a date.

    Pressing "Start" moves every file below the source folder whose
    modification time is older than the selected calendar date to the same
    relative location below the destination folder. The work runs in a
    background thread; all widget updates are marshalled to the GTK main loop
    with ``GLib.idle_add``.
    """

    def __init__(self):
        """Create all widgets and lay them out in a grid."""
        Gtk.Window.__init__(self, title="Mobile Media Backup")
        self.set_border_width(10)

        self.grid = Gtk.Grid()
        self.add(self.grid)

        self.header = Gtk.HeaderBar(title="Backup all media files from a folder that are older than a certain date")
        self.grid.attach(self.header, 0, 0, 3, 1)

        # Pack the spinner exactly once. Packing it on every click of "Start"
        # would try to re-parent an already parented widget, and a widget
        # packed after show_all() is never shown.
        self.spinner = Gtk.Spinner()
        self.header.pack_start(self.spinner)

        self.start_button = Gtk.Button(label="Start")
        self.start_button.connect("clicked", self.start)
        self.header.pack_end(self.start_button)

        self.files_box = Gtk.ListBox()
        self.files_box.set_selection_mode(Gtk.SelectionMode.NONE)
        self.grid.attach(self.files_box, 0, 1, 1, 1)

        self.source, _source_box, self.source_label, self.source_button = \
            self._build_folder_row("Select source")
        # self.hbox keeps referring to the destination row's box, as before.
        self.dest, self.hbox, self.dest_label, self.dest_button = \
            self._build_folder_row("Select destination")

        self.files_box.add(self.source)
        self.files_box.add(self.dest)

        self.calendar = Gtk.Calendar(hexpand=False, vexpand=True)
        self.grid.attach(self.calendar, 1, 1, 1, 1)

        # One file extension per line (e.g. ".tmp"); see
        # get_ignored_file_extensions().
        self.ignoreFileExtensions = Gtk.TextView(hexpand=True, vexpand=True)
        self.grid.attach(self.ignoreFileExtensions, 2, 1, 1, 1)

    def _build_folder_row(self, label_text):
        """Return ``(row, box, label, chooser)`` for one folder-selection row."""
        row = Gtk.ListBoxRow()
        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        row.add(box)
        label = Gtk.Label()
        label.set_text(label_text)
        chooser = Gtk.FileChooserButton(action=Gtk.FileChooserAction.SELECT_FOLDER)
        box.pack_start(label, True, True, 10)
        box.pack_end(chooser, True, True, 10)
        return row, box, label, chooser

    def _show_message(self, title, text, secondary_text):
        """Show a non-blocking info dialog that destroys itself on response."""
        msg = Gtk.MessageDialog(transient_for=self, flags=0, message_type=Gtk.MessageType.INFO,
                                title=title, buttons=Gtk.ButtonsType.OK,
                                text=text)
        msg.format_secondary_text(secondary_text)
        msg.connect("response", self.dialog_response)
        msg.show()

    def _show_missing_directory_message(self, kind):
        """Tell the user that the *kind* ("source"/"destination") folder is missing."""
        self._show_message(
            "No {0} file selected".format(kind),
            "There isn't any valid {0} file selected.".format(kind),
            "You cannot start the process before selecting a {0} directory.".format(kind),
        )

    @staticmethod
    def _is_nested_in(path, parent):
        """Return True if *path* lies strictly inside the directory *parent*."""
        path = os.path.realpath(path)
        parent = os.path.realpath(parent)
        if path == parent:
            return False
        try:
            return os.path.commonpath([path, parent]) == parent
        except ValueError:
            # Raised e.g. for paths on different drives: not nested.
            return False

    def get_ignored_file_extensions(self):
        """Return the extensions entered in the text view, one per line.

        Lines are stripped of surrounding whitespace; only entries that start
        with a dot and have at least one further character are kept.
        """
        buffer = self.ignoreFileExtensions.get_buffer()
        start_iter, end_iter = buffer.get_bounds()
        lines = buffer.get_text(start_iter, end_iter, True).split("\n")
        candidates = (line.strip() for line in lines)
        return [ext for ext in candidates if ext.startswith(".") and len(ext) > 1]

    def start(self, widget):
        """Handle a click on "Start": validate the input and run the backup.

        Shows a message and returns early when the source or destination is
        not an existing directory, or when the destination lies inside the
        source. Otherwise a worker thread is started and a ProcessDialog is
        run until the user closes it.
        """
        source_dir = self.source_button.get_filename()
        dest_dir = self.dest_button.get_filename()

        # Check for None explicitly: str(None) is "None", which would pass an
        # isdir() test if a folder of that name exists in the working directory.
        if source_dir is None or not os.path.isdir(source_dir):
            self._show_missing_directory_message("source")
            return
        if dest_dir is None or not os.path.isdir(dest_dir):
            self._show_missing_directory_message("destination")
            return
        # A destination below the source would be scanned as part of the
        # source and create ever deeper copies of itself.
        if self._is_nested_in(dest_dir, source_dir):
            self._show_message(
                "Invalid destination",
                "The destination must not be inside the source directory.",
                "Select a destination directory outside of the source directory.",
            )
            return

        # Gtk.Calendar reports months as 0..11, datetime expects 1..12.
        cal_min_date = self.calendar.get_date()
        min_date = datetime.datetime(cal_min_date[0], cal_min_date[1] + 1, cal_min_date[2])

        ignored_file_extensions = self.get_ignored_file_extensions()

        dialog = ProcessDialog(self)

        process = threading.Thread(
            target=self.copy_to_meta, name="File thread", daemon=True,
            args=(dialog.start_progressbar, dialog.stop_progressbar, dialog.add_log, dialog.set_progress,
                  min_date, source_dir, dest_dir, ignored_file_extensions))
        process.start()

        self.spinner.start()
        dialog.run()
        self.spinner.stop()
        # NOTE(review): closing the dialog does not stop the worker thread; it
        # keeps moving files and its queued callbacks still target this
        # dialog. Consider adding a cancellation flag.
        dialog.destroy()

    def dialog_response(self, widget, response_id):
        """Destroy a message dialog once the user has responded to it."""
        widget.destroy()

    def copy_to_meta(self, on_start, on_stop, log_function, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions):
        """Worker-thread entry point: run copy_to() between start/stop callbacks.

        *on_start*, *on_stop*, *log_function* and *set_progress* are GTK-side
        callbacks; they are only ever scheduled through ``GLib.idle_add`` and
        never called directly from this thread.
        """
        GLib.idle_add(on_start)
        self.copy_to(log_function, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions)
        GLib.idle_add(log_function, "Finished indexing!")
        GLib.idle_add(on_stop)

    def copy_to(self, log_function_handle, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions):
        """Move all files below *source_dir* older than *min_date* to *dest_dir*.

        The directory tree is walked recursively in the calling thread, so the
        method returns only after the whole tree has been processed.

        Args:
            log_function_handle: Callback ``(message, Log)``; scheduled on the
                GTK main loop for every log line.
            set_progress: Callback ``(fraction)``; scheduled on the GTK main
                loop while the top-level directory is processed.
            min_date: ``datetime.datetime``; files modified before it are moved.
            source_dir: Directory to scan.
            dest_dir: Existing directory that receives the files.
            ignored_file_extensions: Iterable of suffixes; files whose path
                ends with one of them are skipped.
        """
        def log_function(message, level=Log.NORMAL):
            GLib.idle_add(log_function_handle, message, level)

        def progress_function(fraction):
            GLib.idle_add(set_progress, fraction)

        self._copy_directory(log_function, progress_function, min_date, source_dir, dest_dir,
                             tuple(ignored_file_extensions))

    def _copy_directory(self, log_function, progress_function, min_date, source_dir, dest_dir, ignored_suffixes):
        """Process one directory level and recurse into its sub-directories.

        *progress_function* may be None; sub-directories are processed without
        progress reporting so that only the top-level directory drives the bar.
        """
        log_function("Scanning {0}...".format(source_dir))
        try:
            entries = os.listdir(source_dir)
        except OSError as e:
            log_function("Error while scanning {0}: {1}".format(source_dir, e), Log.ERROR)
            return

        found_files = len(entries)
        for index, file in enumerate(entries):
            if progress_function is not None:
                progress_function(index / found_files)

            source_file = os.path.join(source_dir, file)
            dest_file = os.path.join(dest_dir, file)

            if os.path.isfile(source_file):
                # str.endswith accepts a tuple; an empty tuple never matches.
                if source_file.endswith(ignored_suffixes):
                    continue
                try:
                    modified = datetime.datetime.fromtimestamp(os.path.getmtime(source_file))
                except OSError as e:
                    log_function("Error while reading {0}: {1}".format(source_file, e), Log.ERROR)
                    continue
                if modified < min_date:
                    log_function("Found file {0}. Move it to {1}.".format(self.shorten_path(source_file), self.shorten_path(dest_file)))
                    try:
                        self.safe_move(source_file, dest_file)
                    except (OSError, RuntimeError) as e:
                        log_function("Error while moving: {0}".format(e), Log.ERROR)
                else:
                    log_function("Found file {0} but its to new.".format(source_file))
            elif os.path.isdir(source_file):
                log_function("Found directory {0} and scanning with new destination {1}".format(self.shorten_path(source_file), self.shorten_path(dest_file)))
                if not os.path.isdir(dest_file):
                    log_function("Creating directory {0}.".format(self.shorten_path(dest_file)))
                    try:
                        os.mkdir(dest_file)
                    except OSError as e:
                        log_function("Error while creating directory {0}: {1}".format(dest_file, e), Log.ERROR)
                        log_function("Do you have write permissions in {0}?".format(dest_dir), Log.ERROR)
                        # Give up on this directory level.
                        return
                self._copy_directory(log_function, None, min_date, source_file, dest_file, ignored_suffixes)

    def safe_move(self, src, dst):
        """Move the file *src* to *dst*, also across filesystems.

        ``os.rename()`` is tried first. If it fails because *src* and *dst*
        are on different filesystems (``EXDEV``), the file is copied together
        with its metadata to a uniquely named temporary file next to *dst*,
        renamed onto *dst* and only then is *src* removed, so *dst* never
        holds a partially written file.

        Raises:
            RuntimeError: If the rename fails with ``EOPNOTSUPP``.
            OSError: For any other failure of the rename or the copy.
        """
        try:
            os.rename(src, dst)
        except OSError as err:
            if err.errno == errno.EXDEV:
                # The random UUID keeps concurrent moves onto the same
                # destination from clashing on the temporary name.
                tmp_dst = "%s.%s.tmp" % (dst, uuid.uuid4())
                try:
                    # copy2 keeps the modification time, which the backup
                    # uses to decide what to move; copyfile would reset it.
                    shutil.copy2(src, tmp_dst)
                    os.rename(tmp_dst, dst)
                except OSError:
                    # Do not leave a partial temporary copy behind.
                    try:
                        os.unlink(tmp_dst)
                    except OSError:
                        pass  # nothing was created, or it is already gone
                    raise
                os.unlink(src)
            elif err.errno == errno.EOPNOTSUPP:
                raise RuntimeError(
                    f"Could not write file. Make sure the program has write privileges. File: {dst}."
                ) from err
            else:
                raise

    def shorten_path(self, path):
        """Return *path* abbreviated to its first and last component.

        ``/a/b/c/d`` becomes ``/a/.../d``. Paths that have nothing to elide
        between the first and last component are returned unchanged.
        """
        folders = path.split("/")
        # An absolute path starts with an empty component before the first "/".
        is_absolute = folders[0] == ""
        minimum = 4 if is_absolute else 3
        if len(folders) < minimum:
            return path
        first = "/" + folders[1] if is_absolute else folders[0]
        return first + "/.../" + folders[-1]
# Launch the GUI only when the file is executed as a script, so that
# importing the module does not open a window and block in the main loop.
if __name__ == "__main__":
    win = MainWindow()
    win.connect("destroy", Gtk.main_quit)  # quit the main loop with the window
    win.show_all()
    Gtk.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment