Skip to content

Instantly share code, notes, and snippets.

@welly87
Created October 1, 2020 04:11
Show Gist options
  • Save welly87/29472b1f5fc2200f8ceee3678e361e9a to your computer and use it in GitHub Desktop.
Save welly87/29472b1f5fc2200f8ceee3678e361e9a to your computer and use it in GitHub Desktop.
@welly87
Copy link
Author

welly87 commented Oct 1, 2020

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz

!pip install -q findspark
!pip install -q pyarrow

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

# Download the MySQL Connector/J 8.0.14 jar, then move it from /content into
# the jars directory of the unpacked Spark 2.4.7 distribution.
# NOTE(review): the mv assumes wget saved the jar under /content, i.e. that the
# notebook's working directory is /content — confirm.
!wget https://github.com/welly87/spark-load/raw/master/mysql-connector-java-8.0.14.jar
!mv /content/mysql-connector-java-8.0.14.jar /content/spark-2.4.7-bin-hadoop2.7/jars/mysql-connector-java-8.0.14.jar

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

import os

# Point PySpark at the Java 8 JDK and at the unpacked Spark distribution.
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

# The connector jar is moved into Spark's own jars directory after download,
# so reference it there rather than at its original /content location.
driver_path = os.path.join(
    os.environ["SPARK_HOME"], "jars", "mysql-connector-java-8.0.14.jar"
)

# `pyspark-shell` is the application resource and must come LAST: spark-submit
# stops option parsing at the resource, so anything placed after it is passed
# through as an application argument and never applied.
os.environ['PYSPARK_SUBMIT_ARGS'] = (
    f'--driver-class-path {driver_path} --jars {driver_path} pyspark-shell'
)

import findspark
# Use the absolute SPARK_HOME so initialisation does not depend on the
# notebook's current working directory.
findspark.init(os.environ["SPARK_HOME"])

from pyspark.sql import SparkSession

# Build (or reuse) the session. Spark 2.4.x reads the Arrow switch from
# `spark.sql.execution.arrow.enabled`; the `...arrow.pyspark.enabled` name only
# exists from Spark 3.0 on and would be ignored by this 2.4.7 install.
spark = (
    SparkSession.builder
    .appName("Bea Cukai - PySpark/Kudu")
    .config('spark.driver.bindAddress', 'localhost')
    .config('spark.driver.allowMultipleContexts', 'true')
    .config('spark.sql.execution.arrow.enabled', 'true')
    .getOrCreate()
)

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

# Read the `transactions_1k` table from the `ccs` MySQL database over JDBC into
# a Spark DataFrame. `serverTimezone=UTC` is passed through to the connector in
# the JDBC URL.
# Connector/J 8.0.x ships its driver as `com.mysql.cj.jdbc.Driver`; the legacy
# `com.mysql.jdbc.Driver` name is deprecated and only kept as a shim that logs
# a warning on load.
jdbcDF = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:mysql://relational.fit.cvut.cz/ccs?serverTimezone=UTC")
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("dbtable", "transactions_1k")
    .option("user", "guest")
    .option("password", "relational")
    .load()
)

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

# Register the JDBC DataFrame as the temp view "transactions" so it can be
# queried with Spark SQL.
jdbcDF.createOrReplaceTempView("transactions")
# Keep only rows with Amount > 20. The original statement had a stray `limit`
# keyword (with no row count) in front of `where`, which is a SQL parse error.
trans = spark.sql("select * from transactions where Amount > 20")
# Last expression of the cell: the notebook displays the matching row count.
trans.count()

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

# Collect every column of the filtered Spark DataFrame into a pandas DataFrame
# on the driver, then preview its first rows.
tdf = (
    trans
    .select("*")
    .toPandas()
)
tdf.head()

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

-- Return every column and row of the transactions_1k table in the welly database.
SELECT
  *
FROM welly.transactions_1k;

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

-- Register the Kudu table named 'transactions_1k' as an external table in the
-- 001_andi database. No column list is given in the statement.
CREATE EXTERNAL TABLE 001_andi.transactions_1k
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'transactions_1k');

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

-- Register four Kudu tables as external tables in the welly database.
-- Each statement maps welly.<name> onto the Kudu table with the same <name>.

-- transactions_1k
CREATE EXTERNAL TABLE welly.transactions_1k
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'transactions_1k');

-- customers
CREATE EXTERNAL TABLE welly.customers
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'customers');

-- gasstations
CREATE EXTERNAL TABLE welly.gasstations
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'gasstations');

-- products
CREATE EXTERNAL TABLE welly.products
  STORED AS KUDU
  TBLPROPERTIES ('kudu.table_name' = 'products');

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

# Install impyla, the package that provides the `impala.dbapi` and
# `impala.util` modules imported by the query cell.
!pip install impyla

@welly87
Copy link
Author

welly87 commented Oct 1, 2020

from impala.dbapi import connect
from impala.util import as_pandas

# Query the first 100 rows of welly.transactions_1k through impyla and load
# them into a pandas DataFrame.
conn = connect(host='178.128.112.105', port=21050)
try:
    cursor = conn.cursor()
    try:
        cursor.execute('SELECT * FROM welly.transactions_1k LIMIT 100')
        # as_pandas drains the cursor, so it is safe to close it right after.
        df = as_pandas(cursor)
    finally:
        # Release the server-side operation even if the query or fetch fails.
        cursor.close()
finally:
    # Always close the session; the original left the connection open.
    conn.close()

df.head()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment