RW#run
Polls the register queue for pending AbstractConnection instances; when a connection is present and a read event is ready, it calls AbstractConnection#asynRead to read the data asynchronously. The actual read logic is in NIOSocketWR#asynRead.
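The loop body is not reproduced in the excerpt above, so here is a minimal sketch of the kind of reactor loop being described, assuming the thread owns a Selector and a registration queue. ReactorSketch, the Conn interface, and all field names are illustrative and are not Mycat's actual RW implementation.

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative reactor thread; names are assumptions, not Mycat classes.
public class ReactorSketch implements Runnable {

    interface Conn {
        SocketChannel channel();                 // assumed to be configured non-blocking
        void asynRead() throws IOException;      // corresponds to AbstractConnection#asynRead
    }

    private final Selector selector;
    private final Queue<Conn> registerQueue = new ConcurrentLinkedQueue<>();

    ReactorSketch() throws IOException {
        this.selector = Selector.open();
    }

    @Override
    public void run() {
        for (;;) {
            try {
                selector.select(500L);
                // 1. drain the register queue: attach new connections to the selector for OP_READ
                Conn c;
                while ((c = registerQueue.poll()) != null) {
                    c.channel().register(selector, SelectionKey.OP_READ, c);
                }
                // 2. dispatch ready keys: a readable key triggers the connection's async read
                Iterator<SelectionKey> it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    it.remove();
                    if (key.isValid() && key.isReadable()) {
                        ((Conn) key.attachment()).asynRead();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}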
-
NIOSocketWR#asynRead
Reads data from the front-end connection's channel and stores it in the readBuffer of the corresponding AbstractConnection, then calls AbstractConnection#onReadData to process the bytes that were read.

@Override
public void asynRead() throws IOException {
    ByteBuffer theBuffer = con.readBuffer;
    if (theBuffer == null) {
        theBuffer = con.processor.getBufferPool().allocate(con.processor.getBufferPool().getChunkSize());
        con.readBuffer = theBuffer;
    }
    // read from the SocketChannel into the AbstractConnection's readBuffer
    // (the buffer is in write mode); returns the number of bytes read
    int got = channel.read(theBuffer);
    // hand the bytes just read over for processing
    con.onReadData(got);
}
AbstractConnection#onReadData
Reads the data held in readBuffer and calls the AbstractConnection#handle method for the next processing step, which in turn delegates to FrontendCommandHandler#handle.
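By the time FrontendCommandHandler#handle runs, data is expected to hold one complete packet, which is why the command byte can be read reliably at data[4]. Below is a minimal, illustrative sketch of how length-prefixed MySQL packets can be framed out of a read buffer; it assumes framing happens at this stage, and PacketFramingSketch is not Mycat code.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Illustrative framing of MySQL packets from a read buffer; not Mycat's actual onReadData.
public class PacketFramingSketch {

    // Extracts every complete packet currently in the buffer (buffer is in write mode on entry).
    public static List<byte[]> drainPackets(ByteBuffer readBuffer) {
        List<byte[]> packets = new ArrayList<>();
        readBuffer.flip(); // switch to read mode
        while (readBuffer.remaining() >= 4) {
            readBuffer.mark(); // remember the start of the packet header
            int b0 = readBuffer.get() & 0xff;
            int b1 = readBuffer.get() & 0xff;
            int b2 = readBuffer.get() & 0xff;
            int payloadLength = b0 | (b1 << 8) | (b2 << 16); // 3-byte little-endian payload length
            readBuffer.get(); // 1-byte sequence id
            if (readBuffer.remaining() < payloadLength) {
                readBuffer.reset(); // incomplete packet: keep it for the next read
                break;
            }
            byte[] packet = new byte[4 + payloadLength];
            readBuffer.reset();
            readBuffer.get(packet); // whole packet, including the 4-byte header
            packets.add(packet);    // for a command packet, packet[4] is the command byte
        }
        readBuffer.compact(); // back to write mode, preserving any partial packet
        return packets;
    }
}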
FrontendCommandHandler#handle
Determines the command type from data[4]. A client command request packet starts with a 3-byte payload length and a 1-byte sequence id, followed by the payload, so the fifth byte of data carries the command type (the full table of command types is given in Appendix 1). We take MySQLPacket.COM_QUERY as the example for the rest of the discussion: when data[4] == MySQLPacket.COM_QUERY, FrontendConnection#query(byte[]) is called. A byte-level illustration of this packet layout follows the code below.
public void handle(byte[] data) {
    // determine the command type
    switch (data[4]) {
    ...
    // INSERT/SELECT/UPDATE/DELETE statements all belong to MySQLPacket.COM_QUERY
    case MySQLPacket.COM_QUERY:
        commands.doQuery();
        source.query(data);
        break;
    ...
    }
}
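To make that layout concrete, here is a minimal sketch that builds a raw COM_QUERY packet by hand. It is illustrative only: ComQueryPacketSketch is not a Mycat class, and 0x03 is the MySQL protocol value that MySQLPacket.COM_QUERY represents.

import java.nio.charset.StandardCharsets;

// Illustrative only: shows why the command byte of a client command packet sits at data[4].
public class ComQueryPacketSketch {

    public static byte[] build(String sql) {
        byte[] body = sql.getBytes(StandardCharsets.UTF_8);
        int payloadLength = 1 + body.length;           // 1 command byte + SQL text
        byte[] packet = new byte[4 + payloadLength];
        packet[0] = (byte) (payloadLength & 0xff);     // payload length, 3 bytes, little-endian
        packet[1] = (byte) (payloadLength >>> 8);
        packet[2] = (byte) (payloadLength >>> 16);
        packet[3] = 0;                                 // sequence id
        packet[4] = 0x03;                              // command type: COM_QUERY
        System.arraycopy(body, 0, packet, 5, body.length);
        return packet;
    }

    public static void main(String[] args) {
        byte[] data = build("select 1");
        System.out.println("data[4] = 0x" + Integer.toHexString(data[4])); // prints data[4] = 0x3
    }
}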
-
FrontendConnection#query(byte[])
Converts the data byte array into a String SQL statement and then invokes the ServerQueryHandler#query(String) method.

public void query(byte[] data) {
    MySQLMessage mm = new MySQLMessage(data);
    // start reading the argument body at data[5], i.e. the sixth byte
    mm.position(5);
    String sql = mm.readString(charset);
    // execute the SQL statement; internally this calls ServerQueryHandler#query(String)
    this.query(sql);
}
-
ServerQueryHandler#query(String)
Parses the SQL type and dispatches to a different Handler according to sqlType.

@Override
public void query(String sql) {
    ServerConnection c = this.source;
    /* parse the SQL type */
    int rs = ServerParse.parse(sql);
    int sqlType = rs & 0xff;
    switch (sqlType) {
    // explain2 datanode=? sql=?
    case ServerParse.EXPLAIN2:
        Explain2Handler.handle(sql, c, rs >>> 8);
        break;
    case ServerParse.SELECT:
        SelectHandler.handle(sql, c, rs >>> 8);
        break;
    case ...
    default:
        if (readOnly) {
            LOGGER.warn(new StringBuilder().append("User readonly:").append(sql).toString());
            c.writeErrMessage(ErrorCode.ER_USER_READ_ONLY, "User readonly");
            break;
        }
        c.execute(sql, rs & 0xff);
    }
}
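The rs & 0xff and rs >>> 8 expressions above work because ServerParse.parse packs two results into one int: the statement type in the low 8 bits and an offset into the SQL text in the higher bits. A minimal sketch of that packing and unpacking follows; the concrete numbers and the class name ParseResultSketch are made up for illustration.

// Illustrative decoding of the packed parse result; the values below are made up.
public class ParseResultSketch {

    public static void main(String[] args) {
        int type = 7;                    // hypothetical statement-type constant
        int offset = 6;                  // hypothetical offset of the statement body in the SQL string
        int rs = (offset << 8) | type;   // the two values share a single int

        int sqlType = rs & 0xff;         // low 8 bits  -> statement type
        int off = rs >>> 8;              // higher bits -> offset
        System.out.println("sqlType=" + sqlType + ", offset=" + off); // sqlType=7, offset=6
    }
}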
-
For example, when sqlType == ServerParse.SELECT, SelectHandler takes over for further processing.

public static void handle(String stmt, ServerConnection c, int offs) {
    int offset = offs;
    c.setExecuteSql(null);
    switch (ServerParseSelect.parse(stmt, offs)) {
    case ServerParseSelect.DATABASE:
        SelectDatabase.response(c);
        break;
    case ServerParseSelect.USER:
        SelectUser.response(c);
        break;
    case ...
    default:
        c.setExecuteSql(stmt);
        c.execute(stmt, ServerParse.SELECT);
    }
}
-
SelectHandler
Further parses the select statement and treats different kinds of select differently; in the default case it calls ServerConnection#execute(java.lang.String, int) directly. That method first performs some routine checks (connection state, transaction state, current DB, and so on), then calls ServerConnection#routeEndExecuteSQL to compute the route (including global sequence handling, SQL interception, etc.; route computation is covered separately) and obtain the routing result RouteResultset, and finally calls NonBlockingSession#execute.

public void routeEndExecuteSQL(String sql, final int type, final SchemaConfig schema) {
    // route computation
    RouteResultset rrs = MycatServer
            .getInstance()
            .getRouterservice()
            .route(MycatServer.getInstance().getConfig().getSystem(), schema, type, sql, this.charset, this);
    if (rrs != null) {
        // execute via the session
        session.execute(rrs, rrs.isSelectForUpdate() ? ServerParse.UPDATE : type);
    }
}
-
NonBlockingSession#execute
Obtains the routed dataNode nodes. If there is exactly one node, SingleNodeHandler#execute handles the SQL; otherwise MultiNodeQueryHandler#execute does. Here we assume the front-end SQL routes to a single dataNode, so SingleNodeHandler#execute handles it.

/**
 * NonBlockingSession#execute
 */
@Override
public void execute(RouteResultset rrs, int type) {
    RouteResultsetNode[] nodes = rrs.getNodes();
    if (nodes.length == 1) {
        singleNodeHandler = new SingleNodeHandler(rrs, this);
        singleNodeHandler.execute();
    } else {
        multiNodeHandler = new MultiNodeQueryHandler(type, rrs, autocommit, this);
        multiNodeHandler.execute();
    }
}
-
SingleNodeHandler#execute
Obtains the back-end connection BackendConnection and calls SingleNodeHandler#_execute, which in turn calls BackendConnection#execute directly.

public void execute() throws Exception {
    // obtain the back-end database connection
    final BackendConnection conn = session.getTarget(node);
    // if a BackendConnection already exists for this dataNode
    if (session.tryExistsCon(conn, node)) {
        _execute(conn);
    } else {
        // create new connection do something...
    }
}

private void _execute(BackendConnection conn) {
    conn.execute(node, session.getSource(), session.getSource().isAutocommit());
}
-
When the <dataHost> in schema.xml is configured with dbDriver="jdbc", JDBCConnection#execute handles the SQL (JDBCConnection is an implementation of BackendConnection). The method hands the SQL off to a separate thread and ultimately calls JDBCConnection#ouputResultSet to execute the SQL and write the result back to the ServerConnection.

// JDBCConnection.class
@Override
public void execute(final RouteResultsetNode node, final ServerConnection source, final boolean autocommit) {
    this.sqlSelectLimit = source.getSqlSelectLimit();
    Runnable runnable = new Runnable() {
        @Override
        public void run() {
            // eventually calls JDBCConnection#ouputResultSet
            executeSQL(node, source, autocommit);
        }
    };
    MycatServer.getInstance().getBusinessExecutor().execute(runnable);
}
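JDBC calls block, so the work is pushed onto the business executor thread pool instead of running on the NIO reactor thread. A minimal, generic sketch of this offloading pattern follows; the pool size, JDBC URL, and credentials are placeholders, and this is not Mycat's actual executor wiring.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BlockingWorkOffloadSketch {

    // stand-in for Mycat's business executor (pool size is a placeholder)
    private static final ExecutorService BUSINESS_EXECUTOR = Executors.newFixedThreadPool(4);

    public static void main(String[] args) {
        String sql = "SELECT 1";
        // the reactor (event-loop) thread must never block on JDBC,
        // so the query runs on a worker thread instead
        BUSINESS_EXECUTOR.execute(() -> {
            try (Connection con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "user", "password");
                 Statement stmt = con.createStatement();
                 ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(rs.getObject(1));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
        BUSINESS_EXECUTOR.shutdown();
    }
}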
-
JDBCConnection#ouputResultSet
Obtains a database connection and executes the SQL, then translates the resulting ResultSet into MySQL ResultSet response packets and writes them to the ServerConnection.

private void ouputResultSet(ServerConnection sc, String sql) throws SQLException {
    ResultSet rs = null;
    Statement stmt = null;
    try {
        stmt = con.createStatement();
        rs = stmt.executeQuery(sql);

        List<FieldPacket> fieldPks = new LinkedList<FieldPacket>();
        // load the column metadata from the ResultSet into fieldPks
        ResultSetUtil.resultSetToFieldPacket(sc.getCharset(), fieldPks, rs, this.isSpark);
        // column count
        int colunmCount = fieldPks.size();
        ByteBuffer byteBuf = sc.allocate();

        /* 1. write the resultset header packet */
        ResultSetHeaderPacket headerPkg = new ResultSetHeaderPacket();
        headerPkg.fieldCount = fieldPks.size();
        headerPkg.packetId = ++packetId;
        // write the ResultSetHeaderPacket data into byteBuf
        byteBuf = headerPkg.write(byteBuf, sc, true);
        byteBuf.flip();
        byte[] header = new byte[byteBuf.limit()];
        // copy the contents of byteBuf into header
        byteBuf.get(header);
        // reset the buffer marks
        byteBuf.clear();

        /* 2. write the field packets */
        List<byte[]> fields = new ArrayList<byte[]>(fieldPks.size());
        Iterator<FieldPacket> itor = fieldPks.iterator();
        while (itor.hasNext()) {
            FieldPacket curField = itor.next();
            curField.packetId = ++packetId;
            // write the FieldPacket data into byteBuf
            byteBuf = curField.write(byteBuf, sc, false);
            // flip(): position goes back to 0 and limit is set to the previous position
            // limit: index of the first element that may not be accessed; limit <= capacity
            byteBuf.flip();
            byte[] field = new byte[byteBuf.limit()];
            // copy the contents of byteBuf into field
            byteBuf.get(field);
            byteBuf.clear();
            // collect the field packet
            fields.add(field);
        }

        /* 3. write an EOF packet */
        EOFPacket eofPckg = new EOFPacket();
        eofPckg.packetId = ++packetId;
        // write the EOFPacket data into byteBuf
        byteBuf = eofPckg.write(byteBuf, sc, false);
        byteBuf.flip();
        byte[] eof = new byte[byteBuf.limit()];
        // copy the contents of byteBuf into eof
        byteBuf.get(eof);
        byteBuf.clear();
        this.respHandler.fieldEofResponse(header, fields, eof, this);

        /* 4. write the row data packets */
        // output row
        while (rs.next()) {
            ResultSetMetaData resultSetMetaData = rs.getMetaData();
            int size = resultSetMetaData.getColumnCount();
            StringBuilder builder = new StringBuilder();
            for (int i = 1; i <= size; i++) {
                builder.append(resultSetMetaData.getColumnName(i) + "=" + rs.getString(i));
                if (i < size) {
                    builder.append(", ");
                }
            }
            LOGGER.debug("JDBCConnection.ouputResultSet sql: {}, resultSet: {}", sql, builder.toString());

            RowDataPacket curRow = new RowDataPacket(colunmCount);
            for (int i = 0; i < colunmCount; i++) {
                int j = i + 1;
                if (MysqlDefs.isBianry((byte) fieldPks.get(i).type)) {
                    curRow.add(rs.getBytes(j));
                } else if (fieldPks.get(i).type == MysqlDefs.FIELD_TYPE_DECIMAL
                        || fieldPks.get(i).type == (MysqlDefs.FIELD_TYPE_NEW_DECIMAL - 256)) {
                    // field type is an unsigned byte
                    // ensure that scientific notation is not used
                    BigDecimal val = rs.getBigDecimal(j);
                    curRow.add(StringUtil.encode(val != null ? val.toPlainString() : null, sc.getCharset()));
                } else {
                    curRow.add(StringUtil.encode(rs.getString(j), sc.getCharset()));
                }
            }
            curRow.packetId = ++packetId;
            // write the RowDataPacket data into byteBuf
            byteBuf = curRow.write(byteBuf, sc, false);
            byteBuf.flip();
            byte[] row = new byte[byteBuf.limit()];
            byteBuf.get(row);
            byteBuf.clear();
            this.respHandler.rowResponse(row, this);
        }
        fieldPks.clear();
        // end row

        /* 5. write the final EOF packet */
        eofPckg = new EOFPacket();
        eofPckg.packetId = ++packetId;
        byteBuf = eofPckg.write(byteBuf, sc, false);
        byteBuf.flip();
        eof = new byte[byteBuf.limit()];
        byteBuf.get(eof);
        sc.recycle(byteBuf);
        this.respHandler.rowEofResponse(eof, this);
    } finally {
        if (rs != null) {
            try {
                rs.close();
            } catch (SQLException e) {
            }
        }
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException e) {
            }
        }
    }
}