Doris在作业帮实时数仓中的应用与实践
置顶
三寸九州 发布于2021-04-07 浏览:1712 回复:3
0
收藏
最后编辑于2021-05-25

9月20日的Apache Doris线上Meetup——壹佰案例峰会预热沙龙圆满成功,现在为大家带来这次Meetup的内容回顾。
本次Meetup请到了来自作业帮和百度画像团队的技术大牛带来Apache Doris应用与实践上的经验分享,了解更多详情请关注Doris官方公众号。嘉宾分享回顾会陆续放出,公众号后台回复“0920”立即get回放录像。

 

 


糜利敏

作业帮大数据查询引擎负责人

伴随着业务的快速发展,由于缺失统一的查询系统,作业帮整个数据中台交付项目的成本非常高,达到数人周甚至月,为了解决如上问题,引入Apache Doris,并基于Doris做了统一元数据、统一读写服务、规范化运维等工作,数据中台交付效率提高数十倍,目前小时级交付;并对于特定场景下的查询性能提高了数十到数百倍;查询服务保持高稳定性,线上运行近1年,0事故(>=p2)。本次将分享这其中的实践和踩过的坑。

 

 

讲座分为三部分内容:

业务与背景介绍

基于Doris的实时查询系统

未来规划

 

 

 

 

 

1 业务与背景介绍

 

 

我所在团队是作业帮大数据团队,主要负责建设公司级数仓,向各个产品线提供面向业务的数据信息,如到课时长、答题情况等业务数据以及如PV、UV、活跃等流量类数据,服务于拉新、教学、BI等多个重要业务线。

在数仓体系中,大数据团队主要负责到ODS-DWS的建设,从DWS到ADS一般是数仓系统和业务线系统的边界。

 

 

在过去,由于缺失有效、统一的查询系统,我们探索了很多模式来支持各个业务线发展。

有些业务线对大数据相关技术比较了解,熟悉Spark等计算系统,可以自己处理计算。因此会选用Kafka 接收数据后使用Spark计算的模式来对接大数据团队;但是其他业务线不一定熟悉这套技术栈,因此这种方案的主要问题无法复制到其他业务线。且Spark集群跨越多个业务线使用,本身就给业务线带来了额外的维护成本。

既然Kafka+Spark的模式无法大范围推广,我们又探索了基于ES的方案,即大数据将数据写入ES中,然后业务先直接访问ES来获取数据,但是发现一方面高性能的使用ES,本身就具有很高的成本,对ES得非常熟悉,这对于业务线来说很难有精力去做,其次,由于使用ES的系统质量参差不齐,偶会还发生将ES集群打垮的问题,稳定性也不可控,最后ES-SQL完备性不足,如不支持join、多列group by(6.3版本)等。

因此我们又探索开发API接口,希望在稳定性上可以有更好的解决方案。虽然API可以可控,但是由于API不提供SQL功能,基于需求场景不断Case by case的API开发反而成了影响交付效率的主要瓶颈点。

上述多是支持查询明细数据,一旦涉及到大规模的流量类查询,如PV、UV,只好引入Druid类系统,但是Duird的接口和其他系统的接口不一致,用户往往又得学习,且Druid不支持明细,一旦需要明细,就需要到ES去查询,由于涉及两套系统,有时候还得处理明细数据和聚合数据不一致的问题。

随着需求越来越多,系统也越来越难以维护,交付效率也特别低,需求排队非常严重。因此,提供有效而统一的查询系统,对于实时数仓建设在提高业务支持效率、降低维护成本上都具有非常重大的意义。

经过过去数月的探索与实践,我们确立了以Doris为基础的实时查询系统。同时也对整个实时数仓的数据计算系统做了一次大的重构,最终整体的架构图如下:

 

 

如图所示(从下到上),原始业务层日志经数据摄入系统进入数仓,在数据清洗计算层,我们将原来基Spark系统升级到了Flink,并且基于Flink-Sql提供了统一的数据开发框架,从原有的代码开发升级到SQL开发来提升数据的研发效率。

其后查询系统将Kafka的数据实时同步到查询引擎内,并通过OpenAPI的统一接口对外提供查询服务。

 

 

基于Doris的查询系统上线后,我们面对一个需求,不用像过去一样做方案调研、开发接口、联调测试,现在只要把数据写入,业务层就可以基于SQL自己完成数据查询、业务开发,交付效率(数据计算好到提供可读服务)从过去的数人周加快到小时级。

在性能方面,过去基于ES或者MySQL来做,当查询的数据量较大时,我们只能忍受数十个小时到数分钟的延迟,基于Doris的方案,加快到分钟级甚至秒级。

Doris的整体架构非常简单,不依赖任何第三方组件,社区支持度也非常好,从上线到今,我们只需做一些轻量级的运维规范,即可保证高稳定性。

所以说,通过引入Doris,解决了作业帮内实时数仓查询交付慢、查询慢的痛点问题,对于后续数仓的系统发展起到了非常关键的作用。

 

 

 

2 基于Doris的实时查询系统

 

接下来,重点讲下查询系统的工作。

主要分两部分:查询系统的架构选型以及原理,以及应用&实践

 

2.1 系统选型&原理
在讲查询引擎之前,先讲下业务场景。

 

 

作业帮内,业务场景主要分两种

一种是传统的流量类,比如算PV、UV、活跃……,作业帮内很多时候还需要看进一步的明细。

比如:作业帮主App 在每天各个小时的活跃用户数,还要看作业帮主App每个小时内各个版本的活跃用户数。

 

 

第二种是面向我们业务线的工作台,比如教学的老师。

比如:我们的老师上完课后,会看下自己班内的同学们的出勤数据、课堂测验数据等。

 

 

这两种场景下,这块考虑到调研成本、团队技术生态、维护成本等多种因素,我们最后选择了Doris 作为我们的查询引擎。主要是Doris可在上述两种场景下都可以统一的满足业务的需求。

 

 

首先介绍下Doris

Doris是MPP架构的查询引擎,整体架构非常简单,只有FE、BE两个服务,FE负责SQL解析、规划以及元数据存储,BE负责SQL-Plan的执行以及数据的存储,整体运行不依赖任何第三方系统,功能也非常丰富如支持丰富的数据更新模型、MySQL协议、智能路由等。对于业务线部署运维到使用都非常友好。

接下来讲下用Doris如何解决我们前面提到的业务场景下的问题。

 

 

Doris有多种数据模型,流量类场景常用的是聚合模型。比如对于前面提到的场景,我们会把作业帮主App各个版本的明细数据存到base表中,如果直接从base表中读取跨天级的聚合数据,由于数据行比较多,可能会出现查询延迟的问题,因此我们会对常用的天级数据做一次rollup,这样通过预聚合,来减少查询的数据量,可以加快查询的延迟。

 

 

要高效的使用Doris的聚合模型,前提都是基于key列做数据行筛选,如果使用value列,Doris需要把相关的行全部聚合计算后方可决策是否属于结果集,因此效率比较低。

而对于教研工作台,前面提到的都是基于value的筛选,因此使用了Doris on ES的模型。主要是考虑到 可以发挥ES的任意列检索的能力,来加快查询速度。

在我们的实践中,发现Doris on ES相比直接裸用ES或社区的其他方案如Presto on ES在性能上有很大的提升,接下来介绍下Doris on ES高性能的设计原理。

 

 

Doris on ES整体的架构如图,FE负责查询ES的元数据信息如location、shard等,BE负责从ES数据节点扫描数据。

 

 

Doris on ES高性能,相比裸用ES,有几个优化点:

裸用ES时,ES采用的是Query then Fetch的模式,比如请求1000条文档,ES有10个分片,这时候每个分片都会给协调返回1000个doc id,然后 协调节点其实拿到了10 * 1000个doc id,然后选择1000个。这样其实每个分片多返回了900个.

Doris on ES则绕过了协调节点直接去操作datanode。它会在每个datanode上查询符合预期的docid,这样不会有过多的docid返回。

 

 

其次,Doris从ES扫描数据时,也做了很多优化。比如在扫描速度上,采用了顺序扫描、列存优化、谓词下推等,在数据从ES传输到Doris时,采用就近原则如BE会优先访问本机的datanode、source filter来过滤不用的字段等来加速传输速度。 

在我们的调研中,Doris on ES的性能,比Presto on ES快了有数十倍。

 

 

2.2 应用实践
在作业帮内,除了上面介绍的基于Doris的数据模型做的基础应用,要完整的支持业务、保证稳定性、提高效率,还需要其他周边的系统建设。

接下来介绍下基于Doris,作业帮查询系统架构的整体设计以及工作模式。

 

 

这是作业帮查询系统的总体架构。

从上往下,首先是我们平台,包括各个报表平台、元数据管理平台等,主要来提高各个场景的人效。

其下红色部分为我们统一的API接口层,这里我们主要是制定了API的规范比如请求响应方式、返回码等,来减少系统之间对接的成本。

基于api除了提供了主要的读写接口外,也包含了周边的服务建设,比如元数据管理、调度系统等。 

 

 

接下来就基于一个完整的流程来介绍下各部分系统。

 

 

首先是元数据,Doris基于mySQL语法建表,已经有元数据,我们这里做元数据,有几个额外的考虑:

首先是保障查询性能方面:如果一个表在建表时配置写错,那么查询性能会非常差,比如ES的index mapping中关闭了docvalue,或者Doris表未启动列存模式,那么查询就会退化成行存模式,性能会比较低,因此为了最大化性能,就需要将建表的过程全部自动化且规范化。这是其一。

Doris自身存储是有强Schema约束的,比如一个字符串的长度。但是ES并没有明确的长度约束,对于一个keyword类型的字段,写入128B或者256B都可以成功,但这会导致一个问题,当把一张es表同步到doris表时,同步的成功率无法保障。另外,一旦doris表声明的类型(如bigint)和ES index的类型不一致(如keyword)时,也会导致Sql运行失败。因此需要构建统一的数据模型来避免这类问题。

第三:使用效率。我们在使用过程中,建表、删除表、修改表是一个常见的操作,为了让各个业务线的同学(不管是否了解Doris)都可以快速的建表,这也是要做统一元数据、统一模型的基础。

最后,前面也提到了我们整个计算系统也在重构为Flink-SQL。Flink-SQL则会强依赖元数据,比如Table on Kafka、Table on Redis……

 

 

要统一元数据,统一数据模型,就得抽象整个数据表的结构,来管理好不同存储上的表,我们基于env、db、table为基本单位来管理表,database、table大家相对熟悉,env是我们引入的新namespace,主要用于提供不同集群/业务线的定义,如百度云的数仓集群、腾讯云的数仓集群,表单元下主要包含field(列类型、值域)、index(如rollup、bitmap索引等)、storage(存储属性)。

关于列属性,主要是规范化类型系统,考虑到json-schema由于其校验规则丰富、描述能力强,因此对于列值的约束统一使用json-schema来做。

对于数据类型,我们设计了公共数据类型以及私有数据类型。公共类如varchar、int等,这些在不同的存储系统都有对应的实现,也支持私有类型如doris::bitmap,方便私有系统的兼容和扩展。通过这个模式可以将基于各个存储系统的表做了统一的管理。

 

 

这是我们线上的真实的一张表,里面包含了列信息以及对应的存储配置。

左图中的纵向红框是json-schema的描述,来规范化值域。横向红框为ES表的一些meta字段,比如docid、数据更新时间。这些字段可以方便追查数据问题、以及用作数据筛选。

因为我们统一了数据模型,因此可以很方便的对所有表统一设置要增加这些meta字段。

 

 

通过元数据的统一管理,构建的表质量都非常高。所有的表都在最大化性能的提供查询服务,且由于数据导致的查询不可用Case为0。且对于任何业务线的同学,不管是否了解Doris,都可以分钟级构建出这样一张高质量的表。

 

 

建好表后,就是数据的写以及读,统一基于Open API来做。

做API接口其实本质上也是为了在提供系统能力的前提下,进一步保障系统的稳定性和易用性。

比如:要控制业务线的误用(如连接数打满),提供统一的入口方便写ES、Doris,且控制数据质量……

 

 

首先介绍下数据写接口。

由于统一了表模型,因此可以很方便的提供统一的写入接口协议。用户也无须关注实际表的存储是ES还是Doris以及处理异构系统的系统。

第二,统一了写接口,就可以统一的对写入的数据会做校验检查,如数据的大小、类型等,这样可以保证数据写入的质量与准确性。这样对于数据的二次加工非常重要。

第三,接入协议中还增加了关键词,如数据的版本。可以解决数据的乱序问题,以及建立统一的写入监控。如下图是我们整个写入数据流的QPS以及端到端(数据写入存储时间以及数据生产时间)的延迟分位值,这样可以让系统提高可观测性、白盒化。

 

 

接下来讲一个具体的场景,写入端是如何解决乱序问题的。

常态下我们的实时数据流是经过Flink或Spark计算后写入Kafka,然后由查询系统同步到Doris/ES中。

 

 

当需要修数时,如果直接写入,会导致同一个key的数据被互相覆盖,因此为了避免数据被乱序覆盖,就得必须停掉实时流,这个会导致数据时效性式受损。

因此我们基于写入端做了改进,实时数据流、离线修复数据流各自写入不同的topic,同步服务对每个topic做限速消费,如实时流时效性要求高,可以配额调的大些,保证配额,离线时效性则允许配额小点,或者在业务低峰期将配额调大,并基于数据key&列版本存储做了过滤。这样可以保证时效性的前提下,修数也可以按照预期进行。

 

 

最后是读的部分。

在提供SQL能力的前提下,我们也做了一些额外的方案,比如缓存、统一的系统配置。对于系统延迟、稳定性提升都有很大的改进。并且由于统一了读接口,上述的这些改造,对于业务线来说都是透明的。

 

 

除了常规下面向低延迟的读,还有一类场景面向吞吐的读。

介绍下场景,比如 要统计统计某个学部下(各个老师)的学生上课情况:上课人数、上课时长等。 

在过去,我们是基于Spark/Flink来处理这类问题,如Spark消费Kafka中的课中数据,对于每一条数据,会去Redis中查询教师信息来补全维度。

常态下,当课中数据到达的时候,教师信息是就绪的,因此没有什么问题。可是在异常下,如维度流迟到、存储查询失败等,会导致课中流到达时,无法获取对应的教师信息,也就无法计算相关维度如学部的统计。 

过去面临这种情况时,只能遇到这种异常,如重试如果无法解决,只能丢弃或者紧急人工干预,比如在尾标就绪后再重新回刷课中表,一旦遇到上游Kafka数据过期就只能从ODS层或者离线修复,效率特别低,用户体验也非常差。

 

 

基于Doris模式下,我们使用微批调度的模式。

调度系统会定期(分钟级)执行一个调度任务,基于SQL join完成数据的选取。这样哪怕在异常下,课中流查不到教师数据,这样join的结果只是包含了可以查到教师数据的信息,

待教师数据就绪后,即可自动补全这部分课中数据的维度。整个过程全部自动化来容错。效率非常高。

 

 

 

因此这个模式的主要好处

业务端延迟可控、稳定性好。整个过程主要取决于调度的周期和SQL执行时长。调度周期可控,且由于Doris on ES的高性能,SQL执行时长几乎都可以在分钟内完成。

数据修复成本低、维护方便。一旦数据有异常,可以自动触发对应的数据窗口进行重新计算。

 

 

最后,讲下其他方面的建议实践,这些相对简单,但是在实际的应用中非常容易忽视。

ADS层表,尤其是面向平台侧的应用,慎用join。Doris的join策略比较多,如broadcast、shuffer等,如果使用需要了解原理,属于高级用户的使用范畴。对于强调快速迭代的场景下,可以使用微批模式来略降低数据更新的延迟,提高数据查询的效率。

使用Doris on ES时,尤其是在ES集群负载很高的情况下,在延迟允许的情况下建议将es的扫描超时时间设置大一点,如30s甚至更久。

Batch size,不是越大越好。我们实践中发现4096下最好,可以最高达到每秒30w的扫描速度。

Doris使用Bitmap做精确去重时,有时候会发现Sql延迟比较高,但是系统CPU利用率低,可以通过调大fragment_instance_num的值。

运维Doris时,建议使用Supervisor,可以帮助避免很多服务异常挂掉的问题;机器全部开启ulimit –c,避免出core时无法高效定位

当前我们在使用master版本,主要是考虑到bugfix很及时,但是也要避免新代码、feature的bug引入,因此我们会关注社区的issue、并做好case回归、固化使用模式等一系列手段来保障master在实际生产中的稳定性。

 

 

 

 

3 未来规划

 

最后讲下规划,Doris 在作业帮实时数仓的建设中发挥了很关键的作用。在实际的应用中,我们也发现了一些当前的一些不足。

 

 

Doris on ES在面对大表的join查询时,目前延迟还比较大,因此需要进一步的优化解决;

Doris自身的OLAP表可以做动态分区,对于ES表目前可控性还不足;

其次,当ES修改表后,如增加字段,只能删除Doris表重建,可能会有短暂的表不可用,需要自动化同步或者支持在线热修改;

最后Doris on ES可以支持更多的谓词下推,如count等。

 

我们也希望可以和社区一起,把Doris建设的越来越好。

我的分享到此,谢谢大家。

 

 

 


问题1

Doris on ES V.S. SparkSQL on ES,在功能上和性能上咱们调研过吗?对于使用哪个您这边有什么建议吗?


SparkSQL on ES和Doris on ES 虽然都是SQL,但是在实际的生产环境中使用差异还是比较大的。

功能上来说,SparkSQL和Doris-SQL需要考虑语法的兼容性问题,毕竟是两个系统,语法兼容其实很难。一旦不一致就需要用户端面向不同的系统做适配。

性能上,SparkSQL或者Doris on ES,虽然访问ES的原理都差不多,但是实现上可能会有diff,这些diff会导致性能上差异比较大,如SparkSQL的connector是不支持列存模式的。

场景上,如果使用SparkSQL建议可以使用在流计算场景,更多的是解决吞吐的问题,类似的系统应该是Flink-SQL。可以把数据按照行扫出来后,基于Spark的分布式计算能力、yarn的资源管理走流计算的模式。Doris on ES更适合走低延迟的场景。

 


问题2

Doris 支持Hive Metastore,和Flink SQL是什么关系?刚才讲的太快,有点没听懂


Doris其实是不支持Hive MetaStore的。只是可以从HDFS上load文件,然后在Doris的load语法中指定对应的列。

FlinkSQL和这块关系不大。不过我理解你说的应该是我们的元数据,这部分背景是因为Flink-SQL运行时需要设置DDL语句,比如一张基于Redis的表都有哪些列,类型是什么,这些需要统一的管理起来,目前是存储到了我们的元数据系统中,通过接口和Flink系统完成对接。

 


问题3

_version字段是一个内部字段?需要用户端写入的时候指定,还是系统自动创建?和HBase的version的应用场景有区别吗?


_version是我们数据流的一个内置协议字段。在数流转过程中,用户只要设置值即可,不需要显示创建。具体的值可以根据数据字段的写入服务来设置,比如在ODS层,应该是采集侧服务来写入,如果在中间的Flink清洗环节,应该是Flink系统来设置,尽量让架构服务统一设置,保证稳定性。

_version字段最终会映射到存储系统中的UpdateTime字段,这个也是架构负责写入的,不需要业务侧关注。

HBase的version更多是用于多版本的管理,比如数据的回滚等。这里查询系统的_version更多是为了保证数据的时鲜性,即用户从查询系统读到的数据始终是最新的。这么做的前提主要是因为查询系统比如ES对于数据列多版本支持不太好,对于数据流更新时如果没有版本管理,容易导致乱序覆盖,和HBase的version场景还不同。

 

 

 

 

 

 

 

欢迎扫码关注:

Apache Doris(incubating)官方公众号

 

相关链接:

Apache Doris官方网站:

http://doris.incubator.apache.org

Apache Doris Github:

https://github.com/apache/incubator-doris

Apache Doris 开发者邮件组:

dev@doris.apache.org 

收藏
点赞
0
个赞
共3条回复 最后由doubi渣渣回复于2021-04-07
#3doubi渣渣回复于2021-04-07

架构比较复杂的话,对维护要求就比较高

0
#2doubi渣渣回复于2021-04-07

两层kafka……

0
快速回复
TOP
切换版块