Created
June 15, 2017 12:28
-
-
Save hogmoru/a8c68747d2ca7b720db93fb2472f4c59 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.concurrent.ExecutorService; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.LinkedBlockingQueue; | |
import static java.util.concurrent.TimeUnit.SECONDS; | |
public class MessageProcessor { | |
private static final long CLEANUP_PERIOD_S = 10; | |
private final Map<Long, ConvoQueue> queuesByConvo = new HashMap<>(); | |
private final ExecutorService executorService; | |
public MessageProcessor(int nbThreads) { | |
executorService = Executors.newFixedThreadPool(nbThreads); | |
ScheduledExecutorService cleanupScheduler = Executors.newScheduledThreadPool(1); | |
cleanupScheduler.scheduleAtFixedRate(this::removeEmptyQueues, CLEANUP_PERIOD_S, CLEANUP_PERIOD_S, SECONDS); | |
} | |
public void addMessageToProcess(Message message) { | |
ConvoQueue queue = getQueue(message.getConversationId()); | |
queue.addMessage(message); | |
} | |
private ConvoQueue getQueue(Long convoId) { | |
synchronized (queuesByConvo) { | |
return queuesByConvo.computeIfAbsent(convoId, p -> new ConvoQueue(executorService)); | |
} | |
} | |
private void removeEmptyQueues() { | |
synchronized (queuesByConvo) { | |
queuesByConvo.entrySet().removeIf(entry -> entry.getValue().isEmpty()); | |
} | |
} | |
} | |
class ConvoQueue { | |
private Queue<MessageTask> queue; | |
private MessageTask activeTask; | |
private ExecutorService executorService; | |
ConvoQueue(ExecutorService executorService) { | |
this.executorService = executorService; | |
this.queue = new LinkedBlockingQueue<>(); | |
} | |
private void runNextIfPossible() { | |
synchronized(this) { | |
if (activeTask == null) { | |
activeTask = queue.poll(); | |
if (activeTask != null) { | |
executorService.submit(activeTask); | |
} | |
} | |
} | |
} | |
void complete(MessageTask task) { | |
synchronized(this) { | |
if (task == activeTask) { | |
activeTask = null; | |
runNextIfPossible(); | |
} | |
else { | |
throw new IllegalStateException("Attempt to complete task that is not supposed to be active: "+task); | |
} | |
} | |
} | |
boolean isEmpty() { | |
return queue.isEmpty(); | |
} | |
void addMessage(Message message) { | |
add(new MessageTask(this, message)); | |
} | |
private void add(MessageTask task) { | |
synchronized(this) { | |
queue.add(task); | |
runNextIfPossible(); | |
} | |
} | |
} | |
public class MessageTask implements Runnable { | |
private ConvoQueue convoQueue; | |
private Message message; | |
MessageTask(ConvoQueue convoQueue, Message message) { | |
this.convoQueue = convoQueue; | |
this.message = message; | |
} | |
@Override | |
public void run() { | |
try { | |
processMessage(); | |
} | |
finally { | |
convoQueue.complete(this); | |
} | |
} | |
private void processMessage() { | |
// Dummy processing with random delay to observe reordered messages & preserved convo order | |
try { | |
Thread.sleep((long) (50*Math.random())); | |
} catch (InterruptedException e) { | |
e.printStackTrace(); | |
} | |
System.out.println(message); | |
} | |
} | |
class Message { | |
private long id; | |
private long conversationId; | |
private String data; | |
Message(long id, long conversationId, String someData) { | |
this.id = id; | |
this.conversationId = conversationId; | |
this.data = someData; | |
} | |
long getConversationId() { | |
return conversationId; | |
} | |
String getData() { | |
return data; | |
} | |
public String toString() { | |
return "Message{" + id + "," + conversationId + "," + data + "}"; | |
} | |
} | |
public class MessageProcessorTest { | |
public static void main(String[] args) { | |
MessageProcessor test = new MessageProcessor(2); | |
for (int i=1; i<100; i++) { | |
test.addMessageToProcess(new Message(1000+i,i%7,"hi "+i)); | |
} | |
// Kill after 4 seconds for online test | |
try {Thread.sleep(4000);} catch(Exception e) {} | |
System.exit(0); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment