Press "Enter" to skip to content

通过物理信息引导的DeepONet运算符学习:让我们从零开始实现它

深入研究DeepONets、物理信息神经网络和物理信息DeepONets

图1. ODE/PDE广泛用于描述系统过程。在许多情况下,这些ODE/PDE接受一个函数(例如,强制函数u(t))作为输入,并输出另一个函数(例如,s(t))。传统上,使用数值求解器来连接输入和输出。最近,神经算子被开发出来解决相同的问题,但效率更高。(作者提供的图像)

常微分方程和偏微分方程(ODEs/PDEs)是科学和工程中许多学科的基础,从物理学和生物学到经济学和气候科学。它们是描述物理系统和过程的基本工具,捕捉数量随时间和空间的连续变化。

然而,这些方程的一个独特特点是它们不仅仅接受单个值作为输入,还接受函数。例如,考虑预测建筑物由于地震而产生的振动的情况。地面的震动随时间变化,可以表示为作为输入的函数,该函数作为描述建筑物运动的微分方程的输入。类似地,在音乐厅中传播的声波的情况下,乐器产生的声波可以作为具有随时间变化的音量和音高的输入函数。这些变化的输入函数从根本上影响结果的输出函数,即建筑物的振动和音乐厅中的声场。

传统上,这些ODEs/PDEs使用有限差分或有限元方法等数值求解器来处理。然而,这些方法面临一个瓶颈:对于每个新的输入函数,求解器必须重新运行。这个过程可能在复杂的系统或高维输入的情况下计算密集且缓慢。

为了解决这个挑战,Lu等人于2019年引入了一种新的框架:深度算子网络(Deep Operator Network),或称为DeepONet。DeepONets旨在学习将输入函数映射到输出函数的算子,从而学习预测这些ODEs/PDEs的输出,而无需每次重新运行数值求解器。

但是,尽管DeepONets功能强大,但它也继承了数据驱动方法所面临的常见问题:我们如何确保网络的预测与包含在控制方程中的已知物理规律一致?

这就是物理信息学习的作用。

物理信息学习是机器学习的一个快速发展的分支,它将物理原理与数据科学相结合,以增强对复杂物理系统的建模和理解。它利用特定领域的知识和物理规律来指导学习过程,提高机器学习模型的准确性、泛化能力和可解释性。

在这个框架下,Wang等人于2021年引入了DeepONets的一个新变体:物理信息DeepONet。这种创新方法基于DeepONets的基础,将我们对物理规律的理解纳入学习过程中。我们不再只要求模型从数据中学习;我们使用几个世纪的科学研究得出的原则来指导它。

这似乎是一个非常有前途的方法!但是我们应该如何在实践中实施它呢?这正是我们今天要探索的内容🤗

在本文中,我们将讨论物理信息DeepONet背后的理论,并介绍如何从头开始实现它。我们还将将我们开发的模型投入实际并通过一个实际案例研究展示其能力。

让我们开始吧!

目录 · 1. 案例研究 · 2. 物理信息DeepONet ∘ 2.1 DeepONet:概述 ∘ 2.2 物理信息神经网络(PINNs) ∘ 2.3 物理信息DeepONet · 3. 物理信息DeepONet的实现 ∘ 3.1 定义架构 ∘ 3.2 定义ODE损失 ∘ 3.3 定义梯度下降步骤 · 4. 数据生成和组织 ∘ 4.1 生成u(·)配置文件 ∘ 4.2 生成数据集 ∘ 4.3 数据集组织 · 5. 训练物理信息DeepONet · 6. 结果讨论 · 7. 收获 · 参考文献

1. 案例研究

让我们通过一个具体的例子来阐述我们的讨论。在本博客中,我们将重现王等人原始论文中考虑的第一个案例研究,即以下普通微分方程(ODE)描述的初值问题:

通过物理信息引导的DeepONet运算符学习:让我们从零开始实现它 四海 第2张

其中初始条件为 s(0) = 0。

在这个方程中,u( t ) 是随时间变化的输入函数,s( t ) 是我们感兴趣的系统在时间 t 的状态,我们希望预测。在物理场景中,u( t ) 可以表示施加在系统上的力,而 s( t ) 可能表示系统的响应,如位移或速度,具体取决于上下文。 我们的目标是学习输入函数 u( t ) 和 ODE 解 s( t ) 之间的映射关系。

传统的数值方法,如欧拉法或龙格-库塔方法,可以有效地解决这个方程。然而,注意到输入函数 u( t ) 可以采用不同的形态,如下图所示:

图2. u(t) 的示例轮廓。(图片由作者提供)

因此,每当 u( t ) 改变,我们都需要重新运行整个模拟,以获得相应的 s( t )(如图3所示),这可能在计算上是密集和低效的。

图3. 对应的 s(t) 轮廓。它们是使用 RK45 算法解决 ODE 计算的。(图片由作者提供)

那么,我们如何更高效地解决这类问题呢?

2. 物理信息 DeepONet

如前所述,物理信息 DeepONet 是解决我们目标问题的一种有前途的解决方案。在本节中,我们将详细介绍其基本概念,以使其更易于理解。

我们首先讨论原始 DeepONet 的基本原理。随后,我们将探讨物理信息神经网络的概念以及它如何为问题解决带来了一个附加维度。最后,我们将演示如何无缝地将这两个思想整合起来构建物理信息 DeepONet。

2.1 DeepONet 概述

DeepONet,即 Deep Operator Network,代表了深度学习的一个新领域。与将一组输入值映射到输出值的传统机器学习方法不同,DeepONet 旨在将整个函数映射到其他函数。这使得 DeepONet 在处理自然涉及函数输入和输出的问题时特别强大。那么它是如何实现这个目标的呢?

为了以符号形式表达我们想要实现的目标:

图4. 我们的目标是训练一个神经网络来近似映射力项 u(·) 到目标输出 s(·) 的算子,它们都是 t 的函数。(图片由作者提供)

在左边,我们有一个算子 G,它将输入函数 u(·) 映射到输出函数 s(·)。在右边,我们希望使用神经网络来近似算子 G。一旦实现了这一点,我们就可以使用已训练的神经网络对任何 u(·) 进行快速计算得到 s(·)。

对于当前案例研究,输入函数 u(·) 和输出函数 s(·) 都以时间坐标 t 作为唯一参数。因此,我们希望构建的神经网络的“输入”和“输出”应该如下所示:

图5. 我们希望训练的神经网络模型的输入和输出。(作者提供的图片)

基本上,我们的神经网络应该接受 u( t ) 的“整个剖面”作为第一个输入,以及特定的时间点 t 作为第二个输入。随后,它应该输出在时间点 t 上评估的目标输出函数 s(·),即 s( t )。

为了更好地理解这个设置,我们认识到 s( t ) 的值首先取决于 s(·) 的剖面,而 s(·) 的剖面又取决于 u(·),其次取决于 s(·) 在哪个时间点上被评估。这也是时间坐标 t 必须是输入之一的原因。

现在我们需要澄清两个问题:首先,我们应该如何将连续的 u(·) 剖面输入到网络中?其次,我们应该如何连接这两个输入,即 t 和 u(·)。

1️⃣ 我们应该如何将连续的 u(·) 剖面输入?

实际上,我们并不需要将其连续输入。一个简单的解决方案是将函数 u(·) 离散表示。更具体地说,我们只需在足够但有限的位置处评估 u(·) 值,然后将这些离散的 u(·) 值馈送到神经网络中:

图6. 在馈入神经网络之前,u(·) 剖面被离散化。(作者提供的图片)

这些位置在原始的 DeepONet 论文中被称为“传感器”。

2️⃣ 我们应该如何连接输入的 t 和 u(·)?

乍一看,我们可能想直接将它们连接在输入层。然而,事实证明这种朴素的方法不仅会对我们可以使用的神经网络类型施加约束,而且在实践中会导致次优的预测准确性。不过,有一种更好的方法。是时候介绍一下 DeepONet 了。

简而言之,DeepONet 提出了一种用于进行运算符学习的新的网络架构:它由两个主要组件组成:分支网络主干网络。分支网络接受离散函数值作为输入,并将其转换为特征向量。同时,主干网络接受坐标(在我们当前的案例研究中,坐标只是 t。对于偏微分方程,它将包括时间和空间坐标)并将其转换为具有相同维度的特征向量。然后,这两个特征向量通过点积进行合并,最终的结果被用作在输入坐标处评估的 s(·) 的预测。

图7. DeepONet 包含一个用于处理输入函数 u(·) 的分支网络和一个用于处理时间/空间坐标的主干网络。两个网络的输出具有相同的维度,并通过点积进行合并。在点积之后,还可以添加偏置项以进一步提高模型的表达能力。(作者提供的图片)

在原始的DeepONet论文中,作者指出这种“分而治之”的策略,通过独立的“分支”和“主干”网络来实现,受到了运算符通用逼近定理的启发,并且为运算符学习引入了强大的归纳偏差。这也是DeepONet成为有效解决方案的关键点,正如作者所声称的。

如果您对DeepONet的理论基础更感兴趣,请参考原始论文的附录A。

DeepONet的主要优势之一是其高效性。一旦训练完成,DeepONet可以实时推断出新输入函数的输出函数,无需进一步训练,只要新输入函数位于其训练范围内的输入函数。这使得DeepONet成为在需要实时推断的应用中强大的工具。

DeepONet的另一个显著优势在于其灵活性和多功能性。虽然主干和分支网络最常见的选择可能是全连接层,但DeepONet框架允许高度的架构定制。根据输入函数u(·)和坐标的特点,还可以使用各种神经网络架构,如CNN、RNN等。这种适应性使得DeepONet成为一种高度多功能的工具。

然而,尽管具有这些优势,DeepONet的局限性也很突出:作为一种纯数据驱动的方法,DeepONet不能保证其预测将遵循描述所考虑的物理系统的先前知识或控制方程。因此,DeepONet可能不会很好地推广,特别是面对位于其训练数据分布之外的输入函数,即所谓的超出分布(OOD)输入。对此的常见补救方法是简单地准备大量数据进行训练,但在实践中这并不总是可行的,特别是在科学和工程领域,数据采集可能很昂贵或耗时。

那么,我们应该如何解决这些局限性呢?是时候谈谈基于物理的学习,更具体地说,是基于物理的神经网络(PINNs)。

2.2 基于物理的神经网络(PINNs)

在传统的机器学习模型中,我们主要依赖数据来学习潜在的模式。然而,在许多科学和工程领域,我们有关于动力系统的控制方程(ODE/PDEs),这些方程捕捉到了我们关于该系统的先前知识,并且它们提供了除观察数据之外的另一个信息源。如果正确地合并这个额外的知识源,就可以提高模型的性能和泛化能力,特别是在处理有限或嘈杂数据时。这就是基于物理的学习发挥作用的地方。

当我们将物理学习的概念与神经网络相结合时,就会得到基于物理的神经网络(PINNs)。

PINNs是一种神经网络类型,网络训练不仅要拟合数据,还要遵守由微分方程描述的已知物理定律。这是通过引入ODE/PDE损失来实现的,该损失衡量了控制微分方程的违反程度。这样,我们将物理定律注入到网络训练过程中,使其具备物理信息。

图8. 基于物理的神经网络的损失函数包括PDE损失的贡献项,该损失有效地衡量了预测解是否满足控制微分方程。注意,由于自动微分的存在,可以轻松计算输出相对于输入的导数。(作者提供的图片)

尽管PINNs在许多应用中证明是有效的,但它们也有局限性。PINNs通常针对特定的输入参数进行训练(例如边界和初始条件、外部强迫等)。因此,每当输入参数发生变化时,我们都需要重新训练PINN。因此,在不同操作条件下进行实时推断时,它们并不特别有效。

还记得哪种方法是专门用于处理不同输入参数的吗?没错,就是DeepONet!是时候将物理信息学习的思想与DeepONet结合起来了。

2.3 物理信息 DeepONet

物理信息 DeepONet 的主要思想是将 DeepONet 和 PINN 的优势结合起来。与 DeepONet 一样,物理信息 DeepONet 能够将函数作为输入并产生函数作为输出。这使得它在实时推断新的输入函数时非常高效,无需重新训练。

另一方面,与 PINN 类似,物理信息 DeepONet 在其学习过程中融入已知的物理定律。这些定律在训练过程中作为额外的约束条件引入到损失函数中。这种方法使得模型能够在处理有限或嘈杂数据时做出具有物理一致性的预测。

我们如何实现这种整合?与 PINN 类似,我们增加了额外的损失项来衡量模型的预测与已知微分方程的一致性。通过优化这个损失函数,模型学习做出既符合数据又符合物理的预测。

Figure 10. 物理信息 DeepONet 使用 DeepONet 作为骨干架构,利用物理信息学习的概念来训练模型。这样,经过训练的物理信息 DeepONet 在数据和物理上都是一致的。 (图片由作者提供)

总之,物理信息 DeepONet 是一个强大的工具,结合了 DeepONet 的高效性和物理信息学习的准确性。它代表了在实时推断和物理一致性都至关重要的领域解决复杂问题的一种有希望的方法。

在下一节中,我们将开始进行案例研究,并将理论转化为实际代码。

3. 物理信息 DeepONet 的实现

在本节中,我们将讲解如何定义一个物理信息 DeepONet 模型来解决我们的目标案例研究。我们将在 TensorFlow 中实现它。让我们从导入必要的库开始:

import numpy as npimport matplotlib.pyplot as pltimport tensorflow as tffrom tensorflow import kerastf.random.set_seed(42)

3.1 定义架构

如前所述,物理信息 DeepONet 与原始的 DeepONet 共享相同的架构。下面的函数定义了 DeepONet 的架构:

def create_model(mean, var, verbose=False):    """定义具有全连接分支和主干层的 DeepONet。        Args:    ----    mean: 字典,输入的平均值    var: 字典,输入的方差值    verbose: 布尔值,指示是否显示模型摘要        Outputs:    --------    model: DeepONet 模型    """        # 分支网络    branch_input = tf.keras.Input(shape=(len(mean['forcing'])), name="forcing")    branch = tf.keras.layers.Normalization(mean=mean['forcing'], variance=var['forcing'])(branch_input)    for i in range(3):        branch = tf.keras.layers.Dense(50, activation="tanh")(branch)        # 主干网络    trunk_input = tf.keras.Input(shape=(len(mean['time'])), name="time")    trunk = tf.keras.layers.Normalization(mean=mean['time'], variance=var['time'])(trunk_input)       for i in range(3):        trunk = tf.keras.layers.Dense(50, activation="tanh")(trunk)        # 计算分支网络和主干网络的点积    dot_product = tf.reduce_sum(tf.multiply(branch, trunk), axis=1, keepdims=True)        # 添加偏置项    output = BiasLayer()(dot_product)        # 创建模型    model = tf.keras.models.Model(inputs=[branch_input, trunk_input], outputs=output)        if verbose:        model.summary()            return model   

在上面的代码中:

  1. 我们假设分支和主干网络都是全连接网络,具有 3 个隐藏层,每个隐藏层包含 50 个神经元,并具有 tanh 激活函数。该架构是基于初步测试选择的,并应作为此问题的良好起点。然而,可以很容易地用其他架构(例如 CNN、RNN 等)和其他层超参数替换它。
  2. 通过点积将分支网络和主干网络的输出合并。如原始 DeepONet 论文中建议的,我们添加了一个偏置项来提高预测准确性。`BiasLayer()` 是一个自定义类,用于实现这个目标:
class BiasLayer(tf.keras.layers.Layer):    def build(self, input_shape):        self.bias = self.add_weight(shape=(1,),                                    initializer=tf.keras.initializers.Zeros(),                                    trainable=True)    def call(self, inputs):        return inputs + self.bias

3.2 定义ODE损失

接下来,我们定义一个函数来计算ODE损失。回顾一下我们的目标ODE:

通过物理信息引导的DeepONet运算符学习:让我们从零开始实现它 四海 第2张

因此,我们可以定义如下函数:

@tf.functiondef ODE_residual_calculator(t, u, u_t, model):    """ODE残差计算。        参数:    ----    t:时间坐标    u:在离散时间坐标上评估的输入函数    u_t:在t上评估的输入函数    model:DeepONet模型        输出:    --------    ODE_residual:控制ODE的残差    """        with tf.GradientTape() as tape:        tape.watch(t)        s = model({"forcing": u, "time": t})        # 计算梯度    ds_dt = tape.gradient(s, t)        # ODE残差    ODE_residual = ds_dt - u_t        return ODE_residual

在上面的代码中:

  1. 我们使用tf.GradientTape()来计算s(·)对t的梯度。注意,在TensorFlow中,tf.GradientTape()被用作上下文管理器,任何在该上下文内执行的操作都将被记录下来。在这里,我们明确地监视变量t。因此,TensorFlow将自动跟踪涉及t的所有操作,这在这种情况下是DeepONet模型的正向运行。然后,我们使用tape的gradient()方法来计算s(·)对t的梯度。
  2. 我们增加了额外的输入参数u_t,它表示在t上评估的输入函数u(·)的值。这构成了我们目标ODE的右侧项,计算ODE残差损失需要用到它。
  3. 我们使用@tf.function装饰器将刚刚定义的常规Python函数转换为TensorFlow图。这样做是有用的,因为梯度计算可能非常昂贵,以图形模式执行可以显著加快计算速度。

3.3 定义梯度下降步骤

接下来,我们定义一个函数来编译总损失函数并计算总损失对网络模型参数的梯度:

@tf.functiondef train_step(X, X_init, IC_weight, ODE_weight, model):    """计算总损失对网络模型参数的梯度。        参数:    ----    X:用于计算ODE残差的训练数据集    X_init:用于计算初始条件的训练数据集    IC_weight:初始条件损失的权重    ODE_weight:ODE损失的权重    model:DeepONet模型        输出:    --------    ODE_loss:计算得到的ODE损失    IC_loss:计算得到的初始条件损失    total_loss:ODE损失和初始条件损失的加权和    gradients:总损失对网络模型参数的梯度。    """    with tf.GradientTape() as tape:        tape.watch(model.trainable_weights)        # 初始条件预测        y_pred_IC = model({"forcing": X_init[:, 1:-1], "time": X_init[:, :1]})        # 方程残差        ODE_residual = ODE_residual_calculator(t=X[:, :1], u=X[:, 1:-1], u_t=X[:, -1:], model=model)        # 计算损失        IC_loss = tf.reduce_mean(keras.losses.mean_squared_error(0, y_pred_IC))        ODE_loss = tf.reduce_mean(tf.square(ODE_residual))                # 总损失        total_loss = IC_loss*IC_weight + ODE_loss*ODE_weight    gradients = tape.gradient(total_loss, model.trainable_variables)    return ODE_loss, IC_loss, total_loss, gradients

