Created
April 17, 2016 13:04
-
-
Save nitsanw/c713a7020cd753ea49d77f88ca4b40e5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2015 dorkbox, llc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jctools.queues; | |
import static org.jctools.util.UnsafeAccess.UNSAFE; | |
import java.util.Random; | |
import java.util.concurrent.LinkedTransferQueue; | |
import java.util.concurrent.locks.LockSupport; | |
import org.jctools.queues.alt.ConcurrentSequencedCircularArray; | |
import org.jctools.util.UnsafeRefArrayAccess; | |
abstract class MpmcTransferArrayQueueL1Pad<E> extends ConcurrentSequencedCircularArray<E> { | |
long p00, p01, p02, p03, p04, p05, p06, p07; | |
long p10, p11, p12, p13, p14, p15, p16; | |
public MpmcTransferArrayQueueL1Pad(int capacity) { | |
super(capacity); | |
} | |
} | |
abstract class MpmcTransferArrayQueueProducerField<E> extends MpmcTransferArrayQueueL1Pad<E> { | |
private final static long P_INDEX_OFFSET; | |
static { | |
try { | |
P_INDEX_OFFSET = UNSAFE | |
.objectFieldOffset(MpmcTransferArrayQueueProducerField.class.getDeclaredField("producerIndex")); | |
} | |
catch (NoSuchFieldException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private volatile long producerIndex; | |
public MpmcTransferArrayQueueProducerField(int capacity) { | |
super(capacity); | |
} | |
protected final long lvProducerIndex() { | |
return producerIndex; | |
} | |
protected final boolean casProducerIndex(long expect, long newValue) { | |
return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue); | |
} | |
} | |
abstract class MpmcTransferArrayQueueL2Pad<E> extends MpmcTransferArrayQueueProducerField<E> { | |
long p01, p02, p03, p04, p05, p06, p07; | |
long p10, p11, p12, p13, p14, p15, p16, p17; | |
public MpmcTransferArrayQueueL2Pad(int capacity) { | |
super(capacity); | |
} | |
} | |
abstract class MpmcTransferArrayQueueConsumerField<E> extends MpmcTransferArrayQueueL2Pad<E> { | |
private final static long C_INDEX_OFFSET; | |
static { | |
try { | |
C_INDEX_OFFSET = UNSAFE | |
.objectFieldOffset(MpmcTransferArrayQueueConsumerField.class.getDeclaredField("consumerIndex")); | |
} | |
catch (NoSuchFieldException e) { | |
throw new RuntimeException(e); | |
} | |
} | |
private volatile long consumerIndex; | |
public MpmcTransferArrayQueueConsumerField(int capacity) { | |
super(capacity); | |
} | |
protected final long lvConsumerIndex() { | |
return consumerIndex; | |
} | |
protected final boolean casConsumerIndex(long expect, long newValue) { | |
return UNSAFE.compareAndSwapLong(this, C_INDEX_OFFSET, expect, newValue); | |
} | |
} | |
/** | |
* NOTE: This class is NOT a Queue, to avoid confusion I have revised it to extend a super class which does not extend | |
* the queue interface while providing the buffer + sequence buffer infrastructure. It is a very partial implementation | |
* of the TransferQueue interface. | |
* | |
* A bounded FIFO transfer medium based on {@link MpmcArrayQueue} that offers lower heap usage and better | |
* performance/scaling than {@link LinkedTransferQueue}. | |
* | |
* <p> | |
* <p> | |
* When using this queue as a transfer queue, it is sensitive to the ratio of producers-to-consumers. In this mode, this | |
* queue offers the best performance when the total time spent by the consumers is operating at the same speed or faster | |
* than the producers. | |
* | |
* <p> | |
* <p> | |
* This queue orders elements FIFO (first-in-first-out) with respect to any given producer. In addition to | |
* queue.offer/poll, this queue also supports transfer/take (from Java 7, TransferQueue interface), which means that | |
* producers will block until a consumer is available to receive the producer's object. Consumers that do not have data | |
* available will block until a producer transfers data. This is the same behavior as the {@link LinkedTransferQueue}. | |
* | |
* | |
* <p> | |
* <p> | |
* The {@code size} method is a constant-time operation, however because of the asynchronous nature of this queues, it | |
* is an overestimation the actual size. | |
* | |
* <p> | |
* <p> | |
* Memory consistency effects: As with other concurrent collections, actions in a thread prior to placing an object into | |
* a {@code MpmcTransferArrayQueue} <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a> actions | |
* subsequent to the access or removal of that element from the {@code MpmcTransferArrayQueue} in another thread. | |
* | |
* <p> | |
* <p> | |
* If it is possible for there to be more than 1024 threads running concurrently, set the system property | |
* "MpmcTransferArrayQueue.size" to be the expected max thread count. The default is tuned for consumer hardware. | |
* | |
* @author dorkbox, llc | |
* @param <T> | |
* the type of elements held in this collection | |
*/ | |
public class MpmcArrayTransferQueue<T> extends MpmcTransferArrayQueueConsumerField<MpmcArrayTransferQueue.Node> { | |
long p01, p02, p03, p04, p05, p06, p07; | |
long p10, p11, p12, p13, p14, p15, p16, p17; | |
@SuppressWarnings("boxing") | |
private static final int QUEUE_SIZE = Integer.getInteger("org.jctools.queues.MpmcTransferArrayQueue.size", 1024); | |
private abstract static class NodeColdItem { | |
@SuppressWarnings("unused") | |
Object item = null; | |
} | |
@SuppressWarnings("unused") | |
private abstract static class NodePad extends NodeColdItem { | |
long y0, y1, y2, y3, y4, y5, y6 = 7L; | |
} | |
private abstract static class NodeHotItem extends NodePad { | |
@SuppressWarnings("unused") | |
Thread thread; | |
} | |
protected static class Node extends NodeHotItem { | |
private static final long ITEM; | |
private static final long THREAD; | |
static { | |
try { | |
ITEM = UNSAFE.objectFieldOffset(NodeColdItem.class.getDeclaredField("item")); | |
THREAD = UNSAFE.objectFieldOffset(NodeHotItem.class.getDeclaredField("thread")); | |
} | |
catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
} | |
static final Object lvItem(Object node) { | |
return UNSAFE.getObjectVolatile(node, ITEM); | |
} | |
static final void soItem(Object node, Object v) { | |
UNSAFE.putOrderedObject(node, ITEM, v); | |
} | |
static final void soThread(Object node, Object thread) { | |
UNSAFE.putOrderedObject(node, THREAD, thread); | |
} | |
static final Object lvThread(Object node) { | |
return UNSAFE.getObjectVolatile(node, THREAD); | |
} | |
} | |
/** | |
* The number of times to spin (with randomly interspersed calls to Thread.yield) on multiprocessor before blocking | |
* when a node is apparently the first waiter in the queue. See above for explanation. Must be a power of two. The | |
* value is empirically derived -- it works pretty well across a variety of processors, numbers of CPUs, and OSes. | |
*/ | |
private static final int FRONT_SPINS = 1 << 7; | |
private static final ThreadLocal<Random> randomThreadLocal = new ThreadLocal<Random>() { | |
@Override | |
protected Random initialValue() { | |
return new Random(); | |
} | |
}; | |
private final ThreadLocal<Node> nodeThreadLocal = new ThreadLocal<Node>() { | |
@Override | |
protected Node initialValue() { | |
return new Node(); | |
} | |
}; | |
public MpmcArrayTransferQueue() { | |
super(QUEUE_SIZE); // must be power of 2. This is just an approximation, and systems with more than | |
// 1024 threads | |
// Prevent rare disastrous classloading in first call to LockSupport.park. | |
// See: https://bugs.openjdk.java.net/browse/JDK-8074773 | |
try { | |
Class.forName(LockSupport.class.getName(), true, LockSupport.class.getClassLoader()); | |
} | |
catch (ClassNotFoundException e) { | |
throw new Error(e); | |
} | |
} | |
/** | |
* PRODUCER method | |
* <p> | |
* Place an item on the queue, and wait (as long as needed, if necessary) for a corresponding consumer to take it. | |
*/ | |
public void transfer(final T item) throws InterruptedException { | |
final Node node = nodeThreadLocal.get(); | |
// local load of field to avoid repeated loads after volatile reads | |
final long mask = this.mask; | |
final Object[] buffer = this.buffer; | |
final long[] seqBuffer = this.sequenceBuffer; | |
long cIndex; | |
long pIndex; | |
// ASSUMPTION: if queue is not empty it must be either full of Transfer or Take nodes | |
while (true) { | |
cIndex = lvConsumerIndex(); | |
pIndex = lvProducerIndex(); | |
if (cIndex == pIndex) { | |
// EMPTY -> push a transfer node and park | |
if (pushAndPark(node, item, mask, buffer, seqBuffer, pIndex)) { | |
return; | |
} | |
// busySpin(CHAINED_SPINS); | |
} | |
else { | |
// NON-EMPTY -> check the first node (from the consumer index) to see what type it is. | |
// CRITICAL WARNING: the node item in this element can potentially be changed while in use. | |
final long nodeOffset = calcOffset(cIndex, mask); | |
final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, nodeOffset); | |
if (firstNode != null) { | |
// another producer/consumer hasn't finished setting the object yet | |
Object firstItem = Node.lvItem(firstNode); // this requires ordering the element and item | |
// stores | |
if (firstItem != null) { | |
// TRANSFER -> same mode -> push a transfer node and park | |
if (pushAndPark(node, item, mask, buffer, seqBuffer, pIndex)) { | |
return; | |
} | |
} | |
else { | |
// TAKE -> immediate transfer -> pop a take node, hand item + unpark taker | |
if (popAndUnpark(item, mask, buffer, seqBuffer, cIndex, nodeOffset, firstNode)) { | |
return; // CAS FAILED: Retry | |
} | |
} | |
} | |
// busySpin(FRONT_SPINS); | |
} | |
} | |
} | |
/** | |
* CONSUMER | |
* <p> | |
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the | |
* queue. This will wait as long as necessary. | |
*/ | |
public T take() throws InterruptedException { | |
final Node node = nodeThreadLocal.get(); | |
return take(node); | |
} | |
/** | |
* CONSUMER | |
* <p> | |
* Remove an item from the queue. If there are no items on the queue, wait for a producer to place an item on the | |
* queue. This will wait as long as necessary. | |
* <p> | |
* This method does not depend on thread-local for node information, and so *can be* more efficient. The node used | |
* by this method will contain the data. | |
*/ | |
@SuppressWarnings("unchecked") | |
private T take(Node node) throws InterruptedException { | |
// local load of field to avoid repeated loads after volatile reads | |
final long mask = this.mask; | |
final Object[] buffer = this.buffer; | |
final long[] seqBuffer = this.sequenceBuffer; | |
long cIndex; | |
long pIndex; | |
// ASSUMPTION: if queue is not empty it must be either full of Transfer or Take nodes | |
while (true) { | |
cIndex = lvConsumerIndex(); | |
pIndex = lvProducerIndex(); | |
if (cIndex == pIndex) { | |
// EMPTY -> push a take node and park | |
if (pushAndPark(node, null, mask, buffer, seqBuffer, pIndex)) { | |
return (T) Node.lvItem(node); | |
} | |
// busySpin(CHAINED_SPINS); | |
} | |
else { | |
// NON-EMPTY -> check the first node (from the consumer index) to see what type it is. | |
// CRITICAL WARNING: the node item in this element can potentially be changed while in use. | |
final long nodeOffset = calcOffset(cIndex, mask); // on 64bit(no compressed oops) JVM | |
// this is the same as seqOffset | |
final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, nodeOffset); | |
if (firstNode != null) { | |
// another producer/consumer hasn't finished setting the object yet | |
Object firstItem = Node.lvItem(firstNode); // this requires ordering the element and type | |
// stores | |
if (firstItem == null) { | |
// TAKE -> same mode -> push a take node and park | |
if (pushAndPark(node, null, mask, buffer, seqBuffer, pIndex)) { | |
// now have valid node, return node item | |
return (T) Node.lvItem(node); | |
} | |
} | |
else { | |
// TRANSFER -> immediate take -> pop a transfer node, take item + unpark transfer-er | |
if (popAndUnpark(null, mask, buffer, seqBuffer, cIndex, nodeOffset, firstNode)) { | |
return (T) firstItem; | |
} | |
} | |
} | |
// busySpin(FRONT_SPINS); | |
} | |
} | |
} | |
/** | |
* @return true if successfully pushed an item to the queue | |
*/ | |
private boolean pushAndPark(final Object node, final T item, final long mask, final Object[] buffer, | |
final long[] seqBuffer, final long pIndex) throws InterruptedException { | |
// claim the slot for writing | |
final long pIndexNext = pIndex + 1; | |
final long pSeqOffset = calcSequenceOffset(pIndex, mask); | |
final long pSequence = lvSequence(seqBuffer, pSeqOffset); | |
// slot is not ready for writing (buffer full? not fully consumed? claimed by other producer?) | |
if (pSequence != pIndex || | |
!casProducerIndex(pIndex, pIndexNext)) { // failed to claim the index | |
return false; // Retry | |
} | |
// SUCCESS | |
final Thread myThread = Thread.currentThread(); | |
// save the node (type/item/thread) at this position and park | |
Node.soItem(node, item); | |
Node.soThread(node, myThread); | |
final long nodeOffset = calcOffset(pIndex, mask); | |
// note that node values are written before storing | |
UnsafeRefArrayAccess.soElement(buffer, nodeOffset, node); | |
// increment sequence by 1, the value expected by consumer | |
soSequence(sequenceBuffer, pSeqOffset, pIndexNext); | |
parkAndSpinNode(node, myThread); | |
return true; | |
} | |
/** | |
* Take an item from the queue and unpark it | |
*/ | |
private boolean popAndUnpark(final T item, final long mask, final Object[] buffer, final long[] seqBuffer, | |
final long cIndex, final long nodeOffset, final Object node) { | |
final long cIndexNext = cIndex + 1; | |
final long cSeqOffset = calcSequenceOffset(cIndex, mask); | |
final long cSequence = lvSequence(seqBuffer, cSeqOffset); | |
// slot is not ready for reading (buffer empty? not fully produced? claimed by other consumer?) | |
if (cSequence != cIndexNext || | |
!casConsumerIndex(cIndex, cIndexNext)) { // failed the CAS | |
return false; // CAS FAILED: Retry | |
} | |
// SUCCESS | |
// set the item to the node (or clear if item == null) | |
Node.soItem(node, item); | |
// advance the sequence | |
UnsafeRefArrayAccess.spElement(buffer, nodeOffset, null); // clear the old sequence value, it is | |
// eventually published | |
// Move sequence ahead by capacity, preparing it for next offer | |
// (seeing this value from a consumer will lead to retry) | |
soSequence(seqBuffer, cSeqOffset, cIndexNext + mask); // StoreStore | |
// unpark | |
final Object thread = Node.lvThread(node); | |
Node.soThread(node, null); // signals genuine unpark | |
UNSAFE.unpark(thread); // using unsafe to avoid casting | |
return true; | |
} | |
/** | |
* @return true if there is no data waiting on the queue to be taken, false if there is data waiting to be taken | |
*/ | |
public boolean isEmpty() { | |
final long mask = this.mask; | |
final Object[] buffer = this.buffer; | |
long consumerIndex; | |
long producerIndex; | |
while (true) { | |
consumerIndex = lvConsumerIndex(); | |
producerIndex = lvProducerIndex(); | |
if (consumerIndex == producerIndex) { | |
return true; // EMPTY | |
} | |
else { | |
final Object firstNode = UnsafeRefArrayAccess.lpElement(buffer, calcOffset(consumerIndex, mask)); | |
if (firstNode == null) { | |
continue; // the last producer/consumer hasn't finished setting the object yet | |
} | |
// queue is either full of transfers or full of waiting takers | |
final Object item = Node.lvItem(firstNode); | |
; | |
if (consumerIndex != lvConsumerIndex()) { | |
continue; // consumer index moved while we examine node type, try again | |
} | |
return item == null; | |
} | |
} | |
} | |
private void parkAndSpinNode(final Object node, final Thread myThread) throws InterruptedException { | |
int spins = -1; // initialized after first item and cancel checks | |
Random randomYields = null; // bound if needed | |
for (;;) { | |
if (Node.lvThread(node) == null) { | |
return; | |
} | |
else if (myThread.isInterrupted()) { | |
throw new InterruptedException(); | |
} | |
else if (spins < 0) { | |
spins = FRONT_SPINS; | |
randomYields = randomThreadLocal.get(); | |
} | |
else if (spins > 0) { | |
if (randomYields.nextInt(1024) == 0) { | |
Thread.yield(); | |
} | |
--spins; | |
} | |
else { | |
// park can return for NO REASON (must check for thread values) | |
LockSupport.park(this); | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment