Appearance
RabbitMQ
待办 - RabbitMQ的使用方式
- Java 项目
xml
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
- SpringBoot 项目
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- SpringCloud 项目
xml
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
引言
模块之间的耦合度多高,导致一个模块宕机后,全部功能都不能用了,并且同步通讯的成本过高,用户体验差。
RabbitMQ引言 |
---|
RabbitMQ介绍
市面上比较火爆的几款MQ:
ActiveMQ,RocketMQ,Kafka,RabbitMQ。
语言的支持:ActiveMQ,RocketMQ只支持Java语言,Kafka可以支持多们语言,RabbitMQ支持多种语言。
效率方面:ActiveMQ,RocketMQ,Kafka效率都是毫秒级别,RabbitMQ是微秒级别的。
消息丢失,消息重复问题: RabbitMQ针对消息的持久化,和重复问题都有比较成熟的解决方案。
学习成本:RabbitMQ非常简单。
RabbitMQ是由Rabbit公司去研发和维护的,最终是在Pivotal。
RabbitMQ严格的遵循AMQP协议,高级消息队列协议,帮助我们在进程之间传递异步消息。
RabbitMQ安装
yml
version: "3.9"
services:
rabbitmq:
image: rabbitmq:3.8.9-management
restart: always
container_name: rabbitmq
ports:
- 5672:5672
- 15672:15672
volumes:
- ./data:/var/lib/rabbitmq
networks:
default:
external: true
name: global
测试
进入宿主机ip:15672
默认用户名: guest
默认密码: guest
RabbitMQ架构【重点
】
官方的简单架构图
Publisher - 生产者:发布消息到RabbitMQ中的Exchange
Consumer - 消费者:监听RabbitMQ中的Queue中的消息
Exchange - 交换机:和生产者建立连接并接收生产者的消息
Queue - 队列:Exchange会将消息分发到指定的Queue,Queue和消费者进行交互
Routes - 路由:交换机以什么样的策略将消息发布到Queue
简单架构图 |
---|
RabbitMQ的完整架构图
完整架构图
完整架构图 |
---|
查看图形化界面并创建一个Virtual Host
创建一个全新的用户和全新的Virtual Host,并且将test用户设置上可以操作/test的权限
监控界面 |
---|
RabbitMQ的使用【重点
】
RabbitMQ的通讯方式
- Simplest
The simplest thing that does something
- Work queues
Distributing tasks among workers
- Publish/Subscribe
Sending messages to many consumers at once
- Routing
Receiving messages selectively
- Topics
Receiving messages based on a pattern (topics)
- RPC
- Publisher Confirms
Reliable publishing with publisher confirms
Java连接RabbitMQ
创建maven项目
导入依赖
xml
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
创建工具类连接RabbitMQ
java
public static Connection getConnection(){
// 创建Connection工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.199.109");
factory.setPort(5672);
factory.setUsername("test");
factory.setPassword("test");
factory.setVirtualHost("/test");
// 创建Connection
Connection conn = null;
try {
conn = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
// 返回
return conn;
}
效果图 |
---|
Hello-World
一个生产者,一个默认的交换机,一个队列,一个消费者
结构图 |
---|
创建生产者,创建一个channel,发布消息到exchange,指定路由规则。
java
@Test
public void publish() throws Exception {
//1. 获取Connection
Connection connection = RabbitMQClient.getConnection();
//2. 创建Channel
Channel channel = connection.createChannel();
channel.queueDeclare("Hello-World-Queue", false, false, true, null);
//3. 发布消息到exchange,同时指定路由的规则
String msg = "Hello-World!";
// 参数1:指定exchange,使用""。
// 参数2:指定路由的规则,使用具体的队列名称。
// 参数3:指定传递的消息所携带的properties,使用null。
// 参数4:指定发布的具体消息,byte[]类型
channel.basicPublish("","HelloWorld",null,msg.getBytes());
// Ps:exchange是不会帮你将消息持久化到本地的,Queue才会帮你持久化消息。
System.out.println("生产者发布消息成功!");
//4. 释放资源
channel.close();
connection.close();
}
创建消费者,创建一个channel,创建一个队列,并且去消费当前队列
java
@Test
public void consume() throws Exception {
//1. 获取连接对象
Connection connection = RabbitMQClient.getConnection();
//2. 创建channel
Channel channel = connection.createChannel();
//3. 声明队列-HelloWorld
//参数1:queue - 指定队列的名称
//参数2:durable - 当前队列是否需要持久化(true)
//参数3:exclusive - 是否排外(conn.close() - 当前队列会被自动删除,当前队列只能被一个消费者消费)
//参数4:autoDelete - 如果这个队列没有消费者在消费,队列自动删除
//参数5:arguments - 指定当前队列的其他信息
channel.queueDeclare("HelloWorld",true,false,false,null);
//4. 开启监听Queue
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收到消息:" + new String(body,"UTF-8"));
}
};
//参数1:queue - 指定消费哪个队列
//参数2:autoAck - 指定是否自动ACK (true,接收到消息后,会立即告诉RabbitMQ)
//参数3:consumer - 指定消费回调
channel.basicConsume("HelloWorld",true,consume);
System.out.println("消费者开始监听队列!");
// System.in.read();
System.in.read();
//5. 释放资源
channel.close();
connection.close();
}
Work
一个生产者,一个默认的交换机,一个队列,两个消费者
结构图 |
---|
只需要在消费者端,添加Qos能力以及更改为手动ack即可让消费者,根据自己的能力去消费指定的消息,而不是默认情况下由RabbitMQ平均分配了,生产者不变,正常发布消息到默认的exchange,并指定routing
消费者指定Qoa和手动ack
java
// 1. 连接
Connection conn = MQUtil.getConnection();
// 2. 获取channel
Channel channel = conn.createChannel();
// 3. 操作
// 告诉RabbitMQ,一次只发一个消息
// 原因是,RabbitMQ为了提升运行效率,不会一条一条地给消费者发送消息;而是多条多条地发送消息
// 因为测试中,希望每一个消费者都有机会收到消息,所以设置QoS为1,也就是一次只发一条消息
// 在真实场景中,不要设置
channel.basicQos(1);
channel.basicConsume("HelloQueue", false, new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
System.out.println("第2个消费者: " + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
System.in.read();
// 4. 关闭资源
channel.close();
conn.close();
Publish/Subscribe
exchange有4种模式,分别是header、fanout、direct和topic。其中fanout对应的是publish/subscribe模式,direct对应的是routing模式,topic对应的是topic模式
一个生产者,一个交换机,两个队列,两个消费者
每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机
结构图 |
---|
声明一个Fanout类型的exchange,并且将exchange和queue绑定在一起,绑定的方式就是直接绑定。
让生产者创建一个exchange并且指定类型,和一个或多个队列绑定到一起。
java
// 1. 获取连接
Connection conn = RabbitMQClient.getConnection();
// 2. 获取channel
Channel channel = conn.createChannel();
// 3. 操作
// 定义一个exchange(交换机)
channel.exchangeDeclare("pubsub_exchange", BuiltinExchangeType.FANOUT);
// 定义两个queue(队列)
channel.queueDeclare("pubsub_queue1", true, false, false, null);
channel.queueDeclare("pubsub_queue2", true, false, false, null);
// 将两个queue绑定到exchange上
channel.queueBind("pubsub_queue1", "pubsub_exchange", "");
channel.queueBind("pubsub_queue2", "pubsub_exchange", "");
// 向交换机发送消息
channel.basicPublish("pubsub_exchange", "", null, "Hello Pubsub!".getBytes());
// 4. 关闭资源
channel.close();
conn.close();
消费者还是正常的监听某一个队列即可。
Routing
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者在创建DIRECT类型的exchange后,根据RoutingKey去绑定相应的队列,并且在发送消息时,指定消息的具体RoutingKey即可。
java
// 1. 获取Connection
Connection conn = RabbitMQClient.getConnection();
// 2. 获取Channel
Channel channel = conn.createChannel();
// 3. 操作
// 定义交换机
channel.exchangeDeclare("routing_exchange", BuiltinExchangeType.DIRECT);
// 定义两个队列
channel.queueDeclare("routing_queue_info", true, false, false, null);
channel.queueDeclare("routing_queue_error", true, false, false, null);
// 根据路由机制绑定队列
channel.queueBind("routing_queue_info", "routing_exchange", "INFO");
channel.queueBind("routing_queue_error", "routing_exchange", "ERROR");
channel.basicPublish("routing_exchange", "INFO", null, "This is a info msg".getBytes());
channel.basicPublish("routing_exchange", "ERROR", null, "This is a error msg".getBytes());
// 4. 关闭资源
channel.close();
conn.close();
消费者没有变化
Topic
一个生产者,一个交换机,两个队列,两个消费者
结构图 |
---|
生产者创建Topic的exchange并且绑定到队列中,这次绑定可以通过*和#关键字,对指定RoutingKey内容,编写时注意格式 xxx.xxx.xxx去编写, * -> 一个xxx,而# -> 代表多个xxx.xxx,在发送消息时,指定具体的RoutingKey到底是什么。
java
// 1. 获取Connection
Connection conn = RabbitMQClient.getConnection();
// 2. 获取Channel
Channel channel = conn.createChannel();
// 3. 操作
// 定义交换机
channel.exchangeDeclare("exchange_topic", BuiltinExchangeType.TOPIC);
// 定义两个队列
channel.queueDeclare("queue_topic1", true, false, false, null);
channel.queueDeclare("queue_topic2", true, false, false, null);
// 绑定
channel.queueBind("queue_topic1", "exchange_topic", "*.red.*");
channel.queueBind("queue_topic2", "exchange_topic", "fast.#");
channel.queueBind("queue_topic2", "exchange_topic", "*.*.rabbit");
// 发送到1、2队列中
channel.basicPublish("exchange_topic", "fast.red.monkey", null, "红快猴子".getBytes());
// 发不出去
channel.basicPublish("exchange_topic", "slow.black.dog", null, "黑慢狗".getBytes());
// 发到2队列中
channel.basicPublish("exchange_topic", "fast.white.cat", null, "快白猫".getBytes());
// 4. 关闭资源
channel.close();
conn.close();
消费者只是监听队列,没变化。
【扩展】死信机制
java
Connection conn = RabbitMQClient.getConnection();
Channel channel = connection.createChannel();
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "Exchange-TTL-To");
arguments.put("x-dead-letter-routing-key", "Routing-TTL-To");
arguments.put("x-message-ttl", 10000);
channel.exchangeDeclare("Exchange-TTL-From", BuiltinExchangeType.FANOUT);
channel.exchangeDeclare("Exchange-TTL-To", BuiltinExchangeType.FANOUT);
channel.queueDeclare("Queue-TTL-From", false, false, false, arguments);
channel.queueDeclare("Queue-TTL-To", false, false, false, null);
channel.queueBind("Queue-TTL-From", "Exchange-TTL-From", "Routing-TTL-From");
channel.queueBind("Queue-TTL-To", "Exchange-TTL-To", "Routing-TTL-To");
channel.basicPublish("Exchange-TTL-From", "", null, "Hello Dead Letter".getBytes());
channel.close();
conn.close();
RabbitMQ整合SpringBoot【重点
】
SpringBoot整合RabbitMQ
创建SpringBoot工程
导入依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
yml
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
声明exchange、queue
java
@Configuration
public class RabbitMQConfig {
//1. 创建exchange - topic
@Bean
public TopicExchange getTopicExchange(){
return new TopicExchange("boot-topic-exchange",true,false);
}
//2. 创建queue
@Bean
public Queue getQueue(){
return new Queue("boot-queue",true,false,false,null);
}
//3. 绑定在一起
@Bean
public Binding getBinding(TopicExchange topicExchange,Queue queue){
return BindingBuilder.bind(queue).to(topicExchange).with("*.red.*");
}
}
发布消息到RabbitMQ
java
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!");
}
创建消费者监听消息
java
@Component
public class Consumer {
@RabbitListener(queues = "boot-queue")
public void getMessage(Object message){
System.out.println("接收到消息:" + message);
}
}
手动Ack
添加配置文件
yml
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: manual
手动ack
java
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
System.out.println("接收到消息:" + msg);
int i = 1 / 0;
// 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
RabbitMQ的其他操作
消息的可靠性
RabbitMQ的事务:事务可以保证消息100%传递,可以通过事务的回滚去记录日志,后面定时再次发送当前消息。事务的操作,效率太低,加了事务操作后,比平时的操作效率至少要慢100倍。
RabbitMQ除了事务,还提供了Confirm的确认机制,这个效率比事务高很多。
普通Confirm方式
java
//3.1 开启confirm
channel.confirmSelect();
//3.2 发送消息
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",null,msg.getBytes());
//3.3 判断消息发送是否成功
if(channel.waitForConfirms()){
System.out.println("消息发送成功");
}else{
System.out.println("发送消息失败");
}
批量Confirm方式
java
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 确定批量操作是否成功
channel.waitForConfirmsOrDie(); // 当你发送的全部消息,有一个失败的时候,就直接全部失败 抛出异常IOException
异步Confirm方式
java
//3.1 开启confirm
channel.confirmSelect();
//3.2 批量发送消息
for (int i = 0; i < 1000; i++) {
String msg = "Hello-World!" + i;
channel.basicPublish("","HelloWorld",null,msg.getBytes());
}
//3.3 开启异步回调
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送成功,标识:" + deliveryTag + ",是否是批量" + multiple);
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("消息发送失败,标识:" + deliveryTag + ",是否是批量" + multiple);
}
});
消息传递可靠性 |
---|
Return机制
Confirm只能保证消息到达exchange,无法保证消息可以被exchange分发到指定queue。
而且exchange是不能持久化消息的,queue是可以持久化消息。
采用Return机制来监听消息是否从exchange送到了指定的queue中
消息传递可靠性 |
---|
开启Return机制,并在发送消息时,指定mandatory为true
java
// 开启return机制
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 当消息没有送达到queue时,才会执行。
System.out.println(new String(body,"UTF-8") + "没有送达到Queue中!!");
}
});
// 在发送消息时,指定mandatory参数为true
channel.basicPublish("","HelloWorld",true,null,msg.getBytes());
SpringBoot实现
编写配置文件
yml
spring:
rabbitmq:
publisher-confirm-type: simple
publisher-returns: true
开启Confirm和Return
java
@Component
public class PublisherConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback ,RabbitTemplate.ReturnCallback {
@Autowired
private RabbitTemplate rabbitTemplate;
@PostConstruct // init-method
public void initMethod(){
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if(ack){
System.out.println("消息已经送达到Exchange");
}else{
System.out.println("消息没有送达到Exchange");
}
}
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("消息没有送达到Queue");
}
}
避免消息重复消费
重复消费消息,会对非幂等行操作造成问题
重复消费消息的原因是,消费者没有给RabbitMQ一个ack
重复消费 |
---|
为了解决消息重复消费的问题,可以采用Redis,在消费者消费消息之前,现将消息的id放到Redis中,
id-0(正在执行业务)
id-1(执行业务成功)
如果ack失败,在RabbitMQ将消息交给其他的消费者时,先执行setnx,如果key已经存在,获取他的值,如果是0,当前消费者就什么都不做,如果是1,直接ack。
极端情况:第一个消费者在执行业务时,出现了死锁,在setnx的基础上,再给key设置一个生存时间。
生产者,发送消息时,指定messageId
java
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(1) //指定消息书否需要持久化 1 - 需要持久化 2 - 不需要持久化
.messageId(UUID.randomUUID().toString())
.build();
String msg = "Hello-World!";
channel.basicPublish("","HelloWorld",true,properties,msg.getBytes());
消费者,在消费消息时,根据具体业务逻辑去操作redis
java
DefaultConsumer consume = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
Jedis jedis = new Jedis("192.168.199.109",6379);
String messageId = properties.getMessageId();
//1. setnx到Redis中,默认指定value-0
String result = jedis.set(messageId, "0", "NX", "EX", 10);
if(result != null && result.equalsIgnoreCase("OK")) {
System.out.println("接收到消息:" + new String(body, "UTF-8"));
//2. 消费成功,set messageId 1
jedis.set(messageId,"1");
channel.basicAck(envelope.getDeliveryTag(),false);
}else {
//3. 如果1中的setnx失败,获取key对应的value,如果是0,return,如果是1
String s = jedis.get(messageId);
if("1".equalsIgnoreCase(s)){
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
}
};
SpringBoot如何实现
导入依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
编写配置文件
yml
spring:
redis:
host: 192.168.199.109
port: 6379
修改生产者
java
@Test
void contextLoads() throws IOException {
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("boot-topic-exchange","slow.red.dog","红色大狼狗!!",messageId);
System.in.read();
}
修改消费者
java
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "boot-queue")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 获取MessageId
String messageId = message.getMessageProperties().getHeader("spring_returned_message_correlation");
//1. 设置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0",10, TimeUnit.SECONDS)) {
//2. 消费消息
System.out.println("接收到消息:" + msg);
//3. 设置key的value为1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 获取Redis中的value即可 如果是1,手动ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
RabbitMQ应用
客户模块
导入依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
yml
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
编写配置类
java
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("openapi-customer-exchange",true,false);
}
@Bean
public Queue queue(){
return new Queue("openapi-customer-queue");
}
@Bean
public Binding binding(Queue queue,TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
修改Service
java
//3. 发送消息
rabbitTemplate.convertAndSend("openapi-customer-exchange","openapi.customer.add",JSON.toJSON(customer));
/*//3. 调用搜索模块,添加数据到ES
//1. 准备请求参数和请求头信息
String json = JSON.toJSON(customer);
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.parseMediaType("application/json;charset=utf-8"));
HttpEntity<String> entity = new HttpEntity<>(json,headers);
//2. 使用RestTemplate调用搜索模块
`*/
客户模块
导入依赖
xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
编写配置文件
yml
spring:
rabbitmq:
host: 192.168.199.109
port: 5672
username: test
password: test
virtual-host: /test
listener:
simple:
acknowledge-mode: manual
编写配置类
java
@Configuration
public class RabbitMQConfig {
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("openapi-customer-exchange",true,false);
}
@Bean
public Queue queue(){
return new Queue("openapi-customer-queue");
}
@Bean
public Binding binding(Queue queue, TopicExchange topicExchange){
return BindingBuilder.bind(queue).to(topicExchange).with("openapi.customer.*");
}
}
编写消费者
java
@Component
public class CustomerListener {
@Autowired
private CustomerService customerService;
@RabbitListener(queues = "openapi-customer-queue")
public void consume(String json, Channel channel, Message message) throws IOException {
//1. 获取RoutingKey
String receivedRoutingKey = message.getMessageProperties().getReceivedRoutingKey();
//2. 使用switch
switch (receivedRoutingKey){
case "openapi.customer.add":
//3. add操作调用Service完成添加
customerService.saveCustomer(JSON.parseJSON(json, Customer.class));
//4. 手动ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}