【RabbitMQ】AMQP协议、生产者和消费者的代码编写
AMQP
AMQP
(Advanced Message Queuing Protocol
)是一种高级消息队列协议
- 定义了一套确定的消息交换功能,包括交换机(
Exchange
),队列(Queue
)等 - 这些组件共同工作,使得生产者能够将消息发送到交换器,然后由队列接收并等待消费者接收
AMQP
还定义了一个网络协议,允许客户端应用通过该协议与消息代理和AMQP
模型进行交互通信
RabbitMQ
是遵从 AMQP
协议的,换句话说,RabbitMQ
就是 AMQP
协议的 Erlang
的视线(当然 RabbitMQ
还支持 STOMP2
,MQTT2
等协议)
AMQP
的模型结构和RabbitMQ
的模型结构是一样的
1 . 引入依赖
在程序中加入相关依赖
代码语言:javascript代码运行次数:0运行复制<!-- .rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.25.0</version>
</dependency>
2 . 生产者代码编写
生产消息,并把消息投放到队列中
1. 建立连接
建立连接,需要建立一个连接工厂,然后把信息都放进去,需要连接的时候,直接就从工厂里面拿就可以了
建立连接需要的信息
- IP
- 端口号
- 账号
- 密码
// 1. 建立连接,创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // 需要提前开放端口号
connectionFactory.setUsername("study"); // 账号
connectionFactory.setPassword("study"); // 密码
connectionFactory.setVirtualHost("coding"); // 虚拟主机
// 工厂建立好了之后,从里面拿出一个连接
Connection connection = connectionFactory.newConnection();
2. 开启信道
在建立好连接之后,就需要在连接里面创建一个信道,供消息传输
代码语言:javascript代码运行次数:0运行复制// 2. 开启信道
Channel channel = connection.createChannel();
3. 声明交换机
可以直接使用内置的交换机,就不需要进行操作
4. 声明队列
声明队列需要用到一个 queueDeclare
方法
// 4. 声明队列
/**
* 声明队列使用的方法的参数
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* 参数说明:
* var1(queue): 队列名称
* var2(durable): 是否持久化
* var3(exclusive): 是否独占
* var4(autoDelete): 是否自动删除
* var5(arguments): 参数
*/
channel.queueDeclare("hello", true, false, false, null);
- 如果没有一个叫
hello
的队列,就会创建;有则不创建 durable
:是否持久化true
:设置队列为持久化,持久化的队列会存盘,服务器重启之后,消息不会丢失
exclusive
:是否独占,只能有一个消费者监听队列- 当
Connection
关闭时,是否删除队列
- 当
autoDelete
:是否自动删除,当没有Consumer
的时候,自动删除掉
5. 发送消息
发送消息的时候,会用到 basicPublish()
方法
// 5. 发送消息
/**
* basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:
* var1(exchange): 交换机名称
* var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)
* var3(props): 配置
* var4(body): 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", "hello", null, msg.getBytes());
}
System.out.println("消息发送成功");
- 此处使用的内置交换机,使用内置交换机时,
routingKey
要和队列名称一样,才可以路由到对应的队列上去
6. 释放资源
显示地关闭 Channel
是一个好习惯,但这不是必须的,Connection
关闭的时候,Channel
也会自动关闭
// 6. 资源释放
channel.close();
connection.close();
- 两个释放的顺序不能改变
- 先释放信道,再释放连接
此时我们运行程序,就能看到 RabbitMQ
界面客户端里面:
- 一条信息已经准备好了
3 . 消费者代码编写
- 从队列中获取消息
1. 建立连接
消费者接收消息,我们也要先建立连接。此处建立连接的思路和生成者的一样
- 先根据相关信息创建连接工厂
- 然后从连接工厂中拿连接
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("coding");
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
// 取出连接
Connection connection = connectionFactory.newConnection();
2. 创建信道
创建信道,供 Brocker
和消费者之间进行通信
// 2. 创建 Channel
Channel channel = connection.createChannel();
3. 声明队列
这里用到的方法,和创建生产者中用到的方法一样—— queueDeclare()
// 3. 声明队列
channel.queueDeclare("hello", true, false, false, null);
hello
:队列的名字true
:持久化,队列会存盘,服务器重启之后,消息不会丢失false
:不是独占,不是只有一个消费者监听队列false
:不是自动删除,当没有Consumer
的时候,也不会自动删除掉null
:没有其他参数
4. 消费消息
这里会用到 basicConsume()
方法
// 4. 消费消息
/**
* basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:
* var1(queue): 队列名称
* var2(autoAck): 是否自动确认
* var3(callback): 接收到消息后,执行的逻辑
*/
channel.basicConsume("hello", true, consumer);
Consumer
Consumer
用于定义消息消费者的行为。当我们需要从 RabbitMQ
接收消息的时候,需要提供一个实现了 Consumer
接口的对象
DefaultConsumer consumer = new DefaultConsumer(channel){
// 从队列中收到消息,就会执行的方法
@Override
/**
* handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) * 参数说明:
* consumerTag: 消费者标签,通常是消费者在订阅队列时指定的
* envelope: 包含消息的封包信息,如队列名称,交换机等
* properties: 一些配置信息
* body: 消息的具体内容
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//TODO
System.out.println("接收到消息: " + new String(body));
}
};
DefaultConsumer
是RabbitMQ
提供的一个默认消费者,实现了Consumer
接口- 从队列接收到消息的时候,就会自动调用核心方法——
handleDelivery()
- 在这个方法中,我们可以定义如何处理接收到的消息,例如打印消息内容,处理业务逻辑或者将消息存储到数据库等
5. 释放资源
代码语言:javascript代码运行次数:0运行复制// 5. 释放资源
channel.close();
connection.close();
在释放资源之前可以让程序休眠两秒钟,等待回调函数执行完之后再释放,就可以看到打印出来的完整信息了
源代码
生产者
代码语言:javascript代码运行次数:0运行复制package rabbitmq;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ProducerDemo {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立连接,创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672); // 需要提前开放端口号
connectionFactory.setUsername("study"); // 账号
connectionFactory.setPassword("study"); // 密码
connectionFactory.setVirtualHost("coding"); // 虚拟主机
// 工厂建立好了之后,从里面拿出一个连接
Connection connection = connectionFactory.newConnection();
// 2. 开启信道
Channel channel = connection.createChannel();
// 3. 声明交换机 使用内置的交换机
// 4. 声明队列
/**
* 声明队列使用的方法的参数
* queueDeclare(String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5)
* 参数说明:
* var1(queue): 队列名称
* var2(durable): 可持久化
* var3(exclusive): 是否独占
* var4(autoDelete): 是否自动删除
* var5(arguments): 参数
*/
channel.queueDeclare("hello", true, false, false, null);
// 交换机要将信息传给对应的队列,所以他们之间存在绑定关系
// 这个绑定关系我们先省略掉,因为我们使用的是内置交换机,
// 5. 发送消息
/**
* basicPublish(String var1, String var2, AMQP.BasicProperties var3, byte[] var4) * 参数说明:
* var1(exchange): 交换机名称
* var2(routingKey): 内置交换机,routingKey和队列名称保持一致(发送到名字一样的队列去)
* var3(props): 配置
* var4(body): 消息
*/
for (int i = 0; i < 10; i++) {
String msg = "hello rabbitmq" + i;
channel.basicPublish("", "hello", null, msg.getBytes());
}
System.out.println("消息发送成功");
// 6. 资源释放
//channel.close();
//connection.close();
}
}
消费者
代码语言:javascript代码运行次数:0运行复制package rabbitmq;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerDemo {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// 1. 创建连接
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("coding");
connectionFactory.setUsername("study");
connectionFactory.setPassword("study");
Connection connection = connectionFactory.newConnection();
// 2. 创建 Channel Channel channel = connection.createChannel();
// 3. 声明队列
channel.queueDeclare("hello", true, false, false, null);
// 4. 消费消息
/**
* basicConsume(String var1, boolean var2, Consumer var3) * 参数说明:
* var1(queue): 队列名称
* var2(autoAck): 是否自动确认
* var3(callback): 接收到消息后,执行的逻辑
*/
DefaultConsumer consumer = new DefaultConsumer(channel){
// 从队列中收到消息,就会执行的方法
@Override
/**
* handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) * 参数说明:
* consumerTag: 消费者标签,通常是消费者在订阅队列时指定的
* envelope: 包含消息的封包信息,如队列名称,交换机等
* properties: 一些配置信息
* body: 消息的具体内容
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//TODO
System.out.println("接收到消息: " + new String(body));
}
};
channel.basicConsume("hello", true, consumer);
// 等待程序执行完成
Thread.sleep(2000);
// 5. 释放资源
channel.close();
connection.close();
}
}
本文参与 腾讯云自媒体同步曝光计划,分享自作者个人站点/博客。 原始发表:2025-04-16,如有侵权请联系 cloudcommunity@tencent 删除连接协议rabbitmqamqp队列
发布评论