File Utilities
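"""File utilities (Python 2): file size and date helpers, chunked reading,
MD5 hashing, and simple split/join of a file into numbered parts."""
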
from __future__ import division
from platform import system
from os import stat
from os.path import basename, splitext, dirname, split as splitpath, \
    join as joinpath
from datetime import datetime
from glob import glob
import hashlib
import subprocess

blocksize_bg = 64000000  # 64 MB
blocksize_sm = 16384     # 16 KB


def filesize(filename, mode=1, precision=1):
    """
    Calculate a file's size.
    :param filename: file path
    :param mode: 1 = raw bytes, 2 = rounded size (decimal units), 3 = size on disk
    :param precision: rounding precision used in mode 2
    :return: int in mode 1, float in mode 2, str in mode 3
    """
    file_bytes = stat(filename).st_size
    if mode == 1:
        return file_bytes
    if mode == 2:
        # Rounded size in decimal units; platform independent.
        if file_bytes < 1024**2:    # KB
            return round(file_bytes / 1000, precision)
        elif file_bytes < 1024**3:  # MB
            return round(file_bytes / 1000**2, precision)
        elif file_bytes < 1024**4:  # GB
            return round(file_bytes / 1000**3, precision)
        elif file_bytes < 1024**5:  # TB
            return round(file_bytes / 1000**4, precision)
    if mode == 3:
        if system() == 'Darwin':
            # OS X reports size on disk via the 512-byte block count.
            file_blocks = stat(filename).st_blocks
            if file_bytes < 1024**2:    # KB
                return str(round(file_blocks * 512e-3, 1)) + ' KB'
            elif file_bytes < 1024**3:  # MB
                return str(round(file_blocks * 512e-6, 1)) + ' MB'
            elif file_bytes < 1024**4:  # GB
                return str(round(file_blocks * 512e-6 / 1000, 2)) + ' GB'
            elif file_bytes < 1024**5:  # TB
                return str(round(file_blocks * 512e-6 / 1000**2, 2)) + ' TB'
        elif system() == 'Windows':
            # Windows: approximate size on disk from the byte count (binary units).
            if file_bytes < 1024**2:    # KB
                return str(round(file_bytes / 1024, 1)) + ' KB'
            elif file_bytes < 1024**3:  # MB
                return str(round(file_bytes / 1024**2, 1)) + ' MB'
            elif file_bytes < 1024**4:  # GB
                return str(round(file_bytes / 1024**3, 2)) + ' GB'
            elif file_bytes < 1024**5:  # TB
                return str(round(file_bytes / 1024**4, 2)) + ' TB'


def get_creation_time(filename):
    """Return the file's birth time in epoch seconds via `stat -f %B` (OS X only)."""
    p = subprocess.Popen(['stat', '-f%B', filename],
                         stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if p.wait():
        raise OSError(p.stderr.read().rstrip())
    else:
        return int(p.stdout.read())


def filedate_created(filename, as_datetime=False, date_format=None):
    """Return a file's creation date as a datetime or a formatted string."""
    if system() == 'Darwin':
        created_date = datetime.fromtimestamp(get_creation_time(filename))
    else:
        # st_ctime is creation time on Windows (metadata-change time elsewhere).
        created_date = datetime.fromtimestamp(stat(filename).st_ctime)
    if as_datetime:
        return created_date
    if date_format:
        return created_date.strftime(date_format)
    if system() == 'Darwin':
        return created_date.strftime("%d/%m/%Y %H:%M:%S %p")
    return created_date.strftime("%d %B %Y, %H:%M:%S %p")


def filedate_modified(filename, as_datetime=False, date_format=None):
    """Return a file's last-modified date as a datetime or a formatted string."""
    modified_date = datetime.fromtimestamp(stat(filename).st_mtime)
    if as_datetime:
        return modified_date
    if date_format:
        return modified_date.strftime(date_format)
    if system() == 'Darwin':
        return modified_date.strftime("%d/%m/%Y %H:%M:%S %p")
    return modified_date.strftime("%d %B %Y, %H:%M:%S %p")


def chunk(filename, size=4096):
    """Yield successive `size`-byte pieces of the file (the last may be shorter)."""
    with open(filename, 'rb') as fp:
        for piece in iter(lambda: fp.read(size), b''):
            yield piece


def buffered_chunk(filename, blocksize=blocksize_sm):
    """Yield the file as pre-allocated bytearray buffers of `blocksize` bytes."""
    file_size = filesize(filename)
    num_of_chunks = calculate_chunks(filename, blocksize)
    chunk_size = blocksize
    total_bytes = 0
    with open(filename, 'rb') as fp:
        for x in xrange(1, num_of_chunks + 1):
            if x == num_of_chunks:
                # The final buffer only holds whatever bytes remain.
                chunk_size = file_size - total_bytes
            data = bytearray(chunk_size)
            fp.readinto(data)
            total_bytes += len(data)
            yield data
            del data


def calculate_md5(filename):
    """Return the hex MD5 digest of the file, read in small chunks."""
    md5 = hashlib.md5()
    for piece in chunk(filename):
        md5.update(piece)
    return md5.hexdigest()


def calculate_chunks(filename, chunk_size=blocksize_sm):
    """Return how many `chunk_size` blocks are needed to cover the file."""
    file_size = filesize(filename)
    num_of_chunks = int(file_size / chunk_size)
    if file_size % chunk_size:
        num_of_chunks += 1
    return num_of_chunks


def split(filename, output_directory=None, blocksize=blocksize_sm, digits=5):
    """Split a file into numbered parts: <name>.00001, <name>.00002, ..."""
    if not output_directory:
        output_directory = splitpath(filename)[0]
    for i, block in enumerate(chunk(filename, blocksize), 1):
        with open(joinpath(output_directory,
                           basename(filename) + '.') + str(i).zfill(digits),
                  'wb') as fp:
            fp.write(block)
            fp.flush()


def join(filename, digits=5):
    """Re-assemble a file from its numbered parts, given the path of any part."""
    filename, source_directory = basename(filename), dirname(filename)
    org_filename = filename[0:-(digits + 1)]  # strip the '.00001'-style suffix
    chunk_files = glob(joinpath(source_directory, org_filename) + '.*')
    with open(joinpath(source_directory, org_filename), 'wb') as out_fp:
        for part in sorted(chunk_files):
            with open(part, 'rb') as fp:
                data = fp.read()
                out_fp.write(data)
                out_fp.flush()


def generate_index(source_fname, blocksize=blocksize_sm):
    """Write a <name>.index file listing per-chunk sizes and MD5 digests."""
    location, fname = splitpath(source_fname)
    index_filename = splitext(fname)[0] + '.index'
    fsize = filesize(source_fname)
    fdate_created = filedate_created(source_fname)
    fdate_modified = filedate_modified(source_fname)
    md5_signature = hashlib.md5()
    index_table = list()
    for i, block in enumerate(chunk(source_fname, blocksize), 1):
        md5_signature.update(block)
        row = [i, len(block), hashlib.md5(block).hexdigest()]
        index_table.append(row)
    file_signature = md5_signature.hexdigest()
    with open(joinpath(location, index_filename), 'w') as fp:
        fp.write(location + '\n')
        fp.write(fname + '\n')
        fp.write(','.join([str(fsize), fdate_created, fdate_modified]) + '\n')
        fp.write(file_signature + '\n')
        fp.write('\n'.join([','.join(str(y) for y in x)
                            for x in index_table]))
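

if __name__ == '__main__':
    # Minimal usage sketch: exercises the helpers above on a throw-away temp
    # file. The file name, contents, and sizes here are arbitrary examples.
    import os
    import tempfile

    fd, demo_path = tempfile.mkstemp(suffix='.bin')
    with os.fdopen(fd, 'wb') as demo_fp:
        demo_fp.write(b'\x00' * (blocksize_sm * 3 + 123))  # a little over 48 KB

    print('size (bytes):   %d' % filesize(demo_path))
    print('size (rounded): %s' % filesize(demo_path, mode=2))
    print('md5:            %s' % calculate_md5(demo_path))
    print('16 KB chunks:   %d' % calculate_chunks(demo_path))

    split(demo_path)                # writes demo_path.00001 ... demo_path.00004
    join(demo_path + '.00001')      # stitches the parts back into demo_path
    generate_index(demo_path)       # writes <name>.index next to the file

    # Clean up the temp file, its parts, and the index.
    for leftover in glob(demo_path + '.*'):
        os.remove(leftover)
    os.remove(splitext(demo_path)[0] + '.index')
    os.remove(demo_path)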