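The producer-consumer pattern coordinates data production and consumption that run at different speeds. Hudi's write path relies heavily on this pattern, and it comes with some utility classes that are handy to reuse directly, for example ObjectSizeCalculator, which estimates the size of an object instance, and BoundedInMemoryQueue, a queue that adjusts its record limit based on object size.
- The queue is given its memory budget up front; it samples records to estimate how many bytes each queued instance occupies and adjusts the queue's record limit dynamically.
- A Semaphore provides the concurrency control; its permit count represents the maximum number of records to cache.
- A transform function is supported: producer records are passed through it before being written to the queue.
- Adding a record to the queue: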
// The producer inserts one record into the queue
public void insertRecord(I t) throws Exception {
  // If already closed, throw exception
  if (isWriteDone.get()) {
    throw new IllegalStateException("Queue closed for enqueueing new entries");
  }
  // We need to stop queueing if queue-reader has failed and exited.
  throwExceptionIfFailed();
  // Acquire one permit from the semaphore; this may block
  rateLimiter.acquire();
  // We are retrieving insert value in the record queueing thread to offload computation
  // around schema validation and record creation to it.
  // Transform the record: HoodieRecord ----> HoodieInsertValueGenResult
  final O payload = transformFunction.apply(t);
  // Sample the record and adjust the queue limit if needed
  adjustBufferSizeIfNeeded(payload);
  // Store the record in the internal blocking queue (LinkedBlockingQueue)
  queue.put(Option.of(payload));
}
- Adjusting the queue's record limit:
// Adjust the queue limit based on sampling
private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
  // Sampling check: by default one record in every 64 is sampled
  if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
    return;
  }
  // Estimate the object size via ObjectSizeCalculator.getObjectSize(t)
  final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
  // Compute the average record size in bytes from the samples seen so far
  final long newAvgRecordSizeInBytes =
      Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
  // Number of records currently allowed in the cache
  final int newRateLimit =
      (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));
  // If the number of records to cache changes at all, release or acquire permits
  // to move the rate limit to the newly computed value.
  if (newRateLimit > currentRateLimit) {
    // Add permits
    rateLimiter.release(newRateLimit - currentRateLimit);
  } else if (newRateLimit < currentRateLimit) {
    // Take permits away
    rateLimiter.acquire(currentRateLimit - newRateLimit);
  }
  currentRateLimit = newRateLimit;
  avgRecordSizeInBytes = newAvgRecordSizeInBytes;
  numSamples++;
}
- Consuming records from the queue:
private Option<O> readNextRecord() {
  if (this.isReadDone.get()) {
    return Option.empty();
  }
  // Release one permit back to the semaphore
  rateLimiter.release();
  Option<O> newRecord = Option.empty();
  while (expectMoreRecords()) {
    try {
      throwExceptionIfFailed();
      // poll with a timeout
      newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
      if (newRecord != null) {
        break;
      }
    } catch (InterruptedException e) {
      LOG.error("error reading records from queue", e);
      throw new HoodieException(e);
    }
  }
  // Check one more time here as it is possible producer erred out and closed immediately
  throwExceptionIfFailed();
  if (newRecord != null && newRecord.isPresent()) {
    return newRecord;
  } else {
    // We are done reading all the records from internal iterator.
    this.isReadDone.set(true);
    return Option.empty();
  }
}
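To make the interplay between the semaphore and the sampled size estimate concrete, here is a minimal sketch. It is not Hudi's implementation: the class name `SizeAwareQueueSketch`, the sampling rate of 4, the cap of 1024 records and the accessor methods are all assumptions made for illustration, and the sketch assumes a single producer thread. Unlike the code above, it returns the permit only after a record has actually been polled, which keeps the single-threaded example easy to follow.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.ToLongFunction;

/** Hypothetical, simplified stand-in for BoundedInMemoryQueue (not Hudi's code). */
class SizeAwareQueueSketch<T> {
  private static final int SAMPLING_RATE = 4;  // assumed: sample every 4th record
  private static final int MAX_RECORDS = 1024; // assumed hard cap on cached records

  private final long memoryLimitBytes;
  private final ToLongFunction<T> sizeEstimator;
  private final LinkedBlockingQueue<T> queue = new LinkedBlockingQueue<>();
  private final Semaphore rateLimiter = new Semaphore(1); // start by allowing one record
  private int currentLimit = 1;
  private long avgSizeBytes = 0;
  private long numSamples = 0;
  private long inserted = 0;

  SizeAwareQueueSketch(long memoryLimitBytes, ToLongFunction<T> sizeEstimator) {
    this.memoryLimitBytes = memoryLimitBytes;
    this.sizeEstimator = sizeEstimator;
  }

  /** Producer side: take a permit (may block), resize on sampled records, enqueue. */
  void insert(T record) throws InterruptedException {
    rateLimiter.acquire();
    if (inserted++ % SAMPLING_RATE == 0) {
      adjustLimit(sizeEstimator.applyAsLong(record));
    }
    queue.put(record);
  }

  /** Consumer side: poll with a timeout and hand the permit back once a record is taken. */
  T poll(long timeout, TimeUnit unit) throws InterruptedException {
    T record = queue.poll(timeout, unit);
    if (record != null) {
      rateLimiter.release();
    }
    return record;
  }

  /** Recompute the running average record size and move the permit count to the new limit. */
  private void adjustLimit(long sampledSizeBytes) throws InterruptedException {
    long newAvg = Math.max(1, (avgSizeBytes * numSamples + sampledSizeBytes) / (numSamples + 1));
    int newLimit = (int) Math.min(MAX_RECORDS, Math.max(1, memoryLimitBytes / newAvg));
    if (newLimit > currentLimit) {
      rateLimiter.release(newLimit - currentLimit); // grow: add permits
    } else if (newLimit < currentLimit) {
      rateLimiter.acquire(currentLimit - newLimit); // shrink: may block until permits are free
    }
    currentLimit = newLimit;
    avgSizeBytes = newAvg;
    numSamples++;
  }

  int currentLimit() {
    return currentLimit;
  }

  int availablePermits() {
    return rateLimiter.availablePermits();
  }
}
```

With a 1000-byte budget and a first sampled record of 100 bytes, the limit in this sketch grows from 1 to 10 records. If a later sample of 300 bytes raises the running average to 200 bytes, the limit shrinks to 5.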
The consumer hands records to a HoodieWriteHandle for writing. On Flink, ExplicitWriteHandler drives FlinkAppendHandle and FlinkCreateHandle, and UpdateHandler drives FlinkMergeHandle, to complete the write.
/**
 * Consume entries from queue and execute callback function.
 */
public abstract class BoundedInMemoryQueueConsumer<I, O> {
  /**
   * API to de-queue entries to memory bounded queue.
   * Called by BoundedInMemoryExecutor, which starts the consumer to hand records downstream.
   *
   * @param queue In Memory bounded queue
   */
  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();
    while (iterator.hasNext()) {
      // Consume the record and hand it to the HoodieWriteHandle
      consumeOneRecord(iterator.next());
    }
    // Notifies done: the queue is drained, run post-processing
    finish();
    // Return the status statistics
    return getResult();
  }

  /** Consume one record. */
  protected abstract void consumeOneRecord(I record);

  /** Notifies implementation that we have exhausted consuming records from queue. */
  protected abstract void finish();

  /** Return result of consuming records so far. */
  protected abstract O getResult();
}
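The consumer is a template method: `consume` owns the loop, and subclasses only fill in the three hooks. A minimal sketch of that shape, independent of Hudi, might look like the following. `ConsumerSketch` and `UppercasingConsumerSketch` are hypothetical names, and a plain `Iterator` stands in for the bounded queue.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical consumer base class with the same consume/finish/getResult shape. */
abstract class ConsumerSketch<I, O> {
  /** Drains the iterator, then lets the subclass finalize and report its result. */
  final O consume(Iterator<I> records) {
    while (records.hasNext()) {
      consumeOneRecord(records.next());
    }
    finish();
    return getResult();
  }

  protected abstract void consumeOneRecord(I record);

  protected abstract void finish();

  protected abstract O getResult();
}

/** Example subclass: buffers transformed records and "flushes" them in finish(). */
class UppercasingConsumerSketch extends ConsumerSketch<String, List<String>> {
  private final List<String> pending = new ArrayList<>();
  private final List<String> written = new ArrayList<>();

  @Override
  protected void consumeOneRecord(String record) {
    pending.add(record.toUpperCase());
  }

  @Override
  protected void finish() {
    written.addAll(pending); // stands in for flushing and closing a write handle
    pending.clear();
  }

  @Override
  protected List<String> getResult() {
    return written;
  }
}
```

In this sketch `getResult()` plays the role that the list of write statuses plays for the write handlers mentioned above.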
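- IteratorBasedQueueProducer produces records from an iterator; it is used for inserts and updates.
- FunctionBasedQueueProducer produces records from a Function; it is used when merging log files with parquet base files in order to serve the RealTimeView.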
public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
  private static final Logger LOG = LogManager.getLogger(IteratorBasedQueueProducer.class);
  // input iterator for producing items in the buffer.
  private final Iterator<I> inputIterator;

  public IteratorBasedQueueProducer(Iterator<I> inputIterator) {
    this.inputIterator = inputIterator;
  }

  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    LOG.info("starting to buffer records");
    // Produce records from the iterator that was passed in
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    LOG.info("finished buffering records");
  }
}
public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
  private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class);
  private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;

  public FunctionBasedQueueProducer(Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction) {
    this.producerFunction = producerFunction;
  }

  public void produce(BoundedInMemoryQueue<I, ?> queue) {
    LOG.info("starting function which will enqueue records");
    producerFunction.apply(queue);
    LOG.info("finished function which will enqueue records");
  }
}
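The difference between the two producers is only in who drives the loop. A rough sketch, assuming a plain `BlockingQueue` in place of BoundedInMemoryQueue and a hypothetical `ProducerSketch` interface, could be:

```java
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

/** Hypothetical producer abstraction; a BlockingQueue stands in for BoundedInMemoryQueue. */
interface ProducerSketch<T> {
  void produce(BlockingQueue<T> queue) throws Exception;

  /** Iterator-based: the producer itself walks the input and enqueues each element. */
  static <T> ProducerSketch<T> fromIterator(Iterator<T> input) {
    return queue -> {
      while (input.hasNext()) {
        queue.put(input.next());
      }
    };
  }

  /** Function-based: the caller-supplied function receives the queue and decides what to enqueue. */
  static <T> ProducerSketch<T> fromFunction(Function<BlockingQueue<T>, Boolean> producerFunction) {
    return queue -> {
      producerFunction.apply(queue);
    };
  }
}
```

The function-based form is the more flexible of the two, since the function can pull from any source (a log scanner, for example) as long as it pushes into the queue it is given.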
public class BoundedInMemoryExecutor<I, O, E> {
  // Executor service used for launching writer thread.
  private final ExecutorService executorService;
  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
  private final BoundedInMemoryQueue<I, O> queue;
  // Producers
  private final List<BoundedInMemoryQueueProducer<I>> producers;
  // Consumer
  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;

  public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
      Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
    this(bufferLimitInBytes, Arrays.asList(producer), consumer, transformFunction, new DefaultSizeEstimator<>());
  }

  public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers,
      Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
      final SizeEstimator<O> sizeEstimator) {
    this.producers = producers;
    this.consumer = consumer;
    // Ensure single thread for each producer thread and one for consumer
    this.executorService = Executors.newFixedThreadPool(producers.size() + 1);
    // In-memory bounded queue
    this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator);
  }

  /**
   * Callback to implement environment specific behavior before executors (producers/consumer) run.
   */
  public void preExecute() {
    // Do Nothing in general context
  }

  /**
   * Start all Producers.
   */
  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);
    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          // The producer pushes records into the queue
          producer.produce(queue);
        } catch (Throwable e) {
          LOG.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

  /**
   * Start only consumer.
   */
  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        LOG.info("starting consumer thread");
        preExecute();
        try {
          E result = consumer.consume(queue);
          LOG.info("Queue Consumption is done; notifying producer threads");
          return result;
        } catch (Exception e) {
          LOG.error("error consuming records", e);
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

  /**
   * Execution entry point.
   * Main API to run both production and consumption.
   */
  public E execute() {
    try {
      // 1. Start the producers, which produce the records
      startProducers();
      // 2. Start the consumer, which consumes the records
      Future<E> future = startConsumer();
      // Wait for consumer to be done; blocks until the result is available
      return future.get();
    } catch (InterruptedException ie) {
      throw new HoodieException(ie);
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

  public boolean isRemaining() {
    return queue.iterator().hasNext();
  }

  public void shutdownNow() {
    executorService.shutdownNow();
  }

  public BoundedInMemoryQueue<I, O> getQueue() {
    return queue;
  }
}
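The threading model of the executor (one thread per producer plus one consumer thread, with a latch deciding which producer closes the queue) can be sketched with nothing but `java.util.concurrent`. The class `ExecutorSketch`, the `writeDone` flag and the 50 ms poll interval below are assumptions for illustration, and an `ArrayBlockingQueue` replaces the size-aware queue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of the producers-plus-one-consumer wiring; not Hudi's executor. */
class ExecutorSketch {
  /** Runs one producer per input list and a single consumer; returns everything consumed. */
  static List<Integer> run(List<List<Integer>> inputs, int capacity) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(capacity);
    // Set once the last producer has finished (immediately if there are no producers)
    AtomicBoolean writeDone = new AtomicBoolean(inputs.isEmpty());
    CountDownLatch latch = new CountDownLatch(inputs.size());
    // One thread per producer and one for the consumer
    ExecutorService pool = Executors.newFixedThreadPool(inputs.size() + 1);
    try {
      for (List<Integer> input : inputs) {
        pool.submit(() -> {
          try {
            for (Integer value : input) {
              queue.put(value); // blocks while the queue is full
            }
          } finally {
            synchronized (latch) {
              latch.countDown();
              if (latch.getCount() == 0) {
                writeDone.set(true); // the last producer marks production as done
              }
            }
          }
          return true;
        });
      }
      Future<List<Integer>> consumer = pool.submit(() -> {
        List<Integer> consumed = new ArrayList<>();
        while (true) {
          Integer value = queue.poll(50, TimeUnit.MILLISECONDS);
          if (value != null) {
            consumed.add(value);
          } else if (writeDone.get() && queue.isEmpty()) {
            return consumed; // producers are done and nothing is left to read
          }
        }
      });
      return consumer.get(); // block until the consumer has drained the queue
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException(e);
    } finally {
      pool.shutdownNow();
    }
  }
}
```

Calling `ExecutorSketch.run(List.of(List.of(1, 2, 3), List.of(4, 5)), 2)` returns the five values in an order that depends on thread scheduling. The small capacity forces the producers to block until the consumer catches up, which is the back-pressure effect the bounded queue is there to provide.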
FlinkLazyInsertIterable builds a BoundedInMemoryExecutor from an iterator to hand the records being inserted downstream; computeNext() is the entry point.
public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {
  public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                 boolean areRecordsSorted,
                                 HoodieWriteConfig config,
                                 String instantTime,
                                 HoodieTable hoodieTable,
                                 String idPrefix,
                                 TaskContextSupplier taskContextSupplier,
                                 ExplicitWriteHandleFactory writeHandleFactory) {
    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
  }

  /** Execution entry point. */
  protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
        null;
    try {
      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
      // Create the BoundedInMemoryExecutor
      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(),
          new IteratorBasedQueueProducer<>(inputItr), // producer; created with the Iterator over the records to hand off
          Option.of(getExplicitInsertHandler()), // consumer; ExplicitWriteHandleFactory creates the HoodieWriteHandle
          getTransformFunction(schema, hoodieConfig)); // transform function
      // Start execution and block until the WriteStatus of the write is available
      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
      return result;
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (null != bufferedIteratorExecutor) {
        bufferedIteratorExecutor.shutdownNow();
      }
    }
  }

  private ExplicitWriteHandler getExplicitInsertHandler() {
    HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle();
    // HoodieAppendHandle
    return new ExplicitWriteHandler(handle);
  }
}
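FlinkMergeHelper is used when merging data: the producer is the records of the existing base file, and the consumer is a FlinkMergeHandle. By the time it is initialized, the FlinkMergeHandle already holds all of the log file data.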
public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
    List<HoodieKey>, List<WriteStatus>> {
  private FlinkMergeHelper() {
  }

  private static class MergeHelperHolder {
    private static final FlinkMergeHelper FLINK_MERGE_HELPER = new FlinkMergeHelper();
  }

  public static FlinkMergeHelper newInstance() {
    return FlinkMergeHelper.MergeHelperHolder.FLINK_MERGE_HELPER;
  }

  public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                       HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle) throws IOException {
    final GenericDatumWriter<GenericRecord> gWriter;
    final GenericDatumReader<GenericRecord> gReader;
    Schema readSchema;
    // Read records from previous version of base file and merge.
    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();
    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
      gWriter = new GenericDatumWriter<>(readSchema);
      gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
    } else {
      // Flink takes this branch ==>
      gReader = null;
      gWriter = null;
      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
    }
    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
    // Build the base file reader
    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
    try {
      final Iterator<GenericRecord> readerIterator;
      if (baseFile.getBootstrapBaseFile().isPresent()) {
        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
      } else {
        // Flink ==> read the base file (the existing parquet file)
        readerIterator = reader.getRecordIterator(readSchema);
      }
      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();
      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(),
          new IteratorBasedQueueProducer<>(readerIterator), // producer
          Option.of(new UpdateHandler(mergeHandle)), // consumer
          record -> {
            if (!externalSchemaTransformation) {
              // Flink ==> no transformation
              return record;
            }
            return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
          });
      wrapper.execute();
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (reader != null) {
        // parquet
        reader.close();
      }
      mergeHandle.close();
      if (null != wrapper) {
        wrapper.shutdownNow();
      }
    }
  }
}
What follows focuses mainly on how FlinkMergeHelper and FlinkLazyInsertIterable are constructed along the Flink call path.