开篇
本着尊重原创作者精神,我必须表明这个实现是由组内的同事蜂神实现的,我知识站在前人的肩膀上学习了下具体的实现!!!
致敬原作者的牛逼技术!!!
PalDB限制
- PalDB is optimal in replacing the usage of large in-memory data storage but still use memory (off-heap, yet much less) to do its job. Disabling memory mapping and relying on seeks is possible but is not what PalDB has been optimized for.
- The size of the index is limited to 2GB. There's no limitation in the data size however.PalDB的索引大小不超过2GB
- PalDB is not thread-safe at the moment so synchronization should be done externally if multi-threaded.PalDB是不线程安全的
PalDB线程安全代码实现
在java的世界里线程安全的实现方法无非是通过锁或者通过ThreadLocal变量,加锁会对性能有一定的损耗,而ThreadLocal相比之下性能损耗几乎为零,所以在实现上采用的就是ThreadLocal关键字。
代码实现比较
在实现过程中我们把公用的变量都用ThreadLocal修饰,这样不同的线程读取数据的时候就不会相互干扰,主要变量由:
- dataInputOutput 读取数据保存的缓冲区
- storage PalDB读取实现类
- serialization PalDB自定义实现类的保存变量
//线程不安全版本的ReaderImpl实现类
public final class ReaderImpl implements StoreReader {
// Configuration
private final Configuration config;
// Buffer
private final DataInputOutput dataInputOutput = new DataInputOutput();
// Storage
private final StorageReader storage;
// Serialization
private final StorageSerialization serialization;
// Cache
private final StorageCache cache;
// File
private final File file;
// Opened?
private boolean opened;
//线程安全版本的ReaderImpl实现类
public final class ReaderImpl implements StoreReader {
// Configuration
private final Configuration config;
// Buffer
private final ThreadLocal<DataInputOutput> localDataInputOutput = new ThreadLocal<>();
// = new DataInputOutput();
// Storage
private final ThreadLocal<StorageReader> localStorage = new ThreadLocal<>();
// Serialization
private final ThreadLocal<StorageSerialization> localSerialization = new ThreadLocal<>();
private final Map<Thread, StorageReader> allStorageReader = new ConcurrentHashMap<Thread, StorageReader>();
线程不安全版本
多线程并发读取的时候会因为共同了storage造成读取数据混乱,所以线程安全版本就需要在这几个会造成混乱的变量上做文章。包括:
- dataInputOutput 读取数据保存的缓冲区
- storage PalDB读取实现类
- serialization PalDB自定义实现类的保存变量
public <K> K get(Object key, K defaultValue) {
checkOpen();
if (key == null) {
throw new NullPointerException("The key can't be null");
}
K value = cache.get(key);
if (value == null) {
try {
byte[] valueBytes = storage.get(serialization.serializeKey(key));
if (valueBytes != null) {
Object v = serialization.deserialize(dataInputOutput.reset(valueBytes));
cache.put(key, v);
return (K) v;
} else {
return defaultValue;
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else if (value == StorageCache.NULL_VALUE) {
return null;
}
return value;
}
线程安全版本
每次都从本线程的ThreadLocal变量中获取StorageReader读取数据,然后反序列化到ThreadLocal变量中保存的localDataInputOutput存储序列化的数据。保证了变量线程的安全。
public <K> K get(Object key, K defaultValue) {
checkOpen();
if (key == null) {
throw new NullPointerException("The key can't be null");
}
K value = cache.get(key);
if (value == null) {
try {
byte[] valueBytes = localStorage.get().get(localSerialization.get().serializeKey(key));
if (valueBytes != null) {
Object v = localSerialization.get().deserialize(localDataInputOutput.get().reset(valueBytes));
cache.put(key, v);
return (K) v;
} else {
return defaultValue;
}
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} else if (value == StorageCache.NULL_VALUE) {
return null;
}
return value;
}
代码实现核心
在PalDB的api当中所有的get/put动作都会有checkOpen方法调用,所以在这个方法内部我们把线程不安全的变量都进行了拷贝并保存到ThreadLocal当中,这是整个实现的核心精华!!!感谢原作者牛逼的设计!!!
private void checkOpen() {
if (!opened) {
throw new IllegalStateException("The store is closed");
}
StorageReader storage = localStorage.get();
if (null == storage || storage.isClosed()) {
storage = mainStorage.duplicate();
localStorage.set(storage);
allStorageReader.put(Thread.currentThread(), storage);
}
if (null == localDataInputOutput.get()) {
localDataInputOutput.set(new DataInputOutput());
}
if (null == localSerialization.get()) {
localSerialization.set(new StorageSerialization(this.config));
}
}