Skip to content

Instantly share code, notes, and snippets.

@memorycraft
Last active December 30, 2015 05:38
Show Gist options
  • Save memorycraft/7783481 to your computer and use it in GitHub Desktop.
Save memorycraft/7783481 to your computer and use it in GitHub Desktop.
import java.io.InputStream;
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CharacterCodingException;
import java.util.List;
import java.util.Map;
import java.util.HashMap;
import java.util.Iterator;
import java.lang.reflect.Field;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import net.arnx.jsonic.JSON;
import com.amazonaws.services.dynamodb.model.Key;
import com.amazonaws.services.dynamodb.model.AttributeAction;
import com.amazonaws.services.dynamodb.model.AttributeValueUpdate;
import com.amazonaws.services.dynamodb.model.AttributeValue;
import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException;
import com.amazonaws.services.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.dynamodb.AmazonDynamoDBClient;
import com.amazonaws.services.dynamodb.model.PutItemRequest;
import com.amazonaws.services.dynamodb.model.PutItemResult;
import com.amazonaws.services.dynamodb.model.UpdateItemRequest;
import com.amazonaws.services.dynamodb.model.UpdateItemResult;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;
/**
 * Kinesis Client Library record processor that turns each record into a
 * distance increment in DynamoDB.
 *
 * <p>For every record the payload is decoded as UTF-8 text, parsed as JSON into a
 * {@code Hoge}, and {@code sqrt(x^2 + y^2)} is computed from its {@code x} and
 * {@code y} fields. That value is then ADDed to the {@code distance} attribute of
 * the item in the {@code memorycraft-kinesis} table whose key is built from the
 * record's {@code user_id}.
 *
 * <p>Records that cannot be processed are logged and skipped; they never abort the
 * rest of the batch.
 */
public class MemorycraftKinesisDistanceProcessor implements IRecordProcessor {

    private static final Log LOG = LogFactory.getLog(MemorycraftKinesisDistanceProcessor.class);

    /** Minimum delay between two periodic checkpoints issued from {@link #processRecords}. */
    private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L;

    /** DynamoDB table that receives the distance updates. */
    private static final String TABLE_NAME = "memorycraft-kinesis";

    /** Numeric attribute that each processed record is added to. */
    private static final String DISTANCE_ATTRIBUTE = "distance";

    /** Shard id handed to {@link #initialize}; only used to give log messages context. */
    private String kinesisShardId;

    private AmazonDynamoDB dynamodb;

    /**
     * Wall-clock time after which the next periodic checkpoint is due. It starts at 0,
     * so the first call to {@link #processRecords} always checkpoints.
     */
    private long nextCheckpointTimeInMillis;

    /** Strict UTF-8 decoder: malformed payloads raise {@link CharacterCodingException}. */
    private final CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();

    /**
     * Constructor.
     */
    public MemorycraftKinesisDistanceProcessor() {
        super();
    }

    /**
     * {@inheritDoc}
     *
     * <p>Remembers the shard id and creates the DynamoDB client using a
     * {@code ClasspathPropertiesFileCredentialsProvider}.
     */
    @Override
    public void initialize(String shardId) {
        this.kinesisShardId = shardId;
        dynamodb = new AmazonDynamoDBClient(new ClasspathPropertiesFileCredentialsProvider());
    }

    /**
     * {@inheritDoc}
     *
     * <p>Processes the whole batch first, then checkpoints if at least
     * {@link #CHECKPOINT_INTERVAL_MILLIS} have elapsed since the last periodic checkpoint.
     */
    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
        process(records);
        if (System.currentTimeMillis() > nextCheckpointTimeInMillis) {
            checkpoint(checkpointer);
            nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS;
        }
    }

    /**
     * Processes each record once, in order. A record that fails (undecodable payload,
     * unusable JSON, DynamoDB error, ...) is logged and skipped so that a single
     * "poison pill" cannot block the shard. No retry is attempted.
     *
     * @param records the batch handed over by the Kinesis Client Library
     */
    private void process(List<Record> records) {
        for (Record record : records) {
            String seqno = null;
            try {
                seqno = record.getSequenceNumber();
                LOG.info("process record ["+seqno+"]");

                String data = decoder.decode(record.getData()).toString();
                Hoge hoge = JSON.decode(data, Hoge.class);
                if (hoge == null || hoge.user_id == null) {
                    LOG.warn("skipping record [" + seqno + "] on shard " + kinesisShardId
                            + ": payload has no user_id");
                    continue;
                }

                double distance = distanceFromOrigin(hoge.x, hoge.y);
                if (Double.isNaN(distance) || Double.isInfinite(distance)) {
                    // Not a finite number, so do not send it to DynamoDB as a numeric increment.
                    LOG.warn("skipping record [" + seqno + "] on shard " + kinesisShardId
                            + ": distance is not a finite number (" + distance + ")");
                    continue;
                }
                String value = Double.toString(distance);

                Key key = new Key(new AttributeValue(hoge.user_id));
                Map<String, AttributeValueUpdate> item = new HashMap<String, AttributeValueUpdate>();
                item.put(DISTANCE_ATTRIBUTE, new AttributeValueUpdate()
                        .withAction(AttributeAction.ADD)
                        .withValue(new AttributeValue().withN(value)));
                dynamodb.updateItem(new UpdateItemRequest(TABLE_NAME, key, item));

                LOG.info(" saved [user_id:" + hoge.user_id+", distance:"+value+"]");
            } catch (CharacterCodingException e) {
                LOG.warn("skipping record [" + seqno + "] on shard " + kinesisShardId
                        + ": payload is not valid UTF-8", e);
            } catch (Exception e) {
                // Deliberately broad: any per-record failure is reported and the batch goes on.
                // JVM Errors are not caught here and propagate to the caller.
                LOG.error("skipping record [" + seqno + "] on shard " + kinesisShardId
                        + ": processing failed", e);
            }
        }
    }

    /**
     * Computes the Euclidean distance of the point {@code (x, y)} from the origin.
     *
     * @param x decimal text of the x coordinate
     * @param y decimal text of the y coordinate
     * @return {@code sqrt(x^2 + y^2)}
     * @throws NumberFormatException if either argument is not a parsable double
     * @throws NullPointerException if either argument is {@code null}
     */
    private static double distanceFromOrigin(String x, String y) {
        double dx = Double.parseDouble(x);
        double dy = Double.parseDouble(y);
        return Math.sqrt(Math.pow(dx, 2) + Math.pow(dy, 2));
    }

    /**
     * {@inheritDoc}
     *
     * <p>Checkpoints only when the reason is {@code TERMINATE}; for any other reason
     * nothing is done.
     */
    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) {
        if (reason == ShutdownReason.TERMINATE) {
            checkpoint(checkpointer);
        }
    }

    /**
     * Attempts a single checkpoint. Failures are logged and otherwise ignored; this
     * method does not retry.
     *
     * @param checkpointer the checkpointer supplied by the Kinesis Client Library
     */
    private void checkpoint(IRecordProcessorCheckpointer checkpointer) {
        try {
            LOG.info("CHECKPOINT !!!!!");
            checkpointer.checkpoint();
        } catch (ShutdownException se) {
            LOG.info("checkpoint skipped on shard " + kinesisShardId
                    + ": record processor has been shut down", se);
        } catch (ThrottlingException e) {
            LOG.warn("checkpoint on shard " + kinesisShardId
                    + " was throttled and has not been retried", e);
        } catch (InvalidStateException e) {
            LOG.error("checkpoint on shard " + kinesisShardId
                    + " failed: checkpoint storage is in an invalid state", e);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment