Skip to content

Instantly share code, notes, and snippets.

@lastbattle
Created August 27, 2025 12:44
Show Gist options
  • Save lastbattle/1eb60eba7633696fcf3ea718dbbba068 to your computer and use it in GitHub Desktop.
Save lastbattle/1eb60eba7633696fcf3ea718dbbba068 to your computer and use it in GitHub Desktop.
Synchronise local folder content with NAS via SMB protocol
from tools.smb_sync import SMBSync
self.synology_nas_fs = SMBSync(
nas_ip=os.getenv('NAS_IP'),
username=os.getenv('NAS_USERNAME'),
password=os.getenv('NAS_PASSWORD'),
share_name=os.getenv('NAS_SHARE_NAME'),
nas_folder=os.getenv('NAS_FOLDER'),
local_folder=self.analysis_writer.base_dir.replace("./", ""), # Where the local data is stored
logger=logging
)
# Check connection status
logging.info("Testing connection to NAS storage db...")
logging.info(self.synology_nas_fs.connect())
if self.synology_nas_fs:
self.synology_nas_fs.sync()
if self.synology_nas_fs:
self.synology_nas_fs.disconnect()
import os
import smb.SMBConnection
from pathlib import Path
from smb.smb_structs import OperationFailure
import logging
class SMBSync:
"""A class to sync a local folder to a Synology NAS via SMB with 1:1 mirroring."""
# Initialize the NAS FS class via SMB protocol
def __init__(self, nas_ip, username, password, share_name, nas_folder, local_folder, client_name='python-client', logger=None):
"""
Initialize SMBSync with connection details and folder paths via SMB protocol
Args:
nas_ip (str): IP address of the NAS (e.g., '192.168.1.100').
username (str): NAS username.
password (str): NAS password.
share_name (str): NAS shared folder name.
nas_folder (str): Target folder path within the shared folder.
local_folder (str): Local folder path to sync.
client_name (str): Client identifier for SMB connection.
logger (logging.Logger, optional): Logger instance. Defaults to None.
"""
self.nas_ip = nas_ip
self.username = username
self.password = password
self.share_name = share_name
self.nas_folder = nas_folder.strip('/')
self.local_folder = os.path.abspath(local_folder)
self.client_name = client_name
self.conn = None
if logger:
self.logger = logger
else:
self.logger = logging.getLogger(__name__)
# Suppress DEBUG logs for telegram and its HTTP libraries
self.logger.getLogger('smb').setLevel(logging.WARNING)
def connect(self):
"""Establish SMB connection to the NAS."""
try:
self.conn = smb.SMBConnection.SMBConnection(
self.username,
self.password,
self.client_name,
'SynologyNAS',
use_ntlm_v2=True
)
self.conn.connect(self.nas_ip, 445)
self.logger.info("[NAS] Connected to NAS successfully.")
except Exception as e:
self.logger.error(f"[NAS] Failed to connect to NAS: {e}")
raise Exception(f"Failed to connect to NAS: {e}")
def reconnect(self):
"""Reconnect to the NAS if connection is lost."""
self.logger.info("[NAS] Attempting to reconnect to NAS...")
try:
self.disconnect()
self.connect()
return True
except Exception as e:
self.logger.error(f"[NAS] Failed to reconnect to NAS: {e}")
return False
def disconnect(self):
"""Close SMB connection if open."""
if self.conn:
try:
self.conn.close()
self.logger.info("[NAS] Disconnected from NAS.")
except Exception as e:
self.logger.warning(f"[NAS] Error closing connection: {e}")
self.conn = None
def _get_nas_path(self, local_path):
"""Convert local path to NAS path, ensuring forward slashes."""
relative_path = os.path.relpath(local_path, self.local_folder)
# Always use forward slashes for NAS paths
nas_path = f"{self.nas_folder}/{relative_path.replace(os.sep, '/')}"
return nas_path
def _ensure_nas_dir(self, nas_path):
"""Create directory on NAS if it doesn't exist."""
try:
parts = nas_path.strip('/').split('/')
current_path = ''
# Create each directory level if needed
for i, part in enumerate(parts):
if not part: # Skip empty parts
continue
if current_path:
current_path = f"{current_path}/{part}"
else:
current_path = part
# Skip the last part if it's a filename
if i == len(parts) - 1 and '.' in part:
continue
try:
self.conn.getAttributes(self.share_name, current_path)
self.logger.debug(f"[NAS] Directory exists: {current_path}")
except OperationFailure:
self.logger.info(f"[NAS] Creating directory: {current_path}")
self.conn.createDirectory(self.share_name, current_path)
except Exception as e:
# [Errno 32] Broken pipe
if "Broken pipe" in str(e):
self.logger.warning(f"[NAS] Connection lost while creating directory {nas_path}. Attempting to reconnect...")
if self.reconnect():
# Retry after reconnection
self._ensure_nas_dir(nas_path)
return
self.logger.error(f"[NAS] Error creating directory {nas_path}: {e}")
def _sync_file(self, local_path, nas_path):
"""Sync a single file to NAS, updating if necessary."""
try:
local_stat = os.stat(local_path)
local_size = local_stat.st_size
# Ensure parent directory exists
dir_path = '/'.join(nas_path.split('/')[:-1])
if dir_path:
self._ensure_nas_dir(dir_path)
try:
nas_attr = self.conn.getAttributes(self.share_name, nas_path)
nas_size = nas_attr.file_size
if local_size != nas_size:
with open(local_path, 'rb') as f:
self.conn.storeFile(self.share_name, nas_path, f)
self.logger.info(f"[NAS] Updated: {nas_path}")
else:
self.logger.debug(f"[NAS] Skipped (same size): {nas_path}")
except OperationFailure:
with open(local_path, 'rb') as f:
self.conn.storeFile(self.share_name, nas_path, f)
self.logger.info(f"[NAS] Uploaded: {nas_path}")
except Exception as e:
# [Errno 32] Broken pipe
if "Broken pipe" in str(e):
self.logger.warning(f"[NAS] Connection lost while syncing {local_path}. Attempting to reconnect...")
if self.reconnect():
# Retry after reconnection
self._sync_file(local_path, nas_path)
return
self.logger.error(f"[NAS] Error syncing {local_path} to {nas_path}: {e}")
def _delete_extra_nas_files(self, nas_path):
"""Delete files and directories on NAS that don't exist in local folder."""
try:
nas_items = self.conn.listPath(self.share_name, nas_path)
for item in nas_items:
if item.filename in ['.', '..']:
continue
item_path = f"{nas_path}/{item.filename}" if nas_path else item.filename
# Convert NAS path to local path using forward slash conversion
rel_path = item_path[len(self.nas_folder):].lstrip('/')
local_equivalent = os.path.join(self.local_folder, rel_path.replace('/', os.sep))
if item.isDirectory:
self._delete_extra_nas_files(item_path)
if not os.path.exists(local_equivalent):
try:
self.conn.deleteDirectory(self.share_name, item_path)
self.logger.info(f"[NAS] Deleted directory: {item_path}")
except OperationFailure as e:
self.logger.warning(f"[NAS] Failed to delete directory {item_path}: {e}")
else:
if not os.path.exists(local_equivalent):
self.conn.deleteFiles(self.share_name, item_path)
self.logger.info(f"[NAS] Deleted file: {item_path}")
except Exception as e:
# [Errno 32] Broken pipe
if "Broken pipe" in str(e):
self.logger.warning(f"[NAS] Connection lost while processing {nas_path}. Attempting to reconnect...")
if self.reconnect():
# Retry after reconnection
self._delete_extra_nas_files(nas_path)
return
self.logger.error(f"[NAS] Error processing {nas_path}: {e}")
def sync(self):
"""Perform 1:1 sync of local folder to NAS."""
try:
self.logger.info(f"[NAS] Starting sync from {self.local_folder} to {self.nas_folder}...")
if not self.conn:
self.connect()
# First ensure the base directory exists
self._ensure_nas_dir(self.nas_folder)
# Sync files and directories
for root, dirs, files in os.walk(self.local_folder):
nas_root = self._get_nas_path(root)
self._ensure_nas_dir(nas_root)
for file in files:
local_path = os.path.join(root, file)
nas_path = self._get_nas_path(local_path)
self._sync_file(local_path, nas_path)
# Delete extra files on NAS
self._delete_extra_nas_files(self.nas_folder)
self.logger.info("[NAS] Sync completed successfully.")
except Exception as e:
# [Errno 32] Broken pipe
if "Broken pipe" in str(e):
self.logger.warning("[NAS] Connection lost during sync. Attempting to reconnect and continue...")
if self.reconnect():
# Retry the sync after reconnection
self.sync()
return
self.logger.error(f"[NAS] Sync failed: {e}")
finally:
self.disconnect()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment