consumer除里指定的4个必要参数(bootstrap.servers、group.id、key.deserializer、value.deserializer)外,Java版本的consumer还提供了很多其他非常重要的参数:
- session.timeout.ms:用于consumer group检测组内成员发送崩溃的时间。在0.10.1.0版本之前,此参数还存在另一种含义:consumer消息处理逻辑的最大时间。
- max.poll.interval.ms:用于设置消息处理逻辑的最大时间。在0.10.1.0版本前是没有这个参数的,该参数的含义和session.timeout.ms是合并了的。
- auto.offset.reset:该参数主要有3种取值(earliest、latest、none)。指定了无位移信息或位移越界(即consumer要消费的消息的位移不再当前消息日志的合理范围内)时kafka的应对策略。这里的无位移信息或位移越界,只有满足这两个条件中的任意一个时,该参数才有效。例如:假设是首次运行一个consumer group并指定了从头开始消费(earliest)。显然该group会从头消费所有数据,因为此时group还没有发现任何位移信息。一旦group成功提交位移信息后,重启了group,依然指定从头消费,此时会发现该group并不会真正从头开始消费—因为kafka已经保存了该group的位移信息,它会忽略auto.offset.reset的设置。
earliest:指定从最早的位移开始消费。注意这里最早的位移不一定就是0。
latest:指定从最新处位移开始消费。
none:指定如果未发现位移信息或位移越界,则抛出异常。 - enable.auto.commit:该参数指定consumer是否自动提交位移。若设置为true,则consumer会在后台自动提交位移;否则,用户需要手动提交位移。
- fetch.max.bytes:指定consumer端单次获取数据的最大字节数。
- max.poll.records:该参数控制单次poll调用返回的最大消息数。默认值为500条。
- heartbeat.interval.ms:该参数控制coordinator决定开启新一轮rebalance时,它会将这个决定以REBALANCE_IN_PROGRESS异常的形式“塞进”consumer心跳请求的response中,这样其他成员才能拿到response后知道它需要重新加入group。显然这个过程是越开越好。这个参数就是干这个事情的。推荐设置为一个比较低的值,让group下的其他consumer成员能够更快地感知新一轮rebalance开启了。注意,该值必须要小于session.timeout.ms,因为如果consumer 在session.timeout.ms这段时间内未发送心跳,coordinator就会认为它已经dead了,就没必要让它直销coordinator的决定了。
connections.max.idle.ms:该参数用于控制关键空闲连接的时间,默认是9分钟,即会关闭9分钟都无消息的空闲连接,下次会再次进行Socket连接,如果不在乎这些Socket资源开销,比较推荐设置该参数数值为-1,即永远都不关闭这些空闲连接。