Last active
October 26, 2021 11:54
-
-
Save mnanchev/30e5fc2b6d6e9ab4f3d45cc40aafbb19 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Dataset pointing DataBrew at the parquet cost-and-usage files under the 2021 prefix.
// The key is a DataBrew parameterized path: <[^/]+> matches any single path
// segment, so every `2021/<file>.parquet` object in the bucket is included.
const datasetSource = {
  bucket: 'cost-and-usage-report-dataset-2021-12-12',
  key: '2021/<[^/]+>.parquet',
};
const cfnDataset = new CfnDataset(this, 'Dataset', {
  name: 'cost-and-usage-report-dataset',
  format: 'PARQUET',
  input: { s3InputDefinition: datasetSource },
});
// Recipe with three steps:
//   1. group costs by usage date, service name and account id, summing the
//      unblended cost into a new `line_item_unblended_cost_sum` column;
//   2. reformat the usage date into a new column in the layout required by
//      Amazon Forecast;
//   3. drop the original, untransformed date column as redundant.
const groupAndSumStep = {
  action: {
    operation: 'GROUP_BY',
    parameters: {
      groupByAggFunctionOptions:
        '[{"sourceColumnName":"line_item_unblended_cost","targetColumnName":"line_item_unblended_cost_sum","targetColumnDataType":"double","functionName":"SUM"}]',
      sourceColumns: '["line_item_usage_start_date","product_product_name","line_item_usage_account_id"]',
      useNewDataFrame: 'true',
    },
  },
};
const reformatDateStep = {
  action: {
    operation: 'DATE_FORMAT',
    parameters: {
      // DataBrew date-format token string (DataBrew's own tokens, not Java's).
      dateTimeFormat: 'yyyy-mm-dd',
      functionStepType: 'DATE_FORMAT',
      sourceColumn: 'line_item_usage_start_date',
      targetColumn: 'line_item_usage_start_date_DATEFORMAT',
    },
  },
};
const dropRawDateStep = {
  action: {
    operation: 'DELETE',
    parameters: { sourceColumns: '["line_item_usage_start_date"]' },
  },
};
const recipe = new CfnRecipe(this, 'dataBrewRecipe', {
  name: 'cost-and-usage-report-recipe',
  steps: [groupAndSumStep, reformatDateStep, dropRawDateStep],
});
// The recipe only makes sense once the cost-and-usage report prefix exists in S3.
recipe.node.addDependency(prefixCreation);
// DataBrew project binding the dataset to the recipe.
// BUG FIX: the third argument was written as `CfnProjectProps = { ... }` and the
// call was closed with `};` — an assignment to an implicit global plus a syntax
// error. The props must be a plain object literal and the call closed with `});`.
const cfnProject = new CfnProject(this, 'dataBrewProject', {
  datasetName: 'cost-and-usage-report-dataset',
  name: 'cost-and-usage-report-forecasting-project',
  recipeName: 'cost-and-usage-report-recipe',
  // NOTE(review): hard-coded account id in the ARN — consider deriving it from
  // the stack (e.g. Stack.of(this).account) so the code is portable.
  roleArn: 'arn:aws:iam::559706524079:role/service-role/dataBrewServiceRole',
});
// The project cannot exist before its recipe and dataset do.
cfnProject.addDependsOn(recipe);
cfnProject.addDependsOn(cfnDataset);
// After the recipe, project and dataset are created, the recipe still has to be
// published — an API-only operation, so it runs through a custom resource that
// implements the onUpdate and onDelete lifecycles.
const sdkCallPolicy = AwsCustomResourcePolicy.fromSdkCalls({
  resources: AwsCustomResourcePolicy.ANY_RESOURCE,
});
const publishRecipe = new AwsCustomResource(this, 'publishRecipe', {
  // Create/update: publish the latest working version of the recipe.
  onUpdate: {
    service: 'DataBrew',
    action: 'publishRecipe',
    parameters: { Name: recipe.name },
    physicalResourceId: { id: 'publishRecipe' },
  },
  // Teardown: remove the published version so the recipe can be deleted.
  onDelete: {
    service: 'DataBrew',
    action: 'deleteRecipeVersion',
    parameters: {
      Name: recipe.name, // required by the DeleteRecipeVersion API
      RecipeVersion: '1.0',
    },
  },
  policy: sdkCallPolicy,
});
publishRecipe.node.addDependency(recipe);
// Recipe job that executes the project (the recipe applied to the dataset)
// and writes CSV output to the output bucket.
// BUG FIX: a comma was missing after the `projectName` value, which is a
// syntax error.
const cfnJob = new CfnJob(this, 'dataBrewRecipeJob', {
  type: 'RECIPE',
  projectName: 'cost-and-usage-report-forecasting-project',
  name: 'cost-and-usage-report-job',
  outputs: [
    {
      // compressionFormat: 'GZIP', // enable to gzip the CSV output
      format: 'CSV',
      location: {
        bucket: outputBucket.bucketName,
        key: 'cost-and-usage-report-output',
      },
      overwrite: true, // each run replaces the previous output object
    },
  ],
  roleArn: dataBrewRole.roleArn,
});
// The job references the project by name, so the project must exist first.
cfnJob.addDependsOn(cfnProject);
// Schedule the job to run daily at 23:00 UTC.
// NOTE(review): AWS cron expressions are conventionally lowercase, i.e.
// `cron(0 23 * * ? *)` — confirm DataBrew accepts the capitalized `Cron(...)`.
const jobSchedule = new CfnSchedule(this, 'dataBrewJobSchedule', {
  cronExpression: 'Cron(0 23 * * ? *)',
  name: 'cost-and-usage-report-job-schedule',
  jobNames: ['cost-and-usage-report-job'],
});
jobSchedule.addDependsOn(cfnJob);
// Kick off one immediate job run so output exists before the first scheduled
// run fires. onUpdate covers both create and update of the custom resource.
const startJobCall = {
  service: 'DataBrew',
  action: 'startJobRun',
  parameters: { Name: 'cost-and-usage-report-job' },
  physicalResourceId: { id: 'startDataBrewJob' },
};
const startDataBrewJob = new AwsCustomResource(this, 'startDataBrewJob', {
  onUpdate: startJobCall,
  policy: AwsCustomResourcePolicy.fromSdkCalls({
    resources: AwsCustomResourcePolicy.ANY_RESOURCE,
  }),
});
// The job must exist before we can start a run of it.
startDataBrewJob.node.addDependency(cfnJob);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment