专栏文章 大数据测试场景科普 -- 流计算篇 (上)

孙高飞 · 2020年12月17日 · 最后由 小帅b 回复于 2021年01月22日 · 1861 次阅读
本帖已被设为精华帖!

前言

3 年前的时候写过关于一些大数据入门基础的文章, 当时学习的是 spark。 文章链接如下:
大数据介绍:https://testerhome.com/topics/7988
spark 基础操作:https://testerhome.com/topics/8040
shuffle 和性能测试:https://testerhome.com/topics/8120
离线大数据作业的测试方法:https://testerhome.com/topics/17092

这一篇算是弥补了之前对于流计算的缺失吧。 由于我们产品在今年加入了流计算的能力, 并且 Flink 貌似也有要在流计算领域中一统江山的架势,所以我前段时间借着调研混沌工程方案的契机,也开始学习了 Flink 并了解我们产品对于流计算的应用场景(PS:混沌工程是流计算中一个比较重要的测试手段)。 今天把学习和实践的一些总结分享出来。

什么场景需要流计算

流计算一般都是在一些数据计算的实时性要求很高的场景中出现, 之前在讲 spark 的时候都是基于离线的批处理计算的, 这种计算方式无法满足产品对实时性的要求。 比如如果我们要在大看板上计算 PV 和 UV 的数据, 一般都是希望能够实时的观看到这些数据的变化。 而计算 PV 和 UV 的操作又不好嵌入到业务系统中, 因为对业务侵入性太强并且会影响性能。 所以一般的架构可能是如下的样子:

业务系统会将用户行为数据 push 到消息中间件 (kafka) 中, 这样达到了解耦和降低性能开销的目的。 而 flink streaming 服务会订阅 kafka 的 topic 进行流处理, 也就是一旦有数据从 kafka 中发送过来,满足一定条件后就会触发 flink 的一系列算子进行计算, 数据在 flink 中的这些算子中进行传递,聚合,计算等操作后, 将经过处理的数据推送给外部的存储系统或者业务系统, 这些系统会将数据做进一步保存和处理后展示在大屏上 -- 这就是一种计算 PV 和 UV 的简单的场景了。 整个过程之所以叫,就是因为数据并不是像传统的方式保存到文件系统中,在保存到了一定的量或者利用定时任务触发批处理计算的方式执行。而是数据就像一条 pipeline(流水线) 一样,来一个 (或者一小批,可以规定时间窗口,可以规定数据个数) 就处理一个,并把处理结果传到 pipeline 的下一个算子上继续处理,这种方式是不是有点像 jenkins 的 piepline~ 实际上很多 AI 系统,比如推荐系统,反欺诈系统这些对实时性要求比较高的场景都要利用流计算来实时的进行处理。 如果要展开它的内部处理过程大概是下面这个图:

重点说明一下上图中的 task, 这个 task 就可以理解为 Flink 中的算子了, 也叫 operator。 在 Flink 中可以定义当数据到来的时候, 都经过哪些算子,按照什么顺序进行计算。 比如可以先试用 filter 算子把没用的数据进行过滤, 再使用 map 算子对原始数据做一些转换, 后面再使用 sum 算子进行累加计算出 PV。 当然这些 task 是可以并行计算的,Flink 可以合并计算结果。 要是写代码的话,大概是下面这个样子:

上面是大数据领域经典 demo workcount, 计算文件中的词频。 上面我用红色框起来的部分就是算子, 先是 flatmap 做一些处理, 再使用 keyBy 算子把数据分类, 把有相同的 key 的数据分到一个组里,然后进行 sum 的累加计算, 这样就能计算出每个 key(单词) 的词频了 (这个单词出现多少次)。 PS: 代码里的 keyBy(0) 中的 0 是数据的第几列, 意思是按第几列进行分组。

OK, 上面就是简单讲讲什么是流计算以及什么场景需要流计算。flink 的算子和运行模式跟 spark 是很像的,对 flink 的使用还有疑问的同学可以看看我之前写的 spark 基础。 下面要开始讲测试点了。

从消息中间件说起

好像现在业界主流的能支持流计算的消息中间件也就只有 kafka 了,所以我下面都用 kafka 来举例 (实际上我也只用过 kafka,请原谅我知识上的匮乏),这里我要讲一下 kafka 的精准一次性语义, 之所以讲这个是要开始讲述流计算中最重要最难以验证的一个场景(对,我就是想先讲难的,重要的😂 😂 )---- 数据一致性。 什么是数据一致性呢,就是不论在任何情况下数据被处理的结果都是一致的。 这里说的任何情况包括但不限于:

  • 计算任务异常重启后导致之前已经计算过的数据丢失
  • 计算任务异常重启后导致之前已经计算过的数据在本次任务进行重试的时候造成的数据重复计算
  • 计算任务不能因为网络延迟,异常等因素导致数据传递给下游系统失败后导致的数据丢失

那么我们看 kafka 是怎么处理这种情况的, 当我们使用 kafka 的 producer 向 broker 推送消息的时候,怎么能保证本次推送的消息不会因为各种异常导致数据丢失呢? 很多小伙伴可能已经想到了重试, 如果因为网络异常等原因导致 push 请求异常的话,那么我们重试几次就好了,毕竟 kafka 开了高可用模式,集群上会有其他的 broker 提供服务,就算当前的 broker 彻底跪了数据也不会丢失的。 但是我们是否想过一个问题,重试请求是可以随便执行的么? 或者说程序怎么能确定本次失败的推送请求就是真的失败了,也就是数据没有保存到 kafka 上。 在 kafka 中确认消息是否推送成功是需要 producer 和 broker 互相交换 ACK 的, 也就是 producer 在把消息推送给 broker 后,broker 在保存成功后要给 producer 回一个 ACK 让 客户端知道消息已经保存成功了。 那么如果我们的异常是发生在 broker 已经保存好数据和把 ACK 发送到客户端之间呢? 也就是数据已经保存好了, 只是没有给客户端返回 ACK,所以客户端认为这个推送消息的请求是失败的。 那么这个时候如果我们执行了 retry 的逻辑,实际上数据就出现了重复的场景。

这么解释大家是不是就明白了 retry 的逻辑不是能随随便便加的,它有一个前提条件, 就是它要 retry 的那个接口必须是幂等的。 这个我再当初讲混沌工程的时候也提过, 一个高可用的系统,它的接口必须是幂等的, 因为高可用的模式说白了就是上游系统 retry,下游系统多副本负载均衡 + 幂等。 只有下游系统有幂等的能力,上游系统才敢执行重试操作, 否则的话就是数据重复写。 那么幂等是什么意思呢, 大白话就是接口自己能判断出当前的请求是不是之前已经发送过的重复数据了,如果是重复数据它是不处理的。 行话就是同样的数据不论计算多少次都不对结果造成影响,此为幂等。

而 kafka 的精准一次性语义中定义了几个级别的模式, 其中有一个叫 exactly once(精准一次性语义,意思是我保证针对一个数据不管你重复发送多少次,服务端都只计算一次) 这种模式就可以解决这个问题。 在 producer 中可以设置幂等和分布式事务相关的参数和代码, 一旦这样设置了,那么就拥有了幂等属性, kafka 内部会根据算法计算出消息的唯一 id,broker 只要查询消息的 id 在之前是否有保存过就可以判断出当前消息是否是重复数据了 (大概是这样,细节没研究过)。 这样客户端就可以肆无忌惮的进行重试而不必担心数据重复计算。

再谈 Flink 的 exactly once

通过讲述 Kafka 的精准一次性语义也就是 exactly once 是为了跟大家讲述什么是数据一致性以及保证数据一致的方法和重要性。 由于 kafka 本身提供了这种特性所以要保证消息传送到 kafka 的数据一致性是比较容易的, 正因为很容易一般不容易出错所以很多团队都忘了去测试这个场景(有时候研发会忘了设置这个参数导致出现 bug,所以最好还是需要测一下)。 当时光保证 kafka 的精准一次性是不行, 我们是一个业务场景, 我们需要的是端到端的一致性, 得是全链路的一致性。 所以现在我们来看看 Flink 这一层怎么做的 exactly once。

