StampedLock

1.官方文档

A capability-based lock with three modes for controlling 
read/write access. The state of a StampedLock consists of a 
version and mode. Lock acquisition methods return a stamp 
that represents and controls access with respect to a lock state; 
"try" versions of these methods may instead return the special 
value zero to represent failure to acquire access. Lock release 
and conversion methods require stamps as arguments, and fail 
if they do not match the state of the lock. The three modes are:

* Writing. Method writeLock() possibly blocks waiting for 
  exclusive access, returning a stamp that can be used in method 
  unlockWrite(long) to release the lock. Untimed and timed 
  versions of tryWriteLock are also provided. When the lock is 
  held in write mode, no read locks may be obtained, and all 
  optimistic read validations will fail.

* Reading. Method readLock() possibly blocks waiting for non-
  exclusive access, returning a stamp that can be used in method 
  unlockRead(long) to release the lock. Untimed and timed 
  versions of tryReadLock are also provided.

* Optimistic Reading. Method tryOptimisticRead() returns a 
  non-zero stamp only if the lock is not currently held in write 
  mode. Method validate(long) returns true if the lock has not 
  been acquired in write mode since obtaining a given stamp. 
  This mode can be thought of as an extremely weak version of a 
  read-lock, that can be broken by a writer at any time. The use 
  of optimistic mode for short read-only code segments often 
  reduces contention and improves throughput. However, its use 
  is inherently fragile. Optimistic read sections should only read 
  fields and hold them in local variables for later use after 
  validation. Fields read while in optimistic mode may be wildly 
  inconsistent, so usage applies only when you are familiar 
  enough with data representations to check consistency and/or 
  repeatedly invoke method validate(). For example, such steps 
  are typically required when first reading an object or array 
  reference, and then accessing one of its fields, elements or 
  methods.

This class also supports methods that conditionally provide 
conversions across the three modes. For example, method 
tryConvertToWriteLock(long) attempts to "upgrade" a mode, 
returning a valid write stamp if (1) already in writing mode (2) in 
reading mode and there are no other readers or (3) in optimistic 
mode and the lock is available. The forms of these methods are 
designed to help reduce some of the code bloat that otherwise 
occurs in retry-based designs.

StampedLocks are designed for use as internal utilities in the 
development of thread-safe components. Their use relies on 
knowledge of the internal properties of the data, objects, and 
methods they are protecting. They are not reentrant, so locked 
bodies should not call other unknown methods that may try to 
re-acquire locks (although you may pass a stamp to other 
methods that can use or convert it). The use of read lock 
modes relies on the associated code sections being side-effect-
free. Unvalidated optimistic read sections cannot call methods 
that are not known to tolerate potential inconsistencies. Stamps 
use finite representations, and are not cryptographically secure 
(i.e., a valid stamp may be guessable). Stamp values may 
recycle after (no sooner than) one year of continuous operation. 
A stamp held without use or validation for longer than this 
period may fail to validate correctly. StampedLocks are 
serializable, but always deserialize into initial unlocked state, so 
they are not useful for remote locking.

The scheduling policy of StampedLock does not consistently 
prefer readers over writers or vice versa. All "try" methods are 
best-effort and do not necessarily conform to any scheduling or 
fairness policy. A zero return from any "try" method for 
acquiring or converting locks does not carry any information 
about the state of the lock; a subsequent invocation may 
succeed.

Because it supports coordinated usage across multiple lock 
modes, this class does not directly implement the Lock or 
ReadWriteLock interfaces. However, a StampedLock may be 
viewed asReadLock(), asWriteLock(), or asReadWriteLock() in 
applications requiring only the associated set of functionality.

Sample Usage. The following illustrates some usage idioms in 
a class that maintains simple two-dimensional points. The 
sample code illustrates some try/catch conventions even 
though they are not strictly needed here because no exceptions 
can occur in their bodies.

基于功能的锁,具有三种控制读/写访问的模式。 StampedLock的状态包括版本和模式。锁获取方法返回一个表示和控制锁状态访问的stamp;这些方法的“try”版本可能会返回特殊值零以表示获取访问失败。锁释放和转换方法需要stamp作为参数,如果它们与锁状态不匹配则会失败。这三种模式是:

  • 写。方法writeLock()可能阻塞等待独占访问,返回可在方法unlockWrite(long)中使用的stamp以释放锁。还提供了不定时和定时版本的tryWriteLock。当锁保持在写模式时,不能获得读锁,并且所有乐观读验证都将失败。
  • 读。方法readLock()可能阻塞等待非独占访问,返回可以在方法unlockRead(long)中使用的stamp以释放锁。还提供了不定时和定时版本的tryReadLock。
  • 乐观读。方法tryOptimisticRead()仅在当前锁未处于写模式时才返回非零stamp。方法validate(long)如果在获取给定stamp后没有被写模式获取锁,则返回true。这种模式可以被认为是读锁的极弱版本,可以随时由写线程打破。对短的只读代码段使用乐观模式通常可以减少争用并提高吞吐量。但是,它的使用本质上是脆弱的。乐观读取部分应该只读取字段并在验证后将它们保存在局部变量中,以便以后使用。在乐观模式下读取的字段可能非常不一致,因此仅当您熟悉数据表示以检查一致性和/或重复调用方法validate()时才能使用。例如,在首先读取对象或数组引用,然后访问其字段、元素或方法之一时,通常需要这些步骤。

此类还支持提供有条件地跨三种模式的转换的方法。例如,方法tryConvertToWriteLock(long)尝试“升级”一个模式并返回一个有效的写stamp,如果

  • 1)已经处于写模式
  • 2)处于读模式并且没有其他读线程
  • 3)处于乐观模式并且锁是可用的。

这些方法的形式旨在帮助减少基于重试设计中出现的一些代码膨胀。

StampedLocks设计用作开发线程安全组件的内部实用程序。它们的使用依赖于对它们所保护的数据、对象和方法的内部属性的了解。它们不是可重入的,因此锁定的主体不应该调用可能尝试重新获取锁的其他未知方法(尽管您可以将stamp传递给可以使用或转换它的其他方法)。读模式的使用依赖于相关的代码部分是无副作用的。未经验证的乐观读取部分无法调用未知容忍潜在不一致的方法。stamps使用有限表示,并且不是加密安全的(即可猜测有效stamp)。stamp值可在连续操作一年后(不快于一年)后重循环。未经使用或验证超过此期限而持有的stamp可能无法正确验证。 StampedLocks是可序列化的,但总是反序列化为初始无锁状态,因此它们对远程锁定没有用。

StampedLock的调度策略并不总是倾向于读线程,反之亦然。所有try方法都是尽力而为,并不一定符合任何调度或公平政策。从任何“try”方法获取或转换锁时返回零 不会携带有关锁状态的任何信息;后续调用可能会成功。

因为它支持跨多种锁模式的协同使用,所以此类不直接实现Lock或ReadWriteLock接口。但是,StampedLock可以在仅需要相关功能集的应用程序中被当做asRe​​adLock(),asWriteLock()或asReadWriteLock()。

示例用法。以下说明了维护简单二维点的类中的一些用法习惯。示例代码说明了一些try / catch约定,尽管这里没有严格要求,因为它们的主体中不会发生异常。

 class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();

   void move(double deltaX, double deltaY) { // an exclusively locked method
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }

   double distanceFromOrigin() { // A read-only method
     long stamp = sl.tryOptimisticRead();
     double currentX = x, currentY = y;
     if (!sl.validate(stamp)) {
        stamp = sl.readLock();
        try {
          currentX = x;
          currentY = y;
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }

   void moveIfAtOrigin(double newX, double newY) { // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) {
         long ws = sl.tryConvertToWriteLock(stamp);
         if (ws != 0L) {
           stamp = ws;
           x = newX;
           y = newY;
           break;
         }
         else {
           sl.unlockRead(stamp);
           stamp = sl.writeLock();
         }
       }
     } finally {
       sl.unlock(stamp);
     }
   }
 }

2.实现说明

The design employs elements of Sequence locks (as used in 
linux kernels; see Lameter's 
http://www.lameter.com/gelato2005.pdf and elsewhere; see 
Boehm's http://www.hpl.hp.com/techreports/2012/HPL-2012-
68.html) and Ordered RW locks (see Shirako et al 
http://dl.acm.org/citation.cfm?id=2312015) 

 Conceptually, the primary state of the lock includes a 
sequence number that is odd when write-locked and even 
otherwise. However, this is offset by a reader count that is non-
zero when read-locked.  The read count is ignored when 
validating "optimistic" seqlock-reader-style stamps.  Because 
we must use a small finite number of bits (currently 7) for 
readers, a supplementary reader overflow word is used when 
the number of readers exceeds the count field. We do this by 
treating the max reader count value (RBITS) as a spinlock 
protecting overflow updates. 

 Waiters use a modified form of CLH lock used in 
AbstractQueuedSynchronizer (see its internal documentation 
for a fuller account), where each node is tagged (field mode) as 
either a reader or writer. Sets of waiting readers are grouped 
(linked) under a common node (field cowait) so act as a single 
node with respect to most CLH mechanics.  By virtue of the 
queue structure, wait nodes need not actually carry sequence 
numbers; we know each is greater than its predecessor.  This 
simplifies the scheduling policy to a mainly-FIFO scheme that 
incorporates elements of Phase-Fair locks (see Brandenburg & 
Anderson, especially http://www.cs.unc.edu/~bbb/diss/).  In 
particular, we use the phase-fair anti-barging rule: If an 
incoming reader arrives while read lock is held but there is a 
queued writer, this incoming reader is queued.  (This rule is 
responsible for some of the complexity of method acquireRead, 
but without it, the lock becomes highly unfair.) Method release 
does not (and sometimes cannot) itself wake up cowaiters. This 
is done by the primary thread, but helped by any other threads 
with nothing better to do in methods acquireRead and 
acquireWrite. 

 These rules apply to threads actually queued. All tryLock forms 
opportunistically try to acquire locks regardless of preference 
rules, and so may "barge" their way in.  Randomized spinning 
is used in the acquire methods to reduce (increasingly 
expensive) context switching while also avoiding sustained 
memory thrashing among many threads.  We limit spins to the 
head of queue. A thread spin-waits up to SPINS times (where 
each iteration decreases spin count with 50% probability) 
before blocking. If, upon wakening it fails to obtain lock, and is 
still (or becomes) the first waiting thread (which indicates that 
some other thread barged and obtained lock), it escalates spins 
(up to MAX_HEAD_SPINS) to reduce the likelihood of 
continually losing to barging threads. 

 Nearly all of these mechanics are carried out in methods 
acquireWrite and acquireRead, that, as typical of such code, 
sprawl out because actions and retries rely on consistent sets 
of locally cached reads. 

 As noted in Boehm's paper (above), sequence validation 
(mainly method validate()) requires stricter ordering rules than 
apply to normal volatile reads (of "state").  To force orderings of 
reads before a validation and the validation itself in those cases 
where this is not already forced, we use Unsafe.loadFence. 

 The memory layout keeps lock state and queue pointers 
together (normally on the same cache line). This usually works 
well for read-mostly loads. In most other cases, the natural 
tendency of adaptive-spin CLH locks to reduce memory 
contention lessens motivation to further spread out contended 
locations, but might be subject to future improvements.

该设计采用了序列锁Sequence locks的元素(在linux内核中使用;参见Lameter的http://www.lameter.com/gelato2005.pdf和其他地方;参见Boehm的http://www.hpl.hp.com/techreports/2012/ HPL-2012-68.html)和有序RW锁Ordered RW locks(参见Shirako等人http://dl.acm.org/citation.cfm?id=2312015

从概念上讲,锁的主要状态包括在写锁定时甚至是其他情况下奇数的序列号。但是,读锁定时被非0的读计数所抵消。验证“乐观”seqlock-reader-style stamps时,将忽略读计数。因为我们必须为读线程使用小的有限数量的位(当前为7位),所以当读线程的数量超过count时,将使用补充的读线程溢出字。我们通过将最大读线程count值(RBITS)视为保护溢出更新的自旋锁来实现此目的。

等待线程使用在AbstractQueuedSynchronizer中使用的CLH锁的修改形式,其中每个节点都标记为(字段模式)作为读线程或写线程。等待读线程的集合组队(链接)放在一个公共节点(字段cowait)下,因此就大多数CLH机制而言充当单个节点。凭借队列结构,等待节点实际上不需要携带序列号;我们知道每个都比它的前驱更大。这将调度策略简化为FIFO方案,该方案包含Phase-Fair锁的元素(参见Brandenburg&Anderson,尤其是http://www.cs.unc.edu/~bbb/diss/)。特别是,我们使用phase-fair不允许插队规则:如果在保持读取锁定时传入的读线程但是有一个排队的写线程,则此传入的读线程将排队。 (这个规则导致了一些方法acquireRead的复杂性,但没有它,锁将变得非常不公平。)方法release本身不会(有时不能)唤醒cowaiters。这是由主线程完成的,其他在方法acquireRead和acquireWrite中没有更好的事情可以做的任何线程可以进行协助。

这些规则适用于实际排队的线程。无论偏好规则如何,所有tryLock形式都会机会性地尝试获取锁定,因此可能会插队。在acquire方法中使用随机自旋来减少(越来越昂贵)上下文切换,同时还避免多线程间持续的内存抖动。我们将自旋限制在队列的头部。在阻塞之前,线程自旋等待SPINS次数(其中每次迭代以50%的概率减少自旋计数)。如果在唤醒后无法获得锁,并仍然(或成为)第一个等待线程(表示某个其他线程插队并获得锁),则会提升自旋次数(最多为MAX_HEAD_SPINS)以减少因为插队线程不断丢失的获取锁的可能。

几乎所有这些机制都是在acquireWrite和acquireRead方法中执行的,这些代码通常会扩散,因为操作和重试依赖于一致的本地缓存读取集。

正如Boehm的论文(上文)所述,序列验证(主要是方法validate())需要更严格的排序规则,而不是适用于普通的volatile读取(“state”)。要在验证之前强制读取顺序,并在尚未强制执行验证的情况下强制验证,我们使用Unsafe.loadFence。

内存布局将锁定状态和队列指针保持在一起(通常在同一缓存行上)。这通常适用于大多数读取加载。在大多数其他情况下,自适应自旋CLH锁定以减少内存争用的自然趋势减少了进一步扩展竞争位置的动力,但可能受到未来改进的影响。

3.源码分析

3.1 构造器

    /**
     * Creates a new lock, initially in unlocked state.
     */
    public StampedLock() {
        state = ORIGIN;
    }
    /** Head of CLH queue */
    private transient volatile WNode whead;
    /** Tail (last) of CLH queue */
    private transient volatile WNode wtail;

    // views
    transient ReadLockView readLockView;
    transient WriteLockView writeLockView;
    transient ReadWriteLockView readWriteLockView;

    /** Lock sequence/state */
    private transient volatile long state;
    /** extra reader count when state read count saturated */
    private transient int readerOverflow;
    // Values for lock state and stamp operations
    private static final long RUNIT = 1L;
    private static final long WBIT  = 1L << LG_READERS;
    private static final long RBITS = WBIT - 1L;
    private static final long RFULL = RBITS - 1L;
    private static final long ABITS = RBITS | WBIT;
    private static final long SBITS = ~RBITS; // note overlap with ABITS

    // Initial value for lock state; avoid failure value zero
    private static final long ORIGIN = WBIT << 1;

3.2 写锁

    /**
     * Exclusively acquires the lock, blocking if necessary
     * until available.
     *
     * @return a stamp that can be used to unlock or convert mode
     */
    public long writeLock() {
        long s, next;  // bypass acquireWrite in fully unlocked case only
        return ((((s = state) & ABITS) == 0L &&
                 U.compareAndSwapLong(this, STATE, s, next = s + WBIT)) ?
                next : acquireWrite(false, 0L));
    }

当完全没有加锁时,绕过acquireWrite。

    private long acquireWrite(boolean interruptible, long deadline) {
        WNode node = null, p;
        for (int spins = -1;;) { // spin while enqueuing
            long m, s, ns;
            if ((m = (s = state) & ABITS) == 0L) {
                if (U.compareAndSwapLong(this, STATE, s, ns = s + WBIT))
                    return ns;
            }
            else if (spins < 0)
                spins = (m == WBIT && wtail == whead) ? SPINS : 0;
            else if (spins > 0) {
                if (LockSupport.nextSecondarySeed() >= 0)
                    --spins;
            }
            else if ((p = wtail) == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(WMODE, p);
            else if (node.prev != p)
                node.prev = p;
            else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                p.next = node;
                break;
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long s, ns;
                    if (((s = state) & ABITS) == 0L) {
                        if (U.compareAndSwapLong(this, STATE, s,
                                                 ns = s + WBIT)) {
                            whead = node;
                            node.prev = null;
                            return ns;
                        }
                    }
                    else if (LockSupport.nextSecondarySeed() >= 0 &&
                             --k <= 0)
                        break;
                }
            }
            else if (h != null) { // help release stale waiters
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time; // 0 argument to park means no timeout
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 && (p != h || (state & ABITS) != 0L) &&
                        whead == h && node.prev == p)
                        U.park(false, time);  // emulate LockSupport.park
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

在该方法中复合了很多特性:

  • 1)首先是入队自旋,并放到队列尾部
  • 2)如果队列中只剩下一个结点,则在队头进一步自旋
  • 3)会帮助release队头的cowait链表
  • 4)最后会进入阻塞
    /**
     * If the lock state matches the given stamp, releases the
     * exclusive lock.
     *
     * @param stamp a stamp returned by a write-lock operation
     * @throws IllegalMonitorStateException if the stamp does
     * not match the current state of this lock
     */
    public void unlockWrite(long stamp) {
        WNode h;
        if (state != stamp || (stamp & WBIT) == 0L)
            throw new IllegalMonitorStateException();
        state = (stamp += WBIT) == 0L ? ORIGIN : stamp;
        if ((h = whead) != null && h.status != 0)
            release(h);
    }
    /**
     * Wakes up the successor of h (normally whead). This is normally
     * just h.next, but may require traversal from wtail if next
     * pointers are lagging. This may fail to wake up an acquiring
     * thread when one or more have been cancelled, but the cancel
     * methods themselves provide extra safeguards to ensure liveness.
     */
    private void release(WNode h) {
        if (h != null) {
            WNode q; Thread w;
            U.compareAndSwapInt(h, WSTATUS, WAITING, 0);
            if ((q = h.next) == null || q.status == CANCELLED) {
                for (WNode t = wtail; t != null && t != h; t = t.prev)
                    if (t.status <= 0)
                        q = t;
            }
            if (q != null && (w = q.thread) != null)
                U.unpark(w);
        }
    }

唤醒队头的后继。

3.3 读锁

    /**
     * Non-exclusively acquires the lock, blocking if necessary
     * until available.
     *
     * @return a stamp that can be used to unlock or convert mode
     */
    public long readLock() {
        long s = state, next;  // bypass acquireRead on common uncontended case
        return ((whead == wtail && (s & ABITS) < RFULL &&
                 U.compareAndSwapLong(this, STATE, s, next = s + RUNIT)) ?
                next : acquireRead(false, 0L));
    }

在无竞争的情况下直接跳过acquireRead。

    private long acquireRead(boolean interruptible, long deadline) {
        WNode node = null, p;
        for (int spins = -1;;) {
            WNode h;
            if ((h = whead) == (p = wtail)) {
                for (long m, s, ns;;) {
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L))
                        return ns;
                    else if (m >= WBIT) {
                        if (spins > 0) {
                            if (LockSupport.nextSecondarySeed() >= 0)
                                --spins;
                        }
                        else {
                            if (spins == 0) {
                                WNode nh = whead, np = wtail;
                                if ((nh == h && np == p) || (h = nh) != (p = np))
                                    break;
                            }
                            spins = SPINS;
                        }
                    }
                }
            }
            if (p == null) { // initialize queue
                WNode hd = new WNode(WMODE, null);
                if (U.compareAndSwapObject(this, WHEAD, null, hd))
                    wtail = hd;
            }
            else if (node == null)
                node = new WNode(RMODE, p);
            else if (h == p || p.mode != RMODE) {
                if (node.prev != p)
                    node.prev = p;
                else if (U.compareAndSwapObject(this, WTAIL, p, node)) {
                    p.next = node;
                    break;
                }
            }
            else if (!U.compareAndSwapObject(p, WCOWAIT,
                                             node.cowait = p.cowait, node))
                node.cowait = null;
            else {
                for (;;) {
                    WNode pp, c; Thread w;
                    if ((h = whead) != null && (c = h.cowait) != null &&
                        U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null) // help release
                        U.unpark(w);
                    if (h == (pp = p.prev) || h == p || pp == null) {
                        long m, s, ns;
                        do {
                            if ((m = (s = state) & ABITS) < RFULL ?
                                U.compareAndSwapLong(this, STATE, s,
                                                     ns = s + RUNIT) :
                                (m < WBIT &&
                                 (ns = tryIncReaderOverflow(s)) != 0L))
                                return ns;
                        } while (m < WBIT);
                    }
                    if (whead == h && p.prev == pp) {
                        long time;
                        if (pp == null || h == p || p.status > 0) {
                            node = null; // throw away
                            break;
                        }
                        if (deadline == 0L)
                            time = 0L;
                        else if ((time = deadline - System.nanoTime()) <= 0L)
                            return cancelWaiter(node, p, false);
                        Thread wt = Thread.currentThread();
                        U.putObject(wt, PARKBLOCKER, this);
                        node.thread = wt;
                        if ((h != pp || (state & ABITS) == WBIT) &&
                            whead == h && p.prev == pp)
                            U.park(false, time);
                        node.thread = null;
                        U.putObject(wt, PARKBLOCKER, null);
                        if (interruptible && Thread.interrupted())
                            return cancelWaiter(node, p, true);
                    }
                }
            }
        }

        for (int spins = -1;;) {
            WNode h, np, pp; int ps;
            if ((h = whead) == p) {
                if (spins < 0)
                    spins = HEAD_SPINS;
                else if (spins < MAX_HEAD_SPINS)
                    spins <<= 1;
                for (int k = spins;;) { // spin at head
                    long m, s, ns;
                    if ((m = (s = state) & ABITS) < RFULL ?
                        U.compareAndSwapLong(this, STATE, s, ns = s + RUNIT) :
                        (m < WBIT && (ns = tryIncReaderOverflow(s)) != 0L)) {
                        WNode c; Thread w;
                        whead = node;
                        node.prev = null;
                        while ((c = node.cowait) != null) {
                            if (U.compareAndSwapObject(node, WCOWAIT,
                                                       c, c.cowait) &&
                                (w = c.thread) != null)
                                U.unpark(w);
                        }
                        return ns;
                    }
                    else if (m >= WBIT &&
                             LockSupport.nextSecondarySeed() >= 0 && --k <= 0)
                        break;
                }
            }
            else if (h != null) {
                WNode c; Thread w;
                while ((c = h.cowait) != null) {
                    if (U.compareAndSwapObject(h, WCOWAIT, c, c.cowait) &&
                        (w = c.thread) != null)
                        U.unpark(w);
                }
            }
            if (whead == h) {
                if ((np = node.prev) != p) {
                    if (np != null)
                        (p = np).next = node;   // stale
                }
                else if ((ps = p.status) == 0)
                    U.compareAndSwapInt(p, WSTATUS, 0, WAITING);
                else if (ps == CANCELLED) {
                    if ((pp = p.prev) != null) {
                        node.prev = pp;
                        pp.next = node;
                    }
                }
                else {
                    long time;
                    if (deadline == 0L)
                        time = 0L;
                    else if ((time = deadline - System.nanoTime()) <= 0L)
                        return cancelWaiter(node, node, false);
                    Thread wt = Thread.currentThread();
                    U.putObject(wt, PARKBLOCKER, this);
                    node.thread = wt;
                    if (p.status < 0 &&
                        (p != h || (state & ABITS) == WBIT) &&
                        whead == h && node.prev == p)
                        U.park(false, time);
                    node.thread = null;
                    U.putObject(wt, PARKBLOCKER, null);
                    if (interruptible && Thread.interrupted())
                        return cancelWaiter(node, node, true);
                }
            }
        }
    }

同理,acquireRead复合了很多机制:

  • 1)首先是入队自旋,如果队尾不是读模式则放到队列尾部,如果是读模式,则放到队尾的cowait中。
  • 2)如果队列中只剩下一个结点,则在队头进一步自旋
  • 3)会帮助release队头的cowait链表
  • 4)最后会进入阻塞
    /**
     * If the lock state matches the given stamp, releases the
     * non-exclusive lock.
     *
     * @param stamp a stamp returned by a read-lock operation
     * @throws IllegalMonitorStateException if the stamp does
     * not match the current state of this lock
     */
    public void unlockRead(long stamp) {
        long s, m; WNode h;
        for (;;) {
            if (((s = state) & SBITS) != (stamp & SBITS) ||
                (stamp & ABITS) == 0L || (m = s & ABITS) == 0L || m == WBIT)
                throw new IllegalMonitorStateException();
            if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    break;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                break;
        }
    }

3.4 乐观读

    /**
     * Returns a stamp that can later be validated, or zero
     * if exclusively locked.
     *
     * @return a stamp, or zero if exclusively locked
     */
    public long tryOptimisticRead() {
        long s;
        return (((s = state) & WBIT) == 0L) ? (s & SBITS) : 0L;
    }

如果是被写锁独占,则返回0。

    /**
     * Returns true if the lock has not been exclusively acquired
     * since issuance of the given stamp. Always returns false if the
     * stamp is zero. Always returns true if the stamp represents a
     * currently held lock. Invoking this method with a value not
     * obtained from {@link #tryOptimisticRead} or a locking method
     * for this lock has no defined effect or result.
     *
     * @param stamp a stamp
     * @return {@code true} if the lock has not been exclusively acquired
     * since issuance of the given stamp; else false
     */
    public boolean validate(long stamp) {
        U.loadFence();
        return (stamp & SBITS) == (state & SBITS);
    }

SBITS只关心state有没有被写锁独占获取,如果没有,则返回true。(SBITS低位全为0,只获取高位值。低位为读锁标记,高位为写锁标记。)

3.5 锁转换

    /**
     * If the lock state matches the given stamp, performs one of
     * the following actions. If the stamp represents holding a write
     * lock, returns it.  Or, if a read lock, if the write lock is
     * available, releases the read lock and returns a write stamp.
     * Or, if an optimistic read, returns a write stamp only if
     * immediately available. This method returns zero in all other
     * cases.
     *
     * @param stamp a stamp
     * @return a valid write stamp, or zero on failure
     */
    public long tryConvertToWriteLock(long stamp) {
        long a = stamp & ABITS, m, s, next;
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L) {
                if (a != 0L)
                    break;
                if (U.compareAndSwapLong(this, STATE, s, next = s + WBIT))
                    return next;
            }
            else if (m == WBIT) {
                if (a != m)
                    break;
                return stamp;
            }
            else if (m == RUNIT && a != 0L) {
                if (U.compareAndSwapLong(this, STATE, s,
                                         next = s - RUNIT + WBIT))
                    return next;
            }
            else
                break;
        }
        return 0L;
    }

方法tryConvertToWriteLock(long)尝试“升级”模式并返回一个有效的写stamp,如果

  • 1)已经处于写模式
  • 2)处于读模式并且没有其他读线程,则释放读锁
  • 3)处于乐观模式并且锁是可用的

其他情况返回0。

tryConvertToReadLock、tryConvertToOptimisticRead类似。

   /**
     * If the lock state matches the given stamp, releases the
     * corresponding mode of the lock.
     *
     * @param stamp a stamp returned by a lock operation
     * @throws IllegalMonitorStateException if the stamp does
     * not match the current state of this lock
     */
    public void unlock(long stamp) {
        long a = stamp & ABITS, m, s; WNode h;
        while (((s = state) & SBITS) == (stamp & SBITS)) {
            if ((m = s & ABITS) == 0L)
                break;
            else if (m == WBIT) {
                if (a != m)
                    break;
                state = (s += WBIT) == 0L ? ORIGIN : s;
                if ((h = whead) != null && h.status != 0)
                    release(h);
                return;
            }
            else if (a == 0L || a >= WBIT)
                break;
            else if (m < RFULL) {
                if (U.compareAndSwapLong(this, STATE, s, s - RUNIT)) {
                    if (m == RUNIT && (h = whead) != null && h.status != 0)
                        release(h);
                    return;
                }
            }
            else if (tryDecReaderOverflow(s) != 0L)
                return;
        }
        throw new IllegalMonitorStateException();
    }

4.总结

StampedLock的等待队列与RRW的CLH队列相比,有以下特点:

  • 当入队一个线程时,如果队尾是读结点,不会直接链接到队尾,而是链接到该读结点的cowait链中,cowait链本质是一个栈;
  • 当入队一个线程时,如果队尾是写结点,则直接链接到队尾;
  • QS类似唤醒线程的规则和A,都是首先唤醒队首结点。区别是StampedLock中,当唤醒的结点是读结点时,会唤醒该读结点的cowait链中的所有读结点(顺序和入栈顺序相反,也就是后进先出)。
  • 另外,StampedLock使用时要特别小心,避免锁重入的操作,在使用乐观读锁时也需要遵循相应的调用模板,防止出现数据不一致的问题。


参考

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,039评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,426评论 3 395
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,417评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,868评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,892评论 6 392
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,692评论 1 305
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,416评论 3 419
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,326评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,782评论 1 316
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,957评论 3 337
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,102评论 1 350
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,790评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,442评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,996评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,113评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,332评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,044评论 2 355

推荐阅读更多精彩内容