首页 网维知识库 RocketMq架构原理和使用总结

RocketMq架构原理和使用总结

RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consu…

RocketMQ是一个分布式开放消息中间件,底层基于队列模型来实现消息收发功能。RocketMQ集群中包含4个模块:Namesrv, Broker, Producer, Consumer。

RocketMq架构原理和使用总结插图

主要功能

  • 削峰填谷(主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题)
  • 系统解耦(解决不同重要程度、不同能力级别系统之间依赖导致一死全死)
  • 提升性能(当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统)
  • 蓄流压测(线上有些链路不好压测,可以通过堆积一定量消息再放开来压测)

各个模块的作用

  • Namesrv: 存储当前集群所有Brokers信息、Topic跟Broker的对应关系。
  • Broker: 集群最核心模块,主要负责Topic消息存储、消费者的消费位点管理(消费进度)。
  • Producer: 消息生产者,每个生产者都有一个ID(编号),多个生产者实例可以共用同一个ID。同一个ID下所有实例组成一个生产者集群。
  • Consumer: 消息消费者,每个订阅者也有一个ID(编号),多个消费者实例可以共用同一个ID。同一个ID下所有实例组成一个消费者集群。

各个模块功能关系参考博客:https://www.cnblogs.com/wxd0108/p/6041829.html

功能架构部署图:

RocketMq架构原理和使用总结插图1

MQ集群工作流程

  1. 启动Namesrv,Namesrv起来后监听端口,等待Broker、Produer、Consumer连上来,相当于一个路由控制中心。 
  2. Broker启动,跟所有的Namesrv保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有topic信息。注册成功后,namesrv集群中就有Topic跟Broker的映射关系。 
  3. 收发消息前,先创建topic,创建topic时需要指定该topic要存储在哪些Broker上。也可以在发送消息时自动创建Topic。 
  4. Producer发送消息,启动时先跟Namesrv集群中的其中一台建立长连接,并从Namesrv中获取当前发送的Topic存在哪些Broker上,然后跟对应的Broker建立长连接,直接向Broker发消息。
  5. Consumer跟Producer类似。跟其中一台Namesrv建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

Producer

示例代码:

这里用InitializingBean,DisposableBean来管理mq的生命周期,InitializingBean用来初始化mq配置信息,DisposableBean在mq执行完成后用来销毁bean。

@Component
public class CancelDisplayProducer implements InitializingBean, DisposableBean {

    private static final Logger logger= LoggerFactory.getLogger(CancelDisplayProducer.class);

    private DefaultMQProducer defaultMQProducer;
    @Value("${crk.topic}")
    private String topicName;

    @Value("${crk.nameServer}")
    private String nameServer;

    @Value(("${crk.groupName}"))
    private String groupName;

    public SendResult sendCancelDisplayMq(String tag, String msg, Object primaryKey, Object hashVal){
        logger.info("发送取消延时队列消息内容{}",msg);
        Message rocketMsg = null;
        com.alibaba.rocketmq.client.producer.SendResult sendResult = null;
        try {
            rocketMsg =  new Message(topicName, tag, primaryKey + "", msg.getBytes("UTF-8"));
            //设置该消息延迟1s发送
            rocketMsg.setDelayTimeLevel(1);
            sendResult = defaultMQProducer.send(rocketMsg, new MessageQueueSelector() {
            //发送顺序消息
                @Override
                public MessageQueue select(List list, Message message, Object obj) {
                    int hashCode = obj.hashCode();
                    if(hashCode < 0) {
                        hashCode = Math.abs(hashCode);
                    }
                    int index = hashCode % list.size();
                    return list.get(index);
                }
            }, hashVal);
            if(sendResult != null && sendResult.getSendStatus() == SendStatus.SEND_OK) {
                logger.info("发送取消延时队列消息成功,发送内容:{},keys:{}", msg, primaryKey);
            }
        } catch (Exception e) {
            logger.error("发送取消延时队列消息异常【{}】", e);
        }
        return sendResult;

    }

    @Override
    public void destroy() throws Exception {
        defaultMQProducer.shutdown();
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        logger.info("groupName=" + groupName);
        logger.info("nameServer=" + nameServer);
        //初始化
        defaultMQProducer = new DefaultMQProducer();
        defaultMQProducer.setNamesrvAddr(nameServer);
        defaultMQProducer.setProducerGroup(groupName);
        defaultMQProducer.setRetryTimesWhenSendFailed(5);
        defaultMQProducer.setInstanceName("openCarCancelDisplayInstance");
        //设置超时时间为5s
        defaultMQProducer.setSendMsgTimeout(5000);
        defaultMQProducer.start();
        logger.info("DefaultMQProudcer start success!");
    }
}

//***调用生产者发送消息***
cancelDisplayProducer.sendCancelDisplayMq("cancleDisplay",JSONObject.toJSONString(bodyJson),orderNo,orderNo);

Producer顺序发送

Rocketmq能够保证消息严格顺序,但是Rocketmq需要producer保证顺序消息按顺序发送到同一个queue中,比如购买流程(1)下单(2)支付(3)支付成功,

这三个消息需要根据特定规则将这个三个消息按顺序发送到一个queue
Producer端确保消息顺序唯一要做的事情就是将消息路由到特定的分区(这里的分区可以理解为不同的队列),在RocketMQ中,通过MessageQueueSelector来实现分区的选择。

如何实现把顺序消息发送到同一个queue:

RocketMq架构原理和使用总结插图2

一般消息是通过轮询所有队列发送的,顺序消息可以根据业务比如说订单号orderId相同的消息发送到同一个队列, 或者同一用户userId发送到同一队列等等

messageQueueList [orderId%messageQueueList.size()]

messageQueueList [userId%messageQueueList.size()]

Consumer

示例代码:

@Component
public class CancelDisplayConsumer implements InitializingBean, DisposableBean {

    private static  final String CANCEL_DISPLAY_GROUP_NAME="cancle_display_consumer_group";

    private static  final  String CANCLE_DISPLAY_INSTANCE_NAME="cancle_display_consumer_instance";

    private static  final Logger logger= LoggerFactory.getLogger(CancelDisplayConsumer.class);

    private DefaultMQPushConsumer consumer;

    @Autowired
    private CancelDisplayProducer cancelDisplayProducer;
    @Autowired
    private IComTransChannelConfigService comTransChannelConfigService;
    @Value("${crk.nameServer}")
    private String nameServer;
    @Value("${crk.topic}")
    private String topicName;
    @Autowired
    private IHongqiOrderMappingService hongqiOrderMappingService;
    @Override
    public void destroy() throws Exception {
        consumer.shutdown();
        logger.info("订单取消延时队列消费消息关闭");
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            consumer = new DefaultMQPushConsumer(CANCEL_DISPLAY_GROUP_NAME);
            consumer.setNamesrvAddr(nameServer);
            consumer.setInstanceName(CANCLE_DISPLAY_INSTANCE_NAME);
            consumer.subscribe(topicName, "*");
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List list, ConsumeOrderlyContext consumeOrderlyContext) {

                        for(MessageExt messageExt : list) {
                            logger.info("消费取消延迟消息start:{}", list);
                            String body = new String(messageExt.getBody());
                            JSONObject bodyJson = JSONObject.parseObject(body);
                            String orderNo = bodyJson.getString("orderNo");
                            String channel=bodyJson.getString("channel");
                            MDC.put("traceId", messageExt.getMsgId());
                            //逻辑代码忽略.........
                    return ConsumeOrderlyStatus.SUCCESS;
                }
            });
            consumer.start();
        } catch (MQClientException e) {
            logger.error("消费延迟取消消息consume启动异常:{}",e);
        }
    }
}

如何保证消息不丢失

分别从Producer发送机制、Broker的持久化机制,以及消费者的offSet机制来最大程度保证消息不易丢失

一、producer重试发送消息

  1. 默认情况下,可以通过同步的方式阻塞式的发送,check SendStatus,状态是OK,表示消息一定成功的投递到了Broker,状态超时或者失败,则会触发默认的2次重试。此方法的发送结果,可能Broker存储成功了,也可能没成功
  2. 采取事务消息的投递方式,并不能保证消息100%投递成功到了Broker,但是如果消息发送Ack失败的话,此消息会存储在CommitLog当中,但是对ConsumerQueue是不可见的。可以在日志中查看到这条异常的消息,严格意义上来讲,也并没有完全丢失
  3. RocketMQ支持 日志的索引,如果一条消息发送之后超时,也可以通过查询日志的API,来check是否在Broker存储成功

二、broker的持久化机制

  1. 消息支持持久化到Commitlog里面,即使宕机后重启,未消费的消息也是可以加载出来的

2.Broker自身支持同步刷盘、异步刷盘的策略,可以保证接收到的消息一定存储在本地的内存中

  1. Broker集群支持 1主N从的策略,支持同步复制和异步复制的方式,同步复制可以保证即使Master 磁盘崩溃,消息仍然不会丢失

三、消费端的重试机制

消费者可以根据自身的策略批量Pull消息
1. Consumer自身维护一个持久化的offset(对应MessageQueue里面的min offset),标记已经成功消费或者已经成功发回到broker的消息下标

  1. 如果Consumer消费失败,那么它会把这个消息发回给Broker,发回成功后,再更新自己的offset
  2. 如果Consumer消费失败,发回给broker时,broker挂掉了,那么Consumer会定时重试这个操作

如果Consumer和broker一起挂了,消息也不会丢失,因为consumer 里面的offset是定时持久化的,重启之后,继续拉取offset之前的消息到本地

关于offset:

RocketMQ 中, 一 种类型的消息会放到 一 个 Topic 里,为了能够并行, 一般一个 Topic 会有多个 Message Queue (也可以 设置成一个), Offset是指某个 Topic下的一条消息在某个 Message Queue里的 位置,通过 Offset的值可以定位到这条消息,或者指示 Consumer从这条消息 开始向后继续处理。

Offset主要分为本地文件类型和 Broker代存的类型两种。

RocketMq架构原理和使用总结插图3

Rocketmq集群有两种消费模式

默认是 CLUSTERING 模式,也就是同一个 Consumer group 里的多个消费者每人消费一部分,各自收到的消息内容不一样。 这种情况下,由 Broker 端存储和控制 Offset 的值,使用 RemoteBrokerOffsetStore 结构。

BROADCASTING模式下,每个 Consumer 都收到这个 Topic 的全部消息,各个 Consumer 间相互没有干扰, RocketMQ 使用 LocalfileOffsetStore,把 Offset存到本地。

免责声明:文章内容不代表本站立场,本站不对其内容的真实性、完整性、准确性给予任何担保、暗示和承诺,仅供读者参考,文章版权归原作者所有。如本文内容影响到您的合法权益(内容、图片等),请及时联系本站,我们会及时删除处理。

作者: 3182235786a

为您推荐

windows8

windows8

Windows 8 是微软公司于 2012 年推出的一款操作系统,因其独特的界面设计和功能受到广泛关注。本文将从 Win...
Windows 下载指南:获取最新版本的 Windows 操作系统

Windows 下载指南:获取最新版本的 Windows 操作系统

作为全球最受欢迎的操作系统之一,Windows 提供了丰富的功能和用户友好的界面。如果您想获取最新版本的 Windows...
windows资源管理器已停止工作

windows资源管理器已停止工作

Windows 资源管理器已停止工作是 Windows 操作系统中常见的一个问题,通常表现为资源管理器窗口无法正常打开或...
Windows 10 激活方法详解:轻松激活您的操作系统

Windows 10 激活方法详解:轻松激活您的操作系统

购买了全新的Windows 10操作系统后,如何激活它成为许多用户关注的问题。本文将为您详细介绍Windows 10的激...
windows10激活工具

windows10激活工具

Windows 10 激活工具是一款用于激活 Windows 10 操作系统的软件。通过使用激活工具,用户可以轻松地激活...

发表回复

返回顶部