Forked from Databracket9/duckdb_postgres_pyspark_tutorial
Created
October 31, 2024 15:29
-
-
Save juliobetta/e368652b8f0146900e23de3292736a16 to your computer and use it in GitHub Desktop.
Data Engineering with DuckDB Tutorial
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Tutorial pipeline: read a CSV with DuckDB, filter it with SQL, transform it
# through DuckDB's experimental Spark-compatible API, then write the result to
# a Postgres database attached via DuckDB's postgres extension.

# library imports
import duckdb
import configparser
from duckdb.experimental.spark.sql import SparkSession as session
from duckdb.experimental.spark.sql.functions import col, when, lit

# read configs from secrets file (path is relative to the working directory)
SECRETS_PATH = 'duck_db_demo/secrets.ini'
config = configparser.ConfigParser()
# ConfigParser.read() skips files it cannot open and returns the list of files
# it actually parsed, so fail loudly here rather than with a bare KeyError on
# the 'POSTGRES' lookup below.
if not config.read(SECRETS_PATH):
    raise FileNotFoundError(
        "could not read config file: {}".format(SECRETS_PATH)
    )
postgres_cfg = config['POSTGRES']
user = postgres_cfg['USER']
password = postgres_cfg['PASSWORD']
port = postgres_cfg['PORT']
database = postgres_cfg['DATABASE']

# install and load postgres extension
duckdb.execute('INSTALL postgres;')
duckdb.execute('LOAD postgres;')

# connect to postgres: the database is attached under the alias "db"
# NOTE(review): the config values are interpolated verbatim into the SQL
# string literal; a value containing a single quote (or whitespace) would
# break the statement -- confirm the secrets never contain such characters.
attach_sql = (
    "ATTACH 'dbname={} user={} password={} host=127.0.0.1 port={}' "
    "AS db (TYPE POSTGRES);"
).format(database, user, password, port)
duckdb.execute(attach_sql)

# query postgres tables to test connectivity
# NOTE(review): duckdb.sql() returns a relation object and the result is never
# consumed here -- verify that this really exercises the Postgres connection.
pg_tables = duckdb.sql("select * from pg_tables")

# read csv file
fake_currency = duckdb.read_csv("data/fake_currency_data.csv")

# use the csv relation variable as a sql view without conversions: the SQL
# text refers to the Python variable `fake_currency` by name
temp_var = duckdb.sql("select * from fake_currency where Country = 'UK'")

# convert to pandas dataframe
temp_df = temp_var.df()

# create sparksession
spark = session.builder.getOrCreate()

# pandas dataframe to pyspark dataframe
UK_df = spark.createDataFrame(temp_df)

# pyspark transformations
# HighThickness copies Thickness where it exceeds 0.05; there is no
# otherwise() branch, so the remaining rows get no value (null).
UK_df = UK_df.withColumn(
    "HighThickness",
    when((col("Thickness") > 0.05), col("Thickness")),
)
# keep only the rows whose security feature is a watermark
UK_df = UK_df.where(col("SecurityFeatures") == "Watermark")
# back to pandas so the SQL statement below can reference UK_df by name
UK_df = UK_df.toPandas()

# data load or write to postgres sql
# IF NOT EXISTS: the table is only created on the first run; later runs leave
# an existing table untouched.
# NOTE(review): "watername" in the table name looks like a typo for
# "watermark"; left unchanged because other code may depend on the name.
duckdb.execute("CREATE TABLE IF NOT EXISTS db.uk_watername_sample AS FROM UK_df")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment