之前写 spark 的时候中途断掉跑去写 docker 和深度学习了。 现在捡起来把之前想写的东西补上。 建议各位同学也可以翻一下我之前的文章, 前三篇讲述的是 spark 相关的基础,如果前三篇文章的内容没好好看一下的话,可能是看不懂这次的文章的。 spark 作为现在主流的分布式计算框架,已经融入到了很多的产品中作为 ETL 的解决方案。 而我们如果想要去测试这样的产品就要对分布式计算的原理有个清晰的认知并且也要熟悉分布式计算框架的使用来针对各种 ETL 场景设计不同的测试数据。 而一般来说我们需要从以下两个角度来进行测试。
ETL 是按一定规则针对数据进行清洗,抽取,转换等一系列操作的简写。那么一般来说他要能够处理很多种不同的数据类型。 我们在生产上遇见的 bug 有很大一部分占比是生产环境遇到了比较极端的数据导致我们的 ETL 程序无法处理。 比如:
针对上面说的一些数据场景我挑几个重要的说一下:
前面的文章说过,在分布式计算中,一份数据是由多个散落在 HDFS 上的文件组成的, 这些文件可能散落在不同的机器上, 只不过 HDFS 会给使用者一个统一的视图,让使用者以为自己在操作的是一个文件,而不是很多个文件。 这是 HDFS 这种分布式文件系统的存储方式。 而各种分布式计算框架, 比如 hadoop 的 MapReduce,或者是 spark。 就会利用这种特性,直接读取散落在各个机器上文件并保存在那个节点的内存中 (理想状态下,如果资源不够可能还是会发生数据在节点间迁移)。 而读取到内存中的数据也是分片的 (partition)。 spark 默认以 128M 为单位读取数据,如果数据小于这个值会按一个分片存储,如果大于这个值就继续往上增长分片。 比如一个文件的大小是 130M, spark 读取它的时候会在内存中分成两个 partition(1 个 128M,1 个 2M)。 如果这个文件特别小,只有 10M,那它也会被当做一个 partition 存在内存中。 所以如果一份数据存放在 HDFS 中,这个数据是由 10 个散落在各个节点的文件组成的。 那么 spark 在读取的时候,就会至少在内存中有 10 个 partition, 如果每个文件的大小都超过了 128M,partition 的数量会继续增加。
而在执行计算的时候,这些存储在多个节点内存中的数据会并发的执行数据计算任务。 也就是说我们的数据是存放在多个节点中的内存中的, 我们为每一个 partition 都执行一个计算任务。 所以我们针对一个特别大的数据的计算任务, 会首先把数据按 partition 读取到不同节点的不同的内存中, 也就是把数据拆分成很多小的分片放在不同机器的内存中。 然后分别在这些小的分片上执行计算任务。 最后再聚合每个计算任务的结果。 这就是分布式计算的基本原理。 如果有同学还是比较懵,快去翻之前我写的 3 篇 spark 的基础文章。
那么这个时候问题就来了, 这种按 partition 为单位的分布式计算框架。partition 的数量决定着并发的数量。 可以理解为,如果数据有 100 个 partition,就会有 100 个线程针对这份数据做计算任务。所以 partition 的数量代表着计算的并行程度。 但是不是说 partition 越多越好,如果明明数据就很小, 我们却拆分了大量的 partition 的话,反而是比较慢的。 而且所有分片的计算结果最后是要聚合在一个地方的。 这些都会造成网络 IO 的开销 (因为数据是在不同的节点之前传输的)。 尤其是在分布式计算中,我们有 shuffle 这个性能杀手 (不熟悉这个概念的同学请看我之前的文章)。 在大量的分片下执行 shuffle 将会是一个灾难,因为大量的网络 IO 会导致集群处于很高的负载甚至瘫痪。 我们曾经碰见过只有 500M 但是却有 7000 个分片的数据,那一次的结果是针对这个数据并行执行了多个 ETL 程序后,整个 hadoop 集群瘫痪了。 这是在数据预处理的时候忘记做 reparation(重新分片) 的结果。 所以很多很小的,或者是碎片化的文件在 HDFS 中是一个很影响性能的问题。
所以说了这么多我们的测试点就来了, 针对各种 ETL 程序, 专门制造这种拥有大量分片的数据。比如测试同样的数据量, 但是分片分别是 100,500,1000。 验证各个 ETL 程序在这种数据下的表现。
在我之前的文章中讲过什么是 shuffle 和数据倾斜。这里简单的回顾一下。
在上面的任务处理中出现了 shuffle 的操作。shuffle 也叫洗牌, 在上面讲 partition 和分布式计算原理的时候,我们知道分布式计算就是把数据划分很多个数据片存放在很多个不同的节点上, 然后在这些数据片上并发执行同样的计算任务来达到分布式计算的目的,这些任务互相是独立的, 比如我们执行一个 count 操作, 也就是计算这个数据的行数。 实际的操作其实是针对每个数据分片,也就是 partition 分别执行 count 的操作。 比如我们有 3 个分片分别是 A,B,C, 那执行 count 的时候其实是并发 3 个线程,每个线程去计算一个 partition 的行数, 他们都计算完毕后,再汇总到 driver 程序中, 也就是 A,B,C 这三个计算任务的计算过程是彼此独立互不干扰的,只在计算完成后进行聚合。 但并不是所有的计算任务都可以这样独立的,比如你要执行一个 groupby 的 sql 操作。 就像上面的图中,我要先把数据按单词分组,之后才能做其他的统计计算, 比如统计词频或者其他相关操作。 那么首先 spark 要做的是根据 groupby 的字段做哈希,相同值的数据传送到一个固定的 partition 上。 这样就像上图一样,我们把数据中拥有相同 key 值的数分配到一个 partition, 这样从数据分片上就把数据进行分组隔离。 然后我们要统计词频的话,只需要才来一个 count 操作就可以了。 shuffle 的出现是为了计算能够高效的执行下去, 把相似的数据聚合到相同的 partition 上就可以方便之后的计算任务依然是独立隔离的并且不会触发网络 IO。 这是方便后续计算的设计模式,也就是节省了后续一系列计算的开销。 但代价是 shuffle 本身的开销,而且很多情况下 shuffle 本身的开销也是很大的。 尤其是 shuffle 会因为数据倾斜而出现著名的长尾现象。
好, 说了这么多终于说到数据倾斜了。 根据 shuffle 的理论,相似的数据会聚合到同一个 partition 上。 但是如果我们的数据分布不均匀会出现什么情况呢? 比如我们要针对职业这个字段做 groupby 的操作, 但是如果 100W 行数据中有 90W 行的数据都是程序员这个职业的话, 会出现什么情况? 你会发现有 90W 行的数据都跑到了同一个 partition 上造成一个巨大的 partition。这样就违背了分布式计算的初衷, 分布式计算的初衷就是把数据切分成很多的小数据分布在不同的节点内存中,利用多个节点的并行计算能力来加速计算过程。 但是现在我们绝大部分的数据都汇聚到了一个 partition 中,这样就又变成了单点计算。 而且这里还有一个特别大的问题, 就是我们在提交任务到 hadoop yarn 上的时候,申请的资源是固定切平均分配的。 比如我申请 10 个 container 去计算这份数据,那这 10 个 container 的资源是相等的,哪个也不多,哪个也不少。 但是我们的数据分片的大小却是不一样的, 比如 90W 行的分片需要 5 个 G 的内存,但是其他的数据分片可能 1 个 G 就够了。 所以如果我们不知道有数据倾斜的情况出现而导致申请的资源教少,就会导致任务 OOM 而挂掉。 而如果我们为了巨大的数据分片为每个 container 都申请了 5G 的资源, 那又造成了资源浪费。
数据倾斜和 shuffle 是业界经典难题,很难处理。 在很多大数据产品中都会有根据数据大小自动调整申请资源的功能。而数据倾斜就是这种功能绝对的天敌。 处理不好的话,要不会变成申请过大资源撑爆集群,要不会申请过小资源导致任务挂掉。 而我们在测试阶段要做的,就是模拟出这种数据倾斜的数据, 然后验证 ETL 程序的表现。
好了,上面两个最难解释的已经讲完了, 宽表就很简单了,列数太多的表就是宽表。比如我见过的最宽的表是 1W 列的, 尤其在机器学习系统中, 由于要抽取高维特征, 所以在 ETL 阶段经常会把很多的表拼接成一个很大的宽表。这种宽表是数据可视化的天敌,比如我们的功能是可以随机预览一份数据的 100 行。 那 100*1W 这样的数据量要传输到前端并渲染就是个很费事的操作了。尤其是预览本身也是要执行一些计算的。如果加上这份数据本来就有海量分片的话, 要在后台打开这么多的文件,再加上读取这么宽的表的数据。 甚至有可能 OOM, 实际上我也确实见过因为这个原因 OOM 的。 所以这个测试点就是我们故意去造这样的宽表进行测试。
其他的数据类型不一一解释了, 都跟字面的意思差不多。
既然我们知道了在测试的时候要用各种不同的数据进行测试,所以我们必然要有一款造数工具来帮我创建这些数据。 这里选型上我使用的也是 spark。 之所以也使用 spark 这种分布式框架来造数,而不是单独使用 parquet 或者 hdfs 的 client 是因为我们造的数据除了要符合一些极端场景外,也要保证要有足够的数据量, 毕竟 ETL 都是面对大数据场景的。 所以利用 spark 的分布式计算的优势可以在短时间内创建大量数据。 比如我前两天造过一个 1 亿行,60 个 G 的数据,只用了 20 分钟。
看过我之前 3 篇文章的同学应该都知道 RDD 是什么了,RDD 是 spark 的分布式数据结构。 我们刚才说的一份数据被 spark 读取后会就生成一个 RDD,当然 RDD 就包含了那些 partition。 我们创建 RDD 的方式有两种, 一种是从一个已有的文件中读取 RDD,当然这不是我们想要的效果。 所以我们使用第二种, 从内存中的一个 List 中生成 RDD。 如下:
public class Demo {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("data produce")
.setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.getOrCreate();
List data = new XRange(1000);
JavaRDD distData = sc.parallelize(data, 100);
上面是我写的一个 demo,前面初始化 spark conf 和 spark session 的代码可以先忽略不用管。 主要看最后两行, XRange 是我仿照 python 的 xrange 设计的类。 可以帮我用类似生成器的原理创建一个带有 index 序列的 List。 其实这里我们手动创建一个 list 也行。 而最后一行就是我们通过 spark 的 API 把一个 List 转换成一个 RDD。sc.parallelize 的第一个参数是 List,而第二个参数就是你要设置的并行度, 也可以理解为你要生成这个数据的 partition 的数量。 其实如果我们现在想生成这一千行的只有 index 的数据的话, 再调用这样一个 API 就可以了:distData.saveAsTextFile("path"); 通过这样一个 API 就可以直接保存文件了。 当然这样肯定不是我们想要的,因为里面还没有我们要的数据。 所以这个时候我们要出动 spark 的一个高级接口,dataframe。 dataframe 是 spark 仿照 pandas 的 dataframe 的设计开发的高级 API。 功能跟 pandas 很像, 我们可以把一个 dataframe 就当做一个表来看, 而它也有很多好用的 API。 最重要的是我们有一个 DataframeWriter 类专门用来讲 dataframe 保存成各种各样格式和分区的数据的。 比如可以很方便的保存为 scv,txt 这种传统数据, 可以很方便保存成 parquet 和 orc 这种列式存储的文件格式。 也提供 partition by 的操作来保存成分区表或者是分桶表。总之它能够帮我们造出各种我们需要的数据。 那么我们如何把一个 RDD 转换成我们需要的 dataframe 并填充进我们需要的数据呢。 往下看:
List<StructField> fields = new ArrayList<>();
String schemaString = "name,age";
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
fields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new RandomStringField();
randomStringField.setLength(10);
BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
});
Dataset dataset =spark.createDataFrame(rowRDD, schema);
dataset.persist();
dataset.show();
DataFrameWriter writer = new DataFrameWriter(dataset);
writer.mode(SaveMode.Overwrite).partitionBy("age").parquet("/Users/sungaofei/gaofei");
dataframe 中每一个数据都是一行,也就是一个 Row 对象,而且 dataframe 对于每一列也就是每个 schema 有着严格的要求。 因为它是一个表么。所以跟数据库的表或者 pandas 中的表是一样的。要规定好每一列的 schema 以及每一行的数据。 所以首先我们先定义好 schema, 定义每个 schema 的列名和数据类型。 然后通过 DataTypes 的 API 创建 schema。 这样我们的列信息就有了。 然后是关键的我们如何把一个 RDD 转换成 dataframe 需要的 Row 并且填充好每一行的数据。 这里我们使用 RDD 的 map 方法, 其实 dataframe 也是一个特殊的 RDD, 这个 RDD 里的每一行都是一个 ROW 对象而已。 所以我们使用 RDD 的 map 方法来填充我们每一行的数据并把这一行数据转换成 Row 对象。
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new RandomStringField();
randomStringField.setLength(10);
BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
});
因为之前定义 schema 的时候只定义了两列, 分别是 name 和 age。 所以在这里我分别用一个随机生成 String 类型的类和随机生成 int 类型的类来填充数据。 最后使用 RowFactory.create 方法来把这两个数据生成一个 Row。 map 方法其实就是让使用者处理每一行数据的方法, record 这个参数就是把行数据作为参数给我们使用。 当然这个例子里原始 RDD 的每一行都是当初生成 List 的时候初始化的 index 序号。 而我们现在不需要它, 所以也就没有使用。 直接返回随机字符串和 int 类型的数。 然后我们有了这个每一行数据都是 Row 对象的 RDD 后。 就可以通过调用下面的 API 来生成 dataframe。
Dataset dataset =spark.createDataFrame(rowRDD, schema);
分别把 row 和 schema 传递进去,生成 dataframe 的表。 最后利用 DataFrameWriter 保存数据。
好了, 这就是造数的基本原理了, 其实也是蛮简单的。 当然要做到严格控制数据分布,数据类型,特征维度等等就需要做很多特殊的处理。 这里就不展开细节了。
刚才一直在说的是如何生成数据来测试 ETL 程序是否能够正常处理各种不同类型的数据的。 那么下面要讲的就是如何测试处理 ETL 程序的正确性了。 也就是它是否按我们期望的逻辑将数据进行清洗,提取,拼接等操作。 也即是说这是功能测试, 原理上跟我们传统的测试思路是一样的。 输入一份数据,然后判断输出的数据是否是正确的。 只不过我们这是在大数据量下的处理和测试,输入的数据是大数据,ELT 输出的也是大数据, 所以就需要一些新的测试手段。 其实这个测试手段也没什么新奇的了, 是我们刚才一直在讲的技术,也就是 spark 这种分布式计算框架。 我们以 spark 任务来测试这些 ETL 程序,这同样也是为了测试自身的效率和性能。 如果单纯使用 hdfs client 来读取文件的话, 扫描那么大的数据量是很耗时的,这是我们不能接受的。 所以我们利用大数据技术来测试大数据功能就成为了必然。 当然也许有些同学会认为我只是测试功能么,又不是测试算法的处理性能,没必要使用那么大的数据量。 我们用小一点的数据,比如一百行的数据就可以了。 但其实这也是不对的, 因为在分布式计算中, 大数量和小数据量的处理结果可能不是完全一致的, 比如随机拆分数据这种场景在大数据量下可能才能测试出 bug。 而且大数据测试还有另外一种场景就是数据监控, 定期的扫描线上数据,验证线上数据是否出现异常。 这也是一种测试场景,而且线上的数据一定是海量的。
废话不多说,直接看下面的代码片段。
@Features(Feature.ModelIde)
@Stories(Story.DataSplit)
@Description("使用pyspark验证随机拆分中的分层拆分")
@Test
public void dataRandomFiledTest(){
String script = "# coding: UTF-8\n" +
"# input script according to definition of \"run\" interface\n" +
"from trailer import logger\n" +
"from pyspark import SparkContext\n" +
"from pyspark.sql import SQLContext\n" +
"\n" +
"\n" +
"def run(t1, t2, context_string):\n" +
" # t2为原始数据, t1为经过数据拆分算子根据字段分层拆分后的数据\n" +
" # 由于数据拆分是根据col_20这一列进行的分层拆分, 所以在这里分别\n" +
" # 对这2份数据进行分组并统计每一个分组的计数。由于这一列是label\n" +
" # 所以其实只有两个分组,分别是0和1\n" +
" t2_row = t2.groupby(t2.col_20).agg({\"*\" : \"count\"}).cache()\n" +
" t1_row = t1.groupby(t1.col_20).agg({\"*\" : \"count\"}).cache()\n" +
" \n" +
" \n" +
" t2_0 = t2_row.filter(t2_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +
" t2_1 = t2_row.filter(t2_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +
" \n" +
" t1_0 = t1_row.filter(t1_row.col_20 == 1).collect()[0][\"count(1)\"]\n" +
" t1_1 = t1_row.filter(t1_row.col_20 == 0).collect()[0][\"count(1)\"]\n" +
" \n" +
" # 数据拆分算子是根据字段按照1:1的比例进行拆分的。所以t1和t2的每一个分组\n" +
" # 都应该只有原始数据量的一半\n" +
" if t2_0/2 - t1_0 >1:\n" +
" raise RuntimeError(\"the 0 class is not splited correctly\")\n" +
" \n" +
" if t2_1/2 - t1_1 >1:\n" +
" raise RuntimeError(\"the 1 class is not splited correctly\")\n" +
"\n" +
" return [t1]";
我们用来扫描数据表的 API 仍然是我们之前提到的 dataframe。上面的代码片段是我们嵌入 spark 任务的脚本。 里面 t1 和 t2 都是 dataframe, 分别代表原始数据和经过数据拆分算法拆分后的数据。 测试的功能是分层拆分。 也就是按某一列按比例抽取数据。 比如说 100W 行的数据,我按 job 这个字段分层拆分, 我要求的比例是 30%。 也即是说每种职业抽取 30% 的数据出来,相当于这是一个数据采样的功能。 OK, 所以在测试脚本中,我们分别先把原始表和经过采样的表按这一列进行分组操作, 也就是 groupby(col_20)。 这里我选择的是按 col_20 进行分层拆分。 根据刚才讲的这样的分组操作后会触发 shuffle,把有相同职业的数据传到一个数据分片上。 然后我们做 count 这种操作统计每一个组的行数。 因为这个算法我是按 1:1 拆分的,也就是按 50% 采样。 所以最后我要验证拆分后的数据的每一组的行数都是原始数据中该组的一半。
那么上面就是一个简单的 ETL 的测试场景和测试脚本了。
先写到这里吧,不早了。 最近总熬夜, 身体快扛不住了。 趁着周末我早睡会。