PySpark script for downloading a single Parquet file from Amazon S3 via the s3a protocol. It also reads the credentials from ~/.aws/credentials, so we don't need to hardcode them. See also https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html .
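For reference, the script expects a named profile section in ~/.aws/credentials; a minimal sketch of that file (the profile name "your_profile" and the key values below are placeholders, the keys shown are AWS's documented example values):

[your_profile]
aws_access_key_id = AKIAIOSFODNN7EXAMPLE
aws_secret_access_key = wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY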
#
# Some constants
#
aws_profile = "your_profile"
aws_region = "your_region"
s3_bucket = "your_bucket"

#
# Reading environment variables from aws credential file
#
import os
import configparser

config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get(aws_profile, "aws_access_key_id")
access_key = config.get(aws_profile, "aws_secret_access_key")

#
# Configuring pyspark
#
# see https://github.com/jupyter/docker-stacks/issues/127#issuecomment-214594895
# and https://github.com/radanalyticsio/pyspark-s3-notebook/blob/master/s3-source-example.ipynb
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"
# If this doesn't work you might have to delete your ~/.ivy2 directory to reset your package cache.
# (see https://github.com/databricks/spark-redshift/issues/244#issuecomment-239950148)
import pyspark

sc = pyspark.SparkContext()
# see https://github.com/databricks/spark-redshift/issues/298#issuecomment-271834485
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
# see https://stackoverflow.com/questions/28844631/how-to-set-hadoop-configuration-values-from-pyspark
hadoop_conf = sc._jsc.hadoopConfiguration()
# see https://stackoverflow.com/questions/43454117/how-do-you-use-s3a-with-spark-2-1-0-on-aws-us-east-2
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", access_id)
hadoop_conf.set("fs.s3a.secret.key", access_key)
# see http://blog.encomiabile.it/2015/10/29/apache-spark-amazon-s3-and-apache-mesos/
hadoop_conf.set("fs.s3a.connection.maximum", "100000")
# see https://docs.aws.amazon.com/general/latest/gr/rande.html#s3_region
hadoop_conf.set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

#
# Downloading the parquet file
#
sql = pyspark.sql.SparkSession(sc)
path = s3_bucket + "your_path"
dataS3 = sql.read.parquet("s3a://" + path)
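As a quick sanity check (assuming the read above succeeds), you can inspect the loaded DataFrame:

# print the Parquet schema and the first few rows of the downloaded data
dataS3.printSchema()
dataS3.show(5)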
Try to use "s3a" and not "s3".
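For example (a sketch with placeholder bucket and key names), the URI scheme decides which Hadoop filesystem implementation handles the read:

# "s3a://" is served by the org.apache.hadoop.fs.s3a.S3AFileSystem configured above
df = sql.read.parquet("s3a://your_bucket/your_path")
# "s3://" would go through a different (legacy) filesystem class and ignore the fs.s3a.* settings
# df = sql.read.parquet("s3://your_bucket/your_path")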
Hello Techmates,
I have created my AWS free account and uploaded a weather file to a bucket (region: sa-east-1, South America).
Afterwards, I have been trying to read the file from the AWS S3 bucket with pyspark as below:
from pyspark import SparkConf, SparkContext
ak = ''
sk = ''
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3.awsAccessKeyId", ak)
sc._jsc.hadoopConfiguration().set("fs.s3.awsSecretAccessKey", sk)
a = sc.textFile("s3://bucket_name/weatherhistory.txt")
a.collect()
But it shows the error: /weatherhistory.txt does not exists.
However, when I try the same using Python (boto), I can easily read the file.
import boto
import boto.s3.connection
access_key = ''
secret_key = ''
conn = boto.connect_s3(bucket_name,
                       aws_access_key_id=access_key,
                       aws_secret_access_key=secret_key)
.....
.....
I have even listed the keys in spark-default.conf as well:
[default]
aws_access_key_id=*****
aws_secret_access_key=*****
But the error still appears: /weatherhistory.txt does not exists.
I have tried this approach as well, but the error is the same:
conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", 1)
        .set("spark.executor.memory", "2g")
        .set("fs.s3.awsAccessKeyId", "")
        .set("fs.s3.awsSecretAccessKey", "")
        .set("fs.s3.endpoint", "s3-sa-east-1.amazonaws.com")
        .set("com.amazonaws.services.s3.enableV4", "true")
        .set("fs.s3.impl", "org.apache.hadoop.fs.s3.S3FileSystem"))
sc.conf = conf
a = sc.textFile("s3://bucketname/weatherhistory.txt")
I have even tried to write a file, thinking that my directory pointing was not correct and that, if the write succeeded, I could pinpoint the path it is actually using, but there is still no progress and it says no path exists.
If you could please guide us in this regard, it would really be helpful. Thanks in advance.