消息队列

消息(Message)是指在应用间传送的数据。消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。

消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

以常见的订单系统为例,用户点击【下单】按钮之后的业务逻辑可能包括:扣减库存、生成相应单据、发红包、发短信通知。在业务发展初期这些逻辑可能放在一起同步执行,随着业务的发展订单量增长,需要提升系统服务的性能,这时可以将一些不需要立即生效的操作拆分出来异步执行,比如发放红包、发短信通知等。这种场景下就可以用 MQ ,在下单的主流程(比如扣减库存、生成相应单据)完成之后发送一条消息到 MQ 让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由 MQ 推送消息),当发现 MQ 中有发红包或发短信之类的消息时,执行相应的业务逻辑。

以上是用于业务解耦的情况,其它常见场景包括最终一致性、广播、错峰流控等等。

RabbitMQ 特点

RabbitMQ 是一个由 Erlang 语言开发的 AMQP 的开源实现。

AMQP :Advanced Message Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间件设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件的限制。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

1、可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

2、灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

3、 消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

3、 高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

4、多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

5、多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

6、 管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

7、 跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

RabbitMQ安装

一般来说安装 RabbitMQ 之前要安装 Erlang ,可以去Erlang官网下载。接着去RabbitMQ官网下载安装包,之后解压缩即可。

Erlang官方下载地址:https://www.erlang.org/downloads

RabbitMQ官方下载地址:https://www.rabbitmq.com/download.html

依赖包安装

安装RabbitMQ之前必须要先安装所需要的依赖包可以使用下面的一次性安装命令

yum install gcc glibc-devel make ncurses-devel openssl-devel xmlto -y

安装Erlang

1、 将Erlang源代码包otp_src_19.3.tar.gz上传到Linux的/home目录下

2、解压erlang 源码包

1
tar -zxvf otp_src_19.3.tar.gz

3、手动创建erlang 的安装目录

1
mkdir /usr/local/erlang

4、进入erlang的解压目录

1
cd otp_src_19.3

5、配置erlang的安装信息

1
./configure --prefix=/usr/local/erlang --without-javac

6、编译并安装

1
make && make install

7、配置环境变量

1
vim /etc/profile

8、将这些配置填写到profile文件的最后

1
2
3
ERL_HOME=/usr/local/erlang
PATH=$ERL_HOME/bin:$PATH
export ERL_HOME PATH

9、启动环境变量配置文件

1
source /etc/profile

开始安装

1、 将RabbitMQ安装包rabbitmq-server-3.7.2-1.el7.noarch.rpm上传到/home目录

2、安装RabbitMQ

1
rpm -ivh --nodeps rabbitmq-server-3.7.2-1.el7.noarch.rpm

启动和关闭

1、启动RabbitMQ

1
rabbitmq-server start &

1.5、后台启动
有时&不能后台启动,用以下命令即可(关闭服务的状态下,后台启动)

1
rabbitmq-server -detached

注意:这里可能会出现错误,错误原因是/var/lib/rabbitmq/.erlang.cookie文件权限不够。

解决方案对这个文件授权

1
2
3
chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie

chmod 400 /var/lib/rabbitmq/.erlang.cookie

2、停止服务

1
rabbitmqctl stop

插件管理

1、添加插件

1
rabbitmq-plugins enable {插件名}

2、删除插件

1
rabbitmq-plugins disable {插件名}

注意:RabbitMQ启动以后可以使用浏览器进入管控台,但是默认情况RabbitMQ不允许直接使用浏览器浏览器进行访问因此必须添加插件

1
rabbitmq-plugins enable rabbitmq_management

3、使用浏览器访问管控台http://RabbitMQ服务器IP:15672

http://192.168.147.130:15672

用户管理

RabbitMQ安装成功后使用默认用户名guest登录

账号:guest

密码:guest

注意:这里guest只允许本机登录访问需要创建用户并授权远程访问命令如下

1、 添加用户:rabbitmqctl add_user {username} {password}

1
rabbitmqctl add_user root root

2、 删除用户:rabbitmqctl delete_user {username}

3、 修改密码:rabbitmqctl change_password {username} {newpassword}

1
rabbitmqctl change_password root root

4、 设置用户角色:rabbitmqctl set_user_tags {username} {tag}

1
rabbitmqctl set_user_tags root administrator

tag参数表示用户角色取值为:management、monitoring、policymaker、administrator

各角色详解:

management

用户可以通过AMQP做的任何事外加:

列出自己可以通过AMQP登入的virtual hosts

查看自己的virtual hosts中的queues, exchanges 和 bindings

查看和关闭自己的channels 和 connections

查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。

policymaker

management可以做的任何事外加:

查看、创建和删除自己的virtual hosts所属的policies和parameters

monitoring

management可以做的任何事外加:

列出所有virtual hosts,包括他们不能登录的virtual hosts

查看其他用户的connections和channels

查看节点级别的数据如clustering和memory使用情况

查看真正的关于所有virtual hosts的全局的统计信息

administrator

policymaker和monitoring可以做的任何事外加:

创建和删除virtual hosts

查看、创建和删除users

查看创建和删除permissions

关闭其他用户的connections

权限管理

1、 授权命令:rabbitmqctl set_permissions [-p vhostpath] {user} {conf} {write} {read}

-p vhostpath :用于指定一个资源的命名空间,例如 –p / 表示根路径命名空间

user:用于指定要为哪个用户授权填写用户名
conf:一个正则表达式match哪些配置资源能够被该用户配置。
write:一个正则表达式match哪些配置资源能够被该用户读。
read:一个正则表达式match哪些配置资源能够被该用户访问。

例如:

1
rabbitmqctl set_permissions -p / root '.*' '.*' '.*'

用于设置root用户拥有对所有资源的 读写配置权限

2、查看用户权限 rabbitmqctl list_permissions [vhostpath]

例如

查看根径经下的所有用户权限

1
rabbitmqctl list_permissions

查看指定命名空间下的所有用户权限

1
rabbitmqctl list_permissions /abc

3、查看指定用户下的权限rabbitmqctl list_user_permissions {username}

例如

查看root用户下的权限

1
rabbitmqctl list_user_permissions root

4、清除用户权限rabbitmqctl clear_permissions {username}

例如:

清除root用户的权限

1
rabbitmqctl clear_permissions root

vhost管理

vhost是RabbitMQ中的一个命名空间,可以限制消息的存放位置利用这个命名空间可以进行权限的控制有点类似Windows中的文件夹一样,在不同的文件夹中存放不同的文件。

1、添加vhost: rabbitmqctl add vhost {name}

例如

1
rabbitmqctl add_vhost powernode

2、删除vhost:rabbitmqctl delete vhost {name}

例如

1
rabbitmqctl delete_vhost powernode

消息发送和接收

所有 MQ 产品从模型抽象上来说都是一样的过程:
消费者(consumer)订阅某个队列。
生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

上面是MQ的基本抽象模型,但是不同的MQ产品有有者不同的机制,RabbitMQ实际基于AMQP协议的一个开源实现,因此RabbitMQ内部也是AMQP的基本概念。

RabbitMQ的内部接收如下:

1、Message
消息,消息是不具体的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

2、Publisher
消息的生产者,也是一个向交换器发布消息的客户端应用程序。

3、Exchange
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。.

4 Binding
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

5、Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

6、Connection
网络连接,比如一个TCP连接。

7、Channel
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

8、Consumer
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

9、Virtual Host
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 /

10、Broker
表示消息队列服务器实体。

Exchange 类型

Exchange分发消息时根据类型的不同分发策略有区别,目前共四种类型:direct、fanout、topic、headers 。headers 匹配 AMQP 消息的 header 而不是路由键,此外 headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了,所以直接看另外三种类型

  1. direct
    消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“green”,则只转发 routing key 标记为“green”的消息,它是完全匹配、单播的模式。

  2. fanout
    每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

    • 适用场景:
      群聊功能,广播消息给当前群聊中的所有人
      大型玩家在玩在线游戏的时候,可以用它来广播重大消息
  3. topic
    topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”“*”#匹配0个或多个单词,*匹配不多不少一个单词。

    • 适用场景:
      新闻的分类更新
      同一个任务多个工作者协调完成
      同一问题需要特定人员知晓

Java调用Rabbitmq客户端

此处是原生写法,以后一般都是用springboot的啦

在java中实现对Rabbitmq进行消息队列编程,需要导入rabbitmq客户端。为了实现生产者和消费者的通信,接下来分别创建两个项目进行模拟。

导包

1
2
3
4
5
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.13.1</version>
</dependency>

小总结

一般情况下交换机和队列都是让监听者/消费者去创建(也要看实际业务情况)

创建工厂、配置好地址端口号账号密码实例化链接对象实例化通道对象
就可以设置队列交换机

创建队列:

1
2
3
4
5
6
7
//创建队列 ,名字为myQueue
//参数一:队列名
//参数二:消息是否持久化
//参数三:是否独有的,一般false
//参数四:队列消费完后是否删除
//参数五:???
channel.queueDeclare("myQueue", true, false, false, null);

直连交换机

1
2
3
4
5
//参数一:交换机名
//参数二:队列名/路由名
//参数三:道具???
//参数四:要发送的信息,需编码
channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));

声明交换机

1
2
3
4
//参数一:交换机名
//参数二:类型
//参数三:是否持久化
channel.exchangeDeclare("topicExchange","topic",true);

绑定

1
2
3
4
//参数一:队列名
//参数二:交换机名
//参数三:key
channel.queueBind("topicQueue01","topicExchange","aa.#");

无交换机模式

生产者

创建发送者类Sender,编写发送消息的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂对象
ConnectionFactory factory=new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码
Connection connection=null;//定义链接对象
Channel channel=null;//定义通道对象
connection=factory.newConnection();//实例化链接对象
channel=connection.createChannel();//实例化通道对象
String message ="Hello World!";
//创建队列 ,名字为myQueue
//参数一:队列名
//参数二:消息是否持久化
//参数三:是否独有的,一般false
//参数四:队列消费完后是否删除
//参数五:???
channel.queueDeclare("myQueue", true, false, false, null);
//发送消息到指定队列
//参数一:交换机名
//参数二:队列名/路由名
//参数三:道具???
//参数四:要发送的信息,需编码
channel.basicPublish("","myQueue",null,message.getBytes("UTF-8"));
System.out.println("消息发送成功: "+message);
channel.close();
connection.close();
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class Receiver {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root"); //设置用户名
factory.setPassword("root"); //设置密码
factory.setHost("119.91.252.224"); //消费队列服务器名
//建立到代理服务器到连接
Connection conn = factory.newConnection();
//获得信道
final Channel channel = conn.createChannel();
//声明队列
channel.queueDeclare("myQueue", true, false, false, null);
//消费消息
boolean autoAck = true;
String consumerTag = "";
//接收消息
//参数1 队列名称
//参数2 是否自动确认消息 true表示自动确认 false表示手动确认
//参数3 为消息标签 用来区分不同的消费者这里暂时为""
// 参数4 消费者回调方法用于编写处理消息的具体代码(例如打印或将消息写入数据库)
channel.basicConsume("myQueue", autoAck, consumerTag, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
//获取消息数据
String bodyStr = new String(body, "UTF-8");
System.out.println(bodyStr); //输出消息
}
});

//流关闭,工厂关闭
channel.close();
conn.close();
}
}

direct模式

只接收相同key、路由匹配的

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class DirectReceiver {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare("myDirectQue1",true,false,false,null);
// 船舰交换机
channel.exchangeDeclare("directExchange", "direct",true);
// 队列,交换机,key相互绑定
channel.queueBind("myDirectQue1","directExchange", "directRoutingKey");
// 开始监听
channel.basicConsume("myDirectQue1",true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者 = " + message);
}
});
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public class DirectSender {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String message = "direct的消息!";
// 发送
channel.basicPublish("directExchange", "directRoutingKey", null, message.getBytes("utf-8"));
System.out.println("发送成功");

}
}

fanout模式

类似广播

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

public class FanoutReceiver {
public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 随机生成队列名
String queueName= channel.queueDeclare().getQueue();
// 第二个属性 == fanout
channel.exchangeDeclare("fanoutExchange","fanout",true);
//将这个随机的队列绑定到交换机中, 由于是fanout类型的交换机因此不需指定RoutingKey进行绑定
channel.queueBind(queueName,"fanoutExchange","");
// 开始监听
channel.basicConsume(queueName,true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive01消费者 ---"+message);
}
});
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

public class FanoutSender {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String message = "fanout的测试消息!";
// 发送
channel.basicPublish("fanoutExchange", "", null, message.getBytes("utf-8"));
System.out.println("消息发送成功");

}
}

topic模式

根据不同的key接受相应的消息

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

public class TopicReceiver {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("topicQueue01",true,false,true,null);
// 声明交换机
channel.exchangeDeclare("topicExchange","topic",true);
// 绑定,第三个属性类似正则表达式(但有区别)
channel.queueBind("topicQueue01","topicExchange","aa.#");
// 开始监听
channel.basicConsume("topicQueue01",true,"",new DefaultConsumer(channel){
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message=new String(body);
System.out.println("Receive01消费者aa.# ---"+message);
}
});
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class TopicSender {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");
factory.setPort(5672);
factory.setUsername("root");
factory.setPassword("root");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String message="topic的测试消息!";
// 发送指定交换机,符合的key
channel.basicPublish("topicExchange","aa.bb.cc",null,message.getBytes("utf-8"));
System.out.println("消息发送成功");
}
}

确认机制

在使用RabbitMQ的时候,我们可以通过消息持久化操作来解决因为服务器的异常奔溃导致的消息丢失,除此之外我们还会遇到一个问题,当消息的发布者在将消息发送出去之后,消息到底有没有正确到达broker代理服务器呢?如果不进行特殊配置的话,默认情况下发布操作是不会返回任何信息给生产者的,也就是默认情况下我们的生产者是不知道消息有没有正确到达broker的,如果在消息到达broker之前已经丢失的话,持久化操作也解决不了这个问题,因为消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

RabbitMQ为我们提供了两种方式:

通过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
通过将channel设置成confirm模式来实现;

事务机制

RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback()

txSelect:用于将当前channel设置成transaction模式

txCommit:用于提交事务

txRollback:用于回滚事务

在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Sender {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码

Connection connection = null;
Channel channel = null;

connection = factory.newConnection();

channel = connection.createChannel();

// 声明交换机
channel.exchangeDeclare("directTransactionExchange", "direct", true);

String message = "事务的第一条消息";

String message2 = "事务的第二条消息";

long start = System.currentTimeMillis();
// 开启事务
channel.txSelect();

// 这里循环只为测试时间,对比效率
for (int i = 0; i < 10000; i++) {

// 发送
channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message.getBytes("utf-8"));

channel.basicPublish("directTransactionExchange", "transactionRoutingKey", null, message2.getBytes("utf-8"));
}

//提交事务
channel.txCommit();
long end = System.currentTimeMillis() - start;
System.out.println("运行时间" + end);
// 运行时间1706
System.out.println("消息发送成功");
}
}

接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class Reciver {
public static void main(String[] args) throws IOException, TimeoutException {
//创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码

Connection connection = null;
Channel channel = null;

connection = factory.newConnection();

channel = connection.createChannel();

// 声明队列
channel.queueDeclare("transactionQueue", true, false, false, null);
// 声明交换机
channel.exchangeDeclare("directTransactionExchange", "direct", true);
// 绑定队列
channel.queueBind("transactionQueue", "directTransactionExchange", "transactionRoutingKey");

//开始消费
channel.basicConsume("transactionQueue", true, "", new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消费者 ---" + message);
}
});

}
}

生产者确认模式

生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker回传给生产者的确认消息中delivery-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理;

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息;
开启confirm模式的方法:

生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,(注意一点,已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的)

普通confirm模式

每发送一条消息,调用waitForConfirms()方法等待服务端confirm,这实际上是一种串行的confirm,每publish一条消息之后就等待服务端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

public class SenderConfirm {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码

Connection connection = null;
Channel channel = null;

connection = factory.newConnection();

channel = connection.createChannel();

//创建队列名字confirmQueue
channel.queueDeclare("confirmQueue",true,false,false,null);

//开启发送方确认模式
String message="普通发送者确认模式测试消息!";
//启动发送者确认模式
channel.confirmSelect();

long time=System.currentTimeMillis();
// 循环100次,发送消息到指定的队列
for(int i=1;i<=100;i++){
message="Hello World!..."+i;
System.out.println(message);
channel.basicPublish("","confirmQueue",null,message.getBytes("utf-8"));
}
// System.out.println(channel.waitForConfirms()); 返回布尔类型
System.out.println("单条确认使用时间="+(System.currentTimeMillis()-time));
// 时间大概10左右
System.out.println("消息发送成功"+message);

}

//阻塞线程等待服务返回响应 ,用于是否消费发送成功,如果服务确认消费已经发送完成则返回true 否则返回false
//可以为这个方法指定一个毫秒用于确定我们的需要等待服务确认的超时时间,
//如果超过了指定的时间以后则会抛出异常InterruptedException 表示服务器出现问题了需要补发消息或
//将消息缓存到Redis中稍后利用定时任务补发
//无论是返回false还是抛出异常消息都有可能发送成功有可能没有发送成功
//如果我们要求这个消息一定要发送到队列例如订单数据,那怎么我们可以采用消息补发
//所谓补发就是重新发送一次消息,可以使用递归或利用Redis+定时任务来完成补发
}

批量confirm模式

每发送一批消息之后,调用waitForConfirms()方法,等待服务端confirm,这种批量确认的模式极大的提高了confirm效率,但是如果一旦出现confirm返回false或者超时的情况,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息,如果这种情况频繁发生的话,效率也会不升反降;

这种模式生产者不是每发送一条就等待broker确认,而是发送一批,实现代码见下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49

public class SenderPConfirm {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码

Connection connection = null;
Channel channel = null;


connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);

//开启发送方确认模式
channel.confirmSelect();

long time=System.currentTimeMillis();
for(int i=0;i<10000;i++) {
String message=String.format("时间=》%s",new Date().getTime());
System.out.println(message);
channel.basicPublish("","confirmQueue",null,message.getBytes("utf-8"));
int a = 100/0;
}

channel.waitForConfirmsOrDie(); //直到所有的消息都发布,只要有一个没有确认就抛异常
/**
* waitForConfirmsOrDie 批量消息确认,它会同时向服务中确认之前当前通道中发送的所有的消息是否已经全部成功写入
* 这个方法没有任何的返回值,如果服务器中有一条消息没有能够成功或向服务器发送确认时服务不可访问都被认定为
* 消息确认失败,可能有有消息没有发送成功,我们需要进行消费的补发。
* 如果无法向服务器获取确认信息那么方法就会抛出InterruptedException异常,这时就需要补发消息到队列
* waitForConfirmsOrDie方法可以指定一个参数timeout 用于等待服务器的确认时间,如果超过这个时间也会
* 抛出异常,表示确认失败需要补发消息
*
* 注意:
* 批量消息确认的速度比普通的消息确认要快,但是如果一旦出现了消息补发的情况,我们不能确定具体
* 是哪条消息没有完成发送,需要将本次的发送的所有消息全部进行补发
*
*/

System.out.println("批量确认使用时间="+(System.currentTimeMillis()-time));
// 批量确认使用时间=1047
System.out.println("全部发送完成...");
}
}

异步Confirm模式

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

public class SenderYConfirm {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
//创建链接工厂对象
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("119.91.252.224");//设置RabbitMQ的主机IP
factory.setPort(5672);//设置RabbitMQ的端口号
factory.setUsername("root");//设置访问用户名
factory.setPassword("root");//设置访问密码

Connection connection = null;
Channel channel = null;


connection=factory.newConnection();
channel=connection.createChannel();
channel.queueDeclare("confirmQueue",true,false,false,null);

//开启发送方确认模式
channel.confirmSelect();

long time=System.currentTimeMillis();
for(int i=0;i<10000;i++) {
String message = "生产者:"+i;
System.out.println(message);
channel.basicPublish("", "confirmQueue", null, message.getBytes("utf-8"));
}

channel.addConfirmListener(new ConfirmListener() {
//消息确认以后的回调方法
//参数 1 为被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时确认了多个
//注意:如果参数 2 为true 则表示本次确认同时确认了多条消息,消息等于当前参数1 (消息编号)的所有消息 全部被确认 如果为false 则表示只确认多了当前编号的消息
public void handleAck(long l, boolean b) throws IOException {
System.out.println("消息被确认了 --- 消息编号:" + l + " 是否确认了多条:" + b);
}

//消息没有确认的回调方法
//如果这个方法被执行表示当前的消息没有被确认 需要进行消息补发
//参数 1 为没有被确认的消息的编号 从 1 开始自动递增用于标记当前是第几个消息
//参数 2 为当前消息是否同时没有确认多个
//注意: 如果参数2 为true 则表示小于当前编号的所有的消息可能都没有发送成功需要进行消息的补发
// 如果参数2 为false则表示当前编号的消息没法发送成功需要进行补发
public void handleNack(long l, boolean b) throws IOException {
System.out.println("消息没有被确认-----消息编号:" + l + " 是否没有确认多条:" + b);
}
});


System.out.println("批量确认使用时间="+(System.currentTimeMillis()-time));
// 批量确认使用时间=458
System.out.println("全部发送完成...");
}
}

可以看到,虽然我们还是发送了100条消息,同样我们并没有收到100个ack消息 ,只收到两个或者三个ack消息,并且这两个ack消息的multiple域都为true,这点和测试1是相同的,你多次运行程序会发现每次发送回来的ack消息中的deliveryTag域的值并不是一样的,说明broker端批量回传给发送者的ack消息并不是以固定的批量大小回传的;

类似测核酸,十个人为一组进行抽查

从以上测试示例时间就可以看到waitForConfirmsOrDie方法发送100条消息并且全部收到确认需要135ms,测试2中通过监听器的方式仅仅需要1ms,说明调用waitForConfirmsOrDie会造成程序的阻塞,通过监听器并不会造成程序的阻塞

SpringBoot整合

之前的代码可以看到有很多地方冗余,spring boot就是减少重复代码的。

导包,配置文件

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
spring.rabbitmq.addresses=119.91.252.224
spring.rabbitmq.port=5672
spring.rabbitmq.username=root
spring.rabbitmq.password=root

direct

发送者

1
2
3
4
5
6
7
8
9
10
11
@Component
public class PaymentNotifySender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void sender(String msg){
System.out.println("notify.payment 已发送消息: "+msg);
//第一个参数为对列名字,第二个参数为消息内容
rabbitTemplate.convertAndSend("notify.payment", msg);
}
}

@Test 调用发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@SpringBootTest
class RabbitmqSenderApplicationTests {

@Autowired
private PaymentNotifySender sender;

@Test
public void test_sender1() {
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
sender.sender("测试1:"+i);
}
System.out.println(System.currentTimeMillis() - start);
}
}

接收者

1
2
3
4
5
6
7
8
@Configuration
public class DirectConfig {
@Bean
public Queue paymentNotifyQueue() {
//创建队列
return new Queue("notify.payment");
}
}
1
2
3
4
5
6
7
8
@Component
@RabbitListener(queues = "notify.payment") //监听
public class PaymentNotifyReceive {
@RabbitHandler //消息处理
public void receive(String msg) {
System.out.println("notify.payment接收消息: "+msg);
}
}

Topic模式

topic转发信息主要是依据通配符,队列和交换机的绑定。主要是依据一种模式(通配符+字符串),而当发送消息的时候,只有指定的Key和该模式相匹配的时候,消息才会被发送到该消息队列中.

生产者

用于创建队列,交换机,绑定交换机。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

@Configuration
public class TopicConfig {
//配置一个routingKey为api.core的消息队列并绑定在coreExchange交换机上(交换机的匹配规则为api.core.*)
//配置一个routingKey为api.payment的消息队列并绑定在paymentExchange交换机上(交换机的匹配规则为api.payment.#)

@Bean
public Queue coreQueue() {
return new Queue("api.core");
}
@Bean
public Queue paymentQueue() {
return new Queue("api.payment");
}

@Bean //交换机coreExchange
public TopicExchange coreExchange() {
return new TopicExchange("coreExchange");
}
@Bean //交换机paymentExchange
public TopicExchange paymentExchange() {
return new TopicExchange("paymentExchange");
}

@Bean
/*号只能向后多匹配一层路径(表示一个词)
#号可以向后匹配多层路径(表示零个或多个词)
*/
//BindingBuilder.bind(指定队列).to(交换机).with(路由键); 路由键相当于队列名
//队列coreQueue绑定到交换机coreExchange
public Binding bindingCoreExchange(Queue coreQueue, TopicExchange coreExchange) {
return BindingBuilder.bind(coreQueue).to(coreExchange).with("api.core.*");
}

@Bean //队列paymentQueue绑定到paymentExchange交换机
public Binding bindingPaymentExchange(Queue paymentQueue, TopicExchange paymentExchange) {
return BindingBuilder.bind(paymentQueue).to(paymentExchange).with("api.payment.#");
}
}

负责用户消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

@Component
public class ApiCoreSender {
@Autowired
private RabbitTemplate rabbitTemplate;

public void user(String msg){
System.out.println("api.core.user send message: "+msg);
//rabbitTemplate.convertAndSend("交换机名",“路由键”,“消息内容”)
//RabbitMQ将会根据第二个参数去寻找有没有匹配此规则的队列,如果有,则把消息给它,如果有不止一个,则把消息分发给匹配的队列(每个队列都有消息!)
rabbitTemplate.convertAndSend("coreExchange", "api.core.user", msg);
}

//查询用户方法
public void userQuery(String msg){
System.out.println("api.core.user.query send message: "+msg);
rabbitTemplate.convertAndSend("coreExchange", "api.core.core.query", msg);
}
}

负责订单消息发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class ApiPaymentSender {
@Autowired
private RabbitTemplate rabbitTemplate;

//订单管理方法
public void order(String msg){
System.out.println("api.payment.order send message: "+msg);
rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order", msg);
}

//订单查询
public void orderQuery(String msg){
System.out.println("api.payment.order.query send message: "+msg);
rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.query", msg);
}

//订单明细查询
public void orderDetailQuery(String msg){
System.out.println("api.payment.order.detail.query send message: "+msg);
rabbitTemplate.convertAndSend("paymentExchange", "api.payment.order.detail.query", msg);
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
@SpringBootTest
public class ApiCoreSenderTest {
@Autowired
private ApiCoreSender sender;
@Test
public void test_user() {
sender.user("用户管理!");
}
@Test
public void test_userQuery() {
sender.userQuery("查询用户信息!");
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@SpringBootTest
public class ApiPaymentSenderTest {
@Autowired
private ApiPaymentSender sender;

@Test
public void test_order() {
sender.order("订单管理!");
}

@Test
public void test_orderQuery() {
sender.orderQuery("查询订单信息!");
}

@Test
public void test_orderDetailQuery() {
sender.orderDetailQuery("查询订单详情信息!");
}
}

消费者

1
2
3
4
5
6
7
8
9
@Component
public class ApiCoreRecive {
@RabbitHandler
@RabbitListener(queues = "api.core")
//注意:topic模式中只有消费端监听的路由键符合发送端的路由规则(路由键决定)的队列才会收到消息。
public void user(String msg) {
System.out.println("api.core 接受的消息: " + msg);
}
}
1
2
3
4
5
6
7
8
9
@Component
public class ApiPaymentRecive {
@RabbitHandler
@RabbitListener(queues = "api.payment")
//注意:topic模式中只有消费端监听的路由键符合发送端的路由规则(路由键决定)的队列才会收到消息。
public void order(String msg) {
System.out.println("api.payment.order 接收消息: "+msg);
}
}

启动消费者项目,分别调用生产者的用户消息和订单消息的发送,测试消费者接受到消息。

fanout模式

是路由广播的形式,将会把消息发给绑定它的全部队列,即便设置了key,也会被忽略.在广播模式中,发送者只要发送消息,所有绑定的队列都会接收到,所以在本例中,将交换机,队列的创建都交给消费者项目完成。

广播:发送到路由器的消息会使得绑定到该路由器的每一个Queue接收到消息,这个时候就算指定了Key,或者规则(即上文中convertAndSend方法的参数2),也会被忽略!
交换机类型:FanoutExchange
rabbitTemplate.convertAndSend(“交换机名”,“ ”,“消息内容”);//路由键被忽略
消费端:只要是绑定到该交换机上的都能收到消息

生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class ApiReportSender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void generateReports(String msg){
System.out.println("api.generate.reports 发送消息: "+msg);
rabbitTemplate.convertAndSend("reportExchange", "api.generate.reports", msg);
}
}

测试

1
2
3
4
5
6
7
8
9
@SpringBootTest
public class ApiReportSenderTest {
@Autowired
private ApiReportSender sender;
@Test
public void test_generateReports() {
sender.generateReports("开始生成报表!");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Configuration
public class FanoutConfig {
@Bean //创建队列
public Queue reportPaymentQueue() {
return new Queue("api.report.payment");
}
@Bean //创建队列
public Queue reportRefundQueue() {
return new Queue("api.report.refund");
}
@Bean //创建交换机
public FanoutExchange reportExchange() {
return new FanoutExchange("reportExchange");
}

@Bean //配置一个routingKey为api.report.payment的消息队列并绑定在reportExchange交换机上
public Binding bindingReportPaymentExchange(Queue reportPaymentQueue, FanoutExchange reportExchange) {
return BindingBuilder.bind(reportPaymentQueue).to(reportExchange);
}
@Bean //配置一个routingKey为api.report.refund的消息队列并绑定在reportExchange交换机上
public Binding bindingReportRefundExchange(Queue reportRefundQueue, FanoutExchange reportExchange) {
return BindingBuilder.bind(reportRefundQueue).to(reportExchange);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class ApiReportReceived {
@RabbitHandler
@RabbitListener(queues = "api.report.payment")
public void payment(String msg) {
System.out.println("api.report.payment 接收的消息: "+msg);
}

@RabbitHandler
@RabbitListener(queues = "api.report.refund")
public void refund(String msg) {
System.out.println("api.report.refund 接收的消息: "+msg);
}
}

消息发送复杂对象

在生产者项目中定义实体对象Order,对象必须可序列化

在消费者项目中定义同样的实体类型Order,用于接收消息中的订单对象

集群