diff --git a/legged_gym/envs/h1_2/README.md b/legged_gym/envs/h1_2/README.md new file mode 100644 index 0000000..4c491ff --- /dev/null +++ b/legged_gym/envs/h1_2/README.md @@ -0,0 +1,25 @@ +# H1_2 RL Example (Preview) + +## Simplified URDF + +This task utilizes a simplified version of URDF. We fix some joints and ignore most of the collisions. + +### Fixed Joints + +We fix all the joints in the hands, wrists and the elbow roll joints, since those joints have very limited effect on the whole body dynamics and are commonly controlled by other controllers. + +### Collisions + +We only keep the collision of foot roll links, knee links and the base. Early termination is majorly checked by the angular position of the base. + +## Dynamics + +Free "light" end effectors can lead to unstable simulation. Thus please be carefull with the control parameters for the joints that may affect such end effectors. + +## Results + +https://github.com/user-attachments/assets/a937e9c4-fe91-4240-88ea-d83b0160cad5 + +## Preview Stage + +**The reward functions are not well tuned and cannot produce satisfactory results at the current stage. A feasible version is comming soon.** diff --git a/legged_gym/envs/h1_2/__pycache__/h1_2.cpython-38.pyc b/legged_gym/envs/h1_2/__pycache__/h1_2.cpython-38.pyc new file mode 100644 index 0000000..4035be4 Binary files /dev/null and b/legged_gym/envs/h1_2/__pycache__/h1_2.cpython-38.pyc differ diff --git a/legged_gym/envs/h1_2/__pycache__/h1_2_config.cpython-38.pyc b/legged_gym/envs/h1_2/__pycache__/h1_2_config.cpython-38.pyc new file mode 100644 index 0000000..2c0b4a1 Binary files /dev/null and b/legged_gym/envs/h1_2/__pycache__/h1_2_config.cpython-38.pyc differ diff --git a/legged_gym/envs/h1_2/h1_2.py b/legged_gym/envs/h1_2/h1_2.py new file mode 100644 index 0000000..71d6938 --- /dev/null +++ b/legged_gym/envs/h1_2/h1_2.py @@ -0,0 +1,1227 @@ +from legged_gym import LEGGED_GYM_ROOT_DIR, envs +import time +from warnings import WarningMessage +import numpy as np +import os + +from isaacgym.torch_utils import * +from isaacgym import gymtorch, gymapi, gymutil + +import torch +from torch import Tensor +from typing import Tuple, Dict + +from legged_gym import LEGGED_GYM_ROOT_DIR +from legged_gym.envs.base.base_task import BaseTask +from legged_gym.utils.math import wrap_to_pi +from legged_gym.utils.isaacgym_utils import get_euler_xyz as get_euler_xyz_in_tensor +from legged_gym.utils.helpers import class_to_dict +from .h1_2_config import H1_2RoughCfg +from legged_gym.utils.terrain import Terrain + + +class H1_2Robot(BaseTask): + def __init__(self, cfg: H1_2RoughCfg, sim_params, physics_engine, sim_device, headless): + """ Parses the provided config file, + calls create_sim() (which creates, simulation and environments), + initilizes pytorch buffers used during training + + Args: + cfg (Dict): Environment config file + sim_params (gymapi.SimParams): simulation parameters + physics_engine (gymapi.SimType): gymapi.SIM_PHYSX (must be PhysX) + device_type (string): 'cuda' or 'cpu' + device_id (int): 0, 1, ... + headless (bool): Run without rendering if True + """ + self.cfg = cfg + self.sim_params = sim_params + self.height_samples = None + self.debug_viz = False + self.init_done = False + self._parse_cfg(self.cfg) + super().__init__(self.cfg, sim_params, physics_engine, sim_device, headless) + + if not self.headless: + self.set_camera(self.cfg.viewer.pos, self.cfg.viewer.lookat) + self._init_buffers() + self._prepare_reward_function() + self.init_done = True + + def step(self, actions): + """ Apply actions, simulate, call self.post_physics_step() + + Args: + actions (torch.Tensor): Tensor of shape (num_envs, num_actions_per_env) + """ + + clip_actions = self.cfg.normalization.clip_actions + self.actions = torch.clip(actions, -clip_actions, clip_actions).to(self.device) + + self.action_buffer = torch.cat((self.actions[:, None, :], self.action_buffer[:, :-1]), dim=1) + + # step physics and render each frame + self.render() + for decimation_step in range(self.cfg.control.decimation): + # delays + curr_target_idx = torch.ceil( + (self.action_delay_steps - decimation_step) / self.cfg.control.decimation + ).long() + assert (curr_target_idx >= 0).all(), "the action inx must not be negative" + + self.actions_c = self.action_buffer[torch.arange(self.num_envs, device=self.device).unsqueeze(-1), + curr_target_idx,].squeeze() + self.torques = self._compute_torques(self.actions_c).view(self.torques.shape) + self.gym.set_dof_actuation_force_tensor(self.sim, gymtorch.unwrap_tensor(self.torques)) + self.gym.simulate(self.sim) + if self.cfg.env.test: + elapsed_time = self.gym.get_elapsed_time(self.sim) + sim_time = self.gym.get_sim_time(self.sim) + if sim_time-elapsed_time>0: + time.sleep(sim_time-elapsed_time) + + if self.device == 'cpu': + self.gym.fetch_results(self.sim, True) + self.gym.refresh_dof_state_tensor(self.sim) + self.post_physics_step() + + # return clipped obs, clipped states (None), rewards, dones and infos + clip_obs = self.cfg.normalization.clip_observations + self.obs_buf = torch.clip(self.obs_buf, -clip_obs, clip_obs) + if self.privileged_obs_buf is not None: + self.privileged_obs_buf = torch.clip(self.privileged_obs_buf, -clip_obs, clip_obs) + return self.obs_buf, self.privileged_obs_buf, self.rew_buf, self.reset_buf, self.extras + + def post_physics_step(self): + """ check terminations, compute observations and rewards + calls self._post_physics_step_callback() for common computations + calls self._draw_debug_vis() if needed + """ + self.gym.refresh_actor_root_state_tensor(self.sim) + self.gym.refresh_net_contact_force_tensor(self.sim) + self.gym.refresh_rigid_body_state_tensor(self.sim) + + self.episode_length_buf += 1 + self.common_step_counter += 1 + + # prepare quantities + self.base_pos[:] = self.root_states[:, 0:3] + self.base_quat[:] = self.root_states[:, 3:7] + self.rpy[:] = get_euler_xyz_in_tensor(self.base_quat[:]) + self.base_lin_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10]) + self.base_ang_vel[:] = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13]) + self.projected_gravity[:] = quat_rotate_inverse(self.base_quat, self.gravity_vec) + + self._post_physics_step_callback() + + # compute observations, rewards, resets, ... + self.check_termination() + self.compute_reward() + env_ids = self.reset_buf.nonzero(as_tuple=False).flatten() + self.reset_idx(env_ids) + self.compute_observations() # in some cases a simulation step might be required to refresh some obs (for example body positions) + + self.last_last_actions[:] = self.last_actions[:] + self.last_actions[:] = self.actions[:] + self.last_dof_vel[:] = self.dof_vel[:] + self.last_root_vel[:] = self.root_states[:, 7:13] + + def check_termination(self): + """ Check if environments need to be reset + """ + self.reset_buf = torch.any(torch.norm(self.contact_forces[:, self.termination_contact_indices, :], dim=-1) > 1., dim=1) + self.reset_buf |= torch.logical_or(torch.abs(self.rpy[:,1])>1.0, torch.abs(self.rpy[:,0])>0.8) + self.time_out_buf = self.episode_length_buf > self.max_episode_length # no terminal reward for time-outs + self.reset_buf |= self.time_out_buf + + def reset_idx(self, env_ids): + """ Reset some environments. + Calls self._reset_dofs(env_ids), self._reset_root_states(env_ids), and self._resample_commands(env_ids) + [Optional] calls self._update_terrain_curriculum(env_ids), self.update_command_curriculum(env_ids) and + Logs episode info + Resets some buffers + + Args: + env_ids (list[int]): List of environment ids which must be reset + """ + if len(env_ids) == 0: + return + + # reset robot states + self._reset_dofs(env_ids) + self._reset_root_states(env_ids) + + self._resample_commands(env_ids) + + self._reset_dof_props(env_ids) + + # reset action delays + self.action_delay_steps[env_ids] = torch.randint_like(self.action_delay_steps_init, high=(int(self.action_delay_steps_init.max().item())+1))[env_ids] + + # reset buffers + self.last_last_actions[env_ids] = 0. + self.last_actions[env_ids] = 0. + self.last_dof_vel[env_ids] = 0. + self.feet_air_time[env_ids] = 0. + self.single_contact_time[env_ids] = 0. + self.episode_length_buf[env_ids] = 0 + self.reset_buf[env_ids] = 1 + self.obs_hist_buffer[env_ids] = 0. + self.privileged_obs_hist_buffer[env_ids] = 0. + # fill extras + self.extras["episode"] = {} + for key in self.episode_sums.keys(): + self.extras["episode"]['rew_' + key] = torch.mean(self.episode_sums[key][env_ids]) / self.max_episode_length_s + self.episode_sums[key][env_ids] = 0. + if self.cfg.commands.curriculum: + self.extras["episode"]["max_command_x"] = self.command_ranges["lin_vel_x"][1] + # send timeout info to the algorithm + if self.cfg.env.send_timeouts: + self.extras["time_outs"] = self.time_out_buf + + # fix reset gravity bug + self.base_quat[env_ids] = self.root_states[env_ids, 3:7] + self.rpy = get_euler_xyz_in_tensor(self.base_quat[:]) + self.projected_gravity[env_ids] = quat_rotate_inverse(self.base_quat[env_ids], self.gravity_vec[env_ids]) + + def compute_reward(self): + """ Compute rewards + Calls each reward function which had a non-zero scale (processed in self._prepare_reward_function()) + adds each terms to the episode sums and to the total reward + """ + self.rew_buf[:] = 0. + for i in range(len(self.reward_functions)): + name = self.reward_names[i] + rew = self.reward_functions[i]() * self.reward_scales[name] + self.rew_buf += rew + self.episode_sums[name] += rew + if self.cfg.rewards.only_positive_rewards: + self.rew_buf[:] = torch.clip(self.rew_buf[:], min=0.) + # add termination reward after clipping + if "termination" in self.reward_scales: + rew = self._reward_termination() * self.reward_scales["termination"] + self.rew_buf += rew + self.episode_sums["termination"] += rew + + def _get_phase(self): + ref_cycle_time = self.cfg.rewards.cycle_time + #if torch.sqrt(self.commands[:,0]**2 + self.commands[:,1]**2) < 1: + #time_scale = 1 + #else: + time_scale = torch.sqrt(self.commands[:,0]**2 + self.commands[:,1]**2)# / (2.236-1) + 1 + time_scale = torch.where(time_scale<1.2, ref_cycle_time, ref_cycle_time/2) + phase = self.episode_length_buf * self.dt / time_scale + return phase + + def _get_gait_phase(self): + # return float mask 1 is stance, 0 is swing + phase = self._get_phase() + sin_pos = torch.sin(2 * torch.pi * phase) + # Add double support phase + stance_mask = torch.zeros((self.num_envs, 2), device=self.device) + # left foot stance + stance_mask[:, 0] = sin_pos >= 0 + # right foot stance + stance_mask[:, 1] = sin_pos < 0 + # Double support phase + stance_mask[torch.abs(sin_pos) < 0.1] = 1 + #if self.status == "standing": + #stance_mask[:] = 1 + + return stance_mask + + + def compute_ref_state(self): + phase = self._get_phase() + sin_pos = torch.sin(2 * torch.pi * phase) + sin_pos_l = sin_pos.clone() + sin_pos_r = sin_pos.clone() + #print('ww') + #print(self.dof_pos.shape) + self.ref_dof_pos = torch.zeros_like(self.dof_pos) + scale_1 = self.cfg.rewards.target_joint_pos_scale + scale_2 = 2 * scale_1 + # left foot stance phase set to default joint pos + sin_pos_l[sin_pos_l > 0] = 0 + self.ref_dof_pos[:, 2] = sin_pos_l * scale_1 + self.ref_dof_pos[:, 3] = -1 * sin_pos_l * scale_2 + self.ref_dof_pos[:, 4] = sin_pos_l * scale_1 + # right foot stance phase set to default joint pos + sin_pos_r[sin_pos_r < 0] = 0 + self.ref_dof_pos[:, 6] = -1 * sin_pos_r * scale_1 + self.ref_dof_pos[:, 9] = sin_pos_r * scale_2 + self.ref_dof_pos[:, 10] = -1 * sin_pos_r * scale_1 + # Double support phase + self.ref_dof_pos[torch.abs(sin_pos) < 0.1] = 0 + + self.ref_action = 2 * self.ref_dof_pos + + def compute_observations(self): + """ Computes observations + """ + batch, _ = self.commands.shape + random_pos = torch_rand_float(-2,2,(batch,37), device=self.device).squeeze(1) + random_vel = torch_rand_float(-0.5,0.5,(batch,9), device=self.device).squeeze(1) + + phase = self._get_phase() + self.compute_ref_state() + sin_pos = torch.sin(2 * torch.pi * phase).unsqueeze(1) + cos_pos = torch.cos(2 * torch.pi * phase).unsqueeze(1) + stance_mask = self._get_gait_phase() + contact_mask = self.contact_forces[:, self.feet_indices, 2] > 5. + self.command_input = torch.cat( + (sin_pos, cos_pos, self.commands[:, :3] * self.commands_scale), dim=1) + + obs_buf = torch.cat(( + self.base_ang_vel * self.obs_scales.ang_vel, + self.projected_gravity, + self.command_input, + (self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos, + #random_pos[:,:9].cuda() * self.obs_scales.dof_pos, + self.dof_vel * self.obs_scales.dof_vel, + #random_vel.cuda() * self.obs_scales.dof_vel, + self.actions, + random_pos[:,:21].cuda() + ),dim=-1) + + + privileged_obs_buf = torch.cat(( + self.base_lin_vel * self.obs_scales.lin_vel, + self.base_ang_vel * self.obs_scales.ang_vel, + self.projected_gravity, + self.command_input, + (self.dof_pos - self.default_dof_pos) * self.obs_scales.dof_pos, + #random_pos[:,:9].cuda(), + self.dof_vel * self.obs_scales.dof_vel, + #random_vel.cuda(), + self.actions, + random_pos[:,:21].cuda() + ),dim=-1) + + # add perceptive inputs if not blind + # add noise if needed + if self.add_noise: + obs_buf += (2 * torch.rand_like(obs_buf) - 1) * self.noise_scale_vec + + self.obs_hist_buffer = torch.cat(( + self.obs_hist_buffer[:, 1:], + obs_buf[:, None, :] + ), dim=1) + + self.privileged_obs_hist_buffer = torch.cat(( + self.privileged_obs_hist_buffer[:, 1:], + privileged_obs_buf[:, None, :] + ), dim=1) + + self.obs_buf = self.obs_hist_buffer.reshape((self.num_envs, self.num_obs)) + self.privileged_obs_buf = self.privileged_obs_hist_buffer.reshape((self.num_envs, self.num_privileged_obs)) + + def create_sim(self): + """ Creates simulation, terrain and evironments + """ + self.up_axis_idx = 2 # 2 for z, 1 for y -> adapt gravity accordingly + self.sim = self.gym.create_sim(self.sim_device_id, self.graphics_device_id, self.physics_engine, self.sim_params) + + mesh_type = self.cfg.terrain.mesh_type + if mesh_type in ['heightfield', 'trimesh']: + self.terrain = Terrain(self.cfg.terrain, self.num_envs) + + if mesh_type == 'plane': + self._create_ground_plane() + elif mesh_type == 'heightfield': + self._create_heightfield() + elif mesh_type == 'trimesh': + self._create_trimesh() + elif mesh_type is not None: + raise ValueError("Terrain mesh type not recognised. Allowed types are [None, plane, heightfield, trimesh]") + # self._create_ground_plane() + self._create_envs() + + def set_camera(self, position, lookat): + """ Set camera position and direction + """ + cam_pos = gymapi.Vec3(position[0], position[1], position[2]) + cam_target = gymapi.Vec3(lookat[0], lookat[1], lookat[2]) + self.gym.viewer_camera_look_at(self.viewer, None, cam_pos, cam_target) + + #------------- Callbacks -------------- + def _process_rigid_shape_props(self, props, env_id): + """ Callback allowing to store/change/randomize the rigid shape properties of each environment. + Called During environment creation. + Base behavior: randomizes the friction of each environment + + Args: + props (List[gymapi.RigidShapeProperties]): Properties of each shape of the asset + env_id (int): Environment id + + Returns: + [List[gymapi.RigidShapeProperties]]: Modified rigid shape properties + """ + if self.cfg.domain_rand.randomize_friction: + if env_id==0: + # prepare friction randomization + friction_range = self.cfg.domain_rand.friction_range + num_buckets = 64 + bucket_ids = torch.randint(0, num_buckets, (self.num_envs, 1)) + friction_buckets = torch_rand_float(friction_range[0], friction_range[1], (num_buckets,1), device='cpu') + self.friction_coeffs = friction_buckets[bucket_ids] + + for s in range(len(props)): + props[s].friction = self.friction_coeffs[env_id] + return props + + def _process_dof_props(self, props, env_id): + """ Callback allowing to store/change/randomize the DOF properties of each environment. + Called During environment creation. + Base behavior: stores position, velocity and torques limits defined in the URDF + + Args: + props (numpy.array): Properties of each DOF of the asset + env_id (int): Environment id + + Returns: + [numpy.array]: Modified DOF properties + """ + if env_id==0: + self.dof_pos_limits = torch.zeros(self.num_dof, 2, dtype=torch.float, device=self.device, requires_grad=False) + self.dof_vel_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + self.torque_limits = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + for i in range(len(props)): + self.dof_pos_limits[i, 0] = props["lower"][i].item() * self.cfg.safety.pos_limit + self.dof_pos_limits[i, 1] = props["upper"][i].item() * self.cfg.safety.pos_limit + self.dof_vel_limits[i] = props["velocity"][i].item() * self.cfg.safety.vel_limit + self.torque_limits[i] = props["effort"][i].item() * self.cfg.safety.torque_limit + # soft limits + m = (self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 1]) / 2 + r = self.dof_pos_limits[i, 1] - self.dof_pos_limits[i, 0] + self.dof_pos_limits[i, 0] = m - 0.5 * r * self.cfg.rewards.soft_dof_pos_limit + self.dof_pos_limits[i, 1] = m + 0.5 * r * self.cfg.rewards.soft_dof_pos_limit + return props + + def _process_rigid_body_props(self, props, env_id): + # if env_id==0: + # sum = 0 + # for i, p in enumerate(props): + # sum += p.mass + # print(f"Mass of body {i}: {p.mass} (before randomization)") + # print(f"Total mass {sum} (before randomization)") + # randomize base mass + if self.cfg.domain_rand.randomize_base_mass: + rng = self.cfg.domain_rand.added_mass_range + props[0].mass += np.random.uniform(rng[0], rng[1]) + return props + + def _post_physics_step_callback(self): + """ Callback called before computing terminations, rewards, and observations + Default behaviour: Compute ang vel command based on target and heading, compute measured terrain heights and randomly push robots + """ + # + env_ids = (self.episode_length_buf % int(self.cfg.commands.resampling_time / self.dt)==0).nonzero(as_tuple=False).flatten() + self._resample_commands(env_ids) + if self.cfg.commands.heading_command: + forward = quat_apply(self.base_quat, self.forward_vec) + heading = torch.atan2(forward[:, 1], forward[:, 0]) + self.commands[:, 2] = torch.clip(0.5*wrap_to_pi(self.commands[:, 3] - heading), -1., 1.) + + if self.cfg.domain_rand.push_robots and (self.common_step_counter % self.cfg.domain_rand.push_interval == 0): + self._push_robots() + + # randomize dof properties + env_ids = (self.episode_length_buf % int(self.cfg.domain_rand.dof_prop_rand_interval) == 0).nonzero( + as_tuple=False).flatten() + self._reset_dof_props(env_ids) + + if self.cfg.domain_rand.randomize_gravity and (self.common_step_counter % int(self.cfg.domain_rand.gravity_rand_interval) == 0): + self._randomize_gravity() + if self.cfg.domain_rand.randomize_gravity and (int(self.common_step_counter - self.cfg.domain_rand.gravity_rand_duration) % int( + self.cfg.domain_rand.gravity_rand_interval)) == 0: + self._randomize_gravity(torch.tensor([0, 0, 0])) + + def _resample_commands(self, env_ids): + """ Randommly select commands of some environments + + Args: + env_ids (List[int]): Environments ids for which new commands are needed + """ + self.commands[env_ids, 0] = torch_rand_float(self.command_ranges["lin_vel_x"][0], self.command_ranges["lin_vel_x"][1], (len(env_ids), 1), device=self.device).squeeze(1) + self.commands[env_ids, 1] = torch_rand_float(self.command_ranges["lin_vel_y"][0], self.command_ranges["lin_vel_y"][1], (len(env_ids), 1), device=self.device).squeeze(1) + if self.cfg.commands.heading_command: + self.commands[env_ids, 3] = torch_rand_float(self.command_ranges["heading"][0], self.command_ranges["heading"][1], (len(env_ids), 1), device=self.device).squeeze(1) + else: + self.commands[env_ids, 2] = torch_rand_float(self.command_ranges["ang_vel_yaw"][0], self.command_ranges["ang_vel_yaw"][1], (len(env_ids), 1), device=self.device).squeeze(1) + + # set small commands to zero + self.commands[env_ids, :2] *= (torch.norm(self.commands[env_ids, :2], dim=1) > 0.2).unsqueeze(1) + + def _compute_torques(self, actions): + """ Compute torques from actions. + Actions can be interpreted as position or velocity targets given to a PD controller, or directly as scaled torques. + [NOTE]: torques must have the same dimension as the number of DOFs, even if some DOFs are not actuated. + + Args: + actions (torch.Tensor): Actions + + Returns: + [torch.Tensor]: Torques sent to the simulation + """ + #pd controller + actions_scaled = actions * self.cfg.control.action_scale + actions_scaled[:, [0, 2, 6, 8]] *= self.cfg.control.hip_scale_reduction # scale down hip flexion range + control_type = self.cfg.control.control_type + if control_type == "actuator_net": + self.joint_pos_err = self.dof_pos - (actions_scaled + self.default_dof_pos) + self.motor_offsets + self.joint_vel = self.dof_vel + torques = self.actuator_network(self.joint_pos_err, self.joint_pos_err_last, self.joint_pos_err_last_last, + self.joint_vel, self.joint_vel_last, self.joint_vel_last_last) + self.joint_pos_err_last_last = torch.clone(self.joint_pos_err_last) + self.joint_pos_err_last = torch.clone(self.joint_pos_err) + self.joint_vel_last_last = torch.clone(self.joint_vel_last) + self.joint_vel_last = torch.clone(self.joint_vel) + elif control_type=="P": +# print(self.kp_factors.shape) +# print(self.default_dof_pos.shape) +# print(self.motor_offsets.shape) +# print(self.dof_vel.shape) + torques = self.p_gains*self.kp_factors*(actions_scaled + self.default_dof_pos - self.dof_pos + self.motor_offsets) - self.d_gains*self.kd_factors*self.dof_vel + elif control_type=="V": + torques = self.p_gains*(actions_scaled - self.dof_vel) - self.d_gains*(self.dof_vel - self.last_dof_vel)/self.sim_params.dt + elif control_type=="T": + torques = actions_scaled + else: + raise NameError(f"Unknown controller type: {control_type}") + + torques = torques * self.motor_strengths + return torch.clip(torques, -self.torque_limits, self.torque_limits) + + def _reset_dofs(self, env_ids): + """ Resets DOF position and velocities of selected environmments + Positions are randomly selected within 0.5:1.5 x default positions. + Velocities are set to zero. + + Args: + env_ids (List[int]): Environemnt ids + """ + self.dof_pos[env_ids] = self.default_dof_pos * torch_rand_float(0.5, 1.5, (len(env_ids), self.num_dof), device=self.device) + + env_ids_int32 = env_ids.to(dtype=torch.int32) + self.gym.set_dof_state_tensor_indexed(self.sim, + gymtorch.unwrap_tensor(self.dof_state), + gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32)) + + def _reset_root_states(self, env_ids): + """ Resets ROOT states position and velocities of selected environmments + Sets base position based on the curriculum + Selects randomized base velocities within -0.5:0.5 [m/s, rad/s] + Args: + env_ids (List[int]): Environemnt ids + """ + # base position + if self.custom_origins: + self.root_states[env_ids] = self.base_init_state + self.root_states[env_ids, :3] += self.env_origins[env_ids] + self.root_states[env_ids, :2] += torch_rand_float(-1., 1., (len(env_ids), 2), device=self.device) # xy position within 1m of the center + else: + self.root_states[env_ids] = self.base_init_state + self.root_states[env_ids, :3] += self.env_origins[env_ids] + # base velocities + self.root_states[env_ids, 7:13] = torch_rand_float(-0.5, 0.5, (len(env_ids), 6), device=self.device) # [7:10]: lin vel, [10:13]: ang vel + env_ids_int32 = env_ids.to(dtype=torch.int32) + self.gym.set_actor_root_state_tensor_indexed(self.sim, + gymtorch.unwrap_tensor(self.root_states), + gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32)) + + def _reset_dof_props(self, env_ids): + if self.cfg.domain_rand.randomize_pd_params: + min_Kp_factor, max_Kp_factor = self.cfg.domain_rand.kp_ratio_range + self.kp_factors[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device, + requires_grad=False).unsqueeze(1) * ( + max_Kp_factor - min_Kp_factor) + min_Kp_factor + + min_Kd_factor, max_Kd_factor = self.cfg.domain_rand.kd_ratio_range + self.kd_factors[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device, + requires_grad=False).unsqueeze(1) * ( + max_Kd_factor - min_Kd_factor) + min_Kd_factor + + if self.cfg.domain_rand.randomize_motor_strength: + min_strength, max_strength = self.cfg.domain_rand.motor_strength_range + self.motor_strengths[env_ids, :] = torch.rand(len(env_ids), dtype=torch.float, device=self.device, + requires_grad=False).unsqueeze(1) * ( + max_strength - min_strength) + min_strength + if self.cfg.domain_rand.randomize_motor_offset: + min_offset, max_offset = self.cfg.domain_rand.motor_offset_range + self.motor_offsets[env_ids, :] = torch.rand(len(env_ids), self.num_dof, dtype=torch.float, + device=self.device, requires_grad=False) * ( + max_offset - min_offset) + min_offset + + def _randomize_gravity(self, external_force = None): + + if external_force is not None: + self.gravities[:, :] = external_force.unsqueeze(0) + elif self.cfg.domain_rand.randomize_gravity: + min_gravity, max_gravity = self.cfg.domain_rand.gravity_range + external_force = torch.rand(3, dtype=torch.float, device=self.device, + requires_grad=False) * (max_gravity - min_gravity) + min_gravity + + self.gravities[:, :] = external_force.unsqueeze(0) + + sim_params = self.gym.get_sim_params(self.sim) + gravity = self.gravities[0, :] + torch.Tensor([0, 0, -9.8]).to(self.device) + self.gravity_vec[:, :] = gravity.unsqueeze(0) / torch.norm(gravity) + sim_params.gravity = gymapi.Vec3(gravity[0], gravity[1], gravity[2]) + self.gym.set_sim_params(self.sim, sim_params) + + def _push_robots(self): + """ Random pushes the robots. Emulates an impulse by setting a randomized base velocity. + """ + max_vel = self.cfg.domain_rand.max_push_vel_xy + self.root_states[:, 7:9] = torch_rand_float(-max_vel, max_vel, (self.num_envs, 2), device=self.device) # lin vel x/y + self.gym.set_actor_root_state_tensor(self.sim, gymtorch.unwrap_tensor(self.root_states)) + + def update_command_curriculum(self, env_ids): + """ Implements a curriculum of increasing commands + + Args: + env_ids (List[int]): ids of environments being reset + """ + # If the tracking reward is above 80% of the maximum, increase the range of commands + if torch.mean(self.episode_sums["tracking_lin_vel"][env_ids]) / self.max_episode_length > 0.8 * self.reward_scales["tracking_lin_vel"]: + self.command_ranges["lin_vel_x"][0] = np.clip(self.command_ranges["lin_vel_x"][0] - 0.5, -self.cfg.commands.max_curriculum, 0.) + self.command_ranges["lin_vel_x"][1] = np.clip(self.command_ranges["lin_vel_x"][1] + 0.5, 0., self.cfg.commands.max_curriculum) + + + def _get_noise_scale_vec(self, cfg): + """ Sets a vector used to scale the noise added to the observations. + [NOTE]: Must be adapted when changing the observations structure + + Args: + cfg (Dict): Environment config file + + Returns: + [torch.Tensor]: Vector of scales used to multiply a uniform distribution in [-1, 1] + """ + # noise_vec = torch.zeros_like(self.obs_buf[0]) + noise_vec = torch.zeros_like(self.obs_hist_buffer[0][0]) + self.add_noise = self.cfg.noise.add_noise + noise_scales = self.cfg.noise.noise_scales + noise_level = self.cfg.noise.noise_level + # noise_vec[:3] = noise_scales.lin_vel * noise_level * self.obs_scales.lin_vel + # noise_vec[3:6] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel + # noise_vec[6:9] = noise_scales.gravity * noise_level + # noise_vec[9:12] = 0. # commands + # noise_vec[12:12+self.num_actions] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos + # noise_vec[12+self.num_actions:12+2*self.num_actions] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel + # noise_vec[12+2*self.num_actions:12+3*self.num_actions] = 0. # previous actions + + noise_vec[:3] = noise_scales.ang_vel * noise_level * self.obs_scales.ang_vel + noise_vec[3:6] = noise_scales.gravity * noise_level + noise_vec[6:9] = 0. # commands + noise_vec[9:9+self.num_actions] = noise_scales.dof_pos * noise_level * self.obs_scales.dof_pos + noise_vec[9+self.num_actions:9+2*self.num_actions] = noise_scales.dof_vel * noise_level * self.obs_scales.dof_vel + noise_vec[9+2*self.num_actions:9+3*self.num_actions] = 0. # previous actions + + return noise_vec + + #---------------------------------------- + def _init_buffers(self): + """ Initialize torch tensors which will contain simulation states and processed quantities + """ + # get gym GPU state tensors + actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim) + dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim) + rigid_body_states_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim) + net_contact_forces = self.gym.acquire_net_contact_force_tensor(self.sim) + self.gym.refresh_dof_state_tensor(self.sim) + self.gym.refresh_actor_root_state_tensor(self.sim) + self.gym.refresh_net_contact_force_tensor(self.sim) + + # create some wrapper tensors for different slices + self.root_states = gymtorch.wrap_tensor(actor_root_state) + self.dof_state = gymtorch.wrap_tensor(dof_state_tensor) + self.rigid_body_states = gymtorch.wrap_tensor(rigid_body_states_tensor).view(self.num_envs, self.num_bodies, -1) + self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0] + self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1] + self.base_quat = self.root_states[:, 3:7] + self.rpy = get_euler_xyz_in_tensor(self.base_quat) + self.base_pos = self.root_states[:self.num_envs, 0:3] + self.contact_forces = gymtorch.wrap_tensor(net_contact_forces).view(self.num_envs, -1, 3) # shape: num_envs, num_bodies, xyz axis + + self.ref_dof_pos = torch.zeros_like(self.dof_pos, device=self.device) + + # initialize some data used later on + + self.obs_hist_buffer = torch.zeros(self.num_envs, self.cfg.env.obs_context_len, self.cfg.env.num_observations_single, device=self.device) + self.privileged_obs_hist_buffer = torch.zeros(self.num_envs, self.cfg.env.obs_context_len, self.cfg.env.num_privileged_obs_single, device=self.device) + + self.common_step_counter = 0 + self.extras = {} + self.noise_scale_vec = self._get_noise_scale_vec(self.cfg) + + self.forward_vec = to_torch([1., 0., 0.], device=self.device).repeat((self.num_envs, 1)) + self.torques = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.p_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.d_gains = torch.zeros(self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.last_last_actions = torch.zeros(self.num_envs, self.num_actions, dtype=torch.float, device=self.device, requires_grad=False) + self.last_dof_vel = torch.zeros_like(self.dof_vel) + self.last_root_vel = torch.zeros_like(self.root_states[:, 7:13]) + self.commands = torch.zeros(self.num_envs, self.cfg.commands.num_commands, dtype=torch.float, device=self.device, requires_grad=False) # x vel, y vel, yaw vel, heading + self.commands_scale = torch.tensor([self.obs_scales.lin_vel, self.obs_scales.lin_vel, self.obs_scales.ang_vel], device=self.device, requires_grad=False,) # TODO change this + self.feet_air_time = torch.zeros(self.num_envs, self.feet_indices.shape[0], dtype=torch.float, device=self.device, requires_grad=False) + self.single_contact_time = torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False) + self.last_contacts = torch.zeros(self.num_envs, len(self.feet_indices), dtype=torch.bool, device=self.device, requires_grad=False) + self.base_lin_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 7:10]) + self.base_ang_vel = quat_rotate_inverse(self.base_quat, self.root_states[:, 10:13]) + self.projected_gravity = quat_rotate_inverse(self.base_quat, self.gravity_vec) + + # action delay + action_delay = self.cfg.env.action_delay + if action_delay is not None: + ctrl_delay = torch.tensor(action_delay, device=self.device) + assert torch.allclose( + torch.round(ctrl_delay / self.sim_params.dt), + ctrl_delay / self.sim_params.dt, + ), "ctrl_delay must be a multiple of the simulation dt" + assert (ctrl_delay >= 0).all(), "ctrl_delay can not be negative" + self.action_delay_steps_init = torch.round(ctrl_delay / self.sim_params.dt) + else: + self.action_delay_steps_init = torch.tensor(0, device=self.device, dtype=torch.int32) + + self.action_delay_steps_init = self.action_delay_steps_init.repeat(self.num_envs, 1) + self.action_delay_steps = self.action_delay_steps_init.clone() + + self.action_buffer = torch.zeros((self.num_envs, self.cfg.control.decimation, self.num_actions), device=self.device) + + # joint positions offsets and PD gains + self.default_dof_pos = torch.zeros(self.num_dof, dtype=torch.float, device=self.device, requires_grad=False) + print('ioioioo') + print(self.num_dofs) + for i in range(self.num_dofs): + name = self.dof_names[i] + angle = self.cfg.init_state.default_joint_angles[name] + self.default_dof_pos[i] = angle + found = False + #print(self.cfg.control.stiffness.keys()) + for dof_name in self.cfg.control.stiffness.keys(): + if dof_name in name: + #print(i) + self.p_gains[i] = self.cfg.control.stiffness[dof_name] + self.d_gains[i] = self.cfg.control.damping[dof_name] + found = True + if not found: + self.p_gains[i] = 0. + self.d_gains[i] = 0. + if self.cfg.control.control_type in ["P", "V"]: + print(f"PD gain of joint {name} were not defined, setting them to zero") + + # self.init_p_gains = self.init_p_gains.repeat(self.num_envs, 1) + # self.init_d_gains = self.init_d_gains.repeat(self.num_envs, 1) + + # self.p_gains = self.init_p_gains.clone() + # self.d_gains = self.init_d_gains.clone() + + self.default_dof_pos = self.default_dof_pos.unsqueeze(0) + + # feet height for gait + self.last_feet_z = 0.05 + self.feet_height = torch.zeros((self.num_envs, 2), device=self.device) + + def _init_custom_buffers__(self): + # domain randomization properties + self.motor_strengths = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device, + requires_grad=False) + self.motor_offsets = torch.zeros(self.num_envs, self.num_dof, dtype=torch.float, device=self.device, + requires_grad=False) + + self.gravity_vec = to_torch(get_axis_params(-1., self.up_axis_idx), device=self.device).repeat((self.num_envs, 1)) + self.gravities = torch.zeros(self.num_envs, 3, dtype=torch.float, device=self.device, + requires_grad=False) + + self.kp_factors = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device, + requires_grad=False) + self.kd_factors = torch.ones(self.num_envs, self.num_dof, dtype=torch.float, device=self.device, + requires_grad=False) + + def _prepare_reward_function(self): + """ Prepares a list of reward functions, whcih will be called to compute the total reward. + Looks for self._reward_, where are names of all non zero reward scales in the cfg. + """ + # remove zero scales + multiply non-zero ones by dt + for key in list(self.reward_scales.keys()): + scale = self.reward_scales[key] + if scale==0: + self.reward_scales.pop(key) + else: + self.reward_scales[key] *= self.dt + # prepare list of functions + self.reward_functions = [] + self.reward_names = [] + for name, scale in self.reward_scales.items(): + if name=="termination": + continue + self.reward_names.append(name) + name = '_reward_' + name + self.reward_functions.append(getattr(self, name)) + + # reward episode sums + self.episode_sums = {name: torch.zeros(self.num_envs, dtype=torch.float, device=self.device, requires_grad=False) + for name in self.reward_scales.keys()} + + def _create_ground_plane(self): + """ Adds a ground plane to the simulation, sets friction and restitution based on the cfg. + """ + plane_params = gymapi.PlaneParams() + plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0) + plane_params.static_friction = self.cfg.terrain.static_friction + plane_params.dynamic_friction = self.cfg.terrain.dynamic_friction + plane_params.restitution = self.cfg.terrain.restitution + self.gym.add_ground(self.sim, plane_params) + + def _create_heightfield(self): + """ Adds a heightfield terrain to the simulation, sets parameters based on the cfg. + """ + hf_params = gymapi.HeightFieldParams() + hf_params.column_scale = self.terrain.cfg.horizontal_scale + hf_params.row_scale = self.terrain.cfg.horizontal_scale + hf_params.vertical_scale = self.terrain.cfg.vertical_scale + hf_params.nbRows = self.terrain.tot_cols + hf_params.nbColumns = self.terrain.tot_rows + hf_params.transform.p.x = -self.terrain.cfg.border_size + hf_params.transform.p.y = -self.terrain.cfg.border_size + hf_params.transform.p.z = 0.0 + hf_params.static_friction = self.cfg.terrain.static_friction + hf_params.dynamic_friction = self.cfg.terrain.dynamic_friction + hf_params.restitution = self.cfg.terrain.restitution + + print(self.terrain.heightsamples.shape, hf_params.nbRows, hf_params.nbColumns) + + self.gym.add_heightfield(self.sim, self.terrain.heightsamples.T, hf_params) + self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, + self.terrain.tot_cols).to(self.device) + + def _create_trimesh(self): + """ Adds a triangle mesh terrain to the simulation, sets parameters based on the cfg. + # """ + tm_params = gymapi.TriangleMeshParams() + tm_params.nb_vertices = self.terrain.vertices.shape[0] + tm_params.nb_triangles = self.terrain.triangles.shape[0] + + tm_params.transform.p.x = -self.terrain.cfg.border_size + tm_params.transform.p.y = -self.terrain.cfg.border_size + tm_params.transform.p.z = 0.0 + tm_params.static_friction = self.cfg.terrain.static_friction + tm_params.dynamic_friction = self.cfg.terrain.dynamic_friction + tm_params.restitution = self.cfg.terrain.restitution + self.gym.add_triangle_mesh(self.sim, self.terrain.vertices.flatten(order='C'), + self.terrain.triangles.flatten(order='C'), tm_params) + self.height_samples = torch.tensor(self.terrain.heightsamples).view(self.terrain.tot_rows, + self.terrain.tot_cols).to(self.device) + + def _create_envs(self): + """ Creates environments: + 1. loads the robot URDF/MJCF asset, + 2. For each environment + 2.1 creates the environment, + 2.2 calls DOF and Rigid shape properties callbacks, + 2.3 create actor with these properties and add them to the env + 3. Store indices of different bodies of the robot + """ + asset_path = self.cfg.asset.file.format(LEGGED_GYM_ROOT_DIR=LEGGED_GYM_ROOT_DIR) + asset_root = os.path.dirname(asset_path) + asset_file = os.path.basename(asset_path) + + asset_options = gymapi.AssetOptions() + asset_options.default_dof_drive_mode = self.cfg.asset.default_dof_drive_mode + asset_options.collapse_fixed_joints = self.cfg.asset.collapse_fixed_joints + asset_options.replace_cylinder_with_capsule = self.cfg.asset.replace_cylinder_with_capsule + asset_options.flip_visual_attachments = self.cfg.asset.flip_visual_attachments + asset_options.fix_base_link = self.cfg.asset.fix_base_link + asset_options.density = self.cfg.asset.density + asset_options.angular_damping = self.cfg.asset.angular_damping + asset_options.linear_damping = self.cfg.asset.linear_damping + asset_options.max_angular_velocity = self.cfg.asset.max_angular_velocity + asset_options.max_linear_velocity = self.cfg.asset.max_linear_velocity + asset_options.armature = self.cfg.asset.armature + asset_options.thickness = self.cfg.asset.thickness + asset_options.disable_gravity = self.cfg.asset.disable_gravity + + if self.cfg.asset.vhacd_enabled: + asset_options.vhacd_enabled = True + asset_options.vhacd_params = gymapi.VhacdParams() + asset_options.vhacd_params.resolution = self.cfg.asset.vhacd_params_resolution + + robot_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options) + self.num_dof = self.gym.get_asset_dof_count(robot_asset) + self.num_bodies = self.gym.get_asset_rigid_body_count(robot_asset) + dof_props_asset = self.gym.get_asset_dof_properties(robot_asset) + rigid_shape_props_asset = self.gym.get_asset_rigid_shape_properties(robot_asset) + + # save body names from the asset + body_names = self.gym.get_asset_rigid_body_names(robot_asset) + self.dof_names = self.gym.get_asset_dof_names(robot_asset) + self.num_bodies = len(body_names) + self.num_dofs = len(self.dof_names) + feet_names = [s for s in body_names if self.cfg.asset.foot_name in s] + knee_names = [s for s in body_names if self.cfg.asset.knee_name in s] + + penalized_contact_names = [] + for name in self.cfg.asset.penalize_contacts_on: + penalized_contact_names.extend([s for s in body_names if name in s]) + termination_contact_names = [] + for name in self.cfg.asset.terminate_after_contacts_on: + termination_contact_names.extend([s for s in body_names if name in s]) + + base_init_state_list = self.cfg.init_state.pos + self.cfg.init_state.rot + self.cfg.init_state.lin_vel + self.cfg.init_state.ang_vel + self.base_init_state = to_torch(base_init_state_list, device=self.device, requires_grad=False) + start_pose = gymapi.Transform() + start_pose.p = gymapi.Vec3(*self.base_init_state[:3]) + + self._get_env_origins() + env_lower = gymapi.Vec3(0., 0., 0.) + env_upper = gymapi.Vec3(0., 0., 0.) + self.actor_handles = [] + self.envs = [] + + self._init_custom_buffers__() + self._randomize_gravity() + + for i in range(self.num_envs): + # create env instance + env_handle = self.gym.create_env(self.sim, env_lower, env_upper, int(np.sqrt(self.num_envs))) + pos = self.env_origins[i].clone() + pos[:2] += torch_rand_float(-1., 1., (2,1), device=self.device).squeeze(1) + start_pose.p = gymapi.Vec3(*pos) + + rigid_shape_props = self._process_rigid_shape_props(rigid_shape_props_asset, i) + self.gym.set_asset_rigid_shape_properties(robot_asset, rigid_shape_props) + actor_handle = self.gym.create_actor(env_handle, robot_asset, start_pose, self.cfg.asset.name, i, self.cfg.asset.self_collisions, 0) + dof_props = self._process_dof_props(dof_props_asset, i) + self.gym.set_actor_dof_properties(env_handle, actor_handle, dof_props) + body_props = self.gym.get_actor_rigid_body_properties(env_handle, actor_handle) + body_props = self._process_rigid_body_props(body_props, i) + self.gym.set_actor_rigid_body_properties(env_handle, actor_handle, body_props, recomputeInertia=True) + self.envs.append(env_handle) + self.actor_handles.append(actor_handle) + + self.feet_indices = torch.zeros(len(feet_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(feet_names)): + self.feet_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], feet_names[i]) + + self.knee_indices = torch.zeros(len(knee_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(knee_names)): + self.knee_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], knee_names[i]) + + self.penalised_contact_indices = torch.zeros(len(penalized_contact_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(penalized_contact_names)): + self.penalised_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], penalized_contact_names[i]) + + self.termination_contact_indices = torch.zeros(len(termination_contact_names), dtype=torch.long, device=self.device, requires_grad=False) + for i in range(len(termination_contact_names)): + self.termination_contact_indices[i] = self.gym.find_actor_rigid_body_handle(self.envs[0], self.actor_handles[0], termination_contact_names[i]) + + self.rigid_body_states = self.gym.get_actor_rigid_body_states(env_handle, actor_handle, gymapi.STATE_ALL) + + def _get_env_origins(self): + """ Sets environment origins. On rough terrain the origins are defined by the terrain platforms. + Otherwise create a grid. + """ + + self.custom_origins = False + self.env_origins = torch.zeros(self.num_envs, 3, device=self.device, requires_grad=False) + # create a grid of robots + num_cols = np.floor(np.sqrt(self.num_envs)) + num_rows = np.ceil(self.num_envs / num_cols) + xx, yy = torch.meshgrid(torch.arange(num_rows), torch.arange(num_cols)) + spacing = self.cfg.env.env_spacing + self.env_origins[:, 0] = spacing * xx.flatten()[:self.num_envs] + self.env_origins[:, 1] = spacing * yy.flatten()[:self.num_envs] + self.env_origins[:, 2] = 0. + + def _parse_cfg(self, cfg): + self.dt = self.cfg.control.decimation * self.sim_params.dt + self.obs_scales = self.cfg.normalization.obs_scales + self.reward_scales = class_to_dict(self.cfg.rewards.scales) + self.command_ranges = class_to_dict(self.cfg.commands.ranges) + + + self.max_episode_length_s = self.cfg.env.episode_length_s + self.max_episode_length = np.ceil(self.max_episode_length_s / self.dt) + + self.cfg.domain_rand.push_interval = np.ceil(self.cfg.domain_rand.push_interval_s / self.dt) + self.cfg.domain_rand.gravity_rand_interval = np.ceil(self.cfg.domain_rand.gravity_rand_interval_s / self.dt) + self.cfg.domain_rand.gravity_rand_duration = np.ceil( + self.cfg.domain_rand.gravity_rand_interval * self.cfg.domain_rand.gravity_impulse_duration) + self.cfg.domain_rand.dof_prop_rand_interval = np.ceil(self.cfg.domain_rand.dof_prop_rand_interval_s / self.dt) + + #------------ reward functions---------------- + def _reward_lin_vel_z(self): + # Penalize z axis base linear velocity + return torch.square(self.base_lin_vel[:, 2]) + + def _reward_ang_vel_xy(self): + # Penalize xy axes base angular velocity + return torch.sum(torch.square(self.base_ang_vel[:, :2]), dim=1) + + def _reward_orientation(self): + # Penalize non flat base orientation + return torch.sum(torch.square(self.projected_gravity[:, :2]), dim=1) + + def _reward_base_height(self): + # Penalize base height away from target + base_height = self.root_states[:, 2] + return torch.square(base_height - self.cfg.rewards.base_height_target) + + def _reward_torques(self): + # Penalize torques + return torch.sum(torch.square(self.torques), dim=1) + + def _reward_dof_vel(self): + # Penalize dof velocities + return torch.sum(torch.square(self.dof_vel), dim=1) + + def _reward_dof_acc(self): + # Penalize dof accelerations + return torch.sum(torch.square((self.last_dof_vel - self.dof_vel) / self.dt), dim=1) + + def _reward_collision(self): + # Penalize collisions on selected bodies + return torch.sum(1.*(torch.norm(self.contact_forces[:, self.penalised_contact_indices, :], dim=-1) > 0.1), dim=1) + + def _reward_termination(self): + # Terminal reward / penalty + return self.reset_buf * ~self.time_out_buf + + def _reward_dof_pos_limits(self): + # Penalize dof positions too close to the limit + out_of_limits = -(self.dof_pos - self.dof_pos_limits[:, 0]).clip(max=0.) # lower limit + out_of_limits += (self.dof_pos - self.dof_pos_limits[:, 1]).clip(min=0.) + return torch.sum(out_of_limits, dim=1) + + def _reward_dof_vel_limits(self): + # Penalize dof velocities too close to the limit + # clip to max error = 1 rad/s per joint to avoid huge penalties + return torch.sum((torch.abs(self.dof_vel) - self.dof_vel_limits*self.cfg.rewards.soft_dof_vel_limit).clip(min=0., max=1.), dim=1) + + def _reward_torque_limits(self): + # penalize torques too close to the limit + return torch.sum((torch.abs(self.torques) - self.torque_limits*self.cfg.rewards.soft_torque_limit).clip(min=0.), dim=1) + + def _reward_tracking_lin_vel(self): + # Tracking of linear velocity commands (xy axes) + lin_vel_error = torch.sum(torch.square(self.commands[:, :2] - self.base_lin_vel[:, :2]), dim=1) + return torch.exp(-lin_vel_error/self.cfg.rewards.tracking_sigma) + + def _reward_tracking_ang_vel(self): + # Tracking of angular velocity commands (yaw) + ang_vel_error = torch.square(self.commands[:, 2] - self.base_ang_vel[:, 2]) + return torch.exp(-ang_vel_error/self.cfg.rewards.tracking_sigma) + + def _reward_feet_air_time(self): + # Reward long steps + # Need to filter the contacts because the contact reporting of PhysX is unreliable on meshes + contact = self.contact_forces[:, self.feet_indices, 2] > 1. + contact_filt = torch.logical_or(contact, self.last_contacts) + self.last_contacts = contact + first_contact = (self.feet_air_time > 0.) * contact_filt + self.feet_air_time += self.dt + rew_airTime = torch.sum((self.feet_air_time - 0.2) * first_contact, dim=1) # reward only on first contact with the ground + rew_airTime *= torch.norm(self.commands[:, :2], dim=1) > 0.1 #no reward for zero command + self.feet_air_time *= ~contact_filt + return rew_airTime + + def _reward_stumble(self): + # Penalize feet hitting vertical surfaces + return torch.any(torch.norm(self.contact_forces[:, self.feet_indices, :2], dim=2) >\ + 5 *torch.abs(self.contact_forces[:, self.feet_indices, 2]), dim=1) + + def _reward_stand_still(self): + # Penalize motion at zero commands + return torch.sum(torch.abs(self.dof_pos - self.default_dof_pos), dim=1) * (torch.norm(self.commands[:, :2], dim=1) < 0.1) + + def _reward_feet_contact_forces(self): + # penalize high contact forces + return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1) + + def _reward_action_rate(self): + # Penalize changes in actions + return torch.sum(torch.square(self.last_actions - self.actions), dim=1) + + def _reward_action_smoothness(self): + # Penalize changes in actions + term_2 = torch.sum(torch.square(self.actions - 2.*self.last_actions + self.last_last_actions), dim=1) + term_3 = 0.05 * torch.sum(torch.abs(self.actions), dim=1) + return term_2 + term_3 + + def _reward_default_joint_pos(self): + """ + Calculates the reward for keeping joint positions close to default positions, with a focus + on penalizing deviation in yaw and roll directions. Excludes yaw and roll from the main penalty. + """ + joint_diff = self.dof_pos - self.default_dof_pos + left_yaw_roll = joint_diff[:, [0, 2]] + right_yaw_roll = joint_diff[:, [6, 8]] + yaw_roll = torch.norm(left_yaw_roll, dim=1) + torch.norm(right_yaw_roll, dim=1) + yaw_roll = torch.clamp(yaw_roll - 0.1, 0, 50) + return torch.exp(-yaw_roll * 100) - 0.01 * torch.norm(joint_diff, dim=1) + + def _reward_feet_contact_number(self): + """ + Calculates a reward based on the number of feet contacts aligning with the gait phase. + Rewards or penalizes depending on whether the foot contact matches the expected gait phase. + """ + contact = self.contact_forces[:, self.feet_indices, 2] > 1. + + # count the number of feet in contact at each time step + contact_count = torch.sum(contact, dim=1) + + # check for single foot contact (exactly one foot in contact) + single_contact = (contact_count == 1) + + # accumulate time where there was a single foot contact + self.single_contact_time += self.dt * single_contact.float() + + # apply grace period: if single contact occurred at least once in the last 0.2s, grant reward + grace_period_contact = self.single_contact_time > 0. + rew_contact_number = grace_period_contact.float() + + # Reset single contact time where there is no single foot contact + self.single_contact_time *= ~single_contact + + # Reward is granted if non-standing command and single contact in the last 0.2s + non_standing_command = torch.norm(self.commands[:, :2], dim=1) > 0.1 + rew_contact_number *= non_standing_command.float() + + rew_contact_number += (~non_standing_command).float() + + return rew_contact_number + + def _reward_feet_parallel(self): + left_foot_quat = self.rigid_body_states[:, self.feet_indices[0], 3:7] + right_foot_quat = self.rigid_body_states[:, self.feet_indices[1], 3:7] + + feet_dir_local = torch.zeros((self.num_envs, 3), device=self.device) + feet_dir_local[:,0] = 1 + + # 脚在世界坐标系的方向 + left_foot_dir_world = quat_rotate(left_foot_quat, feet_dir_local.clone()) + right_foot_dir_world = quat_rotate(right_foot_quat, feet_dir_local.clone()) + + # 脚在base坐标系的方向 + left_foot_dir_base_y = quat_rotate_inverse(self.base_quat, left_foot_dir_world)[:,1] + right_foot_dir_base_y = quat_rotate_inverse(self.base_quat, right_foot_dir_world)[:,1] + + feet_dir_y = torch.abs(left_foot_dir_base_y) + torch.abs(right_foot_dir_base_y) + return feet_dir_y + + def _reward_energy(self): + return torch.sum(torch.square(self.torques * self.dof_vel), dim=1) + + def _reward_knee_distance(self): + """ + Calculates the reward based on the distance between the knee of the humanoid. + """ + foot_pos = self.rigid_body_states[:, self.knee_indices, :2] + foot_dist = torch.norm(foot_pos[:, 0, :] - foot_pos[:, 1, :], dim=1) + fd = self.cfg.rewards.min_dist + max_df = self.cfg.rewards.max_dist / 2 + d_min = torch.clamp(foot_dist - fd, -0.5, 0.) + d_max = torch.clamp(foot_dist - max_df, 0, 0.5) + return (torch.exp(-torch.abs(d_min) * 100) + torch.exp(-torch.abs(d_max) * 100)) / 2 + + def _reward_feet_distance(self): + """ + Calculates the reward based on the distance between the feet. Penalize feet get close to each other or too far away. + """ + foot_pos = self.rigid_body_states[:, self.feet_indices, :2] + foot_dist = torch.norm(foot_pos[:, 0, :] - foot_pos[:, 1, :], dim=1) + fd = self.cfg.rewards.min_dist + max_df = self.cfg.rewards.max_dist + d_min = torch.clamp(foot_dist - fd, -0.5, 0.) + d_max = torch.clamp(foot_dist - max_df, 0, 0.5) + return (torch.exp(-torch.abs(d_min) * 100) + torch.exp(-torch.abs(d_max) * 100)) / 2 + + def _reward_arm_pose(self): + return torch.sum(torch.abs(self.actions[:,13:]), dim=1) + + def _reward_joint_pos(self): + """ + Calculates the reward based on the difference between the current joint positions and the target joint positions. + """ + joint_pos = self.dof_pos.clone() - self.default_dof_pos + pos_target = self.ref_dof_pos.clone() + diff = joint_pos - pos_target + r = torch.exp(-2 * torch.norm(diff, dim=1)) - 0.2 * torch.norm(diff, dim=1).clamp(0, 0.5) + return r #* (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1) + + def _reward_foot_slip(self): + """ + Calculates the reward for minimizing foot slip. The reward is based on the contact forces + and the speed of the feet. A contact threshold is used to determine if the foot is in contact + with the ground. The speed of the foot is calculated and scaled by the contact condition. + """ + contact = self.contact_forces[:, self.feet_indices, 2] > 5. + foot_speed_norm = torch.norm(self.rigid_body_states[:, self.feet_indices, 10:12], dim=2) + rew = torch.sqrt(foot_speed_norm) + rew *= contact + return torch.sum(rew, dim=1) + + def _reward_feet_contact_number(self): + """ + Calculates a reward based on the number of feet contacts aligning with the gait phase. + Rewards or penalizes depending on whether the foot contact matches the expected gait phase. + """ + contact = self.contact_forces[:, self.feet_indices, 2] > 5. + stance_mask = self._get_gait_phase() + #if self.status == 'standing': + # stance_mask[:] = 1 + reward = torch.where(contact == stance_mask, 1, -0.3) + return torch.mean(reward, dim=1)# * (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1) + + def _reward_feet_clearance(self): + """ + Calculates reward based on the clearance of the swing leg from the ground during movement. + Encourages appropriate lift of the feet during the swing phase of the gait. + """ + # Compute feet contact mask + contact = self.contact_forces[:, self.feet_indices, 2] > 5. + + # Get the z-position of the feet and compute the change in z-position + feet_z = self.rigid_body_states[:, self.feet_indices, 2] - 0.05 + delta_z = feet_z - self.last_feet_z + self.feet_height += delta_z + self.last_feet_z = feet_z + + # Compute swing mask + swing_mask = 1 - self._get_gait_phase() + + # feet height should be closed to target feet height at the peak + rew_pos = torch.abs(self.feet_height - self.cfg.rewards.target_feet_height) < 0.01 + rew_pos = torch.sum(rew_pos * swing_mask, dim=1) + self.feet_height *= ~contact + return rew_pos #* (torch.norm(self.base_lin_vel[:, :2], dim=1) > 0.1) + + def _reward_default_joint_pos(self): + """ + Calculates the reward for keeping joint positions close to default positions, with a focus + on penalizing deviation in yaw and roll directions. Excludes yaw and roll from the main penalty. + """ + joint_diff = self.dof_pos - self.default_dof_pos + left_yaw_roll = joint_diff[:, :2] + right_yaw_roll = joint_diff[:, 5: 7] + yaw_roll = torch.norm(left_yaw_roll, dim=1) + torch.norm(right_yaw_roll, dim=1) + yaw_roll = torch.clamp(yaw_roll - 0.1, 0, 50) + return torch.exp(-yaw_roll * 100) - 0.01 * torch.norm(joint_diff, dim=1) + + def _reward_feet_symmetry(self): + left_foot = self.rigid_body_states[:, self.feet_indices[0], :3] - self.base_pos # left ankle + right_foot = self.rigid_body_states[:, self.feet_indices[1], :3] - self.base_pos # right ankle + + left_foot_y = quat_rotate_inverse(self.base_quat, left_foot)[:,1] + right_foot_y = quat_rotate_inverse(self.base_quat, right_foot)[:,1] + + feet_distance = (torch.abs(left_foot_y) - torch.abs(right_foot_y))**2 + return feet_distance + + def _reward_feet_contact_forces(self): + # penalize high contact forces + return torch.sum((torch.norm(self.contact_forces[:, self.feet_indices, :], dim=-1) - self.cfg.rewards.max_contact_force).clip(min=0.), dim=1) diff --git a/legged_gym/envs/h1_2/h1_2_config.py b/legged_gym/envs/h1_2/h1_2_config.py new file mode 100644 index 0000000..bf607f2 --- /dev/null +++ b/legged_gym/envs/h1_2/h1_2_config.py @@ -0,0 +1,235 @@ +from legged_gym.envs.base.legged_robot_config import LeggedRobotCfg, LeggedRobotCfgPPO + + +class H1_2RoughCfg(LeggedRobotCfg): + + class init_state(LeggedRobotCfg.init_state): + pos = [0.0, 0.0, 0.98] # x,y,z [m] + default_joint_angles = { # = target angles [rad] when action = 0.0 + 'left_hip_yaw_joint': 0, + 'left_hip_roll_joint': 0, + 'left_hip_pitch_joint': -0.6, + 'left_knee_joint': 1.2, + 'left_ankle_pitch_joint': -0.6, + 'left_ankle_roll_joint': 0.0, + + 'right_hip_yaw_joint': 0, + 'right_hip_roll_joint': 0, + 'right_hip_pitch_joint': -0.6, + 'right_knee_joint': 1.2, + 'right_ankle_pitch_joint': -0.6, + 'right_ankle_roll_joint': 0.0, + + 'torso_joint': 0, + + 'left_shoulder_pitch_joint': 0.4, + 'left_shoulder_roll_joint': 0, + 'left_shoulder_yaw_joint': 0, + 'left_elbow_pitch_joint': 0.3, + + 'right_shoulder_pitch_joint': 0.4, + 'right_shoulder_roll_joint': 0, + 'right_shoulder_yaw_joint': 0, + 'right_elbow_pitch_joint': 0.3, + } + + class commands: + curriculum = True + max_curriculum = 1. + num_commands = 4 # default: lin_vel_x, lin_vel_y, ang_vel_yaw, heading (in heading mode ang_vel_yaw is recomputed from heading error) + resampling_time = 10. # time before command are changed[s] + heading_command = False # if true: compute ang vel command from heading error + class ranges: + lin_vel_x = [-1.0, 2.0] # min max [m/s] + lin_vel_y = [-1.0, 1.0] # min max [m/s] + ang_vel_yaw = [-1.0, 1.0] # min max [rad/s] + heading = [-3.14, 3.14] + + class env(LeggedRobotCfg.env): + num_actions = 21 + num_policy_output = 21 + num_envs = 3000 + obs_context_len = 15 + mlp_obs_context_len = 3 + num_observations_single = 11 + num_actions * 4 #+ 9 * 4 + num_privileged_obs_single = 14 + num_actions * 4 #+ 9 * 4 + num_observations = num_observations_single * obs_context_len + num_privileged_obs = num_privileged_obs_single * obs_context_len + num_teaching_observations = num_observations_single + + action_delay = 0.02 + episode_length_s = 20 + + class safety: + # safety factors + pos_limit = 1.0 + vel_limit = 1.0 + torque_limit = 0.9 + + class control(LeggedRobotCfg.control): + # PD Drive parameters: + control_type = 'P' + # PD Drive parameters: + stiffness = { + 'hip_yaw_joint': 200., + 'hip_roll_joint': 200., + 'hip_pitch_joint': 200., + 'knee_joint': 300., + 'ankle_pitch_joint': 60., + 'ankle_roll_joint': 40., + 'torso_joint': 600., + 'shoulder_pitch_joint': 80., + 'shoulder_roll_joint': 80., + 'shoulder_yaw_joint': 40., + 'elbow_pitch_joint': 60., + } # [N*m/rad] + damping = { + 'hip_yaw_joint': 5.0, + 'hip_roll_joint': 5.0, + 'hip_pitch_joint': 5.0, + 'knee_joint': 7.5, + 'ankle_pitch_joint': 1.0, + 'ankle_roll_joint': 0.3, + 'torso_joint': 15.0, + 'shoulder_pitch_joint': 2.0, + 'shoulder_roll_joint': 2.0, + 'shoulder_yaw_joint': 1.0, + 'elbow_pitch_joint': 1.0, + } # [N*m/rad] # [N*m*s/rad] + # action scale: target angle = actionScale * action + defaultAngle + action_scale = 0.2 + # decimation: Number of control action updates @ sim DT per policy DT + decimation = 10 + + class asset(LeggedRobotCfg.asset): + file = '{LEGGED_GYM_ROOT_DIR}/resources/robots/h1_2/h1_2_simplified.urdf' + name = "h1_2" + foot_name = "ankle_roll" + knee_name = "knee" + penalize_contacts_on = ["hip", "knee"] + terminate_after_contacts_on = [ + "pelvis", + "hip", + "knee", + "shoulder_pitch", + "elbow" + ] + self_collisions = 0 # 1 to disable, 0 to enable...bitwise filter + flip_visual_attachments = False + eplace_cylinder_with_capsule = False + collapse_fixed_joints = False + armature = 6e-4 # stablize semi-euler integration for end effectors + + vhacd_enabled = False + vhacd_params_resolution = 500000 + + class domain_rand: + randomize_friction = True + friction_range = [0.5, 1.5] + randomize_base_mass = True + added_mass_range = [-1., 1.] + push_robots = True + push_interval_s = 15 + max_push_vel_xy = 0.5 + + randomize_gravity = False + gravity_range = [-1.0, 1.0] + gravity_rand_interval_s = 8.0 # sec + gravity_impulse_duration = 0.99 + + dof_prop_rand_interval_s = 6 + randomize_pd_params = True + kp_ratio_range = [0.5, 1.5] + kd_ratio_range = [0.5, 1.5] + randomize_motor_strength = False + motor_strength_range = [0.9, 1.1] + randomize_motor_offset = False + motor_offset_range = [-0.02, 0.02] + # randomize_decimation = False + # decimation_range = [0.5, 1.5] + soft_dof_pos_limit = 0.9 + base_height_target = 0.98 + + class sim(LeggedRobotCfg.sim): + dt = 0.002 # stablize semi-euler integration for end effectors + + class rewards(LeggedRobotCfg.rewards): + soft_dof_pos_limit = 0.9 + soft_dof_vel_limit = 0.9 + soft_torque_limit = 0.9 + + + base_height_target = 0.98 + + # for knee and ankle distance keeping + min_dist = 0.3 + max_dist = 0.5 + + tracking_sigma = 0.25 # tracking reward = exp(-error^2/sigma) + max_contact_force = 600. # forces above this value are penalized + cycle_time = 0.4 #.4 #0.68 + target_joint_pos_scale = 0.17 + target_feet_height = 0.06 + + class scales(LeggedRobotCfg.rewards.scales): + + joint_pos = 1.6 * 4 + feet_clearance = 1. * 2 + feet_contact_number = 1.2 * 1 + feet_air_time = 1.0 * 2 + foot_slip = -0.1 + + tracking_lin_vel = 1.0 * 8 + tracking_ang_vel = 1.0 * 8 + lin_vel_z = -0.2 + ang_vel_xy = -0.1 + orientation = -1 + base_height = -100.0 + dof_acc = -3e-8 + collision = -1.0 + + action_rate = -0.6 + action_smoothness = -0.6 + + dof_pos_limits = -10.0 + dof_vel_limits = -10.0 + torque_limits = -10.0 + default_joint_pos = 0.5 * 4 + feet_contact_forces = -3e-6 * 50 + + ## feet reg + feet_parallel = -1.0 + feet_symmetry = -1.0 + knee_distance = 0.2 + feet_distance = 0.2 + + arm_pose = -1 + stumble = -2.0 + stand_still = -1.0 * 0 + + only_positive_rewards = True # if true negative total rewards are clipped at zero (avoids early termination problems) + + class normalization(LeggedRobotCfg.normalization): + clip_actions = 10.0 + + +class H1_2RoughCfgPPO(LeggedRobotCfgPPO): + + class policy(LeggedRobotCfgPPO.policy): + init_noise_std = 0.3 + activation = 'tanh' + policy_type = 'moving' + architecture = 'Trans' # choose from 'Mix', 'Trans', 'MLP', and 'RNN' + teaching_model_path = '/home/ziluoding/unitree_rl_gym/logs/Sep06_19-03-45_/model_13000.pt'#'/home/ps/unitree_rl_gym_o/legged_gym/model/Aug29_17-48-05_h1/model_10000.pt' + moving_model_path = '/home/ziluoding/humanoid-gym/logs/h1/Jul11_16-30-02_/model_12000.pt' + + class runner(LeggedRobotCfgPPO.runner): + run_name = '' + experiment_name = 'h1_2' + run_name = '' + # load and resume + render = True + resume = True + load_run = 'Oct24_21-41-11_finetunexvel-12noheadingcycle0.40.2' # -1 = last run + checkpoint = -1 # -1 = last saved model + resume_path = None # updated from load_run and chkpt