Last active
January 11, 2017 13:40
-
-
Save willfurnass/0c24fd13f2ee2535fbae9a0340cd9d3a to your computer and use it in GitHub Desktop.
Ruffus + DRMAA test
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python
# A test of whether Ruffus can be used to submit jobs to a cluster using the
# DRMAA API, sharing a single DRMAA session between threads so that multiple
# jobs can be submitted at once without burdening the head node with multiple
# DRMAA sessions.
# This can be run from a conda environment created using:
#   conda create -n drmaatest -c bioconda python=2.7 ruffus drmaa
from __future__ import print_function
from ruffus import transform, suffix, pipeline_run, cmdline
from ruffus.drmaa_wrapper import run_job
import drmaa
import sys

# Inputs to the first pipeline stage: a.stg1, b.stg1, c.stg1.
# Each file is expected to contain several lines with one word per line.
starting_files = [stem + ".stg1" for stem in ("a", "b", "c")]

# One DRMAA session, opened when this module is loaded and handed to every
# run_job call below so that concurrently running tasks share it.
session = drmaa.Session()
session.initialize()

# Logger (and the mutex guarding it) that can be passed to multiprocessing
# ruffus tasks; output goes to test.log.
logger, logger_mutex = cmdline.setup_logging(__name__, 'test.log', True)
try:
    from shlex import quote as _shell_quote  # Python 3.3+
except ImportError:  # Python 2.7, as pinned by the conda recipe for this script
    from pipes import quote as _shell_quote

# Extra scheduler options passed through run_job for every job in this
# pipeline.
_JOB_OTHER_OPTIONS = "-V -l rmem=50M"


def _submit_job(stmt, job_name):
    """Submit shell command *stmt* as a cluster job via ruffus' run_job.

    Every job uses the module-level DRMAA ``session`` and ``logger``. Tasks
    running in parallel ruffus threads therefore share one session instead of
    each opening their own.

    :param stmt: shell command line to execute on the cluster.
    :param job_name: name given to the submitted job.
    """
    run_job(stmt,
            job_name=job_name,
            drmaa_session=session,
            job_other_options=_JOB_OTHER_OPTIONS,
            job_script_directory=".",
            logger=logger,
            retain_job_scripts=True)


@transform(starting_files, suffix(".stg1"), ".stg2")
def stg1_func(input_file, output_file):
    """Extract just the first three lines of the input file."""
    # Quote paths so file names containing spaces or shell metacharacters
    # cannot break (or inject into) the generated shell command.
    stmt = "head -n 3 {} > {}".format(_shell_quote(input_file),
                                      _shell_quote(output_file))
    _submit_job(stmt, "stage1")


@transform(stg1_func, suffix(".stg2"), ".stg3")
def stg2_func(input_file, output_file):
    """Join all lines of the input file into one space-separated line."""
    stmt = "paste -sd' ' {} > {}".format(_shell_quote(input_file),
                                         _shell_quote(output_file))
    _submit_job(stmt, "stage2")
if __name__ == '__main__':
    # Run both stages with up to three concurrent task threads sharing the
    # single DRMAA session.
    try:
        pipeline_run(target_tasks=[stg2_func], multithread=3)
    finally:
        # Always release the DRMAA session, even when a job fails and
        # pipeline_run raises; otherwise the session is left open.
        session.exit()
    sys.exit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment