先从整体上说下
大致分为下面几个包
atomic包
locks 包
线程池&并发集合
并发集合
ConcurrentHashMap
这个集合是建立在HashMap基础上的,可以分jdk1.7和jdk1.8来说
jdk1.7:
由一个个Segment组成,每个Segment里面类似HashMap结构,默认是有16个Segment,此时最多可以允许16个线程并发操作。Segment指定后不可扩容。
jdk1.8:
抛弃了分段锁,结构上与HashMap一致,链表加红黑树
CopyOnWriteArrayList
CopyOnWriteArrayList是一个线程安全ArrayList。
读操作无锁,add操作时采用拷贝一个新数组来操作,读效率很高,但是有可能有脏读
CopyOnWriteArraySet
基于CopyOnWriteArrayList,但是添加的时候得遍历下元素在不在,所以效率低一点
hashset也是添加的时候遍历吗?是的,只不过不是顺序遍历。因为它是基于hashmap实现的,根据hashcode定位到位置后,会判断这个位置有没有元素,有的话key是不是相等,相等则覆盖value
BlockingQueue
这是个接口,实现的有
1.ArrayBlockingQueue 基于数组先进先出,有界
2.LinkedBlockingQueue 基于链表,默认无界(INT最大值),可指定容量
3.PriorityBlockingQueue 无界
lock
AQS
提供基于Lock接口的并发基础类,主要运用CAS实现。
目的:在并发状态下管理被阻塞的线程
作用: 提供一套通用的机制来管理同步状态,阻塞/唤醒线程,管理等待队列等。
方法:acquire()、acquireShared()、release()、releaseShared()、hasWaiters()、cas修改状态及头/尾指针的方法等
核心:等待队列(CLH队列)
CLH队列
1.双向链表
2.结点是对线程的包装,分独占和共享两种类型
3.用前一结点的某一属性表示后一结点的状态
4.自旋方式不断cas插入包含当前线程的结点到尾部
Condition
一般配合lock使用,await()、signal()、signalAll()
Object里面的wait()、notify()、notifyAll()是配合synchronized使用的
ReentrantLock
基于AQS,里面有一个state值,先CAS设置state,如果state!=0,但发现拿锁的是自己则++,获取失败的加到CLH队列尾,内部自旋继续获取锁。
这里可以顺便说下synchronized,它基于monitor对象的进入进出实现,monitor在对象头中,如果被线程持有便处于锁定状态,另外,synchronized同步块时,是显示的用monitorenter和monitorexit两个指令实现同步;而synchronized同步方法时,是隐式的,会取常量池中方法的ACC_SYNCHRONIZED表示是否同步方法,是则获取monitor。早期monitor依赖底层操作系统实现,所以存在用户态到核心态的切换,效率低,但jdk6后做了很多优化,比如偏向锁,轻量锁,锁自旋,锁消除等。
lock与synchronized区别,什么时候用lock
底层原理的区别就是上面说的,至于其它的,lock是接口,synchronized是同步原语;lock需要手动释放,而且lock可以实现公平锁和读写锁;两个都是可重入锁。
atomic
AtomicInteger
里面有一个volatile 修饰的value属性,为了保证值在线程间可见,并发的操作采用CAS,可实现原子递增,
这里也可以看出来,如果一个变量只是volatile修饰是不能实现原子递增的,因为i++涉及2步操作,一个是获取值,一个是赋值。
线程池
有几个关键类要搞清楚
Executor 接口,只有一个execute方法
Executors 封装了一些创建常用线程池的方法
ExecutorService 接口,继承execute,含shutdown、submit、invokeAll等方法
ThreadPoolExecutor,实现ExecutorService,也可用于创建线程池
Future 接口,含cancel、get、isDone等方法
一般常见的线程池如下,可用Executors调用创建
newFixedThreadPool,固定线程数量的
newSingleThreadExecutor ,单个线程的
newCachedThreadPool, 可调节线程数量线程池,有核心线程数量和最大线程数量
newSingleThreadScheduledExecutor, 可执行定时任务的线程池
这几个实现都是调用ThreadPoolExecutor去设置不同的参数。
这里还可以提一下使用异步线程的时候注意try-catch再打个日志, 否则可能造成出现异常又没有日志的情况
private static final ThreadPoolExecutor threadPool = newThreadPoolExecutor(4,10,60,TimeUnit.SECONDS,newLinkedBlockingQueue());
同步&互斥类
CyclicBarrier
可循环利用屏障,有两个构造方法
CyclicBarrier(int parties)和CyclicBarrier(int parties, Runnable barrierAction)
后者,Runnable参数表示当调用cb.await()的的线程数达到指定数量后,开始执行的方法。
比如new CyclicBarrier(4, “do一起出门”)
有4个同学分别开始准备出门,准备好之后都调用cb.await(),表示达到屏障,达到4个之后,就一起出门。
实现原理:
主要基于ReentrantLock,CyclicBarrier里有一个lock属性。
1.每当线程执行await,内部变量count减1,如果count!= 0,说明有线程还未到屏障处,则在锁条件变量trip上等待。
2.当count == 0时,说明所有线程都已经到屏障处,执行条件变量的signalAll方法唤醒等待的线程
CountDownLatch
倒数的门栓,主要有两个方法countDown()和await()。
一般呢在主线程await(),然后开几个线程去执行一些其它的逻辑,执行完后分别调用countDown(), 每次countDown()数量减1,当数量到0后,便会通知主线程继续执行。
实现原理:
基于AQS,AQS里有一个volatile 修饰的state,每次countDown()时会CAS-1,调用await()会判断state是否为0,不是则等待。
这里可以顺便说下CyclicBarrier和CountDownLatch两者的一个区别:
CyclicBarrier可以重复使用,CountDownLatch不能
Semaphore
信号量,主要有acquire() 和 release() 两个方法。
比如初始有10个信号,new Semaphore(10),然后执行方法的时候先调用acquire() 获取一个信号,
如果有则可以执行,并且剩余信号量减1,没有则等待; 获取了信号的线程执行完之后可以释放信号,释放了剩余信号就加1.
实现原理:
也借助AQS,并有公平和非公平两种方式