# Copyright 2023 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""physical informed recurrent convolutional nerual network"""
import numpy as np
import mindspore as ms
from mindspore import Parameter, Tensor, nn, ops, lazy_inline
from ...utils.check_func import check_param_type, check_param_type_value
def lazy_inline_wrapper(backend):
    """Build a decorator that applies ``lazy_inline`` only on the Ascend backend.

    Args:
        backend (str): Name of the device target. Only the exact value
            ``"Ascend"`` enables ``lazy_inline``.

    Returns:
        A decorator. For ``"Ascend"`` it returns ``lazy_inline(f)``; for any
        other value it returns ``f`` untouched.
    """
    def _with_lazy_inline(func):
        return lazy_inline(func)

    def _unchanged(func):
        return func

    return _with_lazy_inline if backend == "Ascend" else _unchanged
class PeriodicPadding(nn.Cell):
    r"""
    Circularly (periodically) pad the spatial dimensions of a tensor so that a following
    "valid" convolution with the given kernel size sees wrapped-around boundary values.

    Every spatial dimension is padded on both sides by ``int((k - 1) / 2)``, where ``k`` is the
    kernel size along that dimension.

    Args:
        dim (int): The physical dimension of input. Length of the shape of a 2D input is 4, of a
            3D input is 5. Data follows `NCHW` or `NCDHW` format.
        kernel_size (int, tuple, list): Specifies the convolution kernel. If it is an int, the same
            size is used for the last 2 or 3 dimensions. If it is a tuple or list, its order
            should be (depth, height, width) for 3D and (height, width) for 2D.

    Inputs:
        **input** (Tensor) - Tensor of shape :math:`(batch\_size, channels, depth, height, width)` for 3D.
        Tensor of shape :math:`(batch\_size, channels, height, width)` for 2D.

    Outputs:
        Tensor, the padded input. Batch and channel dimensions are unchanged; each spatial
        dimension grows by twice the per-side padding (see the example below).

    Raises:
        TypeError: If `dim` is not an int.
        TypeError: If `kernel_size` is not a int/tuple/list.
        ValueError: If length of `kernel_size` is not the same as the value of `dim`.

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> import numpy as np
        >>> import mindspore as ms
        >>> from mindspore import Tensor, ops, nn
        >>> x= np.zeros((1,2, 48, 48, 48))
        >>> pd = PeriodicPadding(3, (3,3,5))
        >>> x = pd(Tensor(x))
        >>> print(x.shape)
        (1, 2, 50, 50, 52)
    """

    def __init__(self, dim, kernel_size):
        super().__init__()
        check_param_type(dim, "dim", data_type=int)
        check_param_type(kernel_size, "kernel_size", data_type=(int, tuple, list))

        if isinstance(kernel_size, int):
            # One kernel size for all axes: identical padding on both sides of every axis.
            half = int((kernel_size - 1) / 2)
            self.pad = [half] * (dim * 2)
            return

        if len(kernel_size) != dim:
            raise ValueError("length of kernel size must be the same as dim")

        # The pad list is built from the last axis backwards, hence the reversed
        # kernel order; each axis contributes a (before, after) pair.
        self.pad = [
            int((size - 1) / 2)
            for size in reversed(list(kernel_size))
            for _ in range(2)
        ]

    def construct(self, x):
        """Return `x` padded circularly with the precomputed pad widths."""
        return ops.pad(x, padding=self.pad, mode="circular")
class PeRCNN(nn.Cell):
    r"""
    Physics-embedded Recurrent Convolutional Neural Network (PeRCNN) Cell. It forcibly encodes given
    physics structure to facilitate the learning of the spatiotemporal dynamics in sparse data regimes.
    PeRCNN can be applied to a variety of problems regarding the PDE system, including forward and
    inverse analysis, data-driven modeling, and discovery of PDEs.
    For more details, please refer to the paper `Encoding physics to learn reaction–diffusion processes
    <https://www.nature.com/articles/s42256-023-00685-7>`_ .

    lazy_inline is used to accelerate the compile stage, but now it only functions in Ascend backends.

    PeRCNN currently supports input with two physical components. For inputs with different shape, users
    must manually add or remove corresponding parameters and pi_blocks.

    Args:
        dim (int): The physical dimension of input. Length of the shape of a 2D input is 4, of a
            3D input is 5. Data follows `NCHW` or `NCDHW` format.
        in_channels (int): The number of channels in the input space.
        hidden_channels (int): Number of channels in the output space of parallel convolution layers.
        kernel_size (int): Specifies the convolution kernel for parallel convolution layers.
        dt (Union[int, float]): The time step of PeRCNN.
        nu (Union[int, float]): The coefficient of diffusion term.
        laplace_kernel (mindspore.Tensor): Kernel of the Laplace (diffusion) operator. It is convolved
            with each single-channel physical component separately. For 3D, set size of kernel is
            :math:`(\text{kernel_size[0]}, \text{kernel_size[1]}, \text{kernel_size[2]})`, then the
            shape is :math:`(C_{out}, C_{in}, \text{kernel_size[0]}, \text{kernel_size[1]},
            \text{kernel_size[2]})`. For 2D, Tensor of shape
            :math:`(N, C_{in} / \text{groups}, \text{kernel_size[0]}, \text{kernel_size[1]})`, then the
            size of kernel is :math:`(\text{kernel_size[0]}, \text{kernel_size[1]})`.
            If ``None``, the diffusion term is skipped. Default: ``None``.
        conv_layers_num (int): Number of parallel convolution layers. Default: ``3``.
        padding (str): Boundary padding, currently only periodic padding supported. Default: ``periodic``.
        compute_dtype (dtype.Number): The data type of PeRCNN. Default: ``mindspore.float32``.
            Should be ``mindspore.float16`` or ``mindspore.float32``.
            mindspore.float32 is recommended for GPU backends,
            mindspore.float16 is recommended for Ascend backends.

    Inputs:
        - **x** (Tensor) - Tensor of shape :math:`(batch\_size, channels, depth, height, width)` for 3D.
          Tensor of shape :math:`(batch\_size, channels, height, width)` for 2D.

    Outputs:
        Tensor with two channels (the updated first and second physical components). For the
        supported two-component input it has the same shape as `x`.

    Raises:
        TypeError: If `dim`, `in_channels`, `hidden_channels`, `kernel_size` is not an int.
        TypeError: If `dt` and `nu` is not an int nor a float.
        ValueError: If `padding` is not ``"periodic"`` (raised when the cell is called).
        ValueError: If the input has fewer than two channels (raised when the cell is called).

    Supported Platforms:
        ``Ascend`` ``GPU``

    Examples:
        >>> import numpy as np
        >>> import mindspore as ms
        >>> from mindflow.cell import PeRCNN
        >>> laplace_3d = [[[[[0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0],
        ...                  [0.0, 0.0, -0.08333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0]],
        ...                 [[0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0],
        ...                  [0.0, 0.0, 1.3333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0]],
        ...                 [[0.0, 0.0, -0.08333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, 1.3333333333333333, 0.0, 0.0],
        ...                  [-0.08333333333333333, 1.3333333333333333, -7.5, 1.3333333333333333, -0.08333333333333333],
        ...                  [0.0, 0.0, 1.3333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, -0.08333333333333333, 0.0, 0.0]],
        ...                 [[0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0],
        ...                  [0.0, 0.0, 1.3333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0]],
        ...                 [[0.0, 0.0, 0.0, 0.0, 0.0],
        ...                  [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, -0.08333333333333333, 0.0, 0.0],
        ...                  [0.0, 0.0, 0.0, 0.0, 0.0], [0.0, 0.0, 0.0, 0.0, 0.0]]]]]
        >>> laplace = np.array(laplace_3d)
        >>> grid_size = 48
        >>> field = 100
        >>> dx_3d = field / grid_size
        >>> laplace_3d_kernel = ms.Tensor(1 / dx_3d**2 * laplace, dtype=ms.float32)
        >>> rcnn_ms = PeRCNN(
        ...     dim=3,
        ...     in_channels=2,
        ...     hidden_channels=2,
        ...     kernel_size=1,
        ...     dt=0.5,
        ...     nu=0.274,
        ...     laplace_kernel=laplace_3d_kernel,
        ...     conv_layers_num=3,
        ...     compute_dtype=ms.float32,
        ... )
        >>> input = np.random.randn(1, 2, 48, 48, 48)
        >>> input = ms.Tensor(input, ms.float32)
        >>> output = rcnn_ms(input)
        >>> print(output.shape)
        (1, 2, 48, 48, 48)
    """

    @lazy_inline_wrapper(ms.context.get_context(attr_key="device_target"))
    def __init__(
            self, dim, in_channels, hidden_channels, kernel_size, dt, nu,
            laplace_kernel=None, conv_layers_num=3, padding="periodic", compute_dtype=ms.float32):
        super().__init__()
        check_param_type_value(dim, "dim", valid_value=(2, 3), data_type=int, exclude_type=bool)
        check_param_type(in_channels, "in_channels", data_type=int, exclude_type=bool)
        check_param_type(hidden_channels, "hidden_channels", data_type=int, exclude_type=bool)
        check_param_type(kernel_size, "kernel_size", data_type=int, exclude_type=bool)
        check_param_type(conv_layers_num, "conv_layers_num", data_type=int, exclude_type=bool)
        check_param_type(dt, "dt", data_type=(int, float), exclude_type=bool)
        check_param_type(nu, "nu", data_type=(int, float), exclude_type=bool)

        self.in_channels = in_channels
        self.hidden_channels = hidden_channels
        self.kernel_size = kernel_size
        self.input_stride = 1
        self.compute_dtype = compute_dtype
        self.padding = padding
        self.dim = dim
        self.dt = dt
        self.nu = nu  # nu from 0 to upper bound (two times the estimate)

        # Dispatch tables: pick the 2D or 3D convolution according to `dim`.
        self._nn_conv_table = {2: nn.Conv2d, 3: nn.Conv3d}
        self._ops_conv_table = {2: ops.conv2d, 3: ops.conv3d}

        # Always define the attribute (possibly None): construct() tests it with
        # `is not None`, which would raise AttributeError if it were left unset.
        self.w_laplace = laplace_kernel

        # Trainable scalars; construct() passes them through sigmoid to scale the diffusion term.
        self.coef_u = Parameter(Tensor(np.random.rand(), dtype=self.compute_dtype), requires_grad=True)
        self.coef_v = Parameter(Tensor(np.random.rand(), dtype=self.compute_dtype), requires_grad=True)

        self.u_pi_block = nn.CellList()
        self.v_pi_block = nn.CellList()

        # Parallel conv layer for u
        for i in range(conv_layers_num):
            u_conv_layer = self._parallel_conv()
            self.u_pi_block.append(u_conv_layer)
            u_conv_layer.update_parameters_name(f"u_{i}.")
        # 1x1 layer conv for u
        self.u_conv = self._pointwise_conv()

        # Parallel conv layer for v
        for i in range(conv_layers_num):
            v_conv_layer = self._parallel_conv()
            self.v_pi_block.append(v_conv_layer)
            v_conv_layer.update_parameters_name(f"v_{i}.")
        # 1x1 layer conv for v
        self.v_conv = self._pointwise_conv()

        # Build the padding cells once here instead of on every construct() call.
        # They stay None for an unsupported padding type; construct() reports that case.
        is_periodic = padding == "periodic"
        self.conv_padding = PeriodicPadding(dim, kernel_size) if is_periodic else None
        self.laplace_padding = (
            PeriodicPadding(dim, laplace_kernel.shape[2:])
            if is_periodic and laplace_kernel is not None
            else None
        )

    def _parallel_conv(self):
        """Create one "valid"-padded convolution layer of a parallel (pi) block."""
        return self._nn_conv_table[self.dim](
            in_channels=self.in_channels,
            out_channels=self.hidden_channels,
            kernel_size=self.kernel_size,
            stride=self.input_stride,
            pad_mode="valid",
            has_bias=True,
        ).to_float(self.compute_dtype)

    def _pointwise_conv(self):
        """Create the 1x1 convolution that reduces a pi block product to a single channel."""
        return self._nn_conv_table[self.dim](
            in_channels=self.hidden_channels,
            out_channels=1,
            kernel_size=1,
            stride=1,
            has_bias=True,
        ).to_float(self.compute_dtype)

    def construct(self, h):
        """construct function of PeRCNN

        Advances the state `h` by one time step: each of the first two channels is
        updated as ``prev + residual * dt``, where the residual is the 1x1 convolution
        of the element-wise product of the parallel conv outputs, plus the optional
        ``nu * sigmoid(coef) * laplace(component)`` diffusion term.
        """
        if self.padding != "periodic":
            raise ValueError("unsupported padding type")
        # Validate before any channel slicing so a bad input gets this message
        # instead of an error from a convolution on an empty slice.
        if h.shape[1] < 2:
            raise ValueError("input field should have at least two physical components")

        h_conv = self.conv_padding(h)

        # Element-wise product of the parallel conv outputs (pi block).
        u_res = 1
        v_res = 1
        for conv in self.u_pi_block:
            u_res = u_res * conv(h_conv)
        for conv in self.v_pi_block:
            v_res = v_res * conv(h_conv)
        u_res = self.u_conv(u_res)
        v_res = self.v_conv(v_res)

        if self.w_laplace is not None:
            h_lap = self.laplace_padding(h)
            u_pad = h_lap[:, 0:1, ...]
            v_pad = h_lap[:, 1:2, ...]
            u_res = u_res + self.nu * ops.sigmoid(self.coef_u) * self._ops_conv_table[self.dim](
                u_pad, self.w_laplace)
            v_res = v_res + self.nu * ops.sigmoid(self.coef_v) * self._ops_conv_table[self.dim](
                v_pad, self.w_laplace)

        # previous state
        u_prev = h[:, 0:1, ...]
        v_prev = h[:, 1:2, ...]
        u_next = u_prev + u_res * self.dt
        v_next = v_prev + v_res * self.dt
        out = ops.concat((u_next, v_next), axis=1)
        return out