介绍
你曾经想过会有一个系统可以预测电动车的效率,用户可以轻松使用该系统吗?在电动车的世界里,我们可以非常准确地预测电动车的效率。这个概念现在已经进入现实世界,我们对Zenml和MLflow心存无比感激。在这个项目中,我们将深入探索技术,并了解如何将数据科学、机器学习和MLOps结合在一起,创造出这项美丽的技术,并且你将看到我们如何使用ZenML来研究电动车。
![]()
学习目标
在这篇文章中,我们将学到以下内容:
- 了解Zenml是什么,以及如何在端到端的机器学习流水线中使用它。
- 了解MLFlow在创建机器学习模型实验跟踪器中的作用。
- 探索机器学习模型的部署过程以及如何设置预测服务。
- 了解如何创建一个用户友好的Streamlit应用,与机器学习模型预测进行交互。
这篇文章是作为数据科学博览会的一部分发表的。
理解电动车效率
- 电动车的效率是指电动车将电池中的电能转化为行驶里程的效率。通常以每千瓦时(kWh)的里程来衡量。
- 电动机和电池的效率、重量、空气动力学和辅助负载等因素影响着电动车的效率。因此,很明显,如果我们优化这些方面,我们可以提高电动车的效率。对消费者来说,选择一个效率更高的电动车会带来更好的驾驶体验。
- 在这个项目中,我们将建立一个端到端的机器学习流水线,使用真实世界的电动车数据来预测电动车的效率。准确地预测效率可以指导电动车制造商优化设计。
- 我们将使用ZenML,一个MLOps框架,来自动化机器学习模型的训练、评估和部署工作流程。ZenML提供了元数据跟踪、工件管理和模型可重现性等能力,覆盖了机器学习生命周期的各个阶段。
数据收集
对于这个项目,我们将从Kaggle开始收集数据。Kaggle是一个在线平台,提供许多用于数据科学和机器学习项目的数据集。您可以从任何地方收集数据。通过收集这个数据集,我们可以对我们的模型进行预测。在这里是我的GitHub代码库,您可以找到所有的文件或模板 – https://github.com/Dhrubaraj-Roy/Predicting-Electric-Vehicle-Efficiency.git
问题陈述
高效的电动车是未来的趋势,但准确预测电动车的续航里程非常困难。
解决方案
我们的项目将数据科学和MLOps结合起来,为预测电动车的效率创建一个精确的模型,使消费者和制造商受益。
设置虚拟环境
为什么我们想要设置虚拟环境?
它帮助我们使项目突出,不与系统中的其他项目发生冲突。
创建虚拟环境
- 对于Windows系统:
python -m venv myenv#然后激活虚拟环境myenv\Scripts\activate
- 对于Linux/macOS系统:
python3 -m venv myenv#然后激活虚拟环境source myenv/bin/activate
它有助于保持我们的环境清洁。
项目工作
当我们的环境准备好后,我们需要安装Zenml。那么Zenml是什么呢?Zenml是一个用于管理端到端机器学习流水线的机器学习操作(MLOps)框架。我们选择Zenml是因为它有效地管理了机器学习流水线。因此,您需要安装Zenml服务器。
在终端中使用以下命令安装Zenml服务器 –
pip install ‘zenml[server]’
这不是结束;在安装Zenml服务器后,我们需要创建Zenml存储库,以创建Zenml存储库 –
zenml init
为什么使用`zenml init`:`zenml init`用于初始化ZenML存储库,创建管理机器学习流水线和实验所需的结构。
安装要求
为了满足项目的依赖关系,我们使用了一个’requirements.txt’文件。在这个文件中,你应该有这些依赖关系。
catboost==1.0.4joblib==1.1.0lightgbm==3.3.2optuna==2.10.0streamlit==1.8.1xgboost==1.5.2markupsafe==1.1.1zenml==0.35.1
组织项目
在进行数据科学项目时,我们应该正确组织一切。让我解释一下我们如何在项目中保持结构化:
创建文件夹
我们将项目组织到文件夹中。有一些文件夹我们需要创建。
- 模型文件夹:首先,我们需要创建一个模型文件夹。它包含了我们的机器学习模型所需的基本文件。在这个文件夹中,我们有一些文件,如’data_cleaning.py’、’evaluation.py’和’model_dev.py’。这些文件就像帮助我们完成项目的不同工具。
- 步骤文件夹:该文件夹作为我们项目的控制中心。在“步骤”文件夹中,我们有各个数据科学过程阶段所需的重要文件。然后,我们必须在步骤文件夹中创建一些文件,如Ingest_data.py。这个文件帮助我们输入数据,就像为项目收集材料一样。接下来,Cleaning_data.py。它就像项目的一部分,你清洁和准备材料为主要工作。Model_train.py:这个文件是我们训练机器学习模型的地方,就像把你的材料塑造成最终产品一样。Evaluation.py:这个evaluation.py文件评估我们的模型,我们检查我们的最终产品的表现如何。
流水线文件夹
这是我们组装流水线的地方,类似于为项目设置生产线。在’Pipelines’文件夹中,’Training_pipeline.py’作为主要的生产机器。在这个文件中,我们导入’Ingest_data.py’和’ingest_df’类来准备数据,清洁数据,训练模型,并评估其性能。要运行整个项目,使用’run_pipeline.py’,类似于在生产线上按下启动按钮使用命令:
python run_pipeline.py
在这里,你可以看到项目的文件结构-
![]()
这个结构帮助我们顺利运行项目,就像一个结构良好的工作区帮助你有效地创建项目一样。
3. 设置流水线

在组织项目和配置流水线之后,下一步是执行流水线。现在,你可能会有一个问题:什么是流水线?流水线是一组自动化步骤,用于从开发到生产流程中部署、监控和管理机器学习模型。通过运行’zenml up’命令来实现,它就像是生产线的电源开关。它确保在数据科学项目中定义的所有步骤按照正确的顺序执行,启动整个工作流程,从数据摄取和清洗到模型训练和评估。
数据清洗
在‘Model’文件夹中,你会找到一个名为‘data_cleaning’的文件,这个文件负责数据清洗。在这个文件中,你会发现以下内容——
– Column Cleanup:一个专门用于识别和删除数据集中不必要列的部分,使数据集更有序,更容易找到你需要的内容。
– DataDivideStretegy类:这个类帮助我们有效地划分数据。就像计划如何为你的项目安排材料一样。
class DataDivideStrategy(DataStrategy):
"""
将数据划分为训练集和测试集的数据划分策略。
"""
def handle_data(self, data: pd.DataFrame) -> Union[pd.DataFrame, pd.Series]:
"""
将数据划分为训练集和测试集。
"""
try:
# 假设“Efficiency”是你的目标变量
# 将特征(X)和目标变量(y)从数据集中分离出来
X = data.drop("Efficiency", axis=1)
y = data["Efficiency"]
# 将数据按80-20的比例划分为训练集和测试集
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42
)
# 返回划分后的数据集
return X_train, X_test, y_train, y_test
except Exception as e:
# 如果发生任何异常,记录错误信息
logging.error("Error in Divides the data into train and test data.".format(e))
raise e
- 它接受一个数据集,并将其分成训练集和测试集(80-20的比例),返回划分后的数据集。如果在这个过程中发生任何错误,它会记录一个错误信息。
- DataCleaning 类:’DataCleaning’类是一组规则和方法,用于确保我们的数据处于最佳状态。handle_data方法:这个方法就像一个多功能工具,可以根据不同的方式管理和操作数据,确保它为我们的项目的后续步骤做好准备。
- 我们的主要类是DataCleaning,它是DataPreProcessStrategy。在这个类中,我们清理我们的数据。
现在,我们继续讲解‘Steps’文件夹。在里面,有一个名为‘clean_data.py’的文件,专门用于数据清洗。以下是在这里发生的情况:
- 我们从 ‘data_cleaning’ 中导入了 ‘DataCleaning’、‘DataDivideStrategy’、‘DataPreProcesStrategy’。这就像从工具箱中获得正确的工具和材料,以便有效地继续你的项目。
import logging
from typing import Tuple
import pandas as pd
from model.data_cleaning import DataCleaning, DataDivideStrategy, DataPreProcessStrategy
from zenml import step
from typing_extensions import Annotated
@step
def clean_df(data: pd.DataFrame) -> Tuple[
Annotated[pd.DataFrame, 'X_train'],
Annotated[pd.DataFrame, 'X_test'],
Annotated[pd.Series, 'y_train'],
Annotated[pd.Series, 'y_test'],
]:
"""
数据清洗类,对数据进行预处理并将其划分为训练集和测试集。
Args:
data: pd.DataFrame
"""
try:
preprocess_strategy = DataPreProcessStrategy()
data_cleaning = DataCleaning(data, preprocess_strategy)
preprocessed_data = data_cleaning.handle_data()
divide_strategy = DataDivideStrategy()
data_cleaning = DataCleaning(preprocessed_data, divide_strategy)
X_train, X_test, y_train, y_test = data_cleaning.handle_data()
logging.info(f"Data Cleaning Complete")
return X_train, X_test, y_train, y_test
except Exception as e:
logging.error(e)
raise e
- 首先,它导入了必要的库和模块,包括logging、pandas和各种数据清洗策略。
- @step 装饰器将一个函数标记为机器学习管道中的一个步骤。这个步骤接受一个DataFrame,对其进行预处理,并将其划分为训练集和测试集。
- 这个步骤使用了数据清洗和分割策略,记录了这个过程,并按指定的数据类型返回拆分的数据。例如,我们的X_train和X_test是DataFrame,y_test和y_train是Series。
创建一个简单的线性回归模型
现在,让我们谈谈在‘model’文件夹中创建的model_dev文件。在这个文件中,我们主要关注构建机器学习模型。
- 简单线性回归模型:在这个文件中,我们创建了一个简单的线性回归模型。我们的主要目标是关注MLOps,而不是构建一个复杂的模型。就像构建您的MLOps项目的基本原型。
这种结构化的方法确保我们拥有一个清洁和有组织的数据清洗过程,而且我们的模型开发遵循一个清晰的蓝图,将重点放在MLOps的效率上,而不是构建一个复杂的模型。将来,我们将更新我们的模型。
import loggingfrom abc import ABC, abstractmethodimport pandas as pdfrom sklearn.linear_model import LinearRegressionfrom typing importDictimport optuna # 导入optuna库# 其余的代码...classModel(ABC): """ 所有模型的抽象基类。 """ @abstractmethoddeftrain(self, X_train, y_train): """ 在给定的数据上训练模型。 参数: x_train: 训练数据 y_train: 目标数据 """passclassLinearRegressionModel(Model): """ 实现模型接口的LinearRegressionModel。 """deftrain(self, X_train, y_train, **kwargs): try: reg = LinearRegression(**kwargs) # 创建一个线性回归模型 reg.fit(X_train, y_train) # 将模型拟合到训练数据上 logging.info('训练完成') # 记录一条指示训练已完成的消息return reg # 返回训练好的模型except Exception as e: logging.error("训练模型时发生错误: {}".format(e)) # 如果发生异常,记录一条错误消息raise e # 抛出异常以供进一步处理
Model Development中‘model_train.py’的改进
在‘model_train.py’文件中,我们对项目进行了几个重要的添加:
导入线性回归模型:我们从‘model.mode_dev’导入‘LinearRegressionModel’。它帮助我们构建项目。我们的‘model_train.py’文件设置为与这种特定类型的机器学习模型一起使用。
def train_model( X_train: pd.DataFrame, X_test: pd.DataFrame, y_train: pd.Series, y_test: pd.Series, config: ModelNameConfig,) -> RegressorMixin: """ 根据指定的配置训练回归模型。 参数: X_train (pd.DataFrame): 训练数据特征。 X_test (pd.DataFrame): 测试数据特征。 y_train (pd.Series): 训练数据目标。 y_test (pd.Series): 测试数据目标。 config (ModelNameConfig): 模型配置。 返回: RegressorMixin: 训练好的回归模型。 """ try: model = None # 检查配置中指定的模型 if config.model_name == "linear_regression": # 启用MLflow自动日志记录 autolog() # 创建一个LinearRegressionModel的实例 model = LinearRegressionModel() # 在训练数据上训练模型 trained_model = model.train(X_train, y_train) # 返回训练好的模型 return trained_model else: # 如果模型名不受支持,抛出错误 raise ValueError("不支持的模型名") except Exception as e: # 记录并抛出在模型训练过程中发生的任何异常 logging.error(f"训练模型时发生错误: {e}") raise e
这段代码根据选择的配置训练回归模型(例如线性回归)。它检查所选模型是否受支持,使用MLflow进行日志记录,使用提供的数据训练模型,并返回训练好的模型。如果选择的模型不受支持,它将抛出一个错误。
方法‘Train Model’:‘model_train.py’文件定义了一个名为‘train_model’的方法,它返回一个‘LinearRegressionModel’。
导入RegressorMixin:我们从sklearn.base导入‘RegressorMixin’。RegressorMixin是scikit-learn中提供回归估计器的公共接口的类。sklearn.base是Scikit-Learn库的一部分,用于构建和使用机器学习模型。
配置模型设置和性能评估
在‘Steps’文件夹中创建‘config.py’:在‘steps’文件夹中,我们创建一个名为‘config.py’的文件。这个文件包含一个叫做‘ModelNameConfig’的类。‘ModelNameConfig’是‘config.py’文件中的一个类,它作为机器学习模型的配置指南。它指定了模型的各种设置和选项。
# 从ZenML导入所需的类以配置模型参数
from zenml.steps import BaseParameters
# 定义一个名为ModelNameConfig的类,该类继承自BaseParameters
class ModelNameConfig(BaseParameters):
"""
模型配置:
"""
# 使用默认值定义模型配置的属性
model_name: str = "linear_regression" # 机器学习模型的名称
fine_tuning: bool = False # 启用进行微调的标志
- 它允许您选择模型的名称以及是否进行微调。微调类似于对已经工作的机器学习模型进行微小的调整,以在特定任务上提高性能。
- 评估:在’src’或’model’文件夹中,我们创建一个名为’evaluation.py’的文件。该文件包含一个名为’evaluation’的抽象类和一个名为’calculate_score’的方法。这些是我们用来评估机器学习模型性能的工具。
- 评估策略:我们介绍了特定的评估策略,比如均方误差(MSE)。每个策略类都包含一个’calculate_score’方法,用于评估模型的性能。
- 在“步骤”中实现评估:我们在’steps’文件夹中的’evaluation.py’中实现这些评估策略。这就像在我们的项目中设置质量控制流程。
使用“评估模型”方法量化模型性能
方法’Evaluate Model’:在’steps’文件夹中的’evaluation.py’中,我们创建一个名为’evaluate_model’的方法,该方法返回R平方(R2)分数和均方根误差(RMSE)等性能指标。
@step(experiment_tracker=experiment_tracker.name)def evaluate_model(
model: RegressorMixin, X_test: pd.DataFrame, y_test: pd.Series) -> Tuple[Annotated[float, "r2"],
Annotated[float, "rmse"],]:
"""
使用多种指标评估机器学习模型的性能并记录结果。
参数:
model: RegressorMixin - 要评估的机器学习模型。
X_test: pd.DataFrame - 测试数据集的特征值。
y_test: pd.Series - 测试数据集的实际目标值。
返回:
Tuple[float, float] - 包含R2分数和RMSE的元组。
"""
try:
# 使用模型进行预测
prediction = model.predict(X_test)
# 使用MSE类计算均方误差(MSE)
mse_class = MSE()
mse = mse_class.calculate_score(y_test, prediction)
mlflow.log_metric("mse", mse)
# 使用R2Score类计算R2分数
r2_class = R2()
r2 = r2_class.calculate_score(y_test, prediction)
mlflow.log_metric("r2", r2)
# 使用RMSE类计算均方根误差(RMSE)
rmse_class = RMSE()
rmse = rmse_class.calculate_score(y_test, prediction)
mlflow.log_metric("rmse", rmse)
return r2, rmse # 返回R2分数和RMSE
except Exception as e:
logging.error("evaluation中的错误".format(e))
raise e
在’model_train.py’、’config.py’和’evaluation.py’中进行的这些添加,通过引入机器学习模型训练、配置和全面评估,增强了我们的项目,确保项目符合高质量标准。
运行流水线
接下来,我们更新’training_pipeline’文件以成功运行流水线;ZenML是一个用于简化和标准化机器学习工作流程管理的开源MLOps框架。要查看您的流水线,您可以使用此命令’zenml up’。
![]()
现在,我们继续实现实验跟踪器并部署模型:
- 导入MLflow:在’model_train.py’文件中,我们导入’mlflow’。MLflow是一种多功能工具,帮助我们管理机器学习模型的生命周期,跟踪实验并保持每个项目的详细记录。
- 实验跟踪器:现在,您可能会有一个问题:什么是实验跟踪器?实验跟踪器是一种用于监视和组织实验的系统,使我们能够记录项目的进展情况。在我们的代码中,我们通过’zenml.client’和’mlflow’来访问实验跟踪器,确保我们能够有效地管理我们的实验。您可以查看’model_train.py’代码以更好地理解。
- 使用MLflow进行自动记录:我们使用’mlflow.sklearn’中的’autolog’功能自动记录机器学习模型性能的各个方面。这简化了实验跟踪过程,为我们了解模型的性能提供了有价值的见解。
- 记录指标:我们在’evaluation.py’文件中使用’mlflow.log_metric’记录特定的指标,如均方误差(MSE)。这样,我们可以在项目进行过程中追踪模型的性能。
![]()
如果您正在运行“run_deployment.py”脚本,则必须使用ZenML安装一些集成。现在,集成帮助连接您的模型与部署环境,您可以在其中部署模型。
Zenml集成
Zenml与MLOps工具实现了集成。通过运行以下命令,我们必须安装Zenml与MLflow的集成,这是一个非常重要的步骤:
要创建此集成,您需要使用此命令:
zenml integration install mlflow -y
此集成帮助我们有效地管理实验。
实验追踪
实验追踪是MLOps的关键方面。我们使用Zenml和MLflow监视、记录和管理机器学习实验的所有方面,促进高效的实验和可重复性。
注册实验跟踪器:
zenml experiment-tracker register mlflow_tracker --flavor=mlflow
注册模型部署器:
zenml model-deployer register mlflow --flavor=mlflow
Stack:
zenml stack register mlflow_stack -a default -o default -d mlflow -e mlflow_tracker --set
部署
部署是我们流程的最后一步,也是我们项目的重要组成部分。我们的目标不仅是构建模型,我们希望将模型部署在互联网上,以便用户可以使用它。
部署流程配置:您在名为‘deployment_pipeline.py’的Python文件中定义了一个部署流程,该流程管理部署任务。
部署触发器:有一个名为‘deployment_trigger’的步骤
class DeploymentTriggerConfig(BaseParameters): min_accuracy = 0 @step(enable_cache=False)def dynamic_importer() -> str: """Downloads the latest data from a mock API.""" data = get_data_for_test() return data
该代码定义了一个名为`DeploymentTriggerConfig`的类,具有最小准确度参数。在这种情况下,它为零。它还定义了一个管道步骤`dynamic_importer`,该步骤禁用了缓存,从模拟API下载数据。
预测服务加载器
`prediction_service_loader’步骤检索由部署流程启动的预测服务。它用于管理和与部署的模型进行交互。
def prediction_service_loader( pipeline_name: str, pipeline_step_name: str, running: bool = True, model_name: str = "model",) -> MLFlowDeploymentService: """获取由部署管道启动的预测服务。 参数: pipeline_name:部署MLflow预测服务器的管道名称 step_name:部署MLflow预测服务器的步骤名称 running:设置此标志时,该步骤只返回正在运行的服务 model_name:部署的模型名称 """ #获取MLflow模型部署器的堆栈组件 mlflow_model_deployer_component = MLFlowModelDeployer.get_active_model_deployer() #根据相同的管道名称、步骤名称和模型名称获取现有服务 existing_services = mlflow_model_deployer_component.find_model_server( pipeline_name=pipeline_name, pipeline_step_name = pipeline_step_name, model_name=model_name, running=running, ) if not existing_services: raise RuntimeError( f"在管道名为{pipeline_name}的{pipeline_step_name}步骤中" f"当前未运行已部署MLflow预测服务的'{model_name}'模型。“ ) return existing_services[0]
该代码定义了一个函数`prediction_service_loader`,用于检索由部署流程启动的预测服务。
- 它接受输入,如管道名称、步骤名称和模型名称。
- 该函数检查符合这些参数的现有服务,并返回找到的第一个服务。如果找不到任何服务,将会引发错误。
预测器
“预测器”步骤针对预测服务运行推理请求。它处理传入数据并返回预测结果。
@stepdef predictor( service: MLFlowDeploymentService, data: str,) -> np.ndarray: """针对预测服务运行推理请求""" service.start(timeout=10) # 如果已经启动,则应该不起作用 data = json.loads(data) # 将输入数据从JSON字符串解析为Python字典 data.pop("columns") data.pop("index") columns_for_df = [ # 定义用于创建DataFrame的列名列表 "加速度", "最高速度", "续航里程", "快速充电速度", "英国价格", "德国价格", ] df = pd.DataFrame(data["data"], columns=columns_for_df) json_list = json.loads(json.dumps(list(df.T.to_dict().values()))) data = np.array(json_list) # 将JSON列表转换为NumPy数组 prediction = service.predict(data) return prediction
- 这段代码定义了一个名为`predictor`的函数,用于使用通过MLFlow部署的ML模型进行预测。它启动服务,处理来自JSON格式的输入数据,将其转换为NumPy数组,并返回模型的预测结果。这个函数处理与电动车相关的特定特征的数据。
部署执行:您有一个名为‘run_deployment.py’的脚本,可用于触发部署过程。该脚本接受‘–config’参数。`–config`参数用于通过命令行指定程序的配置文件或设置,可以将其设置为‘deploy’以部署模型,‘predict’以进行预测,或‘deploy_and_predict’以同时进行。
部署状态和交互:该脚本还提供有关MLflow预测服务器状态的信息,包括如何启动和停止它。它使用MLFlow进行模型部署。
最小准确度阈值:可以指定‘min_accuracy’参数来设置模型部署的最小准确度阈值。如果满足该值,模型将会被部署。
Docker配置:Docker用于管理部署环境,并且您已在部署流水线中定义了Docker设置。
这个部署过程似乎专注于以一种可控和可配置的方式部署机器学习模型并进行预测。
- 部署我们的模型就像运行‘run_deployment.py’脚本一样简单。使用以下命令:
python3 run_deployment.py --config deploy
预测
一旦我们的模型部署完成,就可以用于进行预测了。
- 运行预测:使用以下命令执行预测-
python3 run_deployment.py --config predict
Streamlit应用
Streamlit应用提供了一个用户友好的界面来与我们模型的预测进行交互。Streamlit简化了交互式、基于Web的数据科学应用的创建过程,使用户可以轻松地探索和理解模型的预测结果。同样,您可以在GitHub上找到Streamlit应用程序的代码。
- 使用以下命令启动Streamlit应用程序:streamlit run streamlit_app.py
通过这样做,您可以探索和与我们模型的预测进行交互。
- Streamlit应用使我们模型的预测变得用户友好且在线可访问;用户可以轻松地与结果进行交互和理解。在这里,您可以看到Streamlit应用在Web上的展示图片-
![]()
结论
在本文中,我们深入研究了一个令人兴奋的项目,展示了MLOps在预测电动车效率方面的强大能力。我们了解了Zenml和MLFlow,在创建端到端机器学习流程中的关键作用。我们还探讨了数据收集过程、问题陈述以及准确预测电动车效率的解决方案。
该项目突出了高效电动车的重要性,以及如何利用MLOps创建精确的预测模型。我们涵盖了设置虚拟环境、模型开发、配置模型设置和评估模型性能等重要步骤。文章最后强调了实验跟踪、部署和用户通过Streamlit应用程序交互的重要性。通过这个项目,我们离塑造电动车的未来又近了一步。
主要要点
- 无缝集成:通过“Zenml与终端到终端预测电动车能效管道”项目展示了数据收集、模型训练、评估和部署的无缝集成。这凸显了MLOps在重塑电动车行业方面的巨大潜力。
- GitHub项目:如需进一步了解,您可以在GitHub上访问该项目:GitHub项目。
- MLOps课程:为了加深对MLOps的理解,我们推荐观看我们全面的课程:MLOps课程。
- 该项目展示了MLOps在重塑电动车行业方面的潜力,提供了宝贵的见解,并为更绿色的未来做出了贡献。
常见问题
本文中显示的媒体不是Analytics Vidhya拥有的,并根据作者的自由裁量使用。