跳至正文

金蝶云ERP +Tableau 实践:以 API构建企业级数据仓库(上)

最近老客户推荐了新客户,让我基于金蝶 ERP 构建企业的分析中心,相比Sap、Monitor ERP 等成熟 ERP,金蝶 ERP 最大的问题在于如何及时、准确地获得数据。

坐拥全国这么多的云端客户,金蝶甚至都不给客户开放数据分析所需要的数据库权限!API 数据每日限制只有5万行 ,这在保证系统稳定性的同时,也限制了客户的手脚,限制了系统的持续改进。

为此,在购买 API2DataBase 的系统之前,我不得不使用 Python 先自己尝试获得一个数据表,开始探索之路。

一、构建平台的关系

在金蝶 ERP 和 Tableau 之间,很明显需要一个“数据仓库”的实体存在。测试阶段,这个数据仓库可以是我本地的 PostgreSQL 或者 MySQL 数据库,正式交付客户时,则将是基于云服务的 Postgresql 云数据库。

图示展示了金蝶ERP与API、PostgreSQL数据库以及Tableau BI之间的关系,涉及数据流和处理步骤。

之所以使用 Postgresql 而非 MySQL,还有一个原因是是,前者可以更好地兼容 json 数据格式,特别是9.4引入了 jsonb 类型,它存储二进制格式,具有更好的查询性能。

由于金蝶API 可能随着时间变化,因为我计划将 元数据JSON 数据(多层嵌套结构)先存储到数据库,根据元数据信息,查询相关数据表信息并写入数据库,通过存储过程(如 PL/pgSQL)或视图来构建一层抽象,用于数据处理(如解析嵌套字段)和分析(如过滤 MustInput=1 的字段、统计 FieldType 等)。

这是一个典型的 ETL/分析流程,数据库选择需要平衡 JSON 存储的灵活性、查询效率、一致性和维护成本。

注:2026年二季度开始,将接受“金蝶云星空/金蝶云苍穹”的新客户项目,并以此为基础筹划“供应链分析主题最佳实践”的图书,有兴趣的客户或个人欢迎提前联系。 @admin (wyp@vizwise.cn)

一张展示金蝶 ERP、PostgreSQL 数据库及 Tableau 服务器的架构示意图,包含 API 同步与数据分析组件,图中标注了相关服务器和网络配置。

二、数据采集过程(简化版本)

这里以我正在做的“采购订单表”为例。

1、API 的使用说明

金蝶的采购订单为例,对应的 formid = PUR_PurchaseOrder。如下图所示,展示了金蝶 API 官方测试的示例,可以查询采购订单前2000行数据。

显示金蝶云系统的 API 测试界面,包含请求参数和结果展示,界面上突出显示了主要 ID 和请求字段的输入限制。

查询结果如下所示:

[
  [
    "2022-03-24T00:00:00",
    "CGDD000001"
  ],
  [
    "2022-03-24T00:00:00",
    "CGDD000002"
  ],
  [
    "2022-03-24T00:00:00",
    "CGDD000003"
  ],
……  
]

由于存在查询上限,所以订单就需要增加查询条件(FilterString)并借助程序循环查询。同时,还需要想办法获得采购订单的字段列(即元数据信息)。比如查询某一天的采购订单,如下所示:

{
  "FormId": "PUR_PurchaseOrder",
  "FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",

  "FilterString": "FDate = '2022-03-24T00:00:00' ", 
  "OrderString": "",
  "TopRowCount": 0,
  "StartRow": 0,
  "Limit": 10,
  "SubSystemId": ""
  
}

需要注意的是,我尝试很多次才发现,条件中日期和字符串需要单引号,而且日期还需要是中间包含字母 T 的国际标准时间。

2、API 获得订单元数据返回 json 的解析

如果只是查询几个字段,大可不必元数据,只需要指定字段名称,然后查询、保存即可。如下所示:

para = {
        "FormId": "PUR_PurchaseOrder",
        "FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",
        "FilterString": "FDate = '2024-03-24T00:00:00'",  # [],  #
        "OrderString": "",
        "TopRowCount": 0,
        "StartRow": 0,
        "Limit": 200,
        "SubSystemId": ""
    }
    response = api_sdk.ExecuteBillQuery(para)
    print("test:",response)

上面的查询会返回如下所示的结果,这是同一个采购订单的两个产品,对应两行明细。

res=  [
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2], 
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2]
]

借助于 python 的psycopg2 可以把结果写入到Postgresql 数据库中。

    api_sdk = K3CloudApiSdk("https://XXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')

    # 注意:这里的FieldKeys是硬编码的,如果元数据变化需要同步修改
    # FBillNo 在索引 0, FMaterialDesc 在索引 25
    field_keys_str = "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT"

    para = {
        "FormId": form_id,
        "FieldKeys": field_keys_str,
        "FilterString": "FDate = '2024-03-24T00:00:00'",  
        "OrderString": "", 
        "TopRowCount": 0, 
        "StartRow": 0, 
        "Limit": 2000, 
        "SubSystemId": ""
    }

    print(f"   {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
    inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
    # print(f"成功将 {inserted_count} 条数据写入表 {table_name}")

3、API 的改进:函数和循环

关键在于,如何使用函数简化操作,同时使用循环提高效率。

比如 ,我的一个主程序文件是这样的

#!/usr/bin/python
# -*- coding:GBK -*-
import kingdee_utils
from datetime import datetime, timedelta

def main():
    schema = 'kingdee'
    form_id = "PUR_PurchaseOrder"
    table_name = form_id + "0817"  # 建议加上日期或版本号以区分

    # 1. 查询元数据
    # 注意:元数据字段顺序必须和 fetch_and_insert_daily_data 中 FieldKeys 的顺序严格一致
    # 这里我们假设是一致的,如果API返回字段顺序不确定,需要做额外匹配处理
    field_list = kingdee_utils.query_metadata(form_id)
    if not field_list:
        print("未获取到元数据,程序退出。")
        return

    # 2. 数据库连接和建表
    conn = kingdee_utils.get_db_connection()
    cur = conn.cursor()
    kingdee_utils.create_table_if_not_exists(cur, table_name, field_list, schema)
    conn.commit()

    # 3. 按日期循环处理数据
    start_date = datetime(2025, 7, 11)
    total_days = 5
    total_inserted = 0

    for day in range(total_days):
        current_date = start_date + timedelta(days=day)
        date_str = current_date.strftime('%Y-%m-%dT00:00:00')
        print(f"\n======== 开始处理日期: {current_date.strftime('%Y-%m-%d')} ========")

        # 调用新的核心函数,完成“获取并写入”的全部操作
        inserted_count = kingdee_utils.fetch_and_insert_daily_data(
            cur, conn, form_id, table_name, field_list, schema, date_str
        )
        total_inserted += inserted_count
        print(f"======== 日期 {current_date.strftime('%Y-%m-%d')} 处理完毕, 当日插入 {inserted_count} 条数据 ========")

    # 4. 关闭连接
    cur.close()
    conn.close()
    print(f"\n所有任务完成,总计插入 {total_inserted} 条采购订单数据。")


if __name__ == '__main__':
    main()

在这里,我引用了另一个函数文件kingdee_utils(非最终版本):

#!/usr/bin/python
# -*- coding:GBK -*-
# 日期2025/08/16 更新 by Gemini
import json
import psycopg2
from psycopg2.extras import execute_values
from k3cloud_webapi_sdk.main import K3CloudApiSdk
from datetime import datetime


def get_db_connection(schema='kingdee'):
    """获取数据库连接并设置 schema"""
    try:
        conn = psycopg2.connect(
            dbname='postgres',
            user='postgres',
            password='XXXXXX',
            host='localhost',
            port=5432
        )
        cur = conn.cursor()
        cur.execute(f"SET search_path TO {schema};")
        conn.commit()
        print(f"成功连接到数据库,设置 schema 为 {schema}")
        return conn
    except psycopg2.Error as e:
        print(f"数据库连接失败: {e}")
        raise


def check_table_existence(cur, table_name, schema='kingdee'):
    """检查表是否存在"""
    try:
        cur.execute("""
            SELECT EXISTS (
                SELECT 1
                FROM information_schema.tables
                WHERE table_schema = %s AND table_name = %s
            )
        """, (schema, table_name))
        exists = cur.fetchone()[0]
        print(f"检查表 '{schema}.{table_name}' 是否存在: {'是' if exists else '否'}")
        return exists
    except psycopg2.Error as e:
        print(f"检查表是否存在时出错: {e}")
        return False


def create_table_if_not_exists(cur, table_name, field_list, schema='kingdee'):
    """如果表不存在,动态创建表"""
    if check_table_existence(cur, table_name, schema):
        print(f"表 {schema}.{table_name} 已存在,跳过创建")
        return
    print(f"表 {schema}.{table_name} 不存在,正在创建...")
    columns = []
    for field in field_list:
        col_name = field.get('Key', '')
        data_type = field.get('FieldType', 'String')
        pg_type = 'VARCHAR(255)'  # 默认类型
        if data_type in ['integer', 'number', 'decimal', 'float']:
            pg_type = 'NUMERIC'
        elif data_type == 'datetime':
            pg_type = 'TIMESTAMP'
        elif data_type == 'date':
            pg_type = 'DATE'
        elif data_type in ['boolean', 'bool']:
            pg_type = 'BOOLEAN'
        col_def = f'"{col_name}" {pg_type}'  # 使用双引号以支持大小写混合的列名
        columns.append(col_def)
    columns.append('"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
    create_table_query = f'CREATE TABLE {schema}."{table_name}" ({", ".join(columns)})'
    try:
        cur.execute(create_table_query)
        print(f"创建表 {schema}.{table_name} 成功")
    except psycopg2.Error as e:
        print(f"创建表失败: {e}")
        raise

def _map_kingdee_field_type(field_type_code):
    """
    将 Kingdee 的整数 FieldType 代码映射为描述性字符串。
    此映射可能需要根据您的 Kingdee 系统具体情况进行扩展。
    """
    # 基于您提供的示例和通用类型进行映射
    mapping = {
        # 文本类型
        231: 'String',  # 例如: FBillNo
        175: 'String',  # 例如: FDocumentStatus (枚举类型)
        # 日期/时间类型
        61: 'Date',     # 例如: FDate
        # 基础资料/引用类型 (作为字符串处理)
        127: 'String',  # 例如: FPurchaseOrgId, FSupplierId
        # 数值类型 (需要根据实际情况补充)
        107: 'Decimal', # 假设 107 是数值类型
        # 其他类型...
    }
    # 如果代码未在映射中找到,则默认为字符串类型
    return mapping.get(field_type_code, 'String')


def query_metadata(form_id):
    """查询元数据并返回字段列表"""
    api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')
    para = {"FormId": form_id}
    response = api_sdk.QueryBusinessInfo(para)
    print("1 - 正在解析元数据...")
    try:
        res = json.loads(response)
        if not res.get('Result', {}).get('ResponseStatus', {}).get('IsSuccess', False):
            print("  元数据查询失败:", res.get('Result', {}).get('ResponseStatus', {}).get('Errors', []))
            return []

        need_return_data = res.get('Result', {}).get('NeedReturnData', {})

        # 遍历所有 Entrys (如单据头、单据体),收集所有字段
        all_fields = []
        for entry in need_return_data.get('Entrys', []):
            all_fields.extend(entry.get('Fields', []))

        if not all_fields:
            print("  元数据中未找到任何字段。")
            return []

        # 构建最终的字段列表
        field_list = []
        for field in all_fields:
            if field.get('Key'):
                field_list.append({
                    'Key': field.get('Key'),
                    'Name': next((item['Value'] for item in field.get('Name', []) if item['Key'] == 2052),
                                 field.get('Key')),
                    'FieldType': _map_kingdee_field_type(field.get('FieldType'))
                })

        print(f"  元数据解析成功,共找到 {len(field_list)} 个字段。")
        return field_list
    except (json.JSONDecodeError, KeyError) as e:
        print(f"元数据解析错误: {e}")
        return []


def preprocess_row(row, field_keys, field_list):
    """预处理行数据,将值转换为与字段类型匹配的格式"""
    processed_row = []
    field_types = {f['Key']: f['FieldType'] for f in field_list}
    for i, key in enumerate(field_keys):
        value = row[i] if i < len(row) else None
        field_type = field_types.get(key, 'string')
        if isinstance(value, dict):
            value = value.get('FNumber', value.get('FName', ''))
        elif value is None:
            pass
        elif field_type in ['integer', 'number', 'decimal', 'float']:
            value = float(value) if value not in [None, ''] else None
        elif field_type in ['boolean', 'bool']:
            value = str(value).lower() == 'true'
        elif value == '':
            value = None
        else:
            value = str(value)
        processed_row.append(value)
    return tuple(processed_row)


def insert_data(cur, conn, rows, field_list, table_name, schema):
    """将数据批量插入到数据库"""
    if not rows:
        return 0
    field_keys = [field['Key'] for field in field_list]
    columns = [f'"{key}"' for key in field_keys]  # 使用双引号

    insert_query = f"""
        INSERT INTO {schema}."{table_name}" ({', '.join(columns)}, "created_at")
        VALUES %s
    """
    try:
        processed_rows = [preprocess_row(row, field_keys, field_list) + (datetime.now(),) for row in rows]
        execute_values(cur, insert_query, processed_rows)
        conn.commit()
        return len(processed_rows)
    except psycopg2.Error as e:
        print(f"插入数据失败: {e}")
        conn.rollback()
        return 0


def fetch_and_insert_daily_data(cur, conn, form_id, table_name, field_list, schema, start_date_str):
    """
    核心函数:获取指定单据和单日的所有数据,并直接写入数据库。
    - 动态生成查询字段
    - 自动处理分页。
    - 打印订单号和产品名称日志。
    - 仅在获取到数据时执行写入。
    """
    api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
    api_sdk.Init(config_path='./conf.ini', config_node='config')

    # 从元数据动态生成字段列表和查询字符串
    if not field_list:
        print("错误: 字段列表为空,无法执行查询。")
        return 0

    field_keys = [field['Key'] for field in field_list]
    field_keys_str = ",".join(field_keys)
    # print("fields list:",field_keys_str)

    para = {
        "FormId": form_id,
        "FieldKeys": field_keys_str,
        "FilterString": f"FDate = '{start_date_str}'",
        "OrderString": "", "TopRowCount": 0, "StartRow": 0, "Limit": 2000, "SubSystemId": ""
    }
    print(f"开始查询日期: {start_date_str}")
    all_rows = []
    while True:
        try:
            response = api_sdk.ExecuteBillQuery(para)
            res = json.loads(response)
            print("res",res)
            if not res:
                break
            for row in res:
                # 查找 FBillNo 和 FMaterialDesc 的索引用于日志记录
                order_no = row[0]
                product_name = row[8]
                print(f"  获取到数据 -> 订单号: {order_no}, 时间: {product_name}")
                all_rows.append(row)

            if len(res) < para["Limit"]:
                break

            para["StartRow"] += para["Limit"]
            print(f"  分页查询,下一页起始行: {para['StartRow']}")

        except (json.JSONDecodeError, Exception) as e:
            print(f"查询或解析数据时出错: {e}")
            break

    if not all_rows:
        print(f"日期 {start_date_str} 没有查询到任何数据,跳过写入。")
        return 0

    print(f"日期 {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
    inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
    print(f"成功将 {inserted_count} 条数据写入表 {table_name}")
    return inserted_count

4、使用元数据优化建表逻辑和排错

Kingdee 金蝶 api 的 json元数据,提供了很好的检查手段。

比如,我在查询条码数据时提示,

api_sdk.ExecuteBillQuery(para) =  
[[{'Result': 
   {'ResponseStatus': 
      {'ErrorCode': 500, 'IsSuccess': False, 'Errors': 
        [{'FieldName': None, 'Message': '名称为“条码管理”的模块/子系统未购买', 'DIndex': 0}], 'SuccessEntitys': [], 'SuccessMessages': [], 'MsgCode': 7}
}}]]

使用ResponseStatus 可以快速判断是否返回了正确的结果。避免循环引用错误。

金蝶云星空的数据库表关联关系类型:

001、单据头拆分表

单据头的拆分表引用单据头所在表的主键

例如:T_AP_PAYABLE_O.FID=T_AP_PAYABLE.FID

002、单据体拆分表

单据体的拆分表引用单据体所在表的主键;同时引用单据体的父实体的主键

例如:T_AP_PAYABLEENTRY_O.Fentryid=T_AP_PAYABLEENTRY.fentryid

例如:T_AP_PAYABLEENTRY_O.fid=T_AP_PAYABLE.fid

003、多语言表

多语言表引用所在实体的表的主键

例如:T_AP_PAYABLEENTRY_L.fentryid=T_AP_PAYABLEENTRY.fentryid

004、单据体

单据体引用单据头的主键

例如:T_AP_PAYABLEENTRY.FID=T_AP_PAYABLE.FID

005、单据头关联凭证表(VH表)

单据头关联凭证表引用单据头的主键

例如:T_AP_PAYABLE_VH.Fid=T_AP_PAYABLE.FID

006、子单据体

子单据体引用单据体的主键

例如:T_AP_PAYABLETAX.fentryid=T_AP_PAYABLEenty.fentryid

作者:夏天的云儿

来源:金蝶云社区

原文链接:https://vip.kingdee.com/article/618381625222396160?productLineId=1&lang=zh-CN

著作权归作者所有。未经允许禁止转载,如需转载请联系作者获得授权。

借助于元数据的结构,可以优化建表逻辑和查询过程。以物料信息为例,json 的 entry 节点中有很多个分类,简化期间,我只保留了前面五个分类信息。

但是要注意,如果保留所有的 Entry 节点,数据可能重复重复,这让我很震惊。

比如,我在查询 User 用户时,如果保留所有元数据条目 (Entries): [‘FBillHead’, ‘FOrgInfo’, ‘FRoleInfo’],就会出现一个用户对应多个OrgInfo 和多个 RoleInfo 之中。 这个就需要通过设置主键或其他方式避免。

为了帮助检查 Entry 的数量,我在查询时让代码抛出 Entry 节点的名称以备检查,并默认保留前面5个节点。

1 - 正在解析元数据...
  发现的元数据条目 (Entries): ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity', 'SubHeadEntity1', 'SubHeadEntity2', 'SubHeadEntity3', 'SubHeadEntity4', 'SubHeadEntity5', 'FEntityAuxPty', 'FEntityInvPty', 'SubHeadEntity7', 'SubHeadEntity6']
  将只处理以下条目中的字段: ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity']
  元数据解析成功,共找到 123 个字段。

以订单到货为例,元数据的数据中 Entry 中的内容如下:

[{
	'Key': 'FBillNo',
	'Name': [{
		'Key': 2052,
		'Value': '单据编号'
	}, {
		'Key': 1033,
		'Value': 'Doc No.'
	}, {
		'Key': 3076,
		'Value': '單據編號'
	}],
	'FieldName': 'FBILLNO',
	'PropertyName': 'BillNo',
	'FieldType': 231,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 12,
	'MustInput': 0,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': '',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 30,
	'ConditionType': '0'
}, {
	'Key': 'FDocumentStatus',
	'Name': [{
		'Key': 2052,
		'Value': '单据状态'
	}, {
		'Key': 1033,
		'Value': 'Doc Status'
	}, {
		'Key': 3076,
		'Value': '單據狀態'
	}],
	'FieldName': 'FDOCUMENTSTATUS',
	'PropertyName': 'DocumentStatus',
	'FieldType': 175,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 40,
	'MustInput': 0,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': [{
		'Value': 'Z',
		'Caption': '暂存',
		'Seq': 0,
		'Invalid': False
	}, {
		'Value': 'A',
		'Caption': '创建',
		'Seq': 2,
		'Invalid': False
	}, {
		'Value': 'B',
		'Caption': '审核中',
		'Seq': 3,
		'Invalid': False
	}, {
		'Value': 'C',
		'Caption': '已审核',
		'Seq': 4,
		'Invalid': False
	}, {
		'Value': 'D',
		'Caption': '重新审核',
		'Seq': 5,
		'Invalid': False
	}],
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': 'Z',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': True,
	'IsEditLock': True,
	'Editlen': 0,
	'ConditionType': '9'
}, {
	'Key': 'FStockOrgId',
	'Name': [{
		'Key': 2052,
		'Value': '收料组织'
	}, {
		'Key': 1033,
		'Value': 'Receipt Org.'
	}, {
		'Key': 3076,
		'Value': '收料組織'
	}],
	'FieldName': 'FSTOCKORGID',
	'PropertyName': 'StockOrgId',
	'FieldType': 127,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 7,
	'MustInput': 1,
	'LookUpObjectFormId': 'ORG_Organizations',
	'LookUpObjectID': '3f8da3f4-fe84-4456-9548-7c2c38b7c57f',
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': '',
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 0,
	'ConditionType': '0,20'
}, {
	'Key': 'FDate',
	'Name': [{
		'Key': 2052,
		'Value': '入库日期'
	}, {
		'Key': 1033,
		'Value': 'Warehouse Receipt Date'
	}, {
		'Key': 3076,
		'Value': '入庫日期'
	}],
	'FieldName': 'FDATE',
	'PropertyName': 'Date',
	'FieldType': 61,
	'EntityKey': 'FBillHead',
	'TableName': 't_STK_InStock',
	'ElementType': 4,
	'MustInput': 1,
	'LookUpObjectFormId': None,
	'LookUpObjectID': None,
	'EnumObjectId': None,
	'Extends': None,
	'ControlFieldKey': None,
	'GroupFieldTableName': None,
	'DefValue': None,
	'IsViewVisible': True,
	'IsEditVisible': True,
	'IsNewVisible': True,
	'IsNewLock': False,
	'IsEditLock': False,
	'Editlen': 0,
	'ConditionType': '2'
}, 
…………………………

不得不说,金蝶 API 的帮助文档是真差,为了一个数据类型,我搜索了好几天才从一个论坛文章中找到一丝线索。金蝶的数据类型编码到类型的转化如下:

作为单一主键的数据库数据类型有以下6种:
1、smallint
system_type_id=52 举例:T_BAS_NUMBER.FID
2、int
system_type_id=56 举例:T_BAS_ITEM.FITEMID
3、bigint
system_type_id=127 举例:T_SAL_INITOUTSTOCK.FID
4、varchar
system_type_id=167 举例:T_BAS_PUBNEEDS.FID
5、char
system_type_id=175 举例:CMK_BD_KTMDELETELOG.FID
6、nvarchar
system_type_id=231 举例: T_AM_NETCONTROL.FID
作者:i求知若渴
来源:金蝶云社区
原文链接:https://vip.kingdee.com/article/61129193165550592?productLineId=1&lang=zh-CN
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

三、企业级 DW 中心开发

当然,上面是我个人用 Python 完成的测试,最终开发使用了更加健壮的 java 完成。并结合定时任务实现高效的增量刷新。

  • 增量:有新订单追加(随着同步周期不同而不同,相对较多)
  • 覆盖:历史订单有变更覆盖数据(相对较多)
  • 删除:历史订单有删除同步删除或“标记删除”(非常之少)
A snapshot of system logs showing request URLs and timestamps related to an API execution.

目前,我用 java 完成了采购及部分主题的数据表查询,这样就建立了后续 BI 的分析基础,并且尽可能不影响业务系统。

补充:我的测试数据一览

A screenshot showing database assets, including a table list, schema information, and data statistics from Kingdee cloud, utilizing API for querying and data storage.

四、最精彩的部分:Tableau 分析

分析始于建模,建模的典型是关系模型(relationship)。

如图所示,展示了采购订单和供应商信息的基本模型,后续可以增加物料等基本信息,构成典型的单事实多维度模型。

图示展示了采购订单与物料明细的关系模型,左侧为连接信息,右侧为物料详细数据。

模型之后就是仪表板啦,让大家看看近期的作品(列表)吧。

Interface of Tableau DW/BI analysis system showing various dashboards and reports.

注:客户数据不便公开。

2025/09/30 update by 喜乐君