springboot简单使用rabbitmq

初体验

新建一个maven项目。

添加父pom:

1
2
3
4
5
6
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.1</version>
<relativePath/>
</parent>

添加依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

</dependencies>

添加启动类:

1
2
3
4
5
6
7
8
@SpringBootApplication
public class MyApplication {

public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}

}

添加application.yml

1
2
server:
port: 8080

这时候启动应该不成问题。

添加amqp依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

添加消息队列配置:

1
2
3
4
5
6
7
8
#消息队列
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
virtual-host: /
password: 123456
username: admin

添加配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMQConfig {

public static final String EXCHANGE_NAME = "order_exchange";
public static final String QUEUE_NAME = "order_queue";

/**
* 交换机
* @return
*/
@Bean
public Exchange orderExchange() {
// 这里是build模式
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
//return new TopicExchange(EXCHANGE_NAME, true, false);
}

/**
* 队列
* @return
*/
@Bean
public Queue orderQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
//return new Queue(QUEUE_NAME, true,false, false, null);
}

/**
* 交换机和队列绑定关系
*/
@Bean
public Binding orderBinding(Queue queue, Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
//return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
}

}

消费者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

/**
* RabbitHandler 会⾃动匹配 消息类型(消息⾃动确认)
* @param msg
* @param message
* @throws IOException
*/
@RabbitHandler
public void releaseCouponRecord(String msg, Message message) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message);
System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8"));
}

}

生产者:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.example.MyApplication;
import org.example.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(classes = MyApplication.class)
public class DemoApplicationTests {

@Autowired
private RabbitTemplate template;

@Test
public void send() {
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
}

}

运行spring boot 项目, 再运行单元测试方法send()。看到消费者打印消息。

可靠性的回调(ConfirmCallback与ReturnsCallback)

ConfirmCallback

⽣产者到交换机

⽣产者投递消息后,如果Broker收到消息后,会给⽣产 者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否 正常发送到Broker,这种⽅式是消息可靠性投递的核⼼。

开启confirmCallback

1
2
3
4
#旧版,确认消息发送成功,通过实现ConfirmCallBack接⼝,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
#新版,NONE值是禁⽤发布确认模式,是默认值,CORRELATED值是发布消息成功到交换器后会触发回调⽅法
spring.rabbitmq.publisher-confirm-type:correlated

代码实例

注释掉RabbitMQConfig类中代码只留两个静态变量(因为如果这类的代码会创建交换机、队列、绑定关系)。

