一点资讯 SparkSQL 查询引擎实践

ScottEleanore 发布于1年前
0 条问题

“本文分析SparkSQL ThriftServer工作原理,修改Spark SQL源代码并实现了SQL 查询进度的计算,最后展示了一点资讯基于Presto+SparkSQL+Hive的Web查询引擎

01

问题背景:SparkSQL ThriftServer 无法获取查询进度

目前公司的分布式Adhoc查询有以下几种:Presto,Hive,SparkSQL。公司外部开源系统里比较知名的有Redash,Zeppelin等查询工具。

以上查询工具均支持:Hive、SparkSQL、Mysql等查询数据源,SQL格式化,结果进行排序等。Redash还支持语法动态提示等功能,非常的方便。

然而很遗憾的是: 上述开源工具,无论是对接SparkSQL还是对接HiveServer2都有一个致命的问题就是—— 无法知晓查询的进度 ,甚至是像Presto这种本身带有进度的查询引擎,在接入Redash等系统后,也无法知晓当前查询的时间、进度、剩余时间等。

查询进度重要吗?

从人机交互的角度看,快查询进度条意义不大,但是Hive等MR查询 长时间的盲目等待是难以接受的(Hive命令行提供大致的MR进度展示)。 如MysqlWorkBench和Navicat Mysql等工具

  • Mysql查询大都是毫秒级别返回,不显示进度条。

  • 导入导出数据库时以上工具都有进度条,即长时间作业需要进度条。

我们的目标是 :深入分析SparkSQL ThriftServer的工作过程,并通过修改源代码等方式来获取执行的状态,让Presto查询、Hive查询、SparkSQL查询均能感知到进度。本文重点介绍了SparkSQL查询进度的实现。

02

SparkSQL ThriftServer工作原理剖析

一点资讯 SparkSQL 查询引擎实践

图1: SparkSQL ThriftServer工作的5个步骤

从上图可知,SparkSQL ThriftServer的启动过程分 5个步骤。 在步骤3 ExecuteStatement时,SparkSQL 为每一个查询创建Spark的Job,并随机生成UUID。

一点大数据团队修改SparkSQL源代码实现定制化的JobID生成,并跟踪每个SparkJob的运行过程实现SQL执行进度的计算。

1:SparkSQL初始化

一点资讯 SparkSQL 查询引擎实践

图2: SparkSQL ThriftServer 初始化

如上图所知,SparkSQL ThriftServer初始化包含以下几个步骤:

  1. Java Main函数启动,初始化HiveServer2,创建 ThriftBinaryCLIService

  2. 创建Hive模块内的 ThriftBinaryCLIService 并传入 SparkSQL模块内的SparkSqlCliService

  3. 创建SparkSQLSessionManager 通过反射方式将CliService中的SessionManager设置为自身

    1. 使用反射是 因为Hive代码中的CLiService的SessonManager为 Private方法

  4. Thrift BinaryService启动,并根据配置的ThriftSever端口、地址等信息创建TThreadPoolServer。

2:建立链接OpenSession:

客户端(Beeline CLI) 向服务器ThriftBinaryServer发送OpenSession的Thrift请求:

  1. SparkSQLCLIService 调用初始化过程中创建的SessionManager 创建Session

  2. 此时根据配置的不同为Thrift的会话绑定一个SparkSession

  3. 每个Session设置自己的SessionManager和OperationManager

    1. SessonManager用于管理整个Hive的查询周期,比如建立链接、关闭链接等。

    2. OperationManager 完成具体任务:用于执行具体的任务逻辑比如查看表信息、查看元数据

  4. SparkSQL重写OperationManager的 newExecuteStatementOperation()方法,  转换为Spark作业

3:发起SQL查询ExecuteStatement(重点)

一点资讯 SparkSQL 查询引擎实践

图3: Beeline 客户端发起查询SQL请求到 SparkSQL Server

  1. 客户端向服务器ThriftBinaryServer发送ExecuteStatement的Thrift请求,服务器接到请求后获取当且Thrift会话中的 HiveSession ,并调用 ExecuteStatement 方法。

  2. 继续调用 executeStatementInternal 函数。此方法调用 步骤2:建立链 接OpenSession : 中初始化好的 OperationManager执行 newExecuteStatementOperation ()

  3. HiveSession继续调用重写的 runInternal ()方法,将Hive请求进一步调用 execute() 来完成Spark的计算逻辑。

  4. Spark收到查询请求后,在 SparkExecuteStatementOperation 中为每一个查询语句随机生成UUID作为JobId:

    1. 此处是关键点即Spark通过UUID来识别每一个的Job,并在Spark UI显示

    2. 而UUID和SQL语句间没有直接的关联

  5. Spark执行SparkContext.Sql() 函数直接计算获取结果的DataFrame,结果计算完成将statement标记为完成状态。返回给客户端 OperationHandle 句柄作为查询结果的依据。

4: 客户端获取结果FetchResults

客户端检查Opeartion的状态,当发现是FINISHED状态时候,请求结果的元数据Meta信息和结果内容信息,分别由客户端的  FetchResults 请求和  GetResultSetMetadata 请求

  1. Spark 将DataFrame的每一行转换为Thrift结果的Row

  2. Spark 将DataFrame的 DataFrame的行列信息转换为 Hive的元数据信息。

5:关闭并释放链接

03

设计原理与代码修改

步骤1: 一点查询客户端根据SQL生成UUID并保存对应关系:

  • 我们根据Hive JDBC接口,封装了自己的查询客户端,并生成唯一的查询ID

  • QueryID 样例: 

    • 20181014_101745_57dbf79bc1e1f27f82911b00b91ddcde

  • 客户端通过 jdbc:hive2://IP:Port/dw? shark.sparksql.queryid=$QueryID  这样的连接来访问后端的ThriftServer。

步骤2: 服务器接受SQL和QueryID,将Spark的JobID设置为QueryID:

一点资讯 SparkSQL 查询引擎实践

图4: 如何实现HiveJDBC客户端传递UUID到服务器端

  1. Spark ThriftServer 建立连接请求的时候,根据Url中的参数信息,HiveSession在建立链接的时候解析 JDBC:hive2 url参数 并且将参数 “shark.sparksql.queryid“ , 以Key-Value 的形式存放于 HiveConf

  2. 在OperationManager中,当创建ExecuteStatementOperation时候,读取所有HiveConf中的配置信息,写入到SparkContxt的配置中。

  3. 在 执行 ExecuteStatementOperation 时,取上下文  “shark.sparksql.quer yid" 对应的值,并将其设置为JobID, 此ID在SparkUI中可以查询到

    1. statementId=sqlContext.getConf("shark.sparksql.queryid", statementId)

修改后的SparkSQL执行页面中JobID已经生效

步骤3: 客户端读取 Spark RESTful API 获取查询进度:

我们采用Spark的Restful API来获取每一个查询ID的进展情况

Spark RESTful API的访问地址为 

http://HOST:4041/api/v1/applications 

  • Step1:查询当前Spark的ApplicationID:http://HOST:4041/api/v1/applications 

  • Step2:查询当前Spark的所有Job列表:http://HOST:4041/api/v1/applications/application_id/jobs application_id为 Step_1中 取到的application_id

  • Step3: 根据QueryID,可以获取到当前QueryID下所有任务数(numTasks),已经完成的任务数(numCompletedTasks),跳过的Task数(numSkippedTasks)

    • 计算公式为:完成任务数+跳过任务数)/(总任务数)

一点资讯 SparkSQL 查询引擎实践

图6: Spark 的Restful API获取任务状态

步骤4:效果展示

通过以上对SparkSQL的改进,我们的Shark 一点大数据查询系统可以支持SparkSQL的进度查询。

一点资讯 SparkSQL 查询引擎实践

图7:  蓝色为SparkSQL 查询进度条 当前进度9%

另外我们通过SQL语法分析工具、LeaderLatch(ZK Leader选择)等技术实现了SQL格式化、高可用的Presto等,使得我们的Shark查询引擎达到了生产系统的要求。

一点资讯 SparkSQL 查询引擎实践

图8:  一点大数据查询系统与Redash、Zeppelin对比分析

04

未来规划

1: SparkSQL ThriftServer的分布式化:

  1. 智能的客户端根据查询类型、数据分区大小提交至不同的SparkSQL ThriftServer。

  2. 对数据取样、小查询等建立单独的SparkSQL查询后端服务,提高查询的响应速度对大数据量、高内存占用查询分配大内存的Executor保障查询的稳定性。

  3. 统一的Web客户端维护查询ID和后端SparkThriftServer的关系,支持失败重试、同时提交等功能。

2 : 用户无感知的 智能查询引擎选择

  1. 综合利用Presto、SparkSQL、Hive等查询系统。短查询优先Presto,长查询优先Hive

  2. 统一查询语法,消除Presto语法和Hive语法直接的不同。

  3. Presto和Hive的UDF不同,需要建立统一的UDF管理系统实现语法和代码的统一管理。

查看原文: 一点资讯 SparkSQL 查询引擎实践

  • organicbird
  • orangesnake
  • ticklishmouse
  • tinycat
  • tinyswan
  • lazymouse
  • crazyswan
需要 登录 后回复方可回复, 如果你还没有账号你可以 注册 一个帐号。