博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
SpringJMS解析2-JmsTemplate
阅读量:6520 次
发布时间:2019-06-24

本文共 5608 字,大约阅读时间需要 18 分钟。

尽管消息接收可以使用消息监听器的方式替代模版方法,但是在发送的时候是无法替代的,在Spring中必须要使用JmsTemplate提供的方法来进行发送操作,可见JmsTemplate类的重要性,那么我们对于Spring整合消息服务的分析就从JmsTemplate开始。

查看JmsTemplate的类型层级结构图发现实现了InitializingBean接口,接口方法实现是在JmsAccessor类中。发现函数中只是一个验证的功能,并没有逻辑实现。

public void afterPropertiesSet() {  if (getConnectionFactory() == null) {    throw new IllegalArgumentException("Property 'connectionFactory' is required");  }}

在Spring中发送消息可以通过JmsTemplate中提供的方法来实现。

public void send(final String destinationName, final MessageCreator messageCreator) throws JmsException {    execute(new SessionCallback() {      @Override      public Object doInJms(Session session) throws JMSException {        Destination destination = resolveDestinationName(session, destinationName);        doSend(session, destination, messageCreator);        return null;      }    }, false);  }

现在的风格不得不让我们回想起JdbcTemplate的类实现风格,极为相似,都是提取一个公共的方法作为最底层、最通用的功能实现,然后又通过回调函数的不同来区分个性化的功能。我们首先查看通用代码的抽取实现。

通用代码抽取execute()

在execute中封装了Connection以及Session的创建操作

public 
T execute(SessionCallback
action, boolean startConnection) throws JmsException { Assert.notNull(action, "Callback object must not be null"); Connection conToClose = null; Session sessionToClose = null; try { //事务相关 Session sessionToUse = ConnectionFactoryUtils.doGetTransactionalSession( getConnectionFactory(), this.transactionalResourceFactory, startConnection); if (sessionToUse == null) { //创建connection conToClose = createConnection(); //根据connection创建session sessionToClose = createSession(conToClose); //是否开启向服务器推送连接信息,只有接收信息时需要,发送时不需要 if (startConnection) { conToClose.start(); } sessionToUse = sessionToClose; } if (logger.isDebugEnabled()) { logger.debug("Executing callback on JMS Session: " + sessionToUse); } //调用回调函数 return action.doInJms(sessionToUse); } catch (JMSException ex) { throw convertJmsAccessException(ex); } finally { //关闭session JmsUtils.closeSession(sessionToClose); //释放连接 ConnectionFactoryUtils.releaseConnection(conToClose, getConnectionFactory(), startConnection); } }

为了发送一条消息需要做很多工作,需要很多的辅助代码,而这些代码又都是千篇一律的,没有任何的差异,所以execute方法的目的就是帮助我们抽离这些冗余代码使我们更加专注于业务逻辑的实现。从函数中看,这些冗余代码包括创建Connection、创建Session、当然也包括关闭Session和关闭Connection。而在准备工作结束后,调用回调函数将程序引入用户自定义实现的个性化处理。

发送消息的实现

有了基类辅助实现,使Spring更加专注于个性的处理,也就是说Spring使用execute方法中封装了冗余代码,而将个性化的代码实现放在了回调函数doInJms函数中。在发送消息的功能中回调函数通过局部类实现。

protected void doSend(Session session, Destination destination, MessageCreator messageCreator)      throws JMSException {    Assert.notNull(messageCreator, "MessageCreator must not be null");    MessageProducer producer = createProducer(session, destination);    try {      Message message = messageCreator.createMessage(session);      if (logger.isDebugEnabled()) {        logger.debug("Sending created message: " + message);      }      doSend(producer, message);      // Check commit - avoid commit call within a JTA transaction.      if (session.getTransacted() && isSessionLocallyTransacted(session)) {        // Transacted session created by this template -> commit.        JmsUtils.commitIfNecessary(session);      }    }    finally {      JmsUtils.closeMessageProducer(producer);    }  }

在发送消息遵循着消息发送的规则,比如根据Destination创建MessageProducer、创建Message,并使用MessageProducer实例来发送消息。

接收消息

我们通常使用jmsTemplate.receive(destination)来接收简单的消息,那么这个功能Spring是如何封装的呢?

@Override    public Message receive(Destination destination) throws JmsException {        return receiveSelected(destination, null);    }    @Override    public Message receiveSelected(final Destination destination, final String messageSelector) throws JmsException {        return execute(new SessionCallback
() { @Override public Message doInJms(Session session) throws JMSException { return doReceive(session, destination, messageSelector); } }, true); } protected Message doReceive(Session session, Destination destination, String messageSelector) throws JMSException { return doReceive(session, createConsumer(session, destination, messageSelector)); } protected Message doReceive(Session session, MessageConsumer consumer) throws JMSException { try { // Use transaction timeout (if available). long timeout = getReceiveTimeout(); JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager.getResource(getConnectionFactory()); if (resourceHolder != null && resourceHolder.hasTimeout()) { timeout = Math.min(timeout, resourceHolder.getTimeToLiveInMillis()); } Message message = doReceive(consumer, timeout); if (session.getTransacted()) { // Commit necessary - but avoid commit call within a JTA transaction. if (isSessionLocallyTransacted(session)) { // Transacted session created by this template -> commit. JmsUtils.commitIfNecessary(session); } } else if (isClientAcknowledge(session)) { // Manually acknowledge message, if any. if (message != null) { message.acknowledge(); } } return message; } finally { JmsUtils.closeMessageConsumer(consumer); } }

实现的套路与发送差不多,同样还是使用execute函数来封装冗余的公共操作,而最终的目标还是通过consumer.receive()来接收消息,其中的过程就是对于MessageConsumer的创建以及一些辅助操作。

 

转载地址:http://mwgfo.baihongyu.com/

你可能感兴趣的文章
中科院院士谭铁牛:人工智能发展需要理性务实
查看>>
真正的开源与人造开源之间的斗争愈演愈烈
查看>>
Coding and Paper Letter(十七)
查看>>
ES6特性之:模板字符串
查看>>
NIO框架入门(四):Android与MINA2、Netty4的跨平台UDP双向通信实战
查看>>
Netflix如何节省92%视频编码成本?
查看>>
ios兼容iphonex刘海屏解决方案
查看>>
HBuilder使用夜神模拟器调试Android应用
查看>>
就是要你懂TCP -- 握手和挥手
查看>>
Andrew Ng机器学习公开课笔记 -- Regularization and Model Selection
查看>>
《Python游戏编程快速上手》一1.3 如何使用本书
查看>>
《Android游戏开发详解》——第1章,第1.3节声明和初始化变量
查看>>
《Visual Studio程序员箴言》----1.2 滚动与导航
查看>>
Processing编程学习指南2.7 Processing参考文档
查看>>
架构师速成-架构目标之伸缩性\安全性
查看>>
用机器学习流程去建模我们的平台架构
查看>>
执行可运行jar包时读取jar包中的文件
查看>>
linux下ExtMail邮件使用及管理平台
查看>>
linux中iptables设置自建dns服务器的端口
查看>>
TP5+PHPexcel导入xls,xlsx文件读取数据
查看>>