在单元测试类中添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testConfirmCallback() {
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/**
*
* @param correlationData 配置
* @param ack 交换机是否收到消息,true是成
功,false是失败
* @param cause 失败的原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("confirm=====>");
System.out.println("confirm==== ack=" + ack);
System.out.println("confirm==== cause=" + cause);
// 根据ACK状态做对应的消息更新操作 TODO
}
});

template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1");
}

直接启动单元测试方法,ack=true的消息。

修改交换机名称,找不到交换机,所有收到ack=false的消息和原因。

ReturnsCallback

交换机到队列。

消息从交换器发送到对应队列失败时触发。

两种模式:

交换机到队列不成功,则丢弃消息(默认)。

交换机到队列不成功,返回给消息⽣产者,触发returnCallback。

开启ReturnsCallback

1
2
#新版
spring.rabbitmq.publisher-returns=true

代码实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testReturnCallback() {
//为true,则交换机处理消息到路由失败,则会返回给⽣产者
//开启强制消息投递(mandatory为设置为true),但消息未被路由⾄任何⼀个queue,则回退⼀条消息
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void
returnedMessage(ReturnedMessage returned) {
int code = returned.getReplyCode();
System.out.println("code=" + code);

System.out.println("returned=" + returned.toString());
}
});

template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.order.new", "新订单来啦11");
}

直接启动单元测试方法,收到code=3xx的消息,把消息内容发送回来了。

把routekey(xxx.order.new) 改为 order.new,这时收不到ReturnsCallback。

mandatory

开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息到RabbitTemplate.ReturnCallback中的returnedMessage方法。

浏览RabbitAutoConfiguration发现如下代码:

1
2
3
4
private boolean determineMandatoryFlag() {
Boolean mandatory = this.properties.getTemplate().getMandatory();
return (mandatory != null ? mandatory : this.properties.isPublisherReturns());
}

如果设置了mandatory参数,则直接取值;如若mandatory参数为空,则取之于否起开了消息回退,所以设置一个publisher-returns参数即可。

ACK

为什么要有ACK机制。因为mq需要知道哪些消息已经发了,哪些没有发。

消费者从RabbitMQ收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ收到反馈后才将此消息从队列 中删除。

消费者在处理消息出现了⽹络不稳定、服务器异常等现 象,那么就不会有ACK反馈,RabbitMQ会认为这个消 息没有正常消费,会将消息重新放⼊队列中。

只有当消费者正确发送ACK反馈,RabbitMQ确认收到 后,消息才会从RabbitMQ服务器的数据中删除。

消息的ACK确认机制默认是打开的,消息如未被进⾏ ACK的消息,这条消息被标记为Unacked。

确认模式

自动确认和手动确认。

自动确认就是最初的例子的消费者。

手动确认

1
2
3
4
5
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: MANUAL

这时候发送消息,也是可以收到消息的,但是进入rabbit后台管理页面,查看相应的队列,发现消息为Unacked。

把消费者改为:

1
2
3
4
5
6
7
8
@RabbitHandler
public void releaseCouponRecord(String msg, Message message, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println("msgTag="+msgTag);
System.out.println("message="+message);
System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8"));
channel.basicAck(deliveryTag, false);
}
1
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);

ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag前的所有消息(tag=10 7、8、9也被确认)。

1
channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);

Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列

1
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);

nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示是不是重回队列

deliveryTag 表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加。

重试

我使用spring boot 整合rabbitmq后,在消费者方法中添加异常,但是一直不会等到重试。等看源码时再说吧。

为什么要重试机制。因为有可能会出现网络波动或者消费者逻辑代码错误。如果自动ACK,消息直接丢了,如果手动ACK直接进入unacked状态,程序断开于rabbitmq的链接后unacked的消息状态会重新变为ready 等待消费。

如果是代码逻辑问题,重试几次还是不成功,程序员改代码,重启(断开于rabbitmq的连接),消费。

如果是网络波动,不管是自动ack还是手动ack都会重试几次,不可能一直波动吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true
# 重试次数,默认为3次
max-attempts: 5
# 重试最大间隔时间
max-interval: 10000ms
# 重试初始间隔时间
initial-interval: 2000ms
# 间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
multiplier: 2

这时我在消费者中修改:

1
2
3
4
5
6
7
8
9
10
11
@RabbitHandler
public void releaseCouponRecord(String msg, Message message, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
long msgTag = message.getMessageProperties().getDeliveryTag();
System.out.println(new Date(System.currentTimeMillis()));
System.out.println("msgTag="+msgTag);
System.out.println("message="+message);
System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8"));
// 抛出一个运行时异常,模仿消费异常
throw new IOException();
// channel.basicAck(deliveryTag, false);
}

看时间,发现配置生效,只有最后一次会抛出异常。

死信队列

什么是TTL?

time to live 消息存活时间

如果消息在存活时间内未被消费,则会别清除

RabbitMQ⽀持两种ttl设置:

单独消息进⾏配置ttl

整个队列进⾏配置ttl(使用的居多)

什么是rabbitmq的死信队列?

没有被及时消费的消息存放的队列。

什么是rabbitmq的死信交换机?

Dead Letter Exchange(死信交换机,缩写:DLX)当 消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。

交换机 —-> 队列 —消息过期,成为死信–> 死信交换机 —> 死信队列

消息有哪⼏种情况成为死信?

1.消费者拒收消息(basic.reject/ basic.nack),并且 没有重新⼊队 requeue=false。

2.消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)。

3.队列的消息⻓度达到极限。

消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。

实例

新建一个topic类型的名为dead_exchange的死信交换机,跟普通的交换机是一样的。

新建一个名为dead_queue的队列,跟普通的队列一样。

在死信交换机下建立绑定关系,dead_queue 与 dead.#

新建一个名为product_queue的队列,并设置参数:

x-message-ttl = 10000

x-dead-letter-exchange = dead_exchange

x-dead-letter-routing-key = dead.product

这时候用管理平台发送消息,10秒后会有消息进入死信队列。

也可以在发送消息时设置header,expiration = 5000, 5秒后会有消息进入死信队列。

RabbitMQ中TTL设置:

队列过期时间使⽤参数,对整个队列消息统⼀过期, x-message-ttl, 单位ms(毫秒)。

消息过期时间使⽤参数(如果队列头部消息未过期,队列中部消息已经过期,中部还在队列⾥⾯)

两者都配置的话,时间短的先触发。

延时队列

什么是延迟队列?

⼀种带有延迟功能的消息队列,Producer 将消息发送到消息队列服务端,但并不期望这条消息⽴⻢投递,⽽是推迟到在当前时间点之后的某⼀个时间投递到 Consumer 进⾏消费,该消息即定时消息。

使⽤场景:

1.通过消息触发⼀些定时任务,⽐如在某⼀固定时间点向 ⽤户发送提醒消息;

2.⽤户登录之后5分钟给⽤户做分类推送、⽤户多少天未 登录给⽤户做召回推送;

3.消息⽣产和消费有时间窗⼝要求:⽐如在天猫电商交易 中超时未⽀付关闭订单的场景,在订单创建时会发送⼀ 条 延时消息。这条消息将会在 30 分钟以后投递给消费 者,消费者收到此消息后需要判断对应的订单是否已完 成⽀付。 如⽀付未完成,则关闭订单。如已完成⽀付则忽略。

实现方式

本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。

还可以安装rabbitmq_delayed_message_exchange插件来实现。

TTL和DLX

这一种实现上面已经在管理平台操作了,转换成代码即可。

插件方式

安装插件有别的文章已经写好了。

声明一个延时队列交换机:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

或者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Bean("exchangeA")
public CustomExchange delayedExchange(){
HashMap<String, Object> arguments = new HashMap<>();
//自定义交换机的类型
arguments.put("x-delayed-type", "direct");
/**
* 交换机名
* 交换机类型
* 持久化
* 自动删除
*/
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments);
}

发送消息:

1
2
3
4
5
6
7
8
9
10
@Test
public void send1() {
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1", new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setDelay(60 * 1000);
return message;
}
});
}

RabbitmqAdmin使用和到底用不用

RabbitAdmin类完成对Exchange,Queue,Binging的操作,在容器中管理了RabbitAdmin类的时候,可以对Exchange,Queue,Binging进行自动声明。

1
2
3
4
5
6
7
8
9
10
11
12
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
//定义Exchange
rabbitAdmin.declareExchange(new DirectExchange("xxx.direct.exchange",true,false));
//定义队列
rabbitAdmin.declareQueue(new Queue("xxx.debug",true));
//定义绑定关系
rabbitAdmin.declareBinding(new Binding("xxx.debug",Binding.DestinationType.QUEUE,
"xxx.direct.exchange","xxx.hehe",new HashMap()));
return rabbitAdmin;
}

在spring boot 启动后,会自动创建交换机、队列、绑定关系。

到底用不用

用:可以明确绑定关系,在用迁移或者本地搭建的需求时,可以自动创建。

不用:让运维人员来维护交换机、队列、绑定关系,迁移和本地搭建时比较麻烦,流程规范了也就是繁琐了。

SimpleMessageListenerContainer

消息监听器容器

感觉有一定的局限性,不使用。

MessageListenerAdapter

消息监听适配器(adapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。

感觉有一定的局限性,不使用。

MessageConverter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@RabbitHandler
public void releaseCouponRecord1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
System.out.println("监听到消息:消息内容1:"+ msg);
channel.basicAck(deliveryTag, false);
}

@RabbitHandler
public void releaseCouponRecord2(Message msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
System.out.println("监听到消息:消息内容2:"+ msg);
channel.basicAck(deliveryTag, false);
}

@RabbitHandler
public void releaseCouponRecord3(byte[] msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException {
System.out.println("监听到消息:消息内容3:"+ msg);
channel.basicAck(deliveryTag, false);
}

如果在管理平台发送xxx文字,会发送至方法3。

spring boot 提供了MessageConverter用于消息的转化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Bean
public MessageConverter myMessageConverter() {
return new MessageConverter(){

@Override
public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException {
System.out.println("tomessage1" + o);
return null;
}

@Override
public Message toMessage(Object object, MessageProperties messageProperties, Type genericType) throws MessageConversionException {
System.out.println("tomessage2" + object);
return MessageConverter.super.toMessage(object, messageProperties, genericType);
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {
System.out.println("fromMessage2" + message);
try {
return new String(message.getBody(), "utf-8");
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
};
}

这时候会调用第一个方法。

spring 根据 contentType 找到适合的处理方法处理。

方法一:生产者消费者统一contentType

方法二:设置默认处理方法

1
2
3
@RabbitHandler(isDefault = true)
public void process(Message message) {
}

方法三:把RabbitListener标注在方法上

实践总结

一般可以在发送之后保存至可持久化的数据库中。

在confirmCallback和returnsCallback中修改相应表的状态,如果没有发送成功记录原因等。这样可以快速的排查问题。

再加上rabbitmqAdmin的用不用。

最好设置消费者默认处理方法为Message message对象。

可选消息转换类。


springboot简单使用rabbitmq
http://hanqichuan.com/2023/06/16/mq/springboot简单使用rabbitmq/
作者
韩启川
发布于
2023年6月16日
许可协议