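Introduction
This article explains the Executor object that the RM uses to perform a rollback, namely undoExecutor. The undo log is replayed by undoExecutor.
undoExecutor source code analysis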
public class UndoExecutorFactory {
public static AbstractUndoExecutor getUndoExecutor(String dbType, SQLUndoLog sqlUndoLog) {
if (!dbType.equals(JdbcConstants.MYSQL)) {
throw new NotSupportYetException(dbType);
}
switch (sqlUndoLog.getSqlType()) {
case INSERT:
return new MySQLUndoInsertExecutor(sqlUndoLog);
case UPDATE:
return new MySQLUndoUpdateExecutor(sqlUndoLog);
case DELETE:
return new MySQLUndoDeleteExecutor(sqlUndoLog);
default:
throw new ShouldNeverHappenException();
}
}
}
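Notes:
- UndoExecutorFactory returns the undoExecutor that matches the SQL type recorded in the undo log. Only MySQL is supported; any other dbType raises NotSupportYetException.
- AbstractUndoExecutor is the abstract base class of the rollback executors.
- MySQLUndoDeleteExecutor rolls back DELETE operations.
- MySQLUndoInsertExecutor rolls back INSERT operations.
- MySQLUndoUpdateExecutor rolls back UPDATE operations.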
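The dispatch described above can be sketched without any Seata dependency. This is a minimal illustration, not the library's code: the SqlType enum, the UndoFactorySketch class, and the literal "mysql" for the database type are assumptions made for the example. It only shows which kind of statement undoes which kind of operation.

```java
// Hypothetical stand-in for the SQL type stored in an undo log entry.
enum SqlType { INSERT, UPDATE, DELETE, SELECT }

final class UndoFactorySketch {

    // Returns the kind of statement that undoes the given SQL type.
    static String undoStatementFor(String dbType, SqlType sqlType) {
        // Only MySQL is handled, mirroring the dbType guard in the factory.
        if (!"mysql".equals(dbType)) {
            throw new UnsupportedOperationException(dbType);
        }
        switch (sqlType) {
            case INSERT:
                return "DELETE"; // an inserted row is removed again
            case UPDATE:
                return "UPDATE"; // an updated row is restored from the before image
            case DELETE:
                return "INSERT"; // a deleted row is re-inserted from the before image
            default:
                throw new IllegalStateException("no undo executor for " + sqlType);
        }
    }

    public static void main(String[] args) {
        SqlType[] supported = {SqlType.INSERT, SqlType.UPDATE, SqlType.DELETE};
        for (SqlType type : supported) {
            System.out.println(type + " is undone by " + undoStatementFor("mysql", type));
        }
    }
}
```

A SELECT never produces an undo log entry, which is why the default branch is treated as a state that should never be reached.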
AbstractUndoExecutor
public abstract class AbstractUndoExecutor {
protected SQLUndoLog sqlUndoLog;
protected abstract String buildUndoSQL();
public AbstractUndoExecutor(SQLUndoLog sqlUndoLog) {
this.sqlUndoLog = sqlUndoLog;
}
public void executeOn(Connection conn) throws SQLException {
dataValidation(conn);
try {
// Build the undo SQL template
String undoSQL = buildUndoSQL();
// Obtain the PreparedStatement
PreparedStatement undoPST = conn.prepareStatement(undoSQL);
// Fetch the rows to roll back
TableRecords undoRows = getUndoRows();
// Iterate over the rows to roll back and collect the fields of each one
for (Row undoRow : undoRows.getRows()) {
ArrayList<Field> undoValues = new ArrayList<>();
Field pkValue = null;
for (Field field : undoRow.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
pkValue = field;
} else {
undoValues.add(field);
}
}
// Bind the parameters for this row
undoPrepare(undoPST, undoValues, pkValue);
// Execute the rollback statement
undoPST.executeUpdate();
}
} catch (Exception ex) {
if (ex instanceof SQLException) {
throw (SQLException) ex;
} else {
throw new SQLException(ex);
}
}
}
protected void undoPrepare(PreparedStatement undoPST,
ArrayList<Field> undoValues,
Field pkValue) throws SQLException {
int undoIndex = 0;
for (Field undoValue : undoValues) {
undoIndex++;
undoPST.setObject(undoIndex, undoValue.getValue(), undoValue.getType());
}
// PK is at last one.
// INSERT INTO a (x, y, z, pk) VALUES (?, ?, ?, ?)
// UPDATE a SET x=?, y=?, z=? WHERE pk = ?
// DELETE FROM a WHERE pk = ?
undoIndex++;
undoPST.setObject(undoIndex, pkValue.getValue(), pkValue.getType());
}
protected abstract TableRecords getUndoRows();
protected void dataValidation(Connection conn) throws SQLException {
// Validate if data is dirty.
}
}
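Notes:
- AbstractUndoExecutor defines the overall flow of a rollback as a template method, executeOn(conn).
- Build the undo SQL template: buildUndoSQL().
- Obtain the PreparedStatement: conn.prepareStatement(undoSQL).
- Iterate over the rows to roll back and, for each row, separate the primary key field from the other fields.
- Bind the parameters for each row: undoPrepare(undoPST, undoValues, pkValue). Non-key values are bound first and the primary key last.
- Execute the rollback: undoPST.executeUpdate().
- buildUndoSQL() and getUndoRows() are implemented by the subclasses. dataValidation(conn) is called first but is an empty placeholder here.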
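The flow above is an instance of the template method pattern. The sketch below reproduces its shape under simplifying assumptions: there is no JDBC connection, a row is modeled as a String[] whose last element is the primary key, and the class names UndoTemplateSketch and UpdateUndoSketch are hypothetical. Instead of executing SQL, it records the statement together with the bound parameters.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the abstract executor.
abstract class UndoTemplateSketch {

    // Recorded "executions", used in place of a real JDBC connection.
    final List<String> executed = new ArrayList<>();

    // Steps supplied by subclasses.
    protected abstract String buildUndoSQL();
    protected abstract List<String[]> getUndoRows();

    // Default binding order: non-key values first, primary key last.
    protected List<String> undoPrepare(List<String> undoValues, String pkValue) {
        List<String> params = new ArrayList<>(undoValues);
        params.add(pkValue);
        return params;
    }

    // The fixed skeleton: build the SQL once, then bind and "execute" per row.
    public final void executeOn() {
        String undoSQL = buildUndoSQL();
        for (String[] row : getUndoRows()) {
            List<String> undoValues = Arrays.asList(row).subList(0, row.length - 1);
            String pkValue = row[row.length - 1];
            executed.add(undoSQL + " <- " + undoPrepare(undoValues, pkValue));
        }
    }
}

// A concrete subclass only fills in the two abstract steps.
final class UpdateUndoSketch extends UndoTemplateSketch {

    @Override
    protected String buildUndoSQL() {
        return "UPDATE t SET name = ? WHERE id = ?";
    }

    @Override
    protected List<String[]> getUndoRows() {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"alice", "1"}); // before image of row 1
        rows.add(new String[] {"bob", "2"});   // before image of row 2
        return rows;
    }

    public static void main(String[] args) {
        UpdateUndoSketch undo = new UpdateUndoSketch();
        undo.executeOn();
        undo.executed.forEach(System.out::println);
    }
}
```

Keeping the primary key last lets one binding routine serve the INSERT, UPDATE, and DELETE templates, since each of them places the key placeholder at the end.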
MySQLUndoInsertExecutor
public class MySQLUndoInsertExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords afterImage = sqlUndoLog.getAfterImage();
List<Row> afterImageRows = afterImage.getRows();
if (afterImageRows == null || afterImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = afterImageRows.get(0);
StringBuffer mainSQL = new StringBuffer(
"DELETE FROM " + sqlUndoLog.getTableName());
StringBuffer where = new StringBuffer(" WHERE ");
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
where.append(field.getName() + " = ? ");
}
}
return mainSQL.append(where).toString();
}
@Override
protected void undoPrepare(PreparedStatement undoPST,
ArrayList<Field> undoValues, Field pkValue)
throws SQLException {
undoPST.setObject(1, pkValue.getValue(), pkValue.getType());
}
public MySQLUndoInsertExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getAfterImage();
}
}
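Notes:
- An INSERT is rolled back by the inverse DELETE; MySQLUndoInsertExecutor builds that DELETE statement.
- The WHERE condition of the DELETE is the primary key of the inserted row, taken from the after image.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor; this subclass overrides undoPrepare so that only the primary key is bound.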
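As a rough illustration of the statement this executor produces, the sketch below builds the DELETE template and pairs it with primary keys from an after image. The class name InsertUndoSqlSketch, the table "account", the column "id", and the key values are all made up for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class InsertUndoSqlSketch {

    // Builds the DELETE template that undoes an INSERT; only the primary key column is needed.
    static String buildUndoSQL(String tableName, String pkColumn) {
        return "DELETE FROM " + tableName + " WHERE " + pkColumn + " = ?";
    }

    // Pairs the template with each primary key taken from the (hypothetical) after image.
    static List<String> describeUndo(String tableName, String pkColumn, List<Integer> insertedPks) {
        String sql = buildUndoSQL(tableName, pkColumn);
        List<String> statements = new ArrayList<>();
        for (Integer pk : insertedPks) {
            // Parameter index 1 is the only placeholder in the template.
            statements.add(sql + " [1 -> " + pk + "]");
        }
        return statements;
    }

    public static void main(String[] args) {
        describeUndo("account", "id", Arrays.asList(101, 102)).forEach(System.out::println);
    }
}
```

The after image is used because the generated primary key exists only once the INSERT has run; the before image of an INSERT has no rows.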
MySQLUndoDeleteExecutor
public class MySQLUndoDeleteExecutor extends AbstractUndoExecutor {
public MySQLUndoDeleteExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = beforeImageRows.get(0);
StringBuffer insertColumns = new StringBuffer();
StringBuffer insertValues = new StringBuffer();
Field pkField = null;
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
pkField = field;
continue;
} else {
if (first) {
first = false;
} else {
insertColumns.append(", ");
insertValues.append(", ");
}
insertColumns.append(field.getName());
insertValues.append("?");
}
}
if (first) {
first = false;
} else {
insertColumns.append(", ");
insertValues.append(", ");
}
insertColumns.append(pkField.getName());
insertValues.append("?");
return "INSERT INTO " + sqlUndoLog.getTableName()
+ "(" + insertColumns.toString() + ")"
+ " VALUES (" + insertValues.toString() + ")";
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getBeforeImage();
}
}
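Notes:
- A DELETE is rolled back by the inverse INSERT; MySQLUndoDeleteExecutor builds that INSERT statement from the before image.
- The generated SQL has the form INSERT INTO tableName (column1, column2, pk) VALUES (?, ?, ?), with the primary key column placed last.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor.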
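The column ordering can be seen in a small standalone sketch. It assumes the column names are already split into non-key columns and a single primary key column; the class name DeleteUndoSqlSketch and the sample table are hypothetical, and StringJoiner replaces the hand-written "first" flag of the original.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

final class DeleteUndoSqlSketch {

    // Builds the INSERT template that undoes a DELETE; the primary key column goes last.
    static String buildUndoSQL(String tableName, List<String> nonKeyColumns, String pkColumn) {
        StringJoiner columns = new StringJoiner(", ");
        StringJoiner placeholders = new StringJoiner(", ");
        for (String column : nonKeyColumns) {
            columns.add(column);
            placeholders.add("?");
        }
        // Appending the key last matches the binding order of the parent's undoPrepare.
        columns.add(pkColumn);
        placeholders.add("?");
        return "INSERT INTO " + tableName + " (" + columns + ") VALUES (" + placeholders + ")";
    }

    public static void main(String[] args) {
        System.out.println(buildUndoSQL("account", Arrays.asList("name", "balance"), "id"));
    }
}
```

Because the placeholder order follows the column order, the values of the before image can be bound with the default undoPrepare without any override.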
MySQLUndoUpdateExecutor
public class MySQLUndoUpdateExecutor extends AbstractUndoExecutor {
@Override
protected String buildUndoSQL() {
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
}
Row row = beforeImageRows.get(0);
StringBuffer mainSQL = new StringBuffer(
"UPDATE " + sqlUndoLog.getTableName() + " SET ");
StringBuffer where = new StringBuffer(" WHERE ");
boolean first = true;
for (Field field : row.getFields()) {
if (field.getKeyType() == KeyType.PrimaryKey) {
where.append(field.getName() + " = ?");
} else {
if (first) {
first = false;
} else {
mainSQL.append(", ");
}
mainSQL.append(field.getName() + " = ?");
}
}
return mainSQL.append(where).toString();
}
public MySQLUndoUpdateExecutor(SQLUndoLog sqlUndoLog) {
super(sqlUndoLog);
}
@Override
protected TableRecords getUndoRows() {
return sqlUndoLog.getBeforeImage();
}
}
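Notes:
- An UPDATE is rolled back by another UPDATE that restores the old values; MySQLUndoUpdateExecutor builds that statement from the before image.
- The generated SQL has the form UPDATE tableName SET column1 = ?, column2 = ? WHERE pk = ?.
- The overall rollback flow is defined in the parent class AbstractUndoExecutor.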
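A compact sketch of the same string assembly follows. It is an illustration only: the class name UpdateUndoSqlSketch, the table, and the column names are hypothetical, and the columns are assumed to be pre-split into non-key columns and one primary key column.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

final class UpdateUndoSqlSketch {

    // Builds the UPDATE template that restores the before image of a row.
    static String buildUndoSQL(String tableName, List<String> nonKeyColumns, String pkColumn) {
        StringJoiner assignments = new StringJoiner(", ");
        for (String column : nonKeyColumns) {
            assignments.add(column + " = ?");
        }
        // The primary key appears only in the WHERE clause, so its placeholder is last.
        return "UPDATE " + tableName + " SET " + assignments + " WHERE " + pkColumn + " = ?";
    }

    public static void main(String[] args) {
        System.out.println(buildUndoSQL("account", Arrays.asList("name", "balance"), "id"));
    }
}
```

Every non-key column of the before image is written back, including columns the original UPDATE did not change, so the template depends only on the shape of the recorded row.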