parseOffsetSpec 根据 time 配置解析出获取 offset 的方式,earliest、lastest或者指定的时间戳
然后通过 KafkaAdminClient.listOffsets 方法获取 offset,通过 getListOffsetsCalls 构建了获取 offset 的调用
getListOffsetsCalls 首先构建 leader 节点和 topic 的关系
然后构建了对于 leader broker 的 ListOffset 的调用
调用会被放到 AdminClientRunnable 做异步请求,在 processRequests 方法中被处理
生成 request 通过 NetworkClinet.send 发送到对应的节点
访问的 ApiKey 为 LIST_OFFSETS
调用 selector.send 根据目的地 id 打开对应的 channel 发送请求
对于请求的处理在 KafkaApis.handleListOffsetRequest 中,0.x和1.x以上版本有不同的处理方式
对于 0.x 版本,调用 ReplicaManager.legacyFetchOffsetsForTimestamp 获取 offset
调用 Partition.legacyFetchOffsetsForTimestamp,从 local log file 获取小于制定时间的全部offset,然后把 大于 highWatermark 的 offset 丢弃后返回结果
对于 1.x 及以上版本,,调用 ReplicaManager.fetchOffsetForTimestamp 获取 offset
最后调用 Partition.fetchOffsetForTimestamp,通过 logManager 获取对应 partition 的 offset