Skip to content

Instantly share code, notes, and snippets.

@MooxxNew
Last active September 8, 2016 09:01
Show Gist options
  • Select an option

  • Save MooxxNew/7d833a2c74f35b87c5dc to your computer and use it in GitHub Desktop.

Select an option

Save MooxxNew/7d833a2c74f35b87c5dc to your computer and use it in GitHub Desktop.
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Created by Mooxx on 9/25/2015 AD.
 *
 * Minimal demonstration of `RDD.pipe`: a small in-memory list of words is
 * turned into an RDD, printed, sent through an external script, and the
 * script's output lines are printed as well.
 */
object TestPipe {

  /** Script that is run when no path is supplied on the command line. */
  private val DefaultScriptPath =
    "/Users/Mooxx/IdeaProjects/TestAdamPipe/src/main/scala-2.10/echo.sh"

  /**
   * Runs the demo on a local Spark master.
   *
   * @param args optional; `args(0)` overrides the path of the external
   *             script the words are piped through
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Test Pipe")
    val sc = new SparkContext(conf)
    try {
      val words = sc.makeRDD(List("hi", "hello", "how", "are", "you"))
      words.collect().foreach(println)

      // pipe() starts the script once per partition, writes every element to
      // its stdin as one line, and yields each stdout line as an element.
      val scriptPath = args.headOption.getOrElse(DefaultScriptPath)
      words.pipe(scriptPath).collect().foreach(println)
    } finally {
      // Always release the context, even if the external script fails.
      sc.stop()
    }
  }
}
@MooxxNew
Copy link
Copy Markdown
Author

MooxxNew commented Sep 8, 2016

@MooxxNew
Copy link
Copy Markdown
Author

MooxxNew commented Sep 8, 2016

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bdgenomics.adam.projections.{AlignmentRecordField, Projection}
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD

/**
 * Created by Mooxx on 8/30/2016 AD.
 *
 * Loads alignment records from an ADAM file, pipes every read sequence
 * through an external program, and sums the `key,count` lines that the
 * program writes back.
 */
object TestSplit {

  val conf: SparkConf = new SparkConf().setAppName("SparkWordCount").setMaster("local[3]")

  val sc: SparkContext = new SparkContext(conf)

  val ac: ADAMContext = new ADAMContext(sc)

  /** ADAM file that is read when no input path is given on the command line. */
  private val DefaultInputPath = "/Users/pathaweengoenthai/Documents/Data/small.adam"

  /** External program that is used when no script path is given on the command line. */
  private val DefaultScriptPath = "/Users/pathaweengoenthai/Documents/TestPipe/TestPipe"

  /**
   * Runs the pipeline and prints the sequences, the raw output of the
   * external program, and the per-key totals.
   *
   * @param args optional; `args(0)` overrides the ADAM input path and
   *             `args(1)` overrides the path of the external program
   */
  def main(args: Array[String]): Unit = {
    val inputPath = args.lift(0).getOrElse(DefaultInputPath)
    val scriptPath = args.lift(1).getOrElse(DefaultScriptPath)

    try {
      // Cached because the sequences are both printed and piped.
      val sequences = loadADAM(inputPath).rdd.map(_.getSequence).cache()

      // Pipe the distributed RDD directly instead of collecting it on the
      // driver and re-parallelizing it. Cached so the external program is
      // not executed a second time when the totals are computed.
      val piped = sequences.pipe(scriptPath).cache()

      // Every output line is expected to look like "<key>,<count>"; a line
      // without a comma or with a non-numeric count fails the job.
      val totals = piped
        .map { line =>
          val fields = line.split(",")
          (fields(0), fields(1).toLong)
        }
        .reduceByKey(_ + _)

      // Print each label first and its elements afterwards. Concatenating
      // the result of foreach(println) into the label would only append "()".
      println("\n\n\n\nxxxxxxx List : ")
      sequences.collect().foreach(println)
      println(" testPipe : ")
      piped.collect().foreach(println)
      println("\n\n\n\n\n\nyyyyyyyyyy rerult : ")
      totals.collect().foreach(println)
    } finally {
      // Always release the context, even if loading or the pipe fails.
      sc.stop()
    }
  }

  /**
   * Loads alignment records from `path`, projecting only the sequence,
   * readMapped and mapq fields.
   *
   * @param path location of the ADAM file to read
   * @return the alignment records restricted to the projected fields
   */
  def loadADAM(path: String): AlignmentRecordRDD =
    ac.loadAlignments(
      path,
      projection = Some(
        Projection(
          AlignmentRecordField.sequence,
          AlignmentRecordField.readMapped,
          AlignmentRecordField.mapq
        )
      )
    )

}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment