Skip to content

Instantly share code, notes, and snippets.

@aespinosa
Created July 17, 2010 20:19
Show Gist options
  • Save aespinosa/479822 to your computer and use it in GitHub Desktop.
Save aespinosa/479822 to your computer and use it in GitHub Desktop.
Karajan broadcast code
/*
* File: transfer_sgt.k
* Author: Allan Espinosa
* Description: Utility to broadcast SGT files produced on TeraGrid
* resources to OSG.
*
* References:
* sites pool() definition code based on Mihael Hategan's I2U2 monitor.k code
*/
import("sys.k")
import("task.k")
import("vdl-lib.xml")
/* Bind the LOG:* level names to their string values as globals; xfer passes
 * LOG:INFO to log() further down in this file. */
global(LOG:DEBUG, "debug")
global(LOG:INFO, "info")
global(LOG:WARN, "warn")
global(LOG:ERROR, "error")
global(LOG:FATAL, "fatal")
/*
 * removeJM(url)
 * When url contains a "/jobmanager-" component, yield only the part of url
 * that precedes the first "/"; otherwise yield url unchanged.
 */
element(removeJM, [url]
    if(
        matches(url, ".*/jobmanager-.*")
        first(split(url, "/"))
        url
    )
)
/*
 * pool(handle, ..., workdir?, properties channel)
 * Yields a list of two items: a host() named after handle, followed by the
 * work directory. xfer reads these back with first() and last().
 */
element(pool, [handle, ..., optional(workdir), channel(properties)]
list(
host(name = handle
/* pass every variable argument through to host() */
each(...)
/* re-emit whatever arrived on the properties channel onto host()'s
 * properties channel (presumably fed by profile() calls; confirm) */
to(properties
each(properties)
)
)
/* workdir is optional: fall back to "" when it was not supplied */
try(workdir, "")
)
)
/*
 * servicelist(type, provider, url)
 * Builds a service() of the given type and provider. The job manager is
 * always "fork" and the url is first passed through removeJM().
 */
element(servicelist, [type, provider, url]
    service(
        type,
        provider = provider,
        jobManager = "fork",
        url = removeJM(url)
    )
    /*list(type, provider, url, service(type, provider=provider, jobManager="fork", url=removeJM(url)))*/
)
/*
 * gridftp(url, storage?, major?, minor?, patch?)
 * Yields a "file" service: provider "local" with an empty url when url is
 * "local://localhost", otherwise provider "gsiftp" with the given url.
 * The optional arguments are accepted but not used here.
 */
element(gridftp, [url, optional(storage), optional(major), optional(minor), optional(patch)]
    if(
        url == "local://localhost"
        servicelist(type="file", provider="local", url="")
        servicelist(type="file", provider="gsiftp", url=url)
    )
)
/*
 * jobmanager(url, major, universe?, minor?, patch?)
 * Yields an "execution" service for url. The provider is chosen as:
 *   url == "local://localhost" -> "local"
 *   url == "pbs://localhost"   -> "pbs"
 *   major == "4"               -> "GT4"
 *   major == "2"               -> "GT2"
 * Anything else raises an "Unknown job manager version" error.
 */
element(jobmanager, [url, major, optional(universe), optional(minor), optional(patch)]
    /* minor and patch are optional. Give them printable fallbacks (same
     * try() idiom pool uses for workdir) so that building the error message
     * below cannot itself fail on an unbound name and hide the real error. */
    minorText := try(minor, "unset")
    patchText := try(patch, "unset")
    provider := if(
        url == "local://localhost" "local"
        url == "pbs://localhost" "pbs"
        major == "4" "GT4"
        major == "2" "GT2"
        throw("Unknown job manager version: major = {major}, minor = {minorText}, patch = {patchText}, url = {url}")
    )
    servicelist(type="execution", provider=provider, url=url)
)
/*
 * execution(provider, url)
 * Shorthand for an "execution" servicelist() entry.
 */
element(execution, [provider, url]
    servicelist(
        type = "execution",
        provider = provider,
        url = url
    )
)
/*
 * filesystem(provider, url, storage?)
 * Shorthand for a "file" servicelist() entry. storage is accepted but unused.
 */
element(filesystem, [provider, url, optional(storage)]
    servicelist(
        type = "file",
        provider = provider,
        url = url
    )
)
/*
 * profile(namespace, key, value)
 * Emits a property() carrying value. Keys in the "karajan" namespace are used
 * bare; keys from any other namespace are prefixed as "namespace:key".
 */
element(profile, [namespace, key, value]
    propertyKey := if(
        namespace == "karajan" "{key}"
        "{namespace}:{key}"
    )
    property(propertyKey, value)
)
/*
 * workdirectory(dir)
 * Yields dir under the name workdir. NOTE(review): presumably this is what
 * fills pool()'s optional workdir argument when used inside a pool; confirm.
 */
element(workdirectory, [dir]
workdir = dir
)
/* Evaluate the site catalog file and collect what it yields into a list.
 * xfer indexes this list with list:get and reads each entry with first() and
 * last(). NOTE(review): entries are presumably the lists built by pool();
 * confirm against osg.xml. */
sitesFile := "osg.xml"
sites := list(executeFile(sitesFile))
/* Directory listing on all sites */
/*parallelFor(site, sites*/
/*rhost := first(site)*/
/*workdir := concat(last(site), "/SgtFiles/LGU")*/
/*if(not(file:exists(workdir, host=rhost, provider="gsiftp"))*/
/*dir:make(workdir, host=rhost, provider="gsiftp")*/
/*)*/
/*dump := file:list(workdir, host=rhost, provider="gsiftp")*/
/*print("{rhost} {dump}")*/
/*)*/
/*
 * xfer(sites, src, dest)
 * Copies the four LGU SGT files (two .md5 files, then the two .sgt files),
 * one after another, from the "/SgtFiles/LGU" directory of site number src
 * to the same directory of site number dest, using the gsiftp provider.
 * A START and an END line are both printed and logged at LOG:INFO for each
 * file.
 */
element(xfer, [sites, src, dest]
    sequential(
        /* look each site entry up once; first() is the host, last() the
         * work directory */
        srcSite := list:get(sites, src)
        destSite := list:get(sites, dest)
        shost := first(srcSite)
        dhost := first(destSite)
        srcDir := concat(last(srcSite), "/SgtFiles/LGU")
        destDir := concat(last(destSite), "/SgtFiles/LGU")
        names := list(
            "LGU_fx_664.sgt.md5"
            "LGU_fy_664.sgt.md5"
            "LGU_fx_664.sgt"
            "LGU_fy_664.sgt"
        )
        for(file, names
            print("{file} START Transferring from {shost} to {dhost}")
            log(LOG:INFO, "{file} START Transferring from {shost} to {dhost}")
            transfer(
                srcfile=file, srcdir=srcDir, srchost=shost
                destfile=file, destdir=destDir, desthost=dhost
                provider="gsiftp"
            )
            /*wait(delay=2000)*/
            print("{file} END Transferring from {shost} to {dhost}")
            log(LOG:INFO, "{file} END Transferring from {shost} to {dhost}")
        )
    )
)
/* Hard-wired spanning tree-based broadcast code */
/*xfer(sites, 4, 9)*/
/* Stage 1: site 1 seeds site 2. */
xfer(sites, 1, 2)
/* Stage 2 onwards: the subtrees rooted at sites 1 and 2 proceed in parallel. */
parallel(
    /* Subtree of site 1: 1->3 first, then (1->5 followed by 1->9) alongside 3->7. */
    sequential(
        xfer(sites, 1, 3)
        parallel(
            sequential(
                xfer(sites, 1, 5)
                xfer(sites, 1, 9)
            )
            xfer(sites, 3, 7)
        )
    )
    /* Subtree of site 2: 2->4 first, then 2->6 alongside 4->8. */
    sequential(
        xfer(sites, 2, 4)
        parallel(
            xfer(sites, 2, 6)
            xfer(sites, 4, 8)
        )
    )
)
/* removed transfers because of osg.xml situation */
/*sequential( xfer(sites,4,8), xfer(sites,4,12) )*/
/*sequential( xfer(sites,3,7), xfer(sites,3,11) )*/
/*sequential( xfer(sites,2,6), xfer(sites,2,10) )*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment