Last active
June 21, 2021 01:28
-
-
Save sixy6e/61fec088ff7d4624a8ec584f85a0c35c to your computer and use it in GitHub Desktop.
toy script for aws summary
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
A toy script to replicate the AWS costings summary that is done manually within Excel |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Replicate the summarising functionality that is currently done | |
manually via Excel. | |
""" | |
import calendar | |
import datetime | |
from pathlib import Path | |
from typing import Tuple | |
import numpy | |
from sklearn.linear_model import LinearRegression | |
import pandas | |
import click | |
import structlog | |
_LOG = structlog.get_logger() | |
def gather(pathname: Path, extension: str) -> pandas.DataFrame: | |
""" | |
Gather the downloaded csv's and combine them into a singular pandas.DataFrame. | |
Assumption's: | |
* There are no other files in the directory that would match the | |
extension via glob | |
* A particular column name to be used as the datetime index | |
* A particular row is to be skipped (generated by the AWS export) | |
""" | |
filenames = pathname.glob(f"*.{extension}") | |
data = {} | |
for fname in filenames: | |
data[fname.stem] = pandas.read_csv( | |
fname, skiprows=[1], header=0, index_col="Service", parse_dates=["Service"] | |
) | |
data[fname.stem].index.rename("Period", inplace=True) | |
concatenated = pandas.concat( | |
data, | |
names=["Account"], | |
) # use the index (Service), and include all columns (outer join) | |
return concatenated | |
def summarise(dataframe: pandas.DataFrame) -> pandas.DataFrame: | |
""" | |
Summarise each of the columns by index `Period`. | |
""" | |
summary = pandas.pivot_table( | |
dataframe, index="Period", values=dataframe.columns, aggfunc=numpy.sum | |
) | |
return summary | |
def update_partial_month(dataframe: pandas.DataFrame) -> pandas.DataFrame: | |
""" | |
Data downloaded for the current month will only be a partial cost. | |
For example, 12th January will only be costs to date (depending on the hour | |
will only be a partial day). For a forecast based on monthly data, this | |
will result in an incorrect weighting when forecasting. | |
This process attempts to minimise the impact by calculating a daily average | |
and use it to blank fill for the remaining days in the month. | |
""" | |
copy = dataframe.copy() | |
today = datetime.datetime.today() # more than likely a partial day | |
yesterday = today - datetime.timedelta(days=1) | |
last_day = calendar.monthrange(today.year, today.month)[1] | |
end_of_month = datetime.datetime( | |
today.year, today.month, last_day, 23, 59 | |
) # last minute possible to get full day | |
days_togo = (end_of_month - yesterday).days | |
for name, group in dataframe.groupby("Account"): | |
row = group.iloc[-1] | |
group.iloc[-1] = row + row / today.day * days_togo | |
copy.loc[name] = group | |
return copy | |
def predict( | |
dataframe: pandas.DataFrame, predict: int = 6, back: int = 3 | |
) -> pandas.DataFrame: | |
result = {} | |
# fill null data; as this is cost related, set to 0 | |
filled = dataframe.fillna(0) | |
cols = dataframe.columns | |
for name, acc_group in filled.groupby("Account"): | |
idx = pandas.date_range( | |
acc_group.index.levels[1][-1], | |
periods=predict + 1, # months required for forward prediction | |
freq="M", | |
) | |
frame = pandas.DataFrame(index=idx[1:], columns=cols) # ignore 0 (current month) | |
rows = acc_group.shape[0] | |
independent = numpy.arange(rows).reshape( | |
rows, 1 | |
) # simple method for defining the x term | |
forward = numpy.arange(predict).reshape(predict, 1) + rows | |
# for some reason, regressing all cols as n_variates gave different results | |
# thus we are looping over cols | |
for col in cols: | |
regress = LinearRegression() | |
# regress.fit(independent[-back:], acc_group[[col]].iloc[-steps_back:-1]) | |
regress.fit(independent[-back:], acc_group[[col]].iloc[-back:]) | |
frame[col] = regress.predict(forward) | |
# recalculate the total cost column | |
frame["Total cost ($)"] = frame.drop("Total cost ($)", axis=1).sum(axis=1) | |
result[name] = pandas.concat([acc_group.loc[name], frame]) | |
concatenated = pandas.concat(result, names=["Account", "Period"]) | |
return concatenated | |
@click.command() | |
@click.option( | |
"--pathname", | |
type=click.Path(file_okay=False, readable=True), | |
help="The input pathname to the files to be summarised", | |
) | |
@click.option("--extension", help="File extension to glob") | |
@click.option( | |
"--outdir", type=click.Path(file_okay=False, writable=True), help="Output filename" | |
) | |
def main(pathname, extension, outdir): | |
outdir = Path(outdir) | |
if not outdir.exists(): | |
outdir.mkdir(parents=True) | |
_LOG.info("gathering and collating") | |
concatenated = gather(Path(pathname), extension) | |
out_fname = outdir.joinpath("concatenated.csv") | |
_LOG.info("writing concatenated", out_fname=str(out_fname)) | |
concatenated.to_csv(out_fname) | |
_LOG.info("updating partial day records") | |
updated = update_partial_month(concatenated) | |
out_fname = outdir.joinpath("partial_day_update.csv") | |
_LOG.info("writing partial day updated", out_fname=str(out_fname)) | |
updated.to_csv(out_fname) | |
_LOG.info("summarising") | |
summary = summarise(updated) | |
out_fname = outdir.joinpath("summary.csv") | |
_LOG.info("writing summary", out_fname=str(out_fname)) | |
summary.to_csv(out_fname) | |
_LOG.info("forward prediction") | |
predicted = predict(updated) | |
out_fname = outdir.joinpath("forward-prediction.csv") | |
_LOG.info("writing forward prediction", out_fname=str(out_fname)) | |
predicted.to_csv(out_fname) | |
out_fname = outdir.joinpath("aws-ausseabed-forward-cost-prediction.xlsx") | |
_LOG.info("writing sorted forward prediction", out_fname=str(out_fname)) | |
with pandas.ExcelWriter(out_fname) as src: | |
for name, group in predicted.groupby("Account"): | |
# sort the columns highest to lowest based on individual values | |
_LOG.info("sorting columns highest to lowest", sheet=name) | |
new_col_order = group.max().sort_values()[::-1].index.to_list() | |
group = group[new_col_order] | |
_LOG.info("writing table", sheet=name) | |
group.to_excel(src, name.split("_")[0]) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment