Last active
December 2, 2020 13:23
-
-
Save karlschriek/c8d9ed0e410f49e29fc8f946df7f8ce3 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
import os | |
import getpass | |
import requests | |
import os | |
import io | |
import re | |
import sys | |
import stat | |
import requests | |
import platform | |
import time | |
import zipfile | |
from selenium.webdriver import Chrome | |
from selenium.webdriver.chrome.options import Options | |
import kfp | |
DRIVER_ROOT_URL = 'https://chromedriver.storage.googleapis.com' | |
SUPPORTED_PLATFORMS = { | |
'Windows': 'win32', | |
'Darwin': 'mac64', | |
'Linux': 'linux64' | |
} | |
def validate_creds(username, password): | |
if not username or not password: | |
logging.info("Please provide AWS Cognito credentials below") | |
username = input("Username: ") | |
password = getpass.getpass("Password: ") | |
return username, password | |
def make_cookie_dict(cookie_list, include=None): | |
if include==None: | |
include = ["AWSELBAuthSessionCookie-0", "AWSELBAuthSessionCookie-1"] | |
return {cookie["name"]: cookie["value"] for cookie in cookie_list if cookie["name"] in include} | |
def make_cookie_string(cookie_dict): | |
cookie_strings=[f"{k}={v}" for k,v in cookie_dict.items()] | |
return ";".join(cookie_strings) | |
def get_auth_cookie(host, username=None, password=None, from_cache=True): | |
"""This function communicates with AWS auth services and returns | |
credentials requried for communicating with KFP API Server | |
Current dom only works for cognito itself. We might extend to other IDP pages later. | |
Args: | |
host (string): KFP API server URL | |
username (string): username for authentication | |
password (string): user account password for authentication | |
Returns: | |
auth_cookie (string): Returns a string | |
example: | |
:: | |
'AWSELBAuthSessionCookie-0=0YVVtNnipQ...cjmqrMOuYmpTxvo' | |
""" | |
cache = DiskCache('elb_session_cookie') | |
if from_cache: | |
cached_creds = cache.get(expires_in=86400) | |
if cached_creds: | |
return cached_creds | |
else: | |
logging.info( | |
'Credentials are not found in the cache, trying to login' | |
) | |
driver = get_chrome_driver() | |
driver.get(host) | |
username, password = validate_creds(username, password) | |
# Setting the value of email input field | |
driver.execute_script( | |
'var element = document.getElementById("signInFormUsername");' + | |
'element.value = "{}";'.format(username) | |
) | |
# Setting the value of password input field | |
driver.execute_script( | |
'var element = document.getElementById("signInFormPassword");' + | |
'element.value = "{}";'.format(password) | |
) | |
# Submitting the form or click the sign in button | |
driver.execute_script( | |
'document.getElementsByName("signInSubmitButton")[0].click();' | |
) | |
cookies_list = driver.get_cookies() | |
cookies_dict = make_cookie_dict(cookies_list) | |
cookie_string = make_cookie_string(cookies_dict) | |
cache.save(cookie_string) | |
return cookie_string | |
class DiskCache: | |
""" Helper class for caching data on disk.""" | |
_DEFAULT_CACHE_ROOT = os.path.join(os.path.expanduser('~'), '.config', 'kfp') | |
def __init__(self, name): | |
self.name = name | |
self.cache_path = os.path.join(self._DEFAULT_CACHE_ROOT, self.name) | |
if not os.path.exists(self._DEFAULT_CACHE_ROOT): | |
os.makedirs(self._DEFAULT_CACHE_ROOT) | |
def save(self, content): | |
""" cache content in a known location | |
Args: | |
content (str): content to be cached | |
""" | |
with open(self.cache_path, 'w') as cacheFile: | |
cacheFile.write(content) | |
def get(self, expires_in=None): | |
"""[summary] | |
Args: | |
expires_in (int, optional): Time in seconds since last modifed | |
when the cache is valid. Defaults to None which means for ever. | |
Returns: | |
(str): retrived cached content or None if not found | |
""" | |
try: | |
if expires_in and time.time() - os.path.getmtime(self.cache_path) > expires_in: | |
return | |
with open(self.cache_path) as cacheFile: | |
return cacheFile.read() | |
except FileNotFoundError: | |
return | |
def os_name(): | |
pl = sys.platform | |
if pl == "linux" or pl == "linux2": | |
return "linux" | |
elif pl == "darwin": | |
return "mac" | |
elif pl == "win32": | |
return "win" | |
def get_chrome_version(): | |
pattern = r'\d+\.\d+\.\d+' | |
cmd_mapping = { | |
"linux": 'google-chrome --version || google-chrome-stable --version', | |
"mac": r'/Applications/Google\ Chrome.app/Contents/MacOS/Google\ Chrome --version', | |
"win": r'reg query "HKEY_CURRENT_USER\Software\Google\Chrome\BLBeacon" /v version' | |
} | |
cmd = cmd_mapping[os_name()] | |
stdout = os.popen(cmd).read() | |
version = re.search(pattern, stdout) | |
if not version: | |
raise ValueError('Could not get version for Chrome with this command: {}'.format(cmd)) | |
current_version = version.group(0) | |
return current_version | |
def get_latest_release_version(): | |
latest_release_url = "{}/LATEST_RELEASE".format(DRIVER_ROOT_URL) | |
chrome_version = get_chrome_version() | |
logging.info("Get LATEST driver version for {}".format(chrome_version)) | |
resp = requests.get("{}_{}".format(latest_release_url,chrome_version)) | |
return resp.text.rstrip() | |
def get_chrome_driver_path(): | |
if os_name() == "win": | |
return os.path.join(os.path.expanduser('~'), '.config', 'kfp', 'chromedriver.exe') | |
else: | |
return os.path.join(os.path.expanduser('~'), '.config', 'kfp', 'chromedrivery') | |
def download_driver(): | |
""" Helper function for downloading the driver | |
""" | |
osy = SUPPORTED_PLATFORMS[platform.system()] | |
chrome_release_version = get_latest_release_version() | |
driver_url = '{}/{}/chromedriver_{}.zip'.format(DRIVER_ROOT_URL, chrome_release_version, osy); | |
logging.info('Downloading driver {} ...'.format(driver_url)) | |
r = requests.get(driver_url) | |
z = zipfile.ZipFile(io.BytesIO(r.content)) | |
path = os.path.dirname(get_chrome_driver_path()) | |
if not os.path.exists(path): | |
os.makedirs(path) | |
z.extractall(path) | |
os.chmod(get_chrome_driver_path(), stat.S_IEXEC) | |
return os.path.join(get_chrome_driver_path(), "drivers", "chromedriver", | |
osy, chrome_release_version, "chromedriver") | |
def get_chrome_driver(): | |
""" Download Chrome driver if it doesn't already exists | |
""" | |
options = Options() | |
options.add_argument('--headless') | |
options.add_argument('--disable-gpu') | |
if not os.path.exists(get_chrome_driver_path()): | |
logging.info('Selenium driver not found, trying to download one') | |
download_driver() | |
return Chrome(executable_path=get_chrome_driver_path(), options=options) | |
host="https://www.mysubsomain.mydomain.com" //this has corresspond to your Cognito app client callback, e.g. www.mysubsomain.mydomain.com/oauth2/idpresponse | |
cookies = get_auth_cookie(host, from_cache=True) #set to False if you always want to fetch a new cookie. Not yet tested if it correctly expires an old one | |
client = kfp.Client(host=f'{host}/pipeline', | |
namespace="karl-schriek", | |
cookies=cookies) | |
client.create_experiment(name="test_cookies", namespace="karl-schriek") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment