Apache Spark is a fast and general engine for large-scale data processing.
- Batch analysis (Spark SQL lives here).
- Real-time analysis (streaming data).
- Machine learning (plus some basic statistics).
- Graph analysis.
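A taste of the programming model (a local sketch using plain Scala collections, which expose the same map/groupBy-style combinators that Spark's distributed collections mirror; no cluster needed):

```scala
// Count word occurrences locally; Spark's RDDs and DataFrames
// offer the same functional style, distributed over a cluster.
val words = Seq("spark", "is", "fast", "spark", "is", "general")
val counts = words
  .groupBy(identity)                     // word -> all occurrences
  .map { case (w, ws) => (w, ws.size) }  // word -> count
println(counts("spark"))
```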
We have two versions of notebooks running. Both are full of compromises at the moment.
- Spark notebook: http://zeppelin.vinted.net:9000/
- Zeppelin: http://zeppelin.vinted.net:8080/
- Create new folder with your name.
- Click on your created folder to enter it.
- Go to Clusters and click Create near Vinted Small.
- Enter something meaningful for Name.
- Click on Spark Conf tab.
- Enter something meaningful, with your name, in spark.app.name.
- Click OK
- Due to a bug, the new notebook is named Untitled.
- Click on it, enter a new name, and click OK.
- Click Save.
- Close the Notebook.
- Go to Running notebooks. Shut down yourname/Untitled.snb.
- Go to Files and open your notebook again.
- Go to Running notebooks and shut down your notebook.
val someNumber = 1.5
val otherNumber = 5
val collection = Seq(1, 2, 3, 4)
val string = "some words"
val interpolatedString = s"number: ${someNumber}"
[fn:1: Documentation of what methods each object has can be found at http://www.scala-lang.org/api/current/.]
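Besides the `s` interpolator above, Scala also has an `f` interpolator that adds printf-style formatting (it appears again in the snippets below); a small illustration:

```scala
// The f-interpolator checks the format at compile time.
val price = 12.5
val label = f"price: $price%1.2f"  // width/precision like printf
```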
1.0.toString
1.5.floor
1.1
.ceil
.min(1.7)
.until(2.1, 0.1)
.map { number => f"${number}%1.1f" }
def makeString(arg1: Int, arg2: String) = {
arg1.toString + " = " + arg2
}
makeString(1, "vienas")

“In Spark, a DataFrame is a distributed collection of data organized into named columns. It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with richer optimizations under the hood.” — databricks
dataframe("columnName")
$"columnName".substr(1, 10)

import org.apache.spark.sql.hive.HiveContext
val hiveCtx = new HiveContext(sc)
import hiveCtx.implicits._
hiveCtx.sql("""
select * from mysql_imports.us__items_ex
""")

import vinted.datasources._
val tx = hiveCtx.readCoreTables("transactions")
val items = hiveCtx.readCoreTablesNewOld("items")
import vinted.warehouse.jobs.facts.Sale
val saleFact = Sale.read(hiveCtx)

val dataframe =
hiveCtx.sql("""
select * from mysql_imports.us__apps
""") // or
hiveCtx.sql("use mysql_imports")
val dataframe =
hiveCtx.table("us__items_ex")

val dataframe = sc.textFile("/path/f.txt").toDF

val dataframe = Seq(
(1, "jonas", "t-shirt", 12.5),
(2, "jonas", "hoodie", 30.0),
(3, "julija", "dress", 52.5),
(4, "julija", "shoes", 12.5),
(5, "lechas", "t-shirt", 12.5)
).toDF("id", "name", "item", "price")

SELECT id, price FROM items_ex

hiveCtx.readCoreTables("items_ex")
.select($"id", $"price")

SELECT * FROM items_ex WHERE price > 20

hiveCtx.readCoreTables("items_ex")
.filter("CONCAT(year,month,day) > '20150101'")
// filter accepts Hive SQL expressions

SELECT app_id, COUNT(id) c
FROM items_ex
GROUP BY app_id

import org.apache.spark.sql.functions.count
hiveCtx.readCoreTables("items_ex")
.groupBy($"app_id")
.agg(count($"id") as "c")

SELECT i.id, a.title
FROM items_ex i
JOIN apps a ON i.app_id = a.id

val items = hiveCtx.readCoreTables("items_ex")
val apps = hiveCtx.readCoreTables("apps")
items.as("i")
.join(apps.as("a"),
$"i.app_id" === $"a.id" &&
$"i.portal_id" === $"a.portal_id",
"left_outer")

SELECT * FROM items_ex ORDER BY created_at DESC

hiveCtx.readCoreTables("items_ex")
.orderBy($"created_at")
// or
.orderBy($"created_at".desc)

SELECT IF(created_at > '2015', 1, 2)
FROM items_ex

import org.apache.spark.sql.functions.when
hiveCtx.readCoreTables("items_ex")
.select(
when($"created_at" > "2015", 1)
.otherwise(2)
)

hiveCtx.readCoreTables("items_ex")
.select(
when($"created_at" === "2015", 1)
.when($"created_at" > "2014", 2)
.when($"created_at" > "2013", 3)
.otherwise(4)
)

SELECT
*,
row_number() OVER
(partition by app_id ORDER BY price) AS rank
FROM items_ex

import org.apache.spark.sql.functions.rowNumber
import org.apache.spark.sql.expressions.Window
hiveCtx.readCoreTables("items_ex")
.select(
$"*",
rowNumber().over(
Window.partitionBy($"app_id").orderBy($"price")
) as "row_num"
)

hiveCtx.readCoreTables("items_ex").as("i")
.selectExpr(
"SUBSTRING(created_at, 1, 10) AS date",
"id",
"i.*"
)

def trimSpace(str: String) = {
str.trim
}
trimSpace(" asas ")
val trim1 = hiveCtx.udf.register("trim2", trimSpace _)
hiveCtx.sql("SELECT trim2(title) FROM apps")
hiveCtx.readCoreTables("apps")
.select(trim1($"title"))
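Since a registered UDF is just a plain Scala function, its logic can be checked locally, without Spark, before applying it to a DataFrame; for example:

```scala
def trimSpace(str: String) = str.trim  // same function as above
// Exercise it on a local collection first.
val cleaned = Seq("  asas ", " b ").map(trimSpace)
```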

