Last active
July 17, 2021 08:28
-
-
Save rolicot/85fa0c7558844fca2023 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# -*- coding: utf-8 -*- | |
'''Detects renamed and/or moved files by tracking inodes. | |
Creates a shell script to replay similar changes. Make sure to use relative | |
paths if you want to replay changes in a different absolute location. Does not | |
follow symbolic links. Inode numbers must be identical (do not cross | |
filesystems)! | |
''' | |
__author__ = 'Pavel Krc' | |
__email__ = '[email protected]' | |
__version__ = '1.1' | |
__copyright__ = 'Copyright (C) 2015 Pavel Krc' | |
__license__ = 'GPLv2+' | |
# This program is free software: you can redistribute it and/or modify | |
# it under the terms of the GNU General Public License as published by | |
# the Free Software Foundation, either version 2 of the License, or | |
# (at your option) any later version. | |
# | |
# This program is distributed in the hope that it will be useful, | |
# but WITHOUT ANY WARRANTY; without even the implied warranty of | |
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | |
# GNU General Public License for more details. | |
# | |
# You should have received a copy of the GNU General Public License | |
# along with this program. If not, see <http://www.gnu.org/licenses/>. | |
import sys | |
import os | |
import re | |
generate_python_scripts = True  # instead of shell scripts (more reliable)

# Templates for the generated replay script.  ``mv_cmd`` and ``mkdir_cmd`` are
# filled in with paths that went through ``esc`` first, so that they are safe
# inside the double quotes used by both templates.
if generate_python_scripts:
    script_header = '''#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
def ren(a, b):
    print(b)
    assert(not os.path.exists(b))
    os.rename(a, b)
def mkd(d):
    print(d)
    os.mkdir(d)
'''
    mv_cmd = 'ren("{0}", "{1}")\n'
    mkdir_cmd = 'mkd("{0}")\n'
    # Besides the quote and the backslash, raw line breaks have to be
    # escaped as well: a literal newline or carriage return would end the
    # single-line string literal and leave the rest of the file name as a
    # separate (broken or even executable) line of the generated script.
    escaped_chars = re.compile(r'(["\\\n\r])')
    _line_breaks = {'\n': 'n', '\r': 'r'}

    def esc(s):
        '''Escape ``s`` for use inside a double-quoted Python literal.'''
        return escaped_chars.sub(
            lambda m: '\\' + _line_breaks.get(m.group(1), m.group(1)), s)
else:
    script_header = '#!/bin/sh\nset -e -v\n\n'
    mv_cmd = 'mv -n -T -- "{0}" "{1}"\n'
    mkdir_cmd = 'mkdir -- "{0}"\n'
    # Since single quotes cannot be escaped in sh, I have to use double quotes
    # and do a little bit more escaping. Fortunately regex does it efficiently.
    escaped_chars = re.compile(r'([`"\$\\])')

    def esc(s):
        '''Escape ``s`` for use inside a double-quoted sh word.'''
        return escaped_chars.sub(r'\\\1', s)
def dump_inodes(root, log_path):
    '''Record the inode number of every entry below ``root`` in ``log_path``.

    The log holds one line per entry in the form ``<D|F> <inode> <path>``,
    where ``D`` marks a directory and ``F`` a file.  The root itself comes
    first and every directory is written before its contents (top-down), which
    the reader needs in order to resolve the parent of each entry.

    Trailing slashes of ``root`` are dropped (a lone ``/`` is kept as is).
    Otherwise the entries directly below the root would be recorded with a
    doubled slash while deeper entries would not, and the parents of the
    deeper entries could not be found when the log is read back.

    Raises ValueError if a path contains a line break, because such a path
    cannot be represented in the line-based log.  The log is left incomplete
    in that case.
    '''
    root = root.rstrip('/') or root
    with open(log_path, 'w') as o:
        def record(kind, path):
            if '\n' in path or '\r' in path:
                raise ValueError(
                    'Path contains a line break and cannot be recorded: '
                    '{0!r}'.format(path))
            o.write('{0} {1:d} {2}\n'.format(
                kind, os.lstat(path).st_ino, path))

        record('D', root)
        # must be top-down for reconstruction
        for dpath, dnames, fnames in os.walk(root):
            dpath += '/'
            for kind, names in (('D', dnames), ('F', fnames)):
                for n in names:
                    record(kind, dpath + n)
