Skip to content

Instantly share code, notes, and snippets.

@lovasoa
Created November 22, 2017 08:33
Show Gist options
  • Save lovasoa/3d8799569fda5d3e268923e8d931b2e4 to your computer and use it in GitHub Desktop.
Save lovasoa/3d8799569fda5d3e268923e8d931b2e4 to your computer and use it in GitHub Desktop.
Thread-safe class to implement a stream consumer that works in batch.
package com.qwant.utils;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import static java.util.Collections.synchronizedList;
/**
 * Thread-safe {@link Consumer} that groups single elements into batches.
 *
 * <p>Subclasses implement {@link #acceptBatch(List)}. Elements passed to
 * {@link #accept(Object)} are buffered; the buffer is handed to
 * {@code acceptBatch} as soon as it holds {@code batchSize} elements, or when
 * no element has been added for {@code timeoutMillis} milliseconds.
 *
 * <p>Threading: the buffer and the inactivity timer are guarded by this
 * object's monitor. {@code acceptBatch} is always invoked <em>outside</em> the
 * monitor, either on the thread that called {@code accept}/{@code flush} or on
 * the shared timer thread, so it may run concurrently for different batches
 * and must be thread-safe itself. An exception thrown by {@code acceptBatch}
 * during a timed flush is captured by the scheduler and not propagated.
 *
 * <p>The shared timer thread is a daemon and does not keep the JVM alive; call
 * {@code flush(true)} before shutting down to deliver any buffered elements.
 *
 * @param <T> The type of elements to consume.
 */
public abstract class BatchConsumer<T> implements Consumer<T> {
    /** Single timer thread shared by every instance; used only for inactivity flushes. */
    private static final ScheduledExecutorService scheduler = newScheduler();

    private final int batchSize;
    private final long timeoutMillis;

    /** Elements not yet delivered. Guarded by {@code this}. */
    private List<T> waiting = new ArrayList<>();

    /** Pending inactivity flush, or {@code null} when the buffer is empty. Guarded by {@code this}. */
    private ScheduledFuture<?> cleanupFuture;

    /**
     * @param batchSize     number of buffered elements that triggers an immediate flush
     * @param timeoutMillis delay without new elements, in milliseconds, after which
     *                      a non-empty buffer is flushed
     */
    public BatchConsumer(int batchSize, long timeoutMillis) {
        this.batchSize = batchSize;
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Builds the shared timer. The thread is a daemon so that an idle consumer
     * never blocks JVM exit, and cancelled timers are removed from the queue
     * immediately instead of lingering until their delay elapses.
     */
    private static ScheduledExecutorService newScheduler() {
        java.util.concurrent.ScheduledThreadPoolExecutor executor =
                new java.util.concurrent.ScheduledThreadPoolExecutor(1, runnable -> {
                    Thread thread = new Thread(runnable, "BatchConsumer-flush");
                    thread.setDaemon(true);
                    return thread;
                });
        executor.setRemoveOnCancelPolicy(true);
        return executor;
    }

    /**
     * Buffers {@code element}, delivers a batch if the buffer reached
     * {@code batchSize}, and restarts the inactivity timer.
     *
     * @param element the element to add to the current batch
     */
    @Override
    public void accept(T element) {
        List<T> ready;
        synchronized (this) {
            // The add and the swap must happen under the same lock; otherwise an
            // element could land in a list that was already handed to acceptBatch.
            this.waiting.add(element);
            ready = batchToFlush(false);
            if (this.cleanupFuture != null) {
                this.cleanupFuture.cancel(false);
            }
            // Only arm the timer when something is actually waiting to be flushed.
            this.cleanupFuture = this.waiting.isEmpty()
                    ? null
                    : scheduler.schedule(
                            () -> this.flush(true),
                            this.timeoutMillis, TimeUnit.MILLISECONDS);
        }
        // Subclass code runs outside the lock so a slow consumer does not block producers.
        if (ready != null) {
            this.acceptBatch(ready);
        }
    }

    /**
     * Delivers the buffered elements to {@link #acceptBatch(List)} if there are enough of them.
     *
     * @param force when {@code true}, flush any non-empty buffer; when {@code false},
     *              flush only if the buffer holds at least {@code batchSize} elements
     */
    public void flush(boolean force) {
        List<T> newBatch = batchToFlush(force);
        if (newBatch != null) {
            this.acceptBatch(newBatch);
        }
    }

    /**
     * Atomically detaches the current buffer if it is large enough.
     *
     * @return the detached buffer, or {@code null} if nothing should be flushed
     */
    private synchronized List<T> batchToFlush(boolean force) {
        int threshold = force ? 1 : this.batchSize;
        if (this.waiting.size() < threshold) {
            return null;
        }
        List<T> old = this.waiting;
        this.waiting = new ArrayList<>();
        return old;
    }

    /**
     * Consumes one batch. The list is never {@code null}, is not touched again by
     * this class after the call, and may be kept or modified by the implementation.
     *
     * @param batch the elements accumulated since the previous flush, in insertion order
     */
    public abstract void acceptBatch(List<T> batch);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment