Create a Spark application, written in Scala or PySpark, that reads the provided signals dataset, processes the data, and stores the entire output as specified below.
For each entity_id in the signals dataset, find the item_id with the oldest month_id and the item_id with the newest month_id. In some cases these may be the same item. If two different items share the same month_id, take the item with the lower item_id. Finally, sum the signal_count for each entity and output it as total_signals. The correct output should contain one row per unique entity_id.
- Create a Scala SBT project or a PySpark project (if you know Scala, please use it, as we give higher preference to Scala submissions).
- Use the Spark Scala/PySpark API with DataFrames/Datasets.
- Please do not use Spark SQL with a SQL string!
- If you write spark.sql("select ..."), you are doing it wrong!!
- Produce a single Parquet output file, regardless of parallelism during processing.
- Hint: Use window analytics functions to compute the final output in a single query (see the sketch below).
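As a rough illustration of that hint (one possible shape, not a required API; the names oldest_w and newest_w are ours), the tie-break rule can be encoded directly in the window ordering:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Oldest month first; a tie on month_id falls back to the lower item_id.
oldest_w = Window.partitionBy("entity_id").orderBy(
    F.col("month_id").asc(), F.col("item_id").asc()
)
# Newest month first; a tie on month_id still falls back to the lower item_id.
newest_w = Window.partitionBy("entity_id").orderBy(
    F.col("month_id").desc(), F.col("item_id").asc()
)

F.first("item_id") evaluated over each window then yields the oldest and newest item per entity, and a plain Window.partitionBy("entity_id") with F.sum("signal_count") gives total_signals.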
Input schema (signals dataset):
entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer
Output schema:
entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer
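When reading the input, it can help to declare the input schema explicitly rather than relying on inference. A minimal sketch (the name signals_schema is illustrative):

from pyspark.sql.types import StructType, StructField, LongType, IntegerType

# Explicit Spark schema mirroring the input layout above.
signals_schema = StructType([
    StructField("entity_id", LongType()),
    StructField("item_id", IntegerType()),
    StructField("source", IntegerType()),
    StructField("month_id", IntegerType()),
    StructField("signal_count", IntegerType()),
])

This can be passed to the reader, e.g. spark.read.csv(input_path, schema=signals_schema, header=True).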
Example output:
+----------+--------------+--------------+-------------+
| entity_id|oldest_item_id|newest_item_id|total_signals|
+----------+--------------+--------------+-------------+
| 190979372|             2|             1|           20|
| 220897278|             2|             1|           66|
|1146753107|             2|             0|           22|
| 224619015|             0|             3|           12|
| 118083189|             5|             3|          234|
|    162371|             4|             2|           29|
|    555304|             0|             2|            9|
| 118634684|             2|             3|           17|
| 213956643|         10000|             1|           17|
+----------+--------------+--------------+-------------+
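One way to sanity-check a run against the table above is to read the generated Parquet file back and display it (the path is illustrative):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Verify Output").getOrCreate()

# Read the generated Parquet output back and compare against the expected rows.
spark.read.parquet("output/signals_output.parquet").show()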
Please submit the entire Scala SBT / PySpark project with all code necessary to build and run the app. You can bundle it as a zip, tarball, or GitHub link. Also include a copy of the output file generated by running the app.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def process_signals_data(input_path: str, output_path: str):
    # Initialize Spark session
    spark = (
        SparkSession.builder
        .appName("Signals Processor")
        .getOrCreate()
    )
    # Processing steps (read, window computations, single-file Parquet write)
    # go here; one possible completion is sketched below.


if __name__ == "__main__":
    # Set the input and output paths (replace with actual paths in your environment)
    input_path = "data/signals.csv"  # Path to input CSV file
    output_path = "output/signals_output.parquet"  # Output path for the Parquet file
    process_signals_data(input_path, output_path)