本文最后更新于 2023-12-09,文章内容可能已经过时。

MQ:Message Queue:

MQ 的全称是消息队列(Message Queue)。消息队列是一种用于在分布式系统中传递消息的通信模式。它允许应用程序或系统的不同组件通过异步方式进行通信,提高了系统的可伸缩性、可靠性和灵活性。在消息队列中,消息生产者将消息发送到队列,而消息消费者从队列中接收消息,实现了解耦和异步通信。

本质上,mq能做的就是一个接口内访问另一个接口。但是集成度更高,更加安全和快捷。

主要讲解rockermq.

1.安装:

安装特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。

  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。 注意:当前RocketMQ版本在部署架构上支持一Master多Slave,但只有BrokerId=1的从服务器才会参与消息的读负载。

  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。

  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,消费者在向Master拉取消息时,Master服务器会根据拉取偏移量与最大偏移量的距离(判断是否读老消息,产生读I/O),以及从服务器是否可读等因素建议下一次是从Master还是Slave拉取。

集群工作流程:

  • 启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

  • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

  • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

  • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

1.1 docker安装:

我们在安装rocketmq后,要开放的端口一般有4个:9876,10911,10912,10909.

搜索镜像
docker search rocketmq
拉取镜像
docker pull rocketmqinc/rocketmq
启动nameserver
mkdir -p /docker/rocketmq/nameserver/logs /docker/rocketmq/nameserver/store
docker run -d --restart=always --name rmqnamesrv --privileged=true -p 9876:9876  -v /docker/rocketmq/nameserver/logs:/root/logs -v /docker/rocketmq/nameserver/store:/root/store -e "MAX_POSSIBLE_HEAP=100000000" rocketmqinc/rocketmq sh mqnamesrv
启动broker
创建配置文件
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1 = 主机的IP
启动容器
docker run -d --restart=always --name rmqbroker --link rmqnamesrv:namesrv -p 10911:10911 -p 10909:10909 --privileged=true -v /docker/rocketmq/data/broker/logs:/root/logs -v /docker/rocketmq/data/broker/store:/root/store -v /docker/rocketmq/conf/broker.conf:/opt/docker/rocketmq/broker.conf -e "NAMESRV_ADDR=namesrv:9876" -e "MAX_POSSIBLE_HEAP=200000000" rocketmqinc/rocketmq sh mqbroker -c /opt/docker/rocketmq/broker.conf

name-server参数说明:

参数

说明

-d

以守护进程的方式启动

- -restart=always

docker重启时候容器自动重启

- -name rmqnamesrv

把容器的名字设置为rmqnamesrv

-p 9876:9876

把容器内的端口9876挂载到宿主机9876上面

-v /docker/rocketmq/nameserver/logs:/root/logs

目录挂载

-v /docker/rocketmq/nameserver/store

目录挂载

rmqnamesrv

容器的名字

-e “MAX_POSSIBLE_HEAP=100000000”

设置容器的最大堆内存为100000000

rocketmqinc/rocketmq

使用的镜像名称

sh mqnamesrv

启动namesrv服务

broker参数说明:

-d

以守护进程的方式启动

- -restart=always

docker重启时候容器自动重启

- -name rmqbroker

把容器的名字设置为rmqbroker

- --link rmqnamesrv:namesrv

和rmqnamesrv容器通信

-p 9876:9876

把容器内的端口9876挂载到宿主机9876上面

-p 10909:10909

把容器的vip通道端口挂载到宿主机

-e “NAMESRV_ADDR=namesrv:9876”

指定namesrv的地址为本机namesrv的ip地址:9876

-e “MAX_POSSIBLE_HEAP=200000000” rocketmqinc/rocketmq sh mqbroker

指定broker服务的最大堆内存

rocketmqinc/rocketmq

使用的镜像名称

sh mqbroker -c /opt/docker/rocketmq/broker.conf

指定配置文件启动broker节点

1.2 普通安装:

首先安装java.

//下载
wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.1/rocketmq-all-5.1.1-bin-release.zip
unzip rocketmq-all-5.1.1-bin-release.zip

下载目录介绍

  • bin:启动脚本,包括shell脚本和CMD脚本

  • conf:实例配置文件 ,包括broker配置文件、logback配置文件等

  • lib:依赖jar包,包括Netty、commons-lang、FastJSON等

修改配置:

修改runbroker.sh runserver.sh

JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"

将启动jvm内存参数调小

修改conf/broker.conf

这里需要改一下Broker配置文件,需要指定NameServer的地址,因为需要Broker需要往NameServer注册

在文件末尾追加namesrv地址

namesrvAddr = localhost:9876

因为NameServer跟Broker在同一台机器,所以是localhost,NameServer端口默认的是9876。

文件末尾继续追加brokerIp,IP值是当前部署broker的服务器外网IP

brokerIP1 = 192.168.200.143
brokerIP2 = 192.168.200.143

因为Broker向NameServer进行注册的时候,带过去的ip如果不指定就会自动获取,但是自动获取的有个坑,就是有可能客户端无法访问到这个自动获取的ip,所以我建议手动指定客户端可以访问到的服务器ip。img

启动

nameserver:

sh ./mqnamesrv

cd /root/rocketmq/rocketmq-all/bin
​
nohup sh `./mqnamesrv` &

在bin目录下执行

运行jps,出现了namesrvstartup即为成功

broker:

进入bin目录执行

nohup sh ./mqbroker -c ../conf/broker.conf -n localhost:9876 autoCreateTopicEnable=true &

jps查看当前已启动的java进程,出现brokerstartup即为成功

2.特性:

2.1 MQ选择:

特性

ActiveMQ

RabbitMQ

RocketMQ

kafka

吞吐量

万级

万级

10万级

10万级

时效性

ms级

微秒级

ms级

ms级

可用性

高:主从

高:主从

非常高:分布式

非常高:分布式

消息可靠性

可能丢失

可能丢失

0丢失

0丢失

2.1.1 RabbitMQ:

  1. 开发语言: RabbitMQ 使用 Erlang 语言进行开发,这使得它在处理并发连接时非常强大和高效。

  2. 协议支持: RabbitMQ 支持多种协议,包括AMQP(Advanced Message Queuing Protocol)等,这使得它具有很好的可扩展性和与其他系统集成的能力。

  3. 可靠性: RabbitMQ 提供了强大的消息持久性和可靠性保证,可以确保消息不会在系统故障或重启时丢失。

  4. 社区和生态系统: RabbitMQ 有一个活跃的社区,广泛用于企业和开源项目,有大量的文档和教程可用。

2.1.2 RocketMQ:

  1. 开发语言: RocketMQ 使用 Java 进行开发,因此更适合Java生态系统。

  2. 协议支持: RocketMQ 采用了自己的通信协议,与其他消息队列系统可能不太兼容。

  3. 可靠性: RocketMQ 也提供了消息持久性和可靠性,可以保证消息不会丢失。

  4. 分布式特性: RocketMQ 是为构建分布式系统而设计的,提供了一些分布式特性,如水平扩展和高可用性。

  5. 适用场景: RocketMQ 在阿里巴巴集团的业务中得到广泛应用,特别是在大规模的分布式场景下。

选择 RabbitMQ 还是 RocketMQ 取决于项目的具体需求和技术栈。如果你的项目主要使用Java,并且你需要一个在Java生态系统中更紧密集成的消息队列系统,RocketMQ可能是一个更好的选择。如果你需要更广泛的语言支持和更成熟的社区支持,RabbitMQ可能更适合。

3.使用:

3.1 Java原生使用:

3.1.1 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>5.1.1</version>
</dependency>

3.1.2 生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
        //创建一个生产者,指定生产者组为StarGeo
        DefaultMQProducer producer = new DefaultMQProducer("StarGeo");

        // 指定NameServer的地址
        producer.setNamesrvAddr("154.8.204.64:9876");
        // 第一次发送可能会超时,设置的比较大
        producer.setSendMsgTimeout(1000000);

        // 启动生产者
        producer.start();

        // 创建一条消息
        // topic为HomuraAkime
        // 消息内容为homura daisuki
        // tags 为 homura
        Message msg = new Message("HomuraAkime", "homura", "homura daisuki ".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 发送消息并得到消息的发送结果,然后打印
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }

}

3.1.3 消费者:

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {

        // 通过push模式消费消息,指定消费者组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("StarGeoConsumer");

        // 指定NameServer的地址
        consumer.setNamesrvAddr("154.8.204.64:9876");

        // 订阅这个topic下的所有的消息
        consumer.subscribe("HomuraAkime", "*");

        // 注册一个消费的监听器,当有消息的时候,会回调这个监听器来消费消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.printf("消费消息:%s", new String(msg.getBody()) + "\n");
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 启动消费者
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }

}

3.2 spring集成:

3.2.1 依赖:

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

3.2.2 yml配置:

rocketmq:
  producer:
    group: homura
  name-server: 154.8.204.64:9876

3.2.3 生产者:

@Slf4j
@RestController
public class RocketMqDemo {

    @Autowired
    RocketMQTemplate rocketMQTemplate;

    @GetMapping("send/{id}")
    public String send(@PathVariable("id") String id){
        UserVo userVo  = new UserVo(id,"侯征");
        log.warn(JSON.toJSONString(userVo));
        rocketMQTemplate.send("rocket-topic-01", MessageBuilder.withPayload(userVo).build());
        return "SUCESS";
    }
}

3.2.4 消费者:

@Slf4j
@Component
@RocketMQMessageListener(topic = "rocket-topic-01", consumerGroup = "my-rocket-topic-01")
public class UserConsumer implements RocketMQListener<UserVo> {

    @Override
    public void onMessage(UserVo message) {
        log.warn("接受到消息: {}",message.toString());
    }
}

4.应用场景:

4.1 解耦:

在跨系统调用接口时,会出现代码侵入性,而mq可以解决这种问题。

20200303143813540

20200303154127310

4.2 异步:

首先要知道什么叫异步:

  1. 同步(Synchronous):

    • 在同步操作中,发起请求的任务会被阻塞,直到得到响应或完成特定操作为止。

    • 执行顺序是按照调用的顺序进行的,即一个任务在完成之前会一直等待。

    • 同步操作通常意味着代码会按照顺序执行,每个操作都会等待前一个操作的完成。

  2. 异步(Asynchronous):

    • 在异步操作中,发起请求的任务不会被阻塞,它可以继续执行其他任务而不必等待操作完成。

    • 异步操作通常涉及回调机制、事件驱动或者使用异步任务来处理。

    • 异步操作允许并发执行多个任务,提高系统的响应性和性能。

简单来说,同步就是顺序执行,异步就是乱序执行,能缩短所需要的时间。

使用mq可以实现异步,在跨系统接口调用时,使用mq可以把请求发送出去然后返回,从而缩短处理时间。

20200306181649572

20200306184550280

4.3 削峰:

当接口吞吐量过大时,可以先把请求放入mq,然后再从mq中取出能处理的请求进行处理。

20200307135352764

2020030714124572

但是使用mq也会出现系统可用性降低,系统复杂程度提高,并有可能会出现一致性问题。