Skip to content

Instantly share code, notes, and snippets.

@mpkocher
Created August 11, 2014 04:11
Show Gist options
  • Save mpkocher/df2af47e19de35353e8a to your computer and use it in GitHub Desktop.
Save mpkocher/df2af47e19de35353e8a to your computer and use it in GitHub Desktop.
Port of the streaming filter report to Scala
package com.github.mpkocher
import java.io.File
// scala-csv is a minimal CSV library, but it is sufficient for this report.
// Its main limitation is that it does not read the file in large chunks.
import com.github.tototoshi.csv._
/*
filtered_summary.csv
File format
Movie,ReadId,#Bases,Readlength,ReadScore,SequencingZMW,Productivity,PassedFilter
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/0,291,0,0.0000,0,0,0
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/1,308,0,0.0000,0,0,0
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/2,284,0,0.0000,0,0,0
m120201_042231_42129_c100275262550000001523007907041260_s1_p0,m120201_042231_42129_c100275262550000001523007907041260_s1_p0/3,319,0,0.0000,0,0,0
Note: The artifact filter yields a file with different headers
*/
/**
 * One parsed row of `filtered_summary.csv`.
 *
 * @param movieId      `Movie` column
 * @param readId       `ReadId` column
 * @param nBases       `#Bases` column
 * @param readLength   `Readlength` column
 * @param readScore    `ReadScore` column
 * @param sZmw         `SequencingZMW` column
 * @param productivity `Productivity` column
 * @param passedFilter `PassedFilter` column
 */
case class FilterRecord(
  movieId: String,
  readId: String,
  nBases: Int,
  readLength: Int,
  readScore: Double,
  sZmw: Int,
  productivity: Int,
  passedFilter: Int
)
/** A single named statistic, e.g. `prefilter.nreads`, with its numeric value. */
case class ReportAttribute(
  attributeId: String,
  value: Number
)

/** A named, ordered collection of [[ReportAttribute]]s. */
case class Report(
  attributeId: String,
  attributes: List[ReportAttribute]
)
/**
 * A running statistic that is updated one [[FilterRecord]] at a time.
 *
 * @param attributeId report key under which the statistic's value is published
 */
abstract class Accumulator(val attributeId: String) {
  /** Folds `record` into the running statistic. */
  def applyRecord(record: FilterRecord): Unit

  /** The statistic's current value. */
  def value: Number

  /** Snapshots the current value as a [[ReportAttribute]] keyed by `attributeId`. */
  def toAttribute: ReportAttribute = ReportAttribute(attributeId, value)
}
/**
 * Sums `nBases` over every record it is given.
 *
 * The running total is a `Long`. A sum of per-read base counts over a large
 * input can exceed `Int.MaxValue` (~2.1e9), and an `Int` total would silently
 * wrap to a negative number.
 *
 * @param attributeId report key for this statistic
 * @param total       starting value of the running sum (Int arguments still widen)
 */
class TotalBasesAccumulator(override val attributeId: String, var total: Long = 0L) extends Accumulator(attributeId) {
  override def applyRecord(record: FilterRecord): Unit = {
    total += record.nBases
  }

  // Boxed to java.lang.Long via Predef.long2Long.
  override def value: Number = total
}
/**
 * Counts the records it is given: one per `applyRecord` call.
 *
 * @param attributeId report key for this statistic
 * @param total       starting count
 */
class TotalRecordsAccumulator(override val attributeId: String, var total: Int = 0) extends Accumulator(attributeId) {
  override def applyRecord(record: FilterRecord): Unit = {
    // Only the number of calls matters; the record's contents are ignored.
    total = total + 1
  }

  override def value = total
}
/**
 * Arithmetic mean of `readLength` over every record it is given.
 *
 * The mean is computed in floating point. The previous `Int / Int` silently
 * truncated the fractional part. The sum is kept as a `Long` so that summing
 * many read lengths cannot overflow.
 *
 * @param attributeId report key for this statistic
 * @param nrecords    starting record count
 * @param total       starting sum of read lengths (Int arguments still widen)
 */
class MeanReadLengthAccumulator(override val attributeId: String, var nrecords: Int = 0, var total: Long = 0L) extends Accumulator(attributeId) {
  override def applyRecord(r: FilterRecord): Unit = {
    nrecords += 1
    total += r.readLength
  }

  /** Mean read length as a Double, or 0.0 when no records have been seen. */
  def value: Number =
    if (nrecords == 0) 0.0 else total.toDouble / nrecords
}
/**
 * Arithmetic mean of `readScore` over every record it is given.
 *
 * @param attributeId report key for this statistic
 * @param nrecords    starting record count
 * @param total       starting sum of read scores
 */
class MeanReadScoreAccumulator(override val attributeId: String, var nrecords: Int = 0, var total: Double = 0) extends Accumulator(attributeId) {
  override def applyRecord(r: FilterRecord): Unit = {
    nrecords += 1
    total += r.readScore
  }

  /**
   * Mean read score, or 0.0 when no records have been seen.
   *
   * The empty case now returns a Double 0.0, as the non-empty case does. Previously
   * it returned a boxed Integer 0, so the report value's type depended on the data.
   */
  def value: Number =
    if (nrecords == 0) 0.0 else total / nrecords
}
// Filter Records
/** A predicate that decides whether a [[FilterRecord]] should be accumulated. */
trait FilterAble {
  /** @return true to keep `r`, false to drop it */
  def applyRecord(r: FilterRecord): Boolean
}
/**
 * Keeps records whose `readLength` is strictly greater than `minReadLength`.
 *
 * NOTE(review): the bound is exclusive despite the "min" name. Confirm whether
 * `>=` was intended.
 */
class FilterByReadLength(val minReadLength: Int) extends FilterAble {
  override def applyRecord(record: FilterRecord): Boolean =
    minReadLength < record.readLength
}
/**
 * Keeps records whose `readScore` is strictly greater than `minReadScore`.
 *
 * NOTE(review): the bound is exclusive despite the "min" name. Confirm whether
 * `>=` was intended.
 */
class FilterByReadScore(val minReadScore: Double) extends FilterAble {
  override def applyRecord(r: FilterRecord): Boolean =
    minReadScore < r.readScore
}
/** Keeps only records whose `passedFilter` column is exactly 1. */
class FilterHasPassedFilter extends FilterAble {
  override def applyRecord(r: FilterRecord): Boolean = r.passedFilter match {
    case 1 => true
    case _ => false
  }
}
/**
 * Pairs a set of filters with the accumulators that receive the records passing them.
 *
 * @param filters      a record is accumulated only if every filter accepts it;
 *                     an empty list accepts every record
 * @param accumulators statistics updated with each accepted record, in list order
 */
class StatsModel(val filters: List[FilterAble], var accumulators: List[Accumulator]) {
  /** Feeds `r` to every accumulator when all filters accept it; otherwise does nothing. */
  def applyRecord(r: FilterRecord): Unit = {
    // `forall` is true for an empty list, so no special case is needed. It also stops at
    // the first rejecting filter instead of building a per-record List via map/reduce.
    // The filters in this file are pure, so short-circuiting does not change results.
    if (filters.forall(_.applyRecord(r))) {
      accumulators.foreach(_.applyRecord(r))
    }
  }
}
/** Helpers that parse filter-summary rows and turn [[StatsModel]]s into a [[Report]]. */
object ReportUtils {
  /**
   * Converts the eight raw CSV fields of one row into a [[FilterRecord]].
   *
   * @throws NumberFormatException if a numeric column cannot be parsed
   */
  def toRecord(movieId: String, readId: String, nBases: String, readLength: String, readScore: String, sZmw: String, productivity: String, passedFilter: String): FilterRecord = {
    FilterRecord(movieId, readId, nBases.toInt, readLength.toInt, readScore.toDouble, sZmw.toInt, productivity.toInt, passedFilter.toInt)
  }

  /** Prints each accumulator of each model as a [[ReportAttribute]], in model order. */
  def printStatsModelSummary(models: List[StatsModel]): Unit = {
    for (model <- models; accumulator <- model.accumulators) {
      println(accumulator.toAttribute)
    }
  }

  /** Collects every accumulator of `models`, in order, into a report named `attributeId`. */
  def statsModelsToReport(attributeId: String, models: List[StatsModel]): Report = {
    val attributes = models.flatMap(_.accumulators).map(_.toAttribute)
    Report(attributeId, attributes)
  }

  /**
   * Builds two fresh models:
   *  - pre-filter: no filters, so every record is counted;
   *  - post-filter: keeps records with `passedFilter == 1` and `readScore > 0.75`.
   */
  def toModels: List[StatsModel] = {
    // PreFilter
    val preFilters = List[FilterAble]()
    val preAccumulators = List(new TotalBasesAccumulator("prefilter.nbases"), new TotalRecordsAccumulator("prefilter.nreads"), new MeanReadLengthAccumulator("prefilter.mean_readlength"), new MeanReadScoreAccumulator("prefilter.mean_readscore"))
    val preStatsModel = new StatsModel(preFilters, preAccumulators)
    // PostFilter
    // NOTE(review): the post-filter ids mix the "postfilter." and "post." prefixes, unlike the
    // uniform "prefilter." ids above. They are left unchanged because report consumers may key on them.
    val postFilters = List(new FilterHasPassedFilter(), new FilterByReadScore(0.75))
    val postAccumulators = List(new TotalRecordsAccumulator("postfilter.nreads"), new TotalBasesAccumulator("post.nbases"), new MeanReadLengthAccumulator("post.mean_readlength"), new MeanReadScoreAccumulator("post.mean_readscore"))
    val postStatsModel = new StatsModel(postFilters, postAccumulators)
    List(preStatsModel, postStatsModel)
  }

  /** Offers `record` to every model; each model applies its own filters. */
  def runAnalysis(models: List[StatsModel], record: FilterRecord): Unit = {
    models.foreach(_.applyRecord(record))
  }

  /**
   * Streams `fileName` row by row through the default models and returns the resulting report.
   *
   * The first row is treated as a header and echoed to stdout. Rows that do not have
   * exactly eight columns are echoed to stdout and skipped.
   *
   * @throws NumberFormatException if an eight-column row has an unparsable numeric field
   */
  def toReport(fileName: String): Report = {
    val models = toModels
    val reader = CSVReader.open(new File(fileName))
    // Always release the file handle, even when a malformed number aborts parsing.
    try {
      // Read in Header
      println(reader.readNext())
      reader.iterator.foreach {
        // The Seq extractor matches any Seq (including List) of exactly eight fields.
        case Seq(a, b, c, d, e, f, g, h) => runAnalysis(models, toRecord(a, b, c, d, e, f, g, h))
        case other => println(other) // TODO: collect malformed rows (e.g. as Left values) instead of printing
      }
    } finally {
      reader.close()
    }
    printStatsModelSummary(models)
    statsModelsToReport("filter_report", models)
  }
}
/**
 * Command-line entry point: builds the filter report for one CSV file and prints it.
 *
 * This object deliberately does not extend `App`. Overriding `App.main` bypasses App's
 * delayed-initialization machinery and is deprecated, so a plain `main` is used instead.
 */
object Program {
  /**
   * @param args optional; `args(0)` is the path to a filtered_summary CSV. When it is
   *             absent, the developer-local sample file below is used (the previous
   *             hard-coded behavior).
   */
  def main(args: Array[String]): Unit = {
    // Full-size input: "/Users/mkocher/scratch/filter_stats_streaming/filtered_summary.csv"
    val defaultFileName = "/Users/mkocher/scratch/filter_stats_streaming/filtered_summary_first_100.csv"
    val fileName = args.headOption.getOrElse(defaultFileName)
    println(fileName)
    val report = ReportUtils.toReport(fileName)
    println(report)
    println("exiting ...")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment