Created
August 27, 2025 12:44
-
-
Save lastbattle/1eb60eba7633696fcf3ea718dbbba068 to your computer and use it in GitHub Desktop.
Synchronise local folder content with NAS via SMB protocol
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Usage example: mirror the local analysis folder onto the NAS.
#
# This is a fragment lifted from a larger class (note the use of `self`);
# it is meant to show the call sequence, not to run on its own.
import logging
import os

from tools.smb_sync import SMBSync

# Connection settings are read from environment variables; os.getenv()
# returns None for any variable that is not set.
self.synology_nas_fs = SMBSync(
    nas_ip=os.getenv('NAS_IP'),
    username=os.getenv('NAS_USERNAME'),
    password=os.getenv('NAS_PASSWORD'),
    share_name=os.getenv('NAS_SHARE_NAME'),
    nas_folder=os.getenv('NAS_FOLDER'),
    local_folder=self.analysis_writer.base_dir.replace("./", ""),  # Where the local data is stored
    logger=logging  # the logging module itself is used as the logger object
)

# Check connection status.
# connect() reports success through the logger and raises on failure, so its
# return value is not logged here (it used to show up in the log as "None").
logging.info("Testing connection to NAS storage db...")
self.synology_nas_fs.connect()

# Push the local folder to the NAS.
if self.synology_nas_fs:
    self.synology_nas_fs.sync()

# On shutdown: close the connection. sync() already disconnects when it
# finishes, so this is only a safety net and is harmless when already closed.
if self.synology_nas_fs:
    self.synology_nas_fs.disconnect()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import smb.SMBConnection | |
from pathlib import Path | |
from smb.smb_structs import OperationFailure | |
import logging | |
class SMBSync:
    """Sync a local folder to a Synology NAS via SMB with 1:1 mirroring.

    The sync is one-way (local -> NAS): files missing on the NAS are uploaded,
    files whose size differs are overwritten, and anything on the NAS below
    ``nas_folder`` that has no local counterpart is deleted.

    Limitation: files are compared by size only, so a local file that was
    modified without changing its size is not re-uploaded.
    """

    # Upper bound on reconnect-and-retry attempts per operation, so that a
    # connection that keeps breaking cannot recurse without end.
    MAX_RETRIES = 3

    # Initialize the NAS FS class via SMB protocol
    def __init__(self, nas_ip, username, password, share_name, nas_folder, local_folder, client_name='python-client', logger=None):
        """
        Initialize SMBSync with connection details and folder paths via SMB protocol

        Args:
            nas_ip (str): IP address of the NAS (e.g., '192.168.1.100').
            username (str): NAS username.
            password (str): NAS password.
            share_name (str): NAS shared folder name.
            nas_folder (str): Target folder path within the shared folder.
            local_folder (str): Local folder path to sync.
            client_name (str): Client identifier for SMB connection.
            logger (logging.Logger, optional): Logger instance (the ``logging``
                module itself also works). Defaults to a module-level logger.
        """
        self.nas_ip = nas_ip
        self.username = username
        self.password = password
        self.share_name = share_name
        self.nas_folder = nas_folder.strip('/')
        self.local_folder = os.path.abspath(local_folder)
        self.client_name = client_name
        self.conn = None
        self.logger = logger if logger else logging.getLogger(__name__)

        # Suppress DEBUG logs of the SMB library. This goes through the
        # logging module rather than self.logger, because a Logger instance
        # has no getLogger() method.
        logging.getLogger('smb').setLevel(logging.WARNING)

    @staticmethod
    def _is_connection_lost(exc):
        """Return True if *exc* looks like a dropped connection ([Errno 32] Broken pipe)."""
        return isinstance(exc, BrokenPipeError) or "Broken pipe" in str(exc)

    def connect(self):
        """Establish SMB connection to the NAS.

        Returns:
            bool: True once the connection is established and authenticated.

        Raises:
            ConnectionError: If the connection cannot be opened or the NAS
                rejects the credentials.
        """
        try:
            self.conn = smb.SMBConnection.SMBConnection(
                self.username,
                self.password,
                self.client_name,
                'SynologyNAS',
                use_ntlm_v2=True
            )
            # NOTE(review): port 445 is normally paired with is_direct_tcp=True
            # in pysmb; left unchanged here - confirm against the target NAS.
            authenticated = self.conn.connect(self.nas_ip, 445)
            # pysmb reports failed authentication by returning False, not by raising.
            if not authenticated:
                raise ConnectionError("authentication was rejected by the NAS")
        except Exception as e:
            self.logger.error(f"[NAS] Failed to connect to NAS: {e}")
            raise ConnectionError(f"Failed to connect to NAS: {e}") from e
        self.logger.info("[NAS] Connected to NAS successfully.")
        return True

    def reconnect(self):
        """Reconnect to the NAS if connection is lost.

        Returns:
            bool: True if the new connection was established, False otherwise.
        """
        self.logger.info("[NAS] Attempting to reconnect to NAS...")
        try:
            self.disconnect()
            self.connect()
            return True
        except Exception as e:
            self.logger.error(f"[NAS] Failed to reconnect to NAS: {e}")
            return False

    def disconnect(self):
        """Close SMB connection if open. Safe to call repeatedly."""
        if self.conn:
            try:
                self.conn.close()
                self.logger.info("[NAS] Disconnected from NAS.")
            except Exception as e:
                self.logger.warning(f"[NAS] Error closing connection: {e}")
            self.conn = None

    def _get_nas_path(self, local_path):
        """Convert local path to NAS path, ensuring forward slashes.

        The local root folder itself maps to ``nas_folder`` (without a
        trailing ``/.``).
        """
        relative_path = os.path.relpath(local_path, self.local_folder)
        parts = [self.nas_folder] if self.nas_folder else []
        # relpath() yields '.' for the root folder itself; it adds nothing to the path.
        if relative_path != os.curdir:
            # Always use forward slashes for NAS paths
            parts.append(relative_path.replace(os.sep, '/'))
        return '/'.join(parts)

    def _ensure_nas_dir(self, nas_path, _attempt=0):
        """Create directory on NAS if it doesn't exist, including all parents.

        Args:
            nas_path (str): Directory path inside the share. Every component
                is treated as a directory, including names that contain a dot.
            _attempt (int): Internal retry counter; callers leave it at 0.

        Errors are logged, not raised.
        """
        try:
            current_path = ''
            # Create each directory level if needed
            for part in nas_path.strip('/').split('/'):
                if part in ('', '.'):  # Skip empty and "current dir" parts
                    continue
                current_path = f"{current_path}/{part}" if current_path else part
                try:
                    self.conn.getAttributes(self.share_name, current_path)
                    self.logger.debug(f"[NAS] Directory exists: {current_path}")
                except OperationFailure:
                    self.logger.info(f"[NAS] Creating directory: {current_path}")
                    self.conn.createDirectory(self.share_name, current_path)
        except Exception as e:
            if self._is_connection_lost(e) and _attempt < self.MAX_RETRIES:
                self.logger.warning(f"[NAS] Connection lost while creating directory {nas_path}. Attempting to reconnect...")
                if self.reconnect():
                    # Retry after reconnection
                    self._ensure_nas_dir(nas_path, _attempt + 1)
                    return
            self.logger.error(f"[NAS] Error creating directory {nas_path}: {e}")

    def _sync_file(self, local_path, nas_path, _attempt=0):
        """Sync a single file to NAS, updating if necessary.

        The file is uploaded when it is missing on the NAS or when its size
        differs from the local size; otherwise it is skipped.

        Args:
            local_path (str): Path of the local file.
            nas_path (str): Destination path inside the share.
            _attempt (int): Internal retry counter; callers leave it at 0.

        Errors are logged, not raised.
        """
        try:
            local_size = os.stat(local_path).st_size

            # Ensure parent directory exists
            dir_path = nas_path.rpartition('/')[0]
            if dir_path:
                self._ensure_nas_dir(dir_path)

            # Only the attribute lookup decides "missing"; a failing upload
            # must surface as an error instead of being retried as a new upload.
            try:
                nas_size = self.conn.getAttributes(self.share_name, nas_path).file_size
            except OperationFailure:
                nas_size = None  # not on the NAS yet

            if nas_size == local_size:
                self.logger.debug(f"[NAS] Skipped (same size): {nas_path}")
                return

            with open(local_path, 'rb') as f:
                self.conn.storeFile(self.share_name, nas_path, f)
            if nas_size is None:
                self.logger.info(f"[NAS] Uploaded: {nas_path}")
            else:
                self.logger.info(f"[NAS] Updated: {nas_path}")
        except Exception as e:
            if self._is_connection_lost(e) and _attempt < self.MAX_RETRIES:
                self.logger.warning(f"[NAS] Connection lost while syncing {local_path}. Attempting to reconnect...")
                if self.reconnect():
                    # Retry after reconnection
                    self._sync_file(local_path, nas_path, _attempt + 1)
                    return
            self.logger.error(f"[NAS] Error syncing {local_path} to {nas_path}: {e}")

    def _delete_extra_nas_files(self, nas_path, _attempt=0):
        """Delete files and directories on NAS that don't exist in local folder.

        Walks ``nas_path`` recursively (children first, so that directories
        are empty by the time they are removed).

        Args:
            nas_path (str): Directory inside the share to clean up; expected
                to be ``nas_folder`` or a path below it.
            _attempt (int): Internal retry counter; callers leave it at 0.

        Errors are logged, not raised.
        """
        try:
            for item in self.conn.listPath(self.share_name, nas_path):
                if item.filename in ('.', '..'):
                    continue
                item_path = f"{nas_path}/{item.filename}" if nas_path else item.filename

                # Convert NAS path to local path using forward slash conversion
                rel_path = item_path[len(self.nas_folder):].lstrip('/')
                local_equivalent = os.path.join(self.local_folder, rel_path.replace('/', os.sep))
                missing_locally = not os.path.exists(local_equivalent)

                if item.isDirectory:
                    self._delete_extra_nas_files(item_path)
                    if missing_locally:
                        try:
                            self.conn.deleteDirectory(self.share_name, item_path)
                            self.logger.info(f"[NAS] Deleted directory: {item_path}")
                        except OperationFailure as e:
                            self.logger.warning(f"[NAS] Failed to delete directory {item_path}: {e}")
                elif missing_locally:
                    self.conn.deleteFiles(self.share_name, item_path)
                    self.logger.info(f"[NAS] Deleted file: {item_path}")
        except Exception as e:
            if self._is_connection_lost(e) and _attempt < self.MAX_RETRIES:
                self.logger.warning(f"[NAS] Connection lost while processing {nas_path}. Attempting to reconnect...")
                if self.reconnect():
                    # Retry after reconnection
                    self._delete_extra_nas_files(nas_path, _attempt + 1)
                    return
            self.logger.error(f"[NAS] Error processing {nas_path}: {e}")

    def sync(self, _attempt=0):
        """Perform 1:1 sync of local folder to NAS.

        Connects if necessary, uploads new or changed files, deletes NAS
        entries that no longer exist locally, and always disconnects at the
        end. Failures are logged, not raised.

        Args:
            _attempt (int): Internal retry counter; callers leave it at 0.
        """
        try:
            self.logger.info(f"[NAS] Starting sync from {self.local_folder} to {self.nas_folder}...")

            # Mirroring a missing local folder would walk nothing and then
            # delete everything below nas_folder, so refuse to run.
            if not os.path.isdir(self.local_folder):
                raise FileNotFoundError(f"Local folder does not exist: {self.local_folder}")

            if not self.conn:
                self.connect()

            # First ensure the base directory exists
            self._ensure_nas_dir(self.nas_folder)

            # Sync files and directories
            for root, _dirs, files in os.walk(self.local_folder):
                self._ensure_nas_dir(self._get_nas_path(root))
                for file_name in files:
                    local_path = os.path.join(root, file_name)
                    self._sync_file(local_path, self._get_nas_path(local_path))

            # Delete extra files on NAS
            self._delete_extra_nas_files(self.nas_folder)
            self.logger.info("[NAS] Sync completed successfully.")
        except Exception as e:
            if self._is_connection_lost(e) and _attempt < self.MAX_RETRIES:
                self.logger.warning("[NAS] Connection lost during sync. Attempting to reconnect and continue...")
                if self.reconnect():
                    # Retry the sync after reconnection
                    self.sync(_attempt + 1)
                    return
            self.logger.error(f"[NAS] Sync failed: {e}")
        finally:
            self.disconnect()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment