Create a Spark application, written in Scala or PySpark, that reads the provided signals dataset, processes the data, and stores the entire output as specified below.
For each entity_id in the signals dataset, find the item_id with the oldest month_id and the item_id with the newest month_id. In some cases these may be the same item. If two different items share the same month_id, take the item with the lower item_id. Finally, sum the signal counts for each entity and output the result as total_signals. The correct output contains exactly one row per unique entity_id.
- Create a Scala SBT project or a PySpark project (if you know Scala, please use it, as Scala submissions are given higher preference).
- Use the Spark Scala/PySpark API with DataFrames/Datasets.
- Please do not use Spark SQL with a SQL string!
- If you write spark.sql("select ..."), you are doing it wrong!
- Produce a single Parquet output file, regardless of parallelism during processing (see the sketch after this list).
- Hint: use window analytic functions to compute the final output in a single query (one possible shape is sketched after the code skeleton at the end).
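One common way to satisfy the single-output-file requirement is to collapse the final result to one partition before writing. This is only a sketch: it assumes the final result is an ordinary DataFrame, and the object and method names here are made up, not part of the assignment.

import org.apache.spark.sql.DataFrame

object SingleFileWriter {
  // Sketch only: coalesce(1) collapses the result to a single partition, so the
  // write produces exactly one Parquet part file regardless of how many
  // partitions were used during processing. Names and path are placeholders.
  def writeSingleParquet(result: DataFrame, path: String): Unit =
    result.coalesce(1).write.mode("overwrite").parquet(path)
}

repartition(1) would also yield a single file, but it forces a full shuffle; coalesce(1) is usually sufficient here because the aggregated result is small.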
Input schema (signals dataset):
entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer
Output schema:
entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer
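If you use the Dataset API mentioned in the requirements, the two schemas above map naturally onto case classes along these lines (the class names are illustrative, not part of the assignment):

// Illustrative only: field names and types follow the schemas above, so the
// default encoders can map columns to fields by name.
case class Signal(
  entity_id: Long,
  item_id: Int,
  source: Int,
  month_id: Int,
  signal_count: Int
)

case class EntitySignals(
  entity_id: Long,
  oldest_item_id: Int,
  newest_item_id: Int,
  total_signals: Int
)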
Sample output:
+----------+--------------+--------------+-------------+
| entity_id|oldest_item_id|newest_item_id|total_signals|
+----------+--------------+--------------+-------------+
| 190979372|             2|             1|           20|
| 220897278|             2|             1|           66|
|1146753107|             2|             0|           22|
| 224619015|             0|             3|           12|
| 118083189|             5|             3|          234|
|    162371|             4|             2|           29|
|    555304|             0|             2|            9|
| 118634684|             2|             3|           17|
| 213956643|         10000|             1|           17|
+----------+--------------+--------------+-------------+
Please submit the entire Scala SBT / PySpark project with all code necessary to build and run the app. You can bundle it as a zip, a tarball, or a GitHub link. Also include a copy of the output file generated by running the app.
package pack

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

object obj {
  def main(args: Array[String]): Unit = {
    // TODO: build the SparkSession, read the signals dataset, compute
    // oldest_item_id, newest_item_id and total_signals per entity_id with
    // window functions, and write a single Parquet output file.
    // One possible shape is sketched below.
  }
}
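Filling in the skeleton above, main could take roughly the following shape. This is a rough sketch, not a definitive implementation: the input and output paths and the app name are placeholders, the sketch assumes the dataset is readable as Parquet (adjust the reader to the actual format), the column names come from the schemas above, and the secondary sort on item_id implements the lower-item_id tie-break.

package pack

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object obj {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("signals-app").getOrCreate()

    // Placeholder path; point this at the provided signals dataset.
    val signals = spark.read.parquet("data/signals")

    // All three output columns come from per-entity windows, so the whole
    // result is produced by a single query, as the hint suggests.
    val byEntity = Window.partitionBy("entity_id")
    // Oldest item: lowest month_id, ties broken by the lower item_id.
    val oldestFirst = byEntity.orderBy(col("month_id").asc, col("item_id").asc)
    // Newest item: highest month_id, ties still broken by the lower item_id.
    val newestFirst = byEntity.orderBy(col("month_id").desc, col("item_id").asc)

    val result = signals
      .withColumn("oldest_item_id", first("item_id").over(oldestFirst))
      .withColumn("newest_item_id", first("item_id").over(newestFirst))
      .withColumn("total_signals", sum("signal_count").over(byEntity).cast("int"))
      .select("entity_id", "oldest_item_id", "newest_item_id", "total_signals")
      .distinct() // one row per entity_id

    // Single Parquet output file regardless of processing parallelism.
    result.coalesce(1).write.mode("overwrite").parquet("output/entity_signals")

    spark.stop()
  }
}

A group-by with min/max over (month_id, item_id) structs is another common shape, though the tie-break on the lower item_id needs extra care there; the window version is shown because the hint explicitly asks for window analytics functions.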