中间件技术:RabbitMQ 有了中间件,就相当于以后上学不用走路,而是骑单车。:call_me_hand:
概念 中间件(英语:Middleware),又译中间件、中介层,是一类提供系统软件和应用软件之间连接、便于软件各部件之间的沟通的软件,应用软件可以借助中间件在不同的技术架构之间共享信息与资源。中间件位于客户机服务器的操作系统之上,管理着计算资源和网络通信。
就好似下图:
而RabbitMQ则是一个中间件。
RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而聚类和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
简而言之,RabbitMQ是一套开源(MPL)的消息队列服务软件,我们使用这个消息队列,去传输信息。
RabbitMQ的特性 RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。具体特点包括:
RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
高可用(Highly Available Queues)
队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。
RabbitMQ的基本组成
消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
消息的生产者,也是一个向交换器发布消息的客户端应用程序。
交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
路由关键字,exchange根据这个关键字进行消息投递。
绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
网络连接,比如一个TCP连接。
信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
表示消息队列服务器实体。它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输。
SpringBoot整合RabbitMQ RabbitMQ需要提前在电脑中安装,这里不多赘述,先发个网址给大家安装:
https://blog.csdn.net/weixin_39735923/article/details/79288578
引入依赖 1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
看清楚,RabbitMQ的依赖叫做AMQP 。
yml配置 1 2 3 4 5 6 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest
因为我们不进行web的测试,这里仅仅只需要配置rabbitmq就行了。下面就正式开始测试,并且介绍一下RabbitMQ的三个模式。
Direct Exchange(直接交换) 直接交换:- 直接交换是一种基于消息路由密钥将消息路由到队列的交换。路由密钥是生产者添加的消息头中的消息属性。生产者在消息头中添加路由密钥,并将其发送到直接交换。收到消息后,交换 尝试匹配的路由键 与所有绑定到队列的结合键 它。如果找到匹配项,它将消息路由到绑定键 已匹配的队列,如果未找到匹配项,它将忽略该消息。
下面进行测试:
一对一发送 config包
1 2 3 4 5 6 7 8 9 10 11 @Configuration public class RabbitConfig { @Bean public Queue queue () { return new Queue("hello" ); } }
rabbitmq包
发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class Sender { @Autowired private AmqpTemplate amqpTemplate; public void send () { String context = "hello----" +new Date(); System.out.println("send:" +context); this .amqpTemplate.convertAndSend("hello" ,context); } }
接收者
1 2 3 4 5 6 7 8 9 10 11 12 @Component @RabbitListener (queues="hello" )public class Receiver { @RabbitHandler public void process (String message) { System.out.println("Receiver:" +message); } }
测试类
我们这次就直接使用IDEA下的测试类,开始对队列进行测试。
1 2 3 4 5 6 7 8 9 10 11 @RunWith (SpringRunner.class ) @SpringBootTest class RabbitmqApplicationTests { @Autowired Sender Sender; @Test public void contextLoads () { Sender.send(); } }
测试结果:
1 2 send:hello----Sun Jan 19 19:12:00 CST 2020 Receiver:hello----Sun Jan 19 19:12:00 CST 2020
可以看到,这是一个单对单的发送。
一对多发送 在Sender继续添加方法
1 2 3 4 5 6 public void send2 (int i) { String context = i+"" ; System.out.println(context+"--send:" ); this .amqpTemplate.convertAndSend("hello2" ,context); }
在建立两个类
1 2 3 4 5 6 7 8 9 10 11 @Component @RabbitListener (queues = "hello2" )public class Receiver1 { @RabbitHandler public void process (String message) { System.out.println("Receiver1:" +message); } }
1 2 3 4 5 6 7 8 9 @Component @RabbitListener (queues = "hello2" )public class Receiver2 { @RabbitHandler public void process (String message) { System.out.println("Receiver2:" +message); } }
开始下一轮测试:
1 2 3 4 5 6 7 @Test public void manyReceiver () { for (int i=0 ;i<10 ;i++){ Sender.send2(i); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 0--send: 1--send: 2--send: 3--send: 4--send: 5--send: 6--send: 7--send: 8--send: 9--send: Receiver1:0 Receiver2:1 Receiver1:2 Receiver2:3 Receiver2:5 Receiver1:4 Receiver2:7 Receiver1:6 Receiver2:9 Receiver1:8
可以从结果看到,我们发送了10个消息,这些消息有两个接收者。而这两个接收者均匀的接收了这10个消息。
多对多发送 有了前面两个案例,我们再来试试多对多发送是如何的:
再建立一个新的类:
1 2 3 4 5 6 7 8 9 10 11 12 @Component public class Sender2 { @Autowired private AmqpTemplate amqpTemplate; public void send2 (int i) { String context = i+"" ; System.out.println(context+"--send:" ); this .amqpTemplate.convertAndSend("hello2" ,context); } }
记得同时也需要注入AutoWired,再添加测试:
1 2 3 4 5 6 7 @Test public void many2many () { for (int i=0 ;i<10 ;i++){ Sender.send2(i); Sender2.send2(i); }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 0--send: 0--send: 1--send: 1--send: 2--send: 2--send: 3--send: 3--send: 4--send: 4--send: 5--send: 5--send: 6--send: 6--send: 7--send: 7--send: 8--send: 8--send: 9--send: 9--send: Receiver2:0 Receiver1:0 Receiver2:1 Receiver1:1 Receiver2:2 Receiver1:2 Receiver2:3 Receiver1:3 Receiver2:4 Receiver2:5 Receiver1:4 Receiver2:6 Receiver1:5 Receiver2:7 Receiver1:6 Receiver2:8 Receiver1:7 Receiver2:9 Receiver1:8 Receiver1:9
可见他们均匀的收到了消息,且不重复。
发送对象 先构建一个实体类:
1 2 3 4 5 @Data public class User implements Serializable { private String username; private String password; }
创建新的队列
1 2 3 4 @Bean public Queue queue3 () { return new Queue("object_queue" ); }
发送者和接收者:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component public class ObjectSender { @Autowired AmqpTemplate amqpTemplate; public void sendUser (User user) { System.out.println("Send object:" +user.toString()); this .amqpTemplate.convertAndSend("object_queue" ,user); } }
1 2 3 4 5 6 7 8 9 10 11 12 @Component @RabbitListener (queues = "object_queue" )public class ObjectReceiver { @RabbitHandler public void objectReceiver (User user) { System.out.println("Receiver object:" +user.toString()); } }
测试:
1 2 3 4 5 6 7 @Test public void object () { User user=new User(); user.setUsername("张三" ); user.setPassword("123456" ); objectSender.sendUser(user); }
1 2 Send object:User(username=张三, password=123456) Receiver object:User(username=张三, password=123456)
Topic Exchange(主题交换) 主题交换: 主题交换是基于队列绑定期间指定的路由键和路由模式之间的通配符匹配,将消息路由到队列的交换。 生产者在消息头中添加路由密钥,并将其发送到主题交换。收到消息后,交换 尝试将路由密钥与与其绑定的所有队列的绑定路由模式进行匹配。如果找到匹配项,它将把消息路由到路由模式匹配的队列,如果找不到匹配项,它将忽略该消息。
路由关键字:-这是单词列表,以句点(。)分隔,例如“ asia.china.beijing”
路由模式:- 这是在绑定队列期间指定的模式,它是单词和通配符的列表,例如“ * ”和“ # ”,以句点(。)分隔。通配符的使用如下:
“ * ”:-用于匹配路由键中特定位置的单词,例如路由模式“ asia.china。*”将与第一个单词为“ asia”且第二个单词为第二个的路由键匹配单词是“ china”,如“ asia.china.beijing”和“ asia.china.nanjing”。
“ # ”:-用于匹配零个或多个单词,例如“ asia.china。#”的路由模式将与以“ asia.china”开头的路由键(例如“ asia.china”和“ asia.china.beijing”。
可以说:topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列。
先配置新的configuration:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 @Configuration public class TopicRabbitConfig { final static String message = "topic.message" ; final static String messages = "topic.messages" ; @Bean public Queue queueMessage () { return new Queue(TopicRabbitConfig.message); } @Bean public Queue queueMessages () { return new Queue(TopicRabbitConfig.messages); } @Bean public TopicExchange exchange () { return new TopicExchange("topicExchange" ); } @Bean public Binding bindingExchangeMessage (Queue queueMessage, TopicExchange exchange) { return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message" ); } @Bean public Binding bingingExchangeMessages (Queue queueMessages, TopicExchange exchange) { return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#" ); } }
topic发送者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component public class TopicSender { @Autowired AmqpTemplate amqpTemplate; public void send1 () { String context = "hi, i am message 1" ; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange" ,"topic.message" ,context); } public void send2 () { String context = "hi, i am messages 2" ; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("topicExchange" , "topic.messages" , context); } }
两个接收者:
1 2 3 4 5 6 7 8 9 10 11 12 @Component @RabbitListener (queues = "topic.messages" )public class TopicReceiver2 { @RabbitHandler public void process (String message) { System.out.println("Receiver topic.messages: " + message); } }
1 2 3 4 5 6 7 8 9 10 11 12 @Component @RabbitListener (queues = "topic.message" )public class TopicReceiver1 { @RabbitHandler public void process (String message) { System.out.println("Receiver topic.message :" + message); } }
开始测试:
1 2 3 4 5 @Test public void topic () { topicSender.send1(); topicSender.send2(); }
结果:
1 2 3 4 5 Sender : hi, i am message 1 Sender : hi, i am messages 2 Receiver topic.message :hi, i am message 1 Receiver topic.messages: hi, i am message 1 Receiver topic.messages: hi, i am messages 2
可以看到,绑定的队列头不一样的队列,收到的信息也是不一样的,但是这是一个包含关系。
发送send1会匹配到topic.#和topic.message 两个Receiver都可以收到消息,发送send2只有topic.#可以匹配的只有Receiver2,Receiver2收到了第二条监听到消息
Fanout Exchange(扇出交换) 扇出交换:扇出交换是一种将接收到的消息路由到与其绑定的所有队列的交换。 当生产者将消息发送到扇出交换时,它将复制消息并将其路由到与其绑定的所有队列。它只是忽略路由键或生产者提供的任何模式匹配。当需要将同一消息存储在一个或多个队列中时,这种类型的交换非常有用。
简而言之,Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 @Configuration public class FanOutRabbitMq { @Bean public Queue AMessage () { return new Queue("fanout.1" ); } @Bean public Queue BMessage () { return new Queue("fanout.2" ); } @Bean public Queue CMessage () { return new Queue("fanout.3" ); } @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange("fanoutExchange" ); } @Bean public Binding bindingExchangeA (Queue AMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(AMessage).to(fanoutExchange); } @Bean public Binding bindingExchangeB (Queue BMessage,FanoutExchange fanoutExchange) { return BindingBuilder.bind(BMessage).to(fanoutExchange); } @Bean Binding bindingExchangeC (Queue CMessage, FanoutExchange fanoutExchange) { return BindingBuilder.bind(CMessage).to(fanoutExchange); } }
一个广播:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Component public class FanoutSender { @Autowired AmqpTemplate amqpTemplate; public void send () { String context = "hi, fanout msg " ; System.out.println("Sender : " + context); amqpTemplate.convertAndSend("fanoutExchange" ,"" , context); } }
三个接收者:
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @RabbitListener (queues = "fanout.1" )public class FanoutReceiver1 { @RabbitHandler public void process (String message) { System.out.println("Receiver form fanout.A: " +message); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @RabbitListener (queues = "fanout.2" )public class FanoutReceiver2 { @RabbitHandler public void process (String message) { System.out.println("Receiver form fanout_2: " +message); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 @Component @RabbitListener (queues = "fanout.3" )public class FanoutReceiver3 { @RabbitHandler public void process (String message) { System.out.println("Receiver form fanout_3: " +message); } }
测试:
1 2 3 4 @Test public void fanout () { fanoutSender.send(); }
1 2 3 4 Sender : hi, fanout msg Receiver form fanout_3: hi, fanout msg Receiver form fanout_1: hi, fanout msg Receiver form fanout_2: hi, fanout msg
放个更形象的比喻:
异步 本来是想单开一篇,但实在是觉得太简单,所以也连着一块说了吧。
异步这个词之前在学习多线程的时候有遇见过,但是可能很多人不太理解该如何结合到Spring当中运用,这次就来粗略的展示一下:
首先需要一个服务层:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @Service public class AsyncService { @Async public void hello () { try { Thread.sleep(3000 ); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("数据处理中----" +new Date()); } }
需要在方法上面标明这是一个异步方法,这样后台就会为这个方法单开一个线程执行。
最后别忘了在启动器加上注解:
测试一下:
1 2 3 4 5 数据处理中----Sun Jan 19 22:30:13 CST 2020 数据处理中----Sun Jan 19 22:30:14 CST 2020 数据处理中----Sun Jan 19 22:30:15 CST 2020 数据处理中----Sun Jan 19 22:30:15 CST 2020 数据处理中----Sun Jan 19 22:30:16 CST 2020
可以从时间上看到,时间之差似乎只有1秒,仿佛没有把3秒的线程睡眠给编译进去。但是你可以把这个注解取消,就会发现它每个动作都得延迟3秒进行,这就是异步控制。
定时任务 在实际运用中,有时候需要定一个时间去发消息,比如各个月末的报表,某些定时推送的新闻,年终的奖金等等,就需要一个定时器去完成。这个在Spring当中,当然也有内置的方法,由于比较简单,所以也一块说了。
这个定时器也需要在启动器中加入:@EnableScheduling 进行开启。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 @Service public class ScheduledService { @Scheduled (cron = "0/1 * * * * ?" ) public void hello () { System.out.println("scheduled----------" ); } }
这个就代表了,每一秒都发一条信息:
1 2 3 scheduled---------- scheduled---------- scheduled----------
项目地址:https://github.com/Antarctica000/SpringBoot/tree/master/rabbitmq