Harleen Mann (mannharleen)

Practice Problem:

Using sqoop, import the orders table into HDFS under /user/cloudera/problem1/orders. Use any file type and compression codec of your choice.

Using sqoop, import the order_items table into HDFS under /user/cloudera/problem1/order-items. Use any file type and compression codec of your choice.

Using Spark Scala, load the data at /user/cloudera/problem1/orders and /user/cloudera/problem1/order-items as dataframes.

Find total orders and total amount per status per day. The result should be sorted by order date descending, order status ascending, total amount ascending and total orders ascending.

Use sparkSQL and the Spark dataframe API to solve this.
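One possible shape of the dataframe-API part, as a sketch only (it assumes Spark 1.6's sqlContext, parquet output from the sqoop imports, and order_date stored as epoch milliseconds; all of these are assumptions, not requirements of the problem):

```
import org.apache.spark.sql.functions._

// load the sqoop output (assumed parquet) as dataframes
val o  = sqlContext.read.parquet("/user/cloudera/problem1/orders")
val oi = sqlContext.read.parquet("/user/cloudera/problem1/order-items")

// total orders and total amount per status per day, sorted as required
val result = oi.join(o, o("order_id") === oi("order_item_order_id"))
  .groupBy(from_unixtime(o("order_date") / 1000, "yyyy-MM-dd").as("order_date"), o("order_status"))
  .agg(countDistinct(o("order_id")).as("total_orders"),
       sum(oi("order_item_subtotal")).as("total_amount"))
  .orderBy(col("order_date").desc, col("order_status"), col("total_amount"), col("total_orders"))
```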

mannharleen / Problem: sparkRDD.md
Last active September 28, 2017 13:20
Test your knowledge in using the Spark RDD API (hint: can you aggregateByKey?)

Practice problem

Using the Spark RDD API, do the following for the retail_db database:

For each product category, find the highest price, the total number of products, the average price and the minimum price

$ spark-shell

scala> sqlContext.setConf("spark.sql.shuffle.partitions", "4")
scala> val products = sqlContext.read.parquet("/user/hive/warehouse/retail_db.db/products")
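One way to continue with aggregateByKey, as a sketch (it assumes the standard retail_db column names product_category_id and product_price, and a Float price; adjust the getAs types to the actual parquet schema):

```
// (categoryId, price) pairs from the dataframe's underlying RDD
val pairs = products.rdd.map(r => (r.getAs[Int]("product_category_id"), r.getAs[Float]("product_price")))

// one pass per category, accumulating (max, count, sum, min)
val zero = (Float.MinValue, 0L, 0.0, Float.MaxValue)
val stats = pairs.aggregateByKey(zero)(
  (acc, p) => (math.max(acc._1, p), acc._2 + 1, acc._3 + p, math.min(acc._4, p)),
  (a, b)   => (math.max(a._1, b._1), a._2 + b._2, a._3 + b._3, math.min(a._4, b._4)))

// derive the average from (count, sum)
val result = stats.mapValues { case (max, cnt, sum, min) => (max, cnt, sum / cnt, min) }
result.collect.foreach(println)
```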

Problem:

Show all the order details for the day that had the highest number of orders. Use Hive to perform this.

Caution: You may not be able to perform this using Spark SQL versions below 2.0, since they do not support subqueries in the WHERE clause.

hive>
select * from retail_db.orders
where order_date in
(select q1.order_date from
 (select order_date, count(1) as cnt from retail_db.orders
  group by order_date order by cnt desc limit 1) q1);
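In Spark 2.0+ essentially the same statement can run through spark.sql as well (a sketch, assuming a SparkSession named spark with access to the Hive metastore):

```
spark.sql("""
  select * from retail_db.orders
  where order_date in (select order_date from retail_db.orders
                       group by order_date
                       order by count(1) desc
                       limit 1)
""").show()
```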

Using retail_db, find total orders and total amount per status per day.

The result should be sorted by order date descending, order status ascending, total amount ascending and total orders ascending.

Use the pyspark API

import pyspark.sql.functions as F

o = sqlContext.sql("select * from orders")
oi = sqlContext.sql("select * from order_items")
df = oi.join(o, o.order_id == oi.order_item_order_id) \
    .groupBy(F.from_unixtime(o.order_date / 1000).alias("date"), o.order_status) \
    .agg(F.countDistinct(o.order_id).alias("total_orders"),
         F.sum(oi.order_item_subtotal).alias("total_amount"))
df1 = df.orderBy(df.date.desc(), df.order_status, df.total_amount, df.total_orders)
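Note that F.countDistinct is what keeps the order count honest here: the join fans each order out to one row per order item, so a plain count would overstate total_orders.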
mannharleen / anaconda environments for pyspark.md
Last active October 3, 2017 07:35
Installing anaconda on windows, creating separate environments

Install from: https://www.anaconda.com/download

Open the "Anaconda Prompt" from the Start menu

Create and activate a new environment:

conda create --name py36_env python=3.6 anaconda
activate py36_env

Confirm the python version by running

python --version
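To switch back later, run deactivate; conda env list shows every environment that has been created.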

## In this example we will build an assembly/fat/uber jar that includes the Oracle JDBC driver "ojdbc6.jar"
### We will use sbt to build the fat jar
### The folder structure of the project must be:
```
root
|-- lib
|   |-- ojdbc6.jar
|-- project
|   |-- assembly.sbt
```
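A minimal sketch of the sbt wiring (the plugin and library versions here are assumptions; anything under lib/ is picked up automatically as an unmanaged dependency and therefore lands in the fat jar):

```
// project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")

// build.sbt
name := "oraclejdbc"
scalaVersion := "2.10.6"
// spark itself is provided by the cluster, so keep it out of the fat jar
libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.6.0" % "provided"
```

The driver object itself can then look like: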
package com.oraclejdbc

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import java.util.Properties

object oraclejdbc {
  def main(args: Array[String]): Unit = {
    val sqlContext = new HiveContext(new SparkContext(new SparkConf().setAppName("oraclejdbc")))
    // placeholder connection details; the bundled ojdbc6 driver serves the oracle:thin URL
    val props = new Properties()
    props.setProperty("user", "username")
    props.setProperty("password", "password")
    sqlContext.read.jdbc("jdbc:oracle:thin:@//host:1521/SERVICE", "TABLE_NAME", props).show()
  }
}
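Running sbt assembly should then produce a single jar under target/ that already bundles ojdbc6.jar and can be handed straight to spark-submit.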

The secure way: use the sqlplus easy connect syntax so that you are prompted for a password:

sqlplus LOGMNR_USER@\"172.18.20.170:1521/XXXLIVE\"

The insecure way: specify the password in the command itself:

sqlplus LOGMNR_USER/password@\"172.18.20.170:1521/XXXLIVE\"
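The second form is convenient but leaks the password into the shell history and into process listings, which is why the prompt-based syntax is the safer default.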

mannharleen / apply_cdc.scala
Created October 10, 2017 09:56
Actual implementation of applying the CDC data to the base data to obtain the up-to-date data.
/*
---- CDC in Hadoop using Spark with Data Frames API ----
Implementation details:
- Source system used: Oracle 11g using Streaming CDC in hotLog mode
- Destination: S3 folders
*/
import org.apache.spark.{SparkContext,SparkConf}
import org.apache.spark.sql.hive.HiveContext
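A rough sketch of the apply step, expressed as a window over the union of base and change records (key_col, last_updated and op are illustrative column names only, with op = 'D' marking deletes and base rows assumed to carry op = 'I'):

```
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

// base data and captured changes (placeholder S3 paths, same schema assumed)
val base = sqlContext.read.parquet("s3a://bucket/base")
val cdc  = sqlContext.read.parquet("s3a://bucket/cdc")

// keep only the newest record per key across base + changes,
// then drop keys whose newest record is a delete
val w = Window.partitionBy("key_col").orderBy(col("last_updated").desc)
val upToDate = base.unionAll(cdc)
  .withColumn("rn", row_number().over(w))
  .filter("rn = 1 and op != 'D'")
  .drop("rn")
```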
package test

import java.sql.DriverManager

// Connects to a MySQL DB using the java.sql.DriverManager class
object test1 {
  def main(args: Array[String]): Unit = {
    // Class.forName("com.mysql.jdbc.Driver") // not needed with JDBC 4.x drivers
    val connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/retail_db", "root", "xxxx")
    // sample query to verify the connection works
    val rs = connection.createStatement().executeQuery("select count(*) from orders")
    while (rs.next()) println(rs.getInt(1))
    connection.close()
  }
}