Kafka Streams学习笔记
当前比较流行的实时计算框架包括Apache Storm、Apache Spark等。这些框架的功能强大而全面,但是具有以下缺点:
- 复杂度高,应对某些简单的工作显得笨重
- 部署Storm、Spark等分布式框架需要预留集群支持,增加开发负担
Kafka Streams是一个实时计算框架,其特点是:
- 低门槛:你可以很快的编写一段代码,在单机上运行。要扩容支持生产环境只需要简单的启动多个实例。利用Kafka的并行模型,Kafka Streams能够透明的处理多个实例之间的负载均衡。相比之下,Storm虽然也有本地集群,但是其运行环境和远程集群并不一样
- 简单而轻量:轻松的嵌入到任何Java应用中,轻松的和现有打包、部署、操作工具整合
- 没有额外的依赖:除了Kafka本身,没有其它依赖
- 支持容错的本地状态:可以进行快速有效的有状态操作,例如窗口化Join、聚合
- 支持精确一次性处理语义:不管是Streams客户端还是Kafka代理出现失败,都能保证一次且仅一次的处理
- 对于单记录的处理,实现毫秒级延迟。支持基于事件时间对一系列记录进行窗口操作
- 提供必要的流处理原语,同时提供高层的DSL和低级的Processor API
Kafka Streams针对来自1-N个主题的输入进行持续计算,并把计算结果输出到0-N个输出主题中。
每个KafkaStreams实例可以包含1-N个线程,线程数量在配置中指定,这些线程用于执行数据处理任务。
KafkaStreams实例与其他具有相同Application ID的实例进行协作,这些实例可能分布在同一JVM、不同JVM进程或者不同的机器上。所有相同Application ID的实例的整体,构成一个流处理程序。通过分配输入主题的分区给不同的实例,实现处理任务的划分。如果某个实例宕机,其它实例会瓜分它持有的分区,进行负载再平衡,同时确保任何分区都被消费。
流是Kafka Streams中最重要的一个抽象,它:
- 是一个无边界的、持续更新的数据集
- 是一个有序的、容错的、不变的数据记录的序列。数据记录以键值对的形式定义
流处理程序是指,利用Kafka Streams库,以处理器拓扑(Processor Topologies)的形式定义计算逻辑的应用程序。处理器拓扑是节点(流处理器)的有向图,节点通过流连接。
流处理器,以及拓扑节点,它每次从上游流接受一个输入记录,进行转换、处理,可选的释放一或多个记录到下游流中。有两种特殊的流处理器:
- Source处理器:它没有上游处理器,通过读取Kafka主题,消费其中的记录,并释放到下游流中
- Sink处理器:它没有下游处理器,负责将记录存储到Kafka主题中
拓扑中的节点可以调用任何外部系统,因而有机会将记录存储在外部,而非必须存储在Kafka主题中。
KafkaStreams的计算逻辑可通过两种方式来定义:
- 基于 Processor API 定义,生成一个Processor组成的有向无环图(DAG)拓扑。可以访问状态存储
- 基于StreamsBuilder定义,此类提供一个高层的DSL。支持map、filter、join、aggregation等常见操作
处理器拓扑仅仅是逻辑的抽象,在运行时,该逻辑拓扑会被实例化,并在应用中复制,以增强并行处理能力。
对于流处理来说时间是个很重要的概念,窗口化等操作依赖时间来确定边界。Kafka Streams中有三种时间:
时间 | 说明 |
Event Time/事件时间 | 事件或者数据记录的产生时间。以GPS位置采集记录为例,事件时间应该是GPS传感器捕获到位置的那个时间 |
Ingestion Time/吸收时间 | 数据记录被存入到Kafka分区日志的时间 |
Process Time/处理时间 | 数据记录被Kafka Streams处理完毕的时间 |
事件时间、吸收时间是二选一的。由Kafka配置参数(在代理或者主题级别)确定,从0.10开始时间戳自动嵌入到Kafka消息中。
接口TimestampExtractor用于抽取时间戳,分配给每个记录,此接口的默认实现简单的从Kafka消息中获取上述嵌入时间戳。抽取到的时间戳被称为Stream Time。TimestampExtractor的实现可以提供不同的Stream Time语义,它可以从记录的任意字段推导出时间戳,或者直接使用墙上时间。
当Sink处理器将记录写入到Kafka主题时,也需要为记录授予时间戳,具体方式取决于上下文:
- 如果输出记录通过处理某些输入记录而得到,例如在process()中调用了context.forward()方法,输出记录的时间戳直接从输入记录继承
- 如果输出记录由周期性函数,例如Punctuator#punctuate()生成,则时间戳使用当前Task的内部时间,此内部时间通过context.timestamp()获取
- 对于聚合操作,结果记录的时间戳更新为最后一个参与聚合的输入记录的时间戳
某些流处理程序需要记录一些状态信息,例如,聚合类的操作需要随时暂存当前的聚合结果。Kafka Streams DSL包含很多状态性的操作。
Kafka Stream引入状态存储(State Store)的概念,流处理程序可以针对其进行状态的读写。每个流处理Task都可以嵌入一个或多个状态存储。转台存储可以实现为持久化键值对存储、内存哈希表或者其他的形式。Kafka Stream为本地状态存储提供容错、自动恢复功能。
交互式查询(Interactive Queries)这一特性,允许进程内部、外部的代码对流处理程序创建的状态存储进行只读的操作。
关于流处理框架的最常见的提问是,该框架能否保证每个记录被处理一次且仅一次,即使在处理过程中出现失败的情况下?
某些业务场景不允许数据丢失或者重复,因此常常引入面向批处理的框架,配合流处理管线,这被称为Lambda架构。
在0.11版本之前,Kafka仅仅提供至少一次性递送保证,因此任何基于Kafka的流处理系统都不能保证端对端的精确一次性。
从0.11版本开始,Kafka支持生产者向不同的分区甚至主题进行事务性、幂等性的消息发送,利用这一特性,Kafka Streams可以支持精确一次性处理:
- 对于任何从源主题读取来的记录,其处理结果在目的主题上精确的体现一次
- 对于任何从源主题读取来的记录,其处理结果在有状态操作的状态存储上精确的体现一次
与其他实时处理框架不同的是,Kafka Streams和底层的Kafka存储机制紧密集成,确保对输入主题偏移量的提交、对状态存储的更新、对输出主题的写入能够原子的完成。
要启用精确一次性保证,需要配置processing.guarantee=exactly_once。默认值是at_least_once。
本章引入一个简单的,基于StreamBuilder的流处理程序,作为入门的例子。
下面是一个人名个数统计的例子,它能够在生产环境中弹性(动态增加硬件)、可扩容的部署:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package cc.gmem.study.kafka; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Produced; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import java.util.Arrays; import java.util.Properties; import java.util.concurrent.TimeUnit; public class NameCountApplication { private static final Logger LOGGER = LogManager.getLogger( NameCountApplication.class ); public static void main( final String[] args ) throws Exception { Properties config = new Properties(); // 应用的标识符,不同的实例依据此标识符相互发现 config.put( StreamsConfig.APPLICATION_ID_CONFIG, "names-counter-application" ); // 启动时使用的Kafka服务器 config.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1.gmem.cc:9092" ); // 键值串行化类 config.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass() ); config.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass() ); // High Level DSL for building topologies StreamsBuilder builder = new StreamsBuilder(); // KStream是记录(KeyValue)流的抽象 KStream<String, String> nameBatches = builder.stream( "names" ); KTable<String, Long> nameCounts = nameBatches // 一到多映射,分割字符串 .flatMapValues( nameBatch -> Arrays.asList( nameBatch.split( "\\W+" ) ) ) // 根据人名分组 .groupBy( ( key, name ) -> name ) // 进行聚合,结果存放到StateStore中 .count( Materialized.as( "names-count-store" ) ); // 输出到目标 nameCounts.toStream().to( "names-count", Produced.with( Serdes.String(), Serdes.Long() ) ); // 构建流处理程序并启动 KafkaStreams streams = new KafkaStreams( builder.build(), config ); LOGGER.trace( "Prepare to start stream processing." ); streams.start(); TimeUnit.DAYS.sleep( 1 ); // 阻塞主线程 } } |
首先,创建输入、输出主题:
1 2 |
kafka-topics.sh --create --zookeeper zk-1.gmem.cc:2181/kafka --replication-factor 1 --partitions 1 --topic names kafka-topics.sh --create --zookeeper zk-1.gmem.cc:2181/kafka --replication-factor 1 --partitions 1 --topic names-count |
然后,通过命令行准备一批空格分隔的人名。并准备好输出主题的消费者:
1 2 3 4 |
topic-consume names-count --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer |
启动上面的流处理程序,上面的消费者开始输出:
1 2 3 4 5 6 7 |
Meng 2 Zhen 2 Cai 4 Dang 3 Ya 4 Alex 5 Alex 6 # 计数更新,发布新记录 |
每个名字的计数更新后,流处理程序都会在names-count上发布一个记录,其键位人名,值为计数。记录被逐个打印在消费者的标准输出中。
本章介绍Kafka Streams的底层API。
拓扑是各种处理器和Kafka主题共同构成的网络,要创建拓扑可以参考:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Topology topology = new Topology(); // 指定拓扑的输入,也就是Kafka主题 topology.addSource( "SOURCE", "src-topic" ) // 添加一个处理器PROCESS1,其上游为拓扑输入(通过名称引用) .addProcessor( "PROCESS1", () -> new Processor1(), "SOURCE" ) // 添加另一个处理器PROCESS2,以PROCESS1为上游 .addProcessor( "PROCESS2", () -> new Processor2(), "PROCESS1" ) // 添加另一个处理器PROCESS3,仍然以PROCESS1为上游,注意拓扑分叉了 .addProcessor( "PROCESS3", () -> new Processor3(), "PROCESS1" ) // 添加一个输出处理器,输出到sink-topic1,以PROCESS1为上游 .addSink( "SINK1", "sink-topic1", "PROCESS1" ) // 添加一个输出处理器,输出到sink-topic2,以PROCESS2为上游 .addSink( "SINK2", "sink-topic2", "PROCESS2" ) // 添加一个输出处理器,输出到sink-topic3,以PROCESS3为上游 .addSink( "SINK3", "sink-topic3", "PROCESS3" ); |
该接口用于定义一个流处理器,也就是处理器拓扑中的节点。流处理器以参数化类型的方式限定了其键、值的类型。你可以定义任意数量的流处理器,并且连同它们关联的状态存储一起,组装出拓扑。
Processor.process()方法针对收到的每一个记录进行处理。Processor.init()方法实例化了一个ProcessorContext,流处理器可以调用上下文:
- context().schedule,调度一个Punctuation函数,周期性执行
- context().forward,转发新的或者被修改的键值对给下游处理器
- context().commit,提交当前处理进度
下面是基于Processor API的人名个数统计处理器示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
package cc.gmem.study.kafka.streams.low; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; public class NameCounterProcessor implements Processor<String, String> { private ProcessorContext context; private KeyValueStore<String, Long> kvStore; @Override public void init( ProcessorContext context ) { // 保存引用,类似于Storm的TopologyContext this.context = context; // 从上下文中取回一个状态存储 this.kvStore = (KeyValueStore<String, Long>) context.getStateStore( "NameCounts" ); // 以墙上时间为准,每秒执行Punctuator逻辑 this.context.schedule( 1000, PunctuationType.WALL_CLOCK_TIME, timestamp -> { NameCounterProcessor.this.punctuate( timestamp ); } ); } /** * 接收一个记录(人名列表)并处理 * * @param dummy 记录的键,无用 * @param line 记录的值 */ @Override public void process( String dummy, String line ) { String[] names = line.toLowerCase().split( " " ); // 在键值存储中更新人名计数 for ( String name : names ) { Long oldCount = this.kvStore.get( name ); if ( oldCount == null ) { this.kvStore.put( name, 1L ); } else { this.kvStore.put( name, oldCount + 1L ); } } } @Override public void punctuate( long timestamp ) { // 获得键值存储的迭代器 KeyValueIterator<String, Long> iter = this.kvStore.all(); while ( iter.hasNext() ) { KeyValue<String, Long> entry = iter.next(); // 转发记录给下游处理器 context.forward( entry.key, entry.value.toString() ); } /** * 调用者必须要负责关闭状态存储上的迭代器 * 否则可能(取决于底层状态存储的实现)导致内存、文件句柄的泄漏 */ iter.close(); // 请求提交当前流状态(消费进度) context.commit(); } @Override public void close() { // 在此关闭所有持有的资源,但是状态存储不需要关闭,由Kafka Stream自己维护其生命周期 } } |
要位拓扑中每个Processor提供状态存储,调用:
1 2 3 4 5 |
Topology topology = new Topology(); topology.addSource("Source", "source-topic") .addProcessor("Process", () -> new WordCountProcessor(), "Source") // 为处理器Process提供一个状态存储 .addStateStore(countStoreSupplier, "Process"); |
为了支持容错、支持无数据丢失的状态迁移, 状态存储可以持续不断的、在后台备份到Kafka主题中。上述用于主题被称为状态存储的changelog主题,或者直接叫changelog。
你可以启用或者禁用状态存储的备份特性。
持久性的KV存储是容错的,它备份在一个紧凑格式的changelog主题中。使用紧凑格式的原因是:
- 防止主题无限增长
- 减少主题占用的存储空间
- 当状态存储需要通过Changelog恢复时,缩短需要的时间
持久性的窗口化存储也是容错的,它基于紧凑格式、支持删除机制的主题备份。窗口化存储的changelog的键的一部分是窗口时间戳,过期的窗口对应的段会被Kafka的日志清理器清理。changelog的默认存留时间是Windows#maintainMs() + 1天。指定StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG可以覆盖之。
启动应用程序时,状态存储通常不需要根据changelog来恢复,直接加载磁盘上持久化的数据就可以。但以下场景中:
- 宕机导致本地状态丢失
- 运行在无状态环境下的应用程序重启
状态存储需要基于changelog进行完整的恢复。
如果changelog中的数据量很大,则恢复过程可能相当的耗时。在恢复完成之前,处理器拓扑不能处理新的数据。
要监控状态存储的恢复进度,你需要实现org.apache.kafka.streams.processor.StateRestoreListener接口,并调用KafkaStreams#setGlobalStateRestoreListener注册之。监听器示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.StateRestoreListener; // 监听器会被所有org.apache.kafka.streams.processor.internals.StreamThread实例共享,并必须线程安全 public class ConsoleGlobalRestoreListerner implements StateRestoreListener { // 在恢复过程开始时回调 public void onRestoreStart( final TopicPartition topicPartition, // 主题分区 final String storeName, // 状态存储名称 final long startingOffset, // 需要恢复的起点 final long endingOffset // 需要恢复的终点 ) {} // 在恢复一批次数据后回调 public void onBatchRestored( final TopicPartition topicPartition, final String storeName, final long batchEndOffset, final long numRestored ) {} // 恢复完成后回调 public void onRestoreEnd( final TopicPartition topicPartition, final String storeName, final long totalRestored ) {} } |
禁用changelog之后,状态存储失去容错性:
1 2 3 4 5 6 7 |
// 启用:StateStoreBuilder#withLoggingEnabled(Map<String, String>); // 禁用:StateStoreBuilder#withLoggingDisabled(); KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts"); StateStoreBuilder builder = Stores .keyValueStoreBuilder(countStoreSupplier,Serdes.String(),Serdes.Long()) .withLoggingDisabled(); |
启用changelog时,你可以为changelog主题提供配置信息:
1 2 3 4 5 |
Map<String, String> changelogConfig = new HashMap(); // 覆盖主题参数 changelogConfig.put("min.insyc.replicas", "1"); Stores...withLoggingEnabled(changelogConfig); |
除了使用Kafka内置的StateStore实现之外,你还可以自定义状态存储。
状态存储的核心接口是org.apache.kafka.streams.processor.StateStore,一些扩展的接口包括KeyValueStore。
你还需要一个创建状态存储的工厂,其接口是org.apache.kafka.streams.processor.state.StoreSupplier。
你可以提供一个org.apache.kafka.streams.processor.StateRestoreCallback,用于从changelog中恢复状态存储。要注册此回调,实现StateStore的init方法:
1 2 3 |
public void init(ProcessorContext context, StateStore store) { context.register(store, false, stateRestoreCallBackIntance); } |
你可以实现org.apache.kafka.streams.processor.BatchingStateRestoreCallback,代替StateRestoreCallback。后者每次恢复一条数据,前者支持批量式的数据恢复。
抽象类AbstractNotifyingRestoreCallback、AbstractNotifyingBatchingRestoreCallback分别实现了StateRestoreCallback、BatchingStateRestoreCallback接口,并同时实现了StateRestoreListener接口。
开发人员可以利用StreamsBuilder,基于Kafka Streams的领域特定语言(DSL)来构建拓扑。
高级API引入了表的概念,表和流可以看做是相同数据集不同视图:
- Stream as Table:一个流可以看做是一个表的changelog。流中的每条记录都捕获了表的一次状态变更,通过Replay changelog,流可以转变为表。流记录和表行不一定是1:1对应关系,流记录可能经过聚合,更新到表中的一行
- Table as Stream:表可以看做是某个瞬间的、流中每个键的最终值构成的快照。迭代表中的键值对很容易将其转换为流
在Kafka Streams中,状态存储的跨机器复制(容错,基于changelog)就利用了流表二元性。
该接口是对记录的流的抽象,每个记录是无边界的数据集中的一个自包含的数据。
对于发送到KStream的两个记录("Alex", 1)、("Alex", 3),假设流处理程序进行sum(人名统计)操作,则返回结果是4。
该接口是对changelog流的抽象,此流中的每个记录代表了针对某个特定key的数据更新。
对于发送到KTable的两个记录("Alex", 1)、("Alex", 3),相当于对键Alex进行两次更新,返回结果是3。
类似于KTable,但是跨越所有KafkaStreams实例进行复制。 GlobalKTable支持通过key来查询value,在进行Join操作时,需要使用这种查询。
通过读取Kafka主题,即可为Kafka Streams提供输入流。首先你需要实例化一个StreamsBuilder:
1 |
StreamsBuilder builder = new StreamsBuilder(); |
1 2 3 4 |
KStream<String, Long> nameCounts = builder.stream( "names-counts-input-topic", // 输入主题名称 Consumed.with(Serdes.String(), Serdes.Long()) // 指定键值的串行化器 ); |
KStream对应了从输入主题读取的、分区化的记录的流。流处理程序的每个实例,都会消费输入主题的分区的子集,并且在整体上保证所有分区都被消费。
1 2 3 4 5 |
KTable<String, Long> nameCounts = builder.table( Serdes.String(), /* 键串行化器 */ Serdes.Long(), /* 值串行化器 */ "name-counts-input-topic", /* 输入主题 */ "name-counts-partitioned-store" /* 表名 */); |
可以把任何主题看做是changelog,并将其读入到KTable。当:
- 记录的键不存在时,相当于执行INSERT操作
- 记录的键存在,值不为null时,相当于执行UPDATE操作
- 记录的键存在,值为null时,相当于执行DELETE操作
KTable对应了从输入主题读取的、分区化的记录的流。流处理程序的每个实例,都会消费输入主题的分区的子集,并且在整体上保证所有分区都被消费。
你可以为KTable提供一个名称,此名称也是KTable对应的状态存储的名称。只有命名的KTable才支持交互式查询。
1 2 3 4 5 |
GlobalKTable<String, Long> nameCounts = builder.globalTable( Serdes.String(), /* 键串行化器 */ Serdes.Long(), /* 值串行化器 */ "name-counts-input-topic", /* 输入主题 */ "name-counts-global-store" /* 表名 */); |
KStream和KTable支持一系列的转换操作,这些高层操作都可以被转换为1-N个连接在一起的处理器。由于KStream、KTable都是强类型的,因此这些转换操作都以泛型的方式定义。
某些KStream转换操作可以产生一个(例如filter/map)或多个(例如branch)KStream对象,另外一些则产生一个KTable对象(例如aggregation)。
对于KTable来说,所有的转换操作都只能产生另一个KTable。
不依赖于任何状态即可完成转换,不要求流处理器有关联的StateStore。
操作 | 说明 | ||
branch |
IO:KStream → KStream 基于给定的断言集分割KStream,将其分割为1-N个KStream实例。断言按照声明的顺序依次估算,每个记录只被转发到第一个匹配的下游流中:
|
||
filter |
IO:KStream → KStream 或 KTable → KTable 基于给定的断言,针对每个记录进行估算。估算结果为true的记录进入下游流:
|
||
filterNot | 与filter类似,仅仅保留不匹配的 | ||
flatMap |
IO:KStream → KStream 基于一个记录,产生0-N个输出记录:
|
||
flatMapValues | 类似于flatMap,但是保持键不变,可能产生多个键相同的记录 | ||
foreach |
IO:KStream → void 终结性操作,针对每个记录执行无状态的操作 需要注意:操作的副作用(例如对外部系统的写)无法被Kafka跟踪,也就是说无法获得Kafka的处理语义保证 示例: stream.foreach((key, value) -> System.out.println(key + " => " + value)); |
||
groupByKey |
IO:KStream → KGroupedStream 分组是进行流/表的聚合操作的前提。分组保证了数据被正确的分区,保证后续操作的正常进行 和分组相关的一个操作是窗口化。利用窗口化,可以将分组后的记录二次分组,形成一个个窗口,然后以窗口为单位进行聚合、Join 仅当流被标记用于重新分区,则此操作才会导致重新分区。该操作不允许修改键或者键类型 示例:
|
||
groupBy |
IO:KStream → KGroupedStream 或 KTable → KGroupedTable 实际上是selectKey+groupByKey的组合 基于一个新的键来分组记录,新键的类型可能和记录旧的键类型不同。当对表进行分组时,还可以指定新的值、值类型 该操作总是会导致数据的重新分区,因此在可能的情况下你应该优选groupByKey,后者仅在必要的时候分区 示例:
|
||
map |
IO:KStream → KStream 根据一个输入记录产生一个输出记录,你可以修改键值的类型
|
||
mapValues | 类似上面,但是仅仅映射值,键不变 | ||
IO:KStream → void 终结操作,打印记录到输出流中。示例:
|
|||
selectKey |
IO:KStream → KStream 对每个记录分配一个新的键,键类型可能改变。
|
||
toStream |
IO:KTable → KStream 将表转换为流: table.toStream(); |
||
WriteAsText |
IO:KStream → void 终结性操作,将流写出到文件 |
这类转换操作需要依赖于某些状态信息。例如在聚合性操作中,会使用窗口化状态存储来保存上一个窗口的聚合结果。在Join操作中,会使用窗口化状态存储到目前为止接收到的、窗口边界内部的所有记录。
状态存储默认支持容错,如果出现失败,则Kafka Streams会首先恢复所有的状态存储,然后再进行后续的处理。
高级的有状态转换操作包括:聚合、Join,以及针对两者的窗口化支持。
操作 | 说明 | ||
aggregate |
IO:KGroupedStream → KTable 或 KGroupedTable → KTable 滚动聚合(Rolling Aggregation)操作,根据分组键对非窗口化的记录的值进行聚合 当对已分组流进行聚合时,你需要提供初始化器(确定聚合初值)、聚合器adder。当聚合已分组表时,你需要额外提供聚合器subtractor。代码示例:
KGroupedStream的聚合操作的行为:
KGroupedTable的聚合操作的行为:
|
||
windowedBy + aggregate |
IO:KGroupedStream → KTable 窗口化聚合:以窗口为单位,根据分组键,对KGroupedStream中的记录进行聚合操作,并把结果存放到窗口化的KTable 你需要提供初始化器、累加器、窗口定义。如果基于会话进行窗口定义,你需要额外提供聚合器——会话合并器 示例代码:
窗口化聚合操作的行为如下:
|
Leave a Reply