首先只有在多线程中才有并发和并行的概念。
并行
并行是指两者同时执行,比如赛跑,两个人都在不停的往前跑;(资源够用,比如三个线程,四核的CPU )
并发
并发是指资源有限的情况下,两者交替轮流使用资源,比如一段路(单核CPU资源)同时只能过一个人,A走一段后,让给B,B用完继续给A ,交替使用,目的是提高效率。
区别:
并行是从微观上,也就是在一个精确的时间片刻,有不同的程序在执行,这就要求必须有多个处理器。
并发是从宏观上,在一个时间段上可以看出是同时执行的,比如一个服务器同时处理多个session。
还有个东西 :
线程安全
多个线程访问同一个代码,不能让程序产生一些不确定的结果,这就是线程安全。
并行我们没有什么好做的(无非就是new 几个线程),并发的时候保证线程的安全就是要 "挖掘" 的技术了。jdk1.5 之前落后的synchronized技术确实安全,可是不够刺激啊。所以1.5引入的lock和封装lock的几个常用的集合类ConcurrentHashMap,BlockingQueue,ConcurrentLinkedQueue 这些都是生产力的极大提高啊。现在来deep 的研究一下
ConcurrentHashMap
ConcurrentHashMap,HashMap 都是根据 hash算法(一种将任意内容的输入转换成相同长度输出的加密方式,其输出被称为哈希值) 存储 键值对(key-value)的集合,两者的区别在于前者是线程安全的,而后者不是(HashTable也是线程安全的键值对集合,但是实现的比较 粗糙,性能较低)。
ConcurrentHashMap的三种主要的数据结构:
ConcurrentHashMap(整个hash表), Segment(桶), HashEntry(节点)
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁ReentrantLock,在ConcurrentHashMap里扮演锁的角色,HashEntry则用于存储键值对数据。
下图为ConcurrentHashMap,和HashTable 的模型图:
分段锁
ConcurrentHashMap分为N个segment,每个segment对应一个hash序列表,锁只是相对于每个segment。其实,可以把每个segment看成一个HashTable,每次对当前的一个segment上锁,提高了并行度(讲道理,如果有N个segment,效率会提高N被倍,N默认是16),这就是分段锁ReentrantLock和synchronized
源码非不是java8版(jdk1.8有修改),segment的定义:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
/**
* 在本 segment 范围内,包含的 HashEntry 元素的个数
* 该变量 被声明为 volatile 型 */
transient volatile int count;
/** * table 被更新的次数 */
transient int modCount;
/** * 当 table 中包含的 HashEntry 元素的个数超过本变量值时,触发 table 的再散列 */
transient int threshold;
/**
* table 是由 HashEntry 对象组成的数组
* 如果散列时发生碰撞,碰撞的 HashEntry 对象就以链表的形式链接成一个链表
* table 数组的数组成员代表散列映射表的一个桶
* 每个 table 守护整个 ConcurrentHashMap 包含桶总数的一部分
* 如果并发级别为 16,table 则守护 ConcurrentHashMap 包含的桶总数的 1/16
*/
transient volatile HashEntry<K,V>[] table;
/** * 装载因子 */
final float loadFactor;
Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K,V>*newArray*(initialCapacity));
}
/**
* 设置 table 引用到这个新生成的 HashEntry 数组
* 只能在持有锁或构造函数中调用本方法 */
void setTable(HashEntry<K,V>[] newTable) {
// 计算临界阀值为新数组的长度与装载因子的乘积
threshold = (int)(newTable.length * loadFactor);
table = newTable;
}
/** 根据 key 的散列值,找到 table 中对应的那个桶(table 数组的某个数组成员) */
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
// 把散列值与 table 数组长度减 1 的值相“与”,
// 得到散列值对应的 table 数组的下标 // 然后返回 table 数组中此下标对应的 HashEntry 元素
return tab[hash & (tab.length - 1)];
}
}
从上可以看出,ConcurrentHashMap用的是ReentrantLock,其实segment就是继承的ReentrantLock
ConcurrentHashMap的读写操作:
public V put(K key,V value){//写操作
if(value == null)//容器不允许null做值,其实键也是不可以的
throws new NullPointerException();
int hash = hash(key);//计算散列码
return segmentFor(hash).put(key,hash,value,false); // ---根据散列值找到对应的segment---
}
public V get(K key){//读操作
int hash = hash(key.hashCode());
return segmentFor(hash).get(key,hash);
}
final Segment<K,V> segmentFor(int hash) { //找到对应segment
// 将散列值右移 segmentShift 个位,并在高位填充 0
// 然后把得到的值与 segmentMask 相“与”
// 从而得到 hash 值对应的 segments 数组的下标值
// 最后根据下标值返回散列码对应的 Segment 对象
return segments[(hash >>> segmentShift) & segmentMask];
}
在ConcurrentHashmap中,线程对映射表做度操作时,一般情况不加锁可以完成的,对容器做结构性的修改才需要加锁
- segment中的put 方法
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();//加锁,锁定整个segment,不是hashmap
try {
int c = count;
if (c++ > threshold) // ensure capacity 超过阈值rehash
rehash();
HashEntry[] tab = table;
int index = hash & (tab.length - 1);//下标:把hash值和数组长度-1相与
HashEntry first = (HashEntry) tab[index];
HashEntry e = first;
//e== null 说明找到了空的节点,直接添加,否则,修改旧的value
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue;
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent)
e.value = value;
}
else {
oldValue = null;
++modCount;
// 链表的头部插入,后面的保持不变
tab[index] = new HashEntry(key, hash, first, value);
count = c; // write-volatile
}
return oldValue;
} finally {
unlock();//解锁
}
}
// HashEntry
static final class HashEntry {
final K key;
final int hash;
volatile V value;
final HashEntry next;
HashEntry(K key, int hash, HashEntry next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
}
请注意,每次插入,如果有hash冲突,都是在插入在链表头的,这样可以避免一个问题:其实在读的过程中,即使同时写,也不会出现问题
BlockingQueue
.....此处应有代码 (等会儿写个生存消费者模型)
ConcurrentLinkedQueue
....此处应有代码