SpringBoot整合RabbitMQ

中间件技术:RabbitMQ

有了中间件,就相当于以后上学不用走路,而是骑单车。:call_me_hand:

概念

中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。

就好似下图:

而RabbitMQ则是一个中间件。

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

简而言之,RabbitMQ是一套开源(MPL)的消息队列服务软件,我们使用这个消息队列,去传输信息。

RabbitMQ的特性

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:

  • 可靠性(Reliability)

RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。

  • 灵活的路由(Flexible Routing)

在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

  • 消息集群(Clustering)

多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 高可用(Highly Available Queues)

队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。

  • 多种协议(Multi-protocol)

RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  • 多语言客户端(Many Clients)

RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。

  • 管理界面(Management UI)

RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing)

如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 插件机制(Plugin System)

RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

RabbitMQ的基本组成

  • Message

消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。

  • Publisher

消息的生产者,也是一个向交换器发布消息的客户端应用程序。

  • Exchange

交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

  • Routing Key

路由关键字,exchange根据这个关键字进行消息投递。

  • Binding

绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

  • Queue

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。

  • Connection

网络连接,比如一个TCP连接。

  • Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。

  • Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。

  • Virtual Host

虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。

  • Broker

表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。

SpringBoot整合RabbitMQ

RabbitMQ需要提前在电脑中安装,这里不多赘述,先发个网址给大家安装:

https://blog.csdn.net/weixin_39735923/article/details/79288578

引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

看清楚,RabbitMQ的依赖叫做AMQP

yml配置

1
2
3
4
5
6
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

因为我们不进行web的测试,这里仅仅只需要配置rabbitmq就行了。下面就正式开始测试,并且介绍一下RabbitMQ的三个模式。

Direct Exchange(直接交换)

直接交换:- 直接交换是一种基于消息路由密钥将消息路由到队列的交换。路由密钥是生产者添加的消息头中的消息属性。生产者在消息头中添加路由密钥,并将其发送到直接交换。收到消息后,交换 尝试匹配的路由键 与所有绑定到队列的结合键 它。如果找到匹配项,它将消息路由到绑定键 已匹配的队列,如果未找到匹配项,它将忽略该消息。

下面进行测试:

一对一发送

config包

1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class RabbitConfig {

//注意这里导入的队列是:
//org.springframework.amqp.core.Queue;

@Bean
public Queue queue(){
return new Queue("hello");
}
}

rabbitmq包

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class Sender {

@Autowired
private AmqpTemplate amqpTemplate;

public void send(){
String context = "hello----"+new Date();
System.out.println("send:"+context);
//往名称为 hello 的queue中发送消息
this.amqpTemplate.convertAndSend("hello",context);
}
}

接收者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@RabbitListener(queues="hello")
public class Receiver {

//消息处理器
@RabbitHandler
public void process(String message){
System.out.println("Receiver:"+message);

}

}

测试类

我们这次就直接使用IDEA下的测试类,开始对队列进行测试。

1
2
3
4
5
6
7
8
9
10
11
@RunWith(SpringRunner.class)
@SpringBootTest
class RabbitmqApplicationTests {
@Autowired
Sender Sender;

@Test
public void contextLoads() {
Sender.send();
}
}

测试结果:

1
2
send:hello----Sun Jan 19 19:12:00 CST 2020
Receiver:hello----Sun Jan 19 19:12:00 CST 2020

可以看到,这是一个单对单的发送。

一对多发送

在Sender继续添加方法

1
2
3
4
5
6
//给hello2发送消息,并接受一个计数参数
public void send2(int i){
String context = i+"";
System.out.println(context+"--send:");
this.amqpTemplate.convertAndSend("hello2",context);
}

在建立两个类

1
2
3
4
5
6
7
8
9
10
11
@Component
@RabbitListener(queues = "hello2")
public class Receiver1 {


@RabbitHandler
public void process(String message){

System.out.println("Receiver1:"+message);
}
}
1
2
3
4
5
6
7
8
9
@Component
@RabbitListener(queues = "hello2")
public class Receiver2 {

@RabbitHandler
public void process(String message){
System.out.println("Receiver2:"+message);
}
}

开始下一轮测试:

1
2
3
4
5
6
7
@Test
public void manyReceiver(){
for (int i=0;i<10;i++){
Sender.send2(i);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
0--send:
1--send:
2--send:
3--send:
4--send:
5--send:
6--send:
7--send:
8--send:
9--send:
Receiver1:0
Receiver2:1
Receiver1:2
Receiver2:3
Receiver2:5
Receiver1:4
Receiver2:7
Receiver1:6
Receiver2:9
Receiver1:8

可以从结果看到,我们发送了10个消息,这些消息有两个接收者。而这两个接收者均匀的接收了这10个消息。

多对多发送

有了前面两个案例,我们再来试试多对多发送是如何的:

再建立一个新的类:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class Sender2 {

@Autowired
private AmqpTemplate amqpTemplate;

public void send2(int i){
String context = i+"";
System.out.println(context+"--send:");
this.amqpTemplate.convertAndSend("hello2",context);
}
}

记得同时也需要注入AutoWired,再添加测试:

1
2
3
4
5
6
7

@Test
public void many2many(){
for (int i=0;i<10;i++){
Sender.send2(i);
Sender2.send2(i);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
0--send:
0--send:
1--send:
1--send:
2--send:
2--send:
3--send:
3--send:
4--send:
4--send:
5--send:
5--send:
6--send:
6--send:
7--send:
7--send:
8--send:
8--send:
9--send:
9--send:
Receiver2:0
Receiver1:0
Receiver2:1
Receiver1:1
Receiver2:2
Receiver1:2
Receiver2:3
Receiver1:3
Receiver2:4
Receiver2:5
Receiver1:4
Receiver2:6
Receiver1:5
Receiver2:7
Receiver1:6
Receiver2:8
Receiver1:7
Receiver2:9
Receiver1:8
Receiver1:9

可见他们均匀的收到了消息,且不重复。

发送对象

先构建一个实体类:

1
2
3
4
5
@Data
public class User implements Serializable {
private String username;
private String password;
}

创建新的队列

1
2
3
4
@Bean
public Queue queue3(){
return new Queue("object_queue");
}

发送者和接收者:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class ObjectSender {

@Autowired
AmqpTemplate amqpTemplate;

public void sendUser(User user){

System.out.println("Send object:"+user.toString());
this.amqpTemplate.convertAndSend("object_queue",user);

}
}
1
2
3
4
5
6
7
8
9
10
11
12

@Component
@RabbitListener(queues = "object_queue")
public class ObjectReceiver {

@RabbitHandler
public void objectReceiver(User user){

System.out.println("Receiver object:"+user.toString());

}
}

测试:

1
2
3
4
5
6
7
@Test
public void object(){
User user=new User();
user.setUsername("张三");
user.setPassword("123456");
objectSender.sendUser(user);
}
1
2
Send object:User(username=张三, password=123456)
Receiver object:User(username=张三, password=123456)

Topic Exchange(主题交换)

主题交换: 主题交换是基于队列绑定期间指定的路由键和路由模式之间的通配符匹配,将消息路由到队列的交换。 生产者在消息头中添加路由密钥,并将其发送到主题交换。收到消息后,交换 尝试将路由密钥与与其绑定的所有队列的绑定路由模式进行匹配。如果找到匹配项,它将把消息路由到路由模式匹配的队列,如果找不到匹配项,它将忽略该消息。

路由关键字:-这是单词列表,以句点(。)分隔,例如“ asia.china.beijing”

路由模式:- 这是在绑定队列期间指定的模式,它是单词和通配符的列表,例如“ * ”和“ # ”,以句点(。)分隔。通配符的使用如下:

  • “ * ”:-用于匹配路由键中特定位置的单词,例如路由模式“ asia.china。*”将与第一个单词为“ asia”且第二个单词为第二个的路由键匹配单词是“ china”,如“ asia.china.beijing”和“ asia.china.nanjing”。
  • “ # ”:-用于匹配零个或多个单词,例如“ asia.china。#”的路由模式将与以“ asia.china”开头的路由键(例如“ asia.china”和“ asia.china.beijing”。

可以说:topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列。

先配置新的configuration:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
@Configuration
public class TopicRabbitConfig {


final static String message = "topic.message";
final static String messages = "topic.messages";


//创建两个 Queue
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}

//配置 TopicExchange,指定名称为 topicExchange
@Bean
public TopicExchange exchange() {
return new TopicExchange("topicExchange");
}

//给队列绑定 exchange 和 routing_key

@Bean
public Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}

@Bean
public Binding bingingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}

topic发送者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class TopicSender {

@Autowired
AmqpTemplate amqpTemplate;

public void send1(){
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange","topic.message",context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
amqpTemplate.convertAndSend("topicExchange", "topic.messages", context);
}

}

两个接收者:

1
2
3
4
5
6
7
8
9
10
11
12
@Component
@RabbitListener(queues = "topic.messages")
public class TopicReceiver2 {

@RabbitHandler
public void process(String message){

System.out.println("Receiver topic.messages: "+ message);

}

}
1
2
3
4
5
6
7
8
9
10
11
12
@Component
@RabbitListener(queues = "topic.message")
public class TopicReceiver1 {

@RabbitHandler
public void process(String message){

System.out.println("Receiver topic.message :"+ message);

}

}

开始测试:

1
2
3
4
5
@Test
public void topic(){
topicSender.send1();
topicSender.send2();
}

结果:

1
2
3
4
5
Sender : hi, i am message 1
Sender : hi, i am messages 2
Receiver topic.message :hi, i am message 1
Receiver topic.messages: hi, i am message 1
Receiver topic.messages: hi, i am messages 2

可以看到,绑定的队列头不一样的队列,收到的信息也是不一样的,但是这是一个包含关系。

发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配的只有Receiver2,Receiver2收到了第二条监听到消息

Fanout Exchange(扇出交换)

扇出交换:扇出交换是一种将接收到的消息路由到与其绑定的所有队列的交换。 当生产者将消息发送到扇出交换时,它将复制消息并将其路由到与其绑定的所有队列。它只是忽略路由键或生产者提供的任何模式匹配。当需要将同一消息存储在一个或多个队列中时,这种类型的交换非常有用。

简而言之,Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。

配置:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
@Configuration
public class FanOutRabbitMq {


//创建三个队列

@Bean
public Queue AMessage() {
return new Queue("fanout.1");
}

@Bean
public Queue BMessage() {
return new Queue("fanout.2");
}

@Bean
public Queue CMessage() {
return new Queue("fanout.3");
}


//创建exchange,指定交换策略

@Bean
public FanoutExchange fanoutExchange() {

return new FanoutExchange("fanoutExchange");
}


//分别给三个队列指定exchange,这里使用了1、2、3三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:

@Bean
public Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange){
return BindingBuilder.bind(AMessage).to(fanoutExchange);

}

@Bean
public Binding bindingExchangeB(Queue BMessage,FanoutExchange fanoutExchange){
return BindingBuilder.bind(BMessage).to(fanoutExchange);

}

@Bean
Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(CMessage).to(fanoutExchange);
}


}

一个广播:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Component
public class FanoutSender {

@Autowired
AmqpTemplate amqpTemplate;


public void send(){

String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
//这里使用了1、2、3三个队列绑定到Fanout交换机上面,发送端的routing_key写任何字符都会被忽略:
amqpTemplate.convertAndSend("fanoutExchange","", context);

}

}

三个接收者:

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@RabbitListener(queues = "fanout.1")
public class FanoutReceiver1 {


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout.A: "+message);

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@RabbitListener(queues = "fanout.2")
public class FanoutReceiver2 {


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout_2: "+message);

}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@RabbitListener(queues = "fanout.3")
public class FanoutReceiver3 {


@RabbitHandler
public void process(String message){

System.out.println("Receiver form fanout_3: "+message);

}

}

测试:

1
2
3
4
@Test
public void fanout(){
fanoutSender.send();
}
1
2
3
4
Sender : hi, fanout msg 
Receiver form fanout_3: hi, fanout msg
Receiver form fanout_1: hi, fanout msg
Receiver form fanout_2: hi, fanout msg

放个更形象的比喻:

异步

本来是想单开一篇,但实在是觉得太简单,所以也连着一块说了吧。

异步这个词之前在学习多线程的时候有遇见过,但是可能很多人不太理解该如何结合到Spring当中运用,这次就来粗略的展示一下:

首先需要一个服务层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Service
public class AsyncService {

@Async//告诉spring这是一个异步的方法
public void hello(){

try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}


System.out.println("数据处理中----"+new Date());
}
}

需要在方法上面标明这是一个异步方法,这样后台就会为这个方法单开一个线程执行。

最后别忘了在启动器加上注解:

1
@EnableAsync//开启异步注解功能

测试一下:

1
2
3
4
5
数据处理中----Sun Jan 19 22:30:13 CST 2020
数据处理中----Sun Jan 19 22:30:14 CST 2020
数据处理中----Sun Jan 19 22:30:15 CST 2020
数据处理中----Sun Jan 19 22:30:15 CST 2020
数据处理中----Sun Jan 19 22:30:16 CST 2020

可以从时间上看到,时间之差似乎只有1秒,仿佛没有把3秒的线程睡眠给编译进去。但是你可以把这个注解取消,就会发现它每个动作都得延迟3秒进行,这就是异步控制。

定时任务

在实际运用中,有时候需要定一个时间去发消息,比如各个月末的报表,某些定时推送的新闻,年终的奖金等等,就需要一个定时器去完成。这个在Spring当中,当然也有内置的方法,由于比较简单,所以也一块说了。

这个定时器也需要在启动器中加入:@EnableScheduling 进行开启。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Service
public class ScheduledService {

/**
* second(秒), minute(分), hour(时), day of month(日), month(月), day of week(周几).
* 0 * * * * MON-FRI
* 【0 0/5 14,18 * * ?】 每天14点整,和18点整,每隔5分钟执行一次
* 【0 15 10 ? * 1-6】 每个月的周一至周六10:15分执行一次
* 【0 0 2 ? * 6L】每个月的最后一个周六凌晨2点执行一次
* 【0 0 2 LW * ?】每个月的最后一个工作日凌晨2点执行一次
* 【0 0 2-4 ? * 1#1】每个月的第一个周一凌晨2点到4点期间,每个整点都执行一次;
*/

@Scheduled(cron = "0/1 * * * * ?")
public void hello(){
System.out.println("scheduled----------");
}
}

这个就代表了,每一秒都发一条信息:

1
2
3
scheduled----------
scheduled----------
scheduled----------

项目地址:https://github.com/Antarctica000/SpringBoot/tree/master/rabbitmq