Checkpoint

讲到这里就必须要说明一下大名鼎鼎的 checkpoint 了, 基本上 checkpoint 是所有分布式框架都要有的机制,spark 如此 flink 亦如此。checkpoint 就是一种保存我们在计算过程中的数据的方式, 它会根据设置周期性的触发 checkpoint 来保存我们计算的中间结果。 我们还是用 PV 的案例说明:
我们从 Kafka 读取到一条条的消息,从消息中解析出 app_id,然后将统计的结果放到内存中一个 Map 集合,app_id 做为 key,对应的 pv 做为 value,每次只需要将相应 app_id 的 pv 值 +1 后 put 到 Map 中即可。

这里简要说明一下 kafka 的 offset, 这个是消费消息的客户端也就是 consumer 要使用 offset 来记录我已经读取到了消息队列中的哪一条数据, 根据这个 offset 我可以知道下一次我要读取的消息的位置。 即便是程序崩溃了, 只要 offset 能够保存下来就知道恢复后应该从哪个消息开始读取了。 所以在这个机制下,flink 的 Source task 记录了当前消费到 kafka test topic 的所有 partition 的 offset。 所以 flink 会根据策略周期性的触发 checkpoint 事件以流的方式传递给所有的算子, 算子收到 checkpoint 命令后就会把中间状态保存起来, 比如在我们的案例里保存的就是 kafka 的 offset, 比如我们设置每 30s 触发一次 checkpoint, 那么 30s 后 checkpoint 触发,保存的数据为:
chk-100
offset:(0,1000)
pv:(app1,50000)(app2,10000)
该状态信息表示第 100 次 CheckPoint 的时候, partition 0 offset 消费到了 1000,pv 统计结果为(app1,50000)(app2,10000)。 那么如果任务挂了,这时候怎么办?比如:

  • 假如我们设置了三分钟进行一次 CheckPoint,保存了上述所说的 chk-100 的 CheckPoint 状态后,过了十秒钟,offset 已经消费到(0,1100),pv 统计结果变成了(app1,50080)(app2,10020),但是突然任务挂了,怎么办?
  • flink 只需要从最近一次成功的 CheckPoint 保存的 offset(0,1000)处接着消费即可,当然 pv 值也要按照状态里的 pv 值(app1,50000)(app2,10000)进行累加,不能从(app1,50080)(app2,10020)处进行累加,因为 partition 0 offset 消费到 1000 时,pv 统计结果为(app1,50000)(app2,10000)。

上面讲的并行度为 1 的情况, 那么如果并行度是 N 的情况,checkpoint 会在并行的算子里触发,这个时候 Flink 会选择是保持多个 checkpoint 一起执行完后在统一往后运算 (exactly once), 还是选择不去协调,任意一个算子运行完 checkpoint 后就当前线程就继续往下运算 (at least once),因为 at least once 模式会造成并行的算子的 checkpoint 不是同时触发和结束, 所以他们保存的中间态数据有偏差,也就是数据是会不一致。所以如果业务场景有数据强一致性的需求,那么需要将 checkpoint 模式设置为 exactly once。 这里大家能明白了么? 我们通过把 kafka 的 offset 和我们已经计算好的结果都通过 checkpoint 进行保存来防止数据丢失或重复计算的情况。 代码差不多如下:

当然上面是 checkpoint 策略, 在实际开发算子任务的时候,要把什么数据通过 checkpoint 保存到 flink 的 state backend 是需要研调用对应的 state 方法来执行的。

贴一个 checkpoint 的图:

说回数据一致性

好了上面说了那么多东西, 但是好像 kafka 和 Flink 都已经把数据一致性保证好了, 那还需要我们测试什么一致性么? 那不是变成了在测试 kafka 或者 flink 么? 我想一定会有同学这么问, 那么我在这里解释下:

  • 即便 kafka 和 flink 有 exactly once 语义, 但是开启这些语义需要对应的参数调整, 并且需要编码的时候进行处理, 比如 kafka 里在开启了 exactly once 语义后, 也需要研发在代码里显示调用分布式事务进行数据计算, flink 里对于 kafka 的 offset 和计算结果的保存也需要显示在代码里调用类似 valueState 来进行保存和处理。 也就是你们的产品研发同学是否编码正确决定了数据一致性。
  • 在我们的流计算里, flink 上下游都会对接不同的系统, 上游可以是 kafka,也可以是业务系统暴露出来的 socket 服务,也可以其他的源。 所以你在使用非 kafka 也就是没有 exactly once 语义支持的系统的时候,就需要研发去开发相应的方案来解决这个问题。 同理输出放, 流是有数据的源,也有在经过 flink 计算之后输出的系统,这个系统可以是另外一个 kafka,也可以是 mysql, 也可以是业务系统的接口。 那么输出方是否有 exactly once 语义支持呢? 非 kafka 的场景下,基本上也是没有的, 也需要研发来开发对应的方案。 也就是说我程序中 Flink 的 CheckPoint 语义设置了 Exactly Once,但是我在计算的过程中需要实时的把计算结果保存到 mysql 里,那异常出现的时候根据 checkpoint 机制,我们从上一个 checkpoint 记录中保存的 offset 去重新读取并计算消息, 这时候我的 mysql 中看到岂不是看到了数据重复了?比如程序中设置了 1 分钟 1 次 CheckPoint,但是 5 秒向 mysql 写一次数据,并 commit。 所以我们要求的是 Flink 的 end to end 的精确一次都必须实现。如果你的 chk-100 成功了,过了 30 秒,由于 5 秒 commit 一次数据库,所以实际上已经写入了 6 批数据进入 mysql,但是突然程序挂了,从 chk100 处恢复,这样的话,之前提交的 6 批数据就会重复写入,所以出现了重复消费。Flink 的精确一次有两种情况,一个是 Flink 内部的精确一次,一个是端对端的精确一次。 这里面有点绕,我解释的有点啰嗦。

所以根据上面说的,虽然 flink 提供了 exactly once 语义, 但是它的 exactly once 语义只保证 flink 自己的数据计算过程,而不是端到端的。 想要保证数据一致性,还是需要研发同学针对业务场景进行特殊的设计。也就是开发自己产品的 exactly once 语义。 所以我们还是要针对端到端的场景进行测试。

测试的注意事项
  • 首先弄清楚产品中流计算的架构,都有哪些数据源,数据又发送到哪些地方。 这一步至关重要, 因为端到端的数据一致性场景,在这一条流式链条里,任何一个点没有做到精准一次性语义都会导致数据不一致,所以我们要测试所有的点。
  • 完成第一步后在每一个点进行故障注入,故意让任务失败,让服务挂掉, 属于混沌工程式的测试方法, 就是想尽办法让这个流式的链条中的服务出故障来验证数据一致性。 没有测试工具的同学可以去看一下阿里开源的 chaos blade。 注意: 一个场景的故障注入要反复进行,比如 30 分钟内每隔 3 分钟都随机找到一个 flink task manager 进行 kill 来注入故障, 有些时候只注入一次故障是发现不了 bug 的,因为我们是有状态计算,有状态计算的场景很多是在特殊的状态下发生故障才会出错。 所以要反复注入故障来最大概率的触发 bug。
  • 自动化测试中 case 要验证数据一致性的点,比如在 kafka->flink-mysql 的这个场景里,你往数据源 kafka 里灌入了 1000 个消息,如果正确的逻辑是经过计算后要往 mysql 存入 10 条记录, 那么你要去验证这 10 条记录的正确性。 是否有数据丢失或者重复的结算结果出现。

注意: 做这个测试前, 先确定你们是否有数据一致性的强需求。 有些场景真的会觉得数据丢了就丢了。。。。。

结尾

好了写这么多, 今天罗里吧嗦的写了一大堆好像就说了一个数据一致性的测试。 之前在社区跟人讨论的时候,有很多同学其实不赞同这种深入研发架构的测试方式。 而我前两天刷脉脉的时候也在匿名区看到有人发消息, 讨论区里对 qa 是否要测试这种场景有很大的争议。 所以我花了很大的篇幅解释一下做这种测试的必要性。 下次我们将其他的测试方法。

如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
共收到 51 条回复 时间 点赞

测试一下

写的真好!

wennyfly 回复

多谢~~

文章写得很棒!特别是 checkpoint 的部分。 发现个小问题,wordCount 的例子缺少了 try catch,Flink 算子有个很坑的点,一旦没捕捉异常,就会一直重启。 流式计算依赖消息的顺序性,要是消息不能保证顺序,这个点就很蛋疼。 我们用 Flink 做实时计算,同时还把 Kafka 持久化到 Hbase,实时计算出现偏差时,再用 Spark 去离线重算一遍。

花菜 回复

恩恩,因为是在写 demo 就没顾及那么多。 保序的事能用 kafka 自己机制保证一部分么, 或者你们消息的字段里有能作为 eventtime 的时间字段么, 这样在 flink 里设置一下 watermark 的策略应该就还好吧。 毕竟 flink 是能在一定程度上处理乱序的流的。

孙高飞 回复

消息里面有 eventtime,但是有些消息相隔几个小时,甚至几天再上来。因为消息是从硬件开始上报 mqtt,再转 kafka 的。

花菜 回复

那你们实际的业务场景里也不能要求那么高了吧, 应该是设置个迟到事件, 超过 sideOutputLateData 设置,这样迟到的数据应该都不在窗口里处理了。 通过 window.getSideOutput(outputTag) 在流的外面拿到这些迟到事件再做处理。

孙高飞 回复

要求还蛮高,计算结果会在厂家的大屏上显示。 延迟一般都是硬件故障导致的,目前还想不到很好的办法。延迟的消息只能直接在 filter 过滤了,不会计算,所以影响不了当前的计算结果。

提一个 topic,大数据测试一个难点是数据准确性测试,根本性的原因是大数据量的准确性的 test oracle 问题,有什么系统的方法论吗?

rihkddd 回复

我举个例子, 如果是批处理,比如我们的产品中有一个算子是拆分算子, 可以配置不同的参数来把一份数据拆分成 2 份。 比如先排序在拆分这个逻辑就是,先按某个字段排序然后再按比例进行拆分。 这样的话测试的逻辑就比较简单了, 那我们就可以去写一个 spark 脚本, 扫描这两个拆分出来的数据。 验证那个时间字段是否是有序的, 再验证两份数据中的数据行数是否满足算子设置的比例。 这个比较好实现, 一个比较简单的 spark 脚本就可以了。 所以测试方式就是以大数据的方式测试大数据。

花菜 回复

有个不成熟的想法,你们能不能设置一个非常长的迟到时间, 只要不超过这个时间,数据到来后就会触发算子把最新的数据更新到窗口里进行计算。 这样就可以影响大屏上的计算结果了,当然这也不实时的, 毕竟数据迟到了, 但是可以在后面对计算结果进行重新计算。

孙高飞 回复

嗯,多谢解答。这是一种思路,问题点是:

  1. 举例的例子比较简单,在实际问题中,离线计算的指标业务逻辑比较复杂,而难点就在于复杂的逻辑。所以这个例子很难显然的推广到实际的工作场景中。
  2. 算不上大的问题点,就是要求测试人员掌握比较好的大数据数据开发技能,当然从方向上是好的,但是问题也很多,实际也较难具备这种条件。如果没有相关技能,这种方法就不太可行。

这种思路抽象一下,就是不同的人(研发、测试)分别实现需求,然后对比,或者降级一些,测试人员只需要对结果数据做一些性质严重。但是没法解决 test oracle 的问题,例如研发、测试算出来的结果一致,而很有可能是他们都算错了。

孙高飞 回复

这个不科学的,不能因为迟到的一点数据而让整体跟着延迟,就算是可以,中间值也会非常大,吃完了内存。😂

花菜 回复

那就只能用 window.getSideOutput(outputTag) 后续把迟到事件都拿到然后再做处理吧,当然就做不到跟窗口里的数据一起计算的能力了。 可以把整体数据保存到 HDFS 上,然后用批处理的方式计算再更新到数据库里。

孙高飞 回复

嗯嗯,这个应该能解决一部分问题。计算结果有部分是通过 kafka 发出去的,存在 redis 的部分应该能这么搞

rihkddd 回复

对于第一个问题是,如果你不了解复杂的业务, 那么给你什么测试工具和测试方法你都是测不了的。 对于第二个问题是,大数据测试就是有门槛的, 不懂大数据还硬要来测本身就是不行的。

最后我感觉你已经钻进牛角尖里了,不想懂业务也不想懂技术就能保证产品质量这事本身就是不可能的。 碰到难点了要去解决难点,而不是通过降低测试人员的责任范围来骗自己。 就像我们招聘一定是招聘懂大数据的测试人员而不是说这个大数据场景我们就测个皮,深入的我们不测了所以就招一些不懂大数据也不懂业务的人来做。

PS: 不要再用 test oracle 这种看上去高大上实际上没什么用的名词了。 老拽这些词本身就会引起他人反感的。

孙高飞 回复

先回拽专用名次的事情,这是一个测试论坛,test oracle 算不上高大上,但是没有很好的对应翻译,所以才用,不知道你为啥反感。按照你的逻辑我是不是也要反感一下 shuffle。不妨讲讲你是为啥感到反感?

其次我也是技术派拥护者,只是我没有局限在技术里面,你到管理的位置上不要思考团队人员配置吗?精英配置是好,但是具有普适性吗? 我在讨论一个大数据测试领域一个很具体的问题,想探讨一下你的方法论,结果就是了解复杂业务、学大数据技术。这是你的方法论,我指出问题点就是作为方法论它缺少普适性,而且没有根本性上解决问题。

rihkddd 回复

不杠了, 你高兴就好

点开之后,默默地点了 "x"😂 😂 😂

闲云野鹤 回复

这个 “x” 是什么情况。。。。

@ 孙高飞 尴尬,文章看不懂,默默地点了关闭按钮😂 😂 😂

@ 孙高飞 先收藏,万一以后能看懂了呢,嘿嘿😀

——由于 kafka 本身提供了这种特性所以要保证消息传送到 kafka 的数据一致性是比较容易的, 正因为很容易一般不容易出错所以很多团队都忘了去测试这个场景(有时候研发会忘了设置这个参数导致出现 bug,所以最好还是需要测一下)—— 不是忘了,而是这种属于优先级很低的场景了。应该把有限的测试资源首先放在高优先级的测试场景上。

simple 将本帖设为了精华贴 12月18日 09:51

输入是 A,期望输出是 E,开发的实现过程是 A—>B—>C—>D—>E。 开发的实现属于黑盒内部逻辑,测试要做的是输入 A,并检验实际输出是否为 E。 而不是检验如下中间过程:A—>B,A—>C,A—>D,B—>C,B—>D,B—>E,C—>D,…… 把被测产品当做整体看待很重要,也是我们的测试目标。中间使用到的产品,都不是测试目标。目标要明确。

Thirty-Thirty 回复

你又来杠了,那杠就是你赢,你开心就行。既然跟上次一样一杆子就否定了包括白盒测试在内的各种测试理论。那注定咱俩谁都说服不了谁, 你继续在你的业务上做黑盒测试, 我继续在我的业务上做测试开发, 咱们井水不犯河水, 没必要非争出个子丑寅卯出来。 让时间来证明谁的观点会走的更高更远。 文章和评论也都放在这里,让同行们来自选选择走哪一条路

目前大数据测试,行情如何?

mark,有时间学习,感谢大佬

另外想问一下,Flink 是怎么实现自动化部署的,我们目前 Flink 是运行在 Yarn 集群上面,发布的时候需要人工先保存中间值,然后再启动。

lv1792017548 回复

应该挺不错的吧, 很多大厂的 JD 里都开始出现有要大数据技术栈的技术需求了。 能面上一般都是差不多 p7 的级别了

花菜 回复

保存中间值的这个操作说的是 save point 吧, 这个就是 flink 的命令行工具, 一般为了保护正在计算的数据, 都避免不了吧。 不过理论上应该能自动化, 只是我也没操作过, 毕竟我不是运维

终于看完了,大数据只了解皮毛,勉强能跟上

好奇问一个点,这里提到测试是否有达到数据一致性的方法,是注入故障。那具体怎么注入才能保障能做到文中期望的端到端上每个点的 exactly once 做到位,避免遗漏?

rihkddd 回复

对于你提到的这两个问题,我曾经呆过公司里看到的情况是,大数据团队都是自测上线,靠计算结果的数据监控来发现问题和修正问题的。测试团队基本不介入这部分测试。

所以我是基本认同飞哥的观点,不了解背后的技术或者复杂的业务,是做不好大数据测试的,至少很难让大数据研发团队信服你的结果。

另外,谷歌了下 test oracle ,有对应中文名称,叫做 测试准则 。不过这个词的解释我看了好久也没看明白,它涵盖的范围有点大。所以也很少用。

陈恒捷 回复

多谢解答,据我的了解,你说的大数据团队自测也是一种很普遍的选择。

我说一下我对 test oracle 的了解,“测试准则” 不太能表示它的意思,它是指到底怎么判断测试结果对或者不对,oracle 对应的翻译(神谕)更能表明这层含义,举个例子,比如你测试一个函数实现了加法,大部分人一看就知道对不对,又或者点某个按钮要弹出一个提示,这种就不存在 test oracle 的问题,它的正确性对绝大部分人都是显而易见的。但是涉及到大数据数据的准确性/正确性,这个就很难。例如,你算出一个指标你甚至很难在数量级上肯定它对不对,更别说它的准确性/正确性了。所以我说 test oracle 是大数据测试的一个根本性问题。即使你掌握了比大数据开发工程师更多的技能,当领导问你:你怎么确定你测试过的的数据是对的,你其实依然无法保证,本质上你只是把开发的流程走了一遍,那怎么保证你自己不出错呢?自己说自己开发的东西没 bug,这不符合基本的逻辑。

最后我没有说不需要了解业务和学习技术,我知道这两点的重要性。但是回到我讨论的这个问题,了解业务、学习技术,是能提高正确性的可能,但还是没从根本上解决问题。如果这是正确的解决方案,我相信对数据准确度要求高的团队无疑可以招聘两倍的大数据研发来解决问题。

目前公司也是使用 Flink,场景也比较多,链路借助的工具也很多,也跟部门的老员工请教他们之前的大数据是怎么测试的,哈没取到有用的经。就公司的业务而言,业务->kafka->Flink->数据库【redis, clickhouse,hbase,mysql 等】,业务有日志,有计算等,我也想通过一个有效的测试流程跟有效的测试方式对自己开发的功能进行测试,避免线上出现问题。数据的一致性我认为数据源【得清楚产生的数据是那些数据】,根据业务的需求,入库的数据准确即可,也就是数据源->经过处理->入库,验证的就是数据源跟入库的数据是否丢失,中间环节目前不怎么 care,因为这块很多都是借助工具,默认都是工具无 bug,只是尽可能多的设计相关的异常场景去验证流程,保证数据一致性。就比如,开发数据存储到 redis 的数据是永久性的,测试有时不知道都没注意,最终就是很快出现服务器内存预警,坏点的没有运维的就是戎机了

MmoMartin 回复

有点没太 get 到你的点, 你应该是研发同学吧? Flink 有个单测框架你可以试试看行不行~ 其他的我看你的描述里也有往数据源 push 数据然后验证计算出的结果是不是对的。 你也说了你会模拟异常场景, 我理解的就是注入故障吧。

陈恒捷 回复

就是针对端到端的每个点都注入故障么, 比如我输入的数据和预期的数据计算的结果都已经固定到 case 中了, 那么就循环跑这些 case, 在跑 case 的同时, 挨个的去给每一个端注入故障, 最后验证 case 不失败就行了。

rihkddd 回复

当领导问你:你怎么确定你测试过的的数据是对的

我理解你说的这个,是测试出来的结果和数据使用方的数据口径是否一致的问题,类似一般功能里的 “是否满足需求” 问题? 也想听下你的想法。

rihkddd 回复

但是没法解决 test oracle 的问题,例如研发、测试算出来的结果一致,而很有可能是他们都算错了。

深表认同。

花菜 回复

可以打包,然后调用 flink 的程序运行命令就可以了,公司目前就是这样做的,有新包更新的 GITLAB,会自动执行重启服务的程序,自动更新功能

孙高飞 回复

不不不 我算半个测试,心不在开发,在测试。大数据的测试看你总结的就 3 个点。但是我们测试往往想不到更多的故障注入,因为涉及的工具太多了,咋怎?数据的一致性,我们测试的需不需要也借助 FLINK 或 SPARK 进行处理得出结论跟打开的结论对照【但是存在我们写的业务逻辑也许有 BUG,也费时】,还是一般都是自造数据,知道数据结果直接查库验证【如对 json 的某个字段的值的计算,我们造 100W 条,然后验证库,但是对于复杂的业务时又该何去何从呢】?我们测试时需要怎么保证这个过程的?希望大佬可以指点指点

陈恒捷 回复

对,你说的这个点是造成准确性难保证的一个原因。这个问题其实很难解决,在于出需求的人对需求的描述是自然语言,而开发的人用的是计算机语言,这个转换难以避免出现误解,所以一个方向就是降低写批处理/流处理的难度,设立数据产品经理(掌握一定的数据开发技能解决)需求方直接写,避免转换过程出现问题,也有一些前沿方向是自然语言到 hsql 的生成,但现在还很初级。

但是除去这个原因,假设研发/测试都正确的理解了数据需求,依然不能保证数据的正确性,这里面的原因多种多样,比如对数据的假设不成立/处理逻辑有 bug/大数据基础设施有问题。所以我说这个问题的根本在于人没法判断数据对或不对。

这个问题我思考了挺长时间了,也有一些想法和结果,有机会我单独写一写。

MmoMartin 回复

这个文章我还没写完, 后面会写一些功能测试的东西, 现在这一个主要还是讲一致性。 我是不建议自己写 spark 或者 flink 把产品功能开发一遍再跟研发对比的, 还是使用你后面说的自造数据, 验证数据计算的结果是否是正确的。 至于你说的面对复杂业务时该怎么办, 我认为是没有捷径的, 你只能去了解这个复杂的业务然后根据业务需求造数据, 再验证计算结果。

当然你也可以看一个叫做 flink-spector 的东西。 专门给 flink 做测试的框架, 需要在研发的 repo 里用 unit test 的形式编写

孙高飞 回复

期待你在后续文章里着重描述下对计算结果的验证问题,相信这也是@rihkddd关注的问题。

MmoMartin 回复

如果你不排斥在研发的 repo 里以写 UT 的形式去做集成测试的话, 可以去测一测 UDF, 我们这边开发了有百余个 UDF, 大多都是 QA 来负责编写测试用例的。 UDF 测试完可以测 stream, 都是属于深入研发的代码中进行测试的手段了, 目前这是我们这边主要的功能测试方法了, 我下一篇文章里会写。

坐等更新

李墩儿 回复

已更新

孙高飞 回复

好嘞

一字一句读完,没跟得上,在线自卑。。。

完了。。我咋觉得我看评论比看文章来得有意思得多呢。。。。我堕落了,但是我还是要说一句 ,真香🤐

对流计算的具体流程有了概念,但是具体测试手段还是有些不清晰,主要疑问点是 checkPoint 的间隔时间点内去多次注入故障,以求出现数据重复类的问题,可是大数据量的情况下,怎么去验证数据重复的问题出现呢?现在项目也在用
sql-》kafka-》flink》sql 的流程,是不是可以直接通过两端此时间段内的数据量差别来进行确定呢?

需要 登录 后方可回复, 如果你还没有账号请点击这里 注册