rocketmq的cluster布署

同步刷盘与异步刷盘

跟mysql的同步刷盘和异步刷盘一样。

先放到内存中,是发送一条消息就刷到磁盘中,还是几条消息才刷到磁盘中。

同步复制与异步复制

同mysql的同步复制与异步复制。

当出现主备主从架构后,主同步数据至备或者从。同步就是一条消息发送到主后,再同步到备或者从,都同步后才返回消息提交成功。

异步是只发送到主,就返回提交成功,异步传输到备或者从。

RAID10磁盘

RAID 10(也称为RAID 1+0)是一种磁盘阵列配置,它结合了RAID 1和RAID 0的特性。RAID 1提供了数据冗余,通过在两个或更多的磁盘上镜像存储数据,而RAID 0提供了性能增益,通过将数据分成块并在多个磁盘上并行读写。RAID 10通过将这两种方法结合起来,旨在提供更好的性能和更高的可靠性。

在RAID 10中,至少需要四个磁盘驱动器。数据被分成块,并且每个块都被写入两个磁盘中,这两个磁盘构成一个镜像对。然后,这些镜像对被组合成一个RAID 0阵列,以提供性能增益。因此,RAID 10同时提供了数据冗余和良好的读/写性能。

主要优点包括:

  1. 高性能: RAID 10提供了比许多其他RAID级别更好的读/写性能,因为它可以利用RAID 0的并行性。
  2. 高可靠性: 由于数据被镜像到多个磁盘上,RAID 10提供了较高的可靠性。即使一个磁盘出现故障,仍然有其他镜像磁盘上的数据可用。
  3. 容忍多个磁盘故障: 在某些情况下,RAID 10可以容忍多个磁盘的故障,具体取决于这些故障是如何发生的。

然而,与之相关的缺点包括:

  1. 成本: RAID 10至少需要四个磁盘,并且由于提供冗余的磁盘,相对于其他RAID级别,成本较高。
  2. 存储效率: 由于镜像,RAID 10的存储效率较低,只有总容量的一半可以用于存储数据。

总体而言,RAID 10是一种适用于需要高性能和数据冗余的应用场景的磁盘配置。

什么是proxy

RocketMQ Proxy是一个RocketMQ Broker的代理服务,支持客户端用GRPC协议访问Broker。

RocketMQ Proxy主要解决了4.9.X版本客户端多语言客户端(c/c++, golang, csharp,rust,python, nodejs)实现Remoing协议难度大、复杂、功能不一致、维护工作大的问题。

RocketMQ Proxy使用业界熟悉的GRPC协议, 各个语言代码统一、简单,使得多语言使用RocketMQ更方便、容易。

Proxy 承担了协议适配、权限管理、消息管理等计算功能,Broker 则更加专注于存储。这样存储和计算相分离,在云原生环境下可以更好地进行资源调度。

模式部署方式介绍

https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy

Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。

  • 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
  • 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。

在 Cluster 模式下,Broker 与 Proxy分别部署,我可以在 NameServer和 Broker都启动完成之后再部署 Proxy。

在 Cluster模式下,一个 Proxy集群和 Broker集群为一一对应的关系,可以在 Proxy的配置文件 rmq-proxy.json 中使用 rocketMQClusterName 进行配置

nameServer

NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同。

nameServer不管是Local还是Cluster都是一样的。

broker和proxy在Local和Cluster的区别

主要是broker和proxy是否在一起布署。

多组节点(集群)单副本模式 (多主)

一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:

  • 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
  • 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。

在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876

多节点(集群)多副本模式-异步复制(多主多从异步复制)

每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:

  • 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
  • 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。

多节点(集群)多副本模式-同步双写(多主多从同步复制)

每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:

  • 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
  • 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。

主备自动切换模式部署

https://rocketmq.apache.org/zh/docs/deploymentOperations/03autofailover

主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。

Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。

Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开(可以选择性打开,并不强制要求每一台 NameServer 都打开),在该模式下,NameServer 本身能力仍然是无状态的,也就是内嵌模式下若 NameServer 挂掉多数派,只影响切换能力,不影响原来路由获取等功能。另一种是独立部署,需要单独部署 Controller 组件。

Cluster模式多主多从同步复制部署

机器规划

ip 配置 部署内容
192.168.158.139 内存3G,硬盘24G namerServer、broker -a master、proxy、controller
192.168.158.140 内存3G,硬盘24G namerServer、broker-b master、proxy、controller
192.168.158.141 内存3G,硬盘24G namerServer、broker-a slave、proxy、controller
192.168.158.142 内存1.5G硬盘24G Broker-b slave

环境变量

vim /etc/profile

1
2
3
4
5
6
7
export JAVA_HOME=/usr/java/jdk1.8.0-aarch64
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=${JAVA_HOME}/bin:$PATH
export M2_HOME=/usr/local/apache-maven-3.9.5
export PATH=$PATH:$M2_HOME/bin
export ROCKETMQ_HOME=/usr/local/rocketmq-all-5.1.4-bin-release

source /etc/profile

nameServer

1
2
3
4
5
6
### 首先启动Name Server
nohup sh mqnamesrv &

### 验证Name Server 是否启动成功
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...

把9876端口打开。

broker

1
2
在runbroker.sh文件中 -XX:+UseG1GC之前加上-XX:+UnlockExperimentalVMOptions
把堆内存改小,原来8g改为1g。
1
2
3
4
5
6
7
8
9
10
11
### 在机器A,启动第一个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &

### 在机器B,启动第二个Master,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &

### 在机器C,启动第一个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &

### 在机器D,启动第二个Slave,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &

把10909、10911、10912端口打开

proxy

1
2
3
4
5
6
7
8
### 在机器A,启动第一个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &

### 在机器B,启动第二个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &

### 在机器C,启动第三个Proxy,例如NameServer的IP为:192.168.1.1
nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &

把8080、8081端口打开

控制台

下载Dashboard。

修改配置文件application.properties:

1
rocketmq.config.namesrvAddr=192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876

好像也不支持proxy。

java程序

使用官网中快速开始的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.message.Message;
import org.apache.rocketmq.client.apis.producer.Producer;
import org.apache.rocketmq.client.apis.producer.SendReceipt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerExample {
private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);

public static void main(String[] args) throws ClientException {
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoint = "192.168.158.139:8081;192.168.158.140:8081;192.168.158.141:8081";
// 消息发送的目标Topic名称,需要提前创建。
String topic = "TestTopic";
ClientServiceProvider provider = ClientServiceProvider.loadService();
ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
ClientConfiguration configuration = builder.build();
// 初始化Producer时需要设置通信配置以及预绑定的Topic。
Producer producer = provider.newProducerBuilder()
.setTopics(topic)
.setClientConfiguration(configuration)
.build();
// 普通消息发送。
Message message = provider.newMessageBuilder()
.setTopic(topic)
// 设置消息索引键,可根据关键字精确查找某条消息。
.setKeys("messageKey")
// 设置消息Tag,用于消费端根据指定Tag过滤消息。
.setTag("messageTag")
// 消息体。
.setBody("messageBody".getBytes())
.build();
try {
// 发送消息,需要关注发送结果,并捕获失败等异常。
SendReceipt sendReceipt = producer.send(message);
logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
} catch (ClientException e) {
logger.error("Failed to send message", e);
}
// producer.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.io.IOException;
import java.util.Collections;
import org.apache.rocketmq.client.apis.ClientConfiguration;
import org.apache.rocketmq.client.apis.ClientException;
import org.apache.rocketmq.client.apis.ClientServiceProvider;
import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
import org.apache.rocketmq.client.apis.consumer.FilterExpression;
import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
import org.apache.rocketmq.client.apis.consumer.PushConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PushConsumerExample {
private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);

private PushConsumerExample() {
}

public static void main(String[] args) throws ClientException, IOException, InterruptedException {
final ClientServiceProvider provider = ClientServiceProvider.loadService();
// 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
String endpoints = "192.168.158.139:8081;192.168.158.140:8081;192.168.158.141:8081";
ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
.setEndpoints(endpoints)
.build();
// 订阅消息的过滤规则,表示订阅所有Tag的消息。
String tag = "*";
FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
// 为消费者指定所属的消费者分组,Group需要提前创建。
String consumerGroup = "YourConsumerGroup";
// 指定需要订阅哪个目标Topic,Topic需要提前创建。
String topic = "TestTopic";
// 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
// 设置消费者分组。
.setConsumerGroup(consumerGroup)
// 设置预绑定的订阅关系。
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
// 设置消费监听器。
.setMessageListener(messageView -> {
// 处理消息并返回消费结果。
logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
// 如果不需要再使用 PushConsumer,可关闭该实例。
// pushConsumer.close();
}
}

spring boot 好像不太支持proxy模式,写nameServer的地址是可以的。

controller主备自动切换模式

使用内嵌部署方式部署controller。

先停止nameServer和broker和proxy。

namerServer

添加namerServer的配置文件namesrv.conf :

139:

1
2
3
4
5
6
7
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879
controllerDLegerSelfId = n0
controllerStorePath = /root/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true

140:

1
2
3
4
5
6
7
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879
controllerDLegerSelfId = n1
controllerStorePath = /root/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true

141:

1
2
3
4
5
6
7
enableControllerInNamesrv = true
controllerDLegerGroup = group1
controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879
controllerDLegerSelfId = n2
controllerStorePath = /root/DledgerController
enableElectUncleanMaster = false
notifyBrokerRoleChanged = true

分别开放9877、9878、9879端口。

启动namerServer

1
nohup sh bin/mqnamesrv -c conf/namesrv.conf &

broker

在配置文件中添加

1
2
enableControllerMode = true
controllerAddr = 192.168.158.139:9877;192.168.158.140:9878;192.168.158.141:9879

更多参数查看官网。

启动broker

跟上面启动一样。

proxy

跟上面启动一样。

测试

在139上停止broker:

1
sh bin/mqshutdown broker

在控制台上发现141变为了master。

在139上启动broker,在控制台上发现139变为了slave。


rocketmq的cluster布署
http://hanqichuan.com/2023/11/18/mq/rocketmq的cluster布署/
作者
韩启川
发布于
2023年11月18日
许可协议