基于JMS的ActiveMQ Java客户端示例
JMS生产者:
		
		
			
			
			
			
				
					
			
		
JMS消费者:
		
		
			
			
			
			
				
					
			
		
结合Spring使用,配置文件示例
		
		
			
			
			
			
				
					
			
		
监听器实现类示例
		
		
			
			
			
			
				
					
			
		
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50  | 
						package cc.gmem.demo.amq.client; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer {     private ActiveMQConnectionFactory factory;     public Producer( String brokerHost, int brokerPort )     {         factory = new ActiveMQConnectionFactory( "tcp://" + brokerHost + ":" + brokerPort );     }     public void sendMsgToQueue( String queueName, String msg ) throws JMSException     {         Connection connection = null;         try         {             //如果代理启用身份验证,这里需要指定用户、密码参数             connection = factory.createConnection();             connection.start();             //创建会话时有事务和确认选项             Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );             //如果代理上没有队列,则创建,否则,直接使用             Destination dest = session.createQueue( queueName );             //创建消息生产者             MessageProducer producer = session.createProducer( dest );             //设置消息是否持久化             producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT );             //创建一个文本消息             TextMessage message = session.createTextMessage( msg );             producer.send( message );         }         finally         {             connection.close();         }     } }  | 
					
JMS消费者:
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57  | 
						package cc.gmem.demo.amq.client; import javax.jms.Connection; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class Consumer {     private static final Logger       LOGGER = LoggerFactory.getLogger( Consumer.class );     private ActiveMQConnectionFactory factory;     public Consumer( String brokerHost, int brokerPort )     {         factory = new ActiveMQConnectionFactory( "tcp://" + brokerHost + ":" + brokerPort );     }     public String reveiveMsgFromQueue( String queueName ) throws JMSException     {         Connection connection = null;         try         {             //如果代理启用身份验证,这里需要指定用户、密码参数             connection = factory.createConnection();             connection.start();             //创建会话时有事务和确认选项,如果确认选项为:SESSION_TRANSACTED,则必须启用事务             Session session = connection.createSession( true, Session.SESSION_TRANSACTED );             //如果代理上没有队列,则创建,否则,直接使用             Destination dest = session.createQueue( queueName );             //创建消息生产者             MessageConsumer consumer = session.createConsumer( dest );             //等待队列里面有消息可消费,最多1秒,超时返回NULL             long timeout = 1000;             TextMessage msg = (TextMessage) consumer.receive( timeout );             String recMsg = msg == null ? null : msg.getText();             //如果设置:Session.CLIENT_ACKNOWLEDGE,则必须手工确认:msg.acknowledge()             session.commit();//如果设置:Session.SESSION_TRANSACTED,则必须提交或者回滚             return recMsg;         }         finally         {             connection.close();         }     } }  | 
					
结合Spring使用,配置文件示例
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31  | 
						<beans      xmlns="http://www.springframework.org/schema/beans"     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"      xmlns:jms="http://www.springframework.org/schema/jms"     xmlns:task="http://www.springframework.org/schema/task"     xsi:schemaLocation="         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd         http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-3.0.xsd         http://www.springframework.org/schema/task  http://www.springframework.org/schema/task/spring-task-3.0.xsd   ">     <!-- 启用连接池支持的连接工厂 -->     <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">         <property name="connectionFactory">             <bean class="org.apache.activemq.ActiveMQConnectionFactory">                 <property name="brokerURL">                     <value>vm://localBroker</value>                 </property>             </bean>         </property>     </bean>     <!-- 以事件驱动的方式接收消息 -->     <!--  tipsMessageListener即为监听器对象,通常实现JMS的MessageListener接口 -->     <jms:listener-container container-type="default" connection-factory="jmsFactory" acknowledge="auto">         <jms:listener destination="TIPS.10000.BATCH" ref="tipsMessageListener" method="onMessage" />     </jms:listener-container>     <!-- jmsTemplate通常用来发送消息 -->     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">         <property name="connectionFactory" ref="jmsFactory" />         <property name="explicitQosEnabled" value="true" />     </bean> </beans>  | 
					
监听器实现类示例
| 
					 1 2 3 4 5 6 7 8 9 10 11 12 13 14  | 
						public void onMessage( Message message ) {     try     {         TextMessage msg = (TextMessage) message;         String text = msg.getText();         LOGGER.debug( "Received Message: \n" + text );         onMsgReceived( msgService.create( text ) );     }     catch ( JMSException e )     {         LOGGER.error( e.getMessage(), e );     } }  | 
					
            
Leave a Reply