MySqlSourceReader类核心功能
数据分片
执行start方法向MySqlSourceEnumerator发起分片请求, 触发MySqlSourceEnumerator类的handleSplitRequest方法, 由MySqlSourceEnumerator向reader分配分片
/**
 * Starts the reader. A split request is sent through the reader context unless more than one
 * split is already assigned to this reader.
 */
public void start() {
    if (getNumberOfCurrentlyAssignedSplits() > 1) {
        // Enough work is already assigned; nothing to request.
        return;
    }
    context.sendSplitRequest();
}
分片处理
执行addSplits方法, 处理MySqlSourceEnumerator分配的chunk分片
/**
 * Processes the splits handed to this reader and forwards the ones that still need reading to
 * {@code SourceReaderBase}.
 *
 * <p>Snapshot splits that are already read are kept in {@code finishedUnackedSplits}; snapshot
 * splits whose table no longer matches the table filter are skipped. A binlog split is either
 * remembered as suspended, tracked as uncompleted (and its meta requested), or enriched with
 * table schemas and queued for reading.
 *
 * @param splits the splits to add
 * @param checkTableChangeForBinlogSplit when {@code true}, the binlog split is first passed
 *     through {@code filterOutdatedSplitInfos} with the current table filter
 */
private void addSplits(List<MySqlSplit> splits, boolean checkTableChangeForBinlogSplit) {
    // Splits that still have to be read; restored finished-but-unacked splits are kept apart.
    final List<MySqlSplit> pendingSplits = new ArrayList<>();

    for (MySqlSplit split : splits) {
        LOG.info("Source reader {} adds split {}", subtaskId, split);

        if (split.isSnapshotSplit()) {
            // Snapshot (full-load) phase.
            final MySqlSnapshotSplit snapshotSplit = split.asSnapshotSplit();
            if (snapshotSplit.isSnapshotReadFinished()) {
                // Already read: remember it in finishedUnackedSplits.
                finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
                continue;
            }

            final boolean matchesTableFilter =
                    sourceConfig
                            .getTableFilters()
                            .dataCollectionFilter()
                            .isIncluded(split.asSnapshotSplit().getTableId());
            if (matchesTableFilter) {
                // The table is one of the configured tables, so the split must still be read.
                pendingSplits.add(split);
            } else {
                LOG.debug(
                        "The subtask {} is skipping split {} because it does not match new table filter.",
                        subtaskId,
                        split.splitId());
            }
            continue;
        }

        // Binlog phase.
        MySqlBinlogSplit binlogSplit = split.asBinlogSplit();
        if (checkTableChangeForBinlogSplit) {
            // After a restore from a checkpoint the finished split infos may still contain
            // splits of tables that were deleted since; drop those entries.
            binlogSplit =
                    filterOutdatedSplitInfos(
                            binlogSplit,
                            sourceConfig
                                    .getMySqlConnectorConfig()
                                    .getTableFilters()
                                    .dataCollectionFilter());
        }

        // Discover schemas for newly added tables only once, i.e. for the first binlog split
        // this reader is assigned. The flag must be read before it is flipped below.
        final boolean checkNewlyAddedTableSchema =
                !mySqlSourceReaderContext.isHasAssignedBinlogSplit()
                        && sourceConfig.isScanNewlyAddedTableEnabled();
        mySqlSourceReaderContext.setHasAssignedBinlogSplit(true);

        if (binlogSplit.isSuspended()) {
            // The binlog split is suspended; keep it aside.
            suspendedBinlogSplit = binlogSplit;
        } else if (binlogSplit.isCompletedSplit()) {
            uncompletedBinlogSplits.remove(binlogSplit.splitId());
            final MySqlBinlogSplit splitWithSchemas =
                    discoverTableSchemasForBinlogSplit(
                            binlogSplit, sourceConfig, checkNewlyAddedTableSchema);
            pendingSplits.add(splitWithSchemas);
        } else {
            // Not complete yet: track it and request the missing meta if needed.
            uncompletedBinlogSplits.put(binlogSplit.splitId(), binlogSplit);
            requestBinlogSplitMetaIfNeeded(binlogSplit);
        }

        LOG.info("Source reader {} received the binlog split : {}.", subtaskId, binlogSplit);
        context.sendSourceEventToCoordinator(new BinlogSplitAssignedEvent());
    }

    // Notify the split enumerator again about the finished unacked snapshot splits.
    reportFinishedSnapshotSplitsIfNeed();

    if (!pendingSplits.isEmpty()) {
        // Hand all un-finished splits (including the binlog split) to SourceReaderBase.
        super.addSplits(pendingSplits);
    } else if (suspendedBinlogSplit != null) {
        // Only request a new snapshot split if the binlog split is suspended.
        context.sendSplitRequest();
    }
}
继续调用SourceReaderBase.addSplits方法
/**
 * Registers the given splits with this reader and passes them on to the split fetcher manager.
 *
 * @param splits the splits to add
 */
@Override
public void addSplits(List<SplitT> splits) {
    LOG.info("Adding split(s) to reader: {}", splits);

    // Set up the per-split state, keyed by split id.
    for (SplitT split : splits) {
        splitStates.put(
                split.splitId(), new SplitContext<>(split.splitId(), initializedState(split)));
    }

    // Let the split fetcher manager take over so fetching can start.
    splitFetcherManager.addSplits(splits);
}
继续调用splitFetcherManager.addSplits方法, 其中splitFetcherManager为SingleThreadFetcherManager对象, 该对象在MySqlSourceReader的构造函数中完成初始化
/**
 * Hands the given splits to the running fetcher, creating and starting a new fetcher first when
 * none is running.
 *
 * @param splitsToAdd the splits to pass to the fetcher
 */
@Override
public void addSplits(List<SplitT> splitsToAdd) {
    final SplitFetcher<E, SplitT> runningFetcher = getRunningFetcher();
    if (runningFetcher != null) {
        // A fetcher is already running: just enqueue the splits on it.
        runningFetcher.addSplits(splitsToAdd);
        return;
    }

    // No running fetcher: create one, enqueue the splits on it, then start it.
    final SplitFetcher<E, SplitT> newFetcher = createSplitFetcher();
    newFetcher.addSplits(splitsToAdd);
    startFetcher(newFetcher);
}
继续查看SplitFetcher run方法
/**
 * Main loop of the split fetcher: repeats {@code runOnce()} until it returns {@code false} or
 * throws. Any throwable goes to the error handler; afterwards the split reader is closed and the
 * shutdown hook is run.
 */
@Override
public void run() {
    LOG.info("Starting split fetcher {}", id);
    try {
        // All the work happens inside runOnce(); keep iterating while it asks to continue.
        boolean keepRunning;
        do {
            keepRunning = runOnce();
        } while (keepRunning);
    } catch (Throwable t) {
        errorHandler.accept(t);
    } finally {
        try {
            splitReader.close();
        } catch (Exception e) {
            errorHandler.accept(e);
        } finally {
            LOG.info("Split fetcher {} exited.", id);
            // The shutdown hook runs after any errorHandler.accept(t) above. If the two
            // operations are ordered by a happens-before relation, observing the hook's side
            // effect allows checking whether the error handler's side effect happened as well.
            shutdownHook.run();
        }
    }
}
继续调用runOnce()
/**
 * Runs a single iteration of the fetcher loop: picks the next task while holding the lock,
 * executes it outside the lock, then records the result while holding the lock again.
 *
 * @return {@code false} if the fetcher is closed; {@code true} otherwise, including the case
 *     where no task was available
 * @throws RuntimeException wrapping any exception thrown by the task's {@code run()}
 */
boolean runOnce() {
// first blocking call = get next task. blocks only if there are no active splits and queued
// tasks.
SplitFetcherTask task;
lock.lock();
try {
if (closed) {
return false;
}
// Take the next pending task from the task queue.
task = getNextTaskUnsafe();
if (task == null) {
// (spurious) wakeup, so just repeat
return true;
}
LOG.debug("Prepare to run {}", task);
// store task for #wakeUp
this.runningTask = task;
} finally {
lock.unlock();
}
// execute the task outside of lock, so that it can be woken up
boolean taskFinished;
try {
// Execute the SplitFetcherTask's run method.
taskFinished = task.run();
} catch (Exception e) {
throw new RuntimeException(
String.format(
"SplitFetcher thread %d received unexpected exception while polling the records",
id),
e);
}
// re-acquire lock as all post-processing steps, need it
lock.lock();
try {
// The task is no longer running; clear it before handling its result.
this.runningTask = null;
processTaskResultUnsafe(task, taskFinished);
} finally {
lock.unlock();
}
return true;
}
继续调用AddSplitsTask run方法, 这里的splitReader对象是MySqlSplitReader, 因此会执行MySqlSplitReader的handleSplitsChanges方法
/**
 * Records every split to add in {@code assignedSplits} (keyed by split id) and then notifies the
 * split reader of the addition.
 *
 * @return always {@code true}
 */
@Override
public boolean run() {
    // Register all new splits before the reader is told about them.
    splitsToAdd.forEach(split -> assignedSplits.put(split.splitId(), split));
    splitReader.handleSplitsChanges(new SplitsAddition<>(splitsToAdd));
    return true;
}
handleSplitsChanges方法只是把MySqlSplit添加到对应的snapshotSplits或者binlogSplits列表中
/**
 * Accepts newly added splits and sorts each one into {@code snapshotSplits} or {@code
 * binlogSplits}. No reading happens here.
 *
 * @param splitsChanges the change to apply; only {@code SplitsAddition} is supported
 * @throws UnsupportedOperationException if the change is not a {@code SplitsAddition}
 */
@Override
public void handleSplitsChanges(SplitsChange<MySqlSplit> splitsChanges) {
    final boolean isAddition = splitsChanges instanceof SplitsAddition;
    if (!isAddition) {
        throw new UnsupportedOperationException(
                String.format(
                        "The SplitChange type of %s is not supported.",
                        splitsChanges.getClass()));
    }

    LOG.info("Handling split change {}", splitsChanges);
    for (MySqlSplit addedSplit : splitsChanges.splits()) {
        if (!addedSplit.isSnapshotSplit()) {
            binlogSplits.add(addedSplit.asBinlogSplit());
            continue;
        }
        snapshotSplits.add(addedSplit.asSnapshotSplit());
    }
}
在前面createSplitFetcher方法创建SplitFetcher对象时, 其构造函数中会创建FetchTask
/**
 * Creates a split fetcher and its fetch task.
 *
 * @param id identifier of this fetcher, used in log messages and passed to the fetch task
 * @param elementsQueue queue handed to the fetch task; must not be null
 * @param splitReader reader handed to the fetch task; must not be null
 * @param errorHandler stored for later use; must not be null
 * @param shutdownHook stored for later use; must not be null
 * @param splitFinishedHook invoked with the ids of finished splits; note that, unlike the other
 *     arguments, it is not null-checked here
 * @param allowUnalignedSourceSplits stored as-is; its effect is not visible in this constructor
 */
SplitFetcher(
int id,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SplitReader<E, SplitT> splitReader,
Consumer<Throwable> errorHandler,
Runnable shutdownHook,
Consumer<Collection<String>> splitFinishedHook,
boolean allowUnalignedSourceSplits) {
this.id = id;
this.elementsQueue = checkNotNull(elementsQueue);
this.splitReader = checkNotNull(splitReader);
this.errorHandler = checkNotNull(errorHandler);
this.shutdownHook = checkNotNull(shutdownHook);
this.allowUnalignedSourceSplits = allowUnalignedSourceSplits;
// Create the fetch task. Per the accompanying notes its run method calls
// splitReader.fetch() (FetchTask itself is not shown here; confirm against its source).
this.fetchTask =
new FetchTask<>(
splitReader,
elementsQueue,
// Callback for finished splits: drop them from assignedSplits first, then
// notify the external hook, then log.
ids -> {
ids.forEach(assignedSplits::remove);
splitFinishedHook.accept(ids);
LOG.info("Finished reading from splits {}", ids);
},
id);
}
最终调用MySqlSplitReader.fetch()方法去拉取数据, 由DebeziumReader读取数据