Skip to content

Instantly share code, notes, and snippets.

@jayunit100
Created December 14, 2014 03:33
Show Gist options
  • Save jayunit100/c926562d09ae00df8fe7 to your computer and use it in GitHub Desktop.
Save jayunit100/c926562d09ae00df8fe7 to your computer and use it in GitHub Desktop.
cleanup
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.bigtop.bigpetstore.spark.generator
import com.github.rnowling.bps.datagenerator.datamodels.{Store,Customer,PurchasingProfile,Transaction}
import com.github.rnowling.bps.datagenerator.{DataLoader,StoreGenerator,CustomerGenerator => CustGen, PurchasingProfileGenerator,TransactionGenerator}
import com.github.rnowling.bps.datagenerator.framework.SeedFactory
import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import java.util.ArrayList
import scala.util.Random
import java.io.File
import java.util.Date
/**
 * Spark entry point for the BigPetStore data generator.
 *
 * Generates stores and customers on the driver, simulates each customer's
 * transactions in parallel on the executors, and writes one CSV line per
 * purchased product to `outputDir/transactions`.
 */
object SparkDriver {
  // Command-line configuration, populated by parseArgs before any data is generated.
  // NOTE: these are fields of a singleton object. On Spark executors the object is
  // initialized afresh with the defaults below, so never read them inside an RDD
  // closure -- copy the value into a local val on the driver first.
  private var nStores: Int = -1
  private var nCustomers: Int = -1
  private var simulationLength: Double = -1.0
  private var seed: Long = -1
  private var outputDir: String = ""

  /** Number of positional arguments when the optional seed is supplied. */
  private val NPARAMS = 5

  /** Prints command-line usage to stderr. */
  private def printUsage(): Unit = {
    val usage: String =
      "BigPetStore Data Generator.\n" +
      "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" +
      "outputDir - (string) directory to write files\n" +
      "nStores - (int) number of stores to generate\n" +
      "nCustomers - (int) number of customers to generate\n" +
      "simulationLength - (float) number of days to simulate\n" +
      "seed - (long) seed for RNG. If not given, one is randomly generated.\n"
    System.err.println(usage)
  }

  /**
   * Converts one positional argument with `parse`. If `parse` throws a
   * NumberFormatException, prints an error plus usage to stderr and exits with status 1.
   *
   * @param raw       the raw argument text
   * @param typeDesc  article + type name used in the error message, e.g. "an integer"
   * @param paramName parameter name used in the error message
   * @param parse     conversion function applied to `raw`
   * @return the converted value (never returns normally on a parse failure)
   */
  private def parseOrExit[T](raw: String, typeDesc: String, paramName: String)(parse: String => T): T =
    try parse(raw)
    catch {
      case _: NumberFormatException =>
        System.err.println("Unable to parse '" + raw + "' as " + typeDesc + " for " + paramName + ".\n")
        printUsage()
        sys.exit(1)
    }

  /**
   * Parses `outputDir nStores nCustomers simulationLength [seed]` into this object's
   * configuration fields. Prints usage and exits with status 1 on a wrong argument
   * count or an unparsable number.
   *
   * @param args raw command-line arguments
   */
  def parseArgs(args: Array[String]): Unit = {
    if (args.length != NPARAMS && args.length != (NPARAMS - 1)) {
      printUsage()
      System.exit(1)
    }
    outputDir = args(0)
    nStores = parseOrExit(args(1), "an integer", "nStores")(_.toInt)
    nCustomers = parseOrExit(args(2), "an integer", "nCustomers")(_.toInt)
    simulationLength = parseOrExit(args(3), "a float", "simulationLength")(_.toDouble)
    // Passing an explicit seed makes a run reproducible; otherwise a random one is drawn.
    seed =
      if (args.length == NPARAMS) parseOrExit(args(4), "a long", "seed")(_.toLong)
      else new Random().nextLong
  }

  /**
   * Builds an RDD of all pet-store transactions.
   *
   * Static data (stores, customers) is generated on the driver from `seed`.
   * Each partition of customers is then simulated on the executors with its own
   * SeedFactory derived from the driver's next seed and the partition index.
   * The RDD is cached and counted so the simulation runs exactly once.
   *
   * @param sc active SparkContext
   * @return the cached transaction RDD
   */
  def generateData(sc: SparkContext): RDD[Transaction] = {
    val inputData = new DataLoader().loadData()
    val seedFactory = new SeedFactory(seed)

    println("Generating stores...")
    // Kept as a java.util.ArrayList because it is handed to the Java generators.
    val stores = new ArrayList[Store]()
    val storeGenerator = new StoreGenerator(inputData, seedFactory)
    (1 to nStores).foreach(_ => stores.add(storeGenerator.generate()))
    println("Done.")

    println("Generating customers...")
    val custGen = new CustGen(inputData, stores, seedFactory)
    // Prepending yields newest-first order; this order determines which partition
    // (and therefore which seed) each customer is simulated with.
    val customers: List[Customer] =
      (1 to nCustomers).foldLeft(List.empty[Customer])((acc, _) => custGen.generate() :: acc)
    println("...Done generating customers.")

    println("Broadcasting stores and products...")
    val storesBC = sc.broadcast(stores)
    val productBC = sc.broadcast(inputData.getProductCategories())
    val customerRDD = sc.parallelize(customers)
    val nextSeed = seedFactory.getNextSeed()
    println("...Done broadcasting stores and products.")

    // Copy into a local so the closure below serializes the value. Reading the
    // object field inside the closure would resolve SparkDriver.simulationLength
    // on each executor, where parseArgs never ran and the field is still -1.0.
    val simLength = simulationLength

    println("Defining transaction DAG...")
    val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) =>
      // Deterministic per-partition seed source.
      val partitionSeeds = new SeedFactory(nextSeed ^ index)
      custIter.flatMap { customer =>
        val products = productBC.value
        val profile = new PurchasingProfileGenerator(products, partitionSeeds).generate()
        val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, partitionSeeds)
        // Generate until the first transaction beyond the simulation window, which
        // is discarded. Results are accumulated newest-first.
        Iterator.continually(transGen.generate())
          .takeWhile(_.getDateTime() < simLength)
          .foldLeft(List.empty[Transaction])((acc, t) => t :: acc)
      }
    }
    println("...Done defining transaction DAG.")

    println("Generating transactions...")
    // Cache before counting. Otherwise both this count and the later save would
    // recompute the entire simulation from scratch.
    transactionRDD.cache()
    val nTrans = transactionRDD.count()
    println(s"... Done Generating $nTrans transactions.")

    // Return the RDD representing all the petstore transactions.
    transactionRDD
  }

  /**
   * Writes one CSV line per purchased product to `outputDir/transactions`.
   *
   * Column order: store id, store zipcode, store city, store state, customer id,
   * first name, last name, customer zipcode, customer city, customer state,
   * transaction id, date, product.
   *
   * @param transactionRDD transactions to serialize
   */
  def writeData(transactionRDD: RDD[Transaction]): Unit = {
    // Simulation times are day offsets; anchor them to the wall-clock time of this run.
    val initialDate: Long = new Date().getTime()

    // flatMap (not map): each transaction expands to one text line per product,
    // rather than one line holding the whole collection's toString.
    val transactionStringsRDD = transactionRDD.flatMap { t =>
      val customer = t.getCustomer()
      val store = t.getStore()
      val name = customer.getName()
      val custLocation = customer.getLocation()
      val storeLocation = store.getLocation()
      // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
      val dateMS = (t.getDateTime() * 24.0 * 60.0 * 60.0 * 1000.0).toLong
      val date = new Date(initialDate + dateMS)

      // Columns shared by every product line of this transaction.
      val prefix = List[Any](
        store.getId(),
        storeLocation.getZipcode(),
        storeLocation.getCity(),
        storeLocation.getState(),
        customer.getId(),
        name.getFirst(),
        name.getSecond(),
        custLocation.getZipcode(),
        custLocation.getCity(),
        custLocation.getState(),
        t.getId(),
        date
      ).mkString(",")

      t.getProducts().map(p => prefix + "," + p)
    }

    // TODO: Confirm this works on any HCFS and not just the local filesystem.
    transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
  }

  /**
   * Parses arguments, generates the data set and writes it out.
   * See printUsage for the expected arguments.
   *
   * @param args `outputDir nStores nCustomers simulationLength [seed]`
   */
  def main(args: Array[String]): Unit = {
    parseArgs(args)
    val conf = new SparkConf().setAppName("BPS Data Generator")
    val sc = new SparkContext(conf)
    // Always release the SparkContext, even if generation or writing fails.
    try {
      writeData(generateData(sc))
    } finally {
      sc.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment