数据测试 专为小白打造—Kafka 一篇文章从入门到入土 | 京东云技术团队

京东云开发者 · 2023年09月26日 · 3493 次阅读

一、什么是 Kafka

MQ 消息队列作为最常用的中间件之一,其主要特性有:解耦、异步、限流/削峰。

Kafka 和传统的消息系统 (也称作消息中间件) 都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。与此同时,Kafka 还提供了大多数消息系统难以实现的消息顺序性保障及回溯消费的功能。

二、Kafka 常用概念

2.1 Topic 与 Partition

Topic(主题)是一个逻辑概念,在物理上并不存储。主要用于描述一个类型的消息。例如我们有一个业务系统会发送一个描述用户订单状态的消息,那么这一个类型里面所有的消息就是一个 Topic,又比如这个业务系统同时还会发送描述会员余额的消息,那么这个就是一个新的消息类型,也就是一个新的 Topic

Partition(分区)是一个物理概念,是实际存在于物理设备上的。一个 Topic 由多个 Partition 共同组成。Partition 的存在是为了提高消息的性能与吞吐量,多个分区多个进程消息处理速度肯定要比单分区快的多。

image-20230816151418046

2.2 Broker 与 Partition

Broker 作为分布式的实现,其实可以直接简单理解为一个 Kafka 进程就是一个 Broker。

我们之前提到 Partition 是物理存在的,其物理的存在的位置就在 Broker 中。同时,为了服务具有一定的可靠性,每一个分区都有几个副本,每个副本存在于不同的 Broker 中。

img

我们之前提到的 Topic 是逻辑概念即在于此,并没有物理存在,图中每个 TopicA-x 都是一个 Partition,其中后面的数字代表了一个分区中的第几个副本,每个 Broker 中都有不同的副本,目的就是当有 Broker 宕机时,其他的副本还存在保证系统的可用性。

此外,多个副本 Partition 中会选取一个作为 leader,其他的作为 follower。我们的生产者在发送数据的时候,是直接发送到 leader partition 里面,然后 follower partition 会去 leader 那里自行同步数据,消费者消费数据的时候,也是从 leader 那去消费数据的

副本处于不同的 broker 中,当 leader 副本出现故障时,从 follower 副本中重新选举新的 leader 副本对外提供服务。Kafka 通过多副本机制实现了故障的自动转移,当 Kafka 集群中某个 broker 失效时仍然能保证服务可用。

2.3 生产者消费者与 ZooKeeper

产生消息的角色或系统称之为生产者,例如上述某个业务系统产生了关于订单状态的相关消息,那么该业务系统即为生产者。

消费者则是负责接收或者使用消息的角色或系统。

ZooKeeper 是 Kafka 用来负责集群元数据的管理、控制器 的选举等操作的。Producer 将消息发送到 Broker,Broker 负责将收到的消息存储到磁盘中,而 Consumer 负责从 Broker 订阅并消费消息。

img

在每一个 Broker 在启动时都会像向 ZK 注册信息,ZK 会选取一个最早注册的 Broker 作为 Controller,后面 Controller 会与 ZK 进行数据交互获取元数据(即整个 Kafka 集群的信息,例如有那些 Broker,每个 Broker 中有那些 Partition 等信息),然后其他 Broker 再与 Controller 交互进而所有的 Broker 都能感知到整个集群的所有信息.

2.4 消费者组

目前大部分业务系统架构都是分布式的,即一个应用会部署多个节点。正常来说,一条消息只应该被其中某一个节点消费掉,而不应该是所有被所有的消费者同时消费一遍。因此就产生了消费者组的概念,在一个消费者组中,一条消息只会被消费者组中的一个消费者所消费。

从使用上来说,一般配置为一个应用为一个消费者组,或一个应用中不同的环境也可以配置不用的消费者组。例如生产环境的节点与预发环境的节点可以配置两套消费者组,这样在有新的改动部署在预发时,即时本次改动修改了消费动作的相关逻辑,也不会影响生产的数据。

img

消费者与消费组这种模型可以让整体的消费能力具备横向伸缩性,我们可以增加 (或减少) 消费者的个数来提高 (或降低) 整体的消费能力。对于分区数固定的情况,一味地增加消费者 并不会让消费能力一直得到提升,如果消费者过多,出现了消费者的个数大于分区个数的情况, 就会有消费者分配不到任何分区。参考下图(右下),一共有 8 个消费者,7 个分区,那么最后的消费 者 C7 由于分配不到任何分区而无法消费任何消息。

img

2.5 ISR、HW、LEO

Kafka 通过 ISR 机制尽量保证消息不会丢失。

一个 Partition 中所有副本称为AR(Assigned Replicas),所有与 leader 副本保持一定程度同步的副本 (包括 leader 副本在内) 组成 **ISR (In-Sync Replicas)。** 我们上文提到,follower 副本只负责消息的同步,很多时候 follower 副本中的消息相对 leader 副本而言会有一定的滞后,而及时与 leader 副本保持数据一致的就可以成为 ISR 成员。与 leader 副本同步滞后过多的副本 (不包括 leader 副本) 组成OSR (Out-of-Sync Replicas),由此可见,AR=ISR+OSR。 在正常情况下,所有的 follower 副本都应该与 leader 副本保持一定程度的同步,即 AR=ISR, OSR 集合为空。

leader 副本会监听所有 follower 副本,当其与 leader 副本数据一致时会将其加入 ISR 成员,当与 leader 副本相差太多或宕机时会将其踢出 ISR,也会再其追上 leader 副本后重新加入 ISR。

当 leader 副本宕机或不可用时,只有 ISR 成员才能有机会被选择为新的 leader 副本,这样就能确保新的 leader 与已经宕机的 leader 数据一致,而如果选择 OSR 中的副本作为 leader 时会造成部分未同步的数据丢失。

MQ1.drawio

上图情况中,P1 副本首先当选了 leader,且只有 P2 副本同步了 P1 的数据,offset 都为 110,那么此时的 ISR 只有 P1 与 P2,OSR 有 P3 和 P4。当 P3 同步数据到 110 后,也会被 leader 加入到 ISR 中,若此时 leader 宕机,则会从 ISR 中选出一个新的 leader,并将 P0 踢出 ISR 中。

那么 leader 是如何感知到其他副本是否与自己数据一致呢?靠的就是 HW 与 LEO 机制。

LEO 是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,LEO 的大小相当于当前日志分区中最后一条消息的 offset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO而 ISR 集合中最小的 LEO 即为分区的 HW,HW 是 High Watermark 的缩写,俗称高水位,它标识 了一个特定的消息偏移量 (offset),消费者只能拉取到这个 offset 之前的消息。

img

上图中,因为所有副本消息都是一致的,所以所有 LEO 都是 3,HW 也为 3,当有新的消息产生时,即 leader 副本新插入了 3/4 两条消息,此时 leader 的 LEO 为 5,两个 follower 的此时未同步消息,所以 LEO 仍未 3,HW 选择最小的 LEO 是 3.

当 follower1 同步完成 leader 的数据后,LEO 未 5,但 follower2 未同步,所以此时 HW 仍未 3。此后 follower2 同步完成后,其 LEO 为 5,所有副本的 LEO 都未 5,此时 HW 选择最小的为 5。

通过这种机制,leader 副本就能知道那些副本是满足 ISR 条件的(该副本 LEO 是否等于 leader 副本 LEO)。

三、Kafka 全流程梳理

3.1 注册信息

Kafka 强依赖与 ZooKeeper 以维护整个集群的信息,因此在启动前应该先启动 ZooKeeper。

MQ-第 2 页.drawio

在 ZK 启动完成之后,所有的 Broker(即所有的 Kafka 进程) 都会向 ZK 注册信息,然后争取/controller的监听权,获取到监听权的 Broker 称为 Controller,此后由 Controller 与 ZK 进行信息交换,所有的 Broker 与 Controller 进行消息交换。进而保持整个 Kafka 集群的信息一致性。

image-20230824143525137

3.2 创建主题

在所有的 Broker 注册完毕后,需要注册主题(Topic)以继续后续流程。

MQ-第 3 页.drawio

其中某个客户端接收到创建 Topic 请求后,会将请求中的分区方案 (有几个分区、几个副本等) 告诉 ZK,ZK 再将信息同步至 Controller,此后所有的 Broker 与 Controller 交换完元数据,至此所有的 Broker 都已经知道该 Topic 的分区方案了,然后按照该分区方案创建自己的分区或副本即可。

image-20230824143846692

以上就是某一个 broker 下面的某一个主题的分布情况

3.3 生产者发送数据

在创建完想要的 Topic 之后,生产者就可以开始发送数据。

MQ-第 4 页.drawio

3.3.1 封装 ProducerRecord

首先生产者会将信息封装成ProducerRecord

private final String topic;
private final Integer partition;
private final Headers headers;
private final K key;
private final V value;
private final Long timestamp;


其中主要包好了要发送的 Topic 名称,要发送至那个分区,以及要发送的数据和 key。

其他的都比较好理解,key 的作用是如果 key 存在的话,就会对 key 进行 hash,然后根据不同的结果发送至不同的分区,这样当有相同的 key 时,所有相同的 key 都会发送到同一个分区,我们之前也提到,所有的新消息都会被添加到分区的尾部,进而保证了数据的顺序性。

例如我们有个关于会员的业务系统,其中生产者会产生关于某个会员积分的信息,消费者拿到这个消息之后会实际对积分进行操作。假如某个会员先获得了 100 积分,然后又消费了 50 积分。因此生产者会发送两个 MQ 消息,但是假如没有使用 key 的功能,这两个消息被发送到了不同的分区,因为每个分区的消费水平不一样(例如获得积分的逻辑耗时比较长而某个分区又都是获得积分的 MQ),就有可能造成消费 50 积分的 MQ 会先被消费者收到

而假如此时会员积分为 0 的情况下再去消费 50 积分明显是不合理且逻辑错误的,会造成业务系统异常。因此在生产者发送 MQ 时如果消息有顺序性要求则一定要将 key 赋值,具体的可以是某些有唯一性标识例如此处可以是会员 ID。

3.3.2 序列化数据、获取元数据、确定分区

首先生产则客户端的序列化器会将要发送的 ProducerRecord 对象序列化成字节数组 ,然后发送到消费端后消费端的反序列化器会将字节数组再转换成对应的消费对象。常用的序列化器有 String、Doule、Long 等等。

其次也可以自定义序列化器与反序列化器,例如可以将将字节数组进行加密后再进行传输,以此保证数据的安全性。

数据都准备完成之后就可以开始获取 broker 元数据,例如 host 等,以方便后续确定要发送的位置。

确定要发送至那个分区有几种情况:

  1. 如果 ProducerRecord 中指定了要发往那个分区,则选择用户使用的分区

  2. 如果没有指定分区,则查看 ProducerRecord 中 key 是否为空,如果不为空则对 key 进行计算以获取使用那个分区

  3. 如果 key 也为空,则按照轮询的方式发送至不同的分区

也可以通过自定义分区器的方式确定发送那个分区。

3.3.3 写入缓冲区、分批分送消息

生产者发送的 MQ 并不会直接通过网络发送至 broker,而是会先保存在生产者的缓冲区。

然后由生产者的 Sender 线程分批次将数据发送出去,分批次发送的原因是可以节省一定的网络消耗与提升速度,因为一次发送一万条与一万次发送一条肯定效率不太一样。

分批次发送主要有两个参数,批次量与等待时间。两个参数主要是解决两个问题,一个是防止一次发送的消息量过大,比如一次可能发送几十 mb 的数据。另一个解决的问题是防止长时间没有足够消息产生而导致的消息一直不发送。因此当上述两个条件任意满足其一就会触发这一批次的发送。

Kafka 的网络模型用的是加强版的 reactor 网络模型

img

首先客户端发送请求全部会先发送给一个 Acceptor,broker 里面会存在 3 个线程(默认是 3 个),这 3 个线程都是叫做 processor,Acceptor 不会对客户端的请求做任何的处理,直接封装成一个个 socketChannel 发送给这些 processor 形成一个队列,发送的方式是轮询,就是先给第一个 processor 发送,然后再给第二个,第三个,然后又回到第一个。消费者线程去消费这些 socketChannel 时,会获取一个个 request 请求,这些 request 请求中就会伴随着数据。

线程池里面默认有 8 个线程,这些线程是用来处理 request 的,解析请求,如果 request 是写请求,就写到磁盘里。读的话返回结果。 processor 会从 response 中读取响应数据,然后再返回给客户端。这就是 Kafka 的网络三层架构。

所以如果我们需要对 kafka 进行增强调优,增加 processor 并增加线程池里面的处理线程,就可以达到效果。request 和 response 那一块部分其实就是起到了一个缓存的效果,是考虑到 processor 们生成请求太快,线程数不够不能及时处理的问题。

3.4 消费者消费数据

消费者消费也主要分为两个阶段:

  1. 信息注册阶段,即整个消费者组向集群注册消费信息等

  2. 信息消费阶段,开始信息消息,确保消息可靠性等

3.4.1 信息注册

首先消费者组内所有消费者都会向集群寻找自己的 Coordinator(以消费者组 id 做均衡)。找到 Coordinator 后,所有的 Consumer 都会向 Coordinator 发起join group加入消费者组的请求,Coordinator 会选择一个最早发起请求的 Consumer 作为 leader Consumer,其他的 Consumer 作为 follower。

MQ-第 6 页.drawio

leader 会根据要消费的 Topic 及分区情况制定一个消费方案,告知给 Coordinator,Coordinator 再将此消费方案告知给各个 follower。

自此,所有的 Consumer 都已经知道自己要消费那个分区了。

如上图,每个消费者都找了自己要消费的分区情况

3.4.2 消费信息

消费信息主要包含了以下几个步骤:

MQ-第 8 页.drawio

1)拉取消息

常用的消息队列的消费消息一般有两种,推送或者拉取,Kafka 在此处用的是拉取模式。

try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
    for (ConsumerRecord<String, String> record : records) {
      int updateCount = 1;
      if (map.containsKey(record.value())) {
        updateCount = (int) map.get(record.value() + 1);
      }
      map.put(record.value(), updateCount);
    }
  }
}finally {
  consumer.close();
}


通过设置定时时间,每隔多长时间拉取一次消息。

2)反序列化与消费消息

在上面的代码中,我们拿到的就是ConsumerRecord对象,但是实际上这个是消费者客户端帮我们做的反序列化的操作,将字节数组 (byte[]) 反序列化成了对象。参考 3.3.2 我们也可以自定义反序列化器。

3)提交消息位移

例如当消息队列中有 100 条消息,消费者第一次消费了 20 条消息,那么第二次消费的位置肯定是要从第 21 条消息开始消费,而记录第 21 条消息的信息称之为 offset,offset 为已经消费位置 +1.

MQ-第 9 页.drawio

在之前版本的客户端,offset 数据被存在 zk 中,每次都需要请求 zk 获取数据,而 zk 并不适合作为高并发的请求。因此在现在的版本中,kafka 通过建立一个 Topic 来记录所有消费者消费的 offset,这个 Topic 是__consumer_offsets。每一个消费者在消费数据之前(即 pol() 方法中),都会把上一次消费数据中最大的 offset 提交到该 Topic 中,即此时是作为生产者的身份投递信息。

image-20230825165847711

kafka 中有几种 offset 提交模式,默认的是自动提交:

enable.auto.commit设置为 true 时,每隔auto.commit.interval.ms时间会自动提交已经已经拉取到的消息中最大的 offset。

但是默认的自动提交也会带来重复消费与消息丢失的问题:

  • 重复消费。例如从 offset 为 21 开始拉取数据,拉取到了 40,但是当消费者处理到第 30 条数据的时候系统宕机了,那么此时已经提交的 offset 仍为 21,当节点重新连接时,仍会从 21 消费,那么此时 21-30 的数据就会被重新消费。还有一种情况是再均衡时,例如有新节点加入也会引发类似的问题。

  • 消息丢失。

手动同步提交

public static void main(String[] args) {
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            // 模拟消息的处理逻辑
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        try {
            //处理完当前批次的消息,在轮询更多的消息之前,调用commitSync方法提交当前批次最新的消息
            consumer.commitSync();
        } catch (CommitFailedException e) {
            //todo 事务回滚
            e.printStackTrace();
        }
    }
}


手动同步提交可以在任何时候提交 offset,例如可以每消费一条进行一次提交。提交失败之后会抛出异常,可以在异常中做出补偿机制,例如事务回滚等操作。

但是因为手动同步提交是阻塞性质的,所以不建议太高的频率进行提交。

手动异步提交

异步提交有三种方式,区别在于有没有回调的方式。

@Test
public void asynCommit1(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        consumer.commitAsync();
    }
}

@Test
public void asynCommit2(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        // 异步回调机制
        consumer.commitAsync(new OffsetCommitCallback(){
            @Override
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                if (exception!=null){
                    System.out.println(String.format("提交失败:%s", offsets.toString()));
                }
            }
        });
    }
}

@Test
public void asynCommit3(){
    while (true) {
        // 这里的参数指的是轮询的时间间隔,也就是多长时间去拉一次数据
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));
        records.forEach((ConsumerRecord<String, String> record) -> {
            System.out.println("revice: key ===" + record.key() + " value ====" + record.value() + " topic ===" + record.topic());
        });
        consumer.commitAsync((offsets, exception) ->{
            if (exception!=null){
                System.out.println(String.format("提交失败:%s", offsets.toString()));
            }
        });
    }
}


异步提交commitAsync()与同步提交commitSync()最大的区别在于异步提交不会进行重试,同步提交会一直进行自动重试,当然也可以通过再发生异常时继续提交的方式来完成此功能。

同步 + 异步

可以使用同步 + 异步的形式保证数据能够准确提交:

while (true) {
    ConsumerRecords records = consumer.poll(100);
    for (ConsumerRecord record : records) {
        log.trace("Kafka消费信息ConsumerRecord={}",record.toString());
    }
    try {
        //先使用异步提交机制
        consumer.commitAsync();
    } catch (CommitFailedException e) {
        // todo 补偿机制
        log.error("commitAsync failed", e)
    } finally{
        try {
        //再使用同步提交机制
            consumer.commitSync();
         }  catch (CommitFailedException e) {
            // todo 补偿机制
            log.error("commitAsync failed", e)
        } finally{
            consumer.close();
         }
    }
}


四、异常场景实践

4.1 异常重试

我们系统之前遇到过消费者在消费消息时,短时间内连续报错。根据现象以为是系统出现问题,后续发现所有报错都是同一条消息,排查后发现是处理消息过程中存在未捕获的异常,导致消息重试,相同的问题引发了连续报错。

JMQ 在消费过程中如果有未捕获的异常会认为消息消费失败,会首先在本地重试两次后放入重试队列中,进入重试队列的消息,会有过期逻辑,当超过重试时间或者超过最大重试次数后 (默认 3 天过期),消息将会被丢弃。因此在处理消息时需要考虑如果出现异常后的处理场景,选择是重试还是忽略还是记录数据后告警。

因此我们在消费消息的过程中,尤其是采用 pull 模式,一定要根据业务场景注意异常的捕获。否则小则影响本条消息,大则本批次后续所有消息都可能丢失。

//每隔1min拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60L));
for (ConsumerRecord<String, String> record : records) {
    try {
        //doing
    } catch (Exception e) {
        //如果此处未捕获消息,会直接导致for循环退出,后续所有消息都将丢失
        log.error("Bdp监听任务执行失败, taskName:{}", taskName, e);
    }
}


4.2 本地重试与服务端重试

系统还遇到过在 JMQ 服务端配置了消费失败重试的逻辑,例如重试多少次间隔多久,但是在消费失败之后,发现重试的逻辑并没有按照配置的逻辑走。联系运维帮忙排查后发现:

重试分为本地重试和服务端重试


根据 4.1 我们知道消费失败后,会首先在本地重试,本地重试失败后会放入重试队列,则此时进入服务端重试,两套重试需要两套配置,本地的重试配置在本地的配置文件中。

本地配置如下:

<jmq:consumer id="apiConsumer" transport="jmq.apilog.transport">
    <!--配置间隔1秒,重试3次-->
    <jmq:listener topic="${jmq.topic.apilog}" listener="apiLogMessageListener" retryDelay="1000" maxRetrys="3"/>
</jmq:consumer>


服务端重试配置:

image.png

作者:京东科技 韩国凯

来源:京东云开发者社区 转载请注明来源

暂无回复。
需要 登录 后方可回复, 如果你还没有账号请点击这里 注册