RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。
主要功能
- 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
- 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
- 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
- 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)
各个模块的作用
- Namesrv: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
- Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
- Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
- Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。
各个模块功能关系参考博客:https://www.cnblogs.com/wxd0108/p/6041829.html
功能架构部署图:
MQ集群工作流程
- 启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。
- Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。
- 收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。
- Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
- Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。
Producer
示例代码:
这里用InitializingBean
,DisposableBean
来管理mq的生命周期,InitializingBean
用来初始化mq配置信息,DisposableBean
在mq执行完成后用来销毁bean。
@Component
public class CancelDisplayProducer implements InitializingBean, DisposableBean {
private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class);
private DefaultMQProducer defaultMQProducer;
@Value("${crk.topic}")
private String topicName;
@Value("${crk.nameServer}")
private String nameServer;
@Value(("${crk.groupName}"))
private String groupName;
public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){
logger.info("发送取消延时队列消息内容{}",msg);
Message rocketMsg = null;
com.alibaba.rocketmq.client.producer.SendResult sendResult = null;
try {
rocketMsg = new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8"));
//设置该消息延迟1s发送
rocketMsg.setDelayTimeLevel(1);
sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() {
//发送顺序消息
@Override
public MessageQueue select(List list, Message message, Object obj) {
int hashCode = obj.hashCode();
if(hashCode < 0) {
hashCode = Math.abs(hashCode);
}
int index = hashCode % list.size();
return list.get(index);
}
}, hashVal);
if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
logger.info("发送取消延时队列消息成功,发送内容:{},keys:{}", msg, primaryKey);
}
} catch (Exception e) {
logger.error("发送取消延时队列消息异常【{}】", e);
}
return sendResult;
}
@Override
public void destroy() throws Exception {
defaultMQProducer.shutdown();
}
@Override
public void afterPropertiesSet() throws Exception {
logger.info("groupName=" + groupName);
logger.info("nameServer=" + nameServer);
//初始化
defaultMQProducer = new DefaultMQProducer();
defaultMQProducer.setNamesrvAddr(nameServer);
defaultMQProducer.setProducerGroup(groupName);
defaultMQProducer.setRetryTimesWhenSendFailed(5);
defaultMQProducer.setInstanceName("openCarCancelDisplayInstance");
//设置超时时间为5s
defaultMQProducer.setSendMsgTimeout(5000);
defaultMQProducer.start();
logger.info("DefaultMQProudcer start success!");
}
}
//***调用生产者发送消息***
cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);
Producer顺序发送
Rocketmq能够保证消息严格顺序,但是Rocketmq需要producer保证顺序消息按顺序发送到同一个queue中,比如购买流程(1)下单(2)支付(3)支付成功,
这三个消息需要根据特定规则将这个三个消息按顺序发送到一个queue
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区(这里的分区可以理解为不同的队列),在RocketMQ中,通过MessageQueueSelector来实现分区的选择。
如何实现把顺序消息发送到同一个queue:
一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号orderId相同的消息发送到同一个队列, 或者同一用户userId发送到同一队列等等
messageQueueList [orderId%messageQueueList.size()]
messageQueueList [userId%messageQueueList.size()]
Consumer
示例代码:
@Component
public class CancelDisplayConsumer implements InitializingBean, DisposableBean {
private static final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group";
private static final String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance";
private static final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class);
private DefaultMQPushConsumer consumer;
@Autowired
private CancelDisplayProducer cancelDisplayProducer;
@Autowired
private IComTransChannelConfigService comTransChannelConfigService;
@Value("${crk.nameServer}")
private String nameServer;
@Value("${crk.topic}")
private String topicName;
@Autowired
private IHongqiOrderMappingService hongqiOrderMappingService;
@Override
public void destroy() throws Exception {
consumer.shutdown();
logger.info("订单取消延时队列消费消息关闭");
}
@Override
public void afterPropertiesSet() throws Exception {
try {
consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME);
consumer.setNamesrvAddr(nameServer);
consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME);
consumer.subscribe(topicName, "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {
for(MessageExt messageExt : list) {
logger.info("消费取消延迟消息start:{}", list);
String body = new String(messageExt.getBody());
JSONObject bodyJson = JSONObject.parseObject(body);
String orderNo = bodyJson.getString("orderNo");
String channel=bodyJson.getString("channel");
MDC.put("traceId", messageExt.getMsgId());
//逻辑代码忽略.........
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
} catch (MQClientException e) {
logger.error("消费延迟取消消息consume启动异常:{}",e);
}
}
}
如何保证消息不丢失
分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失
一、producer重试发送消息
- 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
- 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
- RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功
二、broker的持久化机制
- 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的
2.Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中
- Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失
三、消费端的重试机制
消费者可以根据自身的策略批量Pull消息
1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标
- 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
- 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作
如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地
关于offset:
RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理。
Offset主要分为本地文件类型和 Broker代存的类型两种。
Rocketmq集群有两种消费模式
默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不一样。 这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。
BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。