Last active
August 29, 2015 14:16
-
-
Save dmlloyd/b4bd06c7b7dfa046d084 to your computer and use it in GitHub Desktop.
BufferSource concept
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * JBoss, Home of Professional Open Source | |
| * | |
| * Copyright 2015 Red Hat, Inc. and/or its affiliates. | |
| * | |
| * Licensed under the Apache License, Version 2.0 (the "License"); | |
| * you may not use this file except in compliance with the License. | |
| * You may obtain a copy of the License at | |
| * | |
| * http://www.apache.org/licenses/LICENSE-2.0 | |
| * | |
| * Unless required by applicable law or agreed to in writing, software | |
| * distributed under the License is distributed on an "AS IS" BASIS, | |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
| * See the License for the specific language governing permissions and | |
| * limitations under the License. | |
| */ | |
| package org.xnio; | |
| import java.nio.ByteBuffer; | |
| import java.nio.MappedByteBuffer; | |
| import java.util.ArrayDeque; | |
| import java.util.concurrent.ConcurrentLinkedQueue; | |
| /** | |
| * A source of pooled buffers. | |
| * | |
| * @author <a href="mailto:[email protected]">David M. Lloyd</a> | |
| */ | |
| public abstract class BufferPool { | |
| private static final boolean sliceLargeBuffers; | |
| static { | |
| sliceLargeBuffers = Boolean.parseBoolean(System.getProperty("xnio.buffer.slice-large-buffers", "true")); | |
| } | |
| private final ConcurrentLinkedQueue<ByteBuffer> masterQueue = new ConcurrentLinkedQueue<>(); | |
| private final LocalBufferCacheThreadLocal threadLocal = new LocalBufferCacheThreadLocal(this); | |
| private final int size; | |
| private final boolean direct; | |
| BufferPool(final int size, final boolean direct) { | |
| assert Integer.bitCount(size) == 1; | |
| assert size >= 0x10; | |
| assert size <= 0x4000_0000; | |
| this.size = size; | |
| this.direct = direct; | |
| } | |
| // buffer pool size constants | |
| static final int LARGE_SIZE = 0x100000; | |
| static final int NORMAL_SIZE = 0x2000; | |
| static final int SMALL_SIZE = 0x40; | |
| static final int LOCAL_QUEUE_SIZE = 0x10; | |
| // todo: static final int CACHE_LINE_SIZE = max(64, CacheInfo.getSmallestDataCacheLineSize()); | |
| static final int CACHE_LINE_SIZE = 64; | |
| /** | |
| * The large direct buffer pool. This pool produces buffers 1 MiB in size. | |
| */ | |
| public static final BufferPool LARGE_DIRECT = create(LARGE_SIZE, true); | |
| /** | |
| * The medium direct buffer pool. This pool produces buffers 8 KiB in size. | |
| */ | |
| public static final BufferPool NORMAL_DIRECT = sliceLargeBuffers ? subPool(LARGE_DIRECT, NORMAL_SIZE) : create(NORMAL_SIZE, true); | |
| /** | |
| * The small direct buffer pool. This pool produces buffers 64 B in size. | |
| */ | |
| public static final BufferPool SMALL_DIRECT = subPool(NORMAL_DIRECT, SMALL_SIZE); | |
| /** | |
| * The large heap buffer pool. This pool produces buffers 1 MiB in size. | |
| */ | |
| public static final BufferPool LARGE_HEAP = create(LARGE_SIZE, false); | |
| /** | |
| * The medium heap buffer pool. This pool produces buffers 8 KiB in size. | |
| */ | |
| public static final BufferPool NORMAL_HEAP = create(NORMAL_SIZE, false); | |
| /** | |
| * The small direct buffer pool. This pool produces buffers 64 B in size. | |
| */ | |
| public static final BufferPool SMALL_HEAP = create(SMALL_SIZE, false); | |
| /** | |
| * A set of buffer sources for each size, which can either be {@link #DIRECT} or {@link #HEAP}. | |
| */ | |
| public static final class Set { | |
| private final BufferPool small, normal, large; | |
| Set(final BufferPool small, final BufferPool normal, final BufferPool large) { | |
| this.small = small; | |
| this.normal = normal; | |
| this.large = large; | |
| } | |
| /** | |
| * Get the small buffer pool for this set. | |
| * | |
| * @return the small buffer pool for this set | |
| */ | |
| public BufferPool getSmall() { | |
| return small; | |
| } | |
| /** | |
| * Get the medium buffer pool for this set. | |
| * | |
| * @return the medium buffer pool for this set | |
| */ | |
| public BufferPool getNormal() { | |
| return normal; | |
| } | |
| /** | |
| * Get the large buffer pool for this set. | |
| * | |
| * @return the large buffer pool for this set | |
| */ | |
| public BufferPool getLarge() { | |
| return large; | |
| } | |
| /** | |
| * The direct buffer source set. | |
| */ | |
| public static final Set DIRECT = new Set(SMALL_DIRECT, NORMAL_DIRECT, LARGE_DIRECT); | |
| /** | |
| * The heap buffer source set. | |
| */ | |
| public static final Set HEAP = new Set(SMALL_HEAP, NORMAL_HEAP, LARGE_HEAP); | |
| } | |
| /** | |
| * Allocate a buffer from this source pool. The buffer must be freed through the {@link #free(ByteBuffer)} method. | |
| * | |
| * @return the allocated buffer | |
| */ | |
| public ByteBuffer allocate() { | |
| final LocalBufferCache localCache = threadLocal.get(); | |
| ByteBuffer byteBuffer = localCache.queue.pollLast(); | |
| if (byteBuffer == null) { | |
| ConcurrentLinkedQueue<ByteBuffer> masterQueue = this.masterQueue; | |
| byteBuffer = masterQueue.poll(); | |
| if (byteBuffer == null) { | |
| byteBuffer = createBuffer(); | |
| } else { | |
| localCache.outstanding ++; | |
| } | |
| } | |
| return byteBuffer; | |
| } | |
| /** | |
| * Free a buffer into its appropriate pool based on its size. Care must be taken to avoid | |
| * returning a slice of a pooled buffer, since this could cause both the buffer and its slice | |
| * to be separately repooled, leading to likely data corruption. | |
| * | |
| * @param buffer the buffer to free | |
| */ | |
| public static void free(ByteBuffer buffer) { | |
| final int size = buffer.capacity(); | |
| if (Integer.bitCount(size) == 1 && ! buffer.isReadOnly()) { | |
| if (buffer.isDirect()) { | |
| if (! (buffer instanceof MappedByteBuffer)) { | |
| if (size == NORMAL_SIZE) { | |
| NORMAL_DIRECT.doFree(buffer); | |
| } else if (size == SMALL_SIZE) { | |
| SMALL_DIRECT.doFree(buffer); | |
| } else if (size == LARGE_SIZE) { | |
| LARGE_DIRECT.doFree(buffer); | |
| } | |
| } | |
| } else { | |
| if (size == NORMAL_SIZE) { | |
| NORMAL_HEAP.doFree(buffer); | |
| } else if (size == SMALL_SIZE) { | |
| SMALL_HEAP.doFree(buffer); | |
| } else if (size == LARGE_SIZE) { | |
| LARGE_HEAP.doFree(buffer); | |
| } | |
| } | |
| } | |
| } | |
| /** | |
| * Free a buffer as with {@link #free(ByteBuffer)} except the buffer is first zeroed and cleared. | |
| * | |
| * @param buffer the buffer to free | |
| */ | |
| public static void zeroAndFree(ByteBuffer buffer) { | |
| Buffers.zero(buffer); | |
| free(buffer); | |
| } | |
| /** | |
| * Determine if this source returns direct buffers. | |
| * @return {@code true} if the buffers are direct, {@code false} if they are heap | |
| */ | |
| public boolean isDirect() { | |
| return direct; | |
| } | |
| /** | |
| * Get the size of buffers returned by this source. The size will be a power of two. | |
| * | |
| * @return the size of buffers returned by this source | |
| */ | |
| public int getSize() { | |
| return size; | |
| } | |
| // private | |
| static BufferPool create(int size, boolean direct) { | |
| assert Integer.bitCount(size) == 1; | |
| assert size >= 0x10; | |
| assert size <= 0x4000_0000; | |
| return new BufferPool(size, direct) { | |
| ByteBuffer createBuffer() { | |
| return direct ? ByteBuffer.allocate(size) : ByteBuffer.allocateDirect(size); | |
| } | |
| }; | |
| } | |
| static BufferPool subPool(BufferPool bufferPool, int size) { | |
| // must be a power of two, not too small, and smaller than the parent buffer source | |
| assert Integer.bitCount(size) == 1; | |
| assert Integer.bitCount(bufferPool.getSize()) == 1; | |
| assert size >= 0x10; | |
| assert size < bufferPool.getSize(); | |
| // and thus.. | |
| assert bufferPool.getSize() % size == 0; | |
| return new BufferPool(size, bufferPool.isDirect()) { | |
| ByteBuffer createBuffer() { | |
| ByteBuffer parentBuffer = bufferPool.allocate(); | |
| ByteBuffer result = Buffers.slice(parentBuffer, size); | |
| while (parentBuffer.hasRemaining()) { | |
| super.doFree(Buffers.slice(parentBuffer, size)); | |
| // avoid false sharing between buffers | |
| if (size < CACHE_LINE_SIZE) { | |
| Buffers.skip(parentBuffer, CACHE_LINE_SIZE - size); | |
| } | |
| } | |
| return result; | |
| } | |
| }; | |
| } | |
| abstract ByteBuffer createBuffer(); | |
| final void doFree(final ByteBuffer buffer) { | |
| assert buffer.capacity() == size; | |
| assert buffer.isDirect() == direct; | |
| assert ! (buffer instanceof MappedByteBuffer); | |
| buffer.clear(); | |
| final LocalBufferCache localCache = threadLocal.get(); | |
| int oldVal = localCache.outstanding; | |
| if (oldVal >= LOCAL_QUEUE_SIZE || localCache.queue.size() == LOCAL_QUEUE_SIZE) { | |
| masterQueue.add(buffer); | |
| } else { | |
| localCache.outstanding = oldVal - 1; | |
| localCache.queue.add(buffer); | |
| } | |
| } | |
| ConcurrentLinkedQueue<ByteBuffer> getMasterQueue() { | |
| return masterQueue; | |
| } | |
| LocalBufferCacheThreadLocal getThreadLocal() { | |
| return threadLocal; | |
| } | |
| static class LocalBufferCacheThreadLocal extends ThreadLocal<LocalBufferCache> { | |
| final BufferPool bufferPool; | |
| LocalBufferCacheThreadLocal(final BufferPool bufferPool) { | |
| this.bufferPool = bufferPool; | |
| } | |
| protected LocalBufferCache initialValue() { | |
| return new LocalBufferCache(bufferPool); | |
| } | |
| public void remove() { | |
| get().empty(); | |
| } | |
| } | |
| static class LocalBufferCache { | |
| final LocalBufferCacheThreadLocal bufferQueue; | |
| final ArrayDeque<ByteBuffer> queue = new ArrayDeque<>(LOCAL_QUEUE_SIZE); | |
| int outstanding; | |
| LocalBufferCache(final BufferPool bufferPool) { | |
| bufferQueue = bufferPool.getThreadLocal(); | |
| } | |
| protected void finalize() throws Throwable { | |
| empty(); | |
| } | |
| void empty() { | |
| ArrayDeque<ByteBuffer> queue = this.queue; | |
| if (! queue.isEmpty()) { | |
| ConcurrentLinkedQueue<ByteBuffer> masterQueue = bufferQueue.bufferPool.getMasterQueue(); | |
| do { | |
| masterQueue.add(queue.poll()); | |
| } while (! queue.isEmpty()); | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment