同步刷盘与异步刷盘
跟mysql的同步刷盘和异步刷盘一样。
先放到内存中,是发送一条消息就刷到磁盘中,还是几条消息才刷到磁盘中。
同步复制与异步复制
同mysql的同步复制与异步复制。
当出现主备主从架构后,主同步数据至备或者从。同步就是一条消息发送到主后,再同步到备或者从,都同步后才返回消息提交成功。
异步是只发送到主,就返回提交成功,异步传输到备或者从。
RAID10磁盘
RAID 10(也称为RAID 1+0)是一种磁盘阵列配置,它结合了RAID 1和RAID 0的特性。RAID 1提供了数据冗余,通过在两个或更多的磁盘上镜像存储数据,而RAID 0提供了性能增益,通过将数据分成块并在多个磁盘上并行读写。RAID 10通过将这两种方法结合起来,旨在提供更好的性能和更高的可靠性。
在RAID 10中,至少需要四个磁盘驱动器。数据被分成块,并且每个块都被写入两个磁盘中,这两个磁盘构成一个镜像对。然后,这些镜像对被组合成一个RAID 0阵列,以提供性能增益。因此,RAID 10同时提供了数据冗余和良好的读/写性能。
主要优点包括:
- 高性能: RAID 10提供了比许多其他RAID级别更好的读/写性能,因为它可以利用RAID 0的并行性。
- 高可靠性: 由于数据被镜像到多个磁盘上,RAID 10提供了较高的可靠性。即使一个磁盘出现故障,仍然有其他镜像磁盘上的数据可用。
- 容忍多个磁盘故障: 在某些情况下,RAID 10可以容忍多个磁盘的故障,具体取决于这些故障是如何发生的。
然而,与之相关的缺点包括:
- 成本: RAID 10至少需要四个磁盘,并且由于提供冗余的磁盘,相对于其他RAID级别,成本较高。
- 存储效率: 由于镜像,RAID 10的存储效率较低,只有总容量的一半可以用于存储数据。
总体而言,RAID 10是一种适用于需要高性能和数据冗余的应用场景的磁盘配置。
什么是proxy
RocketMQ Proxy是一个RocketMQ Broker的代理服务,支持客户端用GRPC协议访问Broker。
RocketMQ Proxy主要解决了4.9.X版本客户端多语言客户端(c/c++, golang, csharp,rust,python, nodejs)实现Remoing协议难度大、复杂、功能不一致、维护工作大的问题。
RocketMQ Proxy使用业界熟悉的GRPC协议, 各个语言代码统一、简单,使得多语言使用RocketMQ更方便、容易。
Proxy 承担了协议适配、权限管理、消息管理等计算功能,Broker 则更加专注于存储。这样存储和计算相分离,在云原生环境下可以更好地进行资源调度。
模式部署方式介绍
https://rocketmq.apache.org/zh/docs/deploymentOperations/01deploy
Apache RocketMQ 5.0 版本完成基本消息收发,包括 NameServer、Broker、Proxy 组件。 在 5.0 版本中 Proxy 和 Broker 根据实际诉求可以分为 Local 模式和 Cluster 模式,一般情况下如果没有特殊需求,或者遵循从早期版本平滑升级的思路,可以选用Local模式。
- 在 Local 模式下,Broker 和 Proxy 是同进程部署,只是在原有 Broker 的配置基础上新增 Proxy 的简易配置就可以运行。
- 在 Cluster 模式下,Broker 和 Proxy 分别部署,即在原有的集群基础上,额外再部署 Proxy 即可。
在 Cluster 模式下,Broker 与 Proxy分别部署,我可以在 NameServer和 Broker都启动完成之后再部署 Proxy。
在 Cluster模式下,一个 Proxy集群和 Broker集群为一一对应的关系,可以在 Proxy的配置文件 rmq-proxy.json
中使用 rocketMQClusterName
进行配置
nameServer
NameServer需要先于Broker启动,且如果在生产环境使用,为了保证高可用,建议一般规模的集群启动3个NameServer,各节点的启动命令相同。
nameServer不管是Local还是Cluster都是一样的。
broker和proxy在Local和Cluster的区别
主要是broker和proxy是否在一起布署。
多组节点(集群)单副本模式 (多主)
一个集群内全部部署 Master 角色,不部署Slave 副本,例如2个Master或者3个Master,这种模式的优缺点如下:
- 优点:配置简单,单个Master宕机或重启维护对应用无影响,在磁盘配置为RAID10时,即使机器宕机不可恢复情况下,由于RAID10磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高;
- 缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
在单个NameServer情况下使用的。对于多个NameServer的集群,Broker启动命令中-n
后面的地址列表用分号隔开即可,例如 192.168.1.1:9876;192.161.2:9876
。
多节点(集群)多副本模式-异步复制(多主多从异步复制)
每个Master配置一个Slave,有多组 Master-Slave,HA采用异步复制方式,主备有短暂消息延迟(毫秒级),这种模式的优缺点如下:
- 优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,同时Master宕机后,消费者仍然可以从Slave消费,而且此过程对应用透明,不需要人工干预,性能同多Master模式几乎一样;
- 缺点:Master宕机,磁盘损坏情况下会丢失少量消息。
多节点(集群)多副本模式-同步双写(多主多从同步复制)
每个Master配置一个Slave,有多对 Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功,这种模式的优缺点如下:
- 优点:数据与服务都无单点故障,Master宕机情况下,消息无延迟,服务可用性与数据可用性都非常高;
- 缺点:性能比异步复制模式略低(大约低10%左右),发送单个消息的RT会略高,且目前版本在主节点宕机后,备机不能自动切换为主机。
主备自动切换模式部署
https://rocketmq.apache.org/zh/docs/deploymentOperations/03autofailover
主要增加支持自动主从切换的 Controller 组件,其可以独立部署也可以内嵌在 NameServer 中。
Controller 组件提供选主能力,若需要保证 Controller 具备容错能力,Controller 部署需要三副本及以上(遵循 Raft 的多数派协议)。
Controller 部署有两种方式。一种是嵌入于 NameServer 进行部署,可以通过配置 enableControllerInNamesrv 打开(可以选择性打开,并不强制要求每一台 NameServer 都打开),在该模式下,NameServer 本身能力仍然是无状态的,也就是内嵌模式下若 NameServer 挂掉多数派,只影响切换能力,不影响原来路由获取等功能。另一种是独立部署,需要单独部署 Controller 组件。
Cluster模式多主多从同步复制部署
机器规划
ip |
配置 |
部署内容 |
192.168.158.139 |
内存3G,硬盘24G |
namerServer、broker -a master、proxy、controller |
192.168.158.140 |
内存3G,硬盘24G |
namerServer、broker-b master、proxy、controller |
192.168.158.141 |
内存3G,硬盘24G |
namerServer、broker-a slave、proxy、controller |
192.168.158.142 |
内存1.5G硬盘24G |
Broker-b slave |
环境变量
vim /etc/profile
1 2 3 4 5 6 7
| export JAVA_HOME=/usr/java/jdk1.8.0-aarch64 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib export PATH=${JAVA_HOME}/bin:$PATH export M2_HOME=/usr/local/apache-maven-3.9.5 export PATH=$PATH:$M2_HOME/bin export ROCKETMQ_HOME=/usr/local/rocketmq-all-5.1.4-bin-release
|
source /etc/profile
nameServer
1 2 3 4 5 6
| nohup sh mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log The Name Server boot success...
|
把9876端口打开。
broker
1 2
| 在runbroker.sh文件中 -XX:+UseG1GC之前加上-XX:+UnlockExperimentalVMOptions 把堆内存改小,原来8g改为1g。
|
1 2 3 4 5 6 7 8 9 10 11
| nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties &
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties &
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties &
nohup sh bin/mqbroker -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties &
|
把10909、10911、10912端口打开
proxy
1 2 3 4 5 6 7 8
| nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &
nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &
nohup sh bin/mqproxy -n "192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876" &
|
把8080、8081端口打开
控制台
下载Dashboard。
修改配置文件application.properties:
1
| rocketmq.config.namesrvAddr=192.168.158.139:9876;192.168.158.140:9876;192.168.158.141:9876
|
好像也不支持proxy。
java程序
使用官网中快速开始的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientConfigurationBuilder; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.message.Message; import org.apache.rocketmq.client.apis.producer.Producer; import org.apache.rocketmq.client.apis.producer.SendReceipt; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class ProducerExample { private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
public static void main(String[] args) throws ClientException { String endpoint = "192.168.158.139:8081;192.168.158.140:8081;192.168.158.141:8081"; String topic = "TestTopic"; ClientServiceProvider provider = ClientServiceProvider.loadService(); ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint); ClientConfiguration configuration = builder.build(); Producer producer = provider.newProducerBuilder() .setTopics(topic) .setClientConfiguration(configuration) .build(); Message message = provider.newMessageBuilder() .setTopic(topic) .setKeys("messageKey") .setTag("messageTag") .setBody("messageBody".getBytes()) .build(); try { SendReceipt sendReceipt = producer.send(message); logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId()); } catch (ClientException e) { logger.error("Failed to send message", e); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| import java.io.IOException; import java.util.Collections; import org.apache.rocketmq.client.apis.ClientConfiguration; import org.apache.rocketmq.client.apis.ClientException; import org.apache.rocketmq.client.apis.ClientServiceProvider; import org.apache.rocketmq.client.apis.consumer.ConsumeResult; import org.apache.rocketmq.client.apis.consumer.FilterExpression; import org.apache.rocketmq.client.apis.consumer.FilterExpressionType; import org.apache.rocketmq.client.apis.consumer.PushConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
public class PushConsumerExample { private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
private PushConsumerExample() { }
public static void main(String[] args) throws ClientException, IOException, InterruptedException { final ClientServiceProvider provider = ClientServiceProvider.loadService(); String endpoints = "192.168.158.139:8081;192.168.158.140:8081;192.168.158.141:8081"; ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder() .setEndpoints(endpoints) .build(); String tag = "*"; FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG); String consumerGroup = "YourConsumerGroup"; String topic = "TestTopic"; PushConsumer pushConsumer = provider.newPushConsumerBuilder() .setClientConfiguration(clientConfiguration) .setConsumerGroup(consumerGroup) .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression)) .setMessageListener(messageView -> { logger.info("Consume message successfully, messageId={}", messageView.getMessageId()); return ConsumeResult.SUCCESS; }) .build(); Thread.sleep(Long.MAX_VALUE); } }
|
spring boot 好像不太支持proxy模式,写nameServer的地址是可以的。
controller主备自动切换模式
使用内嵌部署方式部署controller。
先停止nameServer和broker和proxy。
namerServer
添加namerServer的配置文件namesrv.conf :
139:
1 2 3 4 5 6 7
| enableControllerInNamesrv = true controllerDLegerGroup = group1 controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879 controllerDLegerSelfId = n0 controllerStorePath = /root/DledgerController enableElectUncleanMaster = false notifyBrokerRoleChanged = true
|
140:
1 2 3 4 5 6 7
| enableControllerInNamesrv = true controllerDLegerGroup = group1 controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879 controllerDLegerSelfId = n1 controllerStorePath = /root/DledgerController enableElectUncleanMaster = false notifyBrokerRoleChanged = true
|
141:
1 2 3 4 5 6 7
| enableControllerInNamesrv = true controllerDLegerGroup = group1 controllerDLegerPeers = n0-192.168.158.139:9877;n1-192.168.158.140:9878;n2-192.168.158.141:9879 controllerDLegerSelfId = n2 controllerStorePath = /root/DledgerController enableElectUncleanMaster = false notifyBrokerRoleChanged = true
|
分别开放9877、9878、9879端口。
启动namerServer
1
| nohup sh bin/mqnamesrv -c conf/namesrv.conf &
|
broker
在配置文件中添加
1 2
| enableControllerMode = true controllerAddr = 192.168.158.139:9877;192.168.158.140:9878;192.168.158.141:9879
|
更多参数查看官网。
启动broker
跟上面启动一样。
proxy
跟上面启动一样。
测试
在139上停止broker:
1
| sh bin/mqshutdown broker
|
在控制台上发现141变为了master。
在139上启动broker,在控制台上发现139变为了slave。