一 .volatile关键字
volatile是java提供的轻量级的同步机制,主要有三个特性:保证可见性,禁止指令重排,不保证原子性。
JMM(Java 内存模型)
基本概念
JMM本身是一种抽象的概念 并不真实存在,他描述的是一组定义或规范,通过这组规范规定了程序中的访问方式
JMM同步规定:
1.线程解锁前必须把共享变量的值刷回主内存
2.线程加锁前把主内存的值复制到自己的内存
3.加锁和解锁必须是同一把锁。
由于 JVM 运行程序的实体是线程,而每个线程创建时 JVM 都会为其创建一个工作内存,工作内存是每个线程的私有数据区域,而 Java 内存模型中规定所有变量的储存在主内存,主内存是共享内存区域,所有的线程都可以访问,但线程对变量的操作(读取赋值等)必须都工作内存进行看。
首先要将变量从主内存拷贝的自己的工作内存空间,然后对变量进行操作,操作完成后再将变量写回主内存,不能直接操作主内存中的变量,工作内存中存储着主内存中的变量副本拷贝,前面说过,工作内存是每个线程的私有数据区域,因此不同的线程间无法访问对方的工作内存,线程间的通信(传值)必须通过主内存来完成。
1.可见性代码示例
public class VolatileDemo {
static int v1;
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
v1 = 100;
}
}).start();
while (v1 == 0) {
}
System.out.println("v1 = " + v1);
}
}
上面示例无法输出结果,当主线程已经读到v1的值时,如果不加volatile关键字,另一个线程更改这个值不会去通知主线程。所以进入死循环。如果加上volatile关键字,则会刷新主线程的v1的值,打印之后结束主线程。
2.不保证原子性代码示例
public class VolatileDemo {
static volatile int v1;
public static void add(){
v1++;
}
public static void main(String[] args) throws InterruptedException {
for(int i = 0;i< 20;i++) {
new Thread(new Runnable() {
@Override
public void run() {
for(int i = 0;i < 1000; i++){
add();
}
}
}).start();
}
//当活动线程只有主线程和GC时才进行打印否则让位其他线程
while (Thread.activeCount() > 2){
Thread.yield();
}
System.out.println("v1 = " + v1);
}
}
i++并非原子操作,包含三个步骤:1·读取i的值 2·将i的值加一 3·将加一后的值写回i
发现上面示例打印的总是比20000小,说明volatile并不能保证原子性
3.禁止指令重排
指令重排:一般情况下,CPU和编译器为了提升程序执行的效率,会按照一定的规则允许进行指令优化,在某些情况下,这种优化会带来一些执行的逻辑问题,主要的原因是代码逻辑之间是存在一定的先后顺序,在并发执行情况下,会发生二义性,即按照不同的执行逻辑,会得到不同的结果信息。
volatile 实现禁止指令重排序的优化,从而避免了多线程环境下程序出现乱序的现象
先了解一个概念,内存屏障(Memory Barrier)又称内存栅栏,是一个 CPU 指令,他的作用有两个:
1.保证特定操作的执行顺序
2.保证某些变量的内存可见性(利用该特性实现 volatile 的内存可见性)
由于编译器个处理器都能执行指令重排序优化,如果在指令间插入一条 Memory Barrier 则会告诉编译器和 CPU,不管什么指令都不能个这条 Memory Barrier 指令重排序,也就是说通过插入内存屏障禁止在内存屏障前后执行重排序优化。内存屏障另一个作用是强制刷出各种 CPU 缓存数据,因此任何 CPU 上的线程都能读取到这些数据的最新版本。
volatile常见用法(双端检锁单例)
public class Singleton {
private static volatile Singleton singleton;
private Singleton() {
}
public static Singleton getInstance(){
if(singleton == null){
synchronized (Singleton.class){
if(singleton == null){
singleton = new Singleton();
}
}
}
return singleton;
}
}
如果不加volatile多线程环境下存在指令重排的风险,singleton = new Singleton(); 可以分解为三条指令
1.分配内存地址
2.初始化对象
3.将内存地址指向初始化对象
由于指令重排只保证单线程下程序的执行结果,编译可以优化为132的顺序,这样在getInstance方法调用时singleton == null。加上volatile可以避免这个问题。
二.CAS (CompareAndSwap)比较并交换
CAS是一种无锁编程,对比synchronized效率更高。CAS操作包含三个操作数——内存位置(V),预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器将会自动将该位置值更新为新值,否则,不做任何操作。无论哪种情况,它都会在CAS指令之前返回该位置的值。
通过以上定义我们知道CAS其实是有三个步骤的
1.读取内存中的值
2.将读取的值和预期的值比较
3.如果比较的结果符合预期,则写入新值
CAS 体现在 JAVA 语言中就是 sun.misc.Unsafe 类中的各个方法。调用 UnSafe 类中的 CAS 方法,JVM 会帮我们实现出 CAS 汇编指令。这是一种完全依赖硬件的功能,通过它实现了原子操作。由于 CAS 是一种系统源语,源语属于操作系统用语范畴,是由若干条指令组成,用于完成某一个功能的过程,并且原语的执行必须是连续的,在执行的过程中不允许被中断,也就是说 CAS 是一条原子指令,不会造成所谓的数据不一致的问题。
UnSafe类
public class AtomicInteger extends Number implements java.io.Serializable {
private static final long serialVersionUID = 6214790243416807050L;
// setup to use Unsafe.compareAndSwapInt for updates
private static final Unsafe unsafe = Unsafe.getUnsafe();
private static final long valueOffset;
static {
try {
valueOffset = unsafe.objectFieldOffset
(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}
private volatile int value;
/**
* Creates a new AtomicInteger with the given initial value.
*
* @param initialValue the initial value
*/
public AtomicInteger(int initialValue) {
value = initialValue;
}
/**
* Creates a new AtomicInteger with initial value {@code 0}.
*/
public AtomicInteger() {
}
Unsafe 是 CAS 的核心类,由于 Java 方法无法直接访问底层系统,而需要通过本地(native)方法来访问, Unsafe 类相当一个后门,基于该类可以直接操作特定内存的数据。Unsafe 类存在于 sun.misc 包中,其内部方法操作可以像 C 指针一样直接操作内存,因为 Java 中 CAS 操作执行依赖于 Unsafe 类。
变量 vauleOffset,表示该变量值在内存中的偏移量,因为 Unsafe 就是根据内存偏移量来获取数据的。
变量 value 用 volatile 修饰,保证了多线程之间的内存可见性。
AtomicInteger示例代码
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicDemo {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger(2019);
System.out.println(i.compareAndSet(2019, 2020)); //true
System.out.println(i); //2020
AtomicInteger n = new AtomicInteger();
System.out.println(n.compareAndSet(2019, 2020)); //false
System.out.println(n); // 0
}
}
除了JAVA提供的基本类型外还提供了AtuomicRefence 原子引用类,可以处理对象的原子类。
AtomicRefence示例代码
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
public class AtomicDemo {
public static void main(String[] args) {
User user = new User("zhangsan", 18);
AtomicReference atomicReference = new AtomicReference(user);
atomicReference.compareAndSet(user,new User("lisi",20));
System.out.println(atomicReference.get());
}
}
class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
CAS的缺点
1.循环时间长开销很大。如果 CAS 失败,会一直尝试,如果 CAS 长时间一直不成功,可能会给 CPU 带来很大的开销(比如线程数很多,每次比较都是失败,就会一直循环),所以希望是线程数比较小的场景。
2.只能对一个共享变量进行原子操作,多个变量的情况不可用。
3.会出现ABA问题
ABA问题
两个线程修改共享变量,共享变量由A改为B,又由B改为A。此时另一个线程并不知道情况,以为共享变量的值没有改变,将共享变量的值修改。
代码示例
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicABADemo {
public static void main(String[] args) {
AtomicInteger i = new AtomicInteger();
new Thread(new Runnable() {
@Override
public void run() {
i.compareAndSet(0,1);
System.out.println("i的值由0改为1");
i.compareAndSet(1,0);
System.out.println("i的值由1改为0");
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
i.compareAndSet(0,2019);
System.out.println("ABA问题出现——》i的值由0改为2019");
}
}).start();
}
}
解决方案
java提供了一个带有版本号的原子引用类AtomicStampedRefence,实际上就是一个乐观锁。
代码示例
import java.util.concurrent.atomic.AtomicStampedReference;
public class AtomicDemo {
public static void main(String[] args) {
User user = new User("zhangsan",18);
// 传入初始对象和版本号
AtomicStampedReference atomicStampedReference = new AtomicStampedReference(user,0);
User user1 = new User("lisi",20);
new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(user,user1,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("版本号为" + atomicStampedReference.getStamp() + ",User对象信息为" + atomicStampedReference.getReference());
atomicStampedReference.compareAndSet(user1,user,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
System.out.println("版本号为" + atomicStampedReference.getStamp() + ",User对象信息为" + atomicStampedReference.getReference());
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
int stamp = atomicStampedReference.getStamp();
User user2 = new User("wangwu",28);
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
atomicStampedReference.compareAndSet(user,user2,stamp,stamp++);
System.out.println("版本号为" + atomicStampedReference.getStamp() + ",User对象信息为" + atomicStampedReference.getReference());
}
}).start();
}
}
class User {
private String name;
private int age;
public User(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
", age=" + age +
'}';
}
}
三 .Java中的锁
1.公平锁与非公平锁
公平锁:是指多个线程按照申请的顺序来获取值
非公平锁:是值多个线程获取值的顺序并不是按照申请锁的顺序,有可能后申请的线程比先申请的线程优先获取锁,在高并发的情况下,可能会造成优先级翻转或者饥饿现象
两者区别
公平锁:在并发环境中,每一个线程在获取锁时会先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个就占有锁,否者就会加入到等待队列中,以后会按照 FIFO 的规则获取锁
非公平锁:一上来就尝试占有锁,如果失败在进行排队
代码示例
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
ReentrantLock的构造方法中可传入一个boolean值,表示是否是公平锁。默认为非公平锁。 synchronized是一种非公平锁。
2.可重入锁(递归锁)和不可重入锁
可重入锁:指的是同一个线程外层函数获得锁之后,内层仍然能获取到该锁,在同一个线程在外层方法获取锁的时候,在进入内层方法或会自动获取该锁。
不可重入锁: 若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。
代码示例
public class ReentrantLockDemo {
public static void main(String[] args) {
method1();
}
public static synchronized void method1(){
System.out.println("method1执行");
method2();
}
public static synchronized void method2(){
System.out.println("method2执行");
}
}
在main方法中调用method1()发现两个方法都执行了,说明synchronized是可重入锁。ReentrantLock也是可重入锁。
3.自旋锁 类似CAS
尝试获取锁的线程不会立即堵塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上线文切换的消耗,缺点就是循环会消耗 CPU。
代码示例
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockTest {
public static void main(String[] args) {
SpinLock spinLock = new SpinLock();
new Thread(new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("------------");
spinLock.unlock();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
spinLock.lock();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("++++++++++++");
spinLock.unlock();
}
}).start();
}
}
class SpinLock {
private AtomicReference<Thread> atomicReference = new AtomicReference();
public void lock() {
Thread thread = Thread.currentThread();
while (!atomicReference.compareAndSet(null, thread)) {
}
System.out.println(thread.getName() + "获取锁");
}
public void unlock() {
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread, null);
System.out.println(thread.getName() + "释放锁");
}
}
/*
Thread-0获取锁
------------
Thread-0释放锁
Thread-1获取锁
++++++++++++
Thread-1释放锁
*/
输出结果表示加锁成功。获取锁的时候,如果原子引用为空就获取锁,不为空表示有人获取了锁,就循环等待。
4.独占锁与共享锁(读写锁)
独占锁:指该锁一次只能被一个线程持有
共享锁:该锁可以被多个线程持有
Java中的ReentrantLock和synchronized都是独占锁。ReentrantReadWriteLock中的ReadLock是共享锁,WriteLock是独占锁。多个线程可以同时持有ReadLock,读写·写读·写写都是互斥的。
代码示例
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class ReentrantReadWriteLockTest {
public static void main(String[] args) {
ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock();
List list = new ArrayList();
for(int i = 0;i < 20 ; i++){
new Thread(new Runnable() {
@Override
public void run() {
readWriteLock.writeLock().lock();
System.out.println(Thread.currentThread() + "写开始");
list.add(Math.round(Math.random()*100));
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "写结束");
readWriteLock.writeLock().unlock();
}
}).start();
}
for(int i = 0;i < 20 ; i++){
new Thread(new Runnable() {
@Override
public void run() {
readWriteLock.readLock().lock();
System.out.println(Thread.currentThread() + "读开始");
System.out.println(list);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread() + "读结束");
readWriteLock.readLock().unlock();
}
}).start();
}
}
}
5.synchronized 和 Lock
原始结构
synchronized 是关键字属于 JVM 层面,反应在字节码上是 monitorenter 和 monitorexit,其底层是通过 monitor 对象来完成,其实 wait/notify 等方法也是依赖 monitor 对象只有在同步快或方法中才能调用 wait/notify 等方法。
Lock 是具体类(java.util.concurrent.locks.Lock)是 api 层面的锁。
使用方法
synchronized 不需要用户手动去释放锁,当 synchronized 代码执行完后系统会自动让线程释放对锁的占用。
ReentrantLock 则需要用户手动的释放锁,若没有主动释放锁,可能导致出现死锁的现象,lock() 和 unlock() 方法需要配合 try/finally 语句来完成。
等待是否可中断
synchronized 不可中断,除非抛出异常或者正常运行完成。
ReentrantLock 可中断,设置超时方法 tryLock(long timeout, TimeUnit unit),lockInterruptibly() 放代码块中,调用 interrupt() 方法可中断。
加锁是否公平
synchronized 非公平锁
ReentrantLock 默认非公平锁,构造方法中可以传入 boolean 值,true 为公平锁,false 为非公平锁。
锁可以绑定多个 Condition
synchronized 没有 Condition。
ReentrantLock 用来实现分组唤醒需要唤醒的线程们,可以精确唤醒,而不是像 synchronized 要么随机唤醒一个线程要么唤醒全部线程。
类别 | synchronized | Lock |
---|---|---|
存在层次 | Java的关键字,在jvm层面上 | 是一个类 |
锁的释放 | 1、以获取锁的线程执行完同步代码,释放锁 2、线程执行发生异常,jvm会让线程释放锁 | 在finally中必须释放锁,不然容易造成线程死锁 |
锁的获取 | 假设A线程获得锁,B线程等待。如果A线程阻塞,B线程会一直等待 | 分情况而定,Lock有多个锁获取的方式,具体下面会说道,大致就是可以尝试获得锁,线程可以不用一直等待 |
锁状态 | 无法判断 | 可以判断 |
锁类型 | 可重入 不可中断 非公平 | 可重入 可判断 可公平(两者皆可) |
性能 | 少量同步 | 大量同步 |
JDK1.6以后,为了减少获得锁和释放锁所带来的性能消耗,提高性能,引入了“轻量级锁”和“偏向锁”。官方更建议使用synchronized。详情见Java中的偏向锁,轻量级锁, 重量级锁解析
代码示例:synchronized实现生产消费模型
import java.util.concurrent.TimeUnit;
public class ProdConsume_Synchronized {
private int count = 0;
public static final int FULL = 10;
private volatile boolean flag = true;
private Object lock;
public ProdConsume_Synchronized(Object lock) {
this.lock = lock;
}
public static void main(String[] args) {
Object lock = new Object();
ProdConsume_Synchronized prodConsume_synchronized = new ProdConsume_Synchronized(lock);
new Thread(new Runnable() {
@Override
public void run() {
try {
prodConsume_synchronized.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
prodConsume_synchronized.prod();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_synchronized.flag = false;
}
public void prod() throws InterruptedException {
while (flag) {
synchronized (lock) {
while (count == FULL) {
lock.wait();
}
count++;
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
System.out.println(round + "号商品生产完毕,还有" + count + "个商品");
lock.notifyAll();
}
}
}
public void consume() throws InterruptedException {
while (flag) {
synchronized (lock) {
while (count == 0) {
lock.wait();
}
count--;
Thread.sleep(500);
System.out.println("取走一个商品,还有" + count + "个商品");
lock.notifyAll();
}
}
}
}
输出结果
643号商品生产完毕,还有1个商品
322号商品生产完毕,还有2个商品
819号商品生产完毕,还有3个商品
877号商品生产完毕,还有4个商品
112号商品生产完毕,还有5个商品
904号商品生产完毕,还有6个商品
978号商品生产完毕,还有7个商品
569号商品生产完毕,还有8个商品
949号商品生产完毕,还有9个商品
661号商品生产完毕,还有10个商品
取走一个商品,还有9个商品
取走一个商品,还有8个商品
取走一个商品,还有7个商品
取走一个商品,还有6个商品
取走一个商品,还有5个商品
取走一个商品,还有4个商品
取走一个商品,还有3个商品
124号商品生产完毕,还有4个商品
代码示例:ReentrantLock实现生产消费模型
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProdConsume_ReentrantLock {
private int count = 0;
public static final int FULL = 10;
private volatile boolean flag = true;
private Lock lock = new ReentrantLock();
private Condition condition_prod = lock.newCondition();
private Condition condition_consume = lock.newCondition();
public static void main(String[] args) {
ProdConsume_ReentrantLock prodConsume_reentrantLock = new ProdConsume_ReentrantLock();
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_reentrantLock.prod();
}
}).start();
}
for (int i = 0; i < 3; i++) {
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_reentrantLock.consume();
}
}).start();
}
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_reentrantLock.flag = false;
}
public void prod() {
while (flag) {
try {
lock.lock();
try {
while (count == FULL) {
condition_prod.await();
}
count++;
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
System.out.println(round + "号商品生产完毕,还有" + count + "个商品");
condition_consume.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
public void consume() {
while (flag) {
try {
lock.lock();
try {
while (count == 0) {
condition_consume.await();
}
count--;
Thread.sleep(500);
System.out.println("取走一个商品,还有" + count + "个商品");
condition_prod.signalAll();
} catch (Exception e) {
e.printStackTrace();
}
} finally {
lock.unlock();
}
}
}
}
代码示例:ReentrantLock实现精准唤醒
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrecisionWeakUp_ReentrantLock {
private final Lock lock = new ReentrantLock();
private Condition conditionA = lock.newCondition();
private Condition conditionB = lock.newCondition();
private Condition conditionC = lock.newCondition();
private int flag = 1;
// 多线程下先输出五次A 再输出五次B 再输出五次C
public static void main(String[] args) {
PrecisionWeakUp_ReentrantLock precisionWeakUp_reentrantLock = new PrecisionWeakUp_ReentrantLock();
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printA();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printB();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 0;i < 3;i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
precisionWeakUp_reentrantLock.printC();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}
public void printA() throws InterruptedException {
try {
lock.lock();
while (flag != 1) {
conditionA.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("A");
}
flag = 2;
conditionB.signalAll();
}finally {
lock.unlock();
}
}
public void printB() throws InterruptedException {
try {
lock.lock();
while (flag != 2) {
conditionB.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("B");
}
flag = 3;
conditionC.signalAll();
}finally {
lock.unlock();
}
}
public void printC() throws InterruptedException {
try {
lock.lock();
while (flag != 3) {
conditionC.await();
}
for (int i = 0; i < 5; i++) {
System.out.println("C");
}
flag = 1;
conditionA.signalAll();
}finally {
lock.unlock();
}
}
}
四 .JUC包的并发工具类
1.CountDownLatch
CountDownLatch中count down是倒数的意思,latch则是门闩的含义。整体含义可以理解为倒数的门栓。在构造CountDownLatch的时候需要传入一个整数n,在这个整数“倒数”到0之前,主线程需要等待在门口,而这个“倒数”过程则是由各个执行线程驱动的,每个线程执行完一个任务“倒数”一次。总结来说,CountDownLatch的作用就是等待其他的线程都执行完任务,必要时可以对各个任务的执行结果进行汇总,然后主线程才继续往下执行。
CountDownLatch主要有两个方法:countDown()和await()。countDown()方法用于使计数器减一,其一般是执行任务的线程调用,await()方法则使调用该方法的线程处于等待状态,其一般是主线程调用。这里需要注意的是,countDown()方法并没有规定一个线程只能调用一次,当同一个线程调用多次countDown()方法时,每次都会使计数器减一;另外,await()方法也并没有规定只能有一个线程执行该方法,如果多个线程同时执行await()方法,那么这几个线程都将处于等待状态,并且以共享模式享有同一个锁。
代码示例
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class CountDownLatchDemo {
public static void main(String[] args) {
List list = new ArrayList();
List synchronizedList = Collections.synchronizedList(list);
CountDownLatch countDownLatch = new CountDownLatch(5);
for(int i = 0;i < 5;i++){
new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
long round = Math.round(Math.random() * 100);
synchronizedList.add(round);
System.out.println(round + "添加进List");
countDownLatch.countDown();
}
}).start();
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(synchronizedList);
}
}
输出结果
68添加进List
91添加进List
57添加进List
12添加进List
98添加进List
[68, 57, 12, 98, 91]
CountDownLatch非常适合于对任务进行拆分,使其并行执行,比如某个任务执行2s,其对数据的请求可以分为五个部分,那么就可以将这个任务拆分为5个子任务,分别交由五个线程执行,执行完成之后再由主线程进行汇总,此时,总的执行时间将决定于执行最慢的任务,平均来看,还是大大减少了总的执行时间。
2. CyclicBarrier
CyclicBarrier同步屏障,可以让一组线程达到一个屏障时被阻塞,直到最后一个线程达到屏障时,所以被阻塞的线程才能继续执行。
CyclicBarrier好比一扇门,默认情况下关闭状态,堵住了线程执行的道路,直到所有线程都就位,门才打开,让所有线程一起通过。
CyclicBarrier的构造方法
1.CyclicBarrier(int parties):创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动 barrier 时执行预定义的操作。
2.CyclicBarrier(int parties, Runnable barrierAction) :创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动 barrier 时执行给定的屏障操作,该操作由最后一个进入 barrier 的线程执行。
代码示例
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
//景区观光车循环发车,每一辆车一个五个座位,坐满发车
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("人员已到位,出发");
}
});
for(int i = 1;i <= 5;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(n + "号游客上车");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
for(int i = 6;i <= 10;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
System.out.println(n + "号游客上车");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
输出结果
1号游客上车
3号游客上车
2号游客上车
5号游客上车
4号游客上车
人员已到位,出发
6号游客上车
7号游客上车
8号游客上车
9号游客上车
10号游客上车
人员已到位,出发
每当线程执行await,内部变量count减1,如果count!= 0,说明有线程还未到屏障处,则在锁条件变量trip上等待。
当count == 0时,说明所有线程都已经到屏障处,执行条件变量的signalAll方法唤醒等待的线程。
CountDownLatch与CyclicBarrier比较
CountDownLatch | CyclicBarrier |
---|---|
减计数方式 | 加计数方式 |
计算为0时释放所有等待的线程 | 计数达到指定值时释放所有等待线程 |
计数为0时,无法重置 | 计数达到指定值时,计数置为0重新开始 |
调用countDown()方法计数减一,调用await()方法只进行阻塞,对计数没任何影响 | 调用await()方法计数加1,若加1后的值不等于构造方法的值,则线程阻塞 |
不可重复利用 | 可重复利用 |
3.Semaphore
ReentrantLock和Synchronized一次都只允许一个线程访问一个资源。Semaphore允许多个线程同时访问同一个资源。
Semaphore管理着一组许可(permit),许可的初始数量可以通过构造函数设定,操作时首先要获取到许可,才能进行操作,操作完成后需要释放许可。如果没有获取许可,则阻塞到有许可被释放。如果初始化了一个许可为1的Semaphore,那么就相当于一个不可重入的互斥锁。其中0、1就相当于它的状态,当=1时表示其他线程可以获取,当=0时,排他,即其他线程必须要等待。
Semaphore的构造方法
1.Semaphore(int permits) :创建具有给定的许可数和非公平的公平设置的 Semaphore,默认非公平锁。
2.Semaphore(int permits, boolean fair) :创建具有给定的许可数和给定的公平设置的 Semaphore。
代码示例
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
// 六个车抢三个车位
Semaphore semaphore = new Semaphore(3);
for(int i = 1;i <= 6;i++){
final int n = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println(n + "号抢到车位——————————");
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(n + "号离开车位++++++++++");
semaphore.release();
}
}
}).start();
}
}
}
输出结果
1号抢到车位——————————
2号抢到车位——————————
3号抢到车位——————————
2号离开车位++++++++++
1号离开车位++++++++++
3号离开车位++++++++++
4号抢到车位——————————
6号抢到车位——————————
5号抢到车位——————————
6号离开车位++++++++++
4号离开车位++++++++++
5号离开车位++++++++++
Semaphore在限制流量方面有非常多的应用,比如程序跑批高峰时几万个数据库的连接同时操作,为了不影响其他用户访问只允许同时开放十条连接。
四.线程安全的集合类
Java中的集合包括三大类,它们是Set、List和Map它们都处于java.util包中,Set、List和Map都是接口,它们有各自的实现类。
1.List(列表)
实现类主要有ArrayList,LinkedList,Vector
ArrayList,LinkedList为线程不安全的集合类,ArrayList底层是数组而LinkedList底层实现为链表。Vector和ArrayList类似,是长度可变的数组。Vector是线程安全的,它给几乎所有的public方法都加上了synchronized关键字。由于加锁导致性能降低,在不需要并发访问同一对象时,这种强制性的同步机制就显得多余,所以现在Vector已被弃用。
2.Set(集)
实现类主要有HashSet,TreeSet
HashSet是一个无序的集合,基于HashMap实现;TreeSet是一个有序的集合,基于TreeMap实现。HashSet集合中允许有null元素,TreeSet集合中不允许有null元素。HashSet和TreeSet都是线程不安全的。
3.Map(映射)
实现类主要有HashMap,TreeMap,HashTable
HashTable和HashMap类似,不同点是HashTable是线程安全的,它给几乎所有public方法都加上了synchronized关键字,还有一个不同点是HashTable的K,V都不能是null,但HashMap可以,它现在也因为性能原因被弃用。TreeMap也是线程不安全的。
4.除废弃的集合类外还有哪些方法可以保证线程安全
1.Collections包装方法
Collections工具类中提供了相应的包装方法把它们包装成线程安全的集合
List<E> synArrayList = Collections.synchronizedList(new ArrayList<E>());
Set<E> synHashSet = Collections.synchronizedSet(new HashSet<E>());
Map<K,V> synHashMap = Collections.synchronizedMap(new HashMap<K,V>());
2.java.util.concurrent包中的集合
CopyOnWriteArrayList和CopyOnWriteArraySet
CopyOnWriteArrayList 中的set、add、remove等方法,都使用了ReentrantLock的lock来加锁, unlock来解锁当增加元素的时候使用Arrays.copyOf()来拷贝副本,在副本上增加元素,然后改变原来引用的指向副本。读操作不需要加锁,因此,CopyOnWriteArrayList类是一个线程安全的List接口实现,这对于读操作远远多于写操作的应用非常适合,特别是在并发的情况下,可以提供高性能的并发读取,并保证读取的内容一定是正确的,不受多线程并发问题的影响。
ConcurrentHashMap
1.8版本的ConcurrentHashMap抛弃了原有的 Segment 分段锁,而采用了 CAS + synchronized 来保证并发安全性。详情见HashMap? ConcurrentHashMap? 相信看完这篇没人能难住你!
5.CopyOnWrite机制
CopyOnWrite容器即写是复制的容器。通俗的理解就是当我们往一个容器添加元素的时候,不直接往当前容器添加,而是先将当前容器进行Copy,复制出一个新的容器,然后新的容器里添加元素,添加完元素后,再将原容器的引用指向新的容器。这样做的好处就是我们可以对CopyOnWrite容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素.所以,CopyOnWrite容器也是一种读写分离的思想。读和写不容的容器。
ArrayList里添加元素,在添加的时候是需要加锁的,否则多线程写的时候会copy出多个副本出来
读的时候不需要加锁,如果读的时候有多线程正在像ArrayList中添加数据,还是会读到旧的数据,因为写的时候不会锁住旧的ArrayList
1、使用场景:读多写少
2、使用注意点:减少扩容开销;b、使用批量添加
缺点:
a、内存占用问题
b、数据一致性问题
五.BlockingQueue(阻塞队列)
在某些情况下对阻塞队列的访问可能会造成阻塞。被阻塞的情况主要有如下两种:
1、当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
2、当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
当一个线程对已经满了的阻塞队列进行入队操作时会阻塞,除非有另外一个线程进行了出队操作,当一个线程对一个空的阻塞队列进行出队操作时也会阻塞,除非有另外一个线程进行了入队操作。
操作 | 抛异常 ThrowsException | 特定值 SpecialValue | 阻塞 Blocks | 超时 TimesOut |
---|---|---|---|---|
插入 | add(o) | offer(o) | put(o) | offer(o, timeout, timeunit) |
移除 | remove(o) | poll(o) | take(o) | poll(timeout, timeunit) |
检查 | element(o) | peek(o) |
这四类方法分别对应的是:
1、ThrowsException :如果操作不能马上进行,则抛出异常
2、SpecialValue :如果操作不能马上进行,将会返回一个特殊的值,一般是true或者false
3、Blocks : 如果操作不能马上进行,操作会被阻塞
4、TimesOut : 如果操作不能马上进行,操作会被阻塞指定的时间,如果指定时间没执行,则返回一个特殊值,一般是true或者false
插入方法
add(E e):添加成功返回true,失败抛 IllegalStateException 异常
offer(E e):成功返回 true,如果此队列已满,则返回 false
put(E e):将元素插入此队列的尾部,如果该队列已满,则一直阻塞
删除方法
remove(Object o) :移除指定元素,成功返回true,失败返回false
poll():获取并移除此队列的头元素,若队列为空,则返回 null
take():获取并移除此队列头元素,若没有元素则一直阻塞
检查方法
element() :获取但不移除此队列的头元素,没有元素则抛异常
peek() :获取但不移除此队列的头;若队列为空,则返回 null
BlockingQueue的七个实现类
1、ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
2、LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
3、PriorityBlockingQueue :一个支持优先级排序的无界阻塞队列。
4、DelayQueue:一个使用优先级队列实现的无界阻塞队列。
5、SynchronousQueue:一个不存储元素的阻塞队列。
6、LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
7、LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
注意:第七个实现类的末尾为Deque
详细信息Java并发编程-阻塞队列(BlockingQueue)的实现原理
代码示例:BlockingQueue实现生产消费模型
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class ProdConsume_BlockingQueue {
private AtomicInteger atomicInteger = new AtomicInteger();
private BlockingQueue blockingQueue;
private volatile boolean flag = true;
public ProdConsume_BlockingQueue(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void prod() {
while (flag) {
try {
long round = Math.round(Math.random() * 1000);
Thread.sleep(round);
blockingQueue.put(round);
int i = atomicInteger.incrementAndGet();
System.out.println(round + "号商品生产完毕放入队列,队列中还有" + i + "个商品");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void consum() {
while (flag) {
try {
Thread.sleep(500);
Object take = blockingQueue.take();
atomicInteger.decrementAndGet();
System.out.println(take + "号商品被购买");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
BlockingQueue blockingQueue = new ArrayBlockingQueue(100);
ProdConsume_BlockingQueue prodConsume_blockingQueue = new ProdConsume_BlockingQueue(blockingQueue);
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_blockingQueue.prod();
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
prodConsume_blockingQueue.consum();
}
}).start();
try {
TimeUnit.SECONDS.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
prodConsume_blockingQueue.flag = false;
}
}
输出结果
180号商品生产完毕放入队列,队列中还有1个商品
180号商品被购买
331号商品生产完毕放入队列,队列中还有1个商品
201号商品生产完毕放入队列,队列中还有2个商品
331号商品被购买
201号商品被购买
897号商品生产完毕放入队列,队列中还有1个商品
897号商品被购买
588号商品生产完毕放入队列,队列中还有1个商品
217号商品生产完毕放入队列,队列中还有2个商品
588号商品被购买
154号商品生产完毕放入队列,队列中还有2个商品
217号商品被购买
592号商品生产完毕放入队列,队列中还有2个商品
154号商品被购买
442号商品生产完毕放入队列,队列中还有2个商品
592号商品被购买
712号商品生产完毕放入队列,队列中还有2个商品
442号商品被购买
712号商品被购买
893号商品生产完毕放入队列,队列中还有1个商品
15号商品生产完毕放入队列,队列中还有2个商品
893号商品被购买
496号商品生产完毕放入队列,队列中还有2个商品
4号商品生产完毕放入队列,队列中还有3个商品
15号商品被购买
534号商品生产完毕放入队列,队列中还有3个商品
172号商品生产完毕放入队列,队列中还有4个商品
496号商品被购买
4号商品被购买
891号商品生产完毕放入队列,队列中还有3个商品
534号商品被购买
414号商品生产完毕放入队列,队列中还有3个商品
172号商品被购买
891号商品被购买
920号商品生产完毕放入队列,队列中还有2个商品
71号商品生产完毕放入队列,队列中还有3个商品
144号商品生产完毕放入队列,队列中还有4个商品
414号商品被购买
586号商品生产完毕放入队列,队列中还有4个商品
920号商品被购买
71号商品被购买
553号商品生产完毕放入队列,队列中还有3个商品
五.实现多线程的几种方式
1. 继承Thread类,重写run方法
略
2. 实现Runnable接口,重写run方法
略
3. 实现Callable接口,重写call方法,通过FutureTask包装器来创建Thread线程
Runnable和Callable的区别:
1、Callable规定的方法是call(),Runnable规定的方法是run().
2、Callable的任务执行后可返回值,而Runnable的任务是不能返回值得
3、call方法可以抛出异常,run方法不可以
4、运行Callable任务可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并检索计算的结果。通过Future对象可以了解任务执行情况,可取消任务的执行,还可获取执行结果。
Future接口
Future是一个接口,代表了一个异步计算的结果。接口中的方法用来检查计算是否完成、等待完成和得到计算的结果。
当计算完成后,只能通过get()方法得到结果,get方法会阻塞直到结果准备好了。
如果想取消,那么调用cancel()方法。其他方法用于确定任务是正常完成还是取消了。一旦计算完成了,那么这个计算就不能被取消。
FutureTask类
FutureTask类实现了RunnableFuture接口,而RunnnableFuture接口继承了Runnable和Future接口,所以说FutureTask是一个提供异步计算的结果的任务。
FutureTask可以用来包装Callable或者Runnbale对象。因为FutureTask实现了Runnable接口,所以FutureTask也可以被提交给Executor。
Callable两种执行方式
1、借助FutureTask执行
FutureTask类同时实现了两个接口,Future和Runnable接口,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值。
代码示例
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
public class CallableDemo {
public static void main(String[] args) {
FutureTask<Integer> futureTask = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Callable");
Thread.sleep(1000);
return (int)(Math.random()*100);
}
});
futureTask.run();
//如果没有执行完一直阻塞
while (!futureTask.isDone()){
}
Integer integer = null;
try {
integer = futureTask.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(integer);
}
}
2、借助线程池来运行
↓
4. 线程池ThreadPoolExecuter
↓
六.线程池ThreadPoolExecutor
线程池主要是控制运行线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。
主要特点是:线程复用、控制最大并发数、管理线程。
1.降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
2.提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
3.提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
ThreadPoolExecutor的构造方法
7个参数的构造方法 代码示例
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
序号 | 名称 | 类型 | 含义 |
---|---|---|---|
1 | corePoolSize | int | 线程池中的常驻核心线程数 |
2 | maximumPoolSize | int | 线程池能够容纳同时执行的最大线程数 |
3 | keepAliveTime | long | 多余空闲线程的存活时间 |
4 | unit | TimeUnit | keepAliveTime的单位 |
5 | workQueue | BlockingQueue | 被提交但尚未被执行的任务队列 |
6 | threadFactory | ThreadFactory | 线程池中工作线程的线程工厂 |
7 | handler | RejectedExecutionHandler | 拒绝策略 |
1. int corePoolSize:线程池中的常驻核心线程数
核心线程:线程池新建线程的时候,如果当前线程总数小于corePoolSize,则新建的是核心线程,如果超过corePoolSize,则新建的是非核心线程核心线程默认情况下会一直存活在线程池中,即使这个核心线程是闲置状态。
如果指定ThreadPoolExecutor的allowCoreThreadTimeOut这个属性为true,那么核心线程如果为闲置状态,超过一定时间(时长下面参数决定),就会被销毁掉。
2.int maximumPoolSize:线程池能够容纳同时执行的最大线程数
maximumPoolSize此值必须大于等于1.
maximumPoolSize = corePoolSize + 非核心线程数(可缓冲的线程数)。
3.long keepAliveTime:多余空闲线程的存活时间
当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止。
默认情况下,只有当线程池中的线程数大于corePoolSize时keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize。
4.TimeUnit unit:keepAliveTime的单位
5.BlockingQueue workQueue:被提交但尚未被执行的任务队列
当所有的核心线程都在工作时,新添加的任务会被添加到这个队列中等待处理,如果队列满了,则新建非核心线程执行任务。
6.ThreadFactory threadFactory:创建线程的方式。
用于创建线程,一般用默认的即可。
7.RejectedExecutionHandler handler:拒绝策略
当提交任务数超过maxmumPoolSize+workQueue之和时,任务会交给RejectedExecutionHandler来处理。
jdk1.5提供的四种拒绝策略 :
1.AbortPolicy(默认):直接抛出RejectedExecutionException异常阻止系统正常运行。
2.CallerRunsPolicy:“调用者运行”一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。
3.DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
4.DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
以上内置拒绝策略均实现了RejectedExecutionHandler接口
线程池的处理流程
线程池判断核心线程池里是的线程是否都在执行任务,如果不是(正在执行的线程数 小于 corePoolSize),则创建一个新的工作线程来执行任务。如果核心线程池里的线程都在执行任务(正在执行的线程数 大于 corePoolSize),则进入下一个流程
线程池判断工作队列是否已满。如果工作队列没有满,则将新提交的任务储存在这个工作队列里。如果工作队列满了,则进入下一个流程。
线程池判断其内部线程是否都处于工作状态。如果没有正在(运行的线程数量小于maximumPoolSize),则创建一个新的工作线程来执行任务。如果已满了(运行的线程数量 大于 maximumPoolSize),则交给饱和策略来处理这个任务。
当一个线程完成任务时,它会从队列中取下一个任务来执行。
当一个线程无事可做超过一定的时间(keepAlilveTime)时,线程池会判断:如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。线程池的所有任务完成后最终会收缩到corePoolSize的大小。
线程池执行时的四种情况
如果当前运行的线程少于corePoolSize,则创建新线程来执行任务
如果运行的线程等于或多于corePoolSize ,则将任务加入BlockingQueue
如果无法将任务加入BlockingQueue(队列已满),则创建新的线程来处理任务
如果创建新线程将使当前运行的线程超出maxiumPoolSize,任务将被拒绝,并调用RejectedExecutionHandler.rejectedExecution()方法。
Java提供的线程池
Java在Executors工具类中提供了5种线程池
1. SingleThreadExecutor 单一线程池
它只会创建一条工作线程处理任务;
采用的阻塞队列为LinkedBlockingQueue;
2.FixedThreadPool 定长线程池
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
它是一种固定大小的线程池;
corePoolSize和maximunPoolSize都为用户设定的线程数量nThreads;
keepAliveTime为0,意味着一旦有多余的空闲线程,就会被立即停止掉;但这里keepAliveTime无效;
阻塞队列采用了LinkedBlockingQueue,它是一个无界队列;
由于阻塞队列是一个无界队列,因此永远不可能拒绝任务;
由于采用了无界队列,实际线程数量将永远维持在nThreads,因此maximumPoolSize和keepAliveTime将无效。
3. CachedThreadPool 可缓存线程池
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
它是一个可以无限扩大的线程池;
它比较适合处理执行时间比较小的任务;
corePoolSize为0,maximumPoolSize为无限大,意味着线程数量可以无限大;
keepAliveTime为60S,意味着线程空闲时间超过60S就会被杀死;
采用SynchronousQueue装等待的任务,这个阻塞队列没有存储空间,这意味着只要有请求到来,就必须要找到一条工作线程处理他,如果当前没有空闲的线程,那么就会再创建一条新的线程。
4. ScheduledThreadPool 可调度的线程池
它用来处理延时任务或定时任务。
它接收SchduledFutureTask类型的任务,有两种提交任务的方式:
scheduledAtFixedRate
scheduledWithFixedDelay
SchduledFutureTask接收的参数:
time:任务开始的时间
sequenceNumber:任务的序号
period:任务执行的时间间隔
它采用DelayQueue存储等待的任务
DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若time相同则根据sequenceNumber排序;
DelayQueue也是一个无界队列;
工作线程的执行过程:
工作线程会从DelayQueue取已经到期的任务去执行;
执行结束后重新设置任务的到期时间,再次放回DelayQueue
5. newWorkStealingPool Java8新增,使用可用的处理器作为它的并行级别
待补充
生产上应该使用哪种线程池
在阿里巴巴Java开发手册并发处理章节中严禁使用Java提供的线程池,所以生产上只能使用自定义的线程池。
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。 说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
【强制】线程池不允许使用Executors去创建,而是通过ThreadPoolExecutor的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险。说明:Executors返回的线程池对象的弊端如下:
1)FixedThreadPool和SingleThreadPool:允许的请求队列长度为Integer.MAX_VALUE,可能会堆积大量的请求,从而导致OOM。
2)CachedThreadPool和ScheduledThreadPool:允许的创建线程数量为Integer.MAX_VALUE,可能会创建大量的线程,从而导致OOM。
向线程池提交任务
1.void execute(Runnable command)
用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功
ExecutorService executorService = Executors.newFixedThreadPool(5);
executorService.execute(() -> System.out.println(Thread.currentThread().getName()));
2.<T> Future<T> submit(Callable<T> task)
用于提交需要返回值的任务
Future<Integer> future = executorService.submit(() -> (int) Math.random());
Integer i = future.get();
System.out.println(i);
关闭线程池
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),它们的原理是遍历线程池中的工作线程,然后逐个调用线程的interrupt方法来中断线程,但这两种方式对于正在执行的线程处理方式不同。
1.shutdown()
仅停止阻塞队列中等待的线程,那些正在执行的线程就会让他们执行结束。
2.shutdownNow()
不仅会停止阻塞队列中的线程,而且会停止正在执行的线程。
合理配置线程池
CPU 密集型
CPU 密集的意思是该任务需要大量的运算,而没有阻塞,CPU 一直全速运行。
CPU 密集型任务尽可能的少的线程数量,一般为 CPU 核数 + 1 个线程的线程池。
IO 密集型
由于 IO 密集型任务线程并不是一直在执行任务,可以多分配一点线程数,如 CPU * 2 。
也可以使用公式:CPU 核数 / (1 - 阻塞系数);其中阻塞系数在 0.8 ~ 0.9 之间。