Created
May 15, 2024 14:06
-
-
Save Databracket9/90dc492ebef0b6a81d3977bbe0a97666 to your computer and use it in GitHub Desktop.
A demonstration of how to perform ETL operations on Snowflake datasets using Python with Snowpark and store the results in S3.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import snowflake.snowpark as snowpark | |
from datetime import date | |
def create_stage(session: snowpark.Session, stage_name: str, s3_arn: str, aws_id: str, aws_secret: str):
    """Build the statement that (re)creates a temporary stage pointing at S3.

    Args:
        session: Snowpark session the statement is issued through.
        stage_name: Name given to the temporary stage.
        s3_arn: Value placed in the stage's ``url`` clause.
        aws_id: Value placed in ``AWS_KEY_ID``.
        aws_secret: Value placed in ``AWS_SECRET_KEY``.

    Returns:
        The object ``Session.sql`` returns for the generated statement.
    """
    # NOTE(review): every argument is interpolated into the SQL text
    # unescaped, credentials included -- only pass trusted values.
    # NOTE(review): Snowpark documents Session.sql as lazy; presumably the
    # caller must evaluate the result for the stage to exist -- confirm.
    template = "create or replace temporary stage {} url='{}' CREDENTIALS=(AWS_KEY_ID='{}' AWS_SECRET_KEY='{}')"
    ddl = template.format(stage_name, s3_arn, aws_id, aws_secret)
    return session.sql(ddl)
def query_data(session: snowpark.Session, query: str, date: str):
    """Wrap ``query`` in a Snowpark result object via ``session``.

    Args:
        session: Snowpark session the query is issued through.
        query: SQL text, passed through unchanged.
        date: Not referenced by the body; kept so existing callers that
            pass it keep working.

    Returns:
        The object ``Session.sql`` returns for ``query``.
    """
    return session.sql(query)
def write_to_s3(session: snowpark.Session, format: str, write_query: str, dataframe: snowpark.DataFrame):
    """Expose ``dataframe`` as the ``write_table`` view, then build the write statement.

    Args:
        session: Snowpark session the statement is issued through.
        format: Not referenced by the body; kept for interface compatibility.
        write_query: SQL text of the write statement; it can refer to the
            ``write_table`` temporary view registered here.
        dataframe: Data to make available under the name ``write_table``.

    Returns:
        The object ``Session.sql`` returns for ``write_query``.
    """
    # The view has to exist before the write statement that reads from it.
    dataframe.create_or_replace_temp_view("write_table")
    return session.sql(write_query)
def main(session: snowpark.Session):
    """Run the demo ETL: create a stage, query the dataset, unload it to S3.

    Steps:
        1. Create the temporary stage ``test_stage`` on ``s3://snowflakewrite``.
        2. Select two ``PAID`` rows for the campaign date from the Google
           keywords dataset.
        3. Build a ``COPY INTO`` statement that writes those rows to the
           stage as parquet, under a path named after today's date.

    Args:
        session: Snowpark session supplied by the caller.

    Returns:
        The result object for the ``COPY INTO`` statement, as returned by
        ``write_to_s3``.
    """
    stage_name = "test_stage"
    referral_type = "PAID"
    # NOTE(review): two-digit year -- confirm this matches the DATE column format.
    campaign_date = "22-06-01"
    file_format = "parquet"
    file_name = date.today()

    # Session.sql is lazy, so the CREATE STAGE statement only runs once its
    # result is evaluated. The original discarded the result, which meant the
    # stage referenced by COPY INTO below was never created.
    create_stage(
        session=session,
        stage_name=stage_name,
        s3_arn="s3://snowflakewrite",
        aws_id="aws_id",
        aws_secret="aws_secret",
    ).collect()

    query = "select * from google_keywords_search_dataset__discover_all_searches_on_google.datafeeds.google_keywords where REFERRAL_TYPE = '{}' and DATE = '{}' order by COUNTRY asc limit 2".format(referral_type, campaign_date)
    dataframe = query_data(session=session, query=query, date=campaign_date)

    write_query = "COPY INTO @{}/referral_datasets/{} from write_table FILE_FORMAT = (TYPE = {})".format(stage_name, file_name, file_format)
    # Pass the same file_format used in the statement instead of a second literal.
    status = write_to_s3(session=session, format=file_format, write_query=write_query, dataframe=dataframe)
    return status
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment