Skip to main content
版本:1.5.0

Spark

本文主要介绍在 Linkis 中, Spark 引擎插件的安装、使用和配置。

1. 前置工作

1.1 引擎安装

如果您希望在您的服务器上使用 Spark 引擎,您需要保证以下的环境变量已经设置正确并且引擎的启动用户是有这些环境变量的。

强烈建议您在执行 Spark 任务之前,检查下执行用户的这些环境变量。

环境变量名环境变量内容备注
JAVA_HOMEJDK安装路径必须
HADOOP_HOMEHadoop安装路径必须
HADOOP_CONF_DIRHadoop配置路径必须
HIVE_CONF_DIRHive配置路径必须
SPARK_HOMESpark安装路径必须
SPARK_CONF_DIRSpark配置路径必须
pythonpython建议使用anaconda的python作为默认python

1.2 环境验证

通过 pyspark 验证 Spark 是否安装成功

pyspark

#进入pyspark虚拟环境后,出现spark的logo则说明环境安装成功
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 3.2.1
/_/

Using Python version 2.7.13 (default, Sep 30 2017 18:12:43)
SparkSession available as 'spark'.

2. 引擎插件安装 默认引擎

Linkis 发布的二进制安装包中默认包含了 Spark 引擎插件,用户无需额外安装。

理论上 Linkis 支持的 Spark2.x 以上的所有版本。默认支持的版本为 Spark3.2.1 。如果您想使用其他的 Spark 版本,如 Spark2.1.0 ,则您仅仅需要将插件 Spark 的版本进行修改,然后进行编译即可。具体的,您可以找到 linkis-engineplugin-spark 模块,将 maven 依赖中 <spark.version> 标签的值改成 2.1.0 ,然后单独编译此模块即可。

EngineConnPlugin引擎插件安装

3. 引擎的使用

3.1 通过 Linkis-cli 提交任务

# codeType对应关系 py-->pyspark  sql-->sparkSQL scala-->Spark scala
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -code "show databases" -submitUser hadoop -proxyUser hadoop

# 可以在提交参数通过-confMap wds.linkis.yarnqueue=dws 来指定yarn 队列
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -confMap wds.linkis.yarnqueue=dws -code "show databases" -submitUser hadoop -proxyUser hadoop

更多 Linkis-Cli 命令参数参考: Linkis-Cli 使用

3.2 通过 Linkis SDK 提交任务

Linkis 提供了 JavaScalaSDKLinkis 服务端提交任务。具体可以参考 JAVA SDK Manual。对于 Spark 任务你只需要修改 Demo 中的 EngineConnTypeCodeType 参数即可:

Map<String, Object> labels = new HashMap<String, Object>();
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "spark-3.2.1"); // required engineType Label
labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");// required execute user and creator
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "sql"); // required codeType py,sql,scala

Spark还支持提交Scala代码和Pyspark代码:


//scala
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "scala");
code:
val df=spark.sql("show tables")
show(df)
//pyspark
/labels.put(LabelKeyConstant.CODE_TYPE_KEY, "py");
code:
df=spark.sql("show tables")
show(df)

3.3 通过提交jar包执行任务

通过 OnceEngineConn 提交任务(通过 spark-submit 提交 jar 包执行任务),提交方式参考 org.apache.linkis.computation.client.SparkOnceJobTest

public class SparkOnceJobTest {

public static void main(String[] args) {

LinkisJobClient.config().setDefaultServerUrl("http://127.0.0.1:9001");

String submitUser = "linkis";
String engineType = "spark";

SubmittableSimpleOnceJob onceJob =
// region
LinkisJobClient.once().simple().builder()
.setCreateService("Spark-Test")
.setMaxSubmitTime(300000)
.setDescription("SparkTestDescription")
.addExecuteUser(submitUser)
.addJobContent("runType", "jar")
.addJobContent("spark.app.main.class", "org.apache.spark.examples.JavaWordCount")
// 提交的jar包获取的参数
.addJobContent("spark.app.args", "hdfs:///tmp/test_word_count.txt") // WordCount 测试文件
.addLabel("engineType", engineType + "-2.4.7")
.addLabel("userCreator", submitUser + "-IDE")
.addLabel("engineConnMode", "once")
.addStartupParam("spark.app.name", "spark-submit-jar-test-linkis") // yarn上展示的 Application Name
.addStartupParam("spark.executor.memory", "1g")
.addStartupParam("spark.driver.memory", "1g")
.addStartupParam("spark.executor.cores", "1")
.addStartupParam("spark.executor.instance", "1")
.addStartupParam("spark.app.resource", "hdfs:///tmp/spark/spark-examples_2.11-2.3.0.2.6.5.0-292.jar")
.addSource("jobName", "OnceJobTest")
.build();
// endregion
onceJob.submit();
onceJob.waitForCompleted(); // 网络临时不通会导致异常,建议后期修改 SDK,现阶段使用,需要做异常处理
// 网络临时不通会导致异常,建议后期修改 SDK,现阶段使用,需要做异常处理
onceJob.waitForCompleted();
}
}

3.4 通过 Restful API 提交任务

运行脚本类型包括 sqlscalapythondata_calc(格式为json)

任务提交执行Restful API文档

POST /api/rest_j/v1/entrance/submit
Content-Type: application/json
Token-Code: dss-AUTH
Token-User: linkis

{
"executionContent": {
// 脚本内容,可以是sql,python,scala,json
"code": "show databases",
// 运行的脚本类型 sql, py(pyspark), scala, data_calc
"runType": "sql"
},
"params": {
"variable": {
},
"configuration": {
// spark 启动参数,非必填
"startup": {
"spark.executor.memory": "1g",
"spark.driver.memory": "1g",
"spark.executor.cores": "1",
"spark.executor.instances": 1
}
}
},
"source": {
// 非必填,file:/// 或者 hdfs:///
"scriptPath": "file:///tmp/hadoop/test.sql"
},
"labels": {
// 格式为:引擎类型-版本
"engineType": "spark-3.2.1",
// userCreator: linkis 为用户名。IDE 是系统名,在 Linkis 后台管理。
"userCreator": "linkis-IDE"
}
}

3.5 通过 Linkis-cli 提交spark yarn cluster任务

上传jar包和配置

# 上传linkis spark引擎的lib下的jar包 (根据您的实际安装目录修改以下参数)
cd /appcom/Install/linkis/lib/linkis-engineconn-plugins/spark/dist/3.2.1/lib
hdfs dfs -put *.jar hdfs:///spark/cluster

# 上传linkis 配置文件 (根据您的实际安装目录修改以下参数)
cd /appcom/Install/linkis/conf
hdfs dfs -put * hdfs:///spark/cluster

# 上传hive-site.xml (根据您的实际安装目录修改以下参数)
cd $HIVE_CONF_DIR
hdfs dfs -put hive-site.xml hdfs:///spark/cluster

可以通过linkis.spark.yarn.cluster.jars参数来修改hdfs:///spark/cluster

执行测试用例

# 使用 `engingeConnRuntimeMode=yarnCluster` 来指定yarn cluster模式
sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType sql -labelMap engingeConnRuntimeMode=yarnCluster -submitUser hadoop -proxyUser hadoop -code "select 123"

4.引擎配置说明

4.1 默认配置说明

配置默认值是否必须说明
wds.linkis.rm.instance10引擎最大并发数
spark.executor.cores1spark执行器核心个数
spark.driver.memory1gspark执行器实例最大并发数
spark.executor.memory1gspark执行器内存大小
wds.linkis.engineconn.max.free.time1h引擎空闲退出时间
spark.python.versionpython2python版本

4.2 队列资源配置

因为 Spark 任务的执行需要队列的资源,须要设置自己能够执行的队列。

yarn

4.3 配置修改

如果默认参数不满足时,有如下几中方式可以进行一些基础参数配置

4.3.1 管理台配置

用户可以进行自定义的设置,比如 Spark 会话 executor 个数和 executor 的内存。这些参数是为了用户能够更加自由地设置自己的 spark 的参数,另外 Spark 其他参数也可以进行修改,比如的 pysparkpython 版本等。 spark

注意: 修改 IDE 标签下的配置后需要指定 -creator IDE 才会生效(其它标签类似),如:

sh ./bin/linkis-cli -creator IDE \
-engineType spark-3.2.1 -codeType sql \
-code "show databases" \
-submitUser hadoop -proxyUser hadoop

4.3.2 任务接口配置

提交任务接口,通过参数 params.configuration.runtime 进行配置

http 请求参数示例 
{
"executionContent": {"code": "show databases;", "runType": "sql"},
"params": {
"variable": {},
"configuration": {
"runtime": {
"wds.linkis.rm.instance":"10"
}
}
},
"labels": {
"engineType": "spark-3.2.1",
"userCreator": "hadoop-IDE"
}
}

4.4 引擎相关数据表

Linkis 是通过引擎标签来进行管理的,所涉及的数据表信息如下所示。

linkis_ps_configuration_config_key:  插入引擎的配置参数的key和默认values
linkis_cg_manager_label:插入引擎label如:spark-3.2.1
linkis_ps_configuration_category: 插入引擎的目录关联关系
linkis_ps_configuration_config_value: 插入引擎需要展示的配置
linkis_ps_configuration_key_engine_relation:配置项和引擎的关联关系

表中与引擎相关的初始数据如下

-- set variable
SET @SPARK_LABEL="spark-3.2.1";
SET @SPARK_ALL=CONCAT('*-*,',@SPARK_LABEL);
SET @SPARK_IDE=CONCAT('*-IDE,',@SPARK_LABEL);

-- engine label
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @SPARK_IDE, 'OPTIONAL', 2, now(), now());

select @label_id := id from linkis_cg_manager_label where `label_value` = @SPARK_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

-- configuration key
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', '范围:1-20,单位:个', 'spark引擎最大并发数', '10', 'NumInterval', '[1,20]', '0', '0', '1', '队列资源', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.instances', '取值范围:1-40,单位:个', 'spark执行器实例最大并发数', '1', 'NumInterval', '[1,40]', '0', '0', '2', 'spark资源设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.cores', '取值范围:1-8,单位:个', 'spark执行器核心个数', '1', 'NumInterval', '[1,8]', '0', '0', '1','spark资源设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.executor.memory', '取值范围:1-15,单位:G', 'spark执行器内存大小', '1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '3', 'spark资源设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.cores', '取值范围:只能取1,单位:个', 'spark驱动器核心个数', '1', 'NumInterval', '[1,1]', '0', '1', '1', 'spark资源设置','spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.driver.memory', '取值范围:1-15,单位:G', 'spark驱动器内存大小','1g', 'Regex', '^([1-9]|1[0-5])(G|g)$', '0', '0', '1', 'spark资源设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.max.free.time', '取值范围:3m,15m,30m,1h,2h', '引擎空闲退出时间','1h', 'OFT', '[\"1h\",\"2h\",\"30m\",\"15m\",\"3m\"]', '0', '0', '1', 'spark引擎设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.pd.addresses', NULL, NULL, 'pd0:2379', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.addr', NULL, NULL, 'tidb', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.password', NULL, NULL, NULL, 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.port', NULL, NULL, '4000', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.tispark.tidb.user', NULL, NULL, 'root', 'None', NULL, '0', '0', '1', 'tidb设置', 'spark');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('spark.python.version', '取值范围:python2,python3', 'python版本','python2', 'OFT', '[\"python3\",\"python2\"]', '0', '0', '1', 'spark引擎设置', 'spark');

-- key engine relation
insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'spark' and label.label_value = @SPARK_ALL);

-- engine default configuration
insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @SPARK_ALL);