RocketMq随手记

1.RocketMq

RocketMq是阿里开源的一款消息中间件产品,从特性上来讲,它支持事务性消息。这一点是很多业务系统中选用它的主要原因之一。

从结构上来说,RocketMq在整个系统前方加了一层name server。当broker启动时,它会去向name server注册自己,并且和name server之间维持心跳链接。而producer(消息生产者)在启动时,则会从name server那里拉去topic对应的broker具体的地址,然后直接向broker发生message。

name server中,用Hashmap存储了当前有效的broker节点,如果在timeout(默认是2分钟)时长后,还没有收到一个broker节点的心跳通讯,就会把该节点从brokerLiveTable中移除,并且从brokerAddrTable中移除。如果该节点是master节点,那么它下属的slave broker也都会一起被移除。在这里,其实name server就类似于kafka中的zookeeper,它充当了服务治理、负载均衡的角色。

name server中还维护了topic和它所有queue的列表,名称、读队列数量、写队列数量、同/异步复制标记等。消费者可以从name server中知道它想要的topic有哪些queue,在哪个broker上。从这里来看,name server还充当了消息路由的角色。

2.一些需要知道的特性

rocketmq不保证消息不重复:消息重复的根本原因在于,网络错误,这是无法避免的。如果一定要由中间件来解决这个问题,往往回造成性能损失。所以,常规情况下,我们一般在业务代码中完成去重,即冥等。

对于rocketmq来说,最重要的特性就是事务消息了。RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段通过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,如果确认消息发送失败了怎么办?RocketMQ会定期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

消息存储:

如果一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:



消息索引

索引文件主要用于根据key来查询消息的,流程主要是:

  1. 根据查询的 key 的 hashcode%slotNum 得到具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
  2. 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 总是指向最新的一个索引项)
  3. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)

其实就是一个老版本的类HashMap实现而已。

参考&引用:
鼬神@Apache RocketMQ背后的设计思路与最佳实践