Kafka-connect-hdfs源码解析

写数据流程分析

task启动的入口方法:HdfsSinkTask.java中的start方法

/**
 * Starts the HDFS sink task.
 *
 * <p>Builds the connector configuration from {@code props}, validates the Hive and scheduled
 * rotation settings, creates the {@code AvroData} converter and the {@code DataWriter}, runs
 * recovery for every partition currently assigned to the task and, when Hive integration is
 * enabled, calls {@code syncWithHive()}.
 *
 * @param props raw properties used to build the {@code HdfsSinkConnectorConfig}
 * @throws ConnectException if a {@code ConfigException} is raised during start-up (it is wrapped),
 *     or if start-up itself raises a {@code ConnectException}; in the latter case the data
 *     writer, if already created, is closed and stopped before the original exception is rethrown
 */
@Override
public void start(Map<String, String> props) {
  // Read the assignment before anything else, outside the guarded section.
  final Set<TopicPartition> assignedPartitions = context.assignment();
  try {
    final HdfsSinkConnectorConfig config = new HdfsSinkConnectorConfig(props);

    // Hive integration is rejected when schema compatibility is NONE.
    final boolean hiveEnabled = config.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
    if (hiveEnabled) {
      final String compatibilityName =
          config.getString(StorageSinkConnectorConfig.SCHEMA_COMPATIBILITY_CONFIG);
      if (StorageSchemaCompatibility.getCompatibility(compatibilityName)
          == StorageSchemaCompatibility.NONE) {
        throw new ConfigException(
            "Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL"
        );
      }
    }

    // Scheduled rotation needs a non-empty timezone.
    final long rotateScheduleIntervalMs =
        config.getLong(HdfsSinkConnectorConfig.ROTATE_SCHEDULE_INTERVAL_MS_CONFIG);
    if (rotateScheduleIntervalMs > 0) {
      final String timezoneId = config.getString(PartitionerConfig.TIMEZONE_CONFIG);
      if (timezoneId.isEmpty()) {
        throw new ConfigException(
            PartitionerConfig.TIMEZONE_CONFIG,
            timezoneId,
            "Timezone cannot be empty when using scheduled file rotation."
        );
      }
      // Result is discarded; the lookup is presumably here only to fail fast on an unknown
      // timezone id -- confirm against the Joda-Time documentation.
      DateTimeZone.forID(timezoneId);
    }

    final int avroSchemaCacheSize =
        config.getInt(HdfsSinkConnectorConfig.SCHEMA_CACHE_SIZE_CONFIG);
    avroData = new AvroData(avroSchemaCacheSize);
    // Initialise the DataWriter.
    hdfsWriter = new DataWriter(config, context, avroData);
    recover(assignedPartitions);
    if (hiveEnabled) {
      syncWithHive();
    }
  } catch (ConfigException configFailure) {
    throw new ConnectException(
        "Couldn't start HdfsSinkConnector due to configuration error.",
        configFailure
    );
  } catch (ConnectException startFailure) {
    // Info level explains the reason here; Connect itself logs the exception at ERROR.
    log.info("Couldn't start HdfsSinkConnector:", startFailure);
    log.info("Shutting down HdfsSinkConnector.");
    if (hdfsWriter != null) {
      try {
        // stop() runs even when close() throws.
        try {
          log.debug("Closing data writer due to task start failure.");
          hdfsWriter.close();
        } finally {
          log.debug("Stopping data writer due to task start failure.");
          hdfsWriter.stop();
        }
      } catch (Throwable cleanupFailure) {
        // Cleanup problems are only logged so they cannot mask the start-up failure.
        log.debug(
            "Error closing and stopping data writer: {}",
            cleanupFailure.getMessage(),
            cleanupFailure
        );
      }
    }
    // The exception that prevented start-up is always the one propagated.
    throw startFailure;
  }

  log.info("The connector relies on offsets in HDFS filenames, but does commit these offsets to "
      + "Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS "
      + "Connector restores offsets from filenames in HDFS. In the absence of files in HDFS, "
      + "the connector will attempt to find offsets for its consumer group in the "
      + "'__consumer_offsets' topic. If offsets are not found, the consumer will "
      + "rely on the reset policy specified in the 'consumer.auto.offset.reset' property to "
      + "start exporting data to HDFS.");
}

初始化DataWriter,DataWriter.java

/**
 * Creates a data writer for the partitions currently assigned to the task.
 *
 * <p>In order, the constructor: sets the {@code hadoop.home.dir} system property; loads the
 * Hadoop configuration (adding {@code core-site.xml} and {@code hdfs-site.xml} from the
 * configured directory when one is given); optionally logs in through Kerberos and starts a
 * ticket renewal thread; creates the storage object and calls {@code createDir} for the topics,
 * temp-file and logs directories; instantiates the configured format class (new-style first,
 * legacy no-argument constructor as fallback); creates the partitioner; optionally sets up Hive
 * integration; and finally creates one {@code TopicPartitionWriter} per assigned partition.
 *
 * @param connectorConfig connector configuration every setting below is read from
 * @param context sink task context; its {@code assignment()} decides which partition writers
 *     are created
 * @param avroData converter handed to the legacy format's schema file reader and to every
 *     {@code TopicPartitionWriter}
 * @param time time source stored on this instance and handed to every
 *     {@code TopicPartitionWriter}
 * @throws ConfigException if Kerberos authentication is enabled but the principal or the keytab
 *     setting is {@code null}
 * @throws ConnectException if instantiating the format class fails reflectively, if an
 *     {@code IOException} is raised, or if Hive integration is enabled and neither format
 *     object is available
 */
@SuppressWarnings("unchecked")
public DataWriter(
    HdfsSinkConnectorConfig connectorConfig,
    SinkTaskContext context,
    AvroData avroData,
    Time time
) {
  this.time = time;
  try {
    // Publish the configured Hadoop home as the "hadoop.home.dir" system property.
    String hadoopHome = connectorConfig.getString(HdfsSinkConnectorConfig.HADOOP_HOME_CONFIG);
    System.setProperty("hadoop.home.dir", hadoopHome);

    this.connectorConfig = connectorConfig;
    this.avroData = avroData;
    this.context = context;

    String hadoopConfDir = connectorConfig.getString(
        HdfsSinkConnectorConfig.HADOOP_CONF_DIR_CONFIG
    );
    log.info("Hadoop configuration directory {}", hadoopConfDir);
    Configuration conf = connectorConfig.getHadoopConfiguration();
    // Site files are only added when a configuration directory was provided.
    if (!hadoopConfDir.equals("")) {
      conf.addResource(new Path(hadoopConfDir + "/core-site.xml"));
      conf.addResource(new Path(hadoopConfDir + "/hdfs-site.xml"));
    }

    // ---- Optional Kerberos login and ticket renewal ----
    boolean secureHadoop = connectorConfig.getBoolean(
        HdfsSinkConnectorConfig.HDFS_AUTHENTICATION_KERBEROS_CONFIG
    );
    if (secureHadoop) {
      SecurityUtil.setAuthenticationMethod(
          UserGroupInformation.AuthenticationMethod.KERBEROS,
          conf
      );
      String principalConfig = connectorConfig.getString(
          HdfsSinkConnectorConfig.CONNECT_HDFS_PRINCIPAL_CONFIG
      );
      String keytab = connectorConfig.getString(
          HdfsSinkConnectorConfig.CONNECT_HDFS_KEYTAB_CONFIG
      );

      // Both the principal and the keytab are mandatory once Kerberos is enabled.
      if (principalConfig == null || keytab == null) {
        throw new ConfigException(
            "Hadoop is using Kerberos for authentication, you need to provide both a connect "
                + "principal and the path to the keytab of the principal.");
      }

      conf.set("hadoop.security.authentication", "kerberos");
      conf.set("hadoop.security.authorization", "true");
      String hostname = InetAddress.getLocalHost().getCanonicalHostName();
      String namenodePrincipalConfig = connectorConfig.getString(
          HdfsSinkConnectorConfig.HDFS_NAMENODE_PRINCIPAL_CONFIG
      );

      String namenodePrincipal = SecurityUtil.getServerPrincipal(
          namenodePrincipalConfig,
          hostname
      );
      // namenode principal is needed for multi-node hadoop cluster
      // A value already present in the Hadoop configuration is left untouched.
      if (conf.get("dfs.namenode.kerberos.principal") == null) {
        conf.set("dfs.namenode.kerberos.principal", namenodePrincipal);
      }
      log.info("Hadoop namenode principal: " + conf.get("dfs.namenode.kerberos.principal"));

      UserGroupInformation.setConfiguration(conf);
      // replace the _HOST specified in the principal config to the actual host
      String principal = SecurityUtil.getServerPrincipal(principalConfig, hostname);
      UserGroupInformation.loginUserFromKeytab(principal, keytab);
      final UserGroupInformation ugi = UserGroupInformation.getLoginUser();
      log.info("Login as: " + ugi.getUserName());

      final long renewPeriod = connectorConfig.getLong(
          HdfsSinkConnectorConfig.KERBEROS_TICKET_RENEW_PERIOD_MS_CONFIG
      );

      // Background loop: wait up to renewPeriod ms on this DataWriter's monitor, then re-login
      // from the keytab, for as long as isRunning stays true.
      isRunning = true;
      ticketRenewThread = new Thread(new Runnable() {
        @Override
        public void run() {
          synchronized (DataWriter.this) {
            while (isRunning) {
              try {
                DataWriter.this.wait(renewPeriod);
                // Re-check the flag: it may have been cleared while this thread was waiting.
                if (isRunning) {
                  ugi.reloginFromKeytab();
                }
              } catch (IOException e) {
                // We ignore this exception during relogin as each successful relogin gives
                // additional 24 hours of authentication in the default config. In normal
                // situations, the probability of failing relogin 24 times is low and if
                // that happens, the task will fail eventually.
                log.error("Error renewing the ticket", e);
              } catch (InterruptedException e) {
                // ignored
                // NOTE(review): the interrupt is swallowed and the loop continues until
                // isRunning is cleared -- confirm that this is the intended shutdown path.
              }
            }
          }
        }
      });
      log.info("Starting the Kerberos ticket renew thread with period {}ms.", renewPeriod);
      ticketRenewThread.start();
    }

    // ---- Storage and directory set-up ----
    url = connectorConfig.getUrl();
    topicsDir = connectorConfig.getString(StorageCommonConfig.TOPICS_DIR_CONFIG);

    @SuppressWarnings("unchecked")
    Class<? extends HdfsStorage> storageClass = (Class<? extends HdfsStorage>) connectorConfig
        .getClass(StorageCommonConfig.STORAGE_CLASS_CONFIG);
    storage = io.confluent.connect.storage.StorageFactory.createStorage(
        storageClass,
        HdfsSinkConnectorConfig.class,
        connectorConfig,
        url
    );

    // createDir is called for the topics directory, the temp-file directory beneath it and
    // the logs directory.
    createDir(topicsDir);
    createDir(topicsDir + HdfsSinkConnectorConstants.TEMPFILE_DIRECTORY);
    String logsDir = connectorConfig.getString(HdfsSinkConnectorConfig.LOGS_DIR_CONFIG);
    createDir(logsDir);

    // ---- Format class resolution ----
    // Try to instantiate as a new-style storage-common type class, then fall back to old-style
    // with no parameters
    try {
      Class<io.confluent.connect.storage.format.Format> formatClass =
          (Class<io.confluent.connect.storage.format.Format>)
              connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
      // A new-style format exposes a constructor taking the HdfsStorage instance.
      newFormat = formatClass.getConstructor(HdfsStorage.class).newInstance(storage);
      newWriterProvider = newFormat.getRecordWriterProvider();
      schemaFileReader = newFormat.getSchemaFileReader();
    } catch (NoSuchMethodException e) {
      // No HdfsStorage constructor: treat the class as a legacy format with a no-arg constructor.
      Class<Format> formatClass =
          (Class<Format>) connectorConfig.getClass(HdfsSinkConnectorConfig.FORMAT_CLASS_CONFIG);
      format = formatClass.getConstructor().newInstance();
      writerProvider = format.getRecordWriterProvider();
      final io.confluent.connect.hdfs.SchemaFileReader oldReader
          = format.getSchemaFileReader(avroData);
      // Adapter exposing the legacy reader through the new SchemaFileReader interface. Only
      // getSchema and close are usable; the iteration methods throw
      // UnsupportedOperationException.
      schemaFileReader = new SchemaFileReader<HdfsSinkConnectorConfig, Path>() {
        @Override
        public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
          try {
            return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
          } catch (IOException e) {
            throw new ConnectException("Failed to get schema", e);
          }
        }

        @Override
        public Iterator<Object> iterator() {
          throw new UnsupportedOperationException();
        }

        @Override
        public boolean hasNext() {
          throw new UnsupportedOperationException();
        }

        @Override
        public Object next() {
          throw new UnsupportedOperationException();
        }

        @Override
        public void remove() {
          throw new UnsupportedOperationException();
        }

        @Override
        public void close() throws IOException {
          // Intentionally empty: the adapter does nothing on close.

        }
      };
    }

    partitioner = newPartitioner(connectorConfig);

    // Copy of the context's assignment taken at construction time.
    assignment = new HashSet<>(context.assignment());

    // ---- Optional Hive integration ----
    hiveIntegration = connectorConfig.getBoolean(HiveConfig.HIVE_INTEGRATION_CONFIG);
    if (hiveIntegration) {
      hiveDatabase = connectorConfig.getString(HiveConfig.HIVE_DATABASE_CONFIG);
      hiveMetaStore = new HiveMetaStore(conf, connectorConfig);
      if (format != null) {
        // Legacy format: it supplies the HiveUtil directly.
        hive = format.getHiveUtil(connectorConfig, hiveMetaStore);
      } else if (newFormat != null) {
        // New-style format: wrap its HiveUtil so that createTable and alterSchema delegate to it.
        final io.confluent.connect.storage.hive.HiveUtil newHiveUtil
            = ((HiveFactory) newFormat.getHiveFactory())
            .createHiveUtil(connectorConfig, hiveMetaStore);
        hive = new HiveUtil(connectorConfig, hiveMetaStore) {
          @Override
          public void createTable(
              String database, String tableName, Schema schema,
              Partitioner partitioner
          ) {
            newHiveUtil.createTable(database, tableName, schema, partitioner);
          }

          @Override
          public void alterSchema(String database, String tableName, Schema schema) {
            newHiveUtil.alterSchema(database, tableName, schema);
          }
        };
      } else {
        throw new ConnectException("One of old or new format classes must be provided");
      }
      // Single-threaded executor and future list, both handed to every TopicPartitionWriter
      // below.
      executorService = Executors.newSingleThreadExecutor();
      hiveUpdateFutures = new LinkedList<>();
    }

    // ---- One writer per assigned partition ----
    // writerProvider / newWriterProvider: only the one matching the format style resolved above
    // has been assigned in this constructor.
    topicPartitionWriters = new HashMap<>();
    for (TopicPartition tp : assignment) {
      TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(
          tp,
          storage,
          writerProvider,
          newWriterProvider,
          partitioner,
          connectorConfig,
          context,
          avroData,
          hiveMetaStore,
          hive,
          schemaFileReader,
          executorService,
          hiveUpdateFutures,
          time
      );
      topicPartitionWriters.put(tp, topicPartitionWriter);
    }
  } catch (ClassNotFoundException
          | IllegalAccessException
          | InstantiationException
          | InvocationTargetException
          | NoSuchMethodException e
  ) {
    // Reflection failures are reported as ConnectException.
    throw new ConnectException("Reflection exception: ", e);
  } catch (IOException e) {
    throw new ConnectException(e);
  }
}

初始化DataWriter后,下一步执行recover方法

/**
 * Runs recovery on the data writer for each of the given partitions, one at a time.
 *
 * @param assignment partitions to recover
 */
private void recover(Set<TopicPartition> assignment) {
  for (TopicPartition partition : assignment) {
    hdfsWriter.recover(partition);
  }
}

hdfsWriter.recover(tp)方法跳转到DataWriter类中执行
DataWriter.java

/**
 * Delegates recovery to the writer registered for the given partition.
 *
 * <p>The boolean returned by the writer's {@code recover()} is not propagated.
 *
 * @param tp partition whose writer should run recovery
 */
public void recover(TopicPartition tp) {
  TopicPartitionWriter partitionWriter = topicPartitionWriters.get(tp);
  partitionWriter.recover();
}

TopicPartitionWriter.java中执行recover

/**
 * Drives the recovery state machine for this topic partition.
 *
 * <p>The switch deliberately falls through: execution enters at the case matching the current
 * {@code state}, and every step is followed by {@code nextState()} and then continues into the
 * next case. The steps, in order, are: pause, apply the WAL, truncate the WAL, reset offsets,
 * resume.
 *
 * @return {@code false} if a {@code ConnectException} was raised by one of the steps (a retry
 *     timeout is set in that case); {@code true} otherwise, including when the current state is
 *     not a recovery state and only an error is logged
 */
@SuppressWarnings("fallthrough")
public boolean recover() {
  try {
    switch (state) {
      case RECOVERY_STARTED:
        log.info("Started recovery for topic partition {}", tp);
        pause();
        nextState();
        // intentional fall-through
      case RECOVERY_PARTITION_PAUSED:
        log.debug("Start recovery state: Apply WAL for topic partition {}", tp);
        applyWAL();
        nextState();
        // intentional fall-through
      case WAL_APPLIED:
        log.debug("Start recovery state: Truncate WAL for topic partition {}", tp);
        truncateWAL();
        nextState();
        // intentional fall-through
      case WAL_TRUNCATED:
        log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
        resetOffsets();
        nextState();
        // intentional fall-through
      case OFFSET_RESET:
        log.debug("Start recovery state: Resume for topic partition {}", tp);
        resume();
        nextState();
        log.info("Finished recovery for topic partition {}", tp);
        break;
      default:
        // Not a recovery state: only logged; the method still returns true below.
        log.error(
            "{} is not a valid state to perform recovery for topic partition {}.",
            state,
            tp
        );
    }
  } catch (ConnectException e) {
    // state is left at the step that failed, so a later call re-enters the switch at that case.
    log.error("Recovery failed at state {}", state, e);
    setRetryTimeout(timeoutMs);
    return false;
  }
  return true;
}

task线程启动后,执行WorkerSinkTask中的iteration()方法
第二步:执行iteration()方法

/**
 * Runs a single pass of the sink task loop: starts an offset commit when one is due, marks an
 * in-flight commit as failed once it has exceeded its timeout, and then polls for messages.
 *
 * <p>A {@code WakeupException} raised during the pass is handled here: nothing more happens when
 * the task is stopping; otherwise the task is either paused (and a commit requested) or, unless
 * it is paused for redelivery, resumed.
 */
protected void iteration() {
    final long commitIntervalMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);

    try {
        final long nowMs = time.milliseconds();

        // Start a commit only when none is in flight and one was requested or is due by time.
        final boolean commitDue = !committing && (context.isCommitRequested() || nowMs >= nextCommit);
        if (commitDue) {
            commitOffsets(nowMs, false);
            nextCommit = nowMs + commitIntervalMs;
            context.clearCommitRequest();
        }

        // Evaluated after the commit step above so that it reads the current commitStarted
        // (presumably updated by commitOffsets -- confirm).
        final long commitDeadlineMs =
                commitStarted + workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
        if (committing && nowMs >= commitDeadlineMs) {
            log.warn("{} Commit of offsets timed out", this);
            commitFailures++;
            committing = false;
        }

        // Poll for at most the time left until the next scheduled commit, never a negative value.
        poll(Math.max(nextCommit - nowMs, 0));
    } catch (WakeupException we) {
        log.trace("{} Consumer woken up", this);

        if (!isStopping()) {
            if (shouldPause()) {
                pauseAll();
                onPause();
                context.requestCommit();
            } else if (!pausedForRedelivery) {
                resumeAll();
                onResume();
            }
        }
    }
}
/**
 * Poll for new messages with the given timeout. Should only be invoked by the worker thread.
 *
 * <p>When the task context reports a positive timeout, the poll is capped at that value and the
 * context timeout is reset to {@code -1}. The polled records are then converted and delivered
 * to the task.
 *
 * @param timeoutMs upper bound, in milliseconds, on how long to poll
 */
protected void poll(long timeoutMs) {
    rewind();

    long effectiveTimeoutMs = timeoutMs;
    final long contextTimeoutMs = context.timeout();
    if (contextTimeoutMs > 0) {
        effectiveTimeoutMs = Math.min(effectiveTimeoutMs, contextTimeoutMs);
        context.timeout(-1L);
    }

    log.trace("{} Polling consumer with timeout {} ms", this, effectiveTimeoutMs);
    final ConsumerRecords<byte[], byte[]> polled = pollConsumer(effectiveTimeoutMs);
    // Either no batch is pending or this poll returned nothing.
    assert messageBatch.isEmpty() || polled.isEmpty();
    log.trace("{} Polling returned {} messages", this, polled.count());

    convertMessages(polled);
    deliverMessages();
}
/**
 * Hands the current message batch to the sink task and records the outcome.
 *
 * <p>On success the batch metrics are recorded, {@code origOffsets} is merged into
 * {@code currentOffsets}, the batch buffer is cleared and, if the task had been paused for
 * redelivery, that flag is reset (resuming unless {@code shouldPause()} is true). On a
 * {@code RetriableException} the batch is kept and everything is paused so that the same batch
 * is delivered again on the next loop.
 *
 * @throws ConnectException if the task throws anything other than a {@code RetriableException}
 */
private void deliverMessages() {
    try {
        // messageBatch is reused between calls, so the task gets its own copy of it.
        log.trace("{} Delivering batch of {} messages to task", this, messageBatch.size());
        final long putStartedMs = time.milliseconds();
        task.put(new ArrayList<>(messageBatch));
        recordBatch(messageBatch.size());
        sinkTaskMetricsGroup.recordPut(time.milliseconds() - putStartedMs);
        currentOffsets.putAll(origOffsets);
        messageBatch.clear();

        if (!pausedForRedelivery) {
            return;
        }
        // Everything was paused in order to redeliver; resume whatever the task itself has not
        // explicitly paused, then leave redelivery mode.
        if (!shouldPause()) {
            resumeAll();
        }
        pausedForRedelivery = false;
    } catch (RetriableException e) {
        log.error("{} RetriableException from SinkTask:", this, e);
        // Pause every topic partition so no new data arrives while retrying; polling still
        // happens, which keeps user-requested timeouts and group membership working.
        pausedForRedelivery = true;
        pauseAll();
        // Exit normally: the retained batch is reprocessed on the next loop.
    } catch (Throwable t) {
        log.error("{} Task threw an uncaught and unrecoverable exception. Task is being killed and will not "
                + "recover until manually restarted. Error: {}", this, t.getMessage(), t);
        throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.", t);
    }
}

task.put(new ArrayList<>(messageBatch)); 的具体实现是HdfsSinkTask中的put方法,实现如下

/**
 * Writes the given batch of records through the HDFS data writer.
 *
 * @param records records delivered to this task
 * @throws ConnectException wrapping any {@code ConnectException} raised by the writer
 */
@Override
public void put(Collection<SinkRecord> records) throws ConnectException {
  // The guard keeps records.size() from being evaluated when debug logging is disabled.
  if (log.isDebugEnabled()) {
    log.debug("Read {} records from Kafka", records.size());
  }

  try {
    hdfsWriter.write(records);
  } catch (ConnectException writeFailure) {
    // NOTE(review): re-wrapping means the caller always sees a plain ConnectException, even if
    // the writer threw a more specific subtype -- confirm that this is intended.
    throw new ConnectException(writeFailure);
  }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,776评论 6 496
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,527评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 160,361评论 0 350
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,430评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,511评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,544评论 1 293
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,561评论 3 414
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,315评论 0 270
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,763评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,070评论 2 330
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,235评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,911评论 5 338
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,554评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,173评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,424评论 1 268
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,106评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,103评论 2 352