Skip to content

Instantly share code, notes, and snippets.

2019-05-10 07:41:25.619 INFO 8074 --- [ main] c.a.s.producer.SmsProducerImpl : Producing SMS: fromAddress=80111, toAddress=385913344599, text=Hello, this is text generated at 2019-05-10T07:41:25.618, sendTime=2019-05-10T07:41:25.618
2019-05-10 07:41:25.622 INFO 8074 --- [ main] c.a.s.producer.SmsProducerImpl : Producing SMS: fromAddress=80444, toAddress=385913344606, text=Hello, this is text generated at 2019-05-10T07:41:25.622, sendTime=2019-05-10T07:41:25.622
2019-05-10 07:41:25.625 INFO 8074 --- [ main] c.a.s.producer.SmsProducerImpl : Producing SMS: fromAddress=80222, toAddress=385913344021, text=Hello, this is text generated at 2019-05-10T07:41:25.625, sendTime=2019-05-10T07:41:25.625
@Bean
public QueueConsumer smsSendingQueueConsumer(SmsSendingQueueConsumerModule smsSendingQueueConsumerModule, PlatformTransactionManager transactionManager) {
RetryPolicy retryPolicy = new LimitedRetryPolicy(3, new FixedDelayRetryPolicy(Duration.ofMinutes(1)));
return new QueueConsumer(smsSendingQueueConsumerModule, retryPolicy, transactionManager, 100, 10);
}
@Entity
@Table(name = "sms_message", indexes = @Index(name = "idx_sms_msg_queue_polling_fields", columnList = "next_attempt_time"))
public class SmsMessage {
@Id
@Column(name = "id", nullable = false)
@GeneratedValue
private Long id;
@Column(name = "uid", nullable = false, unique = true)
private String uid; // app-assigned unique ID
public void processQueuedItems() {
try {
LocalDateTime now = LocalDateTime.now();
List<?> itemIds = this.queueConsumerModule.findItemIdsWhereQueueingNextAttemptTimeIsBefore(now, itemsPollSize);
if (!itemIds.isEmpty()) {
logger.info("Fetched {} pending queued items", itemIds.size());
for (Object itemId : itemIds) {
processItemAndHandleErrorIfRaised(itemId);
}
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
// ...
private void startProcessingTask() {
logger.info("Starting queue processing task with delay of {} secs", this.pollingPeriodInSecs);
Runnable command = this::processQueuedItems;
this.processingTask = this.scheduledExecutorService.scheduleWithFixedDelay(command, pollingPeriodInSecs, pollingPeriodInSecs, TimeUnit.SECONDS);
}
public interface QueueConsumerModule<ID> {
List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);
Optional<QueueingState> getQueueingStateForItem(ID itemId);
Optional<QueueingState> processItem(ID itemId);
}
@Embeddable
public class QueueingState {
public enum Status {
NOT_ATTEMPTED,
ERROR,
SUCCESS
}
private Status status;
@vmarcinko
vmarcinko / UndertowServer.java
Created April 28, 2015 06:21
Undertow-Spring MVC integration
package vmarcinko.undertow;
import io.undertow.Handlers;
import io.undertow.Undertow;
import io.undertow.server.HttpHandler;
import io.undertow.server.handlers.PathHandler;
import io.undertow.server.handlers.RedirectHandler;
import io.undertow.server.handlers.resource.FileResourceManager;
import io.undertow.servlet.Servlets;
import io.undertow.servlet.api.DeploymentInfo;
@vmarcinko
vmarcinko / CategoryEnumInfo.java
Created January 21, 2015 16:03
Nanocube DMP encoder in java
public class CategoryEnumInfo {
private final String label;
private final Byte encodedValue;
public CategoryEnumInfo(String label, Byte encodedValue) {
this.label = label;
this.encodedValue = encodedValue;
}
public String getLabel() {
@vmarcinko
vmarcinko / EmbeddedKafkaCluster.java
Created August 16, 2014 13:34
Embedded Zookeeper & Kafka cluster
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import java.io.File;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;