
@dixitm20
Last active September 4, 2024 08:16
Assignment For Data Engineer

Spark: Scala / PySpark Exercise

Create a Spark application written in Scala or PySpark that reads the provided signals dataset, processes the data, and stores the entire output as specified below.

For each entity_id in the signals dataset, find the item_id with the oldest month_id and the item_id with the newest month_id. In some cases this may be the same item. If two different items share the same month_id, take the item with the lower item_id. Finally, sum the signal counts for each entity and output the result as total_signals. The correct output should contain one row per unique entity_id.

Requirements:

  1. Create a Scala SBT project or a PySpark project (if you know Scala, please use it, as we give Scala submissions higher preference).
  2. Use the Spark Scala/PySpark API and DataFrames/Datasets
    • Please do not use Spark SQL with a SQL string!
    • If you write spark.sql("select …") you are doing it wrong!!
  3. Produce a single Parquet output file, regardless of parallelism during processing
  4. Hint: Use window analytics functions to compute the final output in a single query (a sketch of this approach follows the example output below).

Input:

entity_id: long
item_id: integer
source: integer
month_id: integer
signal_count: integer

Output:

entity_id: long
oldest_item_id: integer
newest_item_id: integer
total_signals: integer

Example partial output:

+----------+--------------+--------------+-------------+
| entity_id|oldest_item_id|newest_item_id|total_signals|
+----------+--------------+--------------+-------------+
| 190979372|             2|             1|           20|
| 220897278|             2|             1|           66|
|1146753107|             2|             0|           22|
| 224619015|             0|             3|           12|
| 118083189|             5|             3|          234|
|    162371|             4|             2|           29|
|    555304|             0|             2|            9|
| 118634684|             2|             3|           17|
| 213956643|         10000|             1|           17|
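
For illustration, here is a minimal sketch of the single-query window approach in Scala. It assumes the input is a Parquet file with the columns listed above and that the input and output paths are passed as program arguments; treat it as a starting point, not a reference solution.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object SignalsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("signals-sketch").master("local[*]").getOrCreate()

    // Read the provided signals dataset (input path passed as the first argument)
    val signals = spark.read.parquet(args(0))

    // Rank rows within each entity: oldest = smallest month_id, newest = largest month_id,
    // with ties on month_id broken by the lower item_id in both directions
    val byEntity = Window.partitionBy("entity_id")
    val oldestRank = row_number().over(byEntity.orderBy(col("month_id").asc, col("item_id").asc))
    val newestRank = row_number().over(byEntity.orderBy(col("month_id").desc, col("item_id").asc))

    // One row per entity_id: pick the rank-1 item_ids and sum the signal counts
    val result = signals
      .withColumn("oldest_rn", oldestRank)
      .withColumn("newest_rn", newestRank)
      .groupBy("entity_id")
      .agg(
        max(when(col("oldest_rn") === 1, col("item_id"))).as("oldest_item_id"),
        max(when(col("newest_rn") === 1, col("item_id"))).as("newest_item_id"),
        sum("signal_count").as("total_signals")
      )

    // Single Parquet output file regardless of processing parallelism (output path as the second argument)
    result.coalesce(1).write.mode("overwrite").parquet(args(1))

    spark.stop()
  }
}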

Submission Guidelines:

Please submit the entire Scala SBT / PySpark project with all code necessary to build and run the app. You can bundle it as a zip, tarball, or GitHub link. Also include a copy of the output file that is generated from running the app.
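
For the Scala SBT variant, a minimal build.sbt along the following lines is usually enough to build and run the app; the project name and the Scala and Spark versions shown here are assumptions, so adjust them to match your environment.

// Minimal build.sbt sketch for a Spark 3.x / Scala 2.12 project
name := "signals-assignment"
version := "0.1.0"
scalaVersion := "2.12.18"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.5.1",
  "org.apache.spark" %% "spark-sql"  % "3.5.1"
)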

@iamyashsingh

package pack

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

object obj {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "C://data/hadoop")

    // Local SparkSession for running the app standalone
    val spark = SparkSession.builder()
      .appName("first")
      .master("local[*]")
      .getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    // Read the provided signals dataset
    val data = spark.read
      .parquet("file:///c:/data/data.parquet") // Adjust the path as needed

    // Window over each entity: oldest = ascending month_id, newest = descending month_id,
    // ties on month_id broken by the lower item_id in both cases
    val windowSpec = Window.partitionBy("entity_id")
    val oldestWindow = windowSpec.orderBy(col("month_id").asc, col("item_id").asc)
    val newestWindow = windowSpec.orderBy(col("month_id").desc, col("item_id").asc)

    // Compute oldest and newest item_id per row, then aggregate to one row per entity_id
    val resultDF = data
      .withColumn("oldest_item_id", first("item_id").over(oldestWindow))
      .withColumn("newest_item_id", first("item_id").over(newestWindow))
      .groupBy("entity_id")
      .agg(
        sum("signal_count").as("total_signals"),    // the input column is signal_count
        max("oldest_item_id").as("oldest_item_id"), // constant within each entity_id
        max("newest_item_id").as("newest_item_id")
      )

    // Write a single Parquet output file
    resultDF.coalesce(1).write.mode("overwrite").parquet("file:///c:/data/output.parquet")

    spark.stop()
  }
}


@haswithamadupalli

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window


def process_signals_data(input_path: str, output_path: str):
    # Initialize Spark session
    spark = SparkSession.builder \
        .appName("Signals Processor") \
        .getOrCreate()

    # Read the input CSV file into a DataFrame
    signals_df = spark.read.csv(input_path, header=True, inferSchema=True)

    # Ensure correct data types for the columns
    signals_df = signals_df.withColumn("entity_id", signals_df["entity_id"].cast("long")) \
                           .withColumn("item_id", signals_df["item_id"].cast("int")) \
                           .withColumn("month_id", signals_df["month_id"].cast("int")) \
                           .withColumn("signal_count", signals_df["signal_count"].cast("int"))

    # Define window specifications for computing oldest and newest item_id
    # (ties on month_id are broken by the lower item_id)
    window_oldest = Window.partitionBy("entity_id").orderBy(F.col("month_id").asc(), F.col("item_id").asc())
    window_newest = Window.partitionBy("entity_id").orderBy(F.col("month_id").desc(), F.col("item_id").asc())

    # Compute the oldest and newest item_id for each entity_id
    signals_df = signals_df.withColumn("oldest_item_id", F.first("item_id").over(window_oldest)) \
                           .withColumn("newest_item_id", F.first("item_id").over(window_newest))

    # Compute total signals for each entity_id while signal_count is still present
    total_signals_df = signals_df.groupBy("entity_id") \
                                 .agg(F.sum("signal_count").alias("total_signals"))

    # Select distinct rows to get unique entity_id, oldest_item_id, and newest_item_id
    items_df = signals_df.select("entity_id", "oldest_item_id", "newest_item_id").distinct()

    # Join the results to combine total signals with oldest and newest item_id information
    result_df = items_df.join(total_signals_df, "entity_id", "inner") \
                        .select("entity_id", "oldest_item_id", "newest_item_id", "total_signals")

    # Write the output to a single Parquet file
    result_df.coalesce(1).write.mode("overwrite").parquet(output_path)

    print(f"Output written to {output_path}")

    # Stop the Spark session
    spark.stop()


if __name__ == "__main__":
    # Set the input and output paths (these should be replaced with actual paths in your environment)
    input_path = "data/signals.csv"  # Path to input CSV file
    output_path = "output/signals_output.parquet"  # Output path for the Parquet file

    # Process the signals data
    process_signals_data(input_path, output_path)
