Created
September 13, 2023 09:07
-
-
Save Maigre/ff5e60b616d31f4054844c2138afc5e4 to your computer and use it in GitHub Desktop.
sd2img
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
import subprocess, os, sys, math | |
#######################
def usage():
    """Print the one-line command synopsis to standard output."""
    synopsis = "Usage: sudo sd2img /dev/sdx /path/to/image.img"
    print(synopsis)
def b_size(size_bytes):
    """Format a byte count as a compact size string with no space (e.g. "4M").

    The value is scaled by powers of 1024 and rounded to the nearest whole
    number of the chosen unit, so sizes that are not an exact multiple of the
    unit lose precision.

    Args:
        size_bytes: Non-negative number of bytes.

    Returns:
        "0" for zero; otherwise the rounded value followed by "", "K", "M"
        or "G". Sizes of 1024 G and above stay expressed in "G" (for example
        "1024G") instead of running past the suffix table.

    Raises:
        ValueError: If size_bytes is negative.
    """
    if size_bytes == 0:
        return "0"
    if size_bytes < 0:
        raise ValueError("size_bytes must not be negative")
    size_name = ("", "K", "M", "G")
    # Scale by repeated division rather than a floating-point logarithm, and
    # stop at the last suffix so very large sizes cannot index past the table.
    i = 0
    scaled = float(size_bytes)
    while scaled >= 1024 and i < len(size_name) - 1:
        scaled /= 1024
        i += 1
    return "%s%s" % (round(scaled), size_name[i])
def t_size(size_bytes):
    """Format a byte count as a human-readable string (e.g. "4.0 MB").

    The value is scaled by powers of 1024 and rounded to two decimals.

    Args:
        size_bytes: Non-negative number of bytes.

    Returns:
        "0" for zero; otherwise the scaled value, a space, and one of
        "B", "KB", "MB", "GB", "TB". Sizes of 1024 TB and above stay
        expressed in "TB" instead of running past the suffix table.

    Raises:
        ValueError: If size_bytes is negative.
    """
    if size_bytes == 0:
        return "0"
    if size_bytes < 0:
        raise ValueError("size_bytes must not be negative")
    size_name = ("B", "KB", "MB", "GB", "TB")
    # Scale by repeated division rather than a floating-point logarithm, and
    # stop at the last suffix so very large sizes cannot index past the table.
    i = 0
    scaled = float(size_bytes)
    while scaled >= 1024 and i < len(size_name) - 1:
        scaled /= 1024
        i += 1
    return "%s %s" % (round(scaled, 2), size_name[i])
def confirm(question):
    """Ask a yes/no question on the console until a valid answer is given.

    The prompt is the question followed by " [Y/N] ". The reply is
    lower-cased; anything other than "y" or "n" causes the prompt to repeat.

    Args:
        question: Text shown before the " [Y/N] " suffix.

    Returns:
        True if the reply was "y", False if it was "n".
    """
    prompt = question + " [Y/N] "
    while True:
        reply = input(prompt).lower()
        if reply in ("y", "n"):
            return reply == "y"
######################
# Check SUDO: the effective user id must be 0 (root).
if os.geteuid() != 0:
    print("must be root, please sudo")
    exit(1)

# Check SD provided: argument 1 must exist, start with /dev and not end in a
# digit (presumably to reject partition names such as /dev/sdb1 -- confirm).
cli_args = sys.argv
source_ok = (
    len(cli_args) > 1
    and cli_args[1].startswith('/dev')
    and not cli_args[1][-1].isdigit()
)
if not source_ok:
    print("missing or invalid source device ")
    usage()
    exit(1)

# Check IMG provided: argument 2 must exist and end with "img".
destination_ok = len(cli_args) > 2 and cli_args[2].endswith('img')
if not destination_ok:
    print("missing or invalid destination file ")
    usage()
    exit(1)
# fdisk
# Run fdisk with an argument list (no shell), so the device path taken from
# the command line is passed verbatim and never interpreted by a shell.
# LC_ALL=C asks for untranslated output, which the "Sector size" match below
# depends on.
try:
    fdisk = subprocess.check_output(
        ["fdisk", "-l", sys.argv[1]],
        env=dict(os.environ, LC_ALL="C"),
    )
except (OSError, subprocess.CalledProcessError) as err:
    print("ERROR: fdisk failed:", err)
    sys.exit(1)
# Decode the raw bytes into real text lines instead of splitting repr(bytes).
fdisk = fdisk.decode("utf-8", errors="replace").splitlines()

