Source code for navsim_envs.arora.unity_env

#ARORA

import time
from mlagents_envs.environment import UnityEnvironment
from mlagents_envs.exception import UnityWorkerInUseException
from mlagents_envs.side_channel.engine_configuration_channel import EngineConfigurationChannel
from mlagents_envs.side_channel.environment_parameters_channel import EnvironmentParametersChannel

from .configs import default_env_config
from navsim_envs.envs_base import AroraUnityEnvBase

class AroraUnityEnv(AroraUnityEnvBase):
    """AroraUnityEnv Class is a wrapper to UnityEnvironment

    Read the **NavSim Environment Tutorial** on how to use this class.
    """

    # Named actions, each mapped to a three-element list.
    actions = {
        'forward_left': [1, -1, 0],
        'forward_right': [1, 1, 0],
        'forward': [1, 0, 0],
        'backward': [-1, 0, 0],
    }

    def __init__(self, env_config) -> None:
        """Configure the side channels and launch the Unity environment.

        The command-line arguments for the Unity player are built from the
        configuration, then ``UnityEnvironment.__init__`` is called
        repeatedly, moving to the next base port each time the worker is
        reported as in use, until it succeeds.

        Args:
            env_config: The environment configuration dictionary Object
        """
        super().__init__(env_config, default_env_config)
        # From here on only ``self.env_config`` (used throughout the rest of
        # this method) may be consulted; the raw argument is dropped so it
        # cannot be read by accident.
        del env_config

        eng_sc = EngineConfigurationChannel()
        eng_sc.set_configuration_parameters(time_scale=0.25, quality_level=0)
        env_pc = EnvironmentParametersChannel()

        cfg = self.env_config
        # Episodes before ``start_from_episode`` are passed as -fastForward;
        # allow an extra 0.5 of startup timeout for each of them.
        skipped_episodes = cfg['start_from_episode'] - 1
        timeout = cfg['timeout'] + (0.5 * skipped_episodes)

        # NOTE: switches that are disabled are passed as "" placeholders,
        # exactly as before, so the argument list keeps a fixed length.
        ad_args = [
            "-allowedArea", f"{cfg['area']}",
            "-agentCarPhysics", f"{cfg['agent_car_physics']}",
            "-episodeLength", f"{cfg['episode_max_steps']}",
            "-fastForward", f"{skipped_episodes}",
            "-force-device-index", f"{cfg['env_gpu_id']}",
            "-force-vulkan" if (cfg["env_gpu_id"] > 0) else "",
            "-goalDistance", f"{cfg['goal_distance']}",
            "-goalClearance", f"{cfg['goal_clearance']}",
            "-goalSelectionIndex", f"{cfg['goal']}",
            "-numberOfTrafficVehicles", f"{cfg['traffic_vehicles']}",
            "-observationMode", f"{cfg['obs_mode']}",
            "-observationWidth", f"{cfg['obs_width']}",
            "-observationHeight", f"{cfg['obs_height']}",
            "-relativeSteering", "1" if cfg['relative_steering'] else "0",
            "-rewardForGoal", f"{cfg['reward_for_goal']}",
            "-rewardForNoViablePath", f"{cfg['reward_for_no_viable_path']}",
            "-rewardStepMul", f"{cfg['reward_step_mul']}",
            "-rewardCollisionMul", f"{cfg['reward_collision_mul']}",
            "-rewardSplDeltaMul", f"{cfg['reward_spl_delta_mul']}",
            "-showVisualObservations" if cfg['show_visual'] else "",
            "-saveStepLog" if cfg["debug"] else "",
            "-segmentationMode", f"{cfg['segmentation_mode']}",
            "-selectedTaskIndex", f"{cfg['task']}",
        ]

        self._navsim_base_port = self.env_config['base_port']
        if self._navsim_base_port is None:
            # Fixed: this used to read the already-deleted ``env_config``
            # argument, which raised UnboundLocalError whenever no base port
            # was configured.
            if self.env_config['env_path']:
                self._navsim_base_port = UnityEnvironment.BASE_ENVIRONMENT_PORT
            else:
                self._navsim_base_port = UnityEnvironment.DEFAULT_EDITOR_PORT
        self._navsim_worker_id = self.env_config['worker_id']

        while True:
            try:
                # Record the values actually being tried in the config.
                self.env_config["worker_id"] = self._navsim_worker_id
                self.env_config["base_port"] = self._navsim_base_port
                UnityEnvironment.__init__(
                    self,
                    file_name=self.env_config['env_path'],
                    log_folder=self.env_config["log_folder"],
                    no_graphics=False,
                    seed=self.env_config["seed"],
                    timeout_wait=timeout,
                    worker_id=self.env_config["worker_id"],
                    base_port=self.env_config["base_port"],
                    side_channels=[
                        eng_sc,
                        env_pc,
                        self.map_side_channel,
                        self.fpc,
                        self.nsc,
                        self.sapsc,
                        self.spsc,
                    ],
                    additional_args=ad_args,
                )
            except UnityWorkerInUseException:
                # Worker/port is taken: pause briefly, then try the next
                # base port.
                time.sleep(2)
                self._navsim_base_port += 1
            else:
                if self.env_config['env_path'] is None:
                    from_str = "Editor"
                else:
                    from_str = f"from {self.env_config['env_path']}"
                AroraUnityEnv.logger.info(
                    f"Created UnityEnvironment {from_str} "
                    f"at port {self._navsim_base_port + self._navsim_worker_id} "
                    f"to start from episode {self.env_config['start_from_episode']}")
                break
#self.env_config = env_config