Skip to content

Instantly share code, notes, and snippets.

@blvp
Created October 7, 2015 17:16
Show Gist options
  • Select an option

  • Save blvp/83e999cb2cbb357ebe7a to your computer and use it in GitHub Desktop.

Select an option

Save blvp/83e999cb2cbb357ebe7a to your computer and use it in GitHub Desktop.
package ru.dz.tele2.wsc.service.impl;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.mail.javamail.JavaMailSender;
import javax.mail.internet.MimeMessage;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by blvp on 07.10.15.
*/
@Slf4j
public class MailConsumer implements InitializingBean, DisposableBean{
private static final String DEFAULT_THREAD_NAME = "mailThread";
private BlockingQueue<MimeMessage> queue;
private JavaMailSender javaMailSender;
private boolean shouldConsumingMessages = true;
private Thread consumerThread;
private boolean isDaemonOn = true;
private String threadName = DEFAULT_THREAD_NAME;
public MailConsumer(JavaMailSender javaMailSender) {
this(javaMailSender, new LinkedBlockingQueue<>());
}
public MailConsumer(
JavaMailSender javaMailSender,
BlockingQueue<MimeMessage> queue
) {
this.queue = queue;
this.javaMailSender = javaMailSender;
}
public void consume(MimeMessage mimeMessage) {
try {
log.info("consuming message : {}", mimeMessage);
queue.put(mimeMessage);
} catch (InterruptedException e) {
log.warn("concurrency problem at consuming messages!");
}
}
protected Thread createConsumerThread() {
Thread consumerThread = new Thread(() -> {
while (this.shouldConsumingMessages) {
try {
log.info("taking message!");
MimeMessage message = queue.take();
log.info("sending message: {}", message);
javaMailSender.send(message);
} catch (InterruptedException continued) {
log.info("possibly empty queue");
}
}
});
consumerThread.setDaemon(isDaemonOn);
consumerThread.setName(this.threadName);
return consumerThread;
}
@Override
public void destroy() throws Exception {
log.info("Destroying Mail Consumer!");
this.shouldConsumingMessages = false;
}
@Override
public void afterPropertiesSet() throws Exception {
log.info("initializing Mail Consumer!");
this.consumerThread = createConsumerThread();
consumerThread.start();
}
public void setDaemonOn(boolean isThreadDaemon) {
this.isDaemonOn = isThreadDaemon;
}
public void setConsumerThread(Thread consumerThread) {
this.consumerThread = consumerThread;
}
public void setThreadName(String threadName) {
this.threadName = threadName;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment