This code snippet shows a method of running a third-party batch trainer (here, Vowpal Wabbit) as a subprocess from Scala on Spark, producing one model per campaign.
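At its core, the snippet relies on Scala's scala.sys.process API: wrapping a command in Process and invoking .!! runs it synchronously and returns its stdout as a String, raising a RuntimeException on a non-zero exit status. A minimal standalone sketch of just that call (the echo command is a stand-in for the real trainer):

import scala.sys.process._

object SubprocessSketch {
  def main(args: Array[String]): Unit = {
    // .!! blocks until the process exits and captures its stdout;
    // a non-zero exit code raises a RuntimeException.
    val output = Process("echo hello").!!
    println("Output: " + output)
  }
}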
package com.aol.advertising.execution

import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}
import org.apache.spark.rdd.HadoopRDD
import org.apache.hadoop.io._
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import scala.sys.process._
import java.io._
object FlowBatch {

  // Loan-pattern helper: opens a PrintWriter on f, runs op, and always closes the writer.
  def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
    val p = new java.io.PrintWriter(f)
    try { op(p) } finally { p.close() }
  }
  /*
   * Maps a file path to its campaign ID. The file name carries an identifier
   * so that the training system can match one output model to each ID, for
   * instance one model per campaign. Expects a path component of the form
   * "campaign<id>/".
   */
  def pathKeyFunction(path: String): Int = {
    val campaignStr = "(campaign).*?/".r.findFirstIn(path).get
    campaignStr.substring("campaign".length, campaignStr.length - 1).toInt
  }
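  // Example with a hypothetical path layout:
  //   pathKeyFunction("hdfs://localhost:9000/data/campaign42/part-00000.gz") == 42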
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Flow Framework 1.0")
      .set("spark.executor.memory", "10g")
    val sc = new SparkContext(conf)

    // Read the gzipped input through the low-level Hadoop API so that the
    // input split (and hence each record's file path) is available.
    val text = sc.hadoopFile("hdfs://localhost:9000/data/*/*.gz",
      classOf[TextInputFormat], classOf[LongWritable], classOf[Text], sc.defaultMinPartitions)
    val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]
    // Key every line by the campaign ID parsed from its file's path.
    val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
      val file = inputSplit.asInstanceOf[FileSplit]
      iterator.map { tpl => (pathKeyFunction(file.getPath.toString), tpl._2.toString) }
    }.partitionBy(new HashPartitioner(2))
    // For each campaign ID: write that campaign's lines to a local file on the
    // executor, shell out to Vowpal Wabbit to train on it, then clean up.
    fileAndLine.groupBy(_._1).foreach { x =>
      // Write the grouped training data to a local file named after the campaign ID.
      val f = new File(x._1 + ".model")
      printToFile(f) { p => x._2.foreach(y => p.println(y._2)) }
      // .!! runs vw synchronously and captures its stdout; the trained model
      // is written to house_model on the executor's local disk.
      val output = Process("vw -d " + f.getPath +
        " -f house_model --loss_function=logistic --l2=0.00000001 --l1=0.00000001 -b 20").!!
      println("VW Output: " + output)
      f.delete()
    }
    sc.stop()
  }
}
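For comparison, Spark also ships a built-in RDD.pipe operator that streams each partition through an external command's stdin and returns the command's stdout as an RDD of lines, with no temporary file. Below is a minimal sketch, assuming vw is on each executor's PATH and reads training data from stdin (its default when no -d flag is given). Note that pipe runs once per partition rather than once per campaign, so on its own it does not reproduce the per-campaign grouping above:

import org.apache.spark.{SparkConf, SparkContext}

object PipeSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("Pipe Sketch"))
    // Each partition's lines are fed to vw's stdin; vw's stdout (its
    // progress/diagnostic lines) comes back as an RDD of strings.
    val vwOut = sc.textFile("hdfs://localhost:9000/data/*/*.gz")
      .pipe("vw --loss_function=logistic -b 20")
    vwOut.collect().foreach(println)
    sc.stop()
  }
}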