# Convert an s3:// URL to its path-style HTTPS equivalent; non-S3 URLs are returned unchanged
def s3_to_http(url):
    if url.startswith('s3://'):
        s3_path = url
        bucket = s3_path[5:].split('/')[0]
        object_name = '/'.join(s3_path[5:].split('/')[1:])
        return 'https://s3.amazonaws.com/{0}/{1}'.format(bucket, object_name)
    else:
        return url
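For example (the bucket and key below are hypothetical):

print(s3_to_http("s3://my-bucket/data/2020/file.csv"))
# -> https://s3.amazonaws.com/my-bucket/data/2020/file.csv
print(s3_to_http("https://example.com/file.csv"))
# -> https://example.com/file.csv (returned unchanged)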
from airflow import settings  # needed for settings.Session() below

def set_task_status(**kwargs):
    dag_id = kwargs["dag_id"]
    task_id = kwargs["task_id"]
    start_date = kwargs["start_date"]
    end_date = kwargs["end_date"]
    session = settings.Session()
    print("session: ", str(session))
from pyspark.sql import SparkSession

# 1. Get the Hadoop version used by your Spark installation, along with the Spark version
spark = SparkSession.builder.master("local").getOrCreate()
print(f"Hadoop version: {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")
print(f"Spark Version: {spark.version}")

# 2. Read data from a public S3 bucket without configuring AWS credentials; the hadoop-aws
#    package could have been supplied when the PySpark job was launched
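The snippet is cut off here. A sketch of what step 2 could look like, assuming the hadoop-aws package was supplied at launch (e.g. spark-submit --packages org.apache.hadoop:hadoop-aws:<your-hadoop-version>) and that anonymous credentials are enough because the bucket is genuinely public; the bucket and key are placeholders:

spark._jsc.hadoopConfiguration().set(
    "fs.s3a.aws.credentials.provider",
    "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
)
df = spark.read.csv("s3a://some-public-bucket/path/to/data.csv", header=True)  # placeholder path
df.show(5)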
aws emr create-cluster --release-label emr-5.31.0 --applications Name=Spark \
  --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole \
  --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json

# Fleet config JSON examples
# 1. https://github.com/awsdocs/amazon-emr-management-guide/blob/master/doc_source/emr-instance-fleet.md
# 2. https://aws.amazon.com/blogs/aws/new-amazon-emr-instance-fleets/
# 3. https://medium.com/finbox/easy-steps-to-optimise-your-aws-emr-performance-and-reduce-cost-ba4bd115973

# The second command is truncated in the listing; the instance-fleet flags below follow the
# shorthand shown in the AWS docs above (an assumed continuation, not the exact original).
aws emr create-cluster --release-label emr-5.4.0 --applications Name=Spark \
  --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole \
  --instance-fleets \
    InstanceFleetType=MASTER,TargetOnDemandCapacity=1,InstanceTypeConfigs=['{InstanceType=m5.xlarge}'] \
    InstanceFleetType=CORE,TargetSpotCapacity=2,InstanceTypeConfigs=['{InstanceType=m5.xlarge,BidPrice=0.10}']
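For reference, a minimal boto3 sketch of the same fleet-style cluster; the cluster name, fleet names, capacities, and instance types are assumptions for illustration (see the AWS docs above for the full fleet-config schema):

import boto3

emr = boto3.client("emr")
response = emr.run_job_flow(
    Name="spark-fleet-demo",  # placeholder name
    ReleaseLabel="emr-5.31.0",
    Applications=[{"Name": "Spark"}],
    ServiceRole="EMR_DefaultRole",
    JobFlowRole="EMR_EC2_DefaultRole",
    Instances={
        "InstanceFleets": [
            {
                "Name": "master-fleet",
                "InstanceFleetType": "MASTER",
                "TargetOnDemandCapacity": 1,
                "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}],
            },
            {
                "Name": "core-fleet",
                "InstanceFleetType": "CORE",
                "TargetSpotCapacity": 2,
                "InstanceTypeConfigs": [
                    {"InstanceType": "m5.xlarge", "WeightedCapacity": 1}
                ],
            },
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
)
print(response["JobFlowId"])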
#!/usr/bin/env python3
# This gist is completely based on https://gist.github.com/sgtoj/af0ed637b1cc7e869b21a62ef56af5ac,
# with a minor improvement: you don't have to call `aws sso login` from outside; it is handled
# internally using subprocess.
# Assumes you have already installed the AWS CLI.
import boto3
import json
import os
import subprocess
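The rest of the gist is truncated in this listing. The "handled internally using subprocess" part presumably amounts to something like the following; the profile name and function name are placeholders of mine, not the author's:

def sso_login(profile="my-sso-profile"):  # placeholder profile name
    """Invoke the AWS CLI so the user completes the browser-based SSO login flow."""
    subprocess.run(["aws", "sso", "login", "--profile", profile], check=True)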
import logging

import pymysql as db
import pandas as pd
from sshtunnel import SSHTunnelForwarder

logging.basicConfig(
    format="%(asctime)s : %(levelname)s : %(message)s", level=logging.DEBUG
)

# ssh variables
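The gist stops at the variable section. A self-contained sketch of the pattern it appears to be building toward (open an SSH tunnel, connect pymysql through it, read the result into pandas); all host names, credentials, and the query are placeholders:

ssh_host = "bastion.example.com"   # placeholder
ssh_user = "ec2-user"              # placeholder
ssh_key = "/path/to/key.pem"       # placeholder
db_host = "mydb.internal"          # placeholder, as reachable from the bastion
db_user, db_password, db_name = "user", "password", "mydb"  # placeholders

with SSHTunnelForwarder(
    (ssh_host, 22),
    ssh_username=ssh_user,
    ssh_pkey=ssh_key,
    remote_bind_address=(db_host, 3306),
) as tunnel:
    conn = db.connect(
        host="127.0.0.1",
        port=tunnel.local_bind_port,
        user=db_user,
        password=db_password,
        database=db_name,
    )
    frame = pd.read_sql("SELECT 1 AS ok", conn)  # placeholder query
    conn.close()
logging.info("rows fetched: %d", len(frame))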
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName('testapp').getOrCreate()
df = spark.createDataFrame([("col1:col2:col3",),
                            ("1:a:2001",),
                            ("2:b:2002",),
                            ("3:c:2003",)],
                           ["value"])
df.show()
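The listing cuts off here; presumably the point is to split the single colon-delimited value column into proper columns. A minimal sketch using pyspark.sql.functions.split, treating the first row of the data as the header (the resulting column names come from that header row):

from pyspark.sql.functions import split, col

parts = df.withColumn("parts", split(col("value"), ":"))
header = parts.first()["parts"]                      # ['col1', 'col2', 'col3']
result = (
    parts.filter(col("value") != "col1:col2:col3")   # drop the header row
         .select([col("parts").getItem(i).alias(name) for i, name in enumerate(header)])
)
result.show()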
import pandas as pd
import s3fs

fs = s3fs.S3FileSystem(profile_name='aws_profile')
with fs.open("s3://my-bucket/file.csv", "rb") as f:
    csv_df = pd.read_csv(f)

fs = s3fs.S3FileSystem(profile_name='other_aws_profile')
with fs.open("s3://another-bucket/file.csv", "wb") as f:
    # write the CSV back out under the second profile (the file handle is binary)
    f.write(csv_df.to_csv(index=False).encode("utf-8"))
from os import path
from inspect import getframeinfo, stack


def get_abs_path(relative_path):
    """
    Return the absolute path for the given relative path; the relative path is resolved from the
    location of the file that calls this function.
    :param relative_path: relative path from the caller's file location
    :return: absolute path corresponding to the relative path
    """
    # Resolve against the caller's file, not the current working directory
    caller = getframeinfo(stack()[1][0])
    return path.abspath(path.join(path.dirname(caller.filename), relative_path))
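The listing truncates this gist at the docstring; the two-line body above is the implementation those imports point to (inspect the caller's frame, then join its directory with the relative path). A hypothetical usage, with the file layout assumed purely for illustration:

# called from /home/me/project/src/job.py
config_path = get_abs_path("../config/settings.yaml")
# -> /home/me/project/config/settings.yaml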
import pandas as pd
import difflib

df1 = pd.read_stata('path to first dataset')
df2 = pd.read_stata('path to second dataset')


def fix_spelling(x):
    try:
        # return the closest match to x among the values of df2's common column
        return difflib.get_close_matches(x, df2['common_column'])[0]
    except Exception:
        return x  # no close match found (or x is not a string): keep the original value
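A typical way to apply it; the column names are the placeholders already used above, while the new column name and the follow-up merge are my guess at the intended use:

df1['common_column_clean'] = df1['common_column'].apply(fix_spelling)
# likely follow-up: join the two datasets on the fuzzy-matched key
merged = df1.merge(df2, left_on='common_column_clean', right_on='common_column')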