1 消费者
1创建连接--------2创建信道---------3声明交换机(名称,类型)------4发送消息交换器(交换器,路由)
public class FanoutProducer {
public final static String EXCHANGE_NAME = "fanout_logs";
public static void main(String[] args) throws IOException, TimeoutException {
/**
* 创建连接连接到MabbitMQ
*/
ConnectionFactory factory = new ConnectionFactory();
// 设置MabbitMQ所在主机ip或者主机名
factory.setHost("127.0.0.1");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个信道
Channel channel = connection.createChannel();
// 指定转发
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String queueName = "producer_create";
channel.queueDeclare(queueName,false,false,
false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"test");
//所有日志严重性级别
String[] severities={"error","info","warning"};
for(int i=0;i<3;i++){
String severity = severities[i%3];//每一次发送一条不同严重性的日志
// 发送的消息
String message = "Hello World_"+(i+1);
//参数1:exchange name
//参数2:routing key
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity +"':'"+ message + "'");
}
// 关闭频道和连接
channel.close();
connection.close();
}
}
2 消费者
1创建连接------2创建信道----3声明交换机类型(名称,类型)-----创建一个队列----队列绑定到交换机(队列名称 交换机名称 路由名称)
public class Consumer1 {
public static void main(String[] argv) throws IOException,
InterruptedException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
// 打开连接和创建频道,与发送端一样
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.exchangeDeclare(FanoutProducer.EXCHANGE_NAME, "direct");
// 声明一个随机队列
String queueName = channel.queueDeclare().getQueue();
//所有日志严重性级别
String[] severities={"error","info","warning"};
for (String severity : severities) {
//关注所有级别的日志(多重绑定)
channel.queueBind(queueName, FanoutProducer.EXCHANGE_NAME, severity);
}
System.out.println(" [*] Waiting for messages:");
// 创建队列消费者
final Consumer consumerA = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" Received " + envelope.getRoutingKey() + "':'" + message + "'");
}
};
channel.basicConsume(queueName, true, consumerA);
}
}
信道:信道是生产消费者与rabbit通信的渠道,生产者publish或是消费者subscribe一个队列都是通过信道来通信的。信道是建立在TCP连接上的虚拟连接,什么意思呢?就是说rabbitmq在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在rabbit都有唯一的ID ,保证了信道私有性,对应上唯一的线程使用
交换器 生产者发送消息的地方 通过信道发送
总结
生产者---------通过信道 按照某种标识(路由) 发送给到到交换器
消费者 --------- 创建队列,通过某种标识(路由)绑定到交换器,获取指定的消息进行消费