from legged_gym import LEGGED_GYM_ROOT_DIR, envs
import time
from warnings import WarningMessage
import numpy as np
import os
from isaacgym.torch_utils import *
from isaacgym import gymtorch, gymapi, gymutil
import torch
from torch import Tensor
from typing import Tuple, Dict
from legged_gym import LEGGED_GYM_ROOT_DIR
from legged_gym.envs.base.base_task import BaseTask
from legged_gym.utils.math import wrap_to_pi
from legged_gym.utils.isaacgym_utils import get_euler_xyz as get_euler_xyz_in_tensor
from legged_gym.utils.helpers import class_to_dict
from .h1_2_config import H1_2RoughCfg
from legged_gym.utils.terrain import Terrain
class H1_2Robot(BaseTask):
def __init__(self, cfg: H1_2RoughCfg, sim_params, physics_engine, sim_device, headless):
""" Parses the provided config file,
calls create_sim() (which creates the simulation and environments),
initializes PyTorch buffers used during training
Args:
cfg (Dict): Environment config file
sim_params (gymapi.SimParams): simulation parameters
physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX)
device_type (string): 'cuda' or 'cpu'
device_id (int): 0, 1, ...
headless (bool): Run without rendering if True
"""
self.cfg = cfg
self.sim_params = sim_params
self.height_samples = None
self.debug_viz = False
self.init_done = False
self._parse_cfg(self.cfg)
super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless)
if not self.headless:
self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat)
self._init_buffers()
self._prepare_reward_function()
self.init_done = True
def step(self, actions):
""" Apply actions, simulate, call self.post_physics_step()
Args:
actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env)
"""
clip_actions = self.cfg.normalization.clip_actions
self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device)
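# action_buffer keeps the newest action at index 0 and shifts older actions towards the end,
# so a per-env delay can be applied by indexing further back into the buffer during decimation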
self.action_buffer = torch.cat((self.actions[:, None, :], self.action_buffer[:, :-1]), dim=1)
# step physics and render each frame
self.render()
for decimation_step in range(self.cfg.control.decimation):
# delays
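# convert the per-env delay (counted in sim steps) into how many control steps to look back
# in action_buffer; index 0 is the newest action, and the index shrinks as the sub-steps advance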
curr_target_idx = torch.ceil(
(self.action_delay_steps - decimation_step) / self.cfg.control.decimation
).long()
assert (curr_target_idx >= 0).all(), "the action index must not be negative"
self.actions_c = self.action_buffer[torch.arange(self.num_envs, device=self.device).unsqueeze(-1),
curr_target_idx,].squeeze()
self.torques = self._compute_torques(self.actions_c).view(self.torques.shape)
self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques))
self.gym.simulate(self.sim)
if self.cfg.env.test:
elapsed_time = self.gym.get_elapsed_time(self.sim)
sim_time = self.gym.get_sim_time(self.sim)
if sim_time-elapsed_time>0:
time.sleep(sim_time-elapsed_time)
if self.device == 'cpu':
self.gym.fetch_results(self.sim, True)
self.gym.refresh_dof_state_tensor(self.sim)
self.post_physics_step()
# return clipped obs, clipped states (None), rewards, dones and infos
clip_obs = self.cfg.normalization.clip_observations
self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs)
if self.privileged_obs_buf is not None:
self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs)
return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras
def post_physics_step(self):
""" check terminations, compute observations and rewards
calls self._post_physics_step_callback() for common computations
calls self._draw_debug_vis() if needed
"""
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
self.gym.refresh_rigid_body_state_tensor(self.sim)
self.episode_length_buf += 1
self.common_step_counter += 1
# prepare quantities
self.base_pos[:] = self.root_states[:, 0:3]
self.base_quat[:] = self.root_states[:, 3:7]
self.rpy[:] = get_euler_xyz_in_tensor(self.base_quat[:])
self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec)
self._post_physics_step_callback()
# compute observations, rewards, resets, ...
self.check_termination()
self.compute_reward()
env_ids = self.reset_buf.nonzero(as_tuple=False).flatten()
self.reset_idx(env_ids)
self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions)
self.last_last_actions[:] = self.last_actions[:]
self.last_actions[:] = self.actions[:]
self.last_dof_vel[:] = self.dof_vel[:]
self.last_root_vel[:] = self.root_states[:, 7:13]
def check_termination(self):
""" Check if environments need to be reset
"""
self.reset_buf = torch.any(torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1., dim=1)
self.reset_buf |= torch.logical_or(torch.abs(self.rpy[:,1])>1.0, torch.abs(self.rpy[:,0])>0.8)
self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs
self.reset_buf |= self.time_out_buf
def reset_idx(self, env_ids):
""" Reset some environments.
Calls self._reset_dofs(env_ids), self._reset_root_states(env_ids), and self._resample_commands(env_ids)
[Optional] calls self._update_terrain_curriculum(env_ids) and self.update_command_curriculum(env_ids)
Logs episode info
Resets some buffers
Args:
env_ids (list[int]): List of environment ids which must be reset
"""
if len(env_ids) == 0:
return
# reset robot states
self._reset_dofs(env_ids)
self._reset_root_states(env_ids)
self._resample_commands(env_ids)
self._reset_dof_props(env_ids)
# reset action delays
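# draw a fresh delay (in sim steps) uniformly between 0 and the largest configured initial delay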
self.action_delay_steps[env_ids] = torch.randint_like(self.action_delay_steps_init, high=(int(self.action_delay_steps_init.max().item())+1))[env_ids]
# reset buffers
self.last_last_actions[env_ids] = 0.
self.last_actions[env_ids] = 0.
self.last_dof_vel[env_ids] = 0.
self.feet_air_time[env_ids] = 0.
self.single_contact_time[env_ids] = 0.
self.episode_length_buf[env_ids] = 0
self.reset_buf[env_ids] = 1
self.obs_hist_buffer[env_ids] = 0.
self.privileged_obs_hist_buffer[env_ids] = 0.
# fill extras
self.extras["episode"] = {}
for key in self.episode_sums.keys():
self.extras["episode"]['rew_' + key] = torch.mean(self.episode_sums[key][env_ids]) / self.max_episode_length_s
self.episode_sums[key][env_ids] = 0.
if self.cfg.commands.curriculum:
self.extras["episode"]["max_command_x"] = self.command_ranges["lin_vel_x"][1]
# send timeout info to the algorithm
if self.cfg.env.send_timeouts:
self.extras["time_outs"] = self.time_out_buf
# fix reset gravity bug
self.base_quat[env_ids] = self.root_states[env_ids, 3:7]
self.rpy = get_euler_xyz_in_tensor(self.base_quat[:])
self.projected_gravity[env_ids] = quat_rotate_inverse(self.base_quat[env_ids], self.gravity_vec[env_ids])
def compute_reward(self):
""" Compute rewards
Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function())
adds each term to the episode sums and to the total reward
"""
self.rew_buf[:] = 0.
for i in range(len(self.reward_functions)):
name = self.reward_names[i]
rew = self.reward_functions[i]() * self.reward_scales[name]
self.rew_buf += rew
self.episode_sums[name] += rew
if self.cfg.rewards.only_positive_rewards:
self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.)
# add termination reward after clipping
if "termination" in self.reward_scales:
rew = self._reward_termination() * self.reward_scales["termination"]
self.rew_buf += rew
self.episode_sums["termination"] += rew
def _get_phase(self):
ref_cycle_time = self.cfg.rewards.cycle_time
#if torch.sqrt(self.commands[:,0]**2 + self.commands[:,1]**2) < 1:
#time_scale = 1
#else:
time_scale = torch.sqrt(self.commands[:,0]**2 + self.commands[:,1]**2)# / (2.236-1) + 1
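# keep the nominal cycle time for commanded speeds below 1.2 m/s; halve it (faster stepping) above that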
time_scale = torch.where(time_scale<1.2, ref_cycle_time, ref_cycle_time/2)
phase = self.episode_length_buf * self.dt / time_scale
return phase
def _get_gait_phase(self):
# return float mask 1 is stance, 0 is swing
phase = self._get_phase()
sin_pos = torch.sin(2 * torch.pi * phase)
# Add double support phase
stance_mask = torch.zeros((self.num_envs, 2), device=self.device)
# left foot stance
stance_mask[:, 0] = sin_pos >= 0
# right foot stance
stance_mask[:, 1] = sin_pos < 0
# Double support phase
stance_mask[torch.abs(sin_pos) < 0.1] = 1
#if self.status == "standing":
#stance_mask[:] = 1
return stance_mask
def compute_ref_state(self):
phase = self._get_phase()
sin_pos = torch.sin(2 * torch.pi * phase)
sin_pos_l = sin_pos.clone()
sin_pos_r = sin_pos.clone()
self.ref_dof_pos = torch.zeros_like(self.dof_pos)
scale_1 = self.cfg.rewards.target_joint_pos_scale
scale_2 = 2 * scale_1
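# scale_1 and scale_2 (= 2 * scale_1) set the swing amplitudes of the leg pitch joints; the indices
# below are assumed to be the hip-pitch/knee/ankle-pitch DOFs of each leg in the H1_2 joint ordering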
# left foot stance phase set to default joint pos
sin_pos_l[sin_pos_l > 0] = 0
self.ref_dof_pos[:, 2] = sin_pos_l * scale_1
self.ref_dof_pos[:, 3] = -1 * sin_pos_l * scale_2
self.ref_dof_pos[:, 4] = sin_pos_l * scale_1
# right foot stance phase set to default joint pos
sin_pos_r[sin_pos_r < 0] = 0
self.ref_dof_pos[:, 6] = -1 * sin_pos_r * scale_1
self.ref_dof_pos[:, 9] = sin_pos_r * scale_2
self.ref_dof_pos[:, 10] = -1 * sin_pos_r * scale_1
# Double support phase
self.ref_dof_pos[torch.abs(sin_pos) < 0.1] = 0
self.ref_action = 2 * self.ref_dof_pos
def compute_observations(self):
""" Computes observations
"""
batch, _ = self.commands.shape
random_pos = torch_rand_float(-2,2,(batch,37), device=self.device).squeeze(1)
random_vel = torch_rand_float(-0.5,0.5,(batch,9), device=self.device).squeeze(1)
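# random_pos supplies 21 extra dims of uniform noise appended to every observation frame as padding;
# random_vel is currently unused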
phase = self._get_phase()
self.compute_ref_state()
sin_pos = torch.sin(2 * torch.pi * phase).unsqueeze(1)
cos_pos = torch.cos(2 * torch.pi * phase).unsqueeze(1)
stance_mask = self._get_gait_phase()
contact_mask = self.contact_forces[:, self.feet_indices, 2] > 5.
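# command_input = [sin(phase), cos(phase), scaled vx, vy, yaw-rate commands] -> 5 dims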
self.command_input = torch.cat(
(sin_pos, cos_pos, self.commands[:, :3] * self.commands_scale), dim=1)
obs_buf = torch.cat((
self.base_ang_vel * self.obs_scales.ang_vel,
self.projected_gravity,
self.command_input,
(self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos,
#random_pos[:,:9].cuda() * self.obs_scales.dof_pos,
self.dof_vel * self.obs_scales.dof_vel,
#random_vel.cuda() * self.obs_scales.dof_vel,
self.actions,
random_pos[:, :21]  # 21 dims of uniform-noise padding (already created on self.device)
),dim=-1)
privileged_obs_buf = torch.cat((
self.base_lin_vel * self.obs_scales.lin_vel,
self.base_ang_vel * self.obs_scales.ang_vel,
self.projected_gravity,
self.command_input,
(self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos,
#random_pos[:,:9].cuda(),
self.dof_vel * self.obs_scales.dof_vel,
#random_vel.cuda(),
self.actions,
random_pos[:, :21]  # same random padding as in the policy observation
),dim=-1)
# add perceptive inputs if not blind
# add noise if needed
if self.add_noise:
obs_buf += (2 * torch.rand_like(obs_buf) - 1) * self.noise_scale_vec
self.obs_hist_buffer = torch.cat((
self.obs_hist_buffer[:, 1:],
obs_buf[:, None, :]
), dim=1)
self.privileged_obs_hist_buffer = torch.cat((
self.privileged_obs_hist_buffer[:, 1:],
privileged_obs_buf[:, None, :]
), dim=1)
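# the flattened history of the last obs_context_len frames forms the policy/critic observation, so
# num_obs is expected to equal obs_context_len * num_observations_single (likewise for the privileged obs)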
self.obs_buf = self.obs_hist_buffer.reshape((self.num_envs, self.num_obs))
self.privileged_obs_buf = self.privileged_obs_hist_buffer.reshape((self.num_envs, self.num_privileged_obs))
def create_sim(self):
""" Creates simulation, terrain and evironments
"""
self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly
self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
mesh_type = self.cfg.terrain.mesh_type
if mesh_type in ['heightfield', 'trimesh']:
self.terrain = Terrain(self.cfg.terrain, self.num_envs)
if mesh_type == 'plane':
self._create_ground_plane()
elif mesh_type == 'heightfield':
self._create_heightfield()
elif mesh_type == 'trimesh':
self._create_trimesh()
elif mesh_type is not None:
raise ValueError("Terrain mesh type not recognised. Allowed types are [None, plane, heightfield, trimesh]")
# self._create_ground_plane()
self._create_envs()
def set_camera(self, position, lookat):
""" Set camera position and direction
"""
cam_pos = gymapi.Vec3(position[0], position[1], position[2])
cam_target = gymapi.Vec3(lookat[0], lookat[1], lookat[2])
self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target)
#------------- Callbacks --------------
def _process_rigid_shape_props(self, props, env_id):
""" Callback allowing to store/change/randomize the rigid shape properties of each environment.
Called during environment creation.
Base behavior: randomizes the friction of each environment
Args:
props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset
env_id (int): Environment id
Returns:
[List[gymapi.RigidShapeProperties]]: Modified rigid shape properties
"""
if self.cfg.domain_rand.randomize_friction:
if env_id==0:
# prepare friction randomization
friction_range = self.cfg.domain_rand.friction_range
num_buckets = 64
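# sample one friction value per bucket and assign each env a bucket, so only num_buckets distinct
# friction values (and physics materials) are used across all environments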
bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1))
friction_buckets = torch_rand_float(friction_range[0], friction_range[1], (num_buckets,1), device='cpu')
self.friction_coeffs = friction_buckets[bucket_ids]
for s in range(len(props)):
props[s].friction = self.friction_coeffs[env_id]
return props
def _process_dof_props(self, props, env_id):
""" Callback allowing to store/change/randomize the DOF properties of each environment.
Called during environment creation.
Base behavior: stores position, velocity and torques limits defined in the URDF
Args:
props (numpy.array): Properties of each DOF of the asset
env_id (int): Environment id
Returns:
[numpy.array]: Modified DOF properties
"""
if env_id==0:
self.dof_pos_limits = torch.zeros(self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False)
self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
for i in range(len(props)):
self.dof_pos_limits[i, 0] = props["lower"][i].item() * self.cfg.safety.pos_limit
self.dof_pos_limits[i, 1] = props["upper"][i].item() * self.cfg.safety.pos_limit
self.dof_vel_limits[i] = props["velocity"][i].item() * self.cfg.safety.vel_limit
self.torque_limits[i] = props["effort"][i].item() * self.cfg.safety.torque_limit
# soft limits
m = (self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 1]) / 2
r = self.dof_pos_limits[i, 1] - self.dof_pos_limits[i, 0]
self.dof_pos_limits[i, 0] = m - 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
self.dof_pos_limits[i, 1] = m + 0.5 * r * self.cfg.rewards.soft_dof_pos_limit
return props
def _process_rigid_body_props(self, props, env_id):
# if env_id==0:
# sum = 0
# for i, p in enumerate(props):
# sum += p.mass
# print(f"Mass of body {i}: {p.mass} (before randomization)")
# print(f"Total mass {sum} (before randomization)")
# randomize base mass
if self.cfg.domain_rand.randomize_base_mass:
rng = self.cfg.domain_rand.added_mass_range
props[0].mass += np.random.uniform(rng[0], rng[1])
return props
def _post_physics_step_callback(self):
""" Callback called before computing terminations, rewards, and observations
Default behaviour: Compute ang vel command based on target and heading, compute measured terrain heights and randomly push robots
"""
#
env_ids = (self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt)==0).nonzero(as_tuple=False).flatten()
self._resample_commands(env_ids)
if self.cfg.commands.heading_command:
forward = quat_apply(self.base_quat, self.forward_vec)
heading = torch.atan2(forward[:, 1], forward[:, 0])
self.commands[:, 2] = torch.clip(0.5*wrap_to_pi(self.commands[:, 3] - heading), -1., 1.)
if self.cfg.domain_rand.push_robots and (self.common_step_counter % self.cfg.domain_rand.push_interval == 0):
self._push_robots()
# randomize dof properties
env_ids = (self.episode_length_buf % int(self.cfg.domain_rand.dof_prop_rand_interval) == 0).nonzero(
as_tuple=False).flatten()
self._reset_dof_props(env_ids)
if self.cfg.domain_rand.randomize_gravity and (self.common_step_counter % int(self.cfg.domain_rand.gravity_rand_interval) == 0):
self._randomize_gravity()
if self.cfg.domain_rand.randomize_gravity and (int(self.common_step_counter - self.cfg.domain_rand.gravity_rand_duration) % int(
self.cfg.domain_rand.gravity_rand_interval)) == 0:
self._randomize_gravity(torch.tensor([0, 0, 0]))
def _resample_commands(self, env_ids):
""" Randommly select commands of some environments
Args:
env_ids (List[int]): Environments ids for which new commands are needed
"""
self.commands[env_ids, 0] = torch_rand_float(self.command_ranges["lin_vel_x"][0], self.command_ranges["lin_vel_x"][1], (len(env_ids), 1), device=self.device).squeeze(1)
self.commands[env_ids, 1] = torch_rand_float(self.command_ranges["lin_vel_y"][0], self.command_ranges["lin_vel_y"][1], (len(env_ids), 1), device=self.device).squeeze(1)
if self.cfg.commands.heading_command:
self.commands[env_ids, 3] = torch_rand_float(self.command_ranges["heading"][0], self.command_ranges["heading"][1], (len(env_ids), 1), device=self.device).squeeze(1)
else:
self.commands[env_ids, 2] = torch_rand_float(self.command_ranges["ang_vel_yaw"][0], self.command_ranges["ang_vel_yaw"][1], (len(env_ids), 1), device=self.device).squeeze(1)
# set small commands to zero
self.commands[env_ids, :2] *= (torch.norm(self.commands[env_ids, :2], dim=1) > 0.2).unsqueeze(1)
def _compute_torques(self, actions):
""" Compute torques from actions.
Actions can be interpreted as position or velocity targets given to a PD controller, or directly as scaled torques.
[NOTE]: torques must have the same dimension as the number of DOFs, even if some DOFs are not actuated.
Args:
actions (torch.Tensor): Actions
Returns:
[torch.Tensor]: Torques sent to the simulation
"""
#pd controller
actions_scaled = actions * self.cfg.control.action_scale
actions_scaled[:, [0, 2, 6, 8]] *= self.cfg.control.hip_scale_reduction # scale down hip flexion range
control_type = self.cfg.control.control_type
if control_type == "actuator_net":
self.joint_pos_err = self.dof_pos - (actions_scaled + self.default_dof_pos) + self.motor_offsets
self.joint_vel = self.dof_vel
torques = self.actuator_network(self.joint_pos_err, self.joint_pos_err_last, self.joint_pos_err_last_last,
self.joint_vel, self.joint_vel_last, self.joint_vel_last_last)
self.joint_pos_err_last_last = torch.clone(self.joint_pos_err_last)
self.joint_pos_err_last = torch.clone(self.joint_pos_err)
self.joint_vel_last_last = torch.clone(self.joint_vel_last)
self.joint_vel_last = torch.clone(self.joint_vel)
elif control_type=="P":
# PD position control: tau = Kp*kp_factor*(q_target + motor_offset - q) - Kd*kd_factor*q_dot
torques = self.p_gains*self.kp_factors*(actions_scaled + self.default_dof_pos - self.dof_pos + self.motor_offsets) - self.d_gains*self.kd_factors*self.dof_vel
elif control_type=="V":
torques = self.p_gains*(actions_scaled - self.dof_vel) - self.d_gains*(self.dof_vel - self.last_dof_vel)/self.sim_params.dt
elif control_type=="T":
torques = actions_scaled
else:
raise NameError(f"Unknown controller type: {control_type}")
torques = torques * self.motor_strengths
return torch.clip(torques, -self.torque_limits, self.torque_limits)
def _reset_dofs(self, env_ids):
""" Resets DOF position and velocities of selected environmments
Positions are randomly selected within 0.5:1.5 x default positions.
Velocities are set to zero.
Args:
env_ids (List[int]): Environment ids
"""
self.dof_pos[env_ids] = self.default_dof_pos * torch_rand_float(0.5, 1.5, (len(env_ids), self.num_dof), device=self.device)
env_ids_int32 = env_ids.to(dtype=torch.int32)
self.gym.set_dof_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.dof_state),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
def _reset_root_states(self, env_ids):
""" Resets ROOT states position and velocities of selected environmments
Sets base position based on the curriculum
Selects randomized base velocities within -0.5:0.5 [m/s, rad/s]
Args:
env_ids (List[int]): Environment ids
"""
# base position
if self.custom_origins:
self.root_states[env_ids] = self.base_init_state
self.root_states[env_ids, :3] += self.env_origins[env_ids]
self.root_states[env_ids, :2] += torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device) # xy position within 1m of the center
else:
self.root_states[env_ids] = self.base_init_state
self.root_states[env_ids, :3] += self.env_origins[env_ids]
# base velocities
self.root_states[env_ids, 7:13] = torch_rand_float(-0.5, 0.5, (len(env_ids), 6), device=self.device) # [7:10]: lin vel, [10:13]: ang vel
env_ids_int32 = env_ids.to(dtype=torch.int32)
self.gym.set_actor_root_state_tensor_indexed(self.sim,
gymtorch.unwrap_tensor(self.root_states),
gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
def _reset_dof_props(self, env_ids):
if self.cfg.domain_rand.randomize_pd_params:
min_Kp_factor, max_Kp_factor = self.cfg.domain_rand.kp_ratio_range
self.kp_factors[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device,
requires_grad=False).unsqueeze(1) * (
max_Kp_factor - min_Kp_factor) + min_Kp_factor
min_Kd_factor, max_Kd_factor = self.cfg.domain_rand.kd_ratio_range
self.kd_factors[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device,
requires_grad=False).unsqueeze(1) * (
max_Kd_factor - min_Kd_factor) + min_Kd_factor
if self.cfg.domain_rand.randomize_motor_strength:
min_strength, max_strength = self.cfg.domain_rand.motor_strength_range
self.motor_strengths[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device,
requires_grad=False).unsqueeze(1) * (
max_strength - min_strength) + min_strength
if self.cfg.domain_rand.randomize_motor_offset:
min_offset, max_offset = self.cfg.domain_rand.motor_offset_range
self.motor_offsets[env_ids, :] = torch.rand(len(env_ids), self.num_dof, dtype=torch.float,
device=self.device, requires_grad=False) * (
max_offset - min_offset) + min_offset
def _randomize_gravity(self, external_force = None):
if external_force is not None:
self.gravities[:, :] = external_force.unsqueeze(0)
elif self.cfg.domain_rand.randomize_gravity:
min_gravity, max_gravity = self.cfg.domain_rand.gravity_range
external_force = torch.rand(3, dtype=torch.float, device=self.device,
requires_grad=False) * (max_gravity - min_gravity) + min_gravity
self.gravities[:, :] = external_force.unsqueeze(0)
sim_params = self.gym.get_sim_params(self.sim)
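# the applied gravity is the nominal (0, 0, -9.8) plus the random offset; gravity_vec keeps only the
# unit direction, which is what the projected_gravity observation uses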
gravity = self.gravities[0, :] + torch.Tensor([0, 0, -9.8]).to(self.device)
self.gravity_vec[:, :] = gravity.unsqueeze(0) / torch.norm(gravity)
sim_params.gravity = gymapi.Vec3(gravity[0], gravity[1], gravity[2])
self.gym.set_sim_params(self.sim, sim_params)
def _push_robots(self):
""" Random pushes the robots. Emulates an impulse by setting a randomized base velocity.
"""
max_vel = self.cfg.domain_rand.max_push_vel_xy
self.root_states[:, 7:9] = torch_rand_float(-max_vel, max_vel, (self.num_envs, 2), device=self.device) # lin vel x/y
self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self.root_states))
def update_command_curriculum(self, env_ids):
""" Implements a curriculum of increasing commands
Args:
env_ids (List[int]): ids of environments being reset
"""
# If the tracking reward is above 80% of the maximum, increase the range of commands
if torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / self.max_episode_length > 0.8 * self.reward_scales["tracking_lin_vel"]:
self.command_ranges["lin_vel_x"][0] = np.clip(self.command_ranges["lin_vel_x"][0] - 0.5, -self.cfg.commands.max_curriculum, 0.)
self.command_ranges["lin_vel_x"][1] = np.clip(self.command_ranges["lin_vel_x"][1] + 0.5, 0., self.cfg.commands.max_curriculum)
def _get_noise_scale_vec(self, cfg):
""" Sets a vector used to scale the noise added to the observations.
[NOTE]: Must be adapted when changing the observations structure
Args:
cfg (Dict): Environment config file
Returns:
[torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1]
"""
# noise_vec = torch.zeros_like(self.obs_buf[0])
noise_vec = torch.zeros_like(self.obs_hist_buffer[0][0])
self.add_noise = self.cfg.noise.add_noise
noise_scales = self.cfg.noise.noise_scales
noise_level = self.cfg.noise.noise_level
# noise_vec[:3] = noise_scales.lin_vel * noise_level * self.obs_scales.lin_vel
# noise_vec[3:6] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel
# noise_vec[6:9] = noise_scales.gravity * noise_level
# noise_vec[9:12] = 0. # commands
# noise_vec[12:12+self.num_actions] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos
# noise_vec[12+self.num_actions:12+2*self.num_actions] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel
# noise_vec[12+2*self.num_actions:12+3*self.num_actions] = 0. # previous actions
# offsets follow the single-frame layout built in compute_observations():
# ang_vel (3), gravity (3), phase sin/cos + commands (5), dof_pos, dof_vel, previous actions, random padding
noise_vec[:3] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel
noise_vec[3:6] = noise_scales.gravity * noise_level
noise_vec[6:11] = 0. # gait phase (sin/cos) and commands are noise-free
noise_vec[11:11+self.num_actions] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos
noise_vec[11+self.num_actions:11+2*self.num_actions] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel
noise_vec[11+2*self.num_actions:11+3*self.num_actions] = 0. # previous actions (the trailing random padding also stays at 0)
return noise_vec
#----------------------------------------
def _init_buffers(self):
""" Initialize torch tensors which will contain simulation states and processed quantities
"""
# get gym GPU state tensors
actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
rigid_body_states_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim)
net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim)
self.gym.refresh_dof_state_tensor(self.sim)
self.gym.refresh_actor_root_state_tensor(self.sim)
self.gym.refresh_net_contact_force_tensor(self.sim)
# create some wrapper tensors for different slices
self.root_states = gymtorch.wrap_tensor(actor_root_state)
self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
self.rigid_body_states = gymtorch.wrap_tensor(rigid_body_states_tensor).view(self.num_envs, self.num_bodies, -1)
self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]
self.base_quat = self.root_states[:, 3:7]
self.rpy = get_euler_xyz_in_tensor(self.base_quat)
self.base_pos = self.root_states[:self.num_envs, 0:3]
self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view(self.num_envs, -1, 3) # shape: num_envs, num_bodies, xyz axis
self.ref_dof_pos = torch.zeros_like(self.dof_pos, device=self.device)
# initialize some data used later on
self.obs_hist_buffer = torch.zeros(self.num_envs, self.cfg.env.obs_context_len, self.cfg.env.num_observations_single, device=self.device)
self.privileged_obs_hist_buffer = torch.zeros(self.num_envs, self.cfg.env.obs_context_len, self.cfg.env.num_privileged_obs_single, device=self.device)
self.common_step_counter = 0
self.extras = {}
self.noise_scale_vec = self._get_noise_scale_vec(self.cfg)
self.forward_vec = to_torch([1., 0., 0.], device=self.device).repeat((self.num_envs, 1))
self.torques = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.p_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.d_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.last_last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False)
self.last_dof_vel = torch.zeros_like(self.dof_vel)
self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13])
self.commands = torch.zeros(self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False) # x vel, y vel, yaw vel, heading
self.commands_scale = torch.tensor([self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], device=self.device, requires_grad=False,) # TODO change this
self.feet_air_time = torch.zeros(self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False)
self.single_contact_time = torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
self.last_contacts = torch.zeros(self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False)
self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10])
self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13])
self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec)
# action delay
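# cfg.env.action_delay is given in seconds (a scalar or a list, assumed); it is converted below into an
# integer number of sim steps and must therefore be a non-negative multiple of the sim dt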
action_delay = self.cfg.env.action_delay
if action_delay is not None:
ctrl_delay = torch.tensor(action_delay, device=self.device)
assert torch.allclose(
torch.round(ctrl_delay / self.sim_params.dt),
ctrl_delay / self.sim_params.dt,
), "ctrl_delay must be a multiple of the simulation dt"
assert (ctrl_delay >= 0).all(), "ctrl_delay can not be negative"
self.action_delay_steps_init = torch.round(ctrl_delay / self.sim_params.dt)
else:
self.action_delay_steps_init = torch.tensor(0, device=self.device, dtype=torch.int32)
self.action_delay_steps_init = self.action_delay_steps_init.repeat(self.num_envs, 1)
self.action_delay_steps = self.action_delay_steps_init.clone()
self.action_buffer = torch.zeros((self.num_envs, self.cfg.control.decimation, self.num_actions), device=self.device)
# joint positions offsets and PD gains
self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False)
for i in range(self.num_dofs):
name = self.dof_names[i]
angle = self.cfg.init_state.default_joint_angles[name]
self.default_dof_pos[i] = angle
found = False
#print(self.cfg.control.stiffness.keys())
for dof_name in self.cfg.control.stiffness.keys():
if dof_name in name:
#print(i)
self.p_gains[i] = self.cfg.control.stiffness[dof_name]
self.d_gains[i] = self.cfg.control.damping[dof_name]
found = True
if not found:
self.p_gains[i] = 0.
self.d_gains[i] = 0.
if self.cfg.control.control_type in ["P", "V"]:
print(f"PD gain of joint {name} were not defined, setting them to zero")
# self.init_p_gains = self.init_p_gains.repeat(self.num_envs, 1)
# self.init_d_gains = self.init_d_gains.repeat(self.num_envs, 1)
# self.p_gains = self.init_p_gains.clone()
# self.d_gains = self.init_d_gains.clone()
self.default_dof_pos = self.default_dof_pos.unsqueeze(0)
# feet height for gait
self.last_feet_z = 0.05
self.feet_height = torch.zeros((self.num_envs, 2), device=self.device)
def _init_custom_buffers__(self):
# domain randomization properties
self.motor_strengths = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device,
requires_grad=False)
self.motor_offsets = torch.zeros(self.num_envs, self.num_dof, dtype=torch.float, device=self.device,
requires_grad=False)
self.gravity_vec = to_torch(get_axis_params(-1., self.up_axis_idx), device=self.device).repeat((self.num_envs, 1))
self.gravities = torch.zeros(self.num_envs, 3, dtype=torch.float, device=self.device,
requires_grad=False)
self.kp_factors = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device,
requires_grad=False)
self.kd_factors = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device,
requires_grad=False)
def _prepare_reward_function(self):
""" Prepares a list of reward functions, whcih will be called to compute the total reward.
Looks for self._reward_<REWARD_NAME>, where <REWARD_NAME> are names of all non zero reward scales in the cfg.
"""
# remove zero scales + multiply non-zero ones by dt
for key in list(self.reward_scales.keys()):
scale = self.reward_scales[key]
if scale==0:
self.reward_scales.pop(key)
else:
self.reward_scales[key] *= self.dt
# prepare list of functions
self.reward_functions = []
self.reward_names = []
for name, scale in self.reward_scales.items():
if name=="termination":
continue
self.reward_names.append(name)
name = '_reward_' + name
self.reward_functions.append(getattr(self, name))
# reward episode sums
self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False)
for name in self.reward_scales.keys()}
def _create_ground_plane(self):
""" Adds a ground plane to the simulation, sets friction and restitution based on the cfg.
"""
plane_params = gymapi.PlaneParams()
plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
plane_params.static_friction = self.cfg.terrain.static_friction
plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction
plane_params.restitution = self.cfg.terrain.restitution
self.gym.add_ground(self.sim, plane_params)
def _create_heightfield(self):
""" Adds a heightfield terrain to the simulation, sets parameters based on the cfg.
"""
hf_params = gymapi.HeightFieldParams()
hf_params.column_scale = self.terrain.cfg.horizontal_scale
hf_params.row_scale = self.terrain.cfg.horizontal_scale
hf_params.vertical_scale = self.terrain.cfg.vertical_scale
hf_params.nbRows = self.terrain.tot_cols
hf_params.nbColumns = self.terrain.tot_rows
hf_params.transform.p.x = -self.terrain.cfg.border_size
hf_params.transform.p.y = -self.terrain.cfg.border_size
hf_params.transform.p.z = 0.0
hf_params.static_friction = self.cfg.terrain.static_friction
hf_params.dynamic_friction = self.cfg.terrain.dynamic_friction
hf_params.restitution = self.cfg.terrain.restitution
print(self.terrain.heightsamples.shape, hf_params.nbRows, hf_params.nbColumns)
self.gym.add_heightfield(self.sim, self.terrain.heightsamples.T, hf_params)
self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows,
self.terrain.tot_cols).to(self.device)
def _create_trimesh(self):
""" Adds a triangle mesh terrain to the simulation, sets parameters based on the cfg.
# """
tm_params = gymapi.TriangleMeshParams()
tm_params.nb_vertices = self.terrain.vertices.shape[0]
tm_params.nb_triangles = self.terrain.triangles.shape[0]
tm_params.transform.p.x = -self.terrain.cfg.border_size
tm_params.transform.p.y = -self.terrain.cfg.border_size
tm_params.transform.p.z = 0.0
tm_params.static_friction = self.cfg.terrain.static_friction
tm_params.dynamic_friction = self.cfg.terrain.dynamic_friction
tm_params.restitution = self.cfg.terrain.restitution
self.gym.add_triangle_mesh(self.sim, self.terrain.vertices.flatten(order='C'),
self.terrain.triangles.flatten(order='C'), tm_params)
self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows,
self.terrain.tot_cols).to(self.device)
def _create_envs(self):
""" Creates environments:
1. loads the robot URDF/MJCF asset,
2. For each environment
2.1 creates the environment,
2.2 calls DOF and Rigid shape properties callbacks,
2.3 create actor with these properties and add them to the env
3. Store indices of different bodies of the robot
"""
asset_path = self.cfg.asset.file.format(LEGGED_GYM_ROOT_DIR=LEGGED_GYM_ROOT_DIR)
asset_root = os.path.dirname(asset_path)
asset_file = os.path.basename(asset_path)
asset_options = gymapi.AssetOptions()
asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode
asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints
asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule
asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments
asset_options.fix_base_link = self.cfg.asset.fix_base_link
asset_options.density = self.cfg.asset.density
asset_options.angular_damping = self.cfg.asset.angular_damping
asset_options.linear_damping = self.cfg.asset.linear_damping
asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity
asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity
asset_options.armature = self.cfg.asset.armature
asset_options.thickness = self.cfg.asset.thickness
asset_options.disable_gravity = self.cfg.asset.disable_gravity
if self.cfg.asset.vhacd_enabled:
asset_options.vhacd_enabled = True
asset_options.vhacd_params = gymapi.VhacdParams()
asset_options.vhacd_params.resolution = self.cfg.asset.vhacd_params_resolution
robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
self.num_dof = self.gym.get_asset_dof_count(robot_asset)
self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset)
dof_props_asset = self.gym.get_asset_dof_properties(robot_asset)
rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset)
# save body names from the asset
body_names = self.gym.get_asset_rigid_body_names(robot_asset)
self.dof_names = self.gym.get_asset_dof_names(robot_asset)
self.num_bodies = len(body_names)
self.num_dofs = len(self.dof_names)
feet_names = [s for s in body_names if self.cfg.asset.foot_name in s]
knee_names = [s for s in body_names if self.cfg.asset.knee_name in s]
penalized_contact_names = []
for name in self.cfg.asset.penalize_contacts_on:
penalized_contact_names.extend([s for s in body_names if name in s])
termination_contact_names = []
for name in self.cfg.asset.terminate_after_contacts_on:
termination_contact_names.extend([s for s in body_names if name in s])
base_init_state_list = self.cfg.init_state.pos + self.cfg.init_state.rot + self.cfg.init_state.lin_vel + self.cfg.init_state.ang_vel
self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False)
start_pose = gymapi.Transform()
start_pose.p = gymapi.Vec3(*self.base_init_state[:3])
self._get_env_origins()
env_lower = gymapi.Vec3(0., 0., 0.)
env_upper = gymapi.Vec3(0., 0., 0.)
self.actor_handles = []
self.envs = []
self._init_custom_buffers__()
self._randomize_gravity()
for i in range(self.num_envs):
# create env instance
env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs)))
pos = self.env_origins[i].clone()
pos[:2] += torch_rand_float(-1., 1., (2,1), device=self.device).squeeze(1)
start_pose.p = gymapi.Vec3(*pos)
rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i)
self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props)
actor_handle = self.gym.create_actor(env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0)
dof_props = self._process_dof_props(dof_props_asset, i)
self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props)
body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle)
body_props = self._process_rigid_body_props(body_props, i)
self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=True)
self.envs.append(env_handle)
self.actor_handles.append(actor_handle)
self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(feet_names)):
self.feet_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], feet_names[i])
self.knee_indices = torch.zeros(len(knee_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(knee_names)):
self.knee_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], knee_names[i])
self.penalised_contact_indices = torch.zeros(len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(penalized_contact_names)):
self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], penalized_contact_names[i])
self.termination_contact_indices = torch.zeros(len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False)
for i in range(len(termination_contact_names)):
self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], termination_contact_names[i])
self.rigid_body_states = self.gym.get_actor_rigid_body_states(env_handle, actor_handle, gymapi.STATE_ALL)
def _get_env_origins(self):
""" Sets environment origins. On rough terrain the origins are defined by the terrain platforms.
Otherwise create a grid.
"""
self.custom_origins = False
self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False)
# create a grid of robots
num_cols = np.floor(np.sqrt(self.num_envs))
num_rows = np.ceil(self.num_envs / num_cols)
xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols))
spacing = self.cfg.env.env_spacing
self.env_origins[:, 0] = spacing * xx.flatten()[:self.num_envs]
self.env_origins[:, 1] = spacing * yy.flatten()[:self.num_envs]
self.env_origins[:, 2] = 0.
def _parse_cfg(self, cfg):
self.dt = self.cfg.control.decimation * self.sim_params.dt
self.obs_scales = self.cfg.normalization.obs_scales
self.reward_scales = class_to_dict(self.cfg.rewards.scales)
self.command_ranges = class_to_dict(self.cfg.commands.ranges)
self.max_episode_length_s = self.cfg.env.episode_length_s
self.max_episode_length = np.ceil(self.max_episode_length_s / self.dt)
self.cfg.domain_rand.push_interval = np.ceil(self.cfg.domain_rand.push_interval_s / self.dt)
self.cfg.domain_rand.gravity_rand_interval = np.ceil(self.cfg.domain_rand.gravity_rand_interval_s / self.dt)
self.cfg.domain_rand.gravity_rand_duration = np.ceil(
self.cfg.domain_rand.gravity_rand_interval * self.cfg.domain_rand.gravity_impulse_duration)
self.cfg.domain_rand.dof_prop_rand_interval = np.ceil(self.cfg.domain_rand.dof_prop_rand_interval_s / self.dt)
#------------ reward functions----------------
def _reward_lin_vel_z(self):
# Penalize z axis base linear velocity
return torch.square(self.base_lin_vel[:, 2])
def _reward_ang_vel_xy(self):
# Penalize xy axes base angular velocity
return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1)
def _reward_orientation(self):
# Penalize non-flat base orientation
return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1)
def _reward_base_height(self):
# Penalize base height away from target
base_height = self.root_states[:, 2]
return torch.square(base_height - self.cfg.rewards.base_height_target)
def _reward_torques(self):
# Penalize torques
return torch.sum(torch.square(self.torques), dim=1)
def _reward_dof_vel(self):
# Penalize dof velocities
return torch.sum(torch.square(self.dof_vel), dim=1)
def _reward_dof_acc(self):
# Penalize dof accelerations
return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1)
def _reward_collision(self):
# Penalize collisions on selected bodies
return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1)
def _reward_termination(self):
# Terminal reward / penalty
return self.reset_buf * ~self.time_out_buf
def _reward_dof_pos_limits(self):
# Penalize dof positions too close to the limit
out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit
out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.)
return torch.sum(out_of_limits, dim=1)
def _reward_dof_vel_limits(self):
# Penalize dof velocities too close to the limit
# clip to max error = 1 rad/s per joint to avoid huge penalties
return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0., max=1.), dim=1)
def _reward_torque_limits(self):
# penalize torques too close to the limit
return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1)
def _reward_tracking_lin_vel(self):
# Tracking of linear velocity commands (xy axes)
lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1)
return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma)
def _reward_tracking_ang_vel(self):
# Tracking of angular velocity commands (yaw)
ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2])
return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma)
def _reward_feet_air_time(self):
# Reward long steps
# Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes
contact = self.contact_forces[:, self.feet_indices, 2] > 1.
contact_filt = torch.logical_or(contact, self.last_contacts)
self.last_contacts = contact
first_contact = (self.feet_air_time > 0.) * contact_filt
self.feet_air_time += self.dt
rew_airTime = torch.sum((self.feet_air_time - 0.2) * first_contact, dim=1) # reward only on first contact with the ground
rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 #no reward for zero command
self.feet_air_time *= ~contact_filt
return rew_airTime
def _reward_stumble(self):
# Penalize feet hitting vertical surfaces
return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >\
5 *torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1)
def _reward_stand_still(self):
# Penalize motion at zero commands
return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1)
def _reward_feet_contact_forces(self):
# penalize high contact forces
return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1)
def _reward_action_rate(self):
# Penalize changes in actions
return torch.sum(torch.square(self.last_actions - self.actions), dim=1)
def _reward_action_smoothness(self):
# Penalize changes in actions
term_2 = torch.sum(torch.square(self.actions - 2.*self.last_actions + self.last_last_actions), dim=1)
term_3 = 0.05 * torch.sum(torch.abs(self.actions), dim=1)
return term_2 + term_3
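# NOTE: this _reward_default_joint_pos is overridden by a later definition of the same name further down;
# Python keeps only the last definition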
def _reward_default_joint_pos(self):
"""
Calculates the reward for keeping joint positions close to the default positions: deviations of the
hip yaw/roll joints are penalized strongly, while the overall joint deviation only adds a small penalty.
"""
joint_diff = self.dof_pos - self.default_dof_pos
left_yaw_roll = joint_diff[:, [0, 2]]
right_yaw_roll = joint_diff[:, [6, 8]]
yaw_roll = torch.norm(left_yaw_roll, dim=1) + torch.norm(right_yaw_roll, dim=1)
yaw_roll = torch.clamp(yaw_roll - 0.1, 0, 50)
return torch.exp(-yaw_roll * 100) - 0.01 * torch.norm(joint_diff, dim=1)
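# NOTE: this _reward_feet_contact_number is overridden by the later definition of the same name near the
# end of the file; Python keeps only the last definition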
def _reward_feet_contact_number(self):
"""
Calculates a reward based on the number of feet contacts aligning with the gait phase.
Rewards or penalizes depending on whether the foot contact matches the expected gait phase.
"""
contact = self.contact_forces[:, self.feet_indices, 2] > 1.
# count the number of feet in contact at each time step
contact_count = torch.sum(contact, dim=1)
# check for single foot contact (exactly one foot in contact)
single_contact = (contact_count == 1)
# accumulate time where there was a single foot contact
self.single_contact_time += self.dt * single_contact.float()
# apply grace period: if single contact occurred at least once in the last 0.2s, grant reward
grace_period_contact = self.single_contact_time > 0.
rew_contact_number = grace_period_contact.float()
# Reset single contact time where there is no single foot contact
self.single_contact_time *= ~single_contact
# Reward is granted if non-standing command and single contact in the last 0.2s
non_standing_command = torch.norm(self.commands[:, :2], dim=1) > 0.1
rew_contact_number *= non_standing_command.float()
rew_contact_number += (~non_standing_command).float()
return rew_contact_number
def _reward_feet_parallel(self):
left_foot_quat = self.rigid_body_states[:, self.feet_indices[0], 3:7]
right_foot_quat = self.rigid_body_states[:, self.feet_indices[1], 3:7]
feet_dir_local = torch.zeros((self.num_envs, 3), device=self.device)
feet_dir_local[:,0] = 1
# foot forward direction in the world frame
left_foot_dir_world = quat_rotate(left_foot_quat, feet_dir_local.clone())
right_foot_dir_world = quat_rotate(right_foot_quat, feet_dir_local.clone())
# foot forward direction in the base frame
left_foot_dir_base_y = quat_rotate_inverse(self.base_quat, left_foot_dir_world)[:,1]
right_foot_dir_base_y = quat_rotate_inverse(self.base_quat, right_foot_dir_world)[:,1]
feet_dir_y = torch.abs(left_foot_dir_base_y) + torch.abs(right_foot_dir_base_y)
return feet_dir_y
def _reward_energy(self):
return torch.sum(torch.square(self.torques * self.dof_vel), dim=1)
def _reward_knee_distance(self):
"""
Calculates the reward based on the distance between the knees of the humanoid.
"""
foot_pos = self.rigid_body_states[:, self.knee_indices, :2]
foot_dist = torch.norm(foot_pos[:, 0, :] - foot_pos[:, 1, :], dim=1)
fd = self.cfg.rewards.min_dist
max_df = self.cfg.rewards.max_dist / 2
d_min = torch.clamp(foot_dist - fd, -0.5, 0.)
d_max = torch.clamp(foot_dist - max_df, 0, 0.5)
return (torch.exp(-torch.abs(d_min) * 100) + torch.exp(-torch.abs(d_max) * 100)) / 2
def _reward_feet_distance(self):
"""
Calculates the reward based on the distance between the feet. Penalizes feet that get too close to each other or too far apart.
"""
foot_pos = self.rigid_body_states[:, self.feet_indices, :2]
foot_dist = torch.norm(foot_pos[:, 0, :] - foot_pos[:, 1, :], dim=1)
fd = self.cfg.rewards.min_dist
max_df = self.cfg.rewards.max_dist
d_min = torch.clamp(foot_dist - fd, -0.5, 0.)
d_max = torch.clamp(foot_dist - max_df, 0, 0.5)
return (torch.exp(-torch.abs(d_min) * 100) + torch.exp(-torch.abs(d_max) * 100)) / 2
def _reward_arm_pose(self):
return torch.sum(torch.abs(self.actions[:,13:]), dim=1)
def _reward_joint_pos(self):
"""
Calculates the reward based on the difference between the current joint positions and the target joint positions.
"""
joint_pos = self.dof_pos.clone() - self.default_dof_pos
pos_target = self.ref_dof_pos.clone()
diff = joint_pos - pos_target
r = torch.exp(-2 * torch.norm(diff, dim=1)) - 0.2 * torch.norm(diff, dim=1).clamp(0, 0.5)
return r #* (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1)
def _reward_foot_slip(self):
"""
Calculates the reward for minimizing foot slip. The reward is based on the contact forces
and the speed of the feet. A contact threshold is used to determine if the foot is in contact
with the ground. The speed of the foot is calculated and scaled by the contact condition.
"""
contact = self.contact_forces[:, self.feet_indices, 2] > 5.
foot_speed_norm = torch.norm(self.rigid_body_states[:, self.feet_indices, 10:12], dim=2)
rew = torch.sqrt(foot_speed_norm)
rew *= contact
return torch.sum(rew, dim=1)
def _reward_feet_contact_number(self):
"""
Calculates a reward based on the number of feet contacts aligning with the gait phase.
Rewards or penalizes depending on whether the foot contact matches the expected gait phase.
"""
contact = self.contact_forces[:, self.feet_indices, 2] > 5.
stance_mask = self._get_gait_phase()
#if self.status == 'standing':
# stance_mask[:] = 1
reward = torch.where(contact == stance_mask, 1, -0.3)
return torch.mean(reward, dim=1)# * (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1)
def _reward_feet_clearance(self):
"""
Calculates reward based on the clearance of the swing leg from the ground during movement.
Encourages appropriate lift of the feet during the swing phase of the gait.
"""
# Compute feet contact mask
contact = self.contact_forces[:, self.feet_indices, 2] > 5.
# Get the z-position of the feet and compute the change in z-position
feet_z = self.rigid_body_states[:, self.feet_indices, 2] - 0.05
delta_z = feet_z - self.last_feet_z
self.feet_height += delta_z
self.last_feet_z = feet_z
# Compute swing mask
swing_mask = 1 - self._get_gait_phase()
# feet height should be close to the target feet height at the swing peak
rew_pos = torch.abs(self.feet_height - self.cfg.rewards.target_feet_height) < 0.01
rew_pos = torch.sum(rew_pos * swing_mask, dim=1)
self.feet_height *= ~contact
return rew_pos #* (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1)
def _reward_default_joint_pos(self):
"""
Calculates the reward for keeping joint positions close to the default positions: deviations of the
hip yaw/roll joints are penalized strongly, while the overall joint deviation only adds a small penalty.
"""
joint_diff = self.dof_pos - self.default_dof_pos
left_yaw_roll = joint_diff[:, :2]
right_yaw_roll = joint_diff[:, 5: 7]
yaw_roll = torch.norm(left_yaw_roll, dim=1) + torch.norm(right_yaw_roll, dim=1)
yaw_roll = torch.clamp(yaw_roll - 0.1, 0, 50)
return torch.exp(-yaw_roll * 100) - 0.01 * torch.norm(joint_diff, dim=1)
def _reward_feet_symmetry(self):
left_foot = self.rigid_body_states[:, self.feet_indices[0], :3] - self.base_pos # left ankle
right_foot = self.rigid_body_states[:, self.feet_indices[1], :3] - self.base_pos # right ankle
left_foot_y = quat_rotate_inverse(self.base_quat, left_foot)[:,1]
right_foot_y = quat_rotate_inverse(self.base_quat, right_foot)[:,1]
feet_distance = (torch.abs(left_foot_y) - torch.abs(right_foot_y))**2
return feet_distance