Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Kafka Streams学习笔记

22
Jun
2017

Kafka Streams学习笔记

By Alex
/ in BigData
/ tags 实时处理
0 Comments
简介

当前比较流行的实时计算框架包括Apache Storm、Apache Spark等。这些框架的功能强大而全面,但是具有以下缺点:

  1. 复杂度高,应对某些简单的工作显得笨重
  2. 部署Storm、Spark等分布式框架需要预留集群支持,增加开发负担 

Kafka Streams是一个实时计算框架,其特点是:

  1. 低门槛:你可以很快的编写一段代码,在单机上运行。要扩容支持生产环境只需要简单的启动多个实例。利用Kafka的并行模型,Kafka Streams能够透明的处理多个实例之间的负载均衡。相比之下,Storm虽然也有本地集群,但是其运行环境和远程集群并不一样
  2. 简单而轻量:轻松的嵌入到任何Java应用中,轻松的和现有打包、部署、操作工具整合
  3. 没有额外的依赖:除了Kafka本身,没有其它依赖
  4. 支持容错的本地状态:可以进行快速有效的有状态操作,例如窗口化Join、聚合
  5. 支持精确一次性处理语义:不管是Streams客户端还是Kafka代理出现失败,都能保证一次且仅一次的处理
  6. 对于单记录的处理,实现毫秒级延迟。支持基于事件时间对一系列记录进行窗口操作
  7. 提供必要的流处理原语,同时提供高层的DSL和低级的Processor API

Kafka Streams针对来自1-N个主题的输入进行持续计算,并把计算结果输出到0-N个输出主题中。

每个KafkaStreams实例可以包含1-N个线程,线程数量在配置中指定,这些线程用于执行数据处理任务。

KafkaStreams实例与其他具有相同Application ID的实例进行协作,这些实例可能分布在同一JVM、不同JVM进程或者不同的机器上。所有相同Application ID的实例的整体,构成一个流处理程序。通过分配输入主题的分区给不同的实例,实现处理任务的划分。如果某个实例宕机,其它实例会瓜分它持有的分区,进行负载再平衡,同时确保任何分区都被消费。

核心概念
流处理拓扑

流是Kafka Streams中最重要的一个抽象,它:

  1. 是一个无边界的、持续更新的数据集
  2. 是一个有序的、容错的、不变的数据记录的序列。数据记录以键值对的形式定义

流处理程序是指,利用Kafka Streams库,以处理器拓扑(Processor Topologies)的形式定义计算逻辑的应用程序。处理器拓扑是节点(流处理器)的有向图,节点通过流连接。

流处理器,以及拓扑节点,它每次从上游流接受一个输入记录,进行转换、处理,可选的释放一或多个记录到下游流中。有两种特殊的流处理器:

  1. Source处理器:它没有上游处理器,通过读取Kafka主题,消费其中的记录,并释放到下游流中
  2. Sink处理器:它没有下游处理器,负责将记录存储到Kafka主题中

拓扑中的节点可以调用任何外部系统,因而有机会将记录存储在外部,而非必须存储在Kafka主题中。

KafkaStreams的计算逻辑可通过两种方式来定义:

  1. 基于 Processor API 定义,生成一个Processor组成的有向无环图(DAG)拓扑。可以访问状态存储
  2. 基于StreamsBuilder定义,此类提供一个高层的DSL。支持map、filter、join、aggregation等常见操作

处理器拓扑仅仅是逻辑的抽象,在运行时,该逻辑拓扑会被实例化,并在应用中复制,以增强并行处理能力。

时间

对于流处理来说时间是个很重要的概念,窗口化等操作依赖时间来确定边界。Kafka Streams中有三种时间:

时间 说明
Event Time/事件时间 事件或者数据记录的产生时间。以GPS位置采集记录为例,事件时间应该是GPS传感器捕获到位置的那个时间
Ingestion Time/吸收时间 数据记录被存入到Kafka分区日志的时间
Process Time/处理时间 数据记录被Kafka Streams处理完毕的时间

事件时间、吸收时间是二选一的。由Kafka配置参数(在代理或者主题级别)确定,从0.10开始时间戳自动嵌入到Kafka消息中。

接口TimestampExtractor用于抽取时间戳,分配给每个记录,此接口的默认实现简单的从Kafka消息中获取上述嵌入时间戳。抽取到的时间戳被称为Stream Time。TimestampExtractor的实现可以提供不同的Stream Time语义,它可以从记录的任意字段推导出时间戳,或者直接使用墙上时间。

当Sink处理器将记录写入到Kafka主题时,也需要为记录授予时间戳,具体方式取决于上下文:

  1. 如果输出记录通过处理某些输入记录而得到,例如在process()中调用了context.forward()方法,输出记录的时间戳直接从输入记录继承
  2. 如果输出记录由周期性函数,例如Punctuator#punctuate()生成,则时间戳使用当前Task的内部时间,此内部时间通过context.timestamp()获取
  3. 对于聚合操作,结果记录的时间戳更新为最后一个参与聚合的输入记录的时间戳
状态

某些流处理程序需要记录一些状态信息,例如,聚合类的操作需要随时暂存当前的聚合结果。Kafka Streams DSL包含很多状态性的操作。

Kafka Stream引入状态存储(State Store)的概念,流处理程序可以针对其进行状态的读写。每个流处理Task都可以嵌入一个或多个状态存储。转台存储可以实现为持久化键值对存储、内存哈希表或者其他的形式。Kafka Stream为本地状态存储提供容错、自动恢复功能。

交互式查询(Interactive Queries)这一特性,允许进程内部、外部的代码对流处理程序创建的状态存储进行只读的操作。

一次处理保证

关于流处理框架的最常见的提问是,该框架能否保证每个记录被处理一次且仅一次,即使在处理过程中出现失败的情况下?

某些业务场景不允许数据丢失或者重复,因此常常引入面向批处理的框架,配合流处理管线,这被称为Lambda架构。

在0.11版本之前,Kafka仅仅提供至少一次性递送保证,因此任何基于Kafka的流处理系统都不能保证端对端的精确一次性。

从0.11版本开始,Kafka支持生产者向不同的分区甚至主题进行事务性、幂等性的消息发送,利用这一特性,Kafka Streams可以支持精确一次性处理:

  1. 对于任何从源主题读取来的记录,其处理结果在目的主题上精确的体现一次
  2. 对于任何从源主题读取来的记录,其处理结果在有状态操作的状态存储上精确的体现一次

与其他实时处理框架不同的是,Kafka Streams和底层的Kafka存储机制紧密集成,确保对输入主题偏移量的提交、对状态存储的更新、对输出主题的写入能够原子的完成。

要启用精确一次性保证,需要配置processing.guarantee=exactly_once。默认值是at_least_once。

入门

本章引入一个简单的,基于StreamBuilder的流处理程序,作为入门的例子。

流处理程序

下面是一个人名个数统计的例子,它能够在生产环境中弹性(动态增加硬件)、可扩容的部署:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cc.gmem.study.kafka;
 
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
 
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
 
public class NameCountApplication {
 
    private static final Logger LOGGER = LogManager.getLogger( NameCountApplication.class );
 
    public static void main( final String[] args ) throws Exception {
        Properties config = new Properties();
        // 应用的标识符,不同的实例依据此标识符相互发现
        config.put( StreamsConfig.APPLICATION_ID_CONFIG, "names-counter-application" );
        // 启动时使用的Kafka服务器
        config.put( StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-1.gmem.cc:9092" );
        // 键值串行化类
        config.put( StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass() );
        config.put( StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass() );
        // High Level DSL for building topologies
        StreamsBuilder builder = new StreamsBuilder();
        // KStream是记录(KeyValue)流的抽象
        KStream<String, String> nameBatches = builder.stream( "names" );
        KTable<String, Long> nameCounts = nameBatches
                // 一到多映射,分割字符串
                .flatMapValues( nameBatch -> Arrays.asList( nameBatch.split( "\\W+" ) ) )
                // 根据人名分组
                .groupBy( ( key, name ) -> name )
                // 进行聚合,结果存放到StateStore中
                .count( Materialized.as( "names-count-store" ) );
        // 输出到目标
        nameCounts.toStream().to( "names-count", Produced.with( Serdes.String(), Serdes.Long() ) );
        // 构建流处理程序并启动
        KafkaStreams streams = new KafkaStreams( builder.build(), config );
        LOGGER.trace( "Prepare to start stream processing." );
        streams.start();
 
        TimeUnit.DAYS.sleep( 1 );  // 阻塞主线程
    }
}
运行和测试

首先,创建输入、输出主题:

Shell
1
2
kafka-topics.sh --create --zookeeper zk-1.gmem.cc:2181/kafka --replication-factor 1 --partitions 1 --topic names
kafka-topics.sh --create --zookeeper zk-1.gmem.cc:2181/kafka --replication-factor 1 --partitions 1 --topic names-count

然后,通过命令行准备一批空格分隔的人名。并准备好输出主题的消费者:

Shell
1
2
3
4
topic-consume names-count --formatter kafka.tools.DefaultMessageFormatter
    --property print.key=true --property print.value=true
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

启动上面的流处理程序,上面的消费者开始输出:

Shell
1
2
3
4
5
6
7
Meng    2
Zhen    2
Cai    4
Dang    3
Ya    4
Alex    5
Alex    6  # 计数更新,发布新记录

每个名字的计数更新后,流处理程序都会在names-count上发布一个记录,其键位人名,值为计数。记录被逐个打印在消费者的标准输出中。 

低级API

本章介绍Kafka Streams的底层API。

Topology

拓扑是各种处理器和Kafka主题共同构成的网络,要创建拓扑可以参考:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Topology topology = new Topology();
// 指定拓扑的输入,也就是Kafka主题
topology.addSource( "SOURCE", "src-topic" )
        // 添加一个处理器PROCESS1,其上游为拓扑输入(通过名称引用)
        .addProcessor( "PROCESS1", () -> new Processor1(), "SOURCE" )
        // 添加另一个处理器PROCESS2,以PROCESS1为上游
        .addProcessor( "PROCESS2", () -> new Processor2(), "PROCESS1" )
        // 添加另一个处理器PROCESS3,仍然以PROCESS1为上游,注意拓扑分叉了
        .addProcessor( "PROCESS3", () -> new Processor3(), "PROCESS1" )
        // 添加一个输出处理器,输出到sink-topic1,以PROCESS1为上游
        .addSink( "SINK1", "sink-topic1", "PROCESS1" )
        // 添加一个输出处理器,输出到sink-topic2,以PROCESS2为上游
        .addSink( "SINK2", "sink-topic2", "PROCESS2" )
        // 添加一个输出处理器,输出到sink-topic3,以PROCESS3为上游
        .addSink( "SINK3", "sink-topic3", "PROCESS3" );
Processor

该接口用于定义一个流处理器,也就是处理器拓扑中的节点。流处理器以参数化类型的方式限定了其键、值的类型。你可以定义任意数量的流处理器,并且连同它们关联的状态存储一起,组装出拓扑。

Processor.process()方法针对收到的每一个记录进行处理。Processor.init()方法实例化了一个ProcessorContext,流处理器可以调用上下文:

  1. context().schedule,调度一个Punctuation函数,周期性执行
  2. context().forward,转发新的或者被修改的键值对给下游处理器
  3. context().commit,提交当前处理进度

下面是基于Processor API的人名个数统计处理器示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package cc.gmem.study.kafka.streams.low;
 
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
 
public class NameCounterProcessor implements Processor<String, String> {
 
    private ProcessorContext context;
 
    private KeyValueStore<String, Long> kvStore;
 
    @Override
    public void init( ProcessorContext context ) {
        // 保存引用,类似于Storm的TopologyContext
        this.context = context;
        // 从上下文中取回一个状态存储
        this.kvStore = (KeyValueStore<String, Long>) context.getStateStore( "NameCounts" );
        // 以墙上时间为准,每秒执行Punctuator逻辑
        this.context.schedule( 1000, PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            NameCounterProcessor.this.punctuate( timestamp );
        } );
    }
 
    /**
     * 接收一个记录(人名列表)并处理
     *
     * @param dummy 记录的键,无用
     * @param line  记录的值
     */
    @Override
    public void process( String dummy, String line ) {
        String[] names = line.toLowerCase().split( " " );
 
        // 在键值存储中更新人名计数
        for ( String name : names ) {
            Long oldCount = this.kvStore.get( name );
 
            if ( oldCount == null ) {
                this.kvStore.put( name, 1L );
            } else {
                this.kvStore.put( name, oldCount + 1L );
            }
        }
    }
 
    @Override
    public void punctuate( long timestamp ) {
        // 获得键值存储的迭代器
        KeyValueIterator<String, Long> iter = this.kvStore.all();
 
        while ( iter.hasNext() ) {
            KeyValue<String, Long> entry = iter.next();
            // 转发记录给下游处理器
            context.forward( entry.key, entry.value.toString() );
        }
 
        /**
         * 调用者必须要负责关闭状态存储上的迭代器
         * 否则可能(取决于底层状态存储的实现)导致内存、文件句柄的泄漏
         */
        iter.close();
 
        // 请求提交当前流状态(消费进度)
        context.commit();
    }
 
    @Override
    public void close() {
        // 在此关闭所有持有的资源,但是状态存储不需要关闭,由Kafka Stream自己维护其生命周期
    }
}
StateStore
使用状态存储

要位拓扑中每个Processor提供状态存储,调用:

Java
1
2
3
4
5
Topology topology = new Topology();
topology.addSource("Source", "source-topic")
    .addProcessor("Process", () -> new WordCountProcessor(), "Source")
    // 为处理器Process提供一个状态存储
    .addStateStore(countStoreSupplier, "Process");
changelog

为了支持容错、支持无数据丢失的状态迁移, 状态存储可以持续不断的、在后台备份到Kafka主题中。上述用于主题被称为状态存储的changelog主题,或者直接叫changelog。

你可以启用或者禁用状态存储的备份特性。

持久性的KV存储是容错的,它备份在一个紧凑格式的changelog主题中。使用紧凑格式的原因是:

  1. 防止主题无限增长
  2. 减少主题占用的存储空间
  3. 当状态存储需要通过Changelog恢复时,缩短需要的时间

持久性的窗口化存储也是容错的,它基于紧凑格式、支持删除机制的主题备份。窗口化存储的changelog的键的一部分是窗口时间戳,过期的窗口对应的段会被Kafka的日志清理器清理。changelog的默认存留时间是Windows#maintainMs() + 1天。指定StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG可以覆盖之。

监控状态恢复

启动应用程序时,状态存储通常不需要根据changelog来恢复,直接加载磁盘上持久化的数据就可以。但以下场景中:

  1. 宕机导致本地状态丢失
  2. 运行在无状态环境下的应用程序重启

状态存储需要基于changelog进行完整的恢复。

如果changelog中的数据量很大,则恢复过程可能相当的耗时。在恢复完成之前,处理器拓扑不能处理新的数据。

要监控状态存储的恢复进度,你需要实现org.apache.kafka.streams.processor.StateRestoreListener接口,并调用KafkaStreams#setGlobalStateRestoreListener注册之。监听器示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.StateRestoreListener;
// 监听器会被所有org.apache.kafka.streams.processor.internals.StreamThread实例共享,并必须线程安全
public class ConsoleGlobalRestoreListerner implements StateRestoreListener {
    // 在恢复过程开始时回调
    public void onRestoreStart(
            final TopicPartition topicPartition,  // 主题分区
            final String storeName,               // 状态存储名称
            final long startingOffset,            // 需要恢复的起点
            final long endingOffset               // 需要恢复的终点
    ) {}
 
    // 在恢复一批次数据后回调
    public void onBatchRestored( final TopicPartition topicPartition,
            final String storeName,
            final long batchEndOffset,
            final long numRestored
    ) {}
 
    // 恢复完成后回调
    public void onRestoreEnd( final TopicPartition topicPartition,
            final String storeName,
            final long totalRestored ) {}
}
启/禁changelog

禁用changelog之后,状态存储失去容错性:

Java
1
2
3
4
5
6
7
// 启用:StateStoreBuilder#withLoggingEnabled(Map<String, String>);
// 禁用:StateStoreBuilder#withLoggingDisabled();
 
KeyValueBytesStoreSupplier countStoreSupplier = Stores.inMemoryKeyValueStore("Counts");
StateStoreBuilder builder = Stores
    .keyValueStoreBuilder(countStoreSupplier,Serdes.String(),Serdes.Long())
    .withLoggingDisabled();

启用changelog时,你可以为changelog主题提供配置信息:

Java
1
2
3
4
5
Map<String, String> changelogConfig = new HashMap();
// 覆盖主题参数
changelogConfig.put("min.insyc.replicas", "1");
 
Stores...withLoggingEnabled(changelogConfig);
实现状态存储 

除了使用Kafka内置的StateStore实现之外,你还可以自定义状态存储。 

状态存储的核心接口是org.apache.kafka.streams.processor.StateStore,一些扩展的接口包括KeyValueStore。

你还需要一个创建状态存储的工厂,其接口是org.apache.kafka.streams.processor.state.StoreSupplier。

你可以提供一个org.apache.kafka.streams.processor.StateRestoreCallback,用于从changelog中恢复状态存储。要注册此回调,实现StateStore的init方法:

Java
1
2
3
public void init(ProcessorContext context, StateStore store) {
    context.register(store, false, stateRestoreCallBackIntance);
}

你可以实现org.apache.kafka.streams.processor.BatchingStateRestoreCallback,代替StateRestoreCallback。后者每次恢复一条数据,前者支持批量式的数据恢复。

抽象类AbstractNotifyingRestoreCallback、AbstractNotifyingBatchingRestoreCallback分别实现了StateRestoreCallback、BatchingStateRestoreCallback接口,并同时实现了StateRestoreListener接口。

高级API 

开发人员可以利用StreamsBuilder,基于Kafka Streams的领域特定语言(DSL)来构建拓扑。

流表二元性

高级API引入了表的概念,表和流可以看做是相同数据集不同视图:

  1. Stream as Table:一个流可以看做是一个表的changelog。流中的每条记录都捕获了表的一次状态变更,通过Replay changelog,流可以转变为表。流记录和表行不一定是1:1对应关系,流记录可能经过聚合,更新到表中的一行
  2. Table as Stream:表可以看做是某个瞬间的、流中每个键的最终值构成的快照。迭代表中的键值对很容易将其转换为流

在Kafka Streams中,状态存储的跨机器复制(容错,基于changelog)就利用了流表二元性。

KStream

该接口是对记录的流的抽象,每个记录是无边界的数据集中的一个自包含的数据。 

对于发送到KStream的两个记录("Alex", 1)、("Alex", 3),假设流处理程序进行sum(人名统计)操作,则返回结果是4。

KTable

该接口是对changelog流的抽象,此流中的每个记录代表了针对某个特定key的数据更新。 

对于发送到KTable的两个记录("Alex", 1)、("Alex", 3),相当于对键Alex进行两次更新,返回结果是3。

GlobalKTable

类似于KTable,但是跨越所有KafkaStreams实例进行复制。 GlobalKTable支持通过key来查询value,在进行Join操作时,需要使用这种查询。

创建源流 

通过读取Kafka主题,即可为Kafka Streams提供输入流。首先你需要实例化一个StreamsBuilder:

Java
1
StreamsBuilder builder = new StreamsBuilder();
创建KStream 
Java
1
2
3
4
KStream<String, Long> nameCounts = builder.stream(
    "names-counts-input-topic",  // 输入主题名称
    Consumed.with(Serdes.String(), Serdes.Long()) // 指定键值的串行化器
);

KStream对应了从输入主题读取的、分区化的记录的流。流处理程序的每个实例,都会消费输入主题的分区的子集,并且在整体上保证所有分区都被消费。

创建KTable 
Java
1
2
3
4
5
KTable<String, Long> nameCounts = builder.table(
    Serdes.String(), /* 键串行化器 */
    Serdes.Long(),   /* 值串行化器 */
    "name-counts-input-topic", /* 输入主题 */
    "name-counts-partitioned-store" /* 表名 */);

可以把任何主题看做是changelog,并将其读入到KTable。当:

  1. 记录的键不存在时,相当于执行INSERT操作
  2. 记录的键存在,值不为null时,相当于执行UPDATE操作
  3. 记录的键存在,值为null时,相当于执行DELETE操作

KTable对应了从输入主题读取的、分区化的记录的流。流处理程序的每个实例,都会消费输入主题的分区的子集,并且在整体上保证所有分区都被消费。

你可以为KTable提供一个名称,此名称也是KTable对应的状态存储的名称。只有命名的KTable才支持交互式查询。

创建GlobalKTable
Java
1
2
3
4
5
GlobalKTable<String, Long> nameCounts = builder.globalTable(
    Serdes.String(), /* 键串行化器 */
    Serdes.Long(),   /* 值串行化器 */
    "name-counts-input-topic", /* 输入主题 */
    "name-counts-global-store" /* 表名 */);
对流进行转换 

KStream和KTable支持一系列的转换操作,这些高层操作都可以被转换为1-N个连接在一起的处理器。由于KStream、KTable都是强类型的,因此这些转换操作都以泛型的方式定义。

某些KStream转换操作可以产生一个(例如filter/map)或多个(例如branch)KStream对象,另外一些则产生一个KTable对象(例如aggregation)。

对于KTable来说,所有的转换操作都只能产生另一个KTable。

无状态转换

不依赖于任何状态即可完成转换,不要求流处理器有关联的StateStore。

操作 说明
branch

IO:KStream → KStream

基于给定的断言集分割KStream,将其分割为1-N个KStream实例。断言按照声明的顺序依次估算,每个记录只被转发到第一个匹配的下游流中:

Java
1
2
3
4
5
6
KStream<String, Long> stream = ...;
KStream<String, Long>[] branches = stream.branch(
        (key, value) -> key.startsWith("A"), /* 以A开头的键  */
        (key, value) -> key.startsWith("B"), /* 以B开头的键 */
        (key, value) -> true                 /* 所有其它的记录均发往此流  */
);
filter

IO:KStream → KStream 或 KTable → KTable

基于给定的断言,针对每个记录进行估算。估算结果为true的记录进入下游流:

Java
1
2
3
4
5
// 仅保留正数值
stream.filter((key, value) -> value > 0);
// 针对一个KTable进行过滤,结果物化到一个StageStore中
Materialized m = Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("filtered")
table.filter((key, value) -> value != 0, m);
filterNot 与filter类似,仅仅保留不匹配的
flatMap

IO:KStream → KStream

基于一个记录,产生0-N个输出记录:

Java
1
2
3
4
5
6
7
8
9
KStream<String, Integer> transformed = stream.flatMap(
    (key, value) -> {
        // 键值对的列表
        List<KeyValue<String, Integer>> result = new LinkedList<>();
        result.add(KeyValue.pair(value.toUpperCase(), 1000));
        result.add(KeyValue.pair(value.toLowerCase(), 9000));
        return result;
    }
);
flatMapValues 类似于flatMap,但是保持键不变,可能产生多个键相同的记录
foreach

IO:KStream → void

终结性操作,针对每个记录执行无状态的操作

需要注意:操作的副作用(例如对外部系统的写)无法被Kafka跟踪,也就是说无法获得Kafka的处理语义保证

示例: stream.foreach((key, value) -> System.out.println(key + " => " + value)); 

groupByKey

IO:KStream → KGroupedStream

分组是进行流/表的聚合操作的前提。分组保证了数据被正确的分区,保证后续操作的正常进行

和分组相关的一个操作是窗口化。利用窗口化,可以将分组后的记录二次分组,形成一个个窗口,然后以窗口为单位进行聚合、Join

仅当流被标记用于重新分区,则此操作才会导致重新分区。该操作不允许修改键或者键类型

示例:

Java
1
2
3
4
5
6
KGroupedStream<byte[], String> groupedStream = stream.groupByKey(
    // 如果键值的类型不匹配配置的默认串行化器,则需要明确指定:
    Serialized.with(
         Serdes.ByteArray(),
         Serdes.String())
);
groupBy

IO:KStream → KGroupedStream 或 KTable → KGroupedTable

实际上是selectKey+groupByKey的组合

基于一个新的键来分组记录,新键的类型可能和记录旧的键类型不同。当对表进行分组时,还可以指定新的值、值类型

该操作总是会导致数据的重新分区,因此在可能的情况下你应该优选groupByKey,后者仅在必要的时候分区

示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
KGroupedStream<String, String> groupedStream = stream.groupBy(
    (key, value) -> value,  // 产生键值对value:value并依此分组
    Serialize.with(
         Serdes.String(), /* 键的类型发生改变 */
         Serdes.String())  /* value */
);
 
KGroupedTable<String, Integer> groupedTable = table.groupBy(
    // 产生键值对  value:length(value),并依此分组
    (key, value) -> KeyValue.pair(value, value.length()),
    Serialized.with(
        Serdes.String(), /* 键的类发生改变 */
        Serdes.Integer()) /* 值的类型发生改变  */
);
map

IO:KStream → KStream

根据一个输入记录产生一个输出记录,你可以修改键值的类型

Java
1
2
3
KStream<byte[], String> stream = ...;
KStream<String, Integer> transformed = stream.map(
    (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
mapValues 类似上面,但是仅仅映射值,键不变 
print

IO:KStream → void

终结操作,打印记录到输出流中。示例:

Java
1
stream.print(Printed.toFile("stream.out"));
selectKey

IO:KStream → KStream

对每个记录分配一个新的键,键类型可能改变。

Java
1
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value.split(" ")[0])
toStream

IO:KTable → KStream

将表转换为流: table.toStream();

WriteAsText

IO:KStream → void

终结性操作,将流写出到文件

有状态转换 

这类转换操作需要依赖于某些状态信息。例如在聚合性操作中,会使用窗口化状态存储来保存上一个窗口的聚合结果。在Join操作中,会使用窗口化状态存储到目前为止接收到的、窗口边界内部的所有记录。

状态存储默认支持容错,如果出现失败,则Kafka Streams会首先恢复所有的状态存储,然后再进行后续的处理。

高级的有状态转换操作包括:聚合、Join,以及针对两者的窗口化支持。

操作 说明
aggregate

IO:KGroupedStream → KTable 或 KGroupedTable → KTable

滚动聚合(Rolling Aggregation)操作,根据分组键对非窗口化的记录的值进行聚合

当对已分组流进行聚合时,你需要提供初始化器(确定聚合初值)、聚合器adder。当聚合已分组表时,你需要额外提供聚合器subtractor。代码示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
KGroupedStream<Bytes, String> groupedStream = null;
KGroupedTable<Bytes, String> groupedTable = null;
// 聚合一个分组流,值类型从字符串变为整数
KTable<Bytes, Long> aggregatedStream = groupedStream.aggregate(
        () -> 0L, /* 初始化器 */
        ( aggKey, newValue, aggValue ) -> aggValue + newValue.length(), /* 累加器 */
        Serdes.Long(), /* 值的串行化器 */
        "aggregated-stream-store" /* 状态存储的名称 */ );
 
KTable<Bytes, Long> aggregatedTable = groupedTable.aggregate(
        () -> 0L, /* 初始化器 */
        ( aggKey, newValue, aggValue ) -> aggValue + newValue.length(), /* 累加器 */
        ( aggKey, oldValue, aggValue ) -> aggValue - oldValue.length(), /* 减法器 */
        Serdes.Long(), /* 值的串行化器 */
        "aggregated-table-store" /* 状态存储的名称 */ );

KGroupedStream的聚合操作的行为:

  1. 值为null的记录被忽略
  2. 当首次收到某个新的记录键时,初始化器被调用
  3. 每当接收到非null值的记录时,累加器被调用

KGroupedTable的聚合操作的行为:

  1. 值为null的记录被忽略
  2. 当首次收到某个新的记录键时,初始化器被调用(在调用累加器/减法器之前)。与KGroupedStream不同,随着时间的推移,针对一个键,可能调用初始化器多次。只要接收到目标键的墓碑记录
  3. 当首次收到某个键的非null值时(INSERT操作),调用累加器
  4. 当非首次收到某个键的非null值时(UPDATE操作):
    1. 调用减法器,传入存储在KTable表中的旧值
    2. 调用累加器,传入刚刚接收到的新值
    3. 上述两个聚合器的执行顺序未定义
  5. 当接收到墓碑记录(DELETE操作)亦即null值的记录时,调用减法器
  6. 不论何时,减法器返回null时都会导致相应的键从结果KTable表中删除。遇到相同键的下一个记录时,会执行第3步的行为

windowedBy

+

aggregate

IO:KGroupedStream → KTable

窗口化聚合:以窗口为单位,根据分组键,对KGroupedStream中的记录进行聚合操作,并把结果存放到窗口化的KTable

你需要提供初始化器、累加器、窗口定义。如果基于会话进行窗口定义,你需要额外提供聚合器——会话合并器

示例代码:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
KGroupedStream<String, Long> groupedStream = null;
 
// 基于时间的窗口化(滚动窗口)
KTable<Windowed<String>, Long> timeWindowedAggregatedStream = groupedStream
        .windowedBy( TimeWindows.of( TimeUnit.MINUTES.toMillis( 5 ) ) )
        .aggregate(
                () -> 0L, /* 初始化器 */
                ( aggKey, newValue, aggValue ) -> aggValue + newValue, /* 累加器 */
                /* 状态存储 */
                Materialized.<String, Long, WindowStore<Bytes, byte[]>>as( "time-windowed-aggregated-stream-store" )
                        .withValueSerde( Serdes.Long() ) );
// 基于会话的窗口化
KTable<Windowed<String>, Long> sessionizedAggregatedStream = groupedStream
        .windowedBy( SessionWindows.with( TimeUnit.MINUTES.toMillis( 5 ) ) ) /* 窗口定义 */
        .aggregate(
                () -> 0L, /* 初始化器 */
                ( aggKey, newValue, aggValue ) -> aggValue + newValue, /* 累加器 */
                ( aggKey, leftAggValue, rightAggValue ) -> leftAggValue + rightAggValue, /* 会话合并器 */
                Materialized.<String, Long, SessionStore<Bytes, byte[]>>as( "sessionized-aggregated-stream-store" ).withValueSerde( Serdes.Long() ) );

窗口化聚合操作的行为如下:

  1. 类似于非窗口化的聚合,但是操作应用到每个窗口
  2. 键为null的记录被忽略
  3. 对于一个给定的窗口,收到某个键的第一个记录时,调用初始化器
  4. 对于一个给定的窗口,收到非空值记录时,调用累加器
  5. 当使用基于会话的窗口时,每当两个会话被合并时,会话合并器被调用

 

 

 

 

 

 

 

 

 

 

 

 

 

 

← Apache Kafka学习笔记
Apache Storm学习笔记 →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • Apache Storm学习笔记
  • OpenTSDB学习笔记
  • 基于EFK构建日志分析系统
  • ElasticSearch学习笔记
  • Apache Kafka学习笔记

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • Bazel学习笔记 37 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • Ceph学习笔记 27 people like this
  • 基于Calico的CNI 27 people like this
  • Three.js学习笔记 24 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2