作者目前是清华大学软件学院 IoTDB 组在读学生,参与过 Apache IoTDB UDF 模块的代码维护和功能拓展,本文是作者在阅读 Apache IoTDB UDF 模块代码时的一点总结。
概述
UDF(User Defined Functions) 是数据库查询引擎里较为重要的一个模块,其为数据的高级分析提供了更多可能。
UDF 的使用说明可以参考作者的另一篇文章:
https://zhuanlan.zhihu.com/p/599011218
Apache IoTDB 的 UDF 功能实现总体可以分为三大部分:
- 向用户提供的编程接口,相关代码在包 org.apache.iotdb.udf.api
- 查询框架相关代码,包括 SQL 解析、逻辑计划生成、物理计划生成等
- UDF 查询计算执行时相关逻辑
本文主要对 UDF 查询计算执行时的相关逻辑做概要介绍,主要针对特定接口/抽象类做说明,并分析典型实现帮助理解,希望本文可以帮助读者更轻松地 Debug 阅读 UDF 计算流程的源码。
UDF 查询计算重要接口和工具类
在我看来,要理解 IoTDB 中 UDF 计算的流程,最关键的是理解以下几个接口/工具类的作用:
- IntermediateLayer
- LayerPointReader / LayerRowReader / LayerRowWindowReader
- Transformer
- ElasticSerializableTVList
理解上述接口/抽象类的作用之后再进行 Debug 阅读源码会事半功倍。
IntermediateLayer
作用
UDF 计算流程大致可以分成 InputLayer -> IntermediateLayer -> OutputLayer 三层,IntermediateLayer 封装了计算中间层的逻辑。
出现中间层的设计原因有:
- 查询树节点可能存在公共部分,中间层缓存计算结果可以避免重复计算。
// function 的输入是 a + b,而 a + b 本身也是查询的一列
// 可以直接使用这一列作为输入,没有必要重复计算 a + b
select function(a + b), a + b from root.sg
- 不同列消费数据的位置和速度可能不一致,使用中间层可以使用同一份数据,但是对外屏蔽这种差异
IntermediateLayer 缓存数据,可以通过其构造的 LayerPointReader / LayerRowReader / LayerRowWindowReader 访问 IntermediateLayer 缓存的数据。不同的 reader 类型对应不同的数据访问策略,即按点,按行,按窗口,窗口也可以通过多种形式进行划分。
这里贴出 IntermediateLayer 抽象类的源码:
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iotdb.db.mpp.transformation.dag.intermediate;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.mpp.plan.expression.Expression;
import org.apache.iotdb.db.mpp.transformation.api.LayerPointReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowReader;
import org.apache.iotdb.db.mpp.transformation.api.LayerRowWindowReader;
import org.apache.iotdb.udf.api.customizer.strategy.AccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SessionTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingSizeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.SlidingTimeWindowAccessStrategy;
import org.apache.iotdb.udf.api.customizer.strategy.StateWindowAccessStrategy;
import java.io.IOException;
public abstract class IntermediateLayer {
protected static final int CACHE_BLOCK_SIZE = 2;
// for debug
protected final Expression expression;
protected final long queryId;
protected final float memoryBudgetInMB;
protected IntermediateLayer(Expression expression, long queryId, float memoryBudgetInMB) {
this.expression = expression;
this.queryId = queryId;
this.memoryBudgetInMB = memoryBudgetInMB;
}
public abstract LayerPointReader constructPointReader();
public abstract LayerRowReader constructRowReader();
public final LayerRowWindowReader constructRowWindowReader(
AccessStrategy strategy, float memoryBudgetInMB) throws QueryProcessException, IOException {
switch (strategy.getAccessStrategyType()) {
case SLIDING_TIME_WINDOW:
return constructRowSlidingTimeWindowReader(
(SlidingTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
case SLIDING_SIZE_WINDOW:
return constructRowSlidingSizeWindowReader(
(SlidingSizeWindowAccessStrategy) strategy, memoryBudgetInMB);
case SESSION_TIME_WINDOW:
return constructRowSessionTimeWindowReader(
(SessionTimeWindowAccessStrategy) strategy, memoryBudgetInMB);
case STATE_WINDOW:
return constructRowStateWindowReader(
(StateWindowAccessStrategy) strategy, memoryBudgetInMB);
default:
throw new IllegalStateException(
"Unexpected access strategy: " + strategy.getAccessStrategyType());
}
}
protected abstract LayerRowWindowReader constructRowSlidingSizeWindowReader(
SlidingSizeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException;
protected abstract LayerRowWindowReader constructRowSlidingTimeWindowReader(
SlidingTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
protected abstract LayerRowWindowReader constructRowSessionTimeWindowReader(
SessionTimeWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
protected abstract LayerRowWindowReader constructRowStateWindowReader(
StateWindowAccessStrategy strategy, float memoryBudgetInMB)
throws QueryProcessException, IOException;
@Override
public String toString() {
return expression.toString();
}
}
SingleInputMultiReferenceIntermediateLayer
下面以 IntermediateLayer 的实现类 SingleInputMultiReferenceIntermediateLayer 作为例子来具体说明 IntermediateLayer 的作用。
关键成员变量:
// 中间层的输入,要获取新的未缓存过的数据点时,从该 reader 里读取数据
private final LayerPointReader parentLayerPointReader;
// 缓存数据的数据结构,内置 LRUCache,通过将数据溢出至磁盘保证内存不超限
private final ElasticSerializableTVList tvList;
// 配合 tvList,维持一个安全水线,index < 安全水线的数据不会再被使用
// 此时可以安全地抛弃这些不会再使用的数据,减小内存占用
private final SafetyLine safetyLine;
关键方法:
实现 IntermediateLayer 的所有 constructXXXReader 的抽象方法,通过 reader 提供访问中间层数据的方式,各 reader 的逻辑脉络相似。
下面以 constructPointReader() 举例说明,其它 reader 可以借助理解
该方法返回一个 LayerPointReader。该 reader 维护了一个 currentPointIndex,实际的数据来自于 SingleInputMultiReferenceIntermediateLayer#tvList,所有通过该 SingleInputMultiReferenceIntermediateLayer 构造出的 LayerPointReader 实际上都是在读取 tvList 里的数据,只是其 currentPointIndex 可能不同,这样就做到了一份数据提供多个游标来满足多个数据访问者的需要。
private final SafetyPile safetyPile = safetyLine.addSafetyPile();
private boolean hasCached = false;
private int currentPointIndex = -1;
构造的 LayerPointReader 的 next()实现逻辑:
// 如果当前点的 index 已经到了缓存的最大 index
// 那么就要尝试通过数据的源头,即 parentLayerPointReader 读取数据
// 否则直接递增 index
@Override
public boolean next() throws QueryProcessException, IOException {
if (!hasCached
&& (currentPointIndex < tvList.size() - 1
|| LayerCacheUtils.cachePoint(
parentLayerPointReaderDataType, parentLayerPointReader, tvList))) {
++currentPointIndex;
hasCached = true;
}
return hasCached;
}
构造的 LayerPointReader 的 readyForNext()实现逻辑:
@Override
public void readyForNext() {
hasCached = false;
// 所有构造出的 LayerPointReader 都维护安全水线,即会被用到的数据 index 最小值
// index 小于该值的数据不会再被使用,可以被安全地放弃
// SingleInputMultiReference IntermediateLayer 的安全水线就是所有 LayerPointReader
// 安全水线的最小值
safetyPile.moveForwardTo(currentPointIndex + 1);
// evictionUpperBound 需结合 SerializableList 的逻辑来理解
tvList.setEvictionUpperBound(safetyLine.getSafetyLine());
}
LayerPointReader / LayerRowReader / LayerRowWindowReader
IntermediateLayer 向外提供构造这三种 reader 的方法,这三种 reader 接口里封装了按点,按行,按窗口读取数据的逻辑。
在 Apache IoTDB 1.0 查询引擎演进为 MPP 架构时,为了适配 MPP,有了 YeildableReader 接口,该接口的 yield 方法逻辑语义应当与 LayerPointReader / LayerRowReader / LayerRowWindowReader 原有的 next() 方法一致,只是为了适配 MPP 框架而存在。
由于 yield() 和 next() 两套方法的存在,可能导致读者在阅读这块代码的时候感到困惑,为什么需要两种接口?实际是因为在 1.0 版本的 UDF 计算里 next() 方法大部分时间已经不会再被调用了(还有很小一部分场景在使用),由于历史包袱,还没删掉 next() 方法相关的逻辑。读者只需要理解 yield() 和 next() 其中一套的逻辑,就能理解另一套的逻辑,本文主要通过 next() 进行说明。
LayerPointReader
这个接口可以看作是对一个数据集的迭代器。
接口方法:
package org.apache.iotdb.db.mpp.transformation.api;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
import org.apache.iotdb.tsfile.utils.Binary;
import java.io.IOException;
public interface LayerPointReader extends YieldableReader {
boolean isConstantPointReader();
boolean next() throws QueryProcessException, IOException;
void readyForNext();
TSDataType getDataType();
long currentTime() throws IOException;
int currentInt() throws IOException;
long currentLong() throws IOException;
float currentFloat() throws IOException;
double currentDouble() throws IOException;
boolean currentBoolean() throws IOException;
boolean isCurrentNull() throws IOException;
Binary currentBinary() throws IOException;
next() 方法的返回值为布尔类型,实际上这个方法可以看作下述两个方法的结合:
boolean hasNext();
Object next();
即每次调用 next(),都会尝试移动迭代器的游标,如果可以往下移动(还有数据)则返回 true,否则返回 false。
成功调用一次 next() 后需要调用 readyForNext()。
实际返回数据是在调用 next() 且返回 true 之后,可以通过具体的数据类型去访问具体的 currentXXX()。
LayerRowReader
与 LayerPointReader 接口相似,只是访问数据时按照行访问。
public interface LayerRowReader extends YieldableReader {
boolean next() throws IOException, QueryProcessException;
void readyForNext();
TSDataType[] getDataTypes();
long currentTime() throws IOException;
Row currentRow();
/** whether current row fields are all null */
boolean isCurrentNull() throws IOException;
}
LayerRowWindowReader
与 LayerPointReader 接口相似,只是访问数据时按照窗口访问。
public interface LayerRowWindowReader extends YieldableReader {
boolean next() throws IOException, QueryProcessException;
void readyForNext() throws IOException, QueryProcessException;
TSDataType[] getDataTypes();
RowWindow currentWindow();
}
Transformer
实现 LayerPointReader 的一个抽象类,封装了 UDF 和 表达式的计算逻辑。
可以简单的将局部的数据流转方向理解成:
- Transformer 读取 IntermediateLayer 的数据作为输入(通过 IntermediateLayer 的 constructXXXReader 获取相应 reader)
- Transformer 完成数据的计算,包括一元/二元/三元/UDF 计算
- 为 Transformer 封装一个 IntermediateLayer,此时 Transformer 又可以成为该 IntermediateLayer 的数据源,这样就可以形成一颗计算树,从下往上逐层计算。
构造 IntermediateLayer 和 Transformer 的逻辑通过访问者模式被封装在 IntermediateLayerVisitor 类中。
Transformer 的源码如下:
public abstract class Transformer implements LayerPointReader {
protected boolean hasCachedValue;
protected long cachedTime;
protected int cachedInt;
protected long cachedLong;
protected float cachedFloat;
protected double cachedDouble;
protected boolean cachedBoolean;
protected Binary cachedBinary;
protected boolean currentNull;
protected Transformer() {
hasCachedValue = false;
}
@Override
public final boolean next() throws QueryProcessException, IOException {
if (!hasCachedValue) {
hasCachedValue = cacheValue();
}
return hasCachedValue;
}
/** if this method returns true, at least one of the cached field should be set */
protected abstract boolean cacheValue() throws QueryProcessException, IOException;
@Override
public final YieldableState yield() throws IOException, QueryProcessException {
if (hasCachedValue) {
return YieldableState.YIELDABLE;
}
final YieldableState yieldableState = yieldValue();
if (YieldableState.YIELDABLE == yieldableState) {
hasCachedValue = true;
}
return yieldableState;
}
/**
* if this method returns YieldableState.YIELDABLE, at least one of the cached field should be set
*/
protected abstract YieldableState yieldValue() throws QueryProcessException, IOException;
@Override
public final void readyForNext() {
hasCachedValue = false;
currentNull = false;
}
@Override
public final long currentTime() {
return cachedTime;
}
@Override
public final int currentInt() {
return cachedInt;
}
@Override
public final long currentLong() {
return cachedLong;
}
@Override
public final float currentFloat() {
return cachedFloat;
}
@Override
public final double currentDouble() {
return cachedDouble;
}
@Override
public final boolean currentBoolean() {
return cachedBoolean;
}
@Override
public final Binary currentBinary() {
return cachedBinary;
}
@Override
public final boolean isCurrentNull() {
return currentNull;
}
其子类只需要实现 cacheValue() 方法,定义自己的计算逻辑即可。
EasticSerializableTVList
EasticSerializableTVList 位于 org.apache.iotdb.db.mpp.transformation.datastructure,该包定义了 UDF 计算时用到的数据结构。
该类是一个数据点的容器,可以往里面 put 数据,也可以按照 index 读取数据。
为了避免占用内存超限,内置了 LRUCache 和数据溢出至磁盘的逻辑,实现中可以看到这两种思路,主要借鉴了操作系统分页机制。
EasticSerializableTVList 类图如下:
- SerializableList:接口,可以将元素序列化到文件以及从文件中读回
- BatchData:提供了 put 和 get 方法
- LRUCache:基于 LinkedHashMap 实现的 LRUCache
核心成员变量
// 可以理解为 EasticSerializableTVList 将数据分成块
// 每一块就是一个 SerializableTVList
protected List<SerializableTVList> tvLists;
// 用于存储 tvLists 的 index,只有 index 在 cache 中的 SerializableTVList 是位于内存中的
protected LRUCache cache;
// 每一个 SerializableTVList 块的容量
protected int internalTVListCapacity;
// cache 的大小,由于 cache 的每个元素都代表一个 SerializableTVList 数据块
// cacheSize 可以理解为存放在内存中的 SerializableTVList 块的数量
protected int cacheSize;
// 与 tvLists 的元素一一对应
protected List<BitMap> bitMaps;
// EasticSerializableTVList 的逻辑容量
protected int size;
// tvLists 中,index < evictionUpperBound / internalTVListCapacity 的元素不会再被使用
protected int evictionUpperBound;
核心方法
先看构造方法:
protected ElasticSerializableTVList(
TSDataType dataType, long queryId, float memoryLimitInMB, int cacheSize) {
this.dataType = dataType;
this.queryId = queryId;
this.memoryLimitInMB = memoryLimitInMB;
int allocatableCapacity = SerializableTVList.calculateCapacity(dataType, memoryLimitInMB);
internalTVListCapacity = allocatableCapacity / cacheSize;
if (internalTVListCapacity == 0) {
cacheSize = 1;
internalTVListCapacity = allocatableCapacity;
}
this.cacheSize = cacheSize;
cache = new LRUCache(cacheSize);
bitMaps = new ArrayList<>();
tvLists = new ArrayList<>();
size = 0;
evictionUpperBound = 0;
}
- 构造方法中首先计算 internalTVListCapacity,即每个块的容量是多少
- 初始化 cache,cache 的容量由 cacheSize 决定,cacheSize 决定了内存中存放多少块
按照 index 读取数据,以 getInt 为例:
public int getInt(int index) throws IOException {
// index / internalTVListCapacity 计算出数据位于哪个块
// index % internalTVListCapacity 计算出数据位于块的哪一行
return cache.get(index / internalTVListCapacity).getIntByIndex(index % internalTVListCapacity);
}
将数据放入容器,以 putInt 为例:
@Override
public void putInt(long timestamp, int value) throws IOException {
// 首先检查要不要分配新的块
checkExpansion();
// 找到 index 对应的数据块,将数据放入即可
cache.get(size / internalTVListCapacity).putInt(timestamp, value);
++size;
}
private void checkExpansion() {
if (size % internalTVListCapacity == 0) {
tvLists.add(SerializableTVList.newSerializableTVList(dataType, queryId));
bitMaps.add(new BitMap(internalTVListCapacity));
}
}
LRUCache 实现:
private class LRUCache extends Cache {
LRUCache(int capacity) {
super(capacity);
}
// 获取 tvLists 中对应 index 的 SerializableList并更新 cache
BatchData get(int targetIndex) throws IOException {
if (!containsKey(targetIndex)) {
// cache 中没有目标 index,且 cache 已满
// 此时可能需要将元素溢出到磁盘
if (cacheCapacity <= size()) {
int lastIndex = getLast();
// 如果数据不会再被用到则直接设为 null
if (lastIndex < evictionUpperBound / internalTVListCapacity) {
tvLists.set(lastIndex, null);
bitMaps.set(lastIndex, null);
} else {
// 将数据溢出到磁盘
tvLists.get(lastIndex).serialize();
}
}
// 目标数据不在 cache 里面,则肯定不在内存里面,需要读回内存
tvLists.get(targetIndex).deserialize();
}
// LRU 策略,更新 key
// 将元素移出 cache 的操作由 LinkedHashMap 自动完成
putKey(targetIndex);
return tvLists.get(targetIndex);
}
}
public abstract class Cache extends LinkedHashMap<Integer, Integer> {
protected final int cacheCapacity;
protected Cache(int cacheCapacity) {
super(cacheCapacity, 0.75F, true);
this.cacheCapacity = cacheCapacity;
}
@Override
protected boolean removeEldestEntry(Map.Entry<Integer, Integer> eldest) {
return size() > cacheCapacity;
}
// get the eldest key
public int getLast() {
return this.entrySet().iterator().next().getKey();
}
protected Integer putKey(Integer index) {
return put(index, index);
}
}
总结
本文对 UDF 查询计算过程的重要接口和工具类做了简要说明。如果读者希望完整了解 UDF 计算流程,还需要阅读接口的各个实现类代码,以及前文提到的:
- 向用户提供的编程接口,相关代码在包 org.apache.iotdb.udf.api
- 查询框架相关代码,包括 SQL 解析、逻辑计划生成、物理计划生成等
本文仅为个人理解,如有错误请指正~