Created
February 2, 2023 21:34
-
-
Save ca0abinary/25191485044a7ccbe7d147cde2c4717e to your computer and use it in GitHub Desktop.
Reads the partition table of an EBS snapshot using the EBS direct APIs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install dependencies: python -m pip install pyreadpartitions boto3 mypy-boto3-ec2 mypy-boto3-ebs
# Set volume_id in main() to a volume that has at least one snapshot you can access
# Run the script!
from pyreadpartitions import get_disk_partitions_info | |
def main(volume_id: str = 'vol-0ec018d77cf9af4dd') -> None:
    """Print snapshot metadata and the partition table for a volume's newest snapshot.

    Args:
        volume_id: Id of an EBS volume with at least one accessible snapshot.
            Defaults to the sample volume id the script originally hard-coded.
    """
    ebs = EbsDirectIO(volume_id=volume_id)
    try:
        print(f'''
Got data for volume {ebs.snapshot["VolumeId"]}
Volume Size: {ebs.snapshot["VolumeSize"]} GiB
Snapshot Id: {ebs.snapshot["SnapshotId"]}
Snapshot Start: {ebs.snapshot["StartTime"]}
Snapshot State: {ebs.snapshot["State"]}
''')
        info = get_disk_partitions_info(ebs)
        print(f' Uses {"MBR" if info.mbr is not None else "GPT"} partition table {info.mbr if info.mbr is not None else info.gpt}')
    finally:
        # Release the SQLite cache connection even when parsing or printing fails.
        ebs.close()
##### | |
# EbsDirectIO.py | |
##### | |
from collections import namedtuple | |
from typing import List | |
import boto3 | |
import sqlite3 | |
from mypy_boto3_ec2 import EC2Client | |
from mypy_boto3_ebs import EBSClient | |
from mypy_boto3_ec2.type_defs import SnapshotTypeDef | |
from mypy_boto3_ebs.type_defs import BlockTypeDef | |
class EbsDirectIO:
    """Read-only, seekable, file-like view of an EBS snapshot.

    Block data is fetched on demand through the EBS direct APIs
    (``list_snapshot_blocks`` / ``get_snapshot_block``) and cached, together
    with the block tokens, in a local SQLite file named ``<SnapshotId>.db``.

    Block indexes that the snapshot listing does not contain are treated as
    unwritten regions and read back as zero bytes.

    NOTE(review): cached block tokens are reused across runs; the listing API
    reports an expiry time for tokens, so a token cached by an earlier run may
    be rejected by ``get_snapshot_block`` -- confirm and refresh if needed.
    """

    # The boto3 stub types are written as strings so that the class can be
    # defined without the mypy_boto3 packages being importable at runtime.
    ec2_client: "EC2Client" = None
    ebs_client: "EBSClient" = None
    snapshot: "SnapshotTypeDef" = None
    offset: int = 0            # current read position in bytes
    block_size: int = 524288   # bytes per snapshot block (512 KiB)
    db_connection: sqlite3.Connection = None
    db_cursor: sqlite3.Cursor = None  # unused by this class; kept for compatibility
    DataBlock = namedtuple("DataBlock", "block_index block_token data")

    def __init__(self, ec2_client: "EC2Client" = None,
                 ebs_client: "EBSClient" = None,
                 snapshot: "SnapshotTypeDef" = None,
                 volume_id: str = None
                 ) -> None:
        """Open the cache for a snapshot, resolving the snapshot if necessary.

        Args:
            ec2_client: EC2 client; a default ``boto3`` client is created if None.
            ebs_client: EBS client; a default ``boto3`` client is created if None.
            snapshot: Snapshot description to read. When None, the most
                recently started snapshot of ``volume_id`` is used.
            volume_id: Volume whose newest snapshot is read; required when
                ``snapshot`` is None.

        Raises:
            ValueError: neither ``snapshot`` nor ``volume_id`` was given.
            IndexError: ``volume_id`` has no snapshots visible to the caller.
        """
        self.ebs_client = ebs_client
        if self.ebs_client is None:
            self.ebs_client = boto3.client('ebs')
        self.ec2_client = ec2_client
        if self.ec2_client is None:
            self.ec2_client = boto3.client('ec2')
        self.snapshot = snapshot
        if self.snapshot is None:
            if volume_id is None:
                raise ValueError('either snapshot or volume_id must be provided')
            snapshots = self.ec2_client.describe_snapshots(Filters=[{
                'Name': 'volume-id',
                'Values': [volume_id]
            }])['Snapshots']
            if not snapshots:
                # IndexError is what the unguarded "[0]" lookup used to raise.
                raise IndexError(f'no snapshots found for volume {volume_id}')
            self.snapshot = max(snapshots, key=lambda s: s['StartTime'])
        self.db_connect()
        if not self._db_has_blocks():
            # Nothing cached yet: warm the cache with the first page of block tokens.
            initial_blocks = self.ebs_client.list_snapshot_blocks(SnapshotId=self.snapshot["SnapshotId"], MaxResults=100)["Blocks"]
            self.db_put_block_defs(initial_blocks)

    def __enter__(self) -> "EbsDirectIO":
        """Return self so the object can be used in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Close the cache connection when leaving a ``with`` block."""
        self.close()

    def db_connect(self) -> None:
        """Open (or create) ``<SnapshotId>.db`` and ensure the blocks table exists."""
        self.db_connection = sqlite3.connect(f'{self.snapshot["SnapshotId"]}.db')
        with self.db_connection as con:
            # block_token is TEXT because tokens are strings; cache files created
            # earlier keep their existing schema thanks to IF NOT EXISTS.
            con.execute('CREATE TABLE IF NOT EXISTS [blocks] (block_index INT NOT NULL PRIMARY KEY, block_token TEXT NOT NULL, data BLOB NULL)')

    def _db_has_blocks(self) -> bool:
        """Return True when at least one block row is already cached."""
        with self.db_connection as con:
            return con.execute('SELECT 1 FROM [blocks] LIMIT 1').fetchone() is not None

    def db_put_block_defs(self, blocks: "List[BlockTypeDef]") -> None:
        """Insert block index/token pairs, refreshing the token of rows that exist."""
        with self.db_connection as con:
            con.executemany('''
                INSERT INTO [blocks] (block_index, block_token) VALUES (?, ?)
                ON CONFLICT(block_index) DO UPDATE SET block_token=excluded.block_token
                ''', [(x['BlockIndex'], x['BlockToken']) for x in blocks])

    def db_get_block(self, block_index: int) -> DataBlock:
        """Return the cached row for ``block_index``, or None when it is not cached."""
        with self.db_connection as con:
            # Columns are named explicitly so the result always matches DataBlock's field order.
            row = con.execute('SELECT block_index, block_token, data FROM [blocks] WHERE block_index = ?',
                              (block_index,)).fetchone()
        if row is None:
            return None
        return self.DataBlock(*row)

    def db_put_block(self, block_index: int, block_data: bytes) -> None:
        """Store downloaded bytes on the already-listed row for ``block_index``."""
        with self.db_connection as con:
            con.execute('UPDATE [blocks] SET data = ? WHERE block_index = ?', (block_data, block_index,))

    def _volume_size(self):
        """Return the volume size in bytes, or None if the snapshot does not state it."""
        gib = self.snapshot.get("VolumeSize") if self.snapshot is not None else None
        if gib is None:
            return None
        return gib * 1024 ** 3

    def _list_blocks_from(self, block_index: int):
        """Cache one page of block tokens starting at ``block_index``.

        Returns the highest block index whose presence or absence is settled by
        this listing: infinity when the page was the last one, otherwise the
        largest index on the page.
        """
        response = self.ebs_client.list_snapshot_blocks(SnapshotId=self.snapshot["SnapshotId"],
                                                        StartingBlockIndex=block_index,
                                                        MaxResults=100)
        blocks = response["Blocks"]
        self.db_put_block_defs(blocks)
        if not response.get("NextToken"):
            return float('inf')
        return max((b['BlockIndex'] for b in blocks), default=block_index)

    def _fetch_block(self, block: DataBlock) -> bytes:
        """Download one block's data, cache it, and return it."""
        response = self.ebs_client.get_snapshot_block(SnapshotId=self.snapshot['SnapshotId'],
                                                      BlockIndex=block.block_index,
                                                      BlockToken=block.block_token)
        blob = response['BlockData'].read()
        self.db_put_block(block.block_index, blob)
        return blob

    def tell(self) -> int:
        """Return the current read position in bytes."""
        return self.offset

    def fileno(self) -> int:
        """Return a placeholder descriptor number.

        NOTE(review): 0 is conventionally stdin's descriptor and does not refer
        to this object; callers must not use it for OS-level I/O.
        """
        return 0

    def seek(self, offset: int = 0, whence: int = 0) -> int:
        """Move the read position and return the new absolute position.

        Args:
            offset: Byte offset, interpreted according to ``whence``.
            whence: 0 = from start (default), 1 = from the current position,
                2 = from the end of the volume.

        Raises:
            ValueError: unknown ``whence``, a negative resulting position, or
                end-relative seek when the volume size is unknown.
        """
        if whence == 0:
            target = offset
        elif whence == 1:
            target = self.offset + offset
        elif whence == 2:
            end = self._volume_size()
            if end is None:
                raise ValueError('cannot seek from end: volume size is unknown')
            target = end + offset
        else:
            raise ValueError(f'invalid whence value: {whence}')
        if target < 0:
            raise ValueError(f'negative seek position: {target}')
        self.offset = target
        return self.offset

    def close(self) -> None:
        """Close the cache connection; safe to call when it was never opened."""
        if self.db_connection is not None:
            self.db_connection.close()

    def read(self, size: int = -1) -> bytes:
        """Read up to ``size`` bytes from the current position and advance it.

        Args:
            size: Number of bytes to read. None or a negative value reads to
                the end of the volume (this can download the whole snapshot).

        Returns:
            The bytes read; fewer than ``size`` only at the end of the volume.

        Raises:
            ValueError: read-to-end requested but the volume size is unknown.
        """
        start = self.offset
        limit = self._volume_size()
        if size is None or size < 0:
            if limit is None:
                raise ValueError('cannot read to end: volume size is unknown')
            stop = limit
        else:
            stop = start + size
            if limit is not None:
                stop = min(stop, limit)
        if stop <= start:
            return b''

        block_size = self.block_size
        # Integer arithmetic throughout: float division loses precision on large offsets.
        first_block = start // block_size
        last_block = (stop - 1) // block_size

        chunks = []
        listed_through = -1  # indexes up to here were settled by a listing made during this call
        for index in range(first_block, last_block + 1):
            base = index * block_size
            lo = max(start - base, 0)          # slice of this block that falls inside the request
            hi = min(stop - base, block_size)
            block = self.db_get_block(index)
            if block is None and index > listed_through:
                listed_through = self._list_blocks_from(index)
                block = self.db_get_block(index)
            if block is None:
                # Not present in the snapshot listing: treat as an unwritten, zero-filled region.
                chunks.append(bytes(hi - lo))
                continue
            blob = block.data if block.data is not None else self._fetch_block(block)
            chunks.append(bytes(blob[lo:hi]))

        data = b''.join(chunks)
        self.offset = start + len(data)
        return data
#### | |
# Run | |
#### | |
# Run main() only when this file is executed as a script, not when it is imported.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment