Mimoune djouallah
select `WKT`,
       `C1`,
       `C2`
from
(
    select `WKT`,
           sum(`area`) as `C1`,
           min(`WKT`) as `C2`
    from
    (
let
    parquetfunctin = (params) =>
        let
            Parquet = Parquet.Document(Binary.Buffer(Web.Contents("https://nyc-tlc.s3.amazonaws.com/trip+data/", [RelativePath=params])))
        in
            Parquet,
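For readers following the same step in Python rather than Power Query, a rough equivalent sketch, assuming pyarrow is available (the example file name is illustrative, not taken from the gist):
import io
import urllib.request
import pyarrow.parquet as pq

def read_trip_parquet(relative_path):
    # download one file from the NYC TLC public bucket and parse it with pyarrow
    url = "https://nyc-tlc.s3.amazonaws.com/trip+data/" + relative_path
    with urllib.request.urlopen(url) as resp:
        return pq.read_table(io.BytesIO(resp.read()))

# e.g. read_trip_parquet("yellow_tripdata_2022-01.parquet")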
from datetime import datetime, date, timedelta
import urllib.request as urllib2
import tempfile
import pandas as pd
import pyarrow as pa
import pyarrow.dataset as ds
import re, shutil
from urllib.request import urlopen
import os
import adlfs
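The preview stops at the imports; a minimal sketch of the kind of step they are typically combined for, writing a pyarrow dataset to ADLS Gen2 through adlfs (account name, key, container path, and columns below are placeholders, not values from the gist):
fs = adlfs.AzureBlobFileSystem(account_name="youraccount", account_key="xxxx")
table = pa.Table.from_pandas(pd.DataFrame({"year": [2023], "value": [1.0]}))
ds.write_dataset(
    table,
    "container/path/dataset",
    filesystem=fs,
    format="parquet",
    partitioning=["year"],
    partitioning_flavor="hive",
    existing_data_behavior="overwrite_or_ignore",
)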
# V-Order and optimized-write settings; you can ignore these two lines
spark.conf.set("spark.sql.parquet.vorder.enabled", "true")
spark.conf.set("spark.microsoft.delta.optimizeWrite.enabled", "true")
# Load from the default lakehouse; make sure you click on the pin
from pyspark.sql.types import *
df = spark.read.option("header", "true").format("csv").load("Files/csv/*.csv")
df.write.mode("overwrite").format("delta").save("Tables/tablecsv")
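A quick sanity check, assuming the default lakehouse is attached: read the new Delta table back.
df_check = spark.read.format("delta").load("Tables/tablecsv")
df_check.show(5)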
djouallah / gist:998571cf7560fb697ed174d1ef65b7fe
Created July 5, 2023 11:14
Load the latest Iceberg metadata
import boto3
import pandas as pd
s3_client = boto3.client('s3')
bucket = 'xxxxxxx'
prefix = 'zzzz/yyyyyy/metadata'
paginator = s3_client.get_paginator('list_objects_v2')
response_iterator = paginator.paginate(Bucket=bucket, Prefix=prefix)
file_names = pd.DataFrame(columns=['file','date'])
for response in response_iterator:
    for object_data in response['Contents']:
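        # hedged continuation sketch (the preview cuts off here): keep each *.metadata.json
        # key with its LastModified timestamp, then pick the newest one
        if object_data['Key'].endswith('.metadata.json'):
            file_names.loc[len(file_names)] = [object_data['Key'], object_data['LastModified']]
latest_metadata = file_names.sort_values('date', ascending=False).iloc[0]['file']
print(latest_metadata)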
djouallah / delta.py
Created October 25, 2023 12:50
Write to OneLake using Python
%%time
!pip install -q duckdb
!pip install -q deltalake
import duckdb
from deltalake.writer import write_deltalake
from trident_token_library_wrapper import PyTridentTokenLibrary
aadToken = PyTridentTokenLibrary.get_access_token("storage")
sf = 1
for x in range(0, sf):
    con = duckdb.connect()
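    # hedged sketch of the write step the preview cuts off before; the OneLake abfss path,
    # lakehouse and table names are placeholders and the query is a stand-in
    arrow_table = con.sql("select 42 as answer").arrow()
    write_deltalake(
        "abfss://workspace@onelake.dfs.fabric.microsoft.com/lakehouse.Lakehouse/Tables/demo",
        arrow_table,
        mode="overwrite",
        storage_options={"bearer_token": aadToken, "use_fabric_endpoint": "true"},
    )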
SELECT
    --Query01
    l_returnflag,
    l_linestatus,
    SUM(l_quantity) AS sum_qty,
    SUM(l_extendedprice) AS sum_base_price,
    SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
    SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
    AVG(l_quantity) AS avg_qty,
    AVG(l_extendedprice) AS avg_price,
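The preview cuts off mid select-list; for reference, a hedged sketch of running the complete standard TPC-H Query 1 locally with DuckDB's tpch extension (the query text follows the TPC-H specification, not the rest of this gist):
import duckdb

con = duckdb.connect()
con.execute("INSTALL tpch")
con.execute("LOAD tpch")
con.execute("CALL dbgen(sf=0.1)")  # generate a small TPC-H dataset in memory
print(con.sql("""
    SELECT
        l_returnflag,
        l_linestatus,
        SUM(l_quantity) AS sum_qty,
        SUM(l_extendedprice) AS sum_base_price,
        SUM(l_extendedprice * (1 - l_discount)) AS sum_disc_price,
        SUM(l_extendedprice * (1 - l_discount) * (1 + l_tax)) AS sum_charge,
        AVG(l_quantity) AS avg_qty,
        AVG(l_extendedprice) AS avg_price,
        AVG(l_discount) AS avg_disc,
        COUNT(*) AS count_order
    FROM lineitem
    WHERE l_shipdate <= DATE '1998-12-01' - INTERVAL 90 DAY
    GROUP BY l_returnflag, l_linestatus
    ORDER BY l_returnflag, l_linestatus
"""))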
import requests
import time
import json
import base64
def get_notebook_content(notebook_id_or_name):
    nb = notebookutils.notebook.get(notebook_id_or_name)
    workspaceId = nb['workspaceId']
    notebookId = nb['id']
    format = 'ipynb'
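    # hedged continuation sketch (the preview cuts off here); the getDefinition endpoint shape,
    # the token audience, and the .ipynb part path are assumptions, and the call is assumed to
    # complete synchronously (long-running 202 handling is omitted)
    token = notebookutils.credentials.getToken("https://api.fabric.microsoft.com")
    url = (f"https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}"
           f"/items/{notebookId}/getDefinition?format={format}")
    r = requests.post(url, headers={"Authorization": f"Bearer {token}"})
    r.raise_for_status()
    parts = r.json()["definition"]["parts"]
    payload = next(p["payload"] for p in parts if p["path"].endswith(".ipynb"))
    return json.loads(base64.b64decode(payload))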
COPY (select '' as filename, null as year ) TO "/lakehouse/default/Files/scada" (FORMAT PARQUET, PARTITION_BY (year),OVERWRITE_OR_IGNORE ) ;
SET VARIABLE list_of_files = (select list(file) from glob("/lakehouse/default/Files/Daily_Reports/*.CSV")
where parse_filename(file) not in
(select filename from read_parquet("/lakehouse/default/Files/scada/*/*.parquet"))) ;
create or replace view raw as (select * from read_csv(getvariable('list_of_files'), skip=1, header=0, all_varchar=1,
columns={
'I': 'VARCHAR','UNIT': 'VARCHAR','XX': 'VARCHAR','VERSION': 'VARCHAR','SETTLEMENTDATE': 'VARCHAR','RUNNO': 'VARCHAR',
import boto3
s3_resource = boto3.resource(
    's3',
    region_name="us-east-1",
    endpoint_url="zzzzzzzzzzzzzzzzzzzz",
    aws_access_key_id="uuuuuuuuuuuuuuuuuu",
    aws_secret_access_key="xxxxxxxxxxxxxxxxxxxxxxxxxxx",
)
bucket_name = "uuuuu"
bucket = s3_resource.Bucket(bucket_name)
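A hedged usage sketch for the bucket handle above (the prefix is a placeholder):
for obj in bucket.objects.filter(Prefix="some/prefix/"):
    print(obj.key, obj.size)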