Last active
August 20, 2023 18:02
-
-
Save mirkobrombin/22f3a11d9fa6be7291e8918c4ddd28e6 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import shlex | |
import subprocess | |
class Diskutils:
    """Namespace for small disk-related helper functions."""

    @staticmethod
    def pretty_size(size: int) -> str:
        """Format a byte count as a short human-readable string.

        The value is scaled by powers of 1024 and rounded to two decimals.
        The largest unit used is ``GB``; values below 1024 are returned
        unscaled with a ``B`` suffix.

        Args:
            size: Size in bytes.

        Returns:
            A string such as ``"1.5 KB"``, ``"2.0 GB"`` or ``"512 B"``.
        """
        # Use >= so an exact unit boundary is shown in the larger unit
        # (1024 bytes is "1.0 KB", not "1024 B").
        if size >= 1024**3:
            return f"{round(size / 1024 ** 3, 2)} GB"
        if size >= 1024**2:
            return f"{round(size / 1024 ** 2, 2)} MB"
        if size >= 1024:
            return f"{round(size / 1024, 2)} KB"
        return f"{size} B"
class Disk:
    """A block device listed under ``/sys/block`` together with its partitions.

    On construction the disk's sysfs directory is scanned for entries whose
    name starts with the disk name; each becomes a ``Partition``. For every
    such partition, ``pvdisplay``/``lvdisplay`` are queried (through ``sudo``)
    and any LVM logical volumes found on it are appended as ``Partition``
    objects as well.
    """

    def __init__(self, disk: str):
        """Create a disk from its kernel name (e.g. ``"sda"``).

        Args:
            disk: Name of the directory under ``/sys/block``.
        """
        self.__disk = disk
        self.__partitions = self.__get_partitions()
        self.__size = self.__get_size()

    def __get_partitions(self):
        """Return the disk's partitions followed by their LVM logical volumes."""
        names = [
            entry
            for entry in os.listdir(self.block)
            if entry.startswith(self.__disk)
        ]
        if not names:
            # No partitions: do not invoke the LVM tools at all.
            return []

        # The pvdisplay output is the same for every partition, so run it once.
        pv_output = self.__read_pvdisplay()

        partitions = []
        for name in names:
            partitions.append(Partition(self.__disk, name))
            partitions.extend(
                self.__get_volumes_from_partition(name, pv_output)
            )
        return partitions

    def __get_size(self):
        """Return the disk size in bytes, read from ``<block>/size``."""
        with open(f"{self.block}/size", "r") as f:
            # sysfs reports the size as a count of 512-byte sectors.
            return int(f.read().strip()) * 512

    @staticmethod
    def __read_pvdisplay():
        """Return the text output of ``sudo pvdisplay``, or ``""`` on failure."""
        try:
            return subprocess.check_output(["sudo", "pvdisplay"]).decode(
                "utf-8"
            )
        except (subprocess.CalledProcessError, OSError):
            # Non-zero exit, or sudo itself could not be executed.
            return ""

    @staticmethod
    def _vg_name_for_partition(pvdisplay_output: str, partition: str):
        """Return the volume group of the PV that lives on ``partition``.

        Args:
            pvdisplay_output: Text printed by ``pvdisplay``.
            partition: Partition name without directory (e.g. ``"sda2"``).

        Returns:
            The value of the ``VG Name`` line that follows the matching
            ``PV Name`` line, or ``None`` when the partition is not listed or
            its ``VG Name`` is empty.
        """
        in_target_pv = False
        for line in pvdisplay_output.splitlines():
            fields = line.split()
            if fields[:2] == ["PV", "Name"]:
                pv_path = fields[2] if len(fields) > 2 else ""
                # Compare the last path component exactly so that "sda1"
                # does not also match "/dev/sda10".
                in_target_pv = pv_path.rsplit("/", 1)[-1] == partition
            elif in_target_pv and fields[:2] == ["VG", "Name"]:
                return fields[2] if len(fields) > 2 else None
        return None

    @staticmethod
    def _lv_paths(lvdisplay_output: str):
        """Return the values of all ``LV Path`` lines in ``lvdisplay`` output."""
        paths = []
        for line in lvdisplay_output.splitlines():
            fields = line.split()
            if fields[:2] == ["LV", "Path"] and len(fields) > 2:
                paths.append(fields[2])
        return paths

    def __get_volumes_from_partition(self, partition: str, pv_output=None):
        """Return the LVM logical volumes whose volume group uses ``partition``.

        Args:
            partition: Partition name without directory (e.g. ``"sda2"``).
            pv_output: Already-captured ``pvdisplay`` output; when ``None``
                the command is run here.

        Returns:
            A list of ``Partition`` objects, one per ``LV Path`` reported by
            ``lvdisplay <vg>``. Empty when the partition is not an LVM
            physical volume or when either command fails.
        """
        if pv_output is None:
            pv_output = self.__read_pvdisplay()

        vg = self._vg_name_for_partition(pv_output, partition)
        if not vg:
            return []

        try:
            lv_output = subprocess.check_output(
                ["sudo", "lvdisplay", vg]
            ).decode("utf-8")
        except (subprocess.CalledProcessError, OSError):
            return []

        return [Partition(vg, lv) for lv in self._lv_paths(lv_output)]

    @property
    def partitions(self):
        """Partitions and logical volumes collected by the last scan."""
        return self.__partitions

    def get_partition(self, mountpoint: str):
        """Return the first partition mounted at ``mountpoint``, else ``None``."""
        for partition in self.partitions:
            if partition.mountpoint == mountpoint:
                return partition
        return None

    def update_partitions(self):
        """Rescan the disk and replace the cached partition list."""
        self.__partitions = self.__get_partitions()

    @property
    def disk(self):
        """Device path of the disk, ``/dev/<name>``."""
        return f"/dev/{self.__disk}"

    @property
    def name(self):
        """Kernel name of the disk as passed to the constructor."""
        return self.__disk

    @property
    def block(self):
        """sysfs directory of the disk, ``/sys/block/<name>``."""
        return f"/sys/block/{self.__disk}"

    @property
    def size(self):
        """Disk size in bytes."""
        return self.__size

    @property
    def pretty_size(self):
        """Disk size as a human-readable string (see ``Diskutils.pretty_size``)."""
        return Diskutils.pretty_size(self.size)
class Partition:
    """A partition or logical volume, with metadata read via findmnt/lsblk.

    All metadata is collected once in the constructor. Each value is ``None``
    when the command used to obtain it fails.
    """

    def __init__(self, disk: str, partition: str):
        """Collect mountpoint, size, filesystem type, UUID and label.

        Args:
            disk: Name of the parent disk (or volume group for LVM volumes).
            partition: Partition name (e.g. ``"sda1"``) or a full device path
                starting with ``/dev``.
        """
        self.__disk = disk
        self.__partition = partition
        self.__mountpoint = self.__get_mountpoint()
        self.__size = self.__get_size()
        self.__fs_type = self.__get_fs_type()
        self.__uuid = self.__get_uuid()
        self.__label = self.__get_label()

    def __query(self, *command):
        """Run ``command`` with the device path appended and return its output.

        The command is passed as an argument list (no shell), so the device
        path is never interpreted by a shell.

        Returns:
            The decoded, stripped standard output, or ``None`` when the
            command exits non-zero or cannot be executed.
        """
        try:
            output = subprocess.check_output([*command, self.partition])
        except (subprocess.CalledProcessError, OSError):
            return None
        return output.decode("utf-8").strip()

    def __get_mountpoint(self):
        """Return the ``TARGET`` column reported by findmnt, or ``None``."""
        return self.__query("findmnt", "-n", "-o", "TARGET")

    def __get_size(self):
        """Return the size in bytes reported by lsblk, or ``None``."""
        output = self.__query("lsblk", "-d", "-n", "-b", "-o", "SIZE")
        if output is None:
            return None
        try:
            return int(output)
        except ValueError:
            # lsblk succeeded but printed nothing usable as a number.
            return None

    def __get_fs_type(self):
        """Return the ``FSTYPE`` column reported by lsblk, or ``None``."""
        return self.__query("lsblk", "-d", "-n", "-o", "FSTYPE")

    def __get_uuid(self):
        """Return the ``UUID`` column reported by lsblk, or ``None``."""
        return self.__query("lsblk", "-d", "-n", "-o", "UUID")

    def __get_label(self):
        """Return the ``LABEL`` column reported by findmnt, or ``None``."""
        # NOTE(review): unlike fs_type/uuid this goes through findmnt rather
        # than lsblk, so it presumably yields None for unmounted partitions.
        # Confirm whether that is intended.
        return self.__query("findmnt", "-n", "-o", "LABEL")

    @property
    def partition(self):
        """Device path: the given value if it starts with ``/dev``, else ``/dev/<name>``."""
        if self.__partition.startswith("/dev"):
            return self.__partition
        return f"/dev/{self.__partition}"

    @property
    def block(self):
        """sysfs path ``/sys/block/<disk>/<name>``, or the device path itself
        when the partition was given as a ``/dev`` path."""
        if self.__partition.startswith("/dev"):
            return self.__partition
        return f"/sys/block/{self.__disk}/{self.__partition}"

    @property
    def mountpoint(self):
        """Mount target, or ``None`` if it could not be determined."""
        return self.__mountpoint

    @property
    def size(self):
        """Size in bytes, or ``None`` if it could not be determined."""
        return self.__size

    @property
    def pretty_size(self):
        """Human-readable size, or ``None`` when the size is unknown."""
        size = self.size
        if size is None:
            # Mirror the other properties instead of failing on `None > int`.
            return None
        return Diskutils.pretty_size(size)

    @property
    def fs_type(self):
        """Filesystem type, or ``None`` if it could not be determined."""
        return self.__fs_type

    @property
    def uuid(self):
        """Filesystem UUID, or ``None`` if it could not be determined."""
        return self.__uuid

    @property
    def label(self):
        """Filesystem label, or ``None`` if it could not be determined."""
        return self.__label

    def __lt__(self, other):
        """Order partitions by device path."""
        if not isinstance(other, Partition):
            return NotImplemented
        return self.partition < other.partition

    def __eq__(self, other):
        """Two partitions are equal when UUID and filesystem type both match."""
        if not isinstance(other, Partition):
            # Lets Python fall back to its default comparison (False for
            # None or unrelated objects) instead of raising AttributeError.
            return NotImplemented
        return self.uuid == other.uuid and self.fs_type == other.fs_type

    def __hash__(self):
        """Hash on the same fields that ``__eq__`` compares."""
        return hash((self.uuid, self.fs_type))
