数据库中间件 Sharding-JDBC 源码分析 —— 结果归并

1. 概述

本文分享查询结果归并的源码实现。
正如前文《SQL 执行》提到的“分表分库,需要执行的 SQL 数量从单条变成了多条”,多个 SQL执行 结果必然需要进行合并,例如:

SELECT * FROM t_order ORDER BY create_time

在各分片排序完后,Sharding-JDBC 获取到结果后,仍然需要再进一步排序。目前有 分页分组排序聚合列迭代 五种场景需要做进一步处理。当然,如果单分片 SQL执行 结果是无需合并的。下面我们一起看看查询结果归并的实现。

2. MergeEngine

MergeEngine,分片结果集归并引擎。

public final class MergeEngine {
    // 结果集集合
    private final List<ResultSet> resultSets;
    // Select SQL语句对象
    private final SelectStatement selectStatement;
    // 查询列名与位置映射
    private final Map<String, Integer> columnLabelIndexMap;
    
    public MergeEngine(final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
        this.resultSets = resultSets;
        this.selectStatement = selectStatement;
        // 获得 查询列名与位置映射
        columnLabelIndexMap = getColumnLabelIndexMap(resultSets.get(0));
    }
    
   /**
    * 获得 查询列名与位置映射
    *
    * @param resultSet 结果集
    * @return 查询列名与位置映射
    * @throws SQLException 当结果集已经关闭
    */
    private Map<String, Integer> getColumnLabelIndexMap(final ResultSet resultSet) throws SQLException {
        ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
        Map<String, Integer> result = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
        for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
            result.put(SQLUtil.getExactlyValue(resultSetMetaData.getColumnLabel(i)), i);
        }
        return result;
    }
    ...
}

当 MergeEngine 被创建时,会传入 resultSets 结果集集合,并根据其获得 columnLabelIndexMap 查询列名与位置映射。通过 columnLabelIndexMap,可以很方便的使用查询列名获得在返回结果记录列( header )的第几列。

MergeEngine#merge()方法作为入口提供查询结果归并功能。

    /**
     * Merge result sets.
     *
     * @return merged result set.
     * @throws SQLException SQL exception
     */
    public ResultSetMerger merge() throws SQLException {
        selectStatement.setIndexForItems(columnLabelIndexMap);
        return decorate(build());
    }

MergeEngine#merge()主体逻辑就两行代码,设置查询列位置信息,并返回合适的归并结果集接口( ResultSetMerger ) 实现。

2.1 SelectStatement#setIndexForItems()

// SelectStatement.java
/**
* 为选择项设置索引.
* 
* @param columnLabelIndexMap 列标签索引字典
*/
public void setIndexForItems(final Map<String, Integer> columnLabelIndexMap) {
   setIndexForAggregationItem(columnLabelIndexMap);
   setIndexForOrderItem(columnLabelIndexMap, orderByItems);
   setIndexForOrderItem(columnLabelIndexMap, groupByItems);
}

部分查询列是经过推到出来,在 SQL解析 过程中,未获得到查询列位置,需要通过该方法进行初始化。对这块不了解的同学,回头可以看下《SQL 解析》

  • setIndexForAggregationItem()处理 AVG 聚合计算列推导出其对应的 SUM / COUNT 聚合计算列的位置:
private void setIndexForAggregationItem(final Map<String, Integer> columnLabelIndexMap) {
        for (AggregationSelectItem each : getAggregationSelectItems()) {
            Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s, please add alias for aggregate selections", each));
            each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
            for (AggregationSelectItem derived : each.getDerivedAggregationSelectItems()) {
                Preconditions.checkState(columnLabelIndexMap.containsKey(derived.getColumnLabel()), String.format("Can't find index: %s", derived));
                derived.setIndex(columnLabelIndexMap.get(derived.getColumnLabel()));
            }
        }
    }
  • setIndexForOrderItem()处理 ORDER BY / GROUP BY 列不在查询列推导出的查询列的位置:
private void setIndexForOrderItem(final Map<String, Integer> columnLabelIndexMap, final List<OrderItem> orderItems) {
        for (OrderItem each : orderItems) {
            if (-1 != each.getIndex()) {
                continue;
            }
            Preconditions.checkState(columnLabelIndexMap.containsKey(each.getColumnLabel()), String.format("Can't find index: %s", each));
            if (columnLabelIndexMap.containsKey(each.getColumnLabel())) {
                each.setIndex(columnLabelIndexMap.get(each.getColumnLabel()));
            }
        }
    }

2.2 ResultSetMerger

ResultSetMerger,归并结果集接口。

我们先来看看整体的类结构关系:


从功能上分成四种:

  • 分组:GroupByMemoryResultSetMerger、GroupByStreamResultSetMerger;包含聚合列
  • 排序:OrderByStreamResultSetMerger
  • 迭代:IteratorStreamResultSetMerger
  • 分页:LimitDecoratorResultSetMerger

从实现方式上分成三种:

  • Stream 流式:AbstractStreamResultSetMerger
  • Memory 内存:AbstractMemoryResultSetMerger
  • Decorator 装饰者:AbstractDecoratorResultSetMerger

Stream 流式:将数据游标与结果集的游标保持一致,顺序的从结果集中一条条的获取正确的数据。看完下文第三节 OrderByStreamResultSetMerger 可以形象的理解。

Memory 内存:需要将结果集的所有数据都遍历并存储在内存中,再通过内存归并后,将内存中的数据封装成结果集返回。看完下文第五节 GroupByMemoryResultSetMerger 可以形象的理解。

Decorator 装饰者:可以和前二者任意组合

MergeEngine#merge的结果ResultSetMerger封装在ShardingResultSet中:

public final class ShardingResultSet extends AbstractResultSetAdapter {
    // 合并结果器
    private final ResultSetMerger mergeResultSet;
    
    public ShardingResultSet(final List<ResultSet> resultSets, final ResultSetMerger mergeResultSet, final Statement statement) {
        super(resultSets, statement);
        this.mergeResultSet = mergeResultSet;
    }
    
    // 获取下一条记录
    @Override
    public boolean next() throws SQLException {
        return mergeResultSet.next();
    }
    
    @Override
    public boolean wasNull() throws SQLException {
        return mergeResultSet.wasNull();
    }
    
    @Override
    public boolean getBoolean(final int columnIndex) throws SQLException {
        return (boolean) ResultSetUtil.convertValue(mergeResultSet.getValue(columnIndex, boolean.class), boolean.class);
    }
    ...
    @Override
    public Object getObject(final int columnIndex) throws SQLException {
        return mergeResultSet.getValue(columnIndex, Object.class);
    }
    
    @Override
    public Object getObject(final String columnLabel) throws SQLException {
        return mergeResultSet.getValue(columnLabel, Object.class);
    }
}

ShardingResultSet 实现了ResultSet。由上一篇文章可知,查询语句的入口是ShardingPreparedStatement#execute方法,之所以未采用常用的executeQuery,是因为后者只支持返回一个结果集ResultSet,不符合分片的场景。使用方法execute执行该过程后,必须调用方法Statement#getResultSet获得第一个结果集,然后调用适当的ResultSet#getXXX方法获取其中的值。要获得第二个结果集,需要先调用getMoreResults方法,然后再调用Statement#getResultSet方法。因为ShardingPreparedStatement实现了Statement,于是会走ShardingPreparedStatement#getResultSet方法:

public ResultSet getResultSet() throws SQLException {
        if (null != currentResultSet) {
            return currentResultSet;
        }
        if (1 == routedStatements.size()) {
            currentResultSet = routedStatements.iterator().next().getResultSet();
            return currentResultSet;
        }
        List<ResultSet> resultSets = new ArrayList<>(routedStatements.size());
        // 将多个分片结果集存入集合中
        for (PreparedStatement each : routedStatements) {
            resultSets.add(each.getResultSet());
        }
        // 如果为查询语句,则进行结果归并操作
        if (routeResult.getSqlStatement() instanceof SelectStatement) {
            currentResultSet = new ShardingResultSet(resultSets, new MergeEngine(resultSets, (SelectStatement) routeResult.getSqlStatement()).merge(), this);
        } else {
            currentResultSet = resultSets.get(0);
        }
        return currentResultSet;
    }

如果为查询语句,则进行结果归并操作MergeEngine#merge,将对应的合并结果集器ResultSetMerger存入分片结果集ShardingResultSet中。

当调用ResultSet#next方法移动结果集的游标时,就会走ShardingResultSet#next方法。这样取数据时ResultSet#getXXX,就可以通过合并结果器ResultSetMerger#getValue()来获得合并后的数据。

// MergeEngine.java
    /**
     * 合并结果集
     *
     * @return merged result set.
     * @throws SQLException SQL exception
     */
    public ResultSetMerger merge() throws SQLException {
        selectStatement.setIndexForItems(columnLabelIndexMap);
        return decorate(build());
    }
    
    private ResultSetMerger build() throws SQLException {
        if (!selectStatement.getGroupByItems().isEmpty() || !selectStatement.getAggregationSelectItems().isEmpty()) {
            if (selectStatement.isSameGroupByAndOrderByItems()) {
                // groupBy 和 orderBy 项相同
                return new GroupByStreamResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
            } else {
                // groupBy 和 orderBy 项不同
                return new GroupByMemoryResultSetMerger(columnLabelIndexMap, resultSets, selectStatement);
            }
        }
        if (!selectStatement.getOrderByItems().isEmpty()) {
            //  orderBy 存在
            return new OrderByStreamResultSetMerger(resultSets, selectStatement.getOrderByItems());
        }
        return new IteratorStreamResultSetMerger(resultSets);
    }
    
    private ResultSetMerger decorate(final ResultSetMerger resultSetMerger) throws SQLException {
        Limit limit = selectStatement.getLimit();
        if (null == limit) {
            return resultSetMerger;
        }
        if (DatabaseType.MySQL == limit.getDatabaseType() || DatabaseType.PostgreSQL == limit.getDatabaseType() || DatabaseType.H2 == limit.getDatabaseType()) {
            // MySQL/PostgreSQL/H2 使用 LimitDecoratorResultSetMerger 装饰
            return new LimitDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        if (DatabaseType.Oracle == limit.getDatabaseType()) {
            return new RowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        if (DatabaseType.SQLServer == limit.getDatabaseType()) {
            return new TopAndRowNumberDecoratorResultSetMerger(resultSetMerger, selectStatement.getLimit());
        }
        return resultSetMerger;
    }
2.2.1 AbstractStreamResultSetMerger

AbstractStreamResultSetMerger,流式归并结果集抽象类,提供从当前结果集获得行数据。

public abstract class AbstractStreamResultSetMerger implements ResultSetMerger {
    // 当前结果集
    private ResultSet currentResultSet;
    
    private boolean wasNull;
    
    protected ResultSet getCurrentResultSet() throws SQLException {
        if (null == currentResultSet) {
            throw new SQLException("Current ResultSet is null, ResultSet perhaps end of next.");
        }
        return currentResultSet;
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        Object result;
        if (Object.class == type) {
            result = getCurrentResultSet().getObject(columnIndex);
        } else if (boolean.class == type) {
            result = getCurrentResultSet().getBoolean(columnIndex);
        } else if (byte.class == type) {
            result = getCurrentResultSet().getByte(columnIndex);
        } else if (short.class == type) {
            result = getCurrentResultSet().getShort(columnIndex);
        } 
        ... // 省略其他数据类型读取类似代码
    }
}
2.2.2 AbstractMemoryResultSetMerger

AbstractMemoryResultSetMerger,内存归并结果集抽象类,提供从内存数据行对象( MemoryResultSetRow ) 获得行数据。

public abstract class AbstractMemoryResultSetMerger implements ResultSetMerger {
    // 字段和其位置的映射
    private final Map<String, Integer> labelAndIndexMap;
    // 内存数据行对象
    @Setter
    private MemoryResultSetRow currentResultSetRow;
    
    private boolean wasNull;
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        if (Blob.class == type || Clob.class == type || Reader.class == type || InputStream.class == type || SQLXML.class == type) {
            throw new SQLFeatureNotSupportedException();
        }
        Object result = currentResultSetRow.getCell(columnIndex);
        wasNull = null == result;
        return result;
    }
    ...
}

和 AbstractStreamResultSetMerger 对比,貌似区别不大?!确实,从抽象父类上看,两种实现方式差不多。抽象父类提供给实现子类的是数据读取的功能,真正的流式归并、内存归并是在子类实现上体现。

/**
 * 内存数据行对象.
 * 
 * @author zhangliang
 */
public class MemoryResultSetRow {
    
    // 行数据
    private final Object[] data;
    
    public MemoryResultSetRow(final ResultSet resultSet) throws SQLException {
        data = load(resultSet);
    }
    
    // 加载 ResultSet 当前行数据到内存
    private Object[] load(final ResultSet resultSet) throws SQLException {
        int columnCount = resultSet.getMetaData().getColumnCount();
        Object[] result = new Object[columnCount];
        for (int i = 0; i < columnCount; i++) {
            // resultSet 的下标是从 1 开始的
            result[i] = resultSet.getObject(i + 1);
        }
        return result;
    }
    
    /**
     * 获取数据.
     * 
     * @param columnIndex 列索引
     * @return 数据
     */
    public Object getCell(final int columnIndex) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        return data[columnIndex - 1];
    }
    
    /**
     * 设置数据.
     *
     * @param columnIndex 列索引
     * @param value 值
     */
    public void setCell(final int columnIndex, final Object value) {
        Preconditions.checkArgument(columnIndex > 0 && columnIndex < data.length + 1);
        data[columnIndex - 1] = value;
    }
}

调用load()方法,将当前结果集的一条行数据加载到内存。

2.2.3 AbstractDecoratorResultSetMerger

AbstractDecoratorResultSetMerger,装饰结果集归并抽象类,通过调用其装饰的归并对象getValue()方法获得行数据。

public abstract class AbstractDecoratorResultSetMerger implements ResultSetMerger {

    /**
     * 装饰的归并对象
     */
    private final ResultSetMerger resultSetMerger;
        
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        return resultSetMerger.getValue(columnIndex, type);
    }
}

3. OrderByStreamResultSetMerger

OrderByStreamResultSetMerger,基于 Stream 方式排序归并结果集实现。

3.1 归并算法

归并操作(merge),也叫归并算法,指的是将两个已经排序的序列合并成一个序列的操作。归并排序算法依赖归并操作。
【迭代法】
1. 申请空间,使其大小为两个已经排序序列之和,该空间用来存放合并后的序列
2. 设定两个指针,最初位置分别为两个已经排序序列的起始位置
3. 比较两个指针所指向的元素,选择相对小的元素放入到合并空间,并移动指针到下一位置
4. 重复步骤 3 直到某一指针到达序列尾
5. 将另一序列剩下的所有元素直接复制到合并序列尾

public class OrderByStreamResultSetMerger extends AbstractStreamResultSetMerger {
    // 排序列
    private final List<OrderItem> orderByItems;

    // 排序值对象队列
    @Getter(AccessLevel.PROTECTED)
    private final Queue<OrderByValue> orderByValuesQueue;
    
    // 是否第一个 ResultSet 已经调用 #next()
    @Getter(AccessLevel.PROTECTED)
    private boolean isFirstNext;
    
    public OrderByStreamResultSetMerger(final List<ResultSet> resultSets, final List<OrderItem> orderByItems) throws SQLException {
        this.orderByItems = orderByItems;
        this.orderByValuesQueue = new PriorityQueue<>(resultSets.size());
        orderResultSetsToQueue(resultSets);
        isFirstNext = true;
    }
    
    private void orderResultSetsToQueue(final List<ResultSet> resultSets) throws SQLException {
        for (ResultSet each : resultSets) {
            OrderByValue orderByValue = new OrderByValue(each, orderByItems);
            if (orderByValue.next()) {
                orderByValuesQueue.offer(orderByValue);
            }
        }
        // 设置当前 ResultSet,这样 #getValue() 能拿到记录
        setCurrentResultSet(orderByValuesQueue.isEmpty() ? resultSets.get(0) : orderByValuesQueue.peek().getResultSet());
    }
    ...
}

属性 orderByValuesQueue 使用的队列实现是优先级队列( PriorityQueue )。我们记住几个方法的用途:

  • #offer():增加元素。增加时,会将该元素和已有元素们按照优先级进行排序
  • #peek():获得优先级第一的元素
  • #pool():获得优先级第一的元素并移除

一个 ResultSet 构建一个 OrderByValue 用于排序,即上文归并算法提到的“空间”。

public final class OrderByValue implements Comparable<OrderByValue> {
    // 已排序结果集
    @Getter
    private final ResultSet resultSet;
    // 排序列
    private final List<OrderItem> orderByItems;
    // 排序列对应的值数组, 因为一条记录可能有多个排序列,所以是数组
    private List<Comparable<?>> orderValues;
    
    /**
     * 遍历下一个结果集游标.
     *
     * @return has next data
     * @throws SQLException SQL Exception
     */
    public boolean next() throws SQLException {
        boolean result = resultSet.next();
        orderValues = result ? getOrderValues() : Collections.<Comparable<?>>emptyList();
        return result;
    }
    
    // 获得 排序列对应的值数组
    private List<Comparable<?>> getOrderValues() throws SQLException {
        List<Comparable<?>> result = new ArrayList<>(orderByItems.size());
        for (OrderItem each : orderByItems) {
            Object value = resultSet.getObject(each.getIndex());
            Preconditions.checkState(null == value || value instanceof Comparable, "Order by value must implements Comparable");
            result.add((Comparable<?>) value);
        }
        return result;
    }
    
    @Override
    public int compareTo(final OrderByValue o) {
        for (int i = 0; i < orderByItems.size(); i++) {
            OrderItem thisOrderBy = orderByItems.get(i);
            int result = ResultSetUtil.compareTo(orderValues.get(i), o.orderValues.get(i), thisOrderBy.getType(), thisOrderBy.getNullOrderType());
            if (0 != result) {
                return result;
            }
        }
        return 0;
    }
}

调用OrderByValue#next()方法时,获得其对应结果集排在第一条的记录,通过getOrderValues()计算该记录的排序字段值。这样两个OrderByValue 通过compareTo()方法可以比较两个结果集。

3.2 #next()

通过调用OrderByStreamResultSetMerger#next()获得orderByValuesQueue中当前排在第一的记录。

// OrderByStreamResultSetMerger.java
@Override
public boolean next() throws SQLException {
   if (orderByValuesQueue.isEmpty()) {
       return false;
   }
   if (isFirstNext) {
       isFirstNext = false;
       return true;
   }
   // 取出 orderByValuesQueue 中第一个(优先级最高)OrderByValue
   OrderByValue firstOrderByValue = orderByValuesQueue.poll();
   // 如果该 OrderByValue 中的 ResultSet 还有下一条记录,继续添加到队列中
   if (firstOrderByValue.next()) {
       orderByValuesQueue.offer(firstOrderByValue);
   }
   if (orderByValuesQueue.isEmpty()) {
       return false;
   }
   // 设置当前 ResultSet
   setCurrentResultSet(orderByValuesQueue.peek().getResultSet());
   return true;
}

isFirstNext变量的判断看着是不是很“灵异”?因为 orderResultSetsToQueue()(构造器中调用)处设置了第一次的 ResultSet。如果不加这个标记,会导致第一条记录“不见”了。

通过不断的Queue#poll()Queue#offset()实现排序。巧妙!

4. GroupByStreamResultSetMerger

GroupByStreamResultSetMerger,基于 Stream 方式分组归并结果集实现。 它继承自 OrderByStreamResultSetMerger,在排序的逻辑上,实现分组功能。实现原理也较为简单:

public final class GroupByStreamResultSetMerger extends OrderByStreamResultSetMerger {

    /**
     * 查询列名与位置映射
     */
    private final Map<String, Integer> labelAndIndexMap;
    /**
     * Select SQL语句对象
     */
    private final SelectStatement selectStatement;
    /**
     * 当前结果记录
     */
    private final List<Object> currentRow;
    /**
     * 当前记录 GROUP BY 条件
     */
    private List<?> currentGroupByValues;
    
    public GroupByStreamResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement, final OrderType nullOrderType) throws SQLException {
        super(resultSets, selectStatement.getOrderByItems(), nullOrderType);
        this.labelAndIndexMap = labelAndIndexMap;
        this.selectStatement = selectStatement;
        currentRow = new ArrayList<>(labelAndIndexMap.size());
        // 初始化当前记录 GROUP BY 条件
        currentGroupByValues = getOrderByValuesQueue().isEmpty() ? Collections.emptyList() : new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
    }
    
    @Override
    public Object getValue(final int columnIndex, final Class<?> type) throws SQLException {
        // columnIndex 是从 1 开始的
        return currentRow.get(columnIndex - 1);
    }
    @Override
    public Object getValue(final String columnLabel, final Class<?> type) throws SQLException {
        Preconditions.checkState(labelAndIndexMap.containsKey(columnLabel), String.format("Can't find columnLabel: %s", columnLabel));
        return currentRow.get(labelAndIndexMap.get(columnLabel) - 1);
    }
}

currentRow为当前结果记录,使用#getValue()方法获得当前结果记录的查询列值。

currentGroupByValues为当前记录 GROUP BY 条件,通过 GroupByValue 生成:

public final class GroupByValue {
    
    // 分组条件值数组
    private final List<?> groupValues;
    
    public GroupByValue(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
        groupValues = getGroupByValues(resultSet, groupByItems);
    }
    
     /**
     * 获得分组条件值数组
     * 例如,`GROUP BY user_id, order_status` 返回的某条记录结果为 `userId = 1, order_status = 3`,对应的 `groupValues = [1, 3]`
     * @param resultSet 结果集(单分片)
     * @param groupByItems 分组列
     * @return 分组条件值数组
     * @throws SQLException 当结果集关闭
     */
    private List<?> getGroupByValues(final ResultSet resultSet, final List<OrderItem> groupByItems) throws SQLException {
        List<Object> result = new ArrayList<>(groupByItems.size());
        for (OrderItem each : groupByItems) {
            result.add(resultSet.getObject(each.getIndex()));
        }
        return result;
    }
}

GroupByStreamResultSetMerger 在创建时,当前结果记录实际未合并,需要先调用#next(),再使用#getValue()等方法获取值。

4.1 AggregationUnit

AggregationUnit,归并计算单元接口,有两个接口方法:

  • #merge():归并聚合值
  • #getResult():获取计算结果

一共有三个实现类:

  • AccumulationAggregationUnit:累加聚合单元,解决 COUNT、SUM 聚合列
  • ComparableAggregationUnit:比较聚合单元,解决 MAX、MIN 聚合列
  • AverageAggregationUnit:平均值聚合单元,解决 AVG 聚合列

实现都比较易懂,我们就不浪费篇幅贴代码啦。

4.2 #next()

我们先看看大体的调用流程:



看起来代码比较多,逻辑其实比较清晰,对照着顺序图顺序往下读即可。

// GroupByStreamResultSetMerger.java
@Override
public boolean next() throws SQLException {
   // 清除当前结果记录
   currentRow.clear();
   if (getOrderByValuesQueue().isEmpty()) {
       return false;
   }
   //
   if (isFirstNext()) {
       super.next();
   }
   // 顺序合并相同分组条件的记录
   if (aggregateCurrentGroupByRowAndNext()) {
       // 生成下一条结果记录 GROUP BY 条件,作为当前条件
       currentGroupByValues = new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues();
   }
   return true;
}

private boolean aggregateCurrentGroupByRowAndNext() throws SQLException {
   boolean result = false;
   // 生成计算单元
   Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
       
       @Override
       public AggregationUnit apply(final AggregationSelectItem input) {
           return AggregationUnitFactory.create(input.getType());
       }
   });
   // 循环顺序合并相同分组条件的记录
   while (currentGroupByValues.equals(new GroupByValue(getCurrentResultSet(), selectStatement.getGroupByItems()).getGroupValues())) {
       // 归并聚合值
       aggregate(aggregationUnitMap);
       // 缓存当前记录到结果记录
       cacheCurrentRow();
       // 获取下一条记录
       result = super.next();
       if (!result) {
           break;
       }
   }
   // 设置当前记录的聚合字段结果
   setAggregationValueToCurrentRow(aggregationUnitMap);
   return result;
}
    
private void aggregate(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) throws SQLException {
   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
       List<Comparable<?>> values = new ArrayList<>(2);
       if (entry.getKey().getDerivedAggregationSelectItems().isEmpty()) { // SUM/COUNT/MAX/MIN 聚合列
           values.add(getAggregationValue(entry.getKey()));
       } else {
           for (AggregationSelectItem each : entry.getKey().getDerivedAggregationSelectItems()) { // AVG 聚合列
               values.add(getAggregationValue(each));
           }
       }
       entry.getValue().merge(values);
   }
}
    
private void cacheCurrentRow() throws SQLException {
   for (int i = 0; i < getCurrentResultSet().getMetaData().getColumnCount(); i++) {
       currentRow.add(getCurrentResultSet().getObject(i + 1));
   }
}
    
private Comparable<?> getAggregationValue(final AggregationSelectItem aggregationSelectItem) throws SQLException {
   Object result = getCurrentResultSet().getObject(aggregationSelectItem.getIndex());
   Preconditions.checkState(null == result || result instanceof Comparable, "Aggregation value must implements Comparable");
   return (Comparable<?>) result;
}
    
private void setAggregationValueToCurrentRow(final Map<AggregationSelectItem, AggregationUnit> aggregationUnitMap) {
   for (Entry<AggregationSelectItem, AggregationUnit> entry : aggregationUnitMap.entrySet()) {
       currentRow.set(entry.getKey().getIndex() - 1, entry.getValue().getResult()); // 获取计算结果
   }
}

5. GroupByMemoryResultSetMerger

GroupByMemoryResultSetMerger,基于内存分组归并结果集实现。

区别于 GroupByStreamResultSetMerger,其无法使用每个分片结果集的有序的特点,只能在内存中合并后,进行整个重新排序。因而,性能和内存都较 GroupByStreamResultSetMerger 会差。

主流程如下:


public final class GroupByMemoryResultSetMerger extends AbstractMemoryResultSetMerger {
    
    private final SelectStatement selectStatement;
    // 内存结果集
    private final Iterator<MemoryResultSetRow> memoryResultSetRows;
    
    public GroupByMemoryResultSetMerger(
            final Map<String, Integer> labelAndIndexMap, final List<ResultSet> resultSets, final SelectStatement selectStatement) throws SQLException {
        super(labelAndIndexMap);
        this.selectStatement = selectStatement;
        memoryResultSetRows = init(resultSets);
    }
    
    private Iterator<MemoryResultSetRow> init(final List<ResultSet> resultSets) throws SQLException {
        Map<GroupByValue, MemoryResultSetRow> dataMap = new HashMap<>(1024);
        Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap = new HashMap<>(1024);
        // 遍历结果集
        for (ResultSet each : resultSets) {
            while (each.next()) {
                // 生成分组条件
                GroupByValue groupByValue = new GroupByValue(each, selectStatement.getGroupByItems());
                // 初始化分组条件到 dataMap、aggregationMap 映射
                initForFirstGroupByValue(each, groupByValue, dataMap, aggregationMap);
                // 归并聚合值
                aggregate(each, groupByValue, aggregationMap);
            }
        }
        // 设置聚合列结果到内存记录
        setAggregationValueToMemoryRow(dataMap, aggregationMap);
        // 内存排序
        List<MemoryResultSetRow> result = getMemoryResultSetRows(dataMap);
        if (!result.isEmpty()) {
            // 设置当前 ResultSet,这样 #getValue() 能拿到记录
            setCurrentResultSetRow(result.get(0));
        }
        return result.iterator();
    }
    ...
}

#initForFirstGroupByValue()初始化分组条件到 dataMap,aggregationMap 映射中,这样可以调用#aggregate()将聚合值归并到 aggregationMap 里的该分组条件。

private void initForFirstGroupByValue(final ResultSet resultSet, final GroupByValue groupByValue, final Map<GroupByValue, MemoryResultSetRow> dataMap, 
                                          final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) throws SQLException {
        // 初始化分组条件到 dataMap
        if (!dataMap.containsKey(groupByValue)) {
            dataMap.put(groupByValue, new MemoryResultSetRow(resultSet));
        }
        // 初始化分组条件到 aggregationMap
        if (!aggregationMap.containsKey(groupByValue)) {
            Map<AggregationSelectItem, AggregationUnit> map = Maps.toMap(selectStatement.getAggregationSelectItems(), new Function<AggregationSelectItem, AggregationUnit>() {
                
                @Override
                public AggregationUnit apply(final AggregationSelectItem input) {
                    return AggregationUnitFactory.create(input.getType());
                }
            });
            aggregationMap.put(groupByValue, map);
        }
    }

聚合完每个分组条件后,将聚合列结果 aggregationMap 合并到 dataMap

    private void setAggregationValueToMemoryRow(final Map<GroupByValue, MemoryResultSetRow> dataMap, final Map<GroupByValue, Map<AggregationSelectItem, AggregationUnit>> aggregationMap) {
       for (Entry<GroupByValue, MemoryResultSetRow> entry : dataMap.entrySet()) { // 遍 历内存记录
           for (AggregationSelectItem each : selectStatement.getAggregationSelectItems()) { // 遍历 每个聚合列
               entry.getValue().setCell(each.getIndex(), aggregationMap.get(entry.getKey()).get(each).getResult());
           }
       }
    }

调用#getMemoryResultSetRows()方法对内存记录进行内存排序。

// GroupByMemoryResultSetMerger.java
private List<MemoryResultSetRow> getMemoryResultSetRows(final Map<GroupByValue, MemoryResultSetRow> dataMap) {
   List<MemoryResultSetRow> result = new ArrayList<>(dataMap.values());
   Collections.sort(result, new GroupByRowComparator(selectStatement, nullOrderType)); // 内存排序
   return result;
}

// GroupByRowComparator.java
private int compare(final MemoryResultSetRow o1, final MemoryResultSetRow o2, final List<OrderItem> orderItems) {
   for (OrderItem each : orderItems) {
       Object orderValue1 = o1.getCell(each.getIndex());
       Preconditions.checkState(null == orderValue1 || orderValue1 instanceof Comparable, "Order by value must implements Comparable");
       Object orderValue2 = o2.getCell(each.getIndex());
       Preconditions.checkState(null == orderValue2 || orderValue2 instanceof Comparable, "Order by value must implements Comparable");
       int result = ResultSetUtil.compareTo((Comparable) orderValue1, (Comparable) orderValue2, each.getType(), nullOrderType);
       if (0 != result) {
           return result;
       }
   }
   return 0;
}

总的来说,GROUP BY 内存归并和我们日常使用 Map 计算用户订单数是比较相似的。

5.1 #next()

@Override
public boolean next() throws SQLException {
   if (memoryResultSetRows.hasNext()) {
       setCurrentResultSetRow(memoryResultSetRows.next());
       return true;
   }
   return false;
}

内存归并完成后,使用memoryResultSetRows不断获得下一条记录。

6. IteratorStreamResultSetMerger

IteratorStreamResultSetMerger,基于 Stream 迭代归并结果集实现。

public final class IteratorStreamResultSetMerger extends AbstractStreamResultSetMerger {
    
    // ResultSet 数组迭代器
    private final Iterator<ResultSet> resultSets;
    
    public IteratorStreamResultSetMerger(final List<ResultSet> resultSets) {
        this.resultSets = resultSets.iterator();
       // 设置当前 ResultSet,这样 #getValue() 能拿到记录
        setCurrentResultSet(this.resultSets.next());
    }
    
    @Override
    public boolean next() throws SQLException {
        // 当前 ResultSet 迭代下一条记录
        if (getCurrentResultSet().next()) {
            return true;
        }
        if (!resultSets.hasNext()) {
            return false;
        }
        // 获得下一个ResultSet, 设置当前 ResultSet
        setCurrentResultSet(resultSets.next());
        boolean hasNext = getCurrentResultSet().next();
        if (hasNext) {
            return true;
        }
        while (!hasNext && resultSets.hasNext()) {
            setCurrentResultSet(resultSets.next());
            hasNext = getCurrentResultSet().next();
        }
        return hasNext;
    }
}

7. LimitDecoratorResultSetMerger

LimitDecoratorResultSetMerger,基于 Decorator 分页结果集归并实现。

public final class LimitDecoratorResultSetMerger extends AbstractDecoratorResultSetMerger {
    
    private final Limit limit;
    
    // 是否全部记录都跳过了,即无符合条件记录
    private final boolean skipAll;
    
    // 当前已返回行数
    private int rowNumber;
    
    public LimitDecoratorResultSetMerger(final ResultSetMerger resultSetMerger, final Limit limit) throws SQLException {
        super(resultSetMerger);
        this.limit = limit;
        skipAll = skipOffset();
    }
    
    private boolean skipOffset() throws SQLException {
        for (int i = 0; i < limit.getOffsetValue(); i++) {
            if (!getResultSetMerger().next()) {
                return true;
            }
        }
        rowNumber = 0;
        return false;
    }
    
    @Override
    public boolean next() throws SQLException {
        if (skipAll) {
            return false;
        }
        // 部分db 可以直 offset,不写 limit 行数,例如 oracle
        if (limit.getRowCountValue() < 0) {
            return getResultSetMerger().next();
        }
        // 获得下一条记录
        return ++rowNumber <= limit.getRowCountValue() && getResultSetMerger().next();
    }
}

LimitDecoratorResultSetMerger 可以对其他 ResultSetMerger 进行装饰,调用其他ResultSetMerger#next()不断获得下一条记录。

8. 结语

随着结果归并完成之后,整个 Sharding-JDBC 内部路程就讲解完毕了。本文完,但也未完。

跨分片事务问题。例如:

UPDATE t_order SET nickname = ? WHERE user_id = ?

A 分片 connection.commit() 时,应用突然挂了!B 分片 connection.commit() 还来不及执行(A、B 分片不在同库)。下一篇文章,我们将探讨其分布式事务机制。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,126评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,254评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,445评论 0 341
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,185评论 1 278
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,178评论 5 371
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,970评论 1 284
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,276评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,927评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,400评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,883评论 2 323
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,997评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,646评论 4 322
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,213评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,204评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,423评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,423评论 2 352
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,722评论 2 345