Pipeline object creator: programmatically defines and activates an AWS Data Pipeline that exports a DynamoDB table to S3 on an EMR cluster, with SNS notifications on success and failure.
DDBExportPipelineCreator.java

import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.*;
import com.google.common.collect.Lists;

import java.util.List;
import java.util.Map;

public class DDBExportPipelineCreator {

    // Uploads the full pipeline definition (objects plus parameter values)
    // and fails fast if Data Pipeline rejects it.
    public static PutPipelineDefinitionResult putPipelineDefinition(final DataPipelineClient dataPipelineClient,
                                                                    final String pipelineId,
                                                                    final Map<String, String> params) {
        List<PipelineObject> pipelineObjectList = getPipelineObjects();
        List<ParameterValue> parameterValues = getParameterValues(params);
        PutPipelineDefinitionRequest putPipelineDefinition = new PutPipelineDefinitionRequest()
                .withPipelineId(pipelineId)
                .withParameterValues(parameterValues)
                .withPipelineObjects(pipelineObjectList);
        PutPipelineDefinitionResult putPipelineResult = dataPipelineClient.putPipelineDefinition(putPipelineDefinition);
        if (putPipelineResult.isErrored()) {
            // Surface the service-side validation errors instead of swallowing them.
            throw new RuntimeException("Error in pipeline definition: "
                    + putPipelineResult.getValidationErrors());
        }
        return putPipelineResult;
    }
    // Assembles the pipeline graph: shared defaults, the DynamoDB source,
    // the S3 destination, the EMR cluster and export activity, and the
    // SNS notifications wired to success and failure.
    private static List<PipelineObject> getPipelineObjects() {
        PipelineObject defaultObject = DDBExportPipelineObjectCreator.getDefault();
        PipelineObject ddbSourceTable = DDBExportPipelineObjectCreator.getDDBSourceTable();
        PipelineObject s3BackupLocation = DDBExportPipelineObjectCreator.getS3BackupLocation();
        PipelineObject emrCluster = DDBExportPipelineObjectCreator.getEMRCluster();
        PipelineObject emrActivity = DDBExportPipelineObjectCreator.getEMRActivity();
        PipelineObject snsSuccess = DDBExportPipelineObjectCreator.getSNSSuccessActivity();
        PipelineObject snsFail = DDBExportPipelineObjectCreator.getSNSFailActivity();
        return Lists.newArrayList(
                defaultObject,
                ddbSourceTable,
                s3BackupLocation,
                emrCluster,
                emrActivity,
                snsSuccess,
                snsFail);
    }
    private static List<ParameterValue> getParameterValues(final Map<String, String> params) {
        // Note: the map key here is "region" even though the pipeline parameter
        // id is "myDDBRegion"; every other entry uses the parameter id as its key.
        ParameterValue ddbRegion = new ParameterValue()
                .withId("myDDBRegion")
                .withStringValue(params.get("region"));
        ParameterValue pipeDDBTableName = new ParameterValue()
                .withId("myDDBTableName")
                .withStringValue(params.get("myDDBTableName"));
        // Consume at most 25% of the table's provisioned read throughput.
        ParameterValue pipeDDBReadThroughputRatio = new ParameterValue()
                .withId("myDDBReadThroughputRatio")
                .withStringValue("0.25");
        ParameterValue pipeOutputS3Location = new ParameterValue()
                .withId("myOutputS3Location")
                .withStringValue(params.get("myOutputS3Location"));
        ParameterValue pipeLogsS3Location = new ParameterValue()
                .withId("myLogsS3Location")
                .withStringValue(params.get("myLogsS3Location"));
        ParameterValue myResizeClusterBeforeRunning = new ParameterValue()
                .withId("myResizeClusterBeforeRunning")
                .withStringValue("true");
        ParameterValue mySuccessARN = new ParameterValue()
                .withId("myTopicSuccess")
                .withStringValue(params.get("myTopicSuccess"));
        ParameterValue myFailARN = new ParameterValue()
                .withId("myTopicFail")
                .withStringValue(params.get("myTopicFail"));
        return Lists.newArrayList(
                ddbRegion,
                pipeDDBTableName,
                pipeDDBReadThroughputRatio,
                pipeOutputS3Location,
                myResizeClusterBeforeRunning,
                pipeLogsS3Location,
                mySuccessARN,
                myFailARN);
    }
    // Starts the on-demand pipeline run.
    public static ActivatePipelineResult activatePipeline(final DataPipelineClient dataPipelineClient,
                                                          final String pipelineId) {
        ActivatePipelineRequest activatePipelineRequest = new ActivatePipelineRequest()
                .withPipelineId(pipelineId);
        return dataPipelineClient.activatePipeline(activatePipelineRequest);
    }
}
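
For reference, a minimal driver for these two classes might look like the sketch below. Everything in it is an assumption rather than part of the gist: the no-argument DataPipelineClient constructor (credentials and region come from the default provider chain), the pipeline name and uniqueId, and all of the example parameter values. Only the map keys are fixed, since getParameterValues() looks them up by exactly these names.

import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.CreatePipelineRequest;
import com.amazonaws.services.datapipeline.model.CreatePipelineResult;

import java.util.HashMap;
import java.util.Map;

public class DDBExportPipelineDriver {

    public static void main(String[] args) {
        // Hypothetical setup; credential and region resolution depend on your environment.
        DataPipelineClient client = new DataPipelineClient();

        // Register a new, empty pipeline; uniqueId makes the call idempotent.
        CreatePipelineResult created = client.createPipeline(new CreatePipelineRequest()
                .withName("ddb-export")
                .withUniqueId("ddb-export-example"));
        String pipelineId = created.getPipelineId();

        // Keys must match what getParameterValues() reads; values are examples.
        Map<String, String> params = new HashMap<>();
        params.put("region", "us-east-1");
        params.put("myDDBTableName", "my-table");
        params.put("myOutputS3Location", "s3://my-backup-bucket/exports/");
        params.put("myLogsS3Location", "s3://my-backup-bucket/logs/");
        params.put("myTopicSuccess", "arn:aws:sns:us-east-1:123456789012:backup-success");
        params.put("myTopicFail", "arn:aws:sns:us-east-1:123456789012:backup-fail");

        DDBExportPipelineCreator.putPipelineDefinition(client, pipelineId, params);
        DDBExportPipelineCreator.activatePipeline(client, pipelineId);
    }
}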
DDBExportPipelineObjectCreator.java

import com.amazonaws.services.datapipeline.model.Field;
import com.amazonaws.services.datapipeline.model.PipelineObject;
import com.google.common.collect.Lists;

import java.util.List;

public class DDBExportPipelineObjectCreator {
    // Default object inherited by every other pipeline object. "ondemand"
    // scheduling means the pipeline runs once per activation rather than
    // on a recurring schedule.
    public static PipelineObject getDefault() {
        String name = "Default";
        String id = "Default";
        Field type = new Field()
                .withKey("scheduleType")
                .withStringValue("ondemand");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field resourceRole = new Field()
                .withKey("resourceRole")
                .withStringValue("DataPipelineDefaultResourceRole");
        Field pipelineLogURI = new Field()
                .withKey("pipelineLogUri")
                .withStringValue("#{myLogsS3Location}");
        List<Field> fieldsList = Lists.newArrayList(
                type,
                role,
                resourceRole,
                pipelineLogURI);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getSNSSuccessActivity() {
        String name = "SuccessNotify";
        String id = "SuccessNotify";
        Field type = new Field()
                .withKey("type")
                .withStringValue("SnsAlarm");
        Field topicArn = new Field()
                .withKey("topicArn")
                .withStringValue("#{myTopicSuccess}");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field subject = new Field()
                .withKey("subject")
                .withStringValue("COPY SUCCESS: #{node.@scheduledStartTime}");
        Field message = new Field()
                .withKey("message")
                .withStringValue("#{myDDBTableName}");
        List<Field> fieldsList = Lists.newArrayList(type,
                topicArn,
                role,
                subject,
                message);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getSNSFailActivity() {
        String name = "FailureNotify";
        String id = "FailureNotify";
        Field type = new Field()
                .withKey("type")
                .withStringValue("SnsAlarm");
        Field topicArn = new Field()
                .withKey("topicArn")
                .withStringValue("#{myTopicFail}");
        Field role = new Field()
                .withKey("role")
                .withStringValue("DataPipelineDefaultRole");
        Field subject = new Field()
                .withKey("subject")
                .withStringValue("FAIL: #{node.@scheduledStartTime}");
        Field message = new Field()
                .withKey("message")
                .withStringValue("#{myDDBTableName}");
        List<Field> fieldsList = Lists.newArrayList(type,
                role,
                topicArn,
                subject,
                message);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getDDBSourceTable() {
        String name = "DDBSourceTable";
        String id = "DDBSourceTable";
        Field type = new Field()
                .withKey("type")
                .withStringValue("DynamoDBDataNode");
        Field tableName = new Field()
                .withKey("tableName")
                .withStringValue("#{myDDBTableName}");
        Field readThroughputPercent = new Field()
                .withKey("readThroughputPercent")
                .withStringValue("#{myDDBReadThroughputRatio}");
        List<Field> fieldsList = Lists.newArrayList(tableName, type, readThroughputPercent);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getS3BackupLocation() {
        String name = "S3BackupLocation";
        String id = "S3BackupLocation";
        Field type = new Field()
                .withKey("type")
                .withStringValue("S3DataNode");
        // Each run writes to a timestamped subdirectory of the output location.
        Field directoryPath = new Field()
                .withKey("directoryPath")
                .withStringValue("#{myOutputS3Location}#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}");
        // onFail/onSuccess reference the SNS notification objects above by id.
        Field onFail = new Field()
                .withKey("onFail")
                .withRefValue("FailureNotify");
        Field onSuccess = new Field()
                .withKey("onSuccess")
                .withRefValue("SuccessNotify");
        List<Field> fieldsList = Lists.newArrayList(type,
                directoryPath,
                onFail,
                onSuccess);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getEMRCluster() {
        String name = "EmrClusterForBackup";
        String id = "EmrClusterForBackup";
        Field type = new Field()
                .withKey("type")
                .withStringValue("EmrCluster");
        // amiVersion 3.10.0 is a legacy EMR AMI release; EMR 4.x+ clusters
        // are specified with releaseLabel instead.
        Field amiVersion = new Field()
                .withKey("amiVersion")
                .withStringValue("3.10.0");
        Field masterInstanceType = new Field()
                .withKey("masterInstanceType")
                .withStringValue("m3.xlarge");
        Field coreInstanceType = new Field()
                .withKey("coreInstanceType")
                .withStringValue("m3.xlarge");
        Field coreInstanceCount = new Field()
                .withKey("coreInstanceCount")
                .withStringValue("1");
        Field region = new Field()
                .withKey("region")
                .withStringValue("#{myDDBRegion}");
        // Safety net: terminate the cluster if the export runs this long.
        Field terminateAfter = new Field()
                .withKey("terminateAfter")
                .withStringValue("12 hours");
        // Tunes YARN and MapReduce memory settings through the configure-hadoop
        // bootstrap action (comma-separated argument list).
        Field bootstrapAction = new Field()
                .withKey("bootstrapAction")
                .withStringValue("s3://elasticmapreduce" +
                        "/bootstrap-actions/configure-hadoop, --yarn-key-value,yarn.nodemanager.resource.memory-mb=11520," +
                        "--yarn-key-value,yarn.scheduler.maximum-allocation-mb=11520," +
                        "--yarn-key-value,yarn.scheduler.minimum-allocation-mb=1440," +
                        "--yarn-key-value,yarn.app.mapreduce.am.resource.mb=2880," +
                        "--mapred-key-value,mapreduce.map.memory.mb=5760," +
                        "--mapred-key-value,mapreduce.map.java.opts=-Xmx4608M," +
                        "--mapred-key-value,mapreduce.reduce.memory.mb=2880," +
                        "--mapred-key-value,mapreduce.reduce.java.opts=-Xmx2304m," +
                        "--mapred-key-value,mapreduce.map.speculative=false");
        List<Field> fieldsList = Lists.newArrayList(type,
                amiVersion,
                masterInstanceType,
                coreInstanceCount,
                coreInstanceType,
                region,
                terminateAfter,
                bootstrapAction);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
    public static PipelineObject getEMRActivity() {
        String name = "TableBackupActivity";
        String id = "TableBackupActivity";
        Field type = new Field()
                .withKey("type")
                .withStringValue("EmrActivity");
        Field input = new Field()
                .withKey("input")
                .withRefValue("DDBSourceTable");
        Field output = new Field()
                .withKey("output")
                .withRefValue("S3BackupLocation");
        Field runsOn = new Field()
                .withKey("runsOn")
                .withRefValue("EmrClusterForBackup");
        Field resizeClusterBeforeRunning = new Field()
                .withKey("resizeClusterBeforeRunning")
                .withStringValue("#{myResizeClusterBeforeRunning}");
        Field maximumRetries = new Field()
                .withKey("maximumRetries")
                .withStringValue("2");
        // EMR step format: jar,main-class,arg1,arg2,... The #{...} expressions
        // are resolved at runtime from the referenced input/output objects.
        Field step = new Field()
                .withKey("step")
                .withStringValue("s3://dynamodb-emr-#{myDDBRegion}/emr-ddb-storage-" +
                        "handler/2.1.0/emr-ddb-2.1.0.jar,org.apache.hadoop.dynamodb.tools.DynamoDbExport," +
                        "#{output.directoryPath},#{input.tableName},#{input.readThroughputPercent}");
        List<Field> fieldsList = Lists.newArrayList(type,
                input,
                output,
                runsOn,
                resizeClusterBeforeRunning,
                maximumRetries,
                step);
        return new PipelineObject()
                .withName(name)
                .withId(id)
                .withFields(fieldsList);
    }
}
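
After activation it can be useful to poll the pipeline until the export finishes. A minimal sketch, assuming the same SDK: DescribePipelines returns the pipeline's read-only fields, including @pipelineState (values such as PENDING, SCHEDULED, or FINISHED). The helper class and method names are hypothetical.

import com.amazonaws.services.datapipeline.DataPipelineClient;
import com.amazonaws.services.datapipeline.model.DescribePipelinesRequest;
import com.amazonaws.services.datapipeline.model.DescribePipelinesResult;
import com.amazonaws.services.datapipeline.model.Field;

public class PipelineStatusChecker {

    // Returns the pipeline's current @pipelineState, or null if absent.
    public static String getPipelineState(final DataPipelineClient client, final String pipelineId) {
        DescribePipelinesResult result = client.describePipelines(
                new DescribePipelinesRequest().withPipelineIds(pipelineId));
        for (Field field : result.getPipelineDescriptionList().get(0).getFields()) {
            if ("@pipelineState".equals(field.getKey())) {
                return field.getStringValue();
            }
        }
        return null;
    }
}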