主要内容摘抄自《Java并发编程的艺术》
Java并发机制的底层实现原理
在多线程并发编程中synchronized经常被称为重量级锁。
synchronized:Java中的每一个对象对可以作为锁,具体如下:
- 对于普通同步方法,锁是当前实例对象
- 对于静态同步方法,锁是当前类的Class对象
- 对于同步方法块,锁是synchronized括号里配置的对象
在JDK1.6中,锁一共有4中状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。
偏向锁:
有人经过研究发现,大多数情况下,锁不仅不存在多线程竞争,而且总是由同一线程多次获得,为了让线程获得锁的代价更低而引入了偏向锁。
当一个线程访问同步块并获取锁时,会在对象头和栈帧中的锁记录里存储锁偏向的线程ID,以后该线程在进入和退出同步块时不需要进行CAS操作来加锁和解锁,只需简单测试对象头的Mark Word里是否存储着指向当前线程的偏向锁。如果测试成功,表示线程已经获得了锁。如果测试失败,则需要再测试一下Mark Word中偏向锁的表示是否设置成1(表示当前是偏向锁),如果没有设置则使用CAS竞争锁,如果设置了,则尝试使用CAS将对象头的偏向锁指向当前线程。
轻量级锁:
线程在执行同步块之前,JVM会先在当前线程的栈桢中创建用于存储锁记录的空间,并将对象头中的Mark Word复制到锁记录中,官方称为Displaced Mark Word。然后线程尝试使用 CAS将对象头中的Mark Word替换为指向锁记录的指针。如果成功,当前线程获得锁,如果失败,表示其他线程竞争锁,当前线程便尝试使用自旋来获取锁。
JMM
Java中,所有实例域、静态域和数组元素(?)都存储在堆内存中,堆内存在线程之间共享。局部变量,方法定义参数和异常处理器参数不会在线程之间共享,不会有内存可见性问题。
在执行程序时,为了提高性能,编译器和处理器常常会对指令做重排序。重排序分3种类型。
- 1)编译器优化的重排序。编译器在不改变单线程程序语义的前提下,可以重新安排语句的执行顺序。
- 2)指令级并行的重排序。现代处理器采用了指令级并行技术(Instruction-Level Parallelism,ILP)来将多条指令重叠执行。如果不存在数据依赖性,处理器可以改变语句对应 机器指令的执行顺序。
- 3)内存系统的重排序。由于处理器使用缓存和读/写缓冲区,这使得加载和存储操作看上去可能是在乱序执行。
volatile的内存语义
class A {
volatile long value = 0;
public void setValue(long value) {
this.value = value;
}
public synchronized void setValueSync(long value) {
this.value = value;
}
public long getValue() {
return value;
}
public synchronized long getValueSync() {
return value;
}
public void increase(){
value ++;
}
}
使用volatile修饰的value的get()、set()方法和加了synchronized关键字修饰的同步get()、set()方法一致。
但是increase()方法却不能保证同步。
锁的语义决定了临界区代码的执行具有原子性。这意味着,即使是64位的long型和double型变量,只要它是volatile变量,对该变量的读/写就具有原子性。如果是多个volatile操作或类似于volatile++这种复合操作,这些操作整体上不具有原子性。
也就是说,volatile关键字可以保证变量的可见性以及原子性(单个volatile变量的读/写)。
volatile写的内存语义:当写一个volatile变量时,JMM会把该线程对应的本地内存中的共享变量值刷新到主内存。
volatile读的内存语义:当读一个volatile变量时,JMM会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
锁的内存语义
当线程释放锁时,JMM会把该线程对应的本地内存中的共享变量刷新到主内存中。
当线程获取锁时,JMM会把该线程对应的本地内存置为无效。
final域的内存语义
对于final域,编译器和处理器要遵守两个重排序规则。
1)在构造函数内对一个final域的写入,与随后把这个被构造对象的引用赋值给一个引用变量,这两个操作之间不能重排序。
2)初次读一个包含final域的对象的引用,与随后初次读这个final域,这两个操作之间不能重排序。
写final域的重排序规则可以确保:在对象引用为任意线程可见之前,对象的final域已经被正确初始化过了,而普通域不具有这个保障。
happens-before
懒加载
单例模式:
public class UnsafeLazyInitialization {
private static Instance instance;
public static Instance getInstance() {
if (instance == null) // 1:A线程执行
instance = new Instance(); // 2:B线程执行
return instance;
}
}
会有问题。加入同步:
public class SafeLazyInitialization {
private static Instance instance;
public synchronized static Instance getInstance() {
if (instance == null)
instance = new Instance();
return instance;
}
}
如果方法调用频繁会造成很大的开销。加入双重检查锁定(Double-Checked Locking):
public class DoubleCheckedLocking { // 1
private static Instance instance; // 2
public static Instance getInstance() { // 3
if (instance == null) { // 4:第一次检查
synchronized (DoubleCheckedLocking.class) { // 5:加锁
if (instance == null) // 6:第二次检查
instance = new Instance(); // 7:问题的根源出在这里
} // 8
} // 9
return instance; // 10
} // 11
}
上面代码表面上看起来,似乎两全其美。
·多个线程试图在同一时间创建对象时,会通过加锁来保证只有一个线程能创建对象。
·在对象创建好之后,执行getInstance()方法将不需要获取锁,直接返回已创建好的对象。
双重检查锁定看起来似乎很完美,但这是一个错误的优化!在线程执行到第4行,代码读取到instance不为null时,instance引用的对象有可能还没有完成初始化。
memory = allocate(); // 1:分配对象的内存空间
instance = memory; // 3:设置instance指向刚分配的内存地址
// 注意,此时对象还没有被初始化!
ctorInstance(memory); // 2:初始化对象
上面3行伪代码中的2和3之间,可能会被重排序(在一些JIT编译器上,这种重排序是真实发生的,详情见参考文献1的“Out-of-order writes”部分)。
根据《The Java Language Specification,Java SE 7 Edition》,所有线程在执行Java程序时必须要遵守intra-thread semantics。intra-thread semantics保证重排序不会改变单线程内的程序执行结果。换句话说,intra-thread semantics允许那些在单线程内,不会改变单线程程序执行结果的重排序。上面3行伪代码的2和3之间虽然被重排序了,但这个重排序并不会违反intra-thread semantics。这个重排序在没有改变单线程程序执行结果的前提下,可以提高程序的执行性能。
所以,示例代码的第7行(instance=new Singleton();)如果发生重排序,另一个并发执行的线程B就有可能在第4行判断instance不为null。线程B接下来将访问instance所引用的对象,但此时这个对象可能还没有被A线程初始化!
解决方案(volatile):
public class SafeDoubleCheckedLocking {
private volatile static Instance instance;
public static Instance getInstance() {
if (instance == null) {
synchronized (SafeDoubleCheckedLocking.class) {
if (instance == null)
instance = new Instance(); // instance为volatile,现在没问题了
}
}
return instance;
}
}
解决方案二(基于类初始化的解决方案):
public class InstanceFactory {
private static class InstanceHolder {
public static Instance instance = new Instance();
}
public static Instance getInstance() {
return InstanceHolder.instance ; // 这里将导致InstanceHolder类被初始化
}
}
Java并发编程基础
Java天生支持多线程,就算仅仅是main的运行也有很多线程一起工作:
public class MultiThread {
public static void main(String[] args) {
ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
for (ThreadInfo threadInfo : threadInfos) {
System.out.println(threadInfo.getThreadName());
}
}
}
输出结果:
main
Reference Handler
Finalizer
Signal Dispatcher
Attach Listener
Common-Cleaner
Monitor Ctrl-Break
线程
Java线程在运行的生命周期中可能出现如下6中状态,在给定的一个时刻,线程只能处于其中的一个状态。
Thread类中的State:
public enum State {
/**
* Thread state for a thread which has not yet started.
*/
NEW,
/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/
RUNNABLE,
/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
* {@link Object#wait() Object.wait}.
*/
BLOCKED,
/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
* <ul>
* <li>{@link Object#wait() Object.wait} with no timeout</li>
* <li>{@link #join() Thread.join} with no timeout</li>
* <li>{@link LockSupport#park() LockSupport.park}</li>
* </ul>
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called {@code Object.wait()}
* on an object is waiting for another thread to call
* {@code Object.notify()} or {@code Object.notifyAll()} on
* that object. A thread that has called {@code Thread.join()}
* is waiting for a specified thread to terminate.
*/
WAITING,
/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
* <ul>
* <li>{@link #sleep Thread.sleep}</li>
* <li>{@link Object#wait(long) Object.wait} with timeout</li>
* <li>{@link #join(long) Thread.join} with timeout</li>
* <li>{@link LockSupport#parkNanos LockSupport.parkNanos}</li>
* <li>{@link LockSupport#parkUntil LockSupport.parkUntil}</li>
* </ul>
*/
TIMED_WAITING,
/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/
TERMINATED;
}
线程在自身的生命周期中,并不是固定地处于某个状态,而是随着代码地执行在不同地状态之间进行切换。
关于wait和sleep方法.
wait方法是Object
类的,也就是说,所有类都能调用该方法。sleep方法是Thread
类的静态方法。
- wait方法释放锁,sleep则继续持有锁;
- wait方法必须在同步块(synchronized)中执行,否则会抛异常(IllegalMonitorStateException);
线程通信
public class Synchronized {
public static void main(String[] args) {
// 对Synchronized Class对象进行加锁
synchronized (Synchronized.class) {
}
// 静态同步方法,对Synchronized Class对象进行加锁
m();
}
public static synchronized void m() {
}
}
对以上代码生成的class文件进行反编译(javap -v):
public static void main(java.lang.String[]);
descriptor: ([Ljava/lang/String;)V
flags: (0x0009) ACC_PUBLIC, ACC_STATIC
Code:
stack=2, locals=3, args_size=1
0: ldc #2 // class top/hellooooo/concurrency/base/Synchronized
2: dup
3: astore_1
4: monitorenter
5: aload_1
6: monitorexit
7: goto 15
10: astore_2
11: aload_1
12: monitorexit
13: aload_2
14: athrow
15: invokestatic #3 // Method m:()V
18: return
Exception table:
...
public static synchronized void m();
descriptor: ()V
flags: (0x0029) ACC_PUBLIC, ACC_STATIC, ACC_SYNCHRONIZED
Code:
stack=0, locals=0, args_size=0
0: return
LineNumberTable:
line 12: 0
以上class信息中,对于同步块的实现使用了monitorenter和monitorexit指令,而同步方法则是依靠方法修饰符上的ACC_SYNCHRONIZED来完成的。
无论采用哪种方式,其本质是对一个对象的监视器(monitor)进行获取,而这个获取过程是排他的,也就是同一时刻只能有一个线程获取到由synchronized所保护对象的监视器。
任意一个对象都拥有自己的监视器,当这个对象由同步块或者这个对象的同步方法调用时,执行方法的线程必须先获取到该对象的监视器才能进入同步块或者同步方法,而没有获取到监视器(执行该方法)的线程将会被阻塞在同步块和同步方法的入口处,进入BLOCKED状态。
关于Object
的wait和notify:
等待方遵循如下原则。 1)获取对象的锁。 2)如果条件不满足,那么调用对象的wait()方法,被通知后仍要检查条件。 3)条件满足则执行对应的逻辑。
对应的伪代码如下。
synchronized(对象) {
while(条件不满足) {
对象.wait();
}
对应的处理逻辑
}
通知方遵循如下原则。 1)获得对象的锁。 2)改变条件。 3)通知所有等待在对象上的线程。 对应的伪代码如下。
synchronized(对象) {
改变条件
对象.notifyAll();
}
管道输入/输出流
管道输入/输出流和普通的文件输入/输出流或者网络输入/输出流不同之处在于,它主要用于线程之间的数据传输,而传输的媒介为内存。
管道输入/输出流主要包括了如下4种具体实现:PipedOutputStream、PipedInputStream、 PipedReader和PipedWriter,前两种面向字节,而后两种面向字符。
public class Piped {
public static void main(String[] args) throws Exception {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
// 将输出流和输入流进行连接,否则在使用时会抛出IOException
out.connect(in);
Thread printThread = new Thread(new Print(in), "PrintThread");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) {
System.out.print((char) receive);
}
} catch (IOException ex) {
}
}
}
}
结果:
Repeat
Repeat
01
01
-1
-1
ThreadLocal
即线程变量,是一个以ThreadLocal对象为键、人一对象为值的存储结构。这个结构被附带在线程上,也就是说一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值。
public class Profiler {
// 第一次get()方法调用时会进行初始化(如果set方法没有调用),每个线程会调用一次
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
protected Long initialValue() {
return System.currentTimeMillis();
}
};
public static final void begin() {
TIME_THREADLOCAL.set(System.currentTimeMillis());
}
public static final long end() {
return System.currentTimeMillis() - TIME_THREADLOCAL.get();
}
public static void main(String[] args) throws Exception {
Profiler.begin();
TimeUnit.SECONDS.sleep(1);
System.out.println("Cost: " + Profiler.end() + " mills");
}
}
结果:
Cost: 1007 mills
示例:线程池
该示例取自B站汪文君老师的并发课。
public class SimpleThreadPool {
// 线程池线程数
private final int size;
// 线程池默认线程数
private final static int DEFAULT_SIZE = 10;
// 线程池线程名公共前缀
private final static String THREAD_PREFIX = "SIMPLE_THREAD_POOL-";
// 方便为线程取名
private static int seq = 0;
// 线程池中线程
private final static List<WorkerThread> THREAD_QUEUE = new ArrayList<>();
// 线程池的任务队列
private final static LinkedList<Runnable> TASK_QUEUE = new LinkedList();
public SimpleThreadPool() {
this(DEFAULT_SIZE);
}
public SimpleThreadPool(int size) {
this.size = size;
// 初始化的任务就是新建线程
init();
}
private void init() {
for (int i = 0; i < size; i++) {
createWorkerTask();
}
}
public void submit(Runnable runnable) {
// 取到TASK_QUEUE的锁后添加任务并通知等待对象
synchronized (TASK_QUEUE) {
TASK_QUEUE.addLast(runnable);
TASK_QUEUE.notifyAll();
}
}
private void createWorkerTask() {
WorkerThread task = new WorkerThread(THREAD_PREFIX + (seq++));
task.start();
THREAD_QUEUE.add(task);
}
private enum TaskState {
FREE, RUNNING, BLOCKED, DEAD
}
private static class WorkerThread extends Thread{
// 工作线程初始化状态为FREE状态
private volatile TaskState taskState = TaskState.FREE;
public WorkerThread(String name) {
super(name);
}
@Override
public void run() {
// 只要线程状态不为DEAD就不断运行
while (this.taskState != TaskState.DEAD) {
Runnable runnable;
synchronized (TASK_QUEUE) {
while (TASK_QUEUE.isEmpty()) {
try {
taskState = TaskState.BLOCKED;
TASK_QUEUE.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 从任务队列中取出任务
runnable = TASK_QUEUE.removeFirst();
}
// 有任务就运行
if (runnable != null) {
taskState = TaskState.RUNNING;
runnable.run();
taskState = TaskState.FREE;
}
}
}
}
public static void main(String[] args) {
SimpleThreadPool threadPool = new SimpleThreadPool();
for (int i = 0; i < 40; i++) {
// 没这个就报错
// java: local variables referenced from a lambda expression must be final or effectively final
int finalI = i;
threadPool.submit(() -> {
System.out.println("Task " + finalI + " serviced by " + Thread.currentThread().getName());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Task " + finalI + " serviced by " + Thread.currentThread().getName() + " finished");
});
}
}
}
这是一个非常简单的示例,甚至不包括线程池的停止、任务队列的拒绝策略等常用功能。当然,该示例主要目的是展示线程池的基本思想,足够简单才是好的。
Java中的锁
AbstractQueuedSynchronizer
队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组 件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3 个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操 作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部 类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获 取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、 ReentrantReadWriteLock和CountDownLatch等)。
示例:
class Mutex implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new
IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
sync.acquire(1);
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
sync.release(1);
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
上述示例中,独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有 锁。Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步 状态。在tryAcquire(int acquires)方法中,如果经过CAS设置成功(同步状态设置为1),则代表获 取了同步状态,而在tryRelease(int releases)方法中只是将同步状态重置为0。用户使用Mutex时 并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获 取锁的lock()方法为例,只需要在方法实现中调用同步器的模板方法acquire(int args)即可,当 前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现 一个可靠自定义同步组件的门槛。
实现分析:
同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。
同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。 试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法:compareAndSetTail(Node expect,Node update),它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
通过调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,也就是由于线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
提供一个tryAcquire的实现:
// 当状态为0的时候获取锁 public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:首先调用自定义同步器实现的tryAcquire(int arg)方法,该方法保证线程安全的获取同步状态,如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE,同一时刻只能有一个线程成功获取同步状态)并通过addWaiter(Node node) 方法将该节点加入到同步队列的尾部,最后调用acquireQueued(Node node,int arg)方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}
同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过CAS将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。
节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程)。
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状 态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个,如下。 第一,头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。 第二,维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如图5-4所示。
acquire方法调用流程:
总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列 (或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用tryRelease(int arg)方法释放同步状态,然后唤醒头节点的后继节点。
下面提供一个共享锁的例子,该锁最多允许两个线程获取锁:
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large
than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current,
newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}
// 其他接口方法略
}
ReentrantLock
回忆在上一节中的示例(Mutex),同时考虑如下场景:当一个线程调用Mutex的lock() 方法获取锁之后,如果再次调用lock()方法,则该线程将会被自己所阻塞,原因是Mutex在实现 tryAcquire(int acquires)方法时没有考虑占有锁的线程再次获取锁的场景,而在调用 tryAcquire(int acquires)方法时返回了false,导致该线程被阻塞。简单地说,Mutex是一个不支持 重进入的锁。而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方 法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不像Mutex由于获取了锁,而在下一次获取锁时出现阻塞自己的情况。
以ReentrantLock
非公平获取锁实现为例:
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求ReentrantLock在释放同步状态时减少同步状态值。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
LockSupport
LockSupport定义了一组以park开头的方法用来阻塞当前线程,以及unpark(Thread thread) 方法来唤醒一个被阻塞的线程。Park有停车的意思,假设线程为车辆,那么park方法代表着停车,而unpark方法则是指车辆启动离开。
ConcurrentHashMap
在并发编程中使用HashMap可能导致程序死循环(HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获 取Entry。)。而使用线程安全的HashTable效率又非常低下(HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同步方法时,会进入阻塞或轮询状态。),基于以上两个原因,便有了ConcurrentHashMap的登场机会。
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的线程都必须竞争同一把锁,假如容器里有多把锁,每一把锁用于锁容器其中一部分数据,那么当多线程访问容器里不同数据段的数据时,线程间就不会存在锁竞争,从而可以有效提高并发访问效率,这就是ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数据也能被其他线程访问。
参考
- Java并发编程的艺术
- 高并发编程实战