Skip to main content

· 11 分钟阅读
livi12138

概述

团队有需求要在页面上同时使用sql和python语法对数据进行分析,在调研过程中发现linkis可以满足需要,遂将其引入内网,由于使用的是华为MRS,与开源的软件有所不同, 又进行了二次开发适配,本文将分享使用经验,希望对有需要的同学有所帮助。

环境以及版本

  • jdk-1.8.0_112 , maven-3.5.2
  • hadoop-3.1.1,Spark-3.1.1,Hive-3.1.0,zookerper-3.5.9 (华为MRS版本)
  • linkis-1.3.0
  • scriptis-web 1.1.0

依赖调整以及打包

首先从linkis官网上下载1.3.0的源码,然后调整依赖版本

linkis最外层调整pom文件

<hadoop.version>3.1.1</hadoop.version>
<zookerper.version>3.5.9</zookerper.version>
<curaor.version>4.2.0</curaor.version>
<guava.version>30.0-jre</guava.version>
<json4s.version>3.7.0-M5</json4s.version>
<scala.version>2.12.15</scala.version>
<scala.binary.version>2.12</scala.binary.version>

linkis-engineplugin-hive的pom文件

<hive.version>3.1.2</hive.version>

linkis-engineplugin-spark的pom文件

<spark.version>3.1.1</spark.version>

linkis-hadoop-common的pom文件

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId> <!-- 只需要将该行替换即可,替换为 <artifactId>hadoop-hdfs-client</artifactId>-->
<version>${hadoop.version}</version>
</dependency>
将hadoop-hdfs修改为:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>${hadoop.version}</version>
</dependency>

linkis-label-common

org.apache.linkis.manager.label.conf.LabelCommonConfig 修改默认版本,便于后续的自编译调度组件使用

    public static final CommonVars<String> SPARK_ENGINE_VERSION =
CommonVars.apply("wds.linkis.spark.engine.version", "3.1.1");

public static final CommonVars<String> HIVE_ENGINE_VERSION =
CommonVars.apply("wds.linkis.hive.engine.version", "3.1.2");

linkis-computation-governance-common

org.apache.linkis.governance.common.conf.GovernanceCommonConf 修改默认版本,便于后续的自编译调度组件使用

  val SPARK_ENGINE_VERSION = CommonVars("wds.linkis.spark.engine.version", "3.1.1")

val HIVE_ENGINE_VERSION = CommonVars("wds.linkis.hive.engine.version", "3.1.2")

编译

在以上配置都调整好之后,可以开始全量编译,依次执行以下命令

    cd linkis-x.x.x
mvn -N install
mvn clean install -DskipTests

编译错误

  • 如果你进行编译的时候,出现了错误,尝试单独进入到一个模块中进行编译,看是否有错误,根据具体的错误来进行调整
  • 由于linkis中使用了scala语言进行代码编写,建议可以先在配置scala环境,便于阅读源码
  • jar包冲突是最常见的问题,特别是升级了hadoop之后,请耐心调整依赖版本

DataSphereStudio的pom文件

由于我们升级了scala的版本,在部署时会报错,engineplugin启动失败,dss-gateway-support-1.1.0 conn to bml now exit java.net.socketException:Connection reset,这里需要修改scala版本,重新编译。 1.删除掉低版本的 dss-gateway-support jar包, 2.将DSS1.1.0中的scala版本修改为2.12,重新编译,获得新的dss-gateway-support-1.1.0.jar,替换linkis_installhome/lib/linkis-spring-cloud-service/linkis-mg-gateway中原有的jar包

<!-- scala 环境一致 -->
<scala.version>2.12.15</scala.version>

按照上面的依赖版本调整,就能解决大部分问题,如果还有问题则需要对应日志仔细调整。 如果能编译出完整的包,则代表linkis全量编译完成,可以进行部署。

部署

  • 为了让引擎节点有足够的资源执行脚本,我们采用了多服务器部署,大致部署结构如下
  • SLB 1台 负载均衡为轮询
  • ECS-WEB 2台 nginx,静态资源部署,后台代理转发
  • ECS-APP 2台 微服务治理,计算治理,公共增强等节点部署
  • ECS-APP 4台 EngineConnManager节点部署

linkis部署

  • 虽然采用了多节点部署,但是我们并没有将代码剥离,还是把全量包放在服务器上,只是修改了启动脚本,使其只启动所需要的服务

参考官网单机部署示例:https://linkis.apache.org/zh-CN/docs/1.3.0/deployment/deploy-quick

linkis部署注意点

  • 1.部署用户: linkis核心进程的启动用户,同时此用户会默认作为管理员权限,部署过程中会生成对应的管理员登录密码,位于conf/linkis-mg-gateway.properties文件中 Linkis支持指定提交、执行的用户。linkis主要进程服务会通过sudo -u ${linkis-user} 切换到对应用户下,然后执行对应的引擎启动命令,所以引擎linkis-engine进程归属的用户是任务的执行者
  • 该用户默认为任务的提交和执行者,如果你想改为登录用户,需要修改 org.apache.linkis.entrance.restful.EntranceRestfulApi类下对应提交方法的代码 json.put(TaskConstant.EXECUTE_USER, ModuleUserUtils.getOperationUser(req)); json.put(TaskConstant.SUBMIT_USER, SecurityFilter.getLoginUsername(req)); 将以上设置提交用户和执行用户改为Scriptis页面登录用户
  • 2.sudo -u ${linkis-user}切换到对应用户下,如果使用登录用户,这个命令可能会失败,需要修改此处命令。
  • org.apache.linkis.ecm.server.operator.EngineConnYarnLogOperator.sudoCommands
private def sudoCommands(creator: String, command: String): Array[String] = {
Array(
"/bin/bash",
"-c",
"sudo su " + creator + " -c \"source ~/.bashrc 2>/dev/null; " + command + "\""
)
} 修改为
private def sudoCommands(creator: String, command: String): Array[String] = {
Array(
"/bin/bash",
"-c",
"\"source ~/.bashrc 2>/dev/null; " + command + "\""
)
}
  • 3.Mysql的驱动包一定要copy到/lib/linkis-commons/public-module/和/lib/linkis-spring-cloud-services/linkis-mg-gateway/

  • 4.默认是使用静态用户和密码,静态用户即部署用户,静态密码会在执行部署是随机生成一个密码串,存储于${LINKIS_HOME}/conf/linkis-mg-gateway.properties

  • 5 数据库脚本执行,linkis本身需要用到数据库,但是我们再执行linkis1.3.0版本的插入数据的脚本时,发现了报错,我们当时时直接删掉了报错部分的数据

  • 6 Yarn的认证,执行spark任务时会将任务提交到队列上去,会首先获取队列的资源信息,进行判断是否有资源可以提交,这里需要配置是否开启kerberos模式认证和是否使用keytab文件 进行认证,如果开启了文件认证需要将文件放入到服务器对应目录,并且在linkis_cg_rm_external_resource_provider库表中更新信息。

安装web前端

  • web端是使用nginx作为静态资源服务器的,直接下载前端安装包并解压,将其放在nginx服务器对应的目录即可

Scriptis工具安装

  • scriptis 是一个纯前端的项目,作为一个组件集成在DSS的web代码组件中,我们只需要将DSSweb项目进行单独的scriptis模块编译,将编译的静态资源上传至Linkis管理台所在的服务器,既可访问,注意:linkis单机部署默认使用的是session进行校验,需要先登录linkis管理台,再登录Scriptis就可以使用。

Nginx部署举例

nginx.conf

upstream linkisServer{
server ip:port;
server ip:port;
}
server {
listen 8088;# 访问端口
server_name localhost;
#charset koi8-r;
#access_log /var/log/nginx/host.access.log main;
#scriptis静态资源
location /scriptis {
# 修改为自己的前端路径
alias /home/nginx/scriptis-web/dist; # 静态文件目录
#root /home/hadoop/dss/web/dss/linkis;
index index.html index.html;
}
#默认资源路径指向管理台前端静态资源
location / {
# 修改为自己的前端路径
root /home/nginx/linkis-web/dist; # 静态文件目录
#root /home/hadoop/dss/web/dss/linkis;
index index.html index.html;
}

location /ws {
proxy_pass http://linkisServer/api #后端Linkis的地址
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection upgrade;
}

location /api {
proxy_pass http://linkisServer/api; #后端Linkis的地址
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header x_real_ipP $remote_addr;
proxy_set_header remote_addr $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_http_version 1.1;
proxy_connect_timeout 4s;
proxy_read_timeout 600s;
proxy_send_timeout 12s;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection upgrade;
}

#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
}

如何排查问题

    1. linkis一共有100多个模块,最终启动的服务一共是7个,分别是 linkis-cg-engineconnmanager,linkis-cg-engineplugin,linkis-cg-entrance,linkis-cg-linkismanager, linkis-mg-gateway, linkis-mg-eureka,linkis-ps-publicservice,每一个模块都有这不同的功能,其中linkis-cg-engineconnmanager 负责管理启动引擎服务,会生成对应引擎的脚本来拉起引擎服务,所以我们团队在部署时将linkis-cg-engineconnmanager单独启动在服务器上以便于有足够的资源给用户执行。
    1. 像jdbc,spark.hetu之类的引擎的执行需要一些jar包的支撑,在linkis种称之为物料,打包的时候这些jar包会打到linkis-cg-engineplugin下对用的引擎中,会出现conf 和lib目录,启动这个服务时,会将两个打包上传到配置的目录,会生成两个zip文件,我们使用的是OSS来存储这些物料信息,所以首先是上传到OSS,然后再下载到linkis-cg-engineconnmanager这个服务所在服务器上,然后如果配置了以下两个配置 wds.linkis.enginecoon.public.dir 和 wds.linkis.enginecoon.root.dir ,那么会把包拉到wds.linkis.enginecoon.public.dir这个目录下来,wds.linkis.enginecoon.root.dir这个目录是工作目录,里面存放日志和脚本信息,还有一个lib和conf的软连接到 wds.linkis.enginecoon.public.dir。
    1. 如果要排查引擎日志可以到 wds.linkis.enginecoon.root.dir 配置下的目录去看,当然日志信息也会在Scriptis页面执行的日志上展示,直接粘贴去查找即可。

· 2 分钟阅读

本文主要介绍在 Linkis 1.3.2 版本中,整合 OceanBase 数据库。 OceanBase 数据库兼容 MySQL 5.7/8.0 的绝大部分功能和语法。因此可以将 OceanBase 数据库当成 MySQL 来使用。

1. 准备工作

1.1 环境安装

安装部署 OceanBase 数据库,参看快速体验OceanBase

1.2 环境验证

可以使用 MySQL 命令验证 OceanBase 数据库安装情况。

mysql -h${ip} -P${port} -u${username} -p${password} -D${db_name}

连接成功如下图所示:

2. Linkis提交 OceanBase 数据库任务

2.1 通过 shell 提交任务

 sh ./bin/linkis-cli -engineType jdbc-4 -codeType jdbc -code "show tables" -submitUser hadoop -proxyUser hadoop -runtimeMap wds.linkis.jdbc.connect.url=jdbc:mysql://${ip}:${port}/${db_name} -runtimeMap wds.linkis.jdbc.driver=com.mysql.jdbc.Driver -runtimeMap wds.linkis.jdbc.username=${username} -runtimeMap wds.linkis.jdbc.password=${password}

2.2 通过 Linkis SDK 提交任务

Linkis 提供了 JavaScalaSDKLinkis 服务端提交任务。具体可以参考 JAVA SDK Manual。对于 OceanBase 任务您只需要修改 Demo 中的 EngineConnTypeCodeType 参数即可:

Map<String, Object> labels = new HashMap<String, Object>();
labels.put(LabelKeyConstant.ENGINE_TYPE_KEY, "jdbc-4"); // required engineType Label
labels.put(LabelKeyConstant.USER_CREATOR_TYPE_KEY, "hadoop-IDE");// required execute user and creator
labels.put(LabelKeyConstant.CODE_TYPE_KEY, "jdbc"); // required codeType

2.3 多数据源支持

地址:登陆管理台-->数据源管理

步骤1:新建数据源

步骤2:连接测试

点击测试连接按钮进行测试

步骤3:发布数据源

步骤4:通过指定数据源名称提交 OceanBase 任务

请求URL:http://${gateway_url}:${port}/api/rest_j/v1/entrance/submit

请求方式:POST

请求参数:

{
"executionContent": {
"code": "show databases",
"runType": "jdbc"
},
"params": {
"variable": {},
"configuration": {
"startup": {},
"runtime": {
"wds.linkis.engine.runtime.datasource": "ob-test"
}
}
},
"labels": {
"engineType": "jdbc-4"
}
}

响应结果:

{
"method": "/api/entrance/submit",
"status": 0,
"message": "OK",
"data": {
"taskID": 93,
"execID": "exec_id018017linkis-cg-entrance000830fb1364:9104IDE_hadoop_jdbc_0"
}
}

管理台查看任务执行情况:

· 5 分钟阅读
aiceflower

背景

引擎物料管理是linkis引擎物料管理系统,主要用来管理Linkis的引擎物料文件,存储用户的各种引擎文件,包括引擎类型、引擎版本等信息。总体流程为压缩文件经前端浏览器上传至物料库(BML),物料压缩文件解压、校验,需要执行时如果发现本地不存在该引擎,则需要去物料库中寻找并下载安装注册从而执行。

具备以下功能点:

1)、支持上传打包好的引擎文件,上传文件大小受nginx的配置影响,文件类型为zip文件类型,在windows环境下自行打包zip压缩文件不支持。

2)、支持对已有的引擎物料进行更新,更新后在BML中新增一个bml引擎物料的存储版本,可以对当前的版本进行回滚和删除。

3)、一个引擎涉及两个引擎物料,分别是lib和conf,可以进行分别管理。

架构图

架构说明

1、引擎物料管理在Linkis web管理台中,需要管理员权限,在开发调试时需要设置测试环境下的管理员字段。

2、引擎物料管理涉及引擎物料文件的增加、更新、删除,物料文件分为lib和conf分别存储。文件中涉及两个版本的概念,一个是引擎本身的版本,另一个则是物料版本,在更新操作中物料如果存在修改则会新增一个物料版本并将其存储在BML中,支持物料版本的删除和回滚。

3、利用BML Service对引擎物料文件进行存储,通过RPC调用BML的服务对文件进行存储,得到存储的资源id和版本并保存。

核心流程

  1. 上传zip类型的引擎插件文件,先存储在引擎插件Home目录中并解压文件,之后进行启动刷新程序。
  2. 对解压后的引擎文件中的conf、lib目录进行压缩,上传至BML(物料管理系统)中,分别获取对应的BML的资源id和资源版本,读取对应引擎名称和版本信息。
  3. 在引擎物料资源表中,新增引擎物料的记录,每次上传都会分别产生lib和conf两条数据。除了记录这个引擎的名称和类型信息外,最重要的是记录了该引擎在物料管理系统中的信息,包括引擎的资源id和版本信息,关联至BML中的资源表。

数据库设计

引擎物料资源信息表(linkis_cg_engine_conn_plugin_bml_resources)

字段名作用备注
id引擎物料包标识idPrimary key
engine_conn_type存放资源的位置如Spark
version引擎的版本如Spark的v2.4.3
file_name引擎文件名如lib.zip
file_size引擎文件大小
last_modified文件最后的修改时间
bml_resource_id记录资源在BML(物料管理系统)中的id用于在BML中标识引擎文件的id
bml_resource_version记录资源在BML中的版本如v000001
create_time资源的创建时间
last_update_time资源最后的更新时间

· 6 分钟阅读
aiceflower

前言

随着业务的发展和社区产品的更新迭代,我们发现Linkis1.X服务过多,可以适当进行服务合并,减少服务数量,方便部署和调试。目前Linkis服务主要分为三大类,包括计算治理服务(CG: entrance/ecp/ecm/linkismanager)、公共增强服务(PS:publicservice/datasource/cs)和微服务治理服务(MG:Gateway/Eureka)。这三类服务延伸的子服务过多,可以进行服务合并,做到将PS的服务全部合并,CG服务支持全部合并,同时支持将ecm服务单独出去。

服务合并变动

本次服务合并主要变动如下:

  • 支持Restful服务转发:修改点主要为Gateway的转发逻辑,类似于现在publicservice服务合并参数:wds.linkis.gateway.conf.publicservice.list
  • 支持将RPC服务远程调用改为本地调用,类似LocalMessageSender,现在已经可以通过改Sender完成本地调用的返回
  • 配置文件变动
  • 服务启停脚本变动

待实现目标

  • 基本目标:合并PS服务为一个服务
  • 基本目标:合并CG服务为CG-Service和ECM
  • 进阶目标:合并CG服务为一个服
  • 终结目标:去掉eureka、gateway变为单体服务

具体变动

Gateway变动(org.apache.linkis.gateway.ujes.route.HaContextGatewayRouter)

//变动前
override def route(gatewayContext: GatewayContext): ServiceInstance = {

if (gatewayContext.getGatewayRoute.getRequestURI.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR) ||
gatewayContext.getGatewayRoute.getRequestURI.contains(HaContextGatewayRouter.OLD_CONTEXT_SERVICE_PREFIX)){
val params: util.HashMap[String, String] = gatewayContext.getGatewayRoute.getParams
if (!gatewayContext.getRequest.getQueryParams.isEmpty) {
for ((k, vArr) <- gatewayContext.getRequest.getQueryParams) {
if (vArr.nonEmpty) {
params.putIfAbsent(k, vArr.head)
}
}
}
if (gatewayContext.getRequest.getHeaders.containsKey(ContextHTTPConstant.CONTEXT_ID_STR)) {
params.putIfAbsent(ContextHTTPConstant.CONTEXT_ID_STR, gatewayContext.getRequest.getHeaders.get(ContextHTTPConstant.CONTEXT_ID_STR)(0))
}
if (null == params || params.isEmpty) {
dealContextCreate(gatewayContext)
} else {
var contextId : String = null
for ((key, value) <- params) {
if (key.equalsIgnoreCase(ContextHTTPConstant.CONTEXT_ID_STR)) {
contextId = value
}
}
if (StringUtils.isNotBlank(contextId)) {
dealContextAccess(contextId.toString, gatewayContext)
} else {
dealContextCreate(gatewayContext)
}
}
}else{
null
}
}
//变动后
override def route(gatewayContext: GatewayContext): ServiceInstance = {

if (
gatewayContext.getGatewayRoute.getRequestURI.contains(
RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX
)
) {
val params: util.HashMap[String, String] = gatewayContext.getGatewayRoute.getParams
if (!gatewayContext.getRequest.getQueryParams.isEmpty) {
for ((k, vArr) <- gatewayContext.getRequest.getQueryParams.asScala) {
if (vArr.nonEmpty) {
params.putIfAbsent(k, vArr.head)
}
}
}
if (gatewayContext.getRequest.getHeaders.containsKey(ContextHTTPConstant.CONTEXT_ID_STR)) {
params.putIfAbsent(
ContextHTTPConstant.CONTEXT_ID_STR,
gatewayContext.getRequest.getHeaders.get(ContextHTTPConstant.CONTEXT_ID_STR)(0)
)
}
if (null == params || params.isEmpty) {
dealContextCreate(gatewayContext)
} else {
var contextId: String = null
for ((key, value) <- params.asScala) {
if (key.equalsIgnoreCase(ContextHTTPConstant.CONTEXT_ID_STR)) {
contextId = value
}
}
if (StringUtils.isNotBlank(contextId)) {
dealContextAccess(contextId, gatewayContext)
} else {
dealContextCreate(gatewayContext)
}
}
} else {
null
}
}


//变动前
def dealContextCreate(gatewayContext:GatewayContext):ServiceInstance = {
val serviceId = findService(HaContextGatewayRouter.CONTEXT_SERVICE_STR, list => {
val services = list.filter(_.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR))
services.headOption
})
val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull)
if (serviceInstances.size > 0) {
val index = new Random().nextInt(serviceInstances.size)
serviceInstances(index)
} else {
logger.error(s"No valid instance for service : " + serviceId.orNull)
null
}
}
//变动后
def dealContextCreate(gatewayContext: GatewayContext): ServiceInstance = {
val serviceId = findService(
RPCConfiguration.CONTEXT_SERVICE_NAME,
list => {
val services = list.filter(_.contains(RPCConfiguration.CONTEXT_SERVICE_NAME))
services.headOption
}
)
val serviceInstances =
ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull)
if (serviceInstances.size > 0) {
val index = new Random().nextInt(serviceInstances.size)
serviceInstances(index)
} else {
logger.error(s"No valid instance for service : " + serviceId.orNull)
null
}
}

//变动前
def dealContextAccess(contextIdStr:String, gatewayContext: GatewayContext):ServiceInstance = {
val contextId : String = {
var tmpId : String = null
if (serializationHelper.accepts(contextIdStr)) {
val contextID : ContextID = serializationHelper.deserialize(contextIdStr).asInstanceOf[ContextID]
if (null != contextID) {
tmpId = contextID.getContextId
} else {
logger.error(s"Deserializate contextID null. contextIDStr : " + contextIdStr)
}
} else {
logger.error(s"ContxtIDStr cannot be deserialized. contextIDStr : " + contextIdStr)
}
if (null == tmpId) {
contextIdStr
} else {
tmpId
}
}
val instances = contextIDParser.parse(contextId)
var serviceId:Option[String] = None
serviceId = findService(HaContextGatewayRouter.CONTEXT_SERVICE_STR, list => {
val services = list.filter(_.contains(HaContextGatewayRouter.CONTEXT_SERVICE_STR))
services.headOption
})
val serviceInstances = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull)
if (instances.size() > 0) {
serviceId.map(ServiceInstance(_, instances.get(0))).orNull
} else if (serviceInstances.size > 0) {
serviceInstances(0)
} else {
logger.error(s"No valid instance for service : " + serviceId.orNull)
null
}
}

}
//变动后
def dealContextAccess(contextIdStr: String, gatewayContext: GatewayContext): ServiceInstance = {
val contextId: String = {
var tmpId: String = null
if (serializationHelper.accepts(contextIdStr)) {
val contextID: ContextID =
serializationHelper.deserialize(contextIdStr).asInstanceOf[ContextID]
if (null != contextID) {
tmpId = contextID.getContextId
} else {
logger.error(s"Deserializate contextID null. contextIDStr : " + contextIdStr)
}
} else {
logger.error(s"ContxtIDStr cannot be deserialized. contextIDStr : " + contextIdStr)
}
if (null == tmpId) {
contextIdStr
} else {
tmpId
}
}
val instances = contextIDParser.parse(contextId)
var serviceId: Option[String] = None
serviceId = findService(
RPCConfiguration.CONTEXT_SERVICE_NAME,
list => {
val services = list.filter(_.contains(RPCConfiguration.CONTEXT_SERVICE_NAME))
services.headOption
}
)
val serviceInstances =
ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId.orNull)
if (instances.size() > 0) {
serviceId.map(ServiceInstance(_, instances.get(0))).orNull
} else if (serviceInstances.size > 0) {
serviceInstances(0)
} else {
logger.error(s"No valid instance for service : " + serviceId.orNull)
null
}
}

//变动前
object HaContextGatewayRouter{
val CONTEXT_ID_STR:String = "contextId"
val CONTEXT_SERVICE_STR:String = "ps-cs"
@Deprecated
val OLD_CONTEXT_SERVICE_PREFIX = "contextservice"
val CONTEXT_REGEX: Regex = (normalPath(API_URL_PREFIX) + "rest_[a-zA-Z][a-zA-Z_0-9]*/(v\\d+)/contextservice/" + ".+").r
}
//变动后
object HaContextGatewayRouter {

val CONTEXT_ID_STR: String = "contextId"

@deprecated("please use RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX")
val CONTEXT_SERVICE_REQUEST_PREFIX = RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX

@deprecated("please use RPCConfiguration.CONTEXT_SERVICE_NAME")
val CONTEXT_SERVICE_NAME: String =
if (
RPCConfiguration.ENABLE_PUBLIC_SERVICE.getValue && RPCConfiguration.PUBLIC_SERVICE_LIST
.exists(_.equalsIgnoreCase(RPCConfiguration.CONTEXT_SERVICE_REQUEST_PREFIX))
) {
RPCConfiguration.PUBLIC_SERVICE_APPLICATION_NAME.getValue
} else {
RPCConfiguration.CONTEXT_SERVICE_APPLICATION_NAME.getValue
}

val CONTEXT_REGEX: Regex =
(normalPath(API_URL_PREFIX) + "rest_[a-zA-Z][a-zA-Z_0-9]*/(v\\d+)/contextservice/" + ".+").r

}

RPC服务变动(org.apache.linkis.rpc.conf.RPCConfiguration)

//变动前
val BDP_RPC_BROADCAST_THREAD_SIZE: CommonVars[Integer] = CommonVars("wds.linkis.rpc.broadcast.thread.num", new Integer(25))
//变动后
val BDP_RPC_BROADCAST_THREAD_SIZE: CommonVars[Integer] = CommonVars("wds.linkis.rpc.broadcast.thread.num", 25)

//变动前
val PUBLIC_SERVICE_LIST: Array[String] = CommonVars("wds.linkis.gateway.conf.publicservice.list", "query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue.split(",")
//变动后
val PUBLIC_SERVICE_LIST: Array[String] = CommonVars("wds.linkis.gateway.conf.publicservice.list", "cs,contextservice,data-source-manager,metadataquery,metadatamanager,query,jobhistory,application,configuration,filesystem,udf,variable,microservice,errorcode,bml,datasource").getValue.split(",")

配置文件变动

##去除部分

#删除如下配置文件
linkis-dist/package/conf/linkis-ps-cs.properties
linkis-dist/package/conf/linkis-ps-data-source-manager.properties
linkis-dist/package/conf/linkis-ps-metadataquery.properties

##修改部分

#修改linkis-dist/package/conf/linkis-ps-publicservice.properties
#restful修改前
wds.linkis.server.restful.scan.packages=org.apache.linkis.jobhistory.restful,org.apache.linkis.variable.restful,org.apache.linkis.configuration.restful,org.apache.linkis.udf.api,org.apache.linkis.filesystem.restful,org.apache.linkis.filesystem.restful,org.apache.linkis.instance.label.restful,org.apache.linkis.metadata.restful.api,org.apache.linkis.cs.server.restful,org.apache.linkis.bml.restful,org.apache.linkis.errorcode.server.restful

#restful修改后
wds.linkis.server.restful.scan.packages=org.apache.linkis.cs.server.restful,org.apache.linkis.datasourcemanager.core.restful,org.apache.linkis.metadata.query.server.restful,org.apache.linkis.jobhistory.restful,org.apache.linkis.variable.restful,org.apache.linkis.configuration.restful,org.apache.linkis.udf.api,org.apache.linkis.filesystem.restful,org.apache.linkis.filesystem.restful,org.apache.linkis.instance.label.restful,org.apache.linkis.metadata.restful.api,org.apache.linkis.cs.server.restful,org.apache.linkis.bml.restful,org.apache.linkis.errorcode.server.restful

#mybatis修改前
wds.linkis.server.mybatis.mapperLocations=classpath:org/apache/linkis/jobhistory/dao/impl/*.xml,classpath:org/apache/linkis/variable/dao/impl/*.xml,classpath:org/apache/linkis/configuration/dao/impl/*.xml,classpath:org/apache/linkis/udf/dao/impl/*.xml,classpath:org/apache/linkis/instance/label/dao/impl/*.xml,classpath:org/apache/linkis/metadata/hive/dao/impl/*.xml,org/apache/linkis/metadata/dao/impl/*.xml,classpath:org/apache/linkis/bml/dao/impl/*.xml

wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.configuration.entity,org.apache.linkis.jobhistory.entity,org.apache.linkis.udf.entity,org.apache.linkis.variable.entity,org.apache.linkis.instance.label.entity,org.apache.linkis.manager.entity,org.apache.linkis.metadata.domain,org.apache.linkis.bml.entity

wds.linkis.server.mybatis.BasePackage=org.apache.linkis.jobhistory.dao,org.apache.linkis.variable.dao,org.apache.linkis.configuration.dao,org.apache.linkis.udf.dao,org.apache.linkis.instance.label.dao,org.apache.linkis.metadata.hive.dao,org.apache.linkis.metadata.dao,org.apache.linkis.bml.dao,org.apache.linkis.errorcode.server.dao,org.apache.linkis.publicservice.common.lock.dao

#mybatis修改后
wds.linkis.server.mybatis.mapperLocations=classpath*:org/apache/linkis/cs/persistence/dao/impl/*.xml,classpath:org/apache/linkis/datasourcemanager/core/dao/mapper/*.xml,classpath:org/apache/linkis/jobhistory/dao/impl/*.xml,classpath:org/apache/linkis/variable/dao/impl/*.xml,classpath:org/apache/linkis/configuration/dao/impl/*.xml,classpath:org/apache/linkis/udf/dao/impl/*.xml,classpath:org/apache/linkis/instance/label/dao/impl/*.xml,classpath:org/apache/linkis/metadata/hive/dao/impl/*.xml,org/apache/linkis/metadata/dao/impl/*.xml,classpath:org/apache/linkis/bml/dao/impl/*.xml

wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.cs.persistence.entity,org.apache.linkis.datasourcemanager.common.domain,org.apache.linkis.datasourcemanager.core.vo,org.apache.linkis.configuration.entity,org.apache.linkis.jobhistory.entity,org.apache.linkis.udf.entity,org.apache.linkis.variable.entity,org.apache.linkis.instance.label.entity,org.apache.linkis.manager.entity,org.apache.linkis.metadata.domain,org.apache.linkis.bml.entity

wds.linkis.server.mybatis.BasePackage=org.apache.linkis.cs.persistence.dao,org.apache.linkis.datasourcemanager.core.dao,org.apache.linkis.jobhistory.dao,org.apache.linkis.variable.dao,org.apache.linkis.configuration.dao,org.apache.linkis.udf.dao,org.apache.linkis.instance.label.dao,org.apache.linkis.metadata.hive.dao,org.apache.linkis.metadata.dao,org.apache.linkis.bml.dao,org.apache.linkis.errorcode.server.dao,org.apache.linkis.publicservice.common.lock.dao

部署脚本变动(linkis-dist/package/sbin/linkis-start-all.sh)

#服务启动脚本去掉如下部分

#linkis-ps-cs
SERVER_NAME="ps-cs"
SERVER_IP=$CS_INSTALL_IP
startApp

if [ "$ENABLE_METADATA_QUERY" == "true" ]; then
#linkis-ps-data-source-manager
SERVER_NAME="ps-data-source-manager"
SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP
startApp

#linkis-ps-metadataquery
SERVER_NAME="ps-metadataquery"
SERVER_IP=$METADATA_QUERY_INSTALL_IP
startApp
fi

#linkis-ps-cs
SERVER_NAME="ps-cs"
SERVER_IP=$CS_INSTALL_IP
checkServer

if [ "$ENABLE_METADATA_QUERY" == "true" ]; then
#linkis-ps-data-source-manager
SERVER_NAME="ps-data-source-manager"
SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP
checkServer

#linkis-ps-metadataquery
SERVER_NAME="ps-metadataquery"
SERVER_IP=$METADATA_QUERY_INSTALL_IP
checkServer
fi


#服务停止脚本去掉如下部分
#linkis-ps-cs
SERVER_NAME="ps-cs"
SERVER_IP=$CS_INSTALL_IP
stopApp

if [ "$ENABLE_METADATA_QUERY" == "true" ]; then
#linkis-ps-data-source-manager
SERVER_NAME="ps-data-source-manager"
SERVER_IP=$DATASOURCE_MANAGER_INSTALL_IP
stopApp

#linkis-ps-metadataquery
SERVER_NAME="ps-metadataquery"
SERVER_IP=$METADATA_QUERY_INSTALL_IP
stopApp
fi

更多服务合并变动细节参见:https://github.com/apache/linkis/pull/2927/files

· 7 分钟阅读
kevinWdong

前言

随着业务的发展和社区产品的更新迭代,我们发现Linkis1.X在资源管理,引擎管理方面有极大的性能提升,可以更好的满足数据中台的建设。相较于0.9.3版本和我们之前使用的平台, 在用户体验方面也得到很大的提升,任务失败页面无法方便查看详情等问题也都得到改善,因此决定升级Linkis以及WDS套件,那么如下是具体的实践操作,希望给大家带来参考。

一、环境

CDH6.3.2 各组件版本

  • hadoop:3.0.0-cdh6.3.2
  • hive:2.1.1-cdh6.3.2
  • spark:2.4.8

硬件环境

2台 128G 云物理机

二、Linkis安装部署

2.1编译代码or release安装包?

本次安装部署采用的是release安装包方式部署。为了适配司内CDH6.3.2版本,hadoop和hive的相关依赖包需要替换成CDH6.3.2版本,这里采用的是直接替换安装包的方式。需要替换的依赖包与模块如下l列表所示。

--涉及到的模块
linkis-engineconn-plugins/spark
linkis-engineconn-plugins/hive
/linkis-commons/public-module
/linkis-computation-governance/
-----需要更换cdh包的列表
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hive-shims-0.23-2.1.1-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hive-shims-scheduler-2.1.1-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hadoop-annotations-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hadoop-auth-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hadoop-common-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hadoop-hdfs-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/spark/dist/v2.4.8/lib/hadoop-hdfs-client-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-client-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-mapreduce-client-common-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-mapreduce-client-jobclient-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-yarn-api-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-yarn-client-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-yarn-server-common-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-hdfs-client-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-mapreduce-client-shuffle-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/hive/dist/v2.1.1/lib/hadoop-yarn-common-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-annotations-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-auth-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-mapreduce-client-core-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-yarn-api-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-yarn-client-3.0.0-cdh6.3.2.jar
./lib/linkis-engineconn-plugins/flink/dist/v1.12.2/lib/hadoop-yarn-common-3.0.0-cdh6.3.2.jar
./lib/linkis-commons/public-module/hadoop-annotations-3.0.0-cdh6.3.2.jar
./lib/linkis-commons/public-module/hadoop-auth-3.0.0-cdh6.3.2.jar
./lib/linkis-commons/public-module/hadoop-common-3.0.0-cdh6.3.2.jar
./lib/linkis-commons/public-module/hadoop-hdfs-client-3.0.0-cdh6.3.2.jar
./lib/linkis-computation-governance/linkis-cg-linkismanager/hadoop-annotations-3.0.0-cdh6.3.2.jar
./lib/linkis-computation-governance/linkis-cg-linkismanager/hadoop-auth-3.0.0-cdh6.3.2.jar
./lib/linkis-computation-governance/linkis-cg-linkismanager/hadoop-yarn-api-3.0.0-cdh6.3.2.jar
./lib/linkis-computation-governance/linkis-cg-linkismanager/hadoop-yarn-client-3.0.0-cdh6.3.2.jar
./lib/linkis-computation-governance/linkis-cg-linkismanager/hadoop-yarn-common-3.0.0-cdh6.3.2.jar

2.2部署过程中遇到的问题

1、kerberos配置 需要在linkis.properties公共配置中添加 各个引擎conf也需要添加

wds.linkis.keytab.enable=true
wds.linkis.keytab.file=/hadoop/bigdata/kerberos/keytab
wds.linkis.keytab.host.enabled=false
wds.linkis.keytab.host=your_host

2、更换Hadoop依赖包后启动报错java.lang.NoClassDefFoundError:org/apache/commons/configuration2/Configuration

image

原因:Configuration类冲突,在linkis-commons模块下在添加一个commons-configuration2-2.1.1.jar解决冲突

3、script中运行spark、python等报错no plugin for XXX 现象:在配置文件中修改完spark/python的版本后,启动引擎报错no plugin for XXX image 原因:LabelCommonConfig.java和GovernaceCommonConf.scala这两个类中写死了引擎的版本,修改相应版本,编译后替换掉linkis以及其他组件(包括schedulis等)里面所有包含这两个类的jar(linkis-computation-governance-common-1.1.1.jar和linkis-label-common-1.1.1.jar)

4、python引擎执行报错,初始化失败

  • 修改python.py,移除引入pandas模块

  • 配置python加载目录,修改python引擎的linkis-engineconn.properties

    pythonVersion=/usr/local/bin/python3.6

5、运行pyspark任务失败报错 image 原因:未设置PYSPARK_VERSION 解决方法: 在/etc/profile下设置两个参数

export PYSPARK_PYTHON=/usr/local/bin/python3.6

export PYSPARK_DRIVER_PYTHON=/usr/local/bin/python3.6

6、执行pyspark任务报错 java.lang.NoSuchFieldError: HIVE_STATS_JDBC_TIMEOUT image 原因:spark2.4.8里面使用的是hive1.2.1的包,但是我们的hive升级到了2.1.1版本,hive2里面已经去掉了这个参数,然后spark-sql里面的代码依然是要调用hive的这个参数的,然后就报错了, 所以在spark-sql/hive代码中删除掉了HIVE_STATS_JDBC_TIMEOUT这个参数,重新编译后打包,替换spark2.4.8中的spark-hive_2.11-2.4.8.jar

7、jdbc引擎执行出现代理用户异常

现象:用A用户去执行一个jdbc任务1,引擎选了可以复用,然后我也用B用户去执行一个jdbc任务2,发现 任务2的提交人是A 分析原因: ConnectionManager::getConnection image 这里创建datasource的时候是根据key来判断是否创建,而这个key是jdbc url ,但这种粒度可能有点大,因为有可能是不同的用户去访问同一个数据源,比如说hive,他们的url是一样的,但是账号密码是不一样的,所以当第一个用户去创建datasource时,username已经指定了,第二个用户进来的时候,发现这个数据源存在,就直接拿这个数据源去用,而不是创建一个新的datasource,所以造成了用户B提交的代码通过A去执行了。
解决方法:数据源缓存map的key粒度降低,改成jdbc.url+jdbc.user。

三、DSS部署

安装过程参考官网文档进行安装配置,下面说明一下在安装调试过程中遇到的一些事项。

3.1 DSS 左侧数据库展示的数据库列表显示不全

分析:DSS数据源模块显示的数据库信息是来源于hive的元数据库,但由于CDH6中通过sentry进行权限控制,大部分的hive表元数据信息没有存在于hive metastore中,所以展示的数据存在缺失。 解决方法: 将原有逻辑改造成使用jdbc链接hive的方式,从jdbc中获取表数据展示。 简单逻辑描述: jdbc的properties信息通过linkis控制台配置的IDE-jdbc的配置信息获取。 DBS:通过connection.getMetaData()获取schema TBS:connection.getMetaData().getTables()获取对应db下的tables COLUMNS:通过执行describe table 获取表的columns信息

3.2 DSS 工作流中执行jdbc脚本报错 jdbc.name is empty

分析:dss workflow中默认执行的creator是Schedulis,由于在管理台中未配置Schedulis的相关引擎参数,导致读取的参数全为空。 在控制台中添加Schedulis的Category时报错,”Schedulis目录已存在“。由于调度系统中的creator是schedulis,导致无法添加Schedulis Category,为了更好的标识各个系统,所以将dss workflow中默认执行的creator改成nodeexcetion,该参数可以在dss-flow-execution-server.properties中添加wds.linkis.flow.job.creator.v1=nodeexecution一行配置即可。