虚假唤醒:
由于莫名其妙的原因,线程有可能在没有调用过notify()和notifyAll()的情况下醒来。这就是所谓的假唤醒(spurious wakeups)
我的demo测试,确实存在虚假唤醒,导致结果不一致,如图将while自旋换成if判断后 输出的count值可能到不了10000,需要多测试几遍。
我的理解,假如一个线程进入if条件后进行wait()释放锁,此时有别的线程在执行++count,此时刚好发生虚唤醒(别问我怎么会发生,高并发就是这么巧,自己测试)那么就会执行下面的语句,也进行++count,其实相当于两个线程看到的比如都是77 ++后只变到了78,所以就导致了错误的结果发生,自旋锁while怎么避免呢,由于是while循环,即使被虚唤醒,那么该线程的代码还是得执行条件判断,就又进入了wait状态(因为即使发生虚唤醒事件,条件变量isLocked不可能变成false)所以解决了这个问题。但是自旋锁是while循环,需要耗费cpu资源的。
完整测试代码
package com.alibaba.otter.canal.common;
import java.util.concurrent.CountDownLatch;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class LockTest extends AbstractZkTest {
private Object obj = new Object();
private int count = 0;
@Before
public void setUp() {
}
@After
public void tearDown() {
}
@Test
public void testUnsafe() {
CountDownLatch latch=new CountDownLatch(10000);
for (int i = 0; i < 10000; i++) {
Worker worker = new Worker(latch);
worker.start();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public int inc(){
synchronized(this){
System.out.println(count);
return ++count;
}
}
private Lock lock = new Lock();
public int lockInc() {
try {
lock.lock();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
int newCount = ++count;
System.out.println(count);
lock.unlock();
return newCount;
}
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
// while(isLocked){
if(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}
class Worker extends Thread {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
// inc();
lockInc();
latch.countDown();
}
}
}
实现可重入锁
概念:可重入其实是同步代码段中 发现是本线程 则不需要再wait,直接可以执行 另一个同步代码块。java的synchronized同步是可重入的 例如
public synchronized outer(){
inner();
}
public synchronized inner(){
//do something
}
当线程获得锁 进入outer同步块后 需要执行inner 另一个同步块,按理说此时是所有线程都去抢占inner代码块的锁,但是可重入的话 获得锁的线程直接可以执行inner语句
如果用lock代替synchronized的话一定要注意处理可重入性,避免死锁。主要就是通过记录是不是自己获得了锁,并且锁了几次,释放锁的时候对应的将次数减少即可。这里附上完整的测试代码
package com.alibaba.otter.canal.common;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class LockTest extends AbstractZkTest {
private Object obj = new Object();
private int count = 0;
@Before
public void setUp() {
}
@After
public void tearDown() {
}
@Test
public void testUnsafe() {
CountDownLatch latch=new CountDownLatch(10000);
for (int i = 0; i < 10000; i++) {
Worker worker = new Worker(latch);
worker.start();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public int inc(){
synchronized(this){
System.out.println(count);
return ++count;
}
}
private Lock lock = new Lock();
public int lockInc() {
int newCount=count;
try {
lock.lock();
newCount = ++count;
System.out.println(count);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
lock.unlock();
}
return newCount;
}
public class Lock{
private boolean isLocked = false;
public synchronized void lock()
throws InterruptedException{
while(isLocked){
// if(isLocked){
wait();
}
isLocked = true;
}
public synchronized void unlock(){
isLocked = false;
notify();
}
}
class Worker extends Thread {
private CountDownLatch latch;
public Worker(CountDownLatch latch) {
this.latch = latch;
}
public void run() {
// inc();
// lockInc();
try {
// reentrantOuter();//可重入 synchronized方式
// unReentrantOuter(); //lock未处理是否自己锁 产生的是不可重入锁,导致死锁
reentrantLockOuter();//lock方式实现可重入锁
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
latch.countDown();
}
}
/**
*
* 可重入锁的测试
*/
@Test
public void testReentrant() {
CountDownLatch latch=new CountDownLatch(2);
for (int i = 0; i < 2; i++) {
Worker worker = new Worker(latch);
worker.start();
}
try {
latch.await();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public synchronized void reentrantOuter() throws InterruptedException{
System.out.println("reentrantOuter1");
reentrantInner();
}
public synchronized void reentrantInner() throws InterruptedException{
Thread.currentThread().sleep(10);
System.out.println("reentrantInner2");
}
public void unReentrantOuter() throws InterruptedException{
try {
lock.lock();
System.out.println("unReentrantouter1");
unReentrantInner();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public synchronized void unReentrantInner() {
try {
lock.lock();
System.out.println("unReentrantInner2");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void reentrantLockOuter() {
try {
reentrantLock.lock();
System.out.println("unReentrantouter1");
reentrantLockInner();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
public synchronized void reentrantLockInner() throws InterruptedException{
try {
reentrantLock.lock();
System.out.println("unReentrantInner2");
Thread.currentThread().sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
reentrantLock.unlock();
}
}
ReentrantLock reentrantLock = new ReentrantLock();
class ReentrantLock{
boolean isLocked = false;
Thread lockedBy = null;
int lockedCount = 0;
public synchronized void lock()
throws InterruptedException{
Thread callingThread = Thread.currentThread();
while(isLocked && lockedBy != callingThread){
wait();
}
isLocked = true;
lockedCount++;
lockedBy = callingThread;
}
public synchronized void unlock(){
if(Thread.currentThread() ==this.lockedBy){
lockedCount--;
if(lockedCount == 0){
isLocked = false;
notify();
}
}
}
}
}
上述锁都是非公平的锁,即先来的请求不一定是先处理,这样的话就会导致有的线程可能很久得不到锁(不要问为什么,并发大的话就是可能发生),这样的话有些问题。我们基于此实现各公平的锁。主要思路是 来的请求线程放到列表中,然后 notify的时候调用列表第一个的notify,即通知唤醒先来的请求线程即可。附上完整测试代码。
package com.alibaba.otter.canal.common;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class FairLockTest extends AbstractZkTest {
private Object obj = new Object();
private int count = 0;
@Before
public void setUp() {
}
@After
public void tearDown() {
}
@Test
public void testUnsafe() {
CountDownLatch latch = new CountDownLatch(10000);
for (int i = 0; i < 10000; i++) {
Worker worker = new Worker(latch, i);
worker.start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
FairLock fairLock = new FairLock();
public int fairLockInc() {
int newCount = count;
try {
fairLock.lock();
newCount = ++count;
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} finally {
fairLock.unlock();
}
return newCount;
}
class Worker extends Thread {
private CountDownLatch latch;
public Worker(CountDownLatch latch, int i) {
this.latch = latch;
this.setName(i + " thread");
}
public void run() {
try {
fairLockInc();
} catch (Exception e) {
e.printStackTrace();
}
latch.countDown();
}
}
class FairLock {
private boolean isLocked = false;
private Thread lockingThread = null;
private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();
public void lock() throws InterruptedException {
QueueObject queueObject = new QueueObject();
boolean isLockedForThisThread = true;
synchronized (this) {
System.out.println(Thread.currentThread().getName() + " in");
waitingThreads.add(queueObject);
}
while (isLockedForThisThread) {
synchronized (this) {
isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
if (!isLockedForThisThread) {
isLocked = true;
System.out.println(Thread.currentThread().getName()
+ " out");
waitingThreads.remove(queueObject);
lockingThread = Thread.currentThread();
return;
}
}
try {
queueObject.doWait();
} catch (InterruptedException e) {
synchronized (this) {
System.out.println(Thread.currentThread().getName()
+ " out");
waitingThreads.remove(queueObject);
}
throw e;
}
}
}
public synchronized void unlock() {
if (this.lockingThread != Thread.currentThread()) {
throw new IllegalMonitorStateException(
"Calling thread has not locked this lock");
}
isLocked = false;
lockingThread = null;
if (waitingThreads.size() > 0) {
waitingThreads.get(0).doNotify();
}
}
}
class QueueObject {
private boolean isNotified = false;
public synchronized void doWait() throws InterruptedException {
while (!isNotified) {
this.wait();
}
this.isNotified = false;
}
public synchronized void doNotify() {
this.isNotified = true;
this.notify();
}
public boolean equals(Object o) {
return this == o;
}
}
}
其实我们基于此还可以实现更多的锁,可以实现基于优先级的锁,主要实现思路就是创建线程的时候传入优先级参数,然后我们可以在入等待列表的时候对比传入的优先级参数进行比较大小,找到插入的位置即可,当然方法不止这一个,也可以是通知的时候选取优先级最大的通知。我觉得基于此我们可以把juc的所有类都可以实现。
上面都是基于wait/notify/notifyAll来同步的。wait/notify机制有个很蛋疼的地方是,比如线程B要用notify通知线程A,那么线程B要确保线程A已经在wait调用上等待了,否则线程A可能永远都在等待。编程的时候就会很蛋疼。另外,是调用notify,还是notifyAll?notify只会唤醒一个线程,如果错误地有两个线程在同一个对象上wait等待,那么又悲剧了。为了安全起见,貌似只能调用notifyAll了
看一看 java.util.concurrent.locks对wait/notify/notifyAll的代替 怎么实现的各种锁
这里涉及到一个基础类 也是基于Unsafe 类实现的。
给出官方api的翻译版
用来创建锁和其他同步类的基本线程阻塞原语。
此类以及每个使用它的线程与一个许可关联(从 Semaphore 类的意义上说)。如果该许可可用,并且可在进程中使用,则调用 park 将立即返回;否则可能 阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。(但与 Semaphore 不同的是,许可不能累积,并且最多只能有一个许可。)
park 和 unpark 方法提供了阻塞和解除阻塞线程的有效方法,并且不会遇到导致过时方法 Thread.suspend 和 Thread.resume 因为以下目的变得不可用的问题:由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。此外,如果调用者线程被中断,并且支持超时,则 park 将返回。park 方法还可以在其他任何时间“毫无理由”地返回,因此通常必须在重新检查返回条件的循环里调用此方法。从这个意义上说,park 是“忙碌等待”的一种优化,它不会浪费这么多的时间进行自旋,但是必须将它与 unpark 配对使用才更高效。
三种形式的 park 还各自支持一个 blocker 对象参数。此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。(这样的工具可以使用方法 getBlocker(java.lang.Thread) 访问 blocker。)建议最好使用这些形式,而不是不带此参数的原始形式。在锁实现中提供的作为 blocker 的普通参数是 this。
这些方法被设计用来作为创建高级同步实用工具的工具,对于大多数并发控制应用程序而言,它们本身并不是很有用。park 方法仅设计用于以下形式的构造:
while (!canProceed()) { ... LockSupport.park(this); }在这里,在调用 park 之前,canProceed 和其他任何动作都不会锁定或阻塞。因为每个线程只与一个许可关联,park 的任何中间使用都可能干扰其预期效果。
示例用法。 以下是一个先进先出 (first-in-first-out) 非重入锁类的框架。
class FIFOMutex {
private final AtomicBoolean locked = new AtomicBoolean(false);
private final Queue<Thread> waiters
= new ConcurrentLinkedQueue<Thread>();
public void lock() {
boolean wasInterrupted = false;
Thread current = Thread.currentThread();
waiters.add(current);
// Block while not first in queue or cannot acquire lock
while (waiters.peek() != current ||
!locked.compareAndSet(false, true)) {
LockSupport.park(this);
if (Thread.interrupted()) // ignore interrupts while waiting
wasInterrupted = true;
}
waiters.remove();
if (wasInterrupted) // reassert interrupt status on exit
current.interrupt();
}
public void unlock() {
locked.set(false);
LockSupport.unpark(waiters.peek());
}
}
这里写了个测试类,附上源码(有大概注释)
package com.alibaba.otter.canal.common;
import java.util.concurrent.locks.LockSupport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LockSupportTest extends AbstractZkTest {
@Before
public void setUp() {
}
@After
public void tearDown() {
}
@Test
public void testLockSupport() {
LockSupport.park();
System.out.println("block.");//阻塞在这里证明 默认许可时不可用的
}
@Test
public void testUnpark() {
Thread thread = Thread.currentThread();
LockSupport.unpark(thread);//释放许可
LockSupport.park();// 获取许可
System.out.println("b");//正常执行 一对一使用
}
@Test
public void testReentrantUnpark() {
Thread thread = Thread.currentThread();
LockSupport.unpark(thread);
System.out.println("a");
LockSupport.park();
System.out.println("b");
LockSupport.park();
System.out.println("c");//阻塞在这里 ,说明非可重入的
}
@Test
public void testInterrupt() throws Exception {
Thread t = new Thread(new Runnable()
{
private int count = 0;
@Override
public void run()
{
long start = System.currentTimeMillis();
long end = 0;
while ((end - start) <= 1000)
{
count++;
end = System.currentTimeMillis();
}
System.out.println("after 1 second.count=" + count);
//等待或许许可
LockSupport.park();
System.out.println("thread over." + Thread.currentThread().isInterrupted());
}
});
t.start();
Thread.sleep(2000);
// 中断线程
t.interrupt(); //不会抛出InterruptException 不影响主线程
System.out.println("main over");
}
}