当使用 cannal adapter 时候 ,esMapping sql 使我感到疑惑 ,binlog 消息 通过这个sql 做了聚合 。 但是 下面这个sql 不是做了查询所有数据? 然后在同步到es中去的?
dataSourceKey: defaultDS # 源数据源的key, 对应上面配置的srcDataSources中的值
destination: example # cannal的instance或者MQ的topic
esMapping:
_index: mytest_user # es 的索引名称
_type: _doc # es 的doc名称
_id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
# pk: id # 如果不需要_id, 则需要指定一个属性为主键属性
# sql映射
sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
a.c_time as _c_time, c.labels as _labels from user a
left join role b on b.id=a.role_id
left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
group by user_id) c on c.user_id=a.id"
# objFields:
# _labels: array:; # 数组或者对象属性, array:; 代表以;字段里面是以;分隔的
# _obj: obj:{"test":"123"}
etlCondition: "where a.c_time>='{0}'" # etl 的条件参数
commitBatch: 3000 # 提交批大小
一路 从 CanalAdapterLoader , CanalAdapterWorker ,AbstractCanalAdapterWorker,ESAdapter ,最终在 ESSyncService sync 方法找到了答案 , 阿里同学的代码 写的还是很容易看懂的 。 注意下面方法 注释
// dml ,binlog msg 转化而成 , config 即是你配置的Es config 文件的配置类
// binlog msg 最终转化为 对应的es insert , update , delete 操作
public void sync(ESSyncConfig config, Dml dml) {
try {
// 如果是按时间戳定时更新则返回
if (config.getEsMapping().isSyncByTimestamp()) {
return;
}
long begin = System.currentTimeMillis();
String type = dml.getType();
if (type != null && type.equalsIgnoreCase("INSERT")) {
insert(config, dml);
} else if (type != null && type.equalsIgnoreCase("UPDATE")) {
update(config, dml);
} else if (type != null && type.equalsIgnoreCase("DELETE")) {
delete(config, dml);
} else {
return;
}
if (logger.isTraceEnabled()) {
logger.trace("Sync elapsed time: {} ms,destination: {}, es index: {}",
(System.currentTimeMillis() - begin),
dml.getDestination(),
config.getEsMapping().get_index());
}
} catch (Throwable e) {
logger.error("sync error, es index: {}, DML : {}", config.getEsMapping().get_index(), dml);
throw new RuntimeException(e);
}
}
insert 方法 , 如果 esMapping sql 是单表 所有字段都为简单字段且 binlog msg 对应 sql 主表 ,会调用 singleTableSimpleFiledInsert ,直接插入 es 对应索引中去 ,如果binlog msg 对应的是 sql 主表且 非简单字段 ,则 会从esMapping 拿到主键(列表) 然后 和sql 中 , msg匹配后 拼接后 ,查询 sql , 插入es ,如果 binlog msg 对应的是sql 的 从表 ,有3种更新方法 ,有点小看阿里同学了 ,这块设计,我之前没想到 。
/**
* 插入操作dml
*
* @param config es配置
* @param dml dml数据
*/
private void insert(ESSyncConfig config, Dml dml) {
List<Map<String, Object>> dataList = dml.getData();
if (dataList == null || dataList.isEmpty()) {
return;
}
SchemaItem schemaItem = config.getEsMapping().getSchemaItem();
for (Map<String, Object> data : dataList) {
if (data == null || data.isEmpty()) {
continue;
}
if (schemaItem.getAliasTableItems().size() == 1 && schemaItem.isAllFieldsSimple()) {
// ------单表 & 所有字段都为简单字段------
singleTableSimpleFiledInsert(config, dml, data);
} else {
// ------是主表 查询sql来插入------
if (schemaItem.getMainTable().getTableName().equalsIgnoreCase(dml.getTable())) {
mainTableInsert(config, dml, data);
}
// 从表的操作
for (TableItem tableItem : schemaItem.getAliasTableItems().values()) {
if (tableItem.isMain()) {
continue;
}
if (!tableItem.getTableName().equals(dml.getTable())) {
continue;
}
// 关联条件出现在主表查询条件是否为简单字段
boolean allFieldsSimple = true;
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
if (fieldItem.isMethod() || fieldItem.isBinaryOp()) {
allFieldsSimple = false;
break;
}
}
// 所有查询字段均为简单字段
if (allFieldsSimple) {
// 不是子查询
if (!tableItem.isSubQuery()) {
// ------关联表简单字段插入------
Map<String, Object> esFieldData = new LinkedHashMap<>();
for (FieldItem fieldItem : tableItem.getRelationSelectFieldItems()) {
Object value = esTemplate.getValFromData(config.getEsMapping(),
data,
fieldItem.getFieldName(),
fieldItem.getColumn().getColumnName());
esFieldData.put(Util.cleanColumn(fieldItem.getFieldName()), value);
}
joinTableSimpleFieldOperation(config, dml, data, tableItem, esFieldData);
} else {
// ------关联子表简单字段插入------
subTableSimpleFieldOperation(config, dml, data, null, tableItem);
}
} else {
// ------关联子表复杂字段插入 执行全sql更新es------
wholeSqlOperation(config, dml, data, null, tableItem);
}
}
}
}
}