Gist MooxxNew/7d833a2c74f35b87c5dc — two Spark examples demonstrating RDD.pipe
(streaming RDD elements through an external executable).
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Demonstrates `RDD.pipe`: every element of the RDD is written (one per line)
 * to the stdin of an external script, and each line the script prints to
 * stdout becomes an element of the resulting RDD.
 *
 * Created by Mooxx on 9/25/2015 AD.
 */
object TestPipe {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName("Test Pipe")
    val sc = new SparkContext(conf)
    try {
      val data = List("hi", "hello", "how", "are", "you")
      // `val`, not `var`: the reference is never reassigned.
      val dataRDD = sc.makeRDD(data)
      dataRDD.collect().foreach(println)
      // NOTE(review): the script path is machine-specific; the script must be
      // executable on every worker node for pipe() to work — confirm locally.
      val scriptPath = "/Users/Mooxx/IdeaProjects/TestAdamPipe/src/main/scala-2.10/echo.sh"
      val pipeRDD = dataRDD.pipe(scriptPath)
      pipeRDD.collect().foreach(println)
    } finally {
      // Release the local Spark context even if the pipe invocation fails.
      sc.stop()
    }
  }
}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.bdgenomics.adam.projections.{AlignmentRecordField, Projection}
import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.adam.rdd.read.AlignmentRecordRDD
/**
 * Created by Mooxx on 8/30/2016 AD.
 */
object TestSplit {
  val conf = new SparkConf().setAppName("SparkWordCount").setMaster("local[3]")
  val sc = new SparkContext(conf)
  val ac = new ADAMContext(sc)

  /**
   * Loads ADAM alignment records, pipes each read sequence through an
   * external executable, and aggregates the "key,count" lines it emits.
   */
  def main(args: Array[String]): Unit = {
    val reads = loadADAM("/Users/pathaweengoenthai/Documents/Data/small.adam")
    // BUG FIX: keep the sequences distributed. The original collect()-ed
    // every sequence to the driver and re-parallelized it, which round-trips
    // all data through driver memory for no benefit.
    val sequences = reads.rdd.map(x => x.getSequence.toString)

    // NOTE(review): machine-specific path; the executable must exist on every
    // worker for pipe() to run — confirm locally.
    val scriptPath = "/Users/pathaweengoenthai/Documents/TestPipe/TestPipe"
    val pipeRDD = sequences.pipe(scriptPath)

    // BUG FIX: `reduceByKey(+)` does not compile; sum counts with `_ + _`.
    // Each piped output line is expected to look like "key,count".
    val counts = pipeRDD
      .map { line =>
        val parts = line.split(",")
        (parts(0), parts(1).toLong)
      }
      .reduceByKey(_ + _)

    // BUG FIX: the original concatenated the Unit result of foreach into the
    // println argument, so the banner lines printed "()" instead of the data.
    println("\n\n\n\nxxxxxxx List :")
    sequences.collect().foreach(println)
    println("testPipe :")
    pipeRDD.collect().foreach(println)
    println("\n\n\n\n\n\nyyyyyyyyyy result :")
    counts.collect().foreach(println)

    sc.stop()
  }

  /**
   * Loads alignments from `path`, projecting only the sequence, readMapped
   * and mapq fields to reduce I/O.
   *
   * @param path filesystem path to an ADAM/Parquet alignment dataset
   * @return the projected alignment records
   */
  def loadADAM(path: String): AlignmentRecordRDD =
    // Last expression is the result; no explicit `return` needed.
    ac.loadAlignments(path,
      projection = Some(
        Projection(
          AlignmentRecordField.sequence,
          AlignmentRecordField.readMapped,
          AlignmentRecordField.mapq
        )
      )
    )

  // def split(DNASequence: RDD[Char]): RDD[Char] ={
  //
  // }
}
// Reference: http://blog.madhukaraphatak.com/pipe-in-spark/