Created
          July 30, 2014 21:13 
        
      - 
      
- 
        Save DarwinAwardWinner/de5df8314593ac85d6ba to your computer and use it in GitHub Desktop. 
    Preloading wrappers for FastqStreamer and scanBam
  
        
  
    
      This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
      Learn more about bidirectional Unicode characters
    
  
  
    
  | library(ShortRead) | |
| library(parallel) | |
## Emit a message prefixed with the current time (microsecond resolution).
## All arguments are passed on to message() after the "<timestamp>: " prefix.
tsmsg <- function(...) {
  stamp <- format(Sys.time(), "%Y-%m-%d %H:%M:%OS6")
  message(stamp, ": ", ...)
}
## Read the next result from a forked child process.
##
## Wraps parallel:::readChild(): a NULL or integer return from readChild is
## mapped to NULL; any other return is passed through unserialize() and the
## resulting object is returned.
readChildSafe <- function(child) {
  payload <- parallel:::readChild(child)
  noData <- is.integer(payload) || is.null(payload)
  if (noData) {
    NULL
  } else {
    unserialize(payload)
  }
}
## S4 class holding a single slot, `proc`. The slot is declared as "ANY";
## the validity method below is what restricts it to "process" objects.
.PreloadProcWrapper <- setClass(
  "PreloadProcWrapper",
  slots = c(proc = "ANY")
)

## Validity: reject any object whose `proc` slot is not a "process".
setValidity("PreloadProcWrapper", function(object) {
  if (!is(object@proc, "process")) {
    return("proc must be a process")
  }
  TRUE
})

## User-facing constructor: wrap `proc` in a PreloadProcWrapper.
PreloadProcWrapper <- function(proc) {
  .PreloadProcWrapper(proc = proc)
}
## TODO: Allow using the specified yield method instead of always "yield"
##
## yield() for a PreloadProcWrapper: write a newline to the child's stdin,
## then read the child's next result with readChildSafe() and return it.
setMethod(
  "yield",
  signature = c(x = "PreloadProcWrapper"),
  definition = function(x, ...) {
    child <- x@proc
    parallel:::sendChildStdin(child, "\n")
    readChildSafe(child)
  }
)
## Returns a function that works like the constructor function but
## transparently preloads.
##
## Arguments:
##   constructor  Function invoked in the child process, via
##                do.call(constructor, args), to create the stream.
##   yieldMethod  Function called in the child with the stream as its only
##                argument to fetch the next chunk. A zero-length result
##                ends the loop.
##
## Value: a function taking the constructor's arguments. Each call starts a
## child with mcparallel() and returns PreloadProcWrapper(proc) for it.
##
## Child protocol: fetch a chunk, block until one line arrives on stdin,
## send the chunk with sendMaster(), then immediately fetch the next chunk
## so it is ready before the next request.
preloadWrapper <- function(constructor, yieldMethod) {
  ## Both arguments are lazy promises that would otherwise first be
  ## evaluated inside the forked child. Force them now so the returned
  ## wrapper is bound to the functions supplied at creation time, even if
  ## the caller's variables are reassigned afterwards.
  force(constructor)
  force(yieldMethod)
  function(...) {
    args <- list(...)
    proc <- mcparallel({
      stream <- do.call(constructor, args)
      nyield <- 0
      tsmsg("CHILD: Preloading yield ", nyield+1, ".")
      while (length(nextYield <- yieldMethod(stream)) > 0) {
        nyield <- nyield + 1
        tsmsg("CHILD: Preloaded yield ", nyield, " of length ", length(nextYield), ".")
        ## Wait for the next request
        readLines(con=stdin(), n=1)
        parallel:::sendMaster(nextYield)
        tsmsg("CHILD: Sent yield ", nyield, ".")
        tsmsg("CHILD: Preloading yield ", nyield+1, ".")
      }
      tsmsg("CHILD: Yield ", nyield+1, " was empty; not sending.")
      ## The child expression evaluates to NULL once the stream is exhausted.
      NULL
    })
    PreloadProcWrapper(proc)
  }
}
## Preloading counterpart of FastqStreamer: chunks are fetched with yield().
PreloadingFastqStreamer <- preloadWrapper(FastqStreamer, yield)

## Demo: stream the example FASTQ file shipped with ShortRead in chunks of
## 75, sleeping for a second per chunk to stand in for real work.
{
  sp <- SolexaPath(system.file("extdata", package = "ShortRead"))
  fl <- file.path(analysisPath(sp), "s_1_sequence.txt")
  fqs <- PreloadingFastqStreamer(fl, n = 75)
  Sys.sleep(1)
  nyield <- 0
  tsmsg("MAIN: Requesting yield ", nyield+1, ".")
  repeat {
    x <- yield(fqs)
    ## A zero-length result ends the loop.
    if (length(x) == 0) {
      break
    }
    nyield <- nyield + 1
    tsmsg("MAIN: Received yield ", nyield, " of length ", length(x), ".")
    tsmsg("MAIN: Performing imaginary long calculation here.")
    # Long calculation here
    Sys.sleep(1)
    tsmsg("MAIN: Completed imaginary long calculation.")
    tsmsg("MAIN: Processed yield ", nyield, ".")
    tsmsg("MAIN: Requesting yield ", nyield+1, ".")
  }
  tsmsg("MAIN: Received empty yield ", nyield+1, ".")
}
## Preloading BAM reader: the constructor opens a BamFile, and each chunk is
## the `qname` element of the first entry returned by scanBam().
PreloadingBamFile <- preloadWrapper(
  function(...) open(BamFile(...)),
  function(bam) scanBam(bam)[[1]]$qname
)

## Demo: read the example BAM file shipped with Rsamtools with
## yieldSize=500, sleeping for a second per chunk to stand in for real work.
{
  fl <- system.file("extdata", "ex1.bam", package = "Rsamtools",
                    mustWork = TRUE)
  bfs <- PreloadingBamFile(fl, yieldSize = 500)
  Sys.sleep(1)
  nyield <- 0
  tsmsg("MAIN: Requesting yield ", nyield+1, ".")
  repeat {
    x <- yield(bfs)
    ## A zero-length result ends the loop.
    if (length(x) == 0) {
      break
    }
    nyield <- nyield + 1
    tsmsg("MAIN: Received yield ", nyield, " of length ", length(x), ".")
    tsmsg("MAIN: Performing imaginary long calculation here.")
    # Long calculation here
    Sys.sleep(1)
    tsmsg("MAIN: Completed imaginary long calculation.")
    tsmsg("MAIN: Processed yield ", nyield, ".")
    tsmsg("MAIN: Requesting yield ", nyield+1, ".")
  }
  tsmsg("MAIN: Received empty yield ", nyield+1, ".")
}
  
    Sign up for free
    to join this conversation on GitHub.
    Already have an account?
    Sign in to comment