Created
May 15, 2025 10:17
-
-
Save Abhishek-Deshmukh/cf7e6177585219d786c15887f741df92 to your computer and use it in GitHub Desktop.
Submits the jobs to SLURM from a custom queue
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Leave this running in a tmux session.
# Every 30 minutes it checks the SLURM queue and the custom queue, and submits queued scripts while the job count is below the limit.
import os | |
import subprocess | |
import shutil | |
import logging | |
import time | |
# --- Configuration ---
# User whose jobs are counted with `squeue -u` and whose directories are used below.
SLURM_USERNAME = "your_username"
# No further scripts are submitted once squeue lists this many jobs for the user.
SLURM_JOB_LIMIT = 2000
# Directory scanned for `.sh` scripts waiting to be submitted.
QUEUE_DIR = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/queue"
# Directory that scripts are moved to after sbatch has accepted them.
SUBMITTED_DIR = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/submitted"
# Preferred log file; replaced by a file in the working directory if unusable.
LOG_FILE = f"/lustre/cbm/users/{SLURM_USERNAME}/jobs/slurm_queue_manager.log"

# --- Logging Setup ---
log_dir = os.path.dirname(LOG_FILE)
_LOG_OPTIONS = {
    "level": logging.INFO,
    "format": '%(asctime)s - %(levelname)s - %(message)s',
}
try:
    if log_dir:
        # exist_ok avoids the check-then-create race of exists() + makedirs().
        os.makedirs(log_dir, exist_ok=True)
    # basicConfig opens the log file immediately, so an unwritable location
    # surfaces here as OSError as well.
    logging.basicConfig(filename=LOG_FILE, **_LOG_OPTIONS)
except OSError as e:
    # Fall back to a log file in the current working directory for any
    # failure to prepare the preferred location, not only a failed mkdir.
    _unusable_log_file = LOG_FILE
    LOG_FILE = "slurm_queue_manager.log"
    print(f"Warning: Could not set up log file {_unusable_log_file}. Logging to {LOG_FILE}. Error: {e}")
    logging.basicConfig(filename=LOG_FILE, **_LOG_OPTIONS)
def get_slurm_job_count(username):
    """Count the lines printed by ``squeue -u <username> -h``.

    The ``-h`` flag drops the header, so every remaining output line is
    counted as one job.

    Args:
        username: User name handed to ``squeue -u``.

    Returns:
        The number of output lines (0 for empty output), or -1 when
        ``squeue`` is not installed, exits with a non-zero status, or any
        other error occurs. Failures are logged, never raised.
    """
    try:
        squeue = subprocess.run(
            ["squeue", "-u", username, "-h"],
            capture_output=True,
            text=True,
            check=False,  # the exit status is inspected below instead
        )

        # Guard clause: a failed squeue call yields the -1 sentinel.
        if squeue.returncode != 0:
            logging.error(f"squeue failed (exit {squeue.returncode}). Stderr: {squeue.stderr.strip()}")
            return -1

        listing = squeue.stdout.strip()
        if not listing:
            return 0
        # N lines are separated by N - 1 newline characters.
        return listing.count('\n') + 1
    except FileNotFoundError:
        logging.error("squeue command not found.")
        return -1
    except Exception as e:
        logging.error(f"Error checking SLURM queue: {e}")
        return -1
def process_queue_directory():
    """Submit queued ``.sh`` scripts with ``sbatch`` while under the job limit.

    Regular files ending in ``.sh`` are taken from ``QUEUE_DIR`` in sorted
    filename order. Before every submission the user's job count is re-read
    with ``get_slurm_job_count``; processing stops for this run when the count
    cannot be read or has reached ``SLURM_JOB_LIMIT``. A script accepted by
    ``sbatch`` is moved to ``SUBMITTED_DIR``; a script rejected by ``sbatch``
    stays in the queue and the next script is tried.

    Nothing is submitted when ``SUBMITTED_DIR`` cannot be created, and
    processing stops as soon as a submitted script cannot be moved: a script
    left in ``QUEUE_DIR`` is picked up again on the next run, so continuing
    would keep producing duplicate submissions.

    Returns:
        None. Every outcome is reported through the ``logging`` module.
    """
    logging.info(f"Checking queue: {QUEUE_DIR}")
    if not os.path.exists(QUEUE_DIR):
        logging.warning(f"Queue directory not found: {QUEUE_DIR}.")
        return

    if not os.path.exists(SUBMITTED_DIR):
        try:
            os.makedirs(SUBMITTED_DIR)
            logging.info(f"Created submitted dir: {SUBMITTED_DIR}")
        except OSError as e:
            logging.error(f"Error creating submitted dir {SUBMITTED_DIR}: {e}")
            # Without a destination, submitted scripts would stay in the
            # queue and be submitted again on every later run.
            return

    try:
        queue_entries = os.listdir(QUEUE_DIR)
    except OSError as e:
        logging.error(f"Error listing queue dir {QUEUE_DIR}: {e}")
        return

    # Only regular files: a directory that happens to end in ".sh" cannot be
    # submitted and would otherwise be retried forever.
    scripts_to_submit = sorted(
        name
        for name in queue_entries
        if name.endswith(".sh") and os.path.isfile(os.path.join(QUEUE_DIR, name))
    )
    if not scripts_to_submit:
        logging.info("No scripts in queue.")
        return
    logging.info(f"Found {len(scripts_to_submit)} script(s).")

    for script_filename in scripts_to_submit:
        # Re-read the count before each submission so the limit is honoured
        # even while jobs start and finish during this run.
        current_jobs = get_slurm_job_count(SLURM_USERNAME)
        if current_jobs == -1:
            logging.error("Failed to get job count. Skipping submission.")
            break  # Stop processing queue for this run
        logging.info(f"Jobs for {SLURM_USERNAME}: {current_jobs}/{SLURM_JOB_LIMIT}")
        if current_jobs >= SLURM_JOB_LIMIT:
            logging.warning(f"Limit ({SLURM_JOB_LIMIT}) reached. Stopping submission for this run.")
            break  # Stop processing queue for this run

        script_path = os.path.join(QUEUE_DIR, script_filename)
        logging.info(f"Submitting: {script_filename}")
        try:
            submit_result = subprocess.run(
                ["sbatch", script_path],
                capture_output=True,
                text=True,
                check=False,  # the exit code is inspected below instead
            )
        except FileNotFoundError:
            logging.error(f"sbatch not found when submitting {script_filename}.")
            break  # Critical error, stop processing
        except Exception as e:
            logging.error(f"Unexpected error during submission of {script_filename}: {e}")
            continue  # Try the next script

        if submit_result.returncode != 0:
            logging.error(f"Submission failed for {script_filename}. Exit code: {submit_result.returncode}. Stderr: {submit_result.stderr.strip()}")
            continue

        logging.info(f"Submitted {script_filename}. Stdout: {submit_result.stdout.strip()}")
        try:
            destination_path = os.path.join(SUBMITTED_DIR, script_filename)
            shutil.move(script_path, destination_path)
            logging.info(f"Moved {script_filename} to {SUBMITTED_DIR}")
        except Exception as e:
            logging.error(f"Error moving {script_filename} to {SUBMITTED_DIR}: {e}")
            logging.error(
                f"{script_filename} was submitted but is still in {QUEUE_DIR}; "
                "remove it manually to avoid a duplicate submission. "
                "Stopping submission for this run."
            )
            # Further submissions would most likely be stranded in the queue
            # the same way, each becoming a duplicate job on the next run.
            break
# --- Main ---
if __name__ == "__main__":
    # Seconds to wait between two queue checks (30 minutes).
    poll_interval_seconds = 1800
    try:
        while True:
            logging.info("--- Queue Manager Started ---")
            try:
                current_jobs = get_slurm_job_count(SLURM_USERNAME)
                if current_jobs == -1:
                    # The loop keeps running; only this check is skipped.
                    logging.error("Could not get job count. Skipping this run.")
                elif current_jobs >= SLURM_JOB_LIMIT:
                    logging.info(f"Limit ({SLURM_JOB_LIMIT}) reached. No submission this run.")
                else:
                    logging.info(f"Jobs for {SLURM_USERNAME}: {current_jobs}. Limit {SLURM_JOB_LIMIT}. Checking queue.")
                    process_queue_directory()
            except Exception:
                # An unexpected failure in one check must not end the
                # long-running manager; log the traceback and try again
                # after the usual pause.
                logging.exception("Unexpected error during queue check. Retrying on the next run.")
            logging.info("--- Queue Manager Finished ---")
            time.sleep(poll_interval_seconds)  # Sleep before checking again
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the manager; exit without a traceback.
        logging.info("--- Queue Manager Stopped (keyboard interrupt) ---")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment