package conversions

import scala.math.max
import scala.math.min

import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.UserDefinedFunction
import org.joda.time.DateTime

class Users(
    sc: SparkContext)
  extends Serializable {

  val sqlContext = new SQLContext(sc)

  import Common.timeFormatter
  import sqlContext.implicits._
  import Users._

  // Get ad data.
  val adData: DataFrame = sc
    .textFile("<FILE_PATH>/users_ads.csv")
    .map(line => {
      // Extract CSV fields.
      val values: Array[String] = line.split(",")
      // Extract user id.
      val id: String = values(0)
      // Extract traffic source.
      val trafficSource: String = values(1)
      // Extract ad campaign.
      val adCampaign: String = values(2)
      // Extract ad medium.
      val adMedium: String = values(3)
      // Extract search keyword.
      val searchKeyword: String = values(4)
      // Extract ad content.
      val adContent: String = values(5)
      AdInfo(id, trafficSource, adCampaign, adMedium, searchKeyword, adContent)
    })
    .toDF // Convert to Spark DataFrame.
    .dropDuplicates(Seq("id")) // Deduplicate data.

  // Get purchase data.
  val purchaseData: DataFrame = sc
    .textFile("<FILE_PATH>/conversions.csv")
    .map(line => {
      // Extract CSV fields.
      val values: Array[String] = line.split(",")
      // Extract user id.
      val userId: String = values(0)
      // Extract price at which item was sold.
      val price: Double = values(2).toDouble
      // Extract item quantity sold.
      val quantity: Double = values(3).toDouble
      // Compute revenue from purchase event.
      val revenue: Double = price * quantity
      // Extract purchase timestamp.
      val purchaseTimestamp: String = values(4)
      // Convert purchase time to seconds.
      val purchaseTime: Double = timeFormatter
        .parseDateTime(purchaseTimestamp)
        .getMillis
        .toDouble / 1000
      PurchaseEvent(userId, revenue, purchaseTime)
    })
    .toDF // Convert to Spark DataFrame.

  // Get signup data.
  val signupData: DataFrame = sc
    .textFile("<FILE_PATH>/users.csv")
    .map(line => {
      // Extract CSV fields.
      val values: Array[String] = line.split(",")
      val valuesSize: Int = values.size
      // Extract user id.
      val id: String = values(0)
      // Some register country fields contain commas, so the inner components of the split
      // line must be collapsed back together in order to recover the register country.
      val registerCountry: String = values
        .slice(1, valuesSize - 1)
        .mkString("-")
      // Extract timestamp.
      val signupTimestamp: String = values(valuesSize - 1)
      // Parse DateTime from timestamp.
      val signupDateTime: DateTime = timeFormatter
        .parseDateTime(signupTimestamp)
      // Extract signup time in seconds.
      val signupTime: Double = signupDateTime
        .getMillis
        .toDouble / 1000
      // Extract day of year, week of year, and month of year for future feature generation.
      val dayOfYear: String = signupDateTime
        .getDayOfYear
        .toString
      val weekOfYear: String = signupDateTime
        .getWeekOfWeekyear
        .toString
      val monthOfYear: String = signupDateTime
        .getMonthOfYear
        .toString
      SignupEvent(id, registerCountry, signupTime, dayOfYear, weekOfYear, monthOfYear)
    })
    .toDF // Convert to Spark DataFrame.

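  // Illustration (the concrete values are hypothetical, not taken from the actual data): a
  // line such as "42,Korea, Republic of,2015-06-01 12:00:00" splits into four fields, and
  // slice(1, valuesSize - 1) rejoins the two middle pieces into a single registerCountry value.
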
// Get view data. | |
val viewData: DataFrame = sc | |
.textFile("<FILE_PATH>/views.csv") | |
.map(line => { | |
// Extract CSV fields. | |
val values = line.split(",") | |
// Extract user id. | |
val userId: String = values(0) | |
// Extract view event timestamp. | |
val viewTimestamp: String = values(2) | |
// Extract view event time in seconds. | |
val viewTime: Double = timeFormatter | |
.parseDateTime(viewTimestamp) | |
.getMillis | |
.toDouble / 1000 | |
ViewEvent(userId,viewTime) | |
}) | |
.toDF // Convert to Spark DataFrame | |
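  // Note on the two aggregations below: agg(Map(...)) accepts only one aggregate per column
  // name, so each time column is duplicated into a *Copy column in order to compute both the
  // minimum and the maximum in a single pass.
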
  // Extract earliest and latest recorded purchase times.
  private val purchaseTimes: MinMaxTime = purchaseData
    .withColumn("purchaseTimeCopy", $"purchaseTime")
    .agg(Map("purchaseTime" -> "min", "purchaseTimeCopy" -> "max"))
    .map(row => {
      val minTime = row.getAs[Double]("min(purchaseTime)")
      val maxTime = row.getAs[Double]("max(purchaseTimeCopy)")
      MinMaxTime(minTime, maxTime)
    })
    .collect
    .apply(0)

  // Extract earliest and latest recorded view times.
  private val viewTimes: MinMaxTime = viewData
    .withColumn("viewTimeCopy", $"viewTime")
    .agg(Map("viewTime" -> "min", "viewTimeCopy" -> "max"))
    .map(row => {
      val minTime = row.getAs[Double]("min(viewTime)")
      val maxTime = row.getAs[Double]("max(viewTimeCopy)")
      MinMaxTime(minTime, maxTime)
    })
    .collect
    .apply(0)

  // Extract minimum and maximum activity times across purchase and view events.
  val minActivityTime: Double = min(purchaseTimes.minTime, viewTimes.minTime)
  val maxActivityTime: Double = max(purchaseTimes.maxTime, viewTimes.maxTime)

  // Constant denoting the length of 365 days in seconds.
  private val yearInSeconds: Double = 365 * 24 * 60 * 60

  // Generate activity time filter for removing users according to processing rules.
  val activityFilterUDF: (Double, Double, Double) => UserDefinedFunction = {
    (minActivityTime: Double, maxActivityTime: Double, yearInSeconds: Double) => functions.udf({
      (signupTime: Double) => {
        signupTime >= minActivityTime && maxActivityTime - signupTime >= yearInSeconds
      }
    })
  }
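  // In other words, a user is kept only if they signed up no earlier than the first recorded
  // activity and at least 365 days before the last recorded activity, so that a full year of
  // post-signup behavior is observable.
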
  // Generate user-defined column function for returning nonzero revenues within CV time limits.
  val cvUDF: Double => UserDefinedFunction = (yearInSeconds: Double) => functions.udf({
    (revenue: Double, purchaseTime: Double, signupTime: Double) => {
      val timeDiff: Double = purchaseTime - signupTime
      if (timeDiff >= 0 && timeDiff <= yearInSeconds) {
        revenue
      } else {
        0.0
      }
    }
  })
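  // Example: a purchase made 100 days after signup contributes its full revenue to customer
  // value, while a purchase made 400 days after signup (or any time before signup) contributes 0.0.
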
  // Get the pre-final data frame that satisfies activity and revenue time requirements.
  val validUsers: DataFrame = signupData
    // Only keep users that satisfy activity requirements.
    .filter(activityFilterUDF(minActivityTime, maxActivityTime, yearInSeconds)($"signupTime"))

  val preCVData: DataFrame = validUsers
    .join(purchaseData, validUsers("id") === purchaseData("userId"), "left_outer")
    .na
    .fill(Map("revenue" -> 0.0, "purchaseTime" -> 0.0))
    .drop("userId")
    // Restrict revenue to the 12-month time limit.
    .withColumn("customerValue", cvUDF(yearInSeconds)($"revenue", $"purchaseTime", $"signupTime"))

  val cvData: DataFrame = preCVData
    .groupBy("id")
    .agg(
      functions.first($"registerCountry").as("registerCountry"),
      functions.first($"dayOfYear").as("dayOfYear"),
      functions.first($"weekOfYear").as("weekOfYear"),
      functions.first($"monthOfYear").as("monthOfYear"),
      functions.sum($"customerValue").as("customerValue"))
    .join(adData, "id")
    .drop("id")
}

object Users {

  def main(args: Array[String]): Unit = {
    import scala.sys.process._

    printWrapper("Deleting old data.....")
    "sudo rm -rf <OUTPUT_FILE_PATH>/users.parquet/".!!
    printWrapper("............")
    "sudo rm -rf <OUTPUT_FILE_PATH>/customerValue.csv/".!!
    printWrapper("Done.")

    val sparkConf = Common.getSparkConf("Computing_CV")
    val sc: SparkContext = new SparkContext(sparkConf)
    val users: Users = new Users(sc)
    val cvData = users
      .cvData
      .cache

    printWrapper("Writing new data.....")
    cvData
      .write
      .parquet("<OUTPUT_FILE_PATH>/users.parquet")
    printWrapper("............")
    cvData
      .map(_.getAs[Double]("customerValue"))
      .saveAsTextFile("<OUTPUT_FILE_PATH>/customerValue.csv")
    printWrapper("New data written.")
  }
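  // Example submission (a sketch only; the jar name and master URL are assumptions specific
  // to the local build, not part of this gist):
  //   spark-submit --class conversions.Users --master <MASTER_URL> conversions-assembly.jar
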
  def printWrapper(any: Any): Unit = {
    println()
    println(any)
    println()
  }

  case class AdInfo(
      id: String,
      trafficSource: String,
      adCampaign: String,
      adMedium: String,
      searchKeyword: String,
      adContent: String)
    extends Serializable

  case class MinMaxTime(
      minTime: Double,
      maxTime: Double)
    extends Serializable

  case class PurchaseEvent(
      userId: String,
      revenue: Double,
      purchaseTime: Double)
    extends Serializable

  case class SignupEvent(
      id: String,
      registerCountry: String,
      signupTime: Double,
      dayOfYear: String,
      weekOfYear: String,
      monthOfYear: String)
    extends Serializable

  case class ViewEvent(
      userId: String,
      viewTime: Double)
    extends Serializable
}
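
// The Common helper referenced above (Common.timeFormatter, Common.getSparkConf) lives elsewhere
// in the conversions package and is not part of this file. A minimal sketch of what it is assumed
// to provide is given below, commented out so it does not clash with the real definition; the
// timestamp pattern and Spark settings are assumptions, not the author's actual configuration.
//
// object Common {
//   import org.apache.spark.SparkConf
//   import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}
//
//   // Assumed timestamp format for the CSV inputs.
//   val timeFormatter: DateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
//
//   // Assumed SparkConf factory keyed on the application name.
//   def getSparkConf(appName: String): SparkConf = new SparkConf().setAppName(appName)
// }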