流程数据的定义在flowable中是比较复杂的, 涉及到多张数据库表关联关系,这些在一个流程引擎中也是最为核心的数据,并且需要进行频繁的读取,所以为了实现读取的高效性,flowable将流程定义的数据放在内存中,需要时直接从内存中获取。本文就简单看一看flowable是怎么实现缓存的。
1. 用户定义的入口
先回顾一下,如果我们要在Spring环境中使用flowable必须采用类似方式定义一个bean
<bean id="processEngineConfiguration"
class="org.flowable.spring.SpringProcessEngineConfiguration">
SpringProcessEngineConfiguration继承于ProcessEngineConfigurationImpl,在ProcessEngineConfigurationImpl源码中可以看到:
protected int processDefinitionCacheLimit = -1; // By default, no limit
protected DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache;
protected int processDefinitionInfoCacheLimit = -1; // By default, no limit
protected ProcessDefinitionInfoCache processDefinitionInfoCache;
protected int knowledgeBaseCacheLimit = -1;
protected DeploymentCache<Object> knowledgeBaseCache;
protected int appResourceCacheLimit = -1;
protected DeploymentCache<Object> appResourceCache;
因此我们可以在bean定义中可以设置processDefinitionCacheLimit等属性的值即可控制缓存的容量,如果没有进行设置将不会对其限制,为以防止OOM异常,建议可以设置一个,当超出容量时flowable引擎将会通过LRU算法进行移除。
2. 缓存的初始化
在flowable流程引擎在Spring环境中的启动源码分析这篇文章我们已经知道了flowble流程的初始化过程,那么对缓存的初始化肯定也能在相关内里找到,在ProcessEngineConfigurationImpl的init方法中有如下几行代码:
initProcessDefinitionCache();
initProcessDefinitionInfoCache();
initAppResourceCache();
initKnowledgeBaseCache();
以initProcessDefinitionCache为例看一下方法实现,不用多说:
public void initProcessDefinitionCache() {
if (processDefinitionCache == null) {
if (processDefinitionCacheLimit <= 0) {
processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>();
} else {
processDefinitionCache = new DefaultDeploymentCache<ProcessDefinitionCacheEntry>(processDefinitionCacheLimit);
}
}
}
进入DefaultDeploymentCache的构造方法,当没有设置缓存大小时通过无参构造方法创建的是一个同步的Map, 重点可以看一下下面的有参构造函数实现:
/** Cache with no limit */
public DefaultDeploymentCache() {
this.cache = Collections.synchronizedMap(new HashMap<String, T>());
}
/**
* Cache which has a hard limit: no more elements will be cached than the limit.
*/
public DefaultDeploymentCache(final int limit) {
this.cache = Collections.synchronizedMap(new LinkedHashMap<String, T>(limit + 1, 0.75f, true) { // +1 is needed, because the entry is inserted first, before it is removed
// 0.75 is the default (see javadocs)
// true will keep the 'access-order', which is needed to have a real LRU cache
private static final long serialVersionUID = 1L;
protected boolean removeEldestEntry(Map.Entry<String, T> eldest) {
boolean removeEldest = size() > limit;
if (removeEldest && logger.isTraceEnabled()) {
logger.trace("Cache limit is reached, {} will be evicted", eldest.getKey());
}
return removeEldest;
}
});
}
有参构造方法核心是基于LinkedHashMap并且重写了removeEldestEntry方法,当超出容量时会返回true, 查看LinkedHashMap可以知道当调用put或putAll返回前会根据该方法返回的值决定是否移除最老的一个元素,从而实现了LRU缓存算法。
3. 缓存的写入与更新
在流程部署时肯定会涉及到相关数据的更新,通过RepositoryServiceImpl.deploy->DeployCmd.executeDeploy查看有如下代码:
// Actually deploy
commandContext.getProcessEngineConfiguration().getDeploymentManager().deploy(deployment, deploymentSettings);
查看DeploymentManager.deploy->BpmnDeployer.deploy代码:
cachingAndArtifactsManager.updateCachingAndArtifacts(parsedDeployment);
然后想看CachingAndArtifactsManager.updateCachingAndArtifacts方法源码即具体更新缓存的实现:
/**
* Ensures that the process definition is cached in the appropriate places, including the deployment's collection of deployed artifacts and the deployment manager's cache, as well as caching any
* ProcessDefinitionInfos.
*/
public void updateCachingAndArtifacts(ParsedDeployment parsedDeployment) {
CommandContext commandContext = Context.getCommandContext();
final ProcessEngineConfigurationImpl processEngineConfiguration = Context.getProcessEngineConfiguration();
DeploymentCache<ProcessDefinitionCacheEntry> processDefinitionCache = processEngineConfiguration.getDeploymentManager().getProcessDefinitionCache();
DeploymentEntity deployment = parsedDeployment.getDeployment();
for (ProcessDefinitionEntity processDefinition : parsedDeployment.getAllProcessDefinitions()) {
BpmnModel bpmnModel = parsedDeployment.getBpmnModelForProcessDefinition(processDefinition);
Process process = parsedDeployment.getProcessModelForProcessDefinition(processDefinition);
ProcessDefinitionCacheEntry cacheEntry = new ProcessDefinitionCacheEntry(processDefinition, bpmnModel, process);
processDefinitionCache.add(processDefinition.getId(), cacheEntry);
addDefinitionInfoToCache(processDefinition, processEngineConfiguration, commandContext);
// Add to deployment for further usage
deployment.addDeployedArtifact(processDefinition);
}
}
4. 缓存的读取
一个典型的读取场景就是在启动流程的时候,所以查看RuntimeServiceImpl.startProcessInstanceByKey->StartProcessInstanceCmd.execute方法源码:
// Find the process definition
ProcessDefinition processDefinition = null;
if (processDefinitionId != null) {
processDefinition = deploymentCache.findDeployedProcessDefinitionById(processDefinitionId);
if (processDefinition == null) {
throw new FlowableObjectNotFoundException("No process definition found for id = '" + processDefinitionId + "'", ProcessDefinition.class);
}
}
接下来进入DeploymentManager.findDeployedProcessDefinitionById可以看到, 首先会从缓存中查找,如果没有则从数据库中加载:
public ProcessDefinition findDeployedProcessDefinitionById(String processDefinitionId) {
if (processDefinitionId == null) {
throw new FlowableIllegalArgumentException("Invalid process definition id : null");
}
// first try the cache
ProcessDefinitionCacheEntry cacheEntry = processDefinitionCache.get(processDefinitionId);
ProcessDefinition processDefinition = cacheEntry != null ? cacheEntry.getProcessDefinition() : null;
if (processDefinition == null) {
processDefinition = processDefinitionEntityManager.findById(processDefinitionId);
if (processDefinition == null) {
throw new FlowableObjectNotFoundException("no deployed process definition found with id '" + processDefinitionId + "'", ProcessDefinition.class);
}
processDefinition = resolveProcessDefinition(processDefinition).getProcessDefinition();
}
return processDefinition;
}
然后看一下resolveProcessDefinition方法, 当缓存中没有数据时会调用deploy方法来重新加载缓存。
/**
* Resolving the process definition will fetch the BPMN 2.0, parse it and store the {@link BpmnModel} in memory.
*/
public ProcessDefinitionCacheEntry resolveProcessDefinition(ProcessDefinition processDefinition) {
String processDefinitionId = processDefinition.getId();
String deploymentId = processDefinition.getDeploymentId();
ProcessDefinitionCacheEntry cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
if (Flowable5Util.isFlowable5ProcessDefinition(processDefinition, processEngineConfiguration)) {
return Flowable5Util.getFlowable5CompatibilityHandler().resolveProcessDefinition(processDefinition);
}
DeploymentEntity deployment = deploymentEntityManager.findById(deploymentId);
deployment.setNew(false);
deploy(deployment, null);
cachedProcessDefinition = processDefinitionCache.get(processDefinitionId);
if (cachedProcessDefinition == null) {
throw new FlowableException("deployment '" + deploymentId + "' didn't put process definition '" + processDefinitionId + "' in the cache");
}
}
return cachedProcessDefinition;
}
总结一下:flowable缓存的实现核心即基于LinkedHashMap并通过重写其removeEldestEntry方法实现LRU缓存移除算法。以流程定义缓存为例可以知道,每次部署时会将流程定义的数据加入缓存,每次流程启动时都会尝试去缓存中获取数据,如果缓存中有就直接返回,如果没有就从数据库中加载并放入缓存以供下次使用。