Skip to content

Instantly share code, notes, and snippets.

@memorycraft
Last active December 30, 2015 05:38
Show Gist options
  • Select an option

  • Save memorycraft/7783449 to your computer and use it in GitHub Desktop.

Select an option

Save memorycraft/7783449 to your computer and use it in GitHub Desktop.
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.lang.RandomStringUtils;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
/**
 * Kinesis Client Library record processor that copies every record it receives
 * into an S3 bucket, one object per record.
 *
 * <p>Each record's payload is decoded as UTF-8 text and uploaded under the
 * {@code log/} prefix with a random 32-character alphanumeric key. Progress is
 * checkpointed at most once per {@link #CHECKPOINT_INTERVAL_MILLIS}.
 *
 * <p>NOTE(review): the shared {@link CharsetDecoder} is not thread-safe; this
 * class assumes each instance is driven by a single thread — confirm against
 * the worker configuration.
 */
public class MemorycraftKinesisLoggingProcessor implements IRecordProcessor {

    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisLoggingProcessor.class);

    /** Minimum time between two checkpoints issued from {@link #processRecords}. */
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;

    /** Charset used both to decode record payloads and to encode the uploaded bytes. */
    private static final Charset UTF_8 = Charset.forName("UTF-8");

    /** Destination bucket for the uploaded records. */
    private static final String BUCKET_NAME = "memorycraft-kinesis";

    /** Key prefix prepended to the random object name. */
    private static final String KEY_PREFIX = "log/";

    /** Length of the random alphanumeric object name. */
    private static final int KEY_LENGTH = 32;

    private String kinesisShardId;
    private AmazonS3 s3;
    private long nextCheckpointTimeInMillis;
    private final CharsetDecoder decoder = UTF_8.newDecoder();

    /**
     * Constructor.
     */
    public MemorycraftKinesisLoggingProcessor() {
        super();
    }

    /**
     * Remembers the shard id and creates the S3 client using credentials
     * loaded from the classpath properties file.
     *
     * @param shardId id of the shard this processor is assigned to
     */
    @Override
    public void initialize(String shardId) {
        this.kinesisShardId = shardId;
        this.s3 = new AmazonS3Client(new ClasspathPropertiesFileCredentialsProvider());
    }

    /**
     * Uploads the given records to S3, then checkpoints if the checkpoint
     * interval has elapsed since the last attempt.
     *
     * @param records      records to process
     * @param checkpointer used to record progress
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        process(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            // The next checkpoint time advances even when the checkpoint attempt
            // failed; the failure is logged inside checkpoint().
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    /**
     * Uploads each record to S3. A record that cannot be decoded or uploaded is
     * logged and skipped so that one bad record ("poison pill") does not block
     * the rest of the batch. No retries are attempted.
     *
     * @param records records to upload
     */
    private void process(List<Record> records) {
        for (Record record : records) {
            InputStream in = null;
            String seqno = null;
            try {
                seqno = record.getSequenceNumber();
                LOG.info("process record [" + seqno + "]");

                String json = decoder.decode(record.getData()).toString();
                // Encode once and reuse the bytes for both the stream and the length.
                byte[] payload = json.getBytes(UTF_8);
                in = new ByteArrayInputStream(payload);

                ObjectMetadata metadata = new ObjectMetadata();
                metadata.setContentLength(payload.length);

                String filename = RandomStringUtils.randomAlphanumeric(KEY_LENGTH);
                s3.putObject(BUCKET_NAME, KEY_PREFIX + filename, in, metadata);
                LOG.info(" saved [json]=" + json);
            } catch (CharacterCodingException e) {
                LOG.error("record [" + seqno + "] is not valid UTF-8; skipping", e);
            } catch (Exception e) {
                // Deliberate best-effort: log and move on to the next record.
                LOG.error("failed to save record [" + seqno + "]; skipping", e);
            } finally {
                // 'in' is still null when the failure happened before the stream was created.
                if (in != null) {
                    try {
                        in.close();
                    } catch (IOException e) {
                        LOG.error("close error", e);
                    }
                }
            }
        }
    }

    /**
     * Checkpoints once more when the shard is being terminated; does nothing
     * for any other shutdown reason.
     *
     * @param checkpointer used to record progress
     * @param reason       why the processor is being shut down
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    /**
     * Attempts a single checkpoint. Failures are logged and not rethrown; no
     * retry is performed here.
     *
     * @param checkpointer used to record progress
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            LOG.info("CHECKPOINT !!!!!");
            checkpointer.checkpoint();
        } catch (ShutdownException e) {
            LOG.info("checkpoint skipped for shard [" + kinesisShardId + "]: processor is shutting down", e);
        } catch (ThrottlingException e) {
            LOG.warn("checkpoint throttled for shard [" + kinesisShardId + "]", e);
        } catch (InvalidStateException e) {
            LOG.error("checkpoint failed for shard [" + kinesisShardId + "]: invalid state", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment