基于CMS接口的ActiveMQ CPP客户端示例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
#ifndef AMQUTILS_H_ #define AMQUTILS_H_ #include <cms/Connection.h> #include <cms/Session.h> #include <decaf/lang/Exception.h> using namespace decaf::lang; using namespace cms; namespace amqutils { inline void closeQuitely( Connection* conn, Session* session ) { if ( session ) try { session->close(); } catch ( Exception& e ) { } if ( conn ) try { conn->close(); } catch ( Exception& e ) { } } } #endif |
生产者头文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
#ifndef PRODUCER_H_ #define PRODUCER_H_ #include <activemq/core/ActiveMQConnectionFactory.h> #include <boost/shared_ptr.hpp> #include <stdlib.h> #include <stdio.h> using namespace activemq; using namespace activemq::core; using namespace decaf; using namespace decaf::lang; using namespace decaf::util; using namespace decaf::util::concurrent; using namespace cms; using namespace std; using namespace boost; class Producer { private: boost::shared_ptr<ActiveMQConnectionFactory> amqf; Connection* connection; Session* session; public: Producer( string& uri, string& userName, string& password ); Producer( boost::shared_ptr<ActiveMQConnectionFactory> amqf ); virtual ~Producer(); virtual void init(); virtual void send( string& queueName, string& msg ); virtual void pub( string& topicName, string& msg ); }; #endif |
生产者实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
#include "Producer.h"

#include <exception>
#include <string>

#include <cms/CMSException.h>
#include <cms/Connection.h>
#include <boost/scoped_ptr.hpp>
#include "amqutils.h"
#include <slf4cmacros.h>

namespace
{
    /**
     * Returns session unchanged, or throws cms::CMSException when it is NULL
     * (i.e. Producer::init() has not been called yet).
     */
    cms::Session* requireSession( cms::Session* session )
    {
        if ( session == NULL )
        {
            throw cms::CMSException( "生产者尚未初始化,请先调用init()" );
        }
        return session;
    }

    /**
     * Sends text as a TextMessage to the given destination.
     *
     * Takes ownership of rawDestination and deletes it before returning,
     * as well as the temporary producer and message created here.
     */
    void sendText( cms::Session* session, cms::Destination* rawDestination, const std::string& text )
    {
        boost::scoped_ptr<cms::Destination> destination( rawDestination );
        boost::scoped_ptr<cms::MessageProducer> producer( session->createProducer( destination.get() ) );
        boost::scoped_ptr<cms::TextMessage> message( session->createTextMessage( text ) );
        producer->send( message.get() );
    }
}

/**
 * Builds a private connection factory from the broker URI and credentials.
 * No connection is opened until init() is called.
 */
Producer::Producer( string& uri, string& userName, string& password ) :
        // boost:: is spelled out: with both "using namespace std" and
        // "using namespace boost" in effect, a bare shared_ptr is ambiguous
        // on C++11 and later toolchains.
        amqf( boost::shared_ptr<ActiveMQConnectionFactory>( new ActiveMQConnectionFactory( uri, userName, password ) ) ),
        connection( NULL ), session( NULL )
{
}

/**
 * Uses an existing connection factory shared with the caller.
 */
Producer::Producer( boost::shared_ptr<ActiveMQConnectionFactory> amqf ) :
        amqf( amqf ), connection( NULL ), session( NULL )
{
}

/**
 * Closes the session and connection (best effort) and then deletes them.
 */
Producer::~Producer()
{
    SLF_DEBUG( "准备销毁ActiveMQ连接、会话" );
    try
    {
        amqutils::closeQuitely( connection, session );
    }
    catch ( const std::exception& )
    {
        // A destructor must never throw; a failed close is ignored here.
    }
    delete session;
    delete connection;
}

/**
 * Creates the connection and then a session on it.
 *
 * NOTE(review): calling init() a second time overwrites both pointers
 * without releasing the previous connection/session.
 */
void Producer::init()
{
    SLF_DEBUG( "准备创建ActiveMQ连接" );
    connection = amqf->createConnection();
    SLF_DEBUG( "准备创建ActiveMQ会话" );
    session = connection->createSession();
}

/**
 * Sends msg as a text message to the queue named queueName.
 *
 * @throws cms::CMSException if init() has not been called, or if the CMS
 *         calls themselves fail
 */
void Producer::send( string& queueName, string& msg )
{
    SLF_DEBUG( "准备发送ActiveMQ消息到队列" + queueName );
    cms::Session* active = requireSession( session );
    sendText( active, active->createQueue( queueName ), msg );
}

/**
 * Publishes msg as a text message to the topic named topicName.
 *
 * @throws cms::CMSException if init() has not been called, or if the CMS
 *         calls themselves fail
 */
void Producer::pub( string& topicName, string& msg )
{
    SLF_DEBUG( "准备发布ActiveMQ消息到主题" + topicName );
    cms::Session* active = requireSession( session );
    sendText( active, active->createTopic( topicName ), msg );
}
消费者头文件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 |
#ifndef CONSUMER_H_ #define CONSUMER_H_ #include <activemq/core/ActiveMQConnectionFactory.h> #include <boost/shared_ptr.hpp> #include <stdlib.h> #include <stdio.h> #include <boost/ptr_container/ptr_map.hpp> #include <boost/ptr_container/ptr_vector.hpp> using namespace activemq; using namespace activemq::core; using namespace decaf; using namespace decaf::lang; using namespace decaf::util; using namespace decaf::util::concurrent; using namespace cms; using namespace std; using namespace boost; class Consumer { private: boost::shared_ptr<ActiveMQConnectionFactory> amqf; Connection* connection; Session* session; ptr_map<string, MessageConsumer*> consumers; ptr_vector<MessageListener> listeners; public: Consumer( string& uri, string& userName, string& password ); Consumer( boost::shared_ptr<ActiveMQConnectionFactory> amqf ); virtual ~Consumer(); virtual void init(); string receive( string& queueName, int timeout ); void listen( string queueName, void (*listener)( TextMessage* ) ); }; #endif |
消费者实现,注意阻塞等待receive、监听器listen两种消费方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
#include "Consumer.h"

#include <exception>
#include <string>

#include <cms/CMSException.h>
#include <cms/Connection.h>
#include <boost/scoped_ptr.hpp>
#include "amqutils.h"
#include <slf4cmacros.h>
#include <boost/lexical_cast.hpp>

namespace
{
    /**
     * Throws cms::CMSException when session is NULL, i.e. when
     * Consumer::init() has not been called yet.
     */
    void requireSession( const cms::Session* session )
    {
        if ( session == NULL )
        {
            throw cms::CMSException( "消费者尚未初始化,请先调用init()" );
        }
    }
}

/**
 * Builds a private connection factory from the broker URI and credentials.
 * No connection is opened until init() is called.
 */
Consumer::Consumer( string& uri, string& userName, string& password ) :
        amqf( boost::shared_ptr<ActiveMQConnectionFactory>( new ActiveMQConnectionFactory( uri, userName, password ) ) ),
        connection( NULL ), session( NULL )
{
}

/**
 * Uses an existing connection factory shared with the caller.
 */
Consumer::Consumer( boost::shared_ptr<ActiveMQConnectionFactory> amqf ) :
        amqf( amqf ), connection( NULL ), session( NULL )
{
}

/**
 * Releases, in order: the consumers registered through listen(), the
 * session, and the connection. The listener adapters are destroyed
 * afterwards together with the listeners member.
 */
Consumer::~Consumer()
{
    SLF_DEBUG( "准备销毁ActiveMQ连接、会话" );

    // The map only owns the pointer slots, not the MessageConsumer objects
    // stored in them, so each consumer is closed and deleted explicitly,
    // before the session that created it goes away.
    typedef boost::ptr_map<std::string, cms::MessageConsumer*>::iterator ConsumerIterator;
    for ( ConsumerIterator it = consumers.begin(); it != consumers.end(); ++it )
    {
        cms::MessageConsumer*& slot = *it->second;
        if ( slot != NULL )
        {
            try
            {
                slot->close();
            }
            catch ( const std::exception& )
            {
                // Best effort: a destructor must never throw.
            }
            delete slot;
            slot = NULL;
        }
    }

    try
    {
        amqutils::closeQuitely( connection, session );
    }
    catch ( const std::exception& )
    {
        // A destructor must never throw; a failed close is ignored here.
    }
    delete session;
    delete connection;
}

/**
 * Creates and starts the connection, then creates a session on it.
 *
 * NOTE(review): calling init() a second time overwrites both pointers
 * without releasing the previous connection/session.
 */
void Consumer::init()
{
    SLF_DEBUG( "准备创建ActiveMQ连接" );
    connection = amqf->createConnection();
    connection->start(); // required, otherwise listeners are never triggered
    SLF_DEBUG( "准备创建ActiveMQ会话" );
    session = connection->createSession();
}

/**
 * Blocks until one message arrives on the queue or the timeout expires.
 *
 * A temporary consumer is created for the call and destroyed on return.
 *
 * @param queueName  queue to read from
 * @param timeout    maximum wait in seconds (converted to milliseconds)
 * @return the text of the received message
 * @throws CMSException if init() has not been called, if no message
 *         arrives in time, or if the message is not a TextMessage
 */
string Consumer::receive( string& queueName, int timeout )
{
    {
        string msg = "准备等待队列" + queueName + "并接受一条消息,超时为" + boost::lexical_cast<std::string>( timeout ) + "秒";
        SLF_DEBUG( msg );
    }
    requireSession( session );

    boost::scoped_ptr<cms::Destination> dest( session->createQueue( queueName ) );
    boost::scoped_ptr<cms::MessageConsumer> consumer( session->createConsumer( dest.get() ) );

    // Owned by a scoped_ptr so the message is released on every exit path,
    // including exceptions thrown while reading it.
    boost::scoped_ptr<cms::Message> message( consumer->receive( timeout * 1000 ) );
    if ( message.get() == NULL )
    {
        throw CMSException( "在超过时间限制之前没有获取到消息." );
    }

    cms::TextMessage* text = dynamic_cast<cms::TextMessage*>( message.get() );
    if ( text == NULL )
    {
        // The queue delivered something other than a text message.
        throw CMSException( "接收到的消息不是文本消息." );
    }
    return text->getText();
}

/**
 * Adapts a plain function pointer to the cms::MessageListener interface.
 */
class PlainMessageListener : public MessageListener
{
    public:
        typedef void (*ListenerCallback)( TextMessage* );

        /** Forwards text messages to the callback; other messages are skipped. */
        virtual void onMessage( const Message* message );

        PlainMessageListener( ListenerCallback callback ) :
                listenerCallback( callback )
        {
        }

        ~PlainMessageListener()
        {
            SLF_DEBUG( "准备销毁监听器..." );
        }
    private:
        ListenerCallback listenerCallback;
};

void PlainMessageListener::onMessage( const Message* message )
{
    const TextMessage* msg = dynamic_cast<const TextMessage*>( message );
    if ( msg == NULL || listenerCallback == NULL )
    {
        // Nothing to deliver: either the message is not a TextMessage or no
        // callback was supplied. Invoking the callback with NULL would make
        // a typical handler dereference a null pointer.
        return;
    }
    // The callback signature takes a non-const pointer, hence the const_cast.
    listenerCallback( const_cast<TextMessage*>( msg ) );
}

/**
 * Registers listener to be called for messages arriving on queueName.
 *
 * @param queueName  queue to listen on
 * @param listener   callback invoked with each received text message
 * @throws CMSException if init() has not been called, or if this consumer
 *         already listens on queueName
 */
void Consumer::listen( string queueName, void (*listener)( TextMessage* ) )
{
    requireSession( session );

    // Check for duplicates before touching the broker, so that the rejected
    // call does not leave an extra, unowned consumer subscribed to the queue.
    if ( consumers.count( queueName ) == 1 )
    {
        throw CMSException( "当前消费者已经在监听队列:" + queueName );
    }

    boost::scoped_ptr<cms::Destination> dest( session->createQueue( queueName ) );

    SLF_DEBUG( "准备创建监听器..." );
    // Hand the adapter to its owning container first so it cannot leak if a
    // later step throws.
    listeners.push_back( new PlainMessageListener( listener ) );
    MessageListener* adapter = &listeners.back();

    cms::MessageConsumer* consumer = session->createConsumer( dest.get() );
    try
    {
        consumer->setMessageListener( adapter );
        consumers[queueName] = consumer;
    }
    catch ( ... )
    {
        // Registration failed: release the consumer and report the error.
        delete consumer;
        throw;
    }
}
主函数代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
#include "Producer.h"
#include "Consumer.h"

#include <exception>
#include <string>

#include <activemq/library/ActiveMQCPP.h>
#include <boost/filesystem.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <boost/date_time/posix_time/posix_time_duration.hpp>
#include <slf4cmacros.h>

/** Name of the queue shared by the demo producer and consumer. */
const char* QUEUE_NAME = "DEMO.Q2";

using namespace std;

/**
 * Producer thread body: sends ten text messages to QUEUE_NAME, sleeping
 * five seconds after each one.
 *
 * Exceptions derived from std::exception are caught and logged here, because
 * an exception escaping a thread function terminates the whole process.
 */
void producerRunner( boost::shared_ptr<ActiveMQConnectionFactory> amqf )
{
    try
    {
        Producer p( amqf );
        SLF_DEBUG( "准备初始化生产者" );
        p.init();
        string queueName( QUEUE_NAME );
        SLF_DEBUG( "准备向:" + queueName + "发送消息" );
        for ( int i = 0; i < 10; i++ )
        {
            string content( "Hello C++, " + boost::lexical_cast<std::string>( i ) );
            p.send( queueName, content );
            SLF_DEBUG( "已发送:" + content + ",当前线程将休眠" );
            // boost:: is spelled out: this_thread also exists in namespace std.
            boost::this_thread::sleep( boost::posix_time::seconds( 5 ) );
        }
    }
    catch ( const std::exception& e )
    {
        SLF_DEBUG( std::string( "生产者线程异常终止:" ) + e.what() );
    }
}

/**
 * Listener callback: logs the text of each received message.
 */
void consumerListener( TextMessage* msg )
{
    if ( msg == NULL )
    {
        // Defensive: nothing to log when no text message was delivered.
        return;
    }
    SLF_DEBUG( "监听到消息,内容为:" + msg->getText() );
}

/**
 * Consumer thread body: listens on QUEUE_NAME for sixty seconds.
 *
 * Exceptions derived from std::exception are caught and logged here, because
 * an exception escaping a thread function terminates the whole process.
 */
void consumerRunner( boost::shared_ptr<ActiveMQConnectionFactory> amqf )
{
    try
    {
        Consumer c( amqf );
        SLF_DEBUG( "准备初始化消费者" );
        c.init();
        string queueName( QUEUE_NAME );
        SLF_DEBUG( "准备监听队列:" + queueName );
        c.listen( queueName, &consumerListener );
        SLF_DEBUG( "当前线程将休眠60秒,以等待消息" );
        boost::this_thread::sleep( boost::posix_time::seconds( 60 ) );
    }
    catch ( const std::exception& e )
    {
        SLF_DEBUG( std::string( "消费者线程异常终止:" ) + e.what() );
    }
}

/**
 * Demo entry point: initialises the ActiveMQ-CPP library, runs one producer
 * thread and one consumer thread against tcp://localhost:61616, waits for
 * both to finish and shuts the library down again.
 */
int main()
{
    SLF_DEBUG( "初始化ActiveMQ-CPP运行时库" );
    activemq::library::ActiveMQCPP::initializeLibrary();
    {
        // Inner scope: the factory and the threads are gone before the
        // library is shut down.
        boost::shared_ptr<ActiveMQConnectionFactory> amqf( new ActiveMQConnectionFactory( "tcp://localhost:61616" ) );
        boost::thread_group workers;
        SLF_DEBUG( "准备发动生产者线程" );
        // boost::bind is spelled out: a bare bind is ambiguous with std::bind
        // on C++11 and later toolchains.
        workers.create_thread( boost::bind( &producerRunner, amqf ) );
        SLF_DEBUG( "准备发动消费者线程" );
        workers.create_thread( boost::bind( &consumerRunner, amqf ) );
        SLF_DEBUG( "等待线程子线程结束..." );
        workers.join_all();
    }
    SLF_DEBUG( "销毁ActiveMQ-CPP运行时库" );
    activemq::library::ActiveMQCPP::shutdownLibrary();
    return 0;
}
Leave a Reply