Created
May 23, 2021 13:26
-
-
Save saisgit/0b885f04b6d55a9c8beb5b7f9d646613 to your computer and use it in GitHub Desktop.
Program to create a Spark DataFrame from a Snowflake table on AWS EMR
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # Program to create a Spark DataFrame from a Snowflake table on AWS EMR. | |
| # Launch pyspark with the Snowflake JDBC driver and Spark connector packages: | |
| # pyspark --packages net.snowflake:snowflake-jdbc:3.11.1,net.snowflake:spark-snowflake_2.11:2.5.7-spark_2.4 | |
| import boto3 | |
| import json | |
# Create the Spark session, or return the one already running (for example the
# session predefined by the `pyspark` shell). SparkSession is imported
# explicitly so the script also runs outside that shell (e.g. via
# spark-submit), where the name is not predefined and would raise NameError.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("Connect-Snowflake")
    .enableHiveSupport()
    .getOrCreate()
)
# Fetch a secret from AWS Secrets Manager and return its text payload parsed as JSON.
def get_secret_credentials(secret_key, region_name='us-east-1'):
    """Return the JSON payload of an AWS Secrets Manager secret as a dict.

    Args:
        secret_key: Name or ARN of the secret, passed to Secrets Manager as
            ``SecretId``.
        region_name: AWS region that hosts the secret. Defaults to
            ``'us-east-1'``, the region this script originally hard-coded.

    Returns:
        The secret's ``SecretString`` decoded with ``json.loads``.

    Raises:
        KeyError: if the response carries no ``SecretString`` (e.g. the
            secret was stored as ``SecretBinary``).
        json.JSONDecodeError: if ``SecretString`` is not valid JSON.
    """
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager', region_name=region_name)
    secret_response = client.get_secret_value(SecretId=secret_key)
    # boto3 returns the text payload under 'SecretString' (capital S); the
    # previous 'secretString' key never exists and always raised KeyError.
    return json.loads(secret_response['SecretString'])
# Fetch the Snowflake connection secret stored under 'test/snowflake/cluster'.
secrets_json = get_secret_credentials('test/snowflake/cluster')

# Options for the Snowflake Spark connector. Account, user, password, role and
# warehouse come from the secret; database and schema are fixed here.
snowflake_credentials = {
    "sfUrl": secrets_json['account'] + ".snowflakecomputing.com",
    # The connector's documented option for the login name is "sfUser";
    # the previous "sfUsername" key is not a documented connector option.
    "sfUser": secrets_json['test_user'],
    "sfPassword": secrets_json['test_password'],
    "sfRole": secrets_json['sf_role'],
    "sfDatabase": "test_db",
    "sfSchema": "test_schema",
    # NOTE(review): the secret key is spelled 'test_wareohouse' (sic); it is
    # probably meant to be 'test_warehouse'. Confirm against the secret's
    # actual key names before renaming.
    "sfWarehouse": secrets_json['test_wareohouse'],
}
# Sample query returning at most one row from sfdb.test_table.
# NOTE(review): the connection's sfDatabase is "test_db", so the two-part name
# sfdb.test_table presumably resolves as schema "sfdb" inside test_db (not the
# configured sfSchema "test_schema"). Confirm this is the intended object.
my_query = "select * from sfdb.test_table limit 1"

# Read through the Snowflake Spark connector: apply the connection options,
# supply the SQL through the "query" option, then build the DataFrame.
df = (
    spark.read
    .format("net.snowflake.spark.snowflake")
    .options(**snowflake_credentials)
    .option("query", my_query)
    .load()
)

# Print the result.
df.show()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment