Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Spring对JMS的支持

7
Aug
2015

Spring对JMS的支持

By Alex
/ in Java
/ tags ActiveMQ, JMS, Spring
0 Comments
简介

Spring 提供了JMS的集成,简化JMS的使用,提供的API封装类似于Spring的JDBC集成。

JMS的功能大体上分为两类——接收、发送消息。Spring提供了:

  1. JmsTemplate来完成消息的发送、同步接收
  2. 消息监听器容器(message listener containers)来异步的接收消息(事件驱动)
使用Spring JMS
JmsTemplate

JmsTemplate是Spring的jms-core包的核心类,它封装了资源的创建、释放部分,简化收发消息的代码。

JmsTemplate类是线程安全的,除非需要不同的收发配置(QoS),整个系统可以仅仅使用单例的JmsTemplaet。

基本API

发送消息时,你可以提供一个MessageCreator回调来实现消息发送:

Java
1
2
3
4
5
6
jmsTemplate.send( queueName, new MessageCreator() {
    @Override
    public Message createMessage( Session session ) throws JMSException {
        return session.createTextMessage( ... );
    }
} );

如果需要使用更加复杂的JMS API,可以调用下面的方法:

Java
1
2
3
4
5
6
7
8
9
10
jmsTemplate.execute( new SessionCallback<Object>() {
    public Object doInJms( Session session ) throws JMSException {
        return null;
    }
} );
jmsTemplate.execute( new ProducerCallback<Object>() {
    public Object doInJms( Session session, MessageProducer producer ) throws JMSException {
        return null;
    }
} );

JMS API中有很多QoS设置项,例如优先级、TTL,这些都暴露为JmsTemplate的属性。你可以根据业务需要创建多个JmsTemplate。某些JMS实现在ConnectionFactory级别管理QoS设置,要明确指定使用JmsTemplate上的QoS选项,需要调用 jmsTemplate.setExplicitQosEnabled( true ) 

JmsTemplate集成了简单请求/应答模式的支持:

Java
1
2
3
4
5
Message response = jmsTemplate.sendAndReceive( session -> {
    Message msg = session.createTextMessage();
    msg.setJMSReplyTo( "临时答复队列名称" );
    return msg;
} );
连接

你需要为JmsTemplate提供一个ConnectionFactory的引用,后者属于JMS规范的一部分,任何JMS提供商都需要实现。ConnectionFactory是创建到MOM中间件连接的工厂。

资源缓存

标准的JMS API,在收发消息时需要使用很多中间对象:ConnectionFactory ⇨ Connection ⇨ Session ⇨ MessageProducer ⇨ send()。这些中间对象如果反复创建、销毁,会影响性能。Spring提供了一些具有缓存能力的ConnectionFactory实现:

SingleConnectionFactory
在每次被调用createConnection()时,总是返回同一个连接,并且忽略对close()的调用

CachingConnectionFactory

在SingleConnectionFactory的基础上,增加了Session、MessageProducer、MessageConsumer缓存的能力。默认缓存大小1

设置sessionCacheSize可以增加Session的缓存数量。由于会话是基于不同签收模式(acknowledgment mode)来缓存的,因此实际缓存的Session数量可能多于声明的数量,sessionCacheSize设置为1时最多缓存4个(对应4种签收模式)

MessageProducer、MessageConsumer连同它们所属的Session一起缓存。在缓存时考虑其特别设置的属性。MessageProducer基于destination缓存。MessageConsumer基于destination、selector、noLocal递送标记、持久化订阅名称来缓存

消息监听容器

Spring支持类似于EJB的消息驱动Bean的功能,该功能由消息监听容器实现,你可以用XML配置或者注解驱动的方式,指定POJO的方法会自动被调用以完成消息的处理。

消息监听容器负责从JMS队列/主题监听消息,并负责管理多线程的消息消费、事务管理、资源获取/是否。

消息监听容器的实现类主要有:

SimpleMessageListenerContainer

在启动时创建固定数量的Session、Consumer,使用JMS标准API MessageConsumer.setMessageListener()来注册监听器,由JMS实现来执行回调

支持JMS原生事务 —— 切换sessionTransacted标记或者设置acknowledge为acknowledge,你的回调抛出的异常会导致回滚

不支持外部管理的事务,兼容性较好

DefaultMessageListenerContainer 

基于轮询方式实现。支持外部管理事务,当使用JtaTransactionManager时,每个接收到的消息都注册到XA事务,与JEE环境兼容

容器的缓存级别可以被定制,如果不启用缓存,每当接收一个连接时都会创建Collection、Session。不启用缓存 + 非持久化订阅可能导致在高负载时丢失消息

该容器还能够支持消息代理宕机重启后,自动恢复自己的功能。默认的,它使用一个简单的BackOff实现,每5秒重试连接到代理,你可以指定自己的BackOff实现

XML配置示例
XML
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
<!--
  container-type = default表示DefaultMessageListenerContainer
  concurrency = 1 表示每一个jms:listener对应的Session/Consumer个数(也就是线程数?)
  destination-type = durableTopic 表示目标是持久化订阅,这种情况下 client-id + subscription是必须的
  cache为缓存方式:
      none:不缓存
      connection:为每个监听器线程缓存Connection对象
      session:为每个监听器线程缓存Connection、Session对象
      consumer:为每个监听器线程缓存Connection、Session、Consumer对象
      auto:默认值,通常取值consumer,但是指定了外部事务管理器时,取值none
      transaction-manager:事务管理器
-->
<jms:listener-container container-type="default" connection-factory="cf" acknowledge="auto" concurrency="1"
                        destination-type="durableTopic" client-id="APP" cache="auto">
    <jms:listener destination="监听的主题" ref="你提供的回调Bean" method="你提供的回调方法" subscription="持久化订阅名称"/>
</jms:listener-container>
 
<!-- 不使用JMS名字空间时的类似配置 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="messageListener" ref="messageListener" />
</bean>
<bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
    <constructor-arg>
        <bean class="你提供的回调Bean"/>
    </constructor-arg>
    <property name="defaultListenerMethod" value="你提供的回调方法"/>
    <!-- 禁止消息类型转换 -->
    <property name="messageConverter">
        <null/>
    </property>
</bean>
注解配置示例

编程式配置DMLC:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Configuration
@EnableJms
public class AppConfig {
 
    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() {
        DefaultJmsListenerContainerFactory factory =
                new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setDestinationResolver(destinationResolver());
        factory.setConcurrency("3-10");
        return factory;
    }
}

监听器配置:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@Component
public class MyService {
    // 使用@Header注入单个消息头、使用@Headers注入所有消息头,接收参数必须为Map
    // 使用@Payload可以明确指定接收载荷的参数
    // Message、Session可以被注入
    @JmsListener(destination = "queue")
    public void processOrder(String data, @Header("ordertype") String orderType) {
    }
    
    
    @JmsListener(destination = "myDestination")
    @SendTo("status") // 可以指定应答队列
    public OrderStatus processOrder(Order order) {
        return status;
    }
} 
MessageListenerAdapter

这个组件允许你把任意POJO作为MDP(消息驱动POJO)使用,而不需要实现MessageListener接口:

Java
1
2
3
4
5
public class MessageListenerAdapter implements MessageListener, SessionAwareMessageListener<Message>{
    public MessageListenerAdapter(Object delegate) {
         setDelegate(delegate);
    }
}

构造方法参数delegate可以是任何对象, MessageListenerAdapter假设它实现以下接口:

Java
1
2
3
4
5
6
7
public interface MessageDelegate {
    // 根据消息类型的不同,自动选择调用的方法
    void handleMessage(String message);
    void handleMessage(Map message);
    void handleMessage(byte[] message);
    void handleMessage(Serializable message);
}

delegate也可以提供其它形式的方法,但是需要指定defaultListenerMethod配置。 

回调方法

回调方法可以具有参数,对应接收到的消息,也可以具有返回值,对应发送到原始消息Reply-To字段指定的,或者listener默认配置的应答队列的应答消息。

参数类型可以是各种各样的,但是需要messageConverter的配合。

事务管理

Spring提供了针对单个ConnectionFactory的JmsTransactionManager,事务资源是绑定到线程的。JmsTransactionManager会自动检测到事务资源并打开之。

在JEE环境下,ConnectionFactory可能对Connection、Session进行缓存,资源因而是跨事务重用的。在独立运行环境下,使用Spring的SingleConnectionFactory会导致Connection共享,而每个事务使用独立的Session。

JmsTemplate也可以使用JtaTransactionManager + 支持XA的ConnectionFactory,以支持分布式事务。

在独立运行环境下,你需要设置JmsTemplate的sessionAcknowledgeMode、sessionTransacted来告知Spring是否使用Jms事务。当联用PlatformTransactionManager、JmsTemplate时,JmsTemplate总是提供事务性的Session对象

消息转换

JmsTemplate的 convertAndSend()、 receiveAndConvert()可以在收发消息时完成数据类型转换。 转换工作代理给了MessageConverter类,缺省实现支持String  ⇨ TextMessage、byte[] ⇨ BytesMesssage、java.util.Map ⇨ MapMessage之间的转换。

如果要对转换后,发送前对JMS消息对象进行处理,可以使用MessagePostProcessor:

Java
1
2
3
4
5
6
7
jmsTemplate.convertAndSend( queue, map, new MessagePostProcessor() {
    public Message postProcessMessage( Message message ) throws JMSException {
        message.setIntProperty( "AccountID", 1000 );
        message.setJMSCorrelationID( "123-00001" );
        return message;
    }
} );
常见问题
DMLC无法接收消息

通过JMX查看ActiveMQ队列状态,发现消息已经被消费。打开WARN级别的Spring日志,发现报方法找不到。配置的回调方法:

Java
1
2
public void onP2PMessage( Message message ) throws Exception {
}

此问题的原因是,回调方法的参数必须是JMS消息的载荷。因此,对于TextMessage,回调参数类型应该是String。 

 

← libvirt学习笔记
Next Post →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • 基于JMS的ActiveMQ Java客户端示例
  • ActiveMQ知识集锦
  • ActiveMQ代理网络无法连接的问题一例
  • Spring知识集锦
  • Spring与Quartz的任务调度比较

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • Bazel学习笔记 37 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • Ceph学习笔记 27 people like this
  • 基于Calico的CNI 27 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2