自旋锁在高并发的用处,实现各种锁

虚假唤醒:
由于莫名其妙的原因,线程有可能在没有调用过notify()和notifyAll()的情况下醒来。这就是所谓的假唤醒(spurious wakeups)

.wikipedia的描述.png

我的demo测试,确实存在虚假唤醒,导致结果不一致,如图将while自旋换成if判断后 输出的count值可能到不了10000,需要多测试几遍。
我的理解,假如一个线程进入if条件后进行wait()释放锁,此时有别的线程在执行++count,此时刚好发生虚唤醒(别问我怎么会发生,高并发就是这么巧,自己测试)那么就会执行下面的语句,也进行++count,其实相当于两个线程看到的比如都是77 ++后只变到了78,所以就导致了错误的结果发生,自旋锁while怎么避免呢,由于是while循环,即使被虚唤醒,那么该线程的代码还是得执行条件判断,就又进入了wait状态(因为即使发生虚唤醒事件,条件变量isLocked不可能变成false)所以解决了这个问题。但是自旋锁是while循环,需要耗费cpu资源的。


修改测试的地方

完整测试代码

package com.alibaba.otter.canal.common;
import java.util.concurrent.CountDownLatch;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class LockTest extends AbstractZkTest {
    private Object obj = new Object();
    private int count = 0;
    @Before
    public void setUp() {
        
    }

    @After
    public void tearDown() {
    }

    @Test
    public void testUnsafe() {
        CountDownLatch latch=new CountDownLatch(10000);
        for (int i = 0; i < 10000; i++) {
            Worker worker  = new Worker(latch);
            worker.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
    public int inc(){
        synchronized(this){
            System.out.println(count);
            return ++count;
            
        }
    }

    private Lock lock = new Lock();
    public int lockInc() {
        try {
            lock.lock();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        int newCount = ++count;
        System.out.println(count);
        lock.unlock();
        return newCount;
    }
    public class Lock{
        private boolean isLocked = false;

        public synchronized void lock()
            throws InterruptedException{
//          while(isLocked){
            if(isLocked){
                wait();
            }
            isLocked = true;
        }

        public synchronized void unlock(){
            isLocked = false;
            notify();
        }
    }
    class Worker extends Thread {
        private CountDownLatch latch;
        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }
        public void run() {
//          inc();
            lockInc();
            latch.countDown();
        }
    }
}

实现可重入锁
概念:可重入其实是同步代码段中 发现是本线程 则不需要再wait,直接可以执行 另一个同步代码块。java的synchronized同步是可重入的 例如

public synchronized outer(){
    inner();
}

public synchronized inner(){
    //do something
}

当线程获得锁 进入outer同步块后 需要执行inner 另一个同步块,按理说此时是所有线程都去抢占inner代码块的锁,但是可重入的话 获得锁的线程直接可以执行inner语句
如果用lock代替synchronized的话一定要注意处理可重入性,避免死锁。主要就是通过记录是不是自己获得了锁,并且锁了几次,释放锁的时候对应的将次数减少即可。这里附上完整的测试代码

package com.alibaba.otter.canal.common;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class LockTest extends AbstractZkTest {
    private Object obj = new Object();
    private int count = 0;
    @Before
    public void setUp() {
        
    }

    @After
    public void tearDown() {
    }

    @Test
    public void testUnsafe() {
        CountDownLatch latch=new CountDownLatch(10000);
        for (int i = 0; i < 10000; i++) {
            Worker worker  = new Worker(latch);
            worker.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
    public int inc(){
        synchronized(this){
            System.out.println(count);
            return ++count;
            
        }
    }

    private Lock lock = new Lock();
    public int lockInc() {
        int newCount=count;
        try {
            lock.lock();
            newCount = ++count;
            System.out.println(count);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
        
        return newCount;
    }
    public class Lock{
        private boolean isLocked = false;

        public synchronized void lock()
            throws InterruptedException{
            while(isLocked){
            
//          if(isLocked){
                
                wait();
            }
            isLocked = true;
        }

        public synchronized void unlock(){
            isLocked = false;
            notify();
        }
    }
    class Worker extends Thread {
        private CountDownLatch latch;
        public Worker(CountDownLatch latch) {
            this.latch = latch;
        }
        public void run() {
//          inc();
//          lockInc();
            try {
//              reentrantOuter();//可重入 synchronized方式
//              unReentrantOuter(); //lock未处理是否自己锁 产生的是不可重入锁,导致死锁
                reentrantLockOuter();//lock方式实现可重入锁
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            latch.countDown();
        }
    }
    

    /**
     * 
     * 可重入锁的测试
     */
    @Test
    public void testReentrant() {
        CountDownLatch latch=new CountDownLatch(2);
        for (int i = 0; i < 2; i++) {
            Worker worker  = new Worker(latch);
            worker.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        
    }
    public synchronized void reentrantOuter() throws InterruptedException{
        System.out.println("reentrantOuter1");
        reentrantInner();
    }
    
    public synchronized void reentrantInner() throws InterruptedException{
        Thread.currentThread().sleep(10);
        System.out.println("reentrantInner2");
    }
    
    
    public void unReentrantOuter() throws InterruptedException{
        try {
            lock.lock();
            System.out.println("unReentrantouter1");
            unReentrantInner();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public synchronized void unReentrantInner() {
        try {
            lock.lock();
            System.out.println("unReentrantInner2");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    
    public void reentrantLockOuter() {
        try {
            reentrantLock.lock();
            System.out.println("unReentrantouter1");
            reentrantLockInner();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }

    public synchronized void reentrantLockInner() throws InterruptedException{
        try {   
            reentrantLock.lock();
            System.out.println("unReentrantInner2");
            Thread.currentThread().sleep(10);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            reentrantLock.unlock();
        }
    }
    
    ReentrantLock reentrantLock = new ReentrantLock();
    class ReentrantLock{
        boolean isLocked = false;
        Thread  lockedBy = null;
        int lockedCount = 0;

        public synchronized void lock()
            throws InterruptedException{
            Thread callingThread = Thread.currentThread();
            while(isLocked && lockedBy != callingThread){
                wait();
            }
            isLocked = true;
            lockedCount++;
            lockedBy = callingThread;
      }

        public synchronized void unlock(){
            if(Thread.currentThread() ==this.lockedBy){
                lockedCount--;
                if(lockedCount == 0){
                    isLocked = false;
                    notify();
                }
            }
        }
    }
}

上述锁都是非公平的锁,即先来的请求不一定是先处理,这样的话就会导致有的线程可能很久得不到锁(不要问为什么,并发大的话就是可能发生),这样的话有些问题。我们基于此实现各公平的锁。主要思路是 来的请求线程放到列表中,然后 notify的时候调用列表第一个的notify,即通知唤醒先来的请求线程即可。附上完整测试代码。

package com.alibaba.otter.canal.common;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@SuppressWarnings("restriction")
public class FairLockTest extends AbstractZkTest {
    private Object obj = new Object();
    private int count = 0;
    @Before
    public void setUp() {

    }

    @After
    public void tearDown() {
    }

    @Test
    public void testUnsafe() {
        CountDownLatch latch = new CountDownLatch(10000);
        for (int i = 0; i < 10000; i++) {
            Worker worker = new Worker(latch, i);
            worker.start();
        }
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
    FairLock fairLock = new FairLock();
    public int fairLockInc() {
        int newCount = count;
        try {
            fairLock.lock();
            newCount = ++count;

        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            fairLock.unlock();
        }

        return newCount;
    }

    class Worker extends Thread {
        private CountDownLatch latch;
        public Worker(CountDownLatch latch, int i) {
            this.latch = latch;
            this.setName(i + " thread");
        }
        public void run() {
            try {
                fairLockInc();
            } catch (Exception e) {
                e.printStackTrace();
            }
            latch.countDown();
        }
    }

    class FairLock {
        private boolean isLocked = false;
        private Thread lockingThread = null;
        private List<QueueObject> waitingThreads = new ArrayList<QueueObject>();

        public void lock() throws InterruptedException {
            QueueObject queueObject = new QueueObject();
            boolean isLockedForThisThread = true;
            synchronized (this) {
                System.out.println(Thread.currentThread().getName() + " in");
                waitingThreads.add(queueObject);
            }

            while (isLockedForThisThread) {
                synchronized (this) {
                    isLockedForThisThread = isLocked || waitingThreads.get(0) != queueObject;
                    if (!isLockedForThisThread) {
                        isLocked = true;
                        System.out.println(Thread.currentThread().getName()
                                + " out");
                        waitingThreads.remove(queueObject);
                        lockingThread = Thread.currentThread();
                        return;
                    }
                }
                try {
                    queueObject.doWait();
                } catch (InterruptedException e) {
                    synchronized (this) {
                        System.out.println(Thread.currentThread().getName()
                                + " out");
                        waitingThreads.remove(queueObject);
                    }
                    throw e;
                }
            }
        }

        public synchronized void unlock() {
            if (this.lockingThread != Thread.currentThread()) {
                throw new IllegalMonitorStateException(
                        "Calling thread has not locked this lock");
            }
            isLocked = false;
            lockingThread = null;
            if (waitingThreads.size() > 0) {
                waitingThreads.get(0).doNotify();
            }
        }
    }
    class QueueObject {

        private boolean isNotified = false;

        public synchronized void doWait() throws InterruptedException {

            while (!isNotified) {
                this.wait();
            }

            this.isNotified = false;
        }

        public synchronized void doNotify() {
            this.isNotified = true;
            this.notify();
        }

        public boolean equals(Object o) {
            return this == o;
        }

    }

}

其实我们基于此还可以实现更多的锁,可以实现基于优先级的锁,主要实现思路就是创建线程的时候传入优先级参数,然后我们可以在入等待列表的时候对比传入的优先级参数进行比较大小,找到插入的位置即可,当然方法不止这一个,也可以是通知的时候选取优先级最大的通知。我觉得基于此我们可以把juc的所有类都可以实现。
上面都是基于wait/notify/notifyAll来同步的。wait/notify机制有个很蛋疼的地方是,比如线程B要用notify通知线程A,那么线程B要确保线程A已经在wait调用上等待了,否则线程A可能永远都在等待。编程的时候就会很蛋疼。另外,是调用notify,还是notifyAll?notify只会唤醒一个线程,如果错误地有两个线程在同一个对象上wait等待,那么又悲剧了。为了安全起见,貌似只能调用notifyAll了
看一看 java.util.concurrent.locks对wait/notify/notifyAll的代替 怎么实现的各种锁

Paste_Image.png

这里涉及到一个基础类 也是基于Unsafe 类实现的。
给出官方api的翻译版

用来创建锁和其他同步类的基本线程阻塞原语。
此类以及每个使用它的线程与一个许可关联(从 Semaphore 类的意义上说)。如果该许可可用,并且可在进程中使用,则调用 park 将立即返回;否则可能 阻塞。如果许可尚不可用,则可以调用 unpark 使其可用。(但与 Semaphore 不同的是,许可不能累积,并且最多只能有一个许可。)
park 和 unpark 方法提供了阻塞和解除阻塞线程的有效方法,并且不会遇到导致过时方法 Thread.suspend 和 Thread.resume 因为以下目的变得不可用的问题:由于许可的存在,调用 park 的线程和另一个试图将其 unpark 的线程之间的竞争将保持活性。此外,如果调用者线程被中断,并且支持超时,则 park 将返回。park 方法还可以在其他任何时间“毫无理由”地返回,因此通常必须在重新检查返回条件的循环里调用此方法。从这个意义上说,park 是“忙碌等待”的一种优化,它不会浪费这么多的时间进行自旋,但是必须将它与 unpark 配对使用才更高效。
三种形式的 park 还各自支持一个 blocker 对象参数。此对象在线程受阻塞时被记录,以允许监视工具和诊断工具确定线程受阻塞的原因。(这样的工具可以使用方法 getBlocker(java.lang.Thread) 访问 blocker。)建议最好使用这些形式,而不是不带此参数的原始形式。在锁实现中提供的作为 blocker 的普通参数是 this。
这些方法被设计用来作为创建高级同步实用工具的工具,对于大多数并发控制应用程序而言,它们本身并不是很有用。park 方法仅设计用于以下形式的构造:
while (!canProceed()) { ... LockSupport.park(this); }在这里,在调用 park 之前,canProceed 和其他任何动作都不会锁定或阻塞。因为每个线程只与一个许可关联,park 的任何中间使用都可能干扰其预期效果。
示例用法。 以下是一个先进先出 (first-in-first-out) 非重入锁类的框架。

class FIFOMutex {
   private final AtomicBoolean locked = new AtomicBoolean(false);
   private final Queue<Thread> waiters
     = new ConcurrentLinkedQueue<Thread>();
   public void lock() {
     boolean wasInterrupted = false;
     Thread current = Thread.currentThread();
     waiters.add(current);
    // Block while not first in queue or cannot acquire lock
     while (waiters.peek() != current ||
            !locked.compareAndSet(false, true)) {
        LockSupport.park(this);
        if (Thread.interrupted()) // ignore interrupts while waiting
          wasInterrupted = true;
     }
     waiters.remove();
     if (wasInterrupted)          // reassert interrupt status on exit
        current.interrupt();
   }
   public void unlock() {
     locked.set(false);
     LockSupport.unpark(waiters.peek());
   }
 }

这里写了个测试类,附上源码(有大概注释)

package com.alibaba.otter.canal.common;

import java.util.concurrent.locks.LockSupport;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class LockSupportTest extends AbstractZkTest {
    @Before
    public void setUp() {
        
    }

    @After
    public void tearDown() {
    }

    @Test
    public void testLockSupport() {
         LockSupport.park();
         System.out.println("block.");//阻塞在这里证明 默认许可时不可用的
    }
    @Test
    public void testUnpark() {
         Thread thread = Thread.currentThread();
         LockSupport.unpark(thread);//释放许可
         LockSupport.park();// 获取许可
         System.out.println("b");//正常执行 一对一使用
    }
    
    @Test
    public void testReentrantUnpark() {
        Thread thread = Thread.currentThread();
        
        LockSupport.unpark(thread);
        
        System.out.println("a");
        LockSupport.park();
        System.out.println("b");
        LockSupport.park();
        System.out.println("c");//阻塞在这里 ,说明非可重入的
    }
    @Test
    public void testInterrupt() throws Exception {
        Thread t = new Thread(new Runnable()
        {
            private int count = 0;

            @Override
            public void run()
            {
                long start = System.currentTimeMillis();
                long end = 0;

                while ((end - start) <= 1000)
                {
                    count++;
                    end = System.currentTimeMillis();
                }

                System.out.println("after 1 second.count=" + count);

            //等待或许许可
                LockSupport.park();
                System.out.println("thread over." + Thread.currentThread().isInterrupted());

            }
        });

        t.start();

        Thread.sleep(2000);

        // 中断线程
        t.interrupt(); //不会抛出InterruptException 不影响主线程

        
        System.out.println("main over");
    }
    
    
    
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 215,294评论 6 497
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,780评论 3 391
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 161,001评论 0 351
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,593评论 1 289
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,687评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,679评论 1 294
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,667评论 3 415
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,426评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,872评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,180评论 2 331
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,346评论 1 345
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,019评论 5 340
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,658评论 3 323
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,268评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,495评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,275评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,207评论 2 352

推荐阅读更多精彩内容

  • 1.解决信号量丢失和假唤醒 public class MyWaitNotify3{ MonitorObject m...
    Q罗阅读 875评论 0 1
  • 本文出自 Eddy Wiki ,转载请注明出处:http://eddy.wiki/interview-java.h...
    eddy_wiki阅读 2,103评论 0 14
  • layout: posttitle: 《Java并发编程的艺术》笔记categories: Javaexcerpt...
    xiaogmail阅读 5,813评论 1 19
  • 人是会怀念的,并非要到特定的老龄或者身体机能渐渐退化了才细想过去,而后被动拉扯些旧日里的光景润色眼下局限着...
    及客阅读 204评论 0 2
  • 早秋的天空颜色总是很淡,穹顶的郁蓝浅薄的近乎透明;云彩像斑斑鱼鳞又像裸露的河床,分来一点夕阳的秋意,散发着干净松爽...
    小野风信子阅读 468评论 0 2