kafka的三层消息架构

第一层,主题层。每个主题可以配置M个分区,而每个分区又可以配置N个副本。

第二层,分区层。每个分区的N个副本中只能有一个充当领导者角色,对外提供服务;其他N减1个副本是追随者副本,只提供数据冗余。

第三层,消息层。分区中包含若干条消息,每条消息的位移从0开始,依次递增。

最后,客户端程序只能和分区的领导者副本进行交互。

kafka分区和副本有啥区别和联系?

副本是在分区这个层级定义的。每个分区下可以配置若干个副本,其中只能有1个领导者副本和N减1个追随者副本。

kafka线上集群部署方案怎么做?

磁盘容量。假设有业务每天要向kafka集群发送1亿条消息,每条消息保存两份防止丢失,另外消息默认保存两周时间。现假设消息平均大小是1KB,那预估一下磁盘空间。总的空间大小等于1亿乘以1KB乘以2除以1000除以1000等于200GB。一般情况kafka集群除了消息数据,还要存储索引数据,为这些数据预留10%,也就是220GB。要保存两周,也就是220GB乘以14即3TB。kafka支持数据压缩,假设压缩比是0.75,则最后的存储空间是2.25TB。总结一下,需要考虑的因素:新增消息数、消息留存时间、平均消息大小、备份数、是否启用压缩。

kafka幂等生产者和事务生产者区别?

设置生产者的enable idempotence参数为true,kafka帮我们做消息幂等,但他只保证某个主题一个分区上的幂等性。其次,他只能实现单个会话幂等性,如果生产者宕机,不保证幂等性。

设置生产者的enable idempotencce参数为true,并设置生产者的transactional id为名字。事务生产者保证跨分区、跨会话幂等性。

讲讲kafka消费者组

消费者组下可以有一个或多个consumer实例,这个实例可以是一个单独的进程,也可以是同一进程下的线程。消费者组用group id字符串唯一标识一个消费者组。

消费者组下所有实例订阅的主题的单个分区,只能分配给组内某个consumer消费。这个分区当然也可以被其他消费者组消费。

理想情况下,消费者实例的数量应该等于该组订阅主题的分区总数。

讲讲kafka消费者组重平衡

kafka中协调者为消费者组服务,负责为消费者组进行重平衡,进行消费位移管理,管理组成员。各个broker有自己的协调者,消费者组用kafka的位移主题确定自己的协调者所在的broker的索引。

重平衡是由于消费者组实例个数减少导致的,在有些情况,消费者实例会被协调者错误的认为已停止,从而被踢出消费者组。

第一种发生重平衡是因为消费者实例的心跳线程没有及时发送心跳。我们需要调整消费者实例的session timeout ms和heartbeat interval ms参数,要保证消费者实例至少要发送3轮心跳请求,之后才能被协调者判定为dead。即session timeout ms大于等于三倍的heart beat interval ms。

第二种发生重平衡时因为消费者消费时间过长。消费逻辑很重时,可以调整消费者实例的max poll interval ms参数,或者将消费逻辑异步。

第三种发生重平衡时因为消费者实例发生了频繁Full GC。

重平衡发生时,协调者要控制消费者组的状态流转,kafka是用消费者状态机实现的。消费者组有五个状态,empty,dead,preparing balance,completing balance,stable。

消费者端重平衡流程。重平衡分为两个步骤,加入组和等待领导者消费者分配方案,对应两类请求,join group请求和sync group请求。

broker端重平衡的流程,分为四种场景。

场景一,新消费成员入组。成员2向协调者发送加入组请求,协调者返回加入组响应。同时协调者收到组内成员1心跳请求时,会返回携带重平衡开始提示的心跳响应。然后成员1和成员2发送等待领导者消费者分配方案请求,协调者分别返回响应。

场景二,消费成员主动离组。此时和场景一基本类似,对于离组的成员,协调者返回离组响应,而对仍在消费者组的实例,协调者会对他们的心跳请求返回带有重平衡提示的心跳响应。

场景三,消费成员宕机离组。此时协调者只会对仍在消费者组的实例进程重平衡,宕机的会超时。

场景四,重平衡时协调者对消费者组内成员提交位移的处理。

讲讲kafka位移提交

消费者实例能同时消费多个分区的数据,所以位移是在分区粒度进行的。提交分为手动提交和自动提交,手动提交又分为同步提交和异步提交。

自动提交可以配置消费者参数enable auto commit为true。默认情况消费者每5秒自动提交位移,如果中间发生了重平衡,那么之前消费的消息要重新消费一次。

手动提交可以用commitSync和commitAsync方法,commitAsync需要传入回调函数,处理异常。值得注意的是,commitAsync不能替代commitSync,因为出现问题时他不能重试,如果提交位移时失败,他重试的时候可能位移已经更新很久,这时候在重试已经没有意义了。

最佳实践是同时用commitSync和commitAsync,对于常规提交用commitAsync避免阻塞,而在消费者实例即将关闭前,用commitSync进行同步位移提交,保证位移正确。

kafka提供了commitAsync和commitSync携带Map形参的方法,Map的key是分区,value是OffsetAndMetadata对象,保存位移,这样在大批量处理很多消息的消费者实例,可以用Map在短时间内提交位移,结合业务场景定制。

kafka commit failed exception异常怎么处理

异常的注释: /**

  • This exception is raised when an offset commit with {@link KafkaConsumer#commitSync()} fails
  • with an unrecoverable error. This can happen when a group rebalance completes before the commit
  • could be successfully applied. In this case, the commit cannot generally be retried because some
  • of the partitions may have already been assigned to another member in the group. */ 预防commit failed exception的方法。首先,可以尽量缩短单条消息的处理时间。其次,增加消费者实例允许下游系统消费一批消息的最大时长。或者减少消费者实例一次性消费的消息总数。最后在消费者实例是用多线程解耦。

kafka生产者和消费者怎么管理TCP连接的?

kafka生产者实例创建时启动sender线程,创建与bootstrap servers中所有broker的TCP连接。kafka生产者实例首次更新元数据信息后,还会再次和所有broker建立TCP连接。如果发送消息时,发现没有和该broker建立连接,也会再次创建连接。需要注意生产者实例的connections max idle ms参数,如果大于0,则启动时的TCP连接会超时关闭,但如果设置为-1则永远都不会关闭。默认值是9分钟。

kafka消费者实例是在poll方法中创建TCP连接的,这个方法有三个时机会创建TCP连接。首先是实例启动后调用poll方法时他要向kafka集群发送find coordinater TCP请求,获取自己的broker的协调者。其次是连接协调者时,他会和协调者建立TCP连接,连入协调者后,消费者可以加入消费者组、提交位移。最后是消息消费时,消费者会为每个分区创建和该分区的领导者副本所在的broker连接的TCP。

kafka副本机制

kafka追随者副本是不对外提供服务的。

kafka选择不像MySQL那样将副本用于数据的读的原因有两点。

原因一,实现写后读一致性。写后读就是用生产者api向kafka集群成功写入消息后,马上用消费者api去读刚刚生产的消息。比如我发微博,发完一条微博,肯定希望立马能看到。如果允许追随者副本对外提供服务,由于副本同步是异步的,客户端可能看不到最新写入的消息。

原因二,避免幻读。如果允许追随者副本提供读服务,假设有两个追随者副本F1和F2,他们异步拉取领导者副本数据。假设F1拉取了领导者副本数据而F2没有拉取,那么,此时如果有消费者先从F1拉取消息之后又从F2拉取消息,他会看到第一次消费时的最新消息在第二次消费时不见了,这就是幻读。但是,如果所有读请求都是leader处理,那么就不会发生幻读。

ISR。kafka用ISR标识已经和领导者副本同步的副本集合。注意,领导者副本天然就在ISR中。副本能否进入ISR集合,不取决于他和领导者副本相差的消息数,而是看副本和领导者副本落后的时间间隔,kafka用replica lag time max ms参数控制。

如果领导者副本发生宕机,此时副本能否进入ISR集合,取决于kafka的参数,即unclean leader election enable,他控制是否允许脏副本参加选举。

kafka请求怎么处理的

kafka使用reactor模式。kafka broker端有一个socket server组件,类似于reactor模式中的Dispatcher,他也有acceptor线程和一个网络线程池。网络线程池有一个共享的队列存放请求。网络线程并没有自己处理请求,而是又交给一个线程池,io线程池。每个网络线程有一个线程专属的队列存放响应。kafka请求如果不能被立即处理,如设置了ack为all,要等待其他副本响应,则会把这个请求缓存到Purgatory组件中。

kafka控制器

控制器和Zookeeper一起工作。每台broker都能充当控制器。kafka把第一个在Zookeeper中成功创建/controller节点的broker指定为控制器。

控制器负责增加删除主题,以及为主题增加分区。他还负责新增broker,broker主动关闭。

高水位和leader epoch

当我把一条条带时间戳的事件丢给流处理系统,系统并不知道未来还有哪些旧事件会因为网络抖动而迟到,因此系统会不断估算一个水位T,它的意思是我现在认为,未来不会再有时间戳小于等于T的新事件到达。

在kafka中,水位不是由时间决定的,他是由位移决定的。kafka有两个和水位相关的概念,高水位hw和日志末端位移l e o。hw是已提交消息和未提交消息的边界,hw是第一个未提交消息。l e o是副本写入下一条消息的位移值。

高水位的更新机制。hw的更新在领导者副本和追随者副本中是不一样的。

领导者副本。当接收到生产者请求时,先写入消息到本地磁盘,然后获取领导者副本所在broker保存的所有远程副本l e o值,以及自己的hw值,取这些值的最小值,更新hw值。当接收到追随者副本拉取消息时,他先读取磁盘的消息数据,然后用追随者副本的请求中的位移更新远程副本l e o值。

追随者副本。他会从领导者副本拉取消息,先写入消息到本地磁盘,然后更新l e o的值,然后将自己的hw值更新为领导者副本的hw和自己的l e o值的最小值。

为了解决高水位更新的一致性问题,kafka引入了epoch任期机制。

怎么监控kafka

TODO,36

怎么调优kafka

TODO,38

TODO,39,创建自己的日志流处理平台