教程
进入官网https://www.rabbitmq.com/
点击get started
点击 RabbitMQ Tutorials
里面有7种教程。
网上有很多资料都说几种模式,好多图都是从这里来的。
hello word
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import java.io.IOException; import java.util.concurrent.TimeoutException;
public class Common {
public static Connection getConnection() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Send { private final static String QUEUE_NAME = "hello";
public static void main(String[] args) { try (Connection connection = Common.getConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, true, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| import com.rabbitmq.client.*;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); }
}
|
工作队列
消息⽣产能⼒⼤于消费能⼒,增加多⼏个消费节点
和简单队列类似,增加多个⼏个消费节点,处于竞争关 系
默认:多个消费者循环调度。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Send1 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] args) { try (Connection connection = Common.getConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, true, false, false, null); for(int i=0;i<10;i++){ String message = "Hello World!"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class Recv1_1 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(Thread.currentThread().getId() + " [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class Recv1_2 {
private final static String QUEUE_NAME = "work_mq_rr";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(Thread.currentThread().getId() + " [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); }
}
|
先启动两个消费者,再启动⽣产者。
观察发现sleep(1) 的很早接收完5条,sleep(3)的也是接收5条,说明不看处理效率。
消费快的消费者多消费
在消费者添加:
这时再启动两个消费者,再启动生产者,这时发现sleep(1)收到的消息多,sleep(3)的收到的少。
发布与订阅
发布-订阅模型中,消息⽣产者不再是直接⾯对 queue(队列名称),⽽是直⾯exchange,都需要经过 exchange来进⾏消息的发送, 所有发往同⼀个fanout交 换机的消息都会被所有监听这个交换机的消费者接收到。
发布订阅-消息模型引⼊fanout交换机。
1
| String queueName = channel.queueDeclare().getQueue();
|
这个获取的队列是独占的(exclusive:true), 自动删除(autodelete:true)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Send3 {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) { try (Connection connection = Common.getConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "Hello World pub !"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } }
}
|
消费1与消费2一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class Recv3_1 {
private final static String EXCHANGE_NAME = "exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME,"");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { }); }
}
|
启动两个消费者,再启动生产者,发现两个消费者都能收到同样的消息。
路由
根据routingkey路由到不同的队列。
交换机类型是Direct。
消息⽣产者发送消息给交换机,需要指定routingKey,交换机根据消息的路由key,转发给对应的队列。
例⼦:⽇志采集系统 ELK
⼀个队列收集error信息-》告警
⼀个队列收集全部信息-》⽇常使⽤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Send4 { private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) { try (Connection connection = Common.getConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String error = "我是错误⽇志"; String info = "我是info⽇志"; String debug = "我是debug⽇志"; channel.basicPublish(EXCHANGE_NAME, "errorRoutingKey", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "infoRoutingKey", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "debugRoutingKey", null, debug.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功"); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Recv4_1 {
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"errorRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"infoRoutingKey"); channel.queueBind(queueName,EXCHANGE_NAME,"debugRoutingKey");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { }); }
}
|
主题
topic交换机⽀持通配符匹配模式
交换机是 topic, 可以实现发布订阅模式fanout和路由模 式Direct 的功能,更加灵活,⽀持模式匹配,通配符等
交换机同过通配符进⾏转发到对应的队列,* 代表⼀个 词,#代表1个或多个词,⼀般⽤#作为通配符居多,⽐ 如 #.order, 会匹配 info.order 、sys.error.order, ⽽ *.order ,只会匹配 info.order, 之间是使⽤. 点进⾏分 割多个词的; 如果是 ., 则info.order、error.order都 会匹配。
例⼦:⽇志采集系统
⼀个队列收集订单系统的error⽇志信息,order.log.error
⼀个队列收集全部系统的全部⽇志信息, #.log
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Send5 { private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) { try (Connection connection = Common.getConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "我是订单错误⽇志"; String info = "我是订单info⽇志"; String debug = "我是商品debug⽇志"; channel.basicPublish(EXCHANGE_NAME, "order.log.error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "order.log.info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "product.log.debug", null, debug.getBytes(StandardCharsets.UTF_8)); System.out.println("消息发送成功"); } catch (IOException e) { throw new RuntimeException(e); } catch (TimeoutException e) { throw new RuntimeException(e); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class Recv5_1 {
private final static String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] args) throws IOException, TimeoutException { Connection connection = Common.getConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,EXCHANGE_NAME,"order.log.error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(queueName, false, deliverCallback, consumerTag -> { }); }
}
|
待续