rabbitmq整合spring实现rpc技术调用

请求端配置文件rabbitmq-context-client.xml<?xml version="1.0" encoding="UTF-8"?>

<beans xmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xmlns:task="http://www.springframework.org/schema/task"

xmlns:p="http://www.springframework.org/schema/p"

xsi:schemaLocation="http://www.springframework.org/schema/beans

http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

http://www.springframework.org/schema/rabbit

http://www.springframework.org/schema/rabbit/spring-rabbit-1.2.xsd

http://www.springframework.org/schema/task

http://www.springframework.org/schema/task/spring-task-3.1.xsd">

<!-- 定义RabbitMQ的连接工厂连接池 -->

<task:executor id="rmpsConnectionTaskExecutor" pool-size="1"/>

<rabbit:connection-factory id="rmpsConnectionFactory" channel-cache-size="10" executor="rmpsConnectionTaskExecutor"

host="localhost" port="5672" username="hjh" password="hjh"

virtual-host="hjh_host" />

<!--定义rabbit template用于数据的接收和发送 -->

<rabbit:template id="rmpsAmqpTemplate" connection-factory="rmpsConnectionFactory"

exchange="SLS_RMPS_EXCHANGE">

</rabbit:template>

<!--如果不指定这个admin,只要队列不存在,spring容器启动失败,也就是说队列的自动声明需要admin -->

<rabbit:admin id="rmpsConnectAdmin" connection-factory="rmpsConnectionFactory" />

<rabbit:queue id="clientToServerQueue" name="CLIENT_TO_SERVER_QUEUE" auto-declare="true"

durable="true" declared-by="rmpsConnectAdmin"/>

<rabbit:direct-exchange name="SLS_RMPS_EXCHANGE"

durable="true" declared-by="rmpsConnectAdmin">

<rabbit:bindings>

<rabbit:binding queue="clientToServerQueue" key="rmpsToJcKey" />

</rabbit:bindings>

</rabbit:direct-exchange>

<!-- 监听容器需要自己重写spring默认的监听容器 p:messageListener-ref="rmpsAmqpTemplate为重点,不能少-->

<bean id="container"

class="net.nxmax.atp.remoting.DynamicReplyMessageListenerContainer"

p:connectionFactory-ref="rmpsConnectionFactory"

p:prefetchCount="5" p:acknowledgeMode="NONE" p:concurrentConsumers="3"

p:messageListener-ref="rmpsAmqpTemplate" p:rabbitAdmin-ref="rmpsConnectAdmin">

</bean>

<bean id="crossDetection" class="com.hjh.rabbitmq.cross.CrossDetectionByMqService">

<property name="amqpTemplate" ref="rmpsAmqpTemplate" />

</bean>

</beans>

监听容器需要自己重写spring默认的监听容器

packagenet.nxmax.atp.remoting;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importorg.springframework.amqp.core.Queue;

importorg.springframework.amqp.rabbit.core.RabbitTemplate;

importorg.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;

publicclassDynamicReplyMessageListenerContainerextendsSimpleMessageListenerContainer

{

protectedstaticfinalLoggerlogger =LoggerFactory.getLogger(DynamicReplyMessageListenerContainer.class);

@Override

protectedvoid doInitialize()throwsException

{

logger.info("执行DynamicReplyMessageListenerContainer.doInitialize()");

super.doInitialize();

Objectlistener = getMessageListener();

if(listener instanceofRabbitTemplate) {

//这个队列是服务端的答复队列,

Queuequeue1 =newQueue("JC_TO_RMPS_QUEUE_1");

Queuequeue2 =newQueue("JC_TO_RMPS_QUEUE_2");

setQueues(queue1,queue2);

}

}

}

客户端请求实现类

packagecom.hjh.rabbitmq.cross;

importjava.util.concurrent.Semaphore;

importorg.springframework.amqp.core.Message;

importorg.springframework.amqp.core.MessageProperties;

importorg.springframework.amqp.rabbit.core.RabbitTemplate;

/**

* 交叉检测业务服务类,通过mq的方式请求

* @author jianhua.huang

* */

//@Service("crossDetectionByMq")

publicclass CrossDetectionByMqService

{

privateRabbitTemplate amqpTemplate;

//信号量,用来控制并发线程的最大数量

privateSemaphore sempahore =newSemaphore(6);

publicRabbitTemplate getAmqpTemplate()

{

returnamqpTemplate;

}

publicvoidsetAmqpTemplate(RabbitTemplate amqpTemplate)

{

this.amqpTemplate = amqpTemplate;

}

/**

* 交叉检测方法入口

* @author jianhua.huang

* */

publicvoiddoCrossDetection(StringmsgExt,StringroutingKey)

{

//通过mq远程请求交叉检测数据

MessageProperties msgProp =newMessageProperties();

msgProp.setContentType("text");

Stringstr="凹凸曼,凹凸曼 | "+ msgExt +","+ routingKey;

Message msg =newMessage(str.getBytes(),msgProp);

amqpTemplate.setReplyTimeout(11000);

Objectresponse =null;

longstart = System.currentTimeMillis();

try

{

sempahore.acquire();

System.out.println("发送出去的消息为 : {"+str+"}");

amqpTemplate.setReplyAddress(routingKey);

response = amqpTemplate.convertSendAndReceive("rmpsToJcKey", msg);

System.out.println("类型 : "+ response.getClass().getSimpleName());

}

catch(InterruptedException e)

{

e.printStackTrace();

}

finally

{

System.out.println(msgExt +"释放一个许可");

sempahore.release();

}

longend = System.currentTimeMillis();

System.out.println("response 为: {"+ response +"},耗时 = "+ (end - start));

}

}

客户端main启动实现,主要是测试并发

packagenet.nxmax.atp.startup;

importjava.util.concurrent.CountDownLatch;

importjava.util.concurrent.ExecutorService;

importjava.util.concurrent.Executors;

importorg.springframework.context.ApplicationContext;

importorg.springframework.context.support.ClassPathXmlApplicationContext;

importcom.hjh.rabbitmq.cross.CrossDetectionByMqService;

/**

* 非客户端启动类

* @author jianhua.huang

* */

publicclassSimpleClientStartupimplementsRunnable

{

privateString routeKey;

privateCountDownLatch countDown;

privateCrossDetectionByMqService cross;

publicSimpleClientStartup(String routeKey, CountDownLatch countDown, CrossDetectionByMqService cross)

{

this.routeKey = routeKey;

this.countDown = countDown;

this.cross = cross;

}

publicstaticvoidmain(String[] args)

{

ApplicationContext ctx =newClassPathXmlApplicationContext("rpc/fixq/rabbitmq-context-client.xml");

CrossDetectionByMqService cross = ctx.getBean("crossDetection",CrossDetectionByMqService.class);

cross.doCrossDetection(Thread.currentThread().getName(),"JC_TO_RMPS_KEY_1");

// ExecutorService service = Executors.newCachedThreadPool();

// final CountDownLatch countDown = new CountDownLatch(10);

//

// String[] keys = {"JC_TO_RMPS_KEY_1","JC_TO_RMPS_KEY_2"};

//

// for(int i=0;i<10;i++)

// {

// int idx = i % 2;

// String routeKey = keys[idx];

// service.execute(new SimpleClientStartup(routeKey, countDown, cross));

// countDown.countDown();

// }

// service.shutdown();

}

@Override

publicvoidrun()

{

try

{

countDown.await();

cross.doCrossDetection(Thread.currentThread().getName(), routeKey);

}catch(Exception e)

{

e.printStackTrace();

}

}

publicvoidtestCountDown()

{

ExecutorService service = Executors.newCachedThreadPool();

finalCountDownLatch cdOrder =newCountDownLatch(1);

finalCountDownLatch cdAnswer =newCountDownLatch(3);

for(inti=0;i<3;i++)

{

Runnable runnable =newRunnable()

{

publicvoidrun()

{

try

{

System.out.println("线程"+ Thread.currentThread().getName() +"正准备接受命令");

cdOrder.await();

System.out.println("线程"+ Thread.currentThread().getName() +"已接受命令");

// Thread.sleep((long)(Math.random()*10000));

cdAnswer.countDown();

System.out.println("线程"+ Thread.currentThread().getName() +"回应命令处理结果");

}catch(Exception e)

{

e.printStackTrace();

}

}

};

service.execute(runnable);

}

try

{

Thread.sleep((long)(Math.random()*10000));

System.out.println("线程"+ Thread.currentThread().getName() +"即将发布命令");

cdOrder.countDown();

System.out.println("线程"+ Thread.currentThread().getName() +"已发送命令,正在等待结果");

cdAnswer.await();

System.out.println("线程"+ Thread.currentThread().getName() +"已收到所有响应结果");

}catch(Exception e)

{

e.printStackTrace();

}

service.shutdown();

}

}

服务端配置文件applicationContext.xml

<beansxmlns="http://www.springframework.org/schema/beans"

xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"

xmlns:lang="http://www.springframework.org/schema/lang"xmlns:rabbit="http://www.springframework.org/schema/rabbit"

xmlns:util="http://www.springframework.org/schema/util"

xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd

http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd

http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd

http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd

http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-3.0.xsd">

<rabbit:connection-factoryid="rabbitConnectionFactory"

host="localhost"cache-mode="CHANNEL"username="hjh"password="hjh"virtual-host="hjh_host"/>

<rabbit:templateid="amqpTemplateInternetProxy"exchange="SLS_RMPS_EXCHANGE"

connection-factory="rabbitConnectionFactory">

</rabbit:template>

<rabbit:adminid="rmpsConnectAdmin"connection-factory="rabbitConnectionFactory"/>

<rabbit:listener-containeracknowledge="none"

concurrency="3"prefetch="10">

<rabbit:listenerref="service"queue-names="CLIENT_TO_SERVER_QUEUE"/>

</rabbit:listener-container>

<beanid="service"

class="com.hjh.rabbitmq.cross.CrossDetectionConsumer">

<propertyname="amqpTemplate"ref="amqpTemplateInternetProxy"/>

</bean>

</beans>

服务端CrossDetectionConsumer类型实现

packagecom.hjh.rabbitmq.cross;

importorg.springframework.amqp.core.Message;

importorg.springframework.amqp.core.MessageListener;

importorg.springframework.amqp.core.MessageProperties;

importorg.springframework.amqp.rabbit.core.RabbitTemplate;

importorg.springframework.stereotype.Service;

@Service

publicclassCrossDetectionConsumerimplementsMessageListener

{

privateRabbitTemplate amqpTemplate;

publicvoidonMessage(Message message)

{

String msg =newString(message.getBody());

String json ="模拟交叉检测平台的响应信息, 响应线程名称为【 "+ msg.substring(msg.indexOf("|") +1) +"】rpc 请求!";

MessageProperties msgProp = message.getMessageProperties();

String replyTo = msgProp.getReplyTo();

System.out.println("replyTo = "+ replyTo);

System.out.println("当前处理线程名称 : "+ Thread.currentThread().getName());

Message responseMsg =newMessage(json.getBytes(),message.getMessageProperties());

//"jcToRmpsKey"

try

{

Thread.sleep(5000);

}catch(Exception e)

{

// TODO: handle exception

}

amqpTemplate.send(replyTo, responseMsg);

}

publicRabbitTemplategetAmqpTemplate()

{

returnamqpTemplate;

}

publicvoidsetAmqpTemplate(RabbitTemplate amqpTemplate)

{

this.amqpTemplate = amqpTemplate;

}

}

服务端main启动类

packagenet.nxmax.atp.startup;

importjava.io.BufferedReader;

importjava.io.InputStreamReader;

importorg.slf4j.Logger;

importorg.slf4j.LoggerFactory;

importorg.springframework.amqp.core.AmqpTemplate;

importorg.springframework.context.support.ClassPathXmlApplicationContext;

publicclass ServerStartup {

/** Logger */

privatestaticfinalLoggerlog= LoggerFactory

.getLogger(ServerStartup.class);

publicstaticvoidmain(String[] args)throwsException {

newServerStartup().go();

}

privatevoidgo()throwsException {

log.info("Create ApplicationContext");

ClassPathXmlApplicationContext ctx =newClassPathXmlApplicationContext("rpc/fixq/applicationContext.xml");

AmqpTemplate amqpTemplate = ctx.getBean("amqpTemplateInternetProxy",AmqpTemplate.class);

// amqpTemplate.convertAndSend("", "");

log.info("Create ApplicationContext Finish.");

BufferedReaderr =newBufferedReader(newInputStreamReader(System.in));

while(!r.readLine().equalsIgnoreCase("exit")) {

}

log.info("Exit now");

ctx.destroy();

}

}

注意:

由于用spring整合rabbitmq来实现rpc调用技术,在客户端(即request端)访问远程服务器的时候,其本质就是用jdk的

阻塞队列做的线程同步,因此并不限于服务器到底是使用异步的rabbitmq还是同步的rpc机制,这个总结是基于作者阅读

源码发现的。

总结

总之一句话,rabbitmq本质上是不存在rpc同步挑用这一说,只是spring在做代码封装的时候就考虑到了这个功能,所以在代码层面做出了支持。

对此项技术有疑问的童鞋,可以发送邮件至1807325935@qq.com中一起探讨。

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 211,265评论 6 490
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,078评论 2 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 156,852评论 0 347
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,408评论 1 283
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,445评论 5 384
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,772评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,921评论 3 406
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,688评论 0 266
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,130评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,467评论 2 325
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,617评论 1 340
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,276评论 4 329
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,882评论 3 312
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,740评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,967评论 1 265
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,315评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,486评论 2 348

推荐阅读更多精彩内容