消息中间件整合
rocketmq如何保证消息不丢失 page 63
生产阶段可靠性。1.请求确认机制。生产者调用发送消息api,mq客户端将消息发送到broker,等待broker返回确认响应。只有收到确认才认为消息成功。2.异常处理。代码必须处理异常情况。
存储阶段。1.消息写入磁盘确认。broker收到消息持久化到磁盘后再确认,rocketmq将刷盘模式配置为sync flush。2.副本机制。
消费阶段。1.先执行业务逻辑再确认。rabbitmq消费可以先落库,再调用basic ack。2.幂等。在业务数据引入唯一标识,如数据库唯一索引。
kafka如何保证消息不丢失 page 1
kafka的持久化只保证对已提交的消息做有限度的持久化保证。如果n个broker都宕机,则kafka会丢失数据。
解决方案,调整kafka参数,保证无丢失。
首先,不用producer send msg方法,而要使用producer send msg callback方法,带有回调通知的方法。
其次,设置ack为all,ack是生产者的参数,设置为all,表明所有副本broker(包含领导者和追随者副本)都要收到消息,该消息才算“已提交”。
然后设置retries为一个较大的值,网络出现瞬时抖动时,生产者能自动重新发送消息。
然后设置unclean leader election enable参数为false,这是broker端的参数,他控制的是哪些broker有资格竞选分区的leader。如果一个broker落后原先leader太多,那么他一旦成为新的leader,必然会造成消息丢失。因此要把该参数设置为false,不允许这种情况发生。
然后设置mini insync replicas为大于1的值。这仍然是broker端参数,控制的是消息至少被写入多少个副本才算是已提交,保证消息持久性。
然后设置replication factor大于mini insync replicas。如果两者相等,那么只要有一个副本宕机,整个分区无法工作。
最后确保消息消费完再提交。消费者端有参数enable auto commit,应设置为false,手动提交位移,这对于单消费者多线程处理场景是必要的。
rocketmq如何处理重复消息(RocketMQ怎么保证消息不重复消费?)page 48
消息队列都是至少一次,消息重复不可避免。思路是将业务消费设计成幂等。
1.用数据库唯一约束实现幂等。消费消息时,将业务操作关键信息存到数据库,设置唯一约束,如订单id+账号id建立联合索引。
2.数据更新设置前提条件。CAS。
3.记录并检查操作。用全局唯一ID。
rocketmq如何保证消息有序性(kafka如何保证消息有序性)page 50
全局有序。用单队列,单分区。
局部有序。将相同业务消息统一分配到同一分区。
如果消费者故障导致乱序,broker会重新分配分区到其他消费者。解决方法是对分区和消费者绑定采用单线程,乱序后可以回放消息修正。
分区扩容,扩容过程中重新映射key和分区的关系,解决方案是在低峰期扩容,扩容前暂停消息生产、待队列消息消费完毕再扩容。或者用新旧主题替换,新主题用扩容后分区配置,等旧主题消费完毕切换。
Rocketmq如何处理消息堆积(kafka如何处理消息堆积)page 2
水平扩容。增加消费者实例数量。同步扩容分区数量,因为消费者在每个分区只能单线程消费,所以必须保证消费者实例数量与分区数匹配,否则多余实例没有作用。这个操作要业务和消息中间件运维团队配合,保证分区扩容和消费者实例扩容同步进行。
优化消费者业务逻辑。1.剥离耗时操作。收到消息后,先进行预处理,将复杂耗时业务异步到其他线程或进程处理。2.批量消费。如消费者能批量入库,可以用批量消费减少每条消息单独确认的间接成本,但要注意一条消息消费失败整批重试,且多线程批量消费受限于处理最慢的线程。
异常消息处理机制。1.死信队列。反复消费失败的消息,用死信队列,避免阻塞。
MQ 消息堆积了,加了消费者,TPS 没有上升怎么回事?(高德地图)
理想情况下,消费者实例的数量应该等于该组订阅主题的分区总数。如果消费者实例数量已经大于分区总数,则多出来的实例处于空闲。
kafka消息量太大导致读消息延迟时间很长怎么办(字节)
首先在broker端,可以增加num replica fetchers的值,加快追随者副本拉取速度。
其次在生产者端,设置linger ms为0,即让消息尽快发送出去不停留,并可以关闭压缩,设置compression type参数为none。可以设置acks参数为1,仅等待领导者副本写入后返回确认。
最后在消费者端,设置fetch mini bytes为1,只要broker有能返回的数据,立即让他返回给消费者,缩短消费者端延迟。
描述一下 RocketMQ 从生产到消费的全过程(讲讲Rocketmq从生产到消费的全过程)
Rocketmq生产到消费经过了生产者、broker、消费者三个模块。在生产端,客户端用四层tcp协议和nameserver建立连接,用remoting协议从nameserver获取集群元数据信息,根据元数据信息,和对应broker建立tcp连接。如果客户端指定目标topic,则消息先经过消息分区分配,然后才将数据发送到broker中。因为remoting协议不支持批量传输,所以数据直接发送到对应broker,可以用单向发送、同步发送、异步发送三种方式。
broker收到数据后,用remoting协议反序列化数据。然后解析处理数据,整合数据,在底层写入同一个文件中,存储数据时进行分段存储。如果是集群部署,并设置了副本,则数据会分发到其他broker的副本中。当数据过期后,broker会自动清理节点上数据。
在消费端,可以选择pull、push、pop其中一种消费模型。客户端同样需要先和nameserver完成寻址。消费端有分组概念,会先进行消费者和分区的消费关系分配过程,然后消费者和自己消费的分区的leader所在的broker建立连接。接着客户端用remoting协议从broker消费数据。数据消费成功后,最后会提交消费进度。
kafka如何保证消息只被消费一次?(kafka如何处理重复消息) page 49
消息队列都是至少一次,消息重复不可避免。思路是将业务消费设计成幂等。
1.用数据库唯一约束实现幂等。消费消息时,将业务操作关键信息存到数据库,设置唯一约束,如订单id+账号id建立联合索引。
2.数据更新设置前提条件。CAS。
3.记录并检查操作。用全局唯一ID。
注意,kafka有exactly once,但他是为流计算场景设计的,不是我们讨论的幂等。
如果异步MQ发送消息失败了怎么办(京东)
kafka可以配置追随者副本a c k后才认为消息发送成功,否则不会提交位移,且提交位移时如果失败,则可以传入回调函数,处理异常。
Broker和Topic的对应关系是什么?(饿了么考过)page 50
Broker和topic是多对多关系,一个topic可以跨越多个Broker。
rocketMQ和kafka的区别?(讲讲消息队列的选型)(网易)page 50
架构。1.kafka分区模型,每个主题分为多个分区。以分区为单位进行多副本异步复制,可用ack参数配置为同步复制。2.rocketmq统一commitlog,所有消息写入一个文件,有消费队列。
性能。kafka用Scala和Java开发,设计上用了批量和异步的思想,生产者用批量合并和filechannel的sendfile系统调用,TPS能达到百万级,但异步批量带来了响应延时高的问题。且ack=0或1模式可能丢失消息。rocketmq用Java开发,对在线业务的响应做了很多优化,大多数情况可以做到毫秒级别响应,单机TPS在十万。
RocketMQ通过什么机制确保消息消费?(网易)page 49
消息持久化。rocketMQ将消息持久化到磁盘,即使broker宕机也不丢失。rocketMQ用commitlog存储消息原始数据,ConsumeQueue存储消息索引信息。
消息重试机制。如果消费者消费失败,rocketMQ会自动重试,直到消息被成功消费或者达到最大重试次数。如果重试次数超过阈值,消息转入死信队列。
消息顺序消费。
消息事务,事务消息。rocketMQ分为两个阶段,预提交和提交。在预提交阶段,消息被暂存,等待事务最终确认。如果事务成功,消息被提交到队列,如果失败消息回滚。
消息回溯。rocketMQ维护消费位点,允许消费者从某个时间点重新消费消息。
kafka如何保证高吞吐的?它是如何保证消息不丢失的?(Kafka 相较于其他消息队列(MQ),如何实现高吞吐量?)(阿里考过,得物,拼多多)page 49
kafka怎么保证数据一致?/kafka如何保证可靠传输?(百度)
kafka底层做了哪些优化让他如此高性能?(shopee)
kafka消息丢失的问题?(腾讯)
高吞吐。顺序读写,kafka使用磁盘顺序读写,而不是随机读写,可以提高效率。 零拷贝技术。kafka使用零拷贝减少了数据在内存中拷贝次数。 批量处理,kafka支持批量发送和接收消息。
不丢失的保证。持久化存储,kafka将消息持久化到磁盘,即使服务器宕机消息也不丢失。 副本机制。kafka每个分区都有多个 副本 ,其中一个副本是 领导者 ,其他副本是 追随者 ,如果领导者失败,追随者可以接管。 ack 机制。kafka提供不同的ack级别,生产者选择不同的ack级别保证消息可靠。如ack为all表示所有副本都确认收到消息后生产者才认为消息发送成功。
RocketMQ实现定时消息的原理,如果每个延迟队列都定时任务遍历判断是否到期,任务数量大的时候会不会产生延迟?(rocketmq当延迟级别(即延迟队列)数量较多时,是否由于多个定时轮询任务并发扫描大量队列和消息,导致消息到期后触发投递出现延迟?)(淘天)page 1
是会产生延迟。
解决方案。1.减少延迟级别数量。rocketmq支持18个延迟级别,从1秒到2小时,减少延迟级别可以减少定时任务数量,降低系统负担。2.用外部定时任务系统,将延迟消息的投递交给专门的定时任务系统如Quartz,减轻rocketmq负担。
kafka丢消息如何发现?(腾讯)
消息确认机制:如果ack设置为0或1,可能消息丢失,因为生产者不会等待所有副本的确认。
消息持久化:kafka通过日志文件持久化消息,如果磁盘写入失败会导致消息丢失。
监控报警:用kafka Manager监控集群状态。
kafka消费者数量大于分区数量会出现什么情况?(腾讯)
消费者闲置:因为他们没有分配到任何分区。
kafka同一个消费者组中消息是顺序的吗?(滴滴)page 49
不是顺序的。
在一个分区内,消息是按照顺序消费的。
跨分区的消息顺序:如果消息分布在不同分区中,kafka不保证顺序。同一个消费者组中不同消费者可能会同时消费不同分区的消息。
消息队列中间件如何选型?(阿里巴巴)
性能要求:消息队列的吞吐量和延迟是关键指标。kafka在高吞吐量(QPS)场景表现优异,rocketMQ在低延迟(毫秒)和可靠性表现更突出。
public class TransactionProducer {
public static void main(String[] args) throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer("TransactionProducerGroup");
producer.setNamesrvAddr("localhost:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
try {
// 模拟本地事务操作
System.out.println("Executing local transaction...");
Thread.sleep(1000);
System.out.println("Local transaction executed successfully.");
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
System.out.println("Checking local transaction state...");
return LocalTransactionState.COMMIT_MESSAGE;
}
});
producer.start();
Message msg = new Message("TransactionTopic", "TagA", "Hello RocketMQ".getBytes(StandardCharsets.UTF_8));
producer.sendMessageInTransaction(msg, null);
// 防止生产者立即退出
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producer.shutdown();
}
}
如何处理重复消息?(科大考过,美团)page 48
怎么实现接口幂等?/怎么实现消息幂等?(百度,得物多次考,作业帮,shopee)
重复消息的常见原因:生产者重复发送,生产者在未收到Broker确认响应时可能会重新发送消息。消费者重复消费,消费者在处理完消息后,如果在更新消费偏移量前系统崩溃,可能导致重启后再次消费同一消息。
解决重复消息的策略:业务层面的幂等保证,如使用数据版本号来控制操作, 每次操作前对比版本号,只有当消息版本号高于当前数据版本号才执行更新,然后递增版本号 ;如使用数据库唯一键约束防止重复记录插入,INSERT INTO … ON DUPLICATE KEY UPDATE。
解决方法 。用数据库唯一约束,改造业务逻辑,将转账ID和用户ID联合起来创建唯一约束。
更新数据库时设置前置条件,比如版本号,或者如果余额为多少元,再增加100元。
唯一标识符。客户端生成唯一标识符,服务端处理请求前查询是否存在或已被处理。
令牌机制。在提交表单前,服务器给客户端发送令牌token。客户端请求携带令牌,服务端会验证token有效性。
如何处理消息堆积?(得物,满帮)
检查是否存在BUG,评估消费者处理能力是否足够应对生产者输出速度。
优化消费逻辑,如果消费者逐条处理消息,改为批量处理。
水平扩容,增加消费者数量,或者增加Topic的队列数,这样,每个队列可以独立由一个消费者处理,提高消费速度。
Rocketmq为什么快?(网易)page 48
存储结构高效,rocketmq采用commitlog允许写入时顺序写。rocketmq还用了mappedbytebuffer直接在内存操作文件,减少拷贝。
零拷贝技术。避免从磁盘到内核再到用户最后发送到网络。减少中间数据拷贝环节。
网络模型。rocketmq采用netty能够处理大量并发连接。 分布式架构,rocketmq支持水平拓展,将消息分布到多个broker节点提高整体处理速度。
rocketMQ如何提高吞吐量的?(小红书)
异步刷盘。消息写入内存后立即返回,后台线程异步将消息持久化到磁盘。
零拷贝技术。通过直接将数据从磁盘拷贝到网络,避免了内核空间和用户空间之间的数据拷贝,提高了数据传输效率。
批量发送。多个消息打包成一个批次进行发送。
消息压缩。
负载均衡。rocketMQ将消息均匀分发到不同的broker节点上。
零拷贝拷贝几次?(shopee)
零拷贝目的是减少数据在内核空间和用户控件之间的拷贝次数,传统数据传输通常要多次拷贝,比如从磁盘读取数据到内核缓冲区,再从内核缓冲区拷贝到用户缓冲区,最后再从用户缓冲区拷贝到网络发送缓冲区。
零拷贝通常用sendfile或mmap系统调用,直接在内核空间完成数据传输,这样减少了至少一次数据拷贝。
传统数据传输通常会有3次或者4次拷贝,而使用了零拷贝后次数可以减少到1次或2次。
比如使用sendfile系统调用时,数据可以直接从磁盘缓冲区拷贝到网络发送缓冲区,中间不需要经过用户空间,这就减少了2次拷贝。
Java哪里用到了零拷贝(腾讯)
在处理文件上传和下载时,用Java NIO的FileChannel类结合transferTo或transferFrom方法,可以将文件数据从文件系统缓存传输到套接字缓存。
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
public class ZeroCopyExample {
public static void main(String[] args) throws IOException {
String filePath = "example.txt";
FileInputStream fileInputStream = new FileInputStream(filePath);
FileChannel fileChannel = fileInputStream.getChannel();
SocketChannel socketChannel = SocketChannel.open();
socketChannel.connect(new InetSocketAddress("localhost", 8080));
// 使用零拷贝技术传输文件内容
long bytesTransferred = fileChannel.transferTo(0, fileChannel.size(), socketChannel);
System.out.println("Bytes transferred: " + bytesTransferred);
fileChannel.close();
socketChannel.close();
}
}
读取一次文件再通过socket发送到网络总共经历了几次拷贝?(shopee)
四次。磁盘读取到内核缓冲区,内核拷贝到用户缓冲区,再从用户拷贝到socket缓冲区,最后从socket拷贝到网络发送缓冲区。
如何写一个消息中间件?(腾讯)
核心组件:生产者,消费者,Broker,注册中心。
流转过程:生产者将消息发送到Broker,Broker暂存消息并负责消息管理,消费者从Broker获取消息并消费。
技术要点,使用Netty实现各模块间的通信,使用现有服务发现技术如Zookeeper。像kafka那样,采用分区概念增强可扩展性和并发能力。数据存储方面,提供同步和异步的刷盘选项。故障恢复和高可用,利用选举算法确保Leader宕机时,Follower能迅速接管。
为什么kafka比RocketMQ吞吐量高?
存储机制 kafka采用日志文件存储,非常适合顺序写。RocketMQ使用文件系统,在高吞吐量情况下,日志性能更强。 分区机制 kafka每个主题可以有多个分区,每个分区可以分布在不同服务器上。 网络模型 kafka采用零拷贝,减少网络传播中拷贝次数。 消费模型 kafka是基于拉取的,消费者根据自己消费能力拉取消息,适合高吞吐量场景,RocketMQ是基于推送的。
RocketMQ发一个消息会经过哪些环节?
生产者创建一条消息,查询 NameServer 获取该消息topic的路由信息,根据特定的负载均衡策略选择一个队列,进而确定将消息发送给哪个broker。broker接受生产者消息,存储到broker磁盘中,broker发送一个确认响应给生产者。生产者执行本地事务,成功则向broker发送commit,broker向消费者发送消息。消费者处理完后,向broker发送ack,确认消息被消费,broker更新消息的状态。
如何保证消息的有序性?
全局有序和部分有序。全局有序需要满足单一生产者,单一队列,单线程消费。全局有序较少见,因为他限制了系统的并发能力。部分有序通过多队列分区,消息路由策略,单线程队列消费实现。多队列分区,根据需要,将Topic内部划分为多个队列或分区。消息路由策略,通过策略将同类消息发送到固定队列中,可以根据消息的键值决定路由到哪个队列。
讲一讲如何做到幂等?分布式锁解决了什么问题?
幂等是一个操作无论执行多少次,结果都是一样的。 实现方法。唯一标识符。每次操作生成 唯一的标识符 ,并将这个标识符存在数据库中,每次操作前检查这个标识符是否存在,如果存在则跳过。 状态机。将操作设计成 状态机 的一部分,每次操作只改变状态机的状态,而不是直接修改数据。 乐观锁,在数据库中使用 乐观锁 ,通过版本号或时间戳保证一致性。
分布式锁解决的问题。 资源互斥访问 ,确保同一时间内只有一个节点能访问共享资源,避免数据竞争和不一致。 防止重复操作 ,在并发场景下,确保操作只执行一次。 任务调度 ,在分布式系统中,确保任务只被一个节点执行。 分布式锁的实现。基于数据库,用数据库的唯一索引或行锁实现。 基于redis,用redis setnx命令实现分布式锁。 基于ZooKeeper,用ZooKeeper临时顺序节点实现分布式锁。
public boolean idempotentOperation(String uniqueId, Object data) {
//检查唯一标识符是否已经存在
if (existsInDatabase(uniqueId)) {
return false; //已经执行过,跳过操作
}
//执行操作
performOperation(data);
//存储唯一标识符
storeUniqueId(uniqueId);
return true;
}
各注册中心如Zookeeper、Consul、Eureka或Nacos之间的选型对比?
ZooKeeper优点是成熟稳定,通过ZAB协议确保数据的一致性,缺点是配置和管理复杂。Consul优点是提供一套简单的REST API接口,易于使用,缺点是资源占用高。Eureka优点是与Spring Cloud集成度高,是Spring Cloud体系下默认选择,缺点是一致性较弱,选择了AP设计。
你提到了选举算法,讲讲Bully 算法、Raft 算法、ZAB 算法
选举算法选择一个或多个领导者节点,以便统一决策。Bully算法,当一个节点认为领导者失效时,他会发起选举,向所有比自己更高ID的节点发送选举消息,如果没有更高ID的节点响应,发布者宣布自己为领导者,并向所有节点发送胜选消息,如果有更高ID节点响应,则接管选举流程重复步骤。Raft算法,如果Follower在一定时间内没有收到Leader的消息,他将变为Candidate并发起选举,Candidate向所有节点请求选票,如果得到大多数选票则成为Leader,Leader定期向所有Follower发送心跳信息,维持领导。ZAB算法,ZAB协议在启动时执行领导者选举,选出Leader,开始接收客户端事务请求,形成事务提议,Leader将这些事务提议广播给所有Follower,当大多数Follower响应并提交事务后,Leader宣布事务完成并对外提供服务。
你提到了分区,kafka这个分区和 RocketMQ 的队列有什么不同啊?具体分区要怎么实现?
Kafka的分区是指将一个主题(Topic)分割成多个独立的部分,每个分区是一个有序的日志文件,数据按写入顺序存储,并允许并发处理。分区有助于提升可扩展性和吞吐量,因为生产者可以并行地写入多个分区,而消费者也可以并行地读取不同的分区。
RocketMQ的队列则是类似的概念,但它的队列是主题下的子单位。每个队列可以理解为分区,但RocketMQ的设计中,每个主题下可以有多个队列,消息可以在这些队列之间进行负载均衡和分配。具体分区的实现涉及到对消息进行哈希分配、保证数据一致性,并通过协调服务(如Zookeeper)来管理分区的元数据和负载均衡。
然后你提到顺序写,为什么要顺序写啊?你说的内存映射和零拷贝又是什么啊?那你知道 RocketMQ 和 Kafka 用了哪个吗?
顺序写比随机写更高效,因为大多数存储系统在处理顺序写操作的性能远高于随机写。内存映射是将磁盘上的文件映射到进程的地址空间实现,使得文件内容可以像访问普通内存一样被访问。零拷贝是直接在内核缓冲区和网络接口之间传输数据。
消息队列设计成推消息还是拉消息?RocketMQ和Kafka是怎么做的?
推模式,Broker主动向Consumer推送消息,消费者被动接收。拉模式,Consumer主动向Broker请求消息。RocketMQ的实现,看似是推模式的PushConsumer,实际上他背后使用的是拉模式的机制。PushConsumer是对拉模式的一种封装,使得从用户角度看起来像是消息被推送。RocketMQ使用长轮询优化拉模式,当消费者发送请求时,如果Broker没有足够的消息满足请求,他会保持连接直到新的消息到达或超时。Kafka纯粹使用拉模式,消费者定期向Broker发送请求以拉取新的消息,Kafka允许消费者在请求中设置长时间的等待,如果没有足够消息可供消费时, 请求会在Broker保持挂起状态,直到有足够的消息或请求超时。长轮询机制是消息队列中用来减少拉取操作频率,保证效率的技术。
消息队列的事务消息你了解吗?RocketMQ和Kafka是怎么做的?
分布式事务有2PC和TCC等。2PC,两阶段提交,典型的分布式事务。TCC,尝试确认取消,需要业务侧进行较大改造适应这种模式。
事务消息是处理分布式事务的另一种策略,适用于异步更新且对数据实时性要求不高的场景。
RocketMQ的事务消息。RocketMQ首先发送一个半消息到Broker,这个消息暂时对消费者不可见,在发送完半消息后,生产者执行本地事务。根据本地事务的执行结果生产者会通知Broker提交或回滚消息,如果提交或回滚失败,Broker会定期回查事务的状态,以确定最终操作。Kafka的事务消息。生产者向事务协调者请求开启事务,生产者发送消息,对消费者不可见,完成消息发送后,生产者要求提交事务,事务协调者标记事务结束,消费者现在可以看到这些消息。RocketMQ提供的事务消息适用于那些需要保证消息发送和本地事务严格一致性的场景,而Kafka的事务消息适合需要在一个事务中发送多条消息的场景。
比 RocketMQ 更好的事务消息实现是什么?
事务消息的顺序和依赖性。RocketMQ采用的是先发一个半消息,只有当事务成功后,消费者才可见这条消息,确保消息队列中消息的可靠性后再执行本地事务,这种方法的缺点是如果MQ服务出现问题,本地事务无法执行,影响整个应用的正常运行。QMQ采用本地消息表的方法,将消息发送与数据库事务合并处理,先写数据库再发送MQ,即使MQ服务出现问题,也不会影响数据库事务的完成,保证业务连续性。事务消息的实现机制。RocketMQ需要回查机制确认事务的状态,增加了复杂性。QMQ可以在一次事务中发送多个消息,且整合了Spring事务管理,简化了开发过程。
Kafka的索引设计有什么亮点?
Kafka采用稀疏索引减少索引文件对内存的占用,这样可以在内存中保存更多索引。Kafka索引有三种,位移索引,保留消息的位移值,时间戳索引,记录消息的时间戳,和事务索引,启用Kafka事务时使用。Kafka为了提高索引查找效率,采用了基于热点数据的优化二分查找策略,为了避免缺页中断,将索引分为热区和冷区,使得热区的页始终保持在内存中,提高了查找速度。Kafka设定的热区大小保证了热区查找时所需的页数最大为3页,保证这些页几乎都在内存。
Kafka日志段如何读写解析?
Kafka将数据存储翻在按分区划分的日志文件中。每个分区对应一个或多个日志段。每个日志段包括一个消息日志文件,一个位移索引文件和一个时间戳索引文件。日志段的写入。检查当前日志段是否为空,记录时间用于后续日志切分。位移值必须合法,通过AbstractIndex.toRelative()方法验证。使用FileChannel写入数据,写入策略依据配置决定是否立即刷盘。更新日志段的最大时间戳和对应的位移值,这些值用于日志的定期删除管理,如果达到了索引间隔,更新索引项。日志段的读取,通过OffsetIndex定位消息的物理位置和大小,获取LogOffsetdata,其中包含消息的位移,所在日志段的起始位移和物理位置。调整返回数据的大小,确保至少返回一条消息,避免消费者因为消息过大而无法接收。根据物理位置和指定的大小,使用FileRecords.slice()方法读取数据。
Kafka控制器事件处理全流程说一下
Kafka Controller的核心职责是管理和协调集群,包括主题、分区、分区Leader选举、Broker状态监听和元数据管理。Controller通过ZooKeeper进行节点的选举和故障转移,原有的多线程并发处理已改为单线程事件队列处理,以提升性能和简化代码。Controller向集群其他Broker发送请求以同步源数据,包括LeaderAndlsrRequest、StopReplicaRequest和UpdateMetadataRequest。当前社区正计划移除ZooKeeper,采用Raft算法进行Controller选举,并将元数据存储在日志中。
Kafka请求处理的流程说一下
Reactor模式。Kafka使用Reactor模式来处理高并发的网络请求。Broker段通过一个Acceptor线程(mainReactor)监听新连接,然后将连接分配给Processor线程(subReactor)。Processor线程处理实际的I/O操作,将请求放入队列中,I/O线程池从队列中取出请求并处理,最终将响应返还给客户端。请求处理流程。Kafka的请求处理流程包括连接的接受、请求的封装、请求的处理和响应的发送。网络线程负责处理请求的接收与封装,而I/O线程池负责具体地请求处理。请求优先级划分。Kafka将请求分为数据请求和控制请求两类。控制请求如删除topic,领导者选举通常要优先处理。
Kafka为什么要抛弃ZooKeeper?
ZooKeeper用于管理分布式系统中的元数据和配置,包括Kafka的Broker信息、主题、分区等。ZooKeeper也负责服务发现与故障转移和控制器选举。抛弃ZooKeeper的原因。运维复杂度,Kafka依赖ZooKeeper增加了运维复杂度。性能瓶颈。ZooKeeper的强一致性保证导致其写入性能较差。Kafka的改进。内部存储。Kafka将元数据存储到自己的内部系统中,而不是依赖ZooKeeper。Kraft协议。Kafka引入了基于Raft协议的KRaft模式,取代了ZooKeeper的控制器选举机制。
讲一讲你理解的的RocketMQ
RocketMQ包含四个核心部分Producer,Consumer,Broker和NameServer。生产者负责生成并发送消息到指定的Topic。生产者集群通过长连接与NameServer连接,获取Topic和Broker的映射关系。每30秒从NameServer拉取最新路由信息,并更新本地路由表。消费者集群从Broker中拉取消息进行处理。消费模式包括集群模式,多个消费者均衡消费,和广播模式,每个消费者都消费完整消息。Broker负责消息的存储转发和消费,Broker向NameServer注册路由信息并进行心跳检测。各NameServer之间不进行通信,每台NameServer拥有完整的路由信息。RocketMQ交互流程:NameServer启动后,Broker定期向其发送心跳包,包含Broker的状态和Topic信息。Producer和Consumer通过NameServer获取Topic路由信息,确定消息的存储位置。RocketMQ中值得注意的点。订阅一致性,ConsumerGroup中每个Consumer必须有相同的订阅,否则会导致订阅信息冲突。队列数量。增加机器无法提升消费速度,队列数量也需相应增加。RocketMQ最佳实践。Tags的使用。一个应用使用一个Topic,通过Tags区分业务。JVM参数调整。启用-XX:+AlwaysPreTouch,会增加启动时间,但是提高运行性能。
RocketMQ和Kafka为什么能这么快?能从底层存储讲讲吗?
存储机制。RocketMQ使用单个CommitLog文件进行顺序写入。消息在文件中顺序追加,消费时通过ConsumerQueue索引文件定位消息,RocketMQ利用mmap减少用户空间和内核空间的数据拷贝,此外RocketMQ使用文件预分配和预热策略,通过mlock和madvise提高性能。Kafka消息分区为单独的文件进行顺序写入。Kafka的消息存储使用了顺序写入的机制,但读取和写入时分区的切换会导致全局视角下的随机访问。Kafka通过sendfile实现零拷贝。
时间轮在Netty和Kafka中如何应用的?为什么不用Timer,延时线程池?
Timer是Java早期提供的一个定时任务工具,可移植性延迟任务和周期任务。Timer的核心是一个优先队列和一个单线程的执行任务线程。Java5.0开始,ScheduledThreadPoolExecutor替代Timer,支持多线程任务执行,通过一个优先队列DelayedWorkQueue实现任务调度。时间轮算法通过一个环形数组和槽来管理任务,时间轮的每个槽位可以存储待执行的任务,时间轮通过一个指针不断移动来执行任务。Netty和Kafka中的时间轮应用。Netty的HashedWheelTimer,使用单层时间轮,通过一个循环数组来管理任务,每个槽位内的任务通过双向链表进行管理。Kafka使用多层次时间轮,结合DelayQueue解决了时间轮空推进的问题。