public class CustomerOrderEventHandler extends MultiEventHandler {
private static Logger logger = LogManager.getLogger(CustomerOrderEventHandler.class);
//private BasicDataSource establishedConnections = new BasicDataSource();
//private DB2SimpleDataSource nativeEstablishedConnections = new DB2SimpleDataSource();
private AS400JDBCManagedConnectionPoolDataSource dynamicEstablishedConnections =
new AS400JDBCManagedConnectionPoolDataSource();
private State3 orderState3;
private State2 orderState2;
private State1 orderState1;
public CustomerOrderEventHandler() throws SQLException {
connectionPool3();
Connection connection = dynamicEstablishedConnections.getConnection();
connection.close();
}
private void connectionPool3() {
dynamicEstablishedConnections.setServerName(State.server);
dynamicEstablishedConnections.setDatabaseName(State.DATABASE);
dynamicEstablishedConnections.setUser(State.user);
dynamicEstablishedConnections.setPassword(State.password);
dynamicEstablishedConnections.setSavePasswordWhenSerialized(true);
dynamicEstablishedConnections.setPrompt(false);
dynamicEstablishedConnections.setMinPoolSize(3);
dynamicEstablishedConnections.setInitialPoolSize(5);
dynamicEstablishedConnections.setMaxPoolSize(50);
}
public void onEvent(CustomerOrder orderEvent){
long start = System.currentTimeMillis();
Connection dbConnection = null;
try {
dbConnection = dynamicEstablishedConnections.getConnection();
long connectionSetupTime = System.currentTimeMillis() - start;
state3 = new State3(dbConnection);
state2 = new State2(dbConnection);
state1 = new State1(dbConnection);
long initialisation = System.currentTimeMillis() - start - connectionSetupTime;
int[] state3Result = state3.apply(orderEvent);
int[] state2Result = state2.apply(orderEvent);
long state1Result = state1.apply(orderEvent);
dbConnection.commit();
logger.info("eventId="+ getEventId(orderEvent) +
",connectionSetupTime=" + connectionSetupTime +
",queryPreCompilation=" + initialisation +
",insertionOnlyTimeTaken=" +
(System.currentTimeMillis() - (start + connectionSetupTime + initialisation)) +
",insertionTotalTimeTaken=" + (System.currentTimeMillis() - start));
} catch (SQLException e) {
logger.error("Error updating the order states.", e);
if(dbConnection != null) {
try {
dbConnection.rollback();
} catch (SQLException e1) {
logger.error("Error rolling back the state.", e1);
}
}
throw new CustomerOrderEventHandlerRuntimeException("Error updating the customer order states.", e);
}
}
private Long getEventId(CustomerOrder order) {
return Long.valueOf(order.getMessageHeader().getCorrelationId());
}
}import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
public class State2 extends State {
private static Logger logger = LogManager.getLogger(DetailState.class);
Connection connection;
PreparedStatement preparedStatement;
String detailsCompiledQuery = "INSERT INTO " + DATABASE + "." + getStateName() +
"(" + DetailState.EVENT_ID + ", " +
State2.ORDER_NUMBER + ", " +
State2.SKU_ID + ", " +
State2.SKU_ORDERED_QTY + ") VALUES(?, ?, ?, ?)";
public State2(Connection connection) throws SQLException {
this.connection = connection;
this.preparedStatement = this.connection.prepareStatement(detailsCompiledQuery); // this is taking ~200ms each time
this.preparedStatement.setPoolable(true); //might not be required, not sure
}
public int[] apply(CustomerOrder event) throws StateException {
event.getMessageBody().getDetails().forEach(detail -> {
try {
preparedStatement.setLong(1, getEventId(event));
preparedStatement.setString(2, getOrderNo(event));
preparedStatement.setInt(3, detail.getSkuId());
preparedStatement.setInt(4, detail.getQty());
preparedStatement.addBatch();
} catch (SQLException e) {
logger.error(e);
throw new StateException("Error setting up data", e);
}
});
long startedTime = System.currentTimeMillis();
int[] inserted = new int[0];
try {
inserted = preparedStatement.executeBatch();
} catch (SQLException e) {
throw new StateException("Error updating allocations data", e);
}
logger.info("eventId="+ getEventId(event) +
",state=details,insertionTimeTaken=" + (System.currentTimeMillis() - startedTime));
return inserted;
}
@Override
protected String getStateName() {
return properties.getProperty("state.order.details.name");
}
}{
"timeMillis": 1490720504955,
"thread": "RecordProcessor-0003",
"level": "ERROR",
"loggerName": "com.eventstream.consumer.dispatcher.MultiEventHandler",
"message": "error dispatching event, {}",
"thrown": {
"commonElementCount": 0,
"name": "java.lang.reflect.InvocationTargetException",
"cause": {
"commonElementCount": 18,
"localizedMessage": "serverName: Property was not changed.",
"message": "serverName: Property was not changed.",
"name": "com.ibm.as400.access.ExtendedIllegalStateException",
"extendedStackTrace": [
{
"class": "com.ibm.as400.access.AS400JDBCManagedDataSource",
"method": "setServerName",
"file": "AS400JDBCManagedDataSource.java",
"line": 3679,
"exact": false,
"location": "DB2DriverAS400-1.0.0.jar",
"version": "JTOpen 8.2"
},
{
"class": "com.nordstrom.purchaseorder.events.handler.PurchaseOrderEventHandler",
"method": "setupConnectionPool",
"file": "PurchaseOrderEventHandler.java",
"line": 43,
"exact": false,
"location": "classes/",
"version": "?"
},
{
"class": "com.nordstrom.purchaseorder.events.handler.PurchaseOrderEventHandler",
"method": "onEvent",
"file": "PurchaseOrderEventHandler.java",
"line": 82,
"exact": false,
"location": "classes/",
"version": "?"
}
]
},
"extendedStackTrace": [
{
"class": "sun.reflect.NativeMethodAccessorImpl",
"method": "invoke0",
"file": "NativeMethodAccessorImpl.java",
"line": -2,
"exact": false,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "sun.reflect.NativeMethodAccessorImpl",
"method": "invoke",
"file": "NativeMethodAccessorImpl.java",
"line": 62,
"exact": false,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "sun.reflect.DelegatingMethodAccessorImpl",
"method": "invoke",
"file": "DelegatingMethodAccessorImpl.java",
"line": 43,
"exact": false,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "java.lang.reflect.Method",
"method": "invoke",
"file": "Method.java",
"line": 498,
"exact": false,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "com.eventstream.consumer.dispatcher.MultiEventHandler",
"method": "lambda$dispatchEvent$1",
"file": "MultiEventHandler.java",
"line": 43,
"exact": false,
"location": "stream-driver-1.0-SNAPSHOT.jar",
"version": "?"
},
{
"class": "java.util.Optional",
"method": "ifPresent",
"file": "Optional.java",
"line": 159,
"exact": false,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "com.eventstream.consumer.dispatcher.MultiEventHandler",
"method": "dispatchEvent",
"file": "MultiEventHandler.java",
"line": 41,
"exact": false,
"location": "stream-driver-1.0-SNAPSHOT.jar",
"version": "?"
},
{
"class": "com.eventstream.consumer.kinesis.KinesisEventPartitionProcessor",
"method": "lambda$processRecords$0",
"file": "KinesisEventPartitionProcessor.java",
"line": 160,
"exact": false,
"location": "stream-driver-1.0-SNAPSHOT.jar",
"version": "?"
},
{
"class": "java.util.ArrayList",
"method": "forEach",
"file": "ArrayList.java",
"line": 1249,
"exact": true,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "com.eventstream.consumer.kinesis.KinesisEventPartitionProcessor",
"method": "processRecords",
"file": "KinesisEventPartitionProcessor.java",
"line": 147,
"exact": true,
"location": "stream-driver-1.0-SNAPSHOT.jar",
"version": "?"
},
{
"class": "com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask",
"method": "callProcessRecords",
"file": "ProcessTask.java",
"line": 215,
"exact": true,
"location": "amazon-kinesis-client-1.7.4.jar",
"version": "?"
},
{
"class": "com.amazonaws.services.kinesis.clientlibrary.lib.worker.ProcessTask",
"method": "call",
"file": "ProcessTask.java",
"line": 170,
"exact": true,
"location": "amazon-kinesis-client-1.7.4.jar",
"version": "?"
},
{
"class": "com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator",
"method": "call",
"file": "MetricsCollectingTaskDecorator.java",
"line": 49,
"exact": true,
"location": "amazon-kinesis-client-1.7.4.jar",
"version": "?"
},
{
"class": "com.amazonaws.services.kinesis.clientlibrary.lib.worker.MetricsCollectingTaskDecorator",
"method": "call",
"file": "MetricsCollectingTaskDecorator.java",
"line": 24,
"exact": true,
"location": "amazon-kinesis-client-1.7.4.jar",
"version": "?"
},
{
"class": "java.util.concurrent.FutureTask",
"method": "run",
"file": "FutureTask.java",
"line": 266,
"exact": true,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "java.util.concurrent.ThreadPoolExecutor",
"method": "runWorker",
"file": "ThreadPoolExecutor.java",
"line": 1142,
"exact": true,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "java.util.concurrent.ThreadPoolExecutor$Worker",
"method": "run",
"file": "ThreadPoolExecutor.java",
"line": 617,
"exact": true,
"location": "?",
"version": "1.8.0_111"
},
{
"class": "java.lang.Thread",
"method": "run",
"file": "Thread.java",
"line": 745,
"exact": true,
"location": "?",
"version": "1.8.0_111"
}
]
},
"endOfBatch": false,
"loggerFqcn": "org.apache.logging.log4j.spi.AbstractLogger",
"threadId": 84,
"threadPriority": 5
}