Exchanger类
分类:
IT文章
•
2023-11-05 14:41:21
1.简述
Exchanger是适用在两个线程之间数据交换的并发工具类,它的作用是找到一个同步点,当两个线程都执行到了同步点(exchange方法)之后(有一个没有执行到就一直等待,也可以设置等待超时时间),就将自身线程的数据与对方交换。
Exchanger使用场景:
2.Exchanger的常用方法
/**构造方法
*/
//创建一个新的Exchanger
Exchanger()
/**常用方法
*/
//exchange方法用于交互数据V
V exchange(V x)
//延迟一定时间交换数据
V exchange(V x, long timeout, TimeUnit unit)
View Code
3.Exchanger的源码分析
Exchanger的算法核心是通过一个可以交换数据的slot和一个可以带有数据item的参与者。
Exchanger的主要属性:
/** The number of CPUs, for sizing and spin control */
private static final int NCPU = Runtime.getRuntime().availableProcessors();
//arena(Slot数组)的容量。设置这个值用来避免竞争。
private static final int CAPACITY = 32;
//arena最大不会超过FULL,避免空间浪费。如果单核或者双核CPU,FULL=0,只有一个SLot可以用。
private static final int FULL = Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
//自旋等待次数。单核情况下,自旋次数为0;多核情况下为大多数系统线程上下文切换的平均值。该值设置太大会消耗CPU。
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
//若在超时机制下,自旋次数更少,因为多个检测超时的时间,这是一个经验值。
private static final int TIMED_SPINS = SPINS / 20;
private static final class Node extends AtomicReference<Object> {
//创建这个节点的线程提供的用于交换的数据。
public final Object item;
//等待唤醒的线程
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item) {
this.item = item;
}
}
//一个Slot就是一对线程交换数据的地方。这里对Slot做了缓存行填充,能够避免伪共享问题。虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
private static final class Slot extends AtomicReference<Object> {
// Improve likelihood of isolation on <= 64 byte cache lines
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
//Slot数组,在需要时才进行初始化,用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
private volatile Slot[] arena = new Slot[CAPACITY];
//正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,这个值会递增。当一个线程自旋等待超时后,这个值会递减。
private final AtomicInteger max = new AtomicInteger();
View Code
/**
* 等待其他线程到达交换点,然后与其进行数据交换。
* 如果其他线程到来,那么交换数据,返回。
* 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
* 1.有其他线程来进行数据交换。
* 2.当前线程被中断。
*/
public V exchange(V x) throws InterruptedException {
//检测当前线程是否被中断。
if (!Thread.interrupted()) {
//进行数据交换。
Object v = doExchange((x == null) ? NULL_ITEM : x, false, 0);
//检测结果是否为null。
if (v == NULL_ITEM)
return null;
//检测是否被取消。
if (v != CANCEL)
return (V)v;
//清除中断标记。
Thread.interrupted(); // Clear interrupt status on IE throw
}
throw new InterruptedException();
}
/**
* 等待其他线程到达交换点,然后与其进行数据交换。
* 如果其他线程到来,那么交换数据,返回。
* 如果其他线程未到来,那么当前线程等待,直到如下情况发生:
* 1.有其他线程来进行数据交换。
* 2.当前线程被中断。
* 3.超时。
*/
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
//检测当前线程是否被中断。
if (!Thread.interrupted()) {
//进行数据交换。
Object v = doExchange((x == null) ? NULL_ITEM : x,
true, unit.toNanos(timeout));
//检测结果是否为null。
if (v == NULL_ITEM)
return null;
//检测是否被取消。
if (v != CANCEL)
return (V)v;
if (!Thread.interrupted())
throw new TimeoutException();
}
throw new InterruptedException();
}
/**doExchange方法,进行数据交换
*/
private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item);
//根据thread id计算出自己要去的那个交易位置(slot)
int index = hashIndex();
int fails = 0;
for (;;) {
Object y;
Slot slot = arena[index];
//slot = null,创建一个slot,然后会回到for循环,再次开始
if (slot == null)
createSlot(index);
else if ((y = slot.get()) != null &&//slot里面有人等着(有Node),则尝试和其交换
slot.compareAndSet(y, null)) {//关键点1:slot清空,Node拿出来,俩人在Node里面交互。把Slot让给后面的人,做交互地点
Node you = (Node)y;
//把Node里面的东西,换成自己的
if (you.compareAndSet(null, item)) {
//唤醒对方
LockSupport.unpark(you.waiter);
//自己把对方的东西拿走
return you.item;
}//关键点2:如果运气不好,在Node里面要交换的时候,被另一个线程抢了,回到for循环,重新开始
}
else if (y == null &&//slot里面为空(没有Node),则自己把位置占住
slot.compareAndSet(null, me)) {
//如果是0这个位置,自己阻塞,等待别人来交换
if (index == 0)
return timed ?
awaitNanos(me, slot, nanos) :
await(me, slot);
//不是0这个位置,自旋等待
Object v = spinWait(me, slot);
//自旋等待的时候,运气好,有人来交换了,返回
if (v != CANCEL)
return v;
//自旋的时候,没人来交换。走执行下面的,index减半,挪个位置,重新开始for循环
me = new Node(item);
int m = max.get();
if (m > (index >>>= 1))
max.compareAndSet(m, m - 1);
}
else if (++fails > 1) {//失败 case1: slot有人,要交互,但被人家抢了 case2: slot没人,自己要占位置,又被人家抢了
int m = max.get();
//3次匹配失败,把index扩大,再次开始for循环
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))
index = m + 1;
else if (--index < 0)
index = m;
}
}
}
/**
* 在下标为0的Slot上等待获取其他线程填充的值。
* 如果在Slot被填充之前超时或者被中断,那么操作失败。
*/
private Object awaitNanos(Node node, Slot slot, long nanos) {
int spins = TIMED_SPINS;
long lastTime = 0;
Thread w = null;
for (;;) {
Object v = node.get();
if (v != null)
//如果已经被其他线程填充了值,那么返回这个值。
return v;
long now = System.nanoTime();
if (w == null)
w = Thread.currentThread();
else
nanos -= now - lastTime;
lastTime = now;
if (nanos > 0) {
if (spins > 0)
--spins; //先自旋几次。
else if (node.waiter == null)
node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
else if (w.isInterrupted())
tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
else
LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
}
else if (tryCancel(node, slot) && !w.isInterrupted())
//超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
return scanOnTimeout(node);
}
}