奇妙的 JDBC batch insert 到 Doris 异常的问题

遇到一个很奇怪的异常,通过 JDBC batch insert 时,会报 Unknown command(27) 的异常。

exception.png

而且这个问题很容易复现,复现例子:

  1. 建表语句
create table t_selection_test (
    a VARCHAR(96),
    b VARCHAR(20),
    c VARCHAR(96),
    d DECIMAL(38, 10)
) DUPLICATE KEY (a)
DISTRIBUTED BY HASH(a) BUCKETS 10
PROPERTIES (
    "replication_num" = "1"
);
  1. 写入代码
Connection conn = DriverManager.getConnection(
        "jdbc:mysql://127.0.0.1:9030/test?rewriteBatchedStatements=true",
        "root", "");
String sql = "INSERT INTO t_select_test (a, b, c, d) VALUES(?, ?, ?, ?)";
PreparedStatement ps = conn.prepareStatement(sql);

for (int i = 0; i < 4; i++) {
    ps.setString(1, "a");
    ps.setString(2, "b");
    ps.setString(3, "c");
    ps.setBigDecimal(4, BigDecimal.TEN);
    ps.addBatch();
}

ps.executeBatch();

最开始怀疑是 Doris 的问题,查看 Fe 源码发现,Mysql 解析的代码在 org.apache.doris.qe.MysqlConnectProcessor#dispatch 方法

private void dispatch() throws IOException {
    int code = packetBuf.get();
    MysqlCommand command = MysqlCommand.fromCode(code);
    if (command == null) {
        ErrorReport.report(ErrorCode.ERR_UNKNOWN_COM_ERROR);
        ctx.getState().setError(ErrorCode.ERR_UNKNOWN_COM_ERROR, "Unknown command(" + code + ")");
        LOG.warn("Unknown command(" + code + ")");
        return;
    }

    ......

}

其中 MysqlCommand 中确实没有 code 为 27 的命令定义,查看 MySQL 的命令报文定义,发现 27(16 进制的 0x1B)为 COM_SET_OPTION 指令,用于打开或关闭多语句执行,具体定义如下:

只有两个枚举值

  • MYSQL_OPTION_MULTI_STATEMENTS_ON - 1
  • MYSQL_OPTION_MULTI_STATEMENTS_OFF - 0

这个时候就感觉可能是 mysql-connector-java 的问题,又试了几个版本的 connector 包,最终发现在 8.0.28 以及之前的版本会有这个问题,然后就开始了我们的 debug 之旅。

问题分析

版本众多,我们就只来看看问题分界的两个版本:8.0.28 和 8.0.29

8.0.28

首先我们通过测试代码中的 ps.executeBatch() 方法进入

调用的是实现类方法 com.mysql.cj.jdbc.StatementImpl#executeBatch

public int[] executeBatch() throws SQLException {
    return Util.truncateAndConvertToInt(executeBatchInternal());
}

进来直接调用 executeBatchInternal 方法,实际是调用的实现类方法 com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal

protected long[] executeBatchInternal() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {

        ......

        try {
            
            ......

            if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
                                // line 425
                if (getParseInfo().canRewriteAsMultiValueInsertAtSqlLevel()) {
                    return executeBatchedInserts(batchTimeout);
                }

                if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
                        && this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
                  // line 431  
                  return executePreparedBatchAsMultiStatement(batchTimeout);
                }
            }

            return executeBatchSerially(batchTimeout);
        } finally {
            this.query.getStatementExecuting().set(false);

            clearBatch();
        }
    }
}

在 425 行 canRewriteAsMultiValueInsertAtSqlLevel 判断为 false,继续向下执行(这个地方很重要,我们后面分析)

然后会最终走到 431 行的 executePreparedBatchAsMultiStatement 方法中

protected long[] executePreparedBatchAsMultiStatement(int batchTimeout) throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {
        
        ......

        try {
            
            ......

            try {
                if (!multiQueriesEnabled) {
                    // line 494
                    ((NativeSession) locallyScopedConn.getSession()).enableMultiQueries();
                }

                ......

            } finally {
                if (batchedStatement != null) {
                    batchedStatement.close();
                    batchedStatement = null;
                }
            }

            ......
              
    }
}

然后我们继续执行,直到执行 494 行的 enableMultiQueries 异常发生

然后我们再次 debug 进入到这个方法,看看它干了什么

public void enableMultiQueries() {
    sendCommand(this.commandBuilder.buildComSetOption(((NativeProtocol) this.protocol).getSharedSendPacket(), 0), false, 0);
    ((NativeServerSession) getServerSession()).preserveOldTransactionState();
}

诶,就是它了,buildComSetOption ,顾名思义,构建了 COM_SET_OPTION 命令并发送。

找到了罪魁祸首,我们就来看看为什么他要发送这个指令

回到 425 行的这个判断,为什么明明我们在 JDBC url 上面开启了 rewriteBatchedStatements,但是却没有执行呢?

我们点开 canRewriteAsMultiValueInsertAtSqlLevel 方法,发现只是简单的返回了 ParseInfocanRewriteAsMultiValueInsert 属性,默认值为 false, 那我们看看这个属性有没有被赋值

public boolean canRewriteAsMultiValueInsertAtSqlLevel() {
    return this.canRewriteAsMultiValueInsert;
}

经过 Debug 发现,在ClientPreparedStatement 初始化时,调用 com.mysql.cj.ParseInfo#ParseInfo(java.lang.String, com.mysql.cj.Session, java.lang.String) 构造方法后再调用com.mysql.cj.ParseInfo#ParseInfo(java.lang.String, com.mysql.cj.Session, java.lang.String, boolean) 这个构造方法构建 ParseInfo 对象

public ParseInfo(String sql, Session session, String encoding) {
    this(sql, session, encoding, true);
}

public ParseInfo(String sql, Session session, String encoding, boolean buildRewriteInfo) {
    
    ......

    // line 185
    if (buildRewriteInfo) {
        this.canRewriteAsMultiValueInsert = this.numberOfQueries == 1 && !this.parametersInDuplicateKeyClause
                && canRewrite(sql, this.isOnDuplicateKeyUpdate, this.locationOfOnDuplicateKeyUpdate, this.statementStartPos);
        if (this.canRewriteAsMultiValueInsert && session.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue()) {
            buildRewriteBatchedParams(sql, session, encoding);
        }
    }
}

然后我们继续向下看

在 185 行,由于buildRewriteInfotrue,进入到 if 语句中

canRewriteAsMultiValueInsert 为 3 个判断条件的的与值, 其中:

  1. this.numberOfQueries == 1 判断是不是只有一个语句,这个当然为 true

  2. !this.parametersInDuplicateKeyClause 判断参数不在 ON DUPLICATE KEY 的语法中,这个也为 true

  3. canRewrite 这个方法我们跟进去看

    protected static boolean canRewrite(String sql, boolean isOnDuplicateKeyUpdate, int locationOfOnDuplicateKeyUpdate, int statementStartPos) {
        // Needs to be INSERT or REPLACE.
        // Can't have INSERT ... SELECT or INSERT ... ON DUPLICATE KEY UPDATE with an id=LAST_INSERT_ID(...).
    
        if (StringUtils.startsWithIgnoreCaseAndWs(sql, "INSERT", statementStartPos)) {
             // line 660
            if (StringUtils.indexOfIgnoreCase(statementStartPos, sql, "SELECT", OPENING_MARKERS, CLOSING_MARKERS, SearchMode.__MRK_COM_MYM_HNT_WS) != -1) {
                return false;
            }
            if (isOnDuplicateKeyUpdate) {
                int updateClausePos = StringUtils.indexOfIgnoreCase(locationOfOnDuplicateKeyUpdate, sql, " UPDATE ");
                if (updateClausePos != -1) {
                    return StringUtils.indexOfIgnoreCase(updateClausePos, sql, "LAST_INSERT_ID", OPENING_MARKERS, CLOSING_MARKERS,
                            SearchMode.__MRK_COM_MYM_HNT_WS) == -1;
                }
            }
            return true;
        }
    
        return StringUtils.startsWithIgnoreCaseAndWs(sql, "REPLACE", statementStartPos)
                && StringUtils.indexOfIgnoreCase(statementStartPos, sql, "SELECT", OPENING_MARKERS, CLOSING_MARKERS, SearchMode.__MRK_COM_MYM_HNT_WS) == -1;
    }
    

有注释提示,只有 INSERT 或 REPLACE 语句才能被重写,而 INSERT SELECTINSERT ON DUPLICATE KEY UPDATE 语法不能被重写

这个 SELECT 是不是有点似曾相识? 我们接着往下走

直到走到 660 行这个判断,然后返回了 false

恍然大悟,这个地方判断 sql 是不是 INSERT SELECT 语句,因为我们表名中包含 select 字串,所以这个 StringUtils.indexOfIgnoreCase 方法返回值不是 -1

SearchMode.__MRK_COM_MYM_HNT_WS 配合第 4、5 两个参数,不匹配两个标记所引用的内容,这也就是为什么表名通过重音标号引用后没有出现异常

出现问题的地方找到了,我们再来看执行正常的版本是如何处理的

8.0.29

还是通过测试代码中的 ps.executeBatch() 方法进入

一样走到com.mysql.cj.jdbc.ClientPreparedStatement#executeBatchInternal

protected long[] executeBatchInternal() throws SQLException {
    synchronized (checkClosed().getConnectionMutex()) {

        ......

        try {
            
          ......
          
            if (!this.batchHasPlainStatements && this.rewriteBatchedStatements.getValue()) {
                                // line 408
                if (getQueryInfo().isRewritableWithMultiValuesClause()) {
                    return executeBatchedInserts(batchTimeout);
                }
                                
                if (!this.batchHasPlainStatements && this.query.getBatchedArgs() != null
                        && this.query.getBatchedArgs().size() > 3 /* cost of option setting rt-wise */) {
                    return executePreparedBatchAsMultiStatement(batchTimeout);
                }
            }

            return executeBatchSerially(batchTimeout);
        } finally {
            this.query.getStatementExecuting().set(false);

            clearBatch();
        }
    }
}

走到 408 行,isRewritableWithMultiValuesClause 判断为 true,调用 executeBatchedInserts 执行批量写入,正常结束返回

这个地方和 8.0.28 版本明显不一样了,我们来看看 isRewritableWithMultiValuesClause 是如何判断的

这块也是相似的直接返回了 isRewritableWithMultiValuesClause 成员变量的值

public boolean isRewritableWithMultiValuesClause() {
    return this.isRewritableWithMultiValuesClause;
}

debug 看到在构建 ClientPreparedStatement 时, 通过 com.mysql.cj.QueryInfo#QueryInfo(java.lang.String, com.mysql.cj.Session, java.lang.String) 构造方法构造 QueryInfo 对象时进行了赋值

public QueryInfo(String sql, Session session, String encoding) {
    
    ......
    // line 97
    boolean rewriteBatchedStatements = session.getPropertySet().getBooleanProperty(PropertyKey.rewriteBatchedStatements).getValue();
    ......

    // Skip comments at the beginning of queries.
    this.queryStartPos = strInspector.indexOfNextAlphanumericChar();
    if (this.queryStartPos == -1) {
        this.queryStartPos = this.queryLength;
    } else {
        this.numberOfQueries = 1;
        this.statementFirstChar = Character.toUpperCase(strInspector.getChar());
    }

    // Only INSERT and REPLACE statements support multi-values clause rewriting.
    // line 127
    boolean isInsert = strInspector.matchesIgnoreCase(INSERT_STATEMENT) != -1;
    if (isInsert) {
        strInspector.incrementPosition(INSERT_STATEMENT.length()); // Advance to the end of "INSERT".
    }
    boolean isReplace = !isInsert && strInspector.matchesIgnoreCase(REPLACE_STATEMENT) != -1;
    if (isReplace) {
        strInspector.incrementPosition(REPLACE_STATEMENT.length()); // Advance to the end of "REPLACE".
    }

    // Check if the statement has potential to be rewritten as a multi-values clause statement, i.e., if it is an INSERT or REPLACE statement and
    // 'rewriteBatchedStatements' is enabled.
    boolean rewritableAsMultiValues = (isInsert || isReplace) && rewriteBatchedStatements;

    ......
    // complex grammar check
    
    // line 271
    this.isRewritableWithMultiValuesClause = rewritableAsMultiValues;

    ......
}

首先在 97 行,获取到 session 中设置的 rewriteBatchedStatements 属性

在 127 行,判断是不是 INSERTREPLACE 语句,并且开启了 rewriteBatchedStatements 属性,

然后进行了一系列复杂的匹配,来判断是否为 INSERT INTO VALUES 这样的语法,并且带有 ? 占位符

最后在 271 行把判断结果赋值给 isRewritableWithMultiValuesClause

总结

所以,这个异常的问题就在于,判断是否可以重写 batch statement sql 的时候,对于 INSERT INTO VALUES 语法的判断上,8.0.28 以及更早的版本的逻辑不够严谨,8.0.29 上面进行了更加复杂的判断,从而避免了这样的错误

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容