在上面的代码中:

  1. 我们只考虑两个损失项:与初始条件相关的损失IC_loss和ODE残差损失ODE_loss。通过将模型预测的s(t=0)与已知的初始值0进行比较来计算IC_loss,通过调用我们之前定义的ODE_residual_calculator函数来计算ODE_loss。如果可用的话,数据损失也可以计算并添加到总损失中,这需要测量的s(t)值(在上面的代码中未实现)。
  2. 总损失一般是IC_lossODE_loss的加权和,其中权重控制在训练过程中对个别损失项的重视程度或优先级。在我们的案例研究中,将IC_weightODE_weight都设置为1就足够了。
  3. 与计算ODE_loss的方式类似,我们也采用了tf.GradientTape()作为上下文管理器来计算梯度。然而,在这里我们计算的是总损失对网络模型参数的梯度,这是进行梯度下降所必需的。

在我们继续之前,让我们快速总结一下我们已经开发的内容:

1️⃣ 我们可以使用 create_model() 函数初始化一个 DeepONet 模型。

2️⃣ 我们可以计算 ODE 残差来评估模型预测与控制 ODE 的粘附程度。这可以通过 ODE_residual_calculator 函数来实现。

3️⃣ 我们可以使用 train_step 计算总损失及其对网络模型参数的梯度。

现在,准备工作已经完成一半 🚀 在下一部分中,我们将讨论数据生成和数据组织问题(上面代码中的奇怪的 X[:, :1] 将有望变得清晰)。之后,我们最终可以训练模型并看看它的表现。

4. 数据生成和组织

在本部分中,我们将讨论合成数据的生成以及如何为训练物理感知的 DeepONet 模型进行组织。

4.1 生成 u(·) 的曲线

用于训练、验证和测试的数据将通过合成生成。这种方法的理由有两个:不仅方便,而且可以完全控制数据的特性。

在我们的案例研究中,我们将使用一个均值为零的 高斯过程 以及径向基函数(RBF)核来生成输入函数 u(·)

高斯过程是一种在机器学习中常用的建模函数的强大数学框架。RBF 核是捕捉输入点之间相似性的一种常用选择。通过在高斯过程中使用 RBF 核,我们确保生成的合成数据呈现出平滑连续的模式,这在各种应用中通常是可取的。要了解更多关于高斯过程的信息,请随时查看我的以前的博客。

在 scikit-learn 中,我们只需要几行代码即可实现:

from sklearn.gaussian_process import GaussianProcessRegressorfrom sklearn.gaussian_process.kernels import RBFdef create_samples(length_scale, sample_num):    """为 u(·) 创建合成数据        Args:    ----    length_scale: float,RNF 核的长度尺度    sample_num: 要生成的 u(·) 曲线的数量        Outputs:    --------    u_sample: 生成的 u(·) 曲线    """    # 使用给定的长度尺度定义核    kernel = RBF(length_scale)    # 创建高斯过程回归器    gp = GaussianProcessRegressor(kernel=kernel)    # 协方差点的位置    X_sample = np.linspace(0, 1, 100).reshape(-1, 1)         # 创建样本    u_sample = np.zeros((sample_num, 100))    for i in range(sample_num):        # 直接从先验中进行采样        n = np.random.randint(0, 10000)        u_sample[i, :] = gp.sample_y(X_sample, random_state=n).flatten()              return u_sample

在上面的代码中:

  1. 我们使用 length_scale 来控制生成函数的形状。对于 RBF 核,图 11 显示了给定不同核长度尺度的 u(·) 曲线。
  2. 回想一下,在将 u(·) 输入 DeepONet 之前,我们需要对其进行离散化。这通过指定一个 X_sample 变量来完成,该变量在我们感兴趣的时间域内分配了 100 个均匀分布的点。
  3. 在 scikit-learn 中,GaussianProcessRegressor 对象提供了一个 sample_y 方法,允许从具有指定长度尺度的核的高斯过程中绘制随机样本。请注意,我们在使用 GaussianProcessRegressor 对象之前没有调用 .fit(),这与我们通常在其他 scikit-learn 回归器中所做的不同。这是有意的,因为我们希望 GaussianProcessRegressor 使用我们提供的 确切的 length_scale。如果调用 .fit()length_scale 将被优化为另一个值,以更好地适应给定的任何数据。
  4. 输出 u_sample 是一个维度为 sample_num * 100 的矩阵。每行 u_sample 代表一个 u(·) 曲线的剖面,由 100 个离散值组成。
Figure 11. 不同核长度下的合成u(·)配置文件。(图片由作者提供)

4.2 数据集的生成

现在我们已经生成了u(·)配置文件,让我们专注于如何组织数据集,以便将其输入DeepONet模型。

回想一下,在上一节中我们开发的DeepONet模型需要三个输入:

  1. 时间坐标t,它是一个介于0和1之间的标量(暂时不考虑批量大小);
  2. u(·)配置文件,它是一个向量,包含在0和1之间的预定义固定时间坐标处评估的u(·)值;
  3. u(t)的值,它再次是一个标量。这个u(t)值用于计算时间坐标t处的ODE损失。

因此,我们可以这样定义一个单个样本:

(图片由作者提供)

当然,对于每个u(·)配置文件(在上图中用绿色标记),我们应该考虑多个t(以及对应的u(t))来评估ODE损失,以更好地施加物理约束。理论上,t可以在考虑的时间域内取任何值(即在我们的案例研究中在0和1之间)。然而,为了简化事情,我们将只考虑在u(·)配置文件离散化的相同时间位置处的t。因此,我们的更新后的数据集将是这样的:

(图片由作者提供)

请注意,上述讨论只考虑了单个u(·)配置文件。如果我们考虑所有的u(·)配置文件,我们的最终数据集将是这样的:

(图片由作者提供)

其中N代表u(·)配置文件的数量。现在有了这个想法,让我们看一些代码:

from tqdm import tqdmfrom scipy.integrate import solve_ivpdef generate_dataset(N, length_scale, ODE_solve=False):    """生成物理知识DeepONet训练的数据集。        Args:    ----    N: int,u(·)配置文件的数量    length_scale: float,RNF核的长度尺度    ODE_solve: boolean,指示是否计算相应的s(·)        Outputs:    --------    X: t,u(·)配置文件和u(t)的数据集    y: 相应ODE解s(·)的数据集    """        # 创建随机场    random_field = create_samples(length_scale, N)        # 编译数据集    X = np.zeros((N*100, 100+2))    y = np.zeros((N*100, 1))    for i in tqdm(range(N)):        u = np.tile(random_field[i, :], (100, 1))        t = np.linspace(0, 1, 100).reshape(-1, 1)        # 在t处评估u(·)        u_t = np.diag(u).reshape(-1, 1)        # 更新总体矩阵        X[i*100:(i+1)*100, :] = np.concatenate((t, u, u_t), axis=1)        # 解ODE        if ODE_solve:            sol = solve_ivp(lambda var_t, var_s: np.interp(var_t, t.flatten(), random_field[i, :]),                             t_span=[0, 1], y0=[0], t_eval=t.flatten(), method='RK45')            y[i*100:(i+1)*100, :] = sol.y[0].reshape(-1, 1)            return X, y

在上面的代码中,我们添加了一个选项来计算给定u(·)配置文件的相应s(·)。虽然我们在训练中不会使用s(·)值,但在测试模型性能时仍然需要它们。s(·)的计算是通过使用scipy.integrate.solve_ivp实现的,它是来自SciPy的ODE求解器,专门用于解决初值问题。

现在我们可以生成训练、验证和测试数据集。请注意,对于这个案例研究,我们将使用长度尺度为0.4来生成u(·)配置文件并训练物理信息DeepONet。

# 创建训练数据集
N_train = 2000
length_scale_train = 0.4
X_train, y_train = generate_dataset(N_train, length_scale_train)
# 创建验证数据集
N_val = 100
length_scale_test = 0.4
X_val, y_val = generate_dataset(N_val, length_scale_test)
# 创建测试数据集
N_test = 100
length_scale_test = 0.4
X_test, y_test = generate_dataset(N_test, length_scale_test, ODE_solve=True)

4.3 数据集组织

最后,我们将NumPy数组转换为TensorFlow数据集对象,以便进行数据摄取。

# 确定批大小
ini_batch_size = int(2000/100)
col_batch_size = 2000
# 创建数据集对象(初始条件)
X_train_ini = tf.convert_to_tensor(X_train[X_train[:, 0]==0], dtype=tf.float32)
ini_ds = tf.data.Dataset.from_tensor_slices((X_train_ini))
ini_ds = ini_ds.shuffle(5000).batch(ini_batch_size)
# 创建数据集对象(插值点)
X_train = tf.convert_to_tensor(X_train, dtype=tf.float32)
train_ds = tf.data.Dataset.from_tensor_slices((X_train))
train_ds = train_ds.shuffle(100000).batch(col_batch_size)
# 缩放
mean = {'forcing': np.mean(X_train[:, 1:-1], axis=0), 'time': np.mean(X_train[:, :1], axis=0)}
var = {'forcing': np.var(X_train[:, 1:-1], axis=0), 'time': np.var(X_train[:, :1], axis=0)}

在上面的代码中,我们创建了两个不同的数据集:一个用于评估ODE损失(train_ds),另一个用于评估初始条件损失(ini_ds)。我们还预先计算了t和u(·)的均值和方差。这些值将用于对输入进行标准化。

这就是数据生成和组织的全部内容。接下来,我们将开始模型训练并查看其表现。

5. 训练物理信息DeepONet

首先,让我们创建一个自定义类来跟踪损失的演变:

from collections import defaultdict
class LossTracking:
    def __init__(self):
        self.mean_total_loss = keras.metrics.Mean()
        self.mean_IC_loss = keras.metrics.Mean()
        self.mean_ODE_loss = keras.metrics.Mean()
        self.loss_history = defaultdict(list)
        
    def update(self, total_loss, IC_loss, ODE_loss):
        self.mean_total_loss(total_loss)
        self.mean_IC_loss(IC_loss)
        self.mean_ODE_loss(ODE_loss)
        
    def reset(self):
        self.mean_total_loss.reset_states()
        self.mean_IC_loss.reset_states()
        self.mean_ODE_loss.reset_states()
        
    def print(self):
        print(f"IC={self.mean_IC_loss.result().numpy():.4e}, \
              ODE={self.mean_ODE_loss.result().numpy():.4e}, \
              total_loss={self.mean_total_loss.result().numpy():.4e}")
        
    def history(self):
        self.loss_history['total_loss'].append(self.mean_total_loss.result().numpy())
        self.loss_history['IC_loss'].append(self.mean_IC_loss.result().numpy())
        self.loss_history['ODE_loss'].append(self.mean_ODE_loss.result().numpy())

然后,我们定义主要的训练/验证逻辑:

# 设置训练配置
n_epochs = 300
IC_weight = tf.constant(1.0, dtype=tf.float32)
ODE_weight = tf.constant(1.0, dtype=tf.float32)
loss_tracker = LossTracking()
val_loss_hist = []
# 设置优化器
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
# 实例化PINN模型
PI_DeepONet = create_model(mean, var)
PI_DeepONet.compile(optimizer=optimizer)
# 配置回调函数
callbacks = [keras.callbacks.ReduceLROnPlateau(factor=0.5, patience=30), 
             tf.keras.callbacks.ModelCheckpoint('NN_model.h5', monitor='val_loss', save_best_only=True)]
callbacks = tf.keras.callbacks.CallbackList(_callbacks, add_history=False, model=PI_DeepONet)
# 开始训练过程
for epoch in range(1, n_epochs + 1):
    print(f"Epoch {epoch}:")
    for X_init, X in zip(ini_ds, train_ds):
        # 计算梯度
        ODE_loss, IC_loss, total_loss, gradients = train_step(X, X_init, IC_weight, ODE_weight, PI_DeepONet)
        # 梯度下降
        PI_DeepONet.optimizer.apply_gradients(zip(gradients, PI_DeepONet.trainable_variables))
        # 损失跟踪
        loss_tracker.update(total_loss, IC_loss, ODE_loss)
    # 损失摘要
    loss_tracker.history()
    loss_tracker.print()
    loss_tracker.reset()
    ####### 验证
    val_res = ODE_residual_calculator(X_val[:, :1], X_val[:, 1:-1], X_val[:, -1:], PI_DeepONet)
    val_ODE = tf.cast(tf.reduce_mean(tf.square(val_res)), tf.float32)
    X_val_ini = X_val[X_val[:, 0]==0]
    pred_ini_valid = PI_DeepONet.predict({"forcing": X_val_ini[:, 1:-1], "time": X_val_ini[:, :1]}, batch_size=12800)
    val_IC = tf.reduce_mean(keras.losses.mean_squared_error(0, pred_ini_valid))
    print(f"val_IC: {val_IC.numpy():.4e}, val_ODE: {val_ODE.numpy():.4e}, lr: {PI_DeepONet.optimizer.lr.numpy():.2e}")
    # 回调函数
    callbacks.on_epoch_end(epoch, logs={'val_loss': val_IC+val_ODE})
    val_loss_hist.append(val_IC+val_ODE)
    # 重新洗牌数据集
    ini_ds = tf.data.Dataset.from_tensor_slices((X_train_ini))
    ini_ds = ini_ds.shuffle(5000).batch(ini_batch_size)
    train_ds = tf.data.Dataset.from_tensor_slices((X_train))
    train_ds = train_ds.shuffle(100000).batch(col_batch_size)

这是一段相当长的代码块,但是由于我们已经涵盖了所有重要的部分,所以它应该是不言自明的。

为了可视化训练性能,我们可以绘制损失收敛曲线:

# 历史图,ax = plt.subplots(1, 3, figsize=(12, 4))ax[0].plot(range(n_epochs), loss_tracker.loss_history['IC_loss'])ax[1].plot(range(n_epochs), loss_tracker.loss_history['ODE_loss'])ax[2].plot(range(n_epochs), val_loss_hist)ax[0].set_title('IC Loss')ax[1].set_title('ODE Loss')ax[2].set_title('Val Loss')for axs in ax:    axs.set_yscale('log')

训练结果如下所示:

图12:损失收敛图(作者提供的图片)

此外,我们还可以看到在训练过程中某个特定目标 s(·) 的预测准确性如何演变:

在训练开始时,我们可以看到模型预测与真实值之间存在明显差异。然而,随着训练的结束,预测的 s(·) 收敛到了真实值。这表明我们的基于物理信息的 DeepONet 学习得很好。

6. 结果讨论

训练完成后,我们可以重新加载保存的权重并评估性能。

在这里,我们随机选择了测试数据集中的三个 u(·) 概况,并比较了由我们的基于物理信息的 DeepONet 预测的相应 s(·) 和由数值 ODE 求解器计算的结果。我们可以看到预测结果和真实值几乎无法区分。

图13:从测试数据集中随机选择的三个 u(·) 概况,显示在上排。下排显示了相应的 s(·) 概况。我们可以看到,基于物理信息的 DeepONet 预测的结果与由数值 ODE 求解器计算的真实值几乎无法区分(作者提供的图片)

考虑到我们甚至没有使用任何 s(·) 的观测数据(除了初始条件)来训练 DeepONet,这些结果相当令人难以置信。这表明控制 ODE 本身为模型提供了足够的“监督”信号,使其能够进行准确的预测。

另一个有趣的评估指标是所谓的“超出分布”预测能力。由于我们在训练 DeepONet 时强制执行了控制方程,我们可以期望经过训练的基于物理信息的 DeepONet 在 u(·) 概况在训练 u(·) 分布之外时仍能够进行合理的预测。

为了测试这一点,我们可以使用不同的长度尺度生成 u(·) 概况。以下结果显示了使用长度尺度为0.6生成的三个 u(·) 概况以及预测的 s(·)。考虑到基于物理信息的 DeepONet 是使用长度尺度为0.4进行训练的,这些结果相当不错。

图14:经过训练的基于物理信息的 DeepONet 显示出一定程度的超出分布预测能力(作者提供的图片)

然而,如果我们继续减小长度尺度到0.2,我们会注意到可见的差异开始出现。这表明经过训练的基于物理信息的 DeepONet 的泛化能力是有限的。

图15. 物理信息DeepONet的泛化能力存在限制。(图片由作者提供)

一般来说,更小的长度尺度会导致更复杂的u(·)曲线,这与训练时使用的u(·)曲线会有很大不同。这可能解释了为什么训练模型在更小的长度尺度区域中遇到了准确预测的挑战。

图16. 对于我们训练的模型来说,泛化到更小的长度尺度区域是具有挑战性的,因为u(·)曲线更加复杂且与训练数据有明显差异。(图片由作者提供)

总的来说,我们可以说开发的物理信息DeepONet能够正确学习系统动力学,并在仅有ODE约束的情况下从输入函数映射到输出函数。此外,物理信息DeepONet显示出一定水平的处理“超出分布”预测的能力,表明将模型训练与主导ODE相一致可以提高模型的泛化能力。

7. 要点

我们已经深入探讨了物理信息DeepONet。从理解DeepONet和物理信息学习的基本概念,到通过代码实现看到它们的实际应用,我们详细介绍了这种解决微分方程的强大方法。

以下是一些关键要点:

1️⃣ DeepONet 是一个强大的框架,用于进行运算符学习,得益于其分支和主干网络的创新架构。

2️⃣ 物理信息学习 将动力系统的主导微分方程明确地纳入学习过程中,因此具有提高模型解释性和泛化能力的潜力。

3️⃣ 物理信息DeepONet 结合了DeepONet和物理信息学习的优势,是一个有希望的工具,可以在遵循相关主导方程的同时学习功能映射。

希望您喜欢这次对物理信息DeepONet的深入探索。接下来,我们将转向使用物理信息DeepONet解决逆问题。敬请关注🤗

您可以在此处找到带有完整代码的伴随笔记本 💻

参考文献

[1] Lu等人,DeepONet:基于运算符的通用近似定理识别微分方程的非线性运算符学习。arXiv,2019。

[2] Wang等人,使用物理信息DeepOnets学习参数化偏微分方程的解算符。arXiv,2021。

Leave a Reply

Your email address will not be published. Required fields are marked *