RabbitMQ笔记

MQ(Message Queue)消息队列
通过典型的生产者和消费者模式,生产者不断向消息队列生产消息,消费者不断从队列中获取消息。生产和消费的过程都是异步的,实现系统间的解耦。

RabbitMQ
使用erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点的发布和订阅)、可靠性、安全。
AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求其次。

RabbitMQ安装

拉取镜像

  docker pull rabbitmq:3.7.7-management

启动容器并设置用户密码

      docker run -d --name rabbitmq --hostname rabbitmq -p 5672:5672 -p 15672:15672 -v /home/rabbitmq/data:/var/lib/rabbitmq -e RABBITMQ_DEFAULT_VHOST=myvhost  -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123 rabbitmq:3.7.7-management
-d 后台运行容器;
--name 指定容器名;
--hostname 为启动后的rabbitmq指定名称,集群时启动多个rabbitmq需要分别指定hostname,以此区分
-p 指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号);
-v 映射目录或文件;
-e 指定环境变量;
RABBITMQ_DEFAULT_VHOST:默认虚拟主机名(一个hostname下有可以有多个 virtual host);
RABBITMQ_DEFAULT_USER:默认的用户名;
RABBITMQ_DEFAULT_PASS:默认用户名的密码

通过浏览器访问web管理界面
http://****:15672

虚拟主机

虚拟主机概念是RabbitMQ的核心,在用户未自定义虚拟主机前已经内置有虚拟主机,在使用RabbitMQ中,可以进行自定义配置虚拟主机.一个虚拟主机中可以含有多个队列信息。

虚拟主机最大的好处在于可以根据不同的用户分配不同的操作空间。

使用

首先,创建虚拟主机


创建虚拟主机

创建用户,并将用户绑定到虚拟主机上


创建用户

绑定虚拟主机

Java整合RabbitMQ

# IDEA创建一个maven项目

# 在pom.xml导入依赖
    <!--引入rabbitmq的相关依赖-->
    <dependency>
      <groupId>com.rabbitmq</groupId>
      <artifactId>amqp-client</artifactId>
      <version>4.0.3</version>
    </dependency>

Hello World 模型

Hello World 模型
//消息生产者
public class Provider {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        //创建连接mq的连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置主机ip
        connectionFactory.setHost("120.***");
        //设置端口
        connectionFactory.setPort(5672);
        //设置虚拟主机
        connectionFactory.setVirtualHost("/ems");
        //设置虚拟主机的账户密码
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        //通道绑定对应消息队列
        //参数1:队列名称,不存在是自动创建
      //2:队列是否持久化,true为  队列持久化  ,但并不对消息持久化,重启后消息会丢失,
        // false时队列消息都消失
        //3:是否独占队列,true独占
        //4:消费完成后是否自动删除队列,true为自动删除队列。
        //并且消费者也需要设置为true,在消费者消费完关闭通道后就会自动删除
        //5:附加参数
        channel.queueDeclare("hello", false, false, false, null);
        //发布消息
        //参数1:交换机名称;2:队列名称;
        // 3:传递消息额外设置;设置为null时,消息不能持久化。
        // MessageProperties.PERSISTENT_TEXT_PLAIN可实现消息持久化
        // 4:消息的具体内容
        channel.basicPublish("","hello",null,"hello zhangsan...".getBytes());
        channel.close();
        connection.close();
    }
}
//消费者消费消息
public class Customer {
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("120.*");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/ems");
        connectionFactory.setUsername("ems");
        connectionFactory.setPassword("123");
        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare("hello", false, false, false, null);
        //消费消息
        //参数1:消息队列名称;2:开始消息的自动确认机制,3:消费时的毁掉接口
        channel.basicConsume("hello",true,new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("new String(body) = " + new String(body));
            }
        });
//此时最好不要关闭channel和connection,才能保证消费者始终监听消费队列中的情况。
        //channel.close();
        //connection.close();
    }
}

Work Queue模型(平均分配消息)

当消息处理比较耗时的时候,生产消息的速度远远大于消费的速度。长此以往,消息会慢慢堆积越来越多,无法及时处理。

此时可以采用work queue模型,让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费就会消失,不会被重复执行。


work queue模型

测试结果是


消费者1

消费者2

小结
默认情况下,RabbitMQ将按顺序对每个消息发送给下一个使用中,每个消费者会收到相同数量的消息,这种分发消息的方式称为循环。

消息自动确认机制
RabbitMQ在分配消息给消费者时,无论消费者不论是否完成消息处理,都会自动确认消息:如下autoAck:true。因此消息会堆积在通道中。

将autoAck:false时 关闭 自动确认消息,处理完消息才会向队列确认消息。并设置通道中一次只能有一个消息,channel.basicQos(1);
这样就能实现消费者之间能者多劳。

由于关闭了自动确认消息,所以需要手动确认。见如下代码channel.basicAck(envelope.getDeliveryTag(),false);
参数1:手动确认消息,参数2:false每次确认一个

否则,会出现消息未确认的情况:

消费者代码改进,实现消费者能者多劳。

public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        //获取连接通道
        Channel channel = connection.createChannel();
        //通道中一次只能有一个消息。
        channel.basicQos(1);
        channel.queueDeclare("work", true, false, false, null);
        //消费消息
        //参数1:消息队列名称;
        // 2:消息的自动确认机制,true时自动确认消息,不论消息是否已经处理完成。消费者都会向队列自动确认,消息可以堆积在通道中
        //  false时  关闭  自动确认消息,处理完消息才会向队列确认消息,。
        // 3:消费时的毁掉接口
        channel.basicConsume("work", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 = " + new String(body));
                //由于关闭了自动确认消息,所以需要手动确认
                //参数1:手动确认消息,参数2:false每次确认一个
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        });
    }

fanout 广播模型

生产者发送消息,只能发送到交换机,交换机把消息发送给绑定过的所有的队列,实现一条消息被多个消费者使用。

#广播模式的生产者
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        //将通道绑定交换机,
        //参数1:交换机名称
        //参数2:交换机类型,fanout为广播模型
        channel.exchangeDeclare("logs","fanout");
        //发布消息
        channel.basicPublish("logs","",null,"fanout test".getBytes());
        Utils.closeConnectionAndChannel(channel,connection);
    }
}
# 广播模型消费者1
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        //创建临时队列
        String queue = channel.queueDeclare().getQueue();
        //临时队列绑定交换机
        channel.queueBind(queue,"logs","");
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者1 = " + new String(body));
            }
        })
    }
}

Routing路由模型

Direct 直连

在fanout模式下,一条消息会被所有订阅的队列都消费。
但是在某些场景下,我们希望不同的消息被不同的队列消费,这时就要用到Direct类型的exchange。

Direct类型下:

  • 队列和交换机不再是任意绑定。需要指定一个队列RoutingKey。
  • 消息的生产者向Exchange发送消息时,也必须指定消息的RoutingKey。
  • Exchange根据消息的 RoutingKey与队列RoutingKey进行判断,两者一致时队列才会收到消息。

图解过程:

  • P生产者:向Exchange发送消息,并指定一个消息RoutingKey
  • X交换机:接收生产者的消息,然后把消息传递给 RoutingKey匹配的队列。
  • C1消费者:其所在队列指定了只需要RoutingKey为error的消息。
  • C2消费者:其所在队列指定了只需要RoutingKey为info、error、warning的消息。
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        //将通道声明指定交换机,
        //参数1:交换机名称
        //参数2:交换机类型,direct直连类型
        channel.exchangeDeclare("directLogs","direct");
        //设置消息的RoutingKey
        String routingKey="error";
        channel.basicPublish("directLogs",routingKey,null,"direct模式的error1111111111111".getBytes());
        Utils.closeConnectionAndChannel(channel,connection);
    }
}
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("directLogs","direct");
        String queue = channel.queueDeclare().getQueue();
        //临时队列绑定交换机,并指定队列的RoutingKey。此时只接收RoutingKey为info的消息
        channel.queueBind(queue,"directLogs","info");
        //channel.queueBind(queue,"directLogs","error");
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者info = " + new String(body));
            }
        });
    }

Topic 订阅

Topic类型和Direct类型都是根据RoutingKey把消息路由到不同的队列。只不过Topic类型可以让队列在绑定RoutingKey的时候使用通配符。Topic的RoutingKey一般都是由一个或多个单词组成,多个单词之间以“.”分割。

通配符
“ * ”:匹配一个单词
“ # ”:匹配一个或多个单词

#Topic之生产者
public class Provider {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        //声明指定交换机,
        //参数2:交换机类型,topic
        channel.exchangeDeclare("topics","topic");
        //设置消息的RoutingKey
        String routingKey="user.save.all";
        channel.basicPublish("topics",routingKey,null,"topic模式的测试".getBytes());
        Utils.closeConnectionAndChannel(channel,connection);
    }
}
#topic之消费者
public class Customer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = Utils.getConnection();
        Channel channel = connection.createChannel();
        channel.exchangeDeclare("topics","topic");
        String queue = channel.queueDeclare().getQueue();
        //临时队列绑定交换机,并指定队列的RoutingKey。此时只接收RoutingKey为user接任意个数字符的消息
        channel.queueBind(queue,"topics","user.#");
        channel.basicConsume(queue,true,new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者user.save = " + new String(body));
            }
        });
    }
}

SpringBoot整合RabbitMQ

<!-- rabbitmq 依赖-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
#配置文件application.properties
spring.application.name=rabbitmq-springboot
spring.rabbitmq.host=120.79.28.120
spring.rabbitmq.port=5672
spring.rabbitmq.username=ems
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/ems
#简单测试 helloworld 模式
#生产者

@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
    //注入RabbitMQTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Test
    public void test(){
        rabbitTemplate.convertAndSend("hello","hello,world.....");
    }
}

# 消费者
/**
 * @RabbitListener 声明消费者监听
 * queuesToDeclare声明一个队列
 */
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello"))
public class HelloCustomer {
    /*@RabbitHandler声明receive()为处理消息队列的回调方法*/
    @RabbitHandler
    public void receive(String message) {
        System.out.println("-------------------message=" + message);
    }
}
#work模型
#生产者
    @Test
    public void testWork() {
        for (int i = 0; i < 10; i++) {
            rabbitTemplate.convertAndSend("work", "work model " + i);
        }
    }

#两个消费者
@Component
public class WorkCustomer {
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }
    @RabbitListener(queuesToDeclare = @Queue("work"))
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}
#广播模式
#生产者
    @Test
    public void testFanout() {
        /**
         * 参数1:exchange
         * 参数2:routingKey
         */
            rabbitTemplate.convertAndSend("fanout","", "fanout model ");
    }

#消费者
@Component
public class FanoutCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "fanout",type = "fanout"))//绑定交换机
    })
    public void receive1(String m){
        System.out.println("receive1 = " + m);
    }
    @RabbitListener(bindings = {
            @QueueBinding(value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "fanout",type = "fanout"))//绑定交换机
    })
    public void receive2(String m){
        System.out.println("receive2 = " + m);
    }
}
#路由模式
#生产者
    @Test
    public void testRouting() {
        /**
         * 参数1:exchange
         * 参数2:routingKey
         */
        rabbitTemplate.convertAndSend("direct","info", "direct模式进行info路由发送 ");
    }


#消费者
@Component
public class RoutingCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "direct",type = "direct"),//自定义交换机和指定模型
                    key = {"info","error"}//指定路由Key
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,//创建临时队列
                    exchange = @Exchange(value = "direct",type = "direct"),//自定义交换机和指定模型
                    key = {"error"}//指定路由Key
            )
    })
    public void receive2(String message){
        System.out.println("message2 = " + message);
    }
}
#此时,receive1有路由key(“info”),所以只有receive1能够收到消息。
#Topic模式,订阅模式,动态路由
#生产者
    @Test
    public void testTopic() {
        /**
         * 参数1:exchange
         * 参数2:routingKey
         */
        rabbitTemplate.convertAndSend("topic","user.save", "user.save模式进行发送 ");
    }

#消费者
@Component
public class TopicCustomer {
    @RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue,
                    exchange = @Exchange(value = "topic",type = "topic"),//交换机=topic,模式type=topic
                    key = {"user.*","order.#"}//*匹配一个字符串,#匹配0个或多个
            )
    })
    public void receive1(String message){
        System.out.println("message1 = " + message);
    }
}

RabbitMQ 使用场景

来自RabbitMQ 简介以及使用场景

解耦
(为面向服务的架构(SOA)提供基本的最终一致性实现)

场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。

传统模式的缺点:

  • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败
  • 订单系统与库存系统耦合

引入消息队列

  • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
  • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
  • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦
  • 为了保证库存肯定有,可以将队列大小设置成库存数量,或者采用其他方式解决。

基于消息的模型,关心的是“通知”,而非“处理”。
短信、邮件通知、缓存刷新等操作使用消息队列进行通知。

异步提升效率

场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种
1.串行的方式;2.并行方式

  • 串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端
  • 并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
  • 引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

流量削峰

应用场景:流量削锋是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。可以在前端加入消息队列。

引入消息队列的优缺点

  • 优点
    优点就是以上的那些场景应用,就是在特殊场景下有其对应的好处,解耦异步削峰

  • 缺点

  1. 系统的可用性降低
    系统引入的外部依赖越多,系统越容易挂掉,本来只是A系统调用BCD三个系统接口就好,ABCD四个系统不报错整个系统会正常运行。引入了MQ之后,虽然ABCD系统没出错,但MQ挂了以后,整个系统也会崩溃。
  2. 系统的复杂性提高
    引入了MQ之后,需要考虑的问题也变得多了,如何保证消息没有重复消费?如何保证消息不丢失?怎么保证消息传递的顺序?
  3. 一致性问题
    A系统发送完消息直接返回成功,但是BCD系统之中若有系统写库失败,则会产生数据不一致的问题。

RabbitMQ 集群

来自RabbitMQ 的集群架构

普通模式(同步交换机)

实现 rabbitMQ 的高可用集群,一般在并发和数据量不高的情况下,这种模式非常的好用且简单。

队列中的所有消息将在所有节点之间复制,但是消息队列只位于主节点上。
生产者发布的消息只放在Master节点的交换机上。消费者消费时可以从slave节点中消费消息,但如果master节点宕机,slave节点也无法进行消费。

普通模式

镜像集群模式(队列也能同步)

保证 100% 数据不丢失。在实际工作中也是用得最多的,并且实现非常的简单,一般互联网大厂都会构建这种镜像集群模式。

镜像队列,目的是为了保证 rabbitMQ 数据的高可靠性解决方案,主要就是实现数据的同步,一般来讲是 2 - 3 个节点实现数据同步。对于 100% 数据可靠性解决方案,一般是采用 3 个节点。

镜像模式

集群搭建

  • 创建文件夹目录
mkdir cluster
cd cluster/
mkdir rabbitmq01 rabbitmq02 rabbitmq03
  • docker创建容器
docker run -d --hostname rabbitmq01 --name rabbitmqCluster01 -v /home/rabbitmq/cluster/rabbitmq01:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie' rabbitmq:3.7-management

参数说明:
--hostname 指定rabbitmq主机名称
--name 指定docker容器名
-v 容器卷挂载,实现docker容器内的文件同步到主机
-p 指定 主机端口:容器端口 之间的映射
-e 指定环境变量
--link 实现两个容器间的互相通信

集群中 RABBITMQ_ERLANG_COOKIE 参数的值必须相同。
因为RabbitMQ是用Erlang实现的,Erlang Cookie相当于不同节点之间相互通讯的秘钥,Erlang节点通过交换Erlang Cookie获得认证。

docker run -d --hostname rabbitmq02 --name rabbitmqCluster02 -v /home/rabbitmq/cluster/rabbitmq02:/var/lib/rabbitmq -p 15673:15672 -p 5673:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie'  --link rabbitmqCluster01:rabbitmq01 rabbitmq:3.7-management
docker run -d --hostname rabbitmq03 --name rabbitmqCluster03 -v /home/rabbitmq/cluster/rabbitmq03:/var/lib/rabbitmq -p 15674:15672 -p 5674:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitmqCookie'  --link rabbitmqCluster01:rabbitmq01 --link rabbitmqCluster02:rabbitmq02  rabbitmq:3.7-management
  • 启动成功

可以分别访问一下管理界面的网址
http://192.168.9.219:15672
http://192.168.9.219:15673
http://192.168.9.219:15674
默认账号密码:guest/guest

  • 容器节点加入集群
docker exec -it rabbitmqCluster01 bash  #进入容器1
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
exit

docker exec -it rabbitmqCluster02 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01 
rabbitmqctl start_app
exit
#参数“--ram”表示设置为内存节点,忽略此参数默认为磁盘节点。
#@rabbitmq01 表示被加入的主机名,这个主机名就是集群的名称,也是主节点


docker exec -it rabbitmqCluster03 bash
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster --ram rabbit@rabbitmq01
rabbitmqctl start_app
exit

至此,完成普通模式的集群。在管理界面中测试:
主节点rabbitmq01上新建一个Exchange,会立即同步到rabbitmq02、rabbitmq03。

  • 实现镜像模式集群
    在上面的基础上,再完成以下操作。
#在cluster中任意节点启用策略,策略会自动同步到集群节点
#此处进入第一个节点
docker exec -it rabbitmqCluster01 bash

rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]

-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
    ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
        all: 表示在集群中所有的节点上进行镜像
        exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
        nodes: 表示在指定的节点上进行镜像,节点名称通过ha-params指定
    ha-params: ha-mode模式需要用到的参数
    ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual
priority: 可选参数,policy的优先级

#将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。
#这行命令在vhost名称为hrsystem创建了一个策略,
#策略名称为ha-all,策略模式为 all 即复制到所有节点,
#包含新增节点,策略正则表达式为 “^” 表示所有匹配所有队列名称。
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'

#清除策略
rabbitmqctl clear_policy ha-all

#策略的名称以”two”开始的队列镜像到群集中的任意两个节点,并进行自动同步:
rabbitmqctl set_policy ha-two "^two." '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'

#以”node”开头的队列镜像到集群中的特定节点的策略:
rabbitmqctl set_policy ha-nodes "^nodes." '{"ha-mode":"nodes","ha-params":["rabbit@nodeA", "rabbit@nodeB"]}'


#在rabbitmq02主机中查看策略:
root@rabbitmq02:/ rabbitmqctl list_policies

Listing policies for vhost "/" ...
/   ha-all  ^   all {"ha-mode":"all"}   0

rabbitmq 常用命令

rabbitmqctl list_queues:查看所有队列信息

rabbitmqctl stop_app:关闭应用(关闭当前启动的节点)

rabbitmqctl start_app:启动应用,和上述关闭命令配合使用,达到清空队列的目的

rabbitmqctl reset:从管理数据库中移除所有数据,例如配置过的用户和虚拟宿主, 删除所有持久化的消息(这个命令要在rabbitmqctl stop_app之后使用),重置以后,用户,虚拟vhost,都会清除

rabbitmqctl force_reset:作用和rabbitmqctl reset一样,区别是无条件重置节点,不管当前管理数据库状态以及集群的配置。如果数据库或者集群配置发生错误才使用这个最后的手段

rabbitmqctl status:节点状态

rabbitmqctl add_user username password:添加用户

rabbitmqctl list_users:列出所有用户

rabbitmqctl list_user_permissions username:列出用户权限

rabbitmqctl change_password username newpassword:修改密码

rabbitmqctl add_vhost vhostpath:创建虚拟主机

rabbitmqctl list_vhosts:列出所有虚拟主机

rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*":设置用户权限

rabbitmqctl list_permissions -p vhostpath:列出虚拟主机上的所有权限

rabbitmqctl clear_permissions -p vhostpath username:清除用户权限

rabbitmqctl -p vhostpath purge_queue blue:清除队列里的消息

rabbitmqctl delete_user username:删除用户

rabbitmqctl delete_vhost vhostpath:删除虚拟主机

rabbitmqctl cluster_status    查看集群状态

#修改集群节点类型,使用此命令前要停止rabbitmq应用
rabbitmqctl change_cluster_node_type {disc|ram} node_name

#将节点重集群中删除,允许离线执行
rabbitmqctl forget_cluster_node [--offiine]

#在集群中的节点应用启动前咨询clusternode节点的最新信息,并更新
#相应的集群信息。这个和join_cluster不同,他不加入集群
rabbitmqctl update_cluster_nodes {clusternode}


#确保节点可以启动,即使他不是最后一个关闭的节点
rabbitmqctl force_boot
(集群中的节点相继宕机,如果要恢复之前的集群,则需要按照节点宕机的先后顺序,从后向前启动节点,因为最后宕机的节点数据最完善)


#设置集群名称。集群名称在客户端连接的时候回通报给客户端。
#集群名称默认是集群中第一个节点的名称
rabbitmqctl set_cluster_name {name}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,884评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,347评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,435评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,509评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,611评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,837评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,987评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,730评论 0 267
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,194评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,525评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,664评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,334评论 4 330
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,944评论 3 313
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,764评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,997评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,389评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,554评论 2 349

推荐阅读更多精彩内容