update类型的数据无法更新到目标库,通过追踪源码,发现同步源库的binlog日志中UPDATE类型的old参数为null,导致跳出更新程序,无法更新。
仔细检查了目标库的binlog是否开启,以及binlog的格式设置,以及binlog_row_image的设置,均满足官方要求。通过换服务器安装,换目标库等等的尝试发现都无果之后,走上了修改源码的道路。这里定位问题所在为源库MySQL的未知设置问题。
修改com.alibaba.otter.canal.client.adapter.rdb.service.RdbSyncService类中的update方法如下。
第一种方式对binlog中的old参数值不进行判断,直接对所有字段值进行update操作;
第二种方式兼容原有逻辑,如果有old参数值按照原有逻辑进行,如果没有按照上述第一种方式进行;
第一种方式
/**
* 更新操作
*
* @param config 配置项
* @param dml DML数据
*/
private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
if (data == null || data.isEmpty()) {
return;
}
//这里先设置了插入,根据需求,可以自行选择是否需要。
insert(batchExecutor,config,dml);
//将old的判断去除。
// Map<String, Object> old = dml.getOld();
// if (old == null || old.isEmpty()) {
// return;
// }
//将下方的old调整为传过来的data.核心思想就是根据data解析字段为目标字段,并对目标库中的所有字段进行更新。
DbMapping dbMapping = config.getDbMapping();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
StringBuilder updateSql = new StringBuilder();
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
// Set<String> columnsSet = data.keySet();
List<Map<String, ?>> values = new ArrayList<>();
boolean hasMatched = false;
// for (String srcColumnName : old.keySet()) {
for (String srcColumnName : data.keySet()) {
List<String> targetColumnNames = new ArrayList<>();
columnsMap.forEach((targetColumn, srcColumn) -> {
if (srcColumnName.equalsIgnoreCase(srcColumn)) {
targetColumnNames.add(targetColumn);
}
});
if (!targetColumnNames.isEmpty()) {
hasMatched = true;
for (String targetColumnName : targetColumnNames) {
updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
BatchExecutor.setValue(values, type, data.get(srcColumnName));
}
}
}
if (!hasMatched) {
logger.warn("Did not matched any columns to update ");
return;
}
int len = updateSql.length();
updateSql.delete(len - 2, len).append(" WHERE ");
// 拼接主键
// appendCondition(dbMapping, updateSql, ctype, values, data, old);
appendCondition(dbMapping, updateSql, ctype, values, data, data);
batchExecutor.execute(updateSql.toString(), values);
if (logger.isTraceEnabled()) {
logger.trace("Update target table, sql: {}", updateSql);
}
}
第二种方式
/**
* 更新操作
*
* @param config 配置项
* @param dml DML数据
*/
private void update(BatchExecutor batchExecutor, MappingConfig config, SingleDml dml) throws SQLException {
Map<String, Object> data = dml.getData();
if (data == null || data.isEmpty()) {
return;
}
//这里用common作为公共变量存储data或者old
Map<String, Object> common = new ConcurrentHashMap<String, Object>();
Map<String, Object> old = dml.getOld();
if (old == null || old.isEmpty()) {
common = data;
}else{
common = old;
}
DbMapping dbMapping = config.getDbMapping();
Map<String, String> columnsMap = SyncUtil.getColumnsMap(dbMapping, data);
Map<String, Integer> ctype = getTargetColumnType(batchExecutor.getConn(), config);
StringBuilder updateSql = new StringBuilder();
updateSql.append("UPDATE ").append(SyncUtil.getDbTableName(dbMapping)).append(" SET ");
List<Map<String, ?>> values = new ArrayList<>();
boolean hasMatched = false;
// for (String srcColumnName : old.keySet()) {
for (String srcColumnName : common.keySet()) {
List<String> targetColumnNames = new ArrayList<>();
columnsMap.forEach((targetColumn, srcColumn) -> {
if (srcColumnName.equalsIgnoreCase(srcColumn)) {
targetColumnNames.add(targetColumn);
}
});
if (!targetColumnNames.isEmpty()) {
hasMatched = true;
for (String targetColumnName : targetColumnNames) {
updateSql.append("`").append(targetColumnName).append("`").append("=?, ");
Integer type = ctype.get(Util.cleanColumn(targetColumnName).toLowerCase());
if (type == null) {
throw new RuntimeException("Target column: " + targetColumnName + " not matched");
}
BatchExecutor.setValue(values, type, data.get(srcColumnName));
}
}
}
if (!hasMatched) {
logger.warn("Did not matched any columns to update ");
return;
int len = updateSql.length();
updateSql.delete(len - 2, len).append(" WHERE ");
// 拼接主键
// appendCondition(dbMapping, updateSql, ctype, values, data, old);
appendCondition(dbMapping, updateSql, ctype, values, data, common);
batchExecutor.execute(updateSql.toString(), values);
if (logger.isTraceEnabled()) {
logger.trace("Update target table, sql: {}", updateSql);
}
}