1. 数据集成

1.1. 数据集成说明

1.1.1. 数据集成定义

数据集成支持自助式的数据清洗工作,包括字段过滤,字段重命名,字段添加/删除,字段修改类型,表关联,表聚合,表合并等。支持输出到不同目标数据库。

数据集成项目结构说明

字段 类型 描述
id INTEGER 数据集成项目的 id
title STRING 数据集成项目的标题
createdAt DATETIME 数据集成项目创建的时间
createdBy INTEGER 数据集成项目创建用户的 id
updatedAt DATETIME 数据集成项目最后修改的时间
updatedBy INTEGER 数据集成项目最后修改用户的 id
options OBJECT 数据集成项目的配置
options.nodes OBJECT 数组 数据集成项目的节点连线关系
options.nodes[].he OBJECT 节点HE表达式,语法参考HQL参考衡石函数参考
options.nodes[].options OBJECT 节点其他附加信息
options.nodes[].options.title STRING 节点名称
options.nodes[].options.nodeType STRING 节点的类型,详见数据集成项目节点类型
options.nodes[].options.samplingMethod STRING 输入节点的取样方法,详见输入节点的取样方法
options.nodes[].options.samplingNum INTEGER 输入节点的取样个数
options.nodes[].options.inputStrategy STRING 输入节点的输入策略,见输入节点的输入策略
options.nodes[].options.incrementalField STRING 数组 输入节点的增量字段名列表,从左到右的区分度越来越细,比如年,月,日,时,分,秒,毫秒,也可以简单用自增主键
options.nodes[].options.sourceTitle STRING 输入节点的来源名字,对于数据集是数据集名字,对于连接是连接名字,对于文件是原始文件名
options.nodes[].options.validateSchema BOOL 输出节点的输出配置,是否验证schema
options.nodes[].options.tableAction STRING 输出节点的输出配置,表操作,见输出节点的表操作
options.nodes[].options.updateMethod STRING 输出节点的输出配置,更新方法,见输出节点的更新方法
options.nodes[].options.keyFields STRING 数组 输出节点的输出配置,键字段
options.nodes[].options.preSql STRING 输出节点的输出配置,加载前sql
options.nodes[].options.postSql STRING 输出节点的输出配置,加载后sql
options.nodes[].options.createTableProperties STRING 输出节点的输出配置,建表属性
options.defaultOutputConnectionId INTEGER 数据集成项目的默认输出目标连接id
options.defaultPath STRING 数组 数据集成项目的默认输出路径
status STRING 数据集成项目最近一次执行的状态,见执行计划中的任务执行的状态说明
latestStartAt DATETIME 数据集成项目最近一次任务执行开始执行的时间
costTime INTEGER 数据集成项目最近一次任务执行耗时,单位是秒
updatePart STRING 仅修改的时候需要,表示要修改的目标部分,见数据集成项目修改目标说明
entityGroup STRING 数据集成的执行计划类别,用于管理执行计划,固定为PIPELINE
entityKey STRING 数据集成的执行计划关键字,用于管理执行计划
execDetail OBJECT 创建执行计划需要用到的任务描述信息,详见执行计划

数据集成项目修改目标说明

意义
TITLE_AND_PATH 修改标题和输出默认配置,包括title和 options.defaultOutputConnectionId,options.defaultPath
NODES 修改节点的信息,包括options.nodes

数据集成项目节点类型

意义
CONNECTION 数据连接
FILE 文件上传
DATASET 数据集市中的数据集
SQL SQL节点
SINK 输出节点
JOIN 联合节点
UNION 合并节点
AGGREGATE 聚合节点
PIVOT 行转列节点
UNPIVOT 列转行节点
SELECT 选择节点

输入节点的取样方法

意义
DEFAULT limit n,顺序依赖具体数据源的实现
RANDOM 随机 + limit n
ALL 全量

输入节点的输入策略

意义
ALL 全量
INCREMENTAL 增量

输出节点的表操作

意义
NONE
RECREATE 重建表
TRUNCATE 清空数据

输出节点的更新方法

意义
INSERT 插入
UPSERT 更新

任务执行结果说明

字段 类型 描述
schema OBJECT数组 每一个元素表示一个字段的属性,与数据集的字段结构相同
data OBJECT数组 每一个元素是表示一行数据的数组
randomable BOOL 数据源是否支持随机获取

各个类型节点样例

数据集输入节点
{
        "he": {
          "kind": "function",
          "op": "app_dataset",
          "args": [
            101126,                  // 应用id
            1                        // 数据集id
          ],
          "uid": "input-2"           // 节点唯一id
        },
        "options": {
          "title": "A_IVT_MOVIE",
          "nodeType": "DATASET",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
数据连接输入节点
{
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            25055,                 // 数据连接id
            [                      // schema路径,支持多层,数组元素每个表示一层
              "test"
            ],
            "客户"                 // 表名
          ],
          "uid": "input-3"        // 节点唯一id
        },
        "options": {
          "title": "客户",
          "nodeType": "CONNECTION",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
sql输入节点
{
  "he": {
    "kind": "function",
    "op": "raw_sql",
    "args": [
      25055,                         // 数据连接id
      "select * from city"           // 执行的sql
    ],
    "uid": "input-3"                 // 节点唯一id
  },
  "options": {
    "title": "sql输入节点",
    "nodeType": "SQL",
    "samplingMethod": "DEFAULT",
    "samplingNum": 1000,
    "inputStrategy": "INCREMENTAL",
    "incrementalField": [
      "f1",
      "f2"
    ]
  }
}
文件输入节点

节点信息生成需要先调用文件保存接口后生成,见使用新上传的文件表格为内容添加数据到内部连接并返回连接和表信息

{
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            1240,
            [
              "hengshi_internal_engine_tmp_schema"
            ],
            "f_u_7_1630930370000_18151_0"
          ],
          "uid": "input-1"
        },
        "options": {
          "title": "date.txt",
          "nodeType": "FILE",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "ALL",
          "sourceTitle": "date.txt"
        }
      }
行转列节点
{
  "he":{
    "kind":"function",
    "op":"pivot",
    "args":[
      {                                                // 上游引用的节点id
        "kind": "reference",
        "op": "uid-1"
      },
      [                                                // 分组列
        {"kind": "field", "op": "year"}
      ],
      {                                                 // 聚合列
        "kind": "function",
        "op": "sum",
    "args": [
      {
        "kind": "field",
        "op": "val"
      }
        ]
      },
      {"kind": "field", "op": "month"},                 // 转置列
      [                                                 // 转置列列值对应的列名,没有表示自动生成
    {"kind": "constant", "op": "一月", "uid": "Jan"},
    {"kind": "constant", "op": "二月", "uid": "Feb"},
    {"kind": "constant", "op": "三月", "uid": "Mar"},
    {"kind": "constant", "op": "四月", "uid": "Apr"},
    {"kind": "constant", "op": "五月", "uid": "May"},
    {"kind": "constant", "op": "六月", "uid": "Jun"}
      ]
    ],
    "uid":"uid-2"
  },
  "options":{
    "title":"pivot行转列节点",
    "nodeType":"PIVOT",
    "customizedFieldName":true
  }
}
列转行节点
{
  "he":{
    "kind":"function",
    "op":"unpivot",
    "args": [
      {"kind": "reference", "op": "uid-1"},                            // 上游节点
      {"kind": "field", "op": "month", "type": "number"},              // 存放列转行的key列,需要明确的 type
      {"kind": "field", "op": "val"},                                  // 存放列转行的 value 列
      [
        {"kind": "field", "op": "Jan", "value": 1},                    // 待转的列, value 表示转换后的key值
        {"kind": "field", "op": "Feb", "value": 2},
        {"kind": "field", "op": "Mar", "value": 3},
        {"kind": "field", "op": "Apr", "value": 4},
    {"kind": "field", "op": "May", "value": 5},
    {"kind": "field", "op": "Jun", "value": 6}
      ]
    ],
    "uid":"uid-3"
  },
  "options":{
    "title":"unpivot列转换节点",
    "nodeType":"UNPIVOT"
  }
}
聚合节点
{
  "he":{
    "kind": "function",
    "op": "summarize",
    "args": [
        {"kind": "reference", "op": "uid-1"},                        // 上游节点引用id
        {"kind": "field", "op": "id", "uid": "id2"},                 // 聚合的字段列表,可以是纯字段或者是聚合表达式
        {
          "kind": "function",
          "op": "trunc_year",
          "args": [{"kind": "field", "op": "pubdate"}],
          "uid": "pubdate2"
        },
        {
          "kind": "function",
          "op": "trunc_millisecond",
          "args": [{"kind": "field", "op": "release_time"}],
          "uid": "release_time2"
        },
        {
          "kind": "function",
          "op": "count",
          "args": [{"kind": "field", "op": "id"}]
          "uid": "votes"
        },
        {"kind": "formula", "op": "sum({rate_num})", "uid": "rate_num_sum"},
    ],
    "uid":"uid-3"
  },
  "options":{
    "title":"聚合节点",
    "nodeType":"AGGREGATE",
    "aggStartIndex":4                                                 // 分组列和聚合列的分界线,仅在和衡石前端交互有用
  }
}
联合节点
{
  "he": {
    "kind": "function",
    "op": "left_join",                                   // 支持left_join,right_join,inner_join,full_join
    "args": [
      {"kind": "reference", "op": "input-1"},           // 联合左表引用的节点id
      {"kind": "reference", "op": "input-2"},           // 联合右表引用的节点id
      {                                                 // 联合的条件
        "kind": "function",
        "op": "and",
        "args": [
          {
            "kind": "function",
            "op": "=",
            "args": [
              {
                "kind": "field",
                "dataset": "input-1",                 // 字段来自的节点id
                "op": "id"
              },
              {
                "kind": "field",
                "dataset": "input-2",                // 字段来自的节点id
                "op": "id"
              }
            ]
          }
        ]
      }
    ],
    "uid": "join-uid-1"
  },
  "options": {
    "title": "联合节点",
    "nodeType": "JOIN"
  }
}
合并节点
{
  "he": {
    "kind": "function",
    "op": "append",
    "args": [
      {"kind": "reference", "op": "uid-1"},  // 合并的第一个节点的唯一id引用,结果以这个节点的字段为准
      {"kind": "reference", "op": "uid-2"},  // 合并的其他节点的唯一id引用
      {"kind": "reference", "op": "uid-3"}
    ],
    "uid": "union-uid-1"                     // 节点唯一id
  },
  "options": {
    "title": "合并节点",
    "nodeType": "UNION"
  }
}
选择节点

使用表达式过滤的例子

{
  "he": {
    "kind": "function",
    "op": "select_fields_complete",
    "args": [
      {"kind": "reference", "op": "uid-1"},                                   // 上游引用的节点id
      [
        {"kind": "field", "op": "*"},                                         // 保留的字段,*表示全部
        {"kind": "field", "op": "f1", "uid": "别名1"},                         // 重命名字段
        {"kind": "field", "op": "f2", "type": "string"},                      // 修改字段类型
        {"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"}        // 新增表达式字段
      ],
      [                                                                       // 过滤条件,这里是一个表达式过滤
        {
          "kind": "formula", "op": "{f1} > 10 and {f2} + {f3} > 100"
        }
      ]
    ],
    "uid": "select-uid-1"
  },
  "options": {
    "title": "选择节点",
    "nodeType": "SELECT"
  }
}

使用简单过滤条件的例子

{
  "he": {
    "kind": "function",
    "op": "select_fields_complete",
    "args": [
      {"kind": "reference", "op": "uid-1"},
      [
        {"kind": "field", "op": "*"},
        {"kind": "field", "op": "f1", "uid": "别名1"},
        {"kind": "field", "op": "f2", "type": "string"},
        {"kind": "formula", "op": "{f1} + {f2} * {f3}", "uid": "别名2"}
      ],
      [                                                                         // 过滤条件,这里是一个简单过滤条件
        {
          "kind":"function",
          "op":"and",
          "args":[
            {
              "kind":"function",
              "op":"=",
              "args":[
                {"kind":"field","op":"id","type":"number"},
                {"kind":"constant","op":1,"type":"number"}
              ]
            },
            {
              "kind":"function",
              "op":"isnotnull",
              "args":[
                {"kind":"field","op":"short_name","type":"string"}
              ]
            }
          ]
        }
      ]
    ],
    "uid": "select-uid-1"
  },
  "options": {
    "title": "选择节点",
    "nodeType": "SELECT"
  }
}
输出节点
{
        "he": {
          "kind": "function",
          "op": "db_output",
          "args": [
            "input-1",                  // 要输出的上游节点id
            1,                          // 输出的数据连接id
            [                           // 输出的schema路径,支持多层,每层一个元素
              "test"
            ],
            "movie"                     // 输出表名
          ],
          "uid": "output-1"             // 节点唯一id
        },
        "options": {
          "title": "output-1",
          "nodeType": "SINK",
          "validateSchema": false,
          "tableAction": "RECREATE",
          "updateMethod": "INSERT",
          "keyFields": [
            "f3",
            "f4"
          ],
          "preSql": "",
          "postSql": ""
        }
      }

1.2. 接口说明

1.2.1. 新建一个数据集成项目

请求URL

POST /api/pipelines

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述
title STRING 数据集成项目的标题
options.defaultOutputConnectionId INTEGER 数据集成项目的默认输出目标连接id
options.defaultPath STRING 数组 数据集成项目的默认输出路径

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数据集成项目结构说明

接口示例:

POST /api/pipelines

{
  "title": "wss p2",
  "options": {
    "defaultOutputConnectionId": 5,
    "defaultPath": [
      "public"
    ]
  }
}

返回

{
  "version": "3.1-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p2",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "public"
      ],
      "defaultOutputConnectionId": 5
    }
  }
}

1.2.2. 删除一个数据集成项目

请求URL

 DELETE /api/pipelines/{pipelineId}

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
code 错误码 请求成功返回0
msg STRING 请求成功返回success

接口示例:

DELETE /api/pipelines/1

返回

{
  "code": 0,
  "msg": "success",
  "version": "3.1-SNAPSHOT@@git.commit.id.abbrev@#1234567"
}

1.2.3. 修改一个数据集成项目

请求URL

 PUT /api/pipelines/{pipelineId}

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数

字段见 数据集成项目结构说明,其中updatePart是必须的,后端根据修改的部分对该部分做校验。

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数据集成项目结构说明

接口示例: 1 修改标题和默认输出路径

PUT /api/pipelines/1

{
  "updatePart": "TITLE_AND_PATH",
  "title": "wss p3",
  "options": {
    "defaultOutputConnectionId": 2,
    "defaultPath": [
      "test"
    ]
  }
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2
    }
  }
}

接口示例: 2 添加数据集类型输入节点

PUT /api/pipelines/1

{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "app_dataset",
          "args": [
            101126,
            1
          ],
          "uid": "input-2"
        },
        "options": {
          "title": "A_IVT_MOVIE",
          "nodeType": "DATASET",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
    ]
  }
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "app_dataset",
            "args": [
              101126,
              1
            ],
            "uid": "input-2"
          },
          "options": {
            "title": "A_IVT_MOVIE",
            "nodeType": "DATASET",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ]
    }
  }
}

接口示例: 3 添加文件类型输入节点, 文件上传接口和返回信息参考上传文件准备作为输入节点接口

PUT /api/pipelines/1

{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            1240,
            [
              "hengshi_internal_engine_tmp_schema"
            ],
            "f_u_7_1630930370000_18151_0"
          ],
          "uid": "input-1"
        },
        "options": {
          "title": "date.txt",
          "nodeType": "FILE",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "ALL",
          "sourceTitle": "date.txt"
        }
      }
    ]
  }
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_source",
            "args": [
              1240,
              [
                "hengshi_internal_engine_tmp_schema"
              ],
              "f_u_7_1630930370000_18151_0"
            ],
            "uid": "input-1"
          },
          "options": {
            "title": "date.txt",
            "nodeType": "FILE",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "ALL"
          }
        }
      ]
    }
  }
}

接口示例: 4 添加连接类型的输入节点

PUT /api/pipelines/1

{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_source",
          "args": [
            25055,
            [
              "test"
            ],
            "客户"
          ],
          "uid": "input-3"
        },
        "options": {
          "title": "客户",
          "nodeType": "CONNECTION",
          "samplingMethod": "DEFAULT",
          "samplingNum": 1000,
          "inputStrategy": "INCREMENTAL",
          "incrementalField": [
            "f1",
            "f2"
          ]
        }
      }
    ]
  }
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_source",
            "args": [
              25055,
              [
                "test"
              ],
              "客户"
            ],
            "uid": "input-3"
          },
          "options": {
            "title": "客户",
            "nodeType": "CONNECTION",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ]
    }
  }
}

接口示例: 5 添加输出节点

PUT /api/pipelines/1

{
  "updatePart": "NODES",
  "options": {
    "nodes": [
      {
        "he": {
          "kind": "function",
          "op": "db_output",
          "args": [
            "input-1",
            1,
            [
              "test"
            ],
            "movie"
          ],
          "uid": "output-1"
        },
        "options": {
          "title": "output-1",
          "nodeType": "SINK",
          "validateSchema": false,
          "tableAction": "RECREATE",
          "updateMethod": "INSERT",
          "keyFields": [
            "f3",
            "f4"
          ],
          "preSql": "",
          "postSql": ""
        }
      }
    ]
  }
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "wss p3",
    "createdBy": 1,
    "createdAt": "2020-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2020-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "test"
      ],
      "defaultOutputConnectionId": 2,
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "db_output",
            "args": [
              "input-1",
              1,
              [
                "test"
              ],
              "movie"
            ],
            "uid": "output-1"
          },
          "options": {
            "title": "output-1",
            "nodeType": "SINK",
            "validateSchema": false,
            "tableAction": "RECREATE",
            "updateMethod": "INSERT",
            "keyFields": [
              "f3",
              "f4"
            ],
            "preSql": "",
            "postSql": ""
          }
        }
      ]
    }
  }
}

1.2.4. 查看一个数据集成项目

请求URL

 GET /api/pipelines/{pipelineId}

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数据集成项目结构说明

接口示例:

GET /api/pipelines/1 返回

{
  "data": {
    "id": 1,
    "title": "数据集成1",
    "status": "PENDING",
    "latestStartAt": "2019-05-05 17:55:55.123",
    "costTime": 60,
    "options": {
      "nodes": [
        {
          "he": {
            "kind": "function",
            "op": "app_dataset",
            "args": [
              101126,
              1
            ],
            "uid": "input-2"
          },
          "options": {
            "title": "A_IVT_MOVIE",
            "nodeType": "DATASET",
            "samplingMethod": "DEFAULT",
            "samplingNum": 1000,
            "inputStrategy": "INCREMENTAL",
            "incrementalField": [
              "f1",
              "f2"
            ]
          }
        }
      ],
      "defaultOutputConnectionId": 1,
      "defaultPath": [
        "public"
      ]
    }
  }
}

1.2.5. 获取所有数据集成项目

请求URL

GET /api/pipelines

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
offset INTEGER 可选 分页偏移量,默认是0
limit INTEGER 可选 分页获取个数,默认是10
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数组 每个对象见见数据集成项目结构说明

接口示例:

GET /api/pipelines 返回

{
  "data": [
    {
      "id": 1,
      "title": "数据集成1",
      "status": "PENDING",
      "latestStartAt": "2019-05-05 17:55:55.123",
      "costTime": 60,
      "options": {
        "nodes": [
          {
            "he": {
              "kind": "function",
              "op": "app_dataset",
              "args": [
                101126,
                1
              ],
              "uid": "input-2"
            },
            "options": {
              "title": "A_IVT_MOVIE",
              "nodeType": "DATASET",
              "samplingMethod": "DEFAULT",
              "samplingNum": 1000,
              "inputStrategy": "INCREMENTAL",
              "incrementalField": [
                "f1",
                "f2"
              ]
            }
          }
        ],
        "defaultOutputConnectionId": 1,
        "defaultPath": [
          "public"
        ]
      }
    },
    ...
  ],
  ...
}

1.2.6. 使用新上传的文件表格为内容添加数据到内部连接并返回连接和表信息

请求URL

POST /api/pipelines/{pipelineId}/nodes/files/{fileId}/sheets/{sheetId}/save

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数

注意,这个options是描述文件内容的options,不是节点的options。

字段 类型 是否必须 描述
options OBJECT 描述文件内容
options.delimiter STRING 文件的分隔符,见文件分隔符
options.encoding STRING 文件的编码
options.header INTEGER 表头所在行数,从0开始
options.origin STRING 文件元素类,可能为file_excel,file_csv
options.range OBJECT数组 数据在文件中的行列范围
options.range[].xbegin INTEGER 开始列数,从0开始,包含自身
options.range[].xend INTEGER 结束列数,从0开始,包含自身
options.range[].ybegin INTEGER 开始行数,从0开始,包含自身
options.range[].yend INTEGER 结束行数,从0开始,包含自身
transpose BOOL 是否做行列翻转
title STRING 文件的名字
文件分隔符
意义
colon 英文的冒号(:)
comma 英文的逗号(,)
pipe 英文的竖线(\ )
semicolon 英文的分号(;)
tab tab符号(\t)

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数据集成项目结构说明 中options.nodes的说明

接口示例:

POST /api/pipelines/2/nodes/files/56c8905fcc06bd0b210b29401e336194/sheets/0/save

{
  "options": {
    "delimiter": "comma",
    "encoding": "UTF-8",
    "header": 1,
    "origin": "file_csv",
    "range": [
      {
        "xbegin": 1,
        "xend": 6,
        "ybegin": 2,
        "yend": 10
      }
    ],
    "transpose": false
  },
  "title": "date.txt"
}

返回

{
  "version": "3.6-SNAPSHOT@bbdb0d6#f424d94",
  "data": {
    "he": {
      "kind": "function",
      "op": "db_source",
      "args": [
        1240,
        [
          "hengshi_internal_engine_tmp_schema"
        ],
        "f_u_7_1630930370000_18151_0"
      ]
    },
    "options": {
      "sourceTitle": "date.txt"
    }
  }
}

1.2.7. 预览一个节点的数据

请求URL

GET /api/pipelines/{pipelineId}/nodes/{uid}/data

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
offset INTEGER 可选 分页偏移量,默认是0
limit INTEGER 可选 分页获取个数,默认是1000,可以用0来表示只获取schema
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 任务执行结果说明
errorMap OBJECT 具体节点的错误信息,是一个字符串到字符串的map

接口示例:

GET /api/pipelines/1/nodes/input-1/data

返回

{
  "data": {
    "data": [
      ...
    ],
    "schema": [
      ...
    ],
    "randomable": false
  },
  ...
}

1.2.8. 根据当前设置预览一个节点的数据

请求URL

POST /api/pipelines/{pipelineId}/nodes/{uid}/data

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
offset INTEGER 可选 分页偏移量,默认是0
limit INTEGER 可选 分页获取个数,默认是1000,可以用0来表示只获取schema
Request Body 参数
字段 类型 是否必须 描述
options OBJECT 字段和数据集成项目结构说明中的options一致

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 任务执行结果说明
errorMap OBJECT 具体节点的错误信息,是一个字符串到字符串的map

接口示例:

POST /api/pipelines/1/nodes/input-1/data

{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}

返回

{
  "data": {
    "data": [
      ...
    ],
    "schema": [
      ...
    ],
    "randomable": false
  },
  ...
}

1.2.9. 获取一个节点的debug sql

请求URL

GET /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data STRING 当前节点预计会生成的sql
errorMap OBJECT 具体节点的错误信息,是一个字符串到字符串的map

接口示例:

GET /api/pipelines/1/nodes/input-1/sql-debug

返回

{
  "data": "select * from table",
  ...
}

1.2.10. 根据当前设置获取一个节点的debug sql

请求URL

POST /api/pipelines/{pipelineId}/nodes/{uid}/sql-debug

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述
options OBJECT 字段和数据集成项目结构说明中的options一致

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data STRING 当前节点预计会生成的sql
errorMap OBJECT 具体节点的错误信息,是一个字符串到字符串的map

接口示例:

POST /api/pipelines/1/nodes/input-1/sql-debug

{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}

返回

{
  "data": "select * from table",
  ...
}

### 根据当前设置预览一个节点的某个字段的排重值数据

#### 请求URL

POST /api/pipelines/{pipelineId}/nodes/{uid}/fields/{fieldName}/distinct


需要认证:是

#### 请求参数

##### URL 参数

| 字段  | 类型     |是否必须| 说明                              |
|-------|---------|-------|-----------------------------------|
| offset                                | INTEGER     | 可选   |分页偏移量,默认是0                              |
| limit                                 | INTEGER     | 可选   |分页获取个数,默认是1000,可以用0来表示只获取schema |

##### Request Body 参数

| 字段                                  | 类型        | 是否必须|描述                                           |
|---------------------------------------|------------|-------|-----------------------------------------------|
| options |OBJECT|是|字段和[数据集成项目结构说明](#数据集成项目结构说明)中的options一致|


#### 返回对象的格式说明

| 字段     | 类型    | 说明                               |
|---------|---------|-----------------------------------|
| version | STRING  | 当前系统版本哈希值                   |
| data    | OBJECT  | 见[任务执行结果说明](#任务执行结果说明)       |
| errorMap |OBJECT |具体节点的错误信息,是一个字符串到字符串的map|

#### 接口示例:

`POST /api/pipelines/1/nodes/input-1/fields/month/distinct`

```json
{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}

返回

{
  "data": {
    "data": [
      ["Jan"],
      ["Feb"],
      ["Mar"],
      ["Apr"],
      ["May"],
      ...
    ],
    "schema": [
      ...
    ]
  },
  ...
}

1.2.11. 列出当前用户的支持输出的连接列表

请求URL

GET /api/connections/writable

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数组 每个成员描述连接的信息

接口示例:

GET /api/connections/writable

{
  "version": "3.1-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": [
    {
      "id": 5,
      "options": {
        "type": "postgresql",
        "maxConnNum": 10,
        "config": {},
        "protocol": "http",
        "outputAble": true
      },
      "createdBy": 1,
      "createdAt": "2020-04-01 16:57:51",
      "updatedBy": 1,
      "updatedAt": "2020-04-01 16:57:51",
      "visible": true,
      "title": "pg wss",
      "status": 0,
      "refreshStats": {},
      "hsVersion": 0,
      "accessCount": 1,
      "creator": {
        "id": 1,
        "name": "hengshiwss",
        "email": "weishishuo@hengshi.com",
        "userAttributes": {
          "name": "hengshiwss",
          "id": 1,
          "email": "weishishuo@hengshi.com"
        }
      },
      "updater": {
        "id": 1,
        "name": "hengshiwss",
        "email": "weishishuo@hengshi.com",
        "userAttributes": {
          "name": "hengshiwss",
          "id": 1,
          "email": "weishishuo@hengshi.com"
        }
      },
      "auth": false
    }
  ],
  "totalHits": 1,
  "offset": 0
}

1.2.12. 获取指定数据集成项目的状态

请求URL

GET /api/pipelines/status

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
ids STRING 需要查询状态的pipeline的id列表,用英文逗号分隔
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 是一个map,key是pipeline的id,value是状态,见执行计划中的任务执行的状态说明,id不存在或者没权限的,map里面不返回

接口示例:

GET /api/pipelines/status?ids=1,2,3

返回

{
  "data": {
    "1": "PENDING",
    "2": "RUNNING",
    "3": "SUCCESSFUL"
  },
  ...
}

1.2.13. 根据当前配置立即执行接口

请求URL

POST /api/schedules

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 描述
Request Body 参数

这个接口重用执行计划管理中的立即执行接口,参数比那边两个以下参数

字段 类型 是否必须 描述
execDetail.jobParams.uid STRING 节点的uid,当非空的时候是执行指定节点,否则按当前options配置执行整个项目
execDetail.jobParams.options OBJECT 当前项目的设置,字段和数据集成项目结构说明中的options一致

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 执行计划说明中执行计划结构说明

接口示例1: 根据options立即执行数据集成

POST /api/schedules

{
    "entityGroup": "PIPELINE",
    "entityKey": "1",
    "planItems": [
        {
            "triggerType": "ONCE"
        }
    ],
    "execDetail": {
        "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
        "jobParams": {
          "pipeline": 1,
          "options": {
            "defaultOutputConnectionId":1,
            "defaultPath":["public"],
            "nodes":[...]
          }
        },
        "retryTimes": 1
    }
}

返回

{
    "code": 0,
    "data": {
        "id": 1,
        "entityGroup": "PIPELINE",
        "entityKey": "1",
        "enabled": true,
        "planItems": [
            {
                "id": 3,
                "triggerType": "ONCE",
                "planId": 1
            }
        ],
        "execDetail": {
            "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
            "jobParams": {
              "pipeline": 1,
              "options": {
                "defaultOutputConnectionId":1,
                "defaultPath":["public"],
                "nodes":[...]
              }
            },
            "retryTimes": 1
        },
        "title": "pl1",
        "createdAt": "2020-03-05 15:01:02",
        "createdBy": 1,
        "updatedAt": "2020-03-05 15:01:02",
        "updatedBy": 1
    },
    "msg": "success",
    "version": "4.0-SNAPSHOT@@git.commit.id.abbrev@#1234567"
}

接口示例2: 根据当前options配置立即执行一个(一般是输出)节点

POST /api/schedules

{
    "entityGroup": "PIPELINE",
    "entityKey": "1",
    "planItems": [
        {
            "triggerType": "ONCE"
        }
    ],
    "execDetail": {
        "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
        "jobParams": {
          "pipeline": 1,
          "uid": "output-1",
          "options": {
            "defaultOutputConnectionId":1,
            "defaultPath":["public"],
            "nodes":[...]
          }
        },
        "retryTimes": 1
    }
}

返回

{
    "code": 0,
    "data": {
        "id": 1,
        "entityGroup": "PIPELINE",
        "entityKey": "1",
        "enabled": true,
        "planItems": [
            {
                "id": 3,
                "triggerType": "ONCE",
                "planId": 1
            }
        ],
        "execDetail": {
            "jobClass": "com.hengshi.nangaparbat.schedulejob.PipelineJob",
            "jobParams": {
              "pipeline": 1,
              "uid": "output-1",
              "options": {
                "defaultOutputConnectionId":1,
                "defaultPath":["public"],
                "nodes":[...]
              }
            },
            "retryTimes": 1
        },
        "title": "pl1",
        "createdAt": "2020-03-05 15:01:02",
        "createdBy": 1,
        "updatedAt": "2020-03-05 15:01:02",
        "updatedBy": 1
    },
    "msg": "success",
    "version": "4.0-SNAPSHOT@@git.commit.id.abbrev@#1234567"
}

1.2.14. 查询任务执行具体节点错误信息

请求URL

GET /api/contexts/{contextId}/errors

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
block BOOL 可选 是否阻塞,为true表示等到执行完再返回,否则返回空data,默认是true
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 是一个map,map的key是节点的uid,value是具体的错误信息

接口示例:

GET /api/contexts/{contextId}/errors

返回

{
    "code": 0,
    "data": {
      "uid-1": "文件节点没有上传",
      "uid-2": "输出节点没有上游",
      ...
    },
    "msg": "success",
    "version": "4.0-SNAPSHOT@@git.commit.id.abbrev@#1234567"
}

1.2.15. 复制一个数据集成项目

请求URL

POST /api/pipelines/{pipelineId}/duplicate

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data OBJECT 数据集成项目结构说明

接口示例:

POST /api/pipelines/1/duplicate

{}

返回

{
  "version": "4.0-SNAPSHOT@bbdb0d6#f424d94",
  "code": 0,
  "msg": "success",
  "data": {
    "id": 2,
    "title": "数据集成1 (1)",
    "createdBy": 1,
    "createdAt": "2022-04-02 15:08:33",
    "updatedBy": 1,
    "updatedAt": "2022-04-02 15:08:33",
    "options": {
      "defaultPath": [
        "public"
      ],
      "defaultOutputConnectionId": 5,
      "nodes":[...]
    }
  }
}

1.2.16. 根据当前设置获取输出节点的建表属性模板

请求URL

POST /api/pipelines/{pipelineId}/nodes/{uid}/create-table-props-template

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述
options OBJECT 字段和数据集成项目结构说明中的options一致

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
data STRING 根据当前输出的数据源类型和字段生成的建表属性模板

接口示例:

POST /api/pipelines/1/nodes/input-1/create-table-props-template

{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}

返回

{
  "data": "distributed by (f3) partition by list(f2) (DEFAULT PARTITION pd,partition p1 values(1),partition p2 values (2),partition p3 values (3))",
  ...
}

1.2.17. 根据当前建表属性等设置测试建表

请求URL

POST /api/pipelines/{pipelineId}/nodes/{uid}/test-create

需要认证:是

请求参数

URL 参数
字段 类型 是否必须 说明
Request Body 参数
字段 类型 是否必须 描述
options OBJECT 字段和数据集成项目结构说明中的options一致

返回对象的格式说明

字段 类型 说明
version STRING 当前系统版本哈希值
msg STRING http code不是200的话,返回具体的错误信息

接口示例:

POST /api/pipelines/1/nodes/input-1/test-create

{
  "options":{
    "defaultOutputConnectionId":1,
    "defaultPath":["public"],
    "nodes":[...]
  }
}

返回

{
  "msg": "ERROR:  column "f3" named in 'DISTRIBUTED BY' clause does not exist",
  ...
}

results matching ""

    No results matching ""

    数据科学-段落 公钥加密