并发整理最后一篇,之前两篇
并发整理(一)— Java并发底层原理
并发整理(二)— Java线程与锁
这篇讲的主要是JDK中运用之前说的并发基础来包装的一些类给开发者来并发调用,仔细研究这些有利于我们加深对并发处理的理解
ConcurrentHashMap
HashMap是非线程安全的,如果在多线程下使用很容易形成环状链表
关于HashMap,java8也做了很多改进可以看下面的文章
HashTable虽然是线程安全的但是其会把每个操作都会加锁,会严重影响性能所以java推荐使用ConcurrentHashMap来在并发环境下代替HashMap。但是不同的版本有不同的实现方法,主要看以下
注:源码内容太多,所以我没有贴代码,可以自行结合先说的逻辑看源码
Java7
以前的版本主要采用的锁分段技术,主要以下:
- 其内部由Segment数组与HashEntry数组组成,其中Segment继承自ReentrantLock,Segment结构与HashMap一致,所以这样拆分就解决了HashTable整个加锁的效率问题
- 每次操作元素,都需要先hash一次到segment然后再hash一次到每个桶,并且两次用的hash算法也是不一样的
- get操作:ConcurrentHashMap中Entry是volatile所以可以不用加锁也可以保证原子性
- put操作:ConcurrentHashMap先判断是否要扩容,然后才添加逻辑。这样的逻辑比HashMap更加合理,因为原来的方式很可能造成扩容后没有元素还需要添加了,这样就造成了浪费
- 扩容:扩容不会把ConcurrentHashMap整个容器扩容,只是对某个segment进行扩容
- size操作:ConcurrentHashMap的计数非常巧妙,一般HashMap里面都有一个ModCount来标志所以的操作量,在计数时ConcurrentHashMap会先尝试2次统计单个segment不加锁直接统计,如果这个时候发现modcount变了说明并发存在,于是之后每次统计都会锁住segment
探索 ConcurrentHashMap 高并发性的实现机制
Java8
Node的不同
-
Node节点中加了volatile关键字
volatile V val volatile Node<K,V> next
新增
find
方法,便于get
操作,把查找的工作全部交给节点,其实也是因为加入了红黑树所以查找单独封装在相应节点更好(并发)不能直接通过Node中的
setValue
直接设置,为了解决并发问题只能用ConcurrentHashMap封装的CAS操作-
已经知道了HashMap中大于8节点的桶会变成红黑树桶,但是与HashMap不同的是,ConcurrentHashMap的桶是TreeBin,其封装了红黑树增删以及旋转节点操作,并且存了树的根节点,其里面存的才是TreeNode
并且:TreeBin中通过一个简单的volatile型变量通过CAS操作来实现了一个简易读写锁来支持并发
扩容
前面说了在HashMap的时候很容易由于扩容问题在并发中造成环节点,但是8中的ConcurrentHashMap也摒弃了之前的分段锁,使用了更优美的方法:
- ConcurrentHashMap中加入了标志量为Moved的节点ForwardingNode,其储存了新扩容的table,这个节点只有在发送扩容时才存在
- 如果一个元素
put
完之后需要扩容了,就会进入流程,每个原来Table的桶扩容到新的table完了后都会在原来的桶中加入ForwardingNode节点代表已经扩容,桶里没有东西的,也直接加入ForwardingNode。扩容的时候也会对每个桶用synchronized加锁,避免并发问题 - 只要不超过最大并发数,ConcurrentHashMap允许并发的扩容加快速度。所以,如果在并发时如果
put
操作发现了ForwardingNode节点,有线程在进行并发,所以加入扩容,并且CAS更新sizeCtl全局变量代表当前并发数 - 在扩容的时候和新版的HashMap不一样,每个桶其不管是普通节点还是树节点,扩容后构建的都是原来倒序的
- 遍历完所有节点,就完成了复制工作,让扩容的table作为新的table
Put操作
由于摒弃了分段锁,所以不再是以前那样的两次hash,这次通过不允许全部的KEY或Value为null,用CAS操作和每个桶加锁来保证并发的安全:
- hash计算出在哪个桶之后,进入死循环,知道插入成功为止
- 如果这个桶空的,那么就直接CAS放入,不用加锁
- 如果这个桶是ForwardingNode(Moved)节点,则加入并发扩容
- 不是以上就给桶上锁(synchronized),之后用是普通节点遍历节点插入,不然就用树的插入方法插入
get操作
这个比较简单,而且ConcurrentHashMap直接把查找节点封装到了桶,直接调用find
就好
ConcurrentLinkedQueue
与HashMap的同步处理相比,这个简单的多
ConcurrentLinkedQueue采用非阻塞,不加锁的CAS算法来保证并发的问题
类中有volatile修饰的head和tail变量,下面以入队列offer
为例,出队列是一样的逻辑
- 最简单的可以想到每次添加过后CAS去更新tail就可以了,但是这样效率太低了,所以Doug Lea用了一种非常巧妙的方法
- 所以添加之后尽量减少tail的CAS操作,在以前的版本有一个HOPS常量,默认是1,当前tail节点和真正的尾节点差值大于HOPS才会CAS更新tail,而如果小于则不会更新这样就减少了CAS操作,并发的效率变高,所以才有很多地方都有的结论,tail节点不总是尾节点
- java8源码里面把HOPS这个变量去了,直接去判断如果tail节点的next节点不为空,则将入队节点CAS设置成tail节点(相当于大于HOPS=1),如果tail节点next为null则直接把新节点放到tail后面,不更新tail
因为ConcurrentLinkedQueue是无界的所以offer始终返回true,不要通过返回值去判断入队成功
BlockingQueue
阻塞队列完全是由生产者与消费者场景产生的,其在队列基础上支持两个阻塞的插入与删除操作
- ArrayBlockingQueue:是一个基于数组实现的有界阻塞队列,可以实现公平与非公平阻塞
- LinkedBlockingQueue:基于链表实现的有界阻塞队列,最大为Integer.MAX_VALUE
- PriorityBlockingQueue:支持优先级(即可用comparator定义排序规则,但是不能保证同优先级顺序)的无界阻塞队列
- DelayQueue:支持延时获取元素无界阻塞队列,内部由一个PriorityQueue实现,其入队的元素需要实现Delay接口
-
SynchronousQueue:不储存元素的阻塞队列,每个
put
都要等一个take
,否则不能继续添加,支持公平与非公平 -
LinkedTransferQueue:对于其他阻塞队列,多了
tryTransfer
与transfer
方法,意思就是先尝试立即给消费者,如果没有,才会加入队列 - LinkedBlockingDeque:双端阻塞队列
原理
以ArrayBlockingQueue为例,使用并发中典型的通知模式,就比如,如果队列满了,则添加就会阻塞,直到消费了一个之后再通知生产者当前队列可以用
-
其内部有一个ReentrantLock,用来生成阻塞生产与消费的两个方法的Condition变量notEmpty和notFull
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull;
由于主要基于生产与消费,所以不会考虑一般队列那样的并发处理,在每次入队出队都会加锁
每次消费如果容器count==0则会
notEmpty.await()
,而生产的时候都会调用notEmpty.signal()
,同理生产阻塞
Executor框架
结构
主要由以下组成
- 任务:Runnable接口或者Callable接口的实现类
- 执行者(线程调度者):Executor接口的实现类,其主要两个实现类:ThreadPoolExecutor与ScheduledThreadPoolExecutor
- 计算结果:如果任务是Callable接口,那么会返回结果接口Future的实现类
ThreadPoolExecutor
流程
其是Executor的一个主要实现了,下面三个线程池都继承它,其内部有一个核心线程池corePool,一个最大线程池maximumPool,和一个执行队列BlockingQueue,其执行的逻辑如下:
- 如果当前线程少于corePoolSize则获取全局锁,创建线程来执行任务
- 如果允运行的线程大于corePoolSize则把任务加入执行队列BlockingQueue
- 如果队列都满了,那就再获取全局锁,创建新线程执行任务
- 如果maximumPool都满了,那就使用RejectExecutionHandler拒绝任务
其中线程池创建线程时会将线程封装在一个Worker里面,每次worker执行完任务后会循环获取任务队列中的任务来执行
一般直接用ThreadPoolExecutor还是太复杂,所以用下面四个:
FixedThreadPool
可重用固定线程数的线程池,将corePoolSize与maximumPool设置相同,所以没有了第3步,并且其内使用LinkedBlockingQueue来作为工作队列,所以只要小于Integer.MAX_VALUE,都会入队,如果池满了就一直等待,线程Worker做完任务如果没有新的则立即结束
SingleThreadExecutor
单个线程的Executor,corePoolSize与maximumPoolSize都是1,维护一个工作队列
CachedThreadPool
有缓存线程的线程池,corePoolSize为0,maximumPoolSize为Integer.MAX_VALUE,并且每个线程Worker执行完后会等待60s没有任务才关闭,其内部工作队列为SynchronousQueue是没有容量的阻塞队列,所以每个任务只要没有空闲线程,都会直接开一个新的线程Worker,极端情况下CachedThreadPool会因为创建过多的线程而耗尽CPU和内存资源
ScheduledThreadPoolExecutor
支持延时以及周期的执行任务,其将maximumPool置为无效,并且内部使用DelayQueue来周期执行任务
- 池中的任务全部封装成一个ScheduledFutureTask,包含了任务要被执行的时间time,任务执行的间隔时间period
-
获取任务:因为DelayQueue内部是一个PriorityQueue,根据每个Task的time来优先级排序,每次获取任务都用阻塞队列
take
操作直到有time大于当前时间 - 周期任务:周期任务执行完后会修改task中的time为下次执行时间,再将其放回队列