import ftplib
import os
import re
import shutil
import ssl
import sys
import time
import urllib.parse
import zipfile

import requests


class ImplicitFTP_TLS(ftplib.FTP_TLS):
    """
    FTP_TLS subclass that automatically wraps sockets in SSL to support implicit FTPS.

    The ``sock`` attribute is replaced by a property whose setter wraps any
    plain socket with ``self.context`` before storing it, so the control
    connection is encrypted from the first byte.

    see https://stackoverflow.com/a/36049814
    """

    def __init__(self, *args, **kwargs):
        # Initialise the backing field BEFORE delegating. When a host is
        # passed, the base constructor connects right away and assigns
        # ``self.sock``; resetting ``_sock`` afterwards would throw that
        # freshly opened connection away.
        self._sock = None
        super().__init__(*args, **kwargs)

    @property
    def sock(self):
        """Return the current control socket, or None when not connected."""
        return self._sock

    @sock.setter
    def sock(self, value):
        """Store *value*, SSL-wrapping it first if it is a plain socket."""
        if value is not None and not isinstance(value, ssl.SSLSocket):
            value = self.context.wrap_socket(value)
        self._sock = value


def download_model(host, access_code, model, download_path='.'):
    """
    Download *model* from the printer's implicit-FTPS server.

    Connects to ``host`` on port 990, logs in as user ``bblp`` with
    ``access_code`` as the password, and looks for ``model`` first in the
    server's root directory and then in ``cache/``.

    Args:
        host: Address of the FTPS server.
        access_code: Password for the ``bblp`` user.
        model: Exact file name to fetch.
        download_path: Local directory for the download (created if missing).

    Returns:
        The absolute path of the downloaded file on success; ``None`` when
        the file is not on the server, the transfer fails, or the result is
        empty. All errors are reported via ``print`` rather than raised.
    """
    download_path = os.path.abspath(download_path)
    os.makedirs(download_path, exist_ok=True)
    local_file_path = os.path.join(download_path, model)

    # Only clean up the local file if *we* started writing it; an earlier,
    # complete download must survive e.g. a failed connection attempt.
    transfer_started = False

    try:
        # The context manager sends QUIT and closes the socket on every exit
        # path, including the early return and any exception below.
        with ImplicitFTP_TLS() as ftp:
            ftp.connect(host=host, port=990, timeout=10)
            ftp.login(user='bblp', passwd=access_code)
            ftp.prot_p()

            # Get a list of available files to handle special characters
            available_files = ftp.nlst()
            print("Files on FTP Server:", available_files)  # Debugging line

            if model in available_files:
                print(f"File {model} found in root directory.")
            else:
                print(f"File {model} not found in root. Checking cache/ directory...")
                ftp.cwd('cache')
                if model not in ftp.nlst():
                    print(f"Error: File {model} not found on FTP server!")
                    return None

            transfer_started = True
            with open(local_file_path, 'wb') as fp:
                ftp.retrbinary(f'RETR {model}', fp.write)

        downloaded_size = os.path.getsize(local_file_path)
    except Exception as e:  # script boundary: report and carry on
        print(f"FTP download failed: {e}")
        if transfer_started and os.path.exists(local_file_path):
            # Do not leave a truncated archive behind for later steps.
            try:
                os.remove(local_file_path)
            except OSError as cleanup_error:
                print(f"Could not remove partial download {local_file_path}: {cleanup_error}")
        return None

    # Verify file size after download
    if downloaded_size == 0:
        print("Error: Downloaded file is 0KB! Something went wrong.")
        return None

    print(f"Successfully downloaded {model} ({downloaded_size} bytes)")
    return local_file_path


def extract_image_and_gcode(model_path, extract_path="www/bblab"):
    """
    Extract the G-code and the plate_1.png image from a 3MF archive.

    Only the two wanted members are copied out of the archive, straight to
    their fixed names ``plate_1.gcode`` and ``cover_image.png`` inside
    ``extract_path``. Nothing else is unpacked, so no other file in
    ``extract_path`` is touched. When a G-code file is found its filament
    metadata is forwarded via ``extract_filament_data``. The ``.3mf`` file is
    deleted once processing has completed.

    Args:
        model_path: Path to the ``.3mf`` (ZIP) archive.
        extract_path: Directory receiving the two output files (created if
            missing).

    Returns:
        None. Problems are reported via ``print`` rather than raised.
    """
    model_path = os.path.abspath(model_path)
    extract_path = os.path.abspath(extract_path)

    print(f"Checking file: {model_path}")

    if not os.path.exists(model_path):
        print(f"Error: File {model_path} does not exist!")
        return

    if not zipfile.is_zipfile(model_path):
        print(f"Error: {model_path} is not a valid 3MF (ZIP) file.")
        return

    final_gcode_path = os.path.join(extract_path, 'plate_1.gcode')
    final_image_path = os.path.join(extract_path, 'cover_image.png')

    try:
        os.makedirs(extract_path, exist_ok=True)

        with zipfile.ZipFile(model_path, 'r') as archive:
            print("3MF archive successfully opened. Scanning contents...")

            gcode_member = None
            image_member = None
            for info in archive.infolist():
                if info.is_dir():
                    continue
                print(f" - {info.filename}")  # Debugging line

                # ZIP member names always use '/' as the separator.
                base_name = info.filename.rsplit('/', 1)[-1].lower()
                if base_name.endswith(".gcode"):
                    # Prefer plate 1 so the G-code matches the plate_1.png
                    # cover image; otherwise keep the first G-code seen.
                    if gcode_member is None or base_name == "plate_1.gcode":
                        gcode_member = info.filename
                elif base_name == "plate_1.png" and image_member is None:
                    image_member = info.filename

            if gcode_member:
                _copy_archive_member(archive, gcode_member, final_gcode_path)
                print(f"Extracted real G-code: {final_gcode_path}")
            else:
                print("Error: No valid G-code file found after extraction.")

            if image_member:
                _copy_archive_member(archive, image_member, final_image_path)
                print(f"Extracted image to {final_image_path}")
            else:
                print("Error: plate_1.png not found in the 3MF archive.")

        if gcode_member:
            extract_filament_data(final_gcode_path)

        # Remove the original .3mf file (the archive is closed by now).
        os.remove(model_path)
        print(f"Deleted the .3mf file: {model_path}")

    except Exception as e:  # script boundary: report and carry on
        print(f"An error occurred: {e}")


def _copy_archive_member(archive, member_name, destination):
    """Stream one member of an open ZipFile to *destination* on disk."""
    with archive.open(member_name) as source, open(destination, 'wb') as target:
        shutil.copyfileobj(source, target)


def extract_filament_data(gcode_file_path):
    """
    Extract filament data (cost, weight, length, type) from a G-code file.

    Scans the file for the comment lines ``; filament cost = ...``,
    ``; filament used [g] = ...``, ``; filament used [mm] = ...`` and
    ``; filament_type = ...``. For each value found, prints it and pushes it
    to the matching Home Assistant helper via ``update_home_assistant_helper``.
    The first occurrence of each field wins.

    Args:
        gcode_file_path: Path to the G-code file to scan.

    Returns:
        None.

    Raises:
        OSError: If the file cannot be opened or read.
    """
    # (Home Assistant helper entity, pattern capturing the value)
    fields = (
        ("input_number.3d_current_cost", r'; filament cost = ([\d\.]+)'),
        ("input_number.3d_current_weight", r'; filament used \[g\] = ([\d\.]+)'),
        ("input_number.3d_current_length", r'; filament used \[mm\] = ([\d\.]+)'),
        ("input_text.3d_current_type", r'; filament_type = (.+)'),
    )
    found = {}

    # Read line by line instead of loading the whole file: G-code can be very
    # large and only a handful of comment lines matter. An explicit encoding
    # with errors='replace' keeps a stray non-UTF-8 byte from aborting the scan.
    with open(gcode_file_path, 'r', encoding='utf-8', errors='replace') as file:
        for line in file:
            if ';' not in line:
                continue  # every pattern starts with a comment marker
            for helper_name, pattern in fields:
                if helper_name in found:
                    continue
                match = re.search(pattern, line)
                if match:
                    found[helper_name] = match.group(1).strip()
            if len(found) == len(fields):
                break  # everything located; skip the rest of the file

    # Report in the fixed order cost, weight, length, type.
    for helper_name, _ in fields:
        value = found.get(helper_name)
        if value:
            print(f"{value}")
            update_home_assistant_helper(helper_name, value)


def update_home_assistant_helper(helper_name, value, timeout=10):
    """
    Update Home Assistant input helper while preserving metadata.

    Fetches the helper's current state, then posts the new state together
    with the attributes that were just read, so they are not wiped by the
    update.

    Args:
        helper_name: Entity id of the helper, e.g. ``input_number.3d_current_cost``.
        value: New state; sent as ``str(value)``.
        timeout: Seconds to wait for each HTTP request before giving up.

    Returns:
        None. Failures are reported via ``print`` rather than raised.
    """
    url = f'http://homeassistant.local:8123/api/states/{helper_name}'
    headers = {
        'Authorization': 'Bearer <YOUR_LONG_LIVED_ACCESS_TOKEN>',
        'Content-Type': 'application/json',
    }

    try:
        # Without a timeout, requests waits indefinitely on an unresponsive host.
        get_response = requests.get(
            url,
            headers={'Authorization': headers['Authorization']},
            timeout=timeout,
        )
        get_response.raise_for_status()
        current_state = get_response.json()

        payload = {
            'state': str(value),
            'attributes': current_state.get('attributes', {}),
        }

        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        response.raise_for_status()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON reply on requests versions whose
        # JSON decode error is not a RequestException subclass.
        print(f"Failed to update {helper_name}: {e}")
        return

    print(f"Successfully updated {helper_name} with value: {value}")


if __name__ == "__main__":
    # Usage: <script> <printer_ip> <access_code> <model_name>
    if len(sys.argv) < 4:
        print("Host, access code, and model name required", file=sys.stderr)
        # Signal the usage error to the calling shell/automation.
        sys.exit(1)

    printer_ip, access_code, model_name = sys.argv[1:4]

    # Download and extraction share one working directory.
    output_dir = 'www/bblab'

    download_model(
        host=printer_ip,
        access_code=access_code,
        model=model_name,
        download_path=output_dir,
    )

    extract_image_and_gcode(os.path.join(output_dir, model_name), extract_path=output_dir)
# Cool, it would be nice to get this straight into the integration instead.