背景

kafka 的诞生,是为了解决 linkedin 的数据管道问题,期初 linkedin 采用了 ActiveMQ 来进行数据交换,大约是在 2010 年前后,那时的 ActiveMQ 还远远无法满足 linkedin 对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin 决定研发自己的消息传递系统,当时 linkedin 的首席架构师 jay kreps 便开始组织团队进行消息传递系统的研发;

kafka 名字的由来

kafka 的架构师 jay kreps 对于 kafka 的名称由来是这样讲的,由于 jay kreps 非常喜欢 franz kafka, 并且觉得 kafka 这个名字很酷,因此取了个和消息传递系统完全不相干的名称 kafka,取名字是并没有特别的含义。

kafka 的设计目标

使用推送和拉取模式 实现生产者和消费者的解耦;
微消息系统中的消息提供数据持久化,以便支持多个消费者;
系统可以随着数据流的增长进行横向扩展;
通过系统优化实现高吞吐量;

kafka 历史

2010 年底,开源到 github,初始版本为 0.7.0;
2011 年 7 月因为备受关注,被纳入 apache 孵化器项目;
2012 年 10 月,kafka 从 apache 孵化器项目毕业,成为 apache 顶级项目;
2014 年,jay kreps, neha narkhede, jun rao 离开 linkedin, 成立 confluent, 此后 linkedin 和 confluent 成为 kafka 的核心贡组织,致力于将 kafka 推广应用

应用场景

kafka-apis

消息系统

Kafka 作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。

应用监控

利用 Kafka 采集应用程序和服务器健康相关的指标,如 CPU 占用率、IO、内存、连接数、TPS、QPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用 Kafka 与 ELK(ElasticSearch、Logstash 和 Kibana)整合构建应用服务监控系统。

网站用户行为追踪

为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到 Kafka 集群上,通过 Hadoop、Spark 或 Strom 等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。

流处理

需要将已收集的流数据提供给其他流式计算框架进行处理,用 Kafka 收集流数据是一个不错的选择,而且当前版本的 Kafka 提供了 Kafka Streams 支持对流数据的处理。

持久性日志

Kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka 为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka 很方便与 HDFS 和 Flume 进行整合,这样就方便将 Kafka 采集的数据持久化到其他外部系统。

基本概念

Kafka 是一种高效的分布式消息系统。在性能上,它具有内置的数据冗余度与弹性,也具有高吞吐能力和可扩展性。
在功能上,它支持自动化的数据保存限制,能够以 “流” 的方式为应用提供数据转换,以及按照 “键 - 值(key-value)” 的建模关系 “压缩” 数据流。

Message(消息)

Kafka 中的一条记录或数据单位。每条消息都有一个键和对应的一个值,有时还会有可选的消息头。

Producer(生产者)

Producer 将消息发布到 Kafka 的 topics 上。Producer 决定向 topic 分区的发布方式,如:轮询的随机方法、或基于消息键(key)的分区算法。

Broker(代理)

Kafka 以分布式系统或集群的方式运行。那么群集中的每个节点称为一个 Broker。

Topic(主题)

Topic 是那些被发布的数据记录或消息的一种类别。消费者通过订阅 Topic,来读取写给它们的数据。

Topic Partition(主题分区)

不同的 Topic 被分为不同的分区,而每一条消息都会被分配一个 Offset,通常每个分区都会被复制至少一到两次。
每个分区都有一个 Leader 和存放在各个 Follower 上的一到多个副本(即:数据的副本),此法可防止某个 Broker 的失效。
群集中的所有 Broker 都可以作为 Leader 和 Follower,但是一个 Broker 最多只能有一个 Topic Partition 的副本。Leader 可被用来进行所有的读写操作。

Offset(偏移量)

单个分区中的每一条消息都被分配一个 Offset,它是一个单调递增的整型数,可用来作为分区中消息的唯一标识符。

Consumer(消费者)

Consumer 通过订阅 Topic partition,来读取 Kafka 的各种 Topic 消息。然后,消费类应用处理会收到消息,以完成指定的工作。

Consumer group(消费组)

Consumer 可以按照 Consumer group 进行逻辑划分。Topic Partition 被均衡地分配给组中的所有Consumers。因此,在同一个 Consumer group 中,所有的 Consumer 都以负载均衡的方式运作。换言之,同一组中的每一个 Consumer 都能看到每一条消息。如果某个 Consumer 处于 “离线” 状态的话,那么该分区将会被分配给同组中的另一个 Consumer。这就是所谓的 “再均衡(rebalance)”。当然,如果组中的 Consumer 多于分区数,则某些 Consumer 将会处于闲置的状态。相反,如果组中的 Consumer 少于分区数,则某些 Consumer 会获得来自一个以上分区的消息。

Lag(延迟)

当 Consumer 的速度跟不上消息的产生速度时,Consumer 就会因为无法从分区中读取消息,而产生延迟。

延迟表示为分区头后面的 Offset 数量。从延迟状态(到 “追赶上来”)恢复正常所需要的时间,取决于 Consumer 每秒能够应对的消息速度。

其公式如下:

1
time = messages / (consume rate per second - produce rate per second)

架构原理

对于 Kafka 的架构原理,我们先提出如下几个问题:

  • Kafka 的 topic 和分区内部是如何存储的,有什么特点?
  • 与传统的消息系统相比,Kafka 的消费模型有什么优点?
  • Kafka 如何实现分布式的数据存储与数据读取?

架构图

进程视角

Image result for kafka architecture

主题视角

ndsa 0402

消费模型

消息由生产者发送到 Kafka 集群后,会被消费者消费。一般来说我们的消费模型有两种:

  • 推送模型 (Push)
  • 拉取模型 (Pull)

基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,但是这种方式无法很好地保证消费的处理语义。

比如当我们已经把消息发送给消费者之后,由于消费进程挂掉或者由于网络原因没有收到这条消息,如果我们在消费代理将其标记为已消费,这个消息就永久丢失了。

如果我们利用生产者收到消息后回复这种方法,消息代理需要记录消费状态,这种不可取。

如果采用 Push,消息消费的速率就完全由消费代理控制,一旦消费者发生阻塞,就会出现问题。

Kafka 采取拉取模型 (Poll),由自己控制消费速度,以及消费的进度,消费者可以按照任意的偏移量进行消费。
比如消费者可以消费已经消费过的消息进行重新处理,或者消费最近的消息等等。

网络模型

Kafka Client:单线程 Selector

img

单线程模式适用于并发链接数小,逻辑简单,数据量小。

在kafka中,consumer和producer都是使用的上面的单线程模式。这种模式不适合kafka的服务端,在服务端中请求处理过程比较复杂,会造成线程阻塞,一旦出现后续请求就会无法处理,会造成大量请求超时,引起雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。

Kafka server:多线程Selector

img

在kafka服务端采用的是多线程的Selector模型,Acceptor运行在一个单独的线程中,对于读取操作的线程池中的线程都会在selector注册read事件,负责服务端读取请求的逻辑。成功读取后,将请求放入message queue共享队列中。然后在写线程池中,取出这个请求,对其进行逻辑处理,即使某个请求线程阻塞了,还有后续的县城从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,由于注册了OP_WIRTE事件,所以还需要对其发送响应。

日志结构与数据存储

Kafka中的消息是以主题(Topic)为基本单位进行组织的,各个主题之间相互独立。在这里主题只是一个逻辑上的抽象概念,而在实际数据文件的存储中,Kafka中的消息存储在物理上是以一个或多个分区(Partition)构成,每个分区对应本地磁盘上的一个文件夹,每个文件夹内包含了日志索引文件(“.index”和“.timeindex”)和日志数据文件(“.log”)两部分。分区数量可以在创建主题时指定,也可以在创建Topic后进行修改。

在Kafka中正是因为使用了分区(Partition)的设计模型,通过将主题(Topic)的消息打散到多个分区,并分布保存在不同的Kafka Broker节点上实现了消息处理的高吞吐量。其生产者和消费者都可以多线程地并行操作,而每个线程处理的是一个分区的数据。

log_anatomy

同时,Kafka为了实现集群的高可用性,在每个Partition中可以设置有一个或者多个副本(Replica),分区的副本分布在不同的Broker节点上。同时,从副本中会选出一个副本作为Leader,Leader副本负责与客户端进行读写操作。而其他副本作为Follower会从Leader副本上进行数据同步。

分区、副本的日志文件存储

在三台虚拟机上搭建完成Kafka的集群后(Kafka Broker节点数量为3个),通过在Kafka Broker节点的/bin下执行以下的命令即可创建主题和指定数量的分区以及副本:

1
./kafka-topics.sh --create --zookeeper 10.154.0.73:2181 --replication-factor 3 --partitions  3 --topic kafka-topic-01

创建完主题、分区和副本后可以查到出主题的状态(该方式主要列举了主题所有分区对应的副本以及ISR列表信息):

1
2
3
4
5
./kafka-topics.sh --describe --zookeeper 10.154.0.73:2181 --topic kafka-topic-01
Topic:kafka-topic-01 PartitionCount:3 ReplicationFactor:3 Configs:
Topic: kafka-topic-01 Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: kafka-topic-01 Partition: 1 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0
Topic: kafka-topic-01 Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 1,2,0

通过实现一个简单的Kafka Producer的demo,即可完成生产者发送消息给Kafka Broker的功能。在使用Producer产生大量的消息后,可以看到部署集群的三台虚拟机在Kafka的config/server.properties配置文件中“log.dirs”指定的日志数据存储目录下存在三个分区目录,同时在每个分区目录下存在很多对应的日志数据文件和日志索引文件文件,具体如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#1、分区目录文件
drwxr-x--- 2 root root 4096 Jul 26 19:35 kafka-topic-01-0
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-1
drwxr-x--- 2 root root 4096 Jul 24 20:15 kafka-topic-01-2

#2、分区目录中的日志数据文件和日志索引文件
-rw-r----- 1 root root 512K Jul 24 19:51 00000000000000000000.index
-rw-r----- 1 root root 1.0G Jul 24 19:51 00000000000000000000.log
-rw-r----- 1 root root 768K Jul 24 19:51 00000000000000000000.timeindex
-rw-r----- 1 root root 512K Jul 24 20:03 00000000000022372103.index
-rw-r----- 1 root root 1.0G Jul 24 20:03 00000000000022372103.log
-rw-r----- 1 root root 768K Jul 24 20:03 00000000000022372103.timeindex
-rw-r----- 1 root root 512K Jul 24 20:15 00000000000044744987.index
-rw-r----- 1 root root 1.0G Jul 24 20:15 00000000000044744987.log
-rw-r----- 1 root root 767K Jul 24 20:15 00000000000044744987.timeindex
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.index
-rw-r----- 1 root root 511M Jul 24 20:21 00000000000067117761.log
-rw-r----- 1 root root 10M Jul 24 20:21 00000000000067117761.timeindex

由上面可以看出,每个分区在物理上对应一个文件夹,分区的命名规则为主题名后接“—”连接符,之后再接分区编号,分区编号从0开始,编号的最大值为分区总数减1。每个分区又有1至多个副本,分区的副本分布在集群的不同代理上,以提高可用性。从存储的角度上来说,分区的每个副本在逻辑上可以抽象为一个日志(Log)对象,即分区副本与日志对象是相对应的。下图是在三个Kafka Broker节点所组成的集群中分区的主/备份副本的物理分布情况图:

img

日志索引和数据文件的存储结构

在Kafka中,每个Log对象又可以划分为多个LogSegment文件,每个LogSegment文件包括一个日志数据文件和两个索引文件(偏移量索引文件和消息时间戳索引文件)。其中,每个LogSegment中的日志数据文件大小均相等(该日志数据文件的大小可以通过在Kafka Broker的config/server.properties配置文件的中的log.segment.bytes进行设置,默认为1G大小(1073741824字节),在顺序写入消息时如果超出该设定的阈值,将会创建一组新的日志数据和索引文件)。

Kafka将日志文件封装成一个FileMessageSet对象,将偏移量索引文件和消息时间戳索引文件分别封装成OffsetIndex和TimerIndex对象。Log和LogSegment均为逻辑概念,Log是对副本在Broker上存储文件的抽象,而LogSegment是对副本存储下每个日志分段的抽象,日志与索引文件才与磁盘上的物理存储相对应。下图为Kafka日志存储结构中的对象之间的对应关系图:

img

为了进一步查看“.index”偏移量索引文件、“.timeindex”时间戳索引文件和“.log”日志数据文件,可以执行下面的命令将二进制分段的索引和日志数据文件内容转换为字符型文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 1、执行下面命令即可将日志数据文件内容dump出来
./kafka-run-class.sh kafka.tools.DumpLogSegments --files /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log --print-data-log > 00000000000022372103_txt.log

#2、dump出来的具体日志数据内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.log
Starting offset: 22372103
offset: 22372103 position: 0 CreateTime: 1532433067157 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5d2697c5-d04a-4018-941d-881ac72ed9fd
offset: 22372104 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 0ecaae7d-aba5-4dd5-90df-597c8b426b47
offset: 22372105 position: 0 CreateTime: 1532433067159 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 87709dd9-596b-4cf4-80fa-d1609d1f2087
......
......
offset: 22372444 position: 16365 CreateTime: 1532433067166 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 8d52ec65-88cf-4afd-adf1-e940ed9a8ff9
offset: 22372445 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 5f5f6646-d0f5-4ad1-a257-4e3c38c74a92
offset: 22372446 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51dd1da4-053e-4507-9ef8-68ef09d18cca
offset: 22372447 position: 16365 CreateTime: 1532433067168 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 80d50a8e-0098-4748-8171-fd22d6af3c9b
......
......
offset: 22372785 position: 32730 CreateTime: 1532433067174 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: db80eb79-8250-42e2-ad26-1b6cfccb5c00
offset: 22372786 position: 32730 CreateTime: 1532433067176 isvalid: true keysize: 4 valuesize: 36 magic: 2 compresscodec: NONE producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] key: 1 payload: 51d95ab0-ab0d-4530-b1d1-05eeb9a6ff00
......
......

#3、同样地,dump出来的具体偏移量索引内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.index
offset: 22372444 position: 16365
offset: 22372785 position: 32730
offset: 22373467 position: 65460
offset: 22373808 position: 81825
offset: 22374149 position: 98190
offset: 22374490 position: 114555
......
......

#4、dump出来的时间戳索引文件内容
Dumping /apps/svr/Kafka/kafkalogs/kafka-topic-01-0/00000000000022372103.timeindex
timestamp: 1532433067174 offset: 22372784
timestamp: 1532433067191 offset: 22373466
timestamp: 1532433067206 offset: 22373807
timestamp: 1532433067214 offset: 22374148
timestamp: 1532433067222 offset: 22374489
timestamp: 1532433067230 offset: 22374830
......
......

由上面dump出来的偏移量索引文件和日志数据文件的具体内容可以分析出来,偏移量索引文件中存储着大量的索引元数据,日志数据文件中存储着大量消息结构中的各个字段内容和消息体本身的值。索引文件中的元数据postion字段指向对应日志数据文件中message的实际位置(即为物理偏移地址)。

下面的表格先列举了Kakfa消息体结构中几个主要字段的说明:

Kafka消息字段 各个字段说明
offset 消息偏移量
message size 消息总长度
CRC32 CRC32编码校验和
attributes 表示为独立版本、或标识压缩类型、或编码类型
magic 表示本次发布Kafka服务程序协议版本号
key length 消息Key的长度
key 消息Key的实际数据
valuesize 消息的实际数据长度
playload 消息的实际数据

日志数据文件

Kafka将生产者发送给它的消息数据内容保存至日志数据文件中,该文件以该段的基准偏移量左补齐0命名,文件后缀为“.log”。分区中的每条message由offset来表示它在这个分区中的偏移量,这个offset并不是该Message在分区中实际存储位置,而是逻辑上的一个值(Kafka中用8字节长度来记录这个偏移量),但它却唯一确定了分区中一条Message的逻辑位置,同一个分区下的消息偏移量按照顺序递增(这个可以类比下数据库的自增主键)。另外,从dump出来的日志数据文件的字符值中可以看到消息体的各个字段的内容值。

偏移量索引文件

如果消息的消费者每次fetch都需要从1G大小(默认值)的日志数据文件中来查找对应偏移量的消息,那么效率一定非常低,在定位到分段后还需要顺序比对才能找到。Kafka在设计数据存储时,为了提高查找消息的效率,故而为分段后的每个日志数据文件均使用稀疏索引的方式建立索引,这样子既节省空间又能通过索引快速定位到日志数据文件中的消息内容。偏移量索引文件和数据文件一样也同样也以该段的基准偏移量左补齐0命名,文件后缀为“.index”。

从上面dump出来的偏移量索引内容可以看出,索引条目用于将偏移量映射成为消息在日志数据文件中的实际物理位置,每个索引条目由offset和position组成,每个索引条目可以唯一确定在各个分区数据文件的一条消息。其中,Kafka采用稀疏索引存储的方式,每隔一定的字节数建立了一条索引,可以通过index.interval.bytes设置索引的跨度;

有了偏移量索引文件,通过它,Kafka就能够根据指定的偏移量快速定位到消息的实际物理位置。具体的做法是,根据指定的偏移量,使用二分法查询定位出该偏移量对应的消息所在的分段索引文件和日志数据文件。然后通过二分查找法,继续查找出小于等于指定偏移量的最大偏移量,同时也得出了对应的position(实际物理位置),根据该物理位置在分段的日志数据文件中顺序扫描查找偏移量与指定偏移量相等的消息。下面是Kafka中分段的日志数据文件和偏移量索引文件的对应映射关系图(ps:其中也说明了如何按照起始偏移量来定位到日志数据文件中的具体消息)。

img

时间戳索引文件

从上面一节的分区目录中,我们还可以看到存在一些以“.timeindex”的时间戳索引文件。这种类型的索引文件是Kafka从0.10.1.1版本开始引入的的一个基于时间戳的索引文件,它们的命名方式与对应的日志数据文件和偏移量索引文件名基本一样,唯一不同的就是后缀名。从上面dump出来的该种类型的时间戳索引文件的内容来看,每一条索引条目都对应了一个8字节长度的时间戳字段和一个4字节长度的偏移量字段,其中时间戳字段记录的是该LogSegment到目前为止的最大时间戳,后面对应的偏移量即为此时插入新消息的偏移量。

另外,时间戳索引文件的时间戳类型与日志数据文件中的时间类型是一致的,索引条目中的时间戳值及偏移量与日志数据文件中对应的字段值相同(ps:Kafka也提供了通过时间戳索引来访问消息的方法)。

副本机制

Kafka的副本机制是多个服务端节点对其他节点的主题分区的日志进行复制。当集群中的某个节点出现故障,访问故障节点的请求会被转移到其他正常节点(这一过程通常叫Reblance),kafka每个主题的每个分区都有一个主副本以及0个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。

img

在Kafka中并不是所有的副本都能被拿来替代主副本,所以在kafka的leader节点中维护着一个ISR(In sync Replicas)集合,翻译过来也叫正在同步中集合,在这个集合中的需要满足两个条件:

  • 节点必须和ZK保持连接
  • 在同步的过程中这个副本不能落后主副本太多

另外还有个AR(Assigned Replicas)用来标识副本的全集,OSR用来表示由于落后被剔除的副本集合,所以公式如下:ISR = leader + 没有落后太多的副本; AR = OSR+ ISR;

这里先要说下两个名词:HW(高水位)是consumer能够看到的此partition的位置,LEO是每个partition的log最后一条Message的位置。HW能保证leader所在的broker失效,该消息仍然可以从新选举的leader中获取,不会造成消息丢失。

当producer向leader发送数据时,可以通过request.required.acks参数来设置数据可靠性的级别:

  • 1(默认):这意味着producer在ISR中的leader已成功收到的数据并得到确认后发送下一条message。如果leader宕机了,则会丢失数据。
  • 0:这意味着producer无需等待来自broker的确认而继续发送下一批消息。这种情况下数据传输效率最高,但是数据可靠性确是最低的。
  • -1:producer需要等待ISR中的所有follower都确认接收到数据后才算一次发送完成,可靠性最高。但是这样也不能保证数据不丢失,比如当ISR中只有leader时(其他节点都和zk断开连接,或者都没追上),这样就变成了acks=1的情况。

高可用及幂等

在分布式系统中一般有三种处理语义:

at-least-once

至少一次,有可能会有多次。如果producer收到来自ack的确认,则表示该消息已经写入到Kafka了,此时刚好是一次,也就是我们后面的exactly-once。但是如果producer超时或收到错误,并且request.required.acks配置的不是-1,则会重试发送消息,客户端会认为该消息未写入Kafka。如果broker在发送Ack之前失败,但在消息成功写入Kafka之后,这一次重试将会导致我们的消息会被写入两次,所以消息就不止一次地传递给最终consumer,如果consumer处理逻辑没有保证幂等的话就会得到不正确的结果。

在这种语义中会出现乱序,也就是当第一次ack失败准备重试的时候,但是第二消息已经发送过去了,这个时候会出现单分区中乱序的现象,我们需要设置Prouducer的参数max.in.flight.requests.per.connection,flight.requests是Producer端用来保存发送请求且没有响应的队列,保证Producer端未响应的请求个数为1。

at-most-once

如果在ack超时或返回错误时producer不重试,也就是我们讲request.required.acks=-1,则该消息可能最终没有写入kafka,所以consumer不会接收消息。

exactly-once

刚好一次,即使producer重试发送消息,消息也会保证最多一次地传递给consumer。该语义是最理想的,也是最难实现的。在0.10之前并不能保证exactly-once,需要使用consumer自带的幂等性保证。0.11.0使用事务保证了

如何实现exactly-once

要实现exactly-once在Kafka 0.11.0中有两个官方策略:

单Producer单Topic

每个producer在初始化的时候都会被分配一个唯一的PID,对于每个唯一的PID,Producer向指定的Topic中某个特定的Partition发送的消息都会携带一个从0单调递增的sequence number。

在我们的Broker端也会维护一个维度为,每次提交一次消息的时候都会对齐进行校验:

  • 如果消息序号比Broker维护的序号大一以上,说明中间有数据尚未写入,也即乱序,此时Broker拒绝该消息,Producer抛出InvalidSequenceNumber
  • 如果消息序号小于等于Broker维护的序号,说明该消息已被保存,即为重复消息,Broker直接丢弃该消息,Producer抛出DuplicateSequenceNumber
  • 如果消息序号刚好大一,就证明是合法的

上面所说的解决了两个问题:

  1. 当Prouducer发送了一条消息之后失败,broker并没有保存,但是第二条消息却发送成功,造成了数据的乱序。

  2. 当Producer发送了一条消息之后,broker保存成功,ack回传失败,producer再次投递重复的消息。

上面所说的都是在同一个PID下面,意味着必须保证在单个Producer中的同一个seesion内,如果Producer挂了,被分配了新的PID,这样就无法保证了,所以Kafka中又有事务机制去保证。

事务

在kafka中事务的作用是

  • 实现exactly-once语义

  • 保证操作的原子性,要么全部成功,要么全部失败。

  • 有状态的操作的恢复

事务可以保证就算跨多个,在本次事务中的对消费队列的操作都当成原子性,要么全部成功,要么全部失败。并且,有状态的应用也可以保证重启后从断点处继续处理,也即事务恢复。在kafka的事务中,应用程序必须提供一个唯一的事务ID,即Transaction ID,并且宕机重启之后,也不会发生改变,Transactin ID与PID可能一一对应。区别在于Transaction ID由用户提供,而PID是内部的实现对用户透明。为了Producer重启之后,旧的Producer具有相同的Transaction ID失效,每次Producer通过Transaction ID拿到PID的同时,还会获取一个单调递增的epoch。由于旧的Producer的epoch比新Producer的epoch小,Kafka可以很容易识别出该Producer是老的Producer并拒绝其请求。为了实现这一点,Kafka 0.11.0.0引入了一个服务器端的模块,名为Transaction Coordinator,用于管理Producer发送的消息的事务性。该Transaction Coordinator维护Transaction Log,该log存于一个内部的Topic内。由于Topic数据具有持久性,因此事务的状态也具有持久性。Producer并不直接读写Transaction Log,它与Transaction Coordinator通信,然后由Transaction Coordinator将该事务的状态插入相应的Transaction Log。Transaction Log的设计与Offset Log用于保存Consumer的Offset类似。

最佳实践

针对 Partitions 的最佳实践

了解分区的数据速率,以确保提供合适的数据保存空间

此处所谓“分区的数据速率”是指数据的生成速率。换言之,它是由“平均消息大小”乘以“每秒消息数”得出的数据速率决定了在给定时间内,所能保证的数据保存空间的大小(以字节为单位)。
如果您不知道数据速率的话,则无法正确地计算出满足基于给定时间跨度的数据,所需要保存的空间大小。
同时,数据速率也能够标识出单个 Consumer 在不产生延时的情况下,所需要支持的最低性能值。

除非您有其他架构上的需要,否则在写 Topic 时请使用随机分区

在您进行大型操作时,各个分区在数据速率上的参差不齐是非常难以管理的。

其原因来自于如下三个方面:

  • 首先,“热”(有较高吞吐量)分区上的 Consumer 势必会比同组中的其他 Consumer 处理更多的消息,因此很可能会导致出现在处理上和网络上的瓶颈。
  • 其次,那些为具有最高数据速率的分区,所配置的最大保留空间,会导致Topic 中其他分区的磁盘使用量也做相应地增长。
  • 第三,根据分区的 Leader 关系所实施的最佳均衡方案,比简单地将 Leader 关系分散到所有 Broker 上,要更为复杂。在同一 Topic 中,“热”分区会“承载”10 倍于其他分区的权重。

有关 Topic Partition 的使用,可以参阅《Kafka Topic Partition的各种有效策略》https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/

针对 Consumers 的最佳实践

如果 Consumers 运行的是比 Kafka 0.10 还要旧的版本,那么请马上升级

在 0.8.x 版中,Consumer 使用 Apache ZooKeeper 来协调 Consumer group,而许多已知的 Bug 会导致其长期处于再均衡状态,或是直接导致再均衡算法的失败(我们称之为“再均衡风暴”)。
因此在再均衡期间,一个或多个分区会被分配给同一组中的每个 Consumer。
而在再均衡风暴中,分区的所有权会持续在各个 Consumers 之间流转,这反而阻碍了任何一个 Consumer 去真正获取分区的所有权。

调优 Consumer 的套接字缓冲区(socket buffers),以应对数据的高速流入

在 Kafka 的 0.10.x 版本中,参数 receive.buffer.bytes 的默认值为 64KB。而在 Kafka 的 0.8.x 版本中,参数 socket.receive.buffer.bytes 的默认值为 100KB。
这两个默认值对于高吞吐量的环境而言都太小了,特别是如果 Broker 和 Consumer 之间的网络带宽延迟积(bandwidth-delay product)大于局域网(local areanetwork,LAN)时。
对于延迟为 1 毫秒或更多的高带宽的网络(如 10Gbps 或更高),请考虑将套接字缓冲区设置为 8 或 16MB。
如果您的内存不足,也至少考虑设置为 1MB。当然,您也可以设置为 -1,它会让底层操作系统根据网络的实际情况,去调整缓冲区的大小。
但是,对于需要启动“热”分区的 Consumers 来说,自动调整可能不会那么快。

设计具有高吞吐量的 Consumers,以便按需实施背压(back-pressure)

通常,我们应该保证系统只去处理其能力范围内的数据,而不要超负荷“消费”,进而导致进程中断“挂起”,或出现 Consume group 的溢出。
如果是在 Java 虚拟机(JVM)中运行,Consumers 应当使用固定大小的缓冲区,而且最好是使用堆外内存(off-heap)。请参见 Disruptor 模式:http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf
固定大小的缓冲区能够阻止 Consumer 将过多的数据拉到堆栈上,以至于 JVM 花费掉其所有的时间去执行垃圾回收,进而无法履行其处理消息的本质工作。

在 JVM 上运行各种 Consumers 时,请警惕垃圾回收对它们可能产生的影响

例如,长时间垃圾回收的停滞,可能导致 ZooKeeper 的会话被丢弃、或 Consumer group 处于再均衡状态。
对于 Broker 来说也如此,如果垃圾回收停滞的时间太长,则会产生集群掉线的风险。

针对 Producers 的最佳实践

配置 Producer,以等待各种确认

籍此 Producer 能够获知消息是否真正被发送到了 Broker 的分区上。在 Kafka 的 0.10.x 版本上,其设置是 Acks;而在 0.8.x 版本上,则为 request.required.acks。
Kafka 通过复制,来提供容错功能,因此单个节点的故障、或分区 Leader 关系的更改不会影响到系统的可用性。
如果您没有用 Acks 来配置 Producer(或称“fireand forget”)的话,则消息可能会悄然丢失。

为各个 Producer 配置 Retries

其默认值为 3,当然是非常低的。不过,正确的设定值取决于您的应用程序,即:就那些对于数据丢失零容忍的应用而言,请考虑设置为 Integer.MAX_VALUE(有效且最大)。
这样将能够应对 Broker 的 Leader 分区出现无法立刻响应 Produce 请求的情况。

为高吞吐量的 Producer,调优缓冲区的大小

特别是 buffer.memory 和 batch.size(以字节为单位)。由于 batch.size 是按照分区设定的,而 Producer 的性能和内存的使用量,都可以与 Topic 中的分区数量相关联。
因此,此处的设定值将取决于如下几个因素:

  • Producer 数据速率(消息的大小和数量)
  • 要生成的分区数
  • 可用的内存量

请记住,将缓冲区调大并不总是好事,如果 Producer 由于某种原因而失效了(例如,某个 Leader 的响应速度比确认还要慢),那么在堆内内存(on-heap)中的缓冲的数据量越多,其需要回收的垃圾也就越多。

检测应用程序,以跟踪诸如生成的消息数、平均消息大小、以及已使用的消息数等指标

针对 Brokers 的最佳实践

在各个 Brokers 上,请压缩 Topics 所需的内存和 CPU 资源

日志压缩(请参见https://kafka.apache.org/documentation/#compaction)需要各个 Broker 上的堆栈(内存)和 CPU 周期都能成功地配合实现而如果让那些失败的日志压缩数据持续增长的话,则会给 Brokers 分区带来风险。
您可以在 Broker 上调整 log.cleaner.dedupe.buffer.size 和 log.cleaner.threads 这两个参数,但是请记住,这两个值都会影响到各个 Brokers 上的堆栈使用。
如果某个 Broker 抛出 OutOfMemoryError 异常,那么它将会被关闭、并可能造成数据的丢失。
而缓冲区的大小和线程的计数,则取决于需要被清除的 Topic Partition 数量、以及这些分区中消息的数据速率与密钥的大小。
对于 Kafka 的 0.10.2.1 版本而言,通过 ERROR 条目来监控日志清理程序的日志文件,是检测其线程可能出现问题的最可靠方法。

通过网络吞吐量来监控 Brokers

请监控发向(transmit,TX)和收向(receive,RX)的流量,以及磁盘的 I/O、磁盘的空间、以及 CPU 的使用率,而且容量规划是维护群集整体性能的关键步骤。

在群集的各个 Brokers 之间分配分区的 Leader 关系

Leader 通常会需要大量的网络 I/O 资源。例如,当我们将复制因子(replication factor)配置为 3、并运行起来时。
Leader 必须首先获取分区的数据,然后将两套副本发送给另两个 Followers,进而再传输到多个需要该数据的 Consumers 上。
因此在该例子中,单个 Leader 所使用的网络 I/O,至少是 Follower 的四倍。而且,Leader 还可能需要对磁盘进行读操作,而 Follower 只需进行写操作。

不要忽略监控 Brokers 的 in-sync replica(ISR)shrinks、under-replicatedpartitions 和 unpreferred leaders

这些都是集群中潜在问题的迹象。例如,单个分区频繁出现 ISR 收缩,则暗示着该分区的数据速率超过了 Leader 的能力,已无法为 Consumer 和其他副本线程提供服务了。

按需修改 Apache Log4j 的各种属性

详细内容可以参考:https://github.com/apache/kafka/blob/trunk/config/log4j.properties
Kafka 的 Broker 日志记录会耗费大量的磁盘空间,但是我们却不能完全关闭它。
因为有时在发生事故之后,需要重建事件序列,那么 Broker 日志就会是我们最好的、甚至是唯一的方法。

禁用 Topic 的自动创建,或针对那些未被使用的 Topics 建立清除策略

例如,在设定的 x 天内,如果未出现新的消息,您应该考虑该 Topic 是否已经失效,并将其从群集中予以删除。此举可避免您花时间去管理群集中被额外创建的元数据。

对于那些具有持续高吞吐量的 Brokers,请提供足够的内存,以避免它们从磁盘子系统中进行读操作

我们应尽可能地直接从操作系统的缓存中直接获取分区的数据。然而,这就意味着您必须确保自己的 Consumers 能够跟得上“节奏”,而对于那些延迟的 Consumer 就只能强制 Broker 从磁盘中读取了。

对于具有高吞吐量服务级别目标(service level objectives,SLOs)的大型群集,请考虑为 Brokers 的子集隔离出不同的 Topic

至于如何确定需要隔离的 Topics,则完全取决于您自己的业务需要。例如,您有一些使用相同群集的联机事务处理(multipleonline transaction processing,OLTP)系统。
那么将每个系统的 Topics 隔离到不同 Brokers 子集中,则能够有助于限制潜在事件的影响半径。

在旧的客户端上使用新的 Topic 消息格式。应当代替客户端,在各个 Brokers 上加载额外的格式转换服务

当然,最好还是要尽量避免这种情况的发生。

不要错误地认为在本地主机上测试好 Broker,就能代表生产环境中的真实性能了

要知道,如果使用复制因子为 1,并在环回接口上对分区所做的测试,是与大多数生产环境截然不同的。
在环回接口上网络延迟几乎可以被忽略的,而在不涉及到复制的情况下,接收 Leader 确认所需的时间则同样会出现巨大的差异。

参考资料

  1. Kafka 入门与实践 牟大恩著 2017.11 出版
  2. NewRelic 官方博客:20 Best Practices for Working With Apache Kafka at Scale
  3. MapR 官方博客:Kafka as Streaming Transport
  4. Apache Kafka 官方文档
  5. 公众号咖啡拿铁:你必须知道的 Kafka

Comments

10.1k 字
 37 分钟
 2018-10-21
  1. 背景
    1. kafka 名字的由来
    2. kafka 的设计目标
    3. kafka 历史
  2. 应用场景
    1. 消息系统
    2. 应用监控
    3. 网站用户行为追踪
    4. 流处理
    5. 持久性日志
  3. 基本概念
    1. Message(消息)
    2. Producer(生产者)
    3. Broker(代理)
    4. Topic(主题)
    5. Topic Partition(主题分区)
    6. Offset(偏移量)
    7. Consumer(消费者)
    8. Consumer group(消费组)
    9. Lag(延迟)
  4. 架构原理
    1. 架构图
    2. 消费模型
    3. 网络模型
      1. Kafka Client:单线程 Selector
      2. Kafka server:多线程Selector
    4. 日志结构与数据存储
    5. 分区、副本的日志文件存储
  5. 日志索引和数据文件的存储结构
    1. 日志数据文件
    2. 偏移量索引文件
    3. 时间戳索引文件
    4. 副本机制
    5. 高可用及幂等
      1. at-least-once
      2. at-most-once
      3. exactly-once
      4. 如何实现exactly-once
        1. 单Producer单Topic
        2. 事务
  6. 最佳实践
    1. 针对 Partitions 的最佳实践
      1. 了解分区的数据速率,以确保提供合适的数据保存空间
      2. 除非您有其他架构上的需要,否则在写 Topic 时请使用随机分区
    2. 针对 Consumers 的最佳实践
      1. 如果 Consumers 运行的是比 Kafka 0.10 还要旧的版本,那么请马上升级
      2. 调优 Consumer 的套接字缓冲区(socket buffers),以应对数据的高速流入
      3. 设计具有高吞吐量的 Consumers,以便按需实施背压(back-pressure)
      4. 在 JVM 上运行各种 Consumers 时,请警惕垃圾回收对它们可能产生的影响
    3. 针对 Producers 的最佳实践
      1. 配置 Producer,以等待各种确认
      2. 为各个 Producer 配置 Retries
      3. 为高吞吐量的 Producer,调优缓冲区的大小
      4. 检测应用程序,以跟踪诸如生成的消息数、平均消息大小、以及已使用的消息数等指标
    4. 针对 Brokers 的最佳实践
      1. 在各个 Brokers 上,请压缩 Topics 所需的内存和 CPU 资源
      2. 通过网络吞吐量来监控 Brokers
      3. 在群集的各个 Brokers 之间分配分区的 Leader 关系
      4. 不要忽略监控 Brokers 的 in-sync replica(ISR)shrinks、under-replicatedpartitions 和 unpreferred leaders
      5. 按需修改 Apache Log4j 的各种属性
      6. 禁用 Topic 的自动创建,或针对那些未被使用的 Topics 建立清除策略
      7. 对于那些具有持续高吞吐量的 Brokers,请提供足够的内存,以避免它们从磁盘子系统中进行读操作
      8. 对于具有高吞吐量服务级别目标(service level objectives,SLOs)的大型群集,请考虑为 Brokers 的子集隔离出不同的 Topic
      9. 在旧的客户端上使用新的 Topic 消息格式。应当代替客户端,在各个 Brokers 上加载额外的格式转换服务
      10. 不要错误地认为在本地主机上测试好 Broker,就能代表生产环境中的真实性能了
  7. 参考资料