Last active
March 25, 2020 05:23
-
-
Save gbraccialli/827a2180ffbfecec594483fb3875cf8d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GIT_PROJECT = "xxxx" | |
PROJECT = "aaaaa" | |
USERNAME = "guilherme" | |
BRANCH = "develop" | |
SPARK_MODE = "local" # local or yarn | |
%run /home/jupyter/kedro_load.py $GIT_PROJECT $PROJECT $USERNAME $BRANCH $SPARK_MODE | |
###################################################################### | |
def randomString(stringLength=10):
    """Generate a random string of lowercase ASCII letters.

    Args:
        stringLength: Number of characters to generate. Zero or a negative
            value yields an empty string.

    Returns:
        A string of ``stringLength`` characters drawn from ``a``-``z``.

    Note:
        Built on the ``random`` module, which is not cryptographically
        secure; do not use the result as a secret.
    """
    # The docstring must be the first statement; previously it followed the
    # import and was therefore discarded (``__doc__`` was None).
    import random
    import string

    alphabet = string.ascii_lowercase
    return "".join(random.choice(alphabet) for _ in range(stringLength))
def init_spark(spark_hadoop_path, username):
    """Create (or reuse) a SparkSession for the notebook user.

    The module-level ``SPARK_MODE`` selects the deployment: the exact string
    ``"yarn"`` targets YARN, any other value falls back to a local session.

    Args:
        spark_hadoop_path: Spark installation directory handed to
            ``findspark.init``.
        username: User name embedded in the application (and, on YARN, the
            queue) name.

    Returns:
        The session returned by ``SparkSession.builder...getOrCreate()``.
    """
    import os

    import findspark

    findspark.init(spark_hadoop_path)

    # Imported only after findspark.init(); presumably that call is what
    # makes pyspark importable here, so keep this order.
    from pyspark.sql import SparkSession

    if SPARK_MODE != "yarn":
        # Local mode: all cores, 4g driver, Arrow conversion disabled.
        local_builder = SparkSession.builder.master("local[*]")
        local_builder = local_builder.config("spark.driver.memory", "4g")
        local_builder = local_builder.config(
            "spark.sql.execution.arrow.enabled", "false"
        )
        local_builder = local_builder.appName(f"app-{username}")
        return local_builder.getOrCreate()

    # YARN mode: fix the Python interpreter before the session is created.
    os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3"

    # One name, "<username>_<5 random letters>", serves as both the YARN
    # queue and the application name.
    queue = f"{username}_{randomString(5)}"

    yarn_builder = SparkSession.builder.master("yarn")
    yarn_builder = yarn_builder.config("spark.sql.execution.arrow.enabled", "false")
    yarn_builder = yarn_builder.config("spark.yarn.queue", queue)
    yarn_builder = yarn_builder.appName(queue)
    return yarn_builder.getOrCreate()
import logging | |
import pathlib | |
import datetime | |
import os | |
import sys | |
import getpass | |
import importlib | |
# Positional arguments supplied on the notebook's `%run` line, in order:
# git repository name, project name, user name, branch, Spark mode.
# Fail with a usage message instead of a bare IndexError when one is missing.
if len(sys.argv) < 6:
    raise SystemExit(
        "usage: kedro_load.py GIT_PROJECT PROJECT USERNAME BRANCH SPARK_MODE"
    )

GIT_PROJECT = sys.argv[1]
PROJECT = sys.argv[2]
USERNAME = sys.argv[3]
BRANCH = sys.argv[4]
SPARK_MODE = sys.argv[5]  # read by init_spark(): "yarn", anything else is local

# SPARK_MODE must be bound before this call because init_spark() reads it
# as a module-level global.
spark = init_spark("/usr/lib/spark/", USERNAME)
from pyspark.sql import functions as F | |
from pyspark.sql import Window as W | |
from pyspark.sql.types import * | |
import pandas as pd | |
import shutil
import subprocess

# Fresh clone of the project into a per-user scratch directory under a shared
# root. Commands are run as argument lists (no shell) so that values taken
# from sys.argv cannot be interpreted as shell syntax, and each git step runs
# with an explicit cwd so a missing directory can never redirect git to the
# notebook's current working directory.
_code_root = "/tmp/code"
_checkout_parent = f"{_code_root}/{USERNAME}_{PROJECT}/"

os.makedirs(_code_root, exist_ok=True)
# Best effort, as before: the root is shared, and chmod can fail on entries
# owned by other users, so a non-zero exit status is deliberately ignored.
subprocess.run(["chmod", "777", "-R", _code_root], check=False)

# Always start from an empty directory so the clone below cannot collide
# with a previous run.
shutil.rmtree(_checkout_parent, ignore_errors=True)
os.makedirs(_checkout_parent, exist_ok=True)
subprocess.run(["chmod", "777", "-R", _code_root], check=False)

# Everything after this point depends on the checkout, so git failures are
# raised (CalledProcessError) instead of being silently ignored.
subprocess.run(
    ["git", "clone", f"https://github.com/xxx/{GIT_PROJECT}.git"],
    cwd=_checkout_parent,
    check=True,
)
_repo_dir = f"{_checkout_parent}{GIT_PROJECT}/"
subprocess.run(["git", "pull"], cwd=_repo_dir, check=True)
subprocess.run(["git", "checkout", BRANCH], cwd=_repo_dir, check=True)
# Locations inside the cloned repository. LOCAL_BASE_PATH already ends with a
# slash, so the derived paths contain a double slash; the values are kept
# exactly as they were.
LOCAL_BASE_PATH = f"/tmp/code/{USERNAME}_{PROJECT}/{GIT_PROJECT}/"
CONF_ROOT = "conf"
PROJECT_BASE_PATH = LOCAL_BASE_PATH
LOGS_DIR = f"{LOCAL_BASE_PATH}/logs/"
VIZUALIZATION_DIR = f"{LOCAL_BASE_PATH}/conf/visualization"

# Create the log and visualization directories if the checkout lacks them.
pathlib.Path(LOGS_DIR).mkdir(parents=True, exist_ok=True)
pathlib.Path(VIZUALIZATION_DIR).mkdir(parents=True, exist_ok=True)

# Move to project root
os.chdir(PROJECT_BASE_PATH)

# Put the project's src/ directory first on the import path, once.
path_to_add = f"{PROJECT_BASE_PATH}/src/"
if path_to_add not in sys.path:
    sys.path.insert(0, path_to_add)
# Silence py4j gateway chatter below ERROR before the context is loaded.
logging.getLogger("py4j.java_gateway").setLevel(logging.ERROR)

from kedro.context import KedroContext, load_context

# Load the Kedro project and expose its main handles as module-level names
# for the notebook. Note that `io` shadows the standard-library module name.
project_context = load_context(PROJECT_BASE_PATH)
io = project_context.io
catalog = project_context.catalog
pipeline = project_context.pipeline

# Only ERROR and above for kedro and kedro.io; kedro.pipeline stays at INFO.
logging.getLogger("kedro").setLevel(logging.ERROR)
logging.getLogger("kedro.io").setLevel(logging.ERROR)
logging.getLogger("kedro.pipeline").setLevel(logging.INFO)
def run_pipeline(pipeline):
    """Run ``pipeline`` with a Kedro ``SequentialRunner``.

    The runner is given the module-level ``io`` object as its second
    argument. Nothing is returned.

    Args:
        pipeline: Pipeline to execute. The parameter shadows the
            module-level ``pipeline`` name inside this function.
    """
    from kedro.runner import SequentialRunner

    runner = SequentialRunner()
    runner.run(pipeline, io)
import pandas as pd | |
# Lift pandas' column display limit (None means no limit).
pd.set_option("display.max_columns", None)

# Timestamped confirmation that the loader ran to completion.
print("kedro loaded at {}".format(datetime.datetime.now()))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment