请求端配置文件 rabbitmq-context-client.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Client-side (requesting side) Spring context: connection, template, queue/exchange declarations, reply listener container and the service bean. -->
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-3.1.xsd">
<!-- Define the RabbitMQ connection factory and the executor (thread pool) it uses -->
<task:executor id="rmpsConnectionTaskExecutor" pool-size="1"/>
<rabbit:connection-factory id="rmpsConnectionFactory" channel-cache-size="10" executor="rmpsConnectionTaskExecutor"
host="localhost" port="5672" username="hjh" password="hjh"
virtual-host="hjh_host" />
<!-- Define the rabbit template used to send and receive data; it publishes to SLS_RMPS_EXCHANGE -->
<rabbit:template id="rmpsAmqpTemplate" connection-factory="rmpsConnectionFactory"
exchange="SLS_RMPS_EXCHANGE">
</rabbit:template>
<!-- Without this admin the Spring container fails to start whenever the queue does not exist; in other words, automatic queue declaration requires the admin -->
<rabbit:admin id="rmpsConnectAdmin" connection-factory="rmpsConnectionFactory" />
<!-- Durable request queue, declared through the admin above -->
<rabbit:queue id="clientToServerQueue" name="CLIENT_TO_SERVER_QUEUE" auto-declare="true"
durable="true" declared-by="rmpsConnectAdmin"/>
<!-- Durable direct exchange; routing key rmpsToJcKey delivers to the request queue -->
<rabbit:direct-exchange name="SLS_RMPS_EXCHANGE"
durable="true" declared-by="rmpsConnectAdmin">
<rabbit:bindings>
<rabbit:binding queue="clientToServerQueue" key="rmpsToJcKey" />
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- The listener container must be a custom subclass of Spring's default listener container; p:messageListener-ref="rmpsAmqpTemplate" is the key setting and must not be omitted -->
<bean id="container"
class="net.nxmax.atp.remoting.DynamicReplyMessageListenerContainer"
p:connectionFactory-ref="rmpsConnectionFactory"
p:prefetchCount="5" p:acknowledgeMode="NONE" p:concurrentConsumers="3"
p:messageListener-ref="rmpsAmqpTemplate" p:rabbitAdmin-ref="rmpsConnectAdmin">
</bean>
<!-- Client service bean; the rabbit template is injected through its amqpTemplate property -->
<bean id="crossDetection" class="com.hjh.rabbitmq.cross.CrossDetectionByMqService">
<property name="amqpTemplate" ref="rmpsAmqpTemplate" />
</bean>
</beans>
监听容器需要自己重写spring默认的监听容器
packagenet.nxmax.atp.remoting;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.amqp.core.Queue;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
publicclassDynamicReplyMessageListenerContainerextendsSimpleMessageListenerContainer
{
protectedstaticfinalLoggerlogger =LoggerFactory.getLogger(DynamicReplyMessageListenerContainer.class);
@Override
protectedvoid doInitialize()throwsException
{
logger.info("执行DynamicReplyMessageListenerContainer.doInitialize()");
super.doInitialize();
Objectlistener = getMessageListener();
if(listener instanceofRabbitTemplate) {
//这个队列是服务端的答复队列,
Queuequeue1 =newQueue("JC_TO_RMPS_QUEUE_1");
Queuequeue2 =newQueue("JC_TO_RMPS_QUEUE_2");
setQueues(queue1,queue2);
}
}
}
客户端请求实现类
packagecom.hjh.rabbitmq.cross;
importjava.util.concurrent.Semaphore;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.MessageProperties;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
/**
* 交叉检测业务服务类,通过mq的方式请求
* @author jianhua.huang
* */
//@Service("crossDetectionByMq")
publicclass CrossDetectionByMqService
{
privateRabbitTemplate amqpTemplate;
//信号量,用来控制并发线程的最大数量
privateSemaphore sempahore =newSemaphore(6);
publicRabbitTemplate getAmqpTemplate()
{
returnamqpTemplate;
}
publicvoidsetAmqpTemplate(RabbitTemplate amqpTemplate)
{
this.amqpTemplate = amqpTemplate;
}
/**
* 交叉检测方法入口
* @author jianhua.huang
* */
publicvoiddoCrossDetection(StringmsgExt,StringroutingKey)
{
//通过mq远程请求交叉检测数据
MessageProperties msgProp =newMessageProperties();
msgProp.setContentType("text");
Stringstr="凹凸曼,凹凸曼 | "+ msgExt +","+ routingKey;
Message msg =newMessage(str.getBytes(),msgProp);
amqpTemplate.setReplyTimeout(11000);
Objectresponse =null;
longstart = System.currentTimeMillis();
try
{
sempahore.acquire();
System.out.println("发送出去的消息为 : {"+str+"}");
amqpTemplate.setReplyAddress(routingKey);
response = amqpTemplate.convertSendAndReceive("rmpsToJcKey", msg);
System.out.println("类型 : "+ response.getClass().getSimpleName());
}
catch(InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println(msgExt +"释放一个许可");
sempahore.release();
}
longend = System.currentTimeMillis();
System.out.println("response 为: {"+ response +"},耗时 = "+ (end - start));
}
}
客户端main启动实现,主要是测试并发
packagenet.nxmax.atp.startup;
importjava.util.concurrent.CountDownLatch;
importjava.util.concurrent.ExecutorService;
importjava.util.concurrent.Executors;
importorg.springframework.context.ApplicationContext;
importorg.springframework.context.support.ClassPathXmlApplicationContext;
importcom.hjh.rabbitmq.cross.CrossDetectionByMqService;
/**
* 非客户端启动类
* @author jianhua.huang
* */
publicclassSimpleClientStartupimplementsRunnable
{
privateString routeKey;
privateCountDownLatch countDown;
privateCrossDetectionByMqService cross;
publicSimpleClientStartup(String routeKey, CountDownLatch countDown, CrossDetectionByMqService cross)
{
this.routeKey = routeKey;
this.countDown = countDown;
this.cross = cross;
}
publicstaticvoidmain(String[] args)
{
ApplicationContext ctx =newClassPathXmlApplicationContext("rpc/fixq/rabbitmq-context-client.xml");
CrossDetectionByMqService cross = ctx.getBean("crossDetection",CrossDetectionByMqService.class);
cross.doCrossDetection(Thread.currentThread().getName(),"JC_TO_RMPS_KEY_1");
// ExecutorService service = Executors.newCachedThreadPool();
// final CountDownLatch countDown = new CountDownLatch(10);
//
// String[] keys = {"JC_TO_RMPS_KEY_1","JC_TO_RMPS_KEY_2"};
//
// for(int i=0;i<10;i++)
// {
// int idx = i % 2;
// String routeKey = keys[idx];
// service.execute(new SimpleClientStartup(routeKey, countDown, cross));
// countDown.countDown();
// }
// service.shutdown();
}
@Override
publicvoidrun()
{
try
{
countDown.await();
cross.doCrossDetection(Thread.currentThread().getName(), routeKey);
}catch(Exception e)
{
e.printStackTrace();
}
}
publicvoidtestCountDown()
{
ExecutorService service = Executors.newCachedThreadPool();
finalCountDownLatch cdOrder =newCountDownLatch(1);
finalCountDownLatch cdAnswer =newCountDownLatch(3);
for(inti=0;i<3;i++)
{
Runnable runnable =newRunnable()
{
publicvoidrun()
{
try
{
System.out.println("线程"+ Thread.currentThread().getName() +"正准备接受命令");
cdOrder.await();
System.out.println("线程"+ Thread.currentThread().getName() +"已接受命令");
// Thread.sleep((long)(Math.random()*10000));
cdAnswer.countDown();
System.out.println("线程"+ Thread.currentThread().getName() +"回应命令处理结果");
}catch(Exception e)
{
e.printStackTrace();
}
}
};
service.execute(runnable);
}
try
{
Thread.sleep((long)(Math.random()*10000));
System.out.println("线程"+ Thread.currentThread().getName() +"即将发布命令");
cdOrder.countDown();
System.out.println("线程"+ Thread.currentThread().getName() +"已发送命令,正在等待结果");
cdAnswer.await();
System.out.println("线程"+ Thread.currentThread().getName() +"已收到所有响应结果");
}catch(Exception e)
{
e.printStackTrace();
}
service.shutdown();
}
}
<!-- Server-side (replying side) Spring context: connection, template, admin and the listener that consumes requests. -->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:lang="http://www.springframework.org/schema/lang" xmlns:rabbit="http://www.springframework.org/schema/rabbit"
    xmlns:util="http://www.springframework.org/schema/util"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd
        http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd
        http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">

    <!-- Connection to the broker on localhost, caching channels -->
    <rabbit:connection-factory id="rabbitConnectionFactory"
        host="localhost" cache-mode="CHANNEL" username="hjh" password="hjh" virtual-host="hjh_host" />

    <!-- Template used to publish replies; it publishes to SLS_RMPS_EXCHANGE -->
    <rabbit:template id="amqpTemplateInternetProxy" exchange="SLS_RMPS_EXCHANGE"
        connection-factory="rabbitConnectionFactory">
    </rabbit:template>

    <rabbit:admin id="rmpsConnectAdmin" connection-factory="rabbitConnectionFactory" />

    <!-- Three concurrent consumers, no acknowledgements, prefetch of 10; requests are read from CLIENT_TO_SERVER_QUEUE and handed to the "service" bean -->
    <rabbit:listener-container acknowledge="none"
        concurrency="3" prefetch="10">
        <rabbit:listener ref="service" queue-names="CLIENT_TO_SERVER_QUEUE" />
    </rabbit:listener-container>

    <!-- Request handler; the reply template is injected through its amqpTemplate property -->
    <bean id="service"
        class="com.hjh.rabbitmq.cross.CrossDetectionConsumer">
        <property name="amqpTemplate" ref="amqpTemplateInternetProxy" />
    </bean>
</beans>
packagecom.hjh.rabbitmq.cross;
importorg.springframework.amqp.core.Message;
importorg.springframework.amqp.core.MessageListener;
importorg.springframework.amqp.core.MessageProperties;
importorg.springframework.amqp.rabbit.core.RabbitTemplate;
importorg.springframework.stereotype.Service;
@Service
publicclassCrossDetectionConsumerimplementsMessageListener
{
privateRabbitTemplate amqpTemplate;
publicvoidonMessage(Message message)
{
String msg =newString(message.getBody());
String json ="模拟交叉检测平台的响应信息, 响应线程名称为【 "+ msg.substring(msg.indexOf("|") +1) +"】rpc 请求!";
MessageProperties msgProp = message.getMessageProperties();
String replyTo = msgProp.getReplyTo();
System.out.println("replyTo = "+ replyTo);
System.out.println("当前处理线程名称 : "+ Thread.currentThread().getName());
Message responseMsg =newMessage(json.getBytes(),message.getMessageProperties());
//"jcToRmpsKey"
try
{
Thread.sleep(5000);
}catch(Exception e)
{
// TODO: handle exception
}
amqpTemplate.send(replyTo, responseMsg);
}
publicRabbitTemplategetAmqpTemplate()
{
returnamqpTemplate;
}
publicvoidsetAmqpTemplate(RabbitTemplate amqpTemplate)
{
this.amqpTemplate = amqpTemplate;
}
}
packagenet.nxmax.atp.startup;
importjava.io.BufferedReader;
importjava.io.InputStreamReader;
importorg.slf4j.Logger;
importorg.slf4j.LoggerFactory;
importorg.springframework.amqp.core.AmqpTemplate;
importorg.springframework.context.support.ClassPathXmlApplicationContext;
publicclass ServerStartup {
/** Logger */
privatestaticfinalLoggerlog= LoggerFactory
.getLogger(ServerStartup.class);
publicstaticvoidmain(String[] args)throwsException {
newServerStartup().go();
}
privatevoidgo()throwsException {
log.info("Create ApplicationContext");
ClassPathXmlApplicationContext ctx =newClassPathXmlApplicationContext("rpc/fixq/applicationContext.xml");
AmqpTemplate amqpTemplate = ctx.getBean("amqpTemplateInternetProxy",AmqpTemplate.class);
// amqpTemplate.convertAndSend("", "");
log.info("Create ApplicationContext Finish.");
BufferedReaderr =newBufferedReader(newInputStreamReader(System.in));
while(!r.readLine().equalsIgnoreCase("exit")) {
}
log.info("Exit now");
ctx.destroy();
}
}
注意:
由于用spring整合rabbitmq来实现rpc调用技术,在客户端(即request端)访问远程服务器的时候,其本质就是用jdk的
阻塞队列做的线程同步,因此并不限于服务器到底是使用异步的rabbitmq还是同步的rpc机制,这个总结是基于作者阅读
源码发现的。
总之一句话,rabbitmq本质上是不存在rpc同步调用这一说,只是spring在做代码封装的时候就考虑到了这个功能,所以在代码层面做出了支持。
对此项技术有疑问的童鞋,可以发送邮件至1807325935@qq.com中一起探讨。