时间:2023-06-20 06:28:38 / 来源:
博客园 / 点击数:()
【资料图】
JUC同步锁原理源码解析六----ExchangerExchangerExchanger的来源A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the {@link #exchange exchange} method, matches with a partner thread, and receives its partner"s object on return. An Exchanger may be viewed as a bidirectional form of a {@link SynchronousQueue}.
JDK中对Exchanger的定义是在一个同步线程点,配对的线程可以交换彼此的属性数据。每一个线程提交对象数据并调用exchange方法,匹配到一个线程并接受该线程携带的数据返回。Exchanger可以被当成一个双向的同步队列。当然Exchanger并不是说只有两个线程进行匹配,它也可以进行多对多的匹配,但是只有成对的线程可以匹配并交换数据成功。
Exchanger的底层实现Exchanger的底层实现依旧依赖于CAS的自旋锁操作,通过cas保证原子性的操作
2.Exchanger基本使用public class ExchangerDemo { static Exchanger exchanger = new Exchanger<>(); public static void main(String[] args) { new Thread(()->{ String s = "T1"; try { s = exchanger.exchange(s); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + s); }, "t1").start(); new Thread(()->{ String s = "T2"; try { s = exchanger.exchange(s); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() + " " + s); }, "t2").start(); }}
Exchanger类public class Exchanger { /** * The byte distance (as a shift value) between any two used slots * in the arena. 1 << ASHIFT should be at least cacheline size. */ private static final int ASHIFT = 7; /** * The maximum supported arena index. The maximum allocatable * arena size is MMASK + 1. Must be a power of two minus one, less * than (1<<(31-ASHIFT)). The cap of 255 (0xff) more than suffices * for the expected scaling limits of the main algorithms. */ private static final int MMASK = 0xff; /** * Unit for sequence/version bits of bound field. Each successful * change to the bound also adds SEQ. */ private static final int SEQ = MMASK + 1; /** The number of CPUs, for sizing and spin control */ private static final int NCPU = Runtime.getRuntime().availableProcessors(); /** * The maximum slot index of the arena: The number of slots that * can in principle hold all threads without contention, or at * most the maximum indexable value. */ static final int FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; /** * The bound for spins while waiting for a match. The actual * number of iterations will on average be about twice this value * due to randomization. Note: Spinning is disabled when NCPU==1. */ private static final int SPINS = 1 << 10; /** * Value representing null arguments/returns from public * methods. Needed because the API originally didn"t disallow null * arguments, which it should have. */ private static final Object NULL_ITEM = new Object(); /** * Sentinel value returned by internal exchange methods upon * timeout, to avoid need for separate timed versions of these * methods. */ private static final Object TIMED_OUT = new Object(); /** The corresponding thread local class */ static final class Participant extends ThreadLocal { public Node initialValue() { return new Node(); } } /** * Per-thread state */ private final Participant participant; /** * Elimination array; null until enabled (within slotExchange). * Element accesses use emulation of volatile gets and CAS. */ private volatile Node[] arena; /** * Slot used until contention detected. */ private volatile Node slot; /** * The index of the largest valid arena position, OR"ed with SEQ * number in high bits, incremented on each update. The initial * update from 0 to SEQ is used to ensure that the arena array is * constructed only once. */ private volatile int bound;}
Node类@sun.misc.Contended static final class Node {//@sun.misc.Contended 进行缓存行填充,防止数据移植刷新缓存行,造成性能损耗 int index; // Arena index int bound; // Last recorded value of Exchanger.bound int collides; // Number of CAS failures at current bound int hash; // Pseudo-random for spins Object item; // This thread"s current item volatile Object match; // Item provided by releasing thread volatile Thread parked; // Set to this thread when parked, else null}
Exchanger的构造器public Exchanger() { participant = new Participant();}
exchange()方法public V exchange(V x) throws InterruptedException { Object v; Object item = (x == null) ? NULL_ITEM : x; // 将对象赋值给item if ((arena != null ||//表示有多个线程在竞争匹配 (v = slotExchange(item, false, 0L)) == null) &&//slot匹配的对象返回空,slot表示的一对一的匹配。 ((Thread.interrupted() || // 线程是否发生中断 (v = arenaExchange(item, false, 0L)) == null)))//arena表示发生多线程竞争,如果匹配失败 throw new InterruptedException();//抛出中断异常 return (v == NULL_ITEM) ? null : (V)v;//返回匹配后并交换的数据}
exchange(V x, long timeout, TimeUnit unit)方法:public V exchange(V x, long timeout, TimeUnit unit) throws InterruptedException, TimeoutException { Object v; Object item = (x == null) ? NULL_ITEM : x;// 将对象赋值给item long ns = unit.toNanos(timeout);//超时的时间换算成纳秒 if ((arena != null || (v = slotExchange(item, true, ns)) == null) &&//slot匹配的对象返回空,slot表示的一对一的匹配。 ((Thread.interrupted() ||// 线程是否发生中断 (v = arenaExchange(item, true, ns)) == null)))//arena表示发生多线程竞争 throw new InterruptedException();//抛出中断异常 if (v == TIMED_OUT)//如果返回对象是超时 throw new TimeoutException();//抛出超时异常 return (v == NULL_ITEM) ? null : (V)v;//返回匹配后并交换的数据}
slotExchange方法private final Object slotExchange(Object item, boolean timed, long ns) { Node p = participant.get();//获取参与者节点 Thread t = Thread.currentThread();//获取当前线程 if (t.isInterrupted()) // 判断是否发生中断 return null;//中断唤醒直接返回 for (Node q;;) { if ((q = slot) != null) {//表示当前属于slot的匹配,也即不是多对多的情况 if (U.compareAndSwapObject(this, SLOT, q, null)) {//cas将当前slot置空 Object v = q.item;//获取节点q的item对象 q.match = item;//将item复制给节点的match,这里表示匹配成功 Thread w = q.parked;//获取当前线程阻塞在slot中等待的线程 if (w != null)//如果w线程不为空 U.unpark(w);//唤醒w线程 return v;//返回对象V } // create arena on contention, but continue until slot null //走到这里表示竞争激烈,创建arena数组 if (NCPU > 1 && bound == 0 &&//如果CPU大于1并且bound为0表示bound前置判断,为0标识没创建arena U.compareAndSwapInt(this, BOUND, 0, SEQ))//cas设置BOUND的值为序列号,同时保证多个线程的条件下,只有一个线程可以创建成功 arena = new Node[(FULL + 2) << ASHIFT];// FULL = (NCPU >= (MMASK << 1)) ? MMASK : NCPU >>> 1; MMASK = 0xff ; ASHIFT = 7 full按照CPU进行取值,如果CPU大于510,直接取MMASK,否则CPU数/2。 ASHIFT的值为7。所以2的7次方为128。也即可以保证缓存行对齐的大小 } else if (arena != null)//走到这里表示竞争激烈,升级成多对多的匹配,所以直接返回调用arenaExchange的匹配 return null; // caller must reroute to arenaExchange else {//这里表示当前线程找不到可以匹配的,所以将自身放到slot中等待后续匹配 p.item = item;//将item放到p.item中 if (U.compareAndSwapObject(this, SLOT, null, p))//将SLOT节点置为p节点 break;//退出 p.item = null;//cas失败将item置为空,继续循环 } } // await release 走到这里的条件是,当前线程将自身cas放入到slot中成功后break退出上一个循环 int h = p.hash;//获取p节点的hash值 long end = timed ? System.nanoTime() + ns : 0L;//如果有超时时间,计算超时时间 int spins = (NCPU > 1) ? SPINS : 1;//SPINS = 1 << 10 默认自旋次数为1024。具体看CPU个数 Object v; while ((v = p.match) == null) {//如果当前p.mach为空,表示没有匹配成功 if (spins > 0) {//进行自旋 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; if (h == 0) h = SPINS | (int)t.getId(); else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)//自旋到一定次数,让出CPU执行权。 Thread.yield(); } else if (slot != p)//如果slot不等p继续自旋,因为竞争强烈,很快就可以匹配成功 spins = SPINS; else if (!t.isInterrupted() && arena == null &&//判断线程是否中断并且arena是否为空 (!timed || (ns = end - System.nanoTime()) > 0L)) {//超时判断没有超时 U.putObject(t, BLOCKER, this);//将BLOCKER置为当前对象 p.parked = t;//parked为当前线程 if (slot == p)//slot等于p节点 U.park(false, ns);//阻塞等待 p.parked = null;//再次唤醒后,将p.parked的线程置为空 U.putObject(t, BLOCKER, null);//阻塞对象也置为空 } else if (U.compareAndSwapObject(this, SLOT, p, null)) {//cas将slot的p节点置为空 v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;//如果已经超时并且不是中断唤醒,置为TIMED_OUT,否则为null break;//退出循环 } } U.putOrderedObject(p, MATCH, null);//将p.macth置为空 p.item = null;//item也置为空 p.hash = h;//p.hash位置h return v;//所以这里返回只有两种情况:1.slot中匹配成功 2.超时后返回null}
arenaExchange方法private final Object arenaExchange(Object item, boolean timed, long ns) { Node[] a = arena;//获取到arena的数组 Node p = participant.get();//获取到节点p for (int i = p.index;;) { //获取index的下标 int b, m, c; long j; // j is raw array offset Node q = (Node)U.getObjectVolatile(a, j = (i << ASHIFT) + ABASE);//获取基地址+128缓存行对齐的节点Q if (q != null && U.compareAndSwapObject(a, j, q, null)) {//如果q不为空并且cas将其职位空 Object v = q.item; //将q.item赋值给V q.match = item;//将要交换的item值赋值为q.match Thread w = q.parked;//获取到当前阻塞的线程 if (w != null)//如果线程不为空 U.unpark(w);//唤醒当前线程 return v;//返回匹配成功后交换过来的值 } else if (i <= (m = (b = bound) & MMASK) && q == null) {//如果index小于最大的arena的下标,也即属于合法下标,并且当前q为空,q为空表示我自己是第一个进来的。将我自己放到arena的j下标处 p.item = item; //将当前item赋值为节点p if (U.compareAndSwapObject(a, j, null, p)) {//cas将arena中下标为j的位置置为p节点 long end = (timed && m == 0) ? System.nanoTime() + ns : 0L;//计算超时时间 Thread t = Thread.currentThread(); // 获取当前线程 for (int h = p.hash, spins = SPINS;;) {//进行自旋 Object v = p.match;//获取当前p.match的值 if (v != null) {//如果匹配的值不为空,代表匹配成功了 U.putOrderedObject(p, MATCH, null);//将p节点的match值为空 p.item = null;//清空当前p.item的值 p.hash = h;//将h赋值为p return v;返回匹配成功后交换过来的值 } else if (spins > 0) {//匹配不成功,进行自旋等待 h ^= h << 1; h ^= h >>> 3; h ^= h << 10; // xorshift if (h == 0) // initialize hash h = SPINS | (int)t.getId(); else if (h < 0 && // approx 50% true (--spins & ((SPINS >>> 1) - 1)) == 0)//h<0并且自旋计算后未0, Thread.yield(); //让出cpu执行权 } else if (U.getObjectVolatile(a, j) != p)//获取当前arena中j下标的值不等于p spins = SPINS; // 未匹配成功,赋值自旋次数 else if (!t.isInterrupted() && m == 0 &&//判断是否中断,并且arena (!timed ||//是否超时等待 (ns = end - System.nanoTime()) > 0L)) {//超时时间是否大于0 U.putObject(t, BLOCKER, this); // 设置blocker阻塞对象为当前对象 p.parked = t;//parked的阻塞线程为当前线程 if (U.getObjectVolatile(a, j) == p)//判断当前arena的下标是否为p U.park(false, ns);//阻塞等待 p.parked = null;//唤醒后将parked置为空 U.putObject(t, BLOCKER, null);//唤醒后将BLOCKER置为空 } else if (U.getObjectVolatile(a, j) == p &&//如果当前arena的j下标处为p U.compareAndSwapObject(a, j, p, null)) {//cas将j下标的p节点置为空 if (m != 0) //m表示最大的arena数组不等于0 U.compareAndSwapInt(this, BOUND, b, b + SEQ - 1);//将数组缩小 p.item = null;//将item置为空 p.hash = h;//h赋值给p.hash i = p.index >>>= 1;//将index下标右移1位 if (Thread.interrupted())//判断是否是中断唤醒 return null;//返回空 if (timed && m == 0 && ns <= 0L)//如果超时设置并且m为0,ns超时时间小于0表示已经超时 return TIMED_OUT;//返回TIMED_OUT break; // expired; restart } } } else p.item = null; //cas失败,将p.item还原 } else {//进入这里的情况:q不为空,代表下标竞争激烈,cas失败。所以换个下标继续循环匹配 if (p.bound != b) { //如果p.bound不等于b,表示当前b已经修改过了 p.bound = b;//重新获取最新值 p.collides = 0;//将失败次数置为0; i = (i != m || m == 0) ? m : m - 1;//这里直接取m是因为有线程可能已经往m中放入了待匹配节点 } else if ((c = p.collides) < m || m == FULL ||//失败次数小于m,获取m==arena最大值。m为当前的bound的值。 !U.compareAndSwapInt(this, BOUND, b, b + SEQ + 1)) {//cas将BOUND p.collides = c + 1;//失败次数加1 i = (i == 0) ? m : i - 1; //若当前i为0,赋值m业绩最大下标,否则i的下标减1。如果是m==FULL条件进来,表示已经在最大下标处,所以从后往前查找 } else i = m + 1; //当cas设置bound成功后将i下标置为m+1,将cas的下标尽量打散。 p.index = i;//获取新的下标继续循环 } }}
4.留言到了这里,并发工具包常用的原子性工具类已经结束了,LockSupport由于直接调用底层的park方法,较为复杂,设计到JVM的源码,暂时能力有限,后续如果看懂了在进行更新
标签:最近更新
- 全球视讯!JUC同步锁原理源码解析六----Exchanger2023-06-20
- 不能没有你简谱 不能没有你_最资讯2023-06-20
- 【环球报资讯】宏碁驱动在哪下载(宏碁驱动下载)2023-06-20
- 传递“最美”力量 铁路人见证高铁建设“加速度”|要闻速递2023-06-20
- 天天实时:新春祝福语2021简短唯美_新春祝福语2021简短2023-06-20
- 1997年1元香港硬币价格_1997年香港硬币1元2023-06-20
- 在线扫一扫识别情头图片 查找情侣头像另一半_环球播报2023-06-20
- 红旗HQ9定价失策导致销量不佳?顶配确实很顶,但那是顶配2023-06-20
- 小吉祥村(关于小吉祥村介绍)2023-06-20
- 天天速看:iPhone 12如何录屏2023-06-20
- 我们可以用什么来测量风向东北风是由什么方吹来的风_我们可以用什么来测量风向 环球新要闻2023-06-20
- 世界信息:磁性材料什么意思_磁性材料的意思2023-06-20
- 快与慢的议论文题目_快与慢的议论文素材名言简介介绍-全球快看2023-06-19
- 2023年自贡签下首个百亿级大单 四川自贡沿滩引进投资113亿元的电子化学品及配套项目2023-06-19
- 环球滚动:当前内需不足存在特殊性 刺激政策要坚持不搞大水漫灌2023-06-19
- 怀孕了有什么征兆和反应_来大姨妈的前兆 全球最新2023-06-19
- 焦点信息:杭州亚运会观赛购票攻略来了,你最想看哪项?2023-06-19
- 焦点报道:港股人民币柜台的第一天,战绩如何?2023-06-19
- 刷新市场标杆,江铃福特全顺成全能轻客价值之选2023-06-19
- 刷新市场标杆,江铃福特全顺成全能轻客价值之选2023-06-19
- 【播资讯】文化|兴仁市非遗集市引各地游客前往2023-06-19
- 全球热议:案值4000万!上海海警局12小时内连续查获2起走私冻品案件2023-06-19
- 乡村之美丨台州:绘就乡村治理新画卷 世界最新2023-06-19
- 华为袁勇强:展望全光网络发展趋势,持续打造高品质联接2023-06-19
- 中国互联网协会发布打击利用恶意投诉非法牟利自律公约-天天热点2023-06-19
- 十万元起售、配置全面升级, 奔腾T90为美好生活而来2023-06-19
- 每日精选:中南“两大中心”落户湖南,湘江新区签约蚂蚁集团共建科创高地2023-06-19
- 电影《第八个嫌疑人》定档9月9日-天天精选2023-06-19
- 心情不愉快的词_关于心情不愉快的句子2023-06-19
- 每日快讯!招商银行美运NBA珍藏款球队信用卡怎么样?卡片值得申请吗?2023-06-19