Last active
December 30, 2015 05:38
-
-
Save memorycraft/7783459 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import java.io.FileInputStream; | |
| import java.io.IOException; | |
| import java.net.InetAddress; | |
| import java.util.Properties; | |
| import java.util.UUID; | |
| import org.apache.commons.logging.Log; | |
| import org.apache.commons.logging.LogFactory; | |
| import com.amazonaws.AmazonClientException; | |
| import com.amazonaws.auth.AWSCredentialsProvider; | |
| import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; | |
| import com.amazonaws.auth.InstanceProfileCredentialsProvider; | |
| import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; | |
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; | |
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; | |
| import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; | |
| /** | |
| * Sample Kinesis Application. | |
| */ | |
| public final class MemorycraftKinesisDistanceConsumer { | |
| private static final String DEFAULT_APP_NAME = "MemorycraftKinesisDistanceConsumer"; | |
| private static final String DEFAULT_STREAM_NAME = "memostream"; | |
| private static final String DEFAULT_KINESIS_ENDPOINT = "https://kinesis.us-east-1.amazonaws.com"; | |
| // Initial position in the stream when the application starts up for the first time. | |
| // Position can be one of LATEST (most recent data) or TRIM_HORIZON (oldest available data) | |
| private static final InitialPositionInStream DEFAULT_INITIAL_POSITION = InitialPositionInStream.TRIM_HORIZON; | |
| private static String applicationName = DEFAULT_APP_NAME; | |
| private static String streamName = DEFAULT_STREAM_NAME; | |
| private static String kinesisEndpoint = DEFAULT_KINESIS_ENDPOINT; | |
| private static InitialPositionInStream initialPositionInStream = DEFAULT_INITIAL_POSITION; | |
| private static KinesisClientLibConfiguration kinesisClientLibConfiguration; | |
| private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceConsumer.class); | |
| /** | |
| * | |
| */ | |
| private MemorycraftKinesisDistanceConsumer() { | |
| super(); | |
| } | |
| /** | |
| * @param args Property file with config overrides (e.g. application name, stream name) | |
| * @throws IOException Thrown if we can't read properties from the specified properties file | |
| */ | |
| public static void main(String[] args) throws IOException { | |
| String propertiesFile = null; | |
| if (args.length > 1) { | |
| System.err.println("Usage: java " + MemorycraftKinesisDistanceConsumer.class.getName() + " <propertiesFile>"); | |
| System.exit(1); | |
| } else if (args.length == 1) { | |
| propertiesFile = args[0]; | |
| } | |
| configure(propertiesFile); | |
| System.out.println("Starting " + applicationName); | |
| LOG.info("Running " + applicationName + " to process stream " + streamName); | |
| IRecordProcessorFactory recordProcessorFactory = new MemorycraftKinesisDistanceProcessorFactory(); | |
| Worker worker = new Worker(recordProcessorFactory, kinesisClientLibConfiguration); | |
| int exitCode = 0; | |
| try { | |
| worker.run(); | |
| } catch (Throwable t) { | |
| LOG.error("Caught throwable while processing data.", t); | |
| exitCode = 1; | |
| } | |
| System.exit(exitCode); | |
| } | |
| private static void configure(String propertiesFile) throws IOException { | |
| if (propertiesFile != null) { | |
| loadProperties(propertiesFile); | |
| } | |
| // ensure the JVM will refresh the cached IP values of AWS resources (e.g. service endpoints). | |
| java.security.Security.setProperty("networkaddress.cache.ttl" , "60"); | |
| String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); | |
| LOG.info("Using workerId: " + workerId); | |
| // Get credentials from IMDS. If unsuccessful, get them from the classpath. | |
| AWSCredentialsProvider credentialsProvider = null; | |
| try { | |
| credentialsProvider = new InstanceProfileCredentialsProvider(); | |
| // Verify we can fetch credentials from the provider | |
| credentialsProvider.getCredentials(); | |
| LOG.info("Obtained credentials from the IMDS."); | |
| } catch (AmazonClientException e) { | |
| LOG.info("Unable to obtain credentials from the IMDS, trying classpath properties", e); | |
| credentialsProvider = new ClasspathPropertiesFileCredentialsProvider(); | |
| // Verify we can fetch credentials from the provider | |
| credentialsProvider.getCredentials(); | |
| LOG.info("Obtained credentials from the properties file."); | |
| } | |
| LOG.info("Using credentials with access key id: " + credentialsProvider.getCredentials().getAWSAccessKeyId()); | |
| kinesisClientLibConfiguration = new KinesisClientLibConfiguration(applicationName, streamName, kinesisEndpoint, | |
| initialPositionInStream, credentialsProvider, workerId); | |
| } | |
| /** | |
| * @param propertiesFile | |
| * @throws IOException Thrown when we run into issues reading properties | |
| */ | |
| private static void loadProperties(String propertiesFile) throws IOException { | |
| FileInputStream inputStream = new FileInputStream(propertiesFile); | |
| Properties properties = new Properties(); | |
| try { | |
| properties.load(inputStream); | |
| } finally { | |
| inputStream.close(); | |
| } | |
| String appNameOverride = properties.getProperty(ConfigKeys.APPLICATION_NAME_KEY); | |
| if (appNameOverride != null) { | |
| applicationName = appNameOverride; | |
| } | |
| LOG.info("Using application name " + applicationName); | |
| String streamNameOverride = properties.getProperty(ConfigKeys.STREAM_NAME_KEY); | |
| if (streamNameOverride != null) { | |
| streamName = streamNameOverride; | |
| } | |
| LOG.info("Using stream name " + streamName); | |
| String kinesisEndpointOverride = properties.getProperty(ConfigKeys.KINESIS_ENDPOINT_KEY); | |
| if (kinesisEndpointOverride != null) { | |
| kinesisEndpoint = kinesisEndpointOverride; | |
| } | |
| String initialPositionOverride = properties.getProperty(ConfigKeys.INITIAL_POSITION_IN_STREAM_KEY); | |
| if (initialPositionOverride != null) { | |
| initialPositionInStream = InitialPositionInStream.valueOf(initialPositionOverride); | |
| } | |
| LOG.info("Using initial position " + initialPositionInStream.toString() + " (if a checkpoint is not found)."); | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment