直接上生产者代码
package com.demo.RabbitMQ;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
//消息队列hello world
//生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
public class ProducerMQ {
//定义队列名字
private final static String QUEUE_NAME = "hello";
@SuppressWarnings("ProducerMQ")
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置链接地址
connectionFactory.setHost( "127.0.0.1" );
//创建一个连接
Connection connection = connectionFactory.newConnection();
//通过连接创建一个通信管道
Channel channel = connection.createChannel();
Map <String, Object> map = new HashMap();
map.put( "AWK", "98K" );
// 参数一:队列名称,参数二:是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数
channel.queueDeclare( QUEUE_NAME, false, false, false, null );
//发送的消息
String message = "Hello World";
//转换成byte传出去
map.put( "AWK", "AK47" );
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode( 2 ) // 传送方式
.contentEncoding( "UTF-8" ) // 编码方式
.expiration( "10000" ) // 过期时间
.headers( map ) //自定义属性
.build();
//basicPublish 参数一:交换机名称;参数二:队列名称,参数三:消息的其他属性-路由的headers信息;参数四:消息主体
channel.basicPublish( "", QUEUE_NAME, properties, message.getBytes() );
//关闭通道
channel.close();
//关闭连接
connection.close();
}
}
消费者(消息接受者)
package com.demo.RabbitMQ;
import com.rabbitmq.client.*;
import java.io.IOException;
//接收者
//生产者、消费者都要声明路由器---如果声明了队列,可以不声明路由器。
public class ConsumerMQ {
//接受队列得名字
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost( "127.0.0.1" );
//创建通信连接
Connection connection = connectionFactory.newConnection();
//建立通道
Channel chanel = connection.createChannel();
// 声明队列【参数说明:参数一:队列名称,参数二:
// 是否持久化;参数三:是否独占模式;参数四:消费者断开连接时是否删除队列;参数五:消息其他参数】
chanel.queueDeclare( QUEUE_NAME, true, false, false, null );
// 第一种获取消息的方式 持续消息获取使用:basic.consume;单个消息获取使用:basic.get
Consumer consumer = new DefaultConsumer( chanel ) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String( body );
//接收到的路由的headers等
System.out.println( "头部" + properties.getHeaders() );
System.out.println( "接收到的消息" + message );
}
};
// queue 所订阅的队列 autoAck 是否开启自动应答,默认是开启的,如果需要手动应答应该设置为false
// callback接收到消息之后执行的回调方法
chanel.basicConsume( QUEUE_NAME, true, consumer );
// 第二种消息获取 单个消息获取采用GetResponse
// @Param队列名称 Boolean autoAck 是否自动确认
// while (true) {
// GetResponse getResponse = chanel.basicGet( QUEUE_NAME, false );
// String message = new String( getResponse.getBody(), "UTF-8" );
// System.out.println( message );
// UInt64 deliveryTag, 结果是否为多条数据
// chanel.basicAck( getResponse.getEnvelope().getDeliveryTag(), true );
// }
//
}
}
在RabbitMQ管理台手动添加队列的时候选择持久化选择 Durable持久那么消费者端也必须设置持久否则会报如下错误
image.png
接受的时候会报错。
image.png
需设置消费者是否持久化 为true
image.png