
Spark ETL Data Synchronization Support

1. Background

Using the Spark ETL feature, users can synchronize Spark data by writing a json configuration.

2. Supported types

The currently supported types are:

jdbc, file, redis, kafka, elasticsearch, mongo, datalake (hudi, delta)

3. General configuration instructions

name: data source name
type: one of `source`, `transformation`, or `sink`, corresponding to input, transformation, and output respectively
options: configuration parameters
saveMode: save mode; currently supports `overwrite` and `append`
path: file path, which can be `file://` or `hdfs://` (the default)
the `sourceTable` of a downstream plugin needs to correspond to the `resultTable` of the upstream plugin it reads from; a minimal skeleton of the overall structure is shown below
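
As a minimal sketch of the overall structure only (the table names, path, and field values below are placeholders rather than values from a real job; the concrete fields for each data source are described in section 4.3):

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "t_source",
                "path": "hdfs:///tmp/input",
                "serializer": "csv",
                "options": {}
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "t_transformed",
                "sql": "select * from t_source"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "type": "sink",
            "config": {
                "sourceTable": "t_transformed",
                "path": "hdfs:///tmp/output",
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}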

4. Instructions for use

4.1 Add the required jar package

When using a data source, you need to upload the corresponding spark connector jar to the Spark jars directory, located at $SPARK_HOME/jars.

The spark connector jars can be obtained with the following commands:

git clone https://github.com/apache/linkis.git

cd linkis

git checkout master

cd linkis-engineconn-plugins/spark/scala-2.12

mvn clean install -Dmaven.test.skip=true

The compiled spark connector jar is located in the following directory

linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib
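
As an illustration only (assuming $SPARK_HOME points at your Spark installation; you may prefer to copy only the connector jars you actually need rather than the whole directory), the jars can then be placed with a command along these lines:

cp linkis/linkis-engineconn-plugins/spark/scala-2.12/target/out/spark/dist/3.2.1/lib/*.jar $SPARK_HOME/jars/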

4.2 linkis-cli task submission example

Pass the specific json code in the `-code` parameter, and pay attention to escaping the quotation marks.

sh /appcom/Install/linkis/bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "" -submitUser hadoop -proxyUser hadoop

Example of submitting a redis data synchronization task with linkis-cli:

sh ./bin/linkis-cli -engineType spark-3.2.1 -codeType data_calc -code "{\"plugins\":[{\"name\":\"file\",\"type\":\"source\",\"config\":{\"resultTable\":\"test\",\"path\":\"hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin\",\"serializer\":\"csv\",\"options\":{\"header\":\"true\",\"delimiter\":\";\"},\"columnNames\":[\"name\",\"age\"]}},{\"name\":\"redis\",\"type\":\"sink\",\"config\":{\"sourceTable\":\"test\",\"host\":\"wds07\",\"port\":\"6679\",\"auth\":\"password\",\"targetTable\":\"spark_etl_test\",\"saveMode\":\"append\"}}]}" -submitUser hadoop -proxyUser hadoop

4.3 json synchronization script description for each data source

4.3.1 jdbc

Configuration instructions

url: jdbc connection information
user: user name
password: password
query: sql query statement

json code

{
    "sources": [
        {
            "name": "jdbc",
            "type": "source",
            "config": {
                "resultTable": "test1",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "query": "select * from dip_linkis.linkis_ps_udf_baseinfo",
                "options": {
                }
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T1654611700631",
                "sql": "select * from test1"
            }
        }
    ],
    "sinks": [
        {
            "name": "jdbc",
            "type": "sink",
            "config": {
                "sourceTable": "T1654611700631",
                "url": "jdbc:mysql://127.0.0.1:3306/dip_linkis?characterEncoding=UTF-8",
                "driver": "com.mysql.jdbc.Driver",
                "user": "root",
                "password": "123456",
                "targetTable": "linkis_ps_udf_baseinfo2",
                "options": {
                }
            }
        }
    ]
}

New jars need to be added; select the corresponding jar according to the specific data source used:

DmJdbcDriver18.jar
kingbase8-8.6.0.jar
postgresql-42.3.8.jar

4.3.2 file

Configuration instructions

serializer: file format, can be `csv`, `parquet`, etc.
columnNames: column names

json code

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test2",
                "path": "hdfs:///tmp/test_new_no_partition",
                "serializer": "csv",
                "columnNames": ["id", "create_user", "udf_name", "udf_type", "tree_id", "create_time", "update_time", "sys", "cluster_name", "is_expire", "is_shared"]
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "test2",
                "path": "hdfs:///tmp/test_new",
                "partitionBy": ["create_user"],
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}

The following jar needs to be added:

spark-excel-2.12.17-3.2.2_2.12-3.2.2_0.18.1.jar

4.3.3 redis

Configuration instructions

sourceTable: source table
host: ip address
port: port
auth: password
targetTable: target table
saveMode: currently supports `append`

json code

{
    "plugins": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "test",
                "path": "hdfs://linkishdfs/tmp/linkis/spark_etl_test/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        },
        {
            "name": "redis",
            "type": "sink",
            "config": {
                "sourceTable": "test",
                "host": "wds07",
                "port": "6679",
                "auth": "password",
                "targetTable": "spark_etl_test",
                "saveMode": "append"
            }
        }
    ]
}

The following jars need to be added:

jedis-3.2.0.jar
commons-pool2-2.8.1.jar
spark-redis_2.12-2.6.0.jar

4.3.4 kafka

Configuration instructions

servers: kafka connection information
mode: currently supports `batch` and `stream`
topic: kafka topic name

json code for writing data

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "batch",
                "topic": "test121212"
            }
        }
    ]
}

json code for reading data

{
    "sources": [
        {
            "name": "kafka",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "servers": "localhost:9092",
                "topic": "test121212"
            }
        }
    ],
    "sinks": [
        {
            "name": "kafka",
            "config": {
                "sourceTable": "T1654611700631",
                "servers": "localhost:9092",
                "mode": "stream",
                "topic": "test55555"
            }
        }
    ]
}

The following jars need to be added:

kafka-clients-2.8.0.jar
spark-sql-kafka-0-10_2.12-3.2.1.jar
spark-token-provider-kafka-0-10_2.12-3.2.1.jar

4.3.5 elasticsearch

Configuration instructions

node: elasticsearch ip
port: elasticsearch port
index: elasticsearch index name

json code for writing data

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "elasticsearch",
            "config": {
                "sourceTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest",
                "saveMode": "overwrite"
            }
        }
    ]
}

json code for reading data

{
    "sources": [
        {
            "name": "elasticsearch",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "node": "localhost",
                "port": "9200",
                "index": "estest"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "serializer": "csv"
            }
        }
    ]
}

The following jar needs to be added:

elasticsearch-spark-30_2.12-7.17.7.jar

4.3.6 mongo

Configuration instructions

uri: mongo connection information
database: mongo database
collection: mongo collection

json code for writing data

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "mongo",
            "config": {
                "sourceTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test",
                "saveMode": "overwrite"
            }
        }
    ]
}

json code for reading data

{
    "sources": [
        {
            "name": "mongo",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "uri": "mongodb://localhost:27017/test",
                "database": "test",
                "collection": "test"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/json",
                "saveMode": "overwrite",
                "serializer": "json"
            }
        }
    ]
}

The following jars need to be added:

bson-3.12.8.jar
mongo-spark-connector_2.12-3.0.1.jar
mongodb-driver-core-3.12.8.jar
mongodb-driver-sync-3.12.8.jar

4.3.7 delta

Configuration instructions

tableFormat: currently supports `hudi` and `delta`

json code for writing data

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta",
                "saveMode": "overwrite"
            }
        }
    ]
}

json code for reading data

{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "delta",
                "path": "file://{filePath}/delta"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}

The following jars need to be added:

delta-core_2.12-2.0.2.jar
delta-storage-2.0.2.jar

4.3.8 hudi

Configuration instructions

tableFormat: currently supports `hudi` and `delta`

json code for writing data

{
    "sources": [
        {
            "name": "file",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "path": "file://{filePath}/etltest.dolphin",
                "serializer": "csv",
                "options": {
                    "header": "true",
                    "delimiter": ";"
                },
                "columnNames": ["name", "age"]
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "datalake",
            "config": {
                "sourceTable": "T1654611700631",
                "tableFormat": "hudi",
                "options": {
                    "hoodie.table.name": "huditest",
                    "hoodie.datasource.write.recordkey.field": "age",
                    "hoodie.datasource.write.precombine.field": "age"
                },
                "path": "file://{filePath}/hudi",
                "saveMode": "append"
            }
        }
    ]
}

json code for reading data

{
    "sources": [
        {
            "name": "datalake",
            "type": "source",
            "config": {
                "resultTable": "T1654611700631",
                "tableFormat": "hudi",
                "path": "file://{filePath}/hudi"
            }
        }
    ],
    "transformations": [
        {
            "name": "sql",
            "type": "transformation",
            "config": {
                "resultTable": "T111",
                "sql": "select * from T1654611700631"
            }
        }
    ],
    "sinks": [
        {
            "name": "file",
            "config": {
                "sourceTable": "T1654611700631",
                "path": "file://{filePath}/csv",
                "saveMode": "overwrite",
                "options": {
                    "header": "true"
                },
                "serializer": "csv"
            }
        }
    ]
}

The following jar needs to be added:

hudi-spark3.2-bundle_2.12-0.13.0.jar