@Abhishek-Deshmukh
Created May 15, 2025 10:17
Submits jobs to SLURM from a custom queue directory, keeping the user's job count under a configured limit.
# Leave this running in a tmux session.
# Every 30 minutes it checks the SLURM queue and the custom queue directory and submits accordingly.
# (See the enqueue sketch after the script for one way to add jobs to the custom queue.)
import os
import subprocess
import shutil
import logging
import time
# --- Configuration ---
SLURM_USERNAME = "your_username"
SLURM_JOB_LIMIT = 2000
QUEUE_DIR = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/queue"
SUBMITTED_DIR = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/submitted"
LOG_FILE = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/slurm_queue_manager.log"
# --- Logging Setup ---
log_dir = os.path.dirname(LOG_FILE)
if log_dir and not os.path.exists(log_dir):
    try:
        os.makedirs(log_dir)
    except OSError as e:
        LOG_FILE = "slurm_queue_manager.log"
        print(f"Warning: Could not create log directory {log_dir}. Logging to {LOG_FILE}. Error: {e}")

logging.basicConfig(
    filename=LOG_FILE,
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s'
)

def get_slurm_job_count(username):
    # Get job count using squeue -u <username> -h (headerless); one output line per job
    try:
        command = ["squeue", "-u", username, "-h"]
        result = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=False  # Do not raise exception on non-zero exit code
        )
        if result.returncode != 0:
            logging.error(f"squeue failed (exit {result.returncode}). Stderr: {result.stderr.strip()}")
            return -1
        output_lines = result.stdout.strip().split('\n')
        if not output_lines or output_lines == ['']:
            return 0
        return len(output_lines)
    except FileNotFoundError:
        logging.error("squeue command not found.")
        return -1
    except Exception as e:
        logging.error(f"Error checking SLURM queue: {e}")
        return -1

def process_queue_directory():
logging.info(f"Checking queue: {QUEUE_DIR}")
if not os.path.exists(QUEUE_DIR):
logging.warning(f"Queue directory not found: {QUEUE_DIR}.")
return
if not os.path.exists(SUBMITTED_DIR):
try:
os.makedirs(SUBMITTED_DIR)
logging.info(f"Created submitted dir: {SUBMITTED_DIR}")
except OSError as e:
logging.error(f"Error creating submitted dir {SUBMITTED_DIR}: {e}")
scripts_to_submit = sorted([f for f in os.listdir(QUEUE_DIR) if f.endswith(".sh")])
if not scripts_to_submit:
logging.info("No scripts in queue.")
return
logging.info(f"Found {len(scripts_to_submit)} script(s).")
for script_filename in scripts_to_submit:
current_jobs = get_slurm_job_count(SLURM_USERNAME)
if current_jobs == -1:
logging.error("Failed to get job count. Skipping submission.")
break # Stop processing queue for this run
logging.info(f"Jobs for {SLURM_USERNAME}: {current_jobs}/{SLURM_JOB_LIMIT}")
if current_jobs >= SLURM_JOB_LIMIT:
logging.warning(f"Limit ({SLURM_JOB_LIMIT}) reached. Stopping submission for this run.")
break # Stop processing queue for this run
script_path = os.path.join(QUEUE_DIR, script_filename)
logging.info(f"Submitting: {script_filename}")
try:
submit_command = ["sbatch", script_path]
submit_result = subprocess.run(
submit_command,
capture_output=True,
text=True,
check=False # Do not raise exception on non-zero exit code
)
if submit_result.returncode == 0:
logging.info(f"Submitted {script_filename}. Stdout: {submit_result.stdout.strip()}")
try:
destination_path = os.path.join(SUBMITTED_DIR, script_filename)
shutil.move(script_path, destination_path)
logging.info(f"Moved {script_filename} to {SUBMITTED_DIR}")
except Exception as e:
logging.error(f"Error moving {script_filename} to {SUBMITTED_DIR}: {e}")
else:
logging.error(f"Submission failed for {script_filename}. Exit code: {submit_result.returncode}. Stderr: {submit_result.stderr.strip()}")
except FileNotFoundError:
logging.error(f"sbatch not found when submitting {script_filename}.")
break # Critical error, stop processing
except Exception as e:
logging.error(f"Unexpected error during submission of {script_filename}: {e}")
# continue # Optionally continue to next script
# --- Main ---
if __name__ == "__main__":
    while True:
        logging.info("--- Queue Manager Started ---")
        current_jobs = get_slurm_job_count(SLURM_USERNAME)
        if current_jobs == -1:
            logging.error("Could not get job count. Skipping this run.")
        elif current_jobs >= SLURM_JOB_LIMIT:
            logging.info(f"Limit ({SLURM_JOB_LIMIT}) reached. No submission this run.")
        else:
            logging.info(f"Jobs for {SLURM_USERNAME}: {current_jobs}. Limit {SLURM_JOB_LIMIT}. Checking queue.")
            process_queue_directory()
        logging.info("--- Queue Manager Finished ---")
        time.sleep(1800)  # Sleep for 30 minutes before checking again
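
Usage note: any .sh batch script placed in QUEUE_DIR is picked up on the next 30-minute pass and submitted in sorted filename order. Below is a minimal enqueue sketch, not part of the original script; the enqueue_job helper, the epoch-timestamp prefix (which keeps sorted order roughly first-in-first-out), and the example #SBATCH directives are illustrative assumptions.

# Hypothetical enqueue helper (assumption, not part of the gist above): writes a
# batch script into QUEUE_DIR so the manager submits it on its next pass.
import os
import time

QUEUE_DIR = "/lustre/cbm/users/your_username/jobs/queue"  # must match the manager's QUEUE_DIR


def enqueue_job(name, body):
    """Write an sbatch script named <epoch>_<name>.sh into the queue directory."""
    os.makedirs(QUEUE_DIR, exist_ok=True)
    path = os.path.join(QUEUE_DIR, f"{int(time.time())}_{name}.sh")
    with open(path, "w") as f:
        f.write(body)
    return path


if __name__ == "__main__":
    example_script = "\n".join([
        "#!/bin/bash",
        "#SBATCH --job-name=example",
        "#SBATCH --time=00:10:00",
        "echo hello from job $SLURM_JOB_ID",
        "",
    ])
    print("Queued:", enqueue_job("example", example_script))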