/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bigtop.bigpetstore.spark.generator

import com.github.rnowling.bps.datagenerator.datamodels.{Store, Customer, PurchasingProfile, Transaction}
import com.github.rnowling.bps.datagenerator.{DataLoader, StoreGenerator, CustomerGenerator => CustGen, PurchasingProfileGenerator, TransactionGenerator}
import com.github.rnowling.bps.datagenerator.framework.SeedFactory
import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import java.util.ArrayList
import scala.util.Random
import java.io.File
import java.util.Date
object SparkDriver {
  private var nStores: Int = -1
  private var nCustomers: Int = -1
  private var simulationLength: Double = -1.0
  private var seed: Long = -1
  private var outputDir: String = ""
  private val NPARAMS = 5
  private def printUsage() {
    val usage: String =
      "BigPetStore Data Generator.\n" +
      "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" +
      "outputDir - (string) directory to write files\n" +
      "nStores - (int) number of stores to generate\n" +
      "nCustomers - (int) number of customers to generate\n" +
      "simulationLength - (float) number of days to simulate\n" +
      "seed - (long) seed for RNG. If not given, one is randomly generated.\n"
    System.err.println(usage)
  }
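  // Example invocation (the jar name and output path below are illustrative;
  // substitute the artifact produced by your build):
  //   spark-submit --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver \
  //     bigpetstore-spark.jar /tmp/bps-data 10 1000 365.0 42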
  def parseArgs(args: Array[String]) {
    if(args.length != NPARAMS && args.length != (NPARAMS - 1)) {
      printUsage()
      System.exit(1)
    }

    outputDir = args(0)

    try {
      nStores = args(1).toInt
    }
    catch {
      case _ : NumberFormatException =>
        System.err.println("Unable to parse '" + args(1) + "' as an integer for nStores.\n")
        printUsage()
        System.exit(1)
    }

    try {
      nCustomers = args(2).toInt
    }
    catch {
      case _ : NumberFormatException =>
        System.err.println("Unable to parse '" + args(2) + "' as an integer for nCustomers.\n")
        printUsage()
        System.exit(1)
    }

    try {
      simulationLength = args(3).toDouble
    }
    catch {
      case _ : NumberFormatException =>
        System.err.println("Unable to parse '" + args(3) + "' as a float for simulationLength.\n")
        printUsage()
        System.exit(1)
    }
    // Optional seed: supplying one makes the generated data reproducible across runs.
    if(args.length == NPARAMS) {
      try {
        seed = args(4).toLong
      }
      catch {
        case _ : NumberFormatException =>
          System.err.println("Unable to parse '" + args(4) + "' as a long for seed.\n")
          printUsage()
          System.exit(1)
      }
    }
    else {
      seed = (new Random()).nextLong
    }
  }
  /**
   * Here we generate an RDD of all the petstore transactions,
   * by generating the static data first (stores, customers, ...)
   * followed by running the simulation as a distributed spark task.
   */
  def generateData(sc: SparkContext): RDD[Transaction] = {
    val inputData = new DataLoader().loadData()
    val seedFactory = new SeedFactory(seed)

    println("Generating stores...")
    val stores : ArrayList[Store] = new ArrayList()
    val storeGenerator = new StoreGenerator(inputData, seedFactory)
    for(i <- 1 to nStores) {
      val store = storeGenerator.generate()
      stores.add(store)
    }
    println("Done.")

    println("Generating customers...")
    var customers: List[Customer] = List()
    val custGen = new CustGen(inputData, stores, seedFactory)
    for(i <- 1 to nCustomers) {
      val customer = custGen.generate()
      customers = customer :: customers
    }
    println("...Done generating customers.")
println("Broadcasting stores and products...") | |
val storesBC = sc.broadcast(stores) | |
val productBC = sc.broadcast(inputData.getProductCategories()) | |
val customerRDD = sc.parallelize(customers) | |
val nextSeed = seedFactory.getNextSeed() | |
println("...Done broadcasting stores and products.") | |
println("Defining transaction DAG...") | |
    val transactionRDD = customerRDD.mapPartitionsWithIndex {
      (index, custIter) =>
        val seedFactory = new SeedFactory(nextSeed ^ index)
        val transactionIter = custIter.map {
          customer =>
            val products = productBC.value
            val profileGen = new PurchasingProfileGenerator(products, seedFactory)
            val profile = profileGen.generate()
            val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, seedFactory)
            var transactions : List[Transaction] = List()
            var transaction = transGen.generate()
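            // getDateTime() is in simulated days; keep generating transactions for
            // this customer until the simulation window (simulationLength days) ends.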
            while(transaction.getDateTime() < simulationLength) {
              transactions = transaction :: transactions
              transaction = transGen.generate()
            }
            transactions
        }
        transactionIter
    }.flatMap( s => s)
    println("...Done defining transaction DAG.")

    println("Generating transactions...")
    // forces RDD materialization.
    val nTrans = transactionRDD.count()
    println(s"... Done Generating $nTrans transactions.")

    /**
     * Return the RDD representing all the petstore transactions.
     */
    transactionRDD
  }
  def writeData(transactionRDD : RDD[Transaction]) {
    val initialDate : Long = new Date().getTime()

    val transactionStringsRDD = transactionRDD.map {
      t =>
        val products = t.getProducts()

        // map all the products bought in the transaction
        // over a function which converts the record to a
        // string.
        val records = products.map {
          p =>
            val name = t.getCustomer().getName()
            val custLocation = t.getCustomer().getLocation()
            val storeLocation = t.getStore().getLocation()

            // days -> milliseconds = days * 24 h / day * 60 min / hr * 60 sec / min * 1000 ms / sec
            val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
            val date = new Date(initialDate + dateMS)
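            // Emit one comma-separated record per product:
            // storeId, store zip/city/state, customerId, first name, last name,
            // customer zip/city/state, transactionId, date, product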
var record = "" | |
record += t.getStore().getId() + "," | |
record += storeLocation.getZipcode() + "," | |
record += storeLocation.getCity() + "," | |
record += storeLocation.getState() + "," | |
record += t.getCustomer().getId() + "," | |
record += name.getFirst() + "," + name.getSecond() + "," | |
record += custLocation.getZipcode() + "," | |
record += custLocation.getCity() + "," | |
record += custLocation.getState() + "," | |
record += t.getId() + "," | |
record += date + "," | |
record += p | |
record | |
} | |
records | |
} | |
//TODO: Confirm this works on any HCFS and not just local fs. | |
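    // saveAsTextFile writes one part-* file per RDD partition under outputDir/transactions.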
    transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
  }
  /**
   * Generate the data and write it out. For configuration options,
   * see printUsage().
   * @param args command-line arguments, parsed by parseArgs()
   */
  def main(args: Array[String]) {
    parseArgs(args)

    val conf = new SparkConf().setAppName("BPS Data Generator")
    val sc = new SparkContext(conf)

    val transactionRDD = generateData(sc)
    writeData(transactionRDD)

    sc.stop()
  }
}