|
public class Consumer implements IRecordProcessor { |
|
|
|
private static final Log log = LogFactory.getLog(Consumer.class); |
|
private static final long BACKOFF_TIME = 1000; |
|
private Logger logger; |
|
private Logger rawLogger; |
|
private Gson gson; |
|
private FileWritterProperties fileWritterProperties; |
|
private String shardId; |
|
|
|
public Consumer(FileWritterProperties fileWritterProperties, Gson gson) { |
|
log.info("New Consumer"); |
|
this.fileWritterProperties = fileWritterProperties; |
|
this.gson = gson; |
|
} |
|
|
|
@Override |
|
public void initialize(InitializationInput ii) { |
|
shardId = ii.getShardId(); |
|
String fileName = "track-" + ii.getShardId() + "-" + ii.getExtendedSequenceNumber().getSubSequenceNumber(); |
|
|
|
logger = FileWritter.getLogger(fileWritterProperties.getPath(), fileName); |
|
|
|
log.info("Created file " + fileWritterProperties.getPath() + "/" + fileName); |
|
|
|
rawLogger = RawWritter.getLogger(fileWritterProperties.getPath(), fileName); |
|
|
|
log.info("Created file " + fileWritterProperties.getPath() + "/raw/" + fileName); |
|
|
|
} |
|
|
|
@Override |
|
public void processRecords(ProcessRecordsInput pri) { |
|
List<Record> records = pri.getRecords(); |
|
|
|
log.info("Received " + records.size() + " records"); |
|
|
|
for (Record record : records) { |
|
String json = new String(record.getData().array(), StandardCharsets.UTF_8); |
|
log.info("Message received: " + json + "\n"); |
|
|
|
rawLogger.info(json); |
|
|
|
try { |
|
Event event = gson.fromJson(json, Event.class); |
|
logger.info(CSVWritter.print(event)); |
|
} catch (Exception ex) { |
|
log.error(ex); |
|
} |
|
} |
|
|
|
checkpoint(pri.getCheckpointer()); |
|
|
|
} |
|
|
|
@Override |
|
public void shutdown(ShutdownInput si) { |
|
log.info("Shutdown record processor for shard " + shardId); |
|
|
|
if (si.getShutdownReason() == ShutdownReason.TERMINATE) { |
|
checkpoint(si.getCheckpointer()); |
|
} |
|
} |
|
|
|
|
|
private void checkpoint(IRecordProcessorCheckpointer checkpointer) { |
|
try { |
|
checkpointer.checkpoint(); |
|
} catch (KinesisClientLibDependencyException | InvalidStateException | ThrottlingException | ShutdownException ex) { |
|
log.error(ex); |
|
} |
|
|
|
try { |
|
Thread.sleep(BACKOFF_TIME); |
|
} catch (InterruptedException ex) { |
|
log.error(ex); |
|
} |
|
} |
|
} |