Hello World
这部分的教程我们将写两个程序:一个生产者发送一条消息和一个消费者接收消息并打印出来。我们会略过java api
的一些细节的地方,我们从非常简单的开始。一个Hello World
的消息。
在下图总,P
是我们的生产者,C
是我们的消费者。中间的箱子是队列 —— 代表消费者的消息缓存区。
The Java client library
RabbitMQ支持多种协议。这个教程使用
AMQP 0-9-1
,开源的、通用的消息协议。有许多不同的语言实现RabbitMQ,此处我们用java来实现。
下载客户端库,它依赖
SLF4J API
和SLF4J Simple
。拷贝这些文件到你的工作目录。
请注意:
SLF4J Simple
对这个教程来说已经足够,但是最好使用完全成熟的日志开发库,例如:logback。
RabbitMQ的java客户端也支持Maven,groupId是com.rabbitmq
和artifactId是amqp-client
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.0.2</version>
<scope>compile</scope>
</dependency>
现在我们有了java的客户端以及它的依赖,我们可以写一些代码了。
NOTE:官网是没有使用IDE的。这里我用的是idea,毕竟大家都是开发者,IDE会方便点。
发送(Sending)
我们将调用我们的消息生产者(发送者)Send
和消息的消费者(接收者)Recv
。生产者将连接到RabbitMQ,发送一条消息,然后退出。
在Send.java
中,我们需要导入一些类:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
设置类和队列名:
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException {
...
}
}
然后我们创建一个服务连接:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
这个连接抽象了socket连接,维护版本协议和身份认证。这里我们用的是本地机器——因此是localhost
。如果我们想在不同的机器连接到中间件,只需要指定不同的名字和IP地址即可。
接下来我们创建一个通道(channel)。
我们还需要定义一个队列供我们将消息推送给它。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
定义一个队列是幂等的——它只有在队列不存在的时候创建。消息的内容是字节数组,所以你能编码成任何你想要的数据。
最后,我们要关闭通道(channel)和连接(conection);
channel.close();
connection.close();
下面是完整的类:Send.java
package com.roachfu.tutorial.rabbitmq.website.helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者
*/
public class Send {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个通道
Channel channel = connection.createChannel();
//指定队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World";
// 向队列中发送一条消息
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
// 关闭通道和连接
channel.close();
connection.close();
}
}
接收端(Receiving)
消费者从RabbitMQ中拉取消息,不同于生产者生产一条消息,我们会一直监听消息并答应它们。
代码(在Recv.java中)导入的和Send导入的差不多:
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
额外不同的DefaultConsumer
是一个实现了Consumer
接口的类,通过服务拉取缓存的消息。
设置也和publisher差不多,我们打开一个连接和通道并定义一个队列,需要注意的是队列名和Send.java
中定义的一样。
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv)
throws java.io.IOException,
java.lang.InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
...
}
}
注意:在这里我们也要定义队列,因为我们有可能在生产者之前启动消费者,我们需要保证我们在消费消息的时候队列是存在的。
我们要告知服务从队列中将消息交付给我们。由于它将异步的推送消息给我们,我们提供一个回调对象缓存消息直到我们已经使用了它们。这就是DefaultConsumer
子类所做的。
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
以下是Recv.java完整代码
package com.roachfu.tutorial.rabbitmq.website.helloworld;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者
*/
public class Recv {
private static final String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. . . ");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
输出
- 生产者
[x] Sent 'Hello World'
- 消费者
[*] Waiting for messages. . .
[x] Received 'Hello World'