Last active
August 29, 2015 14:15
-
-
Save dmlloyd/238895a28abda37d8fdd to your computer and use it in GitHub Desktop.
Simplified buffer pool impl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* JBoss, Home of Professional Open Source | |
* | |
* Copyright 2015 Red Hat, Inc. and/or its affiliates. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.examples; | |
import static java.lang.Math.min; | |
import java.nio.ByteBuffer; | |
import java.util.ArrayDeque; | |
import java.util.concurrent.ConcurrentLinkedQueue; | |
/** | |
* @author <a href="mailto:[email protected]">David M. Lloyd</a> | |
*/ | |
public interface BufferPool2 { | |
ByteBuffer allocate(); | |
void free(ByteBuffer buffer); | |
boolean isDirect(); | |
int getSize(); | |
default BufferPool2 zeroing() { | |
return new BufferPool2() { | |
public ByteBuffer allocate() { | |
return BufferPool2.this.allocate(); | |
} | |
public void free(final ByteBuffer buffer) { | |
Buffers.zero(buffer); | |
BufferPool2.this.free(buffer); | |
} | |
public boolean isDirect() { | |
return BufferPool2.this.isDirect(); | |
} | |
public int getSize() { | |
return BufferPool2.this.getSize(); | |
} | |
public BufferPool2 zeroing() { | |
return this; | |
} | |
}; | |
} | |
static BufferPool2 create(int size, boolean direct) { | |
final int finalSize = min(Integer.highestOneBit(size - 1), 0x40000000) << 1; | |
final int masterSize = min(0x100000, size); | |
final int slices = size < 0x10000 ? masterSize / size : 1; | |
return new BufferPool2() { | |
private final ConcurrentLinkedQueue<ByteBuffer> masterQueue = new ConcurrentLinkedQueue<>(); | |
private final ThreadLocal<LocalCache> bufferQueue = new ThreadLocal<LocalCache>() { | |
protected LocalCache initialValue() { | |
return new LocalCache(); | |
} | |
public void remove() { | |
get().empty(); | |
} | |
}; | |
public ByteBuffer allocate() { | |
final LocalCache localCache = bufferQueue.get(); | |
ByteBuffer byteBuffer = localCache.queue.pollLast(); | |
if (byteBuffer == null) { | |
ConcurrentLinkedQueue<ByteBuffer> masterQueue = this.masterQueue; | |
byteBuffer = masterQueue.poll(); | |
if (byteBuffer == null) { | |
ByteBuffer masterBuffer = direct ? ByteBuffer.allocate(masterSize) : ByteBuffer.allocateDirect(masterSize); | |
if (slices == 1) { | |
byteBuffer = masterBuffer; | |
} else { | |
for (int i = 0; i < slices; i ++) { | |
masterBuffer.limit(masterBuffer.position() + finalSize); | |
if (i < slices - 1) { | |
fastFree(masterBuffer.slice(), localCache); | |
masterBuffer.position(masterBuffer.position() + finalSize); | |
} | |
} | |
assert masterBuffer.position() + finalSize == masterBuffer.capacity(); | |
masterBuffer.limit(masterBuffer.capacity()); | |
byteBuffer = masterBuffer.slice(); | |
} | |
} else { | |
localCache.outstanding ++; | |
} | |
} | |
return byteBuffer; | |
} | |
public void free(final ByteBuffer buffer) { | |
if (isDirect() != buffer.isDirect() || getSize() != buffer.capacity()) { | |
throw new IllegalArgumentException("Wrong buffer returned to pool"); | |
} | |
fastFree(buffer, bufferQueue.get()); | |
} | |
private void fastFree(final ByteBuffer buffer, final LocalCache localCache) { | |
buffer.clear(); | |
if (localCache.outstanding -- >= 16 || localCache.queue.size() == 16) { | |
masterQueue.add(buffer); | |
} else { | |
localCache.queue.add(buffer); | |
} | |
} | |
public boolean isDirect() { | |
return direct; | |
} | |
public int getSize() { | |
return finalSize; | |
} | |
class LocalCache { | |
final ArrayDeque<ByteBuffer> queue = new ArrayDeque<>(); | |
int outstanding; | |
protected void finalize() throws Throwable { | |
empty(); | |
} | |
void empty() { | |
while (! queue.isEmpty()) { | |
masterQueue.add(queue.poll()); | |
} | |
} | |
} | |
}; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment