Press "Enter" to skip to content

利用快速工程从SAP系统中掌握数据集成

建筑工程师调查工作 - 稳定扩散

介绍

在我们之前的一篇文章中,从数据工程到提示工程,我们演示了如何利用ChatGPT解决数据准备任务。除了我们收到的良好反馈之外,还提出了一个关键问题:提示工程在简单任务中可能有所帮助,但在更具挑战性的环境中是否真的有用?这是一个公正的观点。在最近的几十年里,数据架构变得越来越多样化和复杂化。由于这种复杂性,数据工程师越来越需要整合他们可能不熟悉的各种数据源。提示工程能在这种情况下帮助吗?

本文基于人力资源管理的真实用例来探讨这个问题。我们将应用少量样本学习将SAP HCM数据模型介绍给ChatGPT,并使用Apache Spark分析收集到的信息。通过这种方式,我们将说明即使在高级数据工程环境中,提示工程也可以提供价值。

关于业务案例

每个VoAGI面临的常见任务是确定其员工数量及其在任意时间点的组织分配。我们场景中相关的数据存储在SAP HCM系统中,这是企业环境中人力资源管理的领先应用之一。

为了解决这种目标,每个数据工程师都需要建立大量与底层数据模型密切相关的业务相关知识。

本文将提供一步一步的指南,通过创建可以用于构建数据模型以及任何报告解决方案基础的PySpark代码来解决所描述的业务问题。

PowerBi示例报告显示人员数量

步骤1:确定需要哪些信息

数据科学中的一个主要挑战是根据业务用例选择必要的信息并确定其在源系统中的来源。为了解决这个问题,我们需要为chatgpt带入一些业务知识。为此,我们将ChatGPT教授一些关于可以在SAP参考手册中找到的SAP HCM基本表的信息,结合每个表的csv样本记录。

在这个第一个场景中,我们的目的是报告特定时间点上所有活动员工。结果还应包括员工个人编号、姓名、状态和组织分配。

为了收集所有必要的信息,我们需要将数据库模式引入ChatGPT,并使用少量提示来包括示例数据集和字段描述。我们将从传播数据库模式和一些示例数据到ChatGPT开始。

了解SAP HCM数据模型的每个人都应该对infotype和transparent table的概念很熟悉。infotype包含所有事务性信息,而透明表包含每个实体的业务信息(主数据)。

对于接下来的场景,我们将使用OpenAI的GPT-4来创建我们需要的代码。让我们从向ChatGPT提供基本表信息开始。

Prompt:给定以下表格1. 表PA0000 - 员工操作字段;键;数据元素;域;数据类型;长度;小数位数;简短描述;检查表格MANDT;;MANDT;MANDT;CLNT;3;0;客户;T000PERNR;;PERSNO;PERSNO;NUMC;8 ;0;人员编号;PA0003SUBTY;;SUBTY;SUBTY;CHAR;4;0;子类型;OBJPS;;OBJPS;OBJPS;CHAR;2;0;对象识别...包括以下示例数据集:100;99992228;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-01-10T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;3;;0;0100;99992229;;;;9999-12-31T00:00:00.0000000;2023-07-01T00:00:00.0000000;0;2023-03-29T00:00:00.0000000;TRP0001;;;;;;;;;;;;;;10;6;;0;02. 表T529U - 状态值字段;键;数据元素;域;数据类型;长度;小数位数;简短描述;检查表格MANDT;;MANDT;MANDT;CLNT;3;0;客户;*SPRSL;;SPRAS;SPRAS;LANG;1;0;语言键;T002STATN;;STATN;STATN;NUMC;1;0;状态编号;...包括以下示例数据集:MANDT,SPRSL,STATN,STATV,TEXT1100,D,2,3,激活100,D,2,0,离职....

步骤2:加入必要的基本表并仅筛选活跃员工

现在让我们创建代码来加入基本表:

提示:给我提供一个Pyspark代码,将“员工动作”限制为仅包含根据其相应的英语语言文本的“状态值”指出的活跃员工。使用"structureVoAGI.PA0000"作为Table PA0000的源,使用"structureVoAGI.T529U"作为Table T529U的源。使用有意义的列别名。

这将产生相当不错且格式良好的PySpark代码:

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# 初始化Spark Sessionspark = SparkSession.builder.appName("EmployeeActionsFilter").getOrCreate()# 将表读入DataFramesdf_pa0000 = spark.read.table("structureVoAGI.PA0000")df_t529u = spark.read.table("structureVoAGI.T529U")# 对T529U进行筛选,仅保留相关状态描述和语言df_active_status = df_t529u.filter(    (col("TEXT1") == "Active") &    (col("SPRSL") == "E"))# 基于状态字段STAT2,将PA0000与筛选后的T529U进行连接df_active_employees = df_pa0000.join(    df_active_status,    (df_pa0000.STAT2 == df_active_status.STATV) &    (df_active_status.STATN == 2)  # 假设STATN=2表示根据给定数据的就业状态).select(    df_pa0000["PERNR"].alias("EmployeeNumber"),    df_pa0000["MASSN"].alias("ActionType"),    df_pa0000["MASSG"].alias("ReasonForAction"),    df_active_status["TEXT1"].alias("StatusDescription"))# 展示结果df_active_employees.show()# 可选择将该DataFrame写入目标位置或进一步处理。

请注意,GPT-4识别出了两个表的连接条件,即基于Table PA0000的Column STAT2和Table T529U的Column STATV。此外,生成的代码还包含业务描述作为列别名,以提高可读性。

步骤3:构建一个时间轴以反映公司员工的历史

现在,我们将创建一个DataFrame,其中包含从2020年1月1日到2024年1月1日的日期值,并根据他们的入职日期(BEGDA)和可能的离职日期(ENDDA)加入所有有效的员工,我们需要创建一个人工时间轴来进行连接。

提示:使用Pyspark创建一个包含从2020-01-01到2024-01-01的每个日期的行的DataFrame。Date列应该格式化为日期。然后,根据其开始和结束日期,加入所有有效的员工。输出应仅包含根据其就业状态为活跃的员工。
from pyspark.sql import SparkSessionfrom pyspark.sql.functions import explode, sequence, to_date, lit, col# 开始一个Spark Sessionspark = SparkSession.builder.appName("EmployeeTimeline").getOrCreate()# 定义起始和结束日期start_date = to_date(lit("2020-01-01"))end_date = to_date(lit("2024-01-01"))# 创建一个包含每个日期一行的DataFramedate_df = (spark.range(1)          .select(explode(sequence(start_date, end_date)).alias("date")))# 加载PA0000和T529U表pa0000_df = spark.read.table("structureVoAGI.PA0000")t529u_df = spark.read.table("structureVoAGI.T529U")# 筛选T529U,获取'Active'员工并使用语言键Eactive_t529u_df = t529u_df.filter((col("SPRSL") == "E") & (col("TEXT1") == "Active"))# 将PA0000与筛选后的T529U进行连接以获取活跃的员工active_pa0000_df = pa0000_df.join(active_t529u_df, pa0000_df.STAT2 == active_t529u_df.STATV)# 使用日期范围DataFrame进行连接,获取活跃员工的时间轴active_employee_timeline_df = date_df.join(active_pa0000_df, (date_df.date >= active_pa0000_df.BEGDA) & (date_df.date <= active_pa0000_df.ENDDA))# 展示DataFrameactive_employee_timeline_df.show()

步骤4:通过确定最高级组织对象来解散父子关系表

在最后一步中,我们想要构建一个DataFrame来表示公司组织结构,并确定每个对象的组织分配。具体而言,我们想要确定每个子对象分配给哪个最高级别的组织单位(如区域或部门)。

组织结构可以用于连接员工时间线,并在稍后的步骤中获取每个员工的组织分配的详细信息。为了实现这一点,我们需要利用SAP的HRP1001表:

Prompt:以下表包含每个员工的组织分配:表HRP1001-组织结构字段;数据元素;域;数据类型;长度;小数位数;简短描述;检查表MANDT;MANDT;MANDT;CLNT;3;0;客户端;T000OTYPE;OTYPE;OTYPE;CHAR;2;0;对象类型;T778OOBJID;HROBJID;HROBJID;NUMC;8;0;对象ID;PLOGI...包含以下示例数据集:MANDT,OTYPE,OBJID,PLVAR,RSIGN,RELAT,ISTAT,PRIOX,BEGDA,ENDDA,VARYF,SEQNR,INFTY,OTJID,SUBTY,AEDTM,UNAME,REASN,HISTO,ITXNR,SCLAS,SOBID,PROZT,ADATANR100,P,00998833,A,209,1,,2018-01-01T00:00:00.0000000,9999-12-31T00:00:00.0000000,CP50035300,000,1001,P 00100769,A209,2020-12-11T00:00:00.0000000,TRP1000,,,00000000,CP,60095300,0.00,

Prompt:表HRP1001包含公司组织结构的父子信息。列OBJID包含对象ID,而SOBID包含相关的父对象ID。列SCLAS包含了对象ID是职位'S'、组织单位'O'还是角色'K'的信息。在Pyspark中:给我创建一个包含每个对象的最高可用组织单位对象ID的数据框的代码,通过递归解析层次结构。使用有意义的列别名。

语言模型生成了一个递归函数,该函数搜索每个对象的最高级别组织单位(“O”):

from pyspark.sql import SparkSessionfrom pyspark.sql.functions import col# 初始化一个Spark会话spark = SparkSession.builder.appName("HighestAvailableOrgUnit").getOrCreate()HRP1001_df = spark.read.table("structureVoAGI.HRP1001")# 递归函数来查找最高可用的组织单位def find_highest_org_unit(df, obj_id):    # 为当前对象ID过滤行    object_df = df.filter(col("OBJID") == obj_id)        if object_df.count() == 0:        return None        # 获取当前对象的父对象ID(SOBID)和SCLAS    parent_obj_id = object_df.select("SOBID").first()[0]    sclas = object_df.select("SCLAS").first()[0]        # 递归查找最高可用的组织单位    highest_org_unit = find_highest_org_unit(df, parent_obj_id)        # 如果当前对象是组织单位('O')或highest_org_unit为None,则更新结果    if sclas == 'O' or highest_org_unit is None:        highest_org_unit = obj_id        return highest_org_unit# 创建一个UDF(用户定义的函数)以应用递归函数from pyspark.sql.functions import udffrom pyspark.sql.types import StringTypefind_highest_org_unit_udf = udf(find_highest_org_unit, StringType())# 将UDF应用于找到每个对象的最高可用组织单位highest_org_unit_df = df.withColumn("HighestAvailableOrgUnit", find_highest_org_unit_udf(df, col("OBJID")))# 选择有意义的列和别名result_df = highest_org_unit_df.select(    col("OBJID").alias("ObjectID"),    col("HighestAvailableOrgUnit").alias("HighestAvailableOrgUnit"))# 显示结果DataFrameresult_df.show()# 停止Spark会话spark.stop()

即使创建的代码不是很优化,它包含一个递归函数来解析层次结构。偏好使用公共表达式(CTE)的用户应该在输入提示中给出提示(使用公共表达式)以创建更可读和可理解的PySpark语句。

from pyspark.sql import SparkSession# 创建一个Spark会话spark = SparkSession.builder.appName("OrganizationalUnitHierarchy").getOrCreate()# 定义HRP1001 DataFrame(假设您已经加载了它)# 用实际的DataFrame替换'hrp1001_df'# 确保您有适当的列:'OBJID','SOBID'和'SCLAS'# 将HRP1001 DataFrame注册为临时SQL表hrp1001_df.createOrReplaceTempView("hrp1001")# 定义递归查询以解散层次结构并查找最高组织单位递归查询 = """WITH RECURSIVE OrgHierarchy AS (  SELECT OBJID, SOBID, SCLAS  FROM hrp1001  WHERE SCLAS = 'O'  -- 从组织单位开始    UNION ALL    SELECT h.OBJID, p.SOBID, p.SCLAS  FROM OrgHierarchy h  JOIN hrp1001 p ON h.SOBID = p.OBJID)SELECT DISTINCT  h.OBJID AS EmployeeID,  h.SOBID AS HighestOrganizationalUnitFROM OrgHierarchy h"""# 执行递归查询并创建结果DataFrame result_df = spark.sql(recursive_query)# 显示结果result_df.show()# 当完成时停止Spark会话spark.stop()

两个代码版本都将创建用于展平层次结构组织的DataFrame,可以通过将其与先前生成的DataFrame连接来进一步进行数据集成步骤。

包含层次信息的数据集

结论

我们已经展示了如何通过使用ChatGPT从SAP系统中提取和集成数据来解决更复杂的数据工程任务的实际用例。虽然大型语言模型可能还不完美,但每个人都可以想象到这些技术对于数据工程师而言有多么强大。以下是一些关键要点:

  • ChatGPT能够理解数据模型的基本原理。您可以利用提示技术来细化其理解,从而提供更深入的知识。
  • 即使第一次尝试的方法不会产生完美的代码,我们也可以轻松调整创建的代码以适应我们的个别情况。
  • 由于开放式参考文档和SAP知识库的广泛可用性,该方法可以扩展为一种检索增强生成(RAG)解决方案。

在提示工程的最佳实践方面,尽量提供准确的信息,并提供Spark环境返回的错误代码,以利用LLMs重构生成的代码的能力。可能需要多次尝试来改进代码,然而,将“准确”之类的关键词添加到提示中可能有助于ChatGPT产生更好的结果。询问解决方案方法的详细解释,因为这将迫使ChatGPT的转换器模型深入挖掘。

注意:由于本文的长度限制,包含CSV示例数据集的提示必须被截断。

关于作者

Markus Stadi是Dehn SE的高级云数据工程师,多年来一直从事数据工程,数据科学和数据分析工作。

Christian Koch是BWI GmbH的企业架构师,同时也是纽伦堡理工学院的讲师。

Leave a Reply

Your email address will not be published. Required fields are marked *