PDE-Net for Convection-Diffusion Equation



PDE-Net is a feedforward deep network proposed by Zichao Long et al. to learn partial differential equations from data, predict the dynamic characteristics of complex systems accurately and uncover potential PDE models. The basic idea of PDE-Net is to approximate differential operators by learning convolution kernels (filters). Neural networks or other machine learning methods are applied to fit unknown nonlinear responses. Numerical experiments show that the model can identify the observed dynamical equations and predict the dynamical behavior over a relatively long period of time, even in noisy environments. More information can be found in PDE-Net: Learning PDEs from Data.

This notebook requires MindSpore version >= 2.0.0 to support new APIs including: mindspore.jit, mindspore.jit_class, mindspore.data_sink.

Problem Description

This case solves the inverse problem of convection-diffusion partial differential equations with variable parameters and realizes long-term prediction.

Governing Equation

In this case, the convection-diffusion equation is of the form:

\[u_t = a(x,y) u_x + b(x,y) u_y + c u_{xx} + d u_{yy}, \quad (x,y) \in[0,2 \pi] \times[0,2 \pi]\]
\[u|_{t=0} = u_0(x,y)\]

The coefficients of each derivative are:

\[a(x,y)=0.5(cos(y)+x(2\pi-x)sin(x))+0.6 \quad b(x,y)=2(cos(y)+sin(x))+0.8\]
\[c=0.2 \quad d=0.3\]

Model Structure of the PDE-Net

The PDE-Net consists of multiple \(\delta T\) Blocks in series to implement prediction of long sequence information. Each \(\delta T\) Block includes several moment matrixes of trainable parameters. The matrixes can be converted to convolution kernels according to a mapping relationship. Thereby the derivatives of the physical field can be obtained. After linearly combining the derivative and its corresponding physical quantity, the information of the next time step can be deduced by using the forward Euler method.



Technology Path

MindSpore Flow solves the problem as follows:

  1. Model Construction.

  2. Single Step Training.

  3. Multi-step Training.

  4. Model Evaluation and Visualization.

import os
import time
import numpy as np

import mindspore
from mindspore.common import set_seed
from mindspore import nn, Tensor, ops, jit
from mindspore.train.serialization import load_param_into_net

The following src pacakage can be downloaded in applications/data_mechanism_fusion/variant_linear_coe_pde_net/src.

from mindflow.cell import PDENet
from mindflow.utils import load_yaml_config
from mindflow.loss import get_loss_metric, RelativeRMSELoss
from mindflow.pde import UnsteadyFlowWithLoss

from src import init_model, create_dataset, calculate_lp_loss_error
from src import make_dir, scheduler, get_param_dic
from src import plot_coe, plot_extrapolation_error, get_label_coe, plot_test_error

Parameter can be modified in configuration file.

mindspore.set_context(mode=mindspore.GRAPH_MODE, device_target="GPU", device_id=4)
# load configuration yaml
config = load_yaml_config('pde_net.yaml')

Model Construction

MindSpore Flow provides the PDENet interface to directly create a PDE-Net model. You need to specify the width, height, data depth, boundary condition, and highest order of fitting.

def init_model(config):
    return PDENet(height=config["mesh_size"],
                  dx=2 * np.pi / config["mesh_size"],
                  dy=2 * np.pi / config["mesh_size"],

Single Step Training

The parameters of each \(\delta T\) block are shared. Therefore, the model is trained one by one based on the number of connected \(\delta T\) blocks. When step is 1, the model is in the warm-up phase. The moments of the PDE-Net are frozen. The parameters in the moments are not involved in training. Each time a \(\delta T\) block is added, the program generates data and reads data sets. After the model is initialized, the program loads the checkpoint trained in the previous step, defines the optimizer, mode, and loss function. During training process, the model reflects the model performance in real time based on the callback function.

def train_single_step(step, config, lr, train_dataset, eval_dataset):
    """train PDE-Net with advancing steps"""

    print("Current step for train loop: {}".format(step, ))
    model = init_model(config)

    epoch = config["epochs"]
    warm_up_epoch_scale = 10
    if step == 1:
        model.if_fronzen = True
        epoch = warm_up_epoch_scale * epoch
    elif step == 2:
        param_dict = get_param_dic(config["summary_dir"], step - 1, epoch * 10)
        load_param_into_net(model, param_dict)
        print("Load pre-trained model successfully")
        param_dict = get_param_dic(config["summary_dir"], step - 1, epoch)
        load_param_into_net(model, param_dict)
        print("Load pre-trained model successfully")

    optimizer = nn.Adam(model.trainable_params(), learning_rate=Tensor(lr))
    problem = UnsteadyFlowWithLoss(model, t_out=step, loss_fn=RelativeRMSELoss(), data_format="NTCHW")

    def forward_fn(u0, uT):
        loss = problem.get_loss(u0, uT)
        return loss

    grad_fn = mindspore.value_and_grad(forward_fn, None, optimizer.parameters, has_aux=False)

    def train_step(u0, uT):
        loss, grads = grad_fn(u0, uT)
        loss = ops.depend(loss, optimizer(grads))
        return loss

    steps = train_dataset.get_dataset_size()
    sink_process = mindspore.data_sink(train_step, train_dataset, sink_size=1)

    for cur_epoch in range(epoch):
        local_time_beg = time.time()

        for _ in range(steps):
            cur_loss = sink_process()
            print("epoch: %s, loss is %s" % (cur_epoch + 1, cur_loss), flush=True)
        local_time_end = time.time()
        epoch_seconds = (local_time_end - local_time_beg) * 1000
        step_seconds = epoch_seconds / steps
        print("Train epoch time: {:5.3f} ms, per step time: {:5.3f} ms".format
              (epoch_seconds, step_seconds), flush=True)

        if (cur_epoch + 1) % config["save_epoch_interval"] == 0:
            ckpt_file_name = "ckpt/step_{}".format(step)
            ckpt_dir = os.path.join(config["summary_dir"], ckpt_file_name)
            if not os.path.exists(ckpt_dir):
            ckpt_name = "pdenet-{}.ckpt".format(cur_epoch + 1, )
            mindspore.save_checkpoint(model, os.path.join(ckpt_dir, ckpt_name))

        if (cur_epoch + 1) % config['eval_interval'] == 0:
            calculate_lp_loss_error(problem, eval_dataset, config["batch_size"])

Multi-step Training

The PDE-Net is trained step by step. With MindSpore version >= 2.0.0, we can use the functional programming for training neural networks.

def train(config):
    lr = config["lr"]
    for i in range(1, config["multi_step"] + 1):
        db_name = "train_step{}.mindrecord".format(i)
        dataset = create_dataset(config, i, db_name, "train", data_size=2 * config["batch_size"])
        train_dataset, eval_dataset = dataset.create_train_dataset()
        lr = scheduler(int(config["multi_step"] / config["learning_rate_reduce_times"]), step=i, lr=lr)
        train_single_step(step=i, config=config, lr=lr, train_dataset=train_dataset, eval_dataset=eval_dataset)

if not os.path.exists(config["mindrecord_data_dir"]):
Model Evaluation and Visualization

After the model training is complete, run the visualization.py file to test and visualize the model training result. The following figure shows the comparison between the prediction result and label.

step = 20
test_data_size = 20

model = init_model(config)
param_dict = get_param_dic(config["summary_dir"], config["multi_step"], config["epochs"])
load_param_into_net(model, param_dict)

Plot Coefficient

coe_label = get_label_coe(max_order=config["max_order"], resolution=config["mesh_size"])
coes_out_dir = os.path.join(config["figure_out_dir"], "coes")
plot_coe(model.coe, coes_out_dir, prefix="coe_trained", step=step, title="Coefficient Regression Results of the PDE")
plot_coe(coe_label, coes_out_dir, prefix="coe_label", title="Data Labels for the Coefficients of the PDE")

Plot Test Error

dataset = create_dataset(config, step, "eval.mindrecord", "test", data_size=test_data_size)
test_dataset = dataset.create_test_dataset(step)
iterator_test_dataset = test_dataset.create_dict_iterator()
final_item = [_ for _ in iterator_test_dataset][-1]
plot_test_error(problem, get_loss_metric("mse"), final_item, step, config["mesh_size"], config["figure_out_dir"])
