Mariusz Strzelecki szczeles
@szczeles
szczeles / SparkOperator.py
Created January 31, 2017 08:04
SparkOperator for Airflow, designed to simplify working with Spark on YARN: it wraps spark-submit in Airflow DAGs, retrieves the application id and tracking URL from the logs, and ensures the YARN application is killed on timeout.
from airflow.models import BaseOperator
import logging
from subprocess import Popen, STDOUT, PIPE
from airflow.exceptions import AirflowException, AirflowTaskTimeout
from airflow.utils.decorators import apply_defaults
'''
SparkOperator for airflow designed to simplify work with Spark on YARN.
Simplifies using spark-submit in airflow DAGs, retrieves application id
and tracking URL from logs and ensures YARN application is killed on timeout.
'''
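The operator body itself is not shown above. A minimal sketch of how such an operator could be built on these imports, assuming the application id is scraped from spark-submit output and the YARN application is killed through the yarn CLI; the constructor arguments are illustrative, not the gist's actual signature:

import re

class SparkOperator(BaseOperator):

    @apply_defaults
    def __init__(self, application, application_args=None, *args, **kwargs):
        super(SparkOperator, self).__init__(*args, **kwargs)
        self.application = application          # path to the jar/py file
        self.application_args = application_args or []
        self.application_id = None

    def execute(self, context):
        cmd = ['spark-submit', '--master', 'yarn', self.application] \
            + self.application_args
        process = Popen(cmd, stdout=PIPE, stderr=STDOUT)
        try:
            for line in iter(process.stdout.readline, b''):
                decoded = line.decode('utf-8').rstrip()
                logging.info(decoded)
                # spark-submit on YARN logs the application id (with the
                # tracking URL next to it); capture the first id we see
                match = re.search(r'application_\d+_\d+', decoded)
                if match and self.application_id is None:
                    self.application_id = match.group(0)
            if process.wait() != 0:
                raise AirflowException('spark-submit failed')
        except AirflowTaskTimeout:
            # make sure the YARN application does not outlive the task
            if self.application_id:
                Popen(['yarn', 'application', '-kill',
                       self.application_id]).wait()
            raise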
@szczeles
szczeles / jupyterhub_config.py
JupyterHub on EMR: authenticate users through Google OAuth (the template rendered by the bootstrap script below)
from oauthenticator.google import LocalGoogleOAuthenticator
c.JupyterHub.authenticator_class = LocalGoogleOAuthenticator
c.LocalGoogleOAuthenticator.oauth_callback_url = 'https://jupyterhub.wikia-services.com/hub/oauth_callback'
c.LocalGoogleOAuthenticator.client_id = '__OAUTH_CLIENT_ID__'
c.LocalGoogleOAuthenticator.client_secret = '__OAUTH_CLIENT_SECRET__'
c.LocalGoogleOAuthenticator.create_system_users = True
c.LocalGoogleOAuthenticator.hosted_domain = 'wikia-inc.com'
c.LocalGoogleOAuthenticator.login_service = 'Google'
# Bootstrap steps on the EMR master node, companion to the config template above
# Install OAuthenticator in the JupyterHub container
sudo docker exec jupyterhub bash -c '/opt/conda/bin/pip install oauthenticator'
# Upgrade aws cli to enable access to secret manager
sudo pip install --upgrade awscli
# Configure jupyterhub to support OAuth
# (jq -r prints a trailing newline; head -c -1 strips that last byte)
OAUTH_CLIENT_ID=`aws secretsmanager get-secret-value --secret-id oauth_client_id --region us-east-1 | jq -r .SecretString | head -c -1`
OAUTH_CLIENT_SECRET=`aws secretsmanager get-secret-value --secret-id oauth_client_secret --region us-east-1 | jq -r .SecretString | head -c -1`
aws s3 cp s3://de-bin/jupyterhub-emr/templates/jupyterhub_config.py - |
  sed -e "s/__OAUTH_CLIENT_ID__/$OAUTH_CLIENT_ID/" \
      -e "s/__OAUTH_CLIENT_SECRET__/$OAUTH_CLIENT_SECRET/" |
  sudo tee /etc/jupyter/conf/jupyterhub_config.py > /dev/null
# the sed/tee continuation above is an assumed reconstruction of the
# truncated pipeline; /etc/jupyter/conf/ is EMR's default config location
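For the new authenticator to take effect, the container (named jupyterhub, as used above) presumably gets restarted at the end of the script:

sudo docker restart jupyterhub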
@szczeles
szczeles / Dockerfile
Builds a minimal Spark image on Alpine (Spark 3.0.0-preview, Hadoop 3.2, AWS SDK for S3 access)
FROM alpine:3.10
ARG SPARK_VERSION=3.0.0-preview
ARG HADOOP_VERSION_SHORT=3.2
ARG HADOOP_VERSION=3.2.0
ARG AWS_SDK_VERSION=1.11.375
RUN apk add --no-cache bash openjdk8-jre python3
# Download and extract Spark
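# (a sketch of how this step typically continues; the URL layout assumes the
#  standard Apache archive and is not taken from the original gist)
RUN wget -qO- https://archive.apache.org/dist/spark/spark-${SPARK_VERSION}/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION_SHORT}.tgz \
      | tar xzf - -C /opt \
 && ln -s /opt/spark-${SPARK_VERSION}-bin-hadoop${HADOOP_VERSION_SHORT} /opt/spark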
@szczeles
szczeles / job.py
Created March 31, 2020 13:25
Integration tests of Spark applications
from pyspark.sql import SparkSession
from argparse import ArgumentParser
# parse arguments
parser = ArgumentParser()
parser.add_argument('--input-events', help='Events, parquet format')
parser.add_argument('--input-ads', help='Ads, JSON format')
parser.add_argument('--output-joined', help='Output location of enriched data')
parser.add_argument('--output-invalid', help='Invalid data')
parser.add_argument('--dt', help='Date partition indicator')
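Only the argument parsing is shown; a minimal sketch of the job body these arguments suggest, assuming join keys ad_id (events) and id (ads) as used in the tests below, not the gist's actual implementation:

args = parser.parse_args()

spark = SparkSession.builder.getOrCreate()

events = spark.read.parquet(args.input_events)
ads = spark.read.json(args.input_ads)

# enrich events with ad metadata; events that match no ad are invalid
joined = events.join(ads, events.ad_id == ads.id, 'left_outer')
joined.filter(ads.id.isNotNull()) \
    .write.parquet('{}/dt={}'.format(args.output_joined, args.dt))
joined.filter(ads.id.isNull()) \
    .write.parquet('{}/dt={}'.format(args.output_invalid, args.dt))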
# companion test module: integration tests for the job above
import unittest
import shutil
import os
import json
from datetime import datetime
from pyspark.sql import SparkSession
class TestIntegration(unittest.TestCase):
    INPUT_EVENTS = "/tmp/input_events"
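    # The helpers used by test_enrichment below are not captured in this
    # snippet. A sketch of what they might look like, with names taken from
    # the test calls and storage formats from the job's CLI help; INPUT_ADS
    # and the local SparkSession setup are assumptions:
    INPUT_ADS = "/tmp/input_ads"

    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder.master('local[2]').getOrCreate()

    def setUp(self):
        # start each test from empty input directories
        for path in (self.INPUT_EVENTS, self.INPUT_ADS):
            shutil.rmtree(path, ignore_errors=True)
            os.makedirs(path)

    def add_event(self, ts, user_id, ad_id):
        # events land as parquet, matching the job's --input-events format
        self.spark.createDataFrame(
            [(ts, user_id, ad_id)], ['ts', 'user_id', 'ad_id']
        ).write.mode('append').parquet(self.INPUT_EVENTS)

    def add_ad(self, id, **attrs):
        # ads land as JSON lines, matching the job's --input-ads format
        with open(os.path.join(self.INPUT_ADS, 'ads.json'), 'a') as f:
            f.write(json.dumps(dict(attrs, id=id)) + '\n')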
# the test body; subprocess will be used to run the job under test
import subprocess

class TestIntegration(unittest.TestCase):
    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
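        # when (a sketch of the likely next step: run the job under test;
        # the output paths and spark-submit invocation are assumptions)
        subprocess.check_call([
            'spark-submit', 'job.py',
            '--input-events', self.INPUT_EVENTS,
            '--input-ads', self.INPUT_ADS,
            '--output-joined', '/tmp/output_joined',
            '--output-invalid', '/tmp/output_invalid',
            '--dt', '2020-03-31'])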
@szczeles
szczeles / then.py
Created March 31, 2020 15:26
then.py
class TestIntegration(unittest.TestCase):
    def test_enrichment(self):
        # given
        self.add_event(
            ts=datetime(2020, 3, 31, 13, 15),
            user_id='USER1',
            ad_id='AD1')
        self.add_ad(
            id='AD1',