最近老客户推荐了新客户,让我基于金蝶 ERP 构建企业的分析中心,相比Sap、Monitor ERP 等成熟 ERP,金蝶 ERP 最大的问题在于如何及时、准确地获得数据。
坐拥全国这么多的云端客户,金蝶甚至都不给客户开放数据分析所需要的数据库权限!API 数据每日限制只有5万行 ,这在保证系统稳定性的同时,也限制了客户的手脚,限制了系统的持续改进。
为此,在购买 API2DataBase 的系统之前,我不得不使用 Python 先自己尝试获得一个数据表,开始探索之路。
一、构建平台的关系
在金蝶 ERP 和 Tableau 之间,很明显需要一个“数据仓库”的实体存在。测试阶段,这个数据仓库可以是我本地的 PostgreSQL 或者 MySQL 数据库,正式交付客户时,则将是基于云服务的 Postgresql 云数据库。

之所以使用 Postgresql 而非 MySQL,还有一个原因是是,前者可以更好地兼容 json 数据格式,特别是9.4引入了 jsonb 类型,它存储二进制格式,具有更好的查询性能。
由于金蝶API 可能随着时间变化,因为我计划将 元数据JSON 数据(多层嵌套结构)先存储到数据库,根据元数据信息,查询相关数据表信息并写入数据库,通过存储过程(如 PL/pgSQL)或视图来构建一层抽象,用于数据处理(如解析嵌套字段)和分析(如过滤 MustInput=1 的字段、统计 FieldType 等)。
这是一个典型的 ETL/分析流程,数据库选择需要平衡 JSON 存储的灵活性、查询效率、一致性和维护成本。
注:2026年二季度开始,将接受“金蝶云星空/金蝶云苍穹”的新客户项目,并以此为基础筹划“供应链分析主题最佳实践”的图书,有兴趣的客户或个人欢迎提前联系。 @admin (wyp@vizwise.cn)

二、数据采集过程(简化版本)
这里以我正在做的“采购订单表”为例。
1、API 的使用说明
金蝶的采购订单为例,对应的 formid = PUR_PurchaseOrder。如下图所示,展示了金蝶 API 官方测试的示例,可以查询采购订单前2000行数据。

查询结果如下所示:
[
[
"2022-03-24T00:00:00",
"CGDD000001"
],
[
"2022-03-24T00:00:00",
"CGDD000002"
],
[
"2022-03-24T00:00:00",
"CGDD000003"
],
……
]
由于存在查询上限,所以订单就需要增加查询条件(FilterString)并借助程序循环查询。同时,还需要想办法获得采购订单的字段列(即元数据信息)。比如查询某一天的采购订单,如下所示:
{
"FormId": "PUR_PurchaseOrder",
"FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",
"FilterString": "FDate = '2022-03-24T00:00:00' ",
"OrderString": "",
"TopRowCount": 0,
"StartRow": 0,
"Limit": 10,
"SubSystemId": ""
}
需要注意的是,我尝试很多次才发现,条件中日期和字符串需要单引号,而且日期还需要是中间包含字母 T 的国际标准时间。
2、API 获得订单元数据返回 json 的解析
如果只是查询几个字段,大可不必元数据,只需要指定字段名称,然后查询、保存即可。如下所示:
para = {
"FormId": "PUR_PurchaseOrder",
"FieldKeys": "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT",
"FilterString": "FDate = '2024-03-24T00:00:00'", # [], #
"OrderString": "",
"TopRowCount": 0,
"StartRow": 0,
"Limit": 200,
"SubSystemId": ""
}
response = api_sdk.ExecuteBillQuery(para)
print("test:",response)
上面的查询会返回如下所示的结果,这是同一个采购订单的两个产品,对应两行明细。
res= [
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2],
['CGDD001666', '6d01d059713d42a28bb976c90a121142', '2025-07-02T00:00:00', 136496, 1, 102900, 0, 106815, 136496, 136496, 136496, 'A', 'Q', 0, 118221, 1, 1, 1.0, '1', 0, '1', True, True, 1, 137700, 'YY【】XXXXX/袋001', 103818, 1160.0, 103818, 1160.0, 1160.0, '2025-07-10T00:00:00', 3.663717, 4.14, 13.0, 1, 0, 1, 1, 103818, 1160.0, 1160.0, ' ', 1160.0, 1160.0, '2025-07-10T00:00:00', '2025-07-10T23:59:59', 1.0, 0, ' ', True, 103818, 1160.0, 0, 0, 10001, 1160.0, 0, 4802.4, 1160.0, '2025-07-10T00:00:00', '2025-07-10T00:00:00', 50.0, 2401.2]
]
借助于 python 的psycopg2 可以把结果写入到Postgresql 数据库中。
api_sdk = K3CloudApiSdk("https://XXXX.ik3cloud.com/k3cloud/")
api_sdk.Init(config_path='./conf.ini', config_node='config')
# 注意:这里的FieldKeys是硬编码的,如果元数据变化需要同步修改
# FBillNo 在索引 0, FMaterialDesc 在索引 25
field_keys_str = "FBillNo,FBillTypeID,FDate,FSupplierId,FPurchaseOrgId,FPurchaseDeptId,FPurchaserGroupId,FPurchaserId,FProviderId,FSettleId,FChargeId,FChangeStatus,FACCTYPE,FSettleModeId,FPayConditionId,FSettleCurrId,FExchangeTypeId,FExchangeRate,FPriceTimePoint,FFOCUSSETTLEORGID,FProductType,FIsIncludedTax,FISPRICEEXCLUDETAX,FLocalCurrId,FMaterialId,FMaterialDesc,FUnitId,FQty,FPriceUnitId,FPriceUnitQty,FPriceBaseQty,FDeliveryDate,FPrice,FTaxPrice,FEntryTaxRate,FRequireOrgId,FRequireDeptId,FReceiveOrgId,FEntrySettleOrgId,FStockUnitID,FStockQty,FStockBaseQty,FSupplierLot,FDeliveryMaxQty,FDeliveryMinQty,FDeliveryEarlyDate,FDeliveryLastDate,FPriceCoefficient,FEntrySettleModeId,FReqTraceNo,FPlanConfirm,FSalUnitID,FSalQty,FCentSettleOrgId,FDispSettleOrgId,FDeliveryStockStatus,FSalBaseQty,FEntryPayOrgId,FAllAmountExceptDisCount,FPlanQty,FPREARRIVALDATE,FSUPPLIERDELIVERYDATE,FYFRATIO,FYFAMOUNT"
para = {
"FormId": form_id,
"FieldKeys": field_keys_str,
"FilterString": "FDate = '2024-03-24T00:00:00'",
"OrderString": "",
"TopRowCount": 0,
"StartRow": 0,
"Limit": 2000,
"SubSystemId": ""
}
print(f" {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
# print(f"成功将 {inserted_count} 条数据写入表 {table_name}")
3、API 的改进:函数和循环
关键在于,如何使用函数简化操作,同时使用循环提高效率。
比如 ,我的一个主程序文件是这样的
#!/usr/bin/python
# -*- coding:GBK -*-
import kingdee_utils
from datetime import datetime, timedelta
def main():
schema = 'kingdee'
form_id = "PUR_PurchaseOrder"
table_name = form_id + "0817" # 建议加上日期或版本号以区分
# 1. 查询元数据
# 注意:元数据字段顺序必须和 fetch_and_insert_daily_data 中 FieldKeys 的顺序严格一致
# 这里我们假设是一致的,如果API返回字段顺序不确定,需要做额外匹配处理
field_list = kingdee_utils.query_metadata(form_id)
if not field_list:
print("未获取到元数据,程序退出。")
return
# 2. 数据库连接和建表
conn = kingdee_utils.get_db_connection()
cur = conn.cursor()
kingdee_utils.create_table_if_not_exists(cur, table_name, field_list, schema)
conn.commit()
# 3. 按日期循环处理数据
start_date = datetime(2025, 7, 11)
total_days = 5
total_inserted = 0
for day in range(total_days):
current_date = start_date + timedelta(days=day)
date_str = current_date.strftime('%Y-%m-%dT00:00:00')
print(f"\n======== 开始处理日期: {current_date.strftime('%Y-%m-%d')} ========")
# 调用新的核心函数,完成“获取并写入”的全部操作
inserted_count = kingdee_utils.fetch_and_insert_daily_data(
cur, conn, form_id, table_name, field_list, schema, date_str
)
total_inserted += inserted_count
print(f"======== 日期 {current_date.strftime('%Y-%m-%d')} 处理完毕, 当日插入 {inserted_count} 条数据 ========")
# 4. 关闭连接
cur.close()
conn.close()
print(f"\n所有任务完成,总计插入 {total_inserted} 条采购订单数据。")
if __name__ == '__main__':
main()
在这里,我引用了另一个函数文件kingdee_utils(非最终版本):
#!/usr/bin/python
# -*- coding:GBK -*-
# 日期2025/08/16 更新 by Gemini
import json
import psycopg2
from psycopg2.extras import execute_values
from k3cloud_webapi_sdk.main import K3CloudApiSdk
from datetime import datetime
def get_db_connection(schema='kingdee'):
"""获取数据库连接并设置 schema"""
try:
conn = psycopg2.connect(
dbname='postgres',
user='postgres',
password='XXXXXX',
host='localhost',
port=5432
)
cur = conn.cursor()
cur.execute(f"SET search_path TO {schema};")
conn.commit()
print(f"成功连接到数据库,设置 schema 为 {schema}")
return conn
except psycopg2.Error as e:
print(f"数据库连接失败: {e}")
raise
def check_table_existence(cur, table_name, schema='kingdee'):
"""检查表是否存在"""
try:
cur.execute("""
SELECT EXISTS (
SELECT 1
FROM information_schema.tables
WHERE table_schema = %s AND table_name = %s
)
""", (schema, table_name))
exists = cur.fetchone()[0]
print(f"检查表 '{schema}.{table_name}' 是否存在: {'是' if exists else '否'}")
return exists
except psycopg2.Error as e:
print(f"检查表是否存在时出错: {e}")
return False
def create_table_if_not_exists(cur, table_name, field_list, schema='kingdee'):
"""如果表不存在,动态创建表"""
if check_table_existence(cur, table_name, schema):
print(f"表 {schema}.{table_name} 已存在,跳过创建")
return
print(f"表 {schema}.{table_name} 不存在,正在创建...")
columns = []
for field in field_list:
col_name = field.get('Key', '')
data_type = field.get('FieldType', 'String')
pg_type = 'VARCHAR(255)' # 默认类型
if data_type in ['integer', 'number', 'decimal', 'float']:
pg_type = 'NUMERIC'
elif data_type == 'datetime':
pg_type = 'TIMESTAMP'
elif data_type == 'date':
pg_type = 'DATE'
elif data_type in ['boolean', 'bool']:
pg_type = 'BOOLEAN'
col_def = f'"{col_name}" {pg_type}' # 使用双引号以支持大小写混合的列名
columns.append(col_def)
columns.append('"created_at" TIMESTAMP DEFAULT CURRENT_TIMESTAMP')
create_table_query = f'CREATE TABLE {schema}."{table_name}" ({", ".join(columns)})'
try:
cur.execute(create_table_query)
print(f"创建表 {schema}.{table_name} 成功")
except psycopg2.Error as e:
print(f"创建表失败: {e}")
raise
def _map_kingdee_field_type(field_type_code):
"""
将 Kingdee 的整数 FieldType 代码映射为描述性字符串。
此映射可能需要根据您的 Kingdee 系统具体情况进行扩展。
"""
# 基于您提供的示例和通用类型进行映射
mapping = {
# 文本类型
231: 'String', # 例如: FBillNo
175: 'String', # 例如: FDocumentStatus (枚举类型)
# 日期/时间类型
61: 'Date', # 例如: FDate
# 基础资料/引用类型 (作为字符串处理)
127: 'String', # 例如: FPurchaseOrgId, FSupplierId
# 数值类型 (需要根据实际情况补充)
107: 'Decimal', # 假设 107 是数值类型
# 其他类型...
}
# 如果代码未在映射中找到,则默认为字符串类型
return mapping.get(field_type_code, 'String')
def query_metadata(form_id):
"""查询元数据并返回字段列表"""
api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
api_sdk.Init(config_path='./conf.ini', config_node='config')
para = {"FormId": form_id}
response = api_sdk.QueryBusinessInfo(para)
print("1 - 正在解析元数据...")
try:
res = json.loads(response)
if not res.get('Result', {}).get('ResponseStatus', {}).get('IsSuccess', False):
print(" 元数据查询失败:", res.get('Result', {}).get('ResponseStatus', {}).get('Errors', []))
return []
need_return_data = res.get('Result', {}).get('NeedReturnData', {})
# 遍历所有 Entrys (如单据头、单据体),收集所有字段
all_fields = []
for entry in need_return_data.get('Entrys', []):
all_fields.extend(entry.get('Fields', []))
if not all_fields:
print(" 元数据中未找到任何字段。")
return []
# 构建最终的字段列表
field_list = []
for field in all_fields:
if field.get('Key'):
field_list.append({
'Key': field.get('Key'),
'Name': next((item['Value'] for item in field.get('Name', []) if item['Key'] == 2052),
field.get('Key')),
'FieldType': _map_kingdee_field_type(field.get('FieldType'))
})
print(f" 元数据解析成功,共找到 {len(field_list)} 个字段。")
return field_list
except (json.JSONDecodeError, KeyError) as e:
print(f"元数据解析错误: {e}")
return []
def preprocess_row(row, field_keys, field_list):
"""预处理行数据,将值转换为与字段类型匹配的格式"""
processed_row = []
field_types = {f['Key']: f['FieldType'] for f in field_list}
for i, key in enumerate(field_keys):
value = row[i] if i < len(row) else None
field_type = field_types.get(key, 'string')
if isinstance(value, dict):
value = value.get('FNumber', value.get('FName', ''))
elif value is None:
pass
elif field_type in ['integer', 'number', 'decimal', 'float']:
value = float(value) if value not in [None, ''] else None
elif field_type in ['boolean', 'bool']:
value = str(value).lower() == 'true'
elif value == '':
value = None
else:
value = str(value)
processed_row.append(value)
return tuple(processed_row)
def insert_data(cur, conn, rows, field_list, table_name, schema):
"""将数据批量插入到数据库"""
if not rows:
return 0
field_keys = [field['Key'] for field in field_list]
columns = [f'"{key}"' for key in field_keys] # 使用双引号
insert_query = f"""
INSERT INTO {schema}."{table_name}" ({', '.join(columns)}, "created_at")
VALUES %s
"""
try:
processed_rows = [preprocess_row(row, field_keys, field_list) + (datetime.now(),) for row in rows]
execute_values(cur, insert_query, processed_rows)
conn.commit()
return len(processed_rows)
except psycopg2.Error as e:
print(f"插入数据失败: {e}")
conn.rollback()
return 0
def fetch_and_insert_daily_data(cur, conn, form_id, table_name, field_list, schema, start_date_str):
"""
核心函数:获取指定单据和单日的所有数据,并直接写入数据库。
- 动态生成查询字段
- 自动处理分页。
- 打印订单号和产品名称日志。
- 仅在获取到数据时执行写入。
"""
api_sdk = K3CloudApiSdk("https://XXXXX.ik3cloud.com/k3cloud/")
api_sdk.Init(config_path='./conf.ini', config_node='config')
# 从元数据动态生成字段列表和查询字符串
if not field_list:
print("错误: 字段列表为空,无法执行查询。")
return 0
field_keys = [field['Key'] for field in field_list]
field_keys_str = ",".join(field_keys)
# print("fields list:",field_keys_str)
para = {
"FormId": form_id,
"FieldKeys": field_keys_str,
"FilterString": f"FDate = '{start_date_str}'",
"OrderString": "", "TopRowCount": 0, "StartRow": 0, "Limit": 2000, "SubSystemId": ""
}
print(f"开始查询日期: {start_date_str}")
all_rows = []
while True:
try:
response = api_sdk.ExecuteBillQuery(para)
res = json.loads(response)
print("res",res)
if not res:
break
for row in res:
# 查找 FBillNo 和 FMaterialDesc 的索引用于日志记录
order_no = row[0]
product_name = row[8]
print(f" 获取到数据 -> 订单号: {order_no}, 时间: {product_name}")
all_rows.append(row)
if len(res) < para["Limit"]:
break
para["StartRow"] += para["Limit"]
print(f" 分页查询,下一页起始行: {para['StartRow']}")
except (json.JSONDecodeError, Exception) as e:
print(f"查询或解析数据时出错: {e}")
break
if not all_rows:
print(f"日期 {start_date_str} 没有查询到任何数据,跳过写入。")
return 0
print(f"日期 {start_date_str} 查询完成,共 {len(all_rows)} 条记录,准备写入数据库...")
inserted_count = insert_data(cur, conn, all_rows, field_list, table_name, schema)
print(f"成功将 {inserted_count} 条数据写入表 {table_name}")
return inserted_count
4、使用元数据优化建表逻辑和排错
Kingdee 金蝶 api 的 json元数据,提供了很好的检查手段。
比如,我在查询条码数据时提示,
api_sdk.ExecuteBillQuery(para) =
[[{'Result':
{'ResponseStatus':
{'ErrorCode': 500, 'IsSuccess': False, 'Errors':
[{'FieldName': None, 'Message': '名称为“条码管理”的模块/子系统未购买', 'DIndex': 0}], 'SuccessEntitys': [], 'SuccessMessages': [], 'MsgCode': 7}
}}]]
使用ResponseStatus 可以快速判断是否返回了正确的结果。避免循环引用错误。
金蝶云星空的数据库表关联关系类型:
001、单据头拆分表
单据头的拆分表引用单据头所在表的主键
例如:T_AP_PAYABLE_O.FID=T_AP_PAYABLE.FID
002、单据体拆分表
单据体的拆分表引用单据体所在表的主键;同时引用单据体的父实体的主键
例如:T_AP_PAYABLEENTRY_O.Fentryid=T_AP_PAYABLEENTRY.fentryid
例如:T_AP_PAYABLEENTRY_O.fid=T_AP_PAYABLE.fid
003、多语言表
多语言表引用所在实体的表的主键
例如:T_AP_PAYABLEENTRY_L.fentryid=T_AP_PAYABLEENTRY.fentryid
004、单据体
单据体引用单据头的主键
例如:T_AP_PAYABLEENTRY.FID=T_AP_PAYABLE.FID
005、单据头关联凭证表(VH表)
单据头关联凭证表引用单据头的主键
例如:T_AP_PAYABLE_VH.Fid=T_AP_PAYABLE.FID
006、子单据体
子单据体引用单据体的主键
例如:T_AP_PAYABLETAX.fentryid=T_AP_PAYABLEenty.fentryid
作者:夏天的云儿
来源:金蝶云社区
原文链接:https://vip.kingdee.com/article/618381625222396160?productLineId=1&lang=zh-CN
著作权归作者所有。未经允许禁止转载,如需转载请联系作者获得授权。
借助于元数据的结构,可以优化建表逻辑和查询过程。以物料信息为例,json 的 entry 节点中有很多个分类,简化期间,我只保留了前面五个分类信息。
但是要注意,如果保留所有的 Entry 节点,数据可能重复重复,这让我很震惊。
比如,我在查询 User 用户时,如果保留所有元数据条目 (Entries): [‘FBillHead’, ‘FOrgInfo’, ‘FRoleInfo’],就会出现一个用户对应多个OrgInfo 和多个 RoleInfo 之中。 这个就需要通过设置主键或其他方式避免。
为了帮助检查 Entry 的数量,我在查询时让代码抛出 Entry 节点的名称以备检查,并默认保留前面5个节点。
1 - 正在解析元数据...
发现的元数据条目 (Entries): ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity', 'SubHeadEntity1', 'SubHeadEntity2', 'SubHeadEntity3', 'SubHeadEntity4', 'SubHeadEntity5', 'FEntityAuxPty', 'FEntityInvPty', 'SubHeadEntity7', 'SubHeadEntity6']
将只处理以下条目中的字段: ['FBillHead', 'FSubHeadEntity', 'FBarCodeEntity_CMK', 'FSpecialAttributeEntity', 'SubHeadEntity']
元数据解析成功,共找到 123 个字段。
以订单到货为例,元数据的数据中 Entry 中的内容如下:
[{
'Key': 'FBillNo',
'Name': [{
'Key': 2052,
'Value': '单据编号'
}, {
'Key': 1033,
'Value': 'Doc No.'
}, {
'Key': 3076,
'Value': '單據編號'
}],
'FieldName': 'FBILLNO',
'PropertyName': 'BillNo',
'FieldType': 231,
'EntityKey': 'FBillHead',
'TableName': 't_STK_InStock',
'ElementType': 12,
'MustInput': 0,
'LookUpObjectFormId': None,
'LookUpObjectID': None,
'EnumObjectId': None,
'Extends': None,
'ControlFieldKey': None,
'GroupFieldTableName': None,
'DefValue': '',
'IsViewVisible': True,
'IsEditVisible': True,
'IsNewVisible': True,
'IsNewLock': False,
'IsEditLock': False,
'Editlen': 30,
'ConditionType': '0'
}, {
'Key': 'FDocumentStatus',
'Name': [{
'Key': 2052,
'Value': '单据状态'
}, {
'Key': 1033,
'Value': 'Doc Status'
}, {
'Key': 3076,
'Value': '單據狀態'
}],
'FieldName': 'FDOCUMENTSTATUS',
'PropertyName': 'DocumentStatus',
'FieldType': 175,
'EntityKey': 'FBillHead',
'TableName': 't_STK_InStock',
'ElementType': 40,
'MustInput': 0,
'LookUpObjectFormId': None,
'LookUpObjectID': None,
'EnumObjectId': None,
'Extends': [{
'Value': 'Z',
'Caption': '暂存',
'Seq': 0,
'Invalid': False
}, {
'Value': 'A',
'Caption': '创建',
'Seq': 2,
'Invalid': False
}, {
'Value': 'B',
'Caption': '审核中',
'Seq': 3,
'Invalid': False
}, {
'Value': 'C',
'Caption': '已审核',
'Seq': 4,
'Invalid': False
}, {
'Value': 'D',
'Caption': '重新审核',
'Seq': 5,
'Invalid': False
}],
'ControlFieldKey': None,
'GroupFieldTableName': None,
'DefValue': 'Z',
'IsViewVisible': True,
'IsEditVisible': True,
'IsNewVisible': True,
'IsNewLock': True,
'IsEditLock': True,
'Editlen': 0,
'ConditionType': '9'
}, {
'Key': 'FStockOrgId',
'Name': [{
'Key': 2052,
'Value': '收料组织'
}, {
'Key': 1033,
'Value': 'Receipt Org.'
}, {
'Key': 3076,
'Value': '收料組織'
}],
'FieldName': 'FSTOCKORGID',
'PropertyName': 'StockOrgId',
'FieldType': 127,
'EntityKey': 'FBillHead',
'TableName': 't_STK_InStock',
'ElementType': 7,
'MustInput': 1,
'LookUpObjectFormId': 'ORG_Organizations',
'LookUpObjectID': '3f8da3f4-fe84-4456-9548-7c2c38b7c57f',
'EnumObjectId': None,
'Extends': None,
'ControlFieldKey': None,
'GroupFieldTableName': None,
'DefValue': '',
'IsViewVisible': True,
'IsEditVisible': True,
'IsNewVisible': True,
'IsNewLock': False,
'IsEditLock': False,
'Editlen': 0,
'ConditionType': '0,20'
}, {
'Key': 'FDate',
'Name': [{
'Key': 2052,
'Value': '入库日期'
}, {
'Key': 1033,
'Value': 'Warehouse Receipt Date'
}, {
'Key': 3076,
'Value': '入庫日期'
}],
'FieldName': 'FDATE',
'PropertyName': 'Date',
'FieldType': 61,
'EntityKey': 'FBillHead',
'TableName': 't_STK_InStock',
'ElementType': 4,
'MustInput': 1,
'LookUpObjectFormId': None,
'LookUpObjectID': None,
'EnumObjectId': None,
'Extends': None,
'ControlFieldKey': None,
'GroupFieldTableName': None,
'DefValue': None,
'IsViewVisible': True,
'IsEditVisible': True,
'IsNewVisible': True,
'IsNewLock': False,
'IsEditLock': False,
'Editlen': 0,
'ConditionType': '2'
},
…………………………
不得不说,金蝶 API 的帮助文档是真差,为了一个数据类型,我搜索了好几天才从一个论坛文章中找到一丝线索。金蝶的数据类型编码到类型的转化如下:
作为单一主键的数据库数据类型有以下6种:
1、smallint
system_type_id=52 举例:T_BAS_NUMBER.FID
2、int
system_type_id=56 举例:T_BAS_ITEM.FITEMID
3、bigint
system_type_id=127 举例:T_SAL_INITOUTSTOCK.FID
4、varchar
system_type_id=167 举例:T_BAS_PUBNEEDS.FID
5、char
system_type_id=175 举例:CMK_BD_KTMDELETELOG.FID
6、nvarchar
system_type_id=231 举例: T_AM_NETCONTROL.FID
作者:i求知若渴
来源:金蝶云社区
原文链接:https://vip.kingdee.com/article/61129193165550592?productLineId=1&lang=zh-CN
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
三、企业级 DW 中心开发
当然,上面是我个人用 Python 完成的测试,最终开发使用了更加健壮的 java 完成。并结合定时任务实现高效的增量刷新。
- 增量:有新订单追加(随着同步周期不同而不同,相对较多)
- 覆盖:历史订单有变更覆盖数据(相对较多)
- 删除:历史订单有删除同步删除或“标记删除”(非常之少)

目前,我用 java 完成了采购及部分主题的数据表查询,这样就建立了后续 BI 的分析基础,并且尽可能不影响业务系统。
补充:我的测试数据一览

四、最精彩的部分:Tableau 分析
分析始于建模,建模的典型是关系模型(relationship)。
如图所示,展示了采购订单和供应商信息的基本模型,后续可以增加物料等基本信息,构成典型的单事实多维度模型。

模型之后就是仪表板啦,让大家看看近期的作品(列表)吧。

注:客户数据不便公开。
2025/09/30 update by 喜乐君