Apache Storm学习笔记
Apache Storm是一个分布式的实时计算系统,它能够可靠的对无边界的数据流进行处理。与Hadoop的批量处理方式不同,Storm对数据进行的是实时处理。Storm很简单,支持很多编程语言。
Storm的应用场景包括:实时分析、在线机器学习、持续计算、分布式RPC、分布式ETL。
Storm的性能很好,每个节点在一秒内能够处理多达100万个元组。Storm支持无限制扩容、容错、并且保证数据能够被处理。
你可以让既有的数据库、消息队列系统和Storm进行集成。Storm拓扑消费数据流,以任意复杂的方式处理数据流,数据流可以在任何计算阶段(Stage)重新分区。
实时计算的逻辑被封装到Storm拓扑中,拓扑和MapReduce的Job类似。一个关键的区别是,MapReduce Job最终会结束,而拓扑则一直运行下去。
你可以使用TopologyBuilder构建一个拓扑。
Storm把每个具体的工作委托给多种类型的组件处理。这些组件都装配在拓扑中。
拓扑可以跨越Storm集群的多个工作节点运行。
流是Storm的核心抽象,它是无边界的元组的序列。流以分布式的风格被并行的创建、处理。流的元素是元组,元组可以包含整数、字符串、浮点数、布尔、比特数组等类型的字段。你可以自己实现串行化器,支持任何自定义数据类型。
你可以使用OutputFieldsDeclarer来声明流以及流的Schema。在声明的时候,每个流被赋予一个标识符。仅仅释放单个流的Spout很常见,因此OutputFieldsDeclarer提供了声明单个流的便捷方法,不需要指定标识符,流的标识符自动设置为default。
Storm中被处理的数据的基本单元,本质上就是一个预定义的字段的值列表。
Spout可以释放出元组,不同的Bolt可以读取同一Spout产生的元组,并释放不同格式的元组,供其它Bolt消费。
在拓扑中,Spout是流的来源。一般来说,Spout从外部读取元组,并释放(Emit)出元组的流(Spout本意即喷射口)。Spout可以是可靠的(Reliable )或者不可靠的。可靠的Spout支持在处理失败后重复(Replay)元组。所有Spout必须实现IRichSpout接口。
Spout可以释放不止一个流,你可以多次调用 OutputFieldsDeclarer的 declareStream。你可以指定 SpoutOutputCollector.emit调用释放哪个流。
Spout的主要方法是nextTuple,调用它可以释放一个新的元组到拓扑中,如果没有可用的元组,该方法应该简单的返回。Storm强制要求所有Spout实现类的nextTuple方法是非阻塞性的,因为Storm在单个线程中调用Spout的所有方法。
其它重要的方法包括ack、fail,这些方法仅仅针对可靠Spout调用,分别在元组顺利通过拓扑后、处理失败后调用,将处理结果通知给Spout。
Spout将数据传递给Bolt这种组件进行实质上的逻辑处理。拓扑中最主要的内容就是Bolt构成的链条。
拓扑中所有处理逻辑发生在Bolt中,Bolt可以进行过滤、聚合、Join等操作,可以和数据库进行交互。
Bolt可以进行简单的流变换(Stream Transformation),复杂的流变换通常需要多个步骤,也就需要多个Bolt。
Bolt可以释放不止一个流,你可以多次调用 OutputFieldsDeclarer的 declareStream。你可以指定 OutputCollector.emit调用释放哪个流。
在声明Bolt的输入流时,你需要订阅其它组件的某个特定流,如果要订阅某个组件的多个流,你需要逐个订阅。InputDeclarer提供了订阅具有默认标识符的流的快捷方法,例如 declarer.shuffleGrouping("1")订阅组件1的 DEFAULT_STREAM_ID流。在构建拓扑时,你也可以声明这种流订阅。
Bolt的主要方法是execute,此方法接受一个元组输入。Bolt使用OutputCollector对象释放新的元组。Bolt会针对每个输入元组调用OutputCollector.ack方法,以通知Storm目标元组已经处理成功,这些ack调用最终导致Spout.ack被调用。大部分Bolt根据输入元组输出0-N个输出元组。IBasicBolt提供自动确认(Ack)功能。
在Bolt中启动新线程进行异步处理是很好的做法,OutputCollector是线程安全的,你可以在任何时候调用它。
Bolt的基础接口是IRichBolt。要设计进行过滤、简单function的Bolt可以选择实现IBasicBolt接口。Bolt调用OutputCollector来释放元组到它的输出流中。
定义拓扑的一部分工作是,指定每个Bolt能够接受哪些流作为其输入。流分组(Stream Grouping)指定了如何在Bolt的Task之间进行流的分区(Partition)。
Storm可以保证每个元组都被拓扑处理。为了确认元组的处理过程,Storm对Spout释放出的元组树进行跟踪,并以此确定何时元组树被成功处理。每个拓扑都具有一个消息超时属性,如果在此超时到达之前没有检测到元组已经处理成功,则认定处理失败,并Replay目标元组。
之所有叫元组树,是因为在任何一个Bolt中,一个源元组可能产生多个目标元组,这样经过多个Bolt后,就形成树状依赖关系。注意,目的和源的依赖关系需要编程式的声明,一个目标元组可以依赖于多个源元组。
每个Spout、Bolt在集群中执行一定数量的Task,每个Task对应一个执行线程。流分组决定了元组如何从一组Task分发到另外一组Task。调用TopologyBuilder的setSpout/setBolt方法可以设置并发度。
Topology跨越1-N个工作进程执行,每个工作进程都是一个JVM实例,负责执行拓扑中所有Task的一个子集。Storm会尝试尽可能平均的分配Task给Worker。例如,一个总和并行度为300的拓扑,分配了50个工作进程,则每个进程需要执行6个Task。
Storm集群包含两类节点:
- 主节点(Master):这类节点运行一个名为Nimbus的守护程序。此程序负责在集群中分发代码(处理逻辑)、向工作节点分配Task、监控Task是否执行失败
- 工作节点(Worker):这类节点运行一个名为Supervisor的守护程序。此程序负责执行拓扑的某个部分
Storm支持在本地磁盘或者ZooKeeper上保存集群的所有状态信息,因此守护程序宕机不会对系统的健康状况产生影响。
Storm和Hadoop的核心概念的对应关系如下图:
Storm | Hadoop | |
组件 | JobTracker | Nimbus |
TaskTracker | Supervisor | |
Child | Worker | |
应用 | Job | Topology |
接口 | Mapper/Reducer | Spout /Bolt |
Storm提供四类内置的调度器: DefaultScheduler, IsolationScheduler, MultitenantScheduler, ResourceAwareScheduler。
Storm有两种运行模式(Operation Modes)。
在此模式下,Storm的拓扑在本地机器中单个JVM中运行。此模式主要用于开发、测试和调试。
在此模式下,我们需要将定义好的拓扑提交到Storm集群中。集群通常由运行在很多机器中的进程组成。远程模式下不会显示调试信息,通常用于产品环境。
你也可以在单台机器上进行远程模式的部署。
Storm包含若干不同的守护进程,Nimbus能够调度Worker进程,Supervisors负责启动、杀死Worker进程。Logger View让你能够访问日志。Web服务器让你能够通过GUI查看Storm状态。
当Worker进程死掉后,Supervisor会重新启动它。如果Worker反复死掉并且不能完成到Nimbus的心跳检测,Nimbus会重新调度此Worker。
当节点宕机后,Nimbus会重新分配在节点上执行的Task给其它节点。
Nimbus/Supervisor被设计为无状态的,所有状态信息存放在ZooKeeper或者磁盘上。因此它们宕机后,你可以安全的重启它们。你应该考虑使用daemontools/monit之类的工具监控这类进程的状态,并在它们死掉后立即重启。
如果Nimbus节点(Master)宕机,Worker节点仍然会继续工作。此外Supervisor仍然会自动重启死掉的Worker进程。但是没有Nimbus就不能把Worker节点分配到其它机器上。从1.0.0开始,Storm支持Nimbus的HA。
在本章,我们创建一个简单的,分析动力环境监测值的拓扑。
此类型模拟监测值采集程序,以随机的方式产生监测值:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 |
package cc.gmem.study.storm.mv; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.Range; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import java.util.*; import java.util.concurrent.TimeUnit; import static org.slf4j.LoggerFactory.getLogger; public class MessageSource extends Thread { private Map<String, Pair<Double, Double>> config; private ObjectMapper om; Queue<String> messages; private static final Logger LOGGER = getLogger( MessageSource.class ); private boolean term; public MessageSource( Map<String, Pair<Double, Double>> config ) { super(); this.config = config; this.messages = new LinkedList<>(); this.om = new ObjectMapper(); start(); } @Override public void run() { setName( "MessageSource" ); LOGGER.debug( "Starting monitoring value source thread" ); while ( true ) { if ( term ) { LOGGER.debug( "Terminating monitoring value source" ); break; } List<Map<String, Object>> mvs = new ArrayList<>(); MutableInt idPfx = new MutableInt( 10 ); config.forEach( ( type, pair ) -> { int n = RandomUtils.nextInt( 1, 5 ); for ( int i = 0; i < n; i++ ) { Map<String, Object> mv = new LinkedHashMap<>(); double value = RandomUtils.nextDouble( pair.getLeft(), pair.getRight() ); mv.put( "id", idPfx.intValue() + RandomUtils.nextInt( 1, 5 ) ); mv.put( "type", type ); mv.put( "value", value ); mvs.add( mv ); } idPfx.add( 10 ); } ); LOGGER.debug( "Generated new message with {} monitoring value", mvs.size() ); try { String msg = om.writeValueAsString( mvs ); messages.offer( msg ); TimeUnit.SECONDS.sleep( 10 ); } catch ( Exception e ) { LOGGER.error( e.getMessage(), e ); } } } public boolean available() { return !messages.isEmpty(); } public String nextMessage() { return messages.poll(); } public void singalTerm() { this.term = true; } } |
MessageReader是此拓扑中唯一的Spout,从MessageSource中读取监测值报文并直接传递给Bolt处理。
任何Spout需要实现IRichSpout接口,你可以选择继承缺省适配BaseRichSpout。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
package cc.gmem.study.storm.mv; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichSpout; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; public class MessageReader implements IRichSpout { private static final Logger LOGGER = LoggerFactory.getLogger( MessageReader.class ); private MessageSource msgSrc; private SpoutOutputCollector collector; /** * 当前组件的任一个Task在集群的某个工作节点中初始化时,自动调用此方法 * * @param conf 配置信息,在创建拓扑时指定的配置合并到当前机器的集群配置而成 * @param context 拓扑的上下文信息,可以获得组件ID、Task ID、输入输出信息 * @param collector 输出收集器,线程安全,应该作为实例变量 * 此收集器可以在任何时候调用,以释放元组供下游组件消费 */ @Override public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) { LOGGER.debug( "Spout {}/{} opening", context.getThisComponentId() ,context.getThisTaskId()); Map<String, Pair<Double, Double>> config = new LinkedHashMap<>(); // 读取配置信息 List<List<Object>> msgSrcCfg = (List<List<Object>>) conf.get( "msgSrcCfg" ); msgSrcCfg.forEach( typeCfg -> { config.put( (String) typeCfg.get( 0 ), new ImmutablePair( (Double) typeCfg.get( 1 ), (Double) typeCfg.get( 2 ) ) ); } ); this.msgSrc = new MessageSource( config ); this.collector = collector; } /** * 当此Spout即将被关闭时调用 * <p> * 注意:此方法通常不保证一定被调用,因为在集群中Supervisor会以kill -9杀死工作进程。 * 当以本地模式运行Storm时,此方法保证被调用 */ @Override public void close() { LOGGER.debug( "Spout closing" ); msgSrc.singalTerm(); } /** * 当Spout被激活后调用,激活后nextTuple很快被调用 * 使用Storm客户端可以激活或者禁用Spout */ @Override public void activate() { } /** * 当Spout被禁用后调用,处于禁用状态的Spout的nextTuple不会被调用 */ @Override public void deactivate() { } /** * 此方法会被周期性的调用,释放下一个元组 */ @Override public void nextTuple() { if ( !msgSrc.available() ) { try { // 休眠至少10ms再返回,避免无限制的CPU占用 TimeUnit.MILLISECONDS.sleep( 10 ); return; } catch ( InterruptedException e ) { } } // Values是对ArrayList<Object>的简单封装,其值可以是任何可串行化的Java对象 String msg = msgSrc.nextMessage(); LOGGER.debug( "Emit new tuple: {}", msg ); this.collector.emit( new Values( msg ) ); } /** * 当此Spout释放出的具有指定ID的元组已经成功通过拓扑之后,Storm调用此方法 * 此方法的典型实现是,将对应数据从队列中移除并且阻止它被Replay * * @param msgId 元组的ID */ @Override public void ack( Object msgId ) { LOGGER.debug( "Tuple with id {} acknowledged", msgId ); } /** * 当此Spout释放出的具有指定ID的元组在某个环节处理失败后,Storm调用此方法 * 此方法的典型实现是,将对应数据放回队列,以便后续Replay * * @param msgId 元组的ID */ @Override public void fail( Object msgId ) { LOGGER.debug( "Tuple with id {} failed", msgId ); } /** * 此方法来自IComponent接口,用于声明针对此组件的配置。仅topology.*之下的配置可以被覆盖 * 当通过TopologyBuilder构建拓扑时,这里声明的配置可能被覆盖 * * @return */ @Override public Map<String, Object> getComponentConfiguration() { return null; } /** * 此方法来自IComponent接口,声明输出元组包含的字段 */ @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { // 为当前组件的默认输出流声明字段 // 你还可以调用 declarer.declareStream( streamId,fields ) 声明新的输出流 declarer.declare( new Fields( "message" ) ); } } |
这是一个Bolt,读取监测值报文并解析之,释放多个监测值信息元组。
任何Bolt都需要实现IRichBolt接口。你可以选择实现IBasicBolt(继承BaseBasicBolt),这样Storm会自动进行元组确认(Ack),使用IBasicBolt时你可以抛出FailedException表示当前元组处理失败。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 |
package cc.gmem.study.storm.mv; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang3.mutable.MutableInt; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import java.io.IOException; import java.util.List; import java.util.Map; import static org.slf4j.LoggerFactory.getLogger; public class MessageParser implements IRichBolt { private static final Logger LOGGER = getLogger( MessageParser.class ); private ObjectMapper om; private OutputCollector collector; /** * 当前组件的任一个Task在集群的某个工作节点中初始化时,自动调用此方法 * 类似Spout的open方法 */ @Override public void prepare( Map stormConf, TopologyContext context, OutputCollector collector ) { LOGGER.debug( "Bolt {}/{} preparing", context.getThisComponentId(), context.getThisTaskId() ); om = new ObjectMapper(); this.collector = collector; } /** * 处理单个输入元组,每当接收到新元组此方法都被调用 * Tuple对象包含一些元数据信息:此元组来自什么component/stream/task * 调用Tuple#getValue可以获得元组的实际数据 * <p> * Bolt不一定要立刻处理元组,例如你可能暂存它并在后续进行聚合或连接操作 * 每个元组都需要在未来某个时刻被ack/fail,否则Storm无法确认Spout释放的原初元组释放已经被完整除了 * 子接口IBasicBolt支持在execute()执行完毕后自动ack * <p> * 如果此Bolt要释放元组,需要调用prepare传入的OutputCollector * * @param input The input tuple to be processed. */ @Override public void execute( Tuple input ) { // 一个Bolt获得的元组可能来自不同的流 LOGGER.debug( "Received input tuple from {}", input.getSourceStreamId() ); // 元组字段可以根据名称或者索引读取 String msg = input.getStringByField( "message" ); try { List<Map<String, Object>> mvs = om.readValue( msg, List.class ); MutableInt count = new MutableInt( 0 ); mvs.forEach( mv -> { collector.emit( new Values( mv.get( "id" ), mv.get( "type" ), mv.get( "value" ) ) ); count.increment(); } ); LOGGER.debug( "{} monitoring value parsed", count.intValue() ); } catch ( IOException e ) { collector.fail( input ); } collector.ack( input ); } /** * 此Bolt即将被关闭时调用,此调用不保证一定发生 * 类似Spout的close方法 */ @Override public void cleanup() { } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { declarer.declare( new Fields( "id", "type", "value" ) ); } @Override public Map<String, Object> getComponentConfiguration() { return null; } } |
这是一个Bolt,它读取MessageParser释放的监测值元组,然后进行实时统计分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 |
package cc.gmem.study.storm.mv; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; import org.slf4j.Logger; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.Map; import static java.math.BigDecimal.ONE; import static java.math.BigDecimal.ZERO; import static org.slf4j.LoggerFactory.getLogger; public class MonitorValueCounter extends BaseBasicBolt { private static final Logger LOGGER = getLogger( MonitorValueCounter.class ); BigDecimal[] counter; @Override public void prepare( Map stormConf, TopologyContext context ) { LOGGER.debug( "Bolt {}/{} preparing", context.getThisComponentId(), context.getThisTaskId() ); counter = new BigDecimal[]{ ZERO /*计数*/, ZERO /*平均*/, BigDecimal.valueOf( Integer.MAX_VALUE ) /*最小*/, ZERO /*最大*/ }; } @Override public void execute( Tuple input, BasicOutputCollector collector ) { String type = input.getStringByField( "type" ); BigDecimal value = BigDecimal.valueOf( input.getDoubleByField( "value" ) ); if ( counter[2].compareTo( value ) > 0 ) counter[2] = value; if ( counter[3].compareTo( value ) < 0 ) counter[3] = value; if ( counter[0].equals( ZERO ) ) { counter[0] = ONE; counter[1] = value; } else { BigDecimal newCount = counter[0].add( ONE ); counter[1] = counter[1].multiply( counter[0] ).add( value ).divide( newCount, RoundingMode.HALF_EVEN ); counter[0] = newCount; } LOGGER.debug( "Type: " + type + ", Count: {}, Avg: {}, Min: {}, Max: {}", counter ); } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { } } |
主程序,创建拓扑并提交到集群中执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 |
package cc.gmem.study.storm.mv; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; public class MonitorValueStatTopology { public static void main( String[] args ) throws InterruptedException { TopologyBuilder builder = new TopologyBuilder(); // 每个组件都具有一个ID,供其它组件引用,以消费此组件的输出 builder.setSpout( "message-reader", new MessageReader() ); builder.setBolt( "message-parser", new MessageParser() ) .shuffleGrouping( "message-reader" ); // 消费message-reader的默认流,随机、均匀分发元组给当前Bolt builder.setBolt( "monitor-value-counter", new MonitorValueCounter() ) // 设置并发度 .setNumTasks( 3 ) // 消费message-parser的默认流,根据输入元组的type字段分发到不同的Task // 如果Task数量不足,则type字段值不同的元组会被分发给同一个Task .fieldsGrouping( "message-parser", new Fields( "type" ) ); Config conf = new Config(); List<Values> msgSrcCfg = new ArrayList<>(); msgSrcCfg.add( new Values( "温度", new Double( 0 ), new Double( 40 ) ) ); msgSrcCfg.add( new Values( "湿度", new Double( 0 ), new Double( 100 ) ) ); msgSrcCfg.add( new Values( "电压", new Double( 0 ), new Double( 360 ) ) ); conf.put( "msgSrcCfg", msgSrcCfg ); conf.setDebug( true ); // 对于每一个Spout Task,处于未决状态的元组最大数量 // 所谓未决,即元组尚未被ack/fail conf.put( Config.TOPOLOGY_MAX_SPOUT_PENDING, 1 ); // 创建一个本地模式的Storm集群 LocalCluster cluster = new LocalCluster(); // 创建拓扑 StormTopology topology = builder.createTopology(); // 提交拓扑到集群 cluster.submitTopology( "monitor-value-stat-topology", conf, topology ); TimeUnit.SECONDS.sleep( 60 ); // 关闭集群 cluster.shutdown(); } } |
Storm对于下述运行在集群中的拓扑中的实体:
- 工作进程(Worker processes)
- 执行器(Executors,线程)
- 任务(Task)
进行了明确的区分:
- Storm集群中的节点(机器)可以运行0-N个工作进程,这些工作进程可以属于1-N个拓扑。每个工作进程就是一个JVM,它仅仅为单个拓扑运行Executor。也就是说Worker不能被两个拓扑共享
- 单个Worker进程中可以运行1-N个Executor,每个Executor都是Worker进程产生的线程。每个Executor运行单个拓扑的单个组件的1-N个Task。也就是说Executor不能被两个组件共享
- 任务进行实际的数据处理,它执行Spout或者Bolt中的代码逻辑。它由Executor来执行
可以看到:
- 工作进程执行一个拓扑的子集,它只能属于一个拓扑
- 工作进程可以运行多个执行器线程,这些线程可以运行不同的组件。但是每个线程只能运行一种组件的任务
- 任务必须在执行器线程内部运行
要对并行度进行配置,参考并发度配置。
流分组指定了如何在Bolt的Task之间进行流的分区(Partition)——流的每个元组应该由哪个Task处理。
在定义拓扑的时候,你可以为Bolt指定流分组,指定流分组的同时也就指定了Bolt的数据源。你可以针对一个bolt多次指定流分组,也就是指定多个输入流:
1 2 3 4 |
builder.setBolt( "monitor-value-counter", new MonitorValueCounter() ) .fieldsGrouping( "message-parser", new Fields( "type" ) ) // 为Bolt指定另一个输入:signals-reader组件的signals流 .allGrouping("signals-reader","signals"); |
要实现自己的流分组,需要实现CustomStreamGrouping接口。通常情况下,你应该优先考虑使用Storm内置的流分组。
流分组 | 说明 | ||
Shuffle | 最常用的一种流分组。元组被随机的分发给Bolt的Task,此实现保证每个Task接受相等数量的元组 | ||
Fields |
根据元组的字段值进行分组,字段值相同的元组被分发给同一个Task 可以基于1-N个字段的值进行分组 |
||
Partial Key | 元组首先按照字段分组的方式被分区,并进一步的被两个下游的Bolt进行负载均衡 | ||
All |
输入流的每个元组被复制到Bolt的所有实例(Task) 需要执行某种广播逻辑时,考虑使用这种流分组 |
||
Global | 整个流被转发给单个具有最小Id的那个Task | ||
None | 表示不关心如何进行分组,当前此分组的行为和Shuffle类似。Storm在处理这种方式分组的流时,上游Spout/Bolt和下游Bolt使用同一执行线程 | ||
Direct |
上游Spout/Bolt(生产者)直接决定元组分发给下游Bolt(消费者)的哪个Task 这种分组方式仅仅能应用到被声明为直接流(Direct Stream)的流上。要释放元组到直接流上,必须调用OutputCollector.emitDirect方法。获得消费者的Task Id途径可以是:
示例代码:
|
||
Local or shuffle | 如果下游Bolt在相同(与释放流的组件)工作进程(Work Process)中行运行着一个或者更多的Task,则元组被分发给这些进程内的Task。否则,行为与Shuffle grouping相同 |
实现CustomStreamGrouping、Serializable接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
package cc.gmem.study.storm.mv; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.task.WorkerTopologyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.Serializable; import java.util.List; import static org.slf4j.LoggerFactory.getLogger; // 实现Serializable接口,Storm需要跨JVM分发此对象 public class DeviceStreamGrouping implements CustomStreamGrouping, Serializable { private static final Logger LOGGER = getLogger( DeviceStreamGrouping.class ); private List<Integer> targetTasks; /** * 初始化流分组实例,告知流分组必要的信息 * * @param context 拓扑上下文 * @param stream 此分组应用到的流 * @param targetTasks 接收流的bolt的Task Id的列表 */ @Override public void prepare( WorkerTopologyContext context, GlobalStreamId stream, List<Integer> targetTasks ) { LOGGER.debug( "Apply grouping on stream {}.{}", stream.get_componentId(), stream.get_streamId() ); this.targetTasks = targetTasks; } /** * 确定元组分发到哪个Task * * @param taskId 源组件产生当前元组的那个Task的Id * @param values 当前元组 * @return 接受此元组的Task Id列表 */ @Override public List<Integer> chooseTasks( int taskId, List<Object> values ) { return targetTasks; } } |
使用上述自定义流分组:
1 |
builder.setBolt(...).customGrouping( "message-parser", new DeviceStreamGrouping() ); |
此类用于在当前JVM中建立一个本地模式的集群。通常在开发阶段使用,可以方便的对各种拓扑进行调试:
1 2 3 4 |
LocalCluster cluster = new LocalCluster(); StormTopology topology = builder.createTopology(); // 提交到本地集群 cluster.submitTopology( "topology-name", conf, topology ); |
此类用于将拓扑提交到一个远程Storm集群中运行。与本地模式不同,你不能对远程集群进行控制。
1 |
StormSubmitter.submitTopology("monitor-value-stat-topology", conf, builder.createTopology()); |
下文有一个实际的例子。
通过storm客户端命令,也可以将拓扑提交到远程集群。首先你需要打JAR包,然后执行:
1 2 3 4 |
# 提交执行,需要指定main方法及其参数,main方法调用了StormSubmitter.submitTopology storm jar topology.jar cc.gmem.study.storm.mv.MonitorValueStatTopology arg0 arg1... # 要终止提交的拓扑,需要传入其Id storm kill monitor-value-stat-topology |
注意,JAR的入口函数必须调用submitTopology。
本节列出常见的拓扑模式(Pattern)。
很多情况下,出于效率的考虑,你可能希望一批批的处理元组,而不是一个个的。例如,你可能希望批量的将元组入库,以利用数据库的吞吐能力。
你可以简单的将元组存储到实例变量中,等到积累的足够多了,批量的处理它们然后统一Ack。
如果执行批处理的Bolt释放元组,你可能需要使用多重锚定,确定输出元组和输入元组之间的因果关系。
很多Bolt都是读取一个输入元组、处理后根据此元组释放0-N个元组,最后对输入元组进行Ack。基于IBasicBolt可以获得此模式的封装。
经过字段分组之后,该字段一样的元组总是由同一个Bolt处理。假设Bolt需要根据该字段进行某种转换,那么在Bolt实例变量中缓存这种转换结果,其缓存性能会较高。
例如,一个Bolt负责接收短URL并展开它,展开需要通过HTTP来调用第三方服务,因此缓存展开URL很有必要。可以在Bolt中创建一个短URL到展开URL的HashMap作为缓存。配合字段分组,让相同短URL总是分发给同一Bolt实例,这样上述缓存的命中率会比不分组高得多。
某些情况下,你需要处理很多输入元组,并且释放出其中最xx的10个元组。
最简单的方案是,使用全局流分组,也就是让所有元组都输入到单个Bolt实例中。这种方案可能会导致性能瓶颈。
一个改进的方案是,让多个Bolt都进行TopN计算,在一个下游的Bolt中,重新进行一次TopN计算:
1 2 3 4 |
builder.setBolt("rank", new RankObjects(), parallelism) .fieldsGrouping("objects", new Fields("value")); // 字段分组,每个实例进行自己的TopN计算 builder.setBolt("merge", new MergeObjects()) .globalGrouping("rank"); // 全局分组,但是要处理的数据量很少,不会出现瓶颈 |
我们知道,基于ack/fail可以保证消息:
- 要么完全通过拓扑
- 要么,可选的,进行Replay
问题是,如果进行Replay时,如何保证事务性?或者说,如何保证某些逻辑不会重复执行。
从0.7开始,Storm引入了事务性拓扑,可以保证Replay安全的进行,确保它们仅仅被处理一次。
在事务性拓扑中,Storm混合使用并行、串行的元组处理流。Spout生成的一批元组,被并行的发送给Bolt处理。某些Bolt被称为Committer,它们按照严格有序的方式提交已经被处理过的元组批次。
例如,Spout释放两个元组批次,每个批次包含5个元组,这两个元组被并发的交付Bolt处理,但是,如果链条中存在Committer Bolt,则在此Bolt批次必须严格有序的串行化 —— 它不会在第一个批次成功提交之前,提交第二个批次。
Storm事务由两个部分阶段组成:
- 处理阶段:完全并行阶段,很多批次可以被同时的执行
- 提交阶段:完全有序阶段,保证批次按照顺序逐一执行
我们以一个Twitter分析工具的例子来了解事务性拓扑的工作机制。此工具的逻辑如下:
- 读取Tweets并存储到Redis
- 通过若干Bolt处理Tweets
- 将处理结果存放到另一个Redis数据库,处理结果包括:所有主题标签(Hashtag)的及其出现次数、用户及其出现在Tweet中的次数、主题标签和用户出现在同一Tweet中的次数
用于构建此工具的拓扑示意如下:
说明如下:
- TweetsTransactionalSpout连接到数据源,读取并释放元组批次到拓扑
- 随后的两个Bolt会接受所有元组
- UserSplitterBolt,从Tweet中搜索用户(@之后的单词)
- HashatagSplitter,从Tweet中搜索主题标签(#之后的单词)
- 后面的UserHashtagJoinBolt,对前面两个Bolt的结果进行Join,获得每个用户+标签的组合同时出现的次数。此Bolt是BaseBatchBolt的子类型
- 最后的RedisCommitterBolt是一个Committer,它接受3个流。进行各种统计,并在处理完一批次后进行入库
相关说明已经附加在源码注释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 |
package cc.gmem.study.storm.twt; import org.apache.storm.coordination.BatchOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.topology.base.BaseBatchBolt; import org.apache.storm.topology.base.BaseTransactionalBolt; import org.apache.storm.topology.base.BaseTransactionalSpout; import org.apache.storm.transactional.ICommitter; import org.apache.storm.transactional.ITransactionalSpout; import org.apache.storm.transactional.TransactionAttempt; import org.apache.storm.transactional.TransactionalTopologyBuilder; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import redis.clients.jedis.Jedis; import redis.clients.jedis.Transaction; import java.io.Serializable; import java.math.BigInteger; import java.util.*; public class TwitterAnalyticsTool { /* 封装Redis相关操作 */ public static class RQ { // 存放下一个读取游标的键 public static final String NEXT_READ = "NEXT_READ"; // 存放下一个写入游标的键 public static final String NEXT_WRITE = "NEXT_WRITE"; Jedis jedis; public RQ() { jedis = new Jedis( "localhost" ); } /* 可读Tweet量 */ public long getAvailableToRead( long current ) { return getNextWrite() - current; } public long getNextRead() { String sNextRead = jedis.get( NEXT_READ ); if ( sNextRead == null ) return 1; return Long.valueOf( sNextRead ); } public long getNextWrite() { return Long.valueOf( jedis.get( NEXT_WRITE ) ); } public void close() { jedis.disconnect(); } public void setNextRead( long nextRead ) { jedis.set( NEXT_READ, "" + nextRead ); } /* 从指定游标from开始,读取quantity条Tweet */ public List<String> getMessages( long from, int quantity ) { String[] keys = new String[quantity]; for ( int i = 0; i < quantity; i++ ) keys[i] = "" + ( i + from ); return jedis.mget( keys ); } } /** * 事务元数据,释放元组批次时用到此元数据: * from 事务的起始游标 * quantity 事务处理Tweet的数量 * <p> * 元数据必须包含Replay此事务所牵涉到的批次的所需要的全部信息 */ public static class TransactionMetadata implements Serializable { private static final long serialVersionUID = 1L; long from; int quantity; public TransactionMetadata( long from, int quantity ) { this.from = from; this.quantity = quantity; } } /** * 协调器,整个拓扑中仅仅有单个协调器实例 */ public static class TweetsTransactionalSpoutCoordinator implements ITransactionalSpout.Coordinator<TransactionMetadata> { private static final long MAX_TRANSACTION_SIZE = 16; RQ rq = new RQ(); long nextRead = 0; public TweetsTransactionalSpoutCoordinator() { nextRead = rq.getNextRead(); } /** * 根据上一次事务的元数据,确定本次事务的元数据 * <p> * 元数据必须包含Replay此事务所牵涉到的批次的所需要的全部信息 * <p> * 元数据被存放在ZooKeeper中,以txid为键。如果事务失败,Storm可以通过Emtter重放此事务 * * @param txid 本次事务标识符,此事务从未被提交过 * @param prevMetadata 上一个事务的元数据 * @return 本次事务的元数据 */ @Override public TransactionMetadata initializeTransaction( BigInteger txid, TransactionMetadata prevMetadata ) { long quantity = rq.getAvailableToRead( nextRead ); quantity = quantity > MAX_TRANSACTION_SIZE ? MAX_TRANSACTION_SIZE : quantity; TransactionMetadata ret = new TransactionMetadata( nextRead, (int) quantity ); nextRead += quantity; return ret; } /** * 如果可以发起新事务,返回true。如果不可以,应当睡眠一段时间然后返回false * 如果此方法返回true,initializeTransaction()被调用 * * @return */ @Override public boolean isReady() { return rq.getAvailableToRead( nextRead ) > 0; } @Override public void close() { rq.close(); } } /** * 发射器,根据事务元数据,从数据源读取一批次数据,并释放到流中 */ public static class TweetsTransactionalSpoutEmitter implements ITransactionalSpout.Emitter<TransactionMetadata> { RQ rq = new RQ(); /** * 释放一个批次,此方法的实现一定要能够重复执行 ———— 根据txId、txMeta必须可以重新释放此批次 * * @param tx 当前事务尝试 * @param txMeta 事务元数据 * @param collector 用于释放元组 */ @Override public void emitBatch( TransactionAttempt tx, TransactionMetadata txMeta, BatchOutputCollector collector ) { // 移动读游标 if ( tx.getAttemptId() == 0 ) { // attempt id存放当前事务重复的次数,由此可以推断是否Replay rq.setNextRead( txMeta.from + txMeta.quantity ); } // 读取一批消息 List<String> messages = rq.getMessages( txMeta.from, txMeta.quantity ); long tweetId = txMeta.from; // 下面释放的消息都在批次中 for ( String message : messages ) { collector.emit( new Values( tx, String.valueOf( tweetId ), message ) ); tweetId++; } } @Override public void cleanupBefore( BigInteger txid ) { } @Override public void close() { rq.close(); } } /** * 和普通的Spout不同,事务性Spout扩展自泛型接口BaseTransactionalSpout */ public static class TweetsTransactionalSpout extends BaseTransactionalSpout<TransactionMetadata> { /* 指定用来协调批次生成的协调器 */ @Override public Coordinator<TransactionMetadata> getCoordinator( Map conf, TopologyContext context ) { return new TweetsTransactionalSpoutCoordinator(); } /* 指定发射器,负责读取源中的一批元组,并释放到流中 */ @Override public Emitter<TransactionMetadata> getEmitter( Map conf, TopologyContext context ) { return new TweetsTransactionalSpoutEmitter(); } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { declarer.declare( new Fields( "txid", "tweet_id", "tweet" ) ); } } /* 针对当前Tweet中每个@的用户,释放一个 tweetId,userId 元组 */ public static class UserSplitterBolt extends BaseBasicBolt { private static final long serialVersionUID = 1L; @Override public void execute( Tuple input, BasicOutputCollector collector ) { String tweetId = input.getStringByField( "tweet_id" ); String tweet = input.getStringByField( "tweet" ); StringTokenizer strTok = new StringTokenizer( tweet, " " ); TransactionAttempt tx = (TransactionAttempt) input.getValueByField( "txid" ); HashSet<String> users = new HashSet<String>(); while ( strTok.hasMoreTokens() ) { String user = strTok.nextToken(); // 如果一个Tweet中,用户出现两次,仅仅计算一次 if ( user.startsWith( "@" ) && !users.contains( user ) ) { // 释放到users流 collector.emit( "users", new Values( tx, tweetId, user ) ); users.add( user ); } } } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { // 声明非默认流 declarer.declareStream( "users", new Fields( "txid", "tweet_id", "user" ) ); } } /* 针对当前Tweet中每个主题标签,释放一个 tweetId,hashTag 元组 */ public static class HashtagSplitterBolt extends BaseBasicBolt { @Override public void execute( Tuple input, BasicOutputCollector collector ) { String tweet = input.getStringByField( "tweet" ); String tweetId = input.getStringByField( "tweet_id" ); StringTokenizer strTok = new StringTokenizer( tweet, " " ); TransactionAttempt tx = (TransactionAttempt) input.getValueByField( "txid" ); HashSet<String> words = new HashSet<String>(); while ( strTok.hasMoreTokens() ) { String word = strTok.nextToken(); if ( word.startsWith( "#" ) && !words.contains( word ) ) { collector.emit( "hashtags", new Values( tx, tweetId, word ) ); words.add( word ); } } } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { // 声明非默认流 declarer.declareStream( "hashtags", new Fields( "txid", "tweet_id", "hashtag" ) ); } } /** * UserHashtagJoinBolt是一个BaseBatchBolt,这意味着: * 1、execute处理接收到的元组,但是不释放任何新元组 * 2、在本批次的元组处理完毕后,Storm自动调用finishBatch */ public static class UserHashtagJoinBolt extends BaseBatchBolt { /* 每个Tweet都有哪些Tag */ private Map<String, Set<String>> tweetHashtags; /* 每个用户都被哪些Tweet @ */ private Map<String, Set<String>> userTweets; private BatchOutputCollector collector; private Object id; private void add( Map<String, Set<String>> map, String key, String val ) { Set<String> vals = map.get( key ); if ( vals == null ) { vals = new HashSet<>(); map.put( key, vals ); } vals.add( val ); } /** * 对于每一个事务,都会调用此方法 * * @param id 事务标识符 */ @Override public void prepare( Map conf, TopologyContext context, BatchOutputCollector collector, Object id ) { tweetHashtags = new HashMap<>(); userTweets = new HashMap<>(); this.collector = collector; this.id = id; } /* 生成两个映射关系 */ @Override public void execute( Tuple tuple ) { String source = tuple.getSourceStreamId(); String tweetId = tuple.getStringByField( "tweet_id" ); if ( "hashtags".equals( source ) ) { String hashtag = tuple.getStringByField( "hashtag" ); add( tweetHashtags, tweetId, hashtag ); } else if ( "users".equals( source ) ) { String user = tuple.getStringByField( "user" ); add( userTweets, user, tweetId ); } } /* 处理完当前批次后调用。进行JOIN操作,得到每个用户和每个主题标签一起被提及的频率 */ @Override public void finishBatch() { for ( String user : userTweets.keySet() ) { Set<String> tweets = getUserTweets( user ); HashMap<String, Integer> hashtagsCounter = new HashMap<String, Integer>(); for ( String tweet : tweets ) { Set<String> hashtags = getTweetHashtags( tweet ); if ( hashtags != null ) { for ( String hashtag : hashtags ) { Integer count = hashtagsCounter.get( hashtag ); if ( count == null ) count = 0; count++; hashtagsCounter.put( hashtag, count ); } } } for ( String hashtag : hashtagsCounter.keySet() ) { int count = hashtagsCounter.get( hashtag ); collector.emit( new Values( id, user, hashtag, count ) ); } } } private Set<String> getTweetHashtags( String tweet ) { return tweetHashtags.get( tweet ); } private Set<String> getUserTweets( String user ) { return userTweets.get( user ); } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { declarer.declare( new Fields( "txid", "user", "hashtag", "count" ) ); } } /** * RedisCommiterCommiterBolt是一个Committer,这是由标记性接口ICommitter指定的。 * 你也可以TransactionalTopologyBuilder.setCommiterBolt将一个Bolt设置为Committer * <p> * 和普通BaseBatchBolt不同之处是,Committer的finishBatch方法仅仅在可以提交的时候才执行,也就是说 * 只有当前面的所有事务均成功提交之后,Committer才会提交当前事务。 * <p> * 同一拓扑中所有事务,在Committer都是串行执行的 */ public static class RedisCommiterCommiterBolt extends BaseTransactionalBolt implements ICommitter { public static final String LAST_COMMITED_TRANSACTION_FIELD = "LAST_COMMIT"; TransactionAttempt id; BatchOutputCollector collector; Jedis jedis; HashMap<String, Long> hashtags = new HashMap<String, Long>(); HashMap<String, Long> users = new HashMap<String, Long>(); HashMap<String, Long> usersHashtags = new HashMap<String, Long>(); private void count( HashMap<String, Long> map, String key, int count ) { Long value = map.get( key ); if ( value == null ) value = (long) 0; value += count; map.put( key, value ); } @Override public void prepare( Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id ) { this.id = id; this.collector = collector; this.jedis = new Jedis( "localhost" ); } @Override public void execute( Tuple tuple ) { String origin = tuple.getSourceComponent(); if ( "users-splitter".equals( origin ) ) { String user = tuple.getStringByField( "user" ); count( users, user, 1 ); } else if ( "hashtag-splitter".equals( origin ) ) { String hashtag = tuple.getStringByField( "hashtag" ); count( hashtags, hashtag, 1 ); } else if ( "user-hashtag-merger".equals( origin ) ) { String hashtag = tuple.getStringByField( "hashtag" ); String user = tuple.getStringByField( "user" ); String key = user + ":" + hashtag; Integer count = tuple.getIntegerByField( "count" ); count( usersHashtags, key, count ); } } /** * 一定要记住:保存上一次事务的ID,在执行提交时,再次检查此ID是否和当前事务ID重复 * * 这样可以防止Replay时,入库逻辑被重复执行 */ @Override public void finishBatch() { // 获得上一次事务ID String lastCommitedTransaction = jedis.get( LAST_COMMITED_TRANSACTION_FIELD ); String currentTransaction = String.valueOf( id.getTransactionId() ); // 当前事务已经提交过,直接返回 if ( currentTransaction.equals( lastCommitedTransaction ) ) return; // 开启Redis事务 Transaction multi = jedis.multi(); // 更新上一次事务ID multi.set( LAST_COMMITED_TRANSACTION_FIELD, currentTransaction ); Set<String> keys = hashtags.keySet(); // 更新统计信息 for ( String hashtag : keys ) { Long count = hashtags.get( hashtag ); multi.hincrBy( "hashtags", hashtag, count ); } keys = users.keySet(); for ( String user : keys ) { Long count = users.get( user ); multi.hincrBy( "users", user, count ); } keys = usersHashtags.keySet(); for ( String key : keys ) { Long count = usersHashtags.get( key ); multi.hincrBy( "users_hashtags", key, count ); } multi.exec(); } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { } } @SuppressWarnings( "deprecation" ) public static void main( String[] args ) { // 可以使用TransactionalTopologyBuilder构建事务性拓扑,目前此类的功能已经被Trident代替 String topoId = "twitter-analytics-tool"; // 拓扑的标识符,Storm基于此标识符在ZooKeeper中存储事务性Spout的状态 String spoutId = "spout"; // Spout的标识符 TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder( topoId, spoutId, new TweetsTransactionalSpout() ); builder.setBolt( "users-splitter", new UserSplitterBolt(), 4 ).shuffleGrouping( "spout" ); builder.setBolt( "hashtag-splitter", new HashtagSplitterBolt(), 4 ).shuffleGrouping( "spout" ); builder.setBolt( "user-hashtag-merger", new UserHashtagJoinBolt(), 4 ) .fieldsGrouping( "users-splitter", "users", new Fields( "tweet_id" ) ) .fieldsGrouping( "hashtag-splitter", "hashtags", new Fields( "tweet_id" ) ); builder.setBolt( "redis-committer", new RedisCommiterCommiterBolt() ) .globalGrouping( "users-splitter", "users" ) .globalGrouping( "hashtag-splitter", "hashtags" ) .globalGrouping( "user-hashtag-merger" ); } } |
事务性拓扑提供了一种可靠的批量处理语义,其关键在于:
- 提供了组装元组批次,也就是事务的机制,并且信息持久化在ZooKeeper中,不会因为宕机而丢失
- 提供了串行有序执行语义,你只需要记录一个事务标识符 —— 上一个事务的标识符,因为任何时刻只会有一个事务正在准备提交
流是连续不断的元组序列。任何Spout、Bolt都可以释放0-N个流。
通过 JoinBolt,Storm支持将多个流合并为一个。JoinBolt是一个窗口化(Windowed)的Bolt,它会等待进行合并的多个流的Window duration匹配,确保流依据窗口边界对齐。
每个参与合并的流,必须是根据Join字段进行分fieldsGrouping,也就是说,Join字段一样的元组必须由同一Task处理。
考虑如下SQL语句:
1 2 3 4 5 |
select userId, key4, key2, key3 from table1 inner join table2 on table2.userId = table1.key1 inner join table3 on table3.key3 = table2.userId left join table4 on table4.key4 = table3.key3 |
对应的流连接如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1 // 我组件 我键 他组件 .join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1 .join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId .leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3 // 选择输出字段 .select ("userId, key4, key2, spout3:key3") // chose output fields // 滚动窗口,时长10分钟 .withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ; // 你也可以调用withWindow()配置滑动窗口 topoBuilder.setBolt("joiner", jbolt, 1) // 参与连接的流必须以Join字段分组 .fieldsGrouping("spout1", new Fields("key1") ) .fieldsGrouping("spout2", new Fields("userId") ) .fieldsGrouping("spout3", new Fields("key3") ) .fieldsGrouping("spout4", new Fields("key4") ); |
在Join某个流之前,你必须先引入它:
1 2 3 4 |
new JoinBolt( "spout1", "key1") // arg0引入 arg2连接到 .join( "spout2", "userId", "spout3") // 错误:spout3尚未引入 .join( "spout3", "key3", "spout1"); |
实际Join发生的顺序,就是用户声明的顺序。
上面流连接的例子中,都使用了组件名称,而不是流名称。Storm支持基于流名称连接,但是很多组件仅仅释放一个流,且流名称默认为default,要基于流名称连接,必须先定义好命名流。
1 2 3 4 5 |
// 提示此JoinBolt基于流名称而非组件名称引用参与连接的流 new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1").join("stream2", "key2"); topoBuilder.setBolt("joiner", jbolt, 1) // 配置流分组时要指定流的名称 .fieldsGrouping("bolt1", "stream1", new Fields("key1") ) |
- 目前仅仅支持内连接(INNER)和左连接(LEFT)
- SQL支持针对一个表,通过多个字段分别Join。Storm不支持,每个流有且仅有一个字段能用于Join,该字段用于流分组,确保键相同的分组发送给同一Task
- 要使用多字段连接,你需要将它们合并为单个字段
- 连接操作可能占用很高的CPU和内存。当前窗口中累积的数据越多,则连接消耗的时间越长。使用滑动窗口时,越短的滑动间隔导致越频繁的连接操作。因此太长的窗口、太短的滑动间隔都可能导致性能问题
- 使用滑动窗口时,多个窗口之间可能存在重复的Joined记录,这是因为这些记录的源元组可能跨越多个窗口存在
- 如果使用消息超时,应当正确设置topology.message.timeout.sec,确保它能够匹配窗口大小,同时考虑拓扑中其它组件可能消耗的时间
- 连接两个流时,假设窗口大小分别为M、N,那么最坏的情况下可能产生M x N个输出元组,下游Bolt将会释放更多的ACK。这种情况下,Storm的消息子系统可能面临很大的压力,不小心使用可能导致拓扑运行缓慢。下面是管理消息子系统的一些建议:
- 增加Worker的堆大小: topology.worker.max.heap.size.mb
- 如果拓扑不需要ACK,可以禁用它: topology.acker.executors=0
- 禁用事件记录器: topology.eventlogger.executor=0
- 禁用拓扑调试功能: topology.debug=false
- topology.max.spout.pending设置要匹配完整窗口需要的大小,附加额外的一些空间。这可以避免消息子系统过载时Spout仍然不停释放元组,该项设置为null时更容易发生此情况
- 在能够满足需求的情况下,窗口应该尽量的小
本章主要讨论设计拓扑入口点 —— Spout的通用策略,以及如何让Spout支持容错。
在设计拓扑时,考虑消息可靠性需求是一个重要事项。当一个消息无法完成处理时,你需要决定针对此消息进行怎样的处理?整个拓扑有应该有怎样的行为。打个比方,在处理银行存款业务时,每个交易消息都不能丢失;单是在处理百万级数据的统计信息时,丢失一部分消息则不会对结果精确性产生太大影响。
在Storm中,拓扑的设计者负责保证消息的可靠性。这常常需要权衡考虑,因为拓扑需要更多的资源以管理消息不丢失。
你可以在释放元组时为其指定标识符:
1 2 |
// 此重载版本为SpoutOutputCollector特有,Bolt使用的OutputCollector没有此方法 collector.emit(new Values(...),tupleId) |
如果不指定标识符,Storm会自动生成一个。在消费者中可以调用 input.getMessageId()获得元组的Id。
当消费者针对元组进行ack/fail操作时,生产者的ack/fail回调会被调用,并传入确认/失败的那个元组。至于如何处理失败的元组,需要根据具体业务场景决定,常见的做法是放回消息队列。
引起元组失败的原因有两种:
- 消费者调用 collector.fail(tuple)
- 元组处理的时间超过配置的时限, Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS
最简单的接入方式是Spout直接连接到数据源(Message Emitter)。如果数据源是明确的(Well-known)的设备(在这里是一个抽象概念,理解为一般性的数据源即可)或者设备组,这种方式实现起来很简单。
所谓明确的,是指在拓扑启动时设备就就是已知的(Known)并且在拓扑运行期间它保持不变。所谓未知设备,就是在拓扑运行之后才添加进来的。
所谓明确设备组,就是其中所有设备都是已知的一组设备。
下面是一个基于Twitter流API的拓扑,它使用直接连接的接入方式:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 |
package cc.gmem.study.storm.twt; import com.esotericsoftware.kryo.util.IdentityMap; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.storm.shade.org.apache.http.HttpResponse; import org.apache.storm.shade.org.apache.http.StatusLine; import org.apache.storm.shade.org.apache.http.client.CredentialsProvider; import org.apache.storm.shade.org.apache.http.client.methods.HttpGet; import org.apache.storm.shade.org.apache.http.impl.client.DefaultHttpClient; import org.apache.storm.spout.SpoutOutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseRichSpout; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.util.Map; public class TwitterStreamSpout extends BaseRichSpout { private static final String STREAMING_API_URL = "..."; private CredentialsProvider credentialProvider; private String track; private ObjectMapper om; private SpoutOutputCollector collector; @Override public void open( Map conf, TopologyContext context, SpoutOutputCollector collector ) { // 任何组件都可以访问拓扑上下文,其中包含了拓扑的所有全局性信息 // 获得当前Spout的实例数量 int spoutsSize = context.getComponentTasks( context.getThisComponentId() ).size(); // 获得当前Spoput实例的Id int myIdx = context.getThisTaskIndex(); String[] tracks = ( (String) conf.get( "track" ) ).split( "," ); StringBuffer tracksBuffer = new StringBuffer(); for ( int i = 0; i < tracks.length; i++ ) { // 让各Spout实例处理不同的track关键字 if ( i % spoutsSize == myIdx ) { tracksBuffer.append( "," ); tracksBuffer.append( tracks[i] ); } } this.track = tracksBuffer.substring( 1 ); this.collector = collector; } @Override public void nextTuple() { // 通过HTTP获取消息 DefaultHttpClient client = new DefaultHttpClient(); client.setCredentialsProvider( credentialProvider ); // 让每个Spout实例跟踪不同的track关键字 HttpGet get = new HttpGet( STREAMING_API_URL + this.track ); HttpResponse response; try { response = client.execute( get ); StatusLine status = response.getStatusLine(); if ( status.getStatusCode() == 200 ) { InputStream inputStream = response.getEntity().getContent(); BufferedReader reader = new BufferedReader( new InputStreamReader( inputStream ) ); String in; // 逐行读取 while ( ( in = reader.readLine() ) != null ) { // 解析并释放元组 Object json = om.readValue( in, Map.class ); collector.emit( new Values( this.track, json ) ); } } } catch ( IOException e ) { // 连接到Twitter API失败,可以休眠一段时间后重试 } } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { declarer.declare( new Fields( "track", "content" ) ); } } |
注意上面的例子中用到Storm的一个重要特性 —— 在任何组件中访问TopologyContext。利用TopologyContext你可以感知到Spout的Task数量,然后将不同数据源映射到不同的Task,增加拓扑的并行度。
上面是连接到明确的设备的例子,通过类似的方式,可以直接连接到未知设备。但是你需要一个协调(Coordinator)系统来维护可用设备的列表。例如,当进行Web服务器日志分析时,Web服务器的列表可能动态变化。当一个新的Web服务器加入后,Coordinator检测到并为其创建一个Spout。
当建立连接时,应当总是由Spout发起,而不是由数据源发起。因为运行Spout的服务器宕机后Storm可能在另外一台机器上重启此Spout,数据源难以精确的定位到Spout。
除了直接连接,你还可以使用消息队列系统。Spout连接到消息队列读取消息,数据源则把消息发布到消息队列。
消息中间件软件通常都提供可靠性、持久化等保证,这简化了开发任务。使用消息中间件后Spout对数据源毫无感知。
使用消息列队这种接入方式时,如果想增加Spout并行度,可以:
- 让Spout循环轮询(Round-robin Poll)同一队列
- 基于哈希再分发:根据哈希值将消息分发给不同Spout,或者分发到不同的子队列。Spout读取子队列。Kafka是一个很好的支持子队列(分区)的消息中间件
Bolt是拓扑中的关键组件,大部分逻辑都发生在Bolt链中。
Bolt这类组件,以元组为输入,可选的,它也能够输出元组。
在创建Bolt时,你需要实现IRichBolt接口。Bolt在客户端创建,然后串行化到拓扑,最后整个拓扑被提交到Storm集群的Master节点。集群会启动工作节点(JVM进程)来运行拓扑组件,Bolt会被反串行化,其prepare方法会被调用,然后开始接受元组。
Storm确保Spout释放的所有元组都能传递给相应的Bolt,至于Bolt是否保证信息堡丢失,由开发人员决定。
拓扑是组件构成的网络,Spout释放的元组,是网络中传递的信息。信息经过不同组件之后,可能分裂、合并,这和Bolt灵活的Tuple处理机制有关:
- 可以根据一个输入元组产生多个输出元组
- 可以根据多个输入元组产生一个输出元组
从框架角度来说,无法自动推导输入、输出元组的对应关系。因此在Storm中你需要利用锚定(Anchoring)技术手工指定。
以从Spout发出的原初元组为根,其衍生的任何元组被fail,则此原初元组也就fail,发起此元组的Spout的fail回调自动被调用。
每个Spout/Bolt都可以声明多个输出流,标识符为default的默认流自动声明。要声明新的流,可以调用:
1 |
declarer.declareStream( streamId, fields ); |
要向指定的流释放元组,调用:
1 |
collector.emit( streamId, tuple ); |
前面提到过,要追踪信息流动方向,需要使用锚定技术。也就是指定输入、输出元组之间的关联关系:
1 2 |
// 此重载版本为OutputCollector特有 collector.emit( inputTuple, outputTuple ); |
有了这种调用后,Storm才可以对原初元组进行有效的追踪。
如果输出元组基于多个输入元组推导出,可以一起锚定它们:
1 2 3 4 |
List<Tuple> anchors = new ArrayList<Tuple>(); anchors.add( inputTuple1 ); anchors.add( inputTuple2 ); collector.emit( anchors, outputTuple ); |
如果outputTuple失败,则产生inputTuple1、inputTuple2的Spout都会得到通知。
Storm会在此接口的实现的execute()执行后,自动调用ack,如果execute()抛出异常,则自动调用fail。你可以考虑继承BaseBasicBolt。
Storm提供了一个抽象,允许Bolt存储/取回其操作的状态信息。该抽象有两个实现,一个是基于内存的默认实现,另一个以Redis作为后备存储。
需要状态管理功能的Bolt,需要实现IStatefulBolt接口或者继承BaseStatefulBolt,并实现initState方法。Storm在初始化Bolt实例时,调用initState并传入上次存储的状态,调用发生在prepare之后,处理任何元组之前。
当前仅仅支持的状态实现是org.apache.storm.state.KeyValueState。
分布式远程过程调用(Distributed Remote Procedure Call)利用Storm的计算能力进行远程调用。Storm提供了一系列DRPC的支持工具。
说明:
- 客户端向DRPC服务器发起同步调用
- DRPC将请求转发给DRPC Spout,进而在拓扑异步的处理
- 处理结果由最后一个Bolt返回给DRPC服务器。并返回给客户端
DRPC服务器是客户端和Storm拓扑之间的桥梁,它作为拓扑中Spout的源运行。
DRPC服务器接受需要执行的函数及其参数,它可以执行多个函数,函数通过名称唯一识别。
对于每一次函数调用,服务器分配一个请求Id,用于在拓扑中唯一的识别调用请求。当拓扑中最后一个Bolt执行完毕后,此Bolt必须释放出包含请求Id、调用结果的数组,DRPC会把结果分发给正确的客户端。
注意:目前此类已经被废弃,其功能由Trident代替。
用于构建DRPC拓扑的抽象。此Builder生成的拓扑会:
- 自动创建DRPCSpouts,连接DRPC服务器读取请求,并释放元组到拓扑的其它部分
- 对Bolts进行包装,让调用结果从最后一个Bolt返回
所有添加到LinearDRPCTopologyBuilder的Bolt均按序依次执行
要提交拓扑到Storm集群,可以调用LinearDRPCTopologyBuilder的createRemoteTopology(而不是createLocalTopology),此方法会利用Storm配置中的DRPC相关配置。
要连接到DRPC服务器,使用此类,此类是ThriftClient的子类型。
DRPC服务器暴露了基于Thrift的API,可以在很多语言中使用此API。并且,不管DRPC服务器是本地还是远程的,API完全一样。
下面是一个简单的加法计算器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 |
package cc.gmem.study.storm.drpc; import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.LocalDRPC; import org.apache.storm.drpc.LinearDRPCTopologyBuilder; import org.apache.storm.generated.StormTopology; import org.apache.storm.topology.BasicOutputCollector; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; import org.apache.storm.tuple.Values; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class DrpcAdder { private static final Logger LOGGER = LoggerFactory.getLogger( DrpcAdder.class ); public static class AdderBolt extends BaseBasicBolt { @Override public void execute( Tuple input, BasicOutputCollector collector ) { // 元组第一个参数是RPC请求Id Object rpcId = input.getValue( 0 ); // 元组第二个参数是RPC请求参数 String args = input.getString( 1 ); String[] numbers = args.split( "," ); Integer added = 0; for ( String num : numbers ) { added += Integer.parseInt( num ); } collector.emit( new Values( rpcId, added ) ); } @Override public void declareOutputFields( OutputFieldsDeclarer declarer ) { // 作为最后一个Bolt,需要释放RPC请求Id declarer.declare( new Fields( "id", "result" ) ); } } public static void main( String[] args ) { // 和DRPCClient一样是DistributedRPC.Iface的实现 LocalDRPC drpc = new LocalDRPC(); // 拓扑对应一个远程调用函数 LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder( "add" ); builder.addBolt( new AdderBolt(), 2 ); Config conf = new Config(); conf.setDebug( true ); LocalCluster cluster = new LocalCluster(); // 创建本地拓扑,需要传入客户端对象 StormTopology topology = builder.createLocalTopology( drpc ); cluster.submitTopology( "drpc-adder-topology", conf, topology ); // 进行一次调用 String result = drpc.execute( "add", "1,-1" ); LOGGER.debug( "DRPC result {}", result ); cluster.shutdown(); drpc.shutdown(); } } |
Storm使用调度器在集群中进行拓扑的调度,它提供了四种内置的调度器实现:DefaultScheduler, IsolationScheduler, MultitenantScheduler, ResourceAwareScheduler。
要实现自己的调度器,实现IScheduler接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
package cc.gmem.study.storm; import org.apache.storm.scheduler.Cluster; import org.apache.storm.scheduler.IScheduler; import org.apache.storm.scheduler.Topologies; import java.util.Map; public class Scheduler implements IScheduler { @Override public void prepare( Map conf ) { } /** * 为需要调度的拓扑设置Assignment,调用cluster.getAssignments()可以得到最新设置的Assignment * * @param topologies 提交到集群中所有的拓扑,某些拓扑需要被调度 * 这里的拓扑对象仅仅包含拓扑的静态信息,Assignment、Slot等信息存在于cluster中 * @param cluster 拓扑所在的Storm集群,此对象包含开发新调度器所需的全部信息,例如: * Supervisor信息、可用Slot、当前Assignment */ @Override public void schedule( Topologies topologies, Cluster cluster ) { } } |
要切换调度器,修改配置文件 storm.yaml的 storm.scheduler配置项。
此调度器可以让多个拓扑安全、简单的共享单个集群。此调度器允许你隔离某些拓扑 —— 让它在一组专用的机器上执行,其它拓扑不能使用这些专用机器。被隔离的拓扑优先享用集群资源,被所有隔离拓扑占用内,剩下的机器,所有非隔离拓扑共享之。
要启用此调度器,在Nimbus节点的配置文件中进行如下设置:
1 2 3 4 5 6 |
storm.scheduler: org.apache.storm.scheduler.IsolationScheduler # 拓扑名称:专用机器数量的映射 isolation.scheduler.machines: "my-topology": 8 "tiny-topology": 1 "some-other-topology": 3 |
Storm支持对一组落到同一“窗口”的元组进行统一的处理。所谓窗口,是被特定条件限定的、连续的元组集合。限定条件有两种:
- 窗口的长度:包含元组的数量,或者持续的时间
- 窗口的滑动:每隔多长窗口向前滑动
长度/间隔有两种度量方式:时间、元组数量。
窗口具有一定的长度,并且每隔一段时间,窗口就向前滑动一次。例如下面的滑动窗口示意:
........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
-5 0 5 10 15 -> time
|<------- w1 -->|
|<---------- w2 ----->|
|<-------------- w3 ---->|
窗口的长度是10秒,滑动间隔是5秒。注意元组可能属于多个窗口,每个窗口包含的元组数量可以变化。
窗口具有固定的长度,不同元组仅仅会属于单个窗口。下面是滚动窗口的示意:
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1 w2 w3
窗口长度是5秒,窗口之间不存在重叠。
要获得窗口化支持,Bolt需要实现IWindowedBolt接口:
1 2 3 4 5 6 7 |
public interface IWindowedBolt extends IComponent { void prepare(Map stormConf, TopologyContext context, OutputCollector collector); // 处理落到窗口中的元组,可选的,释放新的元组 // 每当一个窗口激活(满了)时,该方法就被调用 void execute(TupleWindow inputWindow); void cleanup(); } |
通常情况下,你可以选择继承BaseWindowedBolt类,该类提供了一些指定窗口长度、滑动间隔的方法:
1 2 3 4 5 |
builder.setBolt("slidingwindowbolt", // 滑动窗口长度 30 元组,滑动间隔 10 元组 // 要指定滚动窗口,使用withTumblingWindow方法,要基于时间而非长度,使用Duration new SlidingWindowBolt().withWindow(new Count(30), new Count(10)), 1).shuffleGrouping("spout"); |
默认的,窗口以Bolt处理元组的时间为计算窗口边界的基准。Storm还支持以元组字段给出的时间戳为基准:
1 2 3 4 5 6 |
// 根据fieldName字段指定的时间(Long型)计算窗口边界 // 如果fieldName字段不存在,导致异常 BaseWindowedBolt.withTimestampField(String fieldName); // 另外一种方式,提供一个能够从元组中抽取时间戳的回调函数 BaseWindowedBolt.withTimestampExtractor(TimestampExtractor timestampExtractor); |
除了指定时间戳获取方式之外,Storm还允许指定一个最大的延迟参数:
1 |
BaseWindowedBolt.withLag(Duration duration); |
如果Bolt接收到一个时间戳为06:00:05的元组,且最大延迟为5s,那么后续不应该再接收到时间戳小于06:00:00的元组。如果的确接收到这样的过期(Late)元组,Storm默认不会处理,仅仅在Worker日志中以INFO级别记录。
要改变上述默认行为,可以调用:
1 2 3 |
// 过期元组将被释放到streamId流中,后续可以通过WindowedBoltExecutor.LATE_TUPLE_FIELD访问此流 // 必须和withTimestampField一起调用,否则IllegalArgumentException BaseWindowedBolt.withLateTupleStream(String streamId); |
当基于元组字段时间戳计算窗口边界时,Storm内部基于接收到的元组的时间戳来计算水位。所谓水位,就是所有输入流中,所有元组中最晚的那个时间戳减去乱序延迟后得到的数值。
Storm周期性的释放出水位时间戳,此时间戳将作为判断窗口边界是否已经到达的标准。周期默认1s,要修改周期可以调用:
1 |
BaseWindowedBolt.withWatermarkInterval(Duration interval) |
在协同多个输入流,进行窗口化处理时,水位时间戳相当于一个标准时钟。一旦窗口右边界小于等于此时间戳,就会立刻被处理,且小于此时间戳的后续元组均过期。如果多个输入流释放元组的速度差距很大,那么慢的那些流的元组甚至可能全部成为过期元组。
举个例子,假设基于元组字段时间戳的窗口参数如下:
Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s
当前墙上时间为09:00:00,在 09:00:00-09:00:01之间接收到以下元组:
e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)
墙上时间到达09:00:01时,水位时间戳06:00:31(最晚元组时间戳-乱序延迟5)被释放,后续任何时间戳小于06:00:31的元组均过期。水位时间戳导致三个窗口被处理:
05:59:50 - 06:00:10 e1 e2 e3
06:00:00 - 06:00:20 e1 e2 e3 e4
06:00:10 - 06:00:30 e4 e5
将第一个元组的时间戳针对滑动间隔向上取整(这确保它落在第一个窗口中),得到06:00:10,作为第一个窗口的右边界。这样有多少个窗口,窗口边界是哪里都可以推算出来。
元组e6没有落在适当的窗口中,因此此时暂不会被处理。
在09:00:01-09:00:02之间接收到以下元组:
e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)
墙上时间戳到达09:00:02时,水位是按戳08:00:34被释放。水位时间戳导致三个窗口被处理:
06:00:20 - 06:00:40 e5 e6
06:00:30 - 06:00:50 e6
08:00:10 - 08:00:30 e7 e8 e9
第一个窗口的左边界,等于上一批次首个窗口的左边界 + 滑动间隔10。第二个窗口继续右滑10s。
向右滑动跳过N多空窗,得到从08:00:10开始的非空窗,包含了e7-e9。e10所属窗口右边界超出水位时间戳,因此暂不处理。
Storm的窗口化功能,可以提供最少一次处理的保证。
在方法 execute(TupleWindow inputWindow)中释放的元组,自动锚定到inputWindow中所有元组。下游Bolt应当Ack从WindowedBolt接收的元组,以完成元组树的处理。如果不Ack元组需要回放,窗口计算也会重新进行。过期的元组会自动被确认。
对于基于时间的窗口,配置参数topology.message.timeout.secs应当远大于windowLength + slidingInterval,否则元组可能来得及处理之前就超时,导致Replay。
对于基于计数的窗口,配置参数topology.message.timeout.secs应当调整,保证windowLength + slidingInterval个元组能够在超时前得到处理。
Storm提供了一个分布式缓存API,利用它可以高效的分发巨大的、在拓扑生命周期内可能变化的文件(或者叫Blob),这些文件的大小可能在KB-GB之间。
对于小的,不需要动态更新的数据集,可以考虑将其嵌入在JAR中。但是过大的文件则不行,会导致拓扑启动非常缓慢。这种情况下使用分布式缓存可以加快拓扑的启动速度。
在拓扑启动时,用户可以指定拓扑所需要的文件集。在拓扑运行期间,用户可以查询分布式缓存中的任何文件,并更换为新版本。更新行为遵从最终一致性模型。在分布式缓存中,文件基于LRU算法管理。Worker负责确认什么文件不再需要,并删除以释放磁盘空间。
分布式缓存的接口是BlobStore,目前它有两种实现:LocalFsBlobStore、HdfsBlobStore
这是基于本地文件系统的缓存实现。
1 2 |
# 复制因子4,任何人可以读写管,键key1,文件内容在README.txt中 storm blobstore create --file README.txt --acl o::rwa --replication-factor 4 key1 |
文件的创建由接口ClientBlobStore负责,其实现是NimbusBlobStore。在使用基于本地文件系统的缓存时,NimbusBlobStore调用Nimbus,在其本地文件系统创建文件。
拓扑提交时,JAR和配置文件也是在BlobStore的帮助下上传的,就好像一个普通文件一样。
拓扑启动后,被分配负责运行它的节点的Supervisor会从Nimbus下载JAR、配置文件,并根据topology.blobstore.map确定此拓扑需要使用哪些缓存文件并下载到本地。
在提交拓扑的时候,可以指定该拓扑需要使用哪些文件:
1 2 3 |
storm jar /path/to/jar class # 文件key1在拓扑中基于名称blob_file访问,不压缩 -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"},"key2":{}}' |
和基于本地文件系统的缓存实现类似,但是容错性已经由HDFS提供了。
LocalFsBlobStore需要在ZooKeeper中存储状态信息,以便支持Nimbus HA。
文件创建、下载、更新由HdfsClientBlobStore负责。
BlobStore基于键、版本来生成一个哈希码,作为Blob的文件名。Blob存放在blobstore.dir指定的目录,默认值对于本地实现来说是storm.local.dir/blobs,对于HDFS实现是一个类似的路径。
一旦Blob被提交,BlobStore即读取配置并为文件创建元数据,元数据用于访问Blob以及对访问者的进行授权。
对于本地实现,Supervisor节点的缓存大小软限制为10240MB,每隔600s会自动根据LRU算法清理超过软限制的那部分blob。
HDFS实现能够减少Nimbus的负载,而且容错性也不受到Nimbus数量的限制。Supervisor进行blob下载时不需要和Nimbus通信,因此减少了对Nimbus的依赖。
Storm的Master是运行在单台机器上的,在监控工具保护下的进程。一般情况下,Nimbus宕机后会由监控工具重启,不会造成太大问题。但是当Nimbus所在机器出现硬件问题时,例如磁盘损坏,Nimbus就无法启动了。
如果Nimbus无法启动,现有的拓扑能够继续运行,但是无法提交新拓扑。现有的拓扑也不能被杀死、激活、禁用。此外,如果某个Supervisor宕掉,Worker重新分配也无法进行。
因此,实现Nimbus的HA是有必要的,以便:
- 增加Nimbus总体可用时间
- 允许Nimbus主机在任何时候加入、离开集群
- 如果Nimbus宕机,不需要进行任何拓扑重提交
- 活动的拓扑绝不丢失
实现HA,需要在多个Nimbus节点中选择Leader。Nimbus服务器基于如下接口进行Leader选举:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public interface ILeaderElector { /** * 排队等待获取Leader锁,该方法立即返回,调用者应该检查自己是否Leader */ void addToLeaderLockQueue(); /** * 将自己从等待Leader锁的队列中移除,如果当前持有锁,释放之 */ void removeFromLeaderLockQueue(); /** * 当前节点是否持有了Leader锁 */ boolean isLeader(); /** * 返回当前Leader的地址,如果当前没有节点持有锁,则抛出异常 */ InetSocketAddress getLeaderAddress(); /** * 返回所有Nimbus节点的地址 */ List<InetSocketAddress> getAllNimbusAddresses(); } |
Storm提供了基于ZooKeeper的ILeaderElector实现。
为实现Nimbus故障转移,所有状态、数据必须在所有Nimbus节点之间复制,或者存储在外部的分布式存储系统中。BlobStore就可以用作这样的存储系统。
用户可以通过 topology.min.replication.count声明一个代码复制因子N,在拓扑启动前,其代码、Jar、配置必须复制到至少N个Nimbus节点。使用基于本地文件系统的BlobStore时,当发生故障转移后,如果某个Blob丢失,Nimbus会在需要的时候去下载。也就是说,成为Leader的时候Nimbus不一定在本地存储了所有Blob。
当使用基于本地文件系统的BlobStore时,所有Blob的元数据在ZooKeeper中存储;而使用基于HDFS的BlobStore时,HDFS负责管理。
基于Storm CLI使用分布式缓存的方法,请参考blobstore命令。本节给出相应的Java API的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
import org.apache.storm.utils.Utils; import org.apache.storm.blobstore.ClientBlobStore; import org.apache.storm.blobstore.AtomicOutputStream; import org.apache.storm.blobstore.InputStreamWithMeta; import org.apache.storm.blobstore.BlobStoreAclHandler; import org.apache.storm.generated.*; // 创建BlobStore客户端 Config conf = new Config(); conf.putAll(Utils.readStormConfig()); ClientBlobStore clientBlobStore = Utils.getClientBlobStore(conf); // 创建ACL String stringBlobACL = "u:username:rwa"; AccessControl blobACL = BlobStoreAclHandler.parseAccessControl(stringBlobACL); List<AccessControl> acls = new LinkedList<AccessControl>(); acls.add(blobACL); SettableBlobMeta settableBlobMeta = new SettableBlobMeta(acls); settableBlobMeta.set_replication_factor(4); // ACL的复制因子 // 创建Blob,并授予ACL AtomicOutputStream blobStream = clientBlobStore.createBlob("some_key", settableBlobMeta); blobStream.write("Some String or input data".getBytes()); blobStream.close(); // 更新Blob String blobKey = "some_key"; AtomicOutputStream blobStream = clientBlobStore.updateBlob(blobKey); // 更改ACL String blobKey = "some_key"; AccessControl updateAcl = BlobStoreAclHandler.parseAccessControl("u:USER:--a"); // u:USER:-w- List<AccessControl> updateAcls = new LinkedList<AccessControl>(); updateAcls.add(updateAcl); SettableBlobMeta modifiedSettableBlobMeta = new SettableBlobMeta(updateAcls); clientBlobStore.setBlobMeta(blobKey, modifiedSettableBlobMeta); // 设置Blob元数据 // 设置、读取复制因子 String blobKey = "some_key"; BlobReplication replication = clientBlobStore.updateBlobReplication(blobKey, 5); int replication_factor = replication.get_replication(); // 读取Blob String blobKey = "some_key"; InputStreamWithMeta blobInputStream = clientBlobStore.getBlob(blobKey); BufferedReader r = new BufferedReader(new InputStreamReader(blobInputStream)); String blobContents = r.readLine(); // 删除Blob String blobKey = "some_key"; clientBlobStore.deleteBlob(blobKey); // 列出所有Blob Iterator <String> stringIterator = clientBlobStore.listKeys(); |
从0.6.0版本开始,Storm使用Kryo作为串行化库,Kryo是一个高性能、灵活的Java串行化框架。它可以产生较紧凑的串行化格式。
默认情况下,Storm能够串行化基本类型、字符串、字节数组、ArrayList、HashMap、HashSet、Clojure集合类型。如果你希望在元组中使用其它类型,需要提供自定义的串行化器。
在声明输出字段时,Storm不支持指定字段的类型。你仅仅需要把对象放入元组,Storm负责动态的识别其类型并串行化。
这个行为和Hadoop不同,Hadoop对键、值进行强制类型,这需要你提供大量的注解。Hadoop这种强制静态类型往往得不偿失。除了出于简单的目的,下面两条也是Storm选择动态类型的原因:
- 没有办法很好的静态确定元组字段类型。Storm很灵活,一个Bolt可能订阅了多个流,这些流释放的元组字段各不相同
- 为了便于和JRuby、Clojure这样的动态语言配合
要自定义串行化器,你需要实现Kryo的某些接口。要注册这些串行化器,你需要配置拓扑的topology.kryo.register,此配置项的值是一个列表:
1 2 3 4 5 |
topology.kryo.register: # 形式一:指定需要串行化的类型,这种情况下使用Kryo的FieldsSerializer来串行化该类型 - com.mycompany.CustomType1 # 形式二:指定类型到串行化器(com.esotericsoftware.kryo.Serializer)的映射 - com.mycompany.CustomType2: com.mycompany.serializer.CustomType2Serializer |
调用 Config.registerSerialization也可以自定义串行化。
配置项 Config.TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS设置为true时,Storm忽略所有找不到的串行化器类,如果设置为false则抛出异常。
如果Storm遇到某个元组字段,它没有可用的串行化器,则转用Java原生的串行化机制。如果该字段无法基于Java串行化,Storm抛出异常。
注意:Java串行化的成本很高,CPU占用高、输出格式尺寸大。应当尽量避免使用。
配置项 Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION设置为false,则禁用Java串行化。
Storm提供了钩子机制,允许你在任何Storm事件发生时注入自定义代码。扩展BaseTaskHook类,覆盖对应你关心事件的方法即可实现钩子。相关方法包括:emit、spoutAck、spoutFail、boltAck、boltFail、boltExecute。
注册钩子的方法有两种:
- 在Spout的open、Bolt的prepare方法中,调用TopologyContext的方法注册
- 通过拓扑配置项topology.auto.task.hooks注册,这样钩子会应用到任何Spout、Bolt。在和外部监控系统集成时,可以使用这种方式
Storm暴露了一个度量(metrics)接口,用于报告整个拓扑的统计信息。在Storm内部,此接口用于元组计数、处理延迟统计、Work堆用量统计,等等。
任何度量都需要实现IMetric接口,该接口仅包含一个方法getValueAndReset,此方法用于执行必要的统计并重置度量为初始值。Storm提供的度量子类型有:
类型 | 说明 |
AssignableMetric | 支持设置度量为一个明确的值 |
CombinedMetric | 可以被关联更新的度量的通用接口 |
CountMetric | 计数性的度量,其方法 incr()用于增加1个计数, incrBy(n)用于增加n个计数 |
MultiCountMetric | CountMetric的HashMap |
ReducedMetric |
MeanReducer:用于求平均值 MultiReducedMetric:ReducedMetric的HashMap |
你可以在Bolt中编程式的声明度量、并通过TopologyContext注册:
1 2 3 4 5 6 7 8 9 10 11 12 |
private transient CountMetric countMetric; public void prepare(Map conf, TopologyContext context, OutputCollector collector) { // 注册度量 countMetric = new CountMetric(); // 每60秒自动调用countMetric.getValueAndReset()以重置度量 context.registerMetric("execute_count", countMetric, 60); } public void execute(Tuple input) { // 更新度量值 countMetric.incr(); } |
要注册Worker级别度量,可以配置:
- Config.WORKER_METRICS:针对集群中所有Worker
- Config.TOPOLOGY_WORKER_METRICS:针对某个拓扑的所有Worker
上述两个配置的取值都是度量名到度量类的HashMap。
Worker级别度量具有以下限制:
- 通过SystemBolt注册,不暴露给用户Task
- 基于默认构造器创建,不会进行任何属性或配置的注入
要消费度量值实现 IMetricsConsumer接口。通过注册消费者,你可以监听、处理拓扑的度量数据。编程式注册:
1 |
conf.registerMetricsConsumer(org.apache.storm.metric.LoggingMetricsConsumer.class, 1); |
通过配置文件:
1 2 3 4 5 6 7 8 |
topology.metrics.consumer.register: - class: "org.apache.storm.metric.LoggingMetricsConsumer" parallelism.hint: 1 - class: "org.apache.storm.metric.HttpForwardingMetricsConsumer" # 消费者对应的Bolt的并行度 parallelism.hint: 1 # 消费者的prepare方法接收此参数 argument: "http://example.com:8080/metrics/my-topology/" |
注册消费者后,Storm会在内部为每个消费者添加一个MetricsConsumerBolt(到拓扑),每个MetricsConsumerBolt都会订阅来自任何Task的度量信息。
MetricsConsumerBolt的并行度由parallelism.hint指定,其组件ID为 __metrics_consumerClassFQName#SEQ,其中SEQ仅仅在多次注册同一个消费者类时出现。
注意:消费者仅仅是一个普通的Bolt,如果它本身性能低下,则很多度量值会因而出现偏差。
Storm提供了一些内置的度量消费者,你可以使用它们来了解拓扑提供了哪些度量:
消费者 | 说明 |
LoggingMetricsConsumer | 监听度量值,以TSV格式存储到文件 |
HttpForwardingMetricsConsumer | 监听度量值,以HTTP Post方式发送到外部服务器 |
为了调整Nimbus、Supervisor、运行中拓扑的行为,Storm提供了多种配置信息。 某些配置信息是系统级的,不能为每个拓扑定制。
所有配置项的默认值记录在defaults.yaml中。要覆盖默认值,在Nimbus、Supervisor的Classpath下放置一个storm.yaml文件。
通过 StormSubmitter提交拓扑时,你可以连同自定义配置项一起提交,这些配置项必须以topology.开头。
从Storm 0.7开始,某些配置项可以在Spout/Bolt级别上定制,包括:
- topology.debug
- topology.max.spout.pending
- topology.max.task.parallelism
- topology.kryo.register
通过Java API,你可以为组件指定上述配置信息,途径有两种:
- 组件内部配置:覆盖getComponentConfiguration方法
- 组件外部配置:调用TopologyBuilder的setSpout/setBolt的返回值的addConfiguration/addConfigurations方法。此会覆盖途径1的配置项
配置信息的优先级,从低到高:defaults.yaml ⇨ storm.yaml ⇨ 拓扑配置 ⇨ 组件内部配置 ⇨ 组件外部配置
通过此助手类,可以为拓扑指定配置,此类包含了很多对应了配置项Key的常量值。
要配置一个拓扑在整个集群上,拥有的工作进程的数量,可以使用Config.TOPOLOGY_WORKERS配置项。
要提示为每个组件创建执行线程的数量,可以在setSpout/setBolt时传递parallelism_hint参数,指定初始的执行器数量。
要设置某个组件的任务数量,可以使用Config.TOPOLOGY_TASKS配置项。或者调用 ComponentConfigurationDeclarer.setNumTasks()方法。
调用命令 storm rebalance 可以在运行期间修改拓扑工作进程数量、拓扑组件的执行器数量,示例:
1 2 3 4 |
# 使用5个执行线程 # 组件blue-spout使用3个执行器 # 组件yellow-bolt使用10个执行器 storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 |
# Native库的位置 java.library.path: "/usr/local/lib:/opt/local/lib:/usr/lib" ### storm.* 一般性配置 # JAR存放位置 storm.local.dir: "storm-local" # 使用的日志实现 storm.log4j2.conf.dir: "log4j2" # 使用的ZooKeeper服务器列表 storm.zookeeper.servers: - "localhost" # 使用的ZooKeeper服务器端口 storm.zookeeper.port: 2181 # 使用的ZooKeeper根节点 storm.zookeeper.root: "/storm" # 常规性ZooKeeper配置项 storm.zookeeper.session.timeout: 20000 storm.zookeeper.connection.timeout: 15000 storm.zookeeper.retry.times: 5 storm.zookeeper.retry.interval: 1000 storm.zookeeper.retry.intervalceiling.millis: 30000 storm.zookeeper.auth.user: null storm.zookeeper.auth.password: null storm.exhibitor.port: 8080 storm.exhibitor.poll.uripath: "/exhibitor/v1/cluster/list" # 集群运行模式:distributed、local storm.cluster.mode: "distributed" storm.local.mode.zmq: false storm.thrift.transport: "org.apache.storm.security.auth.SimpleTransportPlugin" storm.thrift.socket.timeout.ms: 600000 storm.principal.tolocal: "org.apache.storm.security.auth.DefaultPrincipalToLocal" storm.group.mapping.service: "org.apache.storm.security.auth.ShellBasedGroupsMapping" storm.group.mapping.service.params: null storm.messaging.transport: "org.apache.storm.messaging.netty.Context" storm.nimbus.retry.times: 5 storm.nimbus.retry.interval.millis: 2000 storm.nimbus.retry.intervalceiling.millis: 60000 storm.auth.simple-white-list.users: [] storm.auth.simple-acl.users: [] storm.auth.simple-acl.users.commands: [] storm.auth.simple-acl.admins: [] storm.cluster.state.store: "org.apache.storm.cluster_state.zookeeper_state_factory" storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate" storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor" storm.workers.artifacts.dir: "workers-artifacts" storm.health.check.dir: "healthchecks" storm.health.check.timeout.ms: 5000 storm.disable.symlinks: false ### nimbus.* configs are for the master nimbus.seeds : ["localhost"] nimbus.thrift.port: 6627 nimbus.thrift.threads: 64 nimbus.thrift.max_buffer_size: 1048576 nimbus.childopts: "-Xmx1024m" nimbus.task.timeout.secs: 30 nimbus.supervisor.timeout.secs: 60 nimbus.monitor.freq.secs: 10 nimbus.cleanup.inbox.freq.secs: 600 nimbus.inbox.jar.expiration.secs: 3600 # Nimbus节点多久尝试同步本地缺少的代码Blob nimbus.code.sync.freq.secs: 120 nimbus.task.launch.secs: 120 nimbus.file.copy.expiration.secs: 600 nimbus.topology.validator: "org.apache.storm.nimbus.DefaultTopologyValidator" # 最低复制因子,拓扑的代码、JAR、配置复制到多少Nimbus节点后,Leader才能将拓扑标记为Active并开始分配Worker topology.min.replication.count: 1 # 等待topology.min.replication.count满足的最大时间 # 超过此时间,则不等待复制完成,立即启动拓扑 # 设置为-1表示一直等待 topology.max.replication.wait.time.sec: 60 nimbus.credential.renewers.freq.secs: 600 nimbus.queue.size: 100000 scheduler.display.resource: false ### ui.* configs are for the master ui.host: 0.0.0.0 ui.port: 8080 ui.childopts: "-Xmx768m" ui.actions.enabled: true ui.filter: null ui.filter.params: null ui.users: null ui.header.buffer.bytes: 4096 ui.http.creds.plugin: org.apache.storm.security.auth.DefaultHttpCredentialsPlugin ui.http.x-frame-options: DENY logviewer.port: 8000 logviewer.childopts: "-Xmx128m" logviewer.cleanup.age.mins: 10080 logviewer.appender.name: "A1" logviewer.max.sum.worker.logs.size.mb: 4096 logviewer.max.per.worker.logs.size.mb: 2048 logs.users: null drpc.port: 3772 drpc.worker.threads: 64 drpc.max_buffer_size: 1048576 drpc.queue.size: 128 drpc.invocations.port: 3773 drpc.invocations.threads: 64 drpc.request.timeout.secs: 600 drpc.childopts: "-Xmx768m" drpc.http.port: 3774 drpc.https.port: -1 drpc.https.keystore.password: "" drpc.https.keystore.type: "JKS" drpc.http.creds.plugin: org.apache.storm.security.auth.DefaultHttpCredentialsPlugin drpc.authorizer.acl.filename: "drpc-auth-acl.yaml" drpc.authorizer.acl.strict: false # 用于Trident拓扑中Spout相关的元数据的存储 # 存储的根路径 transactional.zookeeper.root: "/transactional" # 用于存储的ZooKeeper集群信息 transactional.zookeeper.servers: null transactional.zookeeper.port: null ## blobstore configs # Supervisor用于和BlobStore通信的客户端类 # 如果使用HDFS实现,改为org.apache.storm.blobstore.HdfsClientBlobStore supervisor.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore" # Supervisor上用于并行下载Blob的线程数量 supervisor.blobstore.download.thread.count: 5 # 下载失败后,最大重试次数 supervisor.blobstore.download.max_retries: 3 # 本地缓存最大尺寸 supervisor.localizer.cache.target.size.mb: 10240 # 本地缓存清理间隔,超过最大尺寸的部分,以LRU算法清除 supervisor.localizer.cleanup.interval.ms: 600000 # 分布式缓存实现(BlobStore)实现类 nimbus.blobstore.class: "org.apache.storm.blobstore.LocalFsBlobStore" # 通过Master和Blobstore交互时,会话过期时间,单位秒 nimbus.blobstore.expiration.secs: 600 # 所有Blob存储的位置,对于本地文件系统实现,对应了Nimbus本地目录,对于HDFS实现,对应hdfs文件路径 blobstore.dir: $storm.local.dir/blobs # 上传Blob时,缓冲区大小 storm.blobstore.inputstream.buffer.size.bytes: 65536 # Storm客户端用于和BlobStore通信的客户端类 client.blobstore.class: "org.apache.storm.blobstore.NimbusBlobStore" # BlobStore中每个Blob的复制因子 # topology.min.replication.count用于设置拓扑的数据的复制因子 # topology.min.replication.count 应该小于等于 blobstore.replication.factor storm.blobstore.replication.factor: 3 # For secure mode we would want to change this config to true storm.blobstore.acl.validation.enabled: false ### supervisor.* configs are for node supervisors # Define the amount of workers that can be run on this machine. Each worker is assigned a port to use for communication supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 supervisor.childopts: "-Xmx256m" supervisor.run.worker.as.user: false #how long supervisor will wait to ensure that a worker process is started supervisor.worker.start.timeout.secs: 120 #how long between heartbeats until supervisor considers that worker dead and tries to restart it supervisor.worker.timeout.secs: 30 #how many seconds to sleep for before shutting down threads on worker supervisor.worker.shutdown.sleep.secs: 3 #how frequently the supervisor checks on the status of the processes it's monitoring and restarts if necessary supervisor.monitor.frequency.secs: 3 #how frequently the supervisor heartbeats to the cluster state (for nimbus) supervisor.heartbeat.frequency.secs: 5 supervisor.enable: true supervisor.supervisors: [] supervisor.supervisors.commands: [] supervisor.memory.capacity.mb: 3072.0 #By convention 1 cpu core should be about 100, but this can be adjusted if needed # using 100 makes it simple to set the desired value to the capacity measurement # for single threaded bolts supervisor.cpu.capacity: 400.0 ### worker.* configs are for task workers worker.heap.memory.mb: 768 worker.childopts: "-Xmx%HEAP-MEM%m -XX:+PrintGCDetails -Xloggc:artifacts/gc.log -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1M -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=artifacts/heapdump" worker.gc.childopts: "" # Unlocking commercial features requires a special license from Oracle. # See http://www.oracle.com/technetwork/java/javase/terms/products/index.html # For this reason, profiler features are disabled by default. worker.profiler.enabled: false worker.profiler.childopts: "-XX:+UnlockCommercialFeatures -XX:+FlightRecorder" worker.profiler.command: "flight.bash" worker.heartbeat.frequency.secs: 1 # check whether dynamic log levels can be reset from DEBUG to INFO in workers worker.log.level.reset.poll.secs: 30 # control how many worker receiver threads we need per worker topology.worker.receiver.thread.count: 1 task.heartbeat.frequency.secs: 3 task.refresh.poll.secs: 10 task.credentials.poll.secs: 30 task.backpressure.poll.secs: 30 # now should be null by default topology.backpressure.enable: false backpressure.disruptor.high.watermark: 0.9 backpressure.disruptor.low.watermark: 0.4 zmq.threads: 1 zmq.linger.millis: 5000 zmq.hwm: 0 storm.messaging.netty.server_worker_threads: 1 storm.messaging.netty.client_worker_threads: 1 storm.messaging.netty.buffer_size: 5242880 #5MB buffer # Since nimbus.task.launch.secs and supervisor.worker.start.timeout.secs are 120, other workers should also wait at least that long before giving up on connecting to the other worker. The reconnection period need also be bigger than storm.zookeeper.session.timeout(default is 20s), so that we can abort the reconnection when the target worker is dead. storm.messaging.netty.max_retries: 300 storm.messaging.netty.max_wait_ms: 1000 storm.messaging.netty.min_wait_ms: 100 # If the Netty messaging layer is busy(netty internal buffer not writable), the Netty client will try to batch message as more as possible up to the size of storm.messaging.netty.transfer.batch.size bytes, otherwise it will try to flush message as soon as possible to reduce latency. storm.messaging.netty.transfer.batch.size: 262144 # Sets the backlog value to specify when the channel binds to a local address storm.messaging.netty.socket.backlog: 500 # By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology. storm.messaging.netty.authentication: false # Default plugin to use for automatic network topology discovery storm.network.topography.plugin: org.apache.storm.networktopography.DefaultRackDNSToSwitchMapping # default number of seconds group mapping service will cache user group storm.group.mapping.service.cache.duration.secs: 120 ### topology.* configs are for specific executing storms # 是否启用消息处理超时 topology.enable.message.timeouts: true # 释放禁用拓扑调试 topology.debug: false topology.workers: 1 # ACK线程的数量,设置为0则禁用ACK topology.acker.executors: null # 事件记录器线程数量,设置为0则禁用事件记录 topology.eventlogger.executors: 0 topology.tasks: null # maximum amount of time a message has to complete before it's considered failed topology.message.timeout.secs: 30 topology.multilang.serializer: "org.apache.storm.multilang.JsonSerializer" topology.shellbolt.max.pending: 100 topology.skip.missing.kryo.registrations: false topology.max.task.parallelism: null # 对于事务性拓扑,表示能够同时处理的元组批次的数量 # 对于非事务性拓扑,如果启用了消息确认(ACK),则拓扑中尚未ACK的原初元组的数量,超过此数量不再继续释放原初元组 topology.max.spout.pending: null topology.state.synchronization.timeout.secs: 60 topology.stats.sample.rate: 0.05 topology.builtin.metrics.bucket.size.secs: 60 topology.fall.back.on.java.serialization: true topology.worker.childopts: null topology.worker.logwriter.childopts: "-Xmx64m" topology.executor.receive.buffer.size: 1024 #batched topology.executor.send.buffer.size: 1024 #individual messages topology.transfer.buffer.size: 1024 # batched topology.tick.tuple.freq.secs: null topology.worker.shared.thread.pool.size: 4 topology.spout.wait.strategy: "org.apache.storm.spout.SleepSpoutWaitStrategy" topology.sleep.spout.wait.strategy.time.ms: 1 topology.error.throttle.interval.secs: 10 topology.max.error.report.per.interval: 5 topology.kryo.factory: "org.apache.storm.serialization.DefaultKryoFactory" topology.tuple.serializer: "org.apache.storm.serialization.types.ListDelegateSerializer" topology.trident.batch.emit.interval.millis: 500 topology.testing.always.try.serialize: false topology.classpath: null topology.environment: null topology.bolts.outgoing.overflow.buffer.enable: false topology.disruptor.wait.timeout.millis: 1000 topology.disruptor.batch.size: 100 topology.disruptor.batch.timeout.millis: 1 topology.disable.loadaware.messaging: false topology.state.checkpoint.interval.ms: 1000 # Configs for Resource Aware Scheduler # topology priority describing the importance of the topology in decreasing importance starting from 0 (i.e. 0 is the highest priority and the priority importance decreases as the priority number increases). # Recommended range of 0-29 but no hard limit set. topology.priority: 29 topology.component.resources.onheap.memory.mb: 128.0 topology.component.resources.offheap.memory.mb: 0.0 topology.component.cpu.pcore.percent: 10.0 # Worker进程的最大堆尺寸 topology.worker.max.heap.size.mb: 768.0 topology.scheduler.strategy: "org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy" resource.aware.scheduler.eviction.strategy: "org.apache.storm.scheduler.resource.strategies.eviction.DefaultEvictionStrategy" resource.aware.scheduler.priority.strategy: "org.apache.storm.scheduler.resource.strategies.priority.DefaultSchedulingPriorityStrategy" dev.zookeeper.path: "/tmp/dev-storm-zookeeper" pacemaker.host: "localhost" pacemaker.port: 6699 pacemaker.base.threads: 10 pacemaker.max.threads: 50 pacemaker.thread.timeout: 10 pacemaker.childopts: "-Xmx1024m" pacemaker.auth.method: "NONE" pacemaker.kerberos.users: [] #default storm daemon metrics reporter plugins storm.daemon.metrics.reporter.plugins: - "org.apache.storm.daemon.metrics.reporters.JmxPreparableReporter" # configuration of cluster metrics consumer storm.cluster.metrics.consumer.publish.interval.secs: 60 |
前面我们提到过,Storm支持两种运行模式:本地模式、远程模式。在本地模式中你可以在单JVM进程中调试整个拓扑。
开发环境就是包含了所有Storm组件的环境,你可以在本地模式下开发、测试Storm拓扑。或者将集群提交到远程集群,启动/停止远程集群上的拓扑。
你的开发机器和远程集群的关系是这样的:远程集群被主节点——Nimbus所管理,你的机器和Nimbus交互,提交JAR格式的拓扑。随后拓扑在集群中被调度、执行。你的机器可以通过命令行客户端storm和Nimbus交互,此客户端仅仅支持远程模式。
首先,下载最新版本的Storm,解压后,把bin子目录放到PATH环境变量中。bin目录中包含客户端脚本。
要想管理远程集群,将集群连接信息存放到 ~/.storm/storm.yaml文件中。
主要步骤包括:
- 创建ZooKeeper集群
- 在Nimbus、Worker节点上安装依赖
- 在Nimbus、Worker节点上下载并解压Storm
- 在storm.yaml中添加必须的配置
- 使用你选择的监控工具,通过storm命令启动守护程序。
请参考:ZooKeeper学习笔记
安装Java 7+以及Python 2.6.6,不赘述。
Python 3应该也可以工作,但是没有经过详尽的测试。
请参考:搭建开发环境
打开$STORM_HOME/ conf/storm.yaml文件,添加以下配置:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
# 指定ZooKeeper集群的主机列表: storm.zookeeper.servers: - "zookeeper-1.gmem.cc" - "zookeeper-1.gmem.cc" - "zookeeper-3.gmem.cc" storm.zookeeper.port: 2181 # 当前节点的名称,默认自动根据主机名获取 storm.local.hostname: "storm-n1.gmem.cc" # Nimbus/Supervisor需要在本地文件系统中存放少量状态数据,例如 jars, confs # 此目录需要提前创建,并授予适当的读写权限 storm.local.dir: "/home/alex/JavaEE/middleware/storm/data/local" # 工作节点需要知道哪些机器是Master节点的候选,以便从这些节点下载拓扑JAR、配置文件 # 你应该列出目标机器的全限定域名(FQDN)。如果使用Nimbus HA,要列出所有运行Nimbus进程的机器 nimbus.seeds: ["storm-n1.gmem.cc"] # 对于每个工作节点,你需要指定它运行多少个工作进程。每个工作进程需要独特的端口来接收消息 # 这里指定多少端口,就创建多少工作进程。默认创建6700-6703这4个端口 supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 |
Storm允许你提供一些脚本,让Supervisor定期的执行,以监控节点的健康状态。如果脚本监测到节点不健康,它应当在标准输出打印一行以ERROR开头的文字。Supervisor会周期性的执行脚本,一旦发现ERROR,就会关闭节点上所有Worker进程并退出。
在监控工具下运行守护进程时,你可以考虑调用storm node-health-check命令确认节点健康状态,进一步决定是否继续运行守护进程。
1 2 3 4 |
# 指定健康检测脚本所在目录 storm.health.check.dir: "healthchecks" # 脚本执行超时时间 storm.health.check.timeout.ms: 5000 |
如果需要支持外部库、自定义插件,可以把相关的JAR存放到extlib、extlib-daemon目录下。需要注意, extlib-daemon中的JAR仅仅能被守护进程(不能被Worker也就是拓扑逻辑)使用。
你也可以使用STORM_EXT_CLASSPATH、STORM_EXT_CLASSPATH_DAEMON这两个环境变量。
1 2 3 4 5 6 7 8 |
# 仅Nimbus节点 # UI服务器配置 ui.host: 0.0.0.0 ui.port: 6600 # 日志查看器配置 logviewer.port: 6601 |
强烈推荐在监控工具下(supervision tool)启动守护程序,这样意外崩溃后可以立即重启。Storm守护程序被设计为快速失败、无状态的,重启后拓扑不会受到影响:
- 要启动Nimbus,执行 storm nimbus命令
- 要启动Supervisor,在每个Worker节点执行 storm supervisor命令
- 要运行Storm UI服务器,在Nimbus节点执行 storm ui,默认Web端口8080
所有守护程序的日志默认存放在logs目录下。
安装此监控工具参考:Ubuntu下使用monit。服务条目示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
check process storm-nimbus matching daemon.name=nimbus start program = "/home/alex/JavaEE/middleware/storm/bin/storm nimbus" as uid alex and gid alex stop program = "/bin/kill -9 $MONIT_PROCESS_PID" as uid alex and gid alex check process storm-ui matching daemon.name=ui start program = "/home/alex/JavaEE/middleware/storm/bin/storm ui" as uid alex and gid alex stop program = "/bin/kill -9 $MONIT_PROCESS_PID" as uid alex and gid alex check process storm-drpc matching daemon.name=drpc start program = "/home/alex/JavaEE/middleware/storm/bin/storm drpc" as uid alex and gid alex stop program = "/bin/kill -9 $MONIT_PROCESS_PID" as uid alex and gid alex check process storm-log matching daemon.name=logviewer start program = "/home/alex/JavaEE/middleware/storm/bin/storm logviewer" as uid alex and gid alex stop program = "/bin/kill -9 $MONIT_PROCESS_PID" as uid alex and gid alex |
请参考:官方Docker镜像。
拉取镜像: docker pull storm:1.1.1
用法示例:
1 2 3 4 5 6 |
# 以本地模式运行一个拓扑 docker run -it -v topology.jar:/topology.jar storm storm jar /topology.jar pkg.Topology # 运行nimbus docker run -d --restart always --name nimbus storm storm nimbus # 运行supervisor docker run -d --restart always --name supervisor storm storm supervisor |
此镜像使用的默认配置是defaults.yaml,改变配置的方式由两种:
- 使用命令行参数,例如 -c storm.zookeeper.servers='["zookeeper"]'
- 在容器文件系统路径 /conf/storm.yaml放置配置文件
数据和日志的存储目录位于/data、/logs,这两个目录的所有者为storm。
1 |
docker run --name storm-s1 --hostname storm-s1 --network local --ip 172.21.2.1 --dns 172.21.0.1 -d docker.gmem.cc/storm storm supervisor -c storm.local.hostname='storm-s1.gmem.cc' |
Storm拓扑的调试主要依赖于日志,特别是在远程集群中运行的情况下。
你可以通过Storm UI或者命令行动态的改变某个包前缀的日志级别,方式类似于设置Log4J的日志级别。
通过Storm UI首页找到需要调试的拓扑,点击链接进入,可以看到类似下面的页面:
此页面提供了和拓扑相关的大量信息:
- 顶部的搜索栏,可以对Worker的日志进行搜索
- 按钮区域,提供了激活、禁用、Rebalance、杀死、起停调试功能,还可以设置日志级别
- 拓扑的名称、状态、Worker数量、Executor数量、Task数量等基本信息
- 释放、传输(释放并发送给1-N个Bolt)元组的数量、延迟时间,Ack、Fail的数量
- Spout、Bolt的各类统计信息
- 拥有的Worker在集群中的分布情况,组件和Worker的对应关系
- 可以查看图形化的拓扑结构(蓝色表示Spout)
- 拓扑的配置信息
要通过Storm UI修改日志级别,点击Change Log Level链接,填写表单然后Apply即可。Logger字段可以填写你的代码中使用的日志器前缀(包名前缀)。Timeout指明本次日志级别调整激活多久。
参见命令行客户端。
通过Logviewer不能直接查看Worker的日志文件。Worker的所有日志存放在 ${storm.log.dir}/workers-artifacts/${topologyId}/${port}目录下。worker.log即为工作日志文件,worker.log.err则包含错误级别的日志。
你可以在上述Storm UI界面的顶部,输入关键字(例如Exception)来搜索Worker日志,右上角的放大镜按钮也可以用来进行搜索(默认针对所有拓扑)。
要进行动态JVM剖析,点击拓扑的某个组件,可以看到Profiling and Debugging面板。点选下面的Executors面板的条目,然后点击对应的按钮:
- JStack:生成Thread Dump
- Start:启动剖析
- Heap:生成Heap Dump
点击My Dump Files可以下载上述按钮生成的剖析文件。
配置 | 说明 |
worker.profiler.command | 指定剖析工具 |
worker.profiler.enabled | 可能被禁用,如果JDK不支持JFR记录、剖析插件也不可用的话 |
拓扑事件查看器能够用来确定元组如何穿过拓扑的每一个Stage,实时查看元组处理过程时,不会对拓扑运行产生影响。
默认事件记录功能被禁用,你可以设置 topology.eventlogger.executors为大于0的值,以启用之。取值1表示每个拓扑一个日志记录Task,取值nil表示每个Worker一个日志记录Task。
要启动事件记录,点击Topolog Actions面板上的Debug按钮。
要查看被处理的元组,进入对应Spout/Bolt组件的页面,在Component summary面板中,点击events链接。
Storm中的日志记录Bolt利用IEventLogger接口实现事件日志,此接口的默认实现是FileBasedEventLogger。你可以扩展此接口实现更加复杂的逻辑,例如记录到数据库。
Storm提供了一个名为storm的客户端程序,用于和远程集群进行交互。
调用格式:
1 2 3 4 5 6 7 |
storm jar topology-jar-path class # 逗号分隔 --jars your-local-jar.jar,your-local-jar2.jar # 逗号分隔,^用于排除依赖 --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" # 指定下载构件的仓库,^用作分隔符 --artifactRepositories "jboss-repository^" |
调用class的main函数,需要的配置文件和依赖的jar包(例如storm的jar),放置于~/.storm目录。main函数通常调用StormSubmitter,并导致topology-jar-path被上传到Nimbus。
如果存在依赖的、没有打包到topology-jar-path的其它jar包,可以使用--jars选项。如果想使用Maven构件的方式指定依赖,可以使用--artifacts选项。
依赖会同时上传作为Worker进程的classpath条目。
调用格式: storm sql sql-file topology-name --jars --artifacts --artifactRepositories
编译SQL为Trident拓扑,并提交到Storm集群。
调用格式: storm kill topology-name [-w wait-time-secs]
杀死具有指定名称的拓扑,最多等待wait-time-secs秒。
Storm会首先禁用Spout,然后等待一定时间(默认是拓扑的消息处理超时),让正在处理的消息完成。之后,Storm会关闭Worker进程并清理状态。
调用格式: storm activate topology-name
激活指定拓扑的Spout
调用格式: storm deactivate topology-name
禁用指定拓扑的Spout
调用格式:
1 |
storm rebalance topology-name [-w wait-time-secs] [-n workers-num] [-e component=parallelism]* |
某些时候你希望把拓扑的Worker进程均匀分布到更多的机器上。
举例来说,假设你有个10台机器的集群,每个运行4个工作进程。现在,又添加了10个节点,你需要使用这些机器的运算能力,让每个节点运行2个工作进程。一种方法是,杀死并重新提交拓扑,但是rebalance提供了更加简单的方法。
rebalance首先禁用指定的拓扑Spout(等待到消息超时或者-w),然后,在集群中均匀的创建Worker进程,最后激活拓扑。
rebalance也可以用来改变拓扑的并发度: -n用来改变Worker进程数量;-e用来指定组件的Executor数量(线程数量,和Task数量不是一回事)
调用格式: storm repl
开启Clojure REPL环境。
调用格式: storm classpath,打印客户端的classpath信息。
调用格式: storm localconfvalue conf-name
打印配置项conf-name的本地值,本地配置是~/.storm/storm.yaml合并了defaults.yaml的结果。
调用格式: storm remoteconfvalue conf-name
必须在集群中的节点上执行,打印集群的配置项。集群配置是$STORM-PATH/conf/storm.yaml合并了defaults.yaml的结果。
调用格式: storm nimbus。启动Nimbus守护进程,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式: storm supervisor。启动Supervisor守护进程,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式: storm ui。启动Web UI服务器守护进程,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式: storm drpc。启动DRPC服务器守护进程,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式: storm blobstore cmd。管理Storm的分布式缓存。子命令列表:
子命令 | 说明 |
list [KEY...] | 列出当前存储的blob |
cat [-f FILE] KEY | 读取一个blob,写入到文件或者stdout |
create [-f FILE] [-a ACL ...] [--replication-factor NUMBER] KEY |
创建一个blob,其内容来自文件或者stdin ACL是一个逗号分隔的列表,条目格式[uo]:[username]:[r-][w-][a-] |
update [-f FILE] KEY | 更新一个blob的内容 |
delete KEY | 删除一个blob |
set-acl [-s ACL] KEY | 设置一个blob的访问控制列表 |
replication --read KEY | 读取blob的复制因子 |
onitor
调用格式:
storm dev-zookeeper
启动一个全新的zookeeper服务器,使用dev.zookeeper.path作为其本地目录,使用storm.zookeeper.port作为其端口。仅仅用于开发/测试。
调用格式: storm get-errors topology-name
获得指定拓扑的最新的错误信息。返回结果是component-name: component-error的键值对JSON
调用格式: storm heartbeats [cmd]
调用格式: storm kill_workers
杀死运行在此Supervisor节点上的Worker进程,必须在Supervisor节点上调用。
调用格式: storm list。列出集群中运行的拓扑,以及这些拓扑的状态。
调用格式: storm logviewer。启动日志查看器进程,此进程提供Web UI,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式:
1 2 3 4 5 |
storm monitor topology-name [-i interval-secs] # 默认4秒轮询一次 [-m component-id] # 默认所有组件 [-s stream-id] # 默认default [-w [emitted | transferred]] # 默认emitted |
交互式的对指定拓扑的吞吐量进行监控。
调用格式: storm node-health-check。在本地Supervisor上运行健康状态检测。
调用格式: storm pacemaker。启动Pacemaker进程,此进程提供Web UI,此命令应该在 daemontools、 monit之类的supervision工具中调用。
调用格式:
1 2 3 4 5 6 7 8 9 10 |
storm set_log_level # 日志级别ALL, TRACE, DEBUG, INFO, WARN, ERROR, FATAL, OFF # timeout 秒,整数 -l [logger name]=[log level][:optional timeout] -r [logger name] topology-name # 示例 storm set_log_level -l ROOT=DEBUG:30 my-topology storm set_log_level -l com.myapp=WARN my-topology |
动态的控制拓扑的日志级别。
调用格式: storm upload_credentials topology-name [credkey credvalue]*
更新credentials集到运行中的拓扑。
Trident(音标/'traɪd(ə)nt/)是Storm提供的另外一种访问接口,它提供一些高层抽象,包括:
- 精确的一次处理语义
- 事务性数据持久化
- 一些通用的流分析功能
使用Trident你可以无缝的将低延迟分布式查询和高吞吐量(每秒百万级消息)、有状态流处理混合。如果你对Pig、Cascading之类的高层批处理工具熟悉,理解起Trident会很容易。Trident有类似的连接(Join)、聚合(Aggregation)、分组(Grouping)、函数(Function)、过滤器(Filter)概念,此外Trident还提供在任何数据库、存储机制基础上进行有状态、增量处理的原语。
考虑两个流处理需求:
- 从输入流中统计每个姓名出现的次数
- 能够传入一个姓名列表,获得这些姓名出现的总次数
非线性随机名字生成器:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 |
package cc.gmem.study.bdg; import org.apache.commons.lang3.RandomUtils; import org.apache.commons.lang3.StringUtils; import java.util.function.BiFunction; public class RandomNameGenerator { private static String COMMON_FIRST_NAMES[] = new String[]{ "伟", "芳", "秀", "英", "娜", "敏", "静", "丽", "强", "磊", "军", "洋", "勇", "艳", "杰", "娟", "涛", "明", "超", "兰", "霞", "平", "桂", "诚", "先", "敬", "震", "振", "壮", "会", "思", "群", "豪", "心", "邦", "承", "乐", "绍", "功", "松", "善", "厚", "裕", "河", "哲", "江", "亮", "政", "谦", "亨", "奇", "固", "之", "轮", "翰", "朗", "伯", "宏", "言", "海", "山", "仁", "波", "宁", "福", "生", "龙", "元", "全", "国", "胜", "学", "祥", "才", "发", "武", "新", "利", "毅", "俊", "峰", "保", "东", "文", "辉", "力", "永", "健", "世", "广", "鸣", "朋", "斌", "行", "时", "泰", "博", "磊", "民", "友", "志", "清", "坚", "庆", "若", "德", "彪", "盛", "雄", "琛", "钧", "冠", "策", "腾", "楠", "榕", "风", "航", "弘", "义", "兴", "良", "飞", "彬", "富", "和", "梁", "栋", "维", "启", "克", "伦", "翔", "旭", "鹏", "泽", "晨", "辰", "士", "以", "建", "家", "致", "树", "炎", "蕊", "薇", "菁", "梦", "岚", "苑", "婕", "馨", "瑗", "琰", "韵", "融", "园", "艺", "咏", "卿", "聪", "澜", "纯", "爽", "琬", "茗", "羽", "希", "宁", "欣", "飘", "育", "滢", "馥", "筠", "柔", "竹", "霭", "凝", "晓", "欢", "霄", "伊", "亚", "宜", "可", "姬", "舒", "影", "荔", "枝", "芬", "芳", "燕", "莺", "媛", "珊", "莎", "蓉", "好", "君", "琴", "毓", "悦", "昭", "冰", "枫", "芸", "菲", "寒", "锦", "玲", "秋", "秀", "娟", "英", "华", "慧", "巧", "美", "淑", "惠", "珠", "翠", "雅", "芝", "玉", "萍", "红", "月", "彩", "春", "菊", "凤", "洁", "梅", "琳", "怡", "宝" }; private static String COMMON_LAST_NAMES[] = { "李", "王", "张", "刘", "陈", "杨", "赵", "黄", "周", "吴", "徐", "孙", "胡", "朱", "高", "林", "何", "郭", "马", "罗", "梁", "宋", "郑", "谢", "韩", "唐", "冯", "于", "董", "萧", "程", "曹", "袁", "邓", "许", "傅", "沈", "曾", "彭", "吕", "苏", "卢", "蒋", "蔡", "贾", "丁", "魏", "薛", "叶", "阎", "余", "潘", "杜", "戴", "夏", "钟", "汪", "田", "任", "姜", "范", "方", "石", "姚", "谭", "廖", "邹", "熊", |