import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import scala.io.Source.fromFile
import scala.util.Random
import scala.util.parsing.json.JSON
object DBGenMain2 {

  val master = "local[2]"
  val appName = "testing"

  val conf: SparkConf =
    new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.default.parallelism", 2.toString)

  val r = scala.util.Random

  // One random-value generator per supported schema type name.
  val generators: Map[String, () => Any] = Map(
    "INT" -> (() => r.nextInt(100)),
    "STRING" -> (() => Random.alphanumeric.take(8).mkString("")),
    "DOUBLE" -> (() => r.nextDouble()),
    "LONG" -> (() => r.nextLong())
  )

  // Maps the type names used in the JSON schema files to Spark SQL types.
  val mapStringToSQLType = Map(
    "INT" -> IntegerType,
    "STRING" -> StringType,
    "DOUBLE" -> DoubleType,
    "LONG" -> LongType
  )
  // A parsed schema file is a simple map from column name to type name.
  type JSONMap = Map[String, String]

  // Parses "--key=value" arguments into a Map(key -> value).
  def argsParser(args: Array[String]): Map[String, String] =
    args.map { arg =>
      val Array(key, value) = arg.stripPrefix("--").split("=", 2)
      key -> value
    }.toMap
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(conf).getOrCreate()

    val argsMap = argsParser(args)
    val schemaBasePath = argsMap("schemaBasePath") // /Users/$user/workspace/test_data/schemas
    val outputBasePath = argsMap("outputBasePath") // /Users/$user/workspace/test_data/output
    val numRecords = argsMap("numRecords").toInt

    // e.g. --tables=members,claims -> Map("members" -> ".../members.json", "claims" -> ".../claims.json")
    val tableFiles = argsMap("tables").split(",").map { tableName =>
      (tableName, s"${schemaBasePath}/${tableName}.json")
    }.toMap

    // Parse each schema file into a column-name -> type-name map.
    val jsonSchemasForTables: Map[String, JSONMap] = tableFiles.map { case (tableName, path) =>
      (tableName, JSON.parseFull(fromFile(path).getLines().mkString).get.asInstanceOf[JSONMap])
    }

    jsonSchemasForTables.map {
      case (tableName, json) =>
        // Generate numRecords rows of random data matching the schema.
        val rowRDD = spark.sparkContext.parallelize(1 to numRecords).map { _ =>
          Row.fromSeq(generateRowForSchemaType(json))
        }
        val structFields: Array[StructField] = json.map { case (fieldName, typeName) =>
          StructField(fieldName, mapStringToSQLType(typeName))
        }.toArray
        val structSchema = new StructType(structFields)
        (tableName, spark.createDataFrame(rowRDD, structSchema))
    }.foreach {
      case (tableName, df) =>
        // "sep" (not "separator") is the Spark CSV option for the field delimiter.
        df.write
          .option("sep", "\u0001")
          .option("header", "true")
          .mode(SaveMode.Overwrite)
          .csv(s"$outputBasePath/$tableName/")
    }
  }
  // Produces one value per column, iterating the same map instance used to build the
  // StructType in main, so the generated Row lines up with the schema fields.
  private def generateRowForSchemaType(json: JSONMap): Seq[Any] =
    json.values.map(typeName => generators(typeName)()).toSeq
}
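Supporting another column type only requires an entry in both maps. As a sketch (BOOLEAN is not in the original type list; it is shown here purely as a hypothetical addition):

"BOOLEAN" -> (() => r.nextBoolean())   // in generators
"BOOLEAN" -> BooleanType               // in mapStringToSQLType (BooleanType comes from org.apache.spark.sql.types)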
To invoke: jarName.jar --schemaBasePath=/Users/atif/workspace/test_data/schemas --outputBasePath=/Users/atif/workspace/test_data/output --tables=members,claims --numRecords=10000000
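Assuming the application is packaged as jarName.jar with the Spark dependencies available (the jar name and paths above are placeholders), a typical spark-submit call might look like:

spark-submit --class DBGenMain2 jarName.jar --schemaBasePath=/Users/atif/workspace/test_data/schemas --outputBasePath=/Users/atif/workspace/test_data/output --tables=members,claims --numRecords=10000000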
Where members.json is
{
"member_id": "INT",
"member_name": "STRING",
"member_insured_amt": "LONG"
}
And claims.json is
{
"claims_id": "INT",
"claims_member_id": "INT",
"claims_description": "STRING",
"claims_amount": "LONG"
}
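To spot-check the output, the generated CSVs can be read back with the same separator and header settings. A minimal sketch, assuming spark and outputBasePath are in scope as in main:

val membersDF = spark.read
  .option("sep", "\u0001")
  .option("header", "true")
  .option("inferSchema", "true") // optional: recover numeric types instead of reading everything as strings
  .csv(s"$outputBasePath/members/")

println(membersDF.count()) // should equal numRecords
membersDF.show(5)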