什么是线程?
可以从《深入理解JVM》得到参考
线程是比进程更轻量级的调度单位,多个线程可以共享进程的资源,而其自身也有独立的资源
主流OS都有其线程实现,而JVM中的线程更为轻量级,并且对不同硬件与OS做了兼容
实现线程有3种方式
有内核线程,由OS来调度
由用户线程,由用户空间自己调度
以及混合调度,目前JVM的线程应该是混合调度实现的
为什么要有线程池?
假如每个异步任务都要开启一个线程,其资源消耗太大
而且项目中线程过多,描述符过多,会引起OOM
每个线程执行完后就会被销毁,有点可惜
所以需要有个线程池,使得任务执行完后,线程不会被销毁,而是被缓存起来
整体架构
image.png
执行规则
- 核心线程
- 队列
- 非核心线程
- 饱和策略
如何实现线程复用?
对于线程,无非是使用睡眠/唤醒机制来实现
- wait/notify
- ReentrantLock
- C++:select、poll、epoll
线程池则是利用了阻塞队列的特性,take元素的时候,如果为空,则会一直阻塞
如以下代码,最终阻塞在getTask方法上,即BlockingQueue的take
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take(); // 阻塞于此
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
借鉴这个思想,我们可以设计一个推送系统,在发消息的时候,准备一个BlockingQueue,在while循环里take这个BlockingQueue,实现持续的消息发送
如何合理取消任务?
- shutdown,拒绝新任务到来
- awaitTermination(awaitTime, TimeUnit.MILLISECONDS),等待若干秒,看是否还有任务未完成
- 调用shutdownNow,它会对所有运行中的任务调用intercept,但是注意intercept并不是什么黑科技,他只是修改一个标志位,你在工作任务中需要检测intercepted标记位,判断是否需要停止任务,举个例子
Runnable r = new Runnable() {
@Override
public void run() {
while(true) {
if (Thread.isIntercepted()) {
return;
}
}
}
}
此外线程的许多方法是可以响应中断的,比如wait,sleep
public final void wait(long millis) throws InterruptedException {
wait(millis, 0);
}
public static void sleep(long millis) throws InterruptedException {
Thread.sleep(millis, 0);
}
也就是说,在线程执行这些方法并阻塞的时候,调用Thread.interrupted(),会使得他们结束这种状态,并抛出中断异常
总结
整体上就这几个点。对于这个库,虽然整体架构比较简单,但是它在设计模式上,并发处理上,有许多小的细节可以深挖,比如AQS,CAS等,认真读完后,会有很大的收获
后记
有什么写得错误、让人费解或遗漏的地方,希望可以不吝赐教,我会马上更改