ActiveMQ学习笔记
中间件是一类连接软件组件和应用的计算机软件,它包括一组服务。 以便于运行在一台或多台机器上的多个软件通过网络进行交互。该技术所提供 的互操作性,推动了一致分布式体系架构的演进,通常用于支持并简化 那些复杂的分布式应用程序。
所谓面向消息的中间件(Message-orientedMiddleware,MOM) ,它的基本功能是:将信息以消息的形式,从一个应用程序传送到另一个或多个应用程序。其主要特征包括:
- 消息以异步的方式发送和接收,消息发送者不需要等待消息接受者的响应
- 可靠传输,数据不能丢失,有的时候,也会要求不能重复传输
- 事务性以及分布式事务(XA)支持,可以和其他MOM、数据库等资源进行事 务性支持,确保操作的原子性
产品名称 | 开发者 | 授权 | 特点 |
IBM WebSphereMQ | IBM | 商业 | 稳定性高,市场占有率高,与 WebSphereMessageBroker紧密结合, 基于Eclipse的管理界面,多语言客户 端 |
ActiveMQ | Apache | 开源 | 使用最多的开源MOM,可拔插的持久化 方案,多种传输机制(虚拟机内、TCP、 UDP、HTTP、SSL等)多种传输协议 (Openwire、AMQP、REST、STOMP、 XMPP等),多语言客户端,简单的集群 和故障转移配置,嵌入JVM |
Java消息服务(Java Message Service,JMS)定义了Java中访问消息中间件的 接口。JMS只是接口,并没有给予实现,实现JMS接口的消息中间件称为 JMS Provider,例如ActiveMQ。
概念 | 说明 |
JMSProvider | 实现JMS接口的消息中间件 |
PTP | 点对点的消息模型 |
Pub/Sub | 发布/订阅的消息模型 |
Destination | 目标,消息的目的地 |
Queue | 队列目标 |
Topic | 主题目标 |
ConnectionFactory | 连接工厂,JMS用它创建连接 |
Connection | JMS客户端到JMSProvider的连接 |
Session | 会话,一个发送或接收消息的线程持有 |
MessageProducer | 由Session对象创建的用来发送消息的对象 |
MessageConsumer | 由Session对象创建的用来接收消息的对象 |
Acknowledge | 签收,即MessageConsumer确认消息收妥的操作 |
Transaction | 事务 |
在JMS编程模型中,JMS客户端(组件或应用程序)通过JMS消息服务交换消息。 消息生产者将消息发送至消息服务,消息消费者则从消息服务接收这些消息。
这些消息的传送操作是使用一组实现JMS应用编程接口(API)的对象(由 JMSProvide提供)来执行的。
在JMS编程模型中JMS客户端使用 ConnectionFactory对象创建一个连接,向消息服务发送消息以及从消息服务接收消息均是通过此连接来进行。
Connection是客户端与消息服务的活动连接。 创建连接时,将分配通信资源以及验证客户端。这是一个相当重要的对象,大多数客户端均使用一个连接来进行所有的消息传送。连接用于创建会话。
Session是一个用于生成和使用消息的单线程上下文。它用于创建发送的生产 者和接收消息的消费者,并为所发送的消息定义发送顺序。会话通过大量确认选项或通过事务来支持可靠传送。
客户端使用MessageProducer向指定的物理目标发送消息。生产者可指定一个默认传送模式(持久性消息与非持久性消息)、优先级和有效期值,以控制生产者向物 理目标发送的所有消息。
同样,客户端使用MessageConsumer对象从指定的物 理目标(在API中表示为目标对象)接收消息。消费者可使用消息选择器,借助它,消息服务可以只向消费者发送与选择标准匹配的那些消息。消费者可以支持同步或异步消息接收。异步使用可通过向消费者注册MessageListener来 实现。当会话线程调用MessageListener对象的onMessage方法时,客户端将使用消息。
消息从一个生产者传送至一个消费者。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向 此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送 消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功 使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并 在某个消费者向该队列进行注册时将消息传送给该消费者。
消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主 题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费 者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发 送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示 消费者已向主题目标进行注册,但在消息传送时此消费者可以处于非活动状态。 当此消费者再次处于活动状态时,它将接收此信息。
- 创建连接使用的工厂类JMS ConnectionFactory
- 使用管理对象 JMS ConnectionFactory 建立连接 Connection
- 使用连接Connection建立会话Session
- 使用会话Session和管理对象Destination创建消息生产者 MessageSender
- 使用消息生产者MessageSender发送消息
- 创建连接使用的工厂类JMS ConnectionFactory
- 使用管理对象 JMS ConnectionFactory 建立连接 Connection
- 使用连接Connection建立会话Session
- 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
- 使用消息消费者MessageReceiver接受消息setMessageListener 将 MessageListener 接口绑定到 MessageReceiver
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:
- 客户接收消息
- 客户处理消息
- 消息被确认
在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话 中,消息何时被确认取决于创建会话时的签收模式(acknowledgement mode)。 该参数有以下三个可选值:
- Session.AUTO_ACKNOWLEDGE 当客户成功的从receive方法返回的时候或者从MessageListener.onMessage方法成功返回的时候,会话自动确认
- Sessiion.TRANSACTION 用session.commit()进行签收
- Session.CLIENT_ACKNOWLEDGE 客户通过消息的acknowledge方法确认消息
基本类型 | 点对点模式 | 发布/订阅模式 |
ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
Connection | QueueConnection | TopicConnection |
Session | QueueSession | TopicPublisher |
Destination | QueueSession | Topic |
MessageProducer | QueueSender | |
MessageConsumer | QueueReceiver,QueueBrowseer | TopicSubscriber |
生产者在发送消息前,可以设置一系列的JMS消息头:
消息头 | 自动 | 说明 |
JMSDestination | 是 | 消息发送的目的地:主要是指Queue和Topic |
JMSDeliveryMode | 是 | 传送模式有两种模式 :持久模式和非持久模式 |
JMSExpiration | 是 | 消息过期时间,等于 Destination 的send 方法中的timeToLive值加 上发送时刻的GMT 时间值。如果timeToLive 值等于零,则 JMSExpiration 被设为零,表示该消息永不过期。如果发送后,在消 息过期时间之后消息还没有被发送到目的地,则该消息被清除 |
JMSPriority | 是 | 消息优先级,从 0-9 十个级别,0-4 是普通消息,5-9 是加急消息。 JMS 不要求JMS Provider 严格按照这十个优先级发送消息,但必须 保证加急消息要先于普通消息到达。默认是4级 |
JMSMessageID | 是 | 唯一识别每个消息的标识,由JMS Provider 产生 |
JMSTimestamp | 是 | 消息时间戳 |
JMSCorrelationID | 否 | 用来连接到另外一个消息,典型的应用是在回复消息中连接到原消息 在大多数情况下,JMSCorrelationID用于将一条消息标记为对 JMSMessageID标示的上一条消息的应答,不过,JMSCorrelationID可 以是任何值,不仅仅是JMSMessageID |
JMSReplyTo | 否 | 提供本消息回复消息的目标地址,目标通常是一个临时队列 |
JMSType | 否 | 消息类型的识别符 |
JMSRedelivered | 是 | 如果一个客户端收到一个设置了JMSRedelivered属性的消息,则表示 可能客户端曾经在早些时候收到过该消息,但并没有签收 (acknowledged) |
消息类型 | 说明 |
TextMessage | java.lang.String 对象,如xml 文件内容 |
MapMessage | 名/值对的集合,名是String 对象,值类型可以是Java 任何基本类型 |
BytesMessage | 字节流 |
StreamMessage | Java 中的输入输出流 |
ObjectMessage | 串行化Java对象并传输,注意不兼容其它编程语言 |
Message | 没有消息体,只有消息头和属性 |
- 应用程序特定的属性。例如:
12TextMessage msg = session.createTextMessage();msg.setStringProperty( "username", username ); - JMS定义的属性:包含一系列JMSX开头的属性,具体请参考JMS规范
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "tcp://brokerHost:brokerPort" ); Connection connection = null; try { //如果代理启用身份验证,这里需要指定用户、密码参数 connection = factory.createConnection(); connection.start(); //创建会话时有事务和确认选项 Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE ); //如果代理上没有队列,则创建,否则,直接使用 Destination dest = session.createQueue( queueName ); //创建消息生产者 MessageProducer producer = session.createProducer( dest ); //设置消息是否持久化 producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT ); //创建一个文本消息 TextMessage message = session.createTextMessage( msg ); //设置消息属性 message.setIntProperty( "id", 1 ); producer.send( message ); } finally { connection.close(); } |
1 2 3 4 5 6 7 8 |
//如果代理上没有主题,则创建,否则,直接使用 Destination dest = session.createTopic( topicName ); //创建消息生产者 MessageProducer producer = session.createProducer( dest ); //设置消息是否持久化 producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT ); //创建一个文本消息 TextMessage message = session.createTextMessage(msg); producer.send( message ); session.commit(); |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); Properties props = new Properties(); // 设置预读取属性 props.setProperty( "prefetchPolicy.queuePrefetch", "1000" ); props.setProperty( "prefetchPolicy.queueBrowserPrefetch", "500" ); props.setProperty( "prefetchPolicy.durableTopicPrefetch", "100" ); props.setProperty( "prefetchPolicy.topicPrefetch", "32766" ); cf.setProperties( props ); Connection connection = null; try { connection = cf.createConnection(); connection.start(); //创建会话时有事务和确认选项,如果确认选项为:SESSION_TRANSACTED,则必须启用事务 Session session = connection.createSession( true, Session.SESSION_TRANSACTED ); //如果代理上没有队列,则创建,否则,直接使用 Destination dest = session.createQueue( queueName ); //创建消息生产者 MessageConsumer consumer = session.createConsumer( dest ); //等待队列里面有消息可消费,最多1秒,超时返回NULL long timeout = 1000; TextMessage msg = (TextMessage) consumer.receive( timeout ); String recMsg = msg == null ? null : msg.getText(); //如果设置:Session.CLIENT_ACKNOWLEDGE,则必须手工确认:msg.acknowledge() session.commit();//如果设置:Session.SESSION_TRANSACTED,则必须提交或者回滚 return recMsg; } finally { connection.close(); } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
public void subscribeTo1pic( String topicName, final Callback<Void, String> cb ) throws JMSException { Connection connection = null; try { connection = factory.createConnection(); connection.start(); Session session = connection.createSession( false, Session.CLIENT_ACKNOWLEDGE ); Destination dest = session.createTopic( topicName ); MessageConsumer consumer = session.createConsumer( dest ); //注意:MessageListener亦适用于队列 consumer.setMessageListener( new MessageListener() { @Override public void onMessage( Message message ) { TextMessage msg = (TextMessage) message; try { cb.callback( msg.getText() ); msg.acknowledge(); } catch ( Throwable t ) { LOGGER.error( "Failed to Process message: " + t.getMessage(), t ); } } } ); } finally { connection.close(); } } |
ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现。
- 多种语言编写客户端:Java, C, C++, C#,Ruby, Perl, Python, PHP
- 支持多种应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
- 完全支持JMS1.1和J2EE1.4规范(持久化,XA消息,事务)。支持分布式事务
- 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里
- 一支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
- 支持通过JDBC和journal提供高速的消息持久化
- 支持多种集群特征:故障转移(主/从结构,HA)、ActiveMQ网络集群
- 支持自动发现:基于组播或者ZeroConf技术,可以自动化的建立ActiveMQ 集群(Network of Brokers,代理网络),或者(客户端)连接到ActiveMQ
- 与开源ESB产品例如Camel紧密集成
- 安全性保证:ActiveMQ支持基于SSL的传输,在广域网上保证加密的传输,结 合数字证书,可以保证双向身份验证
即一个运行中的ActiveMQ实例,该实例可以独立运行,也可以嵌入在已有的 JVM中运行。从这个名字也可以看出基于ActiveMQ的消息系统的运行方式。
表示ActiveMQ使用的消息传递机制,这是一个混合的概念,不是简单的指网络 协议或者应用协议。用于将ActiveMQ实例暴露给客户端,或者其它代理以构建代理网络。常用 的传输包括:
传输名称 | URI前缀 | 端口 | 说明 |
TCP | tcp | 61616 | 传统的TCP套接字 |
NIO | nio | 61618 | 需要更少的线程,提供更好的性能 |
UDP | udp | 防火墙,低延迟 | |
SSL | ssl | 提供加密传输 | |
HTTP/HTTPS | http(s) | 可以供基于浏览器的客户端使用 | |
VM Protocol | vm |
允许在JVM内部通信,从而避免了网络传输的开销 同一JVM中的其它代码可以直接使用该传输,无需额外配置 |
在配置传输的同时,可选的可以指定在其上使用的线路协议。Openwire协议最常用,高效、可靠。
ActiveMQ使用一个叫发现代理(Discovery Agent)的机制,来用于发现远程 服务,例如远程代理。使用此机制,可以:
- 让客户端自动连接到代理
- 构建 代理的网络。
目前,有两种类型的发现代理:
- Multicast:ActiveMQ内置的一种发现代理,可以定位代理的URI列表。参考本文稿后续的 Discovery传输
- Zeroconf:Zeroconf是一种标准,基于UDP/multicast,ActiveMQ使用的是jmDNS库,该库是Zeroconf标准的实现
支持消费者的高可用性、负载均衡:
- 如果一个Consumer死掉,该消息会转发到其它的Consumer消费的Queue上
- 如果一个Consumer获得消息比其 它Consumer快,那么他将获得更多的消息
- 如果一个Conseumer消费缓慢,则 其它Consumer会替换它
如果一个Broker死掉了,Client可以自动链接到其它Broker 上(可以使用故障转移传输failover:// 来配置客户端)。
构建代理集群时,可以使用:
- 静态发现,静态指定其它代理的列表
- 动态发现,使用动态发现(组播、零配置)机制自动获取其它代理
可以配置两个Broker的主从关系,并保证二者的状态完全一致。这种方式主要 用于提供高可用性。
ActiveMQ实例(代理)到实例的通信,是跨地域、跨网络的消息可靠传输的关键。通常的部署方式是,在每个局域网设置一个代理,代理与代理在广域网 之间进行交互,客户端与局域网内的代理交互——这样的方式尽量减少广域网 上的流量和连接数。
代理网络是以通道的形式将一个Broker和其他的Broker链接起来通信。 代理网络默认是单向的:一个Broker在一端发送消息,另一Broker在另一端接 收消息,这就是所谓的“桥接”。
在ActiveMQ 5.x,也可以创建一个双向(duplex)的通道对于两个Broker。他将不仅发送消息而且也能从相同的通道来接收消息。 双向通道的配置,会自动传递到对方代理上。
允许用一个虚拟的destination 代表多个destinations。例如可以通过 composite destinations在一个操作中同时向12个queue发送消息。
用来支持联合的名字分层体系(federated name hierarchies)。ActiveMQ支持以下三种通配符:
- .用作路径上名字间的分隔符
- *用于匹配任意字符,但是不能跨越点号
- >用于匹配任意字符,可以跨越点号
考虑一个报价应用的主题命名,需要区分商品类别、市场名称、商品名称:
主题 | 说明 |
PRICE.> | 任何商品在任何交易市场的价格 |
PRICE.STOCK.> | 任何市场的股票价格 |
PRICE.STOCK.NASDAQ.* | 纳斯达克市场任何股票的价格 |
PRICE.STOCK.*.IBM | IBM的股票在任何市场的价格 |
默认选项,消息Producer不需要等待Consumer确认签收即完成发送操作。
ActiveMQ缺省支持批量确认消息。由于批量确认会提高性能,因此这是缺省的 确认方式。
ActiveMQ可以保证主题的所有消费者以相同的顺序接收消息。
默认的ActiveMQ会保证所有消费者以相同顺序接收来自同一生产者的消息。然而,由于多线程和异步处理,不同生产者产生的消息的接受顺序无法保证。此行为可以通过destinationPolicy实现。
这类消息由ActiveMQ系统内部使用,用于收发系统事件,例如新的消费者的注册。代理网络很大程度上依赖通知消息工作。
可以在内部进行消息格式的转换,提供MessageTransformer接口。 在ActiveMQConnectionFactory、ActiveMQConnection、ActiveMQSession、 ActiveMQMessageConsumer、ActiveMQMessageProducer等对象上调用 setTransformer方法可以设置消息消息转换器。
Queue中的消息是按照顺序被分发到consumers的。然而,当你有多个 consumers同时从相同的queue中提取消息时,你将失去这个保证。因为这些消 息是被多个线程并发的处理。有的时候,保证消息按照顺序处理是很重要的。 例如,你可能不希望在插入订单操作结束之前执行更新这个订单的操作。 ActiveMQ从4.x版本起开始支持Exclusive Consumer (或者说Exclusive Queues)。 Broker会从多个consumers中挑选一个consumer来处理queue中所 有的消息,从而保证了消息的有序处理。如果这个consumer失效,那么broker 会自动切换到其它的consumer。
设置属性JMSXGroupID,可以保证同一个组的数据,仅被发送给一个Consumer。 可以在大消息分割的场景下使用。
JMS Selectors用于在订阅中,基于消息属性和Xpath语法对进行消息的过滤。JMS Selectors由SQL92语义定义。JMS Selectors用于在订阅中,基于消息属性和Xpath语 法对进行消息的过滤。
默认的死信队列名称为:ActivemMQ.DLQ。所有不能消费的消息被传递到该死队列中。 过期消息的处理:
- 持久化消息默认是存入死信队列,设置 processExpired=false,则直接删除
- 非持久化消息,必须设置 processNonPersistent=true才能放入死信队列
要创建分布式队列或主题,需要进行代理之间的通信,ActiveMQ支持两种类型的代理通信方式:
- 主/从代理:用于提供高可用性,消息在主从代理之间保持复制。当主代理宕机,则从 代理自动接管并提供服务
- 存储和转发—代理网络:消息从一个在代理之间进行传递,直到其到达消费者那里, 每个消息在一个时刻,只会属于单个代理(即时它宕掉)
分布式队列:Producer发布一个消息到代理,代理将其存储,如果该代理配置了 store/forward到其它代理,消息将会按照一定的算法转发给其它代理。注意:只有目 标代理上有该消息的消费者,消息才会被转发
分布式主题:与分布式队列类似,不同的是,每个队主题感兴趣的客户端,均会 收到一份消息的拷贝,ActiveMQ内置算法保证不会在环形网络中出现无限循环。
本章内容主要讨论ActiveMQ实例(服务器端)的管理和配置。
解压ActiveMQ二进制包到任意目录,把它的bin目录添加到PATH环境变量中。执行InstallService.bat可以安装为Windows服务。
在命令行中执行 activemq start即可启动ActiveMQ实例。运行 activemq stop则停止实例。
默认的Web管理控制台位于:http://localhost:8161/admin 默认用户密码均为admin。
默认的运行日志位于data/activemq.log。
默认的消息持久化数据库位于data/kahadb。
ActiveMQ独立运行时,内部维护了一个Spring IoC容器的实例,其相关服务均是以 Spring Bean的形式存在的,因此,嵌入已有JVM运行非常简单,只需要提供Spring 支持即可。 此外,ActiveMQ提供了一系列的API,方便使用纯编程的方式启动、配置实例。
只有在嵌入模式下,才可以使用VM Transport。
ActiveMQ服务器可以已硬编码的方式进行配置,而不使用XML配置文件。通常情况下不需要 使用硬编码方式,除非需要进行运行时动态的控制。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
public void start() throws Exception { //代理服务对象 broker = new BrokerService(); //基于AMQ日志的持久化适配器 JournalPersistenceAdapterFactory jpaf = new JournalPersistenceAdapterFactory(); //日志数量 jpaf.setJournalLogFiles( 5 ); //日志文件大小 jpaf.setJournalLogFileSize( 1024 * 1024 * 128 ); broker.setPersistenceFactory( jpaf ); //设置持久化机制 // 基于KahaDB的持久化适配器 KahaDBPersistenceAdapter pa = new KahaDBPersistenceAdapter(); pa.setCheckForCorruptJournalFiles( true ); File userHome = new File( System.getProperty( "user.home" ) ); File amqDataDir = new File( userHome, "ActiveMQ/data" ); amqDataDir.mkdirs(); pa.setDirectory( amqDataDir ); broker.setPersistenceAdapter( pa ); broker.setBrokerName( "UniqueName" ); //设置代理的名称,每个代理应该具有独特的名称 broker.addConnector( "tcp://localhost:61616" ); //设置传输连接器 broker.setPersistent( false ); //如果写这一句,那么仅使用内存来存储消息 broker.start(); } public void stop() throws Exception { broker.stop(); } |
下面的代码示例了两个基于发现传输连接在一起的代理协同工作的场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 |
public class DiscoveryBroker { public static final String QUEUE_NAME = "AMQ.BrokerB"; public static BrokerService create( String name ) throws Exception { BrokerService broker = new BrokerService(); broker.setBrokerName( name ); broker.setPersistent( false ); broker.setUseJmx( false ); // 异步启动网络连接器 broker.setStartAsync( true ); // 基于发现机制的网络连接器:发现其它代理 List<NetworkConnector> ncs = new ArrayList<>(); URI discoveryURI = new URI( "multicast://default?group=pems" ); // DiscoveryNetworkConnector是默认的网络连接器实现,不管是否基于动态发现 NetworkConnector nc = new DiscoveryNetworkConnector( discoveryURI ); // 允许双向通信 nc.setDuplex( true ); nc.setName( "NC_" + name ); ncs.add( nc ); // 如果使用静态URI,类似:broker.addNetworkConnector("static://tcp://host:port"); // broker.addNetworkConnector( nc ); broker.setNetworkConnectors( ncs ); // 基于发现机制的传输连接器:让别人连接到当前代理 List<TransportConnector> tcs = new ArrayList<>(); TransportConnector tc = new TransportConnector(); tc.setUri( new URI( "tcp://0.0.0.0:0" ) ); tc.setDiscoveryUri( discoveryURI ); tcs.add( tc ); broker.setTransportConnectors( tcs ); return broker; } } public class DiscoveryBrokerA extends DiscoveryBroker { private static final Logger LOGGER = LoggerFactory.getLogger( DiscoveryBrokerA.class ); public static void main( String[] args ) throws Exception { BrokerService brokerA = DiscoveryBroker.create( "BrokerA" ); brokerA.start(); new Thread( new Runnable() { @Override public void run() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "vm://BrokerA" ); Connection conn; try { conn = factory.createConnection(); // 连接必须启动 conn.start(); Session session = conn.createSession( false, Session.AUTO_ACKNOWLEDGE ); Destination dest = session.createQueue( QUEUE_NAME ); MessageProducer producer = session.createProducer( dest ); producer.setDeliveryMode( DeliveryMode.NON_PERSISTENT ); List<String> prevChildren = new ArrayList<>(); int count = 0; for ( ; ; ) { LOGGER.debug( "Sending message to queue {}", QUEUE_NAME ); MapMessage msg = new ActiveMQMapMessage(); msg.setString( "name", "Alex" ); msg.setInt( "age", 30 ); msg.setBoolean( "married", true ); Map<String, Object> meng = new LinkedHashMap<>(); meng.put( "name", "Meng" ); meng.put( "age", 27 ); msg.setObject( "spouse", meng ); List<String> children = new ArrayList<>( prevChildren ); msg.setObject( "children", children ); children.add( "Child" + ++count ); producer.send( msg ); prevChildren = children; TimeUnit.SECONDS.sleep( 10 ); } } catch ( Exception e ) { LOGGER.error( e.getMessage(), e ); } } } ).start(); } } public class DiscoveryBrokerB extends DiscoveryBroker { private static final Logger LOGGER = LoggerFactory.getLogger( DiscoveryBrokerB.class ); public static void main( String[] args ) throws Exception { BrokerService brokerB = DiscoveryBroker.create( "BrokerB" ); brokerB.start(); new Thread( new Runnable() { @Override public void run() { ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory( "vm://BrokerB" ); Connection conn; try { conn = factory.createConnection(); conn.start(); Session session = conn.createSession( false, Session.AUTO_ACKNOWLEDGE ); Destination dest = session.createQueue( QUEUE_NAME ); MessageConsumer consumer = session.createConsumer( dest ); consumer.setMessageListener( new MessageListener() { @Override public void onMessage( Message message ) { MapMessage msg = (MapMessage) message; Object children = null; try { children = msg.getObject( "children" ); } catch ( JMSException e ) {} LOGGER.debug( "Received message from {}, children property: {}", QUEUE_NAME, children ); } } ); } catch ( JMSException e ) { LOGGER.error( e.getMessage(), e ); } } } ).start(); TimeUnit.SECONDS.sleep( 600 ); } } |
无论是独立运行还是嵌入已有VM运行,ActiveMQ的配置均是以 Springframework为基础的,整体配置框架如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="data/kahadb"> <networkConnectors>网络连接器,用于构建代理网络</networkConnectors> <transportConnectors>传输连接器,代理在此连接器上等待客户端、其它代理的连接</transportConnectors> <persistenceAdapter>配置持久化适配器,实现消息持久化存储</persistenceAdapter> <destinationPolicy>为目标设置各种控制策略</destinationPolicy> <managementContext></managementContext> <systemUsage>限制使用系统资源的量</systemUsage> <plugins></plugins> </broker> </beans> |
transportConnectors子元素用于配置ActiveMQ使用的传输,其每个子元素 transportConnector定义一种传输方式,包含name、uri等元素,可以配置多个传输方式 —— 也就是说一个ActiveMQ实例可以支持很多底层通信机制。
传输连接器可以用于:
- 虚拟机外部的客户端连接到代理
- 其它代理连接到当前代理
基本配置格式如下:
1 2 3 |
<transportConnectors> <transportConnector name="" uri=""/> </transportConnectors> |
transportConnector包含以下服务器端选项:
选项 | 说明 |
name | 默认null,此传输连接器的名称 |
uri |
默认null,URI的格式为: protocol://host:port?key=val&key=val,问号后面的是参数列表。URI的示例:tcp://0.0.0.0:61616、amqp://0.0.0.0:5672、ssl://localhost:61616。 这些URI包含了两方面的含义,一方面是声明代理支持的连接方式,系统会做相应的处理以便支持该连接;另一方面,客户端也使用此格式的URI来获取到代理的连接 注意,URI会构建为java.net.URI对象,因此任何空白符都是不允许的 |
discoveryURI | 默认null,可以设置一个组播发现地址,以便客户端、其它代理自动发现当前代理 |
enableStatusMonitor | 默认false,可以监控连接状态,判断其是否被阻塞(Blocked) |
updateClusterClients | 默认false,当代理集群发生改变后,是否更新客户端连接(如果客户端使用failover:// transport的话) |
rebalanceClusterClients | 默认false,当代理网络的拓扑改变后,是否在集群内部重新平衡客户端(负载均衡) |
updateClusterClientsOnRemove | 默认false,当代理从集群中移除后,是否更新客户端 |
updateClusterFilter | 默认null,逗号分隔的正则式。代理名称匹配其一的代理,可以引发客户端更新 |
allowLinkStealing |
默认false,对于MQTT则默认启用 所谓连接偷取,是指最后两个/多个具有相同ID(JMS的clientID)的连接,被认为是合法的,并且比较老的连接会被代理自动丢弃 |
本节介绍客户端也支持的选项,发起连接的时候可以使用。注意这些选项:
- 可以在传输URI中作为参数。示例 tcp://localhost:61616?jms.useAsyncSend=true
- 也可以通过编程方式设置ActiveMQConnectionFactory、ActiveMQConnection对象的属性
- 当在brokerURL或者代理的传输连接器中使用这些选项时,必须加上 jms.前缀
选项列表:
选项 | 说明 | ||
alwaysSessionAsync |
默认true,表示为Connection中的每个Session分配独立的线程,用于消息的派发 当存在多个会话,或者会话不处于Session.AUTO_ACKNOWLEDGE或Session.DUPS_OK_ACKNOWLEDGE模式,则总是使用独立线程 |
||
alwaysSyncSend |
默认false,设置为true则MessageProducer总是同步的发送消息,甚至是递送模式(Delivery Mode)不需要同步的情况下 |
||
auditDepth | 默认2048,审计重复、乱序消息时,消息窗口的大小(即审计探测的深度) | ||
auditMaximumProducerNumber | 默认64,最多被审计的生产者的数量 | ||
checkForDuplicates | 默认true,消费者是否检查重复消息,并且进行适当处理,避免同一消息被意外的处理两次 | ||
clientID | 默认null,为连接设置JMS的clientID | ||
closeTimeout |
默认15000,单位ms。默认情况下,在连接上执行close()操作需要等待代理的确认,此超时防止代理不可用时客户端的无限等待 |
||
consumerExpiryCheckEnabled | 默认true,是否让消费者检查消息是否过期,禁用后可能消费已经过期的消息 | ||
copyMessageOnSend |
默认true,当基于JMS的send()发送消息时,是否把消息拷贝到一个新的JMS Message对象中。默认设置为true 以便和JMS规范兼容 如果你再send()后不会再修改消息,则可以设置为false,这样可以提升性能 |
||
disableTimeStampsByDefault | 默认false,是否禁用消息的时间戳,禁用后可以获得较小的性能提升 | ||
dispatchAsync |
默认false,代理是否应该异步的分发消息给消费者 如果消费者比较慢,则异步递送消息更加有意义。当消费者很快的时候,同步递送可以避免synchronization和上下文切换的开销 当基于同步递送时,生产者可能因为消费者缓慢而被阻塞 |
||
nestedMapAndListEnabled |
默认true,是否支持结构化消息属性、MapMessage,并且支持内嵌的Map或List对象 JMS扩展特性允许你附带Map、List属性到JMS消息对象。或者,直接在MapMessage中使用内嵌的Map或者List对象 这一特性高效的发送类型安全的结构信息,而避免串行化/反串行化的开销 相关方法:
其中value参数可以是Map、List、数字、字符串类型及其嵌套结构 |
||
objectMessageSerializationDefered |
默认false,当通过设置对象到ObjectMessage上时,JMS规范要求对象被set()方法立即串行化 设置为true,则不兼容JMS规范,仅仅在需要通过套接字发送消息时,才进行串行化 |
||
optimizeAcknowledge |
默认false。是否启用优化的消息签收模式 —— 批量的签收 作为备选,你可以在消费者上设置Session.DUPS_OK_ACKNOWLEDGE签收模式,通常会更快一些 警告:启用后,可能导致重新连接后与自动签收有关的问题 |
||
optimizeAcknowledgeTimeOut | 默认300,单位ms。两次批量签收行为的最大间隔 | ||
optimizedAckScheduledAckInterval |
默认0,单位ms。如果大于0则经过指定的间隔,所有未签收的消息都被批量签收 可以防止长时间运行的消费者,不再接收消息后,未签收的消息有机会被签收 |
||
optimizedMessageDispatch | 默认true。仅对于持久化订阅有意义,设置为true则具有更大的预读取(prefetch)限制 | ||
useAsyncSend |
默认false。是否使用异步发送 强制使用异步发送可以获得很大的性能提升。但是这意味着send()会立即返回,而不管消息是否被递送,因而可能导致消息丢失 |
||
useCompression | 默认false。是否启用消息体压缩 | ||
useRetroactiveConsumer |
默认false。是否启用可追溯消费者 可追溯消费者允许非持久化的主题订阅者能够接收到老旧(在非持久化订阅者启动之前发布)的消息 |
||
warnAboutUnstartedConnectionTimeout | 默认500,单位ms。创建JMS Connection后,必须start()才能接受消息。这个选项可以在你忘记start()时给予提示 |
在计算机网络领域,线路协议(wire protocol)这一术语通常和传输协议(transport protocols)—— 例如 TCP或者UDP ——进行区分,它用于在应用程序级别表示信息的方式。这些协议可以是基于文本的,也可以是二进制的。
从5.13.0开始,ActiveMQ支持在TCP、SSL、NIO、NIO SSL之上进行自动的线路协议检测,支持的线路协议包括:OpenWire、STOMP、AMQP、MQTT。
配置示例:
1 2 3 4 5 6 7 8 |
<!-- 在TCP之上的自动线路协议检测 --> <transportConnector name="auto" uri="auto://localhost:5671"/> <!-- 在启用SSL的TCP之上的自动线路协议检测 --> <transportConnector name="auto+ssl" uri="auto+ssl://localhost:5671"/> <!-- 在NIO TCP之上的自动线路协议检测 --> <transportConnector name="auto+nio" uri="auto+nio://localhost:5671"/> <!-- 在NIO SSL TCP之上的自动线路协议检测 --> <transportConnector name="auto+nio+ssl" uri="auto+nio+ssl://localhost:5671"/> |
用于支持在JVM内部进行客户端之间的连接,避免网络占用。第一个启用VM连接的 客户端,会启动一个内嵌的Broker实例。最后一个客户端断开,则会停止实例。
URI示例:
1 |
vm://brokerName?transportOptions |
更复杂的URI格式:
1 |
vm:(broker:(tcp://localhost)?brokerOptions)?transportOptions |
VM传输不是基于网络套接字,而是直接的Java方法调用,因此速度很快。
此传输专有选项:
选项 | 默认值 | 说明 |
marshal | false | 如果设置为true,则基于此传输发送的任何命令,均使用某种WireFormat进行分解(unmarshall)和编组(marshall) |
wireFormat | default | 使用的WireFormat工厂的名称 |
wireFormat.* | 这些属性用来配置WireFormat | |
create | true | 如果代理不存在,是否按需创建 |
waitForStart | -1 | 如果大于0,表示等待代理启动的毫秒数 |
broker.* | 这些属性用来配置代理 |
高级消息队列协议(Advanced Message Queuing Protocol,AMQP),是提供统一消息服务的应用层标准高级消息队列协议,是一个开放标准。ActiveMQ支持该协议的1.0版本。
注意使用AMQP时,目标的地址前缀以queue://或者topic://,前者为默认,可以省略。
配置示例:
1 2 3 4 5 6 7 8 |
<transportConnectors> <transportConnector name="amqp" uri="amqp://0.0.0.0:5672"/> </transportConnectors> <!-- AMQP over NIO --> <transportConnector name="amqp+nio" uri="amqp+nio://localhost:5672"/> <!-- AMQP over SSL --> <transportConnector name="amqp+ssl" uri="amqp+ssl://localhost:5671"/> |
关于此传输的更多信息,参考ActiveMQ文档。
允许客户端基于TCP套接字连接到远程的ActiveMQ代理。示例:
1 2 3 4 |
# 服务器端,在传输连接器的属性中配置 tcp://localhost:61616?transport.threadName&transport.trace=false&transport.soTimeout=60000 # 客户端,在代理URI中配置 tcp://localhost:61616?threadName&trace=false&soTimeout=60000 |
此传输专有选项:
选项 | 默认值 | 说明 |
backlog | 5000 | 此传输的服务端,等待被接受的连接的队列的大小 |
closeAsync | true |
是否异步的关闭套接字,对于STOMP之类的线路协议,此选项应设置为false STOMP之类的协议,通常为每个读写操作打开一个套接字,同步的关闭可以防止代理没有可用的套接字(当套接字回收极快的情况下) |
connectionTimeout | 30000 | 连接超时,单位ms |
daemon | false | 传输的线程是否工作在守护模式下,当嵌入到其它JVM或者Web应用中运行ActiveMQ代理时应该设置为true,这样容器才能正确的关闭 |
diffServ | 0 |
仅客户端使用。设置出站流量的差异化服务流量( Differentiated Services traffic)的类别 |
dynamicManagement | false | 设置为true则TransportLogger可以被JMX管理 |
ioBufferSize | 8 * 1024 | TCP层和OpenWire层之间的缓冲,此缓冲在基于wireFormat进行编组时使用 |
jmxPort | 1099 | JMX的端口 |
keepAlive | false | 是否启用TCP保活,用于防止在TCP层出现连接超时 |
logWriterName | default | org.apache.activemq.transport.LogWriter实现的名字 |
maximumConnections | Integer.MAX_VALUE | 此代理允许的最大连接数 |
minmumWireFormatVersion | 0 | 最小支持的远程wireFormat版本,0表示不检查版本 |
socketBufferSize | 64 * 1024 | 套接字读写缓冲的大小 |
soLinger | Integer.MIN_VALUE | 设置TCP的soLinger选项,设置为-1则禁用此选项。当套接字被关闭时,此选项影响缓冲中残留的尚未发送的流量的处理方式 |
soTimeout | 0 | 套接字读超时时间,如果操作没有在超时前完成会导致套接字被关闭 |
soWriteTimeout | 0 | 套接字写超时时间,如果操作没有在超时前完成会导致套接字被关闭 |
stackSize | 0 | 此传输的后台读线程的栈大小,必须设置为128K的倍数 |
tcpNoDelay | false | 是否启用TCP_NODELAY选项 |
threadName | 设置传输的后台线程的名称 | |
trace | false |
设置为true,则所有通过此传输发送的命令都被记录 通过Log4J查看这些日志的方式: log4j.logger.org.apache.activemq.transport.TransportLogger=DEBUG |
trafficClass | 0 | 在套接字上设置的Traffic类 |
typeOfService | 0 | 仅客户端。偏好的服务类型(Type of Service),在出站数据包上设置 |
useInactivityMonitor | true | 设置为false 则禁用InactivityMonitor,连接永远不会超时 |
useKeepAlive | true | 设置为true则在空闲连接上发布KeepAliveInfo消息,防止其超时 |
useLocalHost | false |
设置为true则使用localhost而非实际的本地主机名来连接到本机。Mac OS X不支持使用本地主机名连接到本机,因此只能使用localhost |
useQueueForAccept | true | 如果设置为true,则被接受的套接字排队,由额外线程异步的处理 |
wireFormat | default | 使用的WireFormat工厂的名称 |
wireFormat.* | 这些属性用来配置WireFormat |
此传输和普通的TCP传输很类似,区别仅仅是此传输基于Java NIO API实现,提高了性能和可扩容性。
NIO是仅服务器端的传输配置,如果在客户端使用此传输,会初始化一个常规的TCP传输。注意,最初的NIO传输是TCP + Openwire的替代,其它线路协议 —— AMQP, MQTT, Stomp ——均有自己的NIO实现,配置时协议前缀是xxx+nio,例如:
1 2 3 4 |
# MQTT + NIO mqtt+nio://localhost:1883 # Openwire + NIO nio://hostname:port?key=value |
配置选项和TCP传输相同。此外,你可以设置一些JVM系统属性,来微调NIO传输的线程使用:
JVM属性 | 说明 |
org.apache.activemq.transport.nio.SelectorManager.corePoolSize | 默认10。连接池中维持的最小线程数量 |
org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize | 默认1024。连接池中允许的最大线程数量 |
org.apache.activemq.transport.nio.SelectorManager.workQueueCapacity | 增加线程池大小时,工作队列积压的至少深度 |
org.apache.activemq.transport.nio.SelectorManager.rejectWork | 默认false。为了保证既有的QoS,允许拒绝工作 |
允许客户端使用SSL over TCP连接到代理。从5.4开始,任何SSLServerSocket类的选项可以通过?transport.xxx来设置:
1 2 |
ssl://localhost:61616?transport.enabledCipherSuites=SSL_RSA_WITH_RC4_128_SHA,SSL_DH_anon_WITH_3DES_EDE_CBC_SHA ssl://localhost:61616?transport.needClientAuth=true |
基于Spring的JMS客户端配置示例:
1 2 3 4 5 6 7 8 9 |
<bean id="AMQJMSConnectionFactory" class="org.apache.activemq.ActiveMQSslConnectionFactory"> <property name="trustStore" value="/path/to/truststore.ts" /> <property name="trustStorePassword" value="password" /> <property name="keyStore" value="/path/to/keystore.ks" /> <property name="keyStorePassword" value="password" /> <property name="brokerURL" value="ssl://localhost:61616" /> <property name="userName" value="admin" /> <property name="password" value="admin" /> </bean> |
除非 transport.needClientAuth=true ,否则只需要单向身份认证,客户端不需要配置keystore,而仅需要truststore来验证服务器身份。
示例:
1 2 3 |
<transportConnectors> <transportConnector name="nio+ssl" uri="nio+ssl://0.0.0.0:61616"/> </<transportConnectors> |
在客户端使用该传输,会得到一个普通的SSL传输。
此传输的实现位于activemq-optional包中。使用XML载荷穿过HTTP隧道,可以让ActiveMQ客户端和代理穿越某些网络环境下的防火墙。
对于客户端,除了JMS之外,可以考虑ActiveMQ的REST和Ajax支持。
从5.4开始,ActiveMQ支持基于HTML5 WebSocket,在浏览器端直接和代理进行交互。
由于JavaScript处理JSON格式很简单,因此Stomp是WebSocket传输下很好的线路协议。从5.9开始,高效的二进制协议MQTT也被支持。
配置示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<transportConnectors> <transportConnector name="websocket" uri="ws://0.0.0.0:61614"/> </transportConnectors> <!-- 启用安全WebSocket,5.7开始支持 --> <transportConnectors> <transportConnector name="secure_websocket" uri="wss://0.0.0.0:61614"/> </transportConnectors> <!-- 为代理配置SSL上下文 --> <sslContext keyStore="file:${activemq.conf}/broker.ks" keyStorePassword="password" trustStore="file:${activemq.conf}/broker.ts" trustStorePassword="password" /> |
ActiveMQ提供了一种高级别的传输,这些传输总是在上面那些基础传输的更上一层工作。
该传输是一种逻辑传输,在其他传输的上层工作,在ActiveMQ3中,称Reliable传输。 它的价值是保障高可用性,维护一组URI的列表,从中随机(默认)寻找一个进行连接 尝试,如果失败,则会尝试其他的URI。该传输的URI格式:
1 2 3 4 5 6 |
failover:(uri1,...,uriN)?transportOptions&nestedURIOptions # 或者 failover:uri1,...,uriN # 示例 failover:(tcp://localhost:61616,tcp://remotehost:61616)?initialReconnectDelay=100 |
该传输的选项包括:
选项 | 默认值 | 说明 |
backup | false | 是否初始化并持有针对备选传输的连接,这样故障转移更快 |
initialReconnectDelay | 10 | 第一次重连尝试之前等待的毫秒数 |
maxCacheSize | 131072 |
被跟踪消息的缓存的大小,字节 仅仅trackMessages设置为true时才有意义 |
maxReconnectAttempts |
从5.6开始,默认值-1表示永远尝试,0表示禁止重连 5.6之前,默认0,表示永远尝试 大于0的值表示最大重连尝试次数,超过此次数后,错误被发送给客户端代码 |
|
maxReconnectDelay | 30000 | 第二次以及后续重连尝试,最大的延迟时间 |
nested.* | 从5.9开始,应用到下层URI列表中每一项的配置 | |
randomize | true | 随机的选择一个URI进行重连 |
reconnectDelayExponent | 2.0 | 重连时延迟会逐步增加,这个指数说明增加的幅度 |
reconnectSupported | true | 客户端是否应该向代理报告ConnectionControl事件,和rebalanceClusterClients有关 |
startupMaxReconnectAttempts | -1 |
-1表示启动时重连的次数不限制 大于0表示启动时最大重连次数,超过此次数后向客户端代码报告错误 |
timeout | -1 | 在没有重连操作干扰的情况下,send操作超时时间,ms |
trackMessages | false | 保留尚未发送的消息,在重连成功后刷出到代理 |
updateURIsSupported | true | 5.4+,客户端是否允许代理推送故障转移备选URI列表 |
useExponentialBackOff | true | 连接重试时,是否使用指数级的延迟增长 |
warnAfterReconnectAttempts | 10 | 在多少次重连尝试后,记录警告 |
故障转移的服务器端配置:
选项 | 默认值 | 说明 |
updateClusterClients | false | 如果true,推送变更后的代理集群信息给客户端 |
rebalanceClusterClients | false |
如果true,新的代理连接到集群中后,发送信息给客户端,要求再平衡 —— 均衡分布客户端 priorityBackup=true 覆盖此选项 |
updateClusterClientsOnRemove | false | 如果true,当代理离开集群后,更新客户端 |
updateClusterFilter | null | 逗号分隔的正则式,匹配的代理名称将作为故障转移的成员 |
在任何其它传输之上工作,应用重连、复制逻辑。它使用发现传输来检测代理,并复制命令到这些代理。该传输的URI格式:
1 2 3 |
fanout:(discoveryURI)?transportOptions # 或 fanout:discoveryURI |
该传输的选项包括:
选项 | 默认值 | 说明 |
initialReconnectDelay | 10 | 第一次重连(首次连接失败后)尝试前,等待的毫秒数 |
maxReconnectDelay | 30000 | 两次重连最大的间隔 |
useExponentialBackOff | true | 连接重试时,是否使用指数级的延迟增长 |
backOffMultiplier | 2 | 延迟增长的倍数 |
maxReconnectAttempts | 0 | 最大重连次数,超过此次数后错误信息发送给客户端 |
fanOutQueues | false |
如果设置为true,则命令被复制到主题的同,也复制到队列 默认的该传输仅仅复制命令到主题,因此,假设你想发送命令到多个代理的多个队列上,设置为true |
minAckCount | 2 | 最少需要连接上的代理数量 |
此传输的工作方式和Failover传输类似,但是它基于所谓发现代理(discovery agent)来定位可以连接的URI的列表。Fanout传输会使用到该传输,以便发现需要复制命令的目标代理。URI格式:
1 2 3 |
discovery:(discoveryAgentURI)?transportOptions # 或 discovery:discoveryAgentURI |
想要基于此传输发现代理,目标代理必须启用了组播发现代理。服务器配置示例:
1 2 3 4 5 |
<transportConnectors> <!-- 端口0表示随机选择TCP传输端口 ,discoveryUri说明发现代理使用的组播地址,默认 --> <!-- URI为multicast://default,即multicast://239.255.2.3:6155 --> <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> </transportConnectors> |
该传输的选项包括:
选项 | 默认值 | 说明 |
reconnectDelay | 10 | 等待发现的延迟 |
initialReconnectDelay | 10 | 第一次尝试重连(首次连接失败后)到一个发现的URI前,等待的毫秒数 |
maxReconnectDelay | 30000 | 两次重连最大的间隔 |
useExponentialBackOff | true | 连接重试时,是否使用指数级的延迟增长 |
backOffMultiplier | 2 | 延迟增长的倍数 |
maxReconnectAttempts | 0 | 最大重连次数,超过此次数后错误信息发送给客户端 |
group | default | 组播分组标识符 |
类似于发现传输,只是它基于ZeroConf协议。URI格式:
1 2 3 |
zeroconf:serviceName?transportOptions # 或 zeroconf:serviceName |
服务器端配置示例:
1 2 3 4 |
<transportConnectors> <transportConnector name="openwire" uri="tcp://0.0.0.0:61616" discoveryUri="zeroconf:_activemq_development" /> </transportConnectors> |
在进行传输配置时,你可以指定wireFormat说明要使用何种线路协议,可以指定wireFormat.*来为线路协议指定配置。
选项 | 默认值 | 说明 |
stackTraceEnabled | true | 代理发生的异常堆栈是否发送到客户端 |
tcpNoDelayEnabled | true | 对数据格式没有影响,提示Soecket启用TCP_NO_DELAY |
cacheEnabled | true | 是否进行缓存,避免重复内容的数据转换开销 |
tightEncodingEnabled | true | 是否使用紧凑的格式,压缩大小 |
prefixPacketSize | true | 数据包的前缀包含其大小 |
maxInactivityDuration | 30000 | 认为Socket已经断开的超时时间 |
maxInactivityDuration InitalDelay | 10000 | 认为Socket已经断开的超时,初始判断的延迟 |
cacheSize | 1024 | 最大缓存数量 |
maxFrameSize | MAX_LONG | 最大可以发送的帧大小 |
Spring客户端配置示例:
1 2 3 4 |
<bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="failover:(tcp://localhost:61616?jms.optimizeAcknowledge=false&wireFormat.maxInactivityDuration=30000)"/> </bean> |
为提供无限制可扩展性(scalability ),往往需要把若干ActiveMQ代理连接到一 起,进而让分布在各地的客户端逻辑的连接在一起进行消息交互。代理网络提供了 分布式队列与主题(distributed queues and topics)的支持。
当使用客户端/服务器模型或者星状模型时,代理可能会引起单点故障。启用代理网络后此问题不再存在。
客户端可以连接到代理网络中的任意一个代理,并在此代理出现故障时,转移到其它代理。
默认情况下,代理网络中节点之间的连接是单向的 —— 建立连接通道的代理可以向目标代理发送消息,反过来则不行。从5.x开始,通道可以配置为双向的,这样对于星状结构中位于防火墙背后的Hub可以受益。
配置代理网络最简单的方式是基于Spring的XML配置。支持两种发现代理的机制:
- 硬编码参与网络的其它代理的URI
- 使用发现传输
硬编码的示例:
1 2 3 4 5 6 7 8 9 10 |
<broker brokerName="receiver" persistent="false" useJmx="false"> <networkConnectors> <!-- 这里指向其它代理通过传输连接器暴露的URI --> <networkConnector uri="static:(tcp://localhost:62001,tcp://host2:62000)"/> </networkConnectors> <transportConnectors> <!-- 本代理暴露的URI --> <transportConnector uri="tcp://localhost:62002"/> </transportConnectors> </broker> |
基于组播发现的示例:
1 2 3 4 5 6 7 8 9 10 |
<broker name="sender" persistent="false" useJmx="false"> <networkConnectors> <!-- 使用组播发现其它代理 --> <networkConnector uri="multicast://default"/> </networkConnectors> <transportConnectors> <!-- 所有参与网络的代理都需要这样配置 --> <transportConnector uri="tcp://localhost:0" discoveryUri="multicast://default"/> </transportConnectors> </broker> |
默认情况下,网络连接器( network connector)作为代理启动流程的一部分、串行的初始化。如果某些网络很缓慢,会导致整个集群中代理启动都缓慢。从5.5开始你可以设置代理属性 networkConnectorStartAsync="true" 这会导致代理使用异步线程来启动网络连接器。
属性 | 默认值 | 说明 |
name | bridge | 网络的名称,如果同一对代理之间有多重网络,必须制定不一样的名称 |
dynamicOnly | false | 如果设置为true,则仅仅本地持久化订阅者重新激活后,才激活相应的网络化的持久订阅 |
decreaseNetworkConsumerPriority | false |
如果设置为true,从-5开始,离消息生产者越远(跨越代理的数量)的消费者,其优先级越低 默认的,所有消费者具有优先级0,和本地消费者一样优先 |
networkTTL | 1 | 消息、订阅能够穿透的代理数量 |
messageTTL | 1 | 从生产者角度来看,消息能够穿透的代理数量 |
consumerTTL | 1 | 从消费者角度来看,订阅(主题或者队列)能够穿透的代理数量 |
conduitSubscriptions | true | 订阅同一目标的多个消费者,是否被网络当做单一消费者看待 |
excludedDestinations | 匹配此列表的那些目标,不会通过代理网络转发(仅应用到dynamicallyIncludedDestinations) | |
dynamicallyIncludedDestinations |
匹配此列表的那些目标,会通过代理网络转发 注意:如果此列表为空,则不在excludedDestinations中的所有目标都被转发 |
|
useVirtualDestSubs | false | 如果为true,则网络连接会监听通知消息(advisory messages)以获得虚拟目标消费者 |
staticallyIncludedDestinations | 匹配此列表的目标总是跨越代理网络被转发,甚至没有消费者关心此目标 | |
duplex | false |
设置为true,则网络连接通道是双向的。当前代理可以基于此通道进行生产、消费,而不仅仅是生产 尝试通过单向通道接收消息,会收到如下错误: Could not start network bridge between: vm://BrokerB?async=false&network=true and: tcp://192.168.0.89:61616 due to: Connection refused (Connection refused) |
prefetchSize | 1000 | 在网络连接器的消费者上设置预读取大小。必须大于0,因为网络消费者不执行轮询操作 |
suppressDuplicateQueueSubscriptions | false |
如果为true,则代理网络中中继过来的重复订阅被忽略 例如通过组播连接到一起的代理A、B、C。A上的本地消费者会传递并映射为B、C的网络消费者。进一步C上的网络消费者(源自A)会传递并映射到B、B也传递并映射到C。当此选项为true时,B和C之间的网络桥会并抑制 通过此设置来减少可选的路由路径,可以为跨越网络迁移的生产者/消费者提供一种确定性 —— 潜在的死路由被消除了 触发此抑制行为,需要networkTTL到达或者超过代理数量 |
bridgeTempDestinations | true |
是否在代理网络中通过通知消息来广播新创建的临时目标 临时目标通常用于执行请求/应答模式,该选项默认true,这样可以让执行请求应答模式通信的消费者位于代理网络中的其它节点上,并可以设置JMSReplyTo然后把应答消息返回给最初的生产者 如果应用程序中大量使用请求/应答模型,该选项为true会导致在代理网络中产生额外的流量。原因是,通常JMSReplyTo总是填写一个唯一地址,则就意味着总是会创建新的临时目标,进而广播到整个代理网络 如果设置为false,则基于请求应答模型通信的生产者/消费者必须连接到同一个代理上。否则你会收到temp destination does not exist错误 |
alwaysSyncSend | false | 设置为true时,非持久化消息基于请求/应答的方式,而不是单向的方式发送给远程代理 —— 和持久化消息的处理方式一样 |
staticBridge | false | 如果设置为true,代理不会因为新的消费者的出现而做出响应,它只会使用staticallyIncludedDestinations来创建demand subscriptions |
userName | null | 针对远程代理进行身份验证时的用户名 |
password | null | 针对远程代理进行身份验证时的密码 |
使用硬编码URI时,你可以设置initialReconnectDelay、maxReconnectDelay、useExponentialBackOff、backOffMultiplier这些选项,来控制重连操作:
1 |
uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false" |
代理网络的一个常见用途是,创建一个Master和多个Slave之间的连接:
1 2 3 |
<networkConnectors> <networkConnector uri="masterslave:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/> </networkConnectors> |
这个通常和failover传输一起使用。
代理网络执行可靠的消息存储、转发。如果源(消息)是持久的 —— 队列上的持久消息、持久化主题订阅 —— 则代理网络仍然保证其持久性。
但是,如果源是非持久性的,则代理网络不会为其增加持久性特征。当非持久化源 —— 非持久化主题订阅、临时目标 —— 位于代理网络中时,出现故障时,正在传递中的消息可能丢失。
在代理网络中,整体的消息顺序(Total ordering)不被保留。代理网络引入了额外的消费者 —— 网络桥消费者通过producer.send(..)实现消息转发,这样,消息从其本地的队列头中取出,又放到目标代理对应队列尾部。
ActiveMQ依赖于活动消费者(订阅者,subscriptions)的信息,来决定是否在代理网络上传递消息。代理把来自远程代理的订阅、和本地的订阅同等看待,并且路由相关消息的拷贝到所有消费者。
对于主题订阅,远程代理B会把所有消息拷贝看做是合法的,因此它把这些(来自代理A的)消息路由给它的N个本地订阅(消费者)时,就会发生消息重复 —— 每个拷贝都被转发给每个本地订阅者。
ActiveMQ的默认行为是,把远程代理B上的N个匹配的订阅看成是单个订阅,避免重复消息的发生。
设置duplex=true后,连接双方使用同一连接进行通信,构成双向网桥。此双向配置会自动传递到对方,因此对方不需要任何配置。
代理网络的工作机制依赖于通知消息(advisory messages)——代理使用这些消息来 感知远程代理上的新消费者。在代理启动时,它会创建 ActiveMQ.Advisory.Consumer.> 的消费者,这样,当远程代理连接/断开了Consumer时,本地代理会获得通知。
在小型网络、少量目标、消费者的情况下没有问题,反之就会导致大量网络开销,所有,在代理网络的配置中,包含若干种过滤代理共享目标的方法:
- dynamicallyIncludedDestinations:这意味着只有远程代理上有匹配目标的消费者时,才进行转发:
1234567<networkConnector uri="static:(tcp://host)"><dynamicallyIncludedDestinations><!-- 仅当远程代理上存在如下目标的消费者时,才转发这些目标上的消息 --><queue physicalName="include.test.foo"/><topic physicalName="include.test.bar"/></dynamicallyIncludedDestinations></networkConnector> - staticallyIncludedDestinations:这意味着不管远程代理上有没有匹配目标的消费者,消息均被转发:
12345<networkConnector uri="static:(tcp://host)" staticBridge="true"><staticallyIncludedDestinations><queue physicalName="always.include.queue"/></staticallyIncludedDestinations></networkConnector>
可以通过队列/主题名称或者通配符应用一系列的策略,在destinationPolicy 元素下指定。示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<destinationPolicy> <policyMap> <policyEntries> <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb"> <deadLetterStrategy> <individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/> </deadLetterStrategy> </policyEntry> <policyEntry topic=">" producerFlowControl="true" memoryLimit="20mb"> </policyEntry> </policyEntries> </policyMap> </destinationPolicy> |
支持的策略包括:
expireMessagesPeriodÈ
属性 | 默认值 | 说明 |
producerFlowControl | true | 如果代理没有足够的资源(例如内存),那么生产者将减速并最终被 阻塞 |
enableAudit | true | 跟踪重复消息(重复消息可能出现在非持久化消息的failover) |
useCache | true | 持久化消息可以被缓存,以便后续的快速存取 |
maxPageSize | 200 | 最大单次可以从持久化存储获取消息的页数 |
maxBrowsePageSize | 400 | 在浏览时,最大单次可以从持久化存储获取消息的页数 |
memoryLimit | 指定目标的内存限制 | |
minimumMessageSize | 1024 | 用于嵌入式代理:假设的消息最小内存使用量 |
cursorMemoryHighWaterMark | 70% | 触发写入磁盘的内存用量高水位 |
prioritizedMessages | false | 让存储根据消息优先级进行安排 |
advisoryForConsumed | false | 当消息被消费,发送一个通知消息 |
advisoryForDelivery | false | 当消息被发送到客户端,发送一个通知消息 |
advisoryForSlowConsumers | false | 当某个消费者运行缓慢时,发送一个通知消息 |
advsioryForFastProducers | false | 当某个消费者运行迅速时,发送一个通知消息 |
advisoryWhenFull | false | 在资源耗尽(内存、存储、临时存储)耗尽时,发送一个通知消息 |
gcInactiveDestinations | false | 删除不活动的目标 |
inactiveTimoutBeforeGC | 5000 | 目标在多长时间ms内不活动,被认为是可删除的 |
usePrefetchExtension | true | 预读取扩展在消息被递送,但是没有确认时使用 |
slowConsumerStrategy | null | 处理缓慢消费者的策略 |
用于队列目标的额外策略 | ||
useConsumerPriority | true | 分发消息时,根据消费者优先级计算分发目标 |
strictOrderDispatch | false | 如果启用,队列将一直供单个消费者使用,直到预读取缓冲满了 |
optimizedDispatch | false | 不使用单独的线程来发送队列中的消息 |
lazyDispatch | false | |
consumersBeforeDispatchStarts | 0 | 第一个消费者连接后,等待N个消费者连接,才进行消息分发 |
timeBeforeDispatchStarts | 0 | 当第一个消费者连接,等待消息分发的延迟 |
queuePrefetch | 预读取的默认数量 | |
expireMessagesPeriod | 30000 | 检查消息过期的周期,0表示不检查,单位ms |
persistJMSRedelivered | false | 如果为真,在持久消息第一次被分发前,消息被重写以反映可能的传 递,确保JMSRedelivered头是一个可靠的值 |
用于主题目标的额外策略 | ||
topicPrefetch | 预读取的默认数量 | |
durableTopicPrefetch | 持久订阅者预读取的默认数量 | |
advisoryForDiscardingMess ages | false | 当消息被从非持久订阅中废弃时,发送一个通知消息 |
expireMessagesPeriod | 30000 | 为不活动的持久化订阅检查消息过期的周期,0表示不检查,单位ms |
persistenceAdapter用于配置ActiveMQ持久化消息的方式,目前支持以下几种存 储机制:
- AMQ消息存储:默认的消息存储。消息被存储到数据日志中,日志文件的默认 大小为32mb,如果单个消息可以超过此大小,则需要调整。如果日志文件中 的所有消息被成功消费,ActiveMQ会做标记,以便清理和归档。示例:
1<journalPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32mb" /> - KahaDB消息存储:提供了容量的提升和恢复能力(5.3以上采用)。示例:
1<kahaDB directory="${activemq.data}/kahadb" /> - JDBC消息存储:消息使用JDBC接口,存放于数据库中,指定数据源即可。示例:
1<jdbcPersistenceAdapter dataSource="#mysql-ds"/> - Memory消息存储:基于内存的消息存储。指定broker的属性 persistent="false"即可
ActiveMQ 4.x以上的版本,提供了可拔插的安全机制,主要包括:
- JAAS——Java认证与授权服务
- 一个内置的基于XML配置的简单认证(simpleAuthentication插件)
simpleAuthentication的例子(注意启用了匿名访问):
1 2 3 4 5 6 7 |
<simpleAuthenticationPlugin anonymousAccessAllowed="true"> <users> <authenticationUser username="system" password="manager" groups="users,admins"/> <authenticationUser username="user" password="password" groups="users"/> <authenticationUser username="guest" password="password" groups="guests"/> </users> </simpleAuthenticationPlugin> |
如果代理网络的一方启用了身份验证,那么,连接到它时,必须提供身份验证信 息:
1 2 3 4 |
<networkConnectors> <networkConnector name="brokerAbridge" userName="user" password="password" uri="static://(tcp://brokerA:61616)"/> </networkConnectors> |
支持对队列、主题的3种访问权限:读(Read)、写(Write)、管理(Admin)。 其中管理权限用于动态延迟创建队列。授权配置的例子如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
<plugins> <authorizationPlugin> <map> <authorizationMap> <authorizationEntries> <authorizationEntry queue=">" read="admins" write="admins" admin="admins"/> <authorizationEntry queue="USERS.>" read="users" write="users" admin="users"/> <authorizationEntry queue="GUEST.>" read="guests" write="guests,users" admin="guests,users"/> <authorizationEntry topic=">" read="admins" write="admins" admin="admins"/> <authorizationEntry topic="USERS.>" read="users" write="users" admin="users"/> <authorizationEntry topic="GUEST.>" read="guests" write="guests,users" admin="guests,users"/> <authorizationEntry topic="ActiveMQ.Advisory.>" read="guests,users" write="guests,users" admin="guests,users"/> </authorizationEntries> <tempDestinationAuthorizationEntry> <tempDestinationAuthorizationEntry read="tempDestinationAdmins" write="tempDestinationAdmins" admin="tempDestinationAdmins"/> </tempDestinationAuthorizationEntry> </authorizationMap> </map> </authorizationPlugin> </plugins> |
在某些情况下,特别是嵌入式代理,可以限制ActiveMQ使用的内存、硬盘资源的数量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
<systemUsage> <systemUsage> <!-- 内存用量,用于跟踪目的地、进行消息缓存 --> <memoryUsage> <memoryUsage limit="20 mb"/> </memoryUsage> <!-- 下面是磁盘中用于持久化消息的用量 -> <storeUsage> <storeUsage limit="1 gb" name="foo"/> </storeUsage> <!-- 下面是磁盘中用于非持久化消息的用量 --> <tempUsage> <tempUsage limit="100 mb"/> </tempUsage> </systemUsage> </systemUsage> |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
SystemUsage usage = new SystemUsage(); Runtime runtime = Runtime.getRuntime(); MemoryUsage mu = new MemoryUsage(); mu.setPercentOfJvmHeap( 20 ); usage.setMemoryUsage( mu ); StoreUsage su = new StoreUsage(); su.setStore( pa ); su.setLimit( DISK_USAGE_LIMIT_DEFAULT ); usage.setStoreUsage( su ); TempUsage tu = new TempUsage(); su.setStore( pa ); su.setLimit( DISK_USAGE_LIMIT_DEFAULT ); usage.setTempUsage( tu ); broker.setSystemUsage( usage ); |
1 |
<amq:connectionFactory id="jmsFactory" brokerURL="vm://localhost"/> |
使用PooledConnectionFactory可以提供连接池功能:连接、会话、消息生产者的实例可以被缓存,以便和Spring JmsTemplate、MessageListeners以及Camel等框架集成使用。
注意:该连接池不会缓存消息消费者,通常消息消费者在启动后即保持活动,直到必要时手工关闭。 另外,Spring自带的CachingConnectionFactory也可以作为连接池使用。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>vm://dataSyncBroker-${appCfg.currentMonitorCenterId}</value> </property> </bean> </property> </bean> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="jmsFactory" /> <!-- 下面的属性必须为true,才能设置Qos相关的消息头,例如deliveryMode, priority, timeToLive --> <property name="explicitQosEnabled" value="true" /> </bean> |
通常使用JmsTemplate来同步的生产、消费消息:
1 2 3 4 5 6 7 |
jmsTemplate.send( getDataSyncQueueName( mc ), new MessageCreator() { @Override public Message createMessage( Session session ) throws JMSException { TextMessage message = session.createTextMessage( msg.toJSON() ); return message; } } ); |
DefaultMessageListenerContainer组件不是单个类,而是一个良好抽象的、用于接收JMS消息的抽象层。其概念类似 于J2EE的消息驱动Bean(Message Driven Beans,MDB)。其包含的特性有:
- 多层次的JMS资源缓存(连接、会话)、消费者缓存
- 根据负载,动态增减同时处理消息的消费者数量
- 如果代理不可用,自动重新建立连接
- 基于Spring TaskExecutor的异步消息监听
- 在消息接收、监听执行时,支持本地JMS事务,以及外部分布式事务管理器
- 支持多种消息确认模式
配置示例:
1 2 3 |
<jms:listener-container container-type="default" connection-factory="jmsFactory" acknowledge="auto"> <jms:listener destination="DATA.SYNC.1000" ref="msgTransceiver" method="onMessage" /> </jms:listener-container> |
容器类型这里选择的是default,还可以选择simple,后者不支持事务处理。msgTransceiver.onMessage的方法为监听到消息后的回调:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public class MsgTransceiverImpl implements MsgTransceiver, MessageListener { public void onMessage( Message message ) { try { TextMessage msg = (TextMessage) message; String text = msg.getText(); LOGGER.debug( "Received Message: \n" + text ); onMsgReceived( Msg.create( text ) ); } catch ( JMSException e ) { LOGGER.error( e.getMessage(), e ); } } } |
除了实现JMS标准的MessageListener以外,还可以使用:
- SessionAwareMessageListener,接口提供对Session的访问,在实现请求/应答模式 时非常有用注意:你需要自行覆盖handleListenerException方法进行异常处理
- MessageListenerAdapter,使用此接口,可以避免代码与JMS有关联,可以处理类型 具体化的消息。
Leave a Reply