Harleen Mann (mannharleen)
@mannharleen
mannharleen / rdd-sort-strings-asc-desc.scala
Last active September 13, 2017 05:41
Spark RDD API to sort RDD by string ascending and string descending at the same time
/*---------------------------------------------------------------------------------------------------------------------------------
Spark RDD API to sort RDD by string ascending and string descending at the same time
akin to DF API "sort by col1 asc, col2 desc"
Created by: https://www.linkedin.com/in/harleenmann1/
---------------------------------------------------------------------------------------------------------------------------------*/
/*
// Problem: Given an RDD[String, String, Int], order the elements by ascending first element and descending second element
// Use RDD API
*/
val rddkv = sc.parallelize(List(("k1",1),("k2",2),("k1",2),("k3",5),("k3",1)))
//rddkv.collect
//Array[(String, Int)] = Array((k1,1), (k2,2), (k1,2), (k3,5), (k3,1))
//creating a hash partition
rddkv.partitionBy(new org.apache.spark.HashPartitioner(3)).mapPartitionsWithIndex( (i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect
//Array[String] = Array(" index=0 value=(k1,1)", " index=0 value=(k1,2)", " index=1 value=(k2,2)", " index=2 value=(k3,5)", " index=2 value=(k3,1)")
//creating a range partition (RangePartitioner samples the keyed RDD to compute the range boundaries)
rddkv.partitionBy(new org.apache.spark.RangePartitioner(3, rddkv)).mapPartitionsWithIndex( (i,iter_p) => iter_p.map(x=>" index="+i+" value="+x)).collect
Fun with RDD transformations:
val rdd = sc.parallelize(Array(("a","c",1),("b","c",2),("a","a",5),("a","a",1)))
//rdd.collect
//res44: Array[(String, String, Int)] = Array((a,c,1), (b,c,2), (a,a,5), (a,a,1))
//------------------------------//
//-- map
rdd.map(p=> (p._1,p._2,p._3,p._3+1)).collect
//res54: Array[(String, String, Int, Int)] = Array((a,c,1,2), (b,c,2,3), (a,a,5,6), (a,a,1,2))
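The sort promised in the header (first string ascending, second string descending) never gets a worked line above; a minimal sketch, assuming the `rdd` defined above and a composite `Ordering` (rows with equal (String, String) keys come back in no guaranteed relative order):

```scala
// ascending on _1, descending on _2, via a composite Ordering
val ord: Ordering[(String, String)] =
  Ordering.Tuple2(Ordering.String, Ordering.String.reverse)

// sortBy(keyFn)(ordering, classTag); the ascending flag is left at its default (true),
// since the descending part is baked into the Ordering itself
val sorted = rdd.sortBy(t => (t._1, t._2))(
  ord, implicitly[scala.reflect.ClassTag[(String, String)]])
//sorted.collect => the (a,c,_) row first, then the two (a,a,_) rows, then (b,c,_)
```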
/*
Expected result columns: Order_Date, Order_Status, total_orders, total_amount. In plain English: find the total orders and the total amount per status per day. Sort the result by order date descending, order status ascending, total amount descending and total orders ascending. Aggregation should be done using the methods below.
*/
val df_orders = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/orders")
//[order_id: int, order_date: bigint, order_customer_id: int, order_status: string]
val df_order_items = sqlContext.read.format("com.databricks.spark.avro").load("/user/cloudera/problem1/order-items")
//[order_item_id: int, order_item_order_id: int, order_item_product_id: int, order_item_quantity: int, order_item_subtotal: float, order_item_product_price: float]
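One way to sketch the expected result with the DataFrame API, using the two DataFrames loaded above; hedged assumptions not pinned down by the problem text: total_orders counts distinct order ids, and total_amount sums order_item_subtotal:

```scala
import org.apache.spark.sql.functions._

// join orders to their items, aggregate per (date, status), then apply the four-way sort
val result = df_orders
  .join(df_order_items, df_orders("order_id") === df_order_items("order_item_order_id"))
  .groupBy("order_date", "order_status")
  .agg(countDistinct("order_id").as("total_orders"),
       sum("order_item_subtotal").as("total_amount"))
  .orderBy(col("order_date").desc, col("order_status").asc,
           col("total_amount").desc, col("total_orders").asc)
```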
//--------------------------------//--------------------------------//--------------------------------//--------------------------------
//--- Textfile, Sequence file
set io.compression.codecs;
//displays all installed codecs
set mapred.output.compression.type=BLOCK;
//--------------------------------
//-- Intermediate file compression
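The intermediate-compression heading above is left empty; a sketch of the usual MRv1-era settings (same property family as the mapred.output.compression.type line above):

```
set hive.exec.compress.intermediate=true;
set mapred.compress.map.output=true;
set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
```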
@mannharleen
mannharleen / agent3.conf
Created September 20, 2017 03:04
flume: avro source ->jdbc channel->logger sink
agent3.sources = source1
agent3.channels = channel1
agent3.sinks = sink1
agent3.sources.source1.type = avro
agent3.sources.source1.port = 11112
agent3.sources.source1.bind = localhost
agent3.sources.source1.channels = channel1
agent3.channels.channel1.type = jdbc
agent3.sinks.sink1.type = logger
agent3.sinks.sink1.channel = channel1
@mannharleen
mannharleen / agent4.conf
Created September 20, 2017 04:05
avro source to file channel to hdfs sink (avro with snappy codec)
#avro source to file channel to hdfs sink (with avro snappy codec)
agent4.sources = source1
agent4.channels = channel1
agent4.sinks = sink1
# Source configuration
agent4.sources.source1.type = avro
agent4.sources.source1.port = 11112
agent4.sources.source1.bind = localhost
agent4.sources.source1.channels = channel1
# Channel configuration
agent4.channels.channel1.type = file
# Sink configuration: avro_event serializer with snappy; the hdfs.path is an example placeholder
agent4.sinks.sink1.type = hdfs
agent4.sinks.sink1.channel = channel1
agent4.sinks.sink1.hdfs.path = /user/cloudera/flume/events
agent4.sinks.sink1.hdfs.fileType = DataStream
agent4.sinks.sink1.serializer = avro_event
agent4.sinks.sink1.serializer.compressionCodec = snappy
@mannharleen
mannharleen / sols to arun-teaches-u-tech.blogspot.m
Created September 20, 2017 14:02
Rough solutions to CCA practice problems from Arun @ http://arun-teaches-u-tech.blogspot.my/
------------------------
http://arun-teaches-u-tech.blogspot.my/p/cca-175-prep-problem-scenario-1.html
------------------------
1.
sqoop import --connect 'jdbc:mysql://localhost:3306/retail_db' \
--username root --password cloudera \
--table orders \
--target-dir /user/cloudera/problem1/orders \
--as-avrodatafile --compress --compression-codec org.apache.hadoop.io.compress.SnappyCodec

sqoop cheat sheet

sqoop help

sqoop help import
sqoop help export
sqoop help merge

sqoop list-databases
sqoop list-tables

Complex data types in Hive can be tricky.

I struggled to find any decent reference on the internet that would help me write 'insert into ...' statements for ARRAY, MAP and STRUCT columns. All I found were examples using the 'load ...' statement.

Here are some working examples that should help anyone facing the same issue.

ARRAYS:

create table x( col1 Int, col2 ARRAY<Int>);
insert into x select 1,array(1,2,3) union all select 2,array(2,3,4);
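MAP and STRUCT columns follow the same insert-select pattern; a sketch using Hive's built-in map() and named_struct() constructors (table and field names here are made up for illustration):

```sql
MAPS:

create table y( col1 Int, col2 MAP<String,Int>);
insert into y select 1, map('a',1,'b',2) union all select 2, map('c',3);

STRUCTS:

create table z( col1 Int, col2 STRUCT<f1:Int, f2:String>);
insert into z select 1, named_struct('f1',1,'f2','one');
```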