# Sector Size
# Taken from the first line containing "Sector size": the first number after
# the colon (expected form: "Sector size (...): 512 bytes / 4096 bytes").
sectorSize = 0
for line in fdisk:
    if "Sector size" in line:
        try:
            sectorSize = int(line.split(":", 1)[1].split()[0])
        except (IndexError, ValueError):
            sectorSize = 0  # malformed line: reported as an error below
        break
if sectorSize > 0:
    print("** Sector size:\t", sectorSize, "bytes")
else:
    print("ERROR: can't detect sector size.\n")
    print(fdisk)
    sys.exit(1)

# Last sector
# Partition rows are the lines that start with the device path. The second
# numeric column after the device name is read as the End sector; the largest
# one over all partitions is kept.
lastSector = 0
for line in fdisk:
    if not line.startswith(sys.argv[1]):
        continue
    # Drop the optional Boot flag "*" so a bootable partition does not shift
    # the columns and make the Start sector be read in place of End.
    columns = [col for col in line.split()[1:] if col != "*"]
    try:
        lsec = int(columns[1])
    except (IndexError, ValueError):
        continue  # not a parsable partition row
    if lsec > lastSector:
        lastSector = lsec
if lastSector > 0:
    print("** Last sector:\t", lastSector)
else:
    print("ERROR: can't detect last sector.\n")
    print(fdisk)
    sys.exit(1)
# Blocks: copy in 4 MiB blocks; the count is the used area rounded up to a
# whole block, plus one extra block.
blockSize = 4 * 1024 * 1024  # 4M
usedBytes = lastSector * sectorSize
blockCount = math.ceil(usedBytes / blockSize) + 1
print("** Block size:\t", b_size(blockSize))
print("** Block count:\t", blockCount)
print()

# Size: total number of bytes dd will be asked to copy.
targetSize = blockSize * blockCount
print("Target size:\t", t_size(targetSize))
print()

# Destination: a relative path is resolved against the current directory.
destArg = sys.argv[2]
destPath = destArg if os.path.isabs(destArg) else os.path.join(os.getcwd(), destArg)

# Free space: blocks available to unprivileged users times the fragment size,
# measured on the filesystem holding the destination directory.
fsStats = os.statvfs(os.path.dirname(destPath))
availableSize = fsStats.f_bavail * fsStats.f_frsize
if targetSize >= availableSize:
    print("ERROR: Not enough free space left on disk...")
    exit(1)

# Overwrite: an existing destination is only replaced after confirmation.
if os.path.exists(destPath) and not confirm("WARNING: Destination "+destPath+" already exists. Overwrite ?"):
    print("Cancelled.")
    exit(1)
# Go
# Build the dd command as an argument list so a destination path containing
# spaces stays a single "of=" argument. LC_ALL=C asks dd for untranslated
# status text, which the progress parsing below depends on.
ddCMD = [
    "dd",
    "if=" + sys.argv[1],
    "of=" + destPath,
    "bs=" + b_size(blockSize),
    "count=" + str(blockCount),
    "status=progress",
]
dd = subprocess.Popen(
    ddCMD,
    stderr=subprocess.PIPE,
    env=dict(os.environ, LC_ALL="C"),
)
print()

# Progress
# dd's status output is read one byte at a time so each update can be shown
# as soon as its terminating carriage return (or newline) arrives. Bytes are
# accumulated and decoded per line, never per byte, so multi-byte characters
# cannot break decoding.
pending = b''
while True:
    out = dd.stderr.read(1)
    if out == b'':
        break  # end of stream: dd has closed its stderr
    if out not in (b'\r', b'\n'):
        pending += out
        continue
    line = pending.decode("utf-8", errors="replace").split()
    pending = b''
    # A status line is expected to look like
    # "<copied> bytes (...) copied, <time> s, <rate> <unit>".
    if len(line) > 1 and line[1] == 'bytes':
        try:
            progress = math.ceil(int(line[0]) * 100 / targetSize)
        except ValueError:
            continue  # first field is not a byte count
        # The transfer rate is taken from the last two fields instead of
        # fixed positions, so shorter status lines do not raise IndexError.
        print("Progess: "+str(progress)+" % ("+" ".join(line[-2:])+")", end="\r")
dd.stderr.close()

# Do not report success when dd failed.
returnCode = dd.wait()
if returnCode != 0:
    print("\nERROR: dd exited with status", returnCode)
    sys.exit(1)

print("\n Syncing..")
os.sync()  # flush filesystem buffers without spawning a shell
print("\nDone !")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment