Created
August 11, 2014 04:11
-
-
Save mpkocher/df2af47e19de35353e8a to your computer and use it in GitHub Desktop.
Port of the streaming filter report to Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.github.mpkocher | |
import java.io.File | |
// scala-csv is a minimal CSV library, but it is sufficient for this report.
// Its main limitation is that it does not read the file in large chunks.
import com.github.tototoshi.csv._ | |
/* | |
filtered_summary.csv | |
File format | |
Movie,ReadId,#Bases,Readlength,ReadScore,SequencingZMW,Productivity,PassedFilter | |
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/0,291,0,0.0000,0,0,0 | |
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/1,308,0,0.0000,0,0,0 | |
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/2,284,0,0.0000,0,0,0 | |
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/3,319,0,0.0000,0,0,0 | |
Note: the artifact filter produces a file with different headers, so its output does not follow this format.
*/ | |
/**
 * One data row of a `filtered_summary.csv` file, with fields in file column order.
 *
 * @param movieId      `Movie` column
 * @param readId       `ReadId` column
 * @param nBases       `#Bases` column
 * @param readLength   `Readlength` column
 * @param readScore    `ReadScore` column
 * @param sZmw         `SequencingZMW` column
 * @param productivity `Productivity` column
 * @param passedFilter `PassedFilter` column
 */
case class FilterRecord(
    movieId: String,
    readId: String,
    nBases: Int,
    readLength: Int,
    readScore: Double,
    sZmw: Int,
    productivity: Int,
    passedFilter: Int
)

/**
 * A single named statistic.
 *
 * @param attributeId identifier of the statistic
 * @param value       numeric value of the statistic
 */
case class ReportAttribute(
    attributeId: String,
    value: Number
)

/**
 * A named, ordered collection of statistics.
 *
 * @param attributeId identifier of the report
 * @param attributes  the statistics it contains
 */
case class Report(
    attributeId: String,
    attributes: List[ReportAttribute]
)
/**
 * A streaming statistic: it is fed records one at a time and can report its current value.
 *
 * @param attributeId id given to the [[ReportAttribute]] produced by `toAttribute`
 */
abstract class Accumulator(val attributeId: String) {

  /** Folds one record into the running state. */
  def applyRecord(record: FilterRecord): Unit

  /** The statistic computed from every record seen so far. */
  def value: Number

  /** Snapshot of the current value, tagged with `attributeId`. */
  def toAttribute: ReportAttribute = ReportAttribute(attributeId, value)
}
/**
 * Sums the `nBases` field over every record it is given.
 *
 * The running total is a `Long`. Summing per-read base counts across a whole file can
 * exceed `Int.MaxValue` (~2.1e9), and an `Int` total would silently wrap to a negative value.
 *
 * @param attributeId id of the emitted [[ReportAttribute]]
 * @param total       starting value of the running sum (Int arguments still widen implicitly)
 */
class TotalBasesAccumulator(
    override val attributeId: String,
    var total: Long = 0L
) extends Accumulator(attributeId) {

  override def applyRecord(record: FilterRecord): Unit = {
    total += record.nBases
  }

  /** The running sum, boxed as a `java.lang.Long`. */
  override def value: Number = java.lang.Long.valueOf(total)
}
/**
 * Counts how many records it has been given.
 *
 * @param attributeId id of the emitted [[ReportAttribute]]
 * @param total       starting count
 */
class TotalRecordsAccumulator(
    override val attributeId: String,
    var total: Int = 0
) extends Accumulator(attributeId) {

  // The record's contents are irrelevant; only the number of calls matters.
  override def applyRecord(record: FilterRecord): Unit = {
    total = total + 1
  }

  override def value = total
}
/**
 * Running arithmetic mean of the `readLength` field.
 *
 * The mean is computed with floating-point division, matching `MeanReadScoreAccumulator`.
 * Integer division would truncate the fractional part. The sum is kept as a `Long` so that
 * long files cannot overflow it.
 *
 * @param attributeId id of the emitted [[ReportAttribute]]
 * @param nrecords    starting record count
 * @param total       starting sum of read lengths (Int arguments still widen implicitly)
 */
class MeanReadLengthAccumulator(
    override val attributeId: String,
    var nrecords: Int = 0,
    var total: Long = 0L
) extends Accumulator(attributeId) {

  override def applyRecord(r: FilterRecord): Unit = {
    nrecords += 1
    total += r.readLength
  }

  /** The mean read length as a `java.lang.Double`, or 0.0 when no records have been seen. */
  def value: Number =
    if (nrecords == 0) java.lang.Double.valueOf(0.0)
    else java.lang.Double.valueOf(total.toDouble / nrecords)
}
/**
 * Running arithmetic mean of the `readScore` field.
 *
 * @param attributeId id of the emitted [[ReportAttribute]]
 * @param nrecords    starting record count
 * @param total       starting sum of read scores
 */
class MeanReadScoreAccumulator(
    override val attributeId: String,
    var nrecords: Int = 0,
    var total: Double = 0
) extends Accumulator(attributeId) {

  override def applyRecord(r: FilterRecord): Unit = {
    total += r.readScore
    nrecords += 1
  }

  /** The mean score, or 0 when nothing has been accumulated (avoids dividing by zero). */
  def value: Number =
    if (nrecords == 0) 0
    else total / nrecords.toDouble
}
// Filter Records | |
/** A predicate deciding whether a [[FilterRecord]] should be counted. */
trait FilterAble {

  /** Returns true to keep `r`, false to reject it. */
  def applyRecord(r: FilterRecord): Boolean
}
/**
 * Keeps records whose `readLength` is strictly greater than `minReadLength`.
 *
 * NOTE(review): despite the "min" in the name, the bound is exclusive; confirm this is intended.
 */
class FilterByReadLength(val minReadLength: Int) extends FilterAble {
  override def applyRecord(record: FilterRecord): Boolean =
    minReadLength < record.readLength
}
/**
 * Keeps records whose `readScore` is strictly greater than `minReadScore`.
 *
 * NOTE(review): despite the "min" in the name, the bound is exclusive; confirm this is intended.
 */
class FilterByReadScore(val minReadScore: Double) extends FilterAble {
  override def applyRecord(r: FilterRecord): Boolean =
    minReadScore < r.readScore
}
/** Keeps only records whose `passedFilter` flag equals 1. */
class FilterHasPassedFilter extends FilterAble {
  override def applyRecord(r: FilterRecord): Boolean =
    1 == r.passedFilter
}
/**
 * Pairs a set of filters with the accumulators that should see records passing all of them.
 *
 * @param filters      predicates a record must satisfy; an empty list accepts every record
 * @param accumulators statistics updated with each accepted record
 */
class StatsModel(val filters: List[FilterAble], var accumulators: List[Accumulator]) {

  /** Feeds `r` to every accumulator if and only if every filter accepts it. */
  def applyRecord(r: FilterRecord): Unit = {
    // `forall` is vacuously true for an empty list, so no separate isEmpty guard is needed.
    // It stops at the first rejecting filter and does not build an intermediate list per record.
    if (filters.forall(_.applyRecord(r))) {
      accumulators.foreach(_.applyRecord(r))
    }
  }
}
/** Helpers that stream a `filtered_summary.csv` file into stats models and a [[Report]]. */
object ReportUtils {

  /** Read-score threshold for the post-filter model (records must score strictly above it). */
  private val PostFilterMinReadScore = 0.75

  /**
   * Builds a [[FilterRecord]] from the eight raw CSV fields, given in file column order.
   *
   * Numeric fields are trimmed before conversion, so padding around a value is tolerated.
   *
   * @throws NumberFormatException if a numeric field cannot be parsed
   */
  def toRecord(movieId: String, readId: String, nBases: String, readLength: String,
               readScore: String, sZmw: String, productivity: String,
               passedFilter: String): FilterRecord =
    FilterRecord(
      movieId,
      readId,
      nBases.trim.toInt,
      readLength.trim.toInt,
      readScore.trim.toDouble,
      sZmw.trim.toInt,
      productivity.trim.toInt,
      passedFilter.trim.toInt
    )

  /** Prints every accumulator's current attribute, one per line, model by model. */
  def printStatsModelSummary(models: List[StatsModel]): Unit =
    models.flatMap(_.accumulators).foreach(acc => println(acc.toAttribute))

  /** Collects the attributes of every model's accumulators, in order, into one report. */
  def statsModelsToReport(attributeId: String, models: List[StatsModel]): Report =
    Report(attributeId, models.flatMap(_.accumulators).map(_.toAttribute))

  /**
   * Creates two fresh models.
   *
   * The "prefilter" model counts every record. The "postfilter" model only counts records with
   * PassedFilter == 1 and ReadScore > 0.75.
   */
  def toModels: List[StatsModel] = {
    val preStatsModel = new StatsModel(
      List[FilterAble](),
      List(
        new TotalBasesAccumulator("prefilter.nbases"),
        new TotalRecordsAccumulator("prefilter.nreads"),
        new MeanReadLengthAccumulator("prefilter.mean_readlength"),
        new MeanReadScoreAccumulator("prefilter.mean_readscore")))

    // NOTE(review): the post-filter ids mix the "postfilter." and "post." prefixes. They are kept
    // byte-identical because report consumers may depend on them; confirm before renaming.
    val postStatsModel = new StatsModel(
      List(new FilterHasPassedFilter(), new FilterByReadScore(PostFilterMinReadScore)),
      List(
        new TotalRecordsAccumulator("postfilter.nreads"),
        new TotalBasesAccumulator("post.nbases"),
        new MeanReadLengthAccumulator("post.mean_readlength"),
        new MeanReadScoreAccumulator("post.mean_readscore")))

    List(preStatsModel, postStatsModel)
  }

  /** Offers `record` to every model. */
  def runAnalysis(models: List[StatsModel], record: FilterRecord): Unit =
    models.foreach(_.applyRecord(record))

  /**
   * Streams `fileName` through freshly built models, prints a summary, and returns the report.
   *
   * The first row is read as the header and printed. Rows without exactly eight fields are
   * printed and skipped. The reader is closed even if parsing fails part-way through.
   *
   * @throws NumberFormatException if a numeric field of an eight-field row is not a number
   */
  def toReport(fileName: String): Report = {
    val models = toModels
    val reader = CSVReader.open(new File(fileName))
    try {
      // Consume and echo the header row, then iterate over the remaining data rows.
      println(reader.readNext())
      reader.iterator.foreach(row => processRow(models, row))
    } finally {
      reader.close()
    }
    printStatsModelSummary(models)
    statsModelsToReport("filter_report", models)
  }

  /** Feeds one well-formed data row to the models; echoes any row of the wrong width. */
  private def processRow(models: List[StatsModel], row: Seq[String]): Unit = row match {
    // Matching on Seq (not List) works whatever concrete Seq the CSV reader yields.
    case Seq(a, b, c, d, e, f, g, h) => runAnalysis(models, toRecord(a, b, c, d, e, f, g, h))
    case _ => println(row)
  }
}
/**
 * Command-line entry point: builds and prints the filter report for one CSV file.
 *
 * Usage: `Program [path/to/filtered_summary.csv]`. Without an argument, the original
 * hard-coded sample file is used.
 *
 * This defines `main` directly instead of extending `App`. Overriding `App.main` is deprecated
 * in Scala 2.11+ and is a compile error in Scala 3, where `App.main` is final.
 */
object Program {

  // Full dataset: /Users/mkocher/scratch/filter_stats_streaming/filtered_summary.csv
  private val DefaultFileName =
    "/Users/mkocher/scratch/filter_stats_streaming/filtered_summary_first_100.csv"

  def main(args: Array[String]): Unit = {
    val fileName = args.headOption.getOrElse(DefaultFileName)
    println(fileName)
    val report = ReportUtils.toReport(fileName)
    println(report)
    println("exiting ...")
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment