Skip to content

Instantly share code, notes, and snippets.

@nitsanw
Created April 17, 2016 13:04
Show Gist options
  • Save nitsanw/c713a7020cd753ea49d77f88ca4b40e5 to your computer and use it in GitHub Desktop.
Save nitsanw/c713a7020cd753ea49d77f88ca4b40e5 to your computer and use it in GitHub Desktop.
/*
* Copyright 2015 dorkbox, llc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.jctools.queues;
import static org.jctools.util.UnsafeAccess.UNSAFE;
import java.util.Random;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.locks.LockSupport;
import org.jctools.queues.alt.ConcurrentSequencedCircularArray;
import org.jctools.util.UnsafeRefArrayAccess;
/**
 * Padding layer that sits between the fields inherited from {@link ConcurrentSequencedCircularArray} and the
 * producer index declared in {@link MpmcTransferArrayQueueProducerField}.
 * <p>
 * The 15 longs below are not referenced anywhere in this file. Presumably they keep {@code producerIndex} off the
 * cache line(s) that hold the array fields (false-sharing avoidance). Do not remove or "clean up" these fields.
 */
abstract class MpmcTransferArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArray<E> {
// padding only: 8 + 7 = 15 longs
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;
/** @param capacity forwarded unchanged to the circular-array superclass */
public MpmcTransferArrayQueueL1Pad(int capacity) {
super(capacity);
}
}
/**
 * Holds the queue's producer index. It offers a volatile read and an Unsafe compare-and-swap on the index, which is
 * the only way producers claim slots.
 */
abstract class MpmcTransferArrayQueueProducerField<E> extends MpmcTransferArrayQueueL1Pad<E> {
    /** Unsafe offset of the {@code producerIndex} field, resolved once during class initialization. */
    private final static long P_INDEX_OFFSET = resolveProducerIndexOffset();

    private volatile long producerIndex;

    public MpmcTransferArrayQueueProducerField(int capacity) {
        super(capacity);
    }

    /**
     * Looks up the Unsafe field offset of {@code producerIndex}.
     *
     * @throws RuntimeException wrapping the reflection failure if the field cannot be found
     */
    private static long resolveProducerIndexOffset() {
        try {
            return UNSAFE.objectFieldOffset(
                    MpmcTransferArrayQueueProducerField.class.getDeclaredField("producerIndex"));
        }
        catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    /** @return the producer index, read with volatile semantics */
    protected final long lvProducerIndex() {
        return producerIndex;
    }

    /**
     * Atomically moves the producer index from {@code expect} to {@code newValue}.
     *
     * @return {@code true} if the index still held {@code expect} and was updated
     */
    protected final boolean casProducerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }
}
/**
 * Padding layer between the producer index (declared in {@link MpmcTransferArrayQueueProducerField}) and the
 * consumer index (declared in {@link MpmcTransferArrayQueueConsumerField}).
 * <p>
 * The 15 longs below are not referenced anywhere in this file. Presumably they keep the two hot indices on separate
 * cache lines (false-sharing avoidance). Do not remove them.
 */
abstract class MpmcTransferArrayQueueL2Pad<E> extends MpmcTransferArrayQueueProducerField<E> {
// padding only: 7 + 8 = 15 longs
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
/** @param capacity forwarded unchanged to the superclass */
public MpmcTransferArrayQueueL2Pad(int capacity) {
super(capacity);
}
}
/**
 * Holds the queue's consumer index. It offers a volatile read and an Unsafe compare-and-swap on the index, which is
 * the only way slots are claimed for removal.
 */
abstract class MpmcTransferArrayQueueConsumerField<E> extends MpmcTransferArrayQueueL2Pad<E> {
    /** Unsafe offset of the {@code consumerIndex} field, resolved once during class initialization. */
    private final static long C_INDEX_OFFSET = resolveConsumerIndexOffset();

    private volatile long consumerIndex;

    public MpmcTransferArrayQueueConsumerField(int capacity) {
        super(capacity);
    }

    /**
     * Looks up the Unsafe field offset of {@code consumerIndex}.
     *
     * @throws RuntimeException wrapping the reflection failure if the field cannot be found
     */
    private static long resolveConsumerIndexOffset() {
        try {
            return UNSAFE.objectFieldOffset(
                    MpmcTransferArrayQueueConsumerField.class.getDeclaredField("consumerIndex"));
        }
        catch (NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    /** @return the consumer index, read with volatile semantics */
    protected final long lvConsumerIndex() {
        return consumerIndex;
    }

    /**
     * Atomically moves the consumer index from {@code expect} to {@code newValue}.
     *
     * @return {@code true} if the index still held {@code expect} and was updated
     */
    protected final boolean casConsumerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue);
    }
}
/**
* NOTE: This class is NOT a Queue. To avoid confusion, it has been revised to extend a superclass that does not
* implement the Queue interface but still provides the element buffer and sequence buffer infrastructure. It is a very
* partial implementation of the TransferQueue interface.
*
* A bounded FIFO transfer medium based on {@link MpmcArrayQueue} that offers lower heap usage and better
* performance/scaling than {@link LinkedTransferQueue}.
*
* <p>
* <p>
* When using this queue as a transfer queue, it is sensitive to the ratio of producers to consumers. In this mode, this
* queue offers the best performance when the consumers operate at the same speed as the producers, or faster.
*
* <p>
* <p>
* This queue orders elements FIFO (first-in-first-out) with respect to any given producer. Rather than
* queue.offer/poll, this class exposes transfer/take (as in the Java 7 TransferQueue interface), which means that
* producers will block until a consumer is available to receive the producer's object. Consumers that do not have data
* available will block until a producer transfers data. This is the same behavior as the {@link LinkedTransferQueue}.
*
*
* <p>
* <p>
* The {@code size} method is a constant-time operation; however, because of the asynchronous nature of this queue, it
* is an overestimate of the actual size.
*
* <p>
* <p>
* Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into
* a {@code MpmcTransferArrayQueue} <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> actions
* subsequent to the access or removal of that element from the {@code MpmcTransferArrayQueue} in another thread.
*
* <p>
* <p>
* If more than 1024 threads may use this queue concurrently, set the system property
* "org.jctools.queues.MpmcTransferArrayQueue.size" to the expected maximum thread count, as a power of two. The
* default (1024) is tuned for consumer hardware.
*
* @author dorkbox, llc
* @param <T>
* the type of elements held in this collection
*/
public class MpmcArrayTransferQueue<T> extends MpmcTransferArrayQueueConsumerField<MpmcArrayTransferQueue.Node> {
    // Trailing padding after the consumer index declared in the superclass.
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;

    /**
     * Number of slots requested from the underlying circular array.
     * <p>
     * Every thread parked in {@link #transfer} or {@link #take} occupies one slot until it is matched. When no slot
     * can be claimed, further callers keep retrying until one is released. The value can be overridden with the
     * {@code org.jctools.queues.MpmcTransferArrayQueue.size} system property (default 1024).
     */
    @SuppressWarnings("boxing")
    private static final int QUEUE_SIZE = Integer.getInteger("org.jctools.queues.MpmcTransferArrayQueue.size", 1024);

    /** Holds the node's item, separated from the thread field by {@link NodePad}. */
    private abstract static class NodeColdItem {
        @SuppressWarnings("unused")
        Object item = null;
    }

    /** Padding between {@code item} and {@code thread}; these fields are never read. */
    @SuppressWarnings("unused")
    private abstract static class NodePad extends NodeColdItem {
        long y0, y1, y2, y3, y4, y5, y6 = 7L;
    }

    /** Holds the thread that owns (and is parked on) the node. */
    private abstract static class NodeHotItem extends NodePad {
        @SuppressWarnings("unused")
        Thread thread;
    }

    /**
     * A waiter record placed in the buffer.
     * <p>
     * A {@code null} item marks a TAKE node, i.e. a consumer waiting for data. A non-null item marks a TRANSFER node,
     * i.e. a producer waiting for a consumer. The {@code thread} field holds the parked owner. The matching thread
     * clears it to signal that the hand-off is complete. All field access goes through Unsafe using the offsets
     * below.
     */
    protected static class Node extends NodeHotItem {
        private static final long ITEM;
        private static final long THREAD;
        static {
            try {
                ITEM = UNSAFE.objectFieldOffset(NodeColdItem.class.getDeclaredField("item"));
                THREAD = UNSAFE.objectFieldOffset(NodeHotItem.class.getDeclaredField("thread"));
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /** Volatile read of the node's item. */
        static final Object lvItem(Object node) {
            return UNSAFE.getObjectVolatile(node, ITEM);
        }

        /** Ordered (lazy) write of the node's item. */
        static final void soItem(Object node, Object v) {
            UNSAFE.putOrderedObject(node, ITEM, v);
        }

        /** Ordered (lazy) write of the node's owner thread; {@code null} signals a completed hand-off. */
        static final void soThread(Object node, Object thread) {
            UNSAFE.putOrderedObject(node, THREAD, thread);
        }

        /** Volatile read of the node's owner thread. */
        static final Object lvThread(Object node) {
            return UNSAFE.getObjectVolatile(node, THREAD);
        }
    }

    /**
     * Number of spin iterations, with randomly interspersed calls to Thread.yield, that a waiter performs before it
     * parks.
     */
    private static final int FRONT_SPINS = 1 << 7;

    /** Per-thread randomness used to decide when a spinning waiter yields. */
    private static final ThreadLocal<Random> randomThreadLocal = new ThreadLocal<Random>() {
        @Override
        protected Random initialValue() {
            return new Random();
        }
    };

    /**
     * Per-thread node that is reused across calls on this queue instance. It is replaced after an interrupted wait,
     * because the interrupted node remains referenced from the buffer.
     */
    private final ThreadLocal<Node> nodeThreadLocal = new ThreadLocal<Node>() {
        @Override
        protected Node initialValue() {
            return new Node();
        }
    };

    public MpmcArrayTransferQueue() {
        // QUEUE_SIZE must be a power of 2. It approximates the maximum number of threads that can wait at once.
        super(QUEUE_SIZE);
        // Prevent rare disastrous classloading in first call to LockSupport.park.
        // See: https://bugs.openjdk.java.net/browse/JDK-8074773
        try {
            Class.forName(LockSupport.class.getName(), true, LockSupport.class.getClassLoader());
        }
        catch (ClassNotFoundException e) {
            throw new Error(e);
        }
    }

    /**
     * PRODUCER method
     * <p>
     * Place an item on the queue, and wait (as long as needed, if necessary) for a corresponding consumer to take it.
     *
     * @param item the item to hand off. It must not be {@code null}, because a {@code null} item is how waiting
     *            consumers (TAKE nodes) are represented in the buffer.
     * @throws NullPointerException if {@code item} is {@code null}
     * @throws InterruptedException if interrupted while waiting; the interrupt status is cleared. The waiting node is
     *             NOT removed from the buffer (there is no cancellation), so a consumer may still receive the item
     *             later.
     */
    public void transfer(final T item) throws InterruptedException {
        if (item == null) {
            // null would be indistinguishable from a TAKE node and corrupt the queue's mode
            throw new NullPointerException();
        }
        final Node node = nodeThreadLocal.get();
        try {
            transfer(node, item);
        }
        catch (InterruptedException e) {
            discardNode();
            throw e;
        }
    }

    /**
     * Producer loop. It either matches a waiting consumer or publishes {@code node} as a TRANSFER node and waits.
     */
    private void transfer(final Node node, final T item) throws InterruptedException {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final Object[] buffer = this.buffer;
        final long[] seqBuffer = this.sequenceBuffer;
        // ASSUMPTION: if queue is not empty it must be either full of Transfer or Take nodes
        while (true) {
            final long cIndex = lvConsumerIndex();
            final long pIndex = lvProducerIndex();
            if (cIndex == pIndex) {
                // EMPTY -> push a transfer node and park
                if (pushAndPark(node, item, mask, buffer, seqBuffer, pIndex)) {
                    return;
                }
            }
            else {
                // NON-EMPTY -> check the first node (from the consumer index) to see what type it is.
                // CRITICAL WARNING: the node item in this element can potentially be changed while in use.
                final long nodeOffset = calcOffset(cIndex, mask);
                final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, nodeOffset);
                // null: another producer/consumer hasn't finished with this slot yet -> retry
                if (firstNode != null) {
                    // this requires ordering the element and item stores
                    final Object firstItem = Node.lvItem(firstNode);
                    if (firstItem != null) {
                        // TRANSFER -> same mode -> push a transfer node and park
                        if (pushAndPark(node, item, mask, buffer, seqBuffer, pIndex)) {
                            return;
                        }
                    }
                    else {
                        // TAKE -> immediate transfer -> pop a take node, hand item + unpark taker
                        if (popAndUnpark(item, mask, buffer, seqBuffer, cIndex, nodeOffset, firstNode)) {
                            return;
                        }
                        // claim failed: retry
                    }
                }
            }
        }
    }

    /**
     * CONSUMER
     * <p>
     * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the
     * queue. This will wait as long as necessary.
     *
     * @throws InterruptedException if interrupted while waiting; the interrupt status is cleared. The waiting node is
     *             NOT removed from the buffer, so a later producer may hand its item to the abandoned node and
     *             return normally.
     */
    public T take() throws InterruptedException {
        final Node node = nodeThreadLocal.get();
        try {
            return take(node);
        }
        catch (InterruptedException e) {
            discardNode();
            throw e;
        }
    }

    /**
     * CONSUMER
     * <p>
     * Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the
     * queue. This will wait as long as necessary.
     * <p>
     * This method does not depend on thread-local for node information, and so *can be* more efficient. The node used
     * by this method will contain the data.
     */
    @SuppressWarnings("unchecked")
    private T take(Node node) throws InterruptedException {
        // local load of field to avoid repeated loads after volatile reads
        final long mask = this.mask;
        final Object[] buffer = this.buffer;
        final long[] seqBuffer = this.sequenceBuffer;
        // ASSUMPTION: if queue is not empty it must be either full of Transfer or Take nodes
        while (true) {
            final long cIndex = lvConsumerIndex();
            final long pIndex = lvProducerIndex();
            if (cIndex == pIndex) {
                // EMPTY -> push a take node and park; the matching producer stores its item into our node
                if (pushAndPark(node, null, mask, buffer, seqBuffer, pIndex)) {
                    return (T) Node.lvItem(node);
                }
            }
            else {
                // NON-EMPTY -> check the first node (from the consumer index) to see what type it is.
                // CRITICAL WARNING: the node item in this element can potentially be changed while in use.
                final long nodeOffset = calcOffset(cIndex, mask);
                final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, nodeOffset);
                // null: another producer/consumer hasn't finished with this slot yet -> retry
                if (firstNode != null) {
                    // this requires ordering the element and item stores
                    final Object firstItem = Node.lvItem(firstNode);
                    if (firstItem == null) {
                        // TAKE -> same mode -> push a take node and park
                        if (pushAndPark(node, null, mask, buffer, seqBuffer, pIndex)) {
                            // now have valid node, return node item
                            return (T) Node.lvItem(node);
                        }
                    }
                    else {
                        // TRANSFER -> immediate take -> pop a transfer node, take item + unpark transfer-er
                        if (popAndUnpark(null, mask, buffer, seqBuffer, cIndex, nodeOffset, firstNode)) {
                            return (T) firstItem;
                        }
                        // claim failed: retry
                    }
                }
            }
        }
    }

    /**
     * Tries to claim slot {@code pIndex}, publish {@code node} there, and wait until a counterpart completes the
     * hand-off.
     *
     * @return {@code true} once the node was published and then matched. {@code false} if the slot could not be
     *         claimed (not yet released by a consumer, or won by another producer); the caller should retry.
     * @throws InterruptedException if interrupted while waiting; the node stays in the buffer
     */
    private boolean pushAndPark(final Object node, final T item, final long mask, final Object[] buffer,
            final long[] seqBuffer, final long pIndex) throws InterruptedException {
        // claim the slot for writing
        final long pIndexNext = pIndex + 1;
        final long pSeqOffset = calcSequenceOffset(pIndex, mask);
        final long pSequence = lvSequence(seqBuffer, pSeqOffset);
        // slot is not ready for writing (buffer full? not fully consumed? claimed by other producer?)
        if (pSequence != pIndex || !casProducerIndex(pIndex, pIndexNext)) {
            return false; // Retry
        }
        // SUCCESS: fill in the node (item/thread) before publishing it, so readers of the slot see a complete node
        final Thread myThread = Thread.currentThread();
        Node.soItem(node, item);
        Node.soThread(node, myThread);
        UnsafeRefArrayAccess.soElement(buffer, calcOffset(pIndex, mask), node);
        // increment sequence by 1, the value expected by consumer
        soSequence(seqBuffer, pSeqOffset, pIndexNext);
        parkAndSpinNode(node);
        return true;
    }

    /**
     * Tries to claim the node at {@code cIndex}, store {@code item} into it (null for a take), and unpark its owner.
     *
     * @return {@code true} if this thread claimed the slot and completed the hand-off. {@code false} if the slot was
     *         not ready or was claimed by another consumer; the caller should retry.
     */
    private boolean popAndUnpark(final T item, final long mask, final Object[] buffer, final long[] seqBuffer,
            final long cIndex, final long nodeOffset, final Object node) {
        final long cIndexNext = cIndex + 1;
        final long cSeqOffset = calcSequenceOffset(cIndex, mask);
        final long cSequence = lvSequence(seqBuffer, cSeqOffset);
        // slot is not ready for reading (buffer empty? not fully produced? claimed by other consumer?)
        if (cSequence != cIndexNext || !casConsumerIndex(cIndex, cIndexNext)) {
            return false; // CAS FAILED: Retry
        }
        // SUCCESS
        // set the item to the node (or clear if item == null)
        Node.soItem(node, item);
        // clear the buffer slot (plain store); it is published by the ordered sequence store below
        UnsafeRefArrayAccess.spElement(buffer, nodeOffset, null);
        // Move sequence ahead by capacity, preparing it for next offer
        // (seeing this value from a consumer will lead to retry)
        soSequence(seqBuffer, cSeqOffset, cIndexNext + mask); // StoreStore
        // unpark: read the owner before clearing it; a null thread tells the owner the hand-off is genuine
        final Object thread = Node.lvThread(node);
        Node.soThread(node, null);
        UNSAFE.unpark(thread); // using unsafe to avoid casting
        return true;
    }

    /**
     * @return true if there is no data waiting on the queue to be taken, false if there is data waiting to be taken
     */
    public boolean isEmpty() {
        final long mask = this.mask;
        final Object[] buffer = this.buffer;
        while (true) {
            final long consumerIndex = lvConsumerIndex();
            final long producerIndex = lvProducerIndex();
            if (consumerIndex == producerIndex) {
                return true; // EMPTY
            }
            final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, calcOffset(consumerIndex, mask));
            if (firstNode == null) {
                continue; // the last producer/consumer hasn't finished setting the object yet
            }
            // queue is either full of transfers or full of waiting takers
            final Object item = Node.lvItem(firstNode);
            if (consumerIndex != lvConsumerIndex()) {
                continue; // consumer index moved while we examine node type, try again
            }
            // only waiting takers (null items) means no data is available
            return item == null;
        }
    }

    /**
     * Spins (occasionally yielding), then parks until the node's thread field is cleared by a matching thread.
     *
     * @throws InterruptedException if the current thread is interrupted before the hand-off completes. The interrupt
     *             status is cleared, following the InterruptedException convention.
     */
    private void parkAndSpinNode(final Object node) throws InterruptedException {
        int spins = -1; // initialized on the first pass, after the hand-off and interrupt checks
        Random randomYields = null; // bound lazily, only once spinning starts
        for (;;) {
            if (Node.lvThread(node) == null) {
                return; // matched
            }
            else if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            else if (spins < 0) {
                spins = FRONT_SPINS;
                randomYields = randomThreadLocal.get();
            }
            else if (spins > 0) {
                if (randomYields.nextInt(1024) == 0) {
                    Thread.yield();
                }
                --spins;
            }
            else {
                // park can return for NO REASON (must check for thread values)
                LockSupport.park(this);
            }
        }
    }

    /**
     * Drops the calling thread's cached node after an interrupted wait. The abandoned node is still referenced from
     * the buffer and may later be written by a matching thread. It must therefore not be reused for this thread's
     * next call; the next {@code nodeThreadLocal.get()} creates a fresh node.
     */
    private void discardNode() {
        nodeThreadLocal.remove();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment