ActiveMQ代理网络无法连接的问题一例
环境说明:ActiveMQ 5.10.0,Windows Server 2008 R2。
最近开发人员在做基于ActiveMQ的数据同步测试时,其中的一台AMQ服务器一直无法连接到代理网络中,久久找不到原因。
我检查了一下配置,各ActiveMQ服务器使用多播地址:multicast://default进行自动发现,配置文件很简单,看不出什么问题。
通过jConsole连接:
发现问题服务器上networkConnectors里面没有任何内容。
开启DEBUG级别的ActiveMQ日志,没有发现和建立代理网络连接有关的任何日志,估计问题服务器的网络存在问题,使用iperf监测网络上的多播数据报:
在问题服务器上,上述脚本没有任何输出,其它服务器上则不断收到多播数据报。并且,问题服务器上的绑定地址不是我们内网的192.168.0网段。打开网络连接查看,该IP是Vmware虚拟网卡的地址,尝试禁用虚拟网卡,重启后,一切恢复正常。考虑问题和多网卡有关——ActiveMQ在进行多播时,使用了错误的网卡。启用Vmware虚拟网卡并重启,然后打开MulticastDiscoveryAgent的TRACE日志:
监测到以下日志:
可以看到,多播的地址、端口、组都是正确的,后面还有一些interface、network interface、join network interface,看着好像和网卡设置有关,查看MulticastDiscoveryAgent的源代码:
可以看到,ActiveMQ对JDK标准类java.net.MulticastSocket的实例mcast进行了网卡相关的设置,相关方法的用途如下:
MulticastDiscoveryAgent根据自身的属性配置情况来调用上述的方法,但是在ActiveMQ官方文档中,并没有找到interface、networkInterface等属性如何在XML中配置的说明,跟踪MulticastDiscoveryAgent的创建过程:
从第14行可以看到,ActiveMQ会解析URI中的所有参数,并利用反射机制设置到MulticastDiscoveryAgent对象中,因此,只需要将需要设置的属性附到URI参数中即可:
至此,问题解决。
总结:ActiveMQ的自动代理发现机制比较方便、灵活,但是受客户环境影响较大(网络结构、防火墙等),如果AMQ代理的IP地址固定或基本不会增减,可以考虑使用静态的代理网络。
最近开发人员在做基于ActiveMQ的数据同步测试时,其中的一台AMQ服务器一直无法连接到代理网络中,久久找不到原因。
我检查了一下配置,各ActiveMQ服务器使用多播地址:multicast://default进行自动发现,配置文件很简单,看不出什么问题。
通过jConsole连接:
1 |
service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi |
发现问题服务器上networkConnectors里面没有任何内容。
开启DEBUG级别的ActiveMQ日志,没有发现和建立代理网络连接有关的任何日志,估计问题服务器的网络存在问题,使用iperf监测网络上的多播数据报:
1 2 3 4 5 6 7 |
iperf -s -u -B 239.255.2.3 -p 6155 -i 1 ------------------------------------------------------------ Server listening on UDP port 6155 Binding to local address 192.168.7.130 Receiving 1470 byte datagrams UDP buffer size: 8.00 KByte (default) ------------------------------------------------------------ |
在问题服务器上,上述脚本没有任何输出,其它服务器上则不断收到多播数据报。并且,问题服务器上的绑定地址不是我们内网的192.168.0网段。打开网络连接查看,该IP是Vmware虚拟网卡的地址,尝试禁用虚拟网卡,重启后,一切恢复正常。考虑问题和多网卡有关——ActiveMQ在进行多播时,使用了错误的网卡。启用Vmware虚拟网卡并重启,然后打开MulticastDiscoveryAgent的TRACE日志:
1 |
log4j.logger.org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent=TRACE,DataSync |
监测到以下日志:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
[TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:290) start - discoveryURI = multicast://default [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:302) start - myHost = 239.255.2.3 [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:303) start - myPort = 6155 [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:304) start - group = default [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:305) start - interface = null [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:306) start - network interface = null [TRACE] [Thread-1] 2014-09-16 11:16:40 org.apache.activemq.transport.discovery.multicast.MulticastDiscoveryAgent.start(MulticastDiscoveryAgent.java:307) start - join network interface = null |
可以看到,多播的地址、端口、组都是正确的,后面还有一些interface、network interface、join network interface,看着好像和网卡设置有关,查看MulticastDiscoveryAgent的源代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 |
public void start() throws Exception { if (started.compareAndSet(false, true)) { if (group == null || group.length() == 0) { throw new IOException("You must specify a group to discover"); } String type = getType(); if (!type.endsWith(".")) { LOG.warn("The type '" + type + "' should end with '.' to be a valid Discovery type"); type += "."; } if (discoveryURI == null) { discoveryURI = new URI(DEFAULT_DISCOVERY_URI_STRING); } if (LOG.isTraceEnabled()) LOG.trace("start - discoveryURI = " + discoveryURI); String myHost = discoveryURI.getHost(); int myPort = discoveryURI.getPort(); if( DEFAULT_HOST_STR.equals(myHost) ) myHost = DEFAULT_HOST_IP; if(myPort < 0 ) myPort = DEFAULT_PORT; if (LOG.isTraceEnabled()) { LOG.trace("start - myHost = " + myHost); LOG.trace("start - myPort = " + myPort); LOG.trace("start - group = " + group ); LOG.trace("start - interface = " + mcInterface ); LOG.trace("start - network interface = " + mcNetworkInterface ); LOG.trace("start - join network interface = " + mcJoinNetworkInterface ); } this.inetAddress = InetAddress.getByName(myHost); this.sockAddress = new InetSocketAddress(this.inetAddress, myPort); mcast = new MulticastSocket(myPort); mcast.setLoopbackMode(loopBackMode); mcast.setTimeToLive(getTimeToLive()); if (mcJoinNetworkInterface != null) { mcast.joinGroup(sockAddress, NetworkInterface.getByName(mcJoinNetworkInterface)); } else { mcast.joinGroup(inetAddress); } mcast.setSoTimeout((int)keepAliveInterval); if (mcInterface != null) { mcast.setInterface(InetAddress.getByName(mcInterface)); } if (mcNetworkInterface != null) { mcast.setNetworkInterface(NetworkInterface.getByName(mcNetworkInterface)); } runner = new Thread(this); runner.setName(this.toString() + ":" + runner.getName()); runner.setDaemon(true); runner.start(); doAdvertizeSelf(); } } |
可以看到,ActiveMQ对JDK标准类java.net.MulticastSocket的实例mcast进行了网卡相关的设置,相关方法的用途如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/** * 加入指定网络接口上的多播组。 * @param mcastaddr 准备加入的多播地址 * @param netIf 指定用于接收多播数据报的本地网络接口。 * 如果不指定,将使用setInterface(InetAddress)、setNetworkInterface(NetworkInterface)所指定的接口 */ public void joinGroup(SocketAddress mcastaddr, NetworkInterface netIf) throws IOException; /** * 设置相关受影响方法使用的多播网络接口,对于多重网络的主机有意义 * @param inf IP地址对象 */ public void setInterface(InetAddress inf) throws SocketException; /** * 设置发送多播数据报使用的网络接口 * @param netIf 网络接口对象 */ public void setNetworkInterface(NetworkInterface netIf) throws SocketException; |
MulticastDiscoveryAgent根据自身的属性配置情况来调用上述的方法,但是在ActiveMQ官方文档中,并没有找到interface、networkInterface等属性如何在XML中配置的说明,跟踪MulticastDiscoveryAgent的创建过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
protected DiscoveryAgent doCreateDiscoveryAgent(URI uri) throws IOException { try { if (LOG.isTraceEnabled()) { LOG.trace("doCreateDiscoveryAgent: uri = " + uri.toString()); } MulticastDiscoveryAgent mda = new MulticastDiscoveryAgent(); mda.setDiscoveryURI(uri); // allow MDA's params to be set via query arguments // (e.g., multicast://default?group=foo Map options = URISupport.parseParameters(uri); IntrospectionSupport.setProperties(mda, options); return mda; } catch (Throwable e) { throw IOExceptionSupport.create("Could not create discovery agent: " + uri, e); } } |
从第14行可以看到,ActiveMQ会解析URI中的所有参数,并利用反射机制设置到MulticastDiscoveryAgent对象中,因此,只需要将需要设置的属性附到URI参数中即可:
1 2 3 4 5 6 7 8 |
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="dataSyncBroker" dataDirectory="D:/amq/datasync/data"> <networkConnectors> <networkConnector uri="multicast://default?joinNetworkInterface=net13&networkInterface=net13&interface=192.168.0.89" dynamicOnly="true" networkTTL="3" prefetchSize="1" decreaseNetworkConsumerPriority="true" /> </networkConnectors> <transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="multicast://default" /> </transportConnectors> |
至此,问题解决。
总结:ActiveMQ的自动代理发现机制比较方便、灵活,但是受客户环境影响较大(网络结构、防火墙等),如果AMQ代理的IP地址固定或基本不会增减,可以考虑使用静态的代理网络。
Leave a Reply