Spark Doris Sink的设计和实现
三寸九州 发布于2021-04 浏览:4097 回复:1
0
收藏

6月29日,Doris有幸得到中国信通院云大所、大数据技术标准推进委员会的支持,在中国信通院举行了0.11.0新版本预览线下沙龙。各位嘉宾都带来了干货满满的分享。关注Doris官方公众号,后台回复“0629”即可获取各位嘉宾分享PPT及现场录像。

 

 

今天是朱良昌同学代表百度智能云流式计算团队带来Spark Streaming对接Doris 设计与实现的分享。

 

 

业务场景

 

 

Spark Streaming(主要是Structured Streaming)在百度内部被广泛应用于实时计算,日志分析,ETL等业务场景。其中有很多业务方希望可以使用structured streaming读取上游数据源(例如:kafka、 hdfs、 database等),然后对数据进行处理后实时导入Doris以供查询分析。

 

为此流式计算团队专门开发了Doris sink的组件来适配Doris。Doris sink支持exactly-once语义,封装并对用户屏蔽了与Doris的交互细节,用户只需要关注用户逻辑和计算本身,经过简单配置即可非常方便的将流式数据导入到Doris中。

 

Structured Streaming介绍

 

Structured Streaming是Spark 2.3版本之后提出的新的流式计算引擎,具有以下特点:

 

1. 具备良好的扩展性和高容错性

2. 基于Spark-SQL引擎,使用DataFrame API,使用简单

3. 相比于Spark 2.2版本之前使用DStream API的spark Streaming模型,Structured Streaming支持更加丰富的流式语义。例如:基于eventTime的window、聚合计算,watermark,流和static DataFrame的join,流和流的简单join等

4. 端到端的exactly-once语义。非常适用于要求数据不重不丢的业务

 Spark工作模型 

 

 

 

Spark作业整体架构如上图所示:

 

Driver:作为程序的入口,执行用户代码,生成DAG(有限无环图),对整个app的资源进行协调和管理。

Executor: 执行用户逻辑,dataset计算(transformation和action)。一个executor中可以并行执行多个task,task的并发量由启动executor时指定的core数决定,而每个task负责对一个partition进行计算。

Cluster Manager: 百度内部使用的主要是Yarn。

 

 Structured Streaming编程模型 

 

 

Structured Streaming与传统意义上的流式计算系统不同,它是一个微批次(micro-batch)的流式计算系统。其主要原理是将源源不断的数据,切分成一个一个的小数据块,其中每一小数据块称之为一个batch。当每次触发计算时,系统会处理一个batch中的数据,而batch和batch之间则是串行执行的。一个batch的计算,可以看做是一个Spark-SQL job的一次执行,故一个Structured Streaming作业可以看做是无穷多个job组成的一个不会停止作业。

 WordCount Demo 

 

 

 

上图是一个Structured Streaming使用Complete output mode执行Wordcount的示例。

 

nc是指linux的netcat指令,input数据通过socket传入。在时间点1,系统接收到4条数据作为一个batch进行计算,产生了结果(cat, 1)(dog, 3)。在时间点2的时候又来了两条数据,这两条数据会成为一个新的batch进行计算,新的计算结果会加到最终的结果里(cat, 2)(dog,3)(owl,1)。以此类推,每来一批新数据,将这批数据作为一个batch进行计算,然后对结果进行更新。Structured Streaming就这样源源不断的将输入数据切分为一个一个小的batch,然后执行计算。

 端到端Exactly-once语

 

Structured Streaming的exactly-once语义要求数据从读取->计算->写出的过程,实现端到端的不重不丢。因此对各模块有如下要求:

 

1. Source可回溯且可回放。简单来说就是可以重复消费,常用Source主要是kafka,bigpipe(百度内部以c++实现的类似kafka的消息队列)

2. Execution engine记录checkpoint。引擎会在处理每个batch之前,先写WAL来记录当前batch要读哪些数据。如果发生failover,可以利用checkpoint对batch的数据进行重新计算。故Source + Execution engine做到了at-least once语义,即数据的不丢

3. 支持幂等写的sink,对重复数据去重,从而保证了任何情况failover的exactly-once语义

 

Checkpoint & WAL:

 

上图是一个Batch的计算流程, 以此来讲解Execution engine如果做到数据不丢:

 

1. batch刚开始时,调用getOffset, 获取该batch要处理的数据范围,即offsetRange

2. 将offsetRange [startOffset, endOffset)存储在OffsetLog中

3. 调用getBatch,利用offsetRange构建Dataset, 提交batch

4. Executor对batch进行计算,并将结果sink到下游系统

5. batch运行结束,由driver写commit log,标识该batch运行完成

 

因此一个batch从执行开始到结束会写两个log,一个offsetlog,一个commitlog。通过这个两个log可以保证任何Failover场景下的数据不丢。

 

Failover case分析:

 

Case 1中,因为OffsetLog中记录的最新batchId和CommitLog中记录的最新batchId相等,所以Failover后,引擎发现第75个batch已经成功运行结束,且没有batch需要重放,则从第76个batch开始继续执行。

 

 

Case 2中,OffsetLog中最新的batchId是85, 而CommitLog中记录的最新的batchId是84,两者不相等,说明作业failover发生在batch 85执行过程中,此时需要重新执行batch 85。

 

Sink幂等写入:

Source + Execution-engine保证了数据的不丢,如果在此基础上希望实现端到端的exactly once,就需要Sink支持幂等写入以支持数据的去重。

 

Doris Sink的设计与实现

 

由上文介绍可知,要实现端到端的exacly-once语义,需要下游系统支持对数据的去重,所以在设计Doris Sink时,就要考虑Doris对数据的去重功能。Doris有一个很明显的特点:它的写入是唯一的,即对同一Database,对同一个label的导入是唯一的,同一个label只能被导入一次,不可以被多次导入,即一个label对应唯一一批数据。因此我们可以利用该特性来进行Doris sink的去重设计。

 

 

 

 Label的生成逻辑 

1. 每个structured streaming作业启动时都必须指定一个checkpointlocation,且每个作业的checkpoint必须是唯一的,不能混用。

2. batch是顺序执行的,因此每个batch的id是顺序递增且唯一的。

3. 每个batch实际上是一个普通的spark job,其中的每个数据分片,可以通过paritionId的来标识。

 

因此,由3元组(checkpointLocation + batchId + paritionId)组成的label可以唯一的确定一个structured streaming作业中的一段数据。那么只要确保在failover前后同一段数据对应的label相同,即可以此来去重以实现exactly-once语义。

 

val replace = tmp.replaceAll("[-|/|:|.]", "_") +
        s"_${batchId}_${TaskContext.getPartitionId}"

    // shrink multiple underscores to one
    // e.g: label___test => label_test
    val builder = new StringBuilder
    for (index <- 0 to replace.length - 1) {
      if (index == 0) {
        builder += replace(0)
      } else if (replace(index - 1) != '_' || replace(index) != '_') {
        builder += replace(index)
      }
    }

    val resultStr = builder.toString
    val length = resultStr.length
    if (length > 128) {
      logWarning("palo label size larger than 128!, we truncate it!")
      resultStr.substring(length - 128, length)
    } else {
      resultStr
    }

 

以上是生成label代码的部分实现,我们会对特殊字符进行一些处理,且如果生成的label超过128个字符会截断,因为Doris 的label最多只支持128个字符。

 Doris sink结构图 

 

 

Spark本身提供了Sink接口,我们通过继承Sink接口来实现Doris sink组件。

 

这里重点关注DorisWriterTask,该task是executor中实际进行计算的task逻辑,它有3种实现,分布对应了Doris的3种Load模式(Bulk load, Broker load, Streaming load)。下面主要介绍Bulk load和Broker load。Streaming load将在近期实现。

 Doris Bulk Load Task 

 

 

1. 开始执行后,每个task会首先先将数据写入本地磁盘形成文件,文件则以上文提到的label来命名。

2. Task发送http请求,以bulk load的方式,向Doris发起load请求

3. Task轮询Doris,查询步骤2中的load是否结束

4. Finish load之后,删除步骤1中生成的本地文件

 

注意点:

1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算

2. 上述过程中步骤3的轮询的意义在于我们需要确保一个batch的数据成功导入后才能开始执行下一个batch,所以我们一定要通过query load info的方法,确保label对应的数据被成功load。

3. bulk load的模式,适用于单个partition数据不超过1GB的导入。

 

Doris DFS Load Task 

 

 

1. 开始执行后,每个task会首先先将数据写入hdfs,文件则以上文提到的label来命名。

2. Task 想Doris发送broker load请求

3. Doris broker去hdfs load文件

4. Task轮询Doris,查询步骤2中的load是否结束。

5. Finish load之后,删除步骤1中生成的hdfs文件

 

注意点:

1. 容错处理:每个task执行过程中会对部分异常进行处理并重试,重试4次后(可配置),如果仍旧失败,则整个batch重算

2. 适用于单个partition数据超过1GB的导入。

 

Doris 社区 Pull Request

 

https://github.com/apache/incubator-doris/pull/1332

 

 

此次沙龙我们有幸邀请到了来自一点资讯、京东、搜狐、百度智能云的技术大牛带来他们的应用实践和开发分享。

 

其他嘉宾的分享会在近日放出,欢迎关注Apache Doris(incubating)官方公众号,后台回复“0629”即可获取各位嘉宾分享PPT及现场录像。


 

 

欢迎扫码关注:

 

Apache Doris(incubating)官方公众号

相关链接:

Apache Doris官方网站:

http://doris.incubator.apache.org

Apache Doris Github:

https://github.com/apache/incubator-doris

Apache Doris Wiki:

https://github.com/apache/incubator-doris/wiki

Apache Doris 开发者邮件组:

dev@doris.apache.org

收藏
点赞
0
个赞
共1条回复 最后由doubi渣渣回复于2021-04
#2doubi渣渣回复于2021-04

虽说一波爆发挺好的。。。但是估计得对下时间,这应该是去年的。。。

0
快速回复
TOP
切换版块