Generate Spark data for a given JSON/AVSC schema
import org.apache.spark.SparkConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import scala.io.Source.fromFile
import scala.util.Random
import scala.util.parsing.json.JSON

// Generates random test data for each table schema passed on the command line
object DBGenMain2 {
  val master = "local[2]"
  val appName = "testing"

  val conf: SparkConf =
    new SparkConf()
      .setMaster(master)
      .setAppName(appName)
      .set("spark.sql.sources.partitionOverwriteMode", "dynamic")
      .set("spark.default.parallelism", 2.toString)
  val r = scala.util.Random

  // Random value generators keyed by the type names used in the schema files
  val generators = Map[String, () => Any](
    "INT" -> (() => r.nextInt(100)),
    "STRING" -> (() => Random.alphanumeric.take(8).mkString("")),
    "DOUBLE" -> (() => r.nextDouble()),
    "LONG" -> (() => r.nextLong())
  )

  // Map schema type names to their Spark SQL column types
  val mapStringToSQLType = Map(
    "INT" -> IntegerType,
    "STRING" -> StringType,
    "DOUBLE" -> DoubleType,
    "LONG" -> LongType
  )

  type JSONMap = Map[String, String]

  // Parse arguments of the form --key=value into a Map(key -> value)
  def argsParser(args: Array[String]): Map[String, String] = {
    args.map { arg =>
      val Array(key, value) = arg.stripPrefix("--").split("=", 2)
      key -> value
    }.toMap
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().config(conf).getOrCreate()
    val argsMap = argsParser(args)
    val schemaBasePath = argsMap("schemaBasePath") // /Users/$user/workspace/test_data/schemas
    val outputBasePath = argsMap("outputBasePath") // /Users/$user/workspace/test_data/output
    val numRecords = argsMap("numRecords").toInt
    val tableFiles = argsMap("tables").split(",").map { tableName => (tableName, s"$schemaBasePath/$tableName.json") }.toMap // e.g. --tables=members,claims
    // Read each schema file and parse it into a Map of field name -> type name
    val jsonSchemasForTables: Map[String, JSONMap] = tableFiles.map { case (tableName, path) =>
      (tableName, JSON.parseFull(fromFile(path).getLines.mkString).get.asInstanceOf[JSONMap])
    }

    // For each table, build an RDD of random rows and wrap it in a DataFrame with the derived schema
    jsonSchemasForTables.map { case (tableName, json) =>
        // One Row per record, with a random value generated for every field in the schema
        val rowRDD = spark.sparkContext.parallelize(1 to numRecords).map { _ =>
          Row.fromSeq(generateRowForSchemaType(json))
        }

        // Derive the Spark SQL schema from the field-name -> type-name mapping
        val structFields: Array[StructField] = json.map { case (fieldName, typeName) =>
          StructField(fieldName, mapStringToSQLType(typeName))
        }.toArray

        val structSchema = new StructType(structFields)

        (tableName, spark.createDataFrame(rowRDD, structSchema))
    }.foreach {
      case (tableName, df) =>
        df.write.option("separator", "\u0001").mode(SaveMode.Overwrite).option("header", "true").csv(s"$outputBasePath/$tableName/")
    }

    spark.stop()
  }

  // Produce one row's worth of random values, in the same field order as the schema map
  private def generateRowForSchemaType(json: JSONMap): Seq[Any] = {
    json.values.map(typeName => generators(typeName)()).toSeq
  }
}
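
To sanity-check the output, the generated files can be read back with the same separator and header options. A minimal sketch, assuming the members table was written under the output path used above (object and path names are hypothetical):

import org.apache.spark.sql.SparkSession

object VerifyGeneratedData {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("verify").getOrCreate()

    // Hypothetical path: outputBasePath/tableName as written by the generator above
    val membersDF = spark.read
      .option("sep", "\u0001")
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("/Users/atif/workspace/test_data/output/members/")

    membersDF.printSchema()
    println(s"Row count: ${membersDF.count()}")

    spark.stop()
  }
}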

To Invoke: jarName.jar --schemaBasePath=/Users/atif/workspace/test_data/schemas --outputBasePath=/Users/atif/workspace/test_data/output --tables=members,claims --numRecords=10000000
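
With that command line, argsParser yields a plain String-to-String map; a sketch of the parsed result:

// Parsed arguments for the invocation above
val argsMap = Map(
  "schemaBasePath" -> "/Users/atif/workspace/test_data/schemas",
  "outputBasePath" -> "/Users/atif/workspace/test_data/output",
  "tables" -> "members,claims",
  "numRecords" -> "10000000"
)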

Where members.json is

{
	"member_id": "INT",
	"member_name": "STRING",
	"member_insured_amt": "LONG"
}

And claims.json is

{
	"claims_id": "INT",
	"claims_member_id": "INT",
	"claims_description": "STRING",
	"claims_amount": "LONG"

}
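
For reference, mapStringToSQLType would turn members.json into roughly the following Spark SQL schema (a sketch; field order follows the parsed Map's iteration order and nullability defaults to true):

import org.apache.spark.sql.types._

// Approximate schema derived from members.json
val membersSchema = StructType(Array(
  StructField("member_id", IntegerType),
  StructField("member_name", StringType),
  StructField("member_insured_amt", LongType)
))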