跟着rabbit官网教程练习

教程

进入官网https://www.rabbitmq.com/

点击get started

点击 RabbitMQ Tutorials

里面有7种教程。

网上有很多资料都说几种模式,好多图都是从这里来的。

hello word

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Common {

public static Connection getConnection() throws IOException, TimeoutException {
//1.创建连接工厂
//MQ采用工厂模式来完成连接的创建
ConnectionFactory factory = new ConnectionFactory();
//2.在工厂对象中设置连接信息(ip,port,virtualhost,username,password)
//设置MQ安装的服务器ip地址
factory.setHost("127.0.0.1");
//设置端口号
factory.setPort(5672);
//设置虚拟主机名称
factory.setVirtualHost("/");
//MQ通过用户来管理
//设置用户名称
//设置用户密码
factory.setUsername("admin");
factory.setPassword("123456");
//3.通过工厂对象获取连接
Connection connection = factory.newConnection();
return connection;
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send {
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) {
//获取连接
try (Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel()) {
//通道绑定对应消息队列
//参数1: 队列名称 如果队列不存在自动创建
//参数2: 用来定义队列特性是否要持久化 true 持久化队列 false 不持久化
//参数3: exclusive 是否独占队列,表示声明的当前队列只允许当前的连接所使用 true 独占队列 false 不独占
//参数4: autoDelete: 是否在消费完成后自动删除队列 true 自动删除 false 不自动删除
//参数5: 额外附加参数
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello World!";
//basicPublish将消息发送到指定的交换机
//发布消息
// 参数1:交换机名称
// 参数2:队列名称
// 参数3:传递息额外设置(MessageProperties.PERSISTENT_TEXT_PLAIN 设置消息持久化)
// 参数4:消息的具体内容
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv {

private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//通道绑定队列:与生产端一致
channel.queueDeclare(QUEUE_NAME, true, false, false, null);


// Consumer consumer = new DefaultConsumer(channel){
// @Override
// public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// String msg = new String(body);
// System.out.println(msg);
// }
// };
// channel.basicConsume(QUEUE_NAME, true, consumer);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}

}

工作队列

消息⽣产能⼒⼤于消费能⼒,增加多⼏个消费节点

和简单队列类似,增加多个⼏个消费节点,处于竞争关 系

默认:多个消费者循环调度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send1 {

private final static String QUEUE_NAME = "work_mq_rr";

public static void main(String[] args) {
try (Connection connection = Common.getConnection();
Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
//发送 10个 模拟生产者生产能力强
for(int i=0;i<10;i++){
String message = "Hello World!"+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Recv1_1 {

private final static String QUEUE_NAME = "work_mq_rr";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//通道绑定队列:与生产端一致
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
// 模拟消费者消费慢
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(Thread.currentThread().getId() + " [x] Received '" + message + "'");
//⼿⼯确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭⾃动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Recv1_2 {

private final static String QUEUE_NAME = "work_mq_rr";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//通道绑定队列:与生产端一致
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
try {
// 模拟消费者消费慢
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(Thread.currentThread().getId() + " [x] Received '" + message + "'");
//⼿⼯确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭⾃动确认消息
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}

}

先启动两个消费者,再启动⽣产者。

观察发现sleep(1) 的很早接收完5条,sleep(3)的也是接收5条,说明不看处理效率。

消费快的消费者多消费

在消费者添加:

1
channel.basicQos(1);

这时再启动两个消费者,再启动生产者,这时发现sleep(1)收到的消息多,sleep(3)的收到的少。

发布与订阅

发布-订阅模型中,消息⽣产者不再是直接⾯对 queue(队列名称),⽽是直⾯exchange,都需要经过 exchange来进⾏消息的发送, 所有发往同⼀个fanout交 换机的消息都会被所有监听这个交换机的消费者接收到。

发布订阅-消息模型引⼊fanout交换机。

1
String queueName = channel.queueDeclare().getQueue();

这个获取的队列是独占的(exclusive:true), 自动删除(autodelete:true)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send3 {

private final static String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) {
try (Connection connection = Common.getConnection();
Channel channel = connection.createChannel()) {
//绑定交换机,fanout扇形,即⼴播类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello World pub !";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

}

消费1与消费2一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class Recv3_1 {

private final static String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();
//绑定交换机,fanout扇形,即⼴播类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,fanout交换机不⽤指定routingkey
channel.queueBind(queueName, EXCHANGE_NAME,"");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
//⼿⼯确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭⾃动确认消息
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
}

}

启动两个消费者,再启动生产者,发现两个消费者都能收到同样的消息。

路由

根据routingkey路由到不同的队列。

交换机类型是Direct。

消息⽣产者发送消息给交换机,需要指定routingKey,交换机根据消息的路由key,转发给对应的队列。

例⼦:⽇志采集系统 ELK

⼀个队列收集error信息-》告警

⼀个队列收集全部信息-》⽇常使⽤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send4 {
private final static String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) {
try (Connection connection = Common.getConnection();
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String error = "我是错误⽇志";
String info = "我是info⽇志";
String debug = "我是debug⽇志";
channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv4_1 {

private final static String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();
//绑定队列和交换机,另外⼀个节点只绑定⼀个errorRoutingKey
channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey");
channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
//⼿⼯确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭⾃动确认消息
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
}

}

主题

topic交换机⽀持通配符匹配模式

交换机是 topic, 可以实现发布订阅模式fanout和路由模 式Direct 的功能,更加灵活,⽀持模式匹配,通配符等

交换机同过通配符进⾏转发到对应的队列,* 代表⼀个 词,#代表1个或多个词,⼀般⽤#作为通配符居多,⽐ 如 #.order, 会匹配 info.order 、sys.error.order, ⽽ *.order ,只会匹配 info.order, 之间是使⽤. 点进⾏分 割多个词的; 如果是 ., 则info.order、error.order都 会匹配。

例⼦:⽇志采集系统

⼀个队列收集订单系统的error⽇志信息,order.log.error

⼀个队列收集全部系统的全部⽇志信息, #.log

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Send5 {
private final static String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] args) {
try (Connection connection = Common.getConnection();
Channel channel = connection.createChannel()) {
//绑定交换机,直连交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String error = "我是订单错误⽇志";
String info = "我是订单info⽇志";
String debug = "我是商品debug⽇志";
channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8));
channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8));
System.out.println("消息发送成功");
} catch (IOException e) {
throw new RuntimeException(e);
} catch (TimeoutException e) {
throw new RuntimeException(e);
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

public class Recv5_1 {

private final static String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = Common.getConnection();
//创建Channel
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

//获取队列(排它队列)
String queueName = channel.queueDeclare().getQueue();

//绑定队列和交换机,节点一配置
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
// 节点二配置
// channel.queueBind(queueName,EXCHANGE_NAME,"*.log.*");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println(" [x] Received '" + message + "'");
//⼿⼯确认消息消费,不是多条确认
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
// 关闭⾃动确认消息
channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { });
}

}

待续


跟着rabbit官网教程练习
http://hanqichuan.com/2023/06/15/mq/跟着rabbit官网教程练习/
作者
韩启川
发布于
2023年6月15日
许可协议