Skip to content

Instantly share code, notes, and snippets.

@cankush625
Created August 3, 2021 10:01
Show Gist options
  • Save cankush625/57740aec06bf18841f7313278472b7ce to your computer and use it in GitHub Desktop.
Save cankush625/57740aec06bf18841f7313278472b7ce to your computer and use it in GitHub Desktop.
import requests
import concurrent.futures
import json
import logging
# AWS pricing API Endpoint
url = "https://a0.p.awsstatic.com/pricing/1.0/ec2/region/{region}/reserved-instance/linux/index.json"
class AWSService:
# Retrieving EC2 instance prices from AWS EC2 Pricing API
@classmethod
def get_ec2_prices(cls, **kwargs):
dict_of_args = dict(kwargs)
prices_list = []
try:
response = requests.get(url.format(region=dict_of_args["region"]))
try:
pricing_data = response.json()
prices = pricing_data["prices"]
for price in prices:
if cls.is_valid_input(price, **dict_of_args):
prices_list.append(price)
except Exception as exc:
cls.exception_handler(exc)
except Exception as exc:
cls.exception_handler(exc)
return prices_list
@staticmethod
def is_valid_input(price, **kwargs):
base_argument = price["attributes"]
original_values_list = [
base_argument["aws:offerTermOfferingClass"],
base_argument["aws:offerTermLeaseLength"],
base_argument["aws:offerTermPurchaseOption"],
base_argument["aws:ec2:operatingSystem"],
base_argument["aws:ec2:tenancy"],
]
dict_of_args = dict(kwargs)
user_values_list = [
dict_of_args["term_offering_class"],
dict_of_args["term_length"],
dict_of_args["payment_option"],
dict_of_args["operating_system"],
dict_of_args["tenancy"],
]
for index in range(len(original_values_list)):
if original_values_list[index] != user_values_list[index]:
return False
return True
@classmethod
def save_prices_in_file(cls, prices_list, file_id):
prices = json.dumps(prices_list)
try:
with open(f"./prices/ec2_price_{file_id}.json", "w") as prices_file:
prices_file.write(prices)
except Exception as exc:
cls.exception_handler(exc)
@staticmethod
def exception_handler(exception):
# Store possible exceptions in a map of key as exception class and value as a message
exception_map = {
json.decoder.JSONDecodeError: "JSON Decode Error",
KeyError: "Key not found in JSON response",
requests.exceptions.InvalidSchema: "Invalid URL. No connection adapters found",
requests.exceptions.HTTPError: "HTTP error",
requests.exceptions.ConnectionError: "Failed to establish a new connection",
requests.exceptions.RequestException: "Request Exception",
FileNotFoundError: "The prices directory is not present",
}
message = exception_map.get(exception.__class__)
if message is None:
message = exception
logging.exception(message, exc_info=False)
# Running get_ec2_prices and save_prices_in_file function using this function
# This will enable us to run both functions in a single thread as both functions are dependent on each other
# This will create new file for every input using file_id
@staticmethod
def get_and_save_ec2_prices(*args):
list_of_args = list(args)
file_id = list_of_args[0]
reg = list_of_args[1]
term_offer_cls = list_of_args[2]
term_len = list_of_args[3]
payment_opt = list_of_args[4]
prices_list = AWSService.get_ec2_prices(
region=reg,
term_offering_class=term_offer_cls,
term_length=term_len,
payment_option=payment_opt,
operating_system="Linux",
tenancy="Shared",
)
# If prices_list is not empty then save the prices_list data into the file
if len(prices_list) != 0:
AWSService.save_prices_in_file(prices_list, file_id)
# List of inputs
# list_of_input[i] = ["id", "region", "term_offering_class", "term_length", "payment_option", "operating_system", "tenancy"]
list_of_input = [
["0", "ap-south-1", "standard", "1yr", "No Upfront", "Linux", "Shared"],
["1", "us-east-1", "convertible", "1yr", "No Upfront", "Linux", "Shared"],
]
# Get prices for all inputs in parallel
with concurrent.futures.ThreadPoolExecutor() as executor:
for list_of_arguments in list_of_input:
executor.submit(AWSService.get_and_save_ec2_prices, *list_of_arguments)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment