京喜达技术部在社区团购场景下采用 JDQ+Flink+Elasticsearch 架构来打造实时数据报表。随着业务的发展 Elasticsearch 开始暴露出一些弊端,不适合大批量的数据查询,高频次深度分页导出导致 ES 宕机、不能精确去重统计,多个字段聚合计算时性能下降明显。所以引入 ClickHouse 来处理这些弊端。
数据写入链路是业务数据 (binlog) 经过处理转换成固定格式的 MQ 消息,Flink 订阅不同 Topic 来接收不同生产系统的表数据,进行关联、计算、过滤、补充基础数据等加工关联汇总成宽表,最后将加工后的 DataStream 数据流双写入 ES 和 ClickHouse。查询服务通过 JSF 和物流网关对外暴露提供给外部进行展示,由于 ClickHouse 将所有计算能力都用在一次查询上,所以不擅长高并发查询。我们通过对部分实时聚合指标接口增加缓存,或者定时任务查询 ClickHosue 计算指标存储到 ES,部分指标不再实时查 ClickHouse 而是查 ES 中计算好的指标来抗住并发,并且这种方式能够极大提高开发效率,易维护,能够统一指标口径。
在引入 ClickHouse 过程中经历各种困难,耗费大量精力去探索并一一解决,在这里记录一下希望能够给没有接触过 ClickHouse 的同学提供一些方向上的指引避免多走弯路,如果文中有错误也希望多包含给出指点,欢迎大家一起讨论 ClickHouse 相关的话题。本文偏长但全是干货,请预留 40~60 分钟进行阅读。
前文说到遇到了很多困难,下面这些遇到的问题是本文讲述的重点内容。
在选择表引擎以及查询方案之前,先把需求捋清楚。前言中说到我们是在 Flink 中构造宽表,在业务上会涉及到数据的更新的操作,会出现同一个业务单号多次写入数据库。ES 的 upsert 支持这种需要覆盖之前数据的操作,ClickHouse 中没有 upsert,所以需要探索出能够支持 upsert 的方案。带着这个需求来看一下 ClickHouse 的表引擎以及查询方案。
ClickHouse 有很多表引擎,表引擎决定了数据以什么方式存储,以什么方式加载,以及数据表拥有什么样的特性。目前 ClickHouse 表引擎一共分为四个系列,分别是 Log、MergeTree、Integration、Special。
Log、Special、Integration 主要用于特殊用途,场景相对有限。其中最能体现 ClickHouse 性能特点的是 MergeTree 及其家族表引擎,也是官方主推的存储引擎,几乎支持所有 ClickHouse 核心功能,在生产环境的大部分场景中都会使用此系列的表引擎。我们的业务也不例外需要使用主键索引,日数据增量在 2500 多万的增量,所以 MergeTree 系列是我们需要探索的目标。
MergeTree 系列的表引擎是为插入大量数据而生,数据是以数据片段的形式一个接一个的快速写入,ClickHouse 为了避免数据片段过多会在后台按照一定的规则进行合并形成新的段,相比在插入时不断的修改已经存储在磁盘的数据,这种插入后合并再合并的策略效率要高很多。这种数据片段反复合并的特点,也正是 MergeTree 系列 (合并树家族) 名称的由来。为了避免形成过多的数据片段,需要进行批量写入。MergeTree 系列包含 MergeTree、ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree、SummingMergeTree、AggregatingMergeTree 引擎,下面就介绍下这几种引擎。
MergeTree 支持所有 ClickHouse SQL 语法。大部分功能点和我们熟悉的 MySQL 是类似的,但是有些功能差异比较大,比如主键,MergeTree 系列的主键并不用于去重,MySQL 中一个表中不能存在两条相同主键的数据,但是 ClickHouse 中是可以的。
下面建表语句中,定义了订单号,商品数量,创建时间,更新时间。按照创建时间进行数据分区,orderNo 作为主键 (primary key),orderNo 也作为排序键 (order by),默认情况下主键和排序键相同,大部分情况不需要再专门指定 primary key,这个例子中指定只是为了说明下主键和排序键的关系。当然排序键可以与的主键字段不同,但是主键必须为排序键的子集,例如主键 (a,b), 排序键必须为 (a,b, , ),并且组成主键的字段必须在排序键字段中的最左侧。
CREATE TABLE test_MergeTree ( orderNo String, number Int16, createTime DateTime, updateTime DateTime) ENGINE = MergeTree() PARTITION BY createTimeORDER BY (orderNo) PRIMARY KEY (orderNo);insert into test_MergeTree values('1', '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00');insert into test_MergeTree values('1', '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00');
注意这里写入的两条数据主键 orderNo 都是 1 的两条数据,这个场景是我们先创建订单,再更新了订单的商品数量为 30 和更新时间,此时业务实际订单量为 1,商品件量是 30。
插入主键相同的数据不会产生冲突,并且查询数据两条相同主键的数据都存在。下图是查询结果,由于每次插入都会形成一个 part,第一次 insert 生成了 1609430400_1_1_0 数据分区文件,第二次 insert 生成了 1609430400_2_2_0 数据分区文件,后台还没触发合并,所以在 clickhouse-client 上的展示结果是分开两个表格的 (图形化查询工具 DBeaver、DataGrip 不能看出是两个表格,可以通过 docker 搭建 ClickHouse 环境通过 client 方式执行语句,文末有搭建 CK 环境文档)。
预期结果应该是 number 从 20 更新成 30,updateTime 也会更新成相应的值,同一个业务主键只存在一行数据,可是最终是保留了两条。Clickhouse 中的这种处理逻辑会导致我们查询出来的数据是不正确的。比如去重统计订单数量,count(orderNo),统计下单件数 sum(number)。
下面尝试将两行数据进行合并。
进行强制的分段合并后,还是有两条数据,并不是我们预期的保留最后一条商品数量为 30 的数据。但是两行数据合并到了一个表格中,其中的原因是 1609430400_1_1_0,1609430400_2_2_0 的 partitionID 相同合并成了 1609430400_1_2_1 这一个文件。合并完成后其中 1609430400_1_1_0,1609430400_2_2_0 会在一定时间 (默认 8min) 后被后台删除。下图是分区文件的命名规则,partitionID:1609430400 = 2021-01-01 00:00:00,MinBolckNum、MaxBolckNum:是最小数据块最大数据块,是一个整形自增的编号。Level:0 可以理解为分区合并过的次数,默认值是 0,每次合并过后生成的新的分区后会加 1。
综合上述,可以看出 MergeTree 虽然有主键,但并不是类似 MySQL 用来保持记录唯一的去重作用,只是用来查询加速,即使在手动合并之后,主键相同的数据行也仍旧存在,不能按业务单据去重导致 count(orderNo),sum(number) 拿到的结果是不正确的,不适用我们的需求。
MergeTree 虽然有主键,但是不能对相同主键的数据进行去重,我们的业务场景不能有重复数据。ClickHouse 提供了 ReplacingMergeTree 引擎用来去重,能够在合并分区时删除重复的数据。我理解的去重分两个方面,一个是物理去重,就是重复的数据直接被删除掉,另一个是查询去重,不处理物理数据,但是查询结果是已经将重复数据过滤掉的。
示例如下,ReplacingMergeTree 建表方法和 MergeTree 没有特别大的差异,只是 ENGINE 由 MergeTree 更改为 ReplacingMergeTree([ver]),其中 ver 是版本列,是一个选填项,官网给出支持的类型是 UInt ,Date 或者 DateTime,但是我试验 Int类型也是可以支持的 (ClickHouse 20.8.11)。ReplacingMergeTree 在数据合并时物理数据去重,去重策略如下。
CREATE TABLE test_ReplacingMergeTree ( orderNo String, version Int16, number Int16, createTime DateTime, updateTime DateTime) ENGINE = ReplacingMergeTree(version) PARTITION BY createTimeORDER BY (orderNo) PRIMARY KEY (orderNo);1) insert into test_ReplacingMergeTree values('1', 1, '20', '2021-01-01 00:00:00', '2021-01-01 00:00:00');2) insert into test_ReplacingMergeTree values('1', 2, '30', '2021-01-01 00:00:00', '2021-01-01 01:00:00');3) insert into test_ReplacingMergeTree values('1', 3, '30', '2021-01-02 00:00:00', '2021-01-01 01:00:00');-- final 方式去重 select * from test_ReplacingMergeTree final;-- argMax 方式去重 select argMax(orderNo,version) as orderNo, argMax(number,version) as number,argMax(createTime,version),argMax(updateTime,version) from test_ReplacingMergeTree;
下图是在执行完前两条 insert 语句后进行三次查询的结果,三种方式查询均未对物理存储的数据产生影响,final、argMax 方式只是查询结果是去重的。
其中 final 和 argMax 查询方式都过滤掉了重复数据。我们的示例都是基于本地表做的操作,final 和 argMax 在结果上没有差异,但是如果基于分布式表进行试验,两条数据落在了不同数据分片 (注意这里不是数据分区),那么 final 和 argMax 的结果将会产生差异。final 的结果将是未去重的,原因是 final 只能对本地表做去重查询,不能对跨分片的数据进行去重查询,但是 argMax 的结果是去重的。argMax 是通过比较第二参数 version 的大小,来取出我们要查询的最新数据来达到过滤掉重复数据的目的,其原理是将每个 Shard 的数据搂到同一个 Shard 的内存中进行比较计算,所以支持跨分片的去重。
由于后台的合并是在不确定时间执行的,执行合并命令,然后再使用普通查询,发现结果已经是去重后的数据,version=2,number=30 是我们想保留的数据。
执行第三条 insert 语句,第三条的主键和前两条一致,但是分区字段 createTime 字段不同,前两条是 2021-01-01 00:00:00,第三条是 2021-01-02 00:00:00,如果按照上述的理解,在强制合并会后将会保留 version = 3 的这条数据。我们执行普通查询之后发现,version = 1 和 2 的数据做了合并去重,保留了 2,但是 version=3 的还是存在的,这其中的原因 ReplacingMergeTree 是已分区为单位删除重复数据。前两个 insert 的分区字段 createTime 字段相同,partitionID 相同,所以都合并到了 1609430400_1_2_1 分区文件,而第三条 insert 与前两条不一致,不能合并到一个分区文件,不能做到物理去重。最后通过 final 去重查询发现可以支持查询去重,argMax 也是一样的效果未作展示。
ReplacingMergeTree 具有如下特点
ReplacingMergeTree 最佳使用方案
上述的三种使用方案中其中 ReplacingMergeTree 配合 final 方式查询,是符合我们需求的。
折叠合并树不再通过示例来进行说明。可参考官网示例。
CollapsingMergeTree 通过定义一个 sign 标记位字段,记录数据行的状态。如果 sign 标记位 1(《状态》行), 则表示这是一行有效的数据, 如果 sign 标记位为 -1(《取消》行),则表示这行数据需要被删除。需要注意的是数据主键相同才可能会被折叠。
在使用 CollapsingMergeTree 时候需要注意
1)与 ReplacingMergeTree 一样,折叠数据不是实时触发的,是在分区合并的时候才会体现,在合并之前还是会查询到重复数据。解决方式有两种
2)在写入方面通过《取消》行删除或修改数据的方式需要写入数据的程序记录《状态》行的数据,极大的增加存储成本和编程的复杂性。Flink 在上线或者某些情况下会重跑数据,会丢失程序中的记录的数据行,可能会造成 sign=1 与 sign=-1 不对等不能进行合并,这一点是我们无法接受的问题。
CollapsingMergeTree 还有一个弊端,对写入的顺序有严格的要求,如果按照正常顺序写入,先写入 sign=1 的行再写入 sign=-1 的行,能够正常合并,如果顺序反过来则不能正常合并。ClickHouse 提供了 VersionedCollapsingMergeTree,通过增加版本号来解决顺序问题。但是其他的特性与 CollapsingMergeTree 完全一致,也不能满足我们的需求
我们详细介绍了 MergeTree 系列中的 MergeTree、ReplacingMergeTree、CollapsingMergeTree、VersionedCollapsingMergeTree 四种表引擎,还有 SummingMergeTree、AggregatingMergeTree 没有介绍,SummingMergeTree 是为不关心明细数据,只关心汇总数据设计的表引擎。MergeTree 也能够满足这种只关注汇总数据的需求,通过 group by 配合 sum,count 聚合函数就可以满足,但是每次查询都进行实时聚合会增加很大的开销。我们既有明细数据需求,又需要汇总指标需求,所以 SummingMergeTree 不能满足我们的需求。AggregatingMergeTree 是 SummingMergeTree 升级版,本质上还是相同的,区别在于:SummingMergeTree 对非主键列进行 sum 聚合,而 AggregatingMergeTree 则可以指定各种聚合函数。同样也满足不了需求。
最终我们选用了 ReplacingMergeTree 引擎,分布式表通过业务主键 sipHash64(docId) 进行 shard 保证同一业务主键数据落在同一分片,同时使用业务单据创建时间按月/按天进行分区。配合 final 进行查询去重。这种方案在双十一期间数据日增 3000W,业务高峰数据库 QPS93,32C 128G 6 分片 2 副本的集群 CPU 使用率最高在 60%,系统整体稳定。下文的所有实践优化也都是基于 ReplacingMergeTree 引擎。
Flink 支持通过 JDBC Connector 将数据写入 JDBC 数据库,但是 Flink 不同版本的 JDBC connector 写入方式有很大区别。因为 Flink 在 1.11 版本对 JDBC Connector 进行了一次较大的重构:
两者对 Flink 中以不同方式写入 ClickHouse Sink 的支持情况如下:
起初我们使用 1.10.3 版本的 Flink,flink-jdbc 不支持使用 DataStream 流写入,需要升级 Flink 版本至 1.11.x 及以上版本来使用 flink-connector-jdbc 来写入数据到 ClickHouse。
/** * 构造 Sink * @param clusterPrefix clickhouse 数据库名称 * @param sql insert 占位符 eq:insert into demo (id, name) values (?, ?) */public static SinkFunction getSink(String clusterPrefix, String sql) { String clusterUrl = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_URL); String clusterUsername = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_USER_NAME); String clusterPassword = LoadPropertiesUtil.appInfoProcessMap.get(clusterPrefix + CLUSTER_PASSWORD); return JdbcSink.sink(sql, new CkSinkBuilder<>(), new JdbcExecutionOptions.Builder().withBatchSize(200000).build(), new JdbcConnectionOptions.JdbcConnectionOptionsBuilder() .withDriverName("ru.yandex.clickhouse.ClickHouseDriver") .withUrl(clusterUrl) .withUsername(clusterUsername) .withPassword(clusterPassword) .build());}
使用 flink-connector-jdbc 的 JdbcSink.sink() api 来构造 Flink sink。JdbcSink.sink() 入参含义如下
Flink 同时写入 ES 和 Clikhouse,但是在进行数据查询的时候发现 ClickHouse 永远要比 ES 慢一些,开始怀疑是 ClickHouse 合并等处理会耗费一些时间,但是 ClickHouse 这些合并操作不会影响查询。后来查阅 Flink 写入策略代码发现是我们使用的策略有问题。
上段 (4.2) 代码中 new JdbcExecutionOptions.Builder().withBatchSize(200000).build() 为写入策略,ClickHouse 为了提高写入性能建议进行不少于 1000 行的批量写入,或每秒不超过一个写入请求。策略是 20W 行记录进行写入一次,Flink 进行 Checkpoint 的时候也会进行写入提交。所以当数据量积攒到 20W 或者 Flink 记性 Checkpoint 的时候 ClickHouse 里面才会有数据。我们的 ES sink 策略是 1000 行或 5s 进行写入提交,所以出现了写入 ClickHouse 要比写入 ES 慢的现象。
到达 20W 或者进行 Checkpoint 的时候进行提交有一个弊端,当数据量小达不到 20W 这个量级,Checkpoint 时间间隔 t1,一次 checkpoint 时间为 t2,那么从接收到 JDQ 消息到写入到 ClickHouse 最长时间间隔为 t1+t2,完全依赖 Checkpoint 时间,有时候有数据积压最慢有 1~2min。进而对 ClickHouse 的写入策略进行优化,new JdbcExecutionOptions.Builder().withBatchIntervalMs(30 * 1000).build() 优化为没 30s 进行提交一次。这样如果 Checkpoint 慢的话可以触发 30s 提交策略,否则 Checkpoint 的时候提交,这也是一种比较折中的策略,可以根据自己的业务特性进行调整,在调试提交时间的时候发现如果间隔过小,zookeeper 的 cpu 使用率会提升,10s 提交一次 zk 使用率会从 5% 以下提升到 10% 左右。
Flink 中的 org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat#open 处理逻辑如下图。
先说结果,我们是写入分布式表。\
网上的资料和 ClickHouse 云服务的同事都建议写入本地表。分布式表实际上是一张逻辑表并不存储真实的物理数据。如查询分布式表,分布式表会把查询请求发到每一个分片的本地表上进行查询,然后再集合每个分片本地表的结果,汇总之后再返回。写入分布式表,分布式表会根据一定规则,将写入的数据按照规则存储到不同的分片上。如果写入分布式表也只是单纯的网络转发,影响也不大,但是写入分布式表并非单纯的转发,实际情况见下图。
有三个分片 S1、S2、S3,客户端连接到 S1 节点,进行写入分布式表操作。
从分布式表的写入方式可以看到,会将所有数据落到 client 连接分片的磁盘上。如果数据量大,磁盘的 IO 会造成瓶颈。并且 MergeTree 系列引擎存在合并行为,本身就有写放大 (一条数据合并多次),占用一定磁盘性能。在网上看到写入本地表的案例都是日增量百亿,千亿。我们选择写入分布式表主要有两点,一是简单,因为写入本地表需要改造代码,自己指定写入哪个节点,另一个是开发过程中写入本地表并未出现什么严重的性能瓶颈。双十一期间数据日增 3000W(合并后) 行并未造成写入压力。如果后续产生瓶颈,可能会放弃写入分布式表。
上图是在接入 ClickHouse 过程中遇到的一个问题,其中 7-1 节点 CPU 使用率非常高,不同节点的差异非常大。后来通过 SQL 定位发现不同节点上的数据量差异也非常大,其中 7-1 节点数据量是最多的,导致 7-1 节点相比其他节点需要处理的数据行数非常多,所以 CPU 相对会高很多。因为我们使用网格站编码,分拣仓编码 hash 后做分布式表的数据分片策略,但是分拣仓编码和网站编码的基数比较小,导致 hash 后不够分散造成这种数据倾斜的现象。后来改用业务主键做 hash,解决了这种部分节点 CPU 高的问题。
7-4 节点 (主节点和副本),CPU 毫无征兆的比其他节点高很多,在排除新业务上线、大促等突发情况后进行慢 SQL 定位,通过 query_log 进行分析每个节点的慢查询,具体语句见第 8 小节。
通过两个节点的慢 SQL 进行对比,发现是如下 SQL 的查询情况有较大差异。
SELECT ifNull(sum(t1.unTrackQty), 0) AS unTrackQtyFROM wms.wms_order_sku_local AS t1 FINAL PREWHERE t1.shipmentOrderCreateTime > '2021-11-17 11:00:00' AND t1.shipmentOrderCreateTime <= '2021-11-18 11:00:00' AND t1.gridStationNo = 'WG0000514' AND t1.warehouseNo NOT IN ('wms-6-979', 'wms-6-978', '6_979', '6_978') AND t1.orderType = '10'WHERE t1.ckDeliveryTaskStatus = '3'
但是我们有个疑惑,同样的语句,同样的执行次数,而且两个节点的数据量,part 数量都没有差异,为什么 7-4 节点扫描的行数是 7-0 上的 5 倍,把这个原因找到,应该就能定位到问题的根本原因了。\
接下来我们使用 clickhouse-client 进行 SQL 查询,开启 trace 级别日志,查看 SQL 的执行过程。具体执行方式以及查询日志分析参考下文 9.1 小节,这里我们直接分析结果。
上面两张图可以分析出
很明显 7-4 节点的 202111_0_408188_322 这个分区比较异常,因为我们是按月分区的,7-4 节点不知道什么原因发生了分区合并,导致我们检索的 11 月 17 号的数据落到了这个大分区上,所以但是查询会过滤 11 月初到 18 号的所有数据,和 7-0 节点产生了差异。上述的 SQL 通过 gridStationNo = ‘WG0000514’ 条件进行查询,所以在对 gridStationNo 字段进行创建二级索引后解决了这个问题。
在增加加二级索引后 7-4 节点:扫描了 2 个 part 分区文件,共计 38W 行,耗时 0.103s。
这种情况少见,但是也遇到过一次
我认为可以通过两个方向来排查问题,一个是 SQL 执行频率是否过高,另一个方向是判断是否有慢 SQL 在执行,高频执行或者慢查询都会大量消耗 CPU 的计算资源。下面通过两个案例来说明一下排查 CPU 偏高的两种有效方法,下面两种虽然操作上是不同的,但是核心都是通过分析 query_log 来进行分析定位的。
在 12 月份上线了一些需求,最近发现 CPU 使用率对比来看使用率偏高,需要排查具体是哪些 SQL 导致的。
通过上图自行搭建的 grafana 监控可以看出 (搭建文档),有几个查询语句执行频率非常高,通过 SQL 定位到查询接口代码逻辑,发现一次前端接口请求后端接口会执行多条相似条件的 SQL 语句,只是业务状态不相同。这种需要统计不同类型、不同状态的语句,可以进行条件聚合进行优化,9.4.1 小节细讲。优化后语句执行频率极大的降低。
上节说 SQL 执行频率高,导致 CPU 使用率高。如果 SQL 频率执行频率很低很低,但是 CPU 还是很高该怎么处理。SQL 执行频率低,可能存在扫描的数据行数很大的情况,消耗的磁盘 IO,内存,CPU 这些资源很大,这种情况下就需要换个手段来排查出来这个很坏很坏的 SQL(T⌓T)。
ClickHouse 自身有 system.query_log 表,用于记录所有的语句的执行日志,下图是该表的一些关键字段信息
-- 创建 query_log 分布式表 CREATE TABLE IF NOT EXISTS system.query_log_allON CLUSTER defaultAS system.query_logENGINE = Distributed(sht_ck_cluster_pro,system,query_log,rand());-- 查询语句 select -- 执行次数 count(), -- 平均查询时间 avg(query_duration_ms) avgTime, -- 平均每次读取数据行数 floor(avg(read_rows)) avgRow, -- 平均每次读取数据大小 floor(avg(read_rows) / 10000000) avgMB, -- 具体查询语句 any(query), -- 去除掉 where 条件,用户 group by 归类 substring(query, positionCaseInsensitive(query, 'select'), positionCaseInsensitive(query, 'from')) as queryLimitfrom system.query_log_all/system.query_logwhere event_date = '2022-01-21' and type = 2group by queryLimitorder by avgRow desc;
query_log 是本地表,需要创建分布式表,查询所有节点的查询日志,然后再执行查询分析语句,执行效果见下图,图中可以看出有几个语句平均扫秒行数已经到了亿级别,这种语句可能就存在问题。通过扫描行数可以分析出索引,查询条件等不合理的语句。7.2 中的某个节点 CPU 偏高就是通过这种方式定位到有问题的 SQL 语句,然后进一步排查从而解决的。
ClickHouse 的 SQL 优化比较简单,查询的大部分耗时都在磁盘 IO 上,可以参考下这个小实验来理解。核心优化方向就是降低 ClickHouse 单次查询处理的数据量,也就是降低磁盘 IO。下面介绍下慢查询分析手段、建表语句优化方式,还有一些查询语句优化。
虽然 ClickHouse 在 20.6 版本之后已经提供查看查询计划的原生 EXPLAIN,但是提供的信息对我们进行慢 SQL 优化提供的帮助不是很大,在 20.6 版本前借助后台的服务日志,可以拿到更多的信息供我们分析。与 EXPLAIN 相比我更倾向于使用查看服务日志这种方式进行分析,这种方式需要使用 clickhouse-client 进行执行 SQL 语句,文末有通过 docker 搭建 CK 环境文档。高版本的 EXPLAIN 提供了 ESTIMATE 可以查询到 SQL 语句扫描的 part 数量、数据行数等细粒度信息,EXPLAIN 使用方式可以参考官方文档说明。\
用一个慢查询来进行分析,通过 8.2 中的 query_log_all 定位到下列慢 SQL。
select ifNull(sum(interceptLackQty), 0) as interceptLackQtyfrom wms.wms_order_sku_local final prewhere productionEndTime = '2022-02-17 08:00:00' and orderType = '10'where shipmentOrderDetailDeleted = '0' and ckContainerDetailDeleted = '0'
使用 clickhouse-client,send_logs_level 参数指定日志级别为 trace。
clickhouse-client -h 地址 --port 端口 --user 用户名 --password 密码 --send_logs_level=trace
在 client 中执行上述慢 SQL,服务端打印日志如下,日志量较大,省去部分部分行,不影响整体日志的完整性。
[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.036317 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} executeQuery: (from 11.77.96.163:35988, user: bjwangjiangbo) select ifNull(sum(interceptLackQty), 0) as interceptLackQty from wms.wms_order_sku_local final prewhere productionEndTime = '2022-02-17 08:00:00' and orderType = '10' where shipmentOrderDetailDeleted = '0' and ckContainerDetailDeleted = '0'[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.037876 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} ContextAccess (bjwangjiangbo): Access granted: SELECT(orderType, interceptLackQty, productionEndTime, shipmentOrderDetailDeleted, ckContainerDetailDeleted) ON wms.wms_order_sku_local[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038239 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Key condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038271 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): MinMax index condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038399 [ 1340 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202101_0_0_0_3[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038475 [ 1407 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_0_17_2_22[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038491 [ 111 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202103_18_20_1_22..................................省去若干行 (此块含义为:在分区内检索有没有使用索引).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039041 [ 1205 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723330_1723365_7[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039054 [ 159 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202202_1723367_1723367_0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.038928 [ 248 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Not using primary index on part 202201_3675258_3700711_1054[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039355 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Selected 47 parts by date, 47 parts by key, 9471 marks by primary key, 9471 marks to read from 47 ranges[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039495 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202101_0_0_0_3, approx. 65536 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.039583 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202101_1_1_0_3, approx. 16384 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.040291 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202102_0_2_1_4, approx. 146850 rows starting from 0..................................省去若干行(每个分区读取的数据行数信息).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043538 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723330_1723365_7, approx. 24576 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043604 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723366_1723366_0, approx. 8192 rows starting from 0[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.043677 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MergeTreeSelectProcessor: Reading 1 ranges from part 202202_1723367_1723367_0, approx. 8192 rows starting from 0..................................完成数据读取,开始进行聚合计算.................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.047880 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} InterpreterSelectQuery: FetchColumns -> Complete[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263500 [ 1377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} AggregatingTransform: Aggregating[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263680 [ 1439 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} Aggregator: Aggregation method: without_key..................................省去若干行(数据读取完成后做聚合操作).................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.263840 [ 156 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} AggregatingTransform: Aggregated. 12298 to 1 rows (from 36.03 KiB) in 0.215046273 sec. (57187.69187876137 rows/sec., 167.54 KiB/sec.)[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.264283 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} AggregatingTransform: Aggregated. 12176 to 1 rows (from 35.67 KiB) in 0.215476999 sec. (56507.191284950095 rows/sec., 165.55 KiB/sec.)[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.264307 [ 377 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} Aggregator: Merging aggregated data..................................完成聚合计算,返回最终结果.................................................┌─interceptLackQty─┐│ 563 │└──────────────────┘...................................数据处理耗时,速度,信息展示................................................[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.265490 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} executeQuery: Read 73645604 rows, 1.20 GiB in 0.229100749 sec., 321455099 rows/sec., 5.22 GiB/sec.[chi-ck-t8ebn40kv7-3-0-0] 2022.02.17 21:21:54.265551 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MemoryTracker: Peak memory usage (for query): 60.37 MiB.1 rows in set. Elapsed: 0.267 sec. Processed 73.65 million rows, 1.28 GB (276.03 million rows/s., 4.81 GB/s.)
现在分析下,从上述日志中能够拿到什么信息,首先该查询语句没有使用主键索引,具体信息如下
2022.02.17 21:21:54.038239 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): Key condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and
同样也没有使用分区索引,具体信息如下
2022.02.17 21:21:54.038271 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} wms.wms_order_sku_local (SelectExecutor): MinMax index condition: unknown, unknown, and, unknown, unknown, and, and, unknown, unknown, and, and
此次查询一共扫描 36 个 parts,9390 个 MarkRange,通过查询 system.parts 系统分区信息表发现当前表一共拥有 36 个活跃的分区,相当于全表扫描。
2022.02.17 21:44:58.012832 [ 1138 ] {f1561330-4988-4598-a95d-bd12b15bc750} wms.wms_order_sku_local (SelectExecutor): Selected 36 parts by date, 36 parts by key, 9390 marks by primary key, 9390 marks to read from 36 ranges
此次查询总共读取了 73645604 行数据,这个行数也是这个表的总数据行数,读取耗时 0.229100749s,共读取 1.20GB 的数据。
2022.02.17 21:21:54.265490 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} executeQuery: Read 73645604 rows, 1.20 GiB in 0.229100749 sec., 321455099 rows/sec., 5.22 GiB/sec.
此次查询语句消耗的内存最大为 60.37MB
2022.02.17 21:21:54.265551 [ 618 ] {ea8f56fe-cf2b-4260-8f44-a006458bdab3} MemoryTracker: Peak memory usage (for query): 60.37 MiB.
最后汇总了下信息,此次查询总共耗费了 0.267s,处理了 7365W 数据,共 1.28GB,并且给出了数据处理速度。
1 rows in set. Elapsed: 0.267 sec. Processed 73.65 million rows, 1.28 GB (276.03 million rows/s., 4.81 GB/s.)
通过上述可以发现两点严重问题
所以需要再查询条件上添加主键字段或者分区索引来进行优化。
shipmentOrderCreateTime 为分区键,在添加这个条件后再看下效果。
通过分析日志可以看到没有使用主键索引,但是使用了分区索引,扫描分片数为 6,MarkRange 186,共扫描 1409001 行数据,使用内存 40.76MB,扫描数据大小等大幅度降低节省大量服务器资源,并且提升了查询速度,0.267s 降低到 0.18s。
从实践上看,设置成 Nullable 对性能影响也没有多大,可能是因为我们数据量比较小。不过官方已经明确指出尽量不要使用 Nullable 类型,因为 Nullable 字段不能被索引,而且 Nullable 列除了有一个存储正常值的文件,还会有一个额外的文件来存储 Null 标记。
Using Nullable almost always negatively affects performance, keep this in mind when designing your databases.
CREATE TABLE test_Nullable( orderNo String, number Nullable(Int16), createTime DateTime) ENGINE = MergeTree() PARTITION BY createTimeORDER BY (orderNo) PRIMARY KEY (orderNo);
上述建表语句为例,number 列会生成 number.null.* 两个额外文件,占用额外存储空间,而 orderNo 列则没有额外的 null 标识的存储文件。
我们实际应用中建表,难免会遇到这种可能为 null 的字段,这种情况下可以使用不可能出现的一个值作为默认值,例如将状态字段都是 0 及以上的值,那么可以设置为-1 为默认值,而不是使用 nullable。
分区粒度根据业务场景特性来设置,不宜过粗也不宜过细。我们的数据一般都是按照时间来严格划分,所以都是按天、按月来划分分区。如果索引粒度过细按分钟、按小时等划分会产生大量的分区目录,更不能直接 PARTITION BY create_time ,会导致分区数量惊人的多,几乎每条数据都有一个分区会严重的影响性能。如果索引粒度过粗,会导致单个分区的数据量级比较大,上面 7.2 节的问题和索引粒度也有关系,按月分区,单个分区数据量到达 500W 级,数据范围 1 号到 18 号,只查询 17 号,18 号两天的数据量,但是优化按月分区,分区合并之后不得不处理不相关的 1 号到 16 号的额外数据,如果按天分区就不会产生 CPU 飙升的现象。所以要根据自己业务特性来创建,保持一个原则就是查询只处理本次查询条件范围内的数据,不额外处理不相关的数据。
以上文 7.1 中为例,分布式表选择的分片规则不合理,导致数据倾斜严重落到了少数几个分片中。没有发挥出分布式数据库整个集群的计算能力,而是把压力全压在了少部分机器上。这样整体集群的性能肯定是上不来的,所以根据业务场景选择合适的分片规则,比如我们将 sipHash64(warehouseNo) 优化为 sipHash64(docId),其中 docId 是业务上唯一的一个标识。
在聊查询优化之前先说一个小工具,clickhouse 提供的一个 clickhouse-benchmark 性能测试工具,环境和前文提到的一样通过 docker 搭建 CK 环境,压测参数可参考官方文档,这里我举一个简单的单并发测试示例。
clickhouse-benchmark -c 1 -h 链接地址 --port 端口号 --user 账号 --password 密码 <<< "具体 SQL 语句"
通过这种方式可以了解 SQL 级别的 QPS 和 TP99 等信息,这样就可以测试语句优化前后的性能差异。
假设一个接口要统计某天的” 入库件量”,” 有效出库单量”,” 复核件量”。
-- 入库件量 select sum(qty) from table_1 final prewhere type = 'inbound' and dt = '2021-01-01';-- 有效出库单量 select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;-- 复核件量 select sum(qty) from table_1 final prewhere type = 'check' and dt = '2021-01-01';
一个接口出三个指标需要上述三个 SQL 语句查询 table_1 来完成,但是我们不难发现 dt 是一致的,区别在于 type 和 status 两个条件。假设 dt = ‘2021-01-1’ 每次查询需要扫描 100W 行数据,那么一次接口请求将会扫描 300W 行数据。通过条件聚合函数优化后将三次查询改成一次,那么扫描行数将降低为 100W 行,所以能极大的节省集群的计算资源。
select sumIf(qty, type = 'inbound'), -- 入库件量 countIf(distinct orderNo, type = 'outbound' and status = '1'), -- 有效出库单量 sumIf(qty, type = 'check') -- 复核件量 prewhere dt = '2021-01-01';
条件聚合函数是比较灵活的,可根据自己业务情况自由发挥,记住一个宗旨就是减少整体的扫描量,就能到达提升查询性能的目的。
MergeTree 系列的表引擎可以指定跳数索引。\
跳数索引是指数据片段按照粒度 (建表时指定的 index_granularity) 分割成小块后,将 granularity_value 数量的小块组合成一个大的块,对这些大块写入索引信息,这样有助于使用 where 筛选时跳过大量不必要的数据,减少 SELECT 需要读取的数据量。
CREATE TABLE table_name( u64 UInt64, i32 Int32, s String, ... INDEX a (u64 * i32, s) TYPE minmax GRANULARITY 3, INDEX b (u64 * length(s)) TYPE set(1000) GRANULARITY 4) ENGINE = MergeTree()...
上例中的索引能让 ClickHouse 执行下面这些查询时减少读取数据量。
SELECT count() FROM table WHERE s < 'z'SELECT count() FROM table WHERE u64 * i32 == 10 AND u64 * length(s) >= 1234
支持的索引类型
创建二级索引示例
Alter table wms.wms_order_sku_local ON cluster default ADD INDEX belongProvinceCode_idx belongProvinceCode TYPE set(0) GRANULARITY 5;Alter table wms.wms_order_sku_local ON cluster default ADD INDEX productionEndTime_idx productionEndTime TYPE minmax GRANULARITY 5;
重建分区索引数据:在创建二级索引前插入的数据,不能走二级索引,需要重建每个分区的索引数据后才能生效
-- 拼接出所有数据分区的 MATERIALIZE 语句 select concat('alter table wms.wms_order_sku_local on cluster default ', 'MATERIALIZE INDEX productionEndTime_idx in PARTITION '||partition_id||',') from system.partswhere database = 'wms' and table = 'wms_order_sku_local'group by partition_id-- 执行上述 SQL 查询出的所有 MATERIALIZE 语句进行重建分区索引数据
对比下 final 和 argMax 两种方式的性能差距,如下 SQL
-- final 方式 select count(distinct groupOrderCode), sum(arriveNum), count(distinct sku) from tms.group_order final prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00' where arriveNum > 0 and test <> '1'-- argMax 方式 select count(distinct groupOrderCode), sum(arriveNumTemp), count(distinct sku) from (select argMax(groupOrderCode,version) as groupOrderCode, argMax(arriveNum,version) as arriveNumTemp, argMax(sku,version) as sku from tms.group_order prewhere siteCode = 'WG0001544' and createTime >= '2022-03-14 22:00:00' and createTime <= '2022-03-15 22:00:00' where arriveNum > 0 and test <> '1' group by docId)
final 方式的 TP99 明显要比 argMax 方式优秀很多
ClickHouse 的语法支持了额外的 prewhere 过滤条件,它会先于 where 条件进行判断,可以看做是更高效率的 where,作用都是过滤数据。当在 sql 的 filter 条件中加上 prewhere 过滤条件时,存储扫描会分两阶段进行,先读取 prewhere 表达式中依赖的列值存储块,检查是否有记录满足条件,在把满足条件的其他列读出来,以下述的 SQL 为例,其中 prewhere 方式会优先扫描 type,dt 字段,将符合条件的列取出来,当没有任何记录满足条件时,其他列的数据就可以跳过不读了。相当于在 Mark Range 的基础上进一步缩小扫描范围。prewhere 相比 where 而言,处理的数据量会更少,性能会更高。看这段话可能不太容易理解,
-- 常规方式 select count(distinct orderNo) final from table_1 where type = 'outbound' and status = '1' and dt = '2021-01-01';-- prewhere 方式 select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;
上节我们说了使用 final 进行去重优化。通过 final 去重,并且使用 prewhere 进行查询条件优化时有个坑需要注意,prewhere 会优先于 final 进行执行,所以对于 status 这种值可变的字段处理过程中,能够查询到中间状态的数据行,导致最终数据不一致。
如上图所示,docId:123_1 的业务数据,进行三次写入,到 version=103 的数据是最新版本数据,当我们使用 where 过滤 status 这个可变值字段时,语句 1,语句 2 结果如下。
--语句 1:使用 where + status=1 查询,无法命中 docId:123_1 这行数据 select count(distinct orderNo) final from table_1 where type = 'outbound' and dt = '2021-01-01' and status = '1';--语句 2:使用 where + status=2 查询,可以查询到 docId:123_1 这行数据 select count(distinct orderNo) final from table_1 where type = 'outbound' and dt = '2021-01-01' and status = '2';
当我们引入 prewhere 后,语句 3 写法:prewhere 过滤 status 字段时将 status=1,version=102 的数据会过滤出来,导致我们查询结果不正确。正确的写法是语句 2,将不可变字段使用 prewhere 进行优化。
-- 语句 3:错误方式,将 status 放到 prewhereselect count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' and status = '1';-- 语句 4:正确 prewhere 方式,status 可变字段放到 where 上 select count(distinct orderNo) final from table_1 prewhere type = 'outbound' and dt = '2021-01-01' where and status = '1' ;
其他限制:prewhere 目前只能用于 MergeTree 系列的表引擎
ClickHouse 非常适合存储大数据量的宽表,因此我们应该避免使用 SELECT * 操作,这是一个非常影响的操作。应当对列进行裁剪,只选择你需要的列,因为字段越少,消耗的 IO 资源就越少,从而性能就越高。\
而分区裁剪就是只读取需要分区,控制好分区字段查询范围。
where 和 group by 中的列顺序,要和建表语句中 order by 的列顺序统一,并且放在最前面使得它们有连续不间断的公共前缀,否则会影响查询性能。
-- 建表语句 create table group_order_local( docId String, version UInt64, siteCode String, groupOrderCode String, sku String, ... 省略非关键字段 ... createTime DateTime) engine = ReplicatedReplacingMergeTree('/clickhouse/tms/group_order/{shard}', '{replica}', version) PARTITION BY toYYYYMM(createTime) ORDER BY (siteCode, groupOrderCode, sku);--查询语句 1select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum),0) arriveNumSum,count(distinct sku) skuQtyfrom tms.group_order finalprewhere createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00'and siteCode = 'WG0000709'where arriveNum > 0 and test <> '1'--查询语句 2(where/prewhere 中字段)select count(distinct groupOrderCode) groupOrderQty, ifNull(sum(arriveNum),0) arriveNumSum,count(distinct sku) skuQtyfrom tms.group_order finalprewhere siteCode = 'WG0000709' and createTime >= '2021-09-14 22:00:00' and createTime <= '2021-09-15 22:00:00'where arriveNum > 0 and test <> '1'
建表语句 ORDER BY (siteCode, groupOrderCode, sku),语句 1 没有符合要求经过压测 QPS6.4,TP99 0.56s,语句 2 符合要求经过压测 QPS 14.9,TP99 0.12s
1)降低查询速度,提高吞吐量
max_threads:位于 users.xml 中,表示单个查询所能使用的最大 CPU 个数,默认是 CPU 核数,假如机器是 32C,则会起 32 个线程来处理当前请求。可以把 max_threads 调低,牺牲单次查询速度来保证 ClickHouse 的可用性,提升并发能力。可通过 jdbc 的 url 来配置
下图是基于 32C128G 配置,在保证 CK 集群能够提供稳定服务 CPU 使用率在 50% 的情况下针对 max_threads 做的一个压测,接口级别压测,一次请求执行 5 次 SQL,处理数据量 508W 行。可以看出 max_threads 越小,QPS 越优秀 TP99 越差。可根据自身业务情况来进行调整一个合适的配置值。
2)接口增加一定时间的缓存\
3)异步任务执行查询语句,将聚合指标结果落到 ES 中,应用查询 ES 中的聚合结果\
4)物化视图,通过预聚合方式解决这种问题,但是我们这种业务场景不适用
•更改ORDER BY 字段,PARTITION BY,备份数据,单表迁移数据等操作
•基于 docker 搭建 clickhouse-client 链接 ck 集群
•基于 docker 搭建 grafana 监控 SQL 执行情况
作者:京东物流 马红岩
内容来源:京东云开发者社区