Skip to content

Instantly share code, notes, and snippets.

@dmlloyd
Last active August 29, 2015 14:16
Show Gist options
  • Select an option

  • Save dmlloyd/b4bd06c7b7dfa046d084 to your computer and use it in GitHub Desktop.

Select an option

Save dmlloyd/b4bd06c7b7dfa046d084 to your computer and use it in GitHub Desktop.
BufferSource concept
/*
* JBoss, Home of Professional Open Source
*
* Copyright 2015 Red Hat, Inc. and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.ArrayDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
/**
* A source of pooled buffers.
*
* @author <a href="mailto:[email protected]">David M. Lloyd</a>
*/
public abstract class BufferPool {
private static final boolean sliceLargeBuffers;
static {
sliceLargeBuffers = Boolean.parseBoolean(System.getProperty("xnio.buffer.slice-large-buffers", "true"));
}
private final ConcurrentLinkedQueue<ByteBuffer> masterQueue = new ConcurrentLinkedQueue<>();
private final LocalBufferCacheThreadLocal threadLocal = new LocalBufferCacheThreadLocal(this);
private final int size;
private final boolean direct;
BufferPool(final int size, final boolean direct) {
assert Integer.bitCount(size) == 1;
assert size >= 0x10;
assert size <= 0x4000_0000;
this.size = size;
this.direct = direct;
}
// buffer pool size constants
static final int LARGE_SIZE = 0x100000;
static final int NORMAL_SIZE = 0x2000;
static final int SMALL_SIZE = 0x40;
static final int LOCAL_QUEUE_SIZE = 0x10;
// todo: static final int CACHE_LINE_SIZE = max(64, CacheInfo.getSmallestDataCacheLineSize());
static final int CACHE_LINE_SIZE = 64;
/**
* The large direct buffer pool. This pool produces buffers 1 MiB in size.
*/
public static final BufferPool LARGE_DIRECT = create(LARGE_SIZE, true);
/**
* The medium direct buffer pool. This pool produces buffers 8 KiB in size.
*/
public static final BufferPool NORMAL_DIRECT = sliceLargeBuffers ? subPool(LARGE_DIRECT, NORMAL_SIZE) : create(NORMAL_SIZE, true);
/**
* The small direct buffer pool. This pool produces buffers 64 B in size.
*/
public static final BufferPool SMALL_DIRECT = subPool(NORMAL_DIRECT, SMALL_SIZE);
/**
* The large heap buffer pool. This pool produces buffers 1 MiB in size.
*/
public static final BufferPool LARGE_HEAP = create(LARGE_SIZE, false);
/**
* The medium heap buffer pool. This pool produces buffers 8 KiB in size.
*/
public static final BufferPool NORMAL_HEAP = create(NORMAL_SIZE, false);
/**
* The small direct buffer pool. This pool produces buffers 64 B in size.
*/
public static final BufferPool SMALL_HEAP = create(SMALL_SIZE, false);
/**
* A set of buffer sources for each size, which can either be {@link #DIRECT} or {@link #HEAP}.
*/
public static final class Set {
private final BufferPool small, normal, large;
Set(final BufferPool small, final BufferPool normal, final BufferPool large) {
this.small = small;
this.normal = normal;
this.large = large;
}
/**
* Get the small buffer pool for this set.
*
* @return the small buffer pool for this set
*/
public BufferPool getSmall() {
return small;
}
/**
* Get the medium buffer pool for this set.
*
* @return the medium buffer pool for this set
*/
public BufferPool getNormal() {
return normal;
}
/**
* Get the large buffer pool for this set.
*
* @return the large buffer pool for this set
*/
public BufferPool getLarge() {
return large;
}
/**
* The direct buffer source set.
*/
public static final Set DIRECT = new Set(SMALL_DIRECT, NORMAL_DIRECT, LARGE_DIRECT);
/**
* The heap buffer source set.
*/
public static final Set HEAP = new Set(SMALL_HEAP, NORMAL_HEAP, LARGE_HEAP);
}
/**
* Allocate a buffer from this source pool. The buffer must be freed through the {@link #free(ByteBuffer)} method.
*
* @return the allocated buffer
*/
public ByteBuffer allocate() {
final LocalBufferCache localCache = threadLocal.get();
ByteBuffer byteBuffer = localCache.queue.pollLast();
if (byteBuffer == null) {
ConcurrentLinkedQueue<ByteBuffer> masterQueue = this.masterQueue;
byteBuffer = masterQueue.poll();
if (byteBuffer == null) {
byteBuffer = createBuffer();
} else {
localCache.outstanding ++;
}
}
return byteBuffer;
}
/**
* Free a buffer into its appropriate pool based on its size. Care must be taken to avoid
* returning a slice of a pooled buffer, since this could cause both the buffer and its slice
* to be separately repooled, leading to likely data corruption.
*
* @param buffer the buffer to free
*/
public static void free(ByteBuffer buffer) {
final int size = buffer.capacity();
if (Integer.bitCount(size) == 1 && ! buffer.isReadOnly()) {
if (buffer.isDirect()) {
if (! (buffer instanceof MappedByteBuffer)) {
if (size == NORMAL_SIZE) {
NORMAL_DIRECT.doFree(buffer);
} else if (size == SMALL_SIZE) {
SMALL_DIRECT.doFree(buffer);
} else if (size == LARGE_SIZE) {
LARGE_DIRECT.doFree(buffer);
}
}
} else {
if (size == NORMAL_SIZE) {
NORMAL_HEAP.doFree(buffer);
} else if (size == SMALL_SIZE) {
SMALL_HEAP.doFree(buffer);
} else if (size == LARGE_SIZE) {
LARGE_HEAP.doFree(buffer);
}
}
}
}
/**
* Free a buffer as with {@link #free(ByteBuffer)} except the buffer is first zeroed and cleared.
*
* @param buffer the buffer to free
*/
public static void zeroAndFree(ByteBuffer buffer) {
Buffers.zero(buffer);
free(buffer);
}
/**
* Determine if this source returns direct buffers.
* @return {@code true} if the buffers are direct, {@code false} if they are heap
*/
public boolean isDirect() {
return direct;
}
/**
* Get the size of buffers returned by this source. The size will be a power of two.
*
* @return the size of buffers returned by this source
*/
public int getSize() {
return size;
}
// private
static BufferPool create(int size, boolean direct) {
assert Integer.bitCount(size) == 1;
assert size >= 0x10;
assert size <= 0x4000_0000;
return new BufferPool(size, direct) {
ByteBuffer createBuffer() {
return direct ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size);
}
};
}
static BufferPool subPool(BufferPool bufferPool, int size) {
// must be a power of two, not too small, and smaller than the parent buffer source
assert Integer.bitCount(size) == 1;
assert Integer.bitCount(bufferPool.getSize()) == 1;
assert size >= 0x10;
assert size < bufferPool.getSize();
// and thus..
assert bufferPool.getSize() % size == 0;
return new BufferPool(size, bufferPool.isDirect()) {
ByteBuffer createBuffer() {
ByteBuffer parentBuffer = bufferPool.allocate();
ByteBuffer result = Buffers.slice(parentBuffer, size);
while (parentBuffer.hasRemaining()) {
super.doFree(Buffers.slice(parentBuffer, size));
// avoid false sharing between buffers
if (size < CACHE_LINE_SIZE) {
Buffers.skip(parentBuffer, CACHE_LINE_SIZE - size);
}
}
return result;
}
};
}
abstract ByteBuffer createBuffer();
final void doFree(final ByteBuffer buffer) {
assert buffer.capacity() == size;
assert buffer.isDirect() == direct;
assert ! (buffer instanceof MappedByteBuffer);
buffer.clear();
final LocalBufferCache localCache = threadLocal.get();
int oldVal = localCache.outstanding;
if (oldVal >= LOCAL_QUEUE_SIZE || localCache.queue.size() == LOCAL_QUEUE_SIZE) {
masterQueue.add(buffer);
} else {
localCache.outstanding = oldVal - 1;
localCache.queue.add(buffer);
}
}
ConcurrentLinkedQueue<ByteBuffer> getMasterQueue() {
return masterQueue;
}
LocalBufferCacheThreadLocal getThreadLocal() {
return threadLocal;
}
static class LocalBufferCacheThreadLocal extends ThreadLocal<LocalBufferCache> {
final BufferPool bufferPool;
LocalBufferCacheThreadLocal(final BufferPool bufferPool) {
this.bufferPool = bufferPool;
}
protected LocalBufferCache initialValue() {
return new LocalBufferCache(bufferPool);
}
public void remove() {
get().empty();
}
}
static class LocalBufferCache {
final LocalBufferCacheThreadLocal bufferQueue;
final ArrayDeque<ByteBuffer> queue = new ArrayDeque<>(LOCAL_QUEUE_SIZE);
int outstanding;
LocalBufferCache(final BufferPool bufferPool) {
bufferQueue = bufferPool.getThreadLocal();
}
protected void finalize() throws Throwable {
empty();
}
void empty() {
ArrayDeque<ByteBuffer> queue = this.queue;
if (! queue.isEmpty()) {
ConcurrentLinkedQueue<ByteBuffer> masterQueue = bufferQueue.bufferPool.getMasterQueue();
do {
masterQueue.add(queue.poll());
} while (! queue.isEmpty());
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment