This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from airflow.models import BaseOperator | |
import logging | |
from subprocess import Popen, STDOUT, PIPE | |
from airflow.exceptions import AirflowException, AirflowTaskTimeout | |
from airflow.utils.decorators import apply_defaults | |
''' | |
SparkOperator for airflow designed to simplify work with Spark on YARN. | |
Simplifies using spark-submit in airflow DAGs, retrieves application id | |
and tracking URL from logs and ensures YARN application is killed on timeout. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from oauthenticator.google import LocalGoogleOAuthenticator | |
c.JupyterHub.authenticator_class = LocalGoogleOAuthenticator | |
c.LocalGoogleOAuthenticator.oauth_callback_url = 'https://jupyterhub.wikia-services.com/hub/oauth_callback' | |
c.LocalGoogleOAuthenticator.client_id = '__OAUTH_CLIENT_ID__' | |
c.LocalGoogleOAuthenticator.client_secret = '__OAUTH_CLIENT_SECRET__' | |
c.LocalGoogleOAuthenticator.create_system_users = True | |
c.LocalGoogleOAuthenticator.hosted_domain = 'wikia-inc.com' | |
c.LocalGoogleOAuthenticator.login_service = 'Google' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Install OAuthenticator | |
sudo docker exec jupyterhub bash -c '/opt/conda/bin/pip install oauthenticator' | |
# Upgrade aws cli to enable access to secret manager | |
sudo pip install --upgrade awscli | |
# Configure jupyterhub to support OAuth | |
OAUTH_CLIENT_ID=`aws secretsmanager get-secret-value --secret-id oauth_client_id --region us-east-1 | jq -r .SecretString | head -c -1` | |
OAUTH_CLIENT_SECRET=`aws secretsmanager get-secret-value --secret-id oauth_client_secret --region us-east-1 | jq -r .SecretString | head -c -1` | |
aws s3 cp s3://de-bin/jupyterhub-emr/templates/jupyterhub_config.py - | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
FROM alpine:3.10 | |
ARG SPARK_VERSION=3.0.0-preview | |
ARG HADOOP_VERSION_SHORT=3.2 | |
ARG HADOOP_VERSION=3.2.0 | |
ARG AWS_SDK_VERSION=1.11.375 | |
RUN apk add --no-cache bash openjdk8-jre python3 | |
# Download and extract Spark |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import SparkSession | |
from argparse import ArgumentParser | |
# parse arguments | |
parser = ArgumentParser() | |
parser.add_argument('--input-events', help='Events, parquet format') | |
parser.add_argument('--input-ads', help='Ads, JSON format') | |
parser.add_argument('--output-joined', help='Output location of enriched data') | |
parser.add_argument('--output-invalid', help='Invalid data') | |
parser.add_argument('--dt', help='Date partition indicator') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import unittest | |
import shutil | |
import os | |
import json | |
from datetime import datetime | |
from pyspark.sql import SparkSession | |
class TestIntegration(unittest.TestCase): | |
INPUT_EVENTS = "/tmp/input_events" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import subprocess | |
class TestIntegration(unittest.TestCase): | |
def test_enrichment(self): | |
# given | |
self.add_event( | |
ts=datetime(2020, 3, 31, 13, 15), | |
user_id='USER1', | |
ad_id='AD1') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class TestIntegration(unittest.TestCase): | |
def test_enrichment(self): | |
# given | |
self.add_event( | |
ts=datetime(2020, 3, 31, 13, 15), | |
user_id='USER1', | |
ad_id='AD1') | |
self.add_ad( | |
id='AD1', |