Created
December 19, 2016 05:42
-
-
Save ankurcha/179e49bf5fef5aee3e92aa0b44246ee2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.brightcove.rna.model.Events; | |
import com.brightcove.rna.model.Row; | |
import io.grpc.Context; | |
import org.jetbrains.annotations.NotNull; | |
import org.mapdb.*; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.util.UUID; | |
import java.util.concurrent.Executors; | |
import java.util.concurrent.TimeUnit; | |
import java.util.stream.Stream; | |
public class DiskOverflowReducer implements AutoCloseable { | |
private static final Logger log = LoggerFactory.getLogger(DiskOverflowReducer.class); | |
private static final long ALLOCATED_SIZE_IN_BYTES = 1024 * 1024 * 10; // 10 MB | |
private static final DB dbDisk = | |
DBMaker.tempFileDB() | |
.concurrencyScale(8) | |
.closeOnJvmShutdown() | |
.fileDeleteAfterClose() | |
.fileLockDisable() | |
.closeOnJvmShutdownWeakReference() | |
.fileMmapEnable() | |
.fileMmapPreclearDisable() | |
.cleanerHackEnable() | |
.allocateStartSize(ALLOCATED_SIZE_IN_BYTES * 3) | |
.allocateIncrement(ALLOCATED_SIZE_IN_BYTES) | |
.fileChannelEnable() | |
.make(); | |
private static final DB dbMemory = DBMaker.memoryDirectDB().concurrencyScale(8).make(); | |
private static final Serializer<Row> ROW_SERIALIZER = | |
new Serializer<Row>() { | |
@Override | |
public void serialize(@NotNull DataOutput2 out, @NotNull Row value) throws IOException { | |
value.writeDelimitedTo(out); | |
} | |
@Override | |
public Row deserialize(@NotNull DataInput2 input, int available) throws IOException { | |
return Row.parseDelimitedFrom(new DataInput2.DataInputToStream(input)); | |
} | |
}; | |
static { | |
dbDisk.defaultSerializerRegisterClass(Row.class); | |
// register shutdown hooks | |
Runtime.getRuntime() | |
.addShutdownHook( | |
new Thread( | |
() -> { | |
dbMemory.close(); | |
dbDisk.close(); | |
})); | |
} | |
private final HTreeMap<byte[], Row> diskMap; | |
private final HTreeMap<byte[], Row> aggregationMap; | |
private final String name; | |
public DiskOverflowReducer() { | |
MemoryMonitor.INSTANCE.waitForResources("DiskOverflowReducer"); | |
this.name = UUID.randomUUID().toString(); | |
log.info("Creating new Disk overflow reducer"); | |
this.diskMap = | |
dbDisk | |
.hashMap(name + "-onDisk") | |
.removeCollapsesIndexTreeDisable() | |
.keySerializer(Serializer.BYTE_ARRAY_DELTA2) | |
.valueSerializer(ROW_SERIALIZER) | |
.create(); | |
this.aggregationMap = | |
dbMemory | |
.hashMap(name + "-inMemory") | |
.keySerializer(Serializer.BYTE_ARRAY_DELTA2) | |
.valueSerializer(ROW_SERIALIZER) | |
.expireMaxSize(ALLOCATED_SIZE_IN_BYTES) | |
.expireOverflow(diskMap) | |
.expireAfterCreate(15, TimeUnit.MINUTES) | |
.expireExecutor(Executors.newScheduledThreadPool(2)) | |
.create(); | |
} | |
public DiskOverflowReducer mergeAll(Stream<Row> input) { | |
input.forEach( | |
row -> { | |
if (Context.current().isCancelled()) { | |
throw new RequestCancelledException(); | |
} | |
if (!aggregationMap.isClosed()) { | |
aggregationMap.merge(row.getKey().toByteArray(), row, RowUtil::merge); | |
} | |
}); | |
return this; | |
} | |
public Stream<Row> values() { | |
return aggregationMap.getValues().stream(); | |
} | |
@Override | |
public void close() throws Exception { | |
log.info("Closing reducer: aggregationMap={}, diskMap={}", aggregationMap.getSize(), diskMap.getSize()); | |
aggregationMap.clear(); | |
diskMap.clear(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment