GroupBy Queries
GroupBy is Druid's core query type.
The query path from the broker to the historical nodes
GroupBy queries on the historical node
1 Query entry point
QueryResource is registered with Guice and a Jetty server is started (the Jetty + Jersey + Guice combination): CliHistorical is the entry point of the historical node and binds QueryResource into Guice; JettyServerModule then pulls QueryResource out of Guice and registers it as a servlet with Jetty. QueryResource uses Jersey to define the REST endpoints that receive requests from the broker, process them, and write back the response.
QueryResource builds a Query object from the request parameters; ServerManager constructs the QueryRunner that executes the query and returns a Sequence, which is serialized back to the caller.
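A rough sketch of that wiring (this is not QueryResource itself; the class name, path, and body handling below are simplified assumptions) shows the Jersey + Guice + StreamingOutput pattern the entry point relies on:
import java.io.InputStream;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;

// Hypothetical, simplified resource in the style of QueryResource.
@Path("/druid/v2/")
public class SimpleQueryResource
{
  private final ObjectMapper jsonMapper;

  @Inject  // Guice supplies the dependencies when the resource is registered with Jersey
  public SimpleQueryResource(ObjectMapper jsonMapper)
  {
    this.jsonMapper = jsonMapper;
  }

  @POST
  @Consumes(MediaType.APPLICATION_JSON)
  @Produces(MediaType.APPLICATION_JSON)
  public Response doPost(final InputStream in)
  {
    // Parse the body into a query object, run it, and stream rows back over the HTTP output stream.
    // The real code builds a Query, asks ServerManager for a QueryRunner, and walks the resulting
    // Sequence with a Yielder, as shown in the next snippet.
    StreamingOutput output = outputStream -> {
      Object query = jsonMapper.readValue(in, Object.class);
      jsonMapper.writeValue(outputStream, query); // placeholder: a real resource writes result rows here
    };
    return Response.ok(output).build();
  }
}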
How the Sequence is serialized
A Sequence is similar to a Java List: its elements are returned through an iterator. Its strength is that the previous element can already have been returned while the next one has not yet been computed, i.e. the data is streamed back. This works because the final result is a list ordered by one or more dimension values, and its elements are drawn from several Sequences that are themselves already sorted, so producing the output is a K-way merge sort. When the merge encounters elements with identical dimension values, they are aggregated.
try {
// jgen writes to a StreamingOutput, which is the HTTP output stream
jgen.writeStartArray();
while (!yielder.isDone()) {
final Object o = yielder.get();
// at this point the current row may already have been written back while the next element has not yet been computed
jgen.writeObject(o);
yielder = yielder.next(null);
}
jgen.writeEndArray();
}
finally {
yielder.close();
}
Druid returns data in this iterator style throughout: query results can be large, and streaming them back keeps memory usage low.
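To make the K-way merge plus aggregation idea concrete, here is a small self-contained sketch (plain Java plus Guava, not Druid code): several iterators, each already sorted by dimension value, are merged lazily, and the metric of adjacent rows with the same dimension value is summed only when the caller pulls the next element.
import java.util.AbstractMap.SimpleEntry;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import com.google.common.collect.Iterators;
import com.google.common.collect.PeekingIterator;

public class LazyGroupByMergeSketch
{
  // merges per-segment iterators that are each sorted by dimension value,
  // summing the metric whenever rows from different inputs share the same dimension value
  static Iterator<Map.Entry<String, Long>> merge(List<Iterator<Map.Entry<String, Long>>> sortedInputs)
  {
    Comparator<Map.Entry<String, Long>> byDimension = Comparator.comparing(Map.Entry::getKey);
    // lazy K-way merge: nothing is read from the inputs until the caller asks for an element
    PeekingIterator<Map.Entry<String, Long>> merged =
        Iterators.peekingIterator(Iterators.mergeSorted(sortedInputs, byDimension));

    return new Iterator<Map.Entry<String, Long>>()
    {
      @Override
      public boolean hasNext()
      {
        return merged.hasNext();
      }

      @Override
      public Map.Entry<String, Long> next()
      {
        Map.Entry<String, Long> row = merged.next();
        String dimension = row.getKey();
        long sum = row.getValue();
        // aggregate rows with an identical dimension value before handing the element back
        while (merged.hasNext() && merged.peek().getKey().equals(dimension)) {
          sum += merged.next().getValue();
        }
        return new SimpleEntry<>(dimension, sum);
      }
    };
  }
}
Druid's Sequence/Yielder abstraction works in the same lazy spirit, while also carrying resource-cleanup semantics along with the iteration.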
2 Executing query requests concurrently
SegmentManager tracks the segments this node has loaded and is responsible for serving. Using the interval (time range) in the query parameters, ServerManager finds the segment partitions (PartitionChunk) that need to be queried; this is also part of why Druid queries are fast: the time range immediately pins down the chunks to scan. A concrete QueryRunner is then built for each chunk with the decorator pattern:
FunctionalIterable<QueryRunner<T>> queryRunners = FunctionalIterable
.create(intervals)
.transformCat(
new Function<Interval, Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>>>()
{
@Override
public Iterable<TimelineObjectHolder<String, ReferenceCountingSegment>> apply(Interval input)
{
// look up the TimelineObjectHolders that match the interval
return timeline.lookup(input);
}
}
)
.transformCat(
new Function<TimelineObjectHolder<String, ReferenceCountingSegment>, Iterable<QueryRunner<T>>>()
{
@Override
public Iterable<QueryRunner<T>> apply(
@Nullable
final TimelineObjectHolder<String, ReferenceCountingSegment> holder
)
{
if (holder == null) {
return null;
}
return FunctionalIterable
.create(holder.getObject())
.transform(
new Function<PartitionChunk<ReferenceCountingSegment>, QueryRunner<T>>()
{
@Override
public QueryRunner<T> apply(PartitionChunk<ReferenceCountingSegment> input)
{
// build a concrete QueryRunner for this chunk using the decorator pattern
return buildAndDecorateQueryRunner(
factory,
toolChest,
input.getObject(),
new SegmentDescriptor(
holder.getInterval(),
holder.getVersion(),
input.getChunkNumber()
),
cpuTimeAccumulator
);
}
}
);
}
}
);
return CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
// build a QueryRunner from exec and queryRunners; inside it the per-chunk QueryRunners built above run concurrently on the thread pool
// toolChest merges the returned results
toolChest.mergeResults(factory.mergeRunners(exec, queryRunners)),
toolChest
),
toolChest,
emitter,
cpuTimeAccumulator,
true
);
factory.mergeRunners(exec, queryRunners) by default dispatches to GroupByMergingQueryRunnerV2. This is where the crucial concurrent execution happens: RowBasedGrouperHelper sorts and aggregates the results of each per-chunk run and produces the iterator used to build the returned Sequence.
return new BaseSequence<>(
new BaseSequence.IteratorMaker<Row, CloseableGrouperIterator<RowBasedKey, Row>>()
{
@Override
public CloseableGrouperIterator<RowBasedKey, Row> make()
{
final List<ReferenceCountingResourceHolder> resources = Lists.newArrayList();
try {
// when off-heap memory runs out, intermediate aggregation results are spilled to disk files
final LimitedTemporaryStorage temporaryStorage = new LimitedTemporaryStorage(
temporaryStorageDirectory,
querySpecificConfig.getMaxOnDiskStorage()
);
final ReferenceCountingResourceHolder<LimitedTemporaryStorage> temporaryStorageHolder =
ReferenceCountingResourceHolder.fromCloseable(temporaryStorage);
resources.add(temporaryStorageHolder);
final ReferenceCountingResourceHolder<ByteBuffer> mergeBufferHolder;
try {
// the number of off-heap merge buffers per node is configured via druid.processing.numMergeBuffers; DefaultBlockingPool manages this pool of off-heap buffers
// This will potentially block if there are no merge buffers left in the pool.
if (hasTimeout) {
final long timeout = timeoutAt - System.currentTimeMillis();
if (timeout <= 0 || (mergeBufferHolder = mergeBufferPool.take(timeout)) == null) {
throw new TimeoutException();
}
} else {
mergeBufferHolder = mergeBufferPool.take();
}
resources.add(mergeBufferHolder);
}
catch (Exception e) {
throw new QueryInterruptedException(e);
}
// helper that performs sorting, aggregation, and iterator creation; grouper is the class that actually does the work
Pair<Grouper<RowBasedKey>, Accumulator<AggregateResult, Row>> pair = RowBasedGrouperHelper.createGrouperAccumulatorPair(
query,
false,
null,
config,
Suppliers.ofInstance(mergeBufferHolder.get()),
combineBufferSupplier,
concurrencyHint,
temporaryStorage,
spillMapper,
combiningAggregatorFactories,
exec,
priority,
hasTimeout,
timeoutAt,
mergeBufferSize
);
final Grouper<RowBasedKey> grouper = pair.lhs;
final Accumulator<AggregateResult, Row> accumulator = pair.rhs;
grouper.init();
final ReferenceCountingResourceHolder<Grouper<RowBasedKey>> grouperHolder =
ReferenceCountingResourceHolder.fromCloseable(grouper);
resources.add(grouperHolder);
ListenableFuture<List<AggregateResult>> futures = Futures.allAsList(
Lists.newArrayList(
Iterables.transform(
queryables,
new Function<QueryRunner<Row>, ListenableFuture<AggregateResult>>()
{
@Override
public ListenableFuture<AggregateResult> apply(final QueryRunner<Row> input)
{
if (input == null) {
throw new ISE(
"Null queryRunner! Looks to be some segment unmapping action happening"
);
}
ListenableFuture<AggregateResult> future = exec.submit(
new AbstractPrioritizedCallable<AggregateResult>(priority)
{
@Override
public AggregateResult call() throws Exception
{
try (
Releaser bufferReleaser = mergeBufferHolder.increment();
Releaser grouperReleaser = grouperHolder.increment()
) {
// the Sequence returned by each per-chunk run is accumulated: accumulate walks the Sequence's elements and applies the accumulator,
// which aggregates metrics by dimension values and stores the rows in a hash table kept in the off-heap buffer allocated above
final AggregateResult retVal = input.run(queryPlusForRunners, responseContext)
.accumulate(
AggregateResult.ok(),
accumulator
);
// Return true if OK, false if resources were exhausted.
return retVal;
}
catch (QueryInterruptedException e) {
throw e;
}
catch (Exception e) {
log.error(e, "Exception with one of the sequences!");
throw Throwables.propagate(e);
}
}
}
);
if (isSingleThreaded) {
waitForFutureCompletion(
query,
Futures.allAsList(ImmutableList.of(future)),
hasTimeout,
timeoutAt - System.currentTimeMillis()
);
}
return future;
}
}
)
)
);
// wait for all per-chunk queries to finish
if (!isSingleThreaded) {
waitForFutureCompletion(query, futures, hasTimeout, timeoutAt - System.currentTimeMillis());
}
// this step sorts and builds the iterator; there are two levels of sorting: each per-chunk result is sorted, and a K-way merge sort runs when the iterator over all results is built
return RowBasedGrouperHelper.makeGrouperIterator(
grouper,
query,
new Closeable()
{
@Override
public void close() throws IOException
{
for (Closeable closeable : Lists.reverse(resources)) {
CloseQuietly.close(closeable);
}
}
}
);
}
catch (Throwable e) {
// Exception caught while setting up the iterator; release resources.
for (Closeable closeable : Lists.reverse(resources)) {
CloseQuietly.close(closeable);
}
throw e;
}
}
@Override
public void cleanup(CloseableGrouperIterator<RowBasedKey, Row> iterFromMake)
{
iterFromMake.close();
}
}
);
The thread pool behind the concurrent execution
Each node is configured with a prioritized, fixed-size processing thread pool, and each processing thread is allocated an off-heap buffer for intermediate results. A single query received by the historical node may fan out to several of these processing threads; the number of merge buffers (druid.processing.numMergeBuffers) is generally smaller than the number of Jetty HTTP threads (druid.server.http.numThreads).
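Druid's real pool is a PrioritizedExecutorService; purely as an illustration of the idea (the class and names below are made up), a fixed pool backed by a priority queue executes higher-priority query tasks first:
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PrioritizedPoolSketch
{
  // a runnable that carries a query priority; larger priority runs first
  static class PrioritizedTask implements Runnable, Comparable<PrioritizedTask>
  {
    final int priority;
    final Runnable delegate;

    PrioritizedTask(int priority, Runnable delegate)
    {
      this.priority = priority;
      this.delegate = delegate;
    }

    @Override
    public int compareTo(PrioritizedTask other)
    {
      return Integer.compare(other.priority, this.priority); // higher priority sorts to the front
    }

    @Override
    public void run()
    {
      delegate.run();
    }
  }

  public static void main(String[] args)
  {
    // a fixed number of processing threads, in the spirit of the processing pool size config
    ThreadPoolExecutor exec = new ThreadPoolExecutor(
        2, 2, 0L, TimeUnit.MILLISECONDS,
        new PriorityBlockingQueue<>() // queued tasks are ordered by priority
    );
    for (int p = 0; p < 5; p++) {
      final int priority = p;
      // use execute() so the PrioritizedTask itself is what lands on the priority queue
      exec.execute(new PrioritizedTask(priority, () -> System.out.println("running task with priority " + priority)));
    }
    exec.shutdown();
  }
}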
Key classes in the sorting, aggregation, and iterator-building process:
- AbstractBufferHashGrouper: abstract class that aggregates rows with identical dimension values; it dispatches to the various concrete aggregator implementations.
- BufferHashGrouper: extends AbstractBufferHashGrouper. It splits the allocated off-heap buffer in two: one part holds the hash table, the other the offsets of the rows inside the hash table, which are used to sort the rows and build the iterator.
- SpillingGrouper: creates a BufferHashGrouper and performs the aggregation (AbstractBufferHashGrouper.aggregate()); when off-heap memory runs out, it spills the intermediate results to disk. When building the iterator, it K-way merges the data in off-heap memory with the data on disk.
- ConcurrentGrouper: splits the off-heap buffer and routes each row, by the hash of its dimension values, to one of several SpillingGroupers. When building the iterator it asks every SpillingGrouper for its iterator and then K-way merges them. In newer versions this is optimized so that the per-SpillingGrouper iteration runs on a thread pool.
- ParallelCombiner: runs the K-way merge on the shared fixed thread pool used for query execution; this parallel combine is only used when data has been spilled to disk.
Let's look at the actual code, from aggregating the per-chunk query results through to generating the iterator.
// key holds the dimension values, keyHash is the hash computed from those dimension values
public AggregateResult aggregate(KeyType key, int keyHash)
{
// the dimension values serialized to a byte buffer, so they can be compared with what is stored in the hash table
final ByteBuffer keyBuffer = keySerde.toByteBuffer(key);
if (keyBuffer == null) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
return Groupers.DICTIONARY_FULL;
}
if (keyBuffer.remaining() != keySize) {
throw new IAE(
"keySerde.toByteBuffer(key).remaining[%s] != keySerde.keySize[%s], buffer was the wrong size?!",
keyBuffer.remaining(),
keySize
);
}
// compute where the row lives in the hash table
// find and try to expand if table is full and find again
int bucket = hashTable.findBucketWithAutoGrowth(keyBuffer, keyHash);
if (bucket < 0) {
// This may just trigger a spill and get ignored, which is ok. If it bubbles up to the user, the message will
// be correct.
return Groupers.HASH_TABLE_FULL;
}
// offset of the row inside the hash table
final int bucketStartOffset = hashTable.getOffsetForBucket(bucket);
final boolean bucketWasUsed = hashTable.isBucketUsed(bucket);
// the hash table buffer
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
// Set up key and initialize the aggs if this is a new bucket.
if (!bucketWasUsed) {
// put the dimension values into the hash table
hashTable.initializeNewBucketKey(bucket, keyBuffer, keyHash);
for (int i = 0; i < aggregators.length; i++) {
// initialize the aggregator, e.g. LongSumBufferAggregator does buf.putLong(position, 0L)
aggregators[i].init(tableBuffer, bucketStartOffset + aggregatorOffsets[i]);
}
newBucketHook(bucketStartOffset);
}
if (canSkipAggregate(bucketWasUsed, bucketStartOffset)) {
return AggregateResult.ok();
}
// Aggregate the current row.
for (int i = 0; i < aggregators.length; i++) {
// aggregate
aggregators[i].aggregate(tableBuffer, bucketStartOffset + aggregatorOffsets[i]);
}
afterAggregateHook(bucketStartOffset);
return AggregateResult.ok();
}
While rows are being accumulated, the hash table in the allocated off-heap buffer may fill up, in which case the current contents have to be serialized out to disk.
// key holds the dimension values, keyHash is the hash computed from those dimension values
public AggregateResult aggregate(KeyType key, int keyHash)
{
// the AbstractBufferHashGrouper aggregation from the previous step
final AggregateResult result = grouper.aggregate(key, keyHash);
if (result.isOk() || !spillingAllowed || temporaryStorage.maxSize() <= 0) {
return result;
} else {
// the hash table is full
// Warning: this can potentially block up a processing thread for a while.
try {
spill();
}
catch (TemporaryStorageFullException e) {
return DISK_FULL;
}
catch (IOException e) {
throw Throwables.propagate(e);
}
// Try again.
return grouper.aggregate(key, keyHash);
}
}
private void spill() throws IOException
{
// BufferHashGrouper builds a sorted iterator
try (CloseableIterator<Entry<KeyType>> iterator = grouper.iterator(true)) {
// rows from the iterator are serialized, compressed, and written to a file row by row, so they can later be read back row by row
files.add(spill(iterator));
dictionaryFiles.add(spill(keySerde.getDictionary().iterator()));
// reset the hash table
grouper.reset();
}
}
A clever part of Druid's design: it uses a limited amount of off-heap memory to complete very large aggregations and to read the data back afterwards, following a principle similar to MapReduce's spill-and-merge.
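The principle can be sketched without any Druid classes (a simplified illustration, not how the grouper is actually implemented): aggregate into a bounded in-memory table, spill its sorted contents to a file when it fills up, and K-way merge the spilled runs when reading the data back (the merge side is shown in the SpillingGrouper.iterator() code further below).
import java.io.BufferedWriter;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class SpillingAggregatorSketch
{
  private final int maxEntries;                                // stands in for the limited off-heap buffer
  private final TreeMap<String, Long> table = new TreeMap<>(); // in-memory table, kept sorted here for simplicity
  private final List<File> spillFiles = new ArrayList<>();

  public SpillingAggregatorSketch(int maxEntries)
  {
    this.maxEntries = maxEntries;
  }

  // aggregate one row; spill the whole table to disk when it is full
  public void aggregate(String dimension, long metric) throws IOException
  {
    if (!table.containsKey(dimension) && table.size() >= maxEntries) {
      spill();
    }
    table.merge(dimension, metric, Long::sum);
  }

  // write the sorted contents of the table to a temporary file, then reset the table
  private void spill() throws IOException
  {
    File file = File.createTempFile("groupby-spill", ".tsv");
    try (BufferedWriter writer = Files.newBufferedWriter(file.toPath(), StandardCharsets.UTF_8)) {
      for (Map.Entry<String, Long> entry : table.entrySet()) {
        writer.write(entry.getKey() + "\t" + entry.getValue());
        writer.newLine();
      }
    }
    spillFiles.add(file);
    table.clear();
  }

  public List<File> getSpillFiles()
  {
    return spillFiles;
  }
}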
When BufferHashGrouper builds its iterator, it first sorts the hash table:
// wrappedOffsets holds the offset of each entry inside the hash table
final List<Integer> wrappedOffsets = new AbstractList<Integer>()
{
@Override
public Integer get(int index)
{
return offsetList.get(index);
}
@Override
public Integer set(int index, Integer element)
{
final Integer oldValue = get(index);
offsetList.set(index, element);
return oldValue;
}
@Override
public int size()
{
return hashTable.getSize();
}
};
final BufferComparator comparator;
if (useDefaultSorting) {
comparator = keySerde.bufferComparator();
} else {
comparator = keySerde.bufferComparatorWithAggregators(aggregatorFactories, aggregatorOffsets);
}
// Sort offsets in-place.
Collections.sort(
wrappedOffsets,
new Comparator<Integer>()
{
@Override
public int compare(Integer lhs, Integer rhs)
{
// lhs and rhs are offsets of rows inside the hash table, stored in the off-heap buffer; an offset is enough to locate its row, so only this index is sorted and the data itself never moves
final ByteBuffer tableBuffer = hashTable.getTableBuffer();
return comparator.compare(
tableBuffer,
tableBuffer,
lhs + HASH_SIZE,
rhs + HASH_SIZE
);
}
}
);
return new CloseableIterator<Entry<KeyType>>()
{
int curr = 0;
final int size = getSize();
@Override
public boolean hasNext()
{
return curr < size;
}
@Override
public Entry<KeyType> next()
{
if (curr >= size) {
throw new NoSuchElementException();
}
return bucketEntryForOffset(wrappedOffsets.get(curr++));
}
@Override
public void remove()
{
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException
{
// do nothing
}
};
Druid cleverly sorts the hash table by sorting only the offsets.
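The same indirect-sort trick can be shown in isolation (a toy example, not Druid code): sort a list of offsets by the values they point at inside a ByteBuffer, so that only the small index moves while the records stay in place.
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OffsetSortSketch
{
  public static void main(String[] args)
  {
    // a "table" of fixed-size records: each record is one long
    ByteBuffer table = ByteBuffer.allocate(4 * Long.BYTES);
    long[] values = {42L, 7L, 99L, 13L};
    List<Integer> offsets = new ArrayList<>();
    for (int i = 0; i < values.length; i++) {
      int offset = i * Long.BYTES;
      table.putLong(offset, values[i]);
      offsets.add(offset);        // remember where each record starts
    }

    // sort only the offsets, comparing the records they point at; the buffer never moves
    offsets.sort(Comparator.comparingLong(table::getLong));

    for (int offset : offsets) {
      System.out.println(table.getLong(offset)); // prints 7, 13, 42, 99
    }
  }
}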
SpillingGrouper performs a K-way merge sort over the iterators produced in the previous step: the in-memory hash table iterator plus the iterators over the spilled files. Nothing is actually sorted here; in the same spirit as Sequence, the K-way merge logic runs lazily, as elements are pulled from the iterator.
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
final List<CloseableIterator<Entry<KeyType>>> iterators = new ArrayList<>(1 + files.size());
iterators.add(grouper.iterator(sorted));
final Closer closer = Closer.create();
// the files on disk, which hold already-sorted rows
for (final File file : files) {
final MappingIterator<Entry<KeyType>> fileIterator = read(file, keySerde.keyClazz());
iterators.add(
CloseableIterators.withEmptyBaggage(
Iterators.transform(
fileIterator,
new Function<Entry<KeyType>, Entry<KeyType>>()
{
@Override
public Entry<KeyType> apply(Entry<KeyType> entry)
{
final Object[] deserializedValues = new Object[entry.getValues().length];
for (int i = 0; i < deserializedValues.length; i++) {
deserializedValues[i] = aggregatorFactories[i].deserialize(entry.getValues()[i]);
if (deserializedValues[i] instanceof Integer) {
// Hack to satisfy the groupBy unit tests; perhaps we could do better by adjusting Jackson config.
deserializedValues[i] = ((Integer) deserializedValues[i]).longValue();
}
}
return new Entry<>(entry.getKey(), deserializedValues);
}
}
)
)
);
closer.register(fileIterator);
}
final Iterator<Entry<KeyType>> baseIterator;
// Guava's K-way merge sort is used via Iterators.mergeSorted(iterators, comparator)
if (sortHasNonGroupingFields) {
baseIterator = CloseableIterators.mergeSorted(iterators, defaultOrderKeyObjComparator);
} else {
baseIterator = sorted ?
CloseableIterators.mergeSorted(iterators, keyObjComparator) :
CloseableIterators.concat(iterators);
}
return CloseableIterators.wrap(baseIterator, closer);
}
ConcurrentGrouper splits the off-heap buffer according to the number of processing threads and builds one SpillingGrouper per slice. During aggregation, the SpillingGrouper is chosen by the hash of the dimension values; when the iterator is built, the iterators from the individual slices are K-way merged once more.
// key holds the dimension values, keyHash is the hash computed from those dimension values
public AggregateResult aggregate(KeyType key, int keyHash)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}
if (closed) {
throw new ISE("Grouper is closed");
}
if (!spilling) {
// pick the SpillingGrouper for this dimension hash
final SpillingGrouper<KeyType> hashBasedGrouper = groupers.get(grouperNumberForKeyHash(keyHash));
synchronized (hashBasedGrouper) {
if (!spilling) {
// the SpillingGrouper aggregates; while spilling == false it will not spill to a file even when its off-heap buffer fills up
if (hashBasedGrouper.aggregate(key, keyHash).isOk()) {
return AggregateResult.ok();
} else {
spilling = true;
}
}
}
}
// At this point we know spilling = true
final SpillingGrouper<KeyType> tlGrouper = threadLocalGrouper.get();
synchronized (tlGrouper) {
tlGrouper.setSpillingAllowed(true);
return tlGrouper.aggregate(key, keyHash);
}
}
public CloseableIterator<Entry<KeyType>> iterator(final boolean sorted)
{
if (!initialized) {
throw new ISE("Grouper is not initialized");
}
if (closed) {
throw new ISE("Grouper is closed");
}
// when sorted, parallelSortAndGetGroupersIterator() submits tasks to the shared fixed thread pool that run the SpillingGrouper iterator-building logic from the previous step, then waits for all of them to finish
final List<CloseableIterator<Entry<KeyType>>> sortedIterators = sorted && isParallelizable() ?
parallelSortAndGetGroupersIterator() :
getGroupersIterator(sorted);
// Parallel combine is used only when data is spilled. This is because ConcurrentGrouper uses two different modes
// depending on data is spilled or not. If data is not spilled, all inputs are completely aggregated and no more
// aggregation is required.
if (sorted && spilling && parallelCombiner != null) {
// First try to merge dictionaries generated by all underlying groupers. If it is merged successfully, the same
// merged dictionary is used for all combining threads
final List<String> dictionary = tryMergeDictionary();
if (dictionary != null) {
// K-way merge sort on multiple threads
return parallelCombiner.combine(sortedIterators, dictionary);
}
}
// K-way merge sort on a single thread
return sorted ?
CloseableIterators.mergeSorted(sortedIterators, keyObjComparator) :
CloseableIterators.concat(sortedIterators);
}