ActiveMQ知识集锦
与Spring DMLC集成,进行持久化订阅时,会报此错误,报错的根源是:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
@Override public void setClientID(String newClientID) throws JMSException { checkClosedOrFailed(); // false if (this.clientIDSet) { throw new IllegalStateException("The clientID has already been set"); } // true if (this.isConnectionInfoSentToBroker) { // 不允许在“已经使用”的连接上执行设置clientID的操作 throw new IllegalStateException("Setting clientID on a used Connection is not allowed"); } this.info.setClientId(newClientID); this.userSpecifiedClientID = true; ensureConnectionInfoSent(); } |
可以看到,状态isConnectionInfoSentToBroker变为true后,就不能再设置ClientID,修改此字段值的,只有:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
protected void ensureConnectionInfoSent() throws JMSException { synchronized(this.ensureConnectionInfoSentMutex) { // Can we skip sending the ConnectionInfo packet?? if (isConnectionInfoSentToBroker || closed.get()) { return; } //TODO shouldn't this check be on userSpecifiedClientID rather than the value of clientID? if (info.getClientId() == null || info.getClientId().trim().length() == 0) { // 此生成器的结果具有随机性 info.setClientId(clientIdGenerator.generateId()); } syncSendPacket(info.copy()); // 这里修改了字段值 this.isConnectionInfoSentToBroker = true; ... } } |
断点跟踪,发现调用栈片断:
1 2 3 4 5 6 7 8 9 |
// ActiveMQConnection.java void ensureConnectionInfoSent(); Session createSession(boolean transacted, int acknowledgeMode); // ConnectionPool.java 这是对单个ActiveMQConnection的封装,允许多个Session共享之 Session makeSession(SessionKey key); ConnectionPool (Connection connection); // JmsTemplate.java send(final Destination destination, final MessageCreator messageCreator); // 业务代码略 |
也就是说,业务代码调用JmsTemplate发送JMS消息,导致isConnectionInfoSentToBroker为true且ClientID被设置。进一步分析发现:
- ConnectionPool只包含一个实际连接ActiveMQConnection,由多个Session共享
- 使用的ConnectionFactory:PooledConnectionFactory,仅仅持有一个ConnectionPool,也就是仅仅一个ActiveMQConnection
- AMQ的ClientId是在Collection级别设置的,只能在连接第一次使用前设置一次,这里由业务代码发起的JmsTemplate调用设置
- ConnectionPool自动生成的ClientID,具有随机性,无法用于持久化订阅。因为持久化订阅者的识别方式是ClientID + subscription name
因此,使用AMQ的情况下,要进行持久化订阅,应当为DMLC提供一个可控的ConnectionFactory,比如单独分配ConnectionFactory。
报错信息示例:
[WARN ] [triggerStartAsyncNetworkBridgeCreation: remoteBroker=tcp://Zircon/127.0.1.1:38592@47262, localBroker= vm://BrokerA#19200] 2017-08-08 12:21:21 org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:770)
Failed to add Connection BrokerA->BrokerB-32884-1502096377514-3780:1
javax.jms.InvalidClientIDException: Broker: BrokerA - Client: NC_BrokerA_BrokerB_inbound_BrokerA already connected from vm://BrokerA#8
这里的情况是,BrokerA 到BrokerB之间的通道对应了一个客户端,其ID已经被占用。原因可能是BrokerB代理意外终止导致。
解决办法:设置 TransportConnector的属性: tc.setAllowLinkStealing( true )
发现设置了异步启动 broker.setStartAsync( true )的代理,在构成代理网络上,容易出现偶发性的网络相关错误,并发生代理宕机重启后,某些Consumer不再收到消息的现象,关闭异步启动后收不到消息的现象消失。
可能相关的报错信息:
javax.jms.JMSException: peer (vm://BrokerB#3) stopped.
Caused by: org.apache.activemq.transport.TransportDisposedIOException: peer (vm://BrokerA#1) stopped.
javax.jms.IllegalStateException: The Consumer is closed
java.lang.NegativeArraySizeException
at org.apache.activemq.store.kahadb.disk.journal.DataFileAccessor.readRecord(DataFileAccessor.java:92)
服务非正常重启导致kahadb日志损坏,可以设置属性解决:
1 2 3 |
<persistenceAdapter> <kahaDB directory="D:/amq/datasync/kahadb" checkForCorruptJournalFiles="true" /> </persistenceAdapter> |
检查以下项目:
- 如果使用多播自动发现,检查有没有使用正确的网卡,参考:我的另一篇文章
- 保证代理名称的唯一性
现象:不能接收到任何组播(Multicast)消息
原因:可能是因为生产者操作系统内核支持IPv6,但是网络本身、消费者操作系统内核不支持IPv6。
解决:当操作系统内核支持IPv6时,JRE会默认使用IPv6,要改变此行为,可以设置JVM系统属性:
1 |
-Djava.net.preferIPv4Stack=true |
如下配置:
1 |
<transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> |
uri中的端口指定为0,表示由ActiveMQ选择一个可用的端口。由于使用发现机制,限定端口没有必要。
代理、队列、网络连接器、链接器的名称,都可以通过API自由设定。
假设代理名称为BrokerA,当:
- 基于双向自动发现的网络连接器进行连接时,代理BrokerA通过NC_BrokerA连接,代理BrokerB通过NC_BrokerB连接,则BrokerB在BrokerA队列上的远程消费者的ClientID命名为:
- NC_BrokerA_BrokerB_inbound_BrokerA,这是由BrokerA双向网络连接(反向)产生的远程消费者
- NC_BrokerB_BrokerB_inbound_BrokerA,这是由BrokerB双向网络连接(正向)产生的远程消费者
- 基于单向自动发现的网络连接器进行连接时,代理BrokerA通过NC_BrokerA连接,代理BrokerB通过NC_BrokerB连接,则BrokerB在BrokerA队列上的远程消费者的ClientID命名为:
- NC_BrokerA_BrokerB_inbound_BrokerA,这是由BrokerA单向网络连接产生的远程消费者
- 基于双向静态网络连接器进行连接,代理BrokerB通过NC_BrokerB连接,则BrokerB在BrokerA队列上的远程消费者的ClientID命名为:
- NC_BrokerB_BrokerB_inbound_BrokerA
- 基于单向静态网络连接器进行连接,代理BrokerB通过NC_BrokerB连接,则BrokerB在BrokerA队列上没有消费者
单向网络连接,仅能用来发送消息,不能接收网络代理发来的消息。
Leave a Reply