Last active
February 25, 2021 17:04
-
-
Save kuharan/90bd52d5faa970c82717f13eaa4d6bec to your computer and use it in GitHub Desktop.
AWS Lambda Function to automate DP APIs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json | |
import requests | |
import time | |
from pytz import timezone | |
import datetime | |
status_dict = {'0':'Running','1':'Running with Errors','12':'Running with Failures','15':'Queueing with Failures'} | |
session_type_dict = {'0':'Backup'} | |
def login(): | |
session = requests.session() | |
url = "https://xxx.xxx.xxx.com:xxx/auth/realms/DataProtector/protocol/openid-connect/token" | |
payload = {"username":"user_name|*|xxx.xxx.xxx.com", "password":"password", "client_id":"dp-gui", "grant_type":"password"} | |
response = json.loads(session.post(url, data = payload, verify = r"path_to_cert\xxx.xxx.xxx.com_cacert.pem").text) | |
access_token = response['access_token'] | |
return access_token,session | |
def check_session(access_token, session): | |
url = "https://xxx.xxx.xxx.com:xxx/idb/sessions/filter/" | |
payload = "{\r\n \"filter\": {\r\n \"status\": [0,1,12,15],\r\n \"sessionType\": [0]\r\n }\r\n}" | |
headers = { | |
'Authorization': 'Bearer {}'.format(access_token), | |
'Content-Type': 'application/json' | |
} | |
response = json.loads(session.post(url, headers=headers, data = payload, verify = r"path_to_cert\xxx.xxx.xxx.com_cacert.pem").text) | |
return response | |
def check_duration(session_start_time, hours): | |
epoch_time = int(time.time()) | |
if (epoch_time - session_start_time) > hours * 3600: | |
return True | |
else: | |
return False | |
def convert_epoch_to_datetime(epoch_time): | |
gmt_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch_time)) | |
cst_time = datetime.datetime.strptime(gmt_time, '%Y-%m-%d %H:%M:%S').astimezone(timezone('US/Central')) | |
return str(cst_time)+" CT" | |
def lambda_handler(event, context): | |
access_token,session = login() | |
response = check_session(access_token, session) | |
long_running_sessions = [] | |
for item in range(response['items_count']): | |
output={} | |
long_run_status = check_duration(session_start_time=response['items'][item]['startTime'], hours=2) | |
if long_run_status: | |
output['Name'] = response['items'][item]['name'] | |
output['Datalist'] = response['items'][item]['datalist'] | |
output['SessionType'] = session_type_dict[str(response['items'][item]['sessionType'])] | |
output['Status'] = status_dict[str(response['items'][item]['status'])] | |
output['StartTime'] = convert_epoch_to_datetime(response['items'][item]['startTime']) | |
output['Owner'] = response['items'][item]['owner'] | |
long_running_sessions.append(output) | |
url = "https://api-gateway.us-east-1.amazonaws.com/default/lambda_name" | |
headers = { | |
'Content-Type': 'application/json' | |
} | |
response = requests.get(url, headers=headers, data = json.dumps(long_running_sessions)) | |
print(response.text.encode('utf8')) | |
lambda_handler(None, None) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment