Spring对JMS的支持
Spring 提供了JMS的集成,简化JMS的使用,提供的API封装类似于Spring的JDBC集成。
JMS的功能大体上分为两类——接收、发送消息。Spring提供了:
- JmsTemplate来完成消息的发送、同步接收
- 消息监听器容器(message listener containers)来异步的接收消息(事件驱动)
JmsTemplate是Spring的jms-core包的核心类,它封装了资源的创建、释放部分,简化收发消息的代码。
JmsTemplate类是线程安全的,除非需要不同的收发配置(QoS),整个系统可以仅仅使用单例的JmsTemplaet。
发送消息时,你可以提供一个MessageCreator回调来实现消息发送:
1 2 3 4 5 6 |
jmsTemplate.send( queueName, new MessageCreator() { @Override public Message createMessage( Session session ) throws JMSException { return session.createTextMessage( ... ); } } ); |
如果需要使用更加复杂的JMS API,可以调用下面的方法:
1 2 3 4 5 6 7 8 9 10 |
jmsTemplate.execute( new SessionCallback<Object>() { public Object doInJms( Session session ) throws JMSException { return null; } } ); jmsTemplate.execute( new ProducerCallback<Object>() { public Object doInJms( Session session, MessageProducer producer ) throws JMSException { return null; } } ); |
JMS API中有很多QoS设置项,例如优先级、TTL,这些都暴露为JmsTemplate的属性。你可以根据业务需要创建多个JmsTemplate。某些JMS实现在ConnectionFactory级别管理QoS设置,要明确指定使用JmsTemplate上的QoS选项,需要调用 jmsTemplate.setExplicitQosEnabled( true )
JmsTemplate集成了简单请求/应答模式的支持:
1 2 3 4 5 |
Message response = jmsTemplate.sendAndReceive( session -> { Message msg = session.createTextMessage(); msg.setJMSReplyTo( "临时答复队列名称" ); return msg; } ); |
你需要为JmsTemplate提供一个ConnectionFactory的引用,后者属于JMS规范的一部分,任何JMS提供商都需要实现。ConnectionFactory是创建到MOM中间件连接的工厂。
标准的JMS API,在收发消息时需要使用很多中间对象:ConnectionFactory ⇨ Connection ⇨ Session ⇨ MessageProducer ⇨ send()。这些中间对象如果反复创建、销毁,会影响性能。Spring提供了一些具有缓存能力的ConnectionFactory实现:
SingleConnectionFactory 在每次被调用createConnection()时,总是返回同一个连接,并且忽略对close()的调用 |
CachingConnectionFactory 在SingleConnectionFactory的基础上,增加了Session、MessageProducer、MessageConsumer缓存的能力。默认缓存大小1 设置sessionCacheSize可以增加Session的缓存数量。由于会话是基于不同签收模式(acknowledgment mode)来缓存的,因此实际缓存的Session数量可能多于声明的数量,sessionCacheSize设置为1时最多缓存4个(对应4种签收模式) MessageProducer、MessageConsumer连同它们所属的Session一起缓存。在缓存时考虑其特别设置的属性。MessageProducer基于destination缓存。MessageConsumer基于destination、selector、noLocal递送标记、持久化订阅名称来缓存 |
Spring支持类似于EJB的消息驱动Bean的功能,该功能由消息监听容器实现,你可以用XML配置或者注解驱动的方式,指定POJO的方法会自动被调用以完成消息的处理。
消息监听容器负责从JMS队列/主题监听消息,并负责管理多线程的消息消费、事务管理、资源获取/是否。
消息监听容器的实现类主要有:
SimpleMessageListenerContainer 在启动时创建固定数量的Session、Consumer,使用JMS标准API MessageConsumer.setMessageListener()来注册监听器,由JMS实现来执行回调 支持JMS原生事务 —— 切换sessionTransacted标记或者设置acknowledge为acknowledge,你的回调抛出的异常会导致回滚 不支持外部管理的事务,兼容性较好 |
DefaultMessageListenerContainer 基于轮询方式实现。支持外部管理事务,当使用JtaTransactionManager时,每个接收到的消息都注册到XA事务,与JEE环境兼容 容器的缓存级别可以被定制,如果不启用缓存,每当接收一个连接时都会创建Collection、Session。不启用缓存 + 非持久化订阅可能导致在高负载时丢失消息 该容器还能够支持消息代理宕机重启后,自动恢复自己的功能。默认的,它使用一个简单的BackOff实现,每5秒重试连接到代理,你可以指定自己的BackOff实现 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
<!-- container-type = default表示DefaultMessageListenerContainer concurrency = 1 表示每一个jms:listener对应的Session/Consumer个数(也就是线程数?) destination-type = durableTopic 表示目标是持久化订阅,这种情况下 client-id + subscription是必须的 cache为缓存方式: none:不缓存 connection:为每个监听器线程缓存Connection对象 session:为每个监听器线程缓存Connection、Session对象 consumer:为每个监听器线程缓存Connection、Session、Consumer对象 auto:默认值,通常取值consumer,但是指定了外部事务管理器时,取值none transaction-manager:事务管理器 --> <jms:listener-container container-type="default" connection-factory="cf" acknowledge="auto" concurrency="1" destination-type="durableTopic" client-id="APP" cache="auto"> <jms:listener destination="监听的主题" ref="你提供的回调Bean" method="你提供的回调方法" subscription="持久化订阅名称"/> </jms:listener-container> <!-- 不使用JMS名字空间时的类似配置 --> <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="messageListener" ref="messageListener" /> </bean> <bean id="messageListener" class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> <constructor-arg> <bean class="你提供的回调Bean"/> </constructor-arg> <property name="defaultListenerMethod" value="你提供的回调方法"/> <!-- 禁止消息类型转换 --> <property name="messageConverter"> <null/> </property> </bean> |
编程式配置DMLC:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@Configuration @EnableJms public class AppConfig { @Bean public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() { DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); factory.setConnectionFactory(connectionFactory()); factory.setDestinationResolver(destinationResolver()); factory.setConcurrency("3-10"); return factory; } } |
监听器配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Component public class MyService { // 使用@Header注入单个消息头、使用@Headers注入所有消息头,接收参数必须为Map // 使用@Payload可以明确指定接收载荷的参数 // Message、Session可以被注入 @JmsListener(destination = "queue") public void processOrder(String data, @Header("ordertype") String orderType) { } @JmsListener(destination = "myDestination") @SendTo("status") // 可以指定应答队列 public OrderStatus processOrder(Order order) { return status; } } |
这个组件允许你把任意POJO作为MDP(消息驱动POJO)使用,而不需要实现MessageListener接口:
1 2 3 4 5 |
public class MessageListenerAdapter implements MessageListener, SessionAwareMessageListener<Message>{ public MessageListenerAdapter(Object delegate) { setDelegate(delegate); } } |
构造方法参数delegate可以是任何对象, MessageListenerAdapter假设它实现以下接口:
1 2 3 4 5 6 7 |
public interface MessageDelegate { // 根据消息类型的不同,自动选择调用的方法 void handleMessage(String message); void handleMessage(Map message); void handleMessage(byte[] message); void handleMessage(Serializable message); } |
delegate也可以提供其它形式的方法,但是需要指定defaultListenerMethod配置。
回调方法可以具有参数,对应接收到的消息,也可以具有返回值,对应发送到原始消息Reply-To字段指定的,或者listener默认配置的应答队列的应答消息。
参数类型可以是各种各样的,但是需要messageConverter的配合。
Spring提供了针对单个ConnectionFactory的JmsTransactionManager,事务资源是绑定到线程的。JmsTransactionManager会自动检测到事务资源并打开之。
在JEE环境下,ConnectionFactory可能对Connection、Session进行缓存,资源因而是跨事务重用的。在独立运行环境下,使用Spring的SingleConnectionFactory会导致Connection共享,而每个事务使用独立的Session。
JmsTemplate也可以使用JtaTransactionManager + 支持XA的ConnectionFactory,以支持分布式事务。
在独立运行环境下,你需要设置JmsTemplate的sessionAcknowledgeMode、sessionTransacted来告知Spring是否使用Jms事务。当联用PlatformTransactionManager、JmsTemplate时,JmsTemplate总是提供事务性的Session对象
JmsTemplate的 convertAndSend()、 receiveAndConvert()可以在收发消息时完成数据类型转换。 转换工作代理给了MessageConverter类,缺省实现支持String ⇨ TextMessage、byte[] ⇨ BytesMesssage、java.util.Map ⇨ MapMessage之间的转换。
如果要对转换后,发送前对JMS消息对象进行处理,可以使用MessagePostProcessor:
1 2 3 4 5 6 7 |
jmsTemplate.convertAndSend( queue, map, new MessagePostProcessor() { public Message postProcessMessage( Message message ) throws JMSException { message.setIntProperty( "AccountID", 1000 ); message.setJMSCorrelationID( "123-00001" ); return message; } } ); |
通过JMX查看ActiveMQ队列状态,发现消息已经被消费。打开WARN级别的Spring日志,发现报方法找不到。配置的回调方法:
1 2 |
public void onP2PMessage( Message message ) throws Exception { } |
此问题的原因是,回调方法的参数必须是JMS消息的载荷。因此,对于TextMessage,回调参数类型应该是String。
Leave a Reply