Menu

  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay
  • Home
  • Work
    • Cloud
      • Virtualization
      • IaaS
      • PaaS
    • Java
    • Go
    • C
    • C++
    • JavaScript
    • PHP
    • Python
    • Architecture
    • Others
      • Assembly
      • Ruby
      • Perl
      • Lua
      • Rust
      • XML
      • Network
      • IoT
      • GIS
      • Algorithm
      • AI
      • Math
      • RE
      • Graphic
    • OS
      • Linux
      • Windows
      • Mac OS X
    • BigData
    • Database
      • MySQL
      • Oracle
    • Mobile
      • Android
      • IOS
    • Web
      • HTML
      • CSS
  • Life
    • Cooking
    • Travel
    • Gardening
  • Gallery
  • Video
  • Music
  • Essay

Apache Kafka学习笔记

13
Jun
2017

Apache Kafka学习笔记

By Alex
/ in BigData
0 Comments
简介

Apache Kafka(音标/'ka:fka:/)是一个分布式的实时数据处理的基础平台,能够处理每秒百万条数据。它具有三大功能:

  1. 订阅/发布:类似于传统MOM的功能,将队列、主题合二为一
  2. 流处理:支持编写可扩容的流处理程序,对实时事件做出响应
  3. 存储:安全的存储数据流,支持分布式、复制的、容错等特性

Kafka非常适用于以下几类应用场景:

  1. 构建能够可靠的在系统或应用程序之间收发实时流的数据管线
  2. 构建能够转换、处理流数据的实时应用程序

Kafka天生使用集群架构,由1-N个服务器构成的集群组成。Kafka集群对流记录进行分类存储,每个类别叫做主题(Topic)。每个流记录包含一个键、一个值、一个时间戳。

Kafka提供四类核心API:

  1. Producer API:允许应用程序发布流记录到Kafka主题
  2. Consumer API:允许应用程序订阅主题,并处理发布到这些主题的流记录
  3. Streams API:允许一个应用程序作为流处理器(Strem Processor),消费来自1-N个主题的输入流,并向1-N个主题释放输出流 —— 也就是高效的进行流转换
  4. Connector API: 允许构建可重用的生产者、消费者,把Kafka和既有的应用程序和数据系统连接起来。例如,针对RDBMS的连接器能够捕获针对表的每一个写操作

客户端和Kafka的通信协议基于简单、高效、语言无关的方式设计,以TCP协议为基础,支持版本化(向后兼容)。Kafka提供了Java以及其它语言的客户端。

主题和日志

主题即记录的流,Kafka中的主题可以具有多个订阅者。对于每个主题,Kafka集群维护分区化(partitioned)存储的日志:

  1. 每个分区是有序的、不可变的记录的序列,仅仅支持向序列的尾部添加记录
  2. 分区中的每个记录被分配一个序列号 —— offset,此序列号在分区中唯一的标识记录
  3. 集群在一个可配置的时间段内保留记录,不管记录有没有消费过,这个行为不同于传统MOM,后者不保留消费过的消息。不管配置的保留时间有多长,都不会影响Kafka的性能,这意味着你可以将Kafka作为数据库使用

每个消费者需要记录它当前正在消费的记录的偏移量(游标),通常情况下,消费者会线性的向前单步移动偏移量,这和传统MOM消费者的行为对应。但是,Kafka允许消费者任意移动游标,这个特性可用于实现Replay功能。

分布式

上述主题日志的分区,可以:

  1. 跨越Kafka集群中多个服务器存储
  2. 每个分区可以具有可配置数量的副本,用于容错

在启用复制的情况下,持有同一分区的多个服务器,其中一个被选举为Leader,其它则作为Followers。Leader处理所有针对分区的读写请求,Followers则仅仅进行复制。每个服务器都作为一些分区的Leader,另一些分区的Follower,以实现负载均衡。

生产者

生产者负责发布记录到主题上,它决定把哪个记录分配到哪个分区上。可选的策略例如循环轮询(round-robin)、某种和应用语义有关的方式(依据记录的某些键)。

消费者

Kafka的一个重要设计特点是,不区分主题、队列。这个特点是它灵活的消费模型提供的。

消费者可以将自己标记为属于某个消费者组(consumer group),发布到主题的每个记录会递送给消费者组消费,仅仅组中一个成员可以消费某个记录。这意味着:

  1. 某个主题仅仅包含单个消费者组:这种配置相当于传统MOM的多消费者的队列
  2. 某个主题包含多个单成员的消费者组:这种配置相当于传统MOM的主题

Kafka中消费行为的实现方式是:根据消费者数量,对日志分区进行划分。在任意时刻,每个消费者都公平的享有某一部分分区,此行为由Kafka协议动态处理。如果新的消费者加入,它将从其它消费者那里接管一部分分区。如果某个消费者宕机,它拥有的分区将由剩余的消费者瓜分。

Kafka仅仅在分区内部保证顺序性,在大部分情况下,基于记录键的分区+分区内有序是足够满足需要的。如果一定需要全局性顺序保证,你可以设计仅仅包含单个分区的主题,并且对于每个消费者组,仅仅有单个消费者。

保证

Kafka提供以下保证:

  1. 一个生产者发布到某个特定分区的记录,保证按照其发送顺序附加到日志的尾部
  2. 消费者看到的记录顺序,和它们在分区中存储的顺序一致
  3. 对于具有复制因子N的主题,Kafka允许N-1服务器同时宕机,而不丢失任何已经提交到日志的记录
作为...的Kafka
消息系统

传统的消息系统模型有两种:队列、订阅/发布。队列的优势是容易实现消费者的负载均衡,但是消息一旦被消费就消失不能供多人消费,订阅/发布(主题)的优缺点则刚好相反。

Kafka的创新之处在于,将队列、主题合而为一。

Kafka还提供比传统MOM更强的有序性。尽管传统MOM支持多个消费者,消息也是按序被消费的,但是这些消费者接收消息是异步的,顺序无法保证。Kafka的分区机制能够增强有序性,分区被特定单个消费者消费,不会出现顺序混乱的情况。

存储系统

Kafka主题中的记录被消费后,不会消失,这让它可以作为in-flight消息的存储系统。

事实上,Kafka是一个优秀的存储系统,因为:

  1. 写入的数据被复制,支持容错。Kafka允许生产者等待一个Ack —— 记录被合理复制、持久化后才认为操作成功
  2. 磁盘数据结构非常可扩容,不管是存储10KB还是10TB记录,性能不会有显著变化
流处理系统

Kafka不仅仅能够用来读取、写入、存储数据流,它还能用来进行实时流处理。

在Kafka中,一个流处理器是这样的一个程序:它接受持续不断的数据流输入(从主题),处理后产生持续不断的数据流输出(到主题)。

使用Producer/Consumer可以实现先单的流处理器,更复杂的需求可以基于Streams API实现。Streams API支持聚合、连接等操作,支持乱序数据处理、输入再加工、有状态计算等应用场景。

Streams API在Kafka提供的核心原语上构建,它使用生产者/消费者API作为输入,使用Kafka作为状态存储,使用组机制实现多个流处理器的负载均衡。

小结

联用Kafka的存储+低延迟订阅功能,可以让你处理历史、未来数据的方式变得一致 —— 基于Kafka的应用程序可以处理历史数据,当它处理完最后一个记录时,不需要退出,只需要等待未来的数据到达即可继续处理。Kafka的可靠存储功能,让你可以很容易的集成某些需要定期下线维护的系统。

典型应用场景
消息传输

消息代理可以用于解耦消息生产者和消费者、缓冲待处理消息,Kafka可以作为传统消息代理的替代品。

和大部分消息代理相比,Kafka具有更好的吞吐能力,并内置数据分区、容错等特性。

活动跟踪

Kafka的一个原始的应用场景是,辅助重建用户活动跟踪的管线。用户的活动(页面浏览、搜索等)被按照活动类型收集并发布到不同的主题上。订阅这些主题后,可以进行实时处理、实时监控、加载到Hadoop以便离线处理。

活动跟踪的数据量通常情况下比消息传输大的多。

度量记录

Kafka可以用记录、处理监控数据。它可以从分布式应用中获取度量信息并进行聚合操作。

日志聚合

Kafka可以用来从多个服务器上收集日志,并集中起来(例如存储到HDFS)以处理。Kafka抽象掉日志文件的概念,日志编程一系列消息组成的流。

流处理

从0.10开始Kafka内置了一个轻量的流处理框架Kafka Streams,使用此框架可以构建数据处理管线,针对数据进行聚合、转换等操作。

Apache Storm等流处理框架可以和Kafka很好的集成。

提交日志

Kafka可以作为分布式系统的外部提交日志(commit-log)。提交日志用于辅助节点之间的数据复制、失败节点的再同步。Kafka的log compaction特性用于支持这一应用场景,在这种场景下Kafka类似于Apache BookKeeper。

起步
下载安装

到Kafka官网下载压缩包,并解压。Kafka是基于JVM的应用,因此你需要事先安装好JRE。

启动服务
启动ZooKeeper

Kafka依赖于ZooKeeper,你可以使用外部的ZooKeeper服务器。或者,利用Kafka提供的脚本,创建一个简单的单节点ZooKeeper实例:

Shell
1
2
3
pushd ~/JavaEE/middleware/kafka/ > /dev/null
# 启动ZooKeeper
zookeeper-server-start.sh config/zookeeper.properties
启动Kafka
Shell
1
kafka-server-start.sh config/server.properties
创建主题
Shell
1
2
3
4
# 连接到ZooKeeper,来操控Kafka
# replication-factor 复制因子,也就是数据复制的份数
# partitions 分区数量
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

除了手工创建主题外,还可以配置,让代理在有客户端尝试发布消息时自动创建不存在的主题。

执行下面的命令可以列出现有的主题:

Shell
1
kafka-topics.sh --list --zookeeper localhost:2181
发布消息

Kafka提供了一个命令,用于从标准输入或者文件中读取信息,并发布到到主题上:

Shell
1
kafka-console-producer.sh --broker-list localhost:9092 --topic test

默认的每一行输入都作为单独的消息发送。

消费消息

Kafka提供了一个命令,用于消费消息并将其内容打印到标准输出:

Shell
1
2
# 从头开始消费
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

如果你在两个Terminal同时打开生产者、消费者,然后在生产者那里输入文字并回车,文字会原样的出现在消费者窗口中。

代理集群

对于Kafka来说,单个代理也是集群,也就是说它天然是集群的。要配置包含多个代理实例的集群,没有什么特别之处。本节演示如何创建三个实例组成的集群。

拷贝配置文件
Shell
1
2
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
修改配置文件

参考下面的示例修改:

Shell
1
2
3
4
5
# 对于每个集群中的任何一个节点,id必须是唯一的、永久不变的
broker.id=1
# 由于我们准备在一台机器上启动多个代理实例,因此需要定制某些参数:
listeners=PLAINTEXT://:9093
log.dir=/tmp/kafka-logs-1
启动额外实例

执行下面的命令启动两个额外的实例,加上本章最初创建的那个,组成三成员的集群:

Shell
1
2
kafka-server-start.sh config/server-1.properties
kafka-server-start.sh config/server-2.properties
复制的主题
Shell
1
2
3
# 复制因子3,表示数据复制三份
# 分区数量1
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1  --topic reptest
查看主题信息

执行下面的命令,可以看到刚刚创建的主题的相关信息,以及各代理实例和主题的关系:

Shell
1
2
3
4
5
6
7
kafka-topics.sh --describe --zookeeper localhost:2181 --topic reptest
# Topic:reptest    PartitionCount:1    ReplicationFactor:3    Configs:
# 对于每个分区(本主题只有一个分区),列出以下信息
# Leader 负责该分区所有读写操作的节点(代理实例)ID
# Replicas 复制此分区的所有节点列表,不管是否alive,不管是否leader
# Isr 和Leader保持同步(in-sync)的节点,这些节点必然是alive的
#     Topic: reptest    Partition: 0    Leader: 0    Replicas: 0,1,2    Isr: 0,1,2 
导入导出数据

使用Kafka Connect这一工具,你可以从其它数据源导入数据到kafka,或者将Kafka中的数据导出到其它系统。对于很多系统来说,导入导出不需要手工编写集成代码。

流式数据处理

Kafka Streams是一个客户端库,用于构建输入/输出数据存储于Kafka集群的实时应用或者微服务,并满足可扩容、弹性、容错、分布式的质量需求。

API概览 

Kafka提供了5类核心API:

API 说明
Producer 发布数据流到Kafka集群的主题上
Consumer 从Kafka集群的主题上读取数据流
Streams 在输入主题、输出主题之间进行数据流的转换
Connect 实现连接器,可以持续的从外部数据源拉拉取数据并发布到主题,或者持续的将主题推送到外部应用或者Sink系统
AdminClient 管理、查看代理、主题以及其它Kafka对象
生产者

要使用此API,可以引入Maven依赖:

XML
1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
KafkaProducer

此类是一个Kafka客户端,用于发送记录到Kafka集群。此类是线程安全的,使用单个实例通常性能更好。示例代码:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package cc.gmem.study.kafka;
 
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
/**
* Entrypoint
*/
public class Application {
 
    public static void main( String[] args ) {
        // 指定生产者配置
        Properties props = new Properties();
        props.put( "bootstrap.servers", "172.21.3.1:9092,172.21.3.2:9092,172.21.3.3:9092" );
        props.put( "acks", "all" );  // 仅当所有Replica确认后,才认为记录提交成功
        props.put( "retries", 0 );   // 不重试,注意重试可能引起重复消息
        props.put( "batch.size", 16384 ); // 每个分区可以占用的缓冲区大小
        props.put( "linger.ms", 1 ); // 即使缓冲区有空间,批次也可能立即被发送,此配置引入延迟
        props.put( "buffer.memory", 33554432 ); // 最大可用缓冲区
        // 如何把键值转换为字节
        props.put( "key.serializer", "org.apache.kafka.common.serialization.StringSerializer" );
        props.put( "value.serializer", "org.apache.kafka.common.serialization.StringSerializer" );
        /**
         * 创建生产者
         * 生产者主要由一个缓冲池(存放尚未发送到服务器的记录)和一个后台IO线程(负责将记录转换为请求发送)组成
         *
         */
        Producer<String, String> producer = new KafkaProducer<>( props );
 
        for ( int i = 0; i < 10; i++ ) {
            String topic = "TEST";
            String key = "KEY-" + Integer.toString( i );
            String value = "VALUE-" + Integer.toString( i );
            /**
             * 生产者记录:需要发送到特定Kafka主题的键值对
             *
             * 你可以为记录指定一个分区号,这样记录会发送到该分区中。如果不指定分区号但是给出了键,则键的哈希
             * 用于确定分区号。如果分区号、键都没有指定,则以循环轮流的方式发送到所有可用分区
             *
             * 你还可以为记录指定一个时间戳,如果不指定默认使用当前时间
             * 如果主题配置为TimestampType#CREATE_TIME,则上述时间戳将被代理使用
             * 如果主题配置为TimestampType#LOG_APPEND_TIME,则代理覆盖记录的时间戳,以实际添加到日志时代理的本地时间为准
             * 实际生效的时间戳将通过RecordMetadata返回给客户端
             *
             */
            ProducerRecord<String, String> record = new ProducerRecord<>( topic, key, value );
            /**
             * 发送方法是异步的,它只是把消息存放到缓冲区中然后立即返回
             * 处于效率考虑,生产者可能批量的发送记录
             */
            producer.send( record );
        }
        /**
         * 必须要保证生产者的关闭,否则线程、缓冲区资源无法释放
         */
        producer.close();
    }
}

从0.11开始KafkaProducer支持两种额外的模式:幂等模式、事务模式。

幂等模式

该模式增强了递送语义,将至少一次递送增强为精确的一次递送。注意:

  1. 这种幂等性仅仅在单个会话内部保证
  2. 客户端的retry不会引发记录的重复

要启用该模式,需要配置 enable.idempotence=true。执行此配置后,retries默认值为 Integer.MAX_VALUE 且ack默认值为all。

幂等模式下,API的调用方式无变化。

事务模式

该模式允许客户端原子的向多个分区、甚至多个主题发送数据。

要启用该模式,需要设置 transactional.id。如果设置了此属性则幂等性会自动开启。参与到事务中的主题应当具有至少3的复制因子且这些主题的 min.insync.replicas至少为2。

为了保证端到端的事务性,消费者应该设置为仅仅读取已提交消息。

transactional.id的目的能够跨越单个生产者实例的多个会话进行事务恢复。此ID通常根据分片的、有状态的应用程序的分片标识符(Shard Identifier)产生。对于一个分片应用程序中的每个生产者实例来说,此ID必须唯一。

事务性的API是阻塞的,并且失败时会抛出异常,示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
Properties props = new Properties();
props.put( "bootstrap.servers", "localhost:9092" );
// 每个生产者同时只能打开一个事务
props.put( "transactional.id", "my-transactional-id" );
Producer<String, String> producer = new KafkaProducer<>( props, new StringSerializer(), new StringSerializer() );
/**
* 在事务模式下,必须首先调用下面的方法。此方法的行为如下:
* 1、确保由先前实例发起的、相同ID的事务已经完成。如果先前实例在进行事务的过程中失败了,其事务在此被中止
*    如果上一个事务正在提交,此方法会等待其完成
* 2、获得内部的生产者id、epoch,并在所有事务性消息上使用
*/
producer.initTransactions();
 
try {
    // 只要启用了事务模式,任何记录都必须在事务中发送
    // 启动一个新事务
    producer.beginTransaction();
    for ( int i = 0; i < 10; i++ ) {
        String topic = "test";
        String key = "KEY-" + Integer.toString( i );
        String value = "VALUE-" + Integer.toString( i );
        // 不需要注册send的回调或者使用get()获得Future对象
        producer.send( new ProducerRecord<>( topic, key, value ) );
    }
    /**
     * 提交事务,此方法会首先刷出所有尚未发送的消息,然后再提交
     * 任何一次send()调用出现不可恢复的错误,该方法会再接收到错误后立即抛出异常,事务无法提交
     * 为了事务能提交,所有send()必须按序的成功
     */
    producer.commitTransaction();
} catch ( ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e ) {
    // 无法从这些异常中恢复
    producer.close();
} catch ( KafkaException e ) {
    // 对于其他类型的异常,可以中止然后重试
    // 中止后,所有已经成功的写操作会被标记为aborted
    producer.abortTransaction();
}
producer.close(); 
消费者

要使用此API,可以引入Maven依赖:

XML
1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
KafkaConsumer

使用该类能够从Kafka集群消费记录。此客户端:

  1. 能够透明的处理代理实例的失败。也就是说,如果代理失败导致分区迁移到其他代理,不需要手工处理
  2. 可以使用消费者组,与代理进行交互,让一组消费者进行负载均衡

与生产者不同,消费者不是线程安全的。

消费者需要维持到必要的代理的TCP连接,以抓取数据。不再使用消费者后,必须关闭它,否则这些连接会泄漏。

偏移量和消费位置

Kafka为分区中每个记录都维护一个数字的偏移量(Offset),这个偏移量是记录在分区中的唯一标识。

偏移量也用于标记消费者针对分区的当前消费位置(Position),例如消费者当前位置为5则意味着它已经消费过0-4记录并且下一个将被消费的记录是5。

对于消费者来说,“位置”有两层含义:

  1. 下一个将要读取的记录的偏移量,比消费者消费的最后一个记录的偏移量大1。每当消费者poll(long)了新记录后此位置自动更新
  2. 已提交位置(Committed Position),指已经被安全存储的偏移量值。消费者如果失败,重启后将根据此值进行恢复。该位置信息可以自动的定期提交。或者调用commitSync/commitAsync显式的提交
消费者组和主题订阅

Kafka引入消费者组(Consumer Group)的概念,来允许一池子的消费者划分消费、处理记录的工作。这些消费者可以运行在同一台或者多台机器上允许。任何具有相同 group.id的消费者都属于同一个组。

通过subscribe接口,组中的每个消费者都可以动态的订阅主题。Kafka会把每个消息递送给组中的单个消费者,具体实现方式是,通过负载均衡,让主题的任一分区仅供单个消费者来消费,组中的消费者会被尽可能的分配相同个数的分区。

组中的成员是动态维护的,如果某个消费者宕掉,分配给它的分区会重新分配给组中其它成员。当新消费者加入后,既有消费者持有的分区会转移给它。此所谓再平衡(Rebalancing)。当发生再平衡时,消费者可以通过 ConsumerRebalanceListener得到通知,从而进行状态清理、手工位置提交等操作。

从概念上,你可以把组看做单个逻辑的消费者。这个逻辑消费者不会重复的消费某个消息。

检测消费者失败

当订阅1-N个主题后,消费者首次调用poll(long)时会加入到消费者组。poll的实现能够监测消费者是否失败。只要你继续调用poll,消费者就会保持组成员的身份,并且持续的接收消息。在底层,消费者会定期的发送心跳给代理,如果消费者失败且代理在session.timeout.ms内没有收到心跳,就革除消费者的组成员资格并触发再平衡。

某些情况下,消费者虽然没有崩溃,但是它却进入一种livelock的状态。也就是说,尽管消费者仍然在正常发送心跳,但是它已经不能进行任何消息处理了。为了检测这一情况Kafka引入了max.poll.interval.ms配置,如果消费者在此时间内没有发起过poll()调用,代理会主动革除其组成员资格并触发再平衡。如果这种革除操作发生后,消费者进行位置提交时会收到CommitFailedException。为了保持组成员资格,消费者必须不断的poll。

增大max.poll.interval.ms参数,消费者可以用更充足的时间来处理poll()得到的记录。这样做的缺点是,再平衡可能延迟,因为仅在poll()调用时,消费者才能参与到再平衡中。

配置max.poll.records参数可以限制单次poll()操作能够返回的记录数量。通过减小此参数,你就能相应的调低max.poll.interval.ms。

如果消息处理时间无法估算,你可能需要将处理消息的逻辑从poll()线程中转移出去,由独立的线程处理。这样做的话你需要小心的管理提交位置,因为只有前述的独立线程才知道何时消息处理完毕。通常情况下需要禁用自动的位置提交,由独立线程手工的进行提交。

如果消息抓取的快而处理的相对较慢,你可以考虑pause()目标分区,这样poll()操作就不会抓取新的记录。

手工和自动提交

使用自动偏移量提交,可以保证最少一次递送语义。前提是,你必须在后续调用(或者关闭消费者)之前,消费上一次poll()返回的全部数据。如果无法保证全部消费,自动提交的偏移量可能比实际成功消费的偏移量大,导致记录遗漏。

自动偏移量提交的消费者代码示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Properties props = new Properties();
props.put( "bootstrap.servers", "172.21.3.1:9092,172.21.3.2:9092,172.21.3.3:9092" );
props.put( "group.id", "group0" );
// 启用自动的位置提交
props.put( "enable.auto.commit", "true" );
// 自动提交的间隔
props.put( "auto.commit.interval.ms", "1000" );
props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" );
KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props );
// 可以同时订阅多个主题
consumer.subscribe( Arrays.asList( "test", "hello" ) );
while ( true ) {
    // poll并等待100ms
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    for ( ConsumerRecord<String, String> record : records ) {
        String topic = record.topic();
        int partition = record.partition();
        long offset = record.offset();
        String key = record.key();
        String value = record.value();
        LOGGER.debug( "Record from {}/{}, offset: {}, Key: {}, Value: {}", topic, partition, offset, key, value );
    }
}

手工偏移量提交的消费者代码示例:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// ...
props.put( "enable.auto.commit", "false" );
// ...
final int minBatchSize = 200;
while ( true ) {
    ConsumerRecords<String, String> records = consumer.poll( 100 );
    buffer.add( records );
    if ( buffer.size() >= minBatchSize ) {
        // 在这里进行批量处理,例如持久化
        // 手工提交一次偏移量,这意味着之前收到的消息均被标记为已提交
        consumer.commitSync();
        buffer.clear();
    }
}

某些情况下,你可能希望对偏移量的提交进行细粒度的控制,例如按分区单独提交:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
try {
    while ( running ) {
        ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE );
        // 按分区处理
        for ( TopicPartition partition : records.partitions() ) {
            // 获取当前分区的所有记录
            List<ConsumerRecord<String, String>> partitionRecords = records.records( partition );
            for ( ConsumerRecord<String, String> record : partitionRecords ) {
                // 消费
            }
            // 获取最后一个记录的偏移量
            long lastOffset = partitionRecords.get( partitionRecords.size() - 1 ).offset();
            // 提交当前分区的消费偏移量:必须指向下一个需要被消费的记录
            consumer.commitSync( Collections.singletonMap( partition, new OffsetAndMetadata( lastOffset + 1 ) ) );
        }
    }
} finally {
    consumer.close();
}
手工分区分配

上面的两个例子中,分区由Kafka公平的分配给组中的消费者实例。某些情况下,你可能需要更细粒度、主动的控制,例如:

  1. 消费者在本地存储了状态,这些状态和特定的分区相关。因此消费者仅应抓取目标分区的记录
  2. 消费者本身具有HA特性,例如它被YARN之类的集群管理框架管理、属于Storm等流处理框架的一部分。这种情况下,不需要Kafka进行故障检测,因为消费者失败后会自动的重新启动

要手工分配分区,可以使用assign代替subscribe调用:

Java
1
2
3
4
5
String topic = "test";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
// 将分区0、1分配给消费者
consumer.assign(Arrays.asList(partition0, partition1));

执行上述分配后,你就可以进行poll。消费者的配置参数group.id仍然用于偏移量的提交。除非再次调用assign,分区和消费者的映射关系不会发生变化,因为手工分配模式下组协调器不生效,消费者失败不会导致再平衡。

尽管不同消费者实例可以共享组ID,但是它们是独立工作的。为了避免偏移量提交冲突,手工分配模式下,通常为每个消费者指定唯一性的组ID。

注意:自动分区分配(subscribe)和手工分区分配不能一起使用。 

在外部存储偏移量 

尽管Kafka内置的消费偏移量的存储机制,但是你并不一定要使用它。

应用程序独立管理消费偏移量的情况很常见,记录处理结果往往需要和消费偏移量原子的写入到同一存储系统,提供更强的精确一次性语义。典型的例子:

  1. 如果在RDBMS中存储消费后的结果,那么你可以在单个事务中同时提交消费偏移量和消费结果
  2. 如果在本地存储消费后的结果,连带存储消费偏移量也能带来好处。假设你希望基于某个分区的数据构建一个索引,索引数据可以和偏移量一起存储。只要能保证索引写和偏移量写原子的进行,那么即使宕机导致数据没有同步到磁盘,后续仍然可以恢复而不丢失数据

每个记录都自带了偏移量信息,因此,要外部存储偏移量,你只需要:

  1. 配置 enable.auto.commit=false
  2. 手工存储ConsumerRecord提供的偏移量
  3. 消费者重启后,调用 seek(TopicPartition, long)恢复上次消费位置

上述步骤配合手工分区分配,最为简单。

如果配合自动分区分配,则需要考虑可能的再平衡的影响。你需要调用 subscribe(Collection/Pattern, ConsumerRebalanceListener)并提供一个ConsumerRebalanceListener监听器:

  1. 实现onPartitionsRevoked(Collection)方法,这样当分区从消费者处拿走时,当前消费者有机会存储偏移量
  2. 实现onPartitionsAssigned(Collection),这样当消费者被授予新分区时,当前消费者能够读取先前存储的偏移量并seek到适当的位置

ConsumerRebalanceListener还可以用于,在分区迁移时,刷出应用程序维护的和目标分区相关的任何缓存信息。

消费位置控制

大部分情况下,消费者只是简单的从头开始消费,定期的提交消费偏移量。

Kafka也允许消费者任意的指定消费位置:

  1. 消费者可以跳转到更小的偏移量,对历史数据进行重新消费。某些情况下消费者可能需要在重启后从头消费,已建立本地的缓存状态
  2. 消费者可以跳转到更大的偏移量,跳过一部分未消费的数据。在时间敏感的记录处理程序中会这样跳转,当消费者处理速度赶不上时,可能直接跳过一部分记录

和跳转相关的消费者方法包括:

  1. seek(TopicPartition, long),跳转到指定偏移量
  2. seekToBeginning(Collection),跳转到最小的偏移量
  3. seekToEnd(Collection),跳转到最大的偏移量
消费流控制

如果给消费者分配了多个分区,它默认会尝试同时读取所有分区中的可用数据,也就是分区的优先级是一样的。

某些情况下,可能需要优先读取某个分区的数据,仅仅在有空闲的时候才消费其它分区的数据。例如:

  1. 在流处理程序中,你抓取两个主题的数据,并对两个流进行Join操作。如果其中一个流的消费进度远远拉下,则无法有效的Join。这时可能需要把消费快的那个流暂停下来
  2. 同时需要抓取历史数据和实时数据时,可能需要优先抓取实时数据

调用 pause(Collection)可以暂停消费已分配的分区,调用 resume(Collection)则可以从暂停中恢复。暂停以后,调用poll()不会获取被暂停分区的数据。

读取事务性消息

从0.10引入的事务支持,允许生产者原子的写入多个分区、主题的数据。为了配合事务性生产者,消费者必须配置 isolation.level=read_committed。

启用读取已提交这一隔离级别后,消费者仅会读取已经成功提交的事务性消息。对于非事务性消息,读取方式和以前一致。

读取已提交模式下,客户端并没有缓冲机制。这种模式下消费者针对分区的读偏移量,是该分区中第一个属于进行中事务的记录的偏移量。此偏移量被称为最后稳定偏移量(Last Stable Offset,LSO)。一个进行中的事务可以影响多个分区甚至主题的LSO。

读取已提交模式下的消费者,会最多读取到LSO的前一个位置,并且会过滤掉所有Aborted的任何事务性消息。消费者调用seekToEnd(Collection)、endOffsets(Collection)的结果也会执行LSO。度量值抓取延迟(Fetch Lag)也相对于LSO计算。

包含了事务性消息的分区,会包含提交、中止(Abort)标记,用以说明事务的成功与否。这些标记体现为主题日志中的记录,并且这些记录不会返回给客户端。插入在日志中的标记会导致消费者看到消息偏移量之间出现空隙,不管是哪种隔离级别的消费者都会看到这种空隙。对于读取已提交的消费者,空隙还可能由于中止的事务导致。

多线程处理

KafkaConsumer是非线程安全的。所有的网络IO发生在调用线程中。多线程访问时的同步必须由调用者保证,未同步访问可能导致ConcurrentModificationException。

唯一不需要同步的例外是wakeup()调用。此调用通常在其它线程中发起,中断正在进行中的poll()操作以便关闭消费者:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class KafkaConsumerRunner implements Runnable {
     private final AtomicBoolean closed = new AtomicBoolean(false);
     private final KafkaConsumer consumer;
 
     public void run() {
         try {
             consumer.subscribe(Arrays.asList("topic"));
             while (!closed.get()) {
                 ConsumerRecords records = consumer.poll(10000);
                 // 处理抓取到的记录
             }
         } catch (WakeupException e) {
             // poll()被中断会导致该异常
             if (!closed.get()) throw e;
         } finally {
             consumer.close();
         }
     }
 
     // 下面的方法可以被外部线程调用
     public void shutdown() {
         closed.set(true);
         // 中断正在进行中的poll()方法
         consumer.wakeup();
     }
}

Kafka没有设计消费者的线程模型,开发者可以按需开发。

每个消费者独占一个线程的方式,其优势为:

  1. 容易实现
  2. 由于不需要跨线程的协调,往往是最快的方式
  3. 按照分区保证消息处理顺序很容易实现,线程简单的根据获得消息的顺序消费它们

其缺点是:

  1. 多个消费者之间,无法合并请求来批量发送给服务器处理,降低了吞吐能力
  2. 线程的总数受限于分区的总数,因为每个消费者至少需要独占一个分区

为了避免上述线程模型的缺点,可以将消息的消费和处理进行解耦。由一个或者多个消费者线程抓取数据,并将数据ConsumerRecords存放到阻塞队列中。由一个处理线程池来处理队列中的记录。该方式的优势是:

  1. 可以独立的对消费者、处理者线程进行扩容
  2. 需要注意数据的处理顺序问题,由于线程调度的随机性,旧记录实际的处理时间可能在新记录的后面。如果不关注顺序性则不是问题
  3. 手工的消费偏移量提交变得困难,需要协调多个线程,确认针对分区的消费已经完成
流处理器

要使用Kafka Streams API,可以引入Maven依赖:

XML
1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>1.0.0</version>
</dependency>

详细的讨论参考Streams API一章。 

连接器

很多场景下,你不需要直接使用Connect API,只需要使用内置的Connector就可以了。

详细的讨论参考Connect API一章。

管理客户端

要使用此API,可以引入Maven依赖:

XML
1
2
3
4
5
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

使用AdminClient.create()可以创建管理客户端的实例(KafkaAdminClient)。此管理客户端可以用于管理、查看主题、代理、配置、ACL。代理的版本必须在0.10.0以上。

Kafka Streams

参考Kafka Streams学习笔记

Kafka Connect

Kafka Connect是用于在Kafka和其它系统之间进行可扩容、可靠的流传输的工具。使用它你可以快速的定义将大量数据输入到Kafka或者从Kafka输出的“连接器”。

使用Kafka Connect可以读取整个数据库,收集来自所有应用程序的度量信息并存入Kafka主题,并让这些数据适合用于实时的流处理。 

使用Kafka Connect可以将主题数据导出到存储系统、查询系统或者批处理系统,便于离线分析。

特性列表

通用的连接器框架

Kafka Connect标准化了其它数据系统和Kafka的集成机制,简化了连接器的开发、部署和管理

分布式和独立运行模式

Kafka Connect可以分布式集群运行,也可以缩小到单个实例运行,满足不同规模应用程序的需要

Kafka Connect基于现有的Kafka组管理协议,需要扩容的时候,把新的Worker添加到Kafka Connect集群中即可

REST接口

可以通过REST接口向Kafka Connect Cluster提交连接器,或者管理连接器

自动偏移量管理

只需要连接器提供少量的信息,框架就能够自动管理偏移量提交过程。这样开发者可以从连接器开发的容易出错的部分解放出来

桥接流式和批量处理系统

使用Kafka既有的特性,Kafka Connect提供了桥接流式(实时)处理系统、批量处理系统的解决方案

使用连接器

本节给出一个简单的例子,说明如何配置、运行、管理单实例的Kafka Connect。

运行Kafka Connect

单实例模式下,所有工作都在单个进程(Worker)内部完成。单实例模式可能适用于日志文件收集这样的场景,但是不能利用Kafka Connect的容错特性。

启动单实例连接器的命令格式如下:

Shell
1
2
3
# 第一个参数:Worker的配置
# 后续参数:各连接器的配置
connect-standalone.sh config/connect-standalone.properties connector1.properties connector2.properties ...

分布式模式下,负载均衡被自动处理,允许随时按需扩容。多实例模式提供活动Task、配置信息、偏移量提交数据的容错:

Shell
1
2
# 不支持通过命令行提供连接器配置
connect-distributed.sh config/connect-distributed.properties

单实例和分布式模式会执行不同的Java类,这两个类会读取Worker配置并决定在何处存储配置信息、如何分配任务、在何处存储偏移量和任务状态

Worker配置

基础的通用配置项:

配置项 说明
bootstrap.servers 如何连接到Kafka集群
key.converter
value.converter
如何将键值从Java对象形式转换为Kafka串行化格式

单实例Worker专有配置如下:

配置项 说明
offset.storage.file.filename  此参数对单实例模式很重要,指明在何处存放偏移量数据

上面的Worker配置供Kafka Connect创建的生产者、消费者使用,以便访问配置、偏移量、状态主题。

对于Kafka Source和Sink任务,上述配置可以前缀consumer.和.producer,来针对任务进行配置。唯一从Worker继承来的配置项是 bootstrap.servers。

分布式模式下,Kafka Connect在Kafka主题中存储偏移量、配置和任务状态。你应当手工创建用于存放这些信息的主题,以便定制分区数量和复制因子。如果Kafka Connect启动时这些主题不存在,将以默认参数自动创建。

分布式Worker专有配置如下:

配置项 说明
group.id

集群的唯一名称,默认connect-cluster。用于构成Kafka Connect集群组

此名称不能和Kafka消费者组名冲突

config.storage.topic

用于存放Connector、Task配置的主题的名称,默认connect-configs

应当是单个分区、大复制因子、压缩格式的主题

offset.storage.topic 用于存放偏移量。应当是多分区、复制的、压缩的主题
status.storage.topic 用于存放状态信息。应当是多分区、复制的、压缩的主题
连接器配置

你可以指定多个连接器配置,但是这些连接器都是在Worker进程内使用不同的线程运行。

注意:分布式模式下,连接器配置不通过命令行参数指定。你需要通过REST API来创建、修改、销毁连接器。

连接器配置项也是键值对形式,具体包含的配置项取决于不同的连接器。公共的配置项包括:

配置项 说明
name 连接器的唯一性名称,尝试注册重复名称的连接器会导致失败
connector.class 连接器的Java类,可以指定权限定名或者别名,例如org.apache.kafka.connect.file.FileStreamSinkConnector、FileStreamSink、FileStreamSinkConnector都表示同一个连接器
tasks.max 最多为此连接器创建的Task数量
key.converter
value.converter
覆盖Worker配置指定的键值转换器
topics 作为此连接器输入/输出的主题列表
REST API

Kafka Connect允许通过REST API来管理连接器。Web服务默认暴露在8083端口,目前支持的端点如下表:

端点

GET /connectors    返回活动连接器的列表

POST /connectors 创建一个新的连接器,请求体必须是JSON格式,包含字段name、config
GET /connectors/{name} 获取指定连接器的信息
GET /connectors/{name}/config 获取指定连接器的配置 
PUT /connectors/{name}/config 设置指定连接器的配置

GET /connectors/{name}/status 获取指定连接器的状态,例如

  1. 连接器是否正在运行、失败、暂停
  2. 连接器被分配给了哪个Worker 
  3. 连接器的所有Task列表
GET /connectors/{name}/tasks 获取指定连接器正在允许的Task列表 
GET /connectors/{name}/tasks/{taskid}/status 获取指定Task的状态信息
PUT /connectors/{name}/pause 暂停连接器及其Tasks。消息处理将被暂停
PUT /connectors/{name}/resume 恢复一个被暂停的连接器
POST /connectors/{name}/restart 重新启动一个连接器 
POST /connectors/{name}/tasks/{taskId}/restart 重新启动某个任务
DELETE /connectors/{name} 删除连接器,中止所有Task并移除配置
转换

通过配置,可以让连接器对消息进行修改或者叫转换,转换可以形成一个链条:

配置项 说明
transforms 转换器的列表,配置顺序对应了转换器的执行顺序
transforms.$alias.type

$alias为转换器指定一个别名

转换器的全限定类名

transforms.$alias.$transformationSpecificConfig 转换器的私有配置项

这里给出一个示例:内置的文件源连接器 + 添加静态字段的转换器。

本示例使用模式自由的JSON数据格式,因此需要修改Worker 配置文件:

connect-standalone.properties
Shell
1
2
key.converter.schemas.enable=false
value.converter.schemas.enable=false

在本示例中文件源连接器读取文件的每一行,将其包装为Map,然后添加一个字段用于识别数据来源。为完成这些逻辑,我们需要添加两个转换器:

  1. HoistField:将输入纳入到一个Map中
  2. InsertField:添加一个字段

修改后的连接器配置如下:

connect-file-source.properties
Shell
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
name=local-file-source
# 从文件读取数据(文件作为源),输出到Kafka主题
connector.class=FileStreamSource
tasks.max=1
# 连接器专有配置
file=test.txt
topic=connect-test
# 指定转换器列表
transforms=MakeMap, InsertSource
# 将输入消息转换为键line的值
transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value
# 转换器的配置项
transforms.MakeMap.field=line
# 添加一个静态键值
transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.InsertSource.static.field=data_source
transforms.InsertSource.static.value=test-file-source

应用此连接器后,主题的记录形式如下:

JavaScript
1
2
{"line":"hello","data_source":"test-file-source"}
{"line":"world","data_source":"test-file-source"}
内置转换器 

常用的内置转换器如下表:

配置项 说明

org.apache.kafka.connect.transforms.InsertField

通过静态值或者记录的元数据,产生一个字段,并插入到记录的键或者值。具体子类型InsertField$Key、InsertField$Value分别针对记录的键、值进行转换

offset.field

存放Kafka偏移量的字段名,仅适用于Sink连接器

如果此配置前缀以 !则表示标注此字段为必须,前缀以 ?则表示可选

partition.field 存放Kafka分区编号的字段名,前缀!/?表示必须/可选
static.field 一个存放静态数据的字段的名称,前缀!/?表示必须/可选
static.value 上述静态字段的值
timestamp.field 存放Kafka记录时间戳的字段名,前缀!/?表示必须/可选
topic.field 存放Kafka主题名的字段的名字,前缀!/?表示必须/可选

org.apache.kafka.connect.transforms.ReplaceField

过滤或者重命名现有的字段。具体子类型ReplaceField$Key、ReplaceField$Value分别针对记录的键、值进行转换

blacklist  需要过滤掉的字段名,优先级比whitelist高
renames  字段重命名映射,格式:oldname:newname,oldname2:newname2 
whitelist  需要包含的字段名,不在列表中的所有字段被过滤掉

org.apache.kafka.connect.transforms.MaskField

将指定字段的值替换为一个合法(类型相关)的“空值”,例如0、false、空串。具体子类型MaskField$Key、MaskField$Value分别针对记录的键、值进行转换

fields 需要被遮掩的字段列表

org.apache.kafka.connect.transforms.HoistField

当具有限定的Schema时,使用指定的结构来包裹数据;当使用自由Schema时,使用Map包裹数据。HoistField$Key、HoistField$Value分别针对记录的键、值进行转换

field 包裹记录数据的字段的名字

org.apache.kafka.connect.transforms.ExtractField

从结构(限定Schema)或者Map(自由Schema)中抽取一个字段。任何空值不做修改。ExtractField$Key、ExtractField$Value分别针对记录的键、值进行转换

field  需要抽取的字段的名字

org.apache.kafka.connect.transforms.SetSchemaMetadata

针对键(SetSchemaMetadata$Key)或值(SetSchemaMetadata$Value)来设置Schema名称、版本

schema.name string,模式的名称 
schema.version int,模式的版本

org.apache.kafka.connect.transforms.TimestampRouter

根据原始topic、记录时间戳来更新记录的topic字段

主要用于Sink连接器,原因是topic字段常常在Sink连接器中用作确定目标系统中的实体名(例如数据库系统中的表名)

timestamp.format 对时间戳进行格式化的Pattern,此Pattern必须兼容java.text.SimpleDateFormat。默认yyyyMMdd
topic.format 更新后的topic字段的格式,可以使用${topic}、${timestamp}两个占位符

org.apache.kafka.connect.transforms.RegexRouter

使用正则式匹配替换,来更新记录的topic字段

regex 用于匹配的正则式
replacement 原topic中匹配项被替换为的字符串

org.apache.kafka.connect.transforms.Flatten

扁平化一个内嵌的数据结构,新产生的字段名以点号导航的形式产生。Flatten$Key、Flatten$Value分别针对记录的键、值进行转换

delimiter 字段名分隔符,默认点号

org.apache.kafka.connect.transforms.Cast

将字段、整个键或者整个值转换为指定的类型。例如可以强制一个int字段为short,仅仅支持基本类型和字符串。Cast$Key、Cast$Value分别针对记录的键、值进行转换

spec

field:type,field2:type2形式的转换说明。支持的类型包括:

int8, int16, int32, int64, float32, float64, boolean,string

org.apache.kafka.connect.transforms.TimestampConverter

转换时间戳的格式,TimestampConverter$Key、TimestampConverter$Value分别针对记录的键、值进行转换

target.type 期望的目标时间戳呈现格式:string、unix、Date、Time、Timestamp
field 期望被转换的时间戳字段,如果整个键或值是时间戳则置空 
format target.type=string时,指定SimpleDateFormat兼容的Pattern 
开发连接器
核心概念
概念 说明
Connector

为了让Kafka和其它系统进行数据交换,开发人员需要创建一个Connector,连接器有两类:

  1. SourceConnector,从其它系统导入数据到Kafka,例如JDBCSourceConnector将关系型数据库导入到Kafka
  2. SinkConnector,将Kafka中的数据导出到其它系统,例如HDFSSinkConnector将Kafka主题导出到HDFS文件

Connector本身不负责实际的数据拷贝工作,它只是把工作分配给一系列的Task进行处理

并非所有的Job都是静态的,例如JDBCSourceConnector,可以监控数据库并把每个表分配给一个Task。当数据库中创建了新表的时候,Connector需要发现此表并将其分配给某个Task,这是通过重新配置Connector实现的

Task

负责将数据的一个子集拷贝到Kafka,或者从Kafka拷贝出去。Task可以分配到不同的Worker上运行

数据子集必须能够转换为Schema一致的记录构成的输入或输出流。某些情况下数据子集和流的映射关系是很明显的,例如日志文件集中的每个文件都可以产生流,日志的每一行都对应一个记录。某些情况下Offset如何界定则不明显,例如JDBC连接器能够把表转换为流,但是流中记录的Offset如何界定呢?一个常见的方案是,使用表的时间戳字段作为Offset,并基于此字段构建查询,进行批量数据读取

Stream 每个流都是键值对的流水序列。键、值都可以具有复杂的结构,例如数组、嵌套对象
Record 不管是由Source生成的,还是输出给Sink的记录,都有关联的Stream ID和Offset。框架会定期的进行偏移量提交,这样发生失败后,可以从上一次提交的偏移量处恢复
Source连接器

开发连接器仅仅需要实现Connector、Task两个接口。

从文件读取记录的源连接器和任务:

FileStreamSourceConnector.java
Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package cc.gmem.study.kafka;
 
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;
 
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
 
public class FileStreamSourceConnector extends SourceConnector {
 
    private String filename;
 
    private String topic;
 
    private final String FILE_CONFIG = "filename";
 
    private final String TOPIC_CONFIG = "topic";
 
    /**
     * 此连接器的任务实现类
     * 需要在Worker中实例化的,实际的从外部读取数据的任务的Java类
     */
    public Class<? extends Task> taskClass() {
        return FileStreamSourceTask.class;
    }
 
    /**
     * 生命周期回调,启动连接器
     * 该方法仅仅会在"干净"的连接器上面调用,所谓干净,要么是刚刚实例化、初始化的
     * 要么是已经被停止的
     *
     * @param props 传入的配置信息
     */
    public void start( Map<String, String> props ) {
        filename = props.get( FILE_CONFIG );
        topic = props.get( TOPIC_CONFIG );
    }
 
    /**
     * 声明周期回调,停止连接器
     */
    public void stop() {
 
    }
 
    /**
     * 产生最多max个任务的配置信息
     */
    @Override
    public List<Map<String, String>> taskConfigs( int max ) {
        List<Map<String, String>> configs = new ArrayList<>();
        // 仅仅支持单个任务
        Map<String, String> config = new HashMap<>();
        config.put( FILE_CONFIG, filename );
        config.put( TOPIC_CONFIG, topic );
        configs.add( config );
        return configs;
    }
 
    /**
     * 返回此连接器的版本
     */
    @Override
    public String version() {
        return null;
    }
 
    /**
     * 定义连接器的配置
     */
    @Override
    public ConfigDef config() {
        return null;
    }
 
}

FileStreamSourceTask.java
Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package cc.gmem.study.kafka;
 
import org.apache.commons.io.IOUtils;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
 
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
 
public class FileStreamSourceTask extends SourceTask {
 
    private String filename;
 
    private InputStream stream;
 
    private String topic;
 
    /**
     * 启动任务,在此处理任务配置,打开需要的资源
     * <p>
     * 实际应用时,还要考虑从Offset恢复的情况
     *
     * @param props 任务配置
     */
    @Override
    public void start( Map<String, String> props ) {
        filename = props.get( FileStreamSourceConnector.FILE_CONFIG );
        stream = openStream( filename );
        topic = props.get( FileStreamSourceConnector.TOPIC_CONFIG );
    }
 
    /**
     * Task被分配给专用的线程,此线程可能永久的阻塞,需要从Worker的其它线程调用stop方法才能停止
     */
    @Override
    public synchronized void stop() {
        IOUtils.closeQuietly( stream );
    }
 
    /**
     * 从外部系统拉取数据并构建源记录,如果没有可用的记录此方法应该阻塞
     *
     * @return 源记录的集合
     * @throws InterruptedException 当阻塞时被中断
     */
    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        try {
            List<SourceRecord> records = new ArrayList<>();
            while ( records.isEmpty() ) {
                LineAndOffset line = readToNextLine( stream );
                if ( line != null ) {
                    records.add(
                            // 源记录,主要包含四个元素
                            new SourceRecord(
                                    // 源分区(相当于流的标识符),这里只有一个源分区,也就是唯一的输入文件
                                    Collections.singletonMap( "filename", filename ),
                                    // 源偏移量,这里即文件中的字节偏移量
                                    Collections.singletonMap( "position", streamOffset ),
                                    // 输出主题
                                    topic,
                                    // 输出值及其Schema。这里的Schema提示输出永远是一个字符串
                                    Schema.STRING_SCHEMA, line
                            )
                    );
                } else {
                    Thread.sleep( 1 );
                }
            }
            return records;
        } catch ( IOException e ) {
        }
        return null;
    }
 
    @Override
    public String version() {
        return null;
    }
 
    private InputStream openStream( String filename ) {
        try {
            return new FileInputStream( filename );
        } catch ( FileNotFoundException e ) {
            throw new RuntimeException( e.getCause() );
        }
    }
}

注意上面Task所释放出的源记录,每个记录都关联了流标识符(文件名)和偏移量信息。Connect框架会利用这些信息,进行周期性的偏移量提交。这样,一旦Task出现失败,可以基于偏移量进行恢复,尽可能减少重复处理、重复记录。

偏移量提交完全由框架负责,Kafka Connect暴露了接口供Task读取偏移量:

FileStreamSourceTask.java
Java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Override
public void initialize( SourceTaskContext context ) {
    super.initialize( context );
    stream = new FileInputStream( filename );
    // 获取偏移量阅读器
    OffsetStorageReader offsetStorageReader = context.offsetStorageReader();
    // 读取指定分区的偏移量,偏移量是一个字典而不是简单的数值
    Map<String, Object> offset = offsetStorageReader.offset( Collections.singletonMap( "filename", filename ) );
    if ( offset != null ) {
        Long lastRecordedOffset = (Long) offset.get( "position" );
        if ( lastRecordedOffset != null )
            seekToOffset( stream, lastRecordedOffset );
    }
}
Sink连接器

Sink连接器的接口和Source类似。但是SinkTask和SourceTask则不同,因为后者使用的是拉模式而前者是推模式:

Java
1
2
3
4
5
6
7
8
9
10
11
12
13
public abstract class SinkTask implements Task {
    public void initialize(SinkTaskContext context) {
        this.context = context;
    }
    // 逻辑集中在此方法中,需要将SinkRecord进行转换,最终输出到外部系统
    // 此方法不一定需要保证数据完全写入到外部系统中,可以提前返回
    // SinkRecord包含的信息基本和SourceRecord一致
    public abstract void put(Collection<SinkRecord> records);
    // 在偏移量提交阶段调用下面的方法,此方法应该刷出未写入到外部系统的数据,并阻塞等待操作完成
    // currentOffsets可以用于实现精确一次性语义(和写入到外部系统的数据原子的一同写入)
    public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    }
}
动态流

Kafka Connect的主要价值是用于定义大批量的数据拷贝Job,例如,Job通常用来拷贝整个数据库而非单张表。这种设计意味着连接器的输入/输出流(表、文件等等)会动态的变化。

在实现SourceConnector时,你应当考虑源系统中的变化,例如表的增加或删除。当检测到变化后应当调用ConnectorContext来通知框架:

Java
1
2
if (inputsChanged())
    this.context.requestTaskReconfiguration();

在上述调用之后,框架会立即请求新的配置,并更新Tasks。在重新配置Task之前,框架允许Task优雅的完成提交操作。

某些情况下,和输入流更新有关的逻辑仅仅存在于Connector中。另外一些情况下,Task可能受到影响,需要适当的异常处理。例如,表的删除可能导致Task在拉取数据时出现错误,并且此错误可能在Connector检测到输入流的变化之前发生。 

SinkConnector通常仅需要处理新增的Kafka流,Kafka Connect使用正则式订阅来监控SinkConnector的输入主题集的变化,并通知SinkConnector。SinkTask可能需要接收新的流,并在外部系统创建资源。如果多个SinkTask接收同一流则可能出现尝试重复创建同一资源(例如同一张表)的情况,需要注意。

验证配置

Kafka Connect支持在提交连接器之前,对提供的配置信息进行验证。要使用此功能,你需要实现config()方法:

Java
1
2
3
4
5
6
7
private static final ConfigDef CONFIG_DEF = new ConfigDef()
    .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.")
    .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to");
public ConfigDef config() {
    return CONFIG_DEF;
}

ConfigDef.define方法的重载版本允许你提供一个Validator,用于针对单配置项自定义验证规则。

此外,覆盖Connector.validate()方法可以提供配置验证逻辑。

使用Schema

要处理符合结构定义的数据,你需要使用Kafka Connect的Data API。大部分的结构化记录需要和Schema、Struct这两个类交互。这两个类都用于定义Schema:

Java
1
2
3
4
5
6
7
8
9
Schema schema = SchemaBuilder.struct().name(NAME)
    .field("name", Schema.STRING_SCHEMA)
    .field("age", Schema.INT_SCHEMA)
    .field("admin", new SchemaBuilder.boolean().defaultValue(false).build())
    .build();
Struct struct = new Struct(schema)
    .put("name", "Barbara Liskov")
    .put("age", 75);

在开发SourceConnector时,你需要考虑何时生成Schema。如果Schema是静态的,可以静态块中生成。

但是很多连接器都需要面对动态Schema。例如数据库连接器,这种连接器可能需要处理多张表,就算处理单张表,表的结构也可能随着时间改变。你必须检测这些改变并且做出适当的处理。

配置

Kafka使用properties风格的配置文件。在通过命令行启动Kafka时,你可以指定配置文件路径,或者使用配置项的命令行参数版本。

配置项列表
代理配置

最基本的配置项包括:broker.id、log.dirs、zookeeper.connect。

配置项 说明
重要配置
zookeeper.connect

Kafka使用ZooKeeper存储集群的元数据信息,此配置项提供ZooKeeper的链接字符串:

Shell
1
2
# 支持ZooKeeper的chroot语法,在尾部添加 /kafka表示所有数据存放在此znode下
zookeeper.connect=zk:2181,zk-2:2181,zk-3:2181/kafka
advertised.listeners 发布到ZooKeeper,供客户端使用的监听器列表。如果不指定则同listeners配置项
auto.create.topics.enable boolean=true。是否按需自动在服务器上创建主题
auto.leader.rebalance.enable

boolean=true。是否启用Leader负载均衡。一个后台线程负责定期检查并触发负载均衡

主题的每个分区都具有一个Leader节点,此节点负责全部读写

leader.imbalance.check.interval.seconds

long=300。再平衡检查的间隔

leader.imbalance.per.broker.percentage

int=10。每个代理允许的Leader不平衡最大比率(百分比),超过此阈值会导致再平衡

background.threads int=10。用于各种后台任务的线程池的大小
broker.id

当前服务器的代理标识。如果不设置则自动生成一个唯一性的标识符

自动生成的ID从reserved.broker.max.id + 1开始,以防止和手工指定的ID冲突

compression.type

string=producer。指定主题的最终压缩方式,可选值有:

  1. 标准的编码器名称,例如gzip、snappy、lz4
  2. uncompressed,不压缩
  3. producer,保留生产者设置的压缩方式
delete.topic.enable boolean=true。是否允许删除主题。如果设置为false,通过管理工具执行的删除操作没有任何效果
listeners

在其上监听的URI列表,逗号分隔

如果URI不使用安全协议,则listener.security.protocol.map必须配置

如果URI的主机部分设置为0.0.0.0则监听所有网络接口;置空则监听默认网络接口

示例:

PLAINTEXT://myhost:9092
SSL://:9091
CLIENT://0.0.0.0:9092
REPLICATION://localhost:9093

log.dir 日志数据存储的目录,作为log.dirs的补充
log.dirs 日志数据存储的目录,如果不指定则使用log.dir
log.flush.interval.messages

long=9223372036854775807。在消息被刷出磁盘之前,日志分区上累积的消息的数量

Kafka不建议设置此参数。为了防止数据丢失应该使用集群而不是定期调用fsync。操作系统底层会自动调度,高效的决定何时刷出

log.flush.interval.ms

在任何主题中任何消息刷出磁盘之前,在内存中最多驻留的时间

如果不指定,使用 log.flush.scheduler.interval.ms

log.flush.offset.checkpoint.interval.ms int=60000。每隔多久更新最后一次刷出(Last flush)的持久化记录,此记录作为日志恢复点
log.flush.scheduler.interval.ms long=9223372036854775807。日志刷出器每隔多少ms检查是否存在需要刷出到磁盘的日志
log.flush.start.offset.checkpoint.interval.ms int=60000。每隔多久更新日志起始偏移量(Start offset)的持久化记录
log.retention.bytes 日志的最大保留尺寸
log.retention.hours
log.retention.minutes
log.retention.ms
日志的最大保留时间。优先级逐个升高
log.roll.hours
log.roll.ms
最多经过多久,必须滚出(Roll out,产生)新的日志段(Segment)
log.roll.jitter.hours
log.roll.jitter.ms
从上面那个参数减去的最大抖动时间
log.segment.bytes 日志段的大小
log.segment.delete.delay.ms long=60000。在从文件系统删除一个日志文件(段)之前,最多等待的时间
message.max.bytes

int=1000012

Kafka允许的最大消息批量的大小(字节)。如果增加此参数并且消费者的版本在0.10.2之前,消费者的抓取尺寸(Fetch Size)也需要相应的增加

在最近版本的消息格式中,出于性能的考虑,记录总是被组装到批次(Batch)中

在以前版本的消息格式中,未压缩记录是不组装批次的。因此该参数针对单条记录

主题可以覆盖此配置

min.insync.replicas

int=1

当生产者将acks设置为all或-1时,该配置的意义是指定最少的已经同步写操作的节点数量。也就是说,只有足够多的节点已经确认(Acknowledge)了写操作,生产者才认为写操作成功

如果无法满足此条件,则生产者抛出NotEnoughReplicas或NotEnoughReplicasAfterAppend

配合使用acks和此选项可以增强持久性保证

num.io.threads int=8。服务器使用的I/O线程数量,这些线程负责磁盘IO等操作
num.network.threads int=3。服务器使用的网络线程的数量,这些线程负责接收请求、发送响应
num.recovery.threads.per.data.dir int=1。每个数据目录用于:1、在启动时进行日志恢复;2、在关闭时进行日志刷出的线程数量
num.replica.fetchers int=1。从源代理抓取数据以复制消息的线程数量。增加此配置可能增强从代理的并行度
offset.metadata.max.bytes int=4096。关联到偏移量提交(Offset Commit)的元数据条目的最大尺寸
offsets.commit.required.acks short=-1。通常不需要修改,在提交可以被接受之前,需要的Acks数量
offsets.commit.timeout.ms int=5000。偏移量提交(Offset Commit)会被推迟,直到所有Replica接收到提交,或者到达此配置指定的延迟
offsets.load.buffer.size int=5242880。当加载偏移量到缓存中时,从偏移量段(Offsets Segments)读取数据的批次大小
offsets.retention.check.interval.ms long=600000。检查偏移量是否过期(Stale)的间隔
offsets.retention.minutes int=1440。超过此驻留时间的偏移量被丢弃
offsets.topic.compression.codec int=0。偏移量提交主题(Offsets Commit Topic )的压缩算法。使用压缩可以实现“原子”提交
offsets.topic.num.partitions int=50。偏移量提交主题包含的分区数量。部署后不应再修改
offsets.topic.replication.factor short=3。偏移量提交主题的复制因子。在集群成员大小足够前,内部的主题创建会失败
offsets.topic.segment.bytes int=104857600。偏移量提交主题的段大小
queued.max.requests int=500。在阻塞网络线程(不再接受新请求)之前,排队的请求数量
replica.fetch.min.bytes int=1。Follower Replica期望每次抓取请求能接收的响应的最小字节数。如果字节数不够,Follower会等待直到replica.fetch.wait.max.ms
replica.fetch.wait.max.ms int=500。Follower Replica发送抓取请求后等待响应的最大时间。此配置应该总是小于replica.lag.time.max.ms,以防止低吞吐量主题的ISR频繁的收缩 
replica.high.watermark.checkpoint.interval.ms long=5000。高水位保存到磁盘的频率 
replica.lag.time.max.ms long=10000。Follower Replica在此时间内没有发送抓取请求,或者离Leader Replica的终点偏移量(End Offset)超过此时间,则Leader Replica从ISR列表中将Follower移除
replica.socket.receive.buffer.bytes int=65536。网络请求的套接字接收缓冲大小
replica.socket.timeout.ms int=30000。网络套接字的超时时间,应当大于replica.fetch.wait.max.ms
request.timeout.ms int=30000。客户端等待响应的最大时间,超过此时间客户端可能重发请求或者失败(重发次数超过限制)
socket.receive.buffer.bytes int=102400。服务器端套接字的SO_RCVBUF配置。如果设置为-1则使用OS默认值
socket.request.max.bytes int=104857600。一个套接字请求包含的最大字节数
socket.send.buffer.bytes int=102400。服务器端套接字的SO_SNDBUF配置。如果设置为-1则使用OS默认值
transaction.max.timeout.ms int=900000。事务的最大超时,如果客户端请求的事务时间超过此配置则代理在InitProducerIdRequest调用中返回一个错误,以阻止客户端使用太大的超时
transaction.state.log.load.buffer.size int=5242880。加载生产者ID、事务到缓存中时,读取事务状态日志的批次大小
transaction.state.log.min.isr int=2。为事务状态主题覆盖min.insync.replicas 
transaction.state.log.num.partitions int=50。事务状态主题的分区数量。部署后不应该修改
transaction.state.log.replication.factor short=3。事务状态日志的复制因子 
transaction.state.log.segment.bytes int=104857600。事务状态日志的段大小,应该设置的相对较小,以保证较快的日志压缩和缓存加载 
transactional.id.expiration.ms int=604800000。事务协调器等待此时间后,主动(不需要从生产者接收任何事务状态更新)将生产者的事务ID过期 
unclean.leader.election.enable boolean=false。指示是否允许非ISR(同步)的Replica可以被选举为Leader,设置为true可能导致数据丢失
zookeeper.connection.timeout.ms 创建到ZooKeeper的连接的超时时间,如果不指定使用zookeeper.session.timeout.ms  
zookeeper.session.timeout.ms int=6000。ZooKeeper会话过期时间 
zookeeper.set.acl boolean=false。是否设置ACL
broker.rack 代理所在的机柜,用于Rack-aware的复制分配,用于跨数据中心复制
主题配置

主题可以从服务器默认值继承配置,并且可以覆盖。要指定主题专有配置,可以:

Shell
1
2
3
4
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor
   # 使用下面的选项来覆盖配置
   --config max.message.bytes=64000
   --config flush.messages=1

在主题创建之后,你仍然可以修改配置:

Shell
1
2
3
4
5
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic
    # 修改配置
    --alter --add-config max.message.bytes=128000
    # 移除配置
    --alter --delete-config max.message.bytes

执行下面的命令,可以查看一个主题的配置覆盖情况:

Shell
1
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe

重要配置项如下:

配置项 说明
cleanup.policy

list=delete。对应服务器默认配置:log.cleanup.policy

老旧消息的驻留策略,可选值delete/compact

compression.type

string=producer。对应服务器默认配置项:compression.type

主题的最终压缩方式

delete.retention.ms

long=86400000。对应服务器默认配置项:log.cleaner.delete.retention.ms

对于log compacted主题,删除墓碑标记的驻留时间。如果消费者从Offset 0 开始读取,它必须在此配置指定的时间内完成,以确保获得有效的快照

file.delete.delay.ms

long=60000。对应服务器默认配置项:log.segment.delete.delay.ms

从文件系统中删除文件之前,等待的时间

flush.messages

long=9223372036854775807。对应服务器默认配置项:log.flush.interval.messages

指定一个消息计数,累计超过此数量的消息强制fsync日志到文件系统

flush.ms

long=9223372036854775807。对应服务器默认配置项:log.flush.interval.ms

指定一个间隔,超过此时间强制fsync日志到文件系统

follower.replication.throttled.replicas

list。对应服务器默认配置项:follower.replication.throttled.replicas

Replica的列表,对于这些Replica日志复制在Follower段限速

取值*表示针对所有Replica限速,可以指定partitionId:brokerId、brokerId、partitionId

leader.replication.throttled.replicas

list。对应服务器默认配置项:leader.replication.throttled.replicas

类似上一条,只是节流在Leader端执行

index.interval.bytes

int=4096。对应服务器默认配置项:log.index.interval.bytes

通常不需要修改此选项。控制Kafka向Offset索引添加条目的频率。更高的频率允许消费者跳转到更精确的位置,但是会导致索引占用空间更大

max.message.bytes int=1000012。对应服务器默认配置项:message.max.bytes
message.format.version

string=1.0-IV0。对应服务器默认配置项:log.message.format.version

代理为该主题添加日志时,使用的消息格式版本

message.timestamp.difference.max.ms

long=9223372036854775807。对应服务器默认配置项:log.message.timestamp.difference.max.ms

代理接收到消息时的实际时间和消息中指定的时间戳的最大差距。如果超过此值并且:

  1. message.timestamp.type=CreateTime,消息被拒绝
  2. message.timestamp.type=LogAppendTime,该配置被忽略
message.timestamp.type

string=CreateTime。对应服务器默认配置项:log.message.timestamp.type

消息时间戳存储消息创建时间(CreateTime)还是附加到日志的时间(LogAppendTime)

 

min.cleanable.dirty.ratio

double=0.5。对应服务器默认配置项:log.cleaner.min.cleanable.ratio

当启用日志整理(log compaction)时,该选项控制日志整理器尝试清理日志的频率。默认值是,当50%的消息已经被整理(也就是重复消息不超过50%),则不触发新的整理。取值越大,则整理频率越低也越高效,同时导致日志浪费更多空间

min.compaction.lag.ms

long=0。对应服务器默认配置项:log.cleaner.min.compaction.lag.ms

当启用日志真理时,该选项控制一个消息在被整理之前至少需要等待的时间

min.insync.replicas

int=0。对应服务器默认配置项:min.insync.replicas

preallocate

boolean=false。对应服务器默认配置项:log.preallocate

当创建一个新的日志段时,是否在磁盘上预分配空间

retention.bytes long=-1。对应服务器默认配置项:log.retention.bytes
retention.ms long=604800000。对应服务器默认配置项:log.retention.ms
segment.bytes int=1073741824。对应服务器默认配置项:log.segment.bytes
segment.index.bytes

int=10485760。对应服务器默认配置项:log.index.size.max.bytes

控制索引的大小,索引用于将偏移量映射到文件位置。索引文件被预先创建,仅当日志滚动(创建新段)之后才收缩其大小

segment.jitter.ms long=0。对应服务器默认配置项:log.roll.jitter.ms
segment.ms long=604800000。对应服务器默认配置项:log.roll.ms
unclean.leader.election.enable boolean=false。对应服务器默认配置项:unclean.leader.election.enable
生产者配置

下表列出Java生产者可以使用的配置项:

配置项 说明
bootstrap.servers

list=host:port,host:port...

指定初始化到Kafka集群连接时使用的服务器地址端口列表。此列表仅仅用于初始化时发现完整集群,不需要指定集群中所有节点

key.serializer 指定一个实现了org.apache.kafka.common.serialization.Serializer的类,此类用于键的串行化
value.serializer 指定一个实现了org.apache.kafka.common.serialization.Serializer的类,此类用于值的串行化
acks

string=1。在生产者认定操作已经完成之前,Leader必须接收到Ack的数量:

  1. 取值0,不等待任何Ack,记录被加入到套接字缓冲并认为已经发送成功。服务器不一定能收到消息,retries配置也不会生效
  2. 取值1,Leader将消息写到本地日志之后,立即答复生产者。如果消息来得及复制到Follower并且Leader宕掉,消息可能丢失
  3. 取值all或-1,等待所有in-saync的Follower都Ack。具有最强的可靠性
buffer.memory

long=33554432。生产者总计可用的缓冲区,此缓冲区用于暂存等待发送到服务器的那些记录

如果消息生产过快超过消息递送速度,而导致缓冲区爆满,生产者会阻塞max.block.ms,此后仍然没有缓解,生产者抛出异常

此参数大概等于生产者总计的内存消耗。生产者还需要额外的小部分内存用于压缩消息、维护in-flight请求

compression.type string=none。生产者产生的任何数据的压缩方式。默认不压缩,可选gzip、snappy、lz4
retries

int=0。设置大于0的值,则生产者可以在失败后重试发送消息,以避免暂时性网络错误的影响

如果允许重试,但是没有把max.in.flight.requests.per.connection设置为1,则记录的顺序可能改变。例如,两个批次同时发到同一分区,批次1、批次2连续发送,然后1失败2成功,后续重发1

batch.size

int=16384。批量发送的最大字节数,设置为0则禁用批量发送。当多个记录将被发送给同一分区时,生产者会尝试批量发送以减少请求次数。批量发送可以增强服务器、客户端的性能

生产者不会尝试发送大于此配置的记录批次

发送给服务器的请求可以包含多个批次,每个批次针对一个分区

此配置项设置的过小,可能影响吞吐能力;设置的过大,会导致内存浪费,因为这块内存是预先分配的不管是否实际需要

client.id 发送请求时,发送给服务器的一个字符串。用途是跟踪请求的源,此ID可以包含在服务器端日志中
connections.max.idle.ms long=540000。多久以后关闭空闲的连接
linger.ms

long=0。生产者可以将请求发送期间收集的记录组成批次,在下一个请求中一起发送。正常情况下,这种批量发送行为仅在记录产生速度大于发送速度时存在。如果你希望在缓和的负载下也使用批量发送以减小请求次数,可以将该配置设置的大于0。这样,生产者在产生一个消息后,会等待linger.ms,看看有没有下一个消息到来

一旦针对某个分区的记录达到batch.size字节批量发送就会立即发送,而不需要等待linger.ms。linger.ms限定了批量发送引入的最大延迟

max.block.ms long=60000。配置KafkaProducer.send()和KafkaProducer.partitionsFor()的最大阻塞时间,当生产者发送缓冲爆满或者元数据不可用时,这两个方法会被阻塞
max.request.size

int=1048576。单个请求的最大字节数,需要注意服务器端有类似的限制,且取值可能和客户端不一样

该配置对批量发送的大小作出额外限制

partitioner.class

class=org.apache.kafka.clients.producer.internals.DefaultPartitioner

实现org.apache.kafka.clients.producer.Partitioner接口的类

receive.buffer.bytes int=32768。套接字SO_RCVBUF参数,-1则使用OS默认值
request.timeout.ms int=30000。客户端等待响应到达的最大时间。如果超时客户端可能重试或失败
security.protocol string=PLAINTEXT。用于和服务器通信的协议,可选PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
send.buffer.bytes int=131072。套接字SO_SNDBUF,-1则使用OS默认值
消费者配置

从0.9.0.0版本开始,Kafka引入新的Java消费者实现,代替原先基于Scala的简单、高层实现。下表的配置可以用于新、老消费者实现:

配置项 说明
bootstrap.servers 参考生产者配置
key.deserialize
value.deserializer
fetch.min.bytes int=1。对于一个抓取请求,服务器应该返回的最小数据量。如果数据量不足,在应答之前请求会等待更多数据。设置为较大的数,则服务器会等待更多的数据再应答,这样可以增加吞吐量
group.id

唯一性的字符串,用于识别该消费者所属的组。当消费者使用:

  1. subscribe(topic)提供的组管理功能
  2. 或者基于Kafka的偏移量管理策略

时,需要该配置

heartbeat.interval.ms

int=3000。使用Kafka的组管理功能时的心跳间隔。心跳用于确认消费者还活着,因为消费者加入、离开组时需要进行(谁消费哪个分区的)再平衡

必须小于session.timeout.ms,通常不大于session.timeout.ms的1/3。如果需要更敏捷的再平衡可以设置的更低

max.partition.fetch.bytes

int=1048576。服务针对每个分区最多返回的数据量。消费者按批次抓取记录,如果针对第一个非空分区的第一个记录批次超过此配置的限制,批次仍然被返回以便消费者能进一步处理

服务器允许的记录批次的最大尺寸,受到message.max.bytes(代理级/主题级)约束

session.timeout.ms

int=10000。使用Kafka组管理功能时,判断一个消费者宕机的延迟。消费者定期发送心跳,这样代理知道它还活着,如果session.timeout.ms后代理没有收到新的心跳,则认为消费者宕机,代理会把消费者从组中移除并进行再平衡

必须在代理配置group.min.session.timeout.ms和group.max.session.timeout.ms之间取值

connections.max.idle.ms long=540000。关闭空闲连接之前的等待时间
enable.auto.commit boolean=true。如果设置为true则消费者的偏移量会在后台定期的提交
exclude.internal.topics boolean=true。Kafka内部主题(例如偏移量)是否暴露给消费者
fetch.max.bytes

int=52428800。对于一个抓取请求服务器最多返回的数据量。消费者按批次抓取记录,如果针对第一个非空分区的第一个记录批次就超过此配置的限制,批次仍然被返回以便消费者能进一步处理。因此,此该配置不是一个绝对性的限制

服务器允许的记录批次的最大尺寸,受到message.max.bytes(代理级/主题级)约束

isolation.level

string=read_uncommitted。可选值read_committed

决定如何读取事务性写入的消息,如果设置为:

  1. read_committed,则consumer.poll()仅仅返回已经提交的事务性消息
  2. read_uncommitted,则consumer.poll()返回所有消息,甚至是已经被Abort的事务性消息

对于非事务性消息,该配置没有影响

消息总是按照偏移量顺序返回给消费者,因此read_committed模式下consumer.poll()只能返回最晚到LSO(Last Stable Offset,最新稳定偏移)的消息。LSO即第一个(偏移最靠前)打开事务的偏移量减1。特别的,任何位于进行中事务消息之后的消息都不会返回给消费者。直到这些事务完成。作为结果,read_committed可能导致消费者无法读取到高水位。另外,seekToEnd()在read_committed模式下也仅读到LSO

 

max.poll.interval.ms int=300000。使用消费者组管理时,调用poll()的最大间隔。如果消费者在此时间内没有进行poll()以抓取更多数据,则服务器认为消费者已经失败,并执行再平衡(将分区分配给其它消费者)
max.poll.records int=500。单次poll()调用返回的记录的最大数量
partition.assignment.strategy

list=org.apache.kafka.clients.consumer.RangeAssignor

使用消费者组管理时,客户端所使用的分区分配策略类。客户端使用此类决定如何在消费者实例之间分配分区

receive.buffer.bytes int=65536。套接字参数SO_RCVBUF
send.buffer.bytes int=131072。套接字参数SO_SNDBUF
request.timeout.ms int=305000。客户端等待请求的响应到达的最大时间,超时后失败或者重发
security.protocol string=PLAINTEXT。与代理通信使用的协议,可选PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
Connect配置
配置项 说明
bootstrap.servers list=localhost:9092。初始化到Kafka集群连接的服务器列表
config.storage.topic 存储连接器配置的Kafka主题的名称
group.id 当前Worker所属的Kafka Connect集群组
key.converter
value.converter

配置Kafka Connect格式和串行化的存储到Kafka主题的格式之间的转换器类

两个配置项分别指定消息中键、值的转换器

offset.storage.topic 存储连接器偏移量的Kafka主题的名称
status.storage.topic 存储连接器、任务状态的主题的名称
heartbeat.interval.ms

int=3000。当使用Kafka的组管理功能时,发送心跳到组协调器(Group Coordinator)的周期

心跳用于确保Worker还活着,并且在有Worker加入/离开组时进行再平衡

必须设置的比session.timeout.ms,典型的小于其1/3

rebalance.timeout.ms int=60000。再平衡开始后,任务Task需要在此配置的时限内刷出数据、提交偏移量。超时后Worker将从组中移除,从而导致偏移量提交失败
session.timeout.ms int=10000。用于检测Worker失败的超时。通常情况下Worker周期性的发送心跳给代理,代理因而确认它还活着,如果超过此配置的时间代理没有收到心跳,则认为Worker失败
Streams配置
配置项 说明
application.id 流处理应用程序的标识符,必须在Kafka集群内是唯一的。此标识符将用于:
  1. client-id前缀
  2. 组成员关系管理时的group-id
  3. changelog主题名的前缀
bootstrap.servers 初始化到Kafka集群连接的服务器列表
replication.factor 流处理应用程序创建的changelog主题、repartition主题的复制因子
state.dir 存储状态的目录位置
AdminClient配置
配置项 说明
bootstrap.servers 初始化到Kafka集群连接的服务器列表
JVM配置

推荐使用Java8的最新版本,例如LinkedIn就使用Java8 + G1回收器,JVM配置如下:

Shell
1
2
3
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M
-XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

LinkedIn最繁忙的集群,由60个代理、5万个分区(复制因子2),输入消息数量80万每秒,输入流量300MB/s、输出流量1GB+ /s。 基于上述JVM配置,90%的GC停顿时间小于21ms,每秒Young GC次数小于1次。

硬件和OS配置

LinkedIn使用24GB内存的4核Xeon机器。你需要为读写保留足够的缓冲内存空间。假设需要能够缓冲30s,则需要的大概内存是写吞吐量 * 30

LinkedIn使用8个7200RPM的SATA硬盘阵列,磁盘的吞吐能力很重要,因为这经常是瓶颈的所在。可以配备更多数量的磁盘以增强吞吐能力。如果配置更加频繁的Flush,则更高转速的磁盘会让你受益,当然价格也更贵。

操作系统

Kafka可以很好的运行在所有类UNIX系统上,相比之下Windows平台下则有一些问题。

通常不需要进行很多OS级别的调优,主要考虑一下两个参数:

内核参数 说明
文件描述符数量限制

Kafka使用到文件描述符的地方包括:日志段(每个段对应一个文件)、打开的网络连接

日志段需要的文件描述符总数的计算公式:

(number_of_partitions)*(partition_size/segment_size)

最大套接字缓冲 对于跨数据中心的复制,加大此缓冲可能提高性能
磁盘和文件系统

推荐使用多块磁盘,以增强吞吐能力。不建议和应用程序日志、其它OS文件系统活动共享磁盘,因为这会影响Kafka的处理延迟。

将多块磁盘RAID成单个卷,或者分别挂载为不同的目录,都是可以的。由于Kafka天然的容错性,RAID提供的冗余能力可能不需要,其提高单个逻辑磁盘的能力更有意义。

如果为Kafka配置多个数据目录,则分区会循环分配到这些目录,每个分区仅仅会存放在单个目录中。如果数据不是分区均衡的,则磁盘负载可能不均衡。

RAID在底层进行了负载均衡,因此可能(不一定)避免上述问题。RAID的劣势是它通常对写吞吐量有较大不利影响,并且减少了磁盘可用容量。

RAID另一个潜在的优势是,提高磁盘硬件容错。然而,重建RAID阵列非常IO敏感,几乎让服务器不可用,也就是说RAID不能提升可用性。

刷出管理

Kafka没有消息的内存缓冲池,它总是立即把数据写入到文件系统。由于OS页缓存(Pagecache)的存在,写入操作可能并没有持久化到硬盘中,具体写入时机,要么是应用程序显示调用fsync,要么是OS自行调度。在2.6.32以后的内核中,一系列pdflush线程负责在后台fsync。

你可以配置一个刷出策略,以便在一定时间后、累计一定消息数量后,fsync数据到磁盘。

需要注意:意外宕机不会导致数据丢失,因为通常主题的复制因子都大于1,缺少的数据可以从其它Replica上同步。因此Kafka的默认刷出策略是,应用程序不会触发fsync。fsync只会由OS或者Kafka后台线程完成。

启用应用程序级别的fsync,其缺点是:

  1. 比较低效的磁盘使用模式,因为显式fsync让OS不能进行写重排序以优化性能
  2. 大部分Linux文件系统中fsync是阻塞的操作
文件系统选择

Kafka没有对文件系统有硬性依赖,但是XFS比起EXT4更加适合Kafka的负载。

架构
设计目标
  1. 必须具有很高的吞吐能力,以支持高容量的事件系统,例如实时日志聚合
  2. 必须具有优雅的后备排队(Backlog)机制,以便处理来自离线系统的、周期性的负载
  3. 为了实现传统的消息递送语义,必须具有低延迟的特点
  4. 必须支持分区和分布式,保证弹性和扩容能力
  5. 必须具有容错能力,不会因为硬件故障导致数据丢失或者重复

以上目标,导致Kafka更像数据库日志,而非消息队列。

持久化设计
磁盘也可以很快

Kafka非常依赖于文件系统来存储、缓存消息。很多人觉得磁盘非常慢,无法提供足够的性能。实际上,磁盘可能运行的比想象的更快或更慢,这取决于你如何使用它。良好设计的磁盘数据结构可以让磁盘IO和网络IO一样快,某些特定情况下,磁盘顺序访问可以比内存随机访问更快。

关于磁盘性能的一个重要事实是,最近十年来磁盘的吞吐能力和磁盘寻道的延迟越来越负相关。六块7200RPM磁盘构成的RAID5,其顺序写可以高达600MB/s,但是随机写仅仅100KB/s,这是一个巨大的反差(6000倍)。在所有使用模式下,线性读写的速度都是最可预测的,并且被操作系统很大程度的优化。现代操作系统都提供了预读(Read-ahead)和后写(Write-behind)技术,支持大块的预抓取数据、分组多个小的逻辑写操作为单个物理写操作。

为了补偿随机磁盘IO的低性能,现代操作系统都激进的利用空闲内存,可能所有的空闲内存都被用作磁盘缓存。所有的磁盘读写操作都经由这个缓存层进行处理。除非使用Direct I/O,这种缓存层无法被跳过,因此即使应用程序在内部维护缓存机制,这些缓存很可能在OS的页缓存中被再次缓存。

对于JVM来说,需要注意以下事实:

  1. 对象形式的数据,其空间占用很长大,常常比串行化格式大两倍。这意味着从存储效率方面来说,内存缓存是比较浪费的
  2. 随着对空间的占用,垃圾回收器的行为越发复杂和缓慢

因此,使用文件系统进行缓存,依赖于OS的页缓存机制,要比自行在内存中维护缓存更加有效 —— 避免了潜在的重复缓存、避免了JVM中松散的空间结构。在32G内存的机器上,可以有效的缓存28-30G的数据,却可以避免GC带来的性能问题。

使用文件系统缓存的另外一个优势是重启后不需要预热,内存缓存在重启后是空白的,需要重新加载。

使用文件系统缓存,还免去了手工维护内存、磁盘数据的一致性。

当你的磁盘访问风格是顺序读,则操作系统的预读功能将大大的提高读效率。因为预读总是能命中你所需的数据。

Kafka的日志,不在内存中缓存,直接写入到磁盘日志中。

常量时间复杂度

消息系统中的持久化数据结构常常是每个消费者一个队列,附带关联的BTree结构,此数据用于随机访问messages的元数据。BTree是消息系统中最广泛使用的数据结构,用于支持大量事务性、非事务性语义。

但是,BTree结构具有较高的成本。尽管BTree结构本身是对数复杂度(可以近似看做常量复杂度),但是应用到磁盘操作时并非如此。每次磁盘寻道操作大概需要10ms级别,每个磁盘同时仅仅支持单个寻道操作,因此并发度受限,少量的磁盘寻道请求就会导致非常高的Overhead。

存储系统需要混合非常快的页面缓存读写操作,以及非常慢的磁盘寻道操作,因此站在观察者的角度,BTree的性能随着数据量的增加而降低。数据量翻倍后性能降低的超过两倍。

如果以日志系统的风格来设计消息队列,也就是说仅仅支持Append方式的写操作,则所有读写操作都能实现常数的时间复杂度,同时读写操作不会相互阻塞。这种存储风格的一个明显优势是,性能和数据量完全解耦。服务器可以使用廉价、低转速磁盘的全部空间。

性能和数据量解耦后,可以实现在典型消息系统中难以看到的特性。例如,在Kafka中,消息不会在消费后立即被删除(以压缩空间,因为数据量和性能负相关),而是被保留一周甚至更久。这种特性让消费者的行为非常灵活,可以Replay历史数据,或者跳转到未来进行消费。

高效性
IO优化

Kafka的一个主要目标是处理Web活动数据,这种数据的量是非常大的,一次页面访问可能产生数十条数据。为了保证数据的生产和消费都能流畅进行,Kafka必须非常高效。

关于磁盘的效率问题在前面讨论过了。消除磁盘性能问题后,系统中最常见的性能问题通常由于:

  1. 过多的微小IO操作:C-S之间的IO和服务器内部的持久化都可能有这种问题。Kafka引入了消息集的概念,批量的发送消息,避免过多的网络请求带来Roundtrip开销。相应的,服务器把消息集整个的Append到日志,消费者也采取批量抓取的方式消费。Kafka的这种设计,让系统性能有了数量级的提升,因为批处理产生了更大的网络包、更大的顺序磁盘IO、更大的连续内存块
  2. 过多的字节拷贝:低消息生产速率下不是问题,反之则影响重大。为了避免消息拷贝,Kafka设计了统一的二进制消息格式供生产者、代理、消费者使用,这样数据块就可以直接传输,不需要修改。代理维护的分区日志,本身仅仅是目录中的一系列文件,文件中的消息同样使用标准的二进制格式

通常情况下,将文件传输到套接字中的步骤是:

  1. OS在内核空间,读取文件到页面缓存
  2. 应用从内核空间读取页面缓存,拷贝到用户空间
  3. 应用将用户空间中的文件回写到内核空间(套接字缓冲)
  4. OS将套接字缓冲拷贝到网络接口(NIC)缓冲。然后网卡负责发送

很明显上述步骤较为低效,包含了4次拷贝2次系统调用。现代UNIX系统(Linux通过sendfile系统调用)对页面缓存到套接字的传输高度优化,统一消息格式因而受益。具体来说,免去了不必要的拷贝(Zero-copy),仅仅需要将页面缓存拷贝到NIC缓冲即可,不需要用户空间的参与。

Kafka主题通常有多组消费者,那么,日志一旦拷贝到页面缓存,就可以多次的Zero-copy并发送给消费者。这避免了反复拷贝数据到用户空间,记录的消费速率,仅仅受限于网络连接本身。

端对端压缩

某些情况下,系统的瓶颈出现在网络带宽,而非磁盘或者CPU。当数据处理管线需要跨越数据中心进行消息收发时尤为如此。

用户可以自己实现压缩,但是客户端代码很难实现高效的压缩。Kafka采取批量压缩的方式,这样多个消息中反复出现的内容可以被有效的处理。

Kafka代理收到压缩消息后,直接存放到日志中,因此日志的磁盘结构也是紧凑的。除非消费者进行消费,消息不会解压缩。Kafka支持GZip、LZ4、Snappy等压缩协议。

生产者
负载均衡

生产者直接将消息发送给分区的Leader,Kafka没有引入中介的路由层。为了能让生产者知道向谁发送消息,所有节点都能应答元数据请求,给出服务器状态信息、分区Leader信息。

生产者负责决定将消息发送到哪个分区,可以随机发送、循环轮流发送,或者根据消息内容实现某种语义分区。要使用语义分区,生产者需要给记录一个适当的键,此键的哈希(默认逻辑,分区函数可以被覆盖)决定其被发送到哪个分区。例如,假设记录的键选用UserId,则用户的所有数据都被发送到单个分区。

异步发送

批量处理是高效性的一个重大驱动力。Kafka生产者倾向于在内存中累计记录,并通过单个请求发送多个记录构成的批次。你可以配置累计记录时最多消耗的时间,或者累计数据的最大字节数。通过调整配置,你可以在更好吞吐量、更低延迟之间做权衡。

消费者

消费者通过向Leader分区发送抓取(Fetch)请求来工作。请求中会包含消费者的消费偏移量,代理依据此偏移量发送一批记录作为响应。消费者对偏移量具有很大的控制能力,只要不超过可用的区间,消费者能够倒回(Rewind)以消费历史数据,或者跳转到“未来”(在跟不上生产者的节奏时)进行消费。

推vs拉

消息应该由代理推送给客户端,还是由客户端主动去拉取?Kafka在这方面的设计和典型消息系统是一致的 —— 消息由生产者推送给代理,再由消费者从代理处拉取。

某些以日志为中心的系统,例如Apache Flume,把消息推送给消费者。推和拉各有其优缺点。

当推送消息给消费者时,代理很难针对消费者的特性进行处理,控制数据的推送速率,当生产者速度太快时,消费者很容易过载。拉模式下,消费者可以自由决定拉取速率,并通过某种后备(Backoff)机制缓和生产和消费速率不匹配的问题。拉模式还将将批量抓取的职责转移到消费者端,消费者更知道自己何时能消费大批量数据。

拉模式的缺点是,如果代理上没有可用的消息。消费者的轮询循环会反复进行,浪费CPU。Kafka的解决方式是使用长轮询,你可以配置一个等待时间,如果代理上没有消息可用,则客户端会在套接字上等待,而非立即结束请求。

消费偏移量

对于消息系统来说,跟踪哪些消息已经被消费,是对性能有关键影响的地方。

传统消息系统一般都在代理端维护消息的元数据,通过元数据识别哪些消息已经被消费。当客户端抓取消息,或者Ack消息后,代理立即删除此消息。之所有要尽快的删除,是因为传统消息系统的数据结构的扩容性往往较差,倾向于尽可能让此结构更小。

识别消息是否真正被成功消费,并不简单。如果消息被抓取后立即删除,消费者可能在处理消息之前就宕掉。为解决此问题,消息系统通常引入Ack机制。这又引入新的问题,消费者可能在处理消息之后,Ack之前宕掉,并引发重复的消息处理。分布式事务机制可能解决此问题,但是性能很差。

Kafka跟踪消息进度的方式完全不同,它将主题分为若干个消息有序的分区,并引入消费偏移量,标记消费者当前的位置。对于每个消费者和分区的组合,仅仅需要一个整数就可以完成状态记录。

消费偏移量的引入,还让消息没必要立即删除,实际上,只要磁盘空间充足,你可能配置让Kafka在一周或更久后才删除数据。

离线数据负载

由于Kafka不会立即删除数据,且数据量的多少和性能无关,因此,那种周期性运行,消费大量数据并存放到离线系统(Hadoop、RDBMS数据仓库)的消费者,可以和Kafka协同工作。

对于Hadoop来说,为每个节点/主题/分区的组合创建一个Map任务,可以很好的实现加载并行化。利用Hadoop提供的任务管理能力,失败的任务可以自动重启而不需要担心数据重复,重启的任务自动从原始的位置开始读取。 

运维
操控主题

Kafka的bin目录包含很多脚本,可以用于完成日常的管理工作。

添加主题

主题可以在第一次被使用时自动创建,你可能需要对默认配置进行定制,以便控制默认窗口过程。

要手工添加主题,参考如下命令:

Shell
1
2
kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic topic_name
      --partitions 20 --replication-factor 3 --config x=y

分区日志存放在Kafka的log目录,每个分区都有自己的目录,目录命名规则为topicname-partitionid。

修改主题

仍然使用上面的脚本,例如修改分区数可以:

Shell
1
2
kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic topic_name
      --partitions 40

注意:

  1. 分区可能具有应用语义,添加分区不会自动修改已有分区的数据分布
  2. 当前不支持减少分区数量

要添加、删除主题的配置项,参考:

Shell
1
2
3
4
kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name topic_name
    --alter --add-config x=y
kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name topic_name
    --alter --delete-config x
删除主题 

kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic topic_name

优雅关闭

Kafka集群会自动检测到宕机、有意被关闭(维护目的)的代理,并在必要时为分区选举新的Leader。

对于有意关闭的情况,你可以优雅的停止代理而不是强行关机。优雅关闭的好处是:

  1. 会自动同步所有日志到磁盘中,避免在重启时进行日志恢复。日志恢复对日志尾部的所有记录进行校验和操作,需要一定时间才能完成
  2. 对于待关闭机器是Leader的分区,执行迁移,将Leader身份迁移给其它代理。这种Leader迁移更快、Leader迁移导致的分区不可用时间仅仅在ms级别

优雅关闭时,上述第1条自动发生。要启用第2条,需要配置: controlled.shutdown.enable=true

再平衡

代理宕机后,其Leader身份全部被移除。这意味着,代理重启后,它不是任何分区的Leader,因而不会接受任何客户端的读写请求,其计算能力被浪费。

为了避免这种不平衡,Kafka引入优选Replica的概念。如果某个分区的复制集为1,5,9则1是优选节点,因为它在列表的最前面。要在Replica 1宕机重启后,自动恢复其Leader身份,可以执行:

kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot

要自动在需要的时候执行上述命令,可以配置: auto.leader.rebalance.enable=true

跨机柜再平衡 

从0.10开始Kafka支持机柜感知(Rack awareness),允许将分区的不同Replica分布到不同的Rack中,防止机柜整体断电断网导致Kafka分区完全不可用,Rack可以在地理上远程分布。 

要指定代理所属的机柜,配置broker.rack属性即可。

当创建、修改主题,或者再平衡Replica时,Kafka会尽可能的把分区的每个Replica分散到不同的Rack中。

跨集群镜像

此脚本提供了一种跨越数据中心进行复制的手段。数据会从源集群中读取,并写到目标集群的同名主题中。实际上此镜像工具就是挂钩在一起的消费者-生产者组合。

为了增强吞吐量和容错能力,你可以运行此脚本的任意数量的实例。当某个实例意外宕掉后,其它实例会接管它的负载。

镜像不能作为完全有效的容错措施,因为源、目标集群是完全独立的,它们的偏移量取值是不一样的。

镜像单个主题的示例:

Shell
1
2
3
4
kafka-mirror-maker.sh
      --consumer.config consumer.properties
      --producer.config producer.properties
      --whitelist topic-name    # 白名单,仅仅复制这些主题,可以使用正则式
跨数据中心部署

某些部署场景中,需要跨越数据中心复制数据。

推荐的方式

Kafka推荐的方式是,在数据中心内部创建集群,应用程序仅仅和当前数据中心的Kafka集群交互。对于跨数据中心的数据复制,利用跨集群镜像实现。

这种部署模式的好处时,让数据中心之间解耦,使它们作为独立的实体运作。同时,跨数据中心的数据复制可以被集中管理。当数据中心之间的链路断开后,各中心可以独立运作,落后的数据复制进度可以在链路恢复后赶上。

如果某个应用需要全数据集的视图,使用跨集群镜像机制来聚合所有数据中心的主题,形成一个新数据中心,那些需要全数据集视图的应用,和新数据中心交互。

远程访问主题

除了上述推荐方式以外,你也可以让应用通过WAN访问其它数据中心,但是很明显这会引入访问延迟。

不过,即便在高延迟网络环境下,Kafka天然支持的生产者/消费者批处理能力依然能让应用拥有较高吞吐量。不过你可能需要为代理、生产者、消费者调整socket.send.buffer.bytes、socket.receive.buffer.bytes等参数。

跨数据中心集群

通常情况下,不要创建跨越数据中心的Kafka集群。特别是链路延迟较大的情况下。这会让ZooKeeper、Kafka的复制延迟都很大,此外,如果链路中断,则部分数据中心的Kafka、ZooKeeper会不可用。

 

管理消费者组
列出消费者组

要列出集群中的消费者组,执行:

Shell
1
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

如果你使用老的High-Level消费者并且在ZooKeeper中存储组元数据(offsets.storage=zookeeper),则传递--zookeeper而非--bootstrap-server。

检查消费偏移量

你可以检查某个消费者组中,每个分区的消费偏移量:

Shell
1
2
3
4
5
6
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname
# 还可以使用如下形式(仅仅显示基于ZooKeeper的消费者,不显示使用Java Consumer API的消费者
kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group groupname
# 输出示例
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
# 主题   分区号     消费偏移量      日志偏移量       延迟  消费者
集群扩容 

要添加新服务器到Kafka集群,你只需要为它分配唯一性的代理ID即可。但是新的代理不会自动被分配数据分区,因此默认情况下除非创建了新的主题,新加入的代理不会有任何作用。

在添加新服务器的同时,你常常会执行迁移,将已有的分区迁移到新服务器中。迁移过程需要手工触发,之后可以自动完成。迁移触发后,Kafka将新服务器作为待迁移分区的Follower节点,当完成数据复制,新服务器成为In-sync节点后,复制集中某个旧节点的数据被清除,成员身份也被解除。

Kafka提供了分区重分配工具,允许你跨越代理移动分区。理想的分区分布,能让所有代理节点具有均匀的数据负载、均匀的数据尺寸。Kafka没有提供自动识别“不均匀”的工具,因而管理员需要手工识别出哪些分区需要移动。

分区重分配工具可以在三个互斥的模式下运行:

  1. --generate,给出主题、代理的列表,生成一个候选重分配方案,将列出的主题的所有分区移动到代理中
  2. --execute,执行用户给出的重分配方案,方案通过 --reassignment-json-file参数给出
  3. --verify,显示上次重分配的执行情况,状态可以是成功、失败、进行中
自动分区迁移

在添加了新的服务器(扩容Kafka集群)之后,使用分区重分配工具,可以将一部分主题迁移到新添加的代理上,迁移整个主题,要比每次迁移一个分区更加容易。

要执行整个主题的迁移,用户需要指定被迁移的主题、接收主题的代理。迁移后,主题的复制因子保持不变,主题的分区被尽可能均匀的分布到目标代理中。

具体步骤如下:

  1. 以JSON格式提供待迁移主题的列表:
    topics-to-move.json
    JavaScript
    1
    2
    3
    4
    5
    {
        // 待迁移主题
        "topics": [{"topic": "foo1"}, {"topic": "foo2"}],
        "version":1
    }
  2. 使用分区重分配工具,生成一个候选的重分配置文件:
    Shell
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    kafka-reassign-partitions.sh --zookeeper localhost:2181
        --topics-to-move-json-file topics-to-move.json   # 待迁移主题列表文件
        --broker-list "5,6"                              # 目标代理列表(标识符)
        --generate
    # 输出的前一部分是当前重分配置文件(略)
    # 输出的后一部分是新的重分配置文件,示例:
    # {
    # "version":1,
    #                 # 主题         # 分区编号     # 复制集
    # "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
    #               {"topic":"foo1","partition":0,"replicas":[5,6]},
    #               {"topic":"foo2","partition":2,"replicas":[5,6]},
    #               {"topic":"foo2","partition":0,"replicas":[5,6]},
    #               {"topic":"foo1","partition":1,"replicas":[5,6]},
    #               {"topic":"foo2","partition":1,"replicas":[5,6]}]
    # }
  3. 此时重分配尚未发生,你需要备份当前分配置文件,以便回滚。然后将重分配候选保存到JSON文件中,执行:
    Shell
    1
    2
    3
    kafka-reassign-partitions.sh --zookeeper localhost:2181
        # 重分配配置文件
        --reassignment-json-file expand-cluster-reassignment.json --execute
  4. 上述命令会异步执行,你可以随时检查进度:
    Shell
    1
    2
    3
    4
    5
    kafka-reassign-partitions.sh --zookeeper localhost:2181
        --reassignment-json-file expand-cluster-reassignment.json --verify
     
    # Reassignment of partition [foo1,0] completed successfully
    # Reassignment of partition [foo1,1] is in progress 
定制分区迁移

除了上节那种自动分配、迁移整个主题的用法之外,分区重分配工具还允许你手工指定迁移特定的分区。如果你明白集群的负载分配情况,并且自动生成的重分配候选不能满足需要,可以进行使用这种定制迁移。

在准备好重分配置文件后,执行上节第3步即可。

增加复制因子

增加某个分区的复制因子很简单,只需要在定制的重分配置文件中,为某个分区的replicas列表添加一个代理ID就可以了:

JavaScript
1
2
// 为foo的分区0添加一个复制因子,新的副本存放在代理7上
{ "version":1, "partitions": [{"topic":"foo","partition":0,"replicas":[5,6,7]}] }
限制迁移带宽

Kafka支持设置迁移所消耗的带宽的上限,避免集群再平衡、添加/移除代理时Replica在机器之间移动消耗过多的带宽,影响系统性能。

限制带宽的方式有两种:

  1. Shell
    1
    kafka-reassign-partitions.sh --execute —throttle 50000000  # 单位 Bytes/s 

    注意:

    1. 你可以在迁移过程中重新调用上述脚本并指定不同的带宽限制
    2. 在迁移完成后,你必须调用--verify来解除带宽限制,否则,正常的数据复制会一直被限流
  2. 调用脚本 kafka-configs.sh来验证相关配置。和限流有关的配置有两对:
    leader.replication.throttled.rate、follower.replication.throttled.rate   限流速率
    leader.replication.throttled.replicas、follower.replication.throttled.replicas 受到限制的Replica
集群缩容

当前Kafka没有提供直接可用的集群缩容工具。这意味着,你需要使用分区重分配工具,将准备移除的代理上所有的分区全部迁移走。

配额 

Kafka支持在user,client-id、user、client-id级别进行配额,限制用户对集群资源的占用。默认情况下,资源占用不受限制。

要为用户user1、客户端clientA进行配额,可以:

Shell
1
2
3
4
5
6
kafka-configs.sh --zookeeper localhost:2181 --alter
    # 配额        生产速率                  消费速率                
    --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200'
    # 要仅对用户、客户端进行配额,则指定两项之一
    --entity-type users --entity-name user1
    --entity-type clients --entity-name clientA

要查看用户user1、客户端clientA的当前配额,可以:

Shell
1
kafka-configs.sh  --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA

你可以在代理级别上设置默认的配额,对所有客户端生效:

1
2
3
# 仅仅当没有在ZooKeeper中覆盖时生效
quota.producer.default=10485760
quota.consumer.default=10485760  

 

 

 

 

 

 

← Previous Post
Kafka Streams学习笔记 →

Leave a Reply Cancel reply

Your email address will not be published. Required fields are marked *

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code class="" title="" data-url=""> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre class="" title="" data-url=""> <span class="" title="" data-url="">

Related Posts

  • Kafka Streams学习笔记
  • Apache Storm学习笔记
  • OpenTSDB学习笔记
  • 基于EFK构建日志分析系统
  • ElasticSearch学习笔记

Recent Posts

  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
  • A Comprehensive Study of Kotlin for Java Developers
  • 背诵营笔记
  • 利用LangChain和语言模型交互
  • 享学营笔记
ABOUT ME

汪震 | Alex Wong

江苏淮安人,现居北京。目前供职于腾讯云,专注容器方向。

GitHub:gmemcc

Git:git.gmem.cc

Email:gmemjunk@gmem.cc@me.com

ABOUT GMEM

绿色记忆是我的个人网站,域名gmem.cc中G是Green的简写,MEM是Memory的简写,CC则是我的小天使彩彩名字的简写。

我在这里记录自己的工作与生活,同时和大家分享一些编程方面的知识。

GMEM HISTORY
v2.00:微风
v1.03:单车旅行
v1.02:夏日版
v1.01:未完成
v0.10:彩虹天堂
v0.01:阳光海岸
MIRROR INFO
Meta
  • Log in
  • Entries RSS
  • Comments RSS
  • WordPress.org
Recent Posts
  • Investigating and Solving the Issue of Failed Certificate Request with ZeroSSL and Cert-Manager
    In this blog post, I will walk ...
  • A Comprehensive Study of Kotlin for Java Developers
    Introduction Purpose of the Study Understanding the Mo ...
  • 背诵营笔记
    Day 1 Find Your Greatness 原文 Greatness. It’s just ...
  • 利用LangChain和语言模型交互
    LangChain是什么 从名字上可以看出来,LangChain可以用来构建自然语言处理能力的链条。它是一个库 ...
  • 享学营笔记
    Unit 1 At home Lesson 1 In the ...
  • K8S集群跨云迁移
    要将K8S集群从一个云服务商迁移到另外一个,需要解决以下问题: 各种K8S资源的迁移 工作负载所挂载的数 ...
  • Terraform快速参考
    简介 Terraform用于实现基础设施即代码(infrastructure as code)—— 通过代码( ...
  • 草缸2021
    经过四个多月的努力,我的小小荷兰景到达极致了状态。

  • 编写Kubernetes风格的APIServer
    背景 前段时间接到一个需求做一个工具,工具将在K8S中运行。需求很适合用控制器模式实现,很自然的就基于kube ...
  • 记录一次KeyDB缓慢的定位过程
    环境说明 运行环境 这个问题出现在一套搭建在虚拟机上的Kubernetes 1.18集群上。集群有三个节点: ...
  • eBPF学习笔记
    简介 BPF,即Berkeley Packet Filter,是一个古老的网络封包过滤机制。它允许从用户空间注 ...
  • IPVS模式下ClusterIP泄露宿主机端口的问题
    问题 在一个启用了IPVS模式kube-proxy的K8S集群中,运行着一个Docker Registry服务 ...
  • 念爷爷
      今天是爷爷的头七,十二月七日、阴历十月廿三中午,老人家与世长辞。   九月初,回家看望刚动完手术的爸爸,发

  • 6 杨梅坑

  • liuhuashan
    深圳人才公园的网红景点 —— 流花山

  • 1 2020年10月拈花湾

  • 内核缺陷触发的NodePort服务63秒延迟问题
    现象 我们有一个新创建的TKE 1.3.0集群,使用基于Galaxy + Flannel(VXLAN模式)的容 ...
  • Galaxy学习笔记
    简介 Galaxy是TKEStack的一个网络组件,支持为TKE集群提供Overlay/Underlay容器网 ...
TOPLINKS
  • Zitahli's blue 91 people like this
  • 梦中的婚礼 64 people like this
  • 汪静好 61 people like this
  • 那年我一岁 36 people like this
  • 为了爱 28 people like this
  • 小绿彩 26 people like this
  • 彩虹姐姐的笑脸 24 people like this
  • 杨梅坑 6 people like this
  • 亚龙湾之旅 1 people like this
  • 汪昌博 people like this
  • 2013年11月香山 10 people like this
  • 2013年7月秦皇岛 6 people like this
  • 2013年6月蓟县盘山 5 people like this
  • 2013年2月梅花山 2 people like this
  • 2013年淮阴自贡迎春灯会 3 people like this
  • 2012年镇江金山游 1 people like this
  • 2012年徽杭古道 9 people like this
  • 2011年清明节后扬州行 1 people like this
  • 2008年十一云龙公园 5 people like this
  • 2008年之秋忆 7 people like this
  • 老照片 13 people like this
  • 火一样的六月 16 people like this
  • 发黄的相片 3 people like this
  • Cesium学习笔记 90 people like this
  • IntelliJ IDEA知识集锦 59 people like this
  • Bazel学习笔记 38 people like this
  • 基于Kurento搭建WebRTC服务器 38 people like this
  • PhoneGap学习笔记 32 people like this
  • NaCl学习笔记 32 people like this
  • 使用Oracle Java Mission Control监控JVM运行状态 29 people like this
  • Ceph学习笔记 27 people like this
  • 基于Calico的CNI 27 people like this
Tag Cloud
ActiveMQ AspectJ CDT Ceph Chrome CNI Command Cordova Coroutine CXF Cygwin DNS Docker eBPF Eclipse ExtJS F7 FAQ Groovy Hibernate HTTP IntelliJ IO编程 IPVS JacksonJSON JMS JSON JVM K8S kernel LB libvirt Linux知识 Linux编程 LOG Maven MinGW Mock Monitoring Multimedia MVC MySQL netfs Netty Nginx NIO Node.js NoSQL Oracle PDT PHP Redis RPC Scheduler ServiceMesh SNMP Spring SSL svn Tomcat TSDB Ubuntu WebGL WebRTC WebService WebSocket wxWidgets XDebug XML XPath XRM ZooKeeper 亚龙湾 单元测试 学习笔记 实时处理 并发编程 彩姐 性能剖析 性能调优 文本处理 新特性 架构模式 系统编程 网络编程 视频监控 设计模式 远程调试 配置文件 齐塔莉
Recent Comments
  • qg on Istio中的透明代理问题
  • heao on 基于本地gRPC的Go插件系统
  • 黄豆豆 on Ginkgo学习笔记
  • cloud on OpenStack学习笔记
  • 5dragoncon on Cilium学习笔记
  • Archeb on 重温iptables
  • C/C++编程:WebSocketpp(Linux + Clion + boostAsio) – 源码巴士 on 基于C/C++的WebSocket库
  • jerbin on eBPF学习笔记
  • point on Istio中的透明代理问题
  • G on Istio中的透明代理问题
  • 绿色记忆:Go语言单元测试和仿冒 on Ginkgo学习笔记
  • point on Istio中的透明代理问题
  • 【Maven】maven插件开发实战 – IT汇 on Maven插件开发
  • chenlx on eBPF学习笔记
  • Alex on eBPF学习笔记
  • CFC4N on eBPF学习笔记
  • 李运田 on 念爷爷
  • yongman on 记录一次KeyDB缓慢的定位过程
  • Alex on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • will on Istio中的透明代理问题
  • haolipeng on 基于本地gRPC的Go插件系统
  • 吴杰 on 基于C/C++的WebSocket库
©2005-2025 Gmem.cc | Powered by WordPress | 京ICP备18007345号-2