Skip to content

Instantly share code, notes, and snippets.

@allenmichael
Last active September 5, 2019 20:46
Show Gist options
  • Save allenmichael/75f9041765621e5f2ea46a01c625543c to your computer and use it in GitHub Desktop.
Save allenmichael/75f9041765621e5f2ea46a01c625543c to your computer and use it in GitHub Desktop.
A template for setting up a SparkFlows File Watcher using S3, Secrets Manager, SQS, and Lambda
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.sf</groupId>
<artifactId>SFWorkflowExecuteHandler</artifactId>
<packaging>jar</packaging>
<version>1.0</version>
<name>SFWorkflowExecuteHandler</name>
<dependencies>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-core</artifactId>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-lambda-java-events</artifactId>
<version>2.2.7</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-secretsmanager</artifactId>
<version>1.11.339</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.1</version>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
AWSTemplateFormatVersion: 2010-09-09
Parameters:
BucketWhereJarIsStored:
Description: Enter your bucket name where you've stored the JAR file for the Lambda function
Type: String
Default: sparkflows-deployments
BucketObjectNameJarFile:
Description: Enter the object name stored in your bucket if it's not named SFWorkflowExecuteHandler-1.0.jar
Type: String
Default: SFWorkflowExecuteHandler-1.0.jar
SparkFlowsToken:
Description: Enter your SparkFlows token to be stored in Secrets Manager.
Type: String
NoEcho: true
AllowedPattern: '.+'
ConstraintDescription: 'must be included.'
SparkFlowsUrl:
Description: Enter your SparkFlows API URL to be stored in the Lambda function environment variables.
Type: String
AllowedPattern: "^https?://[^\\s/$.?#].[^\\s]*$"
ConstraintDescription: "must be a valid URL."
WorkflowId:
Description: Enter your SparkFlows Workflow ID to be stored in the Lambda function environment variables.
Type: String
AllowedPattern: '.+'
ConstraintDescription: 'must be included.'
Metadata:
AWS::CloudFormation::Interface:
ParameterGroups:
-
Label:
default: "Lambda Function Code Storage Configuration"
Parameters:
- BucketWhereJarIsStored
- BucketObjectNameJarFile
-
Label:
default: "SparkFlows Configuration"
Parameters:
- SparkFlowsToken
- SparkFlowsUrl
- WorkflowId
ParameterLabels:
BucketWhereJarIsStored:
default: "Bucket Name"
BucketObjectNameJarFile:
default: "Object Name"
SparkFlowsToken:
default: "SparkFlows Access Token"
SparkFlowsUrl:
default: "SparkFlows API URL"
WorkflowId:
default: "SparkFlows Workflow ID"
Resources:
SFFileWatcherQueueF9695A19:
Type: AWS::SQS::Queue
SFFileWatcherQueuePolicy7A15DAB3:
Type: AWS::SQS::QueuePolicy
Properties:
PolicyDocument:
Statement:
- Action:
- sqs:SendMessage
- sqs:GetQueueAttributes
- sqs:GetQueueUrl
Condition:
ArnLike:
aws:SourceArn:
Fn::GetAtt:
- SFFileWatcherBucket0424C094
- Arn
Effect: Allow
Principal:
Service: s3.amazonaws.com
Resource:
Fn::GetAtt:
- SFFileWatcherQueueF9695A19
- Arn
Version: "2012-10-17"
Queues:
- Ref: SFFileWatcherQueueF9695A19
SFFileWatcherLambdaDLQQueueB9497625:
Type: AWS::SQS::Queue
SFFileWatcherBucket0424C094:
Type: AWS::S3::Bucket
Properties:
PublicAccessBlockConfiguration:
BlockPublicAcls: true
RestrictPublicBuckets: true
VersioningConfiguration:
Status: Enabled
UpdateReplacePolicy: Retain
DeletionPolicy: Retain
SFFileWatcherBucketNotifications934A0D16:
Type: Custom::S3BucketNotifications
Properties:
ServiceToken:
Fn::GetAtt:
- BucketNotificationsHandler050a0587b7544547bf325f094a3db8347ECC3691
- Arn
BucketName:
Ref: SFFileWatcherBucket0424C094
NotificationConfiguration:
QueueConfigurations:
- Events:
- s3:ObjectCreated:*
Filter:
Key:
FilterRules:
- Name: suffix
Value: _SUCCESS
- Name: prefix
Value: events
QueueArn:
Fn::GetAtt:
- SFFileWatcherQueueF9695A19
- Arn
DependsOn:
- SFFileWatcherQueuePolicy7A15DAB3
- SFFileWatcherQueueF9695A19
BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleB6FB88EC:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleDefaultPolicy2CF63D36:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action: s3:PutBucketNotification
Effect: Allow
Resource: "*"
Version: "2012-10-17"
PolicyName: BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleDefaultPolicy2CF63D36
Roles:
- Ref: BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleB6FB88EC
BucketNotificationsHandler050a0587b7544547bf325f094a3db8347ECC3691:
Type: AWS::Lambda::Function
Properties:
Description: AWS CloudFormation handler for "Custom::S3BucketNotifications" resources (@aws-cdk/aws-s3)
Code:
ZipFile: >-
exports.handler = (event, context) => {
const s3 = new (require('aws-sdk').S3)();
const https = require("https");
const url = require("url");
log(JSON.stringify(event, undefined, 2));
const props = event.ResourceProperties;
if (event.RequestType === 'Delete') {
props.NotificationConfiguration = {}; // this is how you clean out notifications
}
const req = {
Bucket: props.BucketName,
NotificationConfiguration: props.NotificationConfiguration
};
return s3.putBucketNotificationConfiguration(req, (err, data) => {
log({ err, data });
if (err) {
return submitResponse("FAILED", err.message + `\nMore information in CloudWatch Log Stream: ${context.logStreamName}`);
}
else {
return submitResponse("SUCCESS");
}
});
function log(obj) {
console.error(event.RequestId, event.StackId, event.LogicalResourceId, obj);
}
// tslint:disable-next-line:max-line-length
// adapted from https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-lambda-function-code.html#cfn-lambda-function-code-cfnresponsemodule
// to allow sending an error messge as a reason.
function submitResponse(responseStatus, reason) {
const responseBody = JSON.stringify({
Status: responseStatus,
Reason: reason || "See the details in CloudWatch Log Stream: " + context.logStreamName,
PhysicalResourceId: context.logStreamName,
StackId: event.StackId,
RequestId: event.RequestId,
LogicalResourceId: event.LogicalResourceId,
NoEcho: false,
});
log({ responseBody });
const parsedUrl = url.parse(event.ResponseURL);
const options = {
hostname: parsedUrl.hostname,
port: 443,
path: parsedUrl.path,
method: "PUT",
headers: {
"content-type": "",
"content-length": responseBody.length
}
};
const request = https.request(options, (r) => {
log({ statusCode: r.statusCode, statusMessage: r.statusMessage });
context.done();
});
request.on("error", (error) => {
log({ sendError: error });
context.done();
});
request.write(responseBody);
request.end();
}
};
Handler: index.handler
Role:
Fn::GetAtt:
- BucketNotificationsHandler050a0587b7544547bf325f094a3db834RoleB6FB88EC
- Arn
Runtime: nodejs8.10
Timeout: 300
SparkFlowsTokenCBF650E6:
Type: AWS::SecretsManager::Secret
Properties:
SecretString: !Ref SparkFlowsToken
SFFileWatcherLambdaServiceRole4A4AF0A9:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: lambda.amazonaws.com
Version: "2012-10-17"
ManagedPolicyArns:
- Fn::Join:
- ""
- - "arn:"
- Ref: AWS::Partition
- :iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
SFFileWatcherLambdaServiceRoleDefaultPolicy1767BDCA:
Type: AWS::IAM::Policy
Properties:
PolicyDocument:
Statement:
- Action: sqs:SendMessage
Effect: Allow
Resource:
Fn::GetAtt:
- SFFileWatcherLambdaDLQQueueB9497625
- Arn
- Action:
- xray:PutTraceSegments
- xray:PutTelemetryRecords
Effect: Allow
Resource: "*"
- Action:
- sqs:ReceiveMessage
- sqs:ChangeMessageVisibility
- sqs:GetQueueUrl
- sqs:DeleteMessage
- sqs:GetQueueAttributes
Effect: Allow
Resource:
Fn::GetAtt:
- SFFileWatcherQueueF9695A19
- Arn
- Action: secretsmanager:GetSecretValue
Effect: Allow
Resource:
Ref: SparkFlowsTokenCBF650E6
Version: "2012-10-17"
PolicyName: SFFileWatcherLambdaServiceRoleDefaultPolicy1767BDCA
Roles:
- Ref: SFFileWatcherLambdaServiceRole4A4AF0A9
SFFileWatcherLambda2CC58CA5:
Type: AWS::Lambda::Function
Properties:
Code:
S3Bucket:
Ref: BucketWhereJarIsStored
S3Key:
Ref: BucketObjectNameJarFile
Handler: com.sf.handler.WorkflowExecuteHandler::handleRequest
Role:
Fn::GetAtt:
- SFFileWatcherLambdaServiceRole4A4AF0A9
- Arn
Runtime: java8
DeadLetterConfig:
TargetArn:
Fn::GetAtt:
- SFFileWatcherLambdaDLQQueueB9497625
- Arn
Environment:
Variables:
SPARKFLOWS_TOKEN_SECRET_ID:
Ref: SparkFlowsTokenCBF650E6
SPARKFLOWS_URL: !Ref SparkFlowsUrl
WORKFLOW_ID: !Ref WorkflowId
MemorySize: 3008
Timeout: 30
TracingConfig:
Mode: Active
DependsOn:
- SFFileWatcherLambdaServiceRoleDefaultPolicy1767BDCA
- SFFileWatcherLambdaServiceRole4A4AF0A9
SFFileWatcherLambdaSqsEventSourceHelloWorldStackSFFileWatcherQueueA80A99B96BE15CC1:
Type: AWS::Lambda::EventSourceMapping
Properties:
EventSourceArn:
Fn::GetAtt:
- SFFileWatcherQueueF9695A19
- Arn
FunctionName:
Ref: SFFileWatcherLambda2CC58CA5
package com.sf.handler;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import com.amazonaws.services.lambda.runtime.events.SQSEvent.SQSMessage;
import com.amazonaws.services.secretsmanager.*;
import com.amazonaws.services.secretsmanager.model.*;
public class WorkflowExecuteHandler implements RequestHandler<SQSEvent, Void>{
@Override
public Void handleRequest(SQSEvent event, Context context)
{
final AWSSecretsManager sm = AWSSecretsManagerClientBuilder.defaultClient();
GetSecretValueRequest req = new GetSecretValueRequest().withSecretId(System.getenv("SPARKFLOWS_TOKEN_SECRET_ID"));
String secret = sm.getSecretValue(req).getSecretString();
System.out.println("Retrieved secret...");
System.out.println(secret);
for(SQSMessage msg : event.getRecords()){
System.out.println(new String(msg.getBody()));
}
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment