Skip to content

Instantly share code, notes, and snippets.

@aazout
Created June 12, 2015 18:09
Show Gist options
  • Save aazout/ba3353f4bfd8e68eb409 to your computer and use it in GitHub Desktop.
Save aazout/ba3353f4bfd8e68eb409 to your computer and use it in GitHub Desktop.
This code snippet demonstrates how to run a third-party batch trainer (Vowpal Wabbit) as a subprocess from Scala on Spark, training one model per partition key.
package com.aol.advertising.execution
import org.apache.spark.{SparkConf, SparkContext, HashPartitioner}
import scala.sys.process._
import java.io._
import org.apache.hadoop.io._
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
object FlowBatch {

  /**
   * Opens a PrintWriter on `f`, passes it to `op`, and always closes it,
   * even if `op` throws.
   */
  def printToFile(f: java.io.File)(op: java.io.PrintWriter => Unit): Unit = {
    val writer = new java.io.PrintWriter(f)
    try op(writer) finally writer.close()
  }

  /**
   * Maps an input-split path to its campaign id so the trainer can emit one
   * model per campaign. Expects a path segment of the form "campaignNNN/",
   * e.g. "hdfs://host/data/campaign42/part-0.gz" -> 42.
   *
   * @param path full path of the file backing an input split
   * @return the numeric campaign id embedded in the path
   * @throws IllegalArgumentException if no "campaign&lt;digits&gt;/" segment is present
   */
  def pathKeyFunction(path: String): Int = {
    // Capture the digits directly instead of the original .get +
    // indexOf("n")/substring arithmetic, which crashed with an opaque
    // NoSuchElementException or NumberFormatException on unexpected paths.
    val CampaignSegment = "campaign(\\d+)/".r
    CampaignSegment.findFirstMatchIn(path) match {
      case Some(m) => m.group(1).toInt
      case None =>
        throw new IllegalArgumentException(s"No campaign id found in path: $path")
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Flow Framework 1.0")
      .set("spark.executor.memory", "10g")
    val sc = new SparkContext(conf)
    try {
      // Read every gzipped file under /data. HadoopRDD is used (rather than
      // textFile) so each partition can see its InputSplit and hence its path.
      val text = sc.hadoopFile(
        "hdfs://localhost:9000/data/*/*.gz",
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        sc.defaultMinPartitions)
      val hadoopRdd = text.asInstanceOf[HadoopRDD[LongWritable, Text]]

      // Tag every line with the campaign id parsed from its file's path.
      val fileAndLine = hadoopRdd.mapPartitionsWithInputSplit { (inputSplit, iterator) =>
        val split = inputSplit.asInstanceOf[FileSplit]
        // Hoisted out of the per-record map: the split's path (and therefore
        // its key) is constant for the whole partition.
        val campaignId = pathKeyFunction(split.getPath.toString)
        iterator.map { case (_, line) => (campaignId, line.toString) }
      }.partitionBy(new HashPartitioner(2))

      // For each campaign: dump its lines to a temp file, shell out to
      // Vowpal Wabbit, and write one model per campaign id.
      fileAndLine.groupBy(_._1).map { case (campaignId, rows) =>
        // createTempFile yields a unique name, so concurrent tasks (or task
        // retries) on the same executor cannot clobber each other's training
        // data — the original fixed "<id>.model" name could collide.
        val trainingFile = File.createTempFile(s"campaign-$campaignId-", ".train")
        try {
          printToFile(trainingFile) { p =>
            // foreach, not map: only the println side effect is wanted.
            rows.foreach { case (_, line) => p.println(line) }
          }
          // Build the command as a Seq so each argument is passed intact
          // (paths with spaces would be split by the string tokenizer).
          // The model output is per-campaign, matching the stated intent of
          // "one model per campaign" — the original wrote every campaign's
          // model to the same "house_model" path, each run overwriting the last.
          val cmd = Seq(
            "vw",
            "-d", trainingFile.getPath,
            "-f", s"campaign-$campaignId.model",
            "--loss_function=logistic",
            "--l2=0.00000001",
            "--l1=0.00000001",
            "-b", "20")
          val output = Process(cmd).!!
          println("VW Output: " + output)
          campaignId
        } finally {
          trainingFile.delete() // best effort; OS temp dir is cleaned eventually
        }
      }.collect()
    } finally {
      sc.stop() // always release the SparkContext, even if a stage fails
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment