基于JMS的ActiveMQ Java客户端示例
JMS生产者:
JMS消费者:
结合Spring使用,配置文件示例
监听器实现类示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package cc.gmem.demo.amq.client; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { private ActiveMQConnectionFactory factory; public Producer( String brokerHost, int brokerPort ) { factory = new ActiveMQConnectionFactory( "tcp://" + brokerHost + ":" + brokerPort ); } public void sendMsgToQueue( String queueName, String msg ) throws JMSException { Connection connection = null; try { //如果代理启用身份验证,这里需要指定用户、密码参数 connection = factory.createConnection(); connection.start(); //创建会话时有事务和确认选项 Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE ); //如果代理上没有队列,则创建,否则,直接使用 Destination dest = session.createQueue( queueName ); //创建消息生产者 MessageProducer producer = session.createProducer( dest ); //设置消息是否持久化 producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT ); //创建一个文本消息 TextMessage message = session.createTextMessage( msg ); producer.send( message ); } finally { connection.close(); } } } |
JMS消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
package cc.gmem.demo.amq.client; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Consumer { private static final Logger LOGGER = LoggerFactory.getLogger( Consumer.class ); private ActiveMQConnectionFactory factory; public Consumer( String brokerHost, int brokerPort ) { factory = new ActiveMQConnectionFactory( "tcp://" + brokerHost + ":" + brokerPort ); } public String reveiveMsgFromQueue( String queueName ) throws JMSException { Connection connection = null; try { //如果代理启用身份验证,这里需要指定用户、密码参数 connection = factory.createConnection(); connection.start(); //创建会话时有事务和确认选项,如果确认选项为:SESSION_TRANSACTED,则必须启用事务 Session session = connection.createSession( true, Session.SESSION_TRANSACTED ); //如果代理上没有队列,则创建,否则,直接使用 Destination dest = session.createQueue( queueName ); //创建消息生产者 MessageConsumer consumer = session.createConsumer( dest ); //等待队列里面有消息可消费,最多1秒,超时返回NULL long timeout = 1000; TextMessage msg = (TextMessage) consumer.receive( timeout ); String recMsg = msg == null ? null : msg.getText(); //如果设置:Session.CLIENT_ACKNOWLEDGE,则必须手工确认:msg.acknowledge() session.commit();//如果设置:Session.SESSION_TRANSACTED,则必须提交或者回滚 return recMsg; } finally { connection.close(); } } } |
结合Spring使用,配置文件示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.0.xsd "> <!-- 启用连接池支持的连接工厂 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>vm://localBroker</value> </property> </bean> </property> </bean> <!-- 以事件驱动的方式接收消息 --> <!-- tipsMessageListener即为监听器对象,通常实现JMS的MessageListener接口 --> <jms:listener-container container-type="default" connection-factory="jmsFactory" acknowledge="auto"> <jms:listener destination="TIPS.10000.BATCH" ref="tipsMessageListener" method="onMessage" /> </jms:listener-container> <!-- jmsTemplate通常用来发送消息 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory" /> <property name="explicitQosEnabled" value="true" /> </bean> </beans> |
监听器实现类示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public void onMessage( Message message ) { try { TextMessage msg = (TextMessage) message; String text = msg.getText(); LOGGER.debug( "Received Message: \n" + text ); onMsgReceived( msgService.create( text ) ); } catch ( JMSException e ) { LOGGER.error( e.getMessage(), e ); } } |
Leave a Reply