class DirEntry(object):
    '''A recorded directory: its path, parent and sets of direct children.'''
    __slots__ = ['path', 'parent', 'dirs', 'files']

    def __init__(self, path, parent):
        self.path, self.parent = path, parent
        # Both child sets start out empty and are independent objects.
        self.dirs, self.files = set(), set()
class FileEntry(object):
    '''A recorded file: its path and its parent.'''
    __slots__ = ['path', 'parent']

    def __init__(self, path, parent):
        self.path, self.parent = path, parent
class MovingTree(object):
    '''Recorded state of a directory tree, compared against the current one.

    ``dirs`` and ``files`` map inode numbers to the recorded DirEntry and
    FileEntry objects; ``root`` is the path of the first record of the log.
    Detecting changes consumes these structures, so an instance is good for
    one run of ``detect_changes`` only.
    '''

    def __init__(self, log_path):
        '''Load the recorded state from the inode list at ``log_path``.

        Raises RuntimeError if the list is empty, if its first record is not
        a directory, or if a record has an unknown type.  A line that cannot
        be split into three fields or whose inode is not an integer raises
        ValueError, and an entry whose parent directory was not recorded
        before it raises KeyError.
        '''
        self.dirs = {}
        self.files = {}
        revdirs = {}  # directory path -> inode, used to resolve parents
        with open(log_path) as log:
            # root entry
            try:
                first = next(log)
            except StopIteration:
                raise RuntimeError(
                    'Inode list {0!r} is empty'.format(log_path))
            df, ino, path = self._parse_record(first)
            if df != 'D':
                raise RuntimeError(
                    'First record of {0!r} must be a directory, got {1!r}'
                    .format(log_path, df))
            self.root = path
            self.dirs[ino] = DirEntry(path, None)
            revdirs[path] = ino
            for ln in log:
                df, ino, path = self._parse_record(ln)
                parent_ino = revdirs[path.rsplit('/', 1)[0]]
                if df == 'D':
                    self.dirs[ino] = DirEntry(path, parent_ino)
                    revdirs[path] = ino
                    self.dirs[parent_ino].dirs.add(ino)
                elif df == 'F':
                    self.files[ino] = FileEntry(path, parent_ino)
                    self.dirs[parent_ino].files.add(ino)
                else:
                    raise RuntimeError(
                        'Unknown record type {0!r} in {1!r}'
                        .format(df, log_path))

    @staticmethod
    def _parse_record(line):
        '''Split one log line into (type, inode as int, path).'''
        df, ino, path = line.rstrip('\n').split(' ', 2)
        return df, int(ino), path

    @staticmethod
    def _comment(path):
        '''Return ``path`` as exactly one "#..." line of the script.

        Line breaks inside the path are written as ``\\n`` / ``\\r``.  Left
        raw, they would end the comment and turn the rest of the file name
        into an executable line of the generated script.
        '''
        return '#{0}\n'.format(
            path.replace('\n', '\\n').replace('\r', '\\r'))

    def create_script(self, script_path):
        '''Write the replay script for all detected changes to ``script_path``.

        The file is created (or truncated) with mode 0o777 (subject to the
        umask) and is closed again whether or not detection succeeds.
        '''
        # uses os.open to create executable script - still, read it first!
        fd = os.open(script_path, os.O_CREAT | os.O_WRONLY | os.O_TRUNC, 0o777)
        try:
            o = os.fdopen(fd, 'w')
        except BaseException:
            # No file object took ownership of the descriptor yet.
            os.close(fd)
            raise
        with o:
            o.write(script_header)
            self.detect_changes(o)

    def update_children(self, entry, orig_p, new_p):
        '''Replace the path prefix ``orig_p`` by ``new_p`` below ``entry``.

        Applied recursively to every directory and file still registered
        under ``entry`` after that directory has been found at a new path.
        '''
        plen = len(orig_p)
        for i in entry.dirs:
            centry = self.dirs[i]
            assert centry.path.startswith(orig_p)
            centry.path = new_p + centry.path[plen:]
            self.update_children(centry, orig_p, new_p)
        for i in entry.files:
            centry = self.files[i]
            assert centry.path.startswith(orig_p)
            centry.path = new_p + centry.path[plen:]

    def detect_changes(self, script):
        '''Compare the recorded state with the current one.

        Writes rename and mkdir commands to the file-like object ``script``,
        followed by commented lists of deleted directories, deleted files
        and newly created files and by a summary line.
        '''
        # The order of detecting changes is important. The safest order I could
        # think of was to start top-bottom according to destination (i.e.
        # safely constructing new state with guaranteed existing parents),
        # updating source data structures where necessary.
        newfiles = []
        ok_dirs = ok_files = 0
        for dpath, dnames, fnames in os.walk(self.root):
            dpath += '/'
            for n in dnames:
                p = dpath + n
                ino = os.lstat(p).st_ino
                orig_entry = self.dirs.pop(ino, None)
                if orig_entry is None:
                    # new directory
                    script.write(mkdir_cmd.format(esc(p)))
                elif orig_entry.path == p:
                    # existing directory, still in place
                    ok_dirs += 1
                else:
                    # moved
                    script.write(mv_cmd.format(esc(orig_entry.path), esc(p)))
                    # disparent self (unless the parent was already processed)
                    parent_entry = self.dirs.get(orig_entry.parent)
                    if parent_entry is not None:
                        parent_entry.dirs.remove(ino)
                    # moving under either freshly created or already
                    # processed dir, so no need to register under new
                    # parent.
                    # update all children in the source tree
                    self.update_children(
                        orig_entry, orig_entry.path + '/', p + '/')
            for n in fnames:
                p = dpath + n
                ino = os.lstat(p).st_ino
                orig_entry = self.files.pop(ino, None)
                if orig_entry is None:
                    # new file - just log
                    newfiles.append(p)
                elif orig_entry.path == p:
                    # existing file, still in place
                    ok_files += 1
                else:
                    # moved
                    script.write(mv_cmd.format(esc(orig_entry.path), esc(p)))
                    # disparent self (unless the parent was already processed)
                    parent_entry = self.dirs.get(orig_entry.parent)
                    if parent_entry is not None:
                        parent_entry.files.remove(ino)
        # list remaining unprocessed
        script.write('\n### Deleted directories ###\n')
        for p in sorted(e.path for e in self.dirs.values()
                        if e.path != self.root):
            script.write(self._comment(p))
        script.write('\n### Deleted files ###\n')
        for p in sorted(e.path for e in self.files.values()):
            script.write(self._comment(p))
        script.write('\n### Newly created files ###\n')
        for p in newfiles:
            script.write(self._comment(p))
        script.write('\n### {0:d} dirs and {1:d} files have remained in place. ###\n'
                     .format(ok_dirs, ok_files))
def main(argv=None):
    '''Run the command line interface.

    ``argv`` defaults to ``sys.argv``.  Exits with the usage text unless the
    action is ``dump`` or ``detect`` and both of its arguments are present.
    '''
    if argv is None:
        argv = sys.argv
    action = argv[1:2]
    # Both actions take two arguments; without this check a missing argument
    # ended in an IndexError traceback instead of the usage text.
    has_args = len(argv) >= 4
    if has_args and action == ['dump']:
        dump_inodes(argv[2], argv[3])
    elif has_args and action == ['detect']:
        tr = MovingTree(argv[2])
        tr.create_script(argv[3])
    else:
        sys.exit('''Usage:
{0} dump {{root_path}} {{inode_list_path}}
    Dumps inode numbers inside {{root_path}} to a new
    file {{inode_list_path}}, thus recording current state.
{0} detect {{inode_list_path}} {{script_path}}
    Compares recorded state within {{inode_list_path}} with current
    state and creates a script to reconstruct detected changes.
'''.format(argv[0]))


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment