The Producer-Consumer Pattern in Hudi Data Writing

The producer-consumer pattern is used to reconcile mismatched production and consumption rates. Hudi's write path relies heavily on this pattern, and along the way it uses several handy utility classes that can be reused directly, for example ObjectSizeCalculator, which estimates the size of an object instance, and BoundedInMemoryQueue, a queue that adjusts its record limit based on sampled object sizes.
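As a quick taste of the size estimator, here is a minimal sketch (assuming the utility still lives at org.apache.hudi.common.util.ObjectSizeCalculator; the package may differ across Hudi versions):

import java.util.Arrays;
import java.util.List;

import org.apache.hudi.common.util.ObjectSizeCalculator;

public class SizeEstimateDemo {
  public static void main(String[] args) {
    List<String> batch = Arrays.asList("hoodie", "record", "payload");
    // Walks the object graph reflectively and returns an estimated size in
    // bytes. It is relatively expensive, which is why BoundedInMemoryQueue
    // only invokes it on sampled records rather than on every insert.
    long sizeInBytes = ObjectSizeCalculator.getObjectSize(batch);
    System.out.println("estimated size: " + sizeInBytes + " bytes");
  }
}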


Main Classes Involved

BoundedInMemoryQueue

An in-memory bounded queue. Key characteristics:

  • The queue's memory budget is fixed up front; record sizes are estimated by sampling, and the queue's record limit is adjusted dynamically to fit the budget.
  • Concurrency is controlled with a Semaphore whose permit count represents the maximum number of records that may be buffered (see the sketch after this list).
  • A transform function is supported: producer records are transformed before being enqueued.
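To isolate the mechanism before reading the real code, here is a minimal sketch (plain JDK, not Hudi code) of how a Semaphore can turn an unbounded LinkedBlockingQueue into a bounded queue whose limit can be resized on the fly:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;

// The Semaphore's permit count is "how many records currently fit in the
// memory budget"; resizing the budget is just releasing/acquiring permits.
class TinyBoundedQueue<T> {
  private final Semaphore permits;                              // caps records in flight
  private final BlockingQueue<T> inner = new LinkedBlockingQueue<>();

  TinyBoundedQueue(int initialLimit) {
    this.permits = new Semaphore(initialLimit);
  }

  void put(T t) throws InterruptedException {
    permits.acquire();  // blocks the producer once the budget is exhausted
    inner.put(t);
  }

  T take() throws InterruptedException {
    T t = inner.take();
    permits.release();  // consumption frees budget for the producer
    return t;
  }

  // Mirrors how BoundedInMemoryQueue adjusts its limit after each size sample.
  void resize(int oldLimit, int newLimit) throws InterruptedException {
    if (newLimit > oldLimit) {
      permits.release(newLimit - oldLimit);
    } else if (newLimit < oldLimit) {
      permits.acquire(oldLimit - newLimit);
    }
  }
}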

Main code flow:

  • Enqueueing a record:
//  The producer inserts one record into the queue
public void insertRecord(I t) throws Exception {
    // If already closed, throw exception
    if (isWriteDone.get()) {
      throw new IllegalStateException("Queue closed for enqueueing new entries");
    }
    // We need to stop queueing if queue-reader has failed and exited.
    throwExceptionIfFailed();
    //  Acquire one permit; this may block if the queue is at capacity
    rateLimiter.acquire();
    // We are retrieving insert value in the record queueing thread to offload computation
    // around schema validation and record creation to it.

    //  Transform the record, e.g. HoodieRecord ----> HoodieInsertValueGenResult
    final O payload = transformFunction.apply(t);

    // Adjust the queue's record limit based on sampled record sizes
    adjustBufferSizeIfNeeded(payload);

    // Store in the internal blocking queue (a LinkedBlockingQueue)
    queue.put(Option.of(payload));
  }
  • Adjusting the queue's record limit:
//  Adjust the queue's record limit via sampling
private void adjustBufferSizeIfNeeded(final O payload) throws InterruptedException {
    //  Only sample record sizes periodically; by default every 64th record
    if (this.samplingRecordCounter.incrementAndGet() % RECORD_SAMPLING_RATE != 0) {
      return;
    }
    //  The default estimator uses ObjectSizeCalculator.getObjectSize(t) to measure the object
    final long recordSizeInBytes = payloadSizeEstimator.sizeEstimate(payload);
    //  Recompute the running average record size in bytes from past samples
    final long newAvgRecordSizeInBytes =
        Math.max(1, (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));

    // New number of records allowed in the buffer
    final int newRateLimit =
        (int) Math.min(RECORD_CACHING_LIMIT, Math.max(1, this.memoryLimit / newAvgRecordSizeInBytes));

    //  If the number of records to cache changed, release or acquire permits
    //  to adjust the rate limit to the newly computed value.
    if (newRateLimit > currentRateLimit) {
      //  Grow the permit count
      rateLimiter.release(newRateLimit - currentRateLimit);
    } else if (newRateLimit < currentRateLimit) {
      //  Shrink the permit count (may block until consumers drain the difference)
      rateLimiter.acquire(currentRateLimit - newRateLimit);
    }

    currentRateLimit = newRateLimit;

    avgRecordSizeInBytes = newAvgRecordSizeInBytes;

    numSamples++;
  }
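To make the arithmetic concrete, a worked example with made-up numbers (128 * 1024 below is only a stand-in for the real RECORD_CACHING_LIMIT constant defined on BoundedInMemoryQueue):

public class LimitMathDemo {
  public static void main(String[] args) {
    // Hypothetical inputs: 100 MB budget, previous average of 2 KB over
    // 9 samples, and a freshly sampled 12 KB record.
    long memoryLimit = 100L * 1024 * 1024;
    long avgRecordSizeInBytes = 2048;
    long recordSizeInBytes = 12288;
    long numSamples = 9;
    long cachingLimit = 128 * 1024; // stand-in for RECORD_CACHING_LIMIT

    long newAvg = Math.max(1,
        (avgRecordSizeInBytes * numSamples + recordSizeInBytes) / (numSamples + 1));
    int newRateLimit = (int) Math.min(cachingLimit, Math.max(1, memoryLimit / newAvg));

    System.out.println(newAvg);       // 3072 bytes
    System.out.println(newRateLimit); // 34133 records fit in the 100 MB budget
  }
}

If the previous limit was higher than the newly computed one, the delta is acquired back from the semaphore, throttling producers until the consumer drains the difference.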
  • Consuming records from the queue:
private Option<O> readNextRecord() {
  if (this.isReadDone.get()) {
    return Option.empty();
  }
  // Release one permit so the producer can enqueue another record
  rateLimiter.release();

  Option<O> newRecord = Option.empty();
  while (expectMoreRecords()) {
    try {
      throwExceptionIfFailed();
      // Poll with a timeout so producer failures can be re-checked
      newRecord = queue.poll(RECORD_POLL_INTERVAL_SEC, TimeUnit.SECONDS);
      if (newRecord != null) {
        break;
      }
    } catch (InterruptedException e) {
      LOG.error("error reading records from queue", e);
      throw new HoodieException(e);
    }
  }
  // Check one more time here as it is possible producer erred out and closed immediately
  throwExceptionIfFailed();

  if (newRecord != null && newRecord.isPresent()) {
    return newRecord;
  } else {
    // We are done reading all the records from internal iterator.
    this.isReadDone.set(true);
    return Option.empty();
  }
}

BoundedInMemoryQueueConsumer

The abstract consumer base class.

The consumer hands records down to a HoodieWriteHandle for the actual write. Flink uses ExplicitWriteHandler to drive FlinkAppendHandle and FlinkCreateHandle, and UpdateHandler to drive FlinkMergeHandle.

/**
 * Consume entries from queue and execute callback function.
 */
public abstract class BoundedInMemoryQueueConsumer<I, O> {

  /**
   * API to de-queue entries from the memory bounded queue.
   * Invoked by BoundedInMemoryExecutor to start the consumer and drain the queue.
   *
   * @param queue In Memory bounded queue
   */
  public O consume(BoundedInMemoryQueue<?, I> queue) throws Exception {
    Iterator<I> iterator = queue.iterator();

    while (iterator.hasNext()) {
      // Consume one record and hand it down to the HoodieWriteHandle
      consumeOneRecord(iterator.next());
    }
    // All queue data has been consumed; notify the implementation (post-processing)
    finish();
    // Return the accumulated result / status statistics
    return getResult();
  }
  /**
   * Consume one record.
   */
  protected abstract void consumeOneRecord(I record);
  /**
   * Notifies implementation that we have exhausted consuming records from queue.
   */
  protected abstract void finish();
  /**
   * Return result of consuming records so far.
   */
  protected abstract O getResult();

}
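As a minimal illustration of this contract, a toy consumer (not a Hudi class) that just counts the records it sees instead of handing them to a write handle:

// A toy subclass of the abstract consumer shown above.
public class CountingConsumer extends BoundedInMemoryQueueConsumer<String, Long> {
  private long count = 0;

  @Override
  protected void consumeOneRecord(String record) {
    count++; // a real consumer would call into a HoodieWriteHandle here
  }

  @Override
  protected void finish() {
    // nothing to flush in this toy example
  }

  @Override
  protected Long getResult() {
    return count;
  }
}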

BoundedInMemoryQueueProducer

The producer interface and its implementations:
  • IteratorBasedQueueProducer produces records from an iterator; used for inserts and updates.
  • FunctionBasedQueueProducer produces records from a Function; used when merging log files with base parquet files in order to serve the real-time view.
public class IteratorBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {
  // input iterator for producing items in the buffer.
  private final Iterator<I> inputIterator;

  public IteratorBasedQueueProducer(Iterator<I> inputIterator) {
    this.inputIterator = inputIterator;
  }

  @Override
  public void produce(BoundedInMemoryQueue<I, ?> queue) throws Exception {
    LOG.info("starting to buffer records");
    // Produce records from the iterator passed in
    while (inputIterator.hasNext()) {
      queue.insertRecord(inputIterator.next());
    }
    LOG.info("finished buffering records");
  }
}

public class FunctionBasedQueueProducer<I> implements BoundedInMemoryQueueProducer<I> {

  private static final Logger LOG = LogManager.getLogger(FunctionBasedQueueProducer.class);

  private final Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction;

  public FunctionBasedQueueProducer(Function<BoundedInMemoryQueue<I, ?>, Boolean> producerFunction) {
    this.producerFunction = producerFunction;
  }

  @Override
  public void produce(BoundedInMemoryQueue<I, ?> queue) {
    LOG.info("starting function which will enqueue records");
    producerFunction.apply(queue);
    LOG.info("finished function which will enqueue records");
  }
}
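A hypothetical wiring of FunctionBasedQueueProducer (not an actual Hudi call site, and the org.apache.hudi.common.util.queue package paths are assumptions) that shows why the function receives the queue itself: the function drives the enqueueing loop and signals completion via its Boolean result.

import java.util.Arrays;

import org.apache.hudi.common.util.queue.BoundedInMemoryQueueProducer;
import org.apache.hudi.common.util.queue.FunctionBasedQueueProducer;

public class FunctionProducerDemo {
  static BoundedInMemoryQueueProducer<String> makeProducer() {
    return new FunctionBasedQueueProducer<>(queue -> {
      try {
        // In Hudi this loop would scan log-file records; here we enqueue fixed values.
        for (String rec : Arrays.asList("a", "b", "c")) {
          queue.insertRecord(rec); // may block once the memory budget is used up
        }
      } catch (Exception e) {
        throw new RuntimeException(e); // insertRecord declares a checked Exception
      }
      return true;
    });
  }
}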

BoundedInMemoryExecutor

The executor that coordinates the producers and the consumer through the bounded in-memory queue.

public class BoundedInMemoryExecutor<I, O, E> {
  // Executor service used for launching writer thread.
  private final ExecutorService executorService;
  // Used for buffering records which is controlled by HoodieWriteConfig#WRITE_BUFFER_LIMIT_BYTES.
  private final BoundedInMemoryQueue<I, O> queue;
  // Producers
  private final List<BoundedInMemoryQueueProducer<I>> producers;
  // Consumer
  private final Option<BoundedInMemoryQueueConsumer<O, E>> consumer;

  public BoundedInMemoryExecutor(final long bufferLimitInBytes, BoundedInMemoryQueueProducer<I> producer,
      Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction) {
    this(bufferLimitInBytes, Arrays.asList(producer), consumer, transformFunction, new DefaultSizeEstimator<>());
  }

  public BoundedInMemoryExecutor(final long bufferLimitInBytes, List<BoundedInMemoryQueueProducer<I>> producers,
      Option<BoundedInMemoryQueueConsumer<O, E>> consumer, final Function<I, O> transformFunction,
      final SizeEstimator<O> sizeEstimator) {
    this.producers = producers;
    this.consumer = consumer;
    // Ensure one thread for each producer plus one for the consumer
    this.executorService = Executors.newFixedThreadPool(producers.size() + 1);
    //  The shared memory-bounded queue
    this.queue = new BoundedInMemoryQueue<>(bufferLimitInBytes, transformFunction, sizeEstimator);
  }

  /**
   * Callback to implement environment specific behavior before executors (producers/consumer) run.
   */
  public void preExecute() {
    // Do Nothing in general context
  }

  /**
   * Start all producers.
   */
  public ExecutorCompletionService<Boolean> startProducers() {
    // Latch to control when and which producer thread will close the queue
    final CountDownLatch latch = new CountDownLatch(producers.size());
    final ExecutorCompletionService<Boolean> completionService =
        new ExecutorCompletionService<Boolean>(executorService);

    producers.stream().map(producer -> {
      return completionService.submit(() -> {
        try {
          preExecute();
          // The producer pushes records into the queue
          producer.produce(queue);
        } catch (Throwable e) {
          LOG.error("error producing records", e);
          queue.markAsFailed(e);
          throw e;
        } finally {
          synchronized (latch) {
            latch.countDown();
            if (latch.getCount() == 0) {
              // Mark production as done so that the consumer will be able to exit
              queue.close();
            }
          }
        }
        return true;
      });
    }).collect(Collectors.toList());
    return completionService;
  }

  /**
   * Start the single consumer.
   */
  private Future<E> startConsumer() {
    return consumer.map(consumer -> {
      return executorService.submit(() -> {
        LOG.info("starting consumer thread");
        preExecute();
        try {
          E result = consumer.consume(queue);
          LOG.info("Queue Consumption is done; notifying producer threads");
          return result;
        } catch (Exception e) {
          LOG.error("error consuming records", e);
          queue.markAsFailed(e);
          throw e;
        }
      });
    }).orElse(CompletableFuture.completedFuture(null));
  }

  /**
   * Main entry point to run both production and consumption.
   */
  public E execute() {
    try {
      // 1. Start the producers
      startProducers();
      // 2. Start the consumer
      Future<E> future = startConsumer();
      // Wait for the consumer to finish, blocking on the result
      return future.get();
    } catch (InterruptedException ie) {
      shutdownNow();
      Thread.currentThread().interrupt();
      throw new HoodieException(ie);
    } catch (Exception e) {
      throw new HoodieException(e);
    }
  }

  public boolean isRemaining() {
    return queue.iterator().hasNext();
  }

  public void shutdownNow() {
    executorService.shutdownNow();
  }

  public BoundedInMemoryQueue<I, O> getQueue() {
    return queue;
  }
}
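Putting the pieces together, a minimal end-to-end sketch: one producer feeding three records through an identity transform into the toy CountingConsumer sketched earlier. Hudi's real call sites pass HoodieRecord and WriteStatus instead of String and Long, and the org.apache.hudi.common.util.queue import paths are assumptions.

import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor;
import org.apache.hudi.common.util.queue.IteratorBasedQueueProducer;

public class ExecutorDemo {
  public static void main(String[] args) {
    List<String> input = Arrays.asList("r1", "r2", "r3");
    BoundedInMemoryExecutor<String, String, Long> executor = new BoundedInMemoryExecutor<>(
        16 * 1024 * 1024,                                   // 16 MB buffer limit
        new IteratorBasedQueueProducer<>(input.iterator()), // single producer
        Option.of(new CountingConsumer()),                  // toy consumer from above
        Function.identity());                               // no transformation
    try {
      Long consumed = executor.execute(); // blocks until the consumer finishes
      System.out.println("consumed " + consumed + " records");
    } finally {
      executor.shutdownNow();
    }
  }
}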

FlinkLazyInsertIterable

The lazy iterator used by Flink.

It builds a BoundedInMemoryExecutor to push the records to be inserted downstream; computeNext() is the entry point. The constructor receives the iterator over the records to emit, plus the factory that creates the HoodieWriteHandle.

public class FlinkLazyInsertIterable<T extends HoodieRecordPayload> extends HoodieLazyInsertIterable<T> {

  public FlinkLazyInsertIterable(Iterator<HoodieRecord<T>> recordItr,
                                 boolean areRecordsSorted,
                                 HoodieWriteConfig config,
                                 String instantTime,
                                 HoodieTable hoodieTable,
                                 String idPrefix,
                                 TaskContextSupplier taskContextSupplier,
                                 ExplicitWriteHandleFactory writeHandleFactory) {
    super(recordItr, areRecordsSorted, config, instantTime, hoodieTable, idPrefix, taskContextSupplier, writeHandleFactory);
  }

  /**
   * Execution entry point.
   */
  @Override
  protected List<WriteStatus> computeNext() {
    // Executor service used for launching writer thread.
    BoundedInMemoryExecutor<HoodieRecord<T>, HoodieInsertValueGenResult<HoodieRecord>, List<WriteStatus>> bufferedIteratorExecutor =
        null;
    try {
      final Schema schema = new Schema.Parser().parse(hoodieConfig.getSchema());
      //  Create the BoundedInMemoryExecutor
      bufferedIteratorExecutor = new BoundedInMemoryExecutor<>(hoodieConfig.getWriteBufferLimitBytes(),
          new IteratorBasedQueueProducer<>(inputItr),  // producer: wraps the iterator over the records to emit
          Option.of(getExplicitInsertHandler()),      // consumer: ExplicitWriteHandleFactory supplies the HoodieWriteHandle
          getTransformFunction(schema, hoodieConfig)); // transform function

      // Execute and block until the WriteStatus results are available
      final List<WriteStatus> result = bufferedIteratorExecutor.execute();
      assert result != null && !result.isEmpty() && !bufferedIteratorExecutor.isRemaining();
      return result;
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (null != bufferedIteratorExecutor) {
        bufferedIteratorExecutor.shutdownNow();
      }
    }
  }

  @SuppressWarnings("rawtypes")
  private ExplicitWriteHandler getExplicitInsertHandler() {
    HoodieWriteHandle handle = ((ExplicitWriteHandleFactory) writeHandleFactory).getWriteHandle();
    //  e.g. a HoodieAppendHandle
    return new ExplicitWriteHandler(handle);
  }
}
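For orientation, a hypothetical driver (Hudi imports elided; this assumes, per LazyIterableIterator's contract, that the iterable behaves as a standard Iterator whose next() returns the List<WriteStatus> of one batch):

import java.util.ArrayList;
import java.util.List;

public class InsertDriverSketch {
  // Each hasNext()/next() pair triggers computeNext(), which runs one
  // BoundedInMemoryExecutor round and yields that batch's write statuses.
  static List<WriteStatus> drain(FlinkLazyInsertIterable<HoodieAvroPayload> iterable) {
    List<WriteStatus> statuses = new ArrayList<>();
    while (iterable.hasNext()) {
      statuses.addAll(iterable.next());
    }
    return statuses;
  }
}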

FlinkMergeHelper

Used when merging data: the producer emits the historical base-file records, and the consumer is a FlinkMergeHandle (wrapped in an UpdateHandler). By the time the FlinkMergeHandle is initialized, it already holds all of the log-file data.

public class FlinkMergeHelper<T extends HoodieRecordPayload> extends AbstractMergeHelper<T, List<HoodieRecord<T>>,
    List<HoodieKey>, List<WriteStatus>> {

  private FlinkMergeHelper() {
  }

  private static class MergeHelperHolder {
    private static final FlinkMergeHelper FLINK_MERGE_HELPER = new FlinkMergeHelper();
  }

  public static FlinkMergeHelper newInstance() {
    return FlinkMergeHelper.MergeHelperHolder.FLINK_MERGE_HELPER;
  }

  @Override
  public void runMerge(HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> table,
                       HoodieMergeHandle<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> mergeHandle) throws IOException {
    final GenericDatumWriter<GenericRecord> gWriter;
    final GenericDatumReader<GenericRecord> gReader;
    Schema readSchema;
    // Read records from previous version of base file and merge.
    final boolean externalSchemaTransformation = table.getConfig().shouldUseExternalSchemaTransformation();
    HoodieBaseFile baseFile = mergeHandle.baseFileForMerge();

    if (externalSchemaTransformation || baseFile.getBootstrapBaseFile().isPresent()) {
      readSchema = HoodieFileReaderFactory.getFileReader(table.getHadoopConf(), mergeHandle.getOldFilePath()).getSchema();
      gWriter = new GenericDatumWriter<>(readSchema);
      gReader = new GenericDatumReader<>(readSchema, mergeHandle.getWriterSchemaWithMetaFields());
    } else {
      // Flink takes this branch ==>
      gReader = null;
      gWriter = null;
      readSchema = mergeHandle.getWriterSchemaWithMetaFields();
    }

    BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null;
    Configuration cfgForHoodieFile = new Configuration(table.getHadoopConf());
    //  Build the base file reader
    HoodieFileReader<GenericRecord> reader = HoodieFileReaderFactory.<GenericRecord>getFileReader(cfgForHoodieFile, mergeHandle.getOldFilePath());
    try {
      final Iterator<GenericRecord> readerIterator;
      if (baseFile.getBootstrapBaseFile().isPresent()) {
        readerIterator = getMergingIterator(table, mergeHandle, baseFile, reader, readSchema, externalSchemaTransformation);
      } else {
        // Flink ==>   read the base file (the historical parquet)
        readerIterator = reader.getRecordIterator(readSchema);
      }

      ThreadLocal<BinaryEncoder> encoderCache = new ThreadLocal<>();
      ThreadLocal<BinaryDecoder> decoderCache = new ThreadLocal<>();

      wrapper = new BoundedInMemoryExecutor(table.getConfig().getWriteBufferLimitBytes(),
          new IteratorBasedQueueProducer<>(readerIterator),  // producer
          Option.of(new UpdateHandler(mergeHandle)),        //  consumer
          record -> {
              if (!externalSchemaTransformation) {
                // Flink ==>  no transformation
                return record;
              }
             return transformRecordBasedOnNewSchema(gReader, gWriter, encoderCache, decoderCache, (GenericRecord) record);
            }
      );
      );

      wrapper.execute();
    } catch (Exception e) {
      throw new HoodieException(e);
    } finally {
      if (reader != null) {
        // close the parquet base-file reader
        reader.close();
      }

      mergeHandle.close();
      if (null != wrapper) {
        wrapper.shutdownNow();
      }
    }
  }
}

A follow-up will look at how FlinkMergeHelper and FlinkLazyInsertIterable are constructed along the Flink call path.
