Skip to content

Instantly share code, notes, and snippets.

@afgane
Created November 12, 2018 14:06
Show Gist options
  • Save afgane/4d6ca128744795a4e6ceb34b7605dfc0 to your computer and use it in GitHub Desktop.
Save afgane/4d6ca128744795a4e6ceb34b7605dfc0 to your computer and use it in GitHub Desktop.
GVL instance setup for Pulsar bursting
# /mnt/galaxy/galaxy-app/lib/galaxy/jobs/rules/dyndests.py
from galaxy.jobs import JobDestination
import os
import multiprocessing
from galaxycloudrunner.server import get_next_server
# This rules file determines the number of local cpus and then sets the
# Slurm --ntasks parameter appropriately. The maximum and minimum number of
# cpus to use per job are listed below. These values can be altered to suit the
# current system. So for a 1 - 2 cpu system, the --ntasks is set to 1, for
# a 4 cpu system it is set to 2 and for 8 or above it is set to 4.
# Any other dynamic settings of cpu or other slurm parameters like walltime
# or memory can also be set here. An example is the mapping_dynamic_job_wrapper
# function. It looks at the size of the input files and determines the number
# of cpus to use. It could just as easily change the walltime or memory requirements.
MINCPUS = 1
MAXCPUS = 4
def _adjustcpus(x):
if x < MINCPUS:
return MINCPUS
elif x > MAXCPUS:
return MAXCPUS
else:
return x
def mapping_dynamic_job_wrapper(job):
    """Pick a Slurm destination for a mapping job based on its input size.

    A job whose ``fastq_input1`` file is larger than 100 MiB requests all
    local cpus; a smaller one requests half of them. The request is then
    clamped by ``_adjustcpus``.

    Args:
        job: Job object exposing ``input_datasets`` and
            ``input_library_datasets``, whose items carry ``name`` and
            ``dataset`` attributes.

    Returns:
        A ``JobDestination`` for the ``slurm`` runner whose
        ``nativeSpecification`` parameter is ``--ntasks=<n>``.

    Raises:
        KeyError: If the job has no input named ``fastq_input1``.
        OSError: If the size of the input file cannot be read.
    """
    cpus_avail = multiprocessing.cpu_count()
    # Library datasets are merged in second, so they win on a name clash.
    inp_data = {da.name: da.dataset for da in job.input_datasets}
    inp_data.update((da.name, da.dataset) for da in job.input_library_datasets)
    query_size = os.path.getsize(inp_data["fastq_input1"].file_name)
    if query_size > 100 * 1024 * 1024:
        # Allocate extra cpus for large files.
        cpunum = cpus_avail
    else:
        # Floor division keeps the count an integer on Python 3 as well;
        # "/" would yield a float and produce e.g. "--ntasks=2.0".
        cpunum = cpus_avail // 2
    cpu_str = "--ntasks=" + str(_adjustcpus(cpunum))
    return JobDestination(runner="slurm", params={"nativeSpecification": cpu_str})
def gcr_runner():
    """Return a Pulsar destination on the next available remote server, if any.

    Asks ``get_next_server()`` for a server and prints progress messages to
    stdout along the way.

    Returns:
        A ``JobDestination`` for the ``pulsar_rest`` runner carrying the
        server's ``url`` and its ``auth_token`` (passed as ``private_token``),
        or ``None`` when no server, or a server without a url, is returned.
    """
    print("Scheduling jobsss")
    next_server = get_next_server()
    print("on next_server %s" % next_server)
    # NOTE(review): get_next_server() is assumed to return a dict-like object.
    # A falsy result (e.g. None) is treated as "no server available" instead
    # of crashing on .get() - confirm against galaxycloudrunner.
    if not next_server or not next_server.get('url'):
        return None
    print("job dest %s" % next_server)
    # The runner is pinned to "pulsar_rest" rather than being read from
    # next_server.get('runner').
    return JobDestination(
        runner="pulsar_rest",
        params={"url": next_server.get('url'),
                "private_token": next_server.get('auth_token')})
def default_dynamic_job_wrapper(job):
    """Choose the default destination for a job.

    A destination returned by ``gcr_runner()`` is used when there is one.
    Otherwise the job goes to Slurm with half of the local cpus, clamped by
    ``_adjustcpus``.

    Args:
        job: The job being mapped; this rule does not inspect it.

    Returns:
        The truthy result of ``gcr_runner()``, or a ``JobDestination`` for the
        ``slurm`` runner whose ``nativeSpecification`` is ``--ntasks=<n>``.
    """
    gcr = gcr_runner()
    if gcr:
        return gcr
    # Allocate the number of cpus based on the number available (by instance
    # size). Floor division keeps the count an integer on Python 3 as well;
    # "/" would yield a float and produce e.g. "--ntasks=2.0".
    cpunum = _adjustcpus(multiprocessing.cpu_count() // 2)
    cpu_str = "--ntasks=" + str(cpunum)
    return JobDestination(runner="slurm", params={"nativeSpecification": cpu_str})
<!-- /opt/cloudman/config/conftemplates/job_conf.xml -->
<?xml version="1.0" ?>
<job_conf>
<!-- Job runner plugins made available to the destinations below -->
<plugins>
<!-- Dynamic runner: destinations that use it name a Python function, looked up in the rules_module package -->
<plugin id="dynamic" type="runner">
<param id="rules_module">galaxy.jobs.rules</param>
</plugin>
<plugin id="slurm" load="galaxy.jobs.runners.slurm:SlurmJobRunner" type="runner">
<param id="internalexception_state">error</param>
<param id="internalexception_retries">10</param>
</plugin>
<!-- Pulsar REST runner; the dynamic rules refer to it by the id "pulsar_rest" -->
<plugin id="pulsar_rest" load="galaxy.jobs.runners.pulsar:PulsarRESTJobRunner" type="runner"/>
<plugin id="local" load="galaxy.jobs.runners.local:LocalJobRunner" type="runner" workers="4"/>
</plugins>
<!-- $cloudman_handlers is a template placeholder; presumably substituted by CloudMan when this template is rendered -->
<handlers default="handlers">
$cloudman_handlers
</handlers>
<!-- Unless a tool is mapped explicitly in the tools section, jobs use the default_dynamic_job_wrapper destination -->
<destinations default="default_dynamic_job_wrapper">
<!-- Slurm specific destinations -->
<destination id="slurm_cluster" runner="slurm"/>
<destination id="slurm_2slots" runner="slurm">
<param id="nativeSpecification">--ntasks=2</param>
</destination>
<destination id="slurm_4slots" runner="slurm">
<param id="nativeSpecification">--ntasks=4</param>
</destination>
<destination id="slurm_8slots" runner="slurm">
<param id="nativeSpecification">--ntasks=8</param>
</destination>
<destination id="slurm_16slots" runner="slurm">
<param id="nativeSpecification">--ntasks=16</param>
</destination>
<!-- Dynamic destinations -->
<!-- Each "function" param names a Python rule function that returns the actual destination -->
<destination id="mapping_dynamic" runner="dynamic">
<param id="type">python</param>
<param id="function">mapping_dynamic_job_wrapper</param>
</destination>
<destination id="default_dynamic_job_wrapper" runner="dynamic">
<param id="type">python</param>
<param id="function">default_dynamic_job_wrapper</param>
</destination>
<!-- pulsar config -->
<!-- Static Pulsar destination; the url looks like a placeholder to be replaced with the real server address and port -->
<destination id="pulsar_server" runner="pulsar_rest">
<param id="url">http://pulsar_server_ip:pulsar_server_port/</param>
</destination>
<!-- local destination -->
<destination id="local_runner" runner="local"/>
</destinations>
<!-- Per-tool overrides of the default destination -->
<tools>
<!-- Need to specify no multi-threading for GATK Depth of Coverage -->
<tool destination="slurm_cluster" id="toolshed.g2.bx.psu.edu/repos/iuc/gatk2/gatk2_depth_of_coverage/2.8.0"/>
<!-- upload1 is likewise pinned to the plain slurm_cluster destination -->
<tool destination="slurm_cluster" id="upload1"/>
</tools>
</job_conf>
$ source /mnt/galaxy/galaxy-app/.venv/bin/activate
$ pip install -U git+https://github.com/CloudVE/cloudlaunch-cli.git
# pip install GalaxyCloudRunner (after scp'ing it)
# edit /mnt/galaxy/galaxy-app/run.sh to add a line for export CLOUDLAUNCH_API_TOKEN="..."
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment