Press "Enter" to skip to content

在 Kafka 流上训练机器学习模型

使用由Kafka生产者生成的训练数据在线和准实时更新机器学习模型

Jonathan Borba在Unsplash上的照片

最近,我对在线机器学习越来越感兴趣——即在生产环境中更新ML模型的权重。除了对我提供有趣的架构挑战外,这种方法还有巨大的潜在收益。这项来自Grubhub的2021年研究表明,利用在线学习可以实现+20%的指标增长和45倍的成本节约,而我一直致力于省钱赚钱。

有状态训练 — Chip Huyen的图像,已获得许可
在线学习 — 作者提供的图像

然而,从实际角度来看,处理数据流和流式架构对于ML从业者来说仍然是相当新的。除了创建实时的训练数据流之外,在使用这样的数据源来在线更新模型方面的资源也很少。在本文中,我将演示:

  • 设置Kafka实例
  • 创建生成训练数据的生产者
  • 创建使用该训练数据来更新ML模型的消费者

使用Docker运行Kafka

我使用docker-compose在本地使用Kafka的首选方法。如果您的环境中尚未安装,可以按照此处的说明进行安装。

Shuyi Yang的文章提供了关于此方法的高级概述,我们可以使用类似的docker-compose.yaml文件创建本地的Kafka和Zookeeper实例,并将Kafka暴露在9092端口上:

version: '3'services:  zookeeper:    image: wurstmeister/zookeeper:3.4.6    ports:     - "2181:2181"  kafka:    depends_on:       - zookeeper    image: wurstmeister/kafka    ports:     - "9092:9092"    expose:     - "9093"    environment:      KAFKA_ADVERTISED_LISTENERS: INSIDE://kafka:9093,OUTSIDE://localhost:9092      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT      KAFKA_LISTENERS: INSIDE://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092      KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181      KAFKA_CREATE_TOPICS: "ml_training_data:1:1"    volumes:     - /var/run/docker.sock:/var/run/docker.sock

它还创建了一个名为ml_training_data的Kafka主题,我们稍后将使用它。您可以通过转到包含上面文件的目录并运行以下命令来运行该文件:

docker-compose up

用于训练数据的Kafka生产者

首先,让我们安装我们需要使用的Python库:

python -m pip install kafka-python river  

接下来,我们需要创建一个人工训练数据源,该源被写入我们的Kafka主题。为此,我们将使用River Python库,它具有易于使用的流数据API:

from time import sleepfrom json import dumpsimport randomfrom river import datasetsfrom kafka import KafkaProducer# 创建连接到Kafka端口9092的Kafka产品producer = KafkaProducer(    bootstrap_servers=["localhost:9092"],    value_serializer=lambda x: dumps(x).encode("utf-8"),)# 初始化River网络钓鱼数据集。# 该数据集包含被分类为网络钓鱼或非网络钓鱼的网页的特征。dataset = datasets.Phishing()# 一次发送一个观察到Kafka主题,并进行随机休眠for x, y in dataset:    print(f"Sending: {x, y}")    data = {"x": x, "y": y}    producer.send("ml_training_data", value=data)    sleep(random.random())

上面的代码使用玩具河钓鱼数据集(CC BY 4.0),将标记的数据观察结果逐个发送到我们的Kafka主题。该数据集包含被分类为钓鱼或非钓鱼的网页功能。数据集中的样本是这样的元组:

[({'empty_server_form_handler': 0.0,   'popup_window': 0.0,   'https': 0.0,   'request_from_other_domain': 0.0,   'anchor_from_other_domain': 0.0,   'is_popular': 0.5,   'long_url': 1.0,   'age_of_domain': 1,   'ip_in_url': 1},  True), ({'empty_server_form_handler': 1.0,   'popup_window': 0.0,   'https': 0.5,   'request_from_other_domain': 0.5,   'anchor_from_other_domain': 0.0,   'is_popular': 0.5,   'long_url': 0.0,   'age_of_domain': 1,   'ip_in_url': 0},  True)]

首先运行生产者:

python producer.py

然后您应该在控制台中看到以下内容:

Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 0.5, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.5, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 1}, False)Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.5, 'https': 0.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 0.0, 'is_popular': 0.5, 'long_url': 0.0, 'age_of_domain': 1, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.0, 'anchor_from_other_domain': 1.0, 'is_popular': 0.0, 'long_url': 0.5, 'age_of_domain': 1, 'ip_in_url': 0}, False)Sending: ({'empty_server_form_handler': 0.5, 'popup_window': 0.0, 'https': 0.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 1.0, 'is_popular': 0.5, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 0.0, 'popup_window': 0.0, 'https': 1.0, 'request_from_other_domain': 1.0, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 0.0, 'age_of_domain': 0, 'ip_in_url': 0}, True)Sending: ({'empty_server_form_handler': 1.0, 'popup_window': 1.0, 'https': 1.0, 'request_from_other_domain': 0.5, 'anchor_from_other_domain': 0.0, 'is_popular': 1.0, 'long_url': 1.0, 'age_of_domain': 0, 'ip_in_url': 0}, False)

用于训练ML模型的Kafka消费者

编写一个简单的Kafka消费者将让我们在流中推送数据时读取它,并使用它来更新我们模型的权重。

from json import loadsfrom time import sleepfrom kafka import KafkaConsumerfrom river import linear_modelfrom river import composefrom river import preprocessingfrom river import metrics# 评估指标使用rocaucmetric = metrics.ROCAUC()# 创建一个带有标量器的简单LR模型model = compose.Pipeline(    preprocessing.StandardScaler(), linear_model.LogisticRegression())# 创建我们的Kafka消费者consumer = KafkaConsumer(    "ml_training_data",    bootstrap_servers=["localhost:9092"],    auto_offset_reset="earliest",    enable_auto_commit=True,    group_id="my-group-id",    value_deserializer=lambda x: loads(x.decode("utf-8")),)# 使用每个事件来更新我们的模型并打印指标for event in consumer:    event_data = event.value    try:        x = event_data["x"]        y = event_data["y"]        y_pred = model.predict_proba_one(x)        model.learn_one(x, y)        metric.update(y, y_pred)        print(metric)    except:        print("Processing bad data...")

上述代码使用River的LogisticRegression类初始化了一个简单的ML模型。然后,我们不断处理事件并使用它们来更新我们的ML模型——为每个添加的样本打印ROCAUC指标。

要开始训练,请运行:

python consumer.py

当模型逐个学习观察结果时,您应该在控制台中看到类似以下内容的内容!

ROCAUC: 87.12%ROCAUC: 87.29%ROCAUC: 87.42%ROCAUC: 87.29%ROCAUC: 87.42%

结论

在实时或准实时标记数据可用于实时决策的模型的领域中,连续训练和在线学习都具有巨大潜力。所有代码和说明都可在此Github存储库中找到。更多内容即将推出!

Leave a Reply

Your email address will not be published. Required fields are marked *