Kafka之深入服务端

[TOC]

6.1 协议设计

在实际应用中, Kafka 经常被用作高性能、可扩展的消息中间件 。 Kafka 自定义了 一组基于 TCP 的二进制协议,只要遵守这组协议的格式,就可以向 Kafka 发送消息,也可以从 Kafka 中 拉取消息,或者做一些其他的事情,比如提交消费位移等。

在目前的 Kafka 2.0.0 中, 一共包含了 43 种协议类型,每种协议类型都有对应的请求 (Request)和响应 Response),它们都遵守特定的协议模式。每种类型的 Request 都包含相同 结构的协议请求头( RequestHeader)和不同结构的协议请求体 CRequestBody),如图 6-1 所示。


image.png

协议请求头中包含 4 个域( Field) : api key、 api_version、 correlation id 和client_id


image.png

每种类型的 Response 也包含相同结构的协议响应头( ResponseHeader)和不同结构的响应 体(ResponseBody) ,如图 6-2所示。


image.png

协议响应头中只有 一个 correlation id,对应的释义可以参考表 6-1 中 的相关描述 。

细心的读者会发现不管是在图 6-1 中还是在图 6-2 中都有类似 int32、 int16、 string 的字样, 它们用 来表示当前域的数据类型 。 Kafka 中所有协议类型的 Request 和 Response 的结构都是具 备固定格式的,并且它 们 都构建于多种基本数据类型之上 。 这些基本数据类型如图 6-2 所示。

image.png
image.png

下面就 以最常见的消息发送和消息拉取的两种协议类型做细致的讲解。首先要讲述的是消 息发送的协议类型,即 ProduceRequest/ProduceResponse,对应的 api_key= 0,表示 PRODUCE。 从Kafka建立之初, 其所支持的协议类型就一直在增加, 并且对特定的协议类型而言,内部的 组织结构也并非一成不变。 以 ProduceRequest/ ProduceResponse 为例, 截至 目前就经历了 7 个 版本(VO~V6) 的变迁。 下面就以最新版本 CV6, 即api_version=6) 的结构为例来做细致的 讲解。 ProduceRequest 的组织结构如图 6-3 所示。

image.png

除了请求头中的 4个域, 其余 ProduceRequest请求体中各个域的含义如表 6-3 所示。


image.png

在 2.2.l 节中我们了解到:消息累加器 RecordAccumulator 中的消息是以<分区, Deque< ProducerBatch>>的形式进行缓存的,之后由 Sender线程转变成<Node, List<ProducerBatch>>的 形式,针对每个 Node, Sender线程在发送消息前会将对应的 List<ProducerBatch>形式的内容转 变成 ProduceRequest 的具体结构 。 List<ProducerBatch>中 的内容首先会按照主题名称进行分类(对应 ProduceRequest 中的域 topic),然后按照分区编号进行分类(对应 ProduceRequest 中 的域 partition),分类之后的 ProducerBatch集合就对应 ProduceRequest中的域 record set。 从另 一个角度来讲 , 每个分区中的消息是顺序追加的 , 那么在客户端中按照分区归纳好之后就 可以省去在服务端 中转换的操作了 , 这样将负载的压力分摊给了客户端,从而使服务端可以专 注于它的分内之事,如此也可以提升 整体 的性能 。

image.png

除了响应头中的 correlation_id,其余 ProduceResponse各个域的含义如表 6-4所示。


image.png

我们再来了解一下拉取消息的协议类型,即 FetchRequest/FetchResponse,对应的 api_key= 1, 表示 FETCH。 截至目前, FetchRequest/FetchResponse 一共历经了 9 个版本 (VO~V8)的变迁, 下面就以最新版本 (V8)的结构为例来做细致的讲解。 FetchRequest的组织结构如图 6-5所示。

image.png

除了请求头中的 4个域,其余 FetchRequest中各个域的含义如表 6-5所示。

image.png
image.png

不管是 follower 副本还是普通的消费者客户端,如果要拉取某个分区中的消息,就需要指 定详细的拉取信息, 也就是需要设定 partit工on、 fetch offset、 log start offset 和max bytes这4个域的具体值, 那么对每个分区而言,就需要占用4B+8B+8B+4B=24B的 空间 。 一般情况下,不管是 follower 副本还是普通的消费者,它们的订阅信息是长期固定的。 也就是说, FetchRequest 中的 topics 域的内容是长期固定的,只有在拉取开始时或发生某些 异常时会有所变动 。 FetchRequest 请求是一个非常频繁的请求,如果要拉取的分区数有很多,比如有 1000个分区,那么在网络上频繁交互 FetchRequest时就会有固定的 1000×24B ~ 24KB 的字节的内容在传动,如果可以将这 24阳的状态保存起来,那么就可以节省这部分所占用的 带宽。

Kafka 从 1.1.0 版本开始针对 FetchRequest 引入了 session_id、 epoch 和 forgotten topics_data等域, session_id和epoch确定一条拉取链路的fetchsession,当session建 立或变更时会发送全量式的 FetchRequest,所谓的全量式就是指请求体中包含所有需要拉取 的 分区信息 : 当 session 稳定时则会发送增量式的 FetchRequest 请求,里面的 topics 域为空 ,因 为 topics 域的内容己经被缓存在了 session 链路的两侧。如果需要从当前 fetch session 中取消 对某些分区的拉取订阅,则可以使用 forgotten topics data 字段来实现。

这个改进在大规模(有大量的分区副本需要及时同步)的 Kafka集群中非常有用,它可以 提升集群间的网络带宽的有效使用率。不过对客户端而言效果不是那么明显,一般情况下单个 客户端不会订阅太多的分区,不过总体上这也是一个很好的优化改进。

与 FetchRequest对应的 FetchResponse 的组织结构 CV8 版本)可以参考图 6-6。

image.png

FetchResponse结构中的域也很多,它主要分为 4层,第 l 层包含 throttle time ms、 error_code、 session_id 和 responses,前面 3 个域都见过,其中 session_id 和 FetchRequest 中的 session id 对应。 responses 是一个数组类型,表示响应的具体内容, 也就是 FetchResponse 结构中的第 2 层,具体地细化到每个分区的响应。第 3 层中包含分区的元 数据信息( partition 、 error code 等)及具体的消息 内 容( record set ) aborted_transactions 和事务相关。

除了 Kafka 客户端开发人员,绝大多数的其他开发人员基本接触不到或不需要接触具体的 协议,那么我们为什么还要了解它们呢?其实,协议的具体定义可以让我们从另一个角度来了 解 Kafka 的本质 。以 PRODUCE 和 FETCH 为例,从协议 结构中就可 以看出消息 的 写入和拉取 消费都是细化到每 一个分区层级的。并且,通过了解各个协议版本变迁的细节也能够从侧面了 解 Kafka 变迁的历史,在变迁的过程中遇到 过哪方面的瓶颈, 又采取哪种优 化手段,比如 FetchRequest 中的 session_id 的引 入 。

6.2 时间轮

Kafka中存在大量的延时操作,比如延时生产、延时拉取和延时删除等。 Kafka并没有使用 JDK 自带的 Timer 或 DelayQueue 来实现延时的功能,而是基于时间轮的概念自定义实现了一个 用于延时功能的定时器( SystemTimer)。 JDK 中 Timer 和 DelayQueue 的插入和删除操作的平 均时间复杂度为 O(nlogn)并不能满足 Kafka 的高性能要求,而基于时间轮可以将插入和删除操 作的时间复杂度都降为 0(1)。 时间轮的应用并非 Kafka独有,其应用场景还有很多,在 Netty、 Akka, Quartz、 ZooKeeper 等组件中都存在时间轮的踪影 。

如图 6-7 所示, Kafka 中的时间轮( TimingWheel)是一个存储定时任务的环形队列 , 底层 采用数组实现,数组中的每个元素可以存放一个定时任务列表( TimerTaskList)。 TimerTaskList 是一个环形的双向链表,链表中的每一项表示的都是定时任务项( TimerTaskEntry),其中封装了真正的定时任务 (TimerTask) 。

时间轮由多个时间格组成, 每个时 间格代表 当前时间轮的基本时间跨度( tic灿ifs) 。时 间 轮的时间格个数是固定的,可用 wheelSize 来表示,那么整个时间轮的总体时间跨度( interval) 可以通过公式 tic灿ifs×wheelSize计算得出。 时间轮还有一个表盘指针(currentTime),用来表 示时间轮当前所处的时间, currentTime 是 tic灿ifs 的整数倍 。 currentTime 可以将整个时间轮划分 为到期部分和未到期部分, currentTime 当前指向的时间格也属于到期部分,表示刚好到期,需 要处理此时间格所对应的 TimerTaskList 中的所有任务。

image.png

若时间轮的 tic灿也为 lms 且 wheelSize 等于 20,那么可以计算得出总体时间跨度 interval 为 20msa 初始情况下表盘指针 currentTime 指向时间格 0,此时有一个定时为 2ms 的任务插进 来会存放到时间格为 2 的 TimerTaskList 中 。 随着时间的不断推移 , 指针 currentTime 不断向 前 推进,过了 2ms 之后,当到达时间格 2 时,就需要将时间格 2 对应的 TimeTaskList 中的任务进 行相应的到期操作。此时若又有一个定时为 8ms 的任务插进来,则会存放到时间格 10 中, currentTime再过 8ms后会指向时间格 10。 如果同时有一个定时为 19ms 的任务插进来怎么办? 新来的 TimerTaskEntry 会复用原来的 TimerTaskList,所以它会插入原本己经到期的时间格 l。 总之,整个时间轮的总体跨度是不变的,随着指针 currentTim巳的不断推进,当前时间轮所能处 理的时间段也在不断后移,总体时间范围在 currentTime 和 currentTime+interval 之间 。

如果此时有一个定时为 350ms 的任务该如何处理?直接扩充 wheelSize 的大小? Kafka 中不 乏几万甚至几十万毫秒的定时任务,这个 wheelSize 的扩充没有底线,就算将所有的定时任务的 到期时间都设定一个上限,比如 100 万毫秒,那么这个 wheelSize为 100 万毫秒的时间轮不仅占 用很大的内存空间,而且也会拉低效率 。 Kafka 为此引入了层级时间轮的概念,当任务的到期 时间超过了当前时间轮所表示的时间范围时,就会尝试添加到上层时间轮中 。

如图 6-8 所示,复用之前的案例,第一层的时间轮 tic灿也=lms、whee!Size=20、inte凹al=20ms。 第二层的时间轮的 tic刷s为第一层时间轮的 interval,即 20ms。 每一层时间轮的 whee!Size是固 定的,都是 20, 那么第二层的时间轮的总体时间跨度 interval 为 400ms。 以此类推,这个 400ms 也是第三层的 tickMs 的大小, 第三层的时间轮的总体时 间跨度为 8000ms。

对于之前所说的 350ms 的定时任务,显然第一层时间轮不能满足条件,所以就升级到第二 层时 间轮中, 最终被插入第二层时间轮中时间格 17 所对应的 TimerTaskList。如果此时又有一个 定时为 450ms 的任务,那么显然第二层时间轮也无法满足条件,所以又升级到第三层时间轮中, 最终被插入第三层时间轮中时间格 l 的 TimerTaskList。 注意到在到期时间为[400ms,800ms)区间
内的多个任务(比如 446ms、 455ms 和 473ms 的定时任务)都会被放入第 三层 时间轮的时间格1,时间格 I 对应的 TimerTaskList 的超时时间为 400ms。 随着时间的流逝,当此 TimerTaskList 到期之时,原本定时为 450ms 的任务还剩下 50ms 的时间,还不能执行这个任务的到期操作 。 这里就有一个时间轮 降级的操作 , 会将这个剩余时间为 50ms 的定时任务重新提交到层级时间 轮中,此时第一层时间轮的总体时间跨度不够 ,而第二层足够,所以该任务被放到第二层时 间 轮到期时间为[40ms,60ms)的时间格中。 再经历40ms之后,此时这个任务又被“察觉”,不过 还剩余 lOms,还是不能立即执行到期操作 。 所以还要再有一次时间轮的降级,此任务被添加到 第一层时间轮到期时间为[1Oms,11ms)的时间格中,之后再经历 lOms后,此任务真正到期,最 终执行相应的到期操作 。

image.png

设 计 源于生活。我 们 常见的钟表就是一种具有 三 层结构的时间轮,第一层时间轮 tic陆也=lms、 whee1Size=60、 interval=1min,此为秒钟 : 第二层 tic灿1s=lmin、 wh巳e1Size=60、 interval=1hour,此为分钟; 第三层 tickMs=1hour、 wheelSize=12、 interval=12hours,此为时钟。

6.3 延时操作

如果在使用生产者客户端发送消息 的时候将 acks 参数设置为一1,那么就意味着需要等待ISR 集合 中的所有副 本都确认收到消息之后才能 正确地收到响 应 的结 果,或者捕 获超时异常 。

如图 6-9、图 6-10 和 图 6-1l 所示,假设某个分区有 3 个副本: leader、 follower! 和 follower2, 它们都在分区的 ISR集合中。 为了简化说明,这里我们不考虑 ISR集合伸缩的情况。 Kafka在 收到客户端的生产请求(ProduceRequest)后,将消息 3和消息 4写入 leader副本的本地日志文 件 。 由于客户端设置 了 acks 为一1, 那么需要等 到 follower! 和 follower2 两个副本都收到消息 3 和消 息 4 后才能告知客户端正确地接收了所发送的消息 。 如果在 一 定 的时间内, follower! 副本 或 follower2 副本没能 够完全拉取 到消 息 3 和消息 4,那么就需要返 回超时异常给客户端 。生产 请求的超时时间由 参数 request . timeout .ms 配置,默认值为 30000,即 30s。

那么这里 等待消息 3 和消息 4 写入 followerl 副本和 follower2 副本,井返回相应的响应结 果给 客户端 的动作是由谁 来执行的呢?在将消息写入 leader 副本的本地日志文件之后, Kafka 会创建一个延时的生产操作( DelayedProduce),用来处理消息正常写入所有副本或超时的情况, 以返回相应的响应结果给客户端。

image.png

在 Kafka 中有多种延时操作,比如前面提及的延时生产,还有延时拉取( DelayedFetch)、 延时数据删除( DelayedD巳leteRecords)等 。 延时操作需要延时返回响应的结果,首先它必须有 一个超时时间( delayMs),如果在这个超时时间内 没有完成既定的任务,那么就需要强制完成 以返回响应结果给客户端 。其次 ,延时操作不同于定时操作,定时操作是指在特定时间之后执 行的操作,而延时操作可以在所设定的超时时间之前完成,所以延时操作能够支持外部事件的 触发。就延时生产操作而言,它的外部事件是所要写入消息的某个分区的 HW (高水位)发生 增长。也就是说,随着 follower副本不断地与 leader副本进行消息同步,进而促使 HW进一步 增长, HW 每增长-次都会检测是否能够完成此次延时生产操作,如果可以就执行以此返回响 应结果给客户端;如果在超时时间内始终无法完成,则强制执行 。

延时操作创建之后会被加入延时操作管理器( DelayedOperationPurgatory)来做专 门 的处理。 延时操作有可能会超时,每个延时操作管理器都会配备一个定时器( SystemTimer)来做超时管 理 , 定时器的底层就是采用时间轮( TimingWheel)实现的 。 在 6.2 节中提及时间轮的轮转是靠“收割机”线程 ExpiredOperationReap巳r来驱动的,这里的“收割机”线程就是由延时操作管理 器启动的。 也就是说,定时器、 “收割机”线程和延时操作管理器都是一一对应的。 延时操作 需要支持外部事件的触发,所以还要配备 一个监听池来负责监听每个分区的外部事件一一查看 是否有分区的 HW 发生了增长 。 另外需要补充的是,ExpiredOperationReaper 不仅可以推进时间 轮,还会定期清理监昕池中己 完成的延时操作。

图 6-12 描绘了客户端在请求写入消息到收到响应结果的过程中与延时生产操作相关的细 节, 在了解相关的概念之后应该比较容易理解: 如果客户端设置的 acks 参数不为一1,或者没 有成功的消息写入,那么就直接返回结果给客户端,否 则 就需要创建延时生产操作并存入延时 操作管理器,最终要么由外部事件触发,要么由超 时触发而执行 。

image.png

有延时生产就有延时拉取。 以图6-13为例,两个folower副本都己经拉取到了leader副本的最新位置,此时又向 leader副本发送拉取请求,而 leader副本并没有新的消息写入,那么此 时 leader 副本该如何处理呢?可以 直接返回空的拉取结果给 follower 副本,不过在 lead巳r 副本一直没有 新消息写入的情况下follower 副本会一直发送拉取请求,井且总收到空的拉取结果,这样徒耗资源,显然不太合理 。

image.png

Kafka 选择了延时 操作来处理这种情况。 Kafka 在处理拉取请求时,会先读取一次日志文件 , 如果收集不到足够多fetchMinBytes,由参数 fetch.mi口.bytes 配置,默认值为 l)的消息, 那么就会创建一个延时拉取操作( DelayedFetch) 以等待拉取到足够数量 的消息 。当延 时拉取操 作执行时,会再读取一次 日志文件,然后将拉取结果返回给 follower 副本。 延时拉取操作也会 有一个专门的延时操作管理器负责管理,大体的脉络与延时生产操作相同,不再赘述。 如果拉 取进度一直没有追赶上 leader副本,那么在拉取 leader副本的消息时一般拉取的消息大小都会 不小于 fetc出1inBytes,这样 Kafka也就不会创建相应的延时拉取操作, 而是立即返回拉取结果。

延时拉取操作同样是由超时触发或外部事件触发而被执行的。 超时触发很好理解,就是等 到超时时间之后触发第 二次读取 日志文件的操作 。外部事件触发就稍复杂了一些,因为拉取请 求不单单 由 follower 副本发起 ,也可以由消费者客户端发起,两种情况所对应的外部事件也是 不同的。如果是 follower 副本的延时拉取,它的外部事件就是消息追加到了 leader 副本的本地日志文件中 :如果是消费者客户端的延时拉取,它的外部事件可以简单地理解为 HW 的增长。

目前版本的 Kafka 压引入了事务的概念,对于消费者或 follower 副本而言 ,其默认的事务 隔离级 别为 “read_uncommitted” 。 不过消费者可以通过客户端参数 isolation . level 将事 务隔离级 别设置为“ read_committed" (注意: follower 副本不可以将事务隔离级别修改为这个 值〉,这样消费者拉取不到生产者已经写 入却尚未提交的消息 。 对应的消费者的延时拉取 , 它 的外部事件实际上会切换为由LSO (LastStableOffset)的增长来触发。 LSO是HW之前除去未 提交的事务消息的最大偏移量, LSO运HW,

6.4 控制器

在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器( Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的 leader 副本出现故 障时,由控制器负责为该分区选举新的 leader副本。当检测到某个分区的 ISR集合发生变化时, 由控制器负责通知所有 broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增加分区数量时,同样还是由控制器负责分区的重新分配 。

6.4.1 控制器的选举及异常恢复

Kafka 中的控制器选举工作依赖于 ZooKeeper,成功竞选为控制器的 broker会在 ZooKeeper中创建/ controller 这个临时( EPHEMERAL)节点,此临时节点的内容参考如下 :

{ ” version ” : 1 ,” brokerid ”: 0 , ”timestamp” · ” 1 5 2 9 2 1 0 2 7 8 9 8 8 ” }

其中version在目前版本中固定为1, broker工d表示成为控制器的broker的id编号, tim e stamp 表示竞选成为控制器时的时间戳。

在任意时刻,集群中有且仅有一个控制器。每个 broker 启动的时候会去尝试读取 /controller 节点的 brokerid 的值,如果读取到 brokerid 的值不为一l,则表示己经有其 他 broker 节 点成功竞选为控制器,所以当前 broker 就会放弃竞选;如果 ZooKeeper 中不存在 /controller 节点,或者这个节点中的数据异常,那么就会尝试去创建/ controller 节点。 当前 broker 去创建节点的时候,也有可能其他 broker 同时去尝试创建这个节点,只有创建成功 的那个 broker 才会成为控制 器,而创建失败的 broker 竞选失败 。 每个 broker 都会在内存中保存 当前控制器的 brokerid 值,这个值可以标识为 activeControllerld。

ZooKeeper 中还有一个与控制器有关的/ controller_epoch 节点,这个节点是持久 (PERSISTENT)节点,节点中存放的是一个整型的 controller epoch 值。 controller
epoch 用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称 之为“控制器的纪元”。

controller epoch 的初始值为 l,即集群中第一个控制器的纪元为 l,当控制器发生变更 时,每选出一个新的控制器就将该字段值加 1。每个和控制器交互的请求都会携带 controller epoch 这个宇段,如果请求的 controller_epoch 值小于内存中的 controller_epoch值, 则认为这个请求是向己经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。 如果请求的 controller epoch 值大于内存中的 controller_epoch 值,那么说明 己经有 新的控制器当选了 。 由此可见, Kafka 通过 controller epoch 来保证控制器的唯一性,进而保证相 关操作 的一致性。

具备控制器身份的broker需要比其他普通的broker多一份职责, 具体细节如下:

  • 监听分区相关的变化。为 ZooKeeper 中的/admin/reassign partitions 节点注 册 PartitionReassignmentHandler, 用 来 处 理分区重分 配的 动 作 。 为 ZooKeeper 中的 /工sr_change_not工f工cat工on 节点注册 IsrChangeNotificetionHandler,用来处理 ISR 集合变更 的动作 。 为 ZooKeeper 中的 /admin/preferred-replica-election 节 点添加 PreferredReplicaElectionHandler,用来处理优先副本 的选举动作。
  • 监听 主题 相 关 的 变 化 。为 ZooKeeper 中的 /brokers/topics 节 点添 加 TopicChangeHandl町, 用来 处 理主题增减 的 变 化: 为 ZooKeeper 中 的 /admin/ de l e t e topics 节点添加 TopicDeletionHandler,用来处理删 除主题 的动作。
  • 监听 broker相关的变化。为 ZooKeeper中的/brokers/ids 节点添加 BrokerChangeHandler, 用来处理 broker增减的变化。
  • 从 ZooKeeper 中读取获取当前所有与主题、分区及 broker 有关的信息并进行相应的管 理。 对所 有主题 对 应 的 ZooKeeper 中的 /brokers/topics/<topic>节 点添 加 PartitionModificationsHandler, 用来监听主题中的分区分配变化 。
  • 启动并管理分区状态机和副本状态机。
  • 如果参数 auto.leader.rebalance.enable 设置为 true,则还会开启一个名为 “auto-leader-rebalance-task” 的定时任务来负责维护分区的优先副本的均衡。

控制器在选举成功之后会读取 ZooKeeper 中各个节点的数据来初始化上下文信息 (ControllerContext),并且需要管理这些上下文信息。 比如为某个主题增加了若干分区 , 控制 器在负责创建这些分区的同 时要更新上下文信息 , 并且需要将这些变更信息 同步到其他普通的 broker 节点中。不管是监听器触发的事件,还是定时任务触发的事件,或者是其他事件( 比如 ControlledShutdown, 具体可以参考 6.4.2 节)都会读取或更新控制器中的上下文信息, 那么这 样就会涉及多线程间的同步 。 如果单纯使用锁机制来实现 , 那么整体的性能会大打折扣 。针对 这一现象, Kafka 的控制器使用单线程基于事件队列的模型, 将每个事件都做一层封装, 然后 按照事 件 发生 的 先后顺序暂存 到 LinkedB!ockingQueue 中 ,最后使 用 一个专 用的 线程 (ControllerEventThread)按照 FIFO (FirstInputFirstOutput,先入先出)的原则顺序序处理各个
事件,这样不需要锁机制就可以在多线程间维护线程安全, 具体可以参考图 6-140

在 Kafka 的早期版本中,并没有采用 Kafka Controler 这样一个概念来对分区和副本的状态 进行管理,而是依赖于 ZooKeeper, 每个 broker都会在 ZooKeeper上为分区和副本注册大量的 监昕器( Watcher) 。当 分区或副本状态变化 时 ,会唤醒很多不必要的监昕器,这种严重依赖ZooKeeper 的设计会有脑裂、羊群效应 ,以及造成 ZooKeeper 过载的隐患( 旧版的消费者客户 端存在同样的问题, 详 细内 容参考 7.2.1 节) 。 在目前的新版本的设计中,只有 Kafka Controller 在 ZooKeeper 上注册相应的监昕器,其 他的 broker 极少需要再监 听 ZooKeeper 中的 数据变化 , 这样省去了很多不必要的麻烦。不过每个 broker还是会对/controller 节点添加监听器, 以 此来监 昕此节点的 数据变化 (ControllerCbangeHandler) 。

image.png

当/controller 节点的数据发生变化时, 每个 broker 都会更新自身内存中保存的 activeControllerld。 如果 broker 在数据变更前是控制器,在数据变更后自身的 brokerid 值与 新的 activeControllerld 值不一致,那么就需要“退位” , 关闭相应的资源,比如关闭状态机、 注销相应的监听器等 。 有可能控制器由于异常而下线,造成/ controller 这个临时节点被自 动删除 ; 也有可能是其他原因将此节点删除了 。

当/controller 节点被删除时,每个 broker都会进行选举,如果 broker在节点被删除前 是控制器,那么在选举前还需要有 一个“退位”的动作 。 如果有特殊需要 ,则可以手 动删除 /controller 节点来触发新 一轮的选举 。 当然关 闭控制器所对应 的 broker,以 及手动 向 /controller 节点写入新的 brokerid 的所对应的数据,同样可 以触发新一轮的选举 。

6.4.2 优雅关闭

如何优雅地关闭 Kafka?笔者在做测试的时候经常性使用 jps (或者 ps ax)配合 kill -9 的方式来快速 关闭 Kafka broker 的服务进程,显然 kill -9 这种 “强杀”的方式并不够优雅, 它并不会等待 Kafka 进程合理关闭一些资源及保存一些运行数据之后再实施关闭动作。在有些 场景中,用户希望主动关闭正常运行的服务,比如更换硬件、操作系统升级、修改 Kafka 配置 等。如果依然使用上述方式关闭就略显粗暴 。

那么合理的操作应该是什么呢? Kafka 自身提供了 一 个脚本工具,就是存放在其 bin 目录 下的 kafka-server-stop . sh,这个脚本的内容非常简单,具体内容如下:

PIDS=♀(ps ax I grep -i ’kafka\.Kafka’ I grep java I grep -v grep I awk ’(print $1)’)
if [ -z "♀PIDS” ] ; then
echo ”No kafka server to stop” exit 1
else
kill -s TERM ♀PIDS fi

可以看出 kafka-server stop.sh 首先通过 ps ax 的方式找出正在运行 Kafka 的进程 号 PIDS,然后使用 kill -s TERM $PIDS 的方式来关闭。 但是这个脚本在很多时候并不奏 效,这一点与ps命令有关系。 在Linux操作系统中, ps命令限制输出的字符数不得超过页大 小 PAGE_SIZE, 一般 CPU 的内存管理单元(Memory Management Unit,简称 MMU)的 PAGE_SIZE 为 4096。 也就是说, ps 命令的输出的字符串长度限制在 4096 内,这会有什么问 题呢?我们使用 ps ax 命 令来输出与 Kafka 进程相 关的信息,如图 6-15 所示 。


image.png

细心的读者可以留 意到 白色部分中的信息并没有打印全,因为己经达到了 4096 的字符数的 限制。 而且打印的信息里面也没有 kafka-server-stop.sh 中 ps ax I grep -i ’ kafka \ . Kafka ’所需要 的“ kafka.Kafka,,这个关键字段,因为这个关键字段在 4096 个字 符的范围之外。与 Kafka 进程有关的输出信息太长,所以 kafka-server-stop . sh 脚本在很
多情况 下并不 会奏效。

注意要点:Kafak服务启动的入口叫Kafka.Kafka scala语言写的object

那么怎么解决这种问题呢?我们先来看一下 ps 命令的相关源码(Linux 2.6.x 源码的/fs/proc/base.c 文件中的部分 内容):

image.png

我们可以看到 ps 的输出长度 len 被硬编码成小于等于 PAGE SIZE 的大小,那么我们调 大这个 PAGE SIZE 的大小不就可以了吗?这样是肯定行不通的,因为对于一个 CPU来说,它 的 MMU 的页大小 PAGE SIZE 的值是固定的,无法通过参数调节。 要想改变 PAGE SIZE 的 大小,就必须更换成相应的 CPU,显然这也太过于“兴师动众”了 。还有一种办法是 ,将上面 代码中的 PAGE SIZE 换成一个更大的其他值,然后 重新编译,这个办法对于大多数人来说不 太适用, 需要掌握一定深度的Linux的相关知识。

那么 有没有 其他的办法呢?这里我们可以 直接修改 kafka-server-stop.sh 脚本的内 容,将其中的第一行命 令修改 如下:

PIDS=$(ps ax I grep -i ’kafka’ I grep java I grep -v grep I awk ’ {print $1)’)

即把“\ .Kafka”去掉,这样在绝大多数情况下是可以奏效的。如果有极端情况,即使这 样 也不能 关 闭,那么只 需要按 照以下两个步骤就可以优雅地关闭 Kafka 的服务进程:

(1 )获取 Kafka 的服务进程号 PIDS。 可以使用 Java 中的 jps 命令或使用 Linux 系统中 的 ps 命令来查看。

(2)使用kill -s TERM ♀PIDS或kill 15 ♀PIDS的方式来关闭进程,注意千万 不要使用 kill 斗 的方式。

为什么这样关闭的方式会是优雅的? Kafka 服务入口程序中有一个名为“ kafka-shutdown­ hock”的关闭钩子 , 待 Kafka 进程捕获终止信号的时候会执行这个关闭钩子中的内容,其中除 了正常关闭一些必要的资源,还会执行一 个 控制关闭( ControlledShutdown)的 动 作 。 使用 ControlledShutdown的方式关闭 Kafka有两个优点: 一是可以让消息完全同步到磁盘上,在服务 下次重新上线时不需要进行日志的恢复操作 ; 二是 ControllerShutdown 在关 闭服务之前,会对 其上的 leader 副本进行迁移,这样就可以减少分区的不可用时间 。

若要成功执行 Co由olledShutdown 动作还需要有一个先决条件, 就是参数 controlled. shutdown.enable 的值需要设置为 true,不过这个参数的默认值就为 true,即默认开始此 项功能 。 ControlledShutdown 动作如果执行不成功还会重试执行,这个重试的动作由参数 controlled.shutdown.max.retries 配置,默认为 3 次, 每次重试的间隔由参数 controlled . shutdown . retry .backoff .ms 设置,默认为 5000ms

下面我们具体探讨 ControlledShutdown 的整个执行过程。

参考图 ι16, 假设此时有两个 broker,其中待 关闭的 brok町 的 id 为 x, Kafka 控制器所对 应 的 broker 的 id 为 y。待关 闭的 broker 在执行 ControlledShutdown 动 作时 首先与 Kafka 控 制器 建立专用连接(对应图 6-16 中的步骤1) , 然后发送 ControlledShutdownRequest 请求, ControlledShutdownRequest 请求中只有一个 brokerld 字段, 这个 brokerld 字段的值设置为自身 的brokerId的值,即x (对应图6-16中的步骤2) 。
Kafka 控制 器在收到 ControlledShutdownRequest 请求之后会将与待关 闭 broker 有关联 的所 有分区进行专 门 的处理,这里的“有关联”是指分区中有副本位于这个待 关 闭的 broker 之上 (这 里会涉及 Kafka控制器与待关闭 broker之间的多次交互动作,涉及 leader副本的迁移和副本的 关闭动作,对应图 6-16 中的步骤3〉。


image.png

如果这些分区的副本数大于 1 且 leader副本位于待关闭 broker上,那么需要实施 leader副 本的迁移及新的 ISR 的 变更。具体的选举分配的方案由专用的选举器 ControlledShutdown­ LeaderSelector提供

如果这些分区的副本数只是大于 1, leader 副本并不位于待关闭 broker 上,那么就由 Kafka 控制器来指导这些副本的 关闭 。 如果这些分区的副本数只是为 1, 那么这个副本的关闭动作会 在整个 ControlledShutdown 动作执行之后由副本管理器来具体实施 。

对于分区的副本数大于 l 且 leader 副本位于待关闭 broker 上的这种情况,如果在 Kafka 控 制器处理之后 leader 副本还没有成功迁移,那么会将这些没有成功迁移 leader 副本的分区记录 下来,并且写入 ControlledShutdownResponse 的响应(对应图 6-16 中的步骤4,整个ControlledShutdown 动作是 一个同步阻塞的过程) 。ControlledShutdownResponse 的结构如图 6-18 所示。


image.png

待关闭的 broker 在收到 ControlledShutdownResponse 响应之后,需要判断整个 Con位olledShu创own 动作是否执行成功,以此来进行可能的 重试或继续 执行接下来的关闭 资源 的动作 。 执行成功的 标准是 Con位olledShutdownResponse 中 error_code 字段值为 0,并且 partitions remaining 数组字段为空。

在了解了整个 ControlledShutdown 动作的具体细节之后,我们不难看出这一切实质上都是 由 ControlledShutdownRequest请求引发的,我们完全可以自己开发一个程序来连接 Kafka控制 器,以此来模拟对某个 broker 实施 ControlledShutdown 的动作。为了实现方便,我们可以对 KafkaAdminC!ient 做 一些扩展来达到目的。

6.4.3 分区 leader 的选举

分区 leader副本的选举由控制器负责具体实施。当创建分区(创建主题或增加分区都有创 建分区的动作〉或分区上线(比如分区中原先的 leader 副本下线,此时分区需要选举一个新的 leader 上 线来对外提供服务)的时候都需要执行 leader 的选举动作,对应的选举策略为 OftlinePartitionLeaderElectionStrategy。 这种策略的基本思路是按照 AR 集合中副本的顺序查找 第一个存活的副本,并且这个副本在 JSR集合中。 一个分区的 AR集合在分配的时候就被指定, 并且只要不发生重分配的情况,集合内部副本的顺序是保持不变的,而分区的 ISR 集合中副本 的顺序可能会改变 。

注意这里是根据AR的顺序而不是ISR的顺序进行选举的。举个例子, 集群中有3个节点: brokerO、 brokerl 和 broker2, 在某一时刻具有 3个分区且副本因子为 3 的主题 topic扣ader的具 体信息如下 :


image.png

如 果 ISR 集合中 没有可用的副本 , 那么此时还要再检查一下所配置的 unclean .leader . e l e c t i o n .四 able 参数(默认值为 false) 。 如果这个参数配置为 true,那么表示允许从非 ISR 列表中 的选举 leader,从 AR 列表中找到 第一个存活的副本 即为 leader。

当分区进行重分配(可以先回顾一下 4.3.2节的内容)的时候也需要执行 leader的选举动作,对应的选举策略为 ReassignPartiti。此eaderElectionStrategy。这个选举策略 的思路 比较简单 : 从
重分配的 AR 列表中找到第 一个存活的副本,且这个副本在目前的 ISR 列表 中 。

还有 一 种情况会发生 leader 的选举,当某节点被优雅地关 闭 ( 也 就是 执 行 ControlledShutdown)时,位于这个节点上的 lead巳r副本都会下线,所以与此对应的分区需要执 行 leader 的选举。与此对应 的选举策略( ControlledShutdownPartitionLeaderElectionStrategy)为 : 从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR列表中,与此同时还要确保这 个副本不处于正在被关闭的节点上 。

6.5 参数解密

如果 broker端没有显式配置 listeners (或 advertised. listeners)使用 IP地址, 那么最好将 bootstrap.server 配置成主机名而不要使用 IP 地址,因为 Kafka 内部使用的是 全称域名(FullyQualifiedDomainName) 。 如果不统一, 则会出现无法获取元数据的异常。

6.5.1 broker.id

broker . id 是 broker 在启动之前必须设定 的参数之一,在 Kafka 集群 中 ,每个 broker 都 有唯一的 id (也可以记作 brokerld)值用来区分彼此。 broker 在启动时会在 ZooKeeper 中的 /brokers/ids 路径下创建一个以当前 brokerId为名称的虚节点, broker 的健康状态检查就依 赖于此虚节点。当 broker 下线时,该虚节点会自动删除,其他 broker 节点或客户端通过判断 /brokers/ids 路径下是否有此 broker 的 brokerld 节点来确定该 broker 的健康状态。

可以通过 broker 端的配置文件 config/server.properties 里的 broker . id 参数来配置 brokerid, 默认情况下broker.id值为 l。在Kafka中, brokerld值必须大于等于0才有可能 正常启动,但这里并不是只能通过配置文件 config/server.properties 来设定这个值,还可以通过 meta.properties 文件或 自动生成功能来实现。

首先了解一下 meta.properties 文件, meta.properties 文件中的内容参考 如下:

#Sun May 27 23:03:04 CST 2018 
version=O
broker.id=O

meta.properties文件中记录了与当前 Kafka版本对应的一个 version字段,不过目前只有一个为0的固定值。还有一个broker.id,即brokerid值。 broker在成功启动之后在每个日志根
目录下都会有一个 meta.properties 文件 。

m巳ta.properties 文件与 broker . id 的关联如下 :

Cl)如果 log.d工r 或 log.d工rs 中配置了多个日志根目录,这些日志根目录中的 meta.properties 文件所配置的 broker . id 不一致则会抛出 InconsistentBrokerldException 的 异常。
(2)如果 config/server.pr叩erties配置文件里配置的 broker.工d的值和 meta.properties文 件里的 broker . 工d 值不 一致 ,那么同样会抛出 InconsistentBrokerldException 的 异常 。
( 3 )如 果 config/server.properties 配置文件中井未配置 broker .工d 的值,那么就以 meta.properties文件中的 broker. id值为准。
(4)如果没有 meta.properties文件,那么在获取合适的 broker. id值之后会创建一个新 的 meta.prop巳rties文件并将 broker.id值存入其中。
如果 config/server.properties 配置文件中并未配置 broker. id,并且日志根目录中也没有 任何 meta.properties 文件( 比如第-次启动 时 ) ,那么应该如何 处理呢 ?
Kafka 还提供 了另外两个 broker 端参数 : broker. id.generatio口.enable 和 reserved. broker.max.id来配合生成新的 brokerId。broker. id.geηeratio口.enable 参数用来配置是否开启自动生成 brokerId 的功能,默认情况下为廿ue, 即开启此功能 。自 动生 成的 brokerId 有一个基准值,即自动生成的 brokerId 必 须超过这个基准值,这个基准值通过 reserverd .broker.max . id 参数配置,默认值为 1000。 也就是说,默认情况下自动生成的 brokerId 从 1001 开始 。

自动生成的 brokerId的原理是先往 ZooKeeper中的/brokers/seqid节点中写入一个空宇 符串 ,然后获取返回的 Stat 信 息中 的 version 值 ,进而将 version 的值和 reserved.broker .max . id 参数配置 的值相加。先往节点中 写入数据再获取 Stat 信息 , 这样 可以确保返回的 version 值大于 0 ,进而 就可 以确 保生 成的 brokerId 值大于 reserved.broker.max.id 参数 配置的值,符合非自动生成的 broker .id 的 值在 [O, reserved.broker.max.id]区间设定。
初始化时 ZooKeeper 中 /brokers/seq工d 节 点 的状态如下 :

[zk: xxx.xxx.xxx.xxx:2181/kafka(CONNECTED) 6] get /brokers/seqid null
cZxid = Ox200001b2b
ctime =Mon Nov 13 17:39:54 CST 2018
mZx 工d = Ox20000lb2b
 mtime = Mon Nov 13 17: 39 :54 CST 2018 pZxid = Ox20000lb2b
cversion = 0
dataV ersion = 0
aclV ersion = 0 ephemeralOwner = OxO dataLength = O numChildren = 0

可以看到 dataVersion=O,这个就是前面所说的version,在插入一个空字符串之后,dataVersio就自增1表示数据发生了变更, 这样通过 ZooKeeper 的这个功能来实现集群层面的序号递增,整体上相当于一个发号器 。

[zk: xxx.xxx.xxx.xxx:2181/kafka(CONNECTED) 7] set /brokers/seqid ”” cZxid = Ox200001b2b
ctime =Mon Nov 13 17:39:54 CST 2017
mZxid = Ox2000e6eb2
mtime = Mo口 May 28 18:19 : 03 CST 2018 pZxid = Ox200001b2b
cversion = 0
dataV ersion = 1
aclV ersion = 0 ephemeralOwner = OxO dataLength = 2 numChildren = 0

大多数情况下我们一般通过井且习惯于用最普通的 config/server.properties 配置文件的方式 来设定 brokerld 的值,如果知晓其中的细枝末节,那么在遇到诸如 InconsistentBrokerldException 异常时就可以处理得游刃有余,也可以通过自动生成 brokerId 的功能来实现一些另类的功能 。

6.5.2 bootstrap.servers

bootstrap.servers 不仅是 Kafka Producer、 Kafka Consumer 客户端 中的必备 参数 ,而 且在 KafkaConnect、 KafkaStreams和 KafkaAdminClient中都有涉及, 是一个至关重要的参数。

如果你使用过旧版的生产者或旧版的消费者客户端,那么你可能还会对 bootstrap . servers 相关的另外两个参数 metada .broker .list 和 zookeeper.connect 有些许印象,这3个参数也见证了 Kafka 的升级变迁。

我们一般可以简单地认为 bootstrap.servers 这个参数所要指定的就是将要连接的Kafka集群的 broker地址列表。不过从深层次的意义上来讲,这个参数配置的是用来发现 Kafka 集群元数据信息的服务地址。为了更加形象地说明问题,我们先来看一下图 6-19。

image.png

客户端 KafkaProducer1 与 Kafka Cluster 直连,这是客户端给我们的既定印象,而事实上客户端连接 Kafka集群要经历以下3个程,如图 6-19 中的右边所示。

Cl)客户端 KafkaProducer2 与 bootstrap.servers 参数所指定的 Server连接,井发送MetadataRequest 请求来获取集群的元数据信息 。

(2) Server在收到 MetadataRequest请求之后,返回 MetadataResponse给 KafkaProducer2,在 MetadataResponse 中包含了集群的元数据信息。

(3)客户端 KafkaProducer2 收到的 MetadataResponse 之后解析出其中包含的集群元数据信息,然后与集群中的各个节点建立连接,之后就可以发送消息了。

在绝大多数情况下, Kafka 本身就扮演着第一步和第二步中的 Server 角色,我们完全可以 将这个 Server 的角色从 Kafka 中剥离出来。我们可以在这个 Server 的角色上大做文章,比如添 加一些路由的功能、负载均衡的功能 。

下面演示如何将 Server 的角色与 Kafka 分开。默认情况下,客户端从 Kafka 中的 某个节点 来拉取集群的元数据信息,我们可以将所拉取的元数据信息复制一份存放到 Server 中,然后对 外提供这份副本的内容信 息。

由此可见,我们首先需要做的就是获取集群信息的副本,可以在 Kafka 的 org.apache.kafka. comrnon.request.M巳tadataResponse 的构造函数中嵌入代码来复制信息, MetadataResponse 的构造 函数如下所示。

public MetadataResponse(int throttleTimeMs , List<Node> brokers , St ring clusterid, 工nt controllerid,
 L工st<Top工cMetadata> topicMetadata) { this.throttleTimeMs = throttleTimeMs;
this.brokers =brokers ;
this .controller= getControllerNode(controllerid, brokers) ; this.topicMetadata = topicMetadata;
this.clusterid = clusterid;
//客户端在获取集群的元数据之后会调用 这个构造函数,所以在这里嵌入代码将 5 个成
//员变量的值保存起来,为后面的 Server 提供 内容

获取集群元数据的副本之后,我们就可以实现一个服务程序来接收 MetadataRequest请求和 MetadataResponse,从零开始构建 一个这样的服务程序也需要不少的工作量 , 需要实现对 MetadataRequest与 MetadataResponse相关协议解析和包装,这里不妨再修改一下 Kafka 的代码,返回让其只提供 Server相关的内容。整个示例的架构如图 6-20所示。

image.png

为了演示方便, 图 6-20 中的 Kafka Clusterl 和 Kafka Cluster2都只包含一个 broker节点。 Kafka Clusterl 扮演的是 Se凹er 的角色,下面我们修改它的代码让其返回 Kafka Cluster2 的集群 元数据信息 。假设我们己经通过前面一步的操作获取了 Kafka Cluster2 的集群元数据信息,在 Kafka Clusterl 中将这份副本回放。

修改完 Kafka ClusterI 的代码之后我们将它和 Kafka Cluster2 都启动起来,然后创建一个生 产者 KafkaProducer 来持续发送消息,这个 KafkaProducer 中的 bootstrap . servers 参数配 置为 Kafka Cluster!的服务地址。 我们再创建一个消费者 KafkaConsumer 来持续消费消息,这 个 KafkaConsumer 中的 bootstrap . servers 参数配置为 Kafka Cluster2 的服务地址 。

实验证明 , KatkaP1oducer 中发送的消息都流入 Kafka Cluster2 并被 KafkaConsumer 消费。 查看 Kafka Cluster1 中的日志文件, 发现并没有消息流入。 如果此时我们再关闭 Kafka Cluster1 的服务,会发现 KafkaProducer和 KafkaConsumer都运行完好,已经完全没有 KafkaCluster! 的 任何事情了 。

这里只是为了讲解 bootstrap.servers 参数所代表的真正含义而做的一些示例演示, 笔者并不建议在真实应用中像示例中的一样分离 出 Server 的角色 。

在旧版的生产者客户端(Scala 版本)中还没有 bootstrap . servers 这个参数 , 与此对 应的是 metadata.broker. list参数。metadata.broker. list这个参数很直观,metadata 表示元数据, broker.list表示 broker的地址列表, 从取名我们可以看出这个参数很直接地表示所 要连接 的 Kafka broker 的地址,以此 获取元数据。 而 新版 的 生产者客户端 中的bootstrap.servers 参数的取名显然更有内涵,可以直观地翻译为“引导程序的服务地址” , 这样在取名上就多了一层 “代理”的空间,让人可以遐想出 Server角色与 Ka仅a分离的可能。 在旧版的消费者客户端( Scala版本)中也没有 bootstrap. servers 这个参数,与此对应的 是 zookeeper . connect 参数,意为通过 ZooKeeper 来建立消费连接。

很多读者从 0.8.x 版本开始沿用到现在的 2.0.0 版本, 对于版本变迁的客户端中出现的bootstrap.servers、 metadata.broker.list、 zookeeper.connect 参数往往不是 很清楚。这一现象还存在 Kafka 所提供的诸多脚本之中,在这些脚本中连接 Kafka 采用的选项 参数有 一 bootstrap-server、 --broker-list 和一一zookeeper (分别与前面的 3 个参数 对应〕,这让很 多 Kafka 的老手也很难分 辨哪个脚本该用哪个选项参数。

--bootstrap-server 是一个逐渐盛行的选项参数,这一点毋庸置疑。而一broker-list 己经被淘汰,但在 2.0.0 版本中还没有完全被摒弃,在 kafka-console-producer.sh脚本中还是使用 的这个选项参数,在后 续的 Kafka 版本中 可能会被替代为 --bootstrap-server 。 一 zookeeper 这个边项参数也逐渐被替代,在目前的 2.0.0 版本中, kafka-console-consumer.sh 中已经完全没有了它的 影子,但并不意味着这个参数在其 他脚 本中 也被摒弃了 。在 kafka-topics.sh脚本 中还是使用的一 zookeeper 这个选项参数,并且在未来的可期版本中也不 见得会被替换,因为 kafka-topics.sh脚本实际上操纵的就是 ZooKeeper 中的节点, 而不是 Kafka 本身,它并没有被替代的必要。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,723评论 6 481
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,485评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 152,998评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,323评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,355评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,079评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,389评论 3 400
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,019评论 0 259
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,519评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,971评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,100评论 1 333
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,738评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,293评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,289评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,517评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,547评论 2 354
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,834评论 2 345