Skip to content

Instantly share code, notes, and snippets.

public QueueConsumer(
QueueConsumerModule<?> queueConsumerModule,
RetryPolicy retryPolicy,
PlatformTransactionManager transactionManager,
int polledItemsLimit,
long pollingPeriodInSecs,
int partitionCount
) {
// ...
this.scheduledExecutorService = Executors.newScheduledThreadPool(partitionCount);
public interface QueueConsumerModule<ID> {
List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(int partition, LocalDateTime time, int limit);
// ...
}
@Entity
@Table(name = "sms_message", indexes = @Index(name = "idx_sms_msg_queue_polling_fields", columnList = "next_attempt_time,partition"))
public class SmsMessage {
// ...
@Column(name = "partition")
private int partition;
// ...
public interface QueueConsumerModule<ID> {
List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(String fromAddress, LocalDateTime time, int limit);
// ...
}
public QueueConsumer(
QueueConsumerModule<?> queueConsumerModule,
RetryPolicy retryPolicy,
PlatformTransactionManager transactionManager,
int polledItemsLimit,
long pollingPeriodInSecs,
int processingThreadCount
) {
// ...
this.processingExecutorService = Executors.newFixedThreadPool(processingThreadCount);
@EventListener(OnGrantedEvent.class)
public void onGrantedEvent() {
logger.info("Granted leadership");
startProcessingTask();
}
@EventListener(OnRevokedEvent.class)
public void onRevokedEvent() {
logger.info("Revoked leadership");
public class SmsQueueingApplication {
// ...
@Bean
public DefaultLockRepository lockRepository(DataSource dataSource) {
return new DefaultLockRepository(dataSource);
}
@Bean
public JdbcLockRegistry lockRegistry(LockRepository lockRepository) {
2019-05-10 07:45:12.160 ERROR 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer : Error while processing item by ID 316: Wrong time picked for send
java.lang.IllegalStateException: Wrong time picked for send
at com.ag04.smsqueueing.sender.SmsSenderImpl.send(SmsSenderImpl.java:23) ~[classes/:na]
at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem(SmsSendingQueueConsumerModule.java:50) ~[classes/:na]
at com.ag04.smsqueueing.SmsSendingQueueConsumerModule.processItem(SmsSendingQueueConsumerModule.java:16) ~[classes/:na]
at com.ag04.jpaqueue.QueueConsumer.processItem(QueueConsumer.java:123) [classes/:na]
at com.ag04.jpaqueue.QueueConsumer.lambda$processItemAndHandleErrorIfRaised$0(QueueConsumer.java:107) [classes/:na]
at com.ag04.jpaqueue.QueueConsumer$1.doInTransactionWithoutResult(QueueConsumer.java:117) ~[classes/:na]
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:36) ~[spring-tx-5.1.5.RELE
2019-05-10 07:45:11.518 INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl : Sending SMS: SmsMessage[id=311, uid='e2a334af-4398-420a-b493-e5a05549fcc9', fromAddress='80444', toAddress='385913344829', text='Hello, this is text generated at 2019-05-10T07:40:36.360', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.360, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.605 INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl : Sending SMS: SmsMessage[id=312, uid='dda97da8-af60-4eca-87d7-c406bf1c3161', fromAddress='80555', toAddress='385913344533', text='Hello, this is text generated at 2019-05-10T07:40:36.363', sendingState=QueueingState[status=NOT_ATTEMPTED, nextAttemptTime=2019-05-10T07:40:36.363, attemptCount=0, lastAttemptTime=null, lastAttemptErrorMessage='null']]
2019-05-10 07:45:11.699 INFO 8415 --- [pool-1-thread-1] c.ag04.smsqueueing.sender.SmsSenderImpl : Sending SMS: SmsMessage[id
2019-05-10 07:45:11.512 INFO 8415 --- [pool-1-thread-1] com.ag04.jpaqueue.QueueConsumer : Fetched 100 pending queued items