Apache Kafka学习笔记
Apache Kafka(音标/'ka:fka:/)是一个分布式的实时数据处理的基础平台,能够处理每秒百万条数据。它具有三大功能:
- 订阅/发布:类似于传统MOM的功能,将队列、主题合二为一
- 流处理:支持编写可扩容的流处理程序,对实时事件做出响应
- 存储:安全的存储数据流,支持分布式、复制的、容错等特性
Kafka非常适用于以下几类应用场景:
- 构建能够可靠的在系统或应用程序之间收发实时流的数据管线
- 构建能够转换、处理流数据的实时应用程序
Kafka天生使用集群架构,由1-N个服务器构成的集群组成。Kafka集群对流记录进行分类存储,每个类别叫做主题(Topic)。每个流记录包含一个键、一个值、一个时间戳。
Kafka提供四类核心API:
- Producer API:允许应用程序发布流记录到Kafka主题
- Consumer API:允许应用程序订阅主题,并处理发布到这些主题的流记录
- Streams API:允许一个应用程序作为流处理器(Strem Processor),消费来自1-N个主题的输入流,并向1-N个主题释放输出流 —— 也就是高效的进行流转换
- Connector API: 允许构建可重用的生产者、消费者,把Kafka和既有的应用程序和数据系统连接起来。例如,针对RDBMS的连接器能够捕获针对表的每一个写操作
客户端和Kafka的通信协议基于简单、高效、语言无关的方式设计,以TCP协议为基础,支持版本化(向后兼容)。Kafka提供了Java以及其它语言的客户端。
主题即记录的流,Kafka中的主题可以具有多个订阅者。对于每个主题,Kafka集群维护分区化(partitioned)存储的日志:
- 每个分区是有序的、不可变的记录的序列,仅仅支持向序列的尾部添加记录
- 分区中的每个记录被分配一个序列号 —— offset,此序列号在分区中唯一的标识记录
- 集群在一个可配置的时间段内保留记录,不管记录有没有消费过,这个行为不同于传统MOM,后者不保留消费过的消息。不管配置的保留时间有多长,都不会影响Kafka的性能,这意味着你可以将Kafka作为数据库使用
每个消费者需要记录它当前正在消费的记录的偏移量(游标),通常情况下,消费者会线性的向前单步移动偏移量,这和传统MOM消费者的行为对应。但是,Kafka允许消费者任意移动游标,这个特性可用于实现Replay功能。
上述主题日志的分区,可以:
- 跨越Kafka集群中多个服务器存储
- 每个分区可以具有可配置数量的副本,用于容错
在启用复制的情况下,持有同一分区的多个服务器,其中一个被选举为Leader,其它则作为Followers。Leader处理所有针对分区的读写请求,Followers则仅仅进行复制。每个服务器都作为一些分区的Leader,另一些分区的Follower,以实现负载均衡。
生产者负责发布记录到主题上,它决定把哪个记录分配到哪个分区上。可选的策略例如循环轮询(round-robin)、某种和应用语义有关的方式(依据记录的某些键)。
Kafka的一个重要设计特点是,不区分主题、队列。这个特点是它灵活的消费模型提供的。
消费者可以将自己标记为属于某个消费者组(consumer group),发布到主题的每个记录会递送给消费者组消费,仅仅组中一个成员可以消费某个记录。这意味着:
- 某个主题仅仅包含单个消费者组:这种配置相当于传统MOM的多消费者的队列
- 某个主题包含多个单成员的消费者组:这种配置相当于传统MOM的主题
Kafka中消费行为的实现方式是:根据消费者数量,对日志分区进行划分。在任意时刻,每个消费者都公平的享有某一部分分区,此行为由Kafka协议动态处理。如果新的消费者加入,它将从其它消费者那里接管一部分分区。如果某个消费者宕机,它拥有的分区将由剩余的消费者瓜分。
Kafka仅仅在分区内部保证顺序性,在大部分情况下,基于记录键的分区+分区内有序是足够满足需要的。如果一定需要全局性顺序保证,你可以设计仅仅包含单个分区的主题,并且对于每个消费者组,仅仅有单个消费者。
Kafka提供以下保证:
- 一个生产者发布到某个特定分区的记录,保证按照其发送顺序附加到日志的尾部
- 消费者看到的记录顺序,和它们在分区中存储的顺序一致
- 对于具有复制因子N的主题,Kafka允许N-1服务器同时宕机,而不丢失任何已经提交到日志的记录
传统的消息系统模型有两种:队列、订阅/发布。队列的优势是容易实现消费者的负载均衡,但是消息一旦被消费就消失不能供多人消费,订阅/发布(主题)的优缺点则刚好相反。
Kafka的创新之处在于,将队列、主题合而为一。
Kafka还提供比传统MOM更强的有序性。尽管传统MOM支持多个消费者,消息也是按序被消费的,但是这些消费者接收消息是异步的,顺序无法保证。Kafka的分区机制能够增强有序性,分区被特定单个消费者消费,不会出现顺序混乱的情况。
Kafka主题中的记录被消费后,不会消失,这让它可以作为in-flight消息的存储系统。
事实上,Kafka是一个优秀的存储系统,因为:
- 写入的数据被复制,支持容错。Kafka允许生产者等待一个Ack —— 记录被合理复制、持久化后才认为操作成功
- 磁盘数据结构非常可扩容,不管是存储10KB还是10TB记录,性能不会有显著变化
Kafka不仅仅能够用来读取、写入、存储数据流,它还能用来进行实时流处理。
在Kafka中,一个流处理器是这样的一个程序:它接受持续不断的数据流输入(从主题),处理后产生持续不断的数据流输出(到主题)。
使用Producer/Consumer可以实现先单的流处理器,更复杂的需求可以基于Streams API实现。Streams API支持聚合、连接等操作,支持乱序数据处理、输入再加工、有状态计算等应用场景。
Streams API在Kafka提供的核心原语上构建,它使用生产者/消费者API作为输入,使用Kafka作为状态存储,使用组机制实现多个流处理器的负载均衡。
联用Kafka的存储+低延迟订阅功能,可以让你处理历史、未来数据的方式变得一致 —— 基于Kafka的应用程序可以处理历史数据,当它处理完最后一个记录时,不需要退出,只需要等待未来的数据到达即可继续处理。Kafka的可靠存储功能,让你可以很容易的集成某些需要定期下线维护的系统。
消息代理可以用于解耦消息生产者和消费者、缓冲待处理消息,Kafka可以作为传统消息代理的替代品。
和大部分消息代理相比,Kafka具有更好的吞吐能力,并内置数据分区、容错等特性。
Kafka的一个原始的应用场景是,辅助重建用户活动跟踪的管线。用户的活动(页面浏览、搜索等)被按照活动类型收集并发布到不同的主题上。订阅这些主题后,可以进行实时处理、实时监控、加载到Hadoop以便离线处理。
活动跟踪的数据量通常情况下比消息传输大的多。
Kafka可以用记录、处理监控数据。它可以从分布式应用中获取度量信息并进行聚合操作。
Kafka可以用来从多个服务器上收集日志,并集中起来(例如存储到HDFS)以处理。Kafka抽象掉日志文件的概念,日志编程一系列消息组成的流。
从0.10开始Kafka内置了一个轻量的流处理框架Kafka Streams,使用此框架可以构建数据处理管线,针对数据进行聚合、转换等操作。
Apache Storm等流处理框架可以和Kafka很好的集成。
Kafka可以作为分布式系统的外部提交日志(commit-log)。提交日志用于辅助节点之间的数据复制、失败节点的再同步。Kafka的log compaction特性用于支持这一应用场景,在这种场景下Kafka类似于Apache BookKeeper。
到Kafka官网下载压缩包,并解压。Kafka是基于JVM的应用,因此你需要事先安装好JRE。
Kafka依赖于ZooKeeper,你可以使用外部的ZooKeeper服务器。或者,利用Kafka提供的脚本,创建一个简单的单节点ZooKeeper实例:
1 2 3 |
pushd ~/JavaEE/middleware/kafka/ > /dev/null # 启动ZooKeeper zookeeper-server-start.sh config/zookeeper.properties |
1 |
kafka-server-start.sh config/server.properties |
1 2 3 4 |
# 连接到ZooKeeper,来操控Kafka # replication-factor 复制因子,也就是数据复制的份数 # partitions 分区数量 kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test |
除了手工创建主题外,还可以配置,让代理在有客户端尝试发布消息时自动创建不存在的主题。
执行下面的命令可以列出现有的主题:
1 |
kafka-topics.sh --list --zookeeper localhost:2181 |
Kafka提供了一个命令,用于从标准输入或者文件中读取信息,并发布到到主题上:
1 |
kafka-console-producer.sh --broker-list localhost:9092 --topic test |
默认的每一行输入都作为单独的消息发送。
Kafka提供了一个命令,用于消费消息并将其内容打印到标准输出:
1 2 |
# 从头开始消费 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning |
如果你在两个Terminal同时打开生产者、消费者,然后在生产者那里输入文字并回车,文字会原样的出现在消费者窗口中。
对于Kafka来说,单个代理也是集群,也就是说它天然是集群的。要配置包含多个代理实例的集群,没有什么特别之处。本节演示如何创建三个实例组成的集群。
1 2 |
cp config/server.properties config/server-1.properties cp config/server.properties config/server-2.properties |
参考下面的示例修改:
1 2 3 4 5 |
# 对于每个集群中的任何一个节点,id必须是唯一的、永久不变的 broker.id=1 # 由于我们准备在一台机器上启动多个代理实例,因此需要定制某些参数: listeners=PLAINTEXT://:9093 log.dir=/tmp/kafka-logs-1 |
执行下面的命令启动两个额外的实例,加上本章最初创建的那个,组成三成员的集群:
1 2 |
kafka-server-start.sh config/server-1.properties kafka-server-start.sh config/server-2.properties |
1 2 3 |
# 复制因子3,表示数据复制三份 # 分区数量1 kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic reptest |
执行下面的命令,可以看到刚刚创建的主题的相关信息,以及各代理实例和主题的关系:
1 2 3 4 5 6 7 |
kafka-topics.sh --describe --zookeeper localhost:2181 --topic reptest # Topic:reptest PartitionCount:1 ReplicationFactor:3 Configs: # 对于每个分区(本主题只有一个分区),列出以下信息 # Leader 负责该分区所有读写操作的节点(代理实例)ID # Replicas 复制此分区的所有节点列表,不管是否alive,不管是否leader # Isr 和Leader保持同步(in-sync)的节点,这些节点必然是alive的 # Topic: reptest Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2 |
使用Kafka Connect这一工具,你可以从其它数据源导入数据到kafka,或者将Kafka中的数据导出到其它系统。对于很多系统来说,导入导出不需要手工编写集成代码。
Kafka Streams是一个客户端库,用于构建输入/输出数据存储于Kafka集群的实时应用或者微服务,并满足可扩容、弹性、容错、分布式的质量需求。
Kafka提供了5类核心API:
API | 说明 |
Producer | 发布数据流到Kafka集群的主题上 |
Consumer | 从Kafka集群的主题上读取数据流 |
Streams | 在输入主题、输出主题之间进行数据流的转换 |
Connect | 实现连接器,可以持续的从外部数据源拉拉取数据并发布到主题,或者持续的将主题推送到外部应用或者Sink系统 |
AdminClient | 管理、查看代理、主题以及其它Kafka对象 |
要使用此API,可以引入Maven依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> |
此类是一个Kafka客户端,用于发送记录到Kafka集群。此类是线程安全的,使用单个实例通常性能更好。示例代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package cc.gmem.study.kafka; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Entrypoint */ public class Application { public static void main( String[] args ) { // 指定生产者配置 Properties props = new Properties(); props.put( "bootstrap.servers", "172.21.3.1:9092,172.21.3.2:9092,172.21.3.3:9092" ); props.put( "acks", "all" ); // 仅当所有Replica确认后,才认为记录提交成功 props.put( "retries", 0 ); // 不重试,注意重试可能引起重复消息 props.put( "batch.size", 16384 ); // 每个分区可以占用的缓冲区大小 props.put( "linger.ms", 1 ); // 即使缓冲区有空间,批次也可能立即被发送,此配置引入延迟 props.put( "buffer.memory", 33554432 ); // 最大可用缓冲区 // 如何把键值转换为字节 props.put( "key.serializer", "org.apache.kafka.common.serialization.StringSerializer" ); props.put( "value.serializer", "org.apache.kafka.common.serialization.StringSerializer" ); /** * 创建生产者 * 生产者主要由一个缓冲池(存放尚未发送到服务器的记录)和一个后台IO线程(负责将记录转换为请求发送)组成 * */ Producer<String, String> producer = new KafkaProducer<>( props ); for ( int i = 0; i < 10; i++ ) { String topic = "TEST"; String key = "KEY-" + Integer.toString( i ); String value = "VALUE-" + Integer.toString( i ); /** * 生产者记录:需要发送到特定Kafka主题的键值对 * * 你可以为记录指定一个分区号,这样记录会发送到该分区中。如果不指定分区号但是给出了键,则键的哈希 * 用于确定分区号。如果分区号、键都没有指定,则以循环轮流的方式发送到所有可用分区 * * 你还可以为记录指定一个时间戳,如果不指定默认使用当前时间 * 如果主题配置为TimestampType#CREATE_TIME,则上述时间戳将被代理使用 * 如果主题配置为TimestampType#LOG_APPEND_TIME,则代理覆盖记录的时间戳,以实际添加到日志时代理的本地时间为准 * 实际生效的时间戳将通过RecordMetadata返回给客户端 * */ ProducerRecord<String, String> record = new ProducerRecord<>( topic, key, value ); /** * 发送方法是异步的,它只是把消息存放到缓冲区中然后立即返回 * 处于效率考虑,生产者可能批量的发送记录 */ producer.send( record ); } /** * 必须要保证生产者的关闭,否则线程、缓冲区资源无法释放 */ producer.close(); } } |
从0.11开始KafkaProducer支持两种额外的模式:幂等模式、事务模式。
该模式增强了递送语义,将至少一次递送增强为精确的一次递送。注意:
- 这种幂等性仅仅在单个会话内部保证
- 客户端的retry不会引发记录的重复
要启用该模式,需要配置 enable.idempotence=true。执行此配置后,retries默认值为 Integer.MAX_VALUE 且ack默认值为all。
幂等模式下,API的调用方式无变化。
该模式允许客户端原子的向多个分区、甚至多个主题发送数据。
要启用该模式,需要设置 transactional.id。如果设置了此属性则幂等性会自动开启。参与到事务中的主题应当具有至少3的复制因子且这些主题的 min.insync.replicas至少为2。
为了保证端到端的事务性,消费者应该设置为仅仅读取已提交消息。
transactional.id的目的能够跨越单个生产者实例的多个会话进行事务恢复。此ID通常根据分片的、有状态的应用程序的分片标识符(Shard Identifier)产生。对于一个分片应用程序中的每个生产者实例来说,此ID必须唯一。
事务性的API是阻塞的,并且失败时会抛出异常,示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
Properties props = new Properties(); props.put( "bootstrap.servers", "localhost:9092" ); // 每个生产者同时只能打开一个事务 props.put( "transactional.id", "my-transactional-id" ); Producer<String, String> producer = new KafkaProducer<>( props, new StringSerializer(), new StringSerializer() ); /** * 在事务模式下,必须首先调用下面的方法。此方法的行为如下: * 1、确保由先前实例发起的、相同ID的事务已经完成。如果先前实例在进行事务的过程中失败了,其事务在此被中止 * 如果上一个事务正在提交,此方法会等待其完成 * 2、获得内部的生产者id、epoch,并在所有事务性消息上使用 */ producer.initTransactions(); try { // 只要启用了事务模式,任何记录都必须在事务中发送 // 启动一个新事务 producer.beginTransaction(); for ( int i = 0; i < 10; i++ ) { String topic = "test"; String key = "KEY-" + Integer.toString( i ); String value = "VALUE-" + Integer.toString( i ); // 不需要注册send的回调或者使用get()获得Future对象 producer.send( new ProducerRecord<>( topic, key, value ) ); } /** * 提交事务,此方法会首先刷出所有尚未发送的消息,然后再提交 * 任何一次send()调用出现不可恢复的错误,该方法会再接收到错误后立即抛出异常,事务无法提交 * 为了事务能提交,所有send()必须按序的成功 */ producer.commitTransaction(); } catch ( ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e ) { // 无法从这些异常中恢复 producer.close(); } catch ( KafkaException e ) { // 对于其他类型的异常,可以中止然后重试 // 中止后,所有已经成功的写操作会被标记为aborted producer.abortTransaction(); } producer.close(); |
要使用此API,可以引入Maven依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> |
使用该类能够从Kafka集群消费记录。此客户端:
- 能够透明的处理代理实例的失败。也就是说,如果代理失败导致分区迁移到其他代理,不需要手工处理
- 可以使用消费者组,与代理进行交互,让一组消费者进行负载均衡
与生产者不同,消费者不是线程安全的。
消费者需要维持到必要的代理的TCP连接,以抓取数据。不再使用消费者后,必须关闭它,否则这些连接会泄漏。
Kafka为分区中每个记录都维护一个数字的偏移量(Offset),这个偏移量是记录在分区中的唯一标识。
偏移量也用于标记消费者针对分区的当前消费位置(Position),例如消费者当前位置为5则意味着它已经消费过0-4记录并且下一个将被消费的记录是5。
对于消费者来说,“位置”有两层含义:
- 下一个将要读取的记录的偏移量,比消费者消费的最后一个记录的偏移量大1。每当消费者poll(long)了新记录后此位置自动更新
- 已提交位置(Committed Position),指已经被安全存储的偏移量值。消费者如果失败,重启后将根据此值进行恢复。该位置信息可以自动的定期提交。或者调用commitSync/commitAsync显式的提交
Kafka引入消费者组(Consumer Group)的概念,来允许一池子的消费者划分消费、处理记录的工作。这些消费者可以运行在同一台或者多台机器上允许。任何具有相同 group.id的消费者都属于同一个组。
通过subscribe接口,组中的每个消费者都可以动态的订阅主题。Kafka会把每个消息递送给组中的单个消费者,具体实现方式是,通过负载均衡,让主题的任一分区仅供单个消费者来消费,组中的消费者会被尽可能的分配相同个数的分区。
组中的成员是动态维护的,如果某个消费者宕掉,分配给它的分区会重新分配给组中其它成员。当新消费者加入后,既有消费者持有的分区会转移给它。此所谓再平衡(Rebalancing)。当发生再平衡时,消费者可以通过 ConsumerRebalanceListener得到通知,从而进行状态清理、手工位置提交等操作。
从概念上,你可以把组看做单个逻辑的消费者。这个逻辑消费者不会重复的消费某个消息。
当订阅1-N个主题后,消费者首次调用poll(long)时会加入到消费者组。poll的实现能够监测消费者是否失败。只要你继续调用poll,消费者就会保持组成员的身份,并且持续的接收消息。在底层,消费者会定期的发送心跳给代理,如果消费者失败且代理在session.timeout.ms内没有收到心跳,就革除消费者的组成员资格并触发再平衡。
某些情况下,消费者虽然没有崩溃,但是它却进入一种livelock的状态。也就是说,尽管消费者仍然在正常发送心跳,但是它已经不能进行任何消息处理了。为了检测这一情况Kafka引入了max.poll.interval.ms配置,如果消费者在此时间内没有发起过poll()调用,代理会主动革除其组成员资格并触发再平衡。如果这种革除操作发生后,消费者进行位置提交时会收到CommitFailedException。为了保持组成员资格,消费者必须不断的poll。
增大max.poll.interval.ms参数,消费者可以用更充足的时间来处理poll()得到的记录。这样做的缺点是,再平衡可能延迟,因为仅在poll()调用时,消费者才能参与到再平衡中。
配置max.poll.records参数可以限制单次poll()操作能够返回的记录数量。通过减小此参数,你就能相应的调低max.poll.interval.ms。
如果消息处理时间无法估算,你可能需要将处理消息的逻辑从poll()线程中转移出去,由独立的线程处理。这样做的话你需要小心的管理提交位置,因为只有前述的独立线程才知道何时消息处理完毕。通常情况下需要禁用自动的位置提交,由独立线程手工的进行提交。
如果消息抓取的快而处理的相对较慢,你可以考虑pause()目标分区,这样poll()操作就不会抓取新的记录。
使用自动偏移量提交,可以保证最少一次递送语义。前提是,你必须在后续调用(或者关闭消费者)之前,消费上一次poll()返回的全部数据。如果无法保证全部消费,自动提交的偏移量可能比实际成功消费的偏移量大,导致记录遗漏。
自动偏移量提交的消费者代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
Properties props = new Properties(); props.put( "bootstrap.servers", "172.21.3.1:9092,172.21.3.2:9092,172.21.3.3:9092" ); props.put( "group.id", "group0" ); // 启用自动的位置提交 props.put( "enable.auto.commit", "true" ); // 自动提交的间隔 props.put( "auto.commit.interval.ms", "1000" ); props.put( "key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ); props.put( "value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer" ); KafkaConsumer<String, String> consumer = new KafkaConsumer<>( props ); // 可以同时订阅多个主题 consumer.subscribe( Arrays.asList( "test", "hello" ) ); while ( true ) { // poll并等待100ms ConsumerRecords<String, String> records = consumer.poll( 100 ); for ( ConsumerRecord<String, String> record : records ) { String topic = record.topic(); int partition = record.partition(); long offset = record.offset(); String key = record.key(); String value = record.value(); LOGGER.debug( "Record from {}/{}, offset: {}, Key: {}, Value: {}", topic, partition, offset, key, value ); } } |
手工偏移量提交的消费者代码示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
// ... props.put( "enable.auto.commit", "false" ); // ... final int minBatchSize = 200; while ( true ) { ConsumerRecords<String, String> records = consumer.poll( 100 ); buffer.add( records ); if ( buffer.size() >= minBatchSize ) { // 在这里进行批量处理,例如持久化 // 手工提交一次偏移量,这意味着之前收到的消息均被标记为已提交 consumer.commitSync(); buffer.clear(); } } |
某些情况下,你可能希望对偏移量的提交进行细粒度的控制,例如按分区单独提交:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
try { while ( running ) { ConsumerRecords<String, String> records = consumer.poll( Long.MAX_VALUE ); // 按分区处理 for ( TopicPartition partition : records.partitions() ) { // 获取当前分区的所有记录 List<ConsumerRecord<String, String>> partitionRecords = records.records( partition ); for ( ConsumerRecord<String, String> record : partitionRecords ) { // 消费 } // 获取最后一个记录的偏移量 long lastOffset = partitionRecords.get( partitionRecords.size() - 1 ).offset(); // 提交当前分区的消费偏移量:必须指向下一个需要被消费的记录 consumer.commitSync( Collections.singletonMap( partition, new OffsetAndMetadata( lastOffset + 1 ) ) ); } } } finally { consumer.close(); } |
上面的两个例子中,分区由Kafka公平的分配给组中的消费者实例。某些情况下,你可能需要更细粒度、主动的控制,例如:
- 消费者在本地存储了状态,这些状态和特定的分区相关。因此消费者仅应抓取目标分区的记录
- 消费者本身具有HA特性,例如它被YARN之类的集群管理框架管理、属于Storm等流处理框架的一部分。这种情况下,不需要Kafka进行故障检测,因为消费者失败后会自动的重新启动
要手工分配分区,可以使用assign代替subscribe调用:
1 2 3 4 5 |
String topic = "test"; TopicPartition partition0 = new TopicPartition(topic, 0); TopicPartition partition1 = new TopicPartition(topic, 1); // 将分区0、1分配给消费者 consumer.assign(Arrays.asList(partition0, partition1)); |
执行上述分配后,你就可以进行poll。消费者的配置参数group.id仍然用于偏移量的提交。除非再次调用assign,分区和消费者的映射关系不会发生变化,因为手工分配模式下组协调器不生效,消费者失败不会导致再平衡。
尽管不同消费者实例可以共享组ID,但是它们是独立工作的。为了避免偏移量提交冲突,手工分配模式下,通常为每个消费者指定唯一性的组ID。
注意:自动分区分配(subscribe)和手工分区分配不能一起使用。
尽管Kafka内置的消费偏移量的存储机制,但是你并不一定要使用它。
应用程序独立管理消费偏移量的情况很常见,记录处理结果往往需要和消费偏移量原子的写入到同一存储系统,提供更强的精确一次性语义。典型的例子:
- 如果在RDBMS中存储消费后的结果,那么你可以在单个事务中同时提交消费偏移量和消费结果
- 如果在本地存储消费后的结果,连带存储消费偏移量也能带来好处。假设你希望基于某个分区的数据构建一个索引,索引数据可以和偏移量一起存储。只要能保证索引写和偏移量写原子的进行,那么即使宕机导致数据没有同步到磁盘,后续仍然可以恢复而不丢失数据
每个记录都自带了偏移量信息,因此,要外部存储偏移量,你只需要:
- 配置 enable.auto.commit=false
- 手工存储ConsumerRecord提供的偏移量
- 消费者重启后,调用 seek(TopicPartition, long)恢复上次消费位置
上述步骤配合手工分区分配,最为简单。
如果配合自动分区分配,则需要考虑可能的再平衡的影响。你需要调用 subscribe(Collection/Pattern, ConsumerRebalanceListener)并提供一个ConsumerRebalanceListener监听器:
- 实现onPartitionsRevoked(Collection)方法,这样当分区从消费者处拿走时,当前消费者有机会存储偏移量
- 实现onPartitionsAssigned(Collection),这样当消费者被授予新分区时,当前消费者能够读取先前存储的偏移量并seek到适当的位置
ConsumerRebalanceListener还可以用于,在分区迁移时,刷出应用程序维护的和目标分区相关的任何缓存信息。
大部分情况下,消费者只是简单的从头开始消费,定期的提交消费偏移量。
Kafka也允许消费者任意的指定消费位置:
- 消费者可以跳转到更小的偏移量,对历史数据进行重新消费。某些情况下消费者可能需要在重启后从头消费,已建立本地的缓存状态
- 消费者可以跳转到更大的偏移量,跳过一部分未消费的数据。在时间敏感的记录处理程序中会这样跳转,当消费者处理速度赶不上时,可能直接跳过一部分记录
和跳转相关的消费者方法包括:
- seek(TopicPartition, long),跳转到指定偏移量
- seekToBeginning(Collection),跳转到最小的偏移量
- seekToEnd(Collection),跳转到最大的偏移量
如果给消费者分配了多个分区,它默认会尝试同时读取所有分区中的可用数据,也就是分区的优先级是一样的。
某些情况下,可能需要优先读取某个分区的数据,仅仅在有空闲的时候才消费其它分区的数据。例如:
- 在流处理程序中,你抓取两个主题的数据,并对两个流进行Join操作。如果其中一个流的消费进度远远拉下,则无法有效的Join。这时可能需要把消费快的那个流暂停下来
- 同时需要抓取历史数据和实时数据时,可能需要优先抓取实时数据
调用 pause(Collection)可以暂停消费已分配的分区,调用 resume(Collection)则可以从暂停中恢复。暂停以后,调用poll()不会获取被暂停分区的数据。
从0.10引入的事务支持,允许生产者原子的写入多个分区、主题的数据。为了配合事务性生产者,消费者必须配置 isolation.level=read_committed。
启用读取已提交这一隔离级别后,消费者仅会读取已经成功提交的事务性消息。对于非事务性消息,读取方式和以前一致。
读取已提交模式下,客户端并没有缓冲机制。这种模式下消费者针对分区的读偏移量,是该分区中第一个属于进行中事务的记录的偏移量。此偏移量被称为最后稳定偏移量(Last Stable Offset,LSO)。一个进行中的事务可以影响多个分区甚至主题的LSO。
读取已提交模式下的消费者,会最多读取到LSO的前一个位置,并且会过滤掉所有Aborted的任何事务性消息。消费者调用seekToEnd(Collection)、endOffsets(Collection)的结果也会执行LSO。度量值抓取延迟(Fetch Lag)也相对于LSO计算。
包含了事务性消息的分区,会包含提交、中止(Abort)标记,用以说明事务的成功与否。这些标记体现为主题日志中的记录,并且这些记录不会返回给客户端。插入在日志中的标记会导致消费者看到消息偏移量之间出现空隙,不管是哪种隔离级别的消费者都会看到这种空隙。对于读取已提交的消费者,空隙还可能由于中止的事务导致。
KafkaConsumer是非线程安全的。所有的网络IO发生在调用线程中。多线程访问时的同步必须由调用者保证,未同步访问可能导致ConcurrentModificationException。
唯一不需要同步的例外是wakeup()调用。此调用通常在其它线程中发起,中断正在进行中的poll()操作以便关闭消费者:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
public class KafkaConsumerRunner implements Runnable { private final AtomicBoolean closed = new AtomicBoolean(false); private final KafkaConsumer consumer; public void run() { try { consumer.subscribe(Arrays.asList("topic")); while (!closed.get()) { ConsumerRecords records = consumer.poll(10000); // 处理抓取到的记录 } } catch (WakeupException e) { // poll()被中断会导致该异常 if (!closed.get()) throw e; } finally { consumer.close(); } } // 下面的方法可以被外部线程调用 public void shutdown() { closed.set(true); // 中断正在进行中的poll()方法 consumer.wakeup(); } } |
Kafka没有设计消费者的线程模型,开发者可以按需开发。
每个消费者独占一个线程的方式,其优势为:
- 容易实现
- 由于不需要跨线程的协调,往往是最快的方式
- 按照分区保证消息处理顺序很容易实现,线程简单的根据获得消息的顺序消费它们
其缺点是:
- 多个消费者之间,无法合并请求来批量发送给服务器处理,降低了吞吐能力
- 线程的总数受限于分区的总数,因为每个消费者至少需要独占一个分区
为了避免上述线程模型的缺点,可以将消息的消费和处理进行解耦。由一个或者多个消费者线程抓取数据,并将数据ConsumerRecords存放到阻塞队列中。由一个处理线程池来处理队列中的记录。该方式的优势是:
- 可以独立的对消费者、处理者线程进行扩容
- 需要注意数据的处理顺序问题,由于线程调度的随机性,旧记录实际的处理时间可能在新记录的后面。如果不关注顺序性则不是问题
- 手工的消费偏移量提交变得困难,需要协调多个线程,确认针对分区的消费已经完成
要使用Kafka Streams API,可以引入Maven依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency> |
详细的讨论参考Streams API一章。
很多场景下,你不需要直接使用Connect API,只需要使用内置的Connector就可以了。
详细的讨论参考Connect API一章。
要使用此API,可以引入Maven依赖:
1 2 3 4 5 |
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> |
使用AdminClient.create()可以创建管理客户端的实例(KafkaAdminClient)。此管理客户端可以用于管理、查看主题、代理、配置、ACL。代理的版本必须在0.10.0以上。
Kafka Connect是用于在Kafka和其它系统之间进行可扩容、可靠的流传输的工具。使用它你可以快速的定义将大量数据输入到Kafka或者从Kafka输出的“连接器”。
使用Kafka Connect可以读取整个数据库,收集来自所有应用程序的度量信息并存入Kafka主题,并让这些数据适合用于实时的流处理。
使用Kafka Connect可以将主题数据导出到存储系统、查询系统或者批处理系统,便于离线分析。
通用的连接器框架 Kafka Connect标准化了其它数据系统和Kafka的集成机制,简化了连接器的开发、部署和管理 |
分布式和独立运行模式 Kafka Connect可以分布式集群运行,也可以缩小到单个实例运行,满足不同规模应用程序的需要 Kafka Connect基于现有的Kafka组管理协议,需要扩容的时候,把新的Worker添加到Kafka Connect集群中即可 |
REST接口 可以通过REST接口向Kafka Connect Cluster提交连接器,或者管理连接器 |
自动偏移量管理 只需要连接器提供少量的信息,框架就能够自动管理偏移量提交过程。这样开发者可以从连接器开发的容易出错的部分解放出来 |
桥接流式和批量处理系统 使用Kafka既有的特性,Kafka Connect提供了桥接流式(实时)处理系统、批量处理系统的解决方案 |
本节给出一个简单的例子,说明如何配置、运行、管理单实例的Kafka Connect。
单实例模式下,所有工作都在单个进程(Worker)内部完成。单实例模式可能适用于日志文件收集这样的场景,但是不能利用Kafka Connect的容错特性。
启动单实例连接器的命令格式如下:
1 2 3 |
# 第一个参数:Worker的配置 # 后续参数:各连接器的配置 connect-standalone.sh config/connect-standalone.properties connector1.properties connector2.properties ... |
分布式模式下,负载均衡被自动处理,允许随时按需扩容。多实例模式提供活动Task、配置信息、偏移量提交数据的容错:
1 2 |
# 不支持通过命令行提供连接器配置 connect-distributed.sh config/connect-distributed.properties |
单实例和分布式模式会执行不同的Java类,这两个类会读取Worker配置并决定在何处存储配置信息、如何分配任务、在何处存储偏移量和任务状态
基础的通用配置项:
配置项 | 说明 |
bootstrap.servers | 如何连接到Kafka集群 |
key.converter value.converter |
如何将键值从Java对象形式转换为Kafka串行化格式 |
单实例Worker专有配置如下:
配置项 | 说明 |
offset.storage.file.filename | 此参数对单实例模式很重要,指明在何处存放偏移量数据 |
上面的Worker配置供Kafka Connect创建的生产者、消费者使用,以便访问配置、偏移量、状态主题。
对于Kafka Source和Sink任务,上述配置可以前缀consumer.和.producer,来针对任务进行配置。唯一从Worker继承来的配置项是 bootstrap.servers。
分布式模式下,Kafka Connect在Kafka主题中存储偏移量、配置和任务状态。你应当手工创建用于存放这些信息的主题,以便定制分区数量和复制因子。如果Kafka Connect启动时这些主题不存在,将以默认参数自动创建。
分布式Worker专有配置如下:
配置项 | 说明 |
group.id |
集群的唯一名称,默认connect-cluster。用于构成Kafka Connect集群组 此名称不能和Kafka消费者组名冲突 |
config.storage.topic |
用于存放Connector、Task配置的主题的名称,默认connect-configs 应当是单个分区、大复制因子、压缩格式的主题 |
offset.storage.topic | 用于存放偏移量。应当是多分区、复制的、压缩的主题 |
status.storage.topic | 用于存放状态信息。应当是多分区、复制的、压缩的主题 |
你可以指定多个连接器配置,但是这些连接器都是在Worker进程内使用不同的线程运行。
注意:分布式模式下,连接器配置不通过命令行参数指定。你需要通过REST API来创建、修改、销毁连接器。
连接器配置项也是键值对形式,具体包含的配置项取决于不同的连接器。公共的配置项包括:
配置项 | 说明 |
name | 连接器的唯一性名称,尝试注册重复名称的连接器会导致失败 |
connector.class | 连接器的Java类,可以指定权限定名或者别名,例如org.apache.kafka.connect.file.FileStreamSinkConnector、FileStreamSink、FileStreamSinkConnector都表示同一个连接器 |
tasks.max | 最多为此连接器创建的Task数量 |
key.converter value.converter |
覆盖Worker配置指定的键值转换器 |
topics | 作为此连接器输入/输出的主题列表 |
Kafka Connect允许通过REST API来管理连接器。Web服务默认暴露在8083端口,目前支持的端点如下表:
端点 |
GET /connectors 返回活动连接器的列表 |
POST /connectors 创建一个新的连接器,请求体必须是JSON格式,包含字段name、config |
GET /connectors/{name} 获取指定连接器的信息 |
GET /connectors/{name}/config 获取指定连接器的配置 |
PUT /connectors/{name}/config 设置指定连接器的配置 |
GET /connectors/{name}/status 获取指定连接器的状态,例如
|
GET /connectors/{name}/tasks 获取指定连接器正在允许的Task列表 |
GET /connectors/{name}/tasks/{taskid}/status 获取指定Task的状态信息 |
PUT /connectors/{name}/pause 暂停连接器及其Tasks。消息处理将被暂停 |
PUT /connectors/{name}/resume 恢复一个被暂停的连接器 |
POST /connectors/{name}/restart 重新启动一个连接器 |
POST /connectors/{name}/tasks/{taskId}/restart 重新启动某个任务 |
DELETE /connectors/{name} 删除连接器,中止所有Task并移除配置 |
通过配置,可以让连接器对消息进行修改或者叫转换,转换可以形成一个链条:
配置项 | 说明 |
transforms | 转换器的列表,配置顺序对应了转换器的执行顺序 |
transforms.$alias.type |
$alias为转换器指定一个别名 转换器的全限定类名 |
transforms.$alias.$transformationSpecificConfig | 转换器的私有配置项 |
这里给出一个示例:内置的文件源连接器 + 添加静态字段的转换器。
本示例使用模式自由的JSON数据格式,因此需要修改Worker 配置文件:
1 2 |
key.converter.schemas.enable=false value.converter.schemas.enable=false |
在本示例中文件源连接器读取文件的每一行,将其包装为Map,然后添加一个字段用于识别数据来源。为完成这些逻辑,我们需要添加两个转换器:
- HoistField:将输入纳入到一个Map中
- InsertField:添加一个字段
修改后的连接器配置如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
name=local-file-source # 从文件读取数据(文件作为源),输出到Kafka主题 connector.class=FileStreamSource tasks.max=1 # 连接器专有配置 file=test.txt topic=connect-test # 指定转换器列表 transforms=MakeMap, InsertSource # 将输入消息转换为键line的值 transforms.MakeMap.type=org.apache.kafka.connect.transforms.HoistField$Value # 转换器的配置项 transforms.MakeMap.field=line # 添加一个静态键值 transforms.InsertSource.type=org.apache.kafka.connect.transforms.InsertField$Value transforms.InsertSource.static.field=data_source transforms.InsertSource.static.value=test-file-source |
应用此连接器后,主题的记录形式如下:
1 2 |
{"line":"hello","data_source":"test-file-source"} {"line":"world","data_source":"test-file-source"} |
常用的内置转换器如下表:
配置项 | 说明 |
org.apache.kafka.connect.transforms.InsertField 通过静态值或者记录的元数据,产生一个字段,并插入到记录的键或者值。具体子类型InsertField$Key、InsertField$Value分别针对记录的键、值进行转换 |
|
offset.field |
存放Kafka偏移量的字段名,仅适用于Sink连接器 如果此配置前缀以 !则表示标注此字段为必须,前缀以 ?则表示可选 |
partition.field | 存放Kafka分区编号的字段名,前缀!/?表示必须/可选 |
static.field | 一个存放静态数据的字段的名称,前缀!/?表示必须/可选 |
static.value | 上述静态字段的值 |
timestamp.field | 存放Kafka记录时间戳的字段名,前缀!/?表示必须/可选 |
topic.field | 存放Kafka主题名的字段的名字,前缀!/?表示必须/可选 |
org.apache.kafka.connect.transforms.ReplaceField 过滤或者重命名现有的字段。具体子类型ReplaceField$Key、ReplaceField$Value分别针对记录的键、值进行转换 |
|
blacklist | 需要过滤掉的字段名,优先级比whitelist高 |
renames | 字段重命名映射,格式:oldname:newname,oldname2:newname2 |
whitelist | 需要包含的字段名,不在列表中的所有字段被过滤掉 |
org.apache.kafka.connect.transforms.MaskField 将指定字段的值替换为一个合法(类型相关)的“空值”,例如0、false、空串。具体子类型MaskField$Key、MaskField$Value分别针对记录的键、值进行转换 |
|
fields | 需要被遮掩的字段列表 |
org.apache.kafka.connect.transforms.HoistField 当具有限定的Schema时,使用指定的结构来包裹数据;当使用自由Schema时,使用Map包裹数据。HoistField$Key、HoistField$Value分别针对记录的键、值进行转换 |
|
field | 包裹记录数据的字段的名字 |
org.apache.kafka.connect.transforms.ExtractField 从结构(限定Schema)或者Map(自由Schema)中抽取一个字段。任何空值不做修改。ExtractField$Key、ExtractField$Value分别针对记录的键、值进行转换 |
|
field | 需要抽取的字段的名字 |
org.apache.kafka.connect.transforms.SetSchemaMetadata 针对键(SetSchemaMetadata$Key)或值(SetSchemaMetadata$Value)来设置Schema名称、版本 |
|
schema.name | string,模式的名称 |
schema.version | int,模式的版本 |
org.apache.kafka.connect.transforms.TimestampRouter 根据原始topic、记录时间戳来更新记录的topic字段 主要用于Sink连接器,原因是topic字段常常在Sink连接器中用作确定目标系统中的实体名(例如数据库系统中的表名) |
|
timestamp.format | 对时间戳进行格式化的Pattern,此Pattern必须兼容java.text.SimpleDateFormat。默认yyyyMMdd |
topic.format | 更新后的topic字段的格式,可以使用${topic}、${timestamp}两个占位符 |
org.apache.kafka.connect.transforms.RegexRouter 使用正则式匹配替换,来更新记录的topic字段 |
|
regex | 用于匹配的正则式 |
replacement | 原topic中匹配项被替换为的字符串 |
org.apache.kafka.connect.transforms.Flatten 扁平化一个内嵌的数据结构,新产生的字段名以点号导航的形式产生。Flatten$Key、Flatten$Value分别针对记录的键、值进行转换 |
|
delimiter | 字段名分隔符,默认点号 |
org.apache.kafka.connect.transforms.Cast 将字段、整个键或者整个值转换为指定的类型。例如可以强制一个int字段为short,仅仅支持基本类型和字符串。Cast$Key、Cast$Value分别针对记录的键、值进行转换 |
|
spec |
field:type,field2:type2形式的转换说明。支持的类型包括: int8, int16, int32, int64, float32, float64, boolean,string |
org.apache.kafka.connect.transforms.TimestampConverter 转换时间戳的格式,TimestampConverter$Key、TimestampConverter$Value分别针对记录的键、值进行转换 |
|
target.type | 期望的目标时间戳呈现格式:string、unix、Date、Time、Timestamp |
field | 期望被转换的时间戳字段,如果整个键或值是时间戳则置空 |
format | target.type=string时,指定SimpleDateFormat兼容的Pattern |
概念 | 说明 |
Connector |
为了让Kafka和其它系统进行数据交换,开发人员需要创建一个Connector,连接器有两类:
Connector本身不负责实际的数据拷贝工作,它只是把工作分配给一系列的Task进行处理 并非所有的Job都是静态的,例如JDBCSourceConnector,可以监控数据库并把每个表分配给一个Task。当数据库中创建了新表的时候,Connector需要发现此表并将其分配给某个Task,这是通过重新配置Connector实现的 |
Task |
负责将数据的一个子集拷贝到Kafka,或者从Kafka拷贝出去。Task可以分配到不同的Worker上运行 数据子集必须能够转换为Schema一致的记录构成的输入或输出流。某些情况下数据子集和流的映射关系是很明显的,例如日志文件集中的每个文件都可以产生流,日志的每一行都对应一个记录。某些情况下Offset如何界定则不明显,例如JDBC连接器能够把表转换为流,但是流中记录的Offset如何界定呢?一个常见的方案是,使用表的时间戳字段作为Offset,并基于此字段构建查询,进行批量数据读取 |
Stream | 每个流都是键值对的流水序列。键、值都可以具有复杂的结构,例如数组、嵌套对象 |
Record | 不管是由Source生成的,还是输出给Sink的记录,都有关联的Stream ID和Offset。框架会定期的进行偏移量提交,这样发生失败后,可以从上一次提交的偏移量处恢复 |
开发连接器仅仅需要实现Connector、Task两个接口。
从文件读取记录的源连接器和任务:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 |
package cc.gmem.study.kafka; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.source.SourceConnector; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class FileStreamSourceConnector extends SourceConnector { private String filename; private String topic; private final String FILE_CONFIG = "filename"; private final String TOPIC_CONFIG = "topic"; /** * 此连接器的任务实现类 * 需要在Worker中实例化的,实际的从外部读取数据的任务的Java类 */ public Class<? extends Task> taskClass() { return FileStreamSourceTask.class; } /** * 生命周期回调,启动连接器 * 该方法仅仅会在"干净"的连接器上面调用,所谓干净,要么是刚刚实例化、初始化的 * 要么是已经被停止的 * * @param props 传入的配置信息 */ public void start( Map<String, String> props ) { filename = props.get( FILE_CONFIG ); topic = props.get( TOPIC_CONFIG ); } /** * 声明周期回调,停止连接器 */ public void stop() { } /** * 产生最多max个任务的配置信息 */ @Override public List<Map<String, String>> taskConfigs( int max ) { List<Map<String, String>> configs = new ArrayList<>(); // 仅仅支持单个任务 Map<String, String> config = new HashMap<>(); config.put( FILE_CONFIG, filename ); config.put( TOPIC_CONFIG, topic ); configs.add( config ); return configs; } /** * 返回此连接器的版本 */ @Override public String version() { return null; } /** * 定义连接器的配置 */ @Override public ConfigDef config() { return null; } } |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
package cc.gmem.study.kafka; import org.apache.commons.io.IOUtils; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; public class FileStreamSourceTask extends SourceTask { private String filename; private InputStream stream; private String topic; /** * 启动任务,在此处理任务配置,打开需要的资源 * <p> * 实际应用时,还要考虑从Offset恢复的情况 * * @param props 任务配置 */ @Override public void start( Map<String, String> props ) { filename = props.get( FileStreamSourceConnector.FILE_CONFIG ); stream = openStream( filename ); topic = props.get( FileStreamSourceConnector.TOPIC_CONFIG ); } /** * Task被分配给专用的线程,此线程可能永久的阻塞,需要从Worker的其它线程调用stop方法才能停止 */ @Override public synchronized void stop() { IOUtils.closeQuietly( stream ); } /** * 从外部系统拉取数据并构建源记录,如果没有可用的记录此方法应该阻塞 * * @return 源记录的集合 * @throws InterruptedException 当阻塞时被中断 */ @Override public List<SourceRecord> poll() throws InterruptedException { try { List<SourceRecord> records = new ArrayList<>(); while ( records.isEmpty() ) { LineAndOffset line = readToNextLine( stream ); if ( line != null ) { records.add( // 源记录,主要包含四个元素 new SourceRecord( // 源分区(相当于流的标识符),这里只有一个源分区,也就是唯一的输入文件 Collections.singletonMap( "filename", filename ), // 源偏移量,这里即文件中的字节偏移量 Collections.singletonMap( "position", streamOffset ), // 输出主题 topic, // 输出值及其Schema。这里的Schema提示输出永远是一个字符串 Schema.STRING_SCHEMA, line ) ); } else { Thread.sleep( 1 ); } } return records; } catch ( IOException e ) { } return null; } @Override public String version() { return null; } private InputStream openStream( String filename ) { try { return new FileInputStream( filename ); } catch ( FileNotFoundException e ) { throw new RuntimeException( e.getCause() ); } } } |
注意上面Task所释放出的源记录,每个记录都关联了流标识符(文件名)和偏移量信息。Connect框架会利用这些信息,进行周期性的偏移量提交。这样,一旦Task出现失败,可以基于偏移量进行恢复,尽可能减少重复处理、重复记录。
偏移量提交完全由框架负责,Kafka Connect暴露了接口供Task读取偏移量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
@Override public void initialize( SourceTaskContext context ) { super.initialize( context ); stream = new FileInputStream( filename ); // 获取偏移量阅读器 OffsetStorageReader offsetStorageReader = context.offsetStorageReader(); // 读取指定分区的偏移量,偏移量是一个字典而不是简单的数值 Map<String, Object> offset = offsetStorageReader.offset( Collections.singletonMap( "filename", filename ) ); if ( offset != null ) { Long lastRecordedOffset = (Long) offset.get( "position" ); if ( lastRecordedOffset != null ) seekToOffset( stream, lastRecordedOffset ); } } |
Sink连接器的接口和Source类似。但是SinkTask和SourceTask则不同,因为后者使用的是拉模式而前者是推模式:
1 2 3 4 5 6 7 8 9 10 11 12 13 |
public abstract class SinkTask implements Task { public void initialize(SinkTaskContext context) { this.context = context; } // 逻辑集中在此方法中,需要将SinkRecord进行转换,最终输出到外部系统 // 此方法不一定需要保证数据完全写入到外部系统中,可以提前返回 // SinkRecord包含的信息基本和SourceRecord一致 public abstract void put(Collection<SinkRecord> records); // 在偏移量提交阶段调用下面的方法,此方法应该刷出未写入到外部系统的数据,并阻塞等待操作完成 // currentOffsets可以用于实现精确一次性语义(和写入到外部系统的数据原子的一同写入) public void flush(Map<TopicPartition, OffsetAndMetadata> currentOffsets) { } } |
Kafka Connect的主要价值是用于定义大批量的数据拷贝Job,例如,Job通常用来拷贝整个数据库而非单张表。这种设计意味着连接器的输入/输出流(表、文件等等)会动态的变化。
在实现SourceConnector时,你应当考虑源系统中的变化,例如表的增加或删除。当检测到变化后应当调用ConnectorContext来通知框架:
1 2 |
if (inputsChanged()) this.context.requestTaskReconfiguration(); |
在上述调用之后,框架会立即请求新的配置,并更新Tasks。在重新配置Task之前,框架允许Task优雅的完成提交操作。
某些情况下,和输入流更新有关的逻辑仅仅存在于Connector中。另外一些情况下,Task可能受到影响,需要适当的异常处理。例如,表的删除可能导致Task在拉取数据时出现错误,并且此错误可能在Connector检测到输入流的变化之前发生。
SinkConnector通常仅需要处理新增的Kafka流,Kafka Connect使用正则式订阅来监控SinkConnector的输入主题集的变化,并通知SinkConnector。SinkTask可能需要接收新的流,并在外部系统创建资源。如果多个SinkTask接收同一流则可能出现尝试重复创建同一资源(例如同一张表)的情况,需要注意。
Kafka Connect支持在提交连接器之前,对提供的配置信息进行验证。要使用此功能,你需要实现config()方法:
1 2 3 4 5 6 7 |
private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FILE_CONFIG, Type.STRING, Importance.HIGH, "Source filename.") .define(TOPIC_CONFIG, Type.STRING, Importance.HIGH, "The topic to publish data to"); public ConfigDef config() { return CONFIG_DEF; } |
ConfigDef.define方法的重载版本允许你提供一个Validator,用于针对单配置项自定义验证规则。
此外,覆盖Connector.validate()方法可以提供配置验证逻辑。
要处理符合结构定义的数据,你需要使用Kafka Connect的Data API。大部分的结构化记录需要和Schema、Struct这两个类交互。这两个类都用于定义Schema:
1 2 3 4 5 6 7 8 9 |
Schema schema = SchemaBuilder.struct().name(NAME) .field("name", Schema.STRING_SCHEMA) .field("age", Schema.INT_SCHEMA) .field("admin", new SchemaBuilder.boolean().defaultValue(false).build()) .build(); Struct struct = new Struct(schema) .put("name", "Barbara Liskov") .put("age", 75); |
在开发SourceConnector时,你需要考虑何时生成Schema。如果Schema是静态的,可以静态块中生成。
但是很多连接器都需要面对动态Schema。例如数据库连接器,这种连接器可能需要处理多张表,就算处理单张表,表的结构也可能随着时间改变。你必须检测这些改变并且做出适当的处理。
Kafka使用properties风格的配置文件。在通过命令行启动Kafka时,你可以指定配置文件路径,或者使用配置项的命令行参数版本。
最基本的配置项包括:broker.id、log.dirs、zookeeper.connect。
配置项 | 说明 | ||
重要配置 | |||
zookeeper.connect |
Kafka使用ZooKeeper存储集群的元数据信息,此配置项提供ZooKeeper的链接字符串:
|
||
advertised.listeners | 发布到ZooKeeper,供客户端使用的监听器列表。如果不指定则同listeners配置项 | ||
auto.create.topics.enable | boolean=true。是否按需自动在服务器上创建主题 | ||
auto.leader.rebalance.enable |
boolean=true。是否启用Leader负载均衡。一个后台线程负责定期检查并触发负载均衡 主题的每个分区都具有一个Leader节点,此节点负责全部读写 |
||
leader.imbalance.check.interval.seconds |
long=300。再平衡检查的间隔 |
||
leader.imbalance.per.broker.percentage |
int=10。每个代理允许的Leader不平衡最大比率(百分比),超过此阈值会导致再平衡 |
||
background.threads | int=10。用于各种后台任务的线程池的大小 | ||
broker.id |
当前服务器的代理标识。如果不设置则自动生成一个唯一性的标识符 自动生成的ID从reserved.broker.max.id + 1开始,以防止和手工指定的ID冲突 |
||
compression.type |
string=producer。指定主题的最终压缩方式,可选值有:
|
||
delete.topic.enable | boolean=true。是否允许删除主题。如果设置为false,通过管理工具执行的删除操作没有任何效果 | ||
listeners |
在其上监听的URI列表,逗号分隔 如果URI不使用安全协议,则listener.security.protocol.map必须配置 如果URI的主机部分设置为0.0.0.0则监听所有网络接口;置空则监听默认网络接口 示例: PLAINTEXT://myhost:9092 |
||
log.dir | 日志数据存储的目录,作为log.dirs的补充 | ||
log.dirs | 日志数据存储的目录,如果不指定则使用log.dir | ||
log.flush.interval.messages |
long=9223372036854775807。在消息被刷出磁盘之前,日志分区上累积的消息的数量 Kafka不建议设置此参数。为了防止数据丢失应该使用集群而不是定期调用fsync。操作系统底层会自动调度,高效的决定何时刷出 |
||
log.flush.interval.ms |
在任何主题中任何消息刷出磁盘之前,在内存中最多驻留的时间 如果不指定,使用 log.flush.scheduler.interval.ms |
||
log.flush.offset.checkpoint.interval.ms | int=60000。每隔多久更新最后一次刷出(Last flush)的持久化记录,此记录作为日志恢复点 | ||
log.flush.scheduler.interval.ms | long=9223372036854775807。日志刷出器每隔多少ms检查是否存在需要刷出到磁盘的日志 | ||
log.flush.start.offset.checkpoint.interval.ms | int=60000。每隔多久更新日志起始偏移量(Start offset)的持久化记录 | ||
log.retention.bytes | 日志的最大保留尺寸 | ||
log.retention.hours log.retention.minutes log.retention.ms |
日志的最大保留时间。优先级逐个升高 | ||
log.roll.hours log.roll.ms |
最多经过多久,必须滚出(Roll out,产生)新的日志段(Segment) | ||
log.roll.jitter.hours log.roll.jitter.ms |
从上面那个参数减去的最大抖动时间 | ||
log.segment.bytes | 日志段的大小 | ||
log.segment.delete.delay.ms | long=60000。在从文件系统删除一个日志文件(段)之前,最多等待的时间 | ||
message.max.bytes |
int=1000012 Kafka允许的最大消息批量的大小(字节)。如果增加此参数并且消费者的版本在0.10.2之前,消费者的抓取尺寸(Fetch Size)也需要相应的增加 在最近版本的消息格式中,出于性能的考虑,记录总是被组装到批次(Batch)中 在以前版本的消息格式中,未压缩记录是不组装批次的。因此该参数针对单条记录 主题可以覆盖此配置 |
||
min.insync.replicas |
int=1 当生产者将acks设置为all或-1时,该配置的意义是指定最少的已经同步写操作的节点数量。也就是说,只有足够多的节点已经确认(Acknowledge)了写操作,生产者才认为写操作成功 如果无法满足此条件,则生产者抛出NotEnoughReplicas或NotEnoughReplicasAfterAppend 配合使用acks和此选项可以增强持久性保证 |
||
num.io.threads | int=8。服务器使用的I/O线程数量,这些线程负责磁盘IO等操作 | ||
num.network.threads | int=3。服务器使用的网络线程的数量,这些线程负责接收请求、发送响应 | ||
num.recovery.threads.per.data.dir | int=1。每个数据目录用于:1、在启动时进行日志恢复;2、在关闭时进行日志刷出的线程数量 | ||
num.replica.fetchers | int=1。从源代理抓取数据以复制消息的线程数量。增加此配置可能增强从代理的并行度 | ||
offset.metadata.max.bytes | int=4096。关联到偏移量提交(Offset Commit)的元数据条目的最大尺寸 | ||
offsets.commit.required.acks | short=-1。通常不需要修改,在提交可以被接受之前,需要的Acks数量 | ||
offsets.commit.timeout.ms | int=5000。偏移量提交(Offset Commit)会被推迟,直到所有Replica接收到提交,或者到达此配置指定的延迟 | ||
offsets.load.buffer.size | int=5242880。当加载偏移量到缓存中时,从偏移量段(Offsets Segments)读取数据的批次大小 | ||
offsets.retention.check.interval.ms | long=600000。检查偏移量是否过期(Stale)的间隔 | ||
offsets.retention.minutes | int=1440。超过此驻留时间的偏移量被丢弃 | ||
offsets.topic.compression.codec | int=0。偏移量提交主题(Offsets Commit Topic )的压缩算法。使用压缩可以实现“原子”提交 | ||
offsets.topic.num.partitions | int=50。偏移量提交主题包含的分区数量。部署后不应再修改 | ||
offsets.topic.replication.factor | short=3。偏移量提交主题的复制因子。在集群成员大小足够前,内部的主题创建会失败 | ||
offsets.topic.segment.bytes | int=104857600。偏移量提交主题的段大小 | ||
queued.max.requests | int=500。在阻塞网络线程(不再接受新请求)之前,排队的请求数量 | ||
replica.fetch.min.bytes | int=1。Follower Replica期望每次抓取请求能接收的响应的最小字节数。如果字节数不够,Follower会等待直到replica.fetch.wait.max.ms | ||
replica.fetch.wait.max.ms | int=500。Follower Replica发送抓取请求后等待响应的最大时间。此配置应该总是小于replica.lag.time.max.ms,以防止低吞吐量主题的ISR频繁的收缩 | ||
replica.high.watermark.checkpoint.interval.ms | long=5000。高水位保存到磁盘的频率 | ||
replica.lag.time.max.ms | long=10000。Follower Replica在此时间内没有发送抓取请求,或者离Leader Replica的终点偏移量(End Offset)超过此时间,则Leader Replica从ISR列表中将Follower移除 | ||
replica.socket.receive.buffer.bytes | int=65536。网络请求的套接字接收缓冲大小 | ||
replica.socket.timeout.ms | int=30000。网络套接字的超时时间,应当大于replica.fetch.wait.max.ms | ||
request.timeout.ms | int=30000。客户端等待响应的最大时间,超过此时间客户端可能重发请求或者失败(重发次数超过限制) | ||
socket.receive.buffer.bytes | int=102400。服务器端套接字的SO_RCVBUF配置。如果设置为-1则使用OS默认值 | ||
socket.request.max.bytes | int=104857600。一个套接字请求包含的最大字节数 | ||
socket.send.buffer.bytes | int=102400。服务器端套接字的SO_SNDBUF配置。如果设置为-1则使用OS默认值 | ||
transaction.max.timeout.ms | int=900000。事务的最大超时,如果客户端请求的事务时间超过此配置则代理在InitProducerIdRequest调用中返回一个错误,以阻止客户端使用太大的超时 | ||
transaction.state.log.load.buffer.size | int=5242880。加载生产者ID、事务到缓存中时,读取事务状态日志的批次大小 | ||
transaction.state.log.min.isr | int=2。为事务状态主题覆盖min.insync.replicas | ||
transaction.state.log.num.partitions | int=50。事务状态主题的分区数量。部署后不应该修改 | ||
transaction.state.log.replication.factor | short=3。事务状态日志的复制因子 | ||
transaction.state.log.segment.bytes | int=104857600。事务状态日志的段大小,应该设置的相对较小,以保证较快的日志压缩和缓存加载 | ||
transactional.id.expiration.ms | int=604800000。事务协调器等待此时间后,主动(不需要从生产者接收任何事务状态更新)将生产者的事务ID过期 | ||
unclean.leader.election.enable | boolean=false。指示是否允许非ISR(同步)的Replica可以被选举为Leader,设置为true可能导致数据丢失 | ||
zookeeper.connection.timeout.ms | 创建到ZooKeeper的连接的超时时间,如果不指定使用zookeeper.session.timeout.ms | ||
zookeeper.session.timeout.ms | int=6000。ZooKeeper会话过期时间 | ||
zookeeper.set.acl | boolean=false。是否设置ACL | ||
broker.rack | 代理所在的机柜,用于Rack-aware的复制分配,用于跨数据中心复制 |
主题可以从服务器默认值继承配置,并且可以覆盖。要指定主题专有配置,可以:
1 2 3 4 |
kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1 --replication-factor # 使用下面的选项来覆盖配置 --config max.message.bytes=64000 --config flush.messages=1 |
在主题创建之后,你仍然可以修改配置:
1 2 3 4 5 |
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic # 修改配置 --alter --add-config max.message.bytes=128000 # 移除配置 --alter --delete-config max.message.bytes |
执行下面的命令,可以查看一个主题的配置覆盖情况:
1 |
kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my-topic --describe |
重要配置项如下:
配置项 | 说明 |
cleanup.policy |
list=delete。对应服务器默认配置:log.cleanup.policy 老旧消息的驻留策略,可选值delete/compact |
compression.type |
string=producer。对应服务器默认配置项:compression.type 主题的最终压缩方式 |
delete.retention.ms |
long=86400000。对应服务器默认配置项:log.cleaner.delete.retention.ms 对于log compacted主题,删除墓碑标记的驻留时间。如果消费者从Offset 0 开始读取,它必须在此配置指定的时间内完成,以确保获得有效的快照 |
file.delete.delay.ms |
long=60000。对应服务器默认配置项:log.segment.delete.delay.ms 从文件系统中删除文件之前,等待的时间 |
flush.messages |
long=9223372036854775807。对应服务器默认配置项:log.flush.interval.messages 指定一个消息计数,累计超过此数量的消息强制fsync日志到文件系统 |
flush.ms |
long=9223372036854775807。对应服务器默认配置项:log.flush.interval.ms 指定一个间隔,超过此时间强制fsync日志到文件系统 |
follower.replication.throttled.replicas |
list。对应服务器默认配置项:follower.replication.throttled.replicas Replica的列表,对于这些Replica日志复制在Follower段限速 取值*表示针对所有Replica限速,可以指定partitionId:brokerId、brokerId、partitionId |
leader.replication.throttled.replicas |
list。对应服务器默认配置项:leader.replication.throttled.replicas 类似上一条,只是节流在Leader端执行 |
index.interval.bytes |
int=4096。对应服务器默认配置项:log.index.interval.bytes 通常不需要修改此选项。控制Kafka向Offset索引添加条目的频率。更高的频率允许消费者跳转到更精确的位置,但是会导致索引占用空间更大 |
max.message.bytes | int=1000012。对应服务器默认配置项:message.max.bytes |
message.format.version |
string=1.0-IV0。对应服务器默认配置项:log.message.format.version 代理为该主题添加日志时,使用的消息格式版本 |
message.timestamp.difference.max.ms |
long=9223372036854775807。对应服务器默认配置项:log.message.timestamp.difference.max.ms 代理接收到消息时的实际时间和消息中指定的时间戳的最大差距。如果超过此值并且:
|
message.timestamp.type |
string=CreateTime。对应服务器默认配置项:log.message.timestamp.type 消息时间戳存储消息创建时间(CreateTime)还是附加到日志的时间(LogAppendTime)
|
min.cleanable.dirty.ratio |
double=0.5。对应服务器默认配置项:log.cleaner.min.cleanable.ratio 当启用日志整理(log compaction)时,该选项控制日志整理器尝试清理日志的频率。默认值是,当50%的消息已经被整理(也就是重复消息不超过50%),则不触发新的整理。取值越大,则整理频率越低也越高效,同时导致日志浪费更多空间 |
min.compaction.lag.ms |
long=0。对应服务器默认配置项:log.cleaner.min.compaction.lag.ms 当启用日志真理时,该选项控制一个消息在被整理之前至少需要等待的时间 |
min.insync.replicas |
int=0。对应服务器默认配置项:min.insync.replicas |
preallocate |
boolean=false。对应服务器默认配置项:log.preallocate 当创建一个新的日志段时,是否在磁盘上预分配空间 |
retention.bytes | long=-1。对应服务器默认配置项:log.retention.bytes |
retention.ms | long=604800000。对应服务器默认配置项:log.retention.ms |
segment.bytes | int=1073741824。对应服务器默认配置项:log.segment.bytes |
segment.index.bytes |
int=10485760。对应服务器默认配置项:log.index.size.max.bytes 控制索引的大小,索引用于将偏移量映射到文件位置。索引文件被预先创建,仅当日志滚动(创建新段)之后才收缩其大小 |
segment.jitter.ms | long=0。对应服务器默认配置项:log.roll.jitter.ms |
segment.ms | long=604800000。对应服务器默认配置项:log.roll.ms |
unclean.leader.election.enable | boolean=false。对应服务器默认配置项:unclean.leader.election.enable |
下表列出Java生产者可以使用的配置项:
配置项 | 说明 |
bootstrap.servers |
list=host:port,host:port... 指定初始化到Kafka集群连接时使用的服务器地址端口列表。此列表仅仅用于初始化时发现完整集群,不需要指定集群中所有节点 |
key.serializer | 指定一个实现了org.apache.kafka.common.serialization.Serializer的类,此类用于键的串行化 |
value.serializer | 指定一个实现了org.apache.kafka.common.serialization.Serializer的类,此类用于值的串行化 |
acks |
string=1。在生产者认定操作已经完成之前,Leader必须接收到Ack的数量:
|
buffer.memory |
long=33554432。生产者总计可用的缓冲区,此缓冲区用于暂存等待发送到服务器的那些记录 如果消息生产过快超过消息递送速度,而导致缓冲区爆满,生产者会阻塞max.block.ms,此后仍然没有缓解,生产者抛出异常 此参数大概等于生产者总计的内存消耗。生产者还需要额外的小部分内存用于压缩消息、维护in-flight请求 |
compression.type | string=none。生产者产生的任何数据的压缩方式。默认不压缩,可选gzip、snappy、lz4 |
retries |
int=0。设置大于0的值,则生产者可以在失败后重试发送消息,以避免暂时性网络错误的影响 如果允许重试,但是没有把max.in.flight.requests.per.connection设置为1,则记录的顺序可能改变。例如,两个批次同时发到同一分区,批次1、批次2连续发送,然后1失败2成功,后续重发1 |
batch.size |
int=16384。批量发送的最大字节数,设置为0则禁用批量发送。当多个记录将被发送给同一分区时,生产者会尝试批量发送以减少请求次数。批量发送可以增强服务器、客户端的性能 生产者不会尝试发送大于此配置的记录批次 发送给服务器的请求可以包含多个批次,每个批次针对一个分区 此配置项设置的过小,可能影响吞吐能力;设置的过大,会导致内存浪费,因为这块内存是预先分配的不管是否实际需要 |
client.id | 发送请求时,发送给服务器的一个字符串。用途是跟踪请求的源,此ID可以包含在服务器端日志中 |
connections.max.idle.ms | long=540000。多久以后关闭空闲的连接 |
linger.ms |
long=0。生产者可以将请求发送期间收集的记录组成批次,在下一个请求中一起发送。正常情况下,这种批量发送行为仅在记录产生速度大于发送速度时存在。如果你希望在缓和的负载下也使用批量发送以减小请求次数,可以将该配置设置的大于0。这样,生产者在产生一个消息后,会等待linger.ms,看看有没有下一个消息到来 一旦针对某个分区的记录达到batch.size字节批量发送就会立即发送,而不需要等待linger.ms。linger.ms限定了批量发送引入的最大延迟 |
max.block.ms | long=60000。配置KafkaProducer.send()和KafkaProducer.partitionsFor()的最大阻塞时间,当生产者发送缓冲爆满或者元数据不可用时,这两个方法会被阻塞 |
max.request.size |
int=1048576。单个请求的最大字节数,需要注意服务器端有类似的限制,且取值可能和客户端不一样 该配置对批量发送的大小作出额外限制 |
partitioner.class |
class=org.apache.kafka.clients.producer.internals.DefaultPartitioner 实现org.apache.kafka.clients.producer.Partitioner接口的类 |
receive.buffer.bytes | int=32768。套接字SO_RCVBUF参数,-1则使用OS默认值 |
request.timeout.ms | int=30000。客户端等待响应到达的最大时间。如果超时客户端可能重试或失败 |
security.protocol | string=PLAINTEXT。用于和服务器通信的协议,可选PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
send.buffer.bytes | int=131072。套接字SO_SNDBUF,-1则使用OS默认值 |
从0.9.0.0版本开始,Kafka引入新的Java消费者实现,代替原先基于Scala的简单、高层实现。下表的配置可以用于新、老消费者实现:
配置项 | 说明 |
bootstrap.servers | 参考生产者配置 |
key.deserialize | |
value.deserializer | |
fetch.min.bytes | int=1。对于一个抓取请求,服务器应该返回的最小数据量。如果数据量不足,在应答之前请求会等待更多数据。设置为较大的数,则服务器会等待更多的数据再应答,这样可以增加吞吐量 |
group.id |
唯一性的字符串,用于识别该消费者所属的组。当消费者使用:
时,需要该配置 |
heartbeat.interval.ms |
int=3000。使用Kafka的组管理功能时的心跳间隔。心跳用于确认消费者还活着,因为消费者加入、离开组时需要进行(谁消费哪个分区的)再平衡 必须小于session.timeout.ms,通常不大于session.timeout.ms的1/3。如果需要更敏捷的再平衡可以设置的更低 |
max.partition.fetch.bytes |
int=1048576。服务针对每个分区最多返回的数据量。消费者按批次抓取记录,如果针对第一个非空分区的第一个记录批次超过此配置的限制,批次仍然被返回以便消费者能进一步处理 服务器允许的记录批次的最大尺寸,受到message.max.bytes(代理级/主题级)约束 |
session.timeout.ms |
int=10000。使用Kafka组管理功能时,判断一个消费者宕机的延迟。消费者定期发送心跳,这样代理知道它还活着,如果session.timeout.ms后代理没有收到新的心跳,则认为消费者宕机,代理会把消费者从组中移除并进行再平衡 必须在代理配置group.min.session.timeout.ms和group.max.session.timeout.ms之间取值 |
connections.max.idle.ms | long=540000。关闭空闲连接之前的等待时间 |
enable.auto.commit | boolean=true。如果设置为true则消费者的偏移量会在后台定期的提交 |
exclude.internal.topics | boolean=true。Kafka内部主题(例如偏移量)是否暴露给消费者 |
fetch.max.bytes |
int=52428800。对于一个抓取请求服务器最多返回的数据量。消费者按批次抓取记录,如果针对第一个非空分区的第一个记录批次就超过此配置的限制,批次仍然被返回以便消费者能进一步处理。因此,此该配置不是一个绝对性的限制 服务器允许的记录批次的最大尺寸,受到message.max.bytes(代理级/主题级)约束 |
isolation.level |
string=read_uncommitted。可选值read_committed 决定如何读取事务性写入的消息,如果设置为:
对于非事务性消息,该配置没有影响 消息总是按照偏移量顺序返回给消费者,因此read_committed模式下consumer.poll()只能返回最晚到LSO(Last Stable Offset,最新稳定偏移)的消息。LSO即第一个(偏移最靠前)打开事务的偏移量减1。特别的,任何位于进行中事务消息之后的消息都不会返回给消费者。直到这些事务完成。作为结果,read_committed可能导致消费者无法读取到高水位。另外,seekToEnd()在read_committed模式下也仅读到LSO
|
max.poll.interval.ms | int=300000。使用消费者组管理时,调用poll()的最大间隔。如果消费者在此时间内没有进行poll()以抓取更多数据,则服务器认为消费者已经失败,并执行再平衡(将分区分配给其它消费者) |
max.poll.records | int=500。单次poll()调用返回的记录的最大数量 |
partition.assignment.strategy |
list=org.apache.kafka.clients.consumer.RangeAssignor 使用消费者组管理时,客户端所使用的分区分配策略类。客户端使用此类决定如何在消费者实例之间分配分区 |
receive.buffer.bytes | int=65536。套接字参数SO_RCVBUF |
send.buffer.bytes | int=131072。套接字参数SO_SNDBUF |
request.timeout.ms | int=305000。客户端等待请求的响应到达的最大时间,超时后失败或者重发 |
security.protocol | string=PLAINTEXT。与代理通信使用的协议,可选PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL |
配置项 | 说明 |
bootstrap.servers | list=localhost:9092。初始化到Kafka集群连接的服务器列表 |
config.storage.topic | 存储连接器配置的Kafka主题的名称 |
group.id | 当前Worker所属的Kafka Connect集群组 |
key.converter value.converter |
配置Kafka Connect格式和串行化的存储到Kafka主题的格式之间的转换器类 两个配置项分别指定消息中键、值的转换器 |
offset.storage.topic | 存储连接器偏移量的Kafka主题的名称 |
status.storage.topic | 存储连接器、任务状态的主题的名称 |
heartbeat.interval.ms |
int=3000。当使用Kafka的组管理功能时,发送心跳到组协调器(Group Coordinator)的周期 心跳用于确保Worker还活着,并且在有Worker加入/离开组时进行再平衡 必须设置的比session.timeout.ms,典型的小于其1/3 |
rebalance.timeout.ms | int=60000。再平衡开始后,任务Task需要在此配置的时限内刷出数据、提交偏移量。超时后Worker将从组中移除,从而导致偏移量提交失败 |
session.timeout.ms | int=10000。用于检测Worker失败的超时。通常情况下Worker周期性的发送心跳给代理,代理因而确认它还活着,如果超过此配置的时间代理没有收到心跳,则认为Worker失败 |
配置项 | 说明 |
application.id | 流处理应用程序的标识符,必须在Kafka集群内是唯一的。此标识符将用于:
|
bootstrap.servers | 初始化到Kafka集群连接的服务器列表 |
replication.factor | 流处理应用程序创建的changelog主题、repartition主题的复制因子 |
state.dir | 存储状态的目录位置 |
配置项 | 说明 |
bootstrap.servers | 初始化到Kafka集群连接的服务器列表 |
推荐使用Java8的最新版本,例如LinkedIn就使用Java8 + G1回收器,JVM配置如下:
1 2 3 |
-Xmx6g -Xms6g -XX:MetaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80 |
LinkedIn最繁忙的集群,由60个代理、5万个分区(复制因子2),输入消息数量80万每秒,输入流量300MB/s、输出流量1GB+ /s。 基于上述JVM配置,90%的GC停顿时间小于21ms,每秒Young GC次数小于1次。
LinkedIn使用24GB内存的4核Xeon机器。你需要为读写保留足够的缓冲内存空间。假设需要能够缓冲30s,则需要的大概内存是写吞吐量 * 30
LinkedIn使用8个7200RPM的SATA硬盘阵列,磁盘的吞吐能力很重要,因为这经常是瓶颈的所在。可以配备更多数量的磁盘以增强吞吐能力。如果配置更加频繁的Flush,则更高转速的磁盘会让你受益,当然价格也更贵。
Kafka可以很好的运行在所有类UNIX系统上,相比之下Windows平台下则有一些问题。
通常不需要进行很多OS级别的调优,主要考虑一下两个参数:
内核参数 | 说明 |
文件描述符数量限制 |
Kafka使用到文件描述符的地方包括:日志段(每个段对应一个文件)、打开的网络连接 日志段需要的文件描述符总数的计算公式: (number_of_partitions)*(partition_size/segment_size) |
最大套接字缓冲 | 对于跨数据中心的复制,加大此缓冲可能提高性能 |
推荐使用多块磁盘,以增强吞吐能力。不建议和应用程序日志、其它OS文件系统活动共享磁盘,因为这会影响Kafka的处理延迟。
将多块磁盘RAID成单个卷,或者分别挂载为不同的目录,都是可以的。由于Kafka天然的容错性,RAID提供的冗余能力可能不需要,其提高单个逻辑磁盘的能力更有意义。
如果为Kafka配置多个数据目录,则分区会循环分配到这些目录,每个分区仅仅会存放在单个目录中。如果数据不是分区均衡的,则磁盘负载可能不均衡。
RAID在底层进行了负载均衡,因此可能(不一定)避免上述问题。RAID的劣势是它通常对写吞吐量有较大不利影响,并且减少了磁盘可用容量。
RAID另一个潜在的优势是,提高磁盘硬件容错。然而,重建RAID阵列非常IO敏感,几乎让服务器不可用,也就是说RAID不能提升可用性。
Kafka没有消息的内存缓冲池,它总是立即把数据写入到文件系统。由于OS页缓存(Pagecache)的存在,写入操作可能并没有持久化到硬盘中,具体写入时机,要么是应用程序显示调用fsync,要么是OS自行调度。在2.6.32以后的内核中,一系列pdflush线程负责在后台fsync。
你可以配置一个刷出策略,以便在一定时间后、累计一定消息数量后,fsync数据到磁盘。
需要注意:意外宕机不会导致数据丢失,因为通常主题的复制因子都大于1,缺少的数据可以从其它Replica上同步。因此Kafka的默认刷出策略是,应用程序不会触发fsync。fsync只会由OS或者Kafka后台线程完成。
启用应用程序级别的fsync,其缺点是:
- 比较低效的磁盘使用模式,因为显式fsync让OS不能进行写重排序以优化性能
- 大部分Linux文件系统中fsync是阻塞的操作
Kafka没有对文件系统有硬性依赖,但是XFS比起EXT4更加适合Kafka的负载。
- 必须具有很高的吞吐能力,以支持高容量的事件系统,例如实时日志聚合
- 必须具有优雅的后备排队(Backlog)机制,以便处理来自离线系统的、周期性的负载
- 为了实现传统的消息递送语义,必须具有低延迟的特点
- 必须支持分区和分布式,保证弹性和扩容能力
- 必须具有容错能力,不会因为硬件故障导致数据丢失或者重复
以上目标,导致Kafka更像数据库日志,而非消息队列。
Kafka非常依赖于文件系统来存储、缓存消息。很多人觉得磁盘非常慢,无法提供足够的性能。实际上,磁盘可能运行的比想象的更快或更慢,这取决于你如何使用它。良好设计的磁盘数据结构可以让磁盘IO和网络IO一样快,某些特定情况下,磁盘顺序访问可以比内存随机访问更快。
关于磁盘性能的一个重要事实是,最近十年来磁盘的吞吐能力和磁盘寻道的延迟越来越负相关。六块7200RPM磁盘构成的RAID5,其顺序写可以高达600MB/s,但是随机写仅仅100KB/s,这是一个巨大的反差(6000倍)。在所有使用模式下,线性读写的速度都是最可预测的,并且被操作系统很大程度的优化。现代操作系统都提供了预读(Read-ahead)和后写(Write-behind)技术,支持大块的预抓取数据、分组多个小的逻辑写操作为单个物理写操作。
为了补偿随机磁盘IO的低性能,现代操作系统都激进的利用空闲内存,可能所有的空闲内存都被用作磁盘缓存。所有的磁盘读写操作都经由这个缓存层进行处理。除非使用Direct I/O,这种缓存层无法被跳过,因此即使应用程序在内部维护缓存机制,这些缓存很可能在OS的页缓存中被再次缓存。
对于JVM来说,需要注意以下事实:
- 对象形式的数据,其空间占用很长大,常常比串行化格式大两倍。这意味着从存储效率方面来说,内存缓存是比较浪费的
- 随着对空间的占用,垃圾回收器的行为越发复杂和缓慢
因此,使用文件系统进行缓存,依赖于OS的页缓存机制,要比自行在内存中维护缓存更加有效 —— 避免了潜在的重复缓存、避免了JVM中松散的空间结构。在32G内存的机器上,可以有效的缓存28-30G的数据,却可以避免GC带来的性能问题。
使用文件系统缓存的另外一个优势是重启后不需要预热,内存缓存在重启后是空白的,需要重新加载。
使用文件系统缓存,还免去了手工维护内存、磁盘数据的一致性。
当你的磁盘访问风格是顺序读,则操作系统的预读功能将大大的提高读效率。因为预读总是能命中你所需的数据。
Kafka的日志,不在内存中缓存,直接写入到磁盘日志中。
消息系统中的持久化数据结构常常是每个消费者一个队列,附带关联的BTree结构,此数据用于随机访问messages的元数据。BTree是消息系统中最广泛使用的数据结构,用于支持大量事务性、非事务性语义。
但是,BTree结构具有较高的成本。尽管BTree结构本身是对数复杂度(可以近似看做常量复杂度),但是应用到磁盘操作时并非如此。每次磁盘寻道操作大概需要10ms级别,每个磁盘同时仅仅支持单个寻道操作,因此并发度受限,少量的磁盘寻道请求就会导致非常高的Overhead。
存储系统需要混合非常快的页面缓存读写操作,以及非常慢的磁盘寻道操作,因此站在观察者的角度,BTree的性能随着数据量的增加而降低。数据量翻倍后性能降低的超过两倍。
如果以日志系统的风格来设计消息队列,也就是说仅仅支持Append方式的写操作,则所有读写操作都能实现常数的时间复杂度,同时读写操作不会相互阻塞。这种存储风格的一个明显优势是,性能和数据量完全解耦。服务器可以使用廉价、低转速磁盘的全部空间。
性能和数据量解耦后,可以实现在典型消息系统中难以看到的特性。例如,在Kafka中,消息不会在消费后立即被删除(以压缩空间,因为数据量和性能负相关),而是被保留一周甚至更久。这种特性让消费者的行为非常灵活,可以Replay历史数据,或者跳转到未来进行消费。
Kafka的一个主要目标是处理Web活动数据,这种数据的量是非常大的,一次页面访问可能产生数十条数据。为了保证数据的生产和消费都能流畅进行,Kafka必须非常高效。
关于磁盘的效率问题在前面讨论过了。消除磁盘性能问题后,系统中最常见的性能问题通常由于:
- 过多的微小IO操作:C-S之间的IO和服务器内部的持久化都可能有这种问题。Kafka引入了消息集的概念,批量的发送消息,避免过多的网络请求带来Roundtrip开销。相应的,服务器把消息集整个的Append到日志,消费者也采取批量抓取的方式消费。Kafka的这种设计,让系统性能有了数量级的提升,因为批处理产生了更大的网络包、更大的顺序磁盘IO、更大的连续内存块
- 过多的字节拷贝:低消息生产速率下不是问题,反之则影响重大。为了避免消息拷贝,Kafka设计了统一的二进制消息格式供生产者、代理、消费者使用,这样数据块就可以直接传输,不需要修改。代理维护的分区日志,本身仅仅是目录中的一系列文件,文件中的消息同样使用标准的二进制格式
通常情况下,将文件传输到套接字中的步骤是:
- OS在内核空间,读取文件到页面缓存
- 应用从内核空间读取页面缓存,拷贝到用户空间
- 应用将用户空间中的文件回写到内核空间(套接字缓冲)
- OS将套接字缓冲拷贝到网络接口(NIC)缓冲。然后网卡负责发送
很明显上述步骤较为低效,包含了4次拷贝2次系统调用。现代UNIX系统(Linux通过sendfile系统调用)对页面缓存到套接字的传输高度优化,统一消息格式因而受益。具体来说,免去了不必要的拷贝(Zero-copy),仅仅需要将页面缓存拷贝到NIC缓冲即可,不需要用户空间的参与。
Kafka主题通常有多组消费者,那么,日志一旦拷贝到页面缓存,就可以多次的Zero-copy并发送给消费者。这避免了反复拷贝数据到用户空间,记录的消费速率,仅仅受限于网络连接本身。
某些情况下,系统的瓶颈出现在网络带宽,而非磁盘或者CPU。当数据处理管线需要跨越数据中心进行消息收发时尤为如此。
用户可以自己实现压缩,但是客户端代码很难实现高效的压缩。Kafka采取批量压缩的方式,这样多个消息中反复出现的内容可以被有效的处理。
Kafka代理收到压缩消息后,直接存放到日志中,因此日志的磁盘结构也是紧凑的。除非消费者进行消费,消息不会解压缩。Kafka支持GZip、LZ4、Snappy等压缩协议。
生产者直接将消息发送给分区的Leader,Kafka没有引入中介的路由层。为了能让生产者知道向谁发送消息,所有节点都能应答元数据请求,给出服务器状态信息、分区Leader信息。
生产者负责决定将消息发送到哪个分区,可以随机发送、循环轮流发送,或者根据消息内容实现某种语义分区。要使用语义分区,生产者需要给记录一个适当的键,此键的哈希(默认逻辑,分区函数可以被覆盖)决定其被发送到哪个分区。例如,假设记录的键选用UserId,则用户的所有数据都被发送到单个分区。
批量处理是高效性的一个重大驱动力。Kafka生产者倾向于在内存中累计记录,并通过单个请求发送多个记录构成的批次。你可以配置累计记录时最多消耗的时间,或者累计数据的最大字节数。通过调整配置,你可以在更好吞吐量、更低延迟之间做权衡。
消费者通过向Leader分区发送抓取(Fetch)请求来工作。请求中会包含消费者的消费偏移量,代理依据此偏移量发送一批记录作为响应。消费者对偏移量具有很大的控制能力,只要不超过可用的区间,消费者能够倒回(Rewind)以消费历史数据,或者跳转到“未来”(在跟不上生产者的节奏时)进行消费。
消息应该由代理推送给客户端,还是由客户端主动去拉取?Kafka在这方面的设计和典型消息系统是一致的 —— 消息由生产者推送给代理,再由消费者从代理处拉取。
某些以日志为中心的系统,例如Apache Flume,把消息推送给消费者。推和拉各有其优缺点。
当推送消息给消费者时,代理很难针对消费者的特性进行处理,控制数据的推送速率,当生产者速度太快时,消费者很容易过载。拉模式下,消费者可以自由决定拉取速率,并通过某种后备(Backoff)机制缓和生产和消费速率不匹配的问题。拉模式还将将批量抓取的职责转移到消费者端,消费者更知道自己何时能消费大批量数据。
拉模式的缺点是,如果代理上没有可用的消息。消费者的轮询循环会反复进行,浪费CPU。Kafka的解决方式是使用长轮询,你可以配置一个等待时间,如果代理上没有消息可用,则客户端会在套接字上等待,而非立即结束请求。
对于消息系统来说,跟踪哪些消息已经被消费,是对性能有关键影响的地方。
传统消息系统一般都在代理端维护消息的元数据,通过元数据识别哪些消息已经被消费。当客户端抓取消息,或者Ack消息后,代理立即删除此消息。之所有要尽快的删除,是因为传统消息系统的数据结构的扩容性往往较差,倾向于尽可能让此结构更小。
识别消息是否真正被成功消费,并不简单。如果消息被抓取后立即删除,消费者可能在处理消息之前就宕掉。为解决此问题,消息系统通常引入Ack机制。这又引入新的问题,消费者可能在处理消息之后,Ack之前宕掉,并引发重复的消息处理。分布式事务机制可能解决此问题,但是性能很差。
Kafka跟踪消息进度的方式完全不同,它将主题分为若干个消息有序的分区,并引入消费偏移量,标记消费者当前的位置。对于每个消费者和分区的组合,仅仅需要一个整数就可以完成状态记录。
消费偏移量的引入,还让消息没必要立即删除,实际上,只要磁盘空间充足,你可能配置让Kafka在一周或更久后才删除数据。
由于Kafka不会立即删除数据,且数据量的多少和性能无关,因此,那种周期性运行,消费大量数据并存放到离线系统(Hadoop、RDBMS数据仓库)的消费者,可以和Kafka协同工作。
对于Hadoop来说,为每个节点/主题/分区的组合创建一个Map任务,可以很好的实现加载并行化。利用Hadoop提供的任务管理能力,失败的任务可以自动重启而不需要担心数据重复,重启的任务自动从原始的位置开始读取。
Kafka的bin目录包含很多脚本,可以用于完成日常的管理工作。
主题可以在第一次被使用时自动创建,你可能需要对默认配置进行定制,以便控制默认窗口过程。
要手工添加主题,参考如下命令:
1 2 |
kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic topic_name --partitions 20 --replication-factor 3 --config x=y |
分区日志存放在Kafka的log目录,每个分区都有自己的目录,目录命名规则为topicname-partitionid。
仍然使用上面的脚本,例如修改分区数可以:
1 2 |
kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic topic_name --partitions 40 |
注意:
- 分区可能具有应用语义,添加分区不会自动修改已有分区的数据分布
- 当前不支持减少分区数量
要添加、删除主题的配置项,参考:
1 2 3 4 |
kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name topic_name --alter --add-config x=y kafka-configs.sh --zookeeper zk_host:port/chroot --entity-type topics --entity-name topic_name --alter --delete-config x |
kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic topic_name
Kafka集群会自动检测到宕机、有意被关闭(维护目的)的代理,并在必要时为分区选举新的Leader。
对于有意关闭的情况,你可以优雅的停止代理而不是强行关机。优雅关闭的好处是:
- 会自动同步所有日志到磁盘中,避免在重启时进行日志恢复。日志恢复对日志尾部的所有记录进行校验和操作,需要一定时间才能完成
- 对于待关闭机器是Leader的分区,执行迁移,将Leader身份迁移给其它代理。这种Leader迁移更快、Leader迁移导致的分区不可用时间仅仅在ms级别
优雅关闭时,上述第1条自动发生。要启用第2条,需要配置: controlled.shutdown.enable=true
代理宕机后,其Leader身份全部被移除。这意味着,代理重启后,它不是任何分区的Leader,因而不会接受任何客户端的读写请求,其计算能力被浪费。
为了避免这种不平衡,Kafka引入优选Replica的概念。如果某个分区的复制集为1,5,9则1是优选节点,因为它在列表的最前面。要在Replica 1宕机重启后,自动恢复其Leader身份,可以执行:
kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
要自动在需要的时候执行上述命令,可以配置: auto.leader.rebalance.enable=true
从0.10开始Kafka支持机柜感知(Rack awareness),允许将分区的不同Replica分布到不同的Rack中,防止机柜整体断电断网导致Kafka分区完全不可用,Rack可以在地理上远程分布。
要指定代理所属的机柜,配置broker.rack属性即可。
当创建、修改主题,或者再平衡Replica时,Kafka会尽可能的把分区的每个Replica分散到不同的Rack中。
此脚本提供了一种跨越数据中心进行复制的手段。数据会从源集群中读取,并写到目标集群的同名主题中。实际上此镜像工具就是挂钩在一起的消费者-生产者组合。
为了增强吞吐量和容错能力,你可以运行此脚本的任意数量的实例。当某个实例意外宕掉后,其它实例会接管它的负载。
镜像不能作为完全有效的容错措施,因为源、目标集群是完全独立的,它们的偏移量取值是不一样的。
镜像单个主题的示例:
1 2 3 4 |
kafka-mirror-maker.sh --consumer.config consumer.properties --producer.config producer.properties --whitelist topic-name # 白名单,仅仅复制这些主题,可以使用正则式 |
某些部署场景中,需要跨越数据中心复制数据。
Kafka推荐的方式是,在数据中心内部创建集群,应用程序仅仅和当前数据中心的Kafka集群交互。对于跨数据中心的数据复制,利用跨集群镜像实现。
这种部署模式的好处时,让数据中心之间解耦,使它们作为独立的实体运作。同时,跨数据中心的数据复制可以被集中管理。当数据中心之间的链路断开后,各中心可以独立运作,落后的数据复制进度可以在链路恢复后赶上。
如果某个应用需要全数据集的视图,使用跨集群镜像机制来聚合所有数据中心的主题,形成一个新数据中心,那些需要全数据集视图的应用,和新数据中心交互。
除了上述推荐方式以外,你也可以让应用通过WAN访问其它数据中心,但是很明显这会引入访问延迟。
不过,即便在高延迟网络环境下,Kafka天然支持的生产者/消费者批处理能力依然能让应用拥有较高吞吐量。不过你可能需要为代理、生产者、消费者调整socket.send.buffer.bytes、socket.receive.buffer.bytes等参数。
通常情况下,不要创建跨越数据中心的Kafka集群。特别是链路延迟较大的情况下。这会让ZooKeeper、Kafka的复制延迟都很大,此外,如果链路中断,则部分数据中心的Kafka、ZooKeeper会不可用。
要列出集群中的消费者组,执行:
1 |
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
如果你使用老的High-Level消费者并且在ZooKeeper中存储组元数据(offsets.storage=zookeeper),则传递--zookeeper而非--bootstrap-server。
你可以检查某个消费者组中,每个分区的消费偏移量:
1 2 3 4 5 6 |
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group groupname # 还可以使用如下形式(仅仅显示基于ZooKeeper的消费者,不显示使用Java Consumer API的消费者 kafka-consumer-groups.sh --zookeeper localhost:2181 --describe --group groupname # 输出示例 # TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID # 主题 分区号 消费偏移量 日志偏移量 延迟 消费者 |
要添加新服务器到Kafka集群,你只需要为它分配唯一性的代理ID即可。但是新的代理不会自动被分配数据分区,因此默认情况下除非创建了新的主题,新加入的代理不会有任何作用。
在添加新服务器的同时,你常常会执行迁移,将已有的分区迁移到新服务器中。迁移过程需要手工触发,之后可以自动完成。迁移触发后,Kafka将新服务器作为待迁移分区的Follower节点,当完成数据复制,新服务器成为In-sync节点后,复制集中某个旧节点的数据被清除,成员身份也被解除。
Kafka提供了分区重分配工具,允许你跨越代理移动分区。理想的分区分布,能让所有代理节点具有均匀的数据负载、均匀的数据尺寸。Kafka没有提供自动识别“不均匀”的工具,因而管理员需要手工识别出哪些分区需要移动。
分区重分配工具可以在三个互斥的模式下运行:
- --generate,给出主题、代理的列表,生成一个候选重分配方案,将列出的主题的所有分区移动到代理中
- --execute,执行用户给出的重分配方案,方案通过 --reassignment-json-file参数给出
- --verify,显示上次重分配的执行情况,状态可以是成功、失败、进行中
在添加了新的服务器(扩容Kafka集群)之后,使用分区重分配工具,可以将一部分主题迁移到新添加的代理上,迁移整个主题,要比每次迁移一个分区更加容易。
要执行整个主题的迁移,用户需要指定被迁移的主题、接收主题的代理。迁移后,主题的复制因子保持不变,主题的分区被尽可能均匀的分布到目标代理中。
具体步骤如下:
- 以JSON格式提供待迁移主题的列表:
12345{// 待迁移主题"topics": [{"topic": "foo1"}, {"topic": "foo2"}],"version":1} - 使用分区重分配工具,生成一个候选的重分配置文件:
12345678910111213141516kafka-reassign-partitions.sh --zookeeper localhost:2181--topics-to-move-json-file topics-to-move.json # 待迁移主题列表文件--broker-list "5,6" # 目标代理列表(标识符)--generate# 输出的前一部分是当前重分配置文件(略)# 输出的后一部分是新的重分配置文件,示例:# {# "version":1,# # 主题 # 分区编号 # 复制集# "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},# {"topic":"foo1","partition":0,"replicas":[5,6]},# {"topic":"foo2","partition":2,"replicas":[5,6]},# {"topic":"foo2","partition":0,"replicas":[5,6]},# {"topic":"foo1","partition":1,"replicas":[5,6]},# {"topic":"foo2","partition":1,"replicas":[5,6]}]# } - 此时重分配尚未发生,你需要备份当前分配置文件,以便回滚。然后将重分配候选保存到JSON文件中,执行:
123kafka-reassign-partitions.sh --zookeeper localhost:2181# 重分配配置文件--reassignment-json-file expand-cluster-reassignment.json --execute - 上述命令会异步执行,你可以随时检查进度:
12345kafka-reassign-partitions.sh --zookeeper localhost:2181--reassignment-json-file expand-cluster-reassignment.json --verify# Reassignment of partition [foo1,0] completed successfully# Reassignment of partition [foo1,1] is in progress
除了上节那种自动分配、迁移整个主题的用法之外,分区重分配工具还允许你手工指定迁移特定的分区。如果你明白集群的负载分配情况,并且自动生成的重分配候选不能满足需要,可以进行使用这种定制迁移。
在准备好重分配置文件后,执行上节第3步即可。
增加某个分区的复制因子很简单,只需要在定制的重分配置文件中,为某个分区的replicas列表添加一个代理ID就可以了:
1 2 |
// 为foo的分区0添加一个复制因子,新的副本存放在代理7上 { "version":1, "partitions": [{"topic":"foo","partition":0,"replicas":[5,6,7]}] } |
Kafka支持设置迁移所消耗的带宽的上限,避免集群再平衡、添加/移除代理时Replica在机器之间移动消耗过多的带宽,影响系统性能。
限制带宽的方式有两种:
-
1kafka-reassign-partitions.sh --execute —throttle 50000000 # 单位 Bytes/s
注意:
- 你可以在迁移过程中重新调用上述脚本并指定不同的带宽限制
- 在迁移完成后,你必须调用--verify来解除带宽限制,否则,正常的数据复制会一直被限流
- 调用脚本
kafka-configs.sh来验证相关配置。和限流有关的配置有两对:
leader.replication.throttled.rate、follower.replication.throttled.rate 限流速率
leader.replication.throttled.replicas、follower.replication.throttled.replicas 受到限制的Replica
当前Kafka没有提供直接可用的集群缩容工具。这意味着,你需要使用分区重分配工具,将准备移除的代理上所有的分区全部迁移走。
Kafka支持在user,client-id、user、client-id级别进行配额,限制用户对集群资源的占用。默认情况下,资源占用不受限制。
要为用户user1、客户端clientA进行配额,可以:
1 2 3 4 5 6 |
kafka-configs.sh --zookeeper localhost:2181 --alter # 配额 生产速率 消费速率 --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048,request_percentage=200' # 要仅对用户、客户端进行配额,则指定两项之一 --entity-type users --entity-name user1 --entity-type clients --entity-name clientA |
要查看用户user1、客户端clientA的当前配额,可以:
1 |
kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type clients --entity-name clientA |
你可以在代理级别上设置默认的配额,对所有客户端生效:
1 2 3 |
# 仅仅当没有在ZooKeeper中覆盖时生效 quota.producer.default=10485760 quota.consumer.default=10485760 |
Leave a Reply