class DisksManager:
    """Collection of the non-removable disks found under ``/sys/block``."""

    def __init__(self):
        """Scan ``/sys/block`` once and cache the resulting ``Disk`` objects."""
        self.__disks = self.__get_disks()

    def __get_disks(self):
        """Return a ``Disk`` for every eligible entry of ``/sys/block``.

        Entries whose name starts with ``loop``, ``ram``, ``sr``, ``zram`` or
        ``dm`` are skipped, as are devices whose ``removable`` file contains
        ``1``.
        """
        disks = []
        for name in os.listdir("/sys/block"):
            if name.startswith(("loop", "ram", "sr", "zram", "dm")):
                continue

            removable_flag = os.path.join("/sys/block", name, "removable")
            if os.path.isfile(removable_flag):
                with open(removable_flag) as f:
                    removable = int(f.readline().strip())
                if removable == 1:
                    continue

            disks.append(Disk(name))
        return disks

    @property
    def all_disks(self):
        """All disks found by the scan performed at construction time."""
        return self.__disks

    def get_disk(self, disk: str):
        """Return the disk identified by ``disk``, or ``None`` if not found.

        Args:
            disk: Device path (``"/dev/sda"``) or bare kernel name
                (``"sda"``) of the wanted disk.
        """
        # Use a distinct loop variable: reusing `disk` would shadow the
        # argument and compare each Disk with itself.
        for candidate in self.all_disks:
            if disk in (candidate.disk, candidate.name):
                return candidate
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment