Created
April 10, 2017 07:37
-
-
Save yuta-imai/1412a7dd38a1a2d5805cf31ebffd53db to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package imaifactory.spark | |
import com.google.gson.{Gson, JsonElement, JsonObject} | |
import org.apache.spark.SparkConf | |
import org.apache.spark.serializer.KryoSerializer | |
import org.apache.spark.sql.SparkSession | |
object YarnProblemRepro extends App { | |
val conf = new SparkConf().setAppName("yarnIssueTester") | |
conf.set("spark.serializer", classOf[KryoSerializer].getName) | |
var jsonStringList = List( | |
""" | |
[ | |
{"name": "Alice", "score": 2 }, | |
{"name": "Alice", "score": 1 }, | |
{"name": "Bob", "score": 2 } | |
] | |
""".stripMargin, | |
""" | |
[ | |
{"name": "Alice", "score": 2 }, | |
{"name": "Alice", "score": 1 }, | |
{"name": "Bob", "score": 2 } | |
] | |
""".stripMargin) | |
case class Record ( name: String, B: Int ) | |
val spark = SparkSession.builder.config(conf).enableHiveSupport().getOrCreate() | |
val sc = spark.sparkContext | |
import spark.implicits._ | |
val rddOfString = sc.parallelize(jsonStringList) | |
val rddOfRecords = rddOfString.flatMap(jsonString => { | |
val gson = new Gson | |
val jsonArray = gson.fromJson(jsonString,classOf[Array[JsonObject]]) | |
jsonArray.map(record => { | |
val name = record.get("name") match { | |
case x:JsonElement => x.getAsString | |
case null => "-" | |
} | |
val score = record.get("score") match { | |
case x:JsonElement => x.getAsInt | |
case null => 0 | |
} | |
Record(name, score) | |
}).toIterator | |
}) | |
val df = rddOfRecords.toDF | |
df.groupBy("name").count.show | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment