# Copyright 2021 Huawei Technologies Co., Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""
The GymEnvironment base class.
"""
import gym
from gym import spaces
import numpy as np
from mindspore.ops import operations as P
from mindspore_rl.environment.environment import Environment
from mindspore_rl.environment.space import Space
class GymEnvironment(Environment):
    """
    The GymEnvironment class provides the functions to interact with
    different environments.

    Args:
        params (dict): A dictionary containing the parameters used to create the
            instance of GymEnvironment. ``'name'`` (the gym environment id) is
            required; ``'seed'`` is optional.
        env_id (int): An integer used to offset the seed of this environment, so that
            the effective seed is ``params['seed'] + env_id * 1000``. It is ignored
            when ``params`` has no ``'seed'`` entry. Default: 0.

    Supported Platforms:
        ``Ascend`` ``GPU`` ``CPU``

    Examples:
        >>> env_params = {'name': 'CartPole-v0'}
        >>> environment = GymEnvironment(env_params, 0)
        >>> print(environment)
        GymEnvironment<>
    """

    def __init__(self, params, env_id=0):
        super(GymEnvironment, self).__init__()
        self.params = params
        self._name = params['name']
        self._env = gym.make(self._name)
        if 'seed' in params:
            # Offset the seed by env_id so that each environment gets a distinct seed.
            self._env.seed(params['seed'] + env_id * 1000)

        self._observation_space = self._space_adapter(self._env.observation_space)
        self._action_space = self._space_adapter(self._env.action_space)
        self._reward_space = Space((1,), np.float32)
        # NOTE(review): high=2 suggests an exclusive upper bound - confirm against Space.
        self._done_space = Space((1,), np.bool_, low=0, high=2)

        # reset op: no input, one output (the initial observation).
        reset_input_type = []
        reset_input_shape = []
        reset_output_type = [self._observation_space.ms_dtype,]
        reset_output_shape = [self._observation_space.shape,]
        self._reset_op = P.PyFunc(self._reset, reset_input_type,
                                  reset_input_shape, reset_output_type, reset_output_shape)

        # step op: one input (the action), three outputs (observation, reward, done).
        step_input_type = (self._action_space.ms_dtype,)
        step_input_shape = (self._action_space.shape,)
        step_output_type = (self._observation_space.ms_dtype,
                            self._reward_space.ms_dtype,
                            self._done_space.ms_dtype)
        step_output_shape = (self._observation_space.shape,
                             self._reward_space.shape,
                             self._done_space.shape)
        self._step_op = P.PyFunc(
            self._step, step_input_type, step_input_shape, step_output_type, step_output_shape)

        self.action_dtype = self._action_space.ms_dtype
        self.cast = P.Cast()

    def reset(self):
        """
        Reset the environment to the initial state. It is always used at the beginning of each
        episode. It will return the value of initial state.

        Returns:
            A tensor which represents the initial state of the environment.
        """
        # The reset op has a single output; unpack it.
        return self._reset_op()[0]

    def step(self, action):
        r"""
        Execute the environment step, which means that interact with environment once.

        Args:
            action (Tensor): A tensor that contains the action information.

        Returns:
            - state (Tensor), the environment state after performing the action.
            - reward (Tensor), the reward after performing the action.
            - done (mindspore.bool\_), whether the simulation finishes or not.
        """
        # Add cast ops for mixed precision case. Redundant cast ops will be eliminated automatically.
        action = self.cast(action, self.action_dtype)
        return self._step_op(action)

    @property
    def observation_space(self):
        """
        Get the state space of the environment.

        Returns:
            The Space object which describes the state (observation) space.
        """
        return self._observation_space

    @property
    def action_space(self):
        """
        Get the action space of the environment.

        Returns:
            The Space object which describes the action space.
        """
        return self._action_space

    @property
    def reward_space(self):
        """
        Get the reward space of the environment.

        Returns:
            The Space object which describes the reward (shape ``(1,)``, float32).
        """
        return self._reward_space

    @property
    def done_space(self):
        """
        Get the done space of the environment.

        Returns:
            The Space object which describes the done flag (shape ``(1,)``, bool).
        """
        return self._done_space

    @property
    def config(self):
        """
        Get the config of the environment.

        Returns:
            An empty dictionary.
        """
        return {}

    def _reset(self):
        """
        The python(can not be interpreted by mindspore interpreter) code of resetting the
        environment. It is the main body of reset function. Due to PyFunc, we need to
        encapsulate python code into a function.

        Returns:
            A numpy array which represents the initial state of the environment.
        """
        s0 = self._env.reset()
        # In some gym versions, the observation space is announced to be float32, but
        # float64 is returned from reset and step. np.asarray additionally accepts
        # observations that are not ndarrays (e.g. plain Python scalars).
        return np.asarray(s0).astype(self._observation_space.np_dtype)

    def _step(self, action):
        """
        The python(can not be interpreted by mindspore interpreter) code of interacting with the
        environment. It is the main body of step function. Due to PyFunc, we need to
        encapsulate python code into a function.

        Args:
            action(int or float): The action which is calculated by policy net. It could be integer
                or float, according to different environment.

        Returns:
            - s1 (numpy.array), the environment state after performing the action.
            - r1 (numpy.array), the reward after performing the action, float32 with shape (1,).
            - done (numpy.array), whether the simulation finishes or not, bool with shape (1,).
        """
        s, r, done, _ = self._env.step(action)
        # In some gym versions, the observation space is announced to be float32, but
        # float64 is returned from reset and step.
        s = np.asarray(s).astype(self._observation_space.np_dtype)
        r = np.array([r], dtype=np.float32)
        # Pin the dtype so the output always matches the bool type declared for the step op.
        done = np.array([done], dtype=np.bool_)
        return s, r, done

    def _space_adapter(self, gym_space):
        """
        Transfer gym dtype to the dtype that is suitable for MindSpore.

        Args:
            gym_space (gym.Space): The gym space to convert.

        Returns:
            A Space with the same shape as ``gym_space`` and an adapted dtype.
        """
        # The dtype obtained from gym.space is np.int64, but step() accepts np.int32 actually.
        # The float64 is not supported, cast to float32. Other dtypes are kept as they are.
        dtype_remap = {np.int64: np.int32, np.float64: np.float32}
        gym_type = gym_space.dtype.type
        dtype = dtype_remap.get(gym_type, gym_type)

        shape = gym_space.shape
        if isinstance(gym_space, spaces.Discrete):
            # Discrete spaces have no low/high attributes; the range is derived from n.
            return Space(shape, dtype, low=0, high=gym_space.n)
        return Space(shape, dtype, low=gym_space.low, high=gym_space.high)