初体验
新建一个maven项目。
添加父pom:
1 2 3 4 5 6
| <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.1</version> <relativePath/> </parent>
|
添加依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| <dependencies>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency>
<dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency>
</dependencies>
|
添加启动类:
1 2 3 4 5 6 7 8
| @SpringBootApplication public class MyApplication {
public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); }
}
|
添加application.yml
这时候启动应该不成问题。
添加amqp依赖:
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
添加消息队列配置:
1 2 3 4 5 6 7 8
| spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: / password: 123456 username: admin
|
添加配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
public static final String EXCHANGE_NAME = "order_exchange"; public static final String QUEUE_NAME = "order_queue";
@Bean public Exchange orderExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); }
@Bean public Queue orderQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); }
@Bean public Binding orderBinding(Queue queue, Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs(); }
}
|
消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
import java.io.IOException;
@Component @RabbitListener(queues = "order_queue") public class OrderMQListener {
@RabbitHandler public void releaseCouponRecord(String msg, Message message) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message); System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8")); }
}
|
生产者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import org.example.MyApplication; import org.example.RabbitMQConfig; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest(classes = MyApplication.class) public class DemoApplicationTests {
@Autowired private RabbitTemplate template;
@Test public void send() { template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1"); }
}
|
运行spring boot 项目, 再运行单元测试方法send()。看到消费者打印消息。
可靠性的回调(ConfirmCallback与ReturnsCallback)
ConfirmCallback
⽣产者到交换机
⽣产者投递消息后,如果Broker收到消息后,会给⽣产 者⼀个ACK。⽣产者通过ACK,可以确认这条消息是否 正常发送到Broker,这种⽅式是消息可靠性投递的核⼼。
开启confirmCallback
1 2 3 4
| spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type:correlated
|
代码实例
注释掉RabbitMQConfig类中代码只留两个静态变量(因为如果这类的代码会创建交换机、队列、绑定关系)。
在单元测试类中添加:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test public void testConfirmCallback() { template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("confirm=====>"); System.out.println("confirm==== ack=" + ack); System.out.println("confirm==== cause=" + cause); } });
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1"); }
|
直接启动单元测试方法,ack=true的消息。
修改交换机名称,找不到交换机,所有收到ack=false的消息和原因。
ReturnsCallback
交换机到队列。
消息从交换器发送到对应队列失败时触发。
两种模式:
交换机到队列不成功,则丢弃消息(默认)。
交换机到队列不成功,返回给消息⽣产者,触发returnCallback。
开启ReturnsCallback
1 2
| spring.rabbitmq.publisher-returns=true
|
代码实例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void testReturnCallback() { template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { @Override public void returnedMessage(ReturnedMessage returned) { int code = returned.getReplyCode(); System.out.println("code=" + code);
System.out.println("returned=" + returned.toString()); } });
template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "xxx.order.new", "新订单来啦11"); }
|
直接启动单元测试方法,收到code=3xx的消息,把消息内容发送回来了。
把routekey(xxx.order.new) 改为 order.new,这时收不到ReturnsCallback。
mandatory
开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息到RabbitTemplate.ReturnCallback中的returnedMessage方法。
浏览RabbitAutoConfiguration发现如下代码:
1 2 3 4
| private boolean determineMandatoryFlag() { Boolean mandatory = this.properties.getTemplate().getMandatory(); return (mandatory != null ? mandatory : this.properties.isPublisherReturns()); }
|
如果设置了mandatory参数,则直接取值;如若mandatory参数为空,则取之于否起开了消息回退,所以设置一个publisher-returns参数即可。
ACK
为什么要有ACK机制。因为mq需要知道哪些消息已经发了,哪些没有发。
消费者从RabbitMQ收到消息并处理完成后,反馈给 RabbitMQ,RabbitMQ收到反馈后才将此消息从队列 中删除。
消费者在处理消息出现了⽹络不稳定、服务器异常等现 象,那么就不会有ACK反馈,RabbitMQ会认为这个消 息没有正常消费,会将消息重新放⼊队列中。
只有当消费者正确发送ACK反馈,RabbitMQ确认收到 后,消息才会从RabbitMQ服务器的数据中删除。
消息的ACK确认机制默认是打开的,消息如未被进⾏ ACK的消息,这条消息被标记为Unacked。
确认模式
自动确认和手动确认。
自动确认就是最初的例子的消费者。
手动确认
1 2 3 4 5
| spring: rabbitmq: listener: simple: acknowledge-mode: MANUAL
|
这时候发送消息,也是可以收到消息的,但是进入rabbit后台管理页面,查看相应的队列,发现消息为Unacked。
把消费者改为:
1 2 3 4 5 6 7 8
| @RabbitHandler public void releaseCouponRecord(String msg, Message message, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println("msgTag="+msgTag); System.out.println("message="+message); System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8")); channel.basicAck(deliveryTag, false); }
|
1
| channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
|
ack表示确认消息。multiple:false只确认该delivery_tag的消息,true确认该delivery_tag前的所有消息(tag=10 7、8、9也被确认)。
1
| channel.basicReject(msg.getMessageProperties().getDeliveryTag(),false);
|
Reject表示拒绝消息。requeue:false表示被拒绝的消息是丢弃;true表示重回队列
1
| channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,false);
|
nack表示拒绝消息。multiple表示拒绝指定了delivery_tag的所有未确认的消息,requeue表示是不是重回队列
deliveryTag 表示消息投递序号,每次消费消息或者消息重新投递后, deliveryTag都会增加。
重试
我使用spring boot 整合rabbitmq后,在消费者方法中添加异常,但是一直不会等到重试。等看源码时再说吧。
为什么要重试机制。因为有可能会出现网络波动或者消费者逻辑代码错误。如果自动ACK,消息直接丢了,如果手动ACK直接进入unacked状态,程序断开于rabbitmq的链接后unacked的消息状态会重新变为ready 等待消费。
如果是代码逻辑问题,重试几次还是不成功,程序员改代码,重启(断开于rabbitmq的连接),消费。
如果是网络波动,不管是自动ack还是手动ack都会重试几次,不可能一直波动吧。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| spring: rabbitmq: listener: simple: retry: enabled: true max-attempts: 5 max-interval: 10000ms initial-interval: 2000ms multiplier: 2
|
这时我在消费者中修改:
1 2 3 4 5 6 7 8 9 10 11
| @RabbitHandler public void releaseCouponRecord(String msg, Message message, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { long msgTag = message.getMessageProperties().getDeliveryTag(); System.out.println(new Date(System.currentTimeMillis())); System.out.println("msgTag="+msgTag); System.out.println("message="+message); System.out.println("监听到消息:消息内容:"+new String(message.getBody(), "utf-8")); throw new IOException(); }
|
看时间,发现配置生效,只有最后一次会抛出异常。
死信队列
什么是TTL?
time to live 消息存活时间
如果消息在存活时间内未被消费,则会别清除
RabbitMQ⽀持两种ttl设置:
单独消息进⾏配置ttl
整个队列进⾏配置ttl(使用的居多)
什么是rabbitmq的死信队列?
没有被及时消费的消息存放的队列。
什么是rabbitmq的死信交换机?
Dead Letter Exchange(死信交换机,缩写:DLX)当 消息成为死信后,会被重新发送到另⼀个交换机,这个交换机就是DLX死信交换机。
交换机 —-> 队列 —消息过期,成为死信–> 死信交换机 —> 死信队列
消息有哪⼏种情况成为死信?
1.消费者拒收消息(basic.reject/ basic.nack),并且 没有重新⼊队 requeue=false。
2.消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)。
3.队列的消息⻓度达到极限。
消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列。
实例
新建一个topic类型的名为dead_exchange的死信交换机,跟普通的交换机是一样的。
新建一个名为dead_queue的队列,跟普通的队列一样。
在死信交换机下建立绑定关系,dead_queue 与 dead.#
新建一个名为product_queue的队列,并设置参数:
x-message-ttl = 10000
x-dead-letter-exchange = dead_exchange
x-dead-letter-routing-key = dead.product
这时候用管理平台发送消息,10秒后会有消息进入死信队列。
也可以在发送消息时设置header,expiration = 5000, 5秒后会有消息进入死信队列。
RabbitMQ中TTL设置:
队列过期时间使⽤参数,对整个队列消息统⼀过期, x-message-ttl, 单位ms(毫秒)。
消息过期时间使⽤参数(如果队列头部消息未过期,队列中部消息已经过期,中部还在队列⾥⾯)
两者都配置的话,时间短的先触发。
延时队列
什么是延迟队列?
⼀种带有延迟功能的消息队列,Producer 将消息发送到消息队列服务端,但并不期望这条消息⽴⻢投递,⽽是推迟到在当前时间点之后的某⼀个时间投递到 Consumer 进⾏消费,该消息即定时消息。
使⽤场景:
1.通过消息触发⼀些定时任务,⽐如在某⼀固定时间点向 ⽤户发送提醒消息;
2.⽤户登录之后5分钟给⽤户做分类推送、⽤户多少天未 登录给⽤户做召回推送;
3.消息⽣产和消费有时间窗⼝要求:⽐如在天猫电商交易 中超时未⽀付关闭订单的场景,在订单创建时会发送⼀ 条 延时消息。这条消息将会在 30 分钟以后投递给消费 者,消费者收到此消息后需要判断对应的订单是否已完 成⽀付。 如⽀付未完成,则关闭订单。如已完成⽀付则忽略。
实现方式
本身没有直接支持延迟队列的功能,但是可以通过TTL和DLX模拟出延迟队列的功能。
还可以安装rabbitmq_delayed_message_exchange插件来实现。
TTL和DLX
这一种实现上面已经在管理平台操作了,转换成代码即可。
插件方式
安装插件有别的文章已经写好了。
声明一个延时队列交换机:
1 2 3
| Map<String, Object> args = new HashMap<String, Object>(); args.put("x-delayed-type", "direct"); channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
|
或者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Bean("exchangeA") public CustomExchange delayedExchange(){ HashMap<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message",true,false,arguments); }
|
发送消息:
1 2 3 4 5 6 7 8 9 10
| @Test public void send1() { template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME, "order.new", "新订单来啦1", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(60 * 1000); return message; } }); }
|
RabbitmqAdmin使用和到底用不用
RabbitAdmin类完成对Exchange,Queue,Binging的操作,在容器中管理了RabbitAdmin类的时候,可以对Exchange,Queue,Binging进行自动声明。
1 2 3 4 5 6 7 8 9 10 11 12
| @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){ RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory); rabbitAdmin.declareExchange(new DirectExchange("xxx.direct.exchange",true,false)); rabbitAdmin.declareQueue(new Queue("xxx.debug",true)); rabbitAdmin.declareBinding(new Binding("xxx.debug",Binding.DestinationType.QUEUE, "xxx.direct.exchange","xxx.hehe",new HashMap())); return rabbitAdmin; }
|
在spring boot 启动后,会自动创建交换机、队列、绑定关系。
到底用不用
用:可以明确绑定关系,在用迁移或者本地搭建的需求时,可以自动创建。
不用:让运维人员来维护交换机、队列、绑定关系,迁移和本地搭建时比较麻烦,流程规范了也就是繁琐了。
SimpleMessageListenerContainer
消息监听器容器
感觉有一定的局限性,不使用。
MessageListenerAdapter
消息监听适配器(adapter),通过反射将消息处理委托给目标监听器的处理方法,并进行灵活的消息类型转换。
感觉有一定的局限性,不使用。
MessageConverter
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @RabbitHandler public void releaseCouponRecord1(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("监听到消息:消息内容1:"+ msg); channel.basicAck(deliveryTag, false); }
@RabbitHandler public void releaseCouponRecord2(Message msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("监听到消息:消息内容2:"+ msg); channel.basicAck(deliveryTag, false); }
@RabbitHandler public void releaseCouponRecord3(byte[] msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws IOException { System.out.println("监听到消息:消息内容3:"+ msg); channel.basicAck(deliveryTag, false); }
|
如果在管理平台发送xxx文字,会发送至方法3。
spring boot 提供了MessageConverter用于消息的转化。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Bean public MessageConverter myMessageConverter() { return new MessageConverter(){
@Override public Message toMessage(Object o, MessageProperties messageProperties) throws MessageConversionException { System.out.println("tomessage1" + o); return null; }
@Override public Message toMessage(Object object, MessageProperties messageProperties, Type genericType) throws MessageConversionException { System.out.println("tomessage2" + object); return MessageConverter.super.toMessage(object, messageProperties, genericType); }
@Override public Object fromMessage(Message message) throws MessageConversionException { System.out.println("fromMessage2" + message); try { return new String(message.getBody(), "utf-8"); } catch (UnsupportedEncodingException e) { throw new RuntimeException(e); } } }; }
|
这时候会调用第一个方法。
spring 根据 contentType 找到适合的处理方法处理。
方法一:生产者消费者统一contentType
方法二:设置默认处理方法
1 2 3
| @RabbitHandler(isDefault = true) public void process(Message message) { }
|
方法三:把RabbitListener标注在方法上
实践总结
一般可以在发送之后保存至可持久化的数据库中。
在confirmCallback和returnsCallback中修改相应表的状态,如果没有发送成功记录原因等。这样可以快速的排查问题。
再加上rabbitmqAdmin的用不用。
最好设置消费者默认处理方法为Message message对象。
可选消息转换类。