rocketmq的使用

使用

https://rocketmq.apache.org/zh/docs/quickStart/01quickstart

安装条件

  1. 64位操作系统,推荐 Linux/Unix/macOS
  2. 64位 JDK 1.8+
  3. 如果源码编译安装需要安装maven

下载源码并编译

1
2
3
4
5
wget https://dist.apache.org/repos/dist/release/rocketmq/5.1.4/rocketmq-all-5.1.4-source-release.zip
unzip rocketmq-all-5.1.4-source-release.zip
cd rocketmq-all-5.1.4-source-release/
mvn -Prelease-all -DskipTests -Dspotbugs.skip=true clean install -U
cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4

如果下载编译好的包直接进入下面一步。

启动NameServer

1
2
3
4
5
6
cd /usr/local/rocketmq-all-5.1.4-bin-release
### 启动namesrv
nohup sh bin/mqnamesrv &
### 验证namesrv是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
### 结果:The Name Server boot success. serializeType=JSON

nameserver相当于注册中心, 端口为9876

启动Broker+Proxy

5.x 版本下即 Broker 和 Proxy 同进程部署。

脚本里使用G1收集器:sh

1
在runbroker.sh文件中 -XX:+UseG1GC之前加上-XX:+UnlockExperimentalVMOptions

把堆内存改小,原来8g改为2g。

1
2
3
4
### 先启动broker
nohup sh bin/mqbroker -n localhost:9876 --enable-proxy &

tail -f ~/logs/rocketmqlogs/proxy.log

broker是具体存储消息的服务,默认端口10911.

这种属于Local模式, 则是通过BrokerController对象调用Broker的方法实现发送、消费等业务;

如果Proxy启动Cluster模式,则是通过RemotingClient访问Broker实现发送、消费等业务。代码的时候以前使用nameAddr=localhost:9876。 有proxy后使用localhost:10911。

使用命令行测试消息收发

1
2
3
4
5
6
export NAMESRV_ADDR=localhost:9876
### 生产消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

### 消费消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

关闭服务器

1
2
3
4
# 关闭broker
sh bin/mqshutdown broker
# 关闭nameServer
sh bin/mqshutdown namesrv

console控制台

需要安装maven

https://codeload.github.com/apache/rocketmq-externals/zip/refs/tags/rocketmq-console-1.0.0

1
2
unzip rocketmq-externals-rocketmq-console-1.0.0.zip
cd /usr/local/rocketmq-externals-rocketmq-console-1.0.0/rocketmq-console

修改配置文件application.properties:

1
rocketmq.config.namesrvAddr=localhost:9876

运行:

1
mvn spring-boot:run

或者

1
2
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-console-ng-1.0.0.jar

访问localhost:8080

Dashboard控制台

https://github.com/apache/rocketmq-dashboard

应该是console拆分出去重命名为Dashboard了。

RocketMQ >= 5.0使用新版本控制台,RocketMQ< 5.0时,使用旧版本。

https://github.com/apache/rocketmq-dashboard/blob/master/docs/1_0_0/UserGuide_CN.md

https://codeload.github.com/apache/rocketmq-dashboard/zip/refs/tags/rocketmq-dashboard-1.0.0

1
2
3
unzip rocketmq-dashboard-rocketmq-dashboard-1.0.0.zip
mvn clean package -Dmaven.test.skip=true
java -jar target/rocketmq-dashboard-1.0.1-SNAPSHOT.jar

普通maven应用使用

创建主题

通过mqadmin创建 Topic。

1
sh bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster

引入依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>5.1.4</version>
</dependency>

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47

import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "192.168.158.135:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}

消费者

Apache RocketMQ 支持SimpleConsumer和PushConsumer两种消费者类型,您可以选择以下任意一种方式订阅消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

private PushConsumerExample() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.158.135:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}

spring boot项目使用

https://github.com/apache/rocketmq-spring

https://github.com/apache/rocketmq-spring/tree/master/rocketmq-spring-boot-samples

引入依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

配置

application.properties

1
2
3
4
5
rocketmq.name-server=192.168.158.136:9876
rocketmq.producer.group=my-group1
rocketmq.producer.sendMessageTimeout=300000
# default producer tls config
rocketmq.producer.tls-enable=false

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

