阴霾造句,惊世巨鳄3,鳌江到温州动车
队列同步器 (aqs), 是用来构建锁或其他同步组件的基础框架,它通过使用 int 变量表示同步状态,通过内置的 fifo 的队列完成资源获取的排队工作。(摘自《java并发编程的艺术》)
我们知道获取同步状态有独占和共享两种模式,本文先针对独占模式进行分析。
private transient volatile node head;
head 同步队列头节点
private transient volatile node tail;
tail 同步队列尾节点
private volatile int state;
state 同步状态值
volatile int waitstatus;
waitstatus 节点的等待状态,可取值如下 :
volatile node prev;
prev 指向当前节点的前置节点
volatile node next;
next 指向当前节点的后置节点
volatile thread thread;
thread 节点对应的线程也是指当前获取锁失败的线程
node nextwaiter;
独占模式下获取同步状态, 既是当前只允许一个线程获取到同步状态
public final void acquire(int arg) { if (!tryacquire(arg) && acquirequeued(addwaiter(node.exclusive), arg)) selfinterrupt(); }
从 acquire 方法中我们可以大概猜测下,获取锁的过程如下:
下面具体看下各个阶段如何实现:
private node addwaiter(node mode) { // 绑定当前线程 创建 node 节点 node node = new node(thread.currentthread(), mode); // try the fast path of enq; backup to full enq on failure node pred = tail; // 判断同步队列尾节点是否为空 if (pred != null) { // node 的前置节点指向队列尾部 node.prev = pred; // 将同步队列的 tail 移动指向 node if (compareandsettail(pred, node)) { // 将原同步队列的尾部后置节点指向 node pred.next = node; return node; } } // tail 为空说明同步队列还未初始化 // 此时调用 enq 完成队列的初始化及 node 入队 enq(node); return node; }
private node enq(final node node) { // 轮询的方式执行 // 成功入队后退出 for (;;) { node t = tail; if (t == null) { // must initialize // 创建 node, 并将 head 指向该节点 // 同时将 tail 指向该节点 // 完成队列的初始化 if (compareandsethead(new node())) tail = head; } else { // node 的前置节点指向队列尾部 node.prev = t; // 将同步队列的 tail 移动指向 node if (compareandsettail(t, node)) { // 将原同步队列的尾部后置节点指向 node t.next = node; return t; } } } }
从代码中可以看出通过 cas 操作保证节点入队的有序安全,其入队过程中如下图所示:
final boolean acquirequeued(final node node, int arg) { boolean failed = true; try { boolean interrupted = false; // for (;;) { // 获取当前节点的前置节点 final node p = node.predecessor(); // 判断前置节点是否为 head 头节点 // 若前置节点为 head 节点,则再次尝试获取同步状态 if (p == head && tryacquire(arg)) { // 若获取同步状态成功 // 则将队列的 head 移动指向当前节点 sethead(node); // 将原头部节点的 next 指向为空,便于对象回收 p.next = null; // help gc failed = false; // 退出轮询过程 return interrupted; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) interrupted = true; } } finally { if (failed) cancelacquire(node); } }
private static boolean shouldparkafterfailedacquire(node pred, node node) { int ws = pred.waitstatus; if (ws == node.signal) /* * this node has already set status asking a release * to signal it, so it can safely park. */ // 若前置节点状态为 -1 ,则说明后置节点 node 可以安全挂起了 return true; if (ws > 0) { /* * predecessor was cancelled. skip over predecessors and * indicate retry. */ do { // ws > 0 说明前置节点状态为 cancelled , 也就是说前置节点为无效节点 // 此时从前置节点开始向队列头节点方向寻找有效的前置节点 // 此操作也即是将 cancelled 节点从队列中移除 node.prev = pred = pred.prev; } while (pred.waitstatus > 0); pred.next = node; } else { /* * waitstatus must be 0 or propagate. indicate that we * need a signal, but don't park yet. caller will need to * retry to make sure it cannot acquire before parking. */ // 若前置节点状态为初始状态 则将其状态设为 -1 compareandsetwaitstatus(pred, ws, node.signal); } return false; }
private final boolean parkandcheckinterrupt() { // 将当前线程挂起 locksupport.park(this); // 被唤醒后检查当前线程是否被挂起 return thread.interrupted(); }
从 acquirequeued 的实现可以看出,节点在入队后会采用轮询的方式(自旋)重复执行以下过程:
如下图所示:
接下来我们看看同步状态释放的实现。
释放同步状态
public final boolean release(int arg) { // 尝试释放同步状态 if (tryrelease(arg)) { node h = head; if (h != null && h.waitstatus != 0) // 唤醒后置节点 unparksuccessor(h); return true; } return false; }
private void unparksuccessor(node node) { /* * if status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. it is ok if this * fails or if status is changed by waiting thread. */ int ws = node.waitstatus; if (ws < 0) // 将 head 节点状态改为 0 compareandsetwaitstatus(node, ws, 0); /* * thread to unpark is held in successor, which is normally * just the next node. but if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 获取后置节点 node s = node.next; if (s == null || s.waitstatus > 0) { s = null; for (node t = tail; t != null && t != node; t = t.prev) if (t.waitstatus <= 0) s = t; } if (s != null) // 唤醒后置节点上所阻塞的线程 locksupport.unpark(s.thread); }
从上述代码,我们可以明白释放同步状态的过程如下:
如下图所示(红色曲线表示节点自旋过程) :
独占模式下获取同步状态,不同于 acquire 方法,该方法对中断操作敏感; 也就是说当前线程在获取同步状态的过程中,若被中断则会抛出中断异常
public final void acquireinterruptibly(int arg) throws interruptedexception { if (thread.interrupted()) // 检查线程是否被中断 // 中断则抛出中断异常由调用方处理 throw new interruptedexception(); if (!tryacquire(arg)) doacquireinterruptibly(arg); }
private void doacquireinterruptibly(int arg) throws interruptedexception { final node node = addwaiter(node.exclusive); boolean failed = true; try { for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return; } if (shouldparkafterfailedacquire(p, node) && parkandcheckinterrupt()) // 不同于 acquire 的操作,此处在唤醒后检查是否中断,若被中断直接抛出中断异常 throw new interruptedexception(); } } finally { if (failed) // 抛出中断异常后最终执行 cancelacquire cancelacquire(node); } }
private void cancelacquire(node node) { // ignore if node doesn't exist if (node == null) return; node.thread = null; // skip cancelled predecessors node pred = node.prev; while (pred.waitstatus > 0) node.prev = pred = pred.prev; // prednext is the apparent node to unsplice. cases below will // fail if not, in which case, we lost race vs another cancel // or signal, so no further action is necessary. node prednext = pred.next; // can use unconditional write instead of cas here. // after this atomic step, other nodes can skip past us. // before, we are free of interference from other threads. node.waitstatus = node.cancelled; // if we are the tail, remove ourselves. // 若当前节点为 tail 节点,则将 tail 移动指向 node 的前置节点 if (node == tail && compareandsettail(node, pred)) { // 同时将node 前置节点的 next 指向 null compareandsetnext(pred, prednext, null); } else { // if successor needs signal, try to set pred's next-link // so it will get one. otherwise wake it up to propagate. int ws; if (pred != head && ((ws = pred.waitstatus) == node.signal || (ws <= 0 && compareandsetwaitstatus(pred, ws, node.signal))) && pred.thread != null) { // 当前节点位于队列中部 node next = node.next; if (next != null && next.waitstatus <= 0) // 将前置节点的 next 指向 node 的后置节点 compareandsetnext(pred, prednext, next); } else { // 若 node 的前置节点为 head 节点则唤醒 node 节点的后置节点 unparksuccessor(node); } node.next = node; // help gc } }
从 acquireinterruptibly 的实现可以看出,若线程在获取同步状态的过程中出现中断操作,则会将当前线程对应的同步队列等待节点从队列中移除并唤醒可获取同步状态的线程。
独占模式超时获取同步状态,该操作与acquireinterruptibly一样对中断操作敏感,不同在于超过等待时间若未获取到同步状态将会返回
public final boolean tryacquirenanos(int arg, long nanostimeout) throws interruptedexception { if (thread.interrupted()) throw new interruptedexception(); return tryacquire(arg) || doacquirenanos(arg, nanostimeout); }
private boolean doacquirenanos(int arg, long nanostimeout) throws interruptedexception { if (nanostimeout <= 0l) return false; // 计算等待到期时间 final long deadline = system.nanotime() + nanostimeout; final node node = addwaiter(node.exclusive); boolean failed = true; try { for (;;) { final node p = node.predecessor(); if (p == head && tryacquire(arg)) { sethead(node); p.next = null; // help gc failed = false; return true; } nanostimeout = deadline - system.nanotime(); if (nanostimeout <= 0l) // 超时时间到期直接返回 return false; if (shouldparkafterfailedacquire(p, node) && nanostimeout > spinfortimeoutthreshold) // 按指定时间挂起s locksupport.parknanos(this, nanostimeout); if (thread.interrupted()) throw new interruptedexception(); } } finally { if (failed) cancelacquire(node); } }
同步队列中的节点在自旋获取同步状态的过程中,会将前置节点的状态由 0 初始状态改为 -1 signal, 若是中断敏感的操作则会将状态由 0 改为 1
同步队列中的节点在释放同步状态的过程中会将同步队列的 head 节点的状态改为 0, 也即是由 -1 变为 0;
本文主要分析了独占模式获取同步状态的操作,其大概流程如下:
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
浅析我对 String、StringBuilder、StringBuffer 的理解
使用IDEA搭建SSM框架的详细教程(spring + springMVC +MyBatis)
Springboot整合freemarker 404问题解决方案
引入mybatis-plus报 Invalid bound statement错误问题的解决方法
网友评论