介绍
在当今数字化的世界中,人们越来越倾向于通过在线交易和数字支付来进行交易,而不是使用现金,这是因为它的便利性。随着过渡的增加,欺诈行为也在增加。欺诈交易可以是任何类型,因为它涉及使用虚假身份或虚假信息要求钱款。这给个人和金融机构带来了重大问题。在这个项目中,我们将使用信用卡数据集来设计使用Airflow工具监控实时交易并预测其是否真实或欺诈的MLOPs模型。
学习目标
- 检测欺诈交易的重要性。
- 清理数据,转换数据集和预处理数据。
- 对数据集进行可视化分析以获得洞察力。
- 在数据科学中使用欺诈交易检测模型的实际应用。
- 使用Python编程语言进行欺诈交易数据分析
- 使用MS Azure和Airflow构建端到端的欺诈检测
本文作为数据科学博文马拉松的一部分发布。
什么是欺诈交易估计模型?
欺诈交易数据集包含来自不同来源的数据,其中包含交易时间、姓名、金额、性别、类别等列。欺诈交易估计模型是一个用于预测虚假交易的机器学习模型。该模型是在大量有效交易和欺诈交易的基础上进行训练的,以预测新的虚假交易。
什么是欺诈交易分析?
欺诈交易分析是分析过去数据集的过程。数据集分析旨在发现数据中的异常情况并找出数据集中的模式。欺诈交易分析在保护客户和减少财务损失方面起着关键作用。有不同类型的欺诈交易分析,例如基于规则的分析和异常检测。
- 基于规则的分析:基于规则的分析涉及创建规则来标记无效交易。例如,可以根据地理区域制定规则。
- 异常检测:异常检测涉及发现异常或异常的交易。例如,从新的IP地址进行的交易。
检测欺诈交易的重要性
对于企业和金融机构来说,检测欺诈交易对于保护客户免受欺诈和保护他们的资金至关重要。以下是检测欺诈交易的一些关键原因。
- 减少财务损失:欺诈交易给企业带来巨额财务损失,从而减少它们的利润。因此,企业检测欺诈交易变得至关重要。
- 维护声誉:维护声誉对于企业来说是至关重要的,因为它会导致潜在客户和顾客的流失。
- 保护客户和企业:欺诈交易可能对客户造成财务损失和情感影响。通过检测欺诈交易,企业可以保护客户和他们的业务。
数据收集和预处理
数据收集和预处理是开发欺诈检测模型的重要部分。一旦收集到数据,需要对数据集执行多个步骤。
- 数据清理:数据清理包括删除不需要的数据,例如重复数据,并填充缺失的数据值。
- 数据转换:数据转换涉及将数据列转换为可用于分析的所需数据类型。此步骤确保数据质量。
- 数据探索:数据探索涉及了解数据集并找出数据之间的关系和模式。
- 处理不平衡数据:欺诈检测数据集高度不平衡,因为有效交易很多,欺诈交易数量很少。因此,模型容易出现过拟合的问题。可以使用过采样或欠采样技术解决这个问题。使用这些技术,我们可以创建一个平衡的数据集。
使用库对欺诈检测数据集进行可视化
仅看数值可能无法帮助您建立它们之间的联系。我们将使用Python库绘制图形和图表,以从数据集中获取洞察。
- Matplotlib:Python中的重要工具之一,用于创建不同类型的图表和图形,如条形图和折线图。
- Seaborn:这是Python中的另一个可视化工具。它可以创建更详细的可视化图像,如热力图和小提琴图。
用于可视化酒店预订数据集的技术。
- Countplot:Countplot用于绘制分类值的直方图。这可以帮助绘制不同分类值的欺诈情况,有助于了解它们之间的关系。
- Distplot:使用distplot,我们可以确定随时间变化的分布情况。这有助于检查偏斜。
欺诈检测MLOPs模型的用例和应用
欺诈检测MLOPs模型在不同行业中有多种用例。以下是用例应用:
- 银行和金融机构:银行和金融机构使用MLOP工具检测信用卡和保险欺诈等欺诈交易。通过使用这些工具,他们可以减少欺诈。
- 电子商务和零售行业:在购买商品期间识别欺诈交易可以帮助保护客户数据和公司业务。
- 医疗保健和酒店行业:医疗保健行业使用MLOPs来检测虚假医疗索赔或账单行为。使用检测模型可以减少这些虚假行为。
- 电信和电子票务:检测虚假的SIM卡更换、订阅欺诈和虚假预订。可以使用MLOPs模型解决这个问题。
欺诈检测MLOPs模型的挑战和最佳实践
建立欺诈检测模型有许多挑战,原因如下:
- 数据质量:欺诈检测模型是基于历史数据进行训练的,在其中数据质量起着关键作用。数据集质量越好,模型的准确性就越高。欺诈数据集更多是一个不均衡的数据集,这意味着与无效交易相比,我们有更多的有效交易。这在模型训练阶段带来了挑战。
- 数据隐私:欺诈交易常常是由于违反客户或业务数据的未经授权的访问引起的。设计模型的方式应为维护客户数据隐私。
- 模型漂移:模型漂移由于数据质量的变化而带来了重大挑战。确保模型不会面临模型漂移需要保持数据质量并监控模型。
- 实时处理:实时处理往往带来更多的挑战,因为它增加了一个复杂的层次。
构建欺诈检测模型的最佳实践:
构建欺诈检测模型的最佳实践如下:
- 特征工程:在完成数据收集后,我们进行数据预处理和特征工程,以确保获得高质量的数据。
- 不平衡数据处理:欺诈检测数据集通常存在不平衡问题,有效交易较多,欺诈交易较少。这种不平衡往往导致模型的偏倚。为了解决这个问题,我们使用欠采样和过采样技术。
- 模型构建:使用集成技术训练模型,确保结果模型具有良好的准确性。生成的模型将能够预测欺诈和有效交易。
- MLOPs:使用MLOPs框架构建整个解决方案的生命周期,从训练到部署和监控。该框架为模型构建设置规则,确保模型准确可靠。
欺诈检测MLOPs模型的未来趋势和进展
随着数字化的增加和互联网采用率的增加,越来越多的人将使用数字支付和在线预订设施。随着技术发展的增加,它将创造一种简便快捷的支付工具。因此,开发一种能够防止欺诈并增加客户对公司和服务的信任的工具也变得非常重要。企业经常寻找可靠、易于使用、具有成本效益的解决方案。技术在其中可以起到重要作用。围绕金融产品构建工具和服务可以帮助企业为客户提供各种服务。可以提供个性化的金融产品,从而增强客户的信任并改善客户与企业之间的关系。
使用Python进行欺诈检测数据分析
让我们使用Python在Kaggle的数据集上进行基础数据分析实现。要下载数据集,请点击此处。
数据详情
欺诈检测数据集包含超过100万条记录,用于模型训练。以下是数据集的详细信息:
步骤1 导入库
import randomimport calendarimport pandas as pdimport numpy as npimport seaborn as snsimport matplotlib.pyplot as pltfrom scipy.stats import norm, skew, ttest_indimport warningswarnings.filterwarnings('ignore')
步骤2 导入数据集并检查数据
#从训练和测试文件中读取数据,然后转换为数据框df=pd.read_csv('C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Train.csv')T_df=pd.read_csv('C:\Decodr\G3\B\FRAUD_DETECTION_IN_IMBALANCED_DATA\data2\Test.csv')#数据框形状df.shape,T_df.shape((1048575, 22), (555719, 22))#检查训练和测试数据框的信息df.info(),T_df.info()#检查训练和测试数据框中的空值df.isna().sum(),T_df.isna().sum()
输出
步骤3 可视化数据集
#根据类别绘制欺诈图sns.countplot(data=df[df['is_fraud_cat'] == "T"], x='category')plt.xticks(rotation=45)plt.show()
输出
见解大多数欺诈发生在购物网和杂货店。
#根据性别绘制欺诈图sns.countplot(data=df[df['is_fraud_cat']=="T"],x='gender') plt.show()
输出
见解虽然女性客户遭遇欺诈案例较多,但男性和女性的数量几乎相同。
#根据州绘制欺诈图fig, ax = plt.subplots(figsize=(120,60))plt.rcParams.update({'font.size': 60})sns.countplot(data=df[df['is_fraud_cat']=="T"],x='state')plt.xticks(rotation=45)for p, label in zip(ax.patches, df["state"].value_counts().index): ax.annotate(label, (p.get_x(), p.get_height()+0.15))plt.title("各州信用卡欺诈次数")plt.show()
输出
见解俄亥俄州、得克萨斯州和路易斯安那州报告了最多的信用卡欺诈案件。
#根据城市绘制欺诈图def randomcolor(): r = random.random() b = random.random() g = random.random() rgb = [r,g,b] return rgbplt.rcParams.update({'font.size': 20})df[df['is_fraud_cat']=="T"]["city"].value_counts(sort=True,ascending=False).head(10).plot(kind="bar",color=randomcolor())plt.title("城市信用卡欺诈次数")plt.show()
输出
洞察达拉斯、休斯顿和伯明翰分别报告了最多的城市欺诈案。
#欺诈基于工作df[df['is_fraud_cat']=="T"]["job"].value_counts(sort=True,ascending=False).head(10).plot(kind="bar",color=randomcolor())plt.title("职业信用卡欺诈数量")plt.show()
输出
洞察数量调查员的工作中发生了最多的欺诈,其次是航海建筑师和材料工程师的工作。
#欺诈与非欺诈 plt.figure(figsize=(8,5))ax = sns.countplot(x="is_fraud", data=df,color=randomcolor())for p in ax.patches: ax.annotate('{:.1f}'.format(p.get_height()), (p.get_x()+0.25, p.get_height()+0.01))plt.show()
输出
洞察在近百万条记录中,仅有约6006条是欺诈交易记录,因此我们面对的是一个不平衡的数据集。
步骤4. 预处理和特征工程
data['trans_date_trans_time'] = pd.to_datetime(data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')data['trans_date']=data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')data['trans_date']=pd.to_datetime(data['trans_date'])data['dob']=pd.to_datetime(data['dob'],format='%d-%m-%Y')data["age"] = data["trans_date"]-data["dob"]data["age"] = data["age"].astype('int64')data['trans_month'] = pd.DatetimeIndex(data['trans_date']).monthdata['trans_year'] = pd.DatetimeIndex(data['trans_date']).yeardata['Month_name'] = data['trans_month'].apply(lambda x: calendar.month_abbr[x])data['latitudinal_distance'] = abs(round(data['merch_lat']-data['lat'],3))data['longitudinal_distance'] = abs(round(data['merch_long']-data['long'],3))data.gender=data.gender.apply(lambda x: 1 if x=="M" else 0)data = data.drop(['cc_num','merchant','first','last','street','zip','trans_num','unix_time','trans_date_trans_time','city','lat','long','job','dob','merch_lat','merch_long','trans_date','state','Month_name'],axis=1)data =pd.get_dummies(data,columns=['category'],drop_first=True)#执行欠采样normal = data[data['is_fraud']==0]fraud = data[data['is_fraud']==1]normal_sample=normal.sample(n=len(fraud),random_state=42)new_data = pd.concat([normal_sample,fraud],ignore_index=True)
使用MS Azure和Airflow进行端到端模型构建
在上述步骤中,我正在本地读取数据文件进行可视化,但在实施部分,我们将使用MS Azure等云服务。我将向您展示我如何将MS Azure与Airflow工具集成以进行数据摄取和模型构建。首先,在MS Azure中创建一个存储账户,然后在其中创建一个容器。在此容器中存储文件。我们将构建一个Airflow管道,从容器中获取数据并将其保存在所需位置。此后,我们将构建一个端到端模型,然后将其部署在流式云中,以公开访问。
要创建存储账户,您必须创建一个Azure账户。请按照以下步骤进行操作:
- 创建一个MS Azure账户
- 创建一个存储账户
- 在存储账户中创建一个容器
- 容器创建完成后,手动或使用Airflow DAG上传文件。
Airflow 是什么?
Airflow 是一个开源的工作流管理平台,用于构建和监控模型。它使用有向无环图 (DAG) 来定义工作流。Airflow 提供了以下几个优点:
- 动态定义工作流:在 Airflow 中,我们可以使用 Python 定义自定义的工作流。您可以轻松创建和修改此工作流,从而提供了工作流的灵活性。
- 可扩展:通过使用分布式架构,您可以快速扩展 Airflow,以同时处理多个工作流。
- 监控和日志记录:Airflow 提供了一个用户友好的 web 接口。使用 web 接口,用户可以监控和查看日志,从而帮助快速排除问题。
- 并行执行:Airflow 提供了并行执行工作流的功能,可以显著减少运行时间,同时提高模型性能。
在现实世界中,仅构建模型是不够的;我们还必须将模型部署到生产环境中,并随着时间的推移监控模型的性能以及与真实世界数据的交互方式。我们可以使用 Airflow 构建端到端的机器学习流程,并对其进行监视。在 Airflow 中,我们可以创建工作流并设置它们的执行依赖关系。可以在 Airflow 中检查工作流的状态,无论是成功完成、失败重新启动等。工作流执行后,可以在 Airflow 中监控日志。通过这种方式,我们可以跟踪我们的生产就绪模型。我强烈建议您查阅Airflow文档以获取更多详细信息。
工作流程
工作流程包括以下步骤:
- data_upload_operator:- 此操作符将从本地存储中获取文件并将其上传到 Azure Blob 容器中。
- data_download_operator:- 此操作符将从 Azure 下载文件并保存到本地存储中。
- data_preprocessing_operator:- 此操作符对从 Azure 下载的数据集进行预处理。
- data_split_operator:- 此操作符将数据集分为两部分。第一部分用于训练模型,第二部分用于测试模型。
- model_training_operator:- 此操作符用于在数据集上训练模型。
- model_evaluation_operator:- 此操作符用于评估模型的性能。
- model_prediction_operator:- 此操作符用于在新的未知数据集上进行模型预测。
模型开发
如上所述,我们有不同的 Airflow 操作符。现在,让我们转向编写代码的部分。
data_upload_operator
from azure.storage.blob import BlobServiceClientfrom config.constant import storage_account_key, storage_account_name, connection_string, container_name, file_path_up, file_namedef uploadToBlobStorage(): try: blob_service_client = BlobServiceClient.from_connection_string (connection_string) blob_client = blob_service_client.get_blob_client (container = container_name, blob = file_name) with open(file_path_up,"rb") as data: blob_client.upload_blob(data) print("从本地上传文件 " + file_name + " 到容器 " + container_name) except Exception as e: print(f"出现错误:{str(e)}")uploadToBlobStorage()
上面,我们定义了 uploadToBlobStorage() 方法,它将与 MS Azure 存储账户进行连接。然后,它将从本地存储获取文件并将其上传到云端。
data_download_operator
from azure.storage.blob import BlobServiceClientfrom config.constant import storage_account_key, storage_account_name, connection_string, container_name, blob_name, file_path_downdef downloadFromBlobStorage(): try: # 使用连接字符串初始化 BlobServiceClient blob_service_client = BlobServiceClient.from_connection_string (connection_string) # 获取目标 Blob 的 BlobClient blob_client = blob_service_client.get_blob_client (container=container_name, blob=blob_name) # 将 Blob 下载到本地文件 with open(file_path_down, "wb") as data: data.write(blob_client.download_blob().readall()) print(f"已从 {container_name} 下载 {blob_name} 到 {file_path_down}") except Exception as e: print(f"出现错误:{str(e)}")downloadFromBlobStorage()
在这里,定义了downloadFromBlobStorage()方法。它将连接到存储帐户并下载文件。然后,文件将存储在本地路径上。
data_preprocessing_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
import calendar
class DataPreprocessingOperator(BaseOperator):
@apply_defaults
def __init__(self, preprocessed_data, *args, **kwargs):
super(DataPreprocessingOperator, self).__init__(*args, **kwargs)
self.preprocessed_data = preprocessed_data
def execute(self, context):
try:
# 在这里执行数据预处理逻辑
# 例如,您可以在摄入的数据中清洁、转换或构建特征
data = pd.read_csv('data/processed/ingested_data.csv')
data['trans_date_trans_time'] = pd.to_datetime(data['trans_date_trans_time'], format='%d-%m-%Y %H:%M')
data['trans_date'] = data['trans_date_trans_time'].dt.strftime('%Y-%m-%d')
data['trans_date'] = pd.to_datetime(data['trans_date'])
data['dob'] = pd.to_datetime(data['dob'], format='%d-%m-%Y')
data["age"] = data["trans_date"] - data["dob"]
data["age"] = data["age"].astype('int64')
data['trans_month'] = pd.DatetimeIndex(data['trans_date']).month
data['trans_year'] = pd.DatetimeIndex(data['trans_date']).year
data['Month_name'] = data['trans_month'].apply(lambda x: calendar.month_abbr[x])
data['latitudinal_distance'] = abs(round(data['merch_lat'] - data['lat'], 3))
data['longitudinal_distance'] = abs(round(data['merch_long'] - data['long'], 3))
data.gender = data.gender.apply(lambda x: 1 if x == "M" else 0)
data = data.drop(['cc_num', 'merchant', 'first', 'last', 'street', 'zip', 'trans_num', 'unix_time', 'trans_date_trans_time', 'city', 'lat', 'long', 'job', 'dob', 'merch_lat', 'merch_long', 'trans_date', 'state', 'Month_name'], axis=1)
data = pd.get_dummies(data, columns=['category'], drop_first=True)
# 执行欠采样
normal = data[data['is_fraud'] == 0]
fraud = data[data['is_fraud'] == 1]
normal_sample = normal.sample(n=len(fraud), random_state=42)
new_data = pd.concat([normal_sample, fraud], ignore_index=True)
# 执行过采样
# normal = data[data['is_fraud']==0]
# fraud = data[data['is_fraud']==1]
# fraud_sample=fraud.sample(n=len(normal),replace=True,random_state=42)
# new_data = pd.concat([normal,fraud_sample],ignore_index=True)
# 将预处理后的数据保存到输出文件(例如CSV文件)
new_data.to_csv(self.preprocessed_data, index=False)
except Exception as e:
self.log.error(f'数据预处理失败:{str(e)}')
raise e
- 上面,我们更改了数据类型并删除了一些列
- 由于数据集不平衡,我们进行了欠采样。我们还放置了过采样的代码。
model_training_operator
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
import joblib
class ModelTrainingRFCOperator(BaseOperator):
"""自定义Apache Airflow操作器,用于训练机器学习模型并将其保存到文件中。"""
def __init__(self, X_train_file, y_train_file, model_file, *args, **kwargs):
"""初始化操作器。
:param X_train_file: 训练集特征的文件路径(X_train)。
:param y_train_file: 训练集标签的文件路径(y_train)。
:param model_file: 保存训练好的模型的文件路径。
"""
super(ModelTrainingRFCOperator, self).__init__(*args, **kwargs)
self.X_train_file = X_train_file
self.y_train_file = y_train_file
self.model_file = model_file
def execute(self, context):
self.log.info(f'使用数据来训练机器学习模型:{self.X_train_file, self.y_train_file}')
try:
X_train = pd.read_csv(self.X_train_file)
y_train = pd.read_csv(self.y_train_file)
print(X_train.shape)
print(y_train.shape)
# 初始化并训练机器学习模型(替换为您的模型类)
RFC = RandomForestClassifier(n_estimators=100, random_state=0)
RFC.fit(X_train, y_train)
# 将训练好的模型保存到提供的model_file中
joblib.dump(RFC, self.model_file)
except Exception as e:
self.log.error(f'模型训练失败:{str(e)}')
raise e
在预处理和数据拆分之后,下一步是训练模型。在代码中,我们使用了RandomForestClassifier进行模型训练。
model_evaluation_operator
from airflow.models import BaseOperatorfrom airflow.utils.decorators import apply_defaultsimport pandas as pdfrom sklearn.metrics import accuracy_score, classification_reportimport joblibclass ModelEvaluationRFCOperator(BaseOperator): """ 自定义Apache Airflow运算符,用于评估机器学习模型并将评估结果保存到文件中。 """ @apply_defaults def __init__(self, X_test_file, y_test_file, model_file, output_file, *args, **kwargs): """ 初始化运算符。 :param X_test_file: 测试集特征的文件路径(X_test)。 :param y_test_file: 测试集标签的文件路径(y_test)。 :param model_file: 加载训练好的模型的文件路径。 :param output_file: 保存评估结果的文件路径。 """ super(ModelEvaluationRFCOperator, self).__init__(*args, **kwargs) self.X_test_file = X_test_file self.y_test_file = y_test_file self.model_file = model_file self.output_file = output_file def execute(self, context): self.log.info(f'使用来自{self.X_test_file,self.y_test_file}的数据评估机器学习模型') # 通过XCom从前一个任务中检索测试数据 test_data = context['ti'].xcom_pull(task_ids='data_split_task', key='test_data') try: """ 执行运算符以评估机器学习模型并将评估结果保存到文件中。 """ # 从提供的文件中加载测试数据和训练模型 X_test = pd.read_csv(self.X_test_file) y_test = pd.read_csv(self.y_test_file) model = joblib.load(self.model_file) # 使用训练好的模型进行预测 y_pred = model.predict(X_test) # 计算并打印评估指标 accuracy = accuracy_score(y_test, y_pred) classification_rep = classification_report (y_test, y_pred, target_names=['类别_0', '类别_1']) # 根据需要自定义标签 # 将评估结果保存到指定的输出文件中 with open(self.output_file, 'w') as f: f.write(f"准确率:{accuracy}\n\n分类报告:\n {classification_rep}") except Exception as e: self.log.error(f'模型评估失败:{str(e)}') raise e
模型训练后,我们对模型进行评估并准备分类报告。在这里,我们检查模型的准确率、精确率、召回率和F1分数。
model_prediction_operator
from airflow.models import BaseOperatorfrom airflow.utils.decorators import apply_defaultsimport pandas as pdfrom sklearn.metrics import accuracy_score, classification_reportimport joblibimport calendarclass ModelPredictionOperator(BaseOperator): """ 自定义Apache Airflow运算符,用于评估机器学习模型并将评估结果保存到文件中。 """ @apply_defaults def __init__(self, input_file, model_file, output_file, *args, **kwargs): """ 初始化运算符。 :param input_file: 输入文件的文件路径。 :param model_file: 加载训练好的模型的文件路径。 :param output_file: 保存评估结果的文件路径。 """ super(ModelPredictionOperator, self).__init__(*args, **kwargs) self.input_file = input_file self.model_file = model_file self.output_file = output_file def execute(self, context): self.log.info(f'使用来自{self.input_file}的数据评估机器学习模型') try: """ 执行运算符以评估机器学习模型并将评估结果保存到文件中。 """ # 从提供的文件中加载测试数据和训练模型 new_data = pd.read_csv('data/raw/Test.csv') new_data['trans_date_trans_time'] = pd.to_datetime (new_data['trans_date_trans_time'], format='%d-%m-%Y %H:%M') new_data['trans_date']=new_data['trans_date_trans_time']. dt.strftime('%Y-%m-%d') new_data['trans_date']=pd.to_datetime(new_data['trans_date']) new_data['dob']=pd.to_datetime(new_data['dob'],format='%d-%m-%Y') new_data["age"] = new_data["trans_date"]-new_data["dob"] new_data["age"] = new_data["age"].astype('int64') new_data['trans_month'] = pd.DatetimeIndex(new_data['trans_date']).month new_data['trans_year'] = pd.DatetimeIndex(new_data['trans_date']).year new_data['Month_name'] = new_data['trans_month'].apply(lambda x: calendar.month_abbr[x]) new_data['latitudinal_distance'] = abs(round(new_data['merch_lat']-new_data['lat'],3)) new_data['longitudinal_distance'] = abs(round(new_data['merch_long']-new_data['long'],3)) new_data.gender=new_data.gender.apply(lambda x: 1 if x=="M" else 0) new_data = new_data.drop(['cc_num','merchant','first','last','street', 'zip','trans_num','unix_time','trans_date_trans_time','city','lat', 'long','job','dob','merch_lat','merch_long','trans_date','state', 'Month_name'],axis=1) new_data =pd.get_dummies(new_data,columns=['category'],drop_first=True) X_new = new_data.drop(["is_fraud"],axis=1) y_new = new_data["is_fraud"] model = joblib.load(self.model_file) # 使用训练好的模型进行预测 y_pred_new = model.predict(X_new) print('y_new', y_new) print('y_pred_new',y_pred_new) # 计算并打印评估指标 accuracy = accuracy_score(y_new, y_pred_new) classification_rep = classification_report (y_new, y_pred_new, target_names=['类别_0', '类别_1']) # 根据需要自定义标签 # 将评估结果保存到指定的输出文件中 with open(self.output_file, 'w') as f: f.write(f"准确率:{accuracy}\n\n分类报告:\n {classification_rep}") except Exception as e: self.log.error(f'模型评估失败:{str(e)}') raise e
在预测运算符中,我们正在对模型进行新数据集的测试,即测试数据文件。预测之后,我们将准备一个分类报告。
在云中设置环境和模型部署
使用python或anaconda创建虚拟环境。
#创建虚拟环境的命令python3 -m venv <virtual_environment_name>
您需要使用以下命令在环境中安装一些Python包。
cd airflow-projects/fraud-predictionpip install -r requirements.txt
在运行工作流之前,您必须安装airflow并设置数据库。
#安装airflowpip install 'apache-airflow==2.7.1' \ --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.7.1/ constraints-3.8.txt"#设置主目录路径export AIRFLOW_HOME=/c/Users/[YourUsername]/airflow#初始化数据库:airflow db init#创建Airflow用户airflow users create --username admin –password admin –firstname admin –lastname admin –role Admin –email [email protected]#检查创建的用户airflow users list#运行Web服务器#运行调度器airflow scheduler#如果默认端口8080正在使用,请通过输入以下命令更改端口:airflow webserver –port <port number>
我们可以使用上面创建的用户名和密码登录Airflow Web门户。
上面,我们创建了可以使用Airflow DAG运行的不同的airflow操作符。我们可以通过单击来触发DAG。
在成功完成或失败之前,工作流程会经过不同的状态。下面是显示的状态:
下面是我们上面讨论的不同操作符。当工作流程执行时,我们也可以实时监测工作流程状态。
我们可以在Airflow中监视工作流程触发的DAG的日志。以下是示例。
在云中部署模型
在得到最佳模型之后,我们使用streamlit代码部署模型。要在本地系统上运行此Streamlit应用程序,请使用以下命令:
#在本地运行streamlit应用程序的命令streamlit run streamlit_app.py
应用程序的云版本也可以使用以下URL访问,可以公开访问。
https://fraud-prediction-mlops-d8rcgc2prmv9xapx5ahhhn.streamlit.app/
有关端到端完整的ML实现代码,请点击这里。
结果
我们尝试了多种算法并比较了每个模型的性能。结果如下:
在上述结果的基础上,使用集成学习技术处理高度不平衡的数据集后,我们可以看到四个模型的性能都非常好,准确率超过90%。随机森林分类器和决策树分类器的准确率几乎相同,但随机森林略好于决策树。
- 准确性:准确性是正确预测数与总预测数的比率。
- 精确度:精确度是在正类别中进行的正确预测数量。
- 召回率:召回率定义为数据集中从所有实际正样本中进行的正确乐观预测数。
- F1得分:F1得分衡量模型的准确性。其定义为精确度和准确性的调和平均数。
演示应用
使用Streamlit制作的此项目的实时演示应用。它会获取一些关于产品的输入特征,并使用我们的训练模型来预测有效或欺诈性交易。
结论
今天的世界是数字化的,技术已经成为我们生活的一部分。从书籍到智能手机和笔记本电脑,网络服务的增加让人们可以在线购买任何东西。因此,防止欺诈和实施欺诈检测模型对每个公司都变得至关重要。机器学习可以在企业和客户中扮演重要角色。
- 通过识别欺诈性财务交易,它提高了企业的利润。
- 帮助维护商业声誉并增加客户。
- ML工具有助于提供可访问和更好的服务。
- 帮助提供良好的服务并与客户建立信任。
常见问题
本文中显示的媒体不归Analytics Vidhya所有,仅由作者决定使用。