Created
November 22, 2017 08:33
-
-
Save lovasoa/3d8799569fda5d3e268923e8d931b2e4 to your computer and use it in GitHub Desktop.
Thread-safe class to implement a stream consumer that works in batch.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.qwant.utils; | |
import java.util.ArrayList; | |
import java.util.List; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.ScheduledExecutorService; | |
import java.util.concurrent.ScheduledFuture; | |
import java.util.concurrent.TimeUnit; | |
import java.util.function.Consumer; | |
import static java.util.Collections.synchronizedList; | |
/**
 * Thread-safe adapter that turns a per-element {@link Consumer} into a batch consumer.
 *
 * <p>Subclasses only implement {@link #acceptBatch(List)}. Elements handed to
 * {@link #accept(Object)} are buffered; the buffer is delivered as one batch when it
 * reaches {@code batchSize} elements, or when no element has been added for
 * {@code timeoutMillis} milliseconds.
 *
 * <p><b>Threading.</b> All mutable state (the buffer and the pending timeout task) is
 * guarded by a single private lock. {@link #acceptBatch(List)} is always invoked
 * <em>outside</em> that lock, either on the thread that called {@link #accept(Object)} /
 * {@link #flush(boolean)} or, for timeout flushes, on a single daemon scheduler thread
 * shared by every {@code BatchConsumer} instance. A slow {@code acceptBatch} on that
 * thread therefore delays the timeout flushes of all instances.
 *
 * <p><b>Shutdown.</b> The scheduler thread is a daemon so that it never prevents the JVM
 * from exiting; call {@code flush(true)} before shutting down to deliver elements that
 * are still waiting for their timeout.
 *
 * @param <T> The type of elements to consume.
 */
public abstract class BatchConsumer<T> implements Consumer<T> {
    /**
     * Shared timer for timeout-triggered flushes. A daemon thread is used because the
     * executor is static and never shut down: a non-daemon worker would keep the JVM alive.
     */
    private static final ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(1, task -> {
                Thread worker = new Thread(task, "batch-consumer-timeout");
                worker.setDaemon(true);
                return worker;
            });

    /** Guards {@link #waiting} and {@link #cleanupFuture}. */
    private final Object lock = new Object();
    private final int batchSize;
    private final long timeoutMillis;

    /** Elements received but not yet delivered; only touched while holding {@link #lock}. */
    private List<T> waiting = new ArrayList<>();
    /** Pending timeout flush, or {@code null} before the first element arrives. */
    private ScheduledFuture<?> cleanupFuture;

    /**
     * @param batchSize     number of buffered elements that triggers an immediate flush;
     *                      values below 1 behave like 1
     * @param timeoutMillis idle time, in milliseconds, after which a partial batch is flushed
     */
    public BatchConsumer(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Buffers {@code element}, delivers the buffer if it is full, and restarts the idle timer.
     *
     * @param element the element to add to the current batch
     */
    @Override
    public void accept(T element) {
        synchronized (lock) {
            waiting.add(element);
        }
        // Delivery happens outside the lock so a slow acceptBatch cannot block producers.
        flush(false);
        synchronized (lock) {
            // Cancel-and-replace must be atomic, otherwise two producers could each
            // overwrite the handle and leave an uncancelled timer behind.
            if (cleanupFuture != null) {
                cleanupFuture.cancel(false);
            }
            cleanupFuture = scheduler.schedule(
                    () -> flush(true),
                    timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Delivers the buffered elements to {@link #acceptBatch(List)} if a batch is due.
     *
     * @param force when {@code true}, deliver any non-empty buffer; when {@code false},
     *              deliver only if at least {@code batchSize} elements are buffered
     */
    public void flush(boolean force) {
        List<T> newBatch = batchToFlush(force);
        if (newBatch != null) {
            acceptBatch(newBatch);
        }
    }

    /**
     * Atomically detaches the buffer when it is due for delivery.
     *
     * @return the detached buffer, or {@code null} when nothing should be flushed
     */
    private List<T> batchToFlush(boolean force) {
        // Never hand out an empty batch, even if batchSize was configured as <= 0.
        int threshold = force ? 1 : Math.max(1, batchSize);
        synchronized (lock) {
            if (waiting.size() < threshold) {
                return null;
            }
            List<T> detached = waiting;
            // The detached list is no longer reachable from this object, so the
            // receiver of the batch owns it exclusively.
            waiting = new ArrayList<>();
            return detached;
        }
    }

    /**
     * Consumes one batch. Called without any internal lock held, from either a producer
     * thread or the shared scheduler thread, so implementations must be thread-safe if
     * the consumer is used concurrently.
     *
     * @param batch a non-empty list of elements in insertion order; this class does not
     *              touch the list again after handing it over
     */
    public abstract void acceptBatch(List<T> batch);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment