ThreadPoolExecutor代码中的注释
ExcutorService 尽可能使用线程池里的Thread来执行任务,通常使用Excutors工厂方法进行配置。
线程池通常处理两个不同的问题:它们会在执行大量异步任务的时候,通过减少每个任务调度的开销来提高性能,当执行一组任务的时候,线程池提供(包括线程在内)资源调度的方法。每一个ThreadPoolExecutor也可以做一些统计工作。例如统计执行完的任务数。ThreadPoolExecutor为了适应广泛的使用场景,提供了许多可调参数和回调方法。
我们也可以用一些Excutors提供的工厂方法来创建线程池。
Executors.newCachedThreadPool()
:提供无边界的线程池,提供自动回收线程的功能。
Executors.newFixedThreadPool(int)
:确定大小的线程池。Executors.newSingleThreadExecutor()
:提供一个后台线程。
corePoolSize(核心线程数)和maximumPoolSize(最大线程数):
ThreadPoolExecutor会自动调整线程池的大小,根据这两个值,当通过execute(java.lang.Runnable)
方法提交一个任务时,当 小于corePoolSize数量的线程在线程池里运行,即使这些线程处于闲置状态,也会创建一个线程来执行这个新任务。如果线程池中有多于corePoolSize,小于maximumPoolSize个线程正在运行,那么只有当等待队列满了才会创建新的线程去执行新任务。
通过将corePoolSize和maximumPoolSize设置成一样的值来实现固定大小的线程池。
通过将maximumPoolSize设置成Integer.MAX_VALUE,实现无限大的线程池,可以容纳任意数量的并发任务。这两个值也可以通过setCorePoolSize(int)
和setMaximumPoolSize(int)
来动态修改。
根据需求来构造线程池:
在默认情况下,核心线程只有在新任务来临的时候才被初始化创建出来,但是当线程池在创建初期等待队列就非空,可以通过实现prestartCoreThread()
和prestartAllCoreThreads()
方法来预启动核心线程。
创建新线程:
可以通过ThreadFactory
创建新的线程。如果没有指定,会使用Executors.defaultThreadFactory()
创建新线程。使用默认ThreadFactory
创建的线程会在同一个java.lang.ThreadGroup
中,且同样的优先级(NORM_PRORITY),且无daemon状态。通过制定不同的ThreadFactory
,可以改变线程的名称、线程group、优先级、daemon状态等。当ThreadFactory
创建失败时,会返回null,线程池会继续运转,但是新的任务可能不会执行。线程需要有modifyThread运行时权限,如果没有这个权限,服务会被降级:这会导致一些配置不会立即生效,关闭池有可能保持在终止状态而非完成。
Keep-alive times(线程空闲时间):
当线程数大于核心线程数,当辅助线程被挂起时间超过线程空闲时间,辅助线程会被终止。这是一种当线程池空闲时,资源重用的手段,当之后线程池再度忙起来,会重新创建新的线程。这个参数可以通过setKeepAliveTime(long,java.util.concurrent.TimeUnit)
方法动态修改。
通过setKeepAliveTime(Long.MAX_VALUE,TimeUnit.NANOSECONDS)
,有效禁止挂起的线程在终止之前被关闭。
在默认情况下,这个参数只对辅助线程有约束作用,通过调用allowCoreThreadTimeOut(boolean)
可以讲参数的作用范围扩大到核心线程。
任务等待队列:
等待队列的等待策略会影响池子大小。
- 如果正在运行的线程数小于核心线程,Executor倾向于创建新线程执行任务,而非排队等待。
- 如果核心线程已满或者有更多的线程在运行,Executor倾向于等待,而非创建新的线程。
- 如果等待队列已满,当池子没满(线程数小于
maximumPoolSize
的情况),会创建新的线程,如果池子也满了,任务将会被拒绝。
等待队列的一些策略:
- 直接传递:
SynchronousQueue
,当有任务来的时候,不会入队列,如果没有线程可以立即执行这个任务时,尝试入队列会导致失败,因此会尝试创建新线程。这种策略会防止有依赖关系的请求相互等待的情况。直接传递通常要求池子大小无限大,以防止新任务来临时rejected,也满足当任务到达的速度比运行速度要快时池子大小无限扩大的需求。 - 容量无限制的等待队列:
LinkedBlockingQueue
,当所有的核心线程处于繁忙状态时,新来的任务会入队。因此不会有辅助线程被创建,maximumPoolSize
也不会起作用。这种队列适用于任务之间是相互独立的情况。例如:在web服务器中消除请求激增的情况下,这种队列可以满足请求排队。 - 有容量限制的等待队列:
ArrayBlockingQueue
可以通过设置maximumPoolSizes
,预防资源枯竭,但是这种队列难以调试和控制。队列的容量和池子的容量是相互影响的:用大队列小池子会最小化CPU和系统资源的使用,并且可以节省context切换的成本,但是会降低吞吐量。当一个任务经常阻塞,系统会安排比预计的更多的时间给更多的线程。用小队列大池子,CPU会一直处于busy状态,这样会导致调度繁忙,因为最终也会减少吞吐量。
被拒绝的任务:
当Executor已经被干掉,或者等待队列和池子的容量到达上限时,执行execute(java.lang.Runnable)
方法,提交新任务会被拒绝。这种情况会执行RejectedExecutionHandler.rejectedExecution(java.lang.Runnable,java.util.concurrent.ThreadPoolExecutor)
方法。
当任务被拒绝时,有四种预定义的策略可用:
-
ThreadPoolExecutor.AbortPolicy
会抛出RejectedExecutionException
异常。 -
ThreadPoolExecutor.CallerRunsPolicy
执行execute方法的线程执行任务,这提供了反馈控制机制放慢新任务的提交速度。 -
ThreadPoolExecutor.DiscardOldestPolicy
,任务被忽略。 -
ThreadPoolExecutor.DiscardOldestPolicy
,如果executor没有被干掉,等待队列中,位于head的任务会被忽略,然后重试execute,如果还是被拒绝,会重复此过程。
也可以定义其他种类的RejectedExecutionHandler
,需要注意有容量限制的队列和池子的情况。
钩子方法:
可以在每个任务的beforeExecute(java.lang.Thread,java.lang.Runnable)
和afterExecute(java.lang.Runnable,java.lang.Throwable)
方法中执行一些代码。例如:可以统计数据,打log,可以重启ThreadLocals。
terminated()
会在Executor完全被干掉之后执行。
如果这些钩子方法中出异常,task会执行失败,或者线程会被意外终止掉。
队列的维护:
getQueue()
:可以获取工作队列。
remove(java.lang.Runnable)
和purge()
:当大量的任务被取消的时候,可以帮助内存回收,
Finalization:
当一个池子不再被引用且池子中没有剩下的线程时,会被自动干掉。如果担心用户忘记调用shutdown()
,可以设置合适的keep-alive时间,并且使用一个核心线程数为0的线程池,或者调用allowCoreThreadTimeOut(boolean)
方法来保证工作线程会最终被干掉。
构造方法
public ThreadPoolExecutor(int corePoolSize,//核心线程数
int maximumPoolSize,//最大线程数
long keepAliveTime,//最长空闲时间
TimeUnit unit,//时间单位
BlockingQueue<Runnable> workQueue,//等待队列
ThreadFactory threadFactory, //辅助线程生成策略
RejectedExecutionHandler handler)//被拒绝的任务处理方式
BlockQueue
常用的方法
入队:
offer(anObject)
:表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)
offer(E o, long timeout, TimeUnit unit)
,可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
put(anObject)
:把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。出队:
poll(time)
:取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null;
poll(long timeout, TimeUnit unit)
:从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。
take()
:取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;
drainTo()
:一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。
子类
-
ArrayBlockingQueue
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。 -
LinkedBlockingQueue
对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。 -
SynchronousQueue
一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:
如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。 -
DelayQueue
DelayQueue中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue是一个没有大小限制的队列,因此往队列中插入数据的操作(生产者)永远不会被阻塞,而只有获取数据的操作(消费者)才会被阻塞。
使用场景:
DelayQueue使用场景较少,但都相当巧妙,常见的例子比如使用一个DelayQueue来管理一个超时未响应的连接队列。 -
PriorityBlockingQueue
基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。