Created
December 30, 2022 18:38
-
-
Save ardzz/ad355044ba9e06789dad3f27f8986479 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
import subprocess
import time
from ftplib import FTP, all_errors

from rich import print
from rich.live import Live
from rich.markup import escape
from rich.panel import Panel
from rich.table import Table
# Connection settings for the FTP server that receives the backup.
# 'path' is prepended verbatim to the remote file name.
ftp_config = dict(
    host="ftp.fbi.gov",
    port=21,
    user="",
    password="fbi",
    path="secret-path/",
)

# Connection settings handed to mysqldump.
# An empty 'password' means no password option is passed at all.
mysql_config = dict(
    host="localhost",
    port=3306,
    username="root",
    password="",
    database="drugs",
)

# Dump file name: "<database>-<YYYY-MM-DD>.sql", dated at import time.
backup_file_name = "{}-{}.sql".format(
    mysql_config["database"],
    time.strftime("%Y-%m-%d"),
)
def export_database(file_output):
    """Dump the database named in ``mysql_config`` to *file_output*.

    ``mysqldump`` is started directly (no shell) with its standard output
    written to *file_output*, so configuration values are never interpreted
    by a shell.

    Args:
        file_output: Path of the SQL dump file to create (overwritten if it
            already exists).

    Returns:
        True when ``mysqldump`` exited with status 0 and *file_output*
        exists; False when the program could not be started, the output file
        could not be opened, or ``mysqldump`` reported an error. On failure
        any partially written *file_output* is removed.
    """
    command = [
        "mysqldump",
        "-u", str(mysql_config['username']),
        "-h", str(mysql_config['host']),
        "-P", str(mysql_config['port']),
    ]
    if mysql_config['password'] != '':
        # mysqldump requires the password to be glued to -p (no space).
        command.append(f"-p{mysql_config['password']}")
    command.append(str(mysql_config['database']))

    try:
        with open(file_output, 'wb') as dump:
            exit_code = subprocess.call(command, stdout=dump)
    except OSError:
        # mysqldump is not installed, or file_output cannot be opened.
        exit_code = -1

    if exit_code != 0:
        # Opening the file creates it even when the dump fails; do not leave
        # an empty or truncated dump behind that looks like a valid backup.
        if os.path.isfile(file_output):
            os.remove(file_output)
        return False
    return os.path.isfile(file_output)
class FTP_client:
    """Thin wrapper around ``ftplib.FTP`` for one server and remote path."""

    def __init__(self, credentials: dict):
        """Validate *credentials* and create an unconnected FTP session.

        Args:
            credentials: Mapping with non-empty ``host``, ``port``, ``user``
                and ``password`` entries. An optional ``path`` entry is
                prepended verbatim to remote file names; it defaults to ``''``.

        Raises:
            ValueError: If a required key is missing or its value is empty.
        """
        required_keys = ['host', 'port', 'user', 'password']
        for key in required_keys:
            if key not in credentials:
                raise ValueError(f"Missing required key: {key}")
            if not credentials[key]:
                raise ValueError(f"Empty value for key: {key}")
        self.host = credentials['host']
        self.port = credentials['port']
        self.user = credentials['user']
        self.password = credentials['password']
        # 'path' is not validated above, so it must not be required here.
        self.path = credentials.get('path', '')
        self.ftp = FTP()

    def connect(self) -> bool:
        """Open the control connection; True if the greeting starts with 220."""
        greeting = self.ftp.connect(self.host, self.port)
        return greeting.startswith('220')

    def login(self) -> bool:
        """Authenticate with the stored user; True if the reply starts with 230."""
        reply = self.ftp.login(self.user, self.password)
        return reply.startswith('230')

    def upload(self, file_path: str, file_name: str) -> str:
        """Store local *file_path* as ``path + file_name``.

        Returns:
            The server's raw reply string to the STOR command.
        """
        with open(file_path, 'rb') as file:
            return self.ftp.storbinary(f'STOR {self.path}{file_name}', file)

    def download(self, file_name: str, file_path: str) -> bool:
        """Fetch ``path + file_name`` into local *file_path*.

        Returns:
            True if the server's reply to RETR starts with 226.
        """
        with open(file_path, 'wb') as file:
            reply = self.ftp.retrbinary(f'RETR {self.path}{file_name}', file.write)
        return reply.startswith('226')

    def __del__(self):
        """Close the session politely if one was ever opened."""
        # __init__ may have raised before self.ftp was assigned.
        ftp = getattr(self, 'ftp', None)
        if ftp is None:
            return
        try:
            ftp.quit()
        except Exception:
            # quit() fails on a session that never connected or was already
            # dropped; close() only releases local resources. A destructor
            # must never raise, hence the broad catch.
            ftp.close()
if __name__ == "__main__":
    # Banner summarising where the backup is going.
    message = "Backup file management menggunakan FTP\n"
    message += "──────────────────────────────────────\n"
    message += f"Host: {ftp_config['host']}\nPort: {ftp_config['port']}\nUser: {ftp_config['user']}"
    message += f"\nPath: {ftp_config['path']}\n"
    message += f"File: {backup_file_name}"
    print(Panel.fit(message, title="Tugas Besar - Sistem Operasi", border_style="green"))

    # One row per step; the row is rewritten in place once the step finishes.
    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Actions", style="dim", width=12)
    table.add_column("Status", justify="right")
    table.add_column("Time", justify="center")

    def run_step(live, action, pending, done, work):
        """Show *action* as pending, run *work*, then record the outcome.

        Args:
            live: The active ``Live`` display to refresh.
            action: Text for the "Actions" column.
            pending: Status text shown while the step is running.
            done: Status text shown (in green) when the step succeeds.
            work: Zero-argument callable returning ``(ok, detail)``; *detail*
                is appended to the red "Failed" status when *ok* is false.

        Raises:
            SystemExit: With status 1 when the step fails, so that later
                steps (notably deleting the local dump) never run.
        """
        table.add_row(action, pending, time.strftime("%H:%M:%S"))
        row = table.row_count - 1
        live.update(table)

        started = time.time()
        try:
            ok, detail = work()
        except all_errors as err:
            # ftplib reports failures by raising (protocol errors, OSError,
            # EOFError); show them in the table instead of a traceback.
            ok, detail = False, str(err)

        if ok:
            status = f"[bold green]{done}[/bold green]"
        else:
            status = "[bold red]Failed[/bold red]"
            if detail:
                # Escape so brackets in the message are not parsed as markup.
                status += f" ({escape(detail)})"

        # This block edits an already-rendered cell via the private _cells
        # list; keep that access confined to this one place.
        table.columns[1]._cells[row] = status
        table.columns[2]._cells[row] = f"{time.time() - started:.2f}s"
        live.update(table)

        if not ok:
            raise SystemExit(1)

    def upload_backup():
        """Upload the dump; success means the reply starts with 226."""
        reply = ftp.upload(backup_file_name, backup_file_name)
        return reply.startswith('226'), reply

    def delete_backup():
        """Remove the local dump and confirm it is gone."""
        os.remove(backup_file_name)
        return not os.path.isfile(backup_file_name), ""

    with Live(table, refresh_per_second=1) as live:
        ftp = FTP_client(ftp_config)

        run_step(live, "Connection", f"Connecting to {ftp.host}:{ftp.port}",
                 "Connected", lambda: (ftp.connect(), ""))
        run_step(live, "Login", "Logging in...",
                 "Logged in", lambda: (ftp.login(), ""))
        run_step(live, "Export", "Exporting database...",
                 "Exported", lambda: (export_database(backup_file_name), ""))

        size_kb = os.path.getsize(backup_file_name) / 1024
        run_step(live, "Upload", f"Uploading backup file ... ({size_kb:.2f} KB)",
                 "Uploaded", upload_backup)

        # Only reached after a successful upload: a failed upload exits above
        # and leaves the local dump in place.
        run_step(live, "Delete", "Deleting backup file...",
                 "Deleted", delete_backup)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment