/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.bigtop.bigpetstore.spark.generator

import com.github.rnowling.bps.datagenerator.datamodels.inputs.ZipcodeRecord
import com.github.rnowling.bps.datagenerator.datamodels._
import com.github.rnowling.bps.datagenerator.{DataLoader, StoreGenerator, CustomerGenerator => CustGen, PurchasingProfileGenerator, TransactionGenerator}
import com.github.rnowling.bps.datagenerator.framework.SeedFactory
import scala.collection.JavaConversions._
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._
import org.apache.spark.rdd._
import java.util.ArrayList
import scala.util.Random
import java.io.File
import java.util.Date

object SparkDriver {
  private var nStores: Int = -1
  private var nCustomers: Int = -1
  private var simulationLength: Double = -1.0
  private var seed: Long = -1
  private var outputDir: String = ""
  private val NPARAMS = 5

  private def printUsage() {
    val usage: String =
      "BigPetStore Data Generator.\n" +
      "Usage: spark-submit ... outputDir nStores nCustomers simulationLength [seed]\n" +
      "outputDir - (string) directory to write files\n" +
      "nStores - (int) number of stores to generate\n" +
      "nCustomers - (int) number of customers to generate\n" +
      "simulationLength - (float) number of days to simulate\n" +
"seed - (long) seed for RNG. If not given, one is reandomly generated.\n" | |
    System.err.println(usage)
  }
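
  /*
   * Illustrative invocation (a sketch: the jar name and output path below are
   * assumptions, not taken from this project's build):
   *
   *   spark-submit --class org.apache.bigtop.bigpetstore.spark.generator.SparkDriver \
   *     bigpetstore-spark.jar /tmp/bps-transactions 10 1000 365.0 42
   */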

  // TODO: is this argument-parsing style acceptable, or should it be cleaned up?
  def parseArgs(args: Array[String]) {
    if (args.length != NPARAMS && args.length != (NPARAMS - 1)) {
      printUsage()
      System.exit(1)
    }
    outputDir = args(0)
    try {
      nStores = args(1).toInt
    } catch {
      case _: NumberFormatException =>
        System.err.println("Unable to parse '" + args(1) + "' as an integer for nStores.\n")
        printUsage()
        System.exit(1)
    }
    try {
      nCustomers = args(2).toInt
    } catch {
      case _: NumberFormatException =>
        System.err.println("Unable to parse '" + args(2) + "' as an integer for nCustomers.\n")
        printUsage()
        System.exit(1)
    }
    try {
      simulationLength = args(3).toDouble
    } catch {
      case _: NumberFormatException =>
        System.err.println("Unable to parse '" + args(3) + "' as a float for simulationLength.\n")
        printUsage()
        System.exit(1)
    }
    // Optional seed: supplying the same seed reproduces the same generated data.
    if (args.length == NPARAMS) {
      try {
        seed = args(4).toLong
      } catch {
        case _: NumberFormatException =>
          System.err.println("Unable to parse '" + args(4) + "' as a long for seed.\n")
          printUsage()
          System.exit(1)
      }
    } else {
      seed = (new Random()).nextLong
    }
  }

  /**
   * Here we generate an RDD of all the petstore transactions,
   * by generating the static data first (stores, customers, ...)
   * followed by running the simulation as a distributed Spark task.
   */
  def generateData(sc: SparkContext): RDD[Transaction] = {
    val inputData = new DataLoader().loadData()
    val seedFactory = new SeedFactory(seed)

    println("Generating stores...")
    val stores: ArrayList[Store] = new ArrayList()
    val storeGenerator = new StoreGenerator(inputData, seedFactory)
    for (i <- 1 to nStores) {
      val store = storeGenerator.generate()
      stores.add(store)
    }
    println("Done.")

    println("Generating customers...")
    var customers: List[Customer] = List()
    val custGen = new CustGen(inputData, stores, seedFactory)
    for (i <- 1 to nCustomers) {
      val customer = custGen.generate()
      customers = customer :: customers
    }
    println("...Done generating customers.")
println("Broadcasting stores and products...") | |
val storesBC = sc.broadcast(stores) | |
val productBC = sc.broadcast(inputData.getProductCategories()) | |
val customerRDD = sc.parallelize(customers) | |
val nextSeed = seedFactory.getNextSeed() | |
println("...Done broadcasting stores and products.") | |
println("Defining transaction DAG...") | |
    /**
     * Each partition builds its own SeedFactory from the driver-side seed
     * XOR'd with the partition index, so a given seed reproduces the same
     * data while each partition still draws a distinct random stream.
     */
    val transactionRDD = customerRDD.mapPartitionsWithIndex { (index, custIter) =>
      val seedFactory = new SeedFactory(nextSeed ^ index)
      val transactionIter = custIter.map { customer =>
        val products = productBC.value
        val profileGen = new PurchasingProfileGenerator(products, seedFactory)
        val profile = profileGen.generate()
        val transGen = new TransactionGenerator(customer, profile, storesBC.value, products, seedFactory)
        var transactions: List[Transaction] = List()
        var transaction = transGen.generate()
        while (transaction.getDateTime() < simulationLength) {
          transactions = transaction :: transactions
          transaction = transGen.generate()
        }
        // Output: the list of transactions produced for this customer.
        transactions
      }
      transactionIter
    }.flatMap(s => s)
    println("...Done defining transaction DAG.")

    println("Generating transactions...")
    // Forces RDD materialization.
    val nTrans = transactionRDD.count()
    println(s"... Done Generating $nTrans transactions.")

    /**
     * Return the RDD representing all the petstore transactions.
     * This RDD contains a distributed collection of instances where
     * a customer went to a pet store, and bought a variable number of items.
     * We can then serialize all the contents to disk.
     */
    transactionRDD
  }
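
  /**
   * Renders one purchased product from a transaction as a single CSV line:
   * storeId, storeZip, storeCity, storeState, customerId, "first last",
   * custZip, custCity, custState, transactionId, date, product.
   */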
  def lineItem(t: Transaction, name: Pair,
               storeLocation: ZipcodeRecord, custLocation: ZipcodeRecord,
               date: Date, p: Product): String = {
    t.getStore().getId() + "," +
      storeLocation.getZipcode() + "," +
      storeLocation.getCity() + "," +
      storeLocation.getState() + "," +
      t.getCustomer().getId() + "," +
      name.getFirst() + " " + name.getSecond() + "," +
      custLocation.getZipcode() + "," +
      custLocation.getCity() + "," +
      custLocation.getState() + "," +
      t.getId() + "," +
      date + "," + p
  }

  def writeData(transactionRDD: RDD[Transaction]) {
    val initialDate: Long = new Date().getTime()
    val transactionStringsRDD = transactionRDD.map { t =>
      val products = t.getProducts()
      /*********************************************************
       * Map all the products bought in the single transaction
       * over a function which converts each record to a string.
       * NOTE: one transaction includes many products.
       * Map over each, and write out the metadata w/ transaction ID.
       *********************************************************/
      val records = products.map { p =>
        val name = t.getCustomer().getName()
        val custLocation = t.getCustomer().getLocation()
        val storeLocation = t.getStore().getLocation()
        // days -> milliseconds = days * 24 h/day * 60 min/h * 60 s/min * 1000 ms/s
        val dateMS = (t.getDateTime * 24.0 * 60.0 * 60.0 * 1000.0).toLong
        val date = new Date(initialDate + dateMS)
        // Return a stringified "line item", which represents a single item bought.
        lineItem(t, name, storeLocation, custLocation, date, p)
      }
      records
    }
    // TODO: Confirm this works on any HCFS and not just the local fs.
    transactionStringsRDD.saveAsTextFile(outputDir + "/transactions")
  }

  /**
   * Write generated data. For configuration options,
   * see above.
   * @param args
   */
  def main(args: Array[String]) {
    parseArgs(args)
    val conf = new SparkConf().setAppName("BPS Data Generator")
    val sc = new SparkContext(conf)
    val transactionRDD = generateData(sc)
    writeData(transactionRDD)
    sc.stop()
  }
}