一、并发
进程:每个进程都拥有自己的一套变量
线程:线程之间共享数据
1.线程
Java中为多线程任务提供了很多的类。包括最基础的Thread类、Runnable等接口,用于线程同步的锁、阻塞队列、同步器,使用线程池的执行器、执行框架,还有可以在多线程中使用的线程安全集合等。
(1)使用多线程给其他任务提供机会
创建线程:
- 将任务代码移到实现了Runnable接口的类的run方法中,这个接口非常简单,只有一个方法:
public interface Runnable{
void run();
}
- 由Runnable创建一个Thread对象
Thread t = new Thread(r);
- 启动线程:
t.start();
注意:不要调用Thread或Runnable对象中的run方法,只会执行同一个线程中的任务,不会启动新的线程,应该使用Thread的start方法。
二、中断线程
没有终止线程的方法,只能通过interrupt方法来请求中断。
Thread或Runnable对象的run()方法包装了新线程中执行的代码,在run()方法中遇到下面的情况,线程会终止。
- 正常终止。执行完最后一条语句,也包括遇到return返回
- 异常终止。出现未捕获的异常
强制结束:
- 调用Thread对象的stop()方法。抛出一个ThreadDeath异常,停止线程执行(这个异常如果被捕获一定要重新抛出)。这个方法已经不推荐使用,原因是线程可能停止在一个不安全的状态(例如转账操作,从一个账号减了钱,还没加到另一个账号,线程被强制结束了),应该使用请求中断的方式。
- 请求中断方式。要结束一个线程,就设置该线程的中断变量(调用Thread对象的interrupt()方法)(表明着有人想要中断这个线程),线程中的代码自己要负责查询中断变量(Thread类静态方法interrupted()或Thread对象的isInterrupted()方法),如果发现中断变量被设置了就自觉点不要再执行了,恢复到安全的状态后自行退出。请求中断不是强制的,如果线程中的代码不查询中断变量,或者发现中断变量已经被设置了但是不理会继续厚着脸皮执行,这个线程还是会一直运行不会被停止。
//栗子
public class InterruptTest {
public static void main(String[] args) throws InterruptedException {
MyThread t = new MyThread("MyThread");
t.start();
Thread.sleep(100);// 睡眠100毫秒
t.interrupt();// 中断t线程
}
}
class MyThread extends Thread {
int i = 0;
public MyThread(String name) {
super(name);
}
public void run() {
while(!isInterrupted()) {// 当前线程没有被中断,则执行
System.out.println(getName() + getId() + "执行了" + ++i + "次");
}
}
}
void interrupt()方法和InterruptedException特别说明
- 如果调用interrupt方法时,若线程正被某些可中断的方法阻塞着(sleep,wait或可中断IO调用等),那么现在肯定是无法检测中断状态的,系统会清理中断状态,抛出InterruptedException异常,阻塞的方法调用会立即被这个异常中断。
- 如果调用interrupt方法将中断状态设置为了true,不久就调用了一个可中断的方法(sleep,wait,可中断IO调用等),这个调用不会成功,并且同样会清除中断状态标志,使中断标志为false,抛出InterruptedException异常。可见,如果会循环调用sleep()这类可中断的方法,就不需要再手动检测中断状态了。
- interrupt向线程发送中断请求,线程的中断状态将被设置为true,如果目前线程被阻塞,那么InterruptedException异常将被抛出,中断状态会被设为false。
Thread的static boolean interrupted():
- 测试当前线程是否被中断,这一调用会产生一个副作用,它将当前线程的中断状态重置为false
Thread的boolean isInterrupted():
- 测试线程是否被终止,这一调用不会改变线程的中断状态。
可见如果不设置中断,InterruptedException肯定不会出现,而只要抛出InterruptedException,设置的中断状态肯定已经被清理了,这种情况只有InterruptedException这个异常是我们知道有中断请求的唯一标识了,因此我们要向外层通知有中断发生,千万不要再把这个异常压制住,否则怎么调用interrupt()方法请求中断都不会有作用,线程中外层的代码压根不知道有中断这回事,照常运行。将这个中断请求通知给外层有两种方式:
- catch到InterruptedException时,调用Thread.currentThread().interrupt(),重新把中断状态设置上,让外层可以检测到。
- 最好的方法是,不要再catch InterruptedException异常啦,只要有这个异常就往外层抛吧。一直抛到最外层,在Thread对象或Runnable对象的run()方法中处理这个异常(处理操作:恢复到安全的状态然后自觉退出)。
public class Erupt {
static class MyInterruptableExceptionTask implements Runnable
{
private int begin=0;
public MyInterruptableExceptionTask(int s){begin=s;}
@Override
public void run() {
try {
int end=begin+10;
for(int i=begin; i<end; i++){
System.out.println("sub: "+i);
Thread.sleep(1000); //如果设置中断时正在sleep,或设置完中断后一个循环里遇到sleep,都会抛出InterruptedException异常,不需要再手动检测中断状态了
}
} catch (InterruptedException e) {
System.out.println("the call Thread.sleep(n) is interrupted by InterruptedExcetpion");
Thread.currentThread().interrupt(); //产生InterruptedException异常时中断状态被清除,所以要重新设置中断或将中断异常向外抛出供后续代码检测是否发生了中断
}
if(Thread.currentThread().isInterrupted())
System.out.println("sub thread is interrupted");
else
System.out.println("sub natural stop");
}
}
public static void main(String[] args) {
Thread t=new Thread(new MyInterruptableExceptionTask(111));
t.start();
for(int i=0; i<10; i++){
System.out.println("main: "+i);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(i==5)
t.interrupt();
}
}
}
三、线程状态
线程有6中状态:
- New:用new操作符创建一个新线程时,如 new Thread(r), 该线程还没有开始运行。这意味着它的状态是new,当一个线程处在new状态,程序还没有开始运行线程中的代码。在线程运行之前还有一些基础工作要做。
- Runnable:一旦调用start()方法,线程就处于runnable状态。可以可运行的线程可能正在运行也可能没有运行,这取决于操作系统给线程提供运行的时间(这就是为什么这个状态成为可运行而不是运行),事实上,运行中的线程被中断,目的是为了让他们线程获得运行机会。线程调度的细节依赖于操作系统提供的服务。抢占式调度系统给每一个可运行线程一个时间片来执行任务,当时间片用完,操作系统剥夺该线程的运行权,并给另一个线程可运行机会。当选择下一个线程时,操作系统考虑线程的优先级。
- Bolocked:阻塞,当一个线程试图获取一个内部的对象锁(而不是java.util.concurrent库里的锁), 而该锁被其他线程持有,则该线程进入阻塞状态。当其他线程释放该锁,并且线程调度器允许本线程持有它的时候,该线程将变成非阻塞状态。
- Waiting:等待,当线程通知另一个线程通知调度器一个条件时,它自己进入等待状态。在调用Object.wait方法或Thread.join方法,或者是等待java.util.concurrent库中的Lock或Condition时,就会出现这种情况。实际上,被阻塞状态与被等待状态是有很大不同的。
- Timed waiting:计时等待,有几个方法有一个超时参数。调用它们导致线程进入计时等待(timed waiting)状态。这一状态将一直保持到超时期满或者接收到适当的通知。带有超时参数的方法有Thread.sleep和Object.wait, Thrad.join, Lock.tryLock以及Condition.await的计时版。
- Terminated:因为run方法正常退出而自然死亡;-因为一个没有捕获的异常终止了run方法二意外死亡

要确定一个线程的当前状态,可以调用Thread的getState()方法。
1.创建新线程
new Thread(r)
2.可运行线程
调用start后,线程处于Runnable转态
3.被阻塞线程和等待线程
- 当试图获取一个内部的对象锁,而该锁被他人持有,则进入阻塞状态。
- 当线程等待另一个线程通知调度器一个条件时,自己进入等待状态。
- 有几个方法有一个超时参数,调用他们导致线程进入计时等待
4.被终止的线程
- 自然退出
- 没有捕获异常终止了run方法而以外死亡
- void join():阻塞调用此方法的线程(calling thread),直到线程t完成,此线程再继续;通常用于在main()主线程内,等待其它线程完成再结束main()主线程,底层通过wait实现的。
- void join(long millis):等待指定的线程死亡或者经过指定的毫秒数
四、线程属性
1.线程优先级
默认情况下,一个线程继承它父亲的优先级,可以用setPriority()方法来提高一个线程的优先级:
- MIN_PRIORITY:1
- MAX_PRIORITY:10
- NORM_PRIORITY:5
- void setPriority(int newPriority)
- static void yield():理论上,yield意味着放手,放弃,投降。一个调用yield()方法的线程告诉虚拟机它乐意让其他线程占用自己的位置。这表明该线程没有在做一些紧急的事情。注意,这仅是一个暗示,并不能保证不会产生任何影响。
2.守护线程
通过调用:
t.setDaemon(true)
可以将当前线程转换为守护线程。
守护线程的唯一用途是为其他线程提供服务。当只剩下守护线程时,虚拟机就退出来了。
守护线程很容易被中断,所以尽量避免用守护线程去访问文件、数据库等固有资源。
3.未捕获异常处理器
线程运行不能按照顺序执行过程中捕获异常的方式来处理异常,异常会被直接抛出到控制台(由于线程的本质,使得你不能捕获从线程中逃逸的异常。一旦异常逃逸出任务的run方法,它就会向外传播到控制台,除非你采用特殊的形式捕获这种异常。),这样会让你很头疼,无法捕捉到异常就无法处理异常而引发的问题。
线程的run方法是不会抛出任何受查异常,所以异常需要被传播到一个用于未捕获异常的处理器,该处理器必须实现Thread.UncaughtExceptionHandler接口的类,这个接口只有一个方法:
void uncaughtException(Thread t,Throwable e);
/*
* 第一步:定义符合线程异常处理器规范的“异常处理器”
* 实现Thread.UncaughtExceptionHandler规范
*/
class MyUncaughtExceptionHandler implements Thread.UncaughtExceptionHandler{
/*
* Thread.UncaughtExceptionHandler.uncaughtException()会在线程因未捕获的异常而临近死亡时被调用
*/
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("caught "+e);
}
}
/*
* 第二步:定义线程工厂
* 线程工厂用来将任务附着给线程,并给该线程绑定一个异常处理器
*/
class HanlderThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
System.out.println(this+"creating new Thread");
Thread t = new Thread(r);
System.out.println("created "+t);
t.setUncaughtExceptionHandler(new MyUncaughtExceptionHandler());//设定线程工厂的异常处理器
System.out.println("eh="+t.getUncaughtExceptionHandler());
return t;
}
}
/*
* 第三步:我们的任务可能会抛出异常
* 显示的抛出一个exception
*/
class ExceptionThread implements Runnable{
@Override
public void run() {
Thread t = Thread.currentThread();
System.out.println("run() by "+t);
System.out.println("eh = "+t.getUncaughtExceptionHandler());
throw new RuntimeException();
}
}
/*
* 第四步:使用线程工厂创建线程池,并调用其execute方法
*/
public class ThreadExceptionUncaughtExceptionHandler{
public static void main(String[] args){
ExecutorService exec = Executors.newCachedThreadPool(new HanlderThreadFactory());
exec.execute(new ExceptionThread());
}
}
可以使用setUncaughtExceptionHandler方法安装一个处理器,也可以使用setDefaultUncaughtExceptionHandler为所有线程安装一个默认的处理器。
public static void main(String[] args){
Thread.setDefaultUncaughtExceptionHandler(new MyUncaughtExceptionHandler());
ExecutorService exec =Executors.newCachedThreadPool();
exec.execute(new ExceptionThread());
}
在java中要捕捉多线程产生的异常,需要自定义异常处理器,并设定到对应的线程工厂中(即第一步和第二步)。
如果你知道将要在代码中处使用相同的异常处理器,那么更简单的方式是在Thread类中设置一个静态域,并将这个处理器设置为默认的未捕获处理器。
这个处理器只有在不存在线程专有的未捕获异常处理器的情况下才会被调用。
五、同步
1.锁对象
ReentrantLock是Java并发包中互斥锁,它有公平锁和非公平锁两种实现方式,以lock()为例,其使用方式为:
myLock.lock();
try{
critical section
}
finally{
myLock.unLock();
}
public void transfer(int from,int to,double amount){
bankLock.lock();
try{
if (accounts[from]<amount){
return;
}
System.out.println(Thread.currentThread());
accounts[from]-=amount;
System.out.printf(" %10.2f from %d to %d ",amount,from,to);
accounts[to]+=amount;
System.out.printf("Total Balance:%10.2f%n",getTotalBalance());
}
finally {
bankLock.unlock();
}
}
那么,ReentrantLock内部是如何实现锁的呢?接下来我们就以JDK1.7中的ReentrantLock的lock()为例详细研究下。ReentrantLock类实现了Lock和java.io.Serializable接口,其内部有一个实现锁功能的关键成员变量Sync类型的sync,定义如下:
/** Synchronizer providing all implementation mechanics */
private final Sync sync;
2.条件对象
使用一个条件对象来管理那些已经获得一个锁但是不能做有用工作的线程
private Condition sufficentFunds;
private final double[] accounts ;
public Bank(int n,double initialBalance){
accounts = new double[n];
Arrays.fill(accounts,initialBalance);
sufficentFunds = bankLock.newCondition();
}
如果发现银行存储不足,等待直到另一个线程向账户注入资金,但是由于锁对象的排他性,通过条件对象使当前线程被阻塞,放弃了锁。如果transfer发现资金不足,会调用sufficientFunds.await();而转入线程调会调用sufficientFunds.singalAll()
sufficientFunds.singalAll():通知等待的线程可能条件满足,可以让sufficientFunds.await()返回阻塞地方再次测试一下。
singal是随机选择一条等待线程。
public class Bank {
private Lock bankLock = new ReentrantLock();
private Condition sufficentFunds;
private final double[] accounts ;
public Bank(int n,double initialBalance){
accounts = new double[n];
Arrays.fill(accounts,initialBalance);
sufficentFunds = bankLock.newCondition();
}
public void transfer(int from,int to,double amount){
bankLock.lock();
try{
if (accounts[from]<amount){
sufficentFunds.await();
}
System.out.println(Thread.currentThread());
accounts[from]-=amount;
System.out.printf(" %10.2f from %d to %d ",amount,from,to);
accounts[to]+=amount;
System.out.printf("Total Balance:%10.2f%n",getTotalBalance());
sufficentFunds.signalAll();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
bankLock.unlock();
}
}
private double getTotalBalance() {
double sum = 0;
for (double a:accounts){
sum+=a;
}
return sum;
}
public int size(){
return accounts.length;
}
}
3.synchronized关键字
锁和条件对象的关键之处:
- 锁用来保护代码片段,任何时刻只能有一个线程执行保护的代码
- 锁可以管理试图进入被保护代码段的线程
- 锁可以拥有一个或多个相关的条件对象
- 每个条件对象管理那些已经进入被保护代码段但还不能运行的线程
public synchronized void method(){
method body
}
等价于
public void method(){
this.intrinsiclock.lock();
try{
method
}
finally{
this.intrinsiclock.unlock();
}
}
wait方法等价于intrinsicCondition.awaiy();
notifyAll等价于intrinsicCondition.singalAll();
public class Bank {
private Lock bankLock = new ReentrantLock();
private Condition sufficentFunds;
private final double[] accounts ;
public Bank(int n,double initialBalance){
accounts = new double[n];
Arrays.fill(accounts,initialBalance);
sufficentFunds = bankLock.newCondition();
}
public synchronized void transfer(int from,int to,double amount) throws InterruptedException {
if (accounts[from]<amount){
wait();
}
System.out.println(Thread.currentThread());
accounts[from]-=amount;
System.out.printf(" %10.2f from %d to %d ",amount,from,to);
accounts[to]+=amount;
System.out.printf("Total Balance:%10.2f%n",getTotalBalance());
notifyAll();
}
private double getTotalBalance() {
double sum = 0;
for (double a:accounts){
sum+=a;
}
return sum;
}
public int size(){
return accounts.length;
}
}
(1)修饰一个代码块
一个线程访问一个对象中的synchronized(this)同步代码块时,其他试图访问该对象的线程将被阻塞。我们看下面一个例子:
/**
* 同步线程
*/
class SyncThread implements Runnable {
private static int count;
public SyncThread() {
count = 0;
}
public void run() {
synchronized(this) {
for (int i = 0; i < 5; i++) {
try {
System.out.println(Thread.currentThread().getName() + ":" + (count++));
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public int getCount() {
return count;
}
}
SyncThread syncThread = new SyncThread();
Thread thread1 = new Thread(syncThread, "SyncThread1");
Thread thread2 = new Thread(syncThread, "SyncThread2");
thread1.start();
thread2.start();
SyncThread1:0
SyncThread1:1
SyncThread1:2
SyncThread1:3
SyncThread1:4
SyncThread2:5
SyncThread2:6
SyncThread2:7
SyncThread2:8
SyncThread2:9
当两个并发线程(thread1和thread2)访问同一个对象(syncThread)中的synchronized代码块时,在同一时刻只能有一个线程得到执行,另一个线程受阻塞,必须等待当前线程执行完这个代码块以后才能执行该代码块。Thread1和thread2是互斥的,因为在执行synchronized代码块时会锁定当前的对象,只有执行完该代码块才能释放该对象锁,下一个线程才能执行并锁定该对象。
我们再把SyncThread的调用稍微改一下:
Thread thread1 = new Thread(new SyncThread(), "SyncThread1");
Thread thread2 = new Thread(new SyncThread(), "SyncThread2");
thread1.start();
thread2.start();
SyncThread1:0
SyncThread2:1
SyncThread1:2
SyncThread2:3
SyncThread1:4
SyncThread2:5
SyncThread2:6
SyncThread1:7
SyncThread1:8
SyncThread2:9
这时创建了两个SyncThread的对象syncThread1和syncThread2,线程thread1执行的是syncThread1对象中的synchronized代码(run),而线程thread2执行的是syncThread2对象中的synchronized代码(run);我们知道synchronized锁定的是对象,这时会有两把锁分别锁定syncThread1对象和syncThread2对象,而这两把锁是互不干扰的,不形成互斥,所以两个线程可以同时执行。
(2)当一个线程访问对象的一个synchronized(this)同步代码块时,另一个线程仍然可以访问该对象中的非synchronized(this)同步代码块。
public class SyncTest implements Runnable{
private static int count;
@Override
public void run() {
for (int i = 0;i<5;i++){
System.out.println(Thread.currentThread().getName()+":"+(count++));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void print(){
for (int i = 0;i<5;i++){
System.out.println(Thread.currentThread().getName()+":"+(count++));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
SyncTest s = new SyncTest();
Thread t1 = new Thread(s);
Thread t2 = new Thread(s);
t1.start();
t2.start();
}
}
Thread-1:0
Thread-0:0
Thread-1:1
Thread-0:2
Thread-1:3
Thread-0:4
Thread-0:5
Thread-1:5
Thread-0:7
Thread-1:6
(3)指定要给某个对象加锁
/**
* 银行账户类
*/
class Account {
String name;
float amount;
public Account(String name, float amount) {
this.name = name;
this.amount = amount;
}
//存钱
public void deposit(float amt) {
amount += amt;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//取钱
public void withdraw(float amt) {
amount -= amt;
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public float getBalance() {
return amount;
}
}
/**
* 账户操作类
*/
class AccountOperator implements Runnable{
private Account account;
public AccountOperator(Account account) {
this.account = account;
}
public void run() {
synchronized (account) {
account.deposit(500);
account.withdraw(500);
System.out.println(Thread.currentThread().getName() + ":" + account.getBalance());
}
}
}
Account account = new Account("zhang san", 10000.0f);
AccountOperator accountOperator = new AccountOperator(account);
final int THREAD_NUM = 5;
Thread threads[] = new Thread[THREAD_NUM];
for (int i = 0; i < THREAD_NUM; i ++) {
threads[i] = new Thread(accountOperator, "Thread" + i);
threads[i].start();
}
Thread3:10000.0
Thread2:10000.0
Thread1:10000.0
Thread4:10000.0
Thread0:10000.0
在AccountOperator 类中的run方法里,我们用synchronized 给account对象加了锁。这时,当一个线程访问account对象时,其他试图访问account对象的线程将会阻塞,直到该线程访问account对象结束。也就是说谁拿到那个锁谁就可以运行它所控制的那段代码。
当有一个明确的对象作为锁时,就可以用类似下面这样的方式写程序。
public void method3(SomeObject obj)
{
//obj 锁定的对象
synchronized(obj)
{
// todo
}
}
当没有明确的对象作为锁,只是想让一段代码同步时,可以创建一个特殊的对象来充当锁:
class Test implements Runnable
{
private byte[] lock = new byte[0]; // 特殊的instance变量
public void method()
{
synchronized(lock) {
// todo 同步代码块
}
}
public void run() {
}
}
说明:零长度的byte数组对象创建起来将比任何对象都经济――查看编译后的字节码:生成零长度的byte[]对象只需3条操作码,而Object lock = new Object()则需要7行操作码。
(4)修饰一个方法
ynchronized修饰一个方法很简单,就是在方法的前面加synchronized,public synchronized void method(){//todo}; synchronized修饰方法和修饰一个代码块类似,只是作用范围不一样,修饰代码块是大括号括起来的范围,而修饰方法范围是整个函数。
public synchronized void run() {
for (int i = 0; i < 5; i ++) {
try {
System.out.println(Thread.currentThread().getName() + ":" + (count++));
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
(5)Synchronized也可修饰一个静态方法,用法如下:
public synchronized static void method() {
// todo
}
将静态方法声明为synchronize,如果调用这个方法,则该类的class对象的锁被锁住,其他线程可以调用同一个类的这个或任何其他同步静态方法都需要等待当前线程释放。
(6)修饰一个类:
同修饰static的效果一样
内部锁和条件存在的局限性:
- 不能中断一个正在试图获取锁的线程
- 试图获得锁时不能设定超时
- 每个锁仅有单一的条件,可能是不够的
- 最好是不使用Lock/Condition也不使用synchronize关键字,多数情况下使用concurrent包中的一种机制
- synchronized尽量使用
- 有特殊情况才使用Lock/Conditions
7.Volatile域
有时候,仅仅为了同步一两个实例域就使用synchronized关键字或是Lock/Condition,会造成很多不必要的开销。这时候我们可以使用volatile关键字,使用volatile关键字修饰一个实例域会告诉编译器和虚拟机这个域可能会被多线程并发访问,这样编译器和虚拟机就能确保它的值总是我们所期望的。
volatile关键字的实现原理大致是这样的:我们在访问内存中的变量时,通常都会把它缓存在寄存器中,以后再需要读它的值时,只需从相应寄存器中读取,若要对该变量进行写操作,则直接写相应寄存器,最后写回该变量所在的内存单元。若线程A把count变量的值缓存在寄存器中,并将count加2(将相应寄存器的值加2),这时线程B被调度,它读取count变量加2后并写回。然后线程A又被调度,它会接着刚才的操作,也就是会把count值写回,此时线程A是直接把寄存器中的值写回count所在单元,而这个值是过期的。若count被volatile关键字修饰,这个问题便可被圆满解决。volatile变量有一个性质,就是任何时候读取它的值时,都会直接去相应内存单元读取,而不是读取缓存在寄存器中的值。这样一来,在上面那个场景中,线程A把count写回时,会从内存中读取count最新的值,从而确保了count的值总是我们所期望的。
8.final变量
关键字 final 可以视为 C++ 中 const 机制的一种受限版本,用于构造不可变对象。final 类型的域是不能修改的(但如果 final 域所引用的对象时可变的,那么这些被引用的对象是可以修改的)。然而,在 Java 内存模型中,final 域还有着特殊的语义。final 域能确保初始化过程的安全性,从而可以不受限制的访问不可变对象,并在共享这些对象时无需同步。
注: 个人理解为,final 字段一旦被初始化完成,并且构造器没有把 this 引用传递出去,那么在其他线程中就能看到 final 字段的值(域内变量可见性,和 volatile 类似),而且其外部可见状态永远也不会改变。它所带来的安全性是最简单最纯粹的。
注: 即使对象是可变的,通过将对象的某些域声明为final类型,仍然可以 简化对状态的判断 ,因此限制对象的可变性也就相当于限制了该对象可能的状态集合。仅包含一个或两个可变状态的“基本不可变”对象仍然比包含多个可变状态的对象简单。通过将域声明为final类型,也相当于告诉维护人员这些域是不会变化的。
正如“除非需要更高的可见性,否则应将所有的饿域都声明为私有域”[EJ Item 12]是一个良好的变成习惯,“除非需要某个域是可变的,否则应将其声明为final域”也是一个良好的变成习惯。
9.原子性
原子是世界上的最小单位,具有不可分割性。比如 a=0;(a非long和double类型) 这个操作是不可分割的,那么我们说这个操作时原子操作。再比如:a++; 这个操作实际是a = a + 1;是可分割的,所以他不是一个原子操作。非原子操作都会存在线程安全问题,需要我们使用同步技术(sychronized)来让它变成一个原子操作。一个操作是原子操作,那么我们称它具有原子性。Java的concurrent包下提供了一些原子类,我们可以通过阅读API来了解这些原子类的用法。比如:AtomicInteger、AtomicLong、AtomicReference等。
11.锁测试与超时
通过tryLock()去试图申请锁,如果返回true则立即执行,返回false则去做其他事
public class TestTryLock {
private List<Object> list = new ArrayList<Object>();
private Lock lock = new ReentrantLock();
public static void main(String[] args) {
final TestTryLock test = new TestTryLock();
new Thread("第一个线程 ") {
@Override
public void run() {
test.doSomething(Thread.currentThread());
}
}.start();
new Thread("第二个线程 ") {
@Override
public void run() {
test.doSomething(Thread.currentThread());
}
}.start();
}
public void doSomething(Thread thread) {
if (lock.tryLock()) {
try {
System.out.println(thread.getName() + "得到了锁.");
for (int i = 0; i < 10; i++) {
list.add(i);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println(thread.getName() + "释放了锁.");
lock.unlock();
}
} else {
System.out.println(thread.getName() + "获取锁失败.");
}
}
}
以上代码运行结果如下:
第一个线程 得到了锁.
第一个线程 释放了锁.
第二个线程 得到了锁.
第二个线程 释放了锁.
12.读写锁
若很多线程从一个内存区域读取数据,但其中只有极少的一部分线程会对其中的数据进行修改,此时我们希望所有Reader线程共享数据,而所有Writer线程对数据的访问要互斥。我们可以使用读/写锁来达到这一目的。
Java中的读/写锁对应着ReentrantReadWriteLock类,它实现了ReadWriteLock接口,这个接口的定义如下:
public interface ReadWriteLock {
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();
/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}
我们可以看到这个接口就定义了两个方法,其中readLock方法用来获取一个“读锁”,writeLock方法用来获取一个“写锁”。
ReentrantReadWriteLock类的使用步骤通常如下所示:
//构造一个ReentrantReadWriteLock对象
private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
//分别从中“提取”读锁和写锁
private Lock readLock = rwl.readLock();
private Lock writeLock = rwl.writeLock();
//对所有的Reader线程加读锁
readLock.lock();
try {
//读操作可并发,但写操作会互斥
} finally {
readLock.unlock();
}
//对所有的Writer线程加写锁
writeLock.lock();
try {
//排斥所有其他线程的读和写操作
} finally {
writeLock.unlock();
}
在使用ReentrantReadWriteLock类时,我们需要注意以下两点:
- 若当前已经有线程占用了读锁,其他要申请写锁的线程需要占用读锁的线程释放了读锁才能申请成功;
- 若当前已经有线程占用了写锁,其他要申请读锁或写锁的线程都需要等待占用写锁的线程释放了写锁才能申请成功。
13.为什么弃用stop和suspend方法
stop方法会终止所有未结束的方法,包括run方法,当前线程被终止,立即释放被它锁住的所有对象的锁,会导致对象状态的不一致。
suspend是挂起一个持有锁对象的线程,如果调用suspend方法的线程试图获取同一个锁,就会产生死锁。假如有A,B两个线程,A线程在获得某个锁之后被suspend阻塞,这时A不能继续执行,线程B在或者相同的锁之后才能调用resume方法将A唤醒,但是此时的锁被A占有,B不能继续执行,也就不能及时的唤醒A,此时A,B两个线程都不能继续向下执行而形成了死锁。这就是suspend被弃用的原因。
六、阻塞队列
以上我们所介绍的都属于Java并发机制的底层基础设施。在实际编程我们应该尽量避免使用以上介绍的较为底层的机制,而使用Java类库中提供给我们封装好的较高层次的抽象。对于许多同步问题,我们可以通过使用一个或多个队列来解决:生产者线程向队列中插入元素,消费者线程则取出他们。考虑一下我们最开始提到的Counter类,我们可以通过队列来这样解决它的同步问题:增加计数值的线程不能直接访问Counter对象,而是把add指令对象插入到队列中,然后由另一个可访问Counter对象的线程从队列中取出add指令对象并执行add操作(只有这个线程能访问Counter对象,因此无需采取额外措施来同步)。
当试图向满队列中添加元素或者向空队列中移除元素时,阻塞队列(blocking queue)会导致线程阻塞。通过阻塞队列,我们可以按以下模式来工作:工作者线程可以周期性的将中间结果放入阻塞队列中,其他线程可取出中间结果并进行进一步操作。若前者工作的比较慢(还没来得及向队列中插入元素),后者会等待它(试图从空队列中取元素从而阻塞);若前者运行的快(试图向满队列中插元素),它会等待其他线程。阻塞队列提供了以下方法:
- add方法:添加一个元素。若队列已满,会抛出IllegalStateException异常。
- element方法:返回队列的头元素。若队列为空,会抛出NoSuchElementException异常。
- offer方法:添加一个元素,若成功则返回true。若队列已满,则返回false。
- peek方法:返回队列的头元素。若队列为空,则返回null。
- poll方法:删除并返回队列的头元素。若队列为空,则返回null。
- put方法:添加一个元素。若队列已满,则阻塞。
- remove方法:移除并返回头元素。若队列为空,会抛出NoSuchElementException。
- take方法:移除并返回头元素。若队列为空,则阻塞。
阻塞队列的方法分为以下三种:
- 当队列作为线程管理工具,可以使用put和take作为阻塞线程的手段
- 当向满的队列中添加或者从空的队列中移出元素,add、remove和element操作将抛出异常。
- 当队列会在任何时刻满或者空,要使用offer、poll、peek方法作为代替。
- 抛异常:如果试图的操作无法立即执行,抛一个异常。
- 特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
- 阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
- 超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是true / false)。
BlockingQueue 是个接口,你需要使用它的实现之一来使用BlockingQueue,Java.util.concurrent包下具有以下 BlockingQueue 接口的实现类:
- ArrayBlockingQueue:ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
- DelayQueue:DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。
- LinkedBlockingQueue:LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
- PriorityBlockingQueue:PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
- SynchronousQueue:SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。
demo:
package blocking_queue;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
/**
* @author Sean
* @version 1.0
* @date 创建时间:2017/7/15 14:29
* @parameter
* @return
*/
public class BlockingQueueTest {
public static class Producer implements Runnable{
private final BlockingQueue<Integer> blockingQueue;
private volatile boolean flag;
private Random random;
public Producer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
flag = false;
random = new Random();
}
@Override
public void run() {
while (!flag){
int info = random.nextInt(100);
try {
blockingQueue.put(info);
System.out.println(Thread.currentThread().getName()+" procuce "+info);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void shutDown(){
flag = true;
}
}
public static class Consumer implements Runnable{
private final BlockingQueue<Integer> blockingQueue;
private volatile boolean flag;
public Consumer(BlockingQueue<Integer> blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
while(!flag){
int info;
try {
info = blockingQueue.take();
System.out.println(Thread.currentThread().getName()+" consumer "+info);
Thread.sleep(50);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
public void shutDown(){
flag=true;
}
}
public static void main(String[] args) {
BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
Producer producer=new Producer(blockingQueue);
Consumer consumer=new Consumer(blockingQueue);
//创建5个生产者,5个消费者
for(int i=0;i<10;i++){
if(i<5){
new Thread(producer,"producer"+i).start();
}else{
new Thread(consumer,"consumer"+(i-5)).start();
}
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
producer.shutDown();
consumer.shutDown();
}
}
java.util.concurrent包提供了以下几种阻塞队列:
- LinkedBlockingQueue是一个基于链表实现的阻塞队列。默认容量没有上限,但也有可以指定最大容量的构造方法。它有的“双端队列版本”为LinkedBlockingDeque。
- ArrayBlockingQueue是一个基于数组实现的阻塞队列,它在构造时需要指定容量。它还有一个构造方法可以指定一个公平性参数,若这个参数为true,那么等待了最长时间的线程会得到优先处理(指定公平性参数会降低性能)。
- PriorityBlockingQueue是一个基于堆实现的带优先级的阻塞队列。元素会按照它们的优先级被移除队列。
package blockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
public class BlockingQueueTest {
private int size = 20;
private ArrayBlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(size);
public static void main(String[] args) {
BlockingQueueTest test = new BlockingQueueTest();
Producer producer = test.new Producer();
Consumer consumer = test.new Consumer();
producer.start();
consumer.start();
}
class Consumer extends Thread{
@Override
public void run() {
while(true){
try {
//从阻塞队列中取出一个元素
blockingQueue.take();
System.out.println("队列剩余" + blockingQueue.size() + "个元素");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
class Producer extends Thread{
@Override
public void run() {
while (true) {
try {
//向阻塞队列中插入一个元素
blockingQueue.put(1);
System.out.println("队列剩余空间:" + (size - blockingQueue.size()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
}
在以上代码中,我们有一个生产者线程不断地向一个阻塞队列中插入元素,同时消费者线程从这个队列中取出元素。若生产者生产的比较快,消费者取的比较慢导致队列满,此时生产者再尝试插入时就会阻塞在put方法中,直到消费者取出一个元素;反过来,若消费者消费的比较快,生产者生产的比较慢导致队列空,此时消费者尝试从中取出时就会阻塞在take方法中,直到生产者插入一个元素。
七、Callable与Future
Callable和Future,它俩很有意思的,一个产生结果,一个拿到结果。
我们之前提到了创建线程的两种方式,它们有一个共同的缺点,那就是异步方法run没有返回值,也就是说我们无法直接获取它的执行结果,只能通过共享变量或者线程间通信等方式来获取。好消息是通过使用Callable和Future,我们可以方便的获得线程的执行结果。
Callable接口与Runnable接口类似,区别在于它定义的异步方法call有返回值。Callable接口的定义如下:
public interface Callable<V> {
V call() throws Exception;
}
类型参数V即为异步方法call的返回值类型。
来看个简单栗子:
public class CallableAndFuture {
public static void main(String[] args) {
Callable<Integer> callable = new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
};
FutureTask<Integer> future = new FutureTask<Integer>(callable);
new Thread(future).start();
try {
Thread.sleep(5000);// 可能做一些事情
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
FutureTask实现了两个接口,Runnable和Future,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callable的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future得到,岂不美哉!
Future可以对具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成以及获取结果。可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。Future接口的定义如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
在Future接口中声明了5个方法,每个方法的作用如下:
cancel方法用来取消任务,如果取消任务成功则返回true,如果取消任务失败则返回false。参数mayInterruptIfRunning表示是否允许取消正在执行却没有执行完毕的任务,如果设置true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论mayInterruptIfRunning为true还是false,此方法肯定返回false(即如果取消已经完成的任务会返回false);如果任务正在执行,若mayInterruptIfRunning设置为true,则返回true,若mayInterruptIfRunning设置为false,则返回false;如果任务还没有执行,则无论mayInterruptIfRunning为true还是false,肯定返回true。
isCancelled方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
isDone方法表示任务是否已经完成,若任务完成,则返回true;
get()方法用来获取执行结果,这个方法会阻塞,一直等到任务执行完才返回;
-
get(long timeout, TimeUnit unit)用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回null。
Future接口的实现类是FutureTask:
public class FutureTask<V> implements RunnableFuture<V>
FutureTask类实现了RunnableFuture接口,这个接口的定义如下:
public interface RunnableFuture<V> implements Runnable, Future<V> {
void run();
}
可以看到RunnableFuture接口扩展了Runnable接口和Future接口。
FutureTask类有如下两个构造器:
public FutureTask(Callable<V> callable)
public FutureTask(Runnable runnable, V result)
FutureTask通常与线程池配合使用,通常会创建一个包装了Callable对象的FutureTask实例,并用submit方法将它提交到一个线程池去执行,我们可以通过FutureTask的get方法获取返回结果。
public class CallableAndFuture {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<Integer> future = threadPool.submit(new Callable<Integer>() {
public Integer call() throws Exception {
return new Random().nextInt(100);
}
});
try {
Thread.sleep(5000);// 可能做一些事情
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
九、执行器
创建一个新线程涉及和操作系统的交互,因此会产生一定的开销。在有些应用场景下,我们会在程序中创建大量生命周期很短的线程,这时我们应该使用线程池(thread pool)。通常,一个线程池中包含一些准备运行的空闲线程,每次将Runnable对象交给线程池,就会有一个线程执行run方法。当run方法执行完毕时,线程不会进入Terminated
状态,而是在线程池中准备等下一个Runnable到来时提供服务。使用线程池统一管理线程可以减少并发线程的数目,线程数过多往往会在线程上下文切换上以及同步操作上浪费过多时间。
执行器类(java.util.concurrent.Executors)提供了许多静态工厂方法来构建线程池。
1.线程池
在Java中,线程池通常指一个ThreadPoolExecutor对象,ThreadPoolExecutor类继承了AbstractExecutorService类,而AbstractExecutorService抽象类实现了ExecutorService接口,ExecutorService接口又扩展了Executor接口。也就是说,Executor接口是Java中实现线程池的最基本接口。我们在使用线程池时通常不直接调用ThreadPoolExecutor类的构造方法,二回使用Executors类提供给我们的静态工厂方法,这些静态工厂方法内部会调用ThreadPoolExecutor的构造方法,并为我们准备好相应的构造参数。
Executor是类中的以下三个方法会返回一个实现了ExecutorService接口的ThreadPoolExecutor类的对象:
newCachedThreadPool() //返回一个带缓存的线程池,该池在必要的时候创建线程,在线程空闲60s后终止线程
newFixedThreadPool(int threads) //返回一个线程池,线程数目由threads参数指明
newSingleThreadExecutor() //返回只含一个线程的线程池,它在一个单一的线程中依次执行各个任务
newScheduledThreadPool()//包含预定执行而构建的线程池
- 对于newCachedThreadPool方法返回的线程池:对每个任务,若有空闲线程可用,则立即让它执行任务;若没有可用的空闲线程,它就会创建一个新线程并加入线程池中;
- newFixedThreadPool方法返回的线程池里的线程数目由创建时指定,并一直保持不变。若提交给它的任务多于线程池中的空闲线程数目,那么就会把任务放到队列中,当其他任务执行完毕后再来执行它们;
- newSingleThreadExecutor会返回一个大小为1的线程池,由一个线程执行提交的任务。
以下方法可将一个Runnable对象或Callable对象提交给线程池:
Future<T> submit(Callable<T> task)
Future<T> submit(Runnable task, T result)
Future<?> submit(Runnable task)
调用submit方法会返回一个Future对象,可通过这个对象查询该任务的状态。我们可以在这个Future对象上调用isDone、cancle、isCanceled等方法(Future接口会在下面进行介绍)。第一个submit方法提交一个Callable对象到线程池中;第二个方法提交一个Runnable对象,并且Future的get方法在完成的时候返回指定的result对象。
当我们使用完线程池时,就调用shutdown方法,该方法会启动该线程池的关闭例程。被关闭的线程池不能再接受新的任务,当关闭前已存在的任务执行完毕后,线程池死亡。shutdownNow方法可以取消线程池中尚未开始的任务并尝试中断所有线程池中正在运行的线程。
在使用线程池时,我们通常应该按照以下步骤来进行:
- 调用Executors中相关方法构建一个线程池;
- 调用submit方法提交一个Runnable对象或Callable对象到线程池中;
- 若想要取消一个任务,需要保存submit返回的Future对象;
- 当不再提交任何任务时,调用shutdown方法。
package test;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ExecutorService fixedThreadPool = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
final int index = i;
fixedThreadPool.execute(new Runnable() {
public void run() {
try {
System.out.println(index);
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
}
2.预定执行
ScheduledExecutorService接口含有为预定执行(Scheduled Execution)或重复执行的任务专门设计的方法。Executors类的newScheduledThreadPool和newSingleThreadScheduledExecutor方法会返回实现了ScheduledExecutorService接口的对象。可以使用以下方法来预定执行的任务:
ScheduledFuture<V> schedule(Callable<V> task, long time, TimeUnit unit)
ScheduledFuture<?> schedule(Runnable task, long time, TimeUnit unit)
//以上两个方法预定在指定时间过后执行任务
SchedukedFuture<?> scheduleAtFixedRate(Runnable task, long initialDelay, long period, TimeUnit unit) //在指定的延迟(initialDelay)过后,周期性地执行给定任务
ScheduledFuture<?> scheduleWithFixedDelay(Runnable task, long initialDelay, long delay, TimeUnit unit) //在指定延迟(initialDelay)过后周期性的执行任务,每两个任务间的间隔为delay指定的时间
package test;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPoolExecutorTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
scheduledThreadPool.schedule(new Runnable() {
public void run() {
System.out.println("delay 3 seconds");
}
}, 3, TimeUnit.SECONDS);
}
}
3.控制任务组
对ExecutorService对象调用invokeAny方法可以把一个Callable对象集合提交到相应的线程池中执行,并返回某个已经完成的任务的结果,该方法的定义如下:
T invokeAny(Collection<Callable<T>> tasks)
T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)
该方法可以指定一个超时参数。这个方法的不足在于我们无法知道它返回的结果是哪个任务执行的结果。如果集合中的任意Callable对象的执行结果都能满足我们的需求的话,使用invokeAny方法是很好的。
invokeAll方法也会提交Callable对象集合到相应的线程池中,并返回一个Future对象列表,代表所有任务的解决方案。该方法的定义如下:
List<Future<T>> invokeAll(Collection<Callable<T>> tasks)
List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit)