Skip to content

Instantly share code, notes, and snippets.

@JonasBernard
Last active September 25, 2022 14:11
Show Gist options
  • Save JonasBernard/33e98fd44a23a46e5e83ada0000bb3cb to your computer and use it in GitHub Desktop.
Save JonasBernard/33e98fd44a23a46e5e83ada0000bb3cb to your computer and use it in GitHub Desktop.
GTK application that recursively moves all files older than a given date to another location.
from enum import Enum
import shutil
import datetime
import threading
import time
import errno
import uuid
import gi
import os
gi.require_version("Gtk", "3.0")
from gi.repository import Gio, Gtk, GLib
class Log(Enum):
    """Severity of a message written to the backup log."""

    # Regular informational message.
    NORMAL = 0
    # Error message; ProcessDialog.add_log renders these in red.
    ERROR = 1
class ProcessDialog(Gtk.Dialog):
    """Dialog showing a scrolling message log and a progress bar for a backup run.

    All methods touch GTK widgets, so they must be invoked on the GTK main
    thread (worker threads should schedule them with ``GLib.idle_add``).
    """

    def __init__(self, parent):
        """Build the dialog: a log list inside a scrolled window above a progress bar.

        Args:
            parent: Window this dialog is transient for.
        """
        Gtk.Dialog.__init__(self, title="Backup im process", transient_for=parent, flags=0)
        self.add_buttons(
            Gtk.STOCK_CLOSE, Gtk.ResponseType.CANCEL,
        )
        self.set_default_size(500, 300)

        self.scoll = Gtk.ScrolledWindow(hexpand=True, vexpand=True)
        self.log = Gtk.ListBox()
        self.log.set_selection_mode(Gtk.SelectionMode.NONE)
        self.scoll.add(self.log)

        self.grid = Gtk.Grid()
        self.grid.attach(self.scoll, 0, 0, 1, 1)

        self.progressbar = Gtk.ProgressBar()
        self.grid.attach(self.progressbar, 0, 1, 1, 1)

        self.add_log("Logging content will appear here...")

        box = self.get_content_area()
        box.add(self.grid)

    def set_progress(self, fraction):
        """Set the progress bar to ``fraction`` (expected range 0.0 to 1.0)."""
        self.progressbar.set_fraction(fraction)

    def stop_progressbar(self):
        """Reset the progress bar to empty."""
        self.progressbar.set_fraction(0.0)

    def start_progressbar(self):
        """Pulse the progress bar once to signal that work has started."""
        self.progressbar.pulse()

    def add_log(self, string, type=Log.NORMAL):
        """Prepend a message to the log list, echo it to stdout and show it.

        Args:
            string: Message text. It is treated as plain text, not markup.
            type: ``Log.ERROR`` renders the row in red; anything else uses
                the default colour.
        """
        self.progressbar.set_text(string)
        print(string)

        row = Gtk.ListBoxRow()
        label = Gtk.Label(xalign=0)
        style = "foreground=\"red\"" if type == Log.ERROR else ""
        # File names and exception texts may contain '&', '<' or '>', which
        # would make the Pango markup invalid; escape them before embedding.
        escaped = GLib.markup_escape_text(string)
        label.set_markup("<span {0}>{1}: {2}</span>".format(style, time.time(), escaped))
        row.add(label)

        # Newest message goes on top.
        self.log.prepend(row)
        self.show_all()
class MainWindow(Gtk.Window):
    """Main window: pick a source folder, a destination folder and a cut-off date.

    Pressing "Start" moves every file under the source folder whose
    modification time is older than the selected calendar date to the same
    relative location under the destination folder.
    """

    def __init__(self):
        """Build the window: header bar, folder choosers, calendar and ignore list."""
        Gtk.Window.__init__(self, title="Mobile Media Backup")
        self.set_border_width(10)

        self.grid = Gtk.Grid()
        self.add(self.grid)

        self.header = Gtk.HeaderBar(title="Backup all media files from a folder that are older than a certain date")
        self.grid.attach(self.header, 0, 0, 3, 1)

        # The spinner is packed into the header lazily, on the first start().
        self.spinner = Gtk.Spinner()

        self.start_button = Gtk.Button(label="Start")
        self.start_button.connect("clicked", self.start)
        self.header.pack_end(self.start_button)

        self.files_box = Gtk.ListBox()
        self.files_box.set_selection_mode(Gtk.SelectionMode.NONE)
        self.grid.attach(self.files_box, 0, 1, 1, 1)

        # self.hbox ends up referring to the destination row's box, as before.
        (self.source, self.hbox,
         self.source_label, self.source_button) = self._build_folder_row("Select source")
        (self.dest, self.hbox,
         self.dest_label, self.dest_button) = self._build_folder_row("Select destination")

        self.files_box.add(self.source)
        self.files_box.add(self.dest)

        self.calendar = Gtk.Calendar(hexpand=False, vexpand=True)
        self.grid.attach(self.calendar, 1, 1, 1, 1)

        # One file suffix per line (e.g. ".jpg"); see get_ignored_file_extensions.
        self.ignoreFileExtensions = Gtk.TextView(hexpand=True, vexpand=True)
        self.grid.attach(self.ignoreFileExtensions, 2, 1, 1, 1)

    def _build_folder_row(self, label_text):
        """Create a list row holding a caption and a folder chooser button.

        Returns:
            Tuple ``(row, box, label, button)`` of the created widgets.
        """
        row = Gtk.ListBoxRow()
        box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL)
        row.add(box)

        label = Gtk.Label()
        label.set_text(label_text)
        button = Gtk.FileChooserButton(action=Gtk.FileChooserAction.SELECT_FOLDER)

        box.pack_start(label, True, True, 10)
        box.pack_end(button, True, True, 10)
        return row, box, label, button

    def get_ignored_file_extensions(self):
        """Return the suffixes entered in the ignore text box.

        The text box is read line by line; surrounding whitespace is removed
        and only lines starting with "." are kept.

        Returns:
            List of suffix strings; empty when the text box is empty.
        """
        text_buffer = self.ignoreFileExtensions.get_buffer()
        start_iter, end_iter = text_buffer.get_bounds()
        lines = text_buffer.get_text(start_iter, end_iter, True).split("\n")
        stripped = (line.strip() for line in lines)
        return [suffix for suffix in stripped if suffix.startswith(".")]

    def _show_missing_directory_dialog(self, kind):
        """Tell the user that the ``kind`` ("source"/"destination") folder is missing."""
        msg = Gtk.MessageDialog(transient_for=self, flags=0, message_type=Gtk.MessageType.INFO,
                                title="No {0} file selected".format(kind), buttons=Gtk.ButtonsType.OK,
                                text="There isn't any valid {0} file selected.".format(kind))
        msg.format_secondary_text(
            "You cannot start the process before selecting a {0} directory.".format(kind)
        )
        msg.connect("response", self.dialog_response)
        msg.show()

    def start(self, widget):
        """Handle a click on "Start": validate the inputs and run the backup.

        The file work runs in a daemon thread while a ProcessDialog is shown;
        this method returns once that dialog has been closed.

        Args:
            widget: The button that emitted the signal (unused).
        """
        # get_filename() returns None while nothing is selected; check that
        # explicitly instead of testing the string "None" as a path.
        source_dir = self.source_button.get_filename()
        if source_dir is None or not os.path.isdir(source_dir):
            self._show_missing_directory_dialog("source")
            return

        dest_dir = self.dest_button.get_filename()
        if dest_dir is None or not os.path.isdir(dest_dir):
            self._show_missing_directory_dialog("destination")
            return

        cal_min_date = self.calendar.get_date()
        # Gtk.Calendar months are zero-based, datetime months are one-based.
        min_date = datetime.datetime(cal_min_date[0], cal_min_date[1] + 1, cal_min_date[2])

        ignored_file_extensions = self.get_ignored_file_extensions()

        dialog = ProcessDialog(self)
        process = threading.Thread(target=self.copy_to_meta, name="File thread", daemon=True,
                                   args=(dialog.start_progressbar, dialog.stop_progressbar,
                                         dialog.add_log, dialog.set_progress,
                                         min_date, source_dir, dest_dir, ignored_file_extensions))
        process.start()

        self.spinner.start()
        # Pack the spinner only once; packing a widget that already has a
        # parent on every click is rejected by GTK.
        if self.spinner.get_parent() is None:
            self.header.pack_start(self.spinner)
            self.spinner.show()

        dialog.run()
        self.spinner.stop()
        dialog.destroy()

    def dialog_response(self, widget, response_id):
        """Close a message dialog as soon as the user responds to it."""
        widget.destroy()

    def copy_to_meta(self, on_start, on_stop, log_function, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions):
        """Worker-thread entry point: run copy_to and signal start and finish.

        Args:
            on_start: GUI callback scheduled before the scan begins.
            on_stop: GUI callback scheduled after the scan has finished.
            log_function: GUI callback ``(message, type)`` for log output.
            set_progress: GUI callback ``(fraction)`` for the progress bar.
            min_date: Files modified before this datetime are moved.
            source_dir: Folder to scan.
            dest_dir: Folder to move files into.
            ignored_file_extensions: Suffixes of files to leave untouched.
        """
        GLib.idle_add(on_start)
        self.copy_to(log_function, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions)
        # This runs on a worker thread, so the GUI log callback must be
        # scheduled on the main loop rather than called directly.
        GLib.idle_add(log_function, "Finished indexing!")
        GLib.idle_add(on_stop)

    def copy_to(self, log_function_handle, set_progress, min_date, source_dir, dest_dir, ignored_file_extensions):
        """Move old files from ``source_dir`` to ``dest_dir``, recursing into subfolders.

        Each subfolder is processed in its own daemon thread; this call
        returns only after all of those threads have finished. All GUI
        callbacks are scheduled through ``GLib.idle_add``.

        Args:
            log_function_handle: GUI callback ``(message, type)`` for log output.
            set_progress: GUI callback ``(fraction)`` for the progress bar.
            min_date: Files modified before this datetime are moved.
            source_dir: Folder to scan.
            dest_dir: Folder receiving the files; subfolders are created on demand.
            ignored_file_extensions: Suffixes of files to leave untouched.
        """
        def log(message, level=Log.NORMAL):
            GLib.idle_add(log_function_handle, message, level)

        def report_progress(fraction):
            GLib.idle_add(set_progress, fraction)

        log("Scanning {0}...".format(source_dir))
        try:
            entries = os.listdir(source_dir)
        except OSError as e:
            log("Error while scanning {0}: {1}".format(source_dir, e), Log.ERROR)
            return

        ignored_suffixes = tuple(ignored_file_extensions)
        total = len(entries)
        workers = []

        for index, name in enumerate(entries):
            report_progress(index / total)

            source_file = os.path.join(source_dir, name)
            dest_file = os.path.join(dest_dir, name)

            if os.path.isfile(source_file):
                if source_file.endswith(ignored_suffixes):
                    continue

                try:
                    modified = datetime.datetime.fromtimestamp(os.path.getmtime(source_file))
                except OSError as e:
                    # The file may have vanished or become unreadable meanwhile.
                    log("Error while reading {0}: {1}".format(source_file, e), Log.ERROR)
                    continue

                if modified < min_date:
                    log("Found file {0}. Move it to {1}.".format(self.shorten_path(source_file), self.shorten_path(dest_file)))
                    try:
                        self.safe_move(source_file, dest_file)
                    except Exception as e:
                        # Keep going with the remaining files; just report it.
                        log("Error while moving: {0}".format(e), Log.ERROR)
                else:
                    log("Found file {0} but its to new.".format(source_file))
            elif os.path.isdir(source_file):
                log("Found directory {0} and scanning with new destination {1}".format(self.shorten_path(source_file), self.shorten_path(dest_file)))
                if not os.path.isdir(dest_file):
                    log("Creating directory {0}.".format(self.shorten_path(dest_file)))
                    try:
                        os.mkdir(dest_file)
                    except OSError as e:
                        log("Error while creating directory {0}: {1}".format(dest_file, e), Log.ERROR)
                        log("Do you have write permissions in {0}?".format(dest_dir), Log.ERROR)
                        # Stop scanning this folder, but still wait below for
                        # the subfolder workers that were already started.
                        break

                worker = threading.Thread(target=self.copy_to, name="File thread for {0}".format(source_file), daemon=True,
                                          args=(log_function_handle, set_progress, min_date, source_file, dest_file, ignored_file_extensions))
                worker.start()
                workers.append(worker)

        # Wait for the subfolder workers so that the caller's "finished"
        # notification is only sent once all work is really done.
        for worker in workers:
            worker.join()

    def safe_move(self, src, dst):
        """Move the file ``src`` to ``dst``, also across filesystems.

        ``os.rename()`` is tried first. If it fails because source and
        destination are on different filesystems, the file is copied next to
        ``dst`` under a unique temporary name, renamed onto ``dst`` and only
        then removed from the source, so ``dst`` never holds a partial copy.

        Raises:
            RuntimeError: If the operation is not supported on the target.
            OSError: For any other failure while renaming or copying.
        """
        try:
            os.rename(src, dst)
        except OSError as err:
            if err.errno == errno.EXDEV:
                # A random UUID keeps concurrent copies into the same target
                # from clashing on their temporary names.
                tmp_dst = "%s.%s.tmp" % (dst, uuid.uuid4())
                try:
                    shutil.copyfile(src, tmp_dst)
                    os.rename(tmp_dst, dst)
                except OSError:
                    # Best-effort cleanup so a failed copy does not leave a
                    # stray temporary file behind; the original error wins.
                    try:
                        os.unlink(tmp_dst)
                    except OSError:
                        pass
                    raise
                os.unlink(src)
            elif err.errno == errno.EOPNOTSUPP:
                raise RuntimeError("Could not write file. Make sure " +
                                   f"the program has write privileges. File: {dst}.") from err
            else:
                raise

    def shorten_path(self, path):
        """Abbreviate ``path`` to its first and last component for log output.

        ``/a/b/c/d`` becomes ``/a/.../d``. Paths with nothing between the
        first and the last component are returned unchanged.
        """
        parts = path.split("/")
        # An absolute path splits into a leading empty string; keep it with
        # the first real component so the root slash survives the join.
        head_len = 2 if path.startswith("/") else 1
        if len(parts) <= head_len + 1:
            return path
        return "/".join(parts[:head_len] + ["...", parts[-1]])
# Script entry: create the main window, end the GTK main loop when the window
# is destroyed, show all widgets and hand control to the GTK main loop.
win = MainWindow()
win.connect("destroy", Gtk.main_quit)
win.show_all()
Gtk.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment