阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

一个SparkSQL作业的一生

144次阅读
没有评论

共计 6333 个字符,预计需要花费 16 分钟才能阅读完成。

Spark 是时下很火的计算框架,由 UC Berkeley AMP Lab 研发,并由原班人马创建的 Databricks 负责商业化相关事务。而 SparkSQL 则是 Spark 之上搭建的 SQL 解决方案,主打交互查询场景。

人人都说 Spark/SparkSQL 快,各种 Benchmark 满天飞,但是到底 Spark/SparkSQL 快么,或者快在哪里,似乎很少 有人说得清。因为 Spark 是基于内存的计算框架?因为 SparkSQL 有强大的优化器?本文将带你看一看一个 SparkSQL 作业到底是如何执行的,顺 便探讨一下 SparkSQL 和 Hive On MapReduce 比起来到底有何其别。

SQL On Hadoop 的解决方案已经玲琅满目了,不管是元祖级的 Hive,Cloudera 的 Impala,MapR 的 Drill,Presto,SparkSQL 甚至 Apache Tajo,IBM BigSQL 等等,各家公司都试图解决 SQL 交互场景的性能问题,因为原本的 Hive On MapReduce 实在太慢了。

那么 Hive On MapReduce 和 SparkSQL 或者其他交互引擎相比,慢在何处呢?让我们先看看一个 SQL On Hadoop 引擎到底如何工作的。

现在的 SQL On Hadoop 作业,前半段的工作原理都差不多,类似一个 Compiler,分来分去都是这基层。

小红是数据分析,她某天写了个 SQL 来统计一个分院系的加权均值分数汇总。

SELECT dept, avg(math_score * 1.2) + avg(eng_score * 0.8) FROM studentsGROUP BY dept;

其中 STUDENTS 表是学生分数表(请不要在意这个表似乎不符合范式,很多 Hadoop 上的数据都不符合范式,因为 Join 成本高,而且我写表 介绍也会很麻烦)。她通过网易大数据的猛犸系统提交了这个查询到某个 SQL On Hadoop 平台执行,然后她放下工作,切到视频网页看一会《琅琊榜》。

在她看视频的时候,我们的 SQL 平台可是有很努力的工作滴。

首先是查询解析。

这里和很多 Compiler 类似,你需要一个 Parser(就是著名的程序员约架专用项目),Parser(确切说是 Lexer 加 Parser)的作用是把一个字符串流变成一个一个 Token,再根据语法定义生成一棵抽象语法树 AST。这里不详细展开,童鞋们可以参考编译原理。比较 多的项目会选 ANTLR(Hive 啦,Presto 啦等等),你可以用类似 BNF 的范式来写 Parser 规则,当然也有手写的比如 SparkSQL。AST 会进一步包装成一个简单的基本查询信息对象,这个对象包含了一个查询基本的信息,比如基本语句的类型是 SELECT 还是 INSERT,WHERE 是 什么,GROUP BY 是什么,如果有子查询,还需要递归进去,这个东西大致来说就是所谓的逻辑计划。

TableScan(students)

-> Project(dept, avg(math_score * 1.2) + avg(eng_score * 0.8))

->TableSink

上面是无责任示意,具体到某个 SQL 引擎会略有不同,但是基本上都会这么干。如果你想找一个代码干净易懂的 SQL 引擎,可以参考 Presto(可以算我读过的开源代码写的最漂亮的了)。

到上面为止,你已经把字符串转换成一个所谓的 LogicalPlan,这个 Plan 距离可以求值来说还比较残疾。最基本来说,我还不知道 dept 是个啥吧,math_score 是神马类型,AVG 是个什么函数,这些都不明了。这样的 LogicalPlan 可以称为 Unresolved(残疾 的)Logical Plan。

缺少的是所谓的元数据信息,这里主要包含两部分:表的 Schema 和函数信息。表的 Schema 信息主要包含表的列定义(名字,类型),表的物理位置,格式,如何读取;函数信息是函数签名,类的位置等。

有了这些,SQL 引擎需要再一次遍历刚才的残废计划,进行一次深入的解析。最重要的处理是列引用绑定和函数绑定。列引用绑定决定了一个表达式的 类型。而有了类型你可以做函数绑定。函数绑定几乎是这里最关键的步骤,因为普通函数比如 CAST,和聚合函数比如这里的 AVG,分析函数比如 Rank 以及 Table Function 比如 explode 都会用完全不同的方式求值,他们会被改写成独立的计划节点,而不再是普通的 Expression 节点。除此之外,还需 要进行深入的语义检测。比如 GROUP BY 是否囊括了所有的非聚合列,聚合函数是否内嵌了聚合函数,以及最基本的类型兼容检查,对于强类型的系统,类型不一致比如 date =‘2015-01-01’需要报错,对于弱类型的系统,你可以添加 CAST 来做 Type(类型)Coerce(苟合)。

然后我们得到了一个尚未优化的逻辑计划:

TableScan(students=>dept:String, eng_score:double, math_score:double)

->Project(dept, math_score * 1.2:expr1, eng_score * 0.8:expr2)

->Aggregate(avg(expr1):expr3, avg(expr2):expr4, GROUP:dept)

->Project(dept, expr3+expr4:avg_result)

->TableSink(dept, avg_result->Client)

所以我们可以开始上肉戏了?还早呢。刚才的计划,还差得很远,作为一个 SQL 引擎,没有优化怎么好见人?不管是 SparkSQL 还是 Hive,都 有一套优化器。大多数 SQL on Hadoop 引擎都有基于规则的优化,少数复杂的引擎比如 Hive,拥有基于代价的优化。规则优化很容易实现,比如经典的谓词下推,可以把 Join 查询的 过滤条件推送到子查询预先计算,这样 JOIN 时需要计算的数据就会减少(JOIN 是最重的几个操作之一,能用越少的数据做 JOIN 就会越快),又比如一些 求值优化,像去掉求值结果为常量的表达式等等。基于代价的优化就复杂多了,比如根据 JOIN 代价来调整 JOIN 顺序(最经典的场景),对 SparkSQL 来说,代价优化是最简单的根据表大小来选择 JOIN 策略(小表可以用广播分发),而没有 JOIN 顺序交换这些,而 JOIN 策略选择则是在随后要解释的物理 执行计划生成阶段。

到这里,如果还没报错,那你就幸运滴得到了一个 Resolved(不残废的)Logical Plan 了。这个 Plan,再配上表达式求值器,你也可以折腾折腾在单机对表查询求值了。但是,我们不是做分布式系统的么?数据分析妹子已经看完《琅琊 榜》的片头了,你还在悠闲什么呢?

为了让妹子在看完电视剧之前算完几百 G 的数据,我们必须借助分布式的威力,毕竟单节点算的话够妹子看完整个琅琊榜剧集了。刚才生成的逻辑计划,之 所以称为逻辑计划,是因为它只是逻辑上看起来似乎能执行了(误),实际上我们并不知道具体这个东西怎么对应 Spark 或者 MapReduce 任务。

逻辑执行计划接下来需要转换成具体可以在分布式情况下执行的物理计划,你还缺少:怎么和引擎对接,怎么做表达式求值两个部分。

表达式求值有两种基本策略,一个是解释执行,直接把之前带来的表达式进行解释执行,这个是 Hive 现在的模式;另一个是代码生成,包括 SparkSQL,Impala,Drill 等等号称新一代的引擎都是代码生成模式的(并且配合高速编译器)。不管是什么模式,你最终把表达式求值部分封 装成了类。代码可能长得类似如下:

// math_score * 1.2

val leftOp = row.get(1/* math_score column index */);

val result = if (leftOp == null) then null else leftOp * 1.2;

每个独立的 SELECT 项目都会生成这样一段表达式求值代码或者封装过的求值器。但是 AVG 怎么办?当初写 wordcount 的时候,我记得聚合计算需要分派在 Map 和 Reduce 两个阶段呀?这里就涉及到物理执行转换,涉及到分布式引擎的对接。

AVG 这样的聚合计算,加上 GROUP BY 的指示,告诉了底层的分布式引擎你需要怎么做聚合。本质上来说 AVG 聚合需要拆分成 Map 阶段来计算累加,还有条目个数,以及 Reduce 阶段二次累加最后每个组做除法。

因此我们要算的 AVG 其实会进一步拆分成两个计划节点:Aggregates(Partial)和 Aggregates(Final)。Partial 部分是我们计算局部累加的部分,每个 Mapper 节点都将执行,然后底层引擎会做一个 Shuffle,将相同 Key(在这里是 Dept)的 行分发到相同的 Reduce 节点。这样经过最终聚合你才能拿到最后结果。

拆完聚合函数,如果只是上面案例给的一步 SQL,那事情比较简单,如果还有多个子查询,那么你可能面临多次 Shuffle,对于 MapReduce 来说,每次 Shuffle 你需要一个 MapReduce Job 来支撑,因为 MapReduce 模型中,只有通过 Reduce 阶段才能做 Shuffle 操作,而对于 Spark 来说,Shuffle 可以随意摆放,不过你要根据 Shuffle 来拆分 Stage。这样拆过之后,你得到一个多个 MR Job 串起来的 DAG 或者一个 Spark 多个 Stage 的 DAG(有向无环图)。

还记得刚才的执行计划么?它最后变成了这样的物理执行计划:

TableScan->Project(dept, math_score * 1.2: expr1, eng_score * 0.8: expr2)

-> AggretatePartial(avg(expr1):avg1, avg(expr2):avg2, GROUP: dept)

-> ShuffleExchange(Row, KEY:dept)

-> AggregateFinal(avg1, avg2, GROUP:dept)

-> Project(dept, avg1 + avg2)

-> TableSink

这东西到底怎么在 MR 或者 Spark 中执行啊?对应 Shuffle 之前和之后,物理上它们将在不同批次的计算节点上执行。不管对应 MapReduce 引擎还是 Spark,它们分别是 Mapper 和 Reducer,中间隔了 Shuffle。上面的计划,会由 ShuffleExchange 中间断开,分别发送到 Mapper 和 Reducer 中执行,当然除了上面的部分还有之前提到的求值类,也都会一起序列化发 送。

实际在 MapReduce 模型中,你最终执行的是一个特殊的 Mapper 和特殊的 Reducer,它们分别在初始化阶段载入被序列化的 Plan 和求值器信息,然后在 map 和 reduce 函数中依次对每个输入求值;而在 Spark 中,你生成的是一个一个 RDD 变换操作。

比如一个 Project 操作,对于 MapReduce 来说,伪代码大概是这样的:

void configuration() {

context = loadContext()

}

void map(inputRow) {

outputRow = context.projectEvaluator (inputRow);

write(outputRow);

}

对于 Spark,大概就是这样:

currentPlan.mapPartitions {iter =>

projection = loadContext()

iter.map {row => projection(row) } }

至此为止,引擎帮你愉快滴提交了 Job,你的集群开始不紧不慢地计算了。

到这里为止,似乎看起来 SparkSQL 和 Hive On MapReduce 没有什么区别?其实 SparkSQL 快,并不快在引擎。SparkSQL 的引擎优化,并没有 Hive 复杂,毕竟人 Hive 多年积累,十多年下来也不是吃素的。但是 Spark 本身快呀。

Spark 标榜自己比 MapReduce 快几倍几十倍,很多人以为这是因为 Spark 是“基于内存的计算引擎”,其实这不是真的。Spark 还是 要落磁盘的,Shuffle 的过程需要也会将中间数据吐到本地磁盘上。所以说 Spark 是基于内存计算的说法,不考虑手动 Cache 的情景,是不正确的。

SparkSQL 的快,根本不是刚才说的那一坨东西哪儿比 Hive On MR 快了,而是 Spark 引��本身快了。

事实上,不管是 SparkSQL,Impala 还是 Presto 等等,这些标榜第二代的 SQL On Hadoop 引擎,都至少做了三个改进,消除了冗余的 HDFS 读写,冗余的 MapReduce 阶段,节省了 JVM 启动时间。

在 MapReduce 模型下,需要 Shuffle 的操作,就必须接入一个完整的 MapReduce 操作,而接入一个 MR 操作,就必须将前阶段的 MR 结果写入 HDFS,并且在 Map 阶段重新读出来,这才是万恶之源。

事实上,如果只是上面的 SQL 查询,不管用 MapReduce 还是 Spark,都不一定会有显著的差异,因为它只经过了一个 shuffle 阶段。

真正体现差异的,是这样的查询:

SELECT g1.name, g1.avg, g2.cnt

FROM (SELECT name, avg(id) AS avg FROM students GROUP BY name) g1

JOIN (SELECT name, count(id) AS cnt FROM students GROUP BY name) g2

ON (g1.name = g2.name)

ORDER BY avg;

而他们所对应的 MR 任务和 Spark 任务分别是这样的:

一个 SparkSQL 作业的一生

一次 HDFS 中间数据写入,其实会因为 Replication 的常数扩张为三倍写入,而磁盘读写是非常耗时的。这才是 Spark 速度的主要来源。另一个加速,来自于 JVM 重用。考虑一个上万 Task 的 Hive 任务,如果用 MapReduce 执行,每个 Task 都会启动一次 JVM,而每次 JVM 启动 时间可能就是几秒到十几秒,而一个短 Task 的计算本身可能也就是几秒到十几秒,当 MR 的 Hive 任务启动完成,Spark 的任务已经计算结束了。对于短 Task 多的情形下,这是很大的节省。

更多 Spark 相关教程见以下内容

CentOS 7.0 下安装并配置 Spark  http://www.linuxidc.com/Linux/2015-08/122284.htm

Spark1.0.0 部署指南 http://www.linuxidc.com/Linux/2014-07/104304.htm

CentOS 6.2(64 位)下安装 Spark0.8.0 详细记录 http://www.linuxidc.com/Linux/2014-06/102583.htm

Spark 简介及其在 Ubuntu 下的安装使用 http://www.linuxidc.com/Linux/2013-08/88606.htm

安装 Spark 集群(在 CentOS 上) http://www.linuxidc.com/Linux/2013-08/88599.htm

Hadoop vs Spark 性能对比 http://www.linuxidc.com/Linux/2013-08/88597.htm

Spark 安装与学习 http://www.linuxidc.com/Linux/2013-08/88596.htm

Spark 并行计算模型 http://www.linuxidc.com/Linux/2012-12/76490.htm

Spark 的详细介绍:请点这里
Spark 的下载地址:请点这里

本文永久更新链接地址:http://www.linuxidc.com/Linux/2015-12/126000.htm

正文完
星哥说事-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计6333字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中