使用 Java client 操作 RabbitMQ

上一篇我们介绍了 RabbitMQ 的工作流程,以及常用的交换机,接下里我们结合具体的例子来看一下具体的应用。

使用 Java client 操作 RabbitMQ 可以参考以下步骤来实现:

  1. 创建连接工厂(ConnectionFactory),设置 RabbitMQ 服务信息、账号、密码等
  2. 使用连接工厂建立连接(Connection)
  3. 使用连接创建数据通道(Channel)
  4. 创建交换机(Exchange)、队列(Queue),绑定两者
  5. 使用数据通道发送、接收消息
  6. 释放数据通道、连接

本文的例子会结合 Fanout ExchangeDirect ExchangeTopic Exchange这三种常用的交换机来实现 。

一、准备工作

创建一个 Maven 项目,添加 RabbitMQ 依赖:

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.10.0</version>
</dependency>

考虑到代码的复用,我们先将一些通用的步骤封装一下:

public class RabbitMQConnection {
    ConnectionFactory connectionFactory;

    public RabbitMQConnection() {
        // 创建连接工厂
        connectionFactory = new ConnectionFactory();
        // 设置 RabbitMQ 服务地址
        connectionFactory.setHost("localhost");
        // 设置 RabbitMQ 服务端口
        connectionFactory.setPort(5672);
        // 设置账号
        connectionFactory.setUsername("admin");
        // 设置密码
        connectionFactory.setPassword("123456");
        // 设置虚拟主机
        connectionFactory.setVirtualHost("/");
    }


    public void create(String connectionName, RabbitMQTask rabbitMQTask) {
        Connection connection = null;
        Channel channel = null;
        try {
            // 创建连接
            connection = connectionFactory.newConnection(connectionName);
            // 创建数据通道
            channel = connection.createChannel();
            // 执行消息的发送、接收等业务
            rabbitMQTask.execute(channel);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 释放资源
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }
}
public interface RabbitMQTask {
    void execute(Channel channel) throws IOException;
}

RabbitMQConnection中已经实现了资源的连接以及释放,第4、5步骤,需要在RabbitMQTask接口里,根据具体的业务去实现execute()方法。

简单起见,我们将生产者和消费者定义在同一个项目里。

二、Fanout Exchange

生产者代码如下:

public class Producer {
    public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "fanout-example-exchange";
                // 交换机类型
                String exchangeType = "fanout";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                /**
                 * 参数1:队列名称
                 * 参数2:是否需要持久化,非持久化队列在服务重启后,队列中的消息会丢失,一般都为true
                 * 参数3:排它性,是否是一个独占队列
                 * 参数4:队列中的消息被消费完后是否自动删除队列
                 * 参数5:附加参数,Headers Exchange 的参数可以在这里传递
                 */
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,不指定 routingKey
                channel.queueBind(queueName1, exchangeName, "");
                channel.queueBind(queueName2, exchangeName, "");
                channel.queueBind(queueName3, exchangeName, "");

                String routingKey = "";

                // 准备消息内容
                String message = "hello rabbitmq";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
            }
        });
    }

    public static void main(String[] args) {
        new Producer().work();
    }
}

生产者核心的业务都是基于Channel对象实现的,包括创建Fanout类型的交换机,创建队列,将交换机和队列绑定,由于使用了Fanout类型的交换机所以绑定时不用指定routingKey,发送消息时需要携带交换机名称和一个空的routingKey

消费者代码如下:

public class Consumer implements Runnable {
    private String queueName;

    public Consumer(String queueName) {
        this.queueName = queueName;
    }

    @Override
    public void run() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("消费者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 接收消息
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(queueName + "收到的消息是:" + new String(message.getBody(), "utf-8"));
                    }
                }, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                        System.out.println("接收消息失败");
                    }
                });

                System.out.println(queueName + "开始接收消息");
                System.in.read();
            }
        });
    }

    public static void main(String[] args) {
        new Thread(new Consumer("queue1")).start();
        new Thread(new Consumer("queue2")).start();
        new Thread(new Consumer("queue3")).start();
    }
}

消费者的实现比较简单,用多线程模拟三个消费者,分别接收三个队列中的消息。

分别启动生产者和消费者,结果符合Fanout类型交换机的效果,消息分别进入到三个队列中,最终被消费者掉:

生产者

消费者

三、Direct Exchange

掌握 Fanout Exchange 的用法,学习 Direct Exchange 就很简单了。

修改一下生产者的代码:

public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "direct-example-exchange";
                // 交换机类型
                String exchangeType = "direct";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,指定routingKey
                channel.queueBind(queueName1, exchangeName, "red");
                channel.queueBind(queueName2, exchangeName, "green");
                channel.queueBind(queueName3, exchangeName, "blue");

                String routingKey = "red";
                String routingKey2 = "blue";

                // 准备消息内容
                String message = "hello rabbitmq";
                String message2 = "hello amqp";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
                System.out.println("生产者发送的消息是:" + message2);
            }
        });
    }

使用 Direct 类型的交换机时,绑定交换机和队列时需要指定routingKey,发送消息时也要携带上routingKey去匹配消息队列。消费者代码不需要修改。按照预期生产者的消息最终会分别进入queue1queue3,最终被消费掉。

运行程序,结果符合预期:

生产者
消费者

四、Topic Exchange

上一篇我们已经了解到,Topic Exchange 和 Direct Exchange 的差别就是 Topic Exchange 的routingKey支持通配符模糊匹配,更像一种精细化的 Direct Exchange。

只需要修改生产者的代码:

public void work() {
        RabbitMQConnection rabbitMQConnection = new RabbitMQConnection();
        rabbitMQConnection.create("生产者", new RabbitMQTask() {
            public void execute(Channel channel) throws IOException {
                // 交换机名称
                String exchangeName = "topic-example-exchange";
                // 交换机类型
                String exchangeType = "topic";
                // 创建交换机,true表示持久化交换机,一般都为true
                channel.exchangeDeclare(exchangeName, exchangeType, true);

                // 创建消息队列
                String queueName1 = "queue1";
                channel.queueDeclare(queueName1, true, false, false, null);
                String queueName2 = "queue2";
                channel.queueDeclare(queueName2, true, false, false, null);
                String queueName3 = "queue3";
                channel.queueDeclare(queueName3, true, false, false, null);

                // 绑定队列和交换机,指定routingKey
                channel.queueBind(queueName1, exchangeName, "*.red.#");
                channel.queueBind(queueName2, exchangeName, "green.*");
                channel.queueBind(queueName3, exchangeName, "#.blue.#");

                String routingKey = "green.red";

                // 准备消息内容
                String message = "hello rabbitmq";
                // 发送消息
                channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
                System.out.println("生产者发送的消息是:" + message);
            }
        });
    }

按照模糊匹配规则,消息会进入queue1queue2,最终被消费掉:

生产者

消费者

https://www.rabbitmq.com/getstarted.html

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 206,968评论 6 482
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 88,601评论 2 382
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 153,220评论 0 344
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 55,416评论 1 279
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 64,425评论 5 374
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,144评论 1 285
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,432评论 3 401
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,088评论 0 261
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 43,586评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,028评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,137评论 1 334
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,783评论 4 324
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,343评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,333评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,559评论 1 262
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,595评论 2 355
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,901评论 2 345

推荐阅读更多精彩内容