进程概念
进程(英语:process),是计算机中已运行程序的实体。程序本身只是指令,数据及其组织形式的描述,进程才是程序的真正运行实例。
线程概念
线程(英语:Thread),是操作系统运算调度的最小单位,它被包含在进程中,是进程中的实际运行单位。
进程和线程的关系
在面向线程设计的系统(如当代大多数操作系统,Linux2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器。同一进程中的多条线程,共享该进程中的全部系统资源,如虚拟地址空间,文件描述符和信号处理等等。但同一进程中的多个线程有各自的调用栈(call stack),自己的寄存器环境(register context),自己的线程本地存储(thread-local storage)。
我们可以在 Mac 上打开活动监视器,查看进程数和对应的线程数,尝试理解上面的概念。
还可以看下阮一峰的这个博客,及其下面的部分评论,帮助理解概念。
进程与线程的一个简单解释
Java 线程的运行状态
- 初始状态:在其他线程中,新建 Thread 。
- 可运行状态:调用该线程的 start 方法,使其变成可运行状态,在线程池中,等待获取 CPU 资源执行任务,由操作系统调度。
- 运行中:操作系统调度该线程,获得 CPU 资源执行任务。
- 结束:run() 中的代码执行完毕,main 方法结束 。
- 阻塞状态:阻塞状态结束即变成可运行状态
- 等待用户输入
- sleep 线程睡眠
- thread2.join,等待其他线程 如 thread2 终止后,在恢复。
- 等待队列
- synchronized 锁中,使用 object.wait() 等待队列,object.notify 唤醒或者到了等待时间,object.notify() 随机唤醒在此对象监视器上其中一个线程,object.notifyAll() 唤醒在此对象监视器上的所有线程。
- ReentrantLock Condition await()/signal() 。
- 线程同步(互斥锁)synchronized,ReentrantLock 等。
- 线程让步,Thread.yield() 把资源让给其他线程。线程状态由运行中变成可运行。因为是 OS 自己调度的线程,所以不一定能起到让步资源的目的。
Thread 基本用法
线程的构造方法
Thread()
Thread(Runnable target)
Thread(String name)
Thread(Runnable target, String name)
...
线程的实现方法
- 继承 Thread ,实现 run 方法。
class TestThread1 extends Thread {
@Override
public void run() {
super.run();
System.out.println("testThread1 run end.");
}
}
TestThread1 testThread1 = new TestThread1();
testThread1.start();
- 实现 Runnable 接口 和 run方法。
class TestThread2 implements Runnable {
@Override
public void run() {
System.out.println("testThread2 run end.");
}
}
Thread testThread2 = new Thread(new TestThread2(), "t2");
testThread2.start();
- 内部类 Runnable。
Thread testThread3 = new Thread(new Runnable() {
@Override
public void run() {
System.out.println("testThread3 run end.");
}
});
testThread3.start();
启动线程
TestThread1 testThread1 = new TestThread1();
testThread1.start();
终止线程
void interrupt ()
boolean isInterrupted ()
//示例:
public void test01() {
testThread3 = new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1000; i++) {
System.out.println("t3计数 " + i);
if (testThread3.isInterrupted()) {
break;
}
}
System.out.println("testThread3 run end.");
}
});
testThread3.start();
for (int i = 0; i < 500; i++) {
if(i == 499) {
testThread3.interrupt();
System.out.println("testThread3 interrupt.");
}
}
}
线程优先级
void setPriority (int newPriority)
MAX_PRIORITY Constant Value: 10 (0x0000000a)
MIN_PRIORITY Constant Value: 1 (0x00000001)
newPriority 的值在 1 和 10 之间(包括1,10)
线程安全
互斥锁
互斥锁(英语:英语:Mutual exclusion,缩写 Mutex)是一种用于多线程编程中,防止两条线程同时对同一公共资源(比如全局变量)进行读写的机制。该目的通过将代码切片成一个一个的临界区域(critical section)达成。临界区域指的是一块对公共资源进行访问的代码,并非一种机制或是算法。一个程序、进程、线程可以拥有多个临界区域,但是并不一定会应用互斥锁。
线程锁 - 参考文章 - JDK 5.0 中更灵活、更具可伸缩性的锁定机制
synchronized 用法
- 对类对象加锁
private synchronized void increaseCount1() {
shareData++;
}
private void increaseCount1() {
synchronized (this) {
shareData++;
}
}
- 对其他对象加锁
private final Object mLock = new Object();
private void increaseCount1() {
synchronized (mLock) {
shareData++;
}
}
- 对类加锁
private synchronized static void increaseCount1() {
shareData++;
}
private synchronized void increaseCount1() {
synchronized (this.getClass()) {
shareData++;
}
}
synchronized 的优点:
使用简单,易于理解,不必担心锁没有释放,JVM 会帮助释放锁。
synchronized 的问题:
无法中断一个等候锁的线程,无法通过等候锁的顺序来获得锁,锁不被释放就不会得到锁。
ReentrantLock 用法
private int shareData;
private ReentrantLock mLock = new ReentrantLock();
public void increaseCount() {
mLock.lock();
try {
shareData++;
} finally {
mLock.unlock();
}
}
oublic void readCount() {
mLock.lock();
try {
return shareData;
} finally {
mLock.unlock();
}
}
ReentrantLock 的优点:
实现了 Lock 接口,和 synchronized 相同的并发性和内存语义,还多了轮询锁,定时锁,可中断锁等候。在激烈争用的情况下性能更好。ReentrantLock 默认是不公平锁,公平锁可以按照求锁的顺序获得锁,但是会耗损性能。
ReentrantLock 的缺点:
要在 finally 中释放锁,如果保护的代码抛出异常,锁将永远无法释放。
synchronized 和 ReentrantLock 该如何选择:
优先使用 synchronized ,除非发现需要用到 synchronized 不具备的特性(比如时间锁等候、可中断锁等候、无块结构锁、多个条件变量或者轮询锁)或是在特别激烈争用的情况下,则适合使用 ReentrantLock。
ReentrantReadWriteLock 具有读同步优化
private int shareData;
private ReentrantReadWriteLock mLock = new ReentrantReadWriteLock();
public void increaseCount() {
mLock.writeLock.lock();
try {
shareData++;
} finally {
mLock.writeLock.unlock();
}
}
oublic void readCount() {
mLock.readLock.lock();
try {
return shareData;
} finally {
mLock.readLock.unlock();
}
}
ReentrantReadWriteLock 的特点:
读锁之间不互斥,读锁和写锁之间互斥,写锁和写锁之间互斥。适用于大量读操作,少量写操作的场景。
信号量
信号量(英语:Semaphore)又称为信号量、旗语,是一个同步对象,用于保持在0至指定最大值之间的一个计数值。当线程完成一次对该 semaphore 对象等待时,该计数值减一;当线程完成一次对 semaphore 对象释放时,计数值加一。当计数值为0,则线程等待该 semaphore 对象,直至该 semaphore 对象变成 signaled 状态。semaphore 对象的计数值大于0,为 signaled 状态;计数值等于0,为 nonsignaled 状态。
public void test02() {
final Semaphore semaphore = new Semaphore(3);
System.out.println("可用的信号量:" + semaphore.availablePermits());
for (int i = 0; i < 50; i++) {
final int flag = i;
Thread testThread = new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("test thread runing..." + flag);
Thread.sleep((long) (Math.random() * 3000));
semaphore.release();
System.out.println("可用的信号量:" + semaphore.availablePermits());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
testThread.start();
}
}
线程协作
- Object wait¬ify/notifyAll
如果有多个 wait 线程,notify 只随机唤醒其中之一,notifyAll 会唤醒所有的 wait 线程,被唤醒的线程在锁池中,待 notify 的线程释放完锁之后,所有在锁池的线程才去竞争锁。
private final Object mLock = new Object();
private void increaseCount1() {
synchronized (mLock) {
mLock.wait();/mLock.notify();
shareData++;
}
}
- Condition await&signal
private ReentrantLock mLock = new ReentrantLock();
private Condition mCondition = mLock.newCondition();
// 要在锁的包裹下 lock()-await()/signal()-unlock()
mCondition.await();
mCondition.signal();
线程池
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
使用单线程,固定线程,动态线程或是自定义的,根据自己的业务需求和使用场景合理选择。
单线程
可以保障任务的执行顺序,队列是个无界队列。
static ExecutorService newSingleThreadExecutor()
static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory)
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 100; i++) {
final int flag = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (Math.random() * 3000));
System.out.println(Thread.currentThread().getName() + " running ... " + flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(runnable);
}
流程图:
固定线程数
创建过的线程可以复用,不过只能创建固定数量的线程,有需要长时间处理的任务可以考虑使用。
static ExecutorService newFixedThreadPool(int nThreads)
static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory)
public void testFixedThreadPool() {
ExecutorService executorService = Executors.newFixedThreadPool(3);
for (int i = 0; i < 100; i++) {
final int flag = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (2000));
System.out.println(Thread.currentThread().getName() + " running ... " + flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(runnable);
}
}
流程图:
动态线程数
在无空闲线程可用的情况下,可随时新建线程,创建过的线程可以复用,池线程数支持 0-Integer.MAX_VALUE,超过缺省 60 s可以移除线程,适合任务时间短的异步任务。
如果是大量的需要长时间处理的任务,因为会创建很多线程,效率会很慢。
static ExecutorService newCachedThreadPool()
static ExecutorService newCachedThreadPool(ThreadFactory threadFactory)
public void testCachedThreadPool() {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 100; i++) {
final int flag = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (5000));
System.out.println(Thread.currentThread().getName() + " running ... " + flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
executorService.execute(runnable);
}
}
流程图:
自定义线程池
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
public void testCustomThreadPool() {
int N = 2;
BlockingQueue<Runnable> blockingDeque = new ArrayBlockingQueue<Runnable>(10);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(N + 1, N * 2, 1000, TimeUnit.MILLISECONDS, blockingDeque);
for (int i = 0; i < 14; i++) {
final int flag = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (1000));
System.out.println(Thread.currentThread().getName() + " running ... " + flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
poolExecutor.execute(runnable);
}
}
- corePoolSize(核心线程池大小),当提交一个任务到线程池,如果线程池中的线程数量小于 corePoolSize,线程池就会创建一个线程,即使有空闲的线程可以执行任务。
- maximumPoolSize(线程池最大大小),如果核心线程池已满,任务会添加到任务队列,任务队列如果也满了,而当前线程数小于 maximumPoolSize,则会创建新线程执行任务。
- keepAliveTime(空闲线程存活时间),如果空闲线程 idle 的时间大于 keepAliveTime 则移除该线程。
- unit(时间的单位),单位有天(DAYS),小时(HOURS),分钟(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
-
workQueue(BlockingQueue 阻塞队列) 的常规策略:
- 直接切换 (Direct handoffs),SynchronousQueue 就是这种队列,它将任务直接交给线程,而不持有。如果没有线程可以执行这个任务,那任务无法添加到队列,所以只能创建新线程。这种策略可以避免在处理有内部依赖的请求集合时被锁住。直接切换的策略通常需要没有限制的 maximumPoolSizes 线程池数,比如 newCachedThreadPool 动态线程,所以需要设置 idle 时间来移除空闲超时的线程。这样可以避免提交的任务被拒绝执行。当要处理的任务一直比可用的线程多时,线程会无限创建下去。
- 无界队列(Unbounded queues),LinkedBlockingQueue 属于无界队列,newFixedThreadPool 使用的就是这个队列,当 corePoolSize 的线程都在忙碌的时候,任务会被添加到这个队列里等待执行。当每个任务完全独立于其他任务的时候,这个策略是合适的,它们的执行不会相互干扰,不独立就会有被锁住的风险。例如,在 Web 服务中,它有时可以处理突发的大量请求,但是当处理的任务比可用的线程一直多时,队列里的任务会无限的增多。
- 有界队列(Bounded queues),ArrayBlockingQueue 属于有界队列,有界队列在配合使用有限 maximumPoolSizes 的时候,有助于防止系统资源耗尽,但 maximumPoolSizes 的大小难以调整,控制。队列大小和最大线程池大小需要相互折衷,使用大队列,小池可以最大限度的减少 CPU 的使用率,系统资源使用和上线文切换的成本,但会导致吞吐量降低。使用小队列通常需要大的线程池,这可以保持 CPUs 繁忙,但可能会突然遇到无法接受的调度开销,这也会降低吞吐量。
- threadFactory,帮助创建线程。
- RejectedExecutionHandler handler(饱和策略),在 Executor 被 shut down 关闭的时候,新提交的任务会被拒绝。在使用了有限的最大线程池和有限的队列时,如果两项都饱和了,新任务也会被拒绝。这时,execute 调用RejectedExecutionHandler 的
rejectedExecution(Runnable, ThreadPoolExecutor)
方法,下面有四个预先定义好的策略供使用:- ThreadPoolExecutor.AbortPolicy(默认的),拒绝时抛出 RejectedExecutionException 异常。
- ThreadPoolExecutor.CallerRunsPolicy ,调用 execute 的线程自己运行这个任务,这提供了一个简单的反馈控制机制,会降低新任务的提交速度。
- ThreadPoolExecutor.DiscardPolicy,丢弃掉无法执行的任务。
- ThreadPoolExecutor.DiscardOldestPolicy,如果 executor 没有被关闭,那将丢弃掉队列最前面的任务,然后在尝试执行(尝试执行,可能还会失败,会重复该过程)。
- 自定义策略,实现 RejectedExecutionHandler。
流程图:
从队列中移除任务
public void testCustomThreadPool() {
int N = 2;
BlockingQueue<Runnable> blockingDeque = new ArrayBlockingQueue<Runnable>(10);
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(N + 1, N * 2, 1000, TimeUnit.MILLISECONDS, blockingDeque);
for (int i = 0; i < 14; i++) {
final int flag = i;
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
Thread.sleep((long) (1000));
System.out.println(Thread.currentThread().getName() + " running ... " + flag);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
poolExecutor.execute(runnable);
}
poolExecutor.getQueue().remove();
poolExecutor.getQueue().remove();
poolExecutor.getQueue().remove();
/*
pool-1-thread-2 running ... 1
pool-1-thread-4 running ... 13
pool-1-thread-3 running ... 2
pool-1-thread-1 running ... 0
pool-1-thread-1 running ... 9
pool-1-thread-2 running ... 6
pool-1-thread-4 running ... 7
pool-1-thread-3 running ... 8
pool-1-thread-4 running ... 12
pool-1-thread-2 running ... 11
pool-1-thread-1 running ... 10
3,4,5 号任务被删除,没有执行,该方法删除队列前面的任务,0,1,2 已经执行所以无法删除。
*/
}
关闭线程池
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
获取任务的直接结果
public void testReturnTaskResult() {
ExecutorService executorService = Executors.newFixedThreadPool(2);
Future<String> future0 = executorService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
System.out.println(Thread.currentThread().getName() + " callable running ... 0");
return "0 task finished!";
}
});
Future<?> future1 = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " runnable running ... 1");
}
});
Future<?> future2 = executorService.submit(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + " runnable running ... 2");
}
}, "2 task finished!");
try {
System.out.println("future 0:" + future0.get());
System.out.println("future 1:" + future1.get());
System.out.println("future 2:" + future2.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
Future.get()
方法会阻塞当前线程。
聊聊并发(三)——JAVA线程池的分析和使用 InfoQ
文章中谈到如何合理的配置线程池,根据任务的性质,是 CPU 密集型,IO 密集型,还是混合型任务,还有任务的优先级,任务的执行时间,和任务的依赖性,如数据库连接。