Skip to main content
Version: 1.4.0

Pipeline Engine

Pipeline is mainly used to import and export files. This article mainly introduces the installation, use and configuration of the Hive engine plugin in Linkis.

1. Engine plugin installation

1.1 Engine plugin preparation (choose one) non-default engine

Method 1: Download the engine plug-in package directly

Linkis Engine Plugin Download

Method 2: Compile the engine plug-in separately (maven environment is required)

# compile
cd ${linkis_code_dir}/linkis-engineconn-plugins/pipeline/
mvn clean install
# The compiled engine plug-in package is located in the following directory
${linkis_code_dir}/linkis-engineconn-plugins/pipeline/target/out/

EngineConnPlugin engine plugin installation

1.2 Uploading and loading of engine plugins

Upload the engine plug-in package in 1.1 to the engine directory of the server

${LINKIS_HOME}/lib/linkis-engineplugins

The directory structure after uploading is as follows

linkis-engineconn-plugins/
├── pipeline
│ ├── dist
│ │ └── 1
│ │ ├── conf
│ │ └── lib
│ └── plugin
│ └── 1

1.3 Engine refresh

1.3.1 Restart and refresh

Refresh the engine by restarting the linkis-cg-linkismanager service

cd ${LINKIS_HOME}/sbin
sh linkis-daemon.sh restart cg-linkismanager

1.3.2 Check if the engine is refreshed successfully

You can check whether the last_update_time of the linkis_engine_conn_plugin_bml_resources table in the database is the time to trigger the refresh.

#Log in to the linkis database
select * from linkis_cg_engine_conn_plugin_bml_resources;

2 Engine usage

Because the pipeline engine is mainly used to import and export files, now we assume that importing files from A to B is an introduction case

2.1 Submit tasks through Linkis-cli

sh bin/linkis-cli -submitUser Hadoop \
-engineType pipeline-1 -codeType pipeline \
-code "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv"

from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv This content is explained in 2.3

More Linkis-Cli command parameter reference: Linkis-Cli usage

3. Engine configuration instructions

3.1 Default configuration description

ConfigurationDefaultRequiredDescription
pipeline.output.moldcsvnoresult set export type
pipeline.field.split,nocsv separator
pipeline.output.charsetgbknoresult set export character set
pipeline.output.isoverwritetruenooverwrite
wds.linkis.rm.instance3NoMaximum concurrent number of pipeline engines
pipeline.output.shuffle.null.typeNULLNoNull replacement
wds.linkis.engineconn.java.driver.memory2gnopipeline engine initialization memory size

4.2 Configuration modification

If the default parameters are not satisfied, there are the following ways to configure some basic parameters

4.2.1 Management console configuration

Note: After modifying the configuration under the IDE tag, you need to specify -creator IDE to take effect (other tags are similar), such as:

sh bin/linkis-cli -creator IDE \
-submitUser hadoop \
-engineType pipeline-1 \
-codeType pipeline \
-code "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv"

4.2.2 Task interface configuration

Submit the task interface, configure it through the parameter params.configuration.runtime

Example of http request parameters
{
"executionContent": {"code": "from hdfs:///000/000/000/A.dolphin to file:///000/000/000/B.csv", "runType": "pipeline"},
"params": {
"variable": {},
"configuration": {
"runtime": {
"pipeline.output.mold":"csv",
"pipeline.output.charset":"gbk"
}
}
},
"labels": {
"engineType": "pipeline-1",
"userCreator": "hadoop-IDE"
}
}

Linkis is managed through engine tags, and the data table information involved is as follows.

linkis_ps_configuration_config_key: key and default values ​​of configuration parameters inserted into the engine
linkis_cg_manager_label: insert engine label such as: pipeline-1
linkis_ps_configuration_category: The directory association relationship of the insertion engine
linkis_ps_configuration_config_value: Insert the configuration that the engine needs to display
linkis_ps_configuration_key_engine_relation: The relationship between the configuration item and the engine

The initial data related to the engine in the table is as follows

-- set variable
SET @PIPELINE_LABEL="pipeline-1";
SET @PIPELINE_ALL=CONCAT('*-*,',@PIPELINE_LABEL);
SET @PIPELINE_IDE=CONCAT('*-IDE,',@PIPELINE_LABEL);

-- engine label
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @PIPELINE_ALL, 'OPTIONAL', 2, now(), now());
insert into `linkis_cg_manager_label` (`label_key`, `label_value`, `label_feature`, `label_value_size`, `update_time`, `create_time`) VALUES ('combined_userCreator_engineType', @PIPELINE_IDE, 'OPTIONAL', 2, now(), now());

select @label_id := id from linkis_cg_manager_label where `label_value` = @PIPELINE_IDE;
insert into linkis_ps_configuration_category (`label_id`, `level`) VALUES (@label_id, 2);

-- configuration key
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.mold', 'Value range: csv or excel', 'Result set export type','csv', 'OFT', '[\"csv\",\"excel\"]' , '0', '0', '1', 'pipeline engine settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.field.split', 'value range:, or \\t', 'csv delimiter',',', 'OFT', '[\",\",\"\\\\ t\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.charset', 'value range: utf-8 or gbk', 'result set export character set','gbk', 'OFT', '[\"utf-8\",\" gbk\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.isoverwrite', 'Value range: true or false', 'Whether to overwrite','true', 'OFT', '[\"true\",\"false\"]', '0', '0', '1', 'pipeline engine settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.rm.instance', 'Range: 1-3, Unit: Piece', 'Maximum concurrent number of pipeline engines','3', 'NumInterval', '[1,3]', '0 ', '0', '1', 'pipeline engine settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('wds.linkis.engineconn.java.driver.memory', 'value range: 1-10, unit: G', 'pipeline engine initialization memory size','2g', 'Regex', '^([ 1-9]|10)(G|g)$', '0', '0', '1', 'pipeline resource settings', 'pipeline');
INSERT INTO `linkis_ps_configuration_config_key` (`key`, `description`, `name`, `default_value`, `validate_type`, `validate_range`, `is_hidden`, `is_advanced`, `level`, `treeName`, `engine_conn_type`) VALUES ('pipeline.output.shuffle.null.type', 'Value range: NULL or BLANK', 'Null value replacement','NULL', 'OFT', '[\"NULL\",\"BLANK\ "]', '0', '0', '1', 'pipeline engine settings', 'pipeline');

-- key engine relation
insert into `linkis_ps_configuration_key_engine_relation` (`config_key_id`, `engine_type_label_id`)
(select config.id as `config_key_id`, label.id AS `engine_type_label_id` FROM linkis_ps_configuration_config_key config
INNER JOIN linkis_cg_manager_label label ON config.engine_conn_type = 'pipeline' and label_value = @PIPELINE_ALL);

-- engine default configuration
insert into `linkis_ps_configuration_config_value` (`config_key_id`, `config_value`, `config_label_id`)
(select `relation`.`config_key_id` AS `config_key_id`, '' AS `config_value`, `relation`.`engine_type_label_id` AS `config_label_id` FROM linkis_ps_configuration_key_engine_relation relation
INNER JOIN linkis_cg_manager_label label ON relation.engine_type_label_id = label.id AND label.label_value = @PIPELINE_ALL);