Created
October 18, 2012 14:00
-
-
Save khotyn/3912000 to your computer and use it in GitHub Desktop.
一个无限容量的用于 Work Stealing 的 DEQueue 的实现
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.khotyn.test; | |
/**
 * Ring buffer of {@link Runnable} tasks whose capacity is always a power of two.
 *
 * <p>Slots are addressed by arbitrary {@code int} indices that are wrapped onto the backing
 * array, so index {@code i} and index {@code i + capacity()} refer to the same slot. An
 * instance never changes size; {@link #resize(int, int)} returns a new, larger array and
 * leaves this one untouched.
 *
 * <p>This class performs no synchronization of its own.
 *
 * <p>Created by khotyn on 2012-10-18.
 */
public class CircularArray {
    /** Base-2 logarithm of the number of slots. */
    private final int logCapacity;

    /** Backing storage; its length is {@code 1 << logCapacity}. */
    private final Runnable[] currentTasks;

    /**
     * Creates an empty array with {@code 1 << myLogCapacity} slots.
     *
     * @param myLogCapacity base-2 logarithm of the desired capacity
     */
    CircularArray(int myLogCapacity) {
        logCapacity = myLogCapacity;
        currentTasks = new Runnable[1 << logCapacity];
    }

    /**
     * Returns the number of slots in this array.
     *
     * @return {@code 1 << logCapacity}
     */
    int capacity() {
        return 1 << logCapacity;
    }

    /**
     * Returns the task stored at logical index {@code i}.
     *
     * @param i logical index; wrapped onto the backing array
     * @return the task in that slot, or {@code null} if nothing was stored there
     */
    Runnable get(int i) {
        return currentTasks[slot(i)];
    }

    /**
     * Stores {@code task} at logical index {@code i}, replacing whatever the slot held.
     *
     * @param i logical index; wrapped onto the backing array
     * @param task the task to store
     */
    void put(int i, Runnable task) {
        currentTasks[slot(i)] = task;
    }

    /**
     * Returns a new array with twice the capacity that holds the tasks at logical indices
     * {@code top} (inclusive) through {@code bottom} (exclusive), stored under the same
     * logical indices. This array is not modified.
     *
     * @param bottom logical index one past the last task to copy
     * @param top logical index of the first task to copy
     * @return the enlarged copy
     */
    CircularArray resize(int bottom, int top) {
        // Grow by one power of two; allocating the same size would not make any room.
        CircularArray newTasks = new CircularArray(logCapacity + 1);
        // Compare via subtraction so the loop also works if the indices have wrapped
        // around Integer.MAX_VALUE.
        for (int i = top; bottom - i > 0; i++) {
            newTasks.put(i, get(i));
        }
        return newTasks;
    }

    /**
     * Maps a logical index to a slot of the backing array. Because the length is a power of
     * two, masking equals {@code i % capacity()} for non-negative {@code i}, and unlike
     * {@code %} it never yields a negative slot for negative {@code i}.
     */
    private int slot(int i) {
        return i & (currentTasks.length - 1);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.khotyn.test; | |
import java.util.concurrent.atomic.AtomicReference; | |
/** | |
* User: khotyn | |
* Date: 12-10-18 | |
* Time: 下午9:18 | |
* 一个无限容量的用于 Work Steeling 的 DEQueue 的实现,代码来自于 The Art of Multiprocessor Programming. | |
*/ | |
public class UnboundedDEQueue { | |
private final static int LOG_CAPACITY = 4; | |
private volatile CircularArray tasks; | |
volatile int bottom; | |
AtomicReference<Integer> top; | |
public UnboundedDEQueue(int LOG_CAPACITY) { | |
tasks = new CircularArray(LOG_CAPACITY); | |
top = new AtomicReference<Integer>(0); | |
bottom = 0; | |
} | |
boolean isEmpty() { | |
int localTop = top.get(); | |
int localBottom = bottom; | |
return localBottom <= localTop; | |
} | |
public void pushBottom(Runnable r) { | |
int oldBottom = bottom; | |
int oldTop = top.get(); | |
CircularArray currentTasks = tasks; | |
int size = oldBottom - oldTop; | |
if (size >= currentTasks.capacity() - 1) { // Bottom 快领先 Top 一圈,可以马上进行扩容了。 | |
currentTasks = currentTasks.resize(oldBottom, oldTop); | |
tasks = currentTasks; | |
} | |
tasks.put(oldBottom, r); | |
bottom = oldBottom + 1; | |
} | |
public Runnable popTop() { | |
int oldTop = top.get(); | |
int newTop = oldTop + 1; | |
int oldBottom = bottom; | |
CircularArray currentTasks = tasks; // 为什么加上这么一句?这个变量是 volatile 的,是为了在这里放一个屏障吗?前后的顺序不会乱掉? | |
int size = oldBottom - oldTop; | |
if (size <= 0) { // size 小于 0,DEQueue 已经空了,直接返回 null | |
return null; | |
} | |
Runnable r = tasks.get(oldTop); | |
if (top.compareAndSet(oldTop, newTop)) { | |
return r; | |
} | |
return null; // CAS 操作失败,直接返回 null | |
} | |
public Runnable popBottom() { | |
CircularArray currentTasks = tasks; | |
bottom--; // bottom 先减掉,上面的 popTop 方法如果正在执行,那么就马上可以看见了。 | |
int oldTop = top.get(); | |
int newTop = oldTop + 1; | |
int size = bottom - oldTop; | |
if (size < 0) { // bottom 比 oldTop 小,DEQueue 已经空了,直接返回 null | |
bottom = oldTop; | |
return null; | |
} | |
Runnable r = tasks.get(bottom); | |
if (size > 0) { // size 大于 0,表示 bottom 和 top 的距离还比较远,可以放心操作,直接返回 r | |
return r; | |
} | |
if (!top.compareAndSet(oldTop, newTop)) { // size 等于 0 的情况,top 和 bottom 跑到一块儿去了,用 CAS 操作保证安全 | |
r = null; | |
} | |
bottom = oldTop + 1; // 无论上面的 CAS 操作是否成功(竞争的双方总有一个人成功了,这意味着 top 增加了 1,bottom 已经比 top 小 1,所以这里把它设置成和 top 一样大小。) | |
return r; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment