kafka 的使用场景

为什么要使用 Kafka 消息队列?

解耦、削峰:传统的方式上游发送数据下游需要实时接收,如果上游在某些业务场景:例如上午十点会流量激增至顶峰,那么下游资源可能会扛不住压力。但如果使用消息队列,就可以将消息暂存在消息管道中,下游可以按照自己的速度逐步处理;

可扩展:通过横向扩展生产者、消费者和 broker, Kafka 可以轻松处理巨大的消息流;

高吞吐、低延迟:在一台普通的服务器上既可以达到 10W/s 的吞吐速率;

容灾性:kafka 通过副本 replication 的设置和 leader/follower 的容灾机制保障了消息的安全性。

kafka 的高吞吐、低延迟是如何实现的?

1.顺序读写

Kafka 使用磁盘顺序读写来提升性能

顺序读写和随机读写性能对比:

顺序读 随机读 顺序写 随机写
机械硬盘 84.0MB/s 0.033MB/s (512 字节) 79.0MB/s 0.083MB/s (512 字节)
固态硬盘 220.7MB/s 5.296MB/s(512 字节) 77.2MB/s 10.203MB/s(512 字节)

从数据可以看出磁盘的顺序读写速度远高于随机读写的速度,这是因为传统的磁头探针结构,随机读写时需要频繁寻道,也就需要磁头和探针频繁的转动,而机械结构的磁头和探针的位置调整是十分费时的,这就严重影响到硬盘的寻址速度,进而影响到随机写入速度。

Kafka 的 message 是不断追加到本地磁盘文件末尾的,而不是随机的写入,这使得 Kafka 写入吞吐量得到了显著提升 。每一个 Partition 其实都是一个文件 ,收到消息后 Kafka 会把数据插入到文件末尾。

2.页缓存(pageCache)

PageCache 是系统级别的缓存,它把尽可能多的空闲内存当作磁盘缓存使用来进一步提高 IO 效率;

PageCache 同时可以避免在 JVM 内部缓存数据,避免不必要的 GC、以及内存空间占用。对于 In-Process Cache,如果 Kafka 重启,它会失效,而操作系统管理的 PageCache 依然可以继续使用。

3.零拷贝

正常过程:

  1. 操作系统将数据从磁盘上读入到内核空间的读缓冲区中

  2. 应用程序(也就是 Kafka)从内核空间的读缓冲区将数据拷贝到用户空间的缓冲区中

  3. 应用程序将数据从用户空间的缓冲区再写回到内核空间的 socket 缓冲区中

  4. 操作系统将 socket 缓冲区中的数据拷贝到 NIC 缓冲区中,然后通过网络发送给客户端

在这个过程中,可以发现, 数据从磁盘到最终发出去,要经历 4 次拷贝,而在这四次拷贝过程中, 有两次拷贝是浪费的。

1.从内核空间拷贝到用户空间;

2.从用户空间再次拷贝到内核空间;

除此之外,由于用户空间和内核空间的切换,会带来 Cpu 上下文切换,对于 Cpu 的性能也会造成影响;

零拷贝省略了数据在内核空间和用户空间之间的重复穿梭;用户态和内核态切换时产生中断,耗时;

4.分区分段索引

Kafka 的 message 是按 topic 分类存储的,topic 中的数据又是按照一个一个的 partition 即分区存储到不同 broker 节点。每个 partition 对应了操作系统上的一个文件夹,partition 实际上又是按照 segment 分段存储的。符合分布式系统分区分桶的设计思想

通过这种分区分段的设计,Kafka 的 message 消息实际上是分布式存储在一个一个小的 segment 中的,每次文件操作也是直接操作的 segment。为了进一步的查询优化,Kafka 又默认为分段后的数据文件建立了索引文件,就是文件系统上的.index 文件。这种分区分段 + 索引的设计,不仅提升了数据读取的效率,同时也提高了数据操作的并行度。

5.批量处理

kafka 发送消息不是一条一条发送的,而是批量发送,很大的提高了发送消息的吞吐量。

假设发送一条消息的时间是 1ms,而此时的吞吐量就是 1000TPS。但是假如我们将消息批量发送,1000 条消息需要 10ms,而此时的吞吐量就达到了 1000*100TPS。而且这样也很大程度的减少了请求 Broker 的次数,提升了总体的效率。

kafka 架构

基本概念

名词 概念
Producer 生产者(发送消息)
Consumer 消费者(接收消息)
ConsumerGroup 消费者组,可以并行消费同一 topic 中的消息
Broker 一个独立的 kafka 服务器被称为 broker。broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。broker 为消费者提供服务,对读取分区的请求作出相应,返回已经提交到磁盘上的消息。可起到负载均衡、容错的作用。
Topic 主题,一个队列,可理解为按照消息的逻辑分类将消息划分为不同的 topic
Partition topic 的物理分组,一个 topic 可以分为多个 partition,每个 partition 是一个有序队列。可起到提高可扩展性,应对高并发场景的作用。
replica 副本,为保证集群的高可用性,kafka 提供副本机制,一个 topic 的每个分区都有若干个副本,一个 leader 和若干个 follower
leader 每个分区多个副本的主节点,生产者发送数据的对象,以及消费者消费数据的对象都是 leader
offset 对于 Kafka 中的分区而言,它的每条消息都有唯一的 offset,用来表示消息在分区中对应的位置。

架构图

Q1:Topic 的分区及副本在 broker 上是如何分配的呢?

这里涉及到两个参数:

startIndex:第一个分区的第一个副本放置位置(P0-R0)

nextReplicaShift:其他分区的副本的放置是依次后移的,间隔距离就是 nextReplicaShift 值。

Q2:Kafka 的架构是基于什么设计思想呢?

分治思想:

1. topic 分治:对于 kafka 的 topic,我们在创建之初可以设置多个 partition 来存放数据,对于同一个 topic 的数据,每条数据的 key 通过哈希取模被路由到不同的 partition 中(如果没有设置 key,则根据消息本身取模),以此达到分治的目的。

2. partition 分治:为了方便数据的消费,kafka 将原始的数据转化为” 索引 + 数据 “的形式进行分治,将一个 partition 对应一个文件转变为一个 partition 对应多个人不同类型的文件,分别为:

3. 底层文件分治:不能将 partition 全部文件都放入一套 ”.index+.log+.timeindex“ 文件中,因此需要对文件进行拆分。kafka 对单个.index 文件、.timeindex 文件、.log 文件的大小都有限定(通过不同参数配置),且这 3 个文件互为一组。当.log 文件的大小达到阈值则会自动拆分形成一组新的文件,这种将数据拆分成多个的小文件叫做 segment,一个 log 文件代表一个 segment。

kafka 工作流程

生产流程:

  1. 先从 zk 获取对应分区的 leader 在哪个 broker

  2. broker 进程上的 leader 将消息写入到本地 log 中

  3. follower 从 leader 上拉取消息,写入到本地 log,并向 leader 发送 ACK

  4. leader 接收到所有的 ISR 中的 Replica 的 ACK 后,并向生产者返回 ACK

消费流程:

  1. 每个 consumer 都可以根据分配策略,获得要消费的分区

  2. 获取到 consumer 对应的 leader 处于哪个 broker 以及 offset

  3. 拉取数据

  4. 消费者提交 offset

分区策略

相信上面的内容已经让大家大致了解了消息生产及消费的过程:一个 topic 内的消息会被发送到不同的分区以供不同的消费者拉取消息。

那么在这个过程中就涉及到了两个问题:

  1. 生产者按照什么策略将数据分配到分区中呢?

  2. 消费者按照什么策略去不同的分区拉取消息呢?

生产者分区策略

生产者写入消息到 topic,Kafka 将依据不同的策略将数据分配到不同的分区中:

1. 轮询分区策略

即按消息顺序进行分区顺序分配,是默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区;

key 为 null,则使用轮询算法均衡地分配分区;

2. 按 key 分区分配策略

key 不为 null,key.hash() % n

但是按照 key 决定分区有可能会造成数据倾斜

3. 随机分区策略

随机分区,不建议使用

4. 自定义分区策略

根据业务需要制定以分区策略

乱序问题在 Kafka 中生产者是有写入策略,如果 topic 有多个分区,就会将数据分散在不同的 partition 中存储当 partition 数量大于 1 的时候,数据(消息)会打散分布在不同的 partition 中如果只有一个分区,消息是有序的

消费者分区策略

同一时刻,一条消息只能被组中的一个消费者实例消费:

  1. 消费者数=分区数:一个分区对应一个消费者

  2. 消费者数<分区数:一个消费者对应多个分区

  3. 消费者数>分区数:多出来的消费者将不会消费任何消息

分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

1. Range 分配策略(范围分配策略):Kafka 默认的分配策略

计算公式:

n=分区数量/消费者数量

m=分区数量% 消费者数量

前 m 个消费者消费 n+1 个,剩余消费者消费 n 个

以上图为例:n=8/3=2m=8%3=2 因此前 2 个消费者消费 2+1=3 个分区,剩下 1 个消费者消费 2 个分区

2. RoundRobin 分配策略(轮询分配策略)

消费者挨个分配消费的分区:

如下图,3 个消费者共同消费 8 个分区

第一轮:Consumer0-->A-Partition0;Consumer1-->A-Partition1;Consumer2-->A-Partition2

第二轮:Consumer0-->A-Partition3;Consumer1-->B-Partition0;Consumer2-->B-Partition1

第三轮:Consumer0-->B-Partition2;Consumer1-->B-Partition3

3. Striky 粘性分配策略

在没有发生 rebalance 跟轮询分配策略是一致的

发生了 rebalance(例如 Consumer2 故障宕机),轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可,这样就减少了上下文的切换

副本的 ACK 机制

producer 是不断地往 Kafka 中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个 ACKs 的配置。

根据业务情况来选择 ack 机制,是要求性能最高,一部分数据丢失影响不大,可以选择 0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有 leader 和 follower 的概念,为了确保消费者消费的数据是一致的,只能从分区 leader 去读写消息,follower 做的事情就是同步数据。

Q&A:

1. offset 存在哪里?

0.9 版本前默认存在 zk,但是由于频繁访问 zk,zk 需要一个一个节点更新 offset,不能批量或分组更新,导致 offset 更新成了瓶颈。

在新版 Kafka 以及之后的版本,Kafka 消费的 offset 都会默认存放在 Kafka 集群中的一个叫 __consumer_offsets 的 topic 中。offset 以消息形式发送到该 topic 并保存在 broker 中。这样 consumer 提交 offset 时,只需连接到 broker,不用访问 zk,避免了 zk 节点更新瓶颈。

2.leader 选举策略?

2 种 leader:①broker 的 leader 即 controller leader ② partition 的 leader

  1. Controller leader:当 broker 启动的时候,都会创建 KafkaController 对象,但是集群中只能有一个 leader 对外提供服务,这些每个节点上的 KafkaController 会在指定的 zookeeper 路径下创建临时节点,只有第一个成功创建的节点的 KafkaController 才可以成为 leader,其余的都是 follower。当 leader 故障后,所有的 follower 会收到通知,再次竞争在该路径下创建节点从而选举新的 leader

  2. Partition leader :由 controller leader 执行

如何处理所有 Replica 都不工作?在 ISR 中至少有一个 follower 时,Kafka 可以确保已经 commit 的数据不丢失,但如果某个 Partition 的所有 Replica 都宕机了,就无法保证数据不丢失了。这种情况下有两种可行的方案:等待 ISR 中的任一个 Replica“活” 过来,并且选它作为 Leader 选择第一个 “活” 过来的 Replica(不一定是 ISR 中的)作为 Leader 这就需要在可用性和一致性当中作出一个简单的折衷。如果一定要等待 ISR 中的 Replica“活” 过来,那不可用的时间就可能会相对较长。而且如果 ISR 中的所有 Replica 都无法 “活” 过来了,或者数据都丢失了,这个 Partition 将永远不可用。选择第一个 “活” 过来的 Replica 作为 Leader,而这个 Replica 不是 ISR 中的 Replica,那即使它并不保证已经包含了所有已 commit 的消息,它也会成为 Leader 而作为 consumer 的数据源(前文有说明,所有读写都由 Leader 完成)。Kafka0.8.* 使用了第二种方式。根据 Kafka 的文档,在以后的版本中,Kafka 支持用户通过配置选择这两种方式中的一种,从而根据不同的使用场景选择高可用性还是强一致性。

3.分区可以提高扩展性以及吞吐量,那分区越多越好吗?

分区并不是越多越好,分区过多存在着以下弊端:

4.与数据库相比 kafka 的优势?

5.消费偏移的更新方式 无论是 kafka 默认 api,还是 java 的 api,offset 的更新方式都有两种:自动提交和手动提交

  1. 自动提交(默认方式)

Kafka 中偏移量的自动提交是由参数 enable_auto_commit 和 auto_commit_interval_ms 控制的,当 enable_auto_commit=True 时,Kafka 在消费的过程中会以频率为 auto_commit_interval_ms 向 Kafka 自带的 topic(__consumer_offsets) 进行偏移量提交,具体提交到哪个 Partation:Math.abs(groupID.hashCode()) % numPartitions。

这种方式也被称为 at most once,fetch 到消息后就可以更新 offset,无论是否消费成功。

2. 手动提交

鉴于 Kafka 自动提交 offset 的不灵活性和不精确性 (只能是按指定频率的提交),Kafka 提供了手动提交 offset 策略。手动提交能对偏移量更加灵活精准地控制,以保证消息不被重复消费以及消息不被丢失。

对于手动提交 offset 主要有 3 种方式:

同步提交:提交失败的时候一直尝试提交,直到遇到无法重试的情况下才会结束,同步方式下消费者线程在拉取消息会被阻塞,在 broker 对提交的请求做出响应之前,会一直阻塞直到偏移量提交操作成功或者在提交过程中发生异常,限制了消息的吞吐量。

异步提交:异步手动提交 offset 时,消费者线程不会阻塞,提交失败的时候也不会进行重试,并且可以配合回调函数在 broker 做出响应的时候记录错误信息。 对于异步提交,由于不会进行失败重试,当消费者异常关闭或者触发了再均衡前,如果偏移量还未提交就会造成偏移量丢失。

异步 + 同步:针对异步提交偏移量丢失的问题,通过对消费者进行异步批次提交并且在关闭时同步提交的方式,这样即使上一次的异步提交失败,通过同步提交还能够进行补救,同步会一直重试,直到提交成功。 通过 finally 在最后不管是否异常都会触发 consumer.commit() 来同步补救一次,确保偏移量不会丢失

参考资料:

https://www.jianshu.com/p/90a15fe33551

https://www.jianshu.com/p/da3f19cee0e2

https://www.modb.pro/db/373502

https://blog.csdn.net/anryg/article/details/123579937

https://toutiao.io/posts/ptwuho/preview

作者:京东科技 于添馨

来源:京东云开发者社区 转载请注明来源


↙↙↙阅读原文可查看相关链接,并与作者交流