volatile
是java虚拟机提供的轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
JMM
(java menory model java内存模型,是一种抽象概念并不真实存在,它描述的是一组规范或规则,通过这组规范定义了程序中各个变量(包括实例字段,静态字段和构成数组对象的元素)的访问方式)
- 线程解锁前,必须把共享变量的值刷新回主内存
- 线程加锁前必须读取主内存的最新值到自己的工作内存
- 加锁解锁是同一把锁
CAS是什么
cas : 比较并更新
如果期望值与物理内存的一样则修改。
返回boolean
automicInteger.compareAndSet(5,1024);
CAS底层原理
Unsafe类
do while
比较当前工作内存中的值和主内存中的值,如果相同则执行规定操作,否则继续比较直到主内存和工作内存中的值一致为止。
CAS缺点
如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来了很大的开销。
ABA问题
狸猫换太子
线程t1把变量值由A该成B ,再由B改为了A ,对于t2线程来看以为没有发生变化
解决ABA问题
新增版本号或时间戳
AtomicStampedReference<Integer> atomicStampedReference = new AtomicStampedReference<>(100,1);
atomicStampedReference.compareAndSet(100,101,atomicStampedReference.getStamp(),atomicStampedReference.getStamp()+1);
并发异常
java.util.ConcurrentModificationException 并发修改异常
使用线程安全的数据结构
List
- new Vector();
- Collections.synchronizedList(new ArrayList());
- new CopyOnWriteArrayList();底层使用了ReentrantLock锁,在JUC包下
Set
- CopyOnWriteArraySet
Hashset底层是Hashmap
Map
- ConcurrentHashMap
锁
- 公平锁(按照先后)和非公平锁(可插队)
- 可重入锁(递归锁)防止死锁(synchronized和reentrantLock都是可重入锁)
外层方法获取了锁,进入内层的方法会自动获取锁,是使用同一把锁 - 自旋锁
是指尝试获取锁的线程不会立即阻塞,而是采用循环的方式去尝试获取锁,这样的好处是减少线程上下切换的消耗,缺点是循环会消耗CPU
优点:不用阻塞
缺点:耗费性能
package com.dongge.volatiledemo;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class SpinLockDemo {
AtomicReference<Thread> atomicReference = new AtomicReference<>();
public void myLock(){
Thread thread = Thread.currentThread();
System.out.println(Thread.currentThread().getName()+"\t come in ");
while(!atomicReference.compareAndSet(null,thread)){
}
}
public void myUnLock(){
Thread thread = Thread.currentThread();
atomicReference.compareAndSet(thread,null);
System.out.println(Thread.currentThread().getName()+"\t invoked myUnLock");
}
public static void main(String[] args) {
SpinLockDemo spinLockDemo = new SpinLockDemo();
new Thread(()->{
spinLockDemo.myLock();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
spinLockDemo.myUnLock();
},"AA").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
new Thread(()->{
spinLockDemo.myLock();
spinLockDemo.myUnLock();
},"BB").start();
}
}
- 独占锁(写锁)
ReentrantLock和Synchronized都是独占锁
写操作:原子+独占 整个过程必须是一个完整的统一体,中间不许被分割,被打断 - 共享锁(读锁)
是指该锁可以被多个线程所持有
ReentrantReadWriteLock其读锁是共享锁,其写锁是独占锁
package com.dongge.volatiledemo;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache {
private volatile Map<String, Object> map = new HashMap<>();
private ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
public void put(String key, Object value) {
rwLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在写入:" + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "\t正在写入完成:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.writeLock().unlock();
}
}
public void get(String key) {
rwLock.readLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "\t正在读取:" + key);
try {
TimeUnit.MILLISECONDS.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.get(key);
System.out.println(Thread.currentThread().getName() + "\t读取完成:" + key);
} catch (Exception e) {
e.printStackTrace();
} finally {
rwLock.readLock().unlock();
}
}
public void clearMap(){
map.clear();
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.put(tempInt + "", tempInt + "");
}, String.valueOf(i)).start();
}
for (int i = 0; i < 5; i++) {
final int tempInt = i;
new Thread(() -> {
myCache.get(tempInt + "");
}, String.valueOf(i)).start();
}
}
}
CountDownLatch
规定的线程都完成后,做减法 countDownLatch.countDown();
package com.dongge.volatiledemo;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 0; i < 6; i++) {
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t上完自习离开教室");
countDownLatch.countDown();
},String.valueOf(i)).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t班长最后关门走人");
}
}
CyclicBarrier 做加法
都到齐了才开始
package com.dongge.volatiledemo;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierDemo {
public static void main(String[] args) {
CyclicBarrier cyclicBarrier = new CyclicBarrier(7,()->{
System.out.println("召唤神龙");
});
for (int i = 1; i <= 7; i++) {
final int tempInt = i;
new Thread(()->{
System.out.println(Thread.currentThread().getName()+"\t收集到第"+tempInt+"龙珠");
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
},String.valueOf(i)).start();
}
}
}
Semaphore信号量
主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
package com.dongge.volatiledemo;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreDemo {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);//模拟3个停车位
for (int i = 1; i <= 6; i++) {//模拟6部汽车
new Thread(()->{
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t抢到车位");
TimeUnit.SECONDS.sleep(3);
System.out.println(Thread.currentThread().getName()+"\t停车3秒后离开车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
},String.valueOf(i)).start();
}
}
}
阻塞队列BlockingQueue
当阻塞队列是空时,从队列中获取元素的操作将会被阻塞。
当阻塞队列是满时,往队列里添加元素的操作将会被阻塞。
多线程的判断一定要用
while(boolean) 不能用if(boolean)
Synchronized与Lock的区别
synchronized是关键字属于JVM层面,自动释放
synchronized要么随机唤醒一个线程,要么全部唤醒
monitorenter monitorexit
lock是API层面的锁,需要主动释放
lock可以帮多个Condition,可以精确唤醒线程
等待是否可中断
synchronized不可中断,除非抛出异常或者正常运行完成
ReetrantLock可中断,1.设置超时方法trylock(long timeout,TimeUnit unit)
2.lockInterruptibly()放代码块中,调用interrupt()方法可中断
生产者与消费者 阻塞队列版
package com.dongge.volatiledemo;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class PrdConsumer_BlockQueueDemo {
public static void main(String[] args) throws Exception {
Resource myResource = new Resource(new ArrayBlockingQueue<>(10));
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t生产线程启动");
try {
myResource.myProd();
} catch (Exception e) {
e.printStackTrace();
}
}, "prod").start();
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "\t消费线程启动");
try {
myResource.myConsumer();
} catch (Exception e) {
e.printStackTrace();
}
}, "consumer").start();
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
myResource.stop();
}
}
class Resource {
private volatile boolean FLAG = true;
private AtomicInteger atomicInteger = new AtomicInteger();
BlockingQueue<String> blockingQueue = null;
public Resource(BlockingQueue<String> blockingQueue) {
this.blockingQueue = blockingQueue;
System.out.println(blockingQueue.getClass().getName());
}
public void myProd() throws Exception {
String data = null;
boolean retValue;
while (FLAG) {
data = atomicInteger.incrementAndGet() + "";
retValue = blockingQueue.offer(data, 2l, TimeUnit.SECONDS);
if (retValue) {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "成功");
} else {
System.out.println(Thread.currentThread().getName() + "\t插入队列" + data + "失败");
}
TimeUnit.SECONDS.sleep(1);
}
System.out.println(Thread.currentThread().getName() + "大老板叫停了,表示FLAG=false,生产动作结束");
}
public void myConsumer() throws Exception {
String result = null;
while (FLAG) {
result = blockingQueue.poll(2l, TimeUnit.SECONDS);
if (null == result || result.equalsIgnoreCase("")) {
return;
}
System.out.println(Thread.currentThread().getName() + "\t消费队列" + result + "成功");
}
}
public void stop() throws Exception {
this.FLAG = false;
}
}
Callable
package com.dongge.volatiledemo;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
class MyThread implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1024;
}
}
public class CallableDemo{
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<Integer> futureTask = new FutureTask<>(new MyThread());
new Thread(futureTask,"AA").start();
while(true){
if(futureTask.isDone()){
System.out.println(futureTask.get());
break;
}
}
}
}
注意 :多个线程使用同一个FutureTask,只能被一个线程执行。
线程池
获取当前主机CPU核数
Runtime.getRuntime().availableProcessors()
使用线程池的优点:
- 提高响应速度,当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高可管理性,线程是稀缺资源,如果无限的创建,不仅会消耗系统资源,还会降低系统稳定性。用完后可以还回池中再次使用。
ThreadPoolExecutor
固定线程数的线程池,执行一个长期的任务
Executors.newFixedThreadPool(int) 使用的是链表阻塞队列
一池一个线程
Executors.newSingleThreadExecutor() 使用的是链表阻塞队列
一池N个线程,执行很短的任务
Executors.newCachedThreadPool() 使用的是同步队列
case 1
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = Executors.newFixedThreadPool(5);
try{
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t办理业务");
});
}
}catch (Exception e){
e.printStackTrace();
}finally {
threadPool.shutdown();
}
}
}
线程池的7大参数
- corePoolSize:线程池中的常驻核心线程数
- maximumPoolSize:线程池能够容纳同时执行的最大线程数,此值必须大于等于1
- keepAliveTime: 多余空闲线程的存活时间
当前线程池数量超过corePoolSize时,当空闲时间达到keepAliveTime值时,多余空闲线程会被销毁直到只剩下corePoolSize个线程为止。 - unit: keepAliveTime 的单位
- workQueue: 任务队列,被提交但尚未被执行的任务
- threadFactory: 表示生成线程池中工作线程的线程工厂,用于创建线程一般默认即可
- handler: 拒绝策略,表示当队列满了并且工作线程大于线程池的最大线程数(maximumPoolSize)时如何来拒绝
当前工作线程大于corePoolSize,就会到workQueue(候客区),并创建线程小于等于maximumPoolSize,当workQueue也满了就会根据拒绝策略处理。
线程池的拒绝策略
- AbortPolicy(默认)直接抛出RejectedExecutionException阻止系统正常运行。
- CallerRunsPolicy:调用运行,一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者(任务由调用者执行)。
- DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入队列中尝试再次提交当前任务。
- DiscardPolicy:直接丢弃任务,不予任何处理也不抛出异常。如果允许任务丢失,这是最好的一种方案。
注意:实际工作中不要Executors来创建线程池,需要手写
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,5,
1l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.AbortPolicy());
}
}
public class MyThreadPoolDemo {
public static void main(String[] args) {
ExecutorService threadPool = new ThreadPoolExecutor(2,5,
1l, TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(3),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());
try {
for (int i = 0; i < 10; i++) {
threadPool.execute(()->{
System.out.println(Thread.currentThread().getName()+"\t办理业务");
});
}
} catch (Exception e) {
e.printStackTrace();
} finally {
threadPool.shutdown();
}
}
}
合理的配置线程池线程数
- CUP密集型(计算密集型)
cup核数+1 - IO密集型
- cup核数 * 2
- cup核数 / 1 - 阻塞系数 (0.8~0.9)
死锁
线程各自持有对方的锁,陷入相互等待。
package com.dongge.volatiledemo;
import java.util.concurrent.TimeUnit;
class HoldLockThread implements Runnable{
private String lockA;
private String lockB;
public HoldLockThread(String lockA, String lockB) {
this.lockA = lockA;
this.lockB = lockB;
}
@Override
public void run() {
synchronized (lockA){
System.out.println(Thread.currentThread().getName()+"\t自己持有"+lockA+"\t尝试获得:"+lockB);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (lockB){
System.out.println(Thread.currentThread().getName()+"\t自己持有"+lockB+"\t尝试获得:"+lockA);
}
}
}
}
public class DeadLockDemo {
public static void main(String[] args) {
String lockA = "lockA";
String lockB = "lockB";
new Thread(new HoldLockThread(lockA,lockB),"AAA").start();
new Thread(new HoldLockThread(lockB,lockA),"BBB").start();
}
}
找出死锁
jps -l 找出出问题的进程
jstack 9636(进程编号)