Spark Workshop 101

What is Spark?

Computational engine

Apache Spark is a fast and general engine for large-scale data processing.

Can be programmed in Scala, Java, Python, or R.

Has several modules for:

  • Batch analysis (this is where Spark SQL lives).
  • Real-time analysis (streaming data).
  • Machine learning (plus some basic statistics).
  • Graph analysis.
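
Before moving to the notebook, a minimal taste of the core API (a sketch; it assumes a SparkContext named sc, which the notebook provides):

// distribute a local collection and compute on it in parallel
val evenSum = sc.parallelize(1 to 100)
  .filter(_ % 2 == 0)
  .sum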

Spark Notebook

Notebooks incubating @ Vinted

We have two versions of notebooks running; both are full of compromises at the moment.

Create your folder

  • Create a new folder with your name.

img/create_folder.png

  • Click the folder you created to enter it.

Create new notebook

  • Go to Clusters and click Create next to Vinted Small.

img/create_cluster.png

  • Enter something meaningful for Name.
  • Click the Spark Conf tab.
  • Enter something meaningful with your name in spark.app.name.
  • Click OK.

Fix notebook name

  • Due to a bug, the notebook is named Untitled.
  • Click the name, enter a new one, and click OK.
  • Click Save.
  • Close the notebook.
  • Go to Running notebooks and shut down yourname/Untitled.snb.
  • Go to Files and open your notebook again.

Shut down your notebook after your work is done!

  • Go to Running notebooks and shut down your notebook.

img/running.png

A little bit of Scala

Variables

Create variables

val someNumber = 1.5
val otherNumber = 5
val collection = Seq(1, 2, 3, 4)

val string = "some words"
val interpolatedString = s"number: ${someNumber}"
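
val bindings are immutable; reassigning one is a compile error. Scala also has var for mutable bindings (a quick illustration):

var counter = 0
counter = counter + 1  // allowed for var; would not compile for a val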

Objects

Everything is an object in Scala

Documentation of the methods each object has can be found at http://www.scala-lang.org/api/current/.

Objects have methods

1.0.toString  // "1.0"
1.5.floor     // 1.0

Objects (cont.)

Methods can be chained together

1.1
  .ceil                                 // 2.0
  .min(1.7)                             // 1.7
  .until(2.1, 0.1)                      // 1.7, 1.8, 1.9, 2.0
  .map { number => f"${number}%1.1f" }  // format each as a string

Input

def makeString(arg1: Int, arg2: String) = {
  arg1.toString + " = " + arg2
}
makeString(1, "vienas")

Output

1 = vienas

DataFrames

“In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.” — Databricks

Column

dataframe("columnName")      // column of a concrete DataFrame
$"columnName".substr(1, 10)  // free-standing column via the $ implicit

Reading data

Create SQLContext

import org.apache.spark.sql.hive.HiveContext

// HiveContext is an SQLContext with Hive support
val hiveCtx = new HiveContext(sc)
import hiveCtx.implicits._  // enables $"..." and .toDF

hiveCtx.sql("""
  select * from mysql_imports.us__items_ex
""")

Our helper functions

import vinted.datasources._
val tx = hiveCtx.readCoreTables("transactions")
val items = hiveCtx.readCoreTablesNewOld("items")

import vinted.warehouse.jobs.facts.Sale
val saleFact = Sale.read(hiveCtx)

Custom input data

Hive table

val dataframe = hiveCtx.sql("""
  select * from mysql_imports.us__apps
""")

// or

hiveCtx.sql("use mysql_imports")
val dataframe = hiveCtx.table("us__items_ex")

Text file

val dataframe = sc.textFile("/path/f.txt").toDF
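
Each line of the file becomes a row in a single string column. To get real columns, parse the lines first; a sketch, assuming a tab-separated file (hypothetical path and layout):

val parsed = sc.textFile("/path/f.tsv")
  .map(_.split("\t"))
  .map(fields => (fields(0).toInt, fields(1)))
  .toDF("id", "name")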

Create data in Notebook

val dataframe = Seq(
  (1, "jonas",  "t-shirt", 12.5),
  (2, "jonas",  "hoodie",  30.0),
  (3, "julija", "dress",   52.5),
  (4, "julija", "shoes",   12.5),
  (5, "lechas", "t-shirt", 12.5)
).toDF("id", "name", "item", "price")

Comparison with SQL

Projections

SELECT id, price FROM items_ex

hiveCtx.readCoreTables("items_ex")
  .select($"id", $"price")

Filtering

SELECT * FROM items_ex WHERE price > 20

hiveCtx.readCoreTables("items_ex")
  .filter($"price" > 20)

// filter also accepts Hive SQL expressions:
hiveCtx.readCoreTables("items_ex")
  .filter("CONCAT(year, month, day) > '20150101'")

Aggregation

SELECT app_id, COUNT(id) c
FROM items_ex
GROUP BY app_id

import org.apache.spark.sql.functions.count

hiveCtx.readCoreTables("items_ex")
  .groupBy($"app_id")
  .agg(count($"id") as "c")

Joins

SELECT i.id, a.title
FROM items_ex i
JOIN apps a ON i.app_id = a.id

val items = hiveCtx.readCoreTables("items_ex")
val apps = hiveCtx.readCoreTables("apps")

items.as("i")
  .join(apps.as("a"),
        $"i.app_id" === $"a.id" &&
          $"i.portal_id" === $"a.portal_id",
        "left_outer") // use "inner" to match the SQL above

Ordering

SELECT * FROM items_ex ORDER BY created_at DESC

hiveCtx.readCoreTables("items_ex")
  .orderBy($"created_at".desc)

// ascending order:
hiveCtx.readCoreTables("items_ex")
  .orderBy($"created_at")

Conditionals

SELECT IF(created_at > '2015', 1, 2)
FROM items_ex

import org.apache.spark.sql.functions.when

hiveCtx.readCoreTables("items_ex")
  .select(
    when($"created_at" > "2015", 1)
      .otherwise(2)
  )

Conditionals CASE
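
The chained when calls below are the DataFrame counterpart of a SQL CASE expression; for symmetry with the other examples, the equivalent Hive SQL would be:

SELECT CASE
  WHEN created_at = '2015' THEN 1
  WHEN created_at > '2014' THEN 2
  WHEN created_at > '2013' THEN 3
  ELSE 4
END
FROM items_ex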

hiveCtx.readCoreTables("items_ex")
  .select(
    when($"created_at" === "2015", 1)
      .when($"created_at" > "2014", 2)
      .when($"created_at" > "2013", 3)
      .otherwise(4)
  )

Window functions

SELECT
  *,
  row_number() OVER
    (PARTITION BY app_id ORDER BY price) AS row_num
FROM items_ex

Window functions (cont.)

import org.apache.spark.sql.functions.rowNumber
import org.apache.spark.sql.expressions.Window

hiveCtx.readCoreTables("items_ex")
  .select(
    $"*",
    rowNumber().over(
      Window.partitionBy($"app_id").orderBy($"price")
    ) as "row_num"
  )

more info here

Hive functions in select

hiveCtx.readCoreTables("items_ex").as("i")
  .selectExpr(
    "SUBSTRING(created_at, 1, 10) AS date",
    "id",
    "i.*"
  )

Extending SQL

Writing UDFs

// plain Scala function
def trimSpace(str: String) = {
  str.trim
}
trimSpace("   asas   ")

// register for SQL as "trim2"; the returned UserDefinedFunction
// can be used directly in the DataFrame API
val trim1 = hiveCtx.udf.register("trim2", trimSpace _)

hiveCtx.sql("SELECT trim2(title) FROM apps")

hiveCtx.readCoreTables("apps")
  .select(trim1($"title"))
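
If the function is only needed in the DataFrame API, registration can be skipped by wrapping it with the udf helper (a sketch; trimmed is a hypothetical name):

import org.apache.spark.sql.functions.udf

val trimmed = udf(trimSpace _)
hiveCtx.readCoreTables("apps")
  .select(trimmed($"title"))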