最近做的一个小项目中有这样的需求: 整个项目有一份config.json
保存着项目的一些配置,是存储在本地文件的一个资源,并且应用中存在读写(读>>写)更新问题。既然读写并发操作,那么就涉及到操作互斥,这里自然想到了读写锁,也顺便对自己读写锁方面的知识做个梳理。
为什么需要读写锁? 与传统锁不同的是读写锁的规则是可以共享读,但只能一个写,总结起来为:读读不互斥,读写互斥,写写互斥
,而一般的独占锁是:读读互斥,读写互斥,写写互斥
,而场景中往往读远远大于写 ,读写锁就是为了这种优化而创建出来的一种机制。 注意是读远远大于写
,一般情况下独占锁的效率低来源于高并发下对临界区的激烈竞争导致线程上下文切换。因此当并发不是很高的情况下,读写锁由于需要额外维护读锁的状态,可能还不如独占锁的效率高。因此需要根据实际情况选择使用。
一个简单的读写锁实现 根据上面理论可以利用两个int变量来简单实现一个读写锁,实现虽然烂,但是原理都是差不多的,值得阅读下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 public class ReadWriteLock { private int readCount = 0 ; private int writeCount = 0 ; public synchronized void lockRead () throws InterruptedException { while (writeCount > 0 ) { wait(); } readCount++; } public synchronized void unlockRead () { readCount--; notifyAll(); } public synchronized void lockWrite () throws InterruptedException { while (writeCount > 0 ) { wait(); } writeCount++; while (readCount > 0 ) { wait(); } } public synchronized void unlockWrite () { writeCount--; notifyAll(); } }
ReadWriteLock的实现原理 在Java中ReadWriteLock
的主要实现为ReentrantReadWriteLock
,其提供了以下特性:
公平性选择:支持公平与非公平(默认)的锁获取方式,吞吐量非公平优先于公平。
可重入:读线程获取读锁之后可以再次获取读锁,写线程获取写锁之后可以再次获取写锁
可降级:写线程获取写锁之后,其还可以再次获取读锁,然后释放掉写锁,那么此时该线程是读锁状态,也就是降级操作。
ReentrantReadWriteLock的结构 ReentrantReadWriteLock
的核心是由一个基于AQS的同步器Sync
构成,然后由其扩展出ReadLock
(共享锁),WriteLock
(排它锁)所组成。
并且从ReentrantReadWriteLock
的构造函数中可以发现ReadLock
与WriteLock
使用的是同一个Sync,具体怎么实现同一个队列既可以为共享锁,又可以表示排他锁下文会具体分析。
清单一:ReentrantReadWriteLock构造函数
1 2 3 4 5 public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); }
Sync的实现 sync
是读写锁实现的核心,sync
是基于AQS实现的,在AQS中核心是state字段和双端队列,那么一个一个问题来分析。
Sync是如何同时表示读锁与写锁? 清单2:读写锁状态获取
1 2 3 4 5 6 7 8 9 static final int SHARED_SHIFT = 16 ;static final int SHARED_UNIT = (1 << SHARED_SHIFT);static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ;static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ;static int sharedCount (int c) { return c >>> SHARED_SHIFT; }static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; }
从代码中获取读写状态可以看出其是把state(int32位)
字段分成高16位与低16位,其中高16位表示读锁个数,低16位表示写锁个数,如下图所示(图来自Java并发编程艺术 )。 该图表示当前一个线程获取到了写锁,并且重入了两次,因此低16位是3,并且该线程又获取了读锁,并且重入了一次,所以高16位是2,当写锁被获取时如果读锁不为0那么读锁一定是获取写锁的这个线程。
读锁的获取 读锁的获取主要实现是AQS中的acquireShared
方法,其调用过程如下代码。
清单3:读锁获取入口
1 2 3 4 5 6 7 8 9 public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
其中doAcquireShared(arg)
方法是获取失败之后AQS中入队操作,等待被唤醒后重新获取,那么关键点就是tryAcquireShared(arg)
方法,方法有点长,因此先总结出获取读锁所经历的步骤,获取的第一部分步骤如下:
操作1:读写需要互斥,因此当存在写锁并且持有写锁的线程不是该线程时获取失败。
操作2:是否存在等待写锁的线程,存在的话则获取读锁需要等待,避免写锁饥饿。(写锁优先级是比较高的)
操作3:CAS获取读锁,实际上是state字段的高16位自增。
操作4:获取成功后再ThreadLocal中记录当前线程获取读锁的次数。
清单4:读锁获取的第一部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
当操作2,操作3失败时会执行fullTryAcquireShared(current)
,为什么会这样写呢?个人认为是一种补偿操作,操作2与操作3失败并不代表当前线程没有读锁的资格 ,并且这里的读锁是共享锁,有资格就应该被获取成功,因此给予补偿获取读锁的操作。在fullTryAcquireShared(current)
中是一个循环获取读锁的过程,大致步骤如下:
操作5:等同于操作2,存在写锁,且写锁线程并非当前线程则直接返回失败
操作6:当前线程是重入读锁,这里只会偏向第一个获取读锁的线程以及最后一个获取读锁的线程,其他都需要去AQS中排队。
操作7:CAS改变读锁状态
操作8:同操作4,获取成功后再ThreadLocal中记录当前线程获取读锁的次数。
清单5:读锁获取的第二部分
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
读锁的释放 清单6:读锁释放入口
1 2 3 4 5 6 7 8 9 10 11 12 public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
读锁的释放主要是tryReleaseShared(arg)
函数,因此拆解其步骤如下:
操作1:清理ThreadLocal中保存的获取锁数量信息
操作2:CAS修改读锁个数,实际上是自减一
清单7:读锁的释放流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) {; if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
写锁的获取 清单8:写锁的获取入口
1 2 3 4 5 6 7 8 9 10 11 public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
写锁的获取也主要是tryAcquire(arg)
方法,这里也拆解步骤:
操作1:如果读锁数量不为0或者写锁数量不为0,并且不是重入操作,则获取失败。
操作2:如果当前锁的数量为0,也就是不存在操作1的情况,那么该线程是有资格获取到写锁,因此修改状态,设置独占线程为当前线程
清单9:写锁的获取
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; }
写锁的释放 清单10:写锁的释放入口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; }
写锁的释放主要是tryRelease(arg)
方法,其逻辑就比较简单了,注释很详细。清单11:写锁的释放
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
一些其他问题 锁降级操作哪里体现? 锁降级操作指的是一个线程获取写锁之后再获取读锁,然后读锁释放掉写锁的过程。在tryAcquireShared(arg)
获取读锁的代码中有如下代码。清单12:写锁降级策略
1 2 3 4 5 6 7 8 Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; 。。。。。读锁获取。。。。。
那么锁降级有什么用?答案是为了可见性的保证。在ReentrantReadWriteLock
的javadoc中有如下代码,其是锁降级的一个应用示例。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }}
公平与非公平的区别 清单13:公平下的Sync
1 2 3 4 5 6 7 8 9 static final class FairSync extends Sync { private static final long serialVersionUID = -2274990926593161451L ; final boolean writerShouldBlock () { return hasQueuedPredecessors(); } final boolean readerShouldBlock () { return hasQueuedPredecessors(); } }
公平下的Sync实现策略是所有获取的读锁或者写锁的线程都需要入队排队,按照顺序依次去尝试获取锁。
清单14:非公平下的Sync
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 static final class NonfairSync extends Sync { private static final long serialVersionUID = -8159625535654395037L ; final boolean writerShouldBlock () { return false ; } final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } }
非公平下由于抢占式获取锁,写锁是可能产生饥饿,因此解决办法就是提高写锁的优先级,换句话说获取写锁之前先占坑。