阻塞队列
队列:先进先出的一种数据结构
阻塞队列就是在数据为空的时候,如果从队列中获取数据将被阻塞,如果队列满了的话往队列里面插入数据将会被阻塞。
有界与无界
有界队列就是队列的大小是固定的
无界队列就是队列满了之后可以进行扩容
延时阻塞队列
DelayQueue,延时获取,如果延时没有到,那么将会被阻塞
可以
线程池的参数
核心线程数
最大线程数
- IO密集型
IO密集型一般最大线程数是cpu核心数2
IO密集型一般不是特别占用cpu的资源,有网络操作和磁盘操作。当进行一个网络操作的时候,线程需要等待网络数据的返回,这个时候线程会交出执行权,所以我们设置cpu核心数2的话会保证cpu的资源不会被浪费。 - CPU密集型
CPU密集型一般最大线程数是cpu核心数+1
CPU密集型一般是频繁的数据操作,操作系统中有内存和磁盘存储,操作系统将磁盘存储的一部分拿出来作为虚拟内存,如果一个线程要使用虚拟内存和内存中的数据的时候,由于虚拟内存获取数据的速度远远没有从内存获取数据快,这个时候我们就可以再启动额外的线程来执行下一个数据操作。 - 混合型
如果CPU密集型和IO密集型花的时间差不多,那么可以进行拆分为2个线程池
如果差距很大的话,以时间花费大的为准
线程存活时间
一般来说当线程的run方法执行完了之后线程就会被销毁,那么如何让线程按照存活时间?
我们从阻塞队列中获取数据,如果没有数据线程会被阻塞,他就不会死亡
拒绝策略
当核心线程数,阻塞队列,最大线程数已满的话线程池有拒绝任务的策略
1.抛异常
立马抛出一场
2.抛弃阻塞队列最前面的
删除掉队列中最老的那一个任务
3.抛弃现有的任务
直接把现在要提交的任务删掉
4.交给提交任务的线程处理
如果主线程提交的任务,那么就让主线程来执行。
线程池关闭
shutDown 将还未执行的线程销毁
shutDownNow 使用中断策略interrupt,销毁执行中的线程以及未执行的线程
线程池的原理
1.当线程池中的线程还没有达到核心线程数,那么就创建线程来执行任务
2.如果线程池中的线程还没有达到核心线程数,那么就向阻塞队列中插入
3.如果阻塞队列也满了,那么就看看是否达到最大线程数,如果没有达到最大线程数,那就开启线程执行任务
4.如果达到了最大线程数,那么就使用拒绝策略
零拷贝
用户空间和内核空间的概念
为了保证我们的应用程序访问不到操作系统里面的数据,并且不篡改操作系统的数据,操作系统将内存分为了2部分,一部分是操作系统内存,一部分是用户内存,用户内存无法访问操作系统内存。
但是,如果我们程序需要访问网络的时候,需要操作系统底层调用网卡等设备,我们需要在用户空间里面将请求参数复制到操作系统内存中,当获取到网络结果之后需要将返回数据从操作系统内存拷贝到用户内存中
为了避免两次无用的拷贝,操作系统允许我们的应用程序在操作系统内存中申请一块临时空间,我们只需要将请求数据放到申请的操作系统空间内,当网络数据返回的时候会写入到我们申请的空间内。
DMA
磁盘操作 操作系统会交给磁盘控制器
网络操作 操作系统会交给网络控制器
单机缓存系统
1.通过add和get方法存入和获取数据,当add的时候数据也要放入延时队列中。
2.开启一个子线程获取延时队列的数据,如果拿到数据了说明时间到了,过期了我们就需要将数据删除,如果没有拿到说明还未过期
public class Memory {
private DelayQueue<Data> delayQueue;
private ConcurrentHashMap<String,Object> hashMap=new ConcurrentHashMap<>();
private final Thread thread;
private Memory(){
delayQueue=new DelayQueue<>();
thread = new Thread(new CheckRunnable(this));
thread.start();
}
private static Memory memory;
public static Memory getInstance(){
if (memory==null){
synchronized (Memory.class){
if (memory==null){
memory=new Memory();
}
}
}
return memory;
}
public <T>T get(String key){
return (T) hashMap.get(key);
}
public <T> void add(String key,T value,long time){
delayQueue.add(new Data(key,time));
hashMap.put(key,value);
}
static class CheckRunnable implements Runnable{
private Memory memory;
public CheckRunnable(Memory memory){
this.memory=memory;
}
@Override
public void run() {
Thread thread = Thread.currentThread();
while (!thread.isInterrupted()){
Data take = memory.delayQueue.poll();
if (take!=null) {
if (memory.hashMap.containsKey(take.getKey())) {
System.out.println("remove");
memory.hashMap.remove(take.getKey());
}
}
}
}
}
public void shutDown(){
thread.interrupt();
hashMap.clear();
}
public static void main(String[] args) {
new Thread(){
@Override
public void run() {
super.run();
final Memory instance = Memory.getInstance();
instance.<String>add("name","Z",10000);
instance.<Integer>add("age",18,8000);
int count=1;
while (true){
try {
sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(2000*count);
String name = instance.<String>get("name");
Integer age = instance.<Integer>get("age");
System.out.println("name="+name);
System.out.println("age="+age);
count++;
}
}
}.start();
}
}
public class Data implements Delayed {
private String key;
private long time;
public Data(String key, long timeout){
this.time=timeout+System.currentTimeMillis();
this.key=key;
}
public String getKey(){
return key;
}
@Override
public long getDelay(TimeUnit unit) {
return this.time-System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
if (this.getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)){
return 1;
}else if (this.getDelay(TimeUnit.MILLISECONDS)<o.getDelay(TimeUnit.MILLISECONDS)){
return -1;
}
return 0;
}
}