使用由Kafka生产者生成的训练数据在线和准实时更新机器学习模型
最近,我对在线机器学习越来越感兴趣——即在生产环境中更新ML模型的权重。除了对我提供有趣的架构挑战外,这种方法还有巨大的潜在收益。这项来自Grubhub的2021年研究表明,利用在线学习可以实现+20%的指标增长和45倍的成本节约,而我一直致力于省钱赚钱。
然而,从实际角度来看,处理数据流和流式架构对于ML从业者来说仍然是相当新的。除了创建实时的训练数据流之外,在使用这样的数据源来在线更新模型方面的资源也很少。在本文中,我将演示:
- 设置Kafka实例
- 创建生成训练数据的生产者
- 创建使用该训练数据来更新ML模型的消费者
使用Docker运行Kafka
我使用docker-compose
在本地使用Kafka的首选方法。如果您的环境中尚未安装,可以按照此处的说明进行安装。
Shuyi Yang的文章提供了关于此方法的高级概述,我们可以使用类似的docker-compose.yaml
文件创建本地的Kafka和Zookeeper实例,并将Kafka暴露在9092端口上:
version: '3'services: zookeeper: image: wurstmeister/zookeeper:3.4.6 ports: - "2181:2181" kafka: depends_on: - zookeeper image: wurstmeister/kafka ports: - "9092:9092" expose: - "9093" environment: KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092 KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_CREATE_TOPICS: "ml_training_data:1:1" volumes: - /var/run/docker.sock:/var/run/docker.sock
它还创建了一个名为ml_training_data
的Kafka主题,我们稍后将使用它。您可以通过转到包含上面文件的目录并运行以下命令来运行该文件:
docker-compose up
用于训练数据的Kafka生产者
首先,让我们安装我们需要使用的Python库:
python -m pip install kafka-python river
接下来,我们需要创建一个人工训练数据源,该源被写入我们的Kafka主题。为此,我们将使用River Python库,它具有易于使用的流数据API:
from time import sleepfrom json import dumpsimport randomfrom river import datasetsfrom kafka import KafkaProducer# 创建连接到Kafka端口9092的Kafka产品producer = KafkaProducer( bootstrap_servers=["localhost:9092"], value_serializer=lambda x: dumps(x).encode("utf-8"),)# 初始化River网络钓鱼数据集。# 该数据集包含被分类为网络钓鱼或非网络钓鱼的网页的特征。dataset = datasets.Phishing()# 一次发送一个观察到Kafka主题,并进行随机休眠for x, y in dataset: print(f"Sending: {x, y}") data = {"x": x, "y": y} producer.send("ml_training_data", value=data) sleep(random.random())
上面的代码使用玩具河钓鱼数据集(CC BY 4.0),将标记的数据观察结果逐个发送到我们的Kafka主题。该数据集包含被分类为钓鱼或非钓鱼的网页功能。数据集中的样本是这样的元组:
[({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 1, 'ip_in_url': 1}, True), ({'empty_server_form_handler': 1.0, 'popup_window': 0.0, 'https': 0.5, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)]
首先运行生产者:
python producer.py
然后您应该在控制台中看到以下内容:
Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.5, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 1}, False)Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.5, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 1.0, 'is_popular': 0.0, 'long_url': 0.5, 'age_of_domain': 1, 'ip_in_url': 0}, False)Sending: ({'empty_server_form_handler': 0.5, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 1.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 0.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, False)
用于训练ML模型的Kafka消费者
编写一个简单的Kafka消费者将让我们在流中推送数据时读取它,并使用它来更新我们模型的权重。
from json import loadsfrom time import sleepfrom kafka import KafkaConsumerfrom river import linear_modelfrom river import composefrom river import preprocessingfrom river import metrics# 评估指标使用rocaucmetric = metrics.ROCAUC()# 创建一个带有标量器的简单LR模型model = compose.Pipeline( preprocessing.StandardScaler(), linear_model.LogisticRegression())# 创建我们的Kafka消费者consumer = KafkaConsumer( "ml_training_data", bootstrap_servers=["localhost:9092"], auto_offset_reset="earliest", enable_auto_commit=True, group_id="my-group-id", value_deserializer=lambda x: loads(x.decode("utf-8")),)# 使用每个事件来更新我们的模型并打印指标for event in consumer: event_data = event.value try: x = event_data["x"] y = event_data["y"] y_pred = model.predict_proba_one(x) model.learn_one(x, y) metric.update(y, y_pred) print(metric) except: print("Processing bad data...")
上述代码使用River的LogisticRegression
类初始化了一个简单的ML模型。然后,我们不断处理事件并使用它们来更新我们的ML模型——为每个添加的样本打印ROCAUC指标。
要开始训练,请运行:
python consumer.py
当模型逐个学习观察结果时,您应该在控制台中看到类似以下内容的内容!
ROCAUC: 87.12%ROCAUC: 87.29%ROCAUC: 87.42%ROCAUC: 87.29%ROCAUC: 87.42%
结论
在实时或准实时标记数据可用于实时决策的模型的领域中,连续训练和在线学习都具有巨大潜力。所有代码和说明都可在此Github存储库中找到。更多内容即将推出!