import org.apache.spark.sql.functions.udf
// Allow a very large number of distinct values in a pivot column
spark.conf.set("spark.sql.pivotMaxValues", Int.MaxValue.toString)
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/store_sales_unified_2017.csv")
// Composite key identifying a single bill: receipt + cash register + time + date
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
// Number of times each article appears on each bill
val countArticles = newData.groupBy("unique", "article_id").count()
val articles = countArticles.select("article_id").distinct()
val articleIds = articles.collect.map(x => x(0))
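The preview stops after collecting the article IDs; a minimal sketch of how they could feed a bill-by-article pivot follows (an assumed continuation, not part of the original gist; basket is a hypothetical name).

// Assumed continuation: one row per bill ("unique"), one column per article_id, values = count on that bill.
// The collected articleIds are passed as explicit pivot values; the raised spark.sql.pivotMaxValues limit
// matters when the values are instead left for Spark to discover.
val basket = countArticles.groupBy("unique").pivot("article_id", articleIds.toSeq).sum("count").na.fill(0L)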
import org.apache.spark.sql.functions.udf
val csv = spark.read.format("csv").option("header", true).load("/Users/tilak/Downloads/Pam/SalesAnalysis/data/store_sales_unified_2017.csv")
// Same composite bill key as above
val uniqueKey: (String, String, String, String) => String = (x, y, z, v) => x + "_" + y + "_" + z + "_" + v
val someFn = udf(uniqueKey)
val newData = csv.withColumn("unique", someFn(csv.col("receipt_id"), csv.col("cash_register_id"), csv.col("sale_time"), csv.col("date")))
val countArticles = newData.groupBy("unique", "article_id").count()
// Self cross join, keeping row pairs from the same bill that refer to different articles
val sameBill = countArticles.crossJoin(countArticles).filter(x => x.getString(0) == x.getString(3) && x.getString(1) != x.getString(4))
// The cross join duplicates every column name, so suffix each with its position
val newNames = sameBill.columns.toList.zipWithIndex.map { case (name, idx) => name + "_" + idx }
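The preview ends after building newNames; a minimal sketch of one way to apply the renamed columns and count co-purchased article pairs (an assumed continuation, not part of the original gist; renamed and articlePairs are hypothetical names):

// Assumed continuation: apply the position-suffixed names, then count how often each
// pair of articles appears on the same bill (article_id_1 and article_id_4 are the two copies).
val renamed = sameBill.toDF(newNames: _*)
val articlePairs = renamed.groupBy("article_id_1", "article_id_4").count().orderBy(org.apache.spark.sql.functions.desc("count"))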
tilakpatidar / README.md
Last active January 25, 2018 21:43
Uninstall Cloudera and install HDP 2.6

Install HDP 2.6 and remove Cloudera 5.12

Uninstall Cloudera

Follow the guide to uninstall (link). Also remove the existing folders and users.

# Remove conf and logs
rm -rf /etc/hadoop
rm -rf /etc/hbase
rm -rf /etc/hive
tilakpatidar / README.md
Last active October 30, 2018 19:05
Install HUE on HDP

Install HUE on HDP

Prerequisites

Install required packages

yum install -y ant gcc gcc-c++ krb5-devel mysql-devel
yum install -y openssl-devel cyrus-sasl-devel cyrus-sasl-gssapi
yum install -y sqlite-devel libtidy libxml2-devel libxslt-devel
yum install -y maven openldap-devel python-devel python-simplejson python-setuptools
yum install -y libxslt-devel libxml++-devel libxml2-devel libffi-devel
tilakpatidar / postgres_to_kafka.sh
Last active August 27, 2020 22:31
Postgres to Kafka streaming using Debezium
# Run postgres instance
docker run --name postgres -p 5000:5432 debezium/postgres
# Run zookeeper instance
docker run -it --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper
# Run kafka instance
docker run -it --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka
# Run kafka connect
tilakpatidar / basic.sql
Last active March 15, 2018 09:58
Common SQL Server queries
-- Finding difference in values for some key
SELECT * FROM A a, B b WHERE a.id = b.id AND a.name <> b.name;
-- Returns one row for each CHECK, UNIQUE, PRIMARY KEY, and/or FOREIGN KEY constraint
SELECT *
FROM INFORMATION_SCHEMA.TABLE_CONSTRAINTS
WHERE CONSTRAINT_NAME='XYZ'
-- Returns one row for each FOREIGN KEY constraint
tilakpatidar / pull_csv_from_s3.job
Last active March 24, 2018 07:06
Apache Gobblin job file for pulling CSV files from an S3 bucket
# ====================================================================
# PullCsvFromS3
# Pull CSV data from an S3 directory to our local system
# ====================================================================
job.name=PullCsvFromS3
job.description=Pull CSV data from an S3 directory to our local system
fs.uri=file:///
# Set working directory
tilakpatidar / pull_csv_from_s3_to_avro.job
Created March 24, 2018 07:09
Apache Gobblin job to pull CSVs from S3 storage and write to Avro
# ====================================================================
# PullCsvFromS3
# Pull CSV data from an S3 directory to our local system
# ====================================================================
job.name=PullCsvFromS3
job.description=Pull CSV data from an S3 directory to our local system and write as Avro files
fs.uri=file:///
# Set working directory
tilakpatidar / pull_csv_from_s3_to_mysql.job
Created March 25, 2018 02:30
Apache Gobblin job to ingest CSV files from S3 buckets into a MySQL table.
# ====================================================================
# PullCsvFromS3
# Pull CSV data from an S3 directory to MySQL
# ====================================================================
job.name=PullCsvFromS3
job.description=Pull CSV data from an S3 directory to MySQL
fs.uri=file:///
# Set working directory
tilakpatidar / unique_orc_records.scala
Created March 25, 2018 13:26
Finding unique records between an ORC file and MySQL rows using Apache Spark
import spark.implicits._
import org.apache.spark.sql.SaveMode
// Rows already present in the MySQL products table, read over JDBC
val products = spark.sqlContext.read.format("jdbc").option("driver", "com.mysql.jdbc.Driver").option("dbtable", "products").option("user", "gobblin").option("password", "gobblin").option("url", "jdbc:mysql://localhost/mopar_demo").load()
// Freshly ingested rows written by Gobblin as ORC
val newProducts = spark.sqlContext.read.format("orc").load("/Users/tilak/gobblin/mopar-demo/output/org/apache/gobblin/copy/user/tilak/pricing.products_1521799535.csv/20180325023900_append/part.task_PullCsvFromS3_1521945534992_0_0.orc")
// Keep only the ORC rows that are not in MySQL yet
val newnewProducts = newProducts.except(products)
val dfWriter = newnewProducts.write.mode(SaveMode.Append)
val connectionProperties = new java.util.Properties()
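The preview ends after creating connectionProperties; a minimal sketch of how the append could be completed, assuming the same gobblin/gobblin credentials, URL, and products table from the lines above (an assumed continuation, not part of the original gist):

// Assumed continuation: fill in the JDBC properties and append only the new rows to MySQL.
connectionProperties.put("user", "gobblin")
connectionProperties.put("password", "gobblin")
connectionProperties.put("driver", "com.mysql.jdbc.Driver")
dfWriter.jdbc("jdbc:mysql://localhost/mopar_demo", "products", connectionProperties)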