Biplob Biswas revolutionisme

@revolutionisme
revolutionisme / convert.py
Created July 4, 2023 08:47 — forked from ghelobytes/convert.py
Convert an S3 URL to an HTTP URL
def s3_to_http(url):
    if url.startswith('s3://'):
        s3_path = url
        bucket = s3_path[5:].split('/')[0]
        object_name = '/'.join(s3_path[5:].split('/')[1:])
        return 'https://s3.amazonaws.com/{0}/{1}'.format(bucket, object_name)
    else:
        return url
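A quick usage check (the bucket and key are hypothetical, for illustration only):

print(s3_to_http('s3://my-bucket/path/to/file.csv'))
# https://s3.amazonaws.com/my-bucket/path/to/file.csv
print(s3_to_http('https://example.com/file.csv'))
# https://example.com/file.csv  (non-S3 URLs are returned unchanged)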
@revolutionisme
revolutionisme / airflow_metadata_query.py
Created June 24, 2021 15:49
Query the Airflow metadata DB from within a PythonOperator
from airflow import settings

def set_task_status(**kwargs):
    dag_id = kwargs["dag_id"]
    task_id = kwargs["task_id"]
    start_date = kwargs["start_date"]
    end_date = kwargs["end_date"]
    session = settings.Session()
    print("session: ", str(session))
@revolutionisme
revolutionisme / pyspark_setup.py
Last active January 28, 2021 08:37
Pyspark settings for reading various kinds of data from different sources in your local setup
from pyspark.sql import SparkSession
# 1. Get the hadoop version used by your spark installation along with the spark version
spark = SparkSession.builder.master("local").getOrCreate()
print(f"Hadoop version: {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")
print(f"Spark Version: {spark.version}")
# 2. Reading data from a public S3 bucket without configuring AWS credentials; the package could also have been set when the pyspark job was run
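The code for step 2 is cut off; a minimal sketch of what it could look like, assuming a fresh interpreter (spark.jars.packages only takes effect before the JVM starts), the hadoop-aws package, and Hadoop's anonymous credentials provider. The package version and the bucket/key are placeholders, not the gist's values:

spark = (
    SparkSession.builder.master("local")
    # hadoop-aws must match the Hadoop version printed in step 1; 2.7.4 is an assumption
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.4")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .getOrCreate()
)
df = spark.read.csv("s3a://some-public-bucket/some/key.csv", header=True)  # placeholder path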
@revolutionisme
revolutionisme / pyspark-emr.sh
Last active January 4, 2021 17:09
Script to configure an EMR cluster and launch it with a pyspark job
aws emr create-cluster --release-label emr-5.31.0 --applications Name=Spark \
--instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
# Fleet Config json examples
# 1. https://github.com/awsdocs/amazon-emr-management-guide/blob/master/doc_source/emr-instance-fleet.md
# 2. https://aws.amazon.com/blogs/aws/new-amazon-emr-instance-fleets/
# 3. https://medium.com/finbox/easy-steps-to-optimise-your-aws-emr-performance-and-reduce-cost-ba4bd115973
aws emr create-cluster --release-label emr-5.4.0 \
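The second create-cluster command is truncated above. Since the description mentions launching pyspark code, here is a minimal boto3 sketch of submitting the job as a step to an existing cluster (the region, cluster id, and script path are placeholders, not values from the gist):

import boto3

emr = boto3.client("emr", region_name="us-east-1")  # placeholder region
emr.add_job_flow_steps(
    JobFlowId="j-XXXXXXXXXXXXX",  # placeholder cluster id
    Steps=[{
        "Name": "run-pyspark-job",
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit", "--deploy-mode", "cluster",
                "s3://mybucket/myfolder/job.py",  # placeholder script location
            ],
        },
    }],
)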
@revolutionisme
revolutionisme / aws_sso.py
Last active October 20, 2020 11:36
Generate temporary credentials via AWS SSO for the specified profile name in your AWS credentials file
#!/usr/bin/env python3
# This gist is completely based on https://gist.github.com/sgtoj/af0ed637b1cc7e869b21a62ef56af5ac,
# with a minor improvement: you don't have to call "aws sso login" yourself, it is handled internally via subprocess.
# Assumes you have already installed the AWS CLI.
import boto3
import json
import os
import subprocess
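The gist is cut off after the imports; a minimal sketch of the remaining flow under these assumptions: the AWS CLI v2 caches the SSO access token as JSON under ~/.aws/sso/cache, and the account id and role name come from your profile (all names below are placeholders):

def get_sso_credentials(profile, account_id, role_name, region):
    # log in via the CLI instead of requiring it to be done beforehand
    subprocess.run(["aws", "sso", "login", "--profile", profile], check=True)
    # pick up the cached access token written by the CLI
    cache_dir = os.path.expanduser("~/.aws/sso/cache")
    token = None
    for fname in os.listdir(cache_dir):
        with open(os.path.join(cache_dir, fname)) as fh:
            data = json.load(fh)
        if "accessToken" in data:
            token = data["accessToken"]
            break
    # exchange the token for temporary role credentials
    sso = boto3.client("sso", region_name=region)
    creds = sso.get_role_credentials(
        roleName=role_name, accountId=account_id, accessToken=token
    )["roleCredentials"]
    return creds["accessKeyId"], creds["secretAccessKey"], creds["sessionToken"]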
@revolutionisme
revolutionisme / tunnel_to_database_server.py
Created April 23, 2020 19:36
Python example of running a query using SSH tunneling
import logging
import pymysql as db
import pandas as pd
from sshtunnel import SSHTunnelForwarder
logging.basicConfig(
    format="%(asctime)s : %(levelname)s : %(message)s", level=logging.DEBUG
)
# ssh variables
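The gist is cut off at the variable definitions; a minimal sketch of the tunnel and query, with placeholder hosts, credentials, and SQL:

SSH_HOST, SSH_USER, SSH_PKEY = "bastion.example.com", "ec2-user", "~/.ssh/id_rsa"  # placeholders
DB_HOST, DB_USER, DB_PASS, DB_NAME = "db.internal", "reader", "secret", "mydb"     # placeholders

with SSHTunnelForwarder(
    (SSH_HOST, 22),
    ssh_username=SSH_USER,
    ssh_pkey=SSH_PKEY,
    remote_bind_address=(DB_HOST, 3306),
) as tunnel:
    # connect to the forwarded local port instead of the remote DB host
    conn = db.connect(
        host="127.0.0.1",
        port=tunnel.local_bind_port,
        user=DB_USER,
        password=DB_PASS,
        database=DB_NAME,
    )
    df = pd.read_sql("SELECT * FROM some_table LIMIT 10", conn)
    conn.close()
logging.info("fetched %d rows", len(df))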
@revolutionisme
revolutionisme / split_data.py
Created April 12, 2020 16:54
Split one column into multiple columns using pyspark and remove the first row, which contains the headers
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName('testapp').getOrCreate()
df = spark.createDataFrame([("col1:col2:col3",),
                            ("1:a:2001",),
                            ("2:b:2002",),
                            ("3:c:2003",)],
                           ["value"])
df.show()
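The splitting logic itself is cut off; a minimal sketch that derives the column names from the embedded header row and drops it:

from pyspark.sql import functions as F

header = df.first()["value"]                 # "col1:col2:col3"
cols = header.split(":")
split_df = (
    df.filter(F.col("value") != header)      # remove the first row with the headers
      .withColumn("parts", F.split(F.col("value"), ":"))
      .select(*[F.col("parts").getItem(i).alias(c) for i, c in enumerate(cols)])
)
split_df.show()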
@revolutionisme
revolutionisme / s3_to_pandas.py
Created March 16, 2020 15:30
Reading and Writing data to/from s3 using pandas and s3fs
import pandas as pd
import s3fs
fs = s3fs.S3FileSystem(profile_name='aws_profile')
with fs.open("s3://my-bucket/file.csv", "rb") as f:
csv_df = pd.read_csv(f)
fs = s3fs.S3FileSystem(profile_name='other_aws_profile')
with fs.open("s3://another-bucket/file.csv", "wb") as f
@revolutionisme
revolutionisme / get_abs_path.py
Created October 17, 2019 09:56
Get the absolute path for a given relative path; the relative path is resolved from the module where this function is called
from os import path
from inspect import getframeinfo, stack

def get_abs_path(relative_path):
    """
    Returns the absolute path for the given relative path; the relative path is resolved from the module where
    this function is called.

    :param relative_path: relative path from the caller's file location
    :return: absolute path of the relative path
    """
    # the gist is truncated here; resolving against the caller's file is the natural completion
    caller = getframeinfo(stack()[1][0])
    return path.abspath(path.join(path.dirname(caller.filename), relative_path))
@revolutionisme
revolutionisme / merge_after_spelling_fix.py
Created March 7, 2019 09:40
Fix spelling in one column of a dataset based on a similar column in another dataset
import pandas as pd
import difflib

df1 = pd.read_stata('path to first dataset')
df2 = pd.read_stata('path to second dataset')

def fix_spelling(x):
    try:
        return difflib.get_close_matches(x, df2['common_column'])[0]
    except Exception:
        return x  # gist is truncated here; keeping the original value is the natural fallback
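The gist ends inside the except clause; a short sketch of applying the fix and merging afterwards (column names are placeholders):

df1['common_column'] = df1['common_column'].map(fix_spelling)
merged = df1.merge(df2, on='common_column', how='inner')
print(merged.head())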