@Resource
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/sendMsg")
public void sendMsg() {
rocketMQTemplate.syncSend("TestTopic", "消息内容");
}

}

请求http://localhost:8080/sendMsg

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13

import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

@Service
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "string_consumer")
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}

读文档

需要把官方文档大概读一遍。

5.0的文档与4.0的文档相比:4.0的文档中初识RocketMq中也比较重要,但5.0中没有。

初识rocketmq

RocketMQ的基础消息模型就是一个简单的Pub/Sub模型。发布订阅模型。

rocketmq是基于主题(topic)的消息队列系统。

rocketmq为了支持高并发和水平扩展,中间的消息主题需要进行分区,同一个Topic会有多个生产者,同一个信息会有多个消费者,消费者之间要进行负载均衡等。

rocketmq的消息模型

上图包括两个生产者两个消息Topic,以及两组消费者 Comsumer

存储消息Topic的 代理服务器( Broker ),是实际部署过程对应的代理服务器。

为了消息写入能力的水平扩展,RocketMQ 对 Topic进行了分区,这种操作被称为队列(MessageQueue)。

为了消费能力的水平扩展,ConsumerGroup的概念应运而生。

rocketmq的部署模型

Apache RocketMQ 部署架构上主要分为四部分:

生产者 Producer:

发布消息的角色。Producer通过 MQ 的负载均衡模块选择相应的 Broker 集群队列进行消息投递,投递的过程支持快速失败和重试。

消费者 Consumer:

消息消费的角色。

支持以推(push),拉(pull)两种模式对消息进行消费。

同时也支持集群方式和广播方式的消费。

提供实时消息订阅机制,可以满足大多数用户的需求。

名字服务器 NameServer:

NameServer是一个简单的 Topic 路由注册中心,支持 Topic、Broker 的动态注册与发现。

主要包括两个功能:

  • Broker管理,NameServer接受Broker集群的注册信息并且保存下来作为路由信息的基本数据。然后提供心跳检测机制,检查Broker是否还存活;
  • 路由信息管理,每个NameServer将保存关于 Broker 集群的整个路由信息和用于客户端查询的队列信息。Producer和Consumer通过NameServer就可以知道整个Broker集群的路由信息,从而进行消息的投递和消费。

NameServer通常会有多个实例部署,各实例间相互不进行信息通讯。Broker是向每一台NameServer注册自己的路由信息,所以每一个NameServer实例上面都保存一份完整的路由信息。当某个NameServer因某种原因下线了,客户端仍然可以向其它NameServer获取路由信息。

代理服务器 Broker:

Broker主要负责消息的存储、投递和查询以及服务高可用保证。

NameServer几乎无状态节点,因此可集群部署,节点之间无任何信息同步。Broker部署相对复杂。

在 Master-Slave 架构中,Broker 分为 Master 与 Slave。一个Master可以对应多个Slave,但是一个Slave只能对应一个Master。Master 与 Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。

小结
  • 每个 BrokerNameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer。
  • ProducerNameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取Topic路由信息,并向提供 Topic 服务的 Master 建立长连接,且定时向 Master 发送心跳。Producer 完全无状态。
  • ConsumerNameServer 集群中的其中一个节点建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接,且定时向 Master、Slave发送心跳。Consumer 既可以从 Master 订阅消息,也可以从Slave订阅消息。

RocketMQ集群工作流程

1.启动NameServer

启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,相当于一个路由控制中心。

2.启动 Broker

启动 Broker。与所有 NameServer 保持长连接,定时发送心跳包。心跳包中包含当前 Broker 信息以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。

3.创建 Topic

创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。

4.生产者发送消息

生产者发送消息。启动时先跟 NameServer 集群中的其中一台建立长连接,并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,轮询从队列列表中选择一个队列,然后与队列所在的 Broker建立长连接从而向 Broker发消息。

5.消费者接受消息

消费者接受消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,然后开始消费消息。

领域模型概述

消息生产

生产者(producter):

Apache RocketMQ 中用于产生消息的运行实体,一般集成于业务调用链路的上游。生产者是轻量级匿名无身份的。

消息存储

主题(topic):

Apache RocketMQ 消息传输和存储的分组容器,主题内部由多个队列组成,消息的存储和水平扩展实际是通过主题内的队列实现的。

队列(MessageQueue):

Apache RocketMQ 消息传输和存储的实际单元容器,类比于其他消息队列中的分区。 Apache RocketMQ 通过流式特性的无限队列结构来存储消息,消息在队列内具备顺序性存储特征。

消息(Message):

Apache RocketMQ 的最小传输单元。消息具备不可变性,在初始化发送和完成存储后即不可变。

消息消费

消费者分组(ConsumerGroup):

Apache RocketMQ 发布订阅模型中定义的独立的消费身份分组,用于统一管理底层运行的多个消费者(Consumer)。同一个消费组的多个消费者必须保持消费逻辑和配置一致,共同分担该消费组订阅的消息,实现消费能力的水平扩展。

消费者(consumer):

Apache RocketMQ 消费消息的运行实体,一般集成在业务调用链路的下游。消费者必须被指定到某一个消费组中。

订阅关系(subscription):

Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。

Apache RocketMQ 的订阅关系除过滤表达式之外都是持久化的,即服务端重启或请求断开,订阅关系依然保留。

异步通信

异步消息通信模式下,各子系统之间无需强耦合直接连接,调用方只需要将请求转化成异步事件(消息)发送给中间代理,发送成功即可认为该异步链路调用完成,剩下的工作中间代理会负责将事件可靠通知到下游的调用系统,确保任务执行完成。该中间代理一般就是消息中间件。

异步通信的优势如下:

  • 系统拓扑简单。由于调用方和被调用方统一和中间代理通信,系统是星型结构,易于维护和管理。

  • 上下游耦合性弱。上下游系统之间弱耦合,结构更灵活,由中间代理负责缓冲和异步恢复。 上下游系统间可以独立升级和变更,不会互相影响。

  • 容量削峰填谷。基于消息的中间代理往往具备很强的流量缓冲和整形能力,业务流量高峰到来时不会击垮下游。

发布订阅模型

发布订阅模型具有如下特点:

  • 消费独立:相比队列模型的匿名消费方式,发布订阅模型中消费方都会具备的身份,一般叫做订阅组(订阅关系),不同订阅组之间相互独立不会相互影响。
  • 一对多通信:基于独立身份的设计,同一个主题内的消息可以被多个订阅组处理,每个订阅组都可以拿到全量消息。因此发布订阅模型可以实现一对多通信。

领域模型

官网领域模型下的各各目录还是可以看看的,主要是使用建议还是值得的。

功能特性

消费者分类

Apache RocketMQ 提供了不同的消费者类型: PushConsumer 、SimpleConsumer 和 PullConsumer。

在实际使用场景中,PullConsumer 仅推荐在流处理框架中集成使用,大多数消息收发场景使用 PushConsumer 和 SimpleConsumer 就可以满足需求。

若您的业务场景发生变更,或您当前使用的消费者类型不适合当前业务,您可以选择在 PushConsumer 和SimpleConsumer 之间变更消费者类型。变更消费者类型不影响当前Apache RocketMQ 资源的使用和业务处理。

生产环境中相同的 ConsumerGroup 下严禁混用 PullConsumer 和其他两种消费者,否则会导致消息消费异常。

对比项 PushConsumer SimpleConsumer PullConsumer
接口方式 使用监听器回调接口返回消费结果,消费者仅允许在监听器范围内处理消费逻辑。 业务方自行实现消息处理,并主动调用接口返回消费结果。 业务方自行按队列拉取消息,并可选择性地提交消费结果
消费并发度管理 由SDK管理消费并发度。 由业务方消费逻辑自行管理消费线程。 由业务方消费逻辑自行管理消费线程。
负载均衡粒度 5.0 SDK是消息粒度,更均衡,早期版本是队列维度 消息粒度,更均衡 队列粒度,吞吐攒批性能更好,但容易不均衡
接口灵活度 高度封装,不够灵活。 原子接口,可灵活自定义。 原子接口,可灵活自定义。
适用场景 适用于无自定义流程的业务消息开发场景。 适用于需要高度自定义业务流程的业务开发场景。 仅推荐在流处理框架场景下集成使用

在rocketmq spring boot starter中,使用的是PushConsumer。

只有引入rocketmq-client-java才会有push、simple、pull的区别吧。

了解更多看官网内容吧。

普通消息

应用场景

典型场景一:微服务异步解耦

以在线的电商交易场景为例,上游订单系统将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。

典型场景二:数据集成传输

以离线的日志收集场景为例,通过埋点组件收集前端应用的相关操作日志,并转发到 Apache RocketMQ 。每条消息都是一段日志数据,Apache RocketMQ 不做任何处理,只需要将日志数据可靠投递到下游的存储系统和分析系统即可,后续功能由后端应用完成。

spring boot 代码

上面最开始就是普通消息。

顺序消息

应用场景

典型场景一:撮合交易

以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。

典型场景二:数据实时增量同步

以数据库变更增量同步场景为例,上游源端数据库按需执行增删改操作,将二进制操作日志作为消息,通过 Apache RocketMQ 传输到下游搜索系统,下游系统按顺序还原消息数据,实现状态数据按序刷新。如果是普通消息则可能会导致状态混乱,和预期操作结果不符,基于顺序消息可以实现下游状态和上游操作结果一致。

如何保证顺序性

Apache RocketMQ 顺序消息的顺序关系通过消息组(MessageGroup)判定和识别,发送顺序消息时需要为每条消息设置归属的消息组,相同消息组的多条消息之间遵循先进先出的顺序关系,不同消息组、无消息组的消息之间不涉及顺序性。

Apache RocketMQ 的消息的顺序性分为两部分,生产顺序性和消费顺序性。

生产顺序性

Apache RocketMQ 通过生产者和服务端的协议保障单个生产者串行地发送消息,并按序存储和持久化。

如需保证消息生产的顺序性,则必须满足以下条件:

  • 单一生产者:消息生产的顺序性仅支持单一生产者,不同生产者分布在不同的系统,即使设置相同的消息组,不同生产者之间产生的消息也无法判定其先后顺序。
  • 串行发送:Apache RocketMQ 生产者客户端支持多线程安全访问,但如果生产者使用多线程并行发送,则不同线程间产生的消息将无法判定其先后顺序。

满足以上条件的生产者,将顺序消息发送至 Apache RocketMQ 后,会保证设置了同一消息组的消息,按照发送顺序存储在同一队列中。服务端顺序存储逻辑如下:

  • 相同消息组的消息按照先后顺序被存储在同一个队列。
  • 不同消息组的消息可以混合在同一个队列中,且不保证连续。

消费顺序性

Apache RocketMQ 通过消费者和服务端的协议保障消息消费严格按照存储的先后顺序来处理。

如需保证消息消费的顺序性,则必须满足以下条件:

投递顺序:

Apache RocketMQ 通过客户端SDK和服务端通信协议保障消息按照服务端存储顺序投递,但业务方消费消息时需要严格按照接收—处理—应答的语义处理消息,避免因异步处理导致消息乱序。

消费者类型为PushConsumer时, Apache RocketMQ 保证消息按照存储顺序一条一条投递给消费者,若消费者类型为SimpleConsumer,则消费者有可能一次拉取多条消息。此时,消息消费的顺序性需要由业务方自行保证。

有限重试:

Apache RocketMQ 顺序消息投递仅在重试次数限定范围内,即一条消息如果一直重试失败,超过最大重试次数后将不再重试,跳过这条消息消费,不会一直阻塞后续消息处理。

对于需要严格保证消费顺序的场景,请务设置合理的重试次数,避免参数不合理导致消息乱序。

spring boot 代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

@Resource
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/sendMsg")
public void sendMsg() {
// 参数一:topic 如果想添加tag,可以使用"topic:tag"的写法
// 参数二:消息内容
// 参数三:hashKey 用来计算决定消息发送到哪个消息队列, 一般是订单ID,产品ID等
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId1创建", "orderId1");
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId2创建", "orderId2");
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId1支付", "orderId1");
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId1完成", "orderId1");
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId2支付", "orderId2");
rocketMQTemplate.syncSendOrderly("TestTopic", "orderId2完成", "orderId2");
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

// consumeMode = ConsumeMode.ORDERLY 消息模式使用顺序
@Service
@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "string_consumer", consumeMode = ConsumeMode.ORDERLY)
public class StringConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringConsumer received: %s \n", message);
}
}

定时/延时消息

应用场景

典型场景一:分布式定时调度

在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。传统基于数据库的定时调度方案在分布式场景下,性能不高,实现复杂。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器。

典型场景二:任务超时处理

以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。

基于定时消息的超时任务处理具备如下优势:

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。

spring boot代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

@Resource
private RocketMQTemplate rocketMQTemplate;

@GetMapping("/sendMsg")
public void sendMsg() {
rocketMQTemplate.syncSendDelayTimeMills("TestTopic", "延时消息内容",20_000);
Long deliverTimeStamp = System.currentTimeMillis() + 10 * 1000;
rocketMQTemplate.syncSendDeliverTimeMills("TestTopic", "定时消息内容", deliverTimeStamp);
}

}

事务消息

应用场景

基于Apache RocketMQ分布式事务消息:支持最终一致性

功能原理

什么是事务消息

事务消息是 Apache RocketMQ 提供的一种高级消息类型,支持在分布式场景下保障消息生产和本地事务的最终一致性。

事务消息处理流程

事务消息交互流程如下图所示。

  1. 生产者将消息发送至Apache RocketMQ服务端。
  2. Apache RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息被标记为”暂不能投递”,这种状态下的消息即为半事务消息。
  3. 生产者开始执行本地事务逻辑。
  4. 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:
    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端将回滚事务,不会将半事务消息投递给消费者。
  5. 在断网或者是生产者应用重启的特殊情况下,若服务端未收到发送者提交的二次确认结果,或服务端收到的二次确认结果为Unknown未知状态,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。 说明 服务端回查的间隔时间和最大回查次数。
  6. 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
  7. 生产者根据检查到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

spring boot代码

添加mybatis-plus代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.5.4</version>
</dependency>

<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.5.4</version>
</dependency>

<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.31</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.32</version>
</dependency>

application.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
rocketmq.name-server=192.168.158.136:9876
rocketmq.producer.group=my-group2
rocketmq.producer.sendMessageTimeout=300000
# default producer tls config
rocketmq.producer.tls-enable=false

spring.datasource.type=com.zaxxer.hikari.HikariDataSource
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useSSL=true\
&useUnicode=true\
&characterEncoding=UTF-8\
&useServerPrepStmts=true\
&serverTimezone=Asia/Shanghai\
&zeroDateTimeBehavior=CONVERT_TO_NULL
spring.datasource.username=root
spring.datasource.password=root123456

这里改变了生产者的组和数据库参数。

1
2
3
4
5
6
7
8
9
10
@SpringBootApplication
@MapperScan("com.example.rocketmqtest.mapper")
@EnableTransactionManagement
public class RocketmqTestApplication {

public static void main(String[] args) {
SpringApplication.run(RocketmqTestApplication.class, args);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import java.io.Serializable;

/**
* <p>
*
* </p>
*
* @author hqc
* @since 2023-11-18
*/
public class User implements Serializable {

private static final long serialVersionUID = 1L;

/**
* 主键ID
*/
private Long id;

/**
* 姓名
*/
private String name;

/**
* 年龄
*/
private Integer age;

/**
* 邮箱
*/
private String email;

public Long getId() {
return id;
}

public void setId(Long id) {
this.id = id;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public Integer getAge() {
return age;
}

public void setAge(Integer age) {
this.age = age;
}

public String getEmail() {
return email;
}

public void setEmail(String email) {
this.email = email;
}

@Override
public String toString() {
return "User{" +
"id = " + id +
", name = " + name +
", age = " + age +
", email = " + email +
"}";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import com.example.rocketmqtest.entity.User;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;

/**
* <p>
* Mapper 接口
* </p>
*
* @author hqc
* @since 2023-11-18
*/
public interface UserMapper extends BaseMapper<User> {

}
1
2
3
4
5
6
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.example.rocketmqtest.rocketmq-test.mapper.UserMapper">

</mapper>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import com.example.rocketmqtest.entity.User;
import com.baomidou.mybatisplus.extension.service.IService;
import org.springframework.transaction.annotation.Transactional;

/**
* <p>
* 服务类
* </p>
*
* @author hqc
* @since 2023-11-18
*/
public interface IUserService extends IService<User> {

@Transactional(rollbackFor = Exception.class)
void update1();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import com.example.rocketmqtest.entity.User;
import com.example.rocketmqtest.mapper.UserMapper;
import com.example.rocketmqtest.service.IUserService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
* <p>
* 服务实现类
* </p>
*
* @author hqc
* @since 2023-11-18
*/
@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {

@Resource
private RocketMQTemplate rocketMQTemplate;

@Transactional(rollbackFor = Exception.class)
@Override
public void update1() {
User user = this.getById(1);
user.setAge(100);
this.updateById(user);

Message msg = MessageBuilder.withPayload("事务消息").
setHeader(RocketMQHeaders.TRANSACTION_ID, user.getId()).build();
rocketMQTemplate.sendMessageInTransaction("TestTranctioinTopic", msg, "orderId1");
//int i = 1/0;
User user2 = this.getById(2);
user2.setAge(100);
this.updateById(user2);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
import com.example.rocketmqtest.service.IUserService;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

@Resource
private RocketMQTemplate rocketMQTemplate;

@Autowired
private IUserService iUserService;

@GetMapping("/sendMsg")
public void sendMsg() {
Message msg = MessageBuilder.withPayload("事务消息").
setHeader(RocketMQHeaders.TRANSACTION_ID, 1).build();
rocketMQTemplate.sendMessageInTransaction("TestTranctioinTopic", msg, "orderId1");
}

@GetMapping("/sendMsg2")
public void sendMsg2() {
iUserService.update1();
}

}

事物消息生产者监听器:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import com.example.rocketmqtest.entity.User;
import com.example.rocketmqtest.service.IUserService;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionListener;
import org.apache.rocketmq.spring.core.RocketMQLocalTransactionState;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

@Service
@RocketMQTransactionListener
public class TranscationProducterListener implements RocketMQLocalTransactionListener {

@Autowired
private IUserService iUserService;

@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
User user = iUserService.getById(transId);
System.out.println(transId);
if (100 == user.getAge()) {
// 本地事务执行成功
return RocketMQLocalTransactionState.COMMIT;
} else {
// 本地事务执行失败
return RocketMQLocalTransactionState.ROLLBACK;
}
// 本地事务执行异常
//return RocketMQLocalTransactionState.UNKNOWN;
}

@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String) msg.getHeaders().get(RocketMQHeaders.TRANSACTION_ID);
User user = iUserService.getById(transId);
System.out.println(transId);
if (100 == user.getAge()) {
// 本地事务执行成功
return RocketMQLocalTransactionState.COMMIT;
} else {
// 本地事务执行失败
return RocketMQLocalTransactionState.ROLLBACK;
}
// 本地事务执行异常
//return RocketMQLocalTransactionState.UNKNOWN;
}

}

这个类才会消息状态,半事务消息转为提交待消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;

/**
* StringTransactionalConsumer
*/
@Service
@RocketMQMessageListener(topic = "TestTranctioinTopic", consumerGroup = "string_trans_consumer")
public class StringTransactionalConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String message) {
System.out.printf("------- StringTransactionalConsumer received: %s \n", message);
}
}

通过注释和取消注释来验证消息发送

消息发送方式

4.0的文档中生产者->普通消息发送

Apache RocketMQ可用于以三种方式发送消息:同步、异步和单向传输。前两种消息类型是可靠的,因为无论它们是否成功发送都有响应。

同步发送

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

同步发送的整个代码流程如下:

  1. 首先会创建一个producer。普通消息可以创建 DefaultMQProducer,创建时需要填写生产组的名称,生产者组是指同一类Producer的集合,这类Producer发送同一类消息且发送逻辑一致。
  2. 设置 NameServer 的地址。Apache RocketMQ很多方式设置NameServer地址(客户端配置中有介绍),这里是在代码中调用producer的API setNamesrvAddr进行设置,如果有多个NameServer,中间以分号隔开,比如”127.0.0.2:9876;127.0.0.3:9876”。
  3. 第三步是构建消息。指定topic、tag、body等信息,tag可以理解成标签,对消息进行再归类,RocketMQ可以在消费端对tag进行过滤。
  4. 最后调用send接口将消息发送出去。同步发送等待结果最后返回SendResult,SendResut包含实际发送状态还包括SEND_OK(发送成功), FLUSH_DISK_TIMEOUT(刷盘超时), FLUSH_SLAVE_TIMEOUT(同步到备超时), SLAVE_NOT_AVAILABLE(备不可用),如果发送失败会抛出异常。

同步发送方式请务必捕获发送异常,并做业务侧失败兜底逻辑,如果忽略异常则可能会导致消息未成功发送的情况。

异步发送

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

异步发送需要实现异步发送回调接口(SendCallback)。

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

异步发送与同步发送代码唯一区别在于调用send接口的参数不同,异步发送不会等待发送返回,取而代之的是send方法需要传入 SendCallback 的实现,SendCallback 接口主要有onSuccess 和 onException 两个方法,表示消息发送成功和消息发送失败。

单向模式发送

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。

单向模式调用sendOneway,不会对返回结果有任何等待和处理。

消息发送重试及消息流控

消息发送重试

Apache RocketMQ 客户端连接服务端发起消息发送请求时,可能会因为网络故障、服务异常等原因导致调用失败。为保证消息的可靠性, Apache RocketMQ 在客户端SDK中内置请求重试逻辑,尝试通过重试发送达到最终调用成功的效果。

同步发送和异步发送模式均支持消息发送重试。

重试触发条件

触发消息发送重试机制的条件如下:

  • 客户端消息发送请求调用失败或请求超时
  • 网络异常造成连接失败或请求超时。
  • 服务端节点处于重启或下线等状态造成连接失败。
  • 服务端运行慢造成请求超时。
  • 服务端返回失败错误码
    • 系统逻辑错误:因运行逻辑不正确造成的错误。
    • 系统流控错误:因容量超限造成的流控错误。

对于事务消息,只会进行透明重试(transparent retries),网络超时或异常等场景不会进行重试。

重试流程

生产者在初始化时设置消息发送最大重试次数,当出现上述触发条件的场景时,生产者客户端会按照设置的重试次数一直重试发送消息,直到消息发送成功或达到最大重试次数重试结束,并在最后一次重试失败后返回调用错误响应。

  • 同步发送:调用线程会一直阻塞,直到某次重试成功或最终重试失败,抛出错误码和异常。
  • 异步发送:调用线程不会阻塞,但调用结果会通过异常事件或者成功事件返回。
重试间隔
  • 除服务端返回系统流控错误场景,其他触发条件触发重试后,均会立即进行重试,无等待间隔。
  • 若由于服务端返回流控错误触发重试,系统会按照指数退避策略进行延迟重试。指数退避算法通过以下参数控制重试行为:
    • INITIAL_BACKOFF: 第一次失败重试前后需等待多久,默认值:1秒。
    • MULTIPLIER :指数退避因子,即退避倍率,默认值:1.6。
    • JITTER :随机抖动因子,默认值:0.2。
    • MAX_BACKOFF :等待间隔时间上限,默认值:120秒
    • MIN_CONNECT_TIMEOUT :最短重试间隔,默认值:20秒。
功能约束
  • 链路耗时阻塞评估:从上述重试机制可以看出,在重试流程中生产者仅能控制最大重试次数。若由于系统异常触发了SDK内置的重试逻辑,则服务端需要等待最终重试结果,可能会导致消息发送请求链路被阻塞。对于某些实时调用类场景,您需要合理评估每次调用请求的超时时间以及最大重试次数,避免影响全链路的耗时。
  • 最终异常兜底: Apache RocketMQ 客户端内置的发送请求重试机制并不能保证消息发送一定成功。当最终重试仍然失败时,业务方调用需要捕获异常,并做好冗余保护处理,避免消息发送结果不一致。
  • 消息重复问题:因远程调用的不确定性,当Apache RocketMQ客户端因请求超时触发消息发送重试流程,此时客户端无法感知服务端的处理结果,客户端进行的消息发送重试可能会产生消息重复问题,业务逻辑需要自行处理消息重复问题。****

消息流控机制

消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

触发条件

Apache RocketMQ 的消息流控触发条件如下:

  • 存储压力大:参考消费进度管理的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。
流控行为

当系统触发消息发送流控时,客户端会收到系统限流错误和异常,错误码信息如下:

  • reply-code:530
  • reply-text:TOO_MANY_REQUESTS

客户端收到系统流控错误码后,会根据指数退避策略进行消息发送重试。

处理建议
  • 如何避免触发消息流控:触发限流的根本原因是系统容量或水位过高,您可以利用可观测性功能监控系统水位容量等,保证底层资源充足,避免触发流控机制。
  • 突发消息流控处理:如果因为突发原因触发消息流控,且客户端内置的重试流程执行失败,则建议业务方将请求调用临时替换到其他系统进行应急处理。

消息过滤

Apache RocketMQ 支持Tag标签过滤和SQL属性过滤,这两种过滤方式对比如下:

对比项 Tag标签过滤 SQL属性过滤
过滤目标 消息的Tag标签。 消息的属性,包括用户自定义属性以及系统属性(Tag是一种系统属性)。
过滤能力 精准匹配。 SQL语法匹配。
适用场景 简单过滤场景、计算逻辑简单轻量。 复杂过滤场景、计算逻辑较复杂。
1
rocketMQTemplate.syncSend() 参数一destination – formats: `topicName:tags`

订阅关系一致性

过滤表达式属于订阅关系的一部分,Apache RocketMQ 的领域模型规定,同一消费者分组内的多个消费者的订阅关系包括过滤表达式,必须保持一致,否则可能会导致部分消息消费不到。

消息者负载均衡

  • 消费组间广播消费 :如上图所示,每个消费者分组只初始化唯一一个消费者,每个消费者可消费到消费者分组内所有的消息,各消费者分组都订阅相同的消息,以此实现单客户端级别的广播一对多推送效果。

    该方式一般可用于网关推送、配置推送等场景。

  • 消费组内共享消费 :如上图所示,每个消费者分组下初始化了多个消费者,这些消费者共同分担消费者分组内的所有消息,实现消费者分组内流量的水平拆分和均衡负载。

    该方式一般可用于微服务解耦场景。

1
2
3
4
5
@RocketMQMessageListener 注解中 messageModel
public enum MessageModel {
BROADCASTING("BROADCASTING"),
CLUSTERING("CLUSTERING");
}

内容还是看官网吧。

消费进度管理

  • 消费者启动后从哪里开始消费消息?
  • 消费者每次消费成功后如何标记消息状态,确保下次不会再重复处理该消息?
  • 某消息被指定消费者消费过一次后,如果业务出现异常需要做故障恢复,该消息能否被重新消费?

消费重试

消费者类型 重试过程状态机 重试间隔 最大重试次数
PushConsumer 已就绪 处理中 待重试 提交 * 死信 消费者分组创建时元数据控制。 无序消息:阶梯间隔 顺序消息:固定间隔时间 消费者分组创建时的元数据控制。
SimpleConsumer 已就绪 处理中 提交 死信 通过API修改获取消息时的不可见时间。 消费者分组创建时的元数据控制。

更多看官网。

消息存储和清理机制

Apache RocketMQ 使用存储时长作为消息存储的依据,即每个节点对外承诺消息的存储时长。在存储时长范围内的消息都会被保留,无论消息是否被消费;超过时长限制的消息则会被清理掉。

在 Apache RocketMQ中,消息保存时长并不能完整控制消息的实际保存时间,因为消息存储仍然使用本地磁盘,本地磁盘空间不足时,为保证服务稳定性消息仍然会被强制清理,导致消息的实际保存时长小于设置的保存时长。

更多看官网。

RocketMQ Promethus Exporter

Rocketmq-exporter 是用于监控 RocketMQ broker 端和客户端所有相关指标的系统,通过 mqAdmin 从 broker 端获取指标值后封装成 87 个 cache。

就是可以提供监控的exporter。

Metrics

  • 服务端 Metrics 指标
  • 生产者 Metrics 指标
  • 消费者 Metrics 指标

权限控制

看官网吧


rocketmq的使用
http://hanqichuan.com/2023/11/16/mq/rocketmq的使用/
作者
韩启川
发布于
2023年11月16日
许可协议