kos_sim.services
Service implementations for MuJoCo simulation.
1"""Service implementations for MuJoCo simulation.""" 2 3import asyncio 4import math 5 6import grpc 7from google.protobuf import empty_pb2 8from kos_protos import ( 9 actuator_pb2, 10 actuator_pb2_grpc, 11 common_pb2, 12 imu_pb2, 13 imu_pb2_grpc, 14 process_manager_pb2, 15 process_manager_pb2_grpc, 16 sim_pb2, 17 sim_pb2_grpc, 18) 19 20from kos_sim import logger 21from kos_sim.simulator import ActuatorCommand, ConfigureActuatorRequest, MujocoSimulator 22from kos_sim.video_recorder import VideoRecorder 23 24 25class SimService(sim_pb2_grpc.SimulationServiceServicer): 26 """Implementation of SimService that wraps a MuJoCo simulation.""" 27 28 def __init__(self, simulator: MujocoSimulator) -> None: 29 self.simulator = simulator 30 31 async def Reset( # noqa: N802 32 self, 33 request: sim_pb2.ResetRequest, 34 context: grpc.ServicerContext, 35 ) -> common_pb2.ActionResponse: 36 """Reset the simulation to initial or specified state.""" 37 try: 38 logger.debug("Resetting simulator") 39 await self.simulator.reset( 40 xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z), 41 quat=( 42 None 43 if request.HasField("quat") is None 44 else (request.quat.w, request.quat.x, request.quat.y, request.quat.z) 45 ), 46 joint_pos=( 47 None 48 if request.joints is None 49 else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")} 50 ), 51 joint_vel=( 52 None 53 if request.joints is None 54 else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")} 55 ), 56 ) 57 return common_pb2.ActionResponse(success=True) 58 except Exception as e: 59 logger.error("Reset failed: %s", e) 60 context.set_code(grpc.StatusCode.INTERNAL) 61 context.set_details(str(e)) 62 return common_pb2.ActionResponse(success=False, error=str(e)) 63 64 async def SetPaused( # noqa: N802 65 self, 66 request: sim_pb2.SetPausedRequest, 67 context: grpc.ServicerContext, 68 ) -> common_pb2.ActionResponse: # noqa: N802 69 """Pause or unpause the simulation.""" 70 try: 71 return common_pb2.ActionResponse(success=True) 72 except Exception as e: 73 logger.error("SetPaused failed: %s", e) 74 context.set_code(grpc.StatusCode.INTERNAL) 75 context.set_details(str(e)) 76 return common_pb2.ActionResponse(success=False, error=str(e)) 77 78 async def Step( # noqa: N802 79 self, 80 request: sim_pb2.StepRequest, 81 context: grpc.ServicerContext, 82 ) -> common_pb2.ActionResponse: 83 raise NotImplementedError("Step is not implemented") 84 85 async def SetParameters( # noqa: N802 86 self, 87 request: sim_pb2.SetParametersRequest, 88 context: grpc.ServicerContext, 89 ) -> common_pb2.ActionResponse: 90 """Set simulation parameters.""" 91 try: 92 params = request.parameters 93 if params.HasField("time_scale"): 94 logger.debug("Setting time scale to %f", params.time_scale) 95 self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale 96 if params.HasField("gravity"): 97 logger.debug("Setting gravity to %f", params.gravity) 98 self.simulator._model.opt.gravity[2] = params.gravity 99 return common_pb2.ActionResponse(success=True) 100 except Exception as e: 101 logger.error("SetParameters failed: %s", e) 102 context.set_code(grpc.StatusCode.INTERNAL) 103 context.set_details(str(e)) 104 return common_pb2.ActionResponse(success=False, error=str(e)) 105 106 async def GetParameters( # noqa: N802 107 self, 108 request: empty_pb2.Empty, 109 context: grpc.ServicerContext, 110 ) -> sim_pb2.GetParametersResponse: 111 """Get current simulation parameters.""" 112 try: 113 params = sim_pb2.SimulationParameters( 114 time_scale=self.simulator._dt / self.simulator._model.opt.timestep, 115 gravity=float(self.simulator._model.opt.gravity[2]), 116 ) 117 logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity) 118 return sim_pb2.GetParametersResponse(parameters=params) 119 except Exception as e: 120 logger.error("GetParameters failed: %s", e) 121 context.set_code(grpc.StatusCode.INTERNAL) 122 context.set_details(str(e)) 123 return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e))) 124 125 126class ActuatorService(actuator_pb2_grpc.ActuatorServiceServicer): 127 """Implementation of ActuatorService that wraps a MuJoCo simulation.""" 128 129 def __init__(self, simulator: MujocoSimulator, step_lock: asyncio.Semaphore) -> None: 130 self.simulator = simulator 131 self.step_lock = step_lock 132 133 async def CommandActuators( # noqa: N802 134 self, 135 request: actuator_pb2.CommandActuatorsRequest, 136 context: grpc.ServicerContext, 137 ) -> actuator_pb2.CommandActuatorsResponse: 138 """Implements CommandActuators by forwarding to simulator.""" 139 try: 140 # Convert degrees to radians. 141 commands: dict[int, ActuatorCommand] = { 142 cmd.actuator_id: { 143 "position": math.radians(cmd.position), 144 "velocity": math.radians(cmd.velocity), 145 "torque": cmd.torque, 146 } 147 for cmd in request.commands 148 } 149 async with self.step_lock: 150 await self.simulator.command_actuators(commands) 151 return actuator_pb2.CommandActuatorsResponse() 152 except Exception as e: 153 context.set_code(grpc.StatusCode.INTERNAL) 154 context.set_details(str(e)) 155 return actuator_pb2.CommandActuatorsResponse() 156 157 async def GetActuatorsState( # noqa: N802 158 self, 159 request: actuator_pb2.GetActuatorsStateRequest, 160 context: grpc.ServicerContext, 161 ) -> actuator_pb2.GetActuatorsStateResponse: 162 """Implements GetActuatorsState by reading from simulator.""" 163 ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys()) 164 try: 165 states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids} 166 return actuator_pb2.GetActuatorsStateResponse( 167 states=[ 168 actuator_pb2.ActuatorStateResponse( 169 actuator_id=joint_id, 170 position=math.degrees(state.position), 171 velocity=math.degrees(state.velocity), 172 online=True, 173 ) 174 for joint_id, state in states.items() 175 ] 176 ) 177 except Exception as e: 178 logger.error("GetActuatorsState failed: %s", e) 179 context.set_code(grpc.StatusCode.INTERNAL) 180 context.set_details(str(e)) 181 return actuator_pb2.GetActuatorsStateResponse() 182 183 async def ConfigureActuator( # noqa: N802 184 self, 185 request: actuator_pb2.ConfigureActuatorRequest, 186 context: grpc.ServicerContext, 187 ) -> common_pb2.ActionResponse: 188 configuration: ConfigureActuatorRequest = {} 189 if request.HasField("torque_enabled"): 190 configuration["torque_enabled"] = request.torque_enabled 191 if request.HasField("zero_position"): 192 configuration["zero_position"] = request.zero_position 193 if request.HasField("kp"): 194 configuration["kp"] = request.kp 195 if request.HasField("kd"): 196 configuration["kd"] = request.kd 197 if request.HasField("max_torque"): 198 configuration["max_torque"] = request.max_torque 199 await self.simulator.configure_actuator(request.actuator_id, configuration) 200 201 return common_pb2.ActionResponse(success=True) 202 203 204class IMUService(imu_pb2_grpc.IMUServiceServicer): 205 """Implementation of IMUService that wraps a MuJoCo simulation.""" 206 207 def __init__( 208 self, 209 simulator: MujocoSimulator, 210 acc_name: str | None = "imu_acc", 211 gyro_name: str | None = "imu_gyro", 212 mag_name: str | None = "imu_mag", 213 quat_name: str | None = "base_link_quat", 214 ) -> None: 215 self.simulator = simulator 216 self.acc_name = acc_name 217 self.gyro_name = gyro_name 218 self.mag_name = mag_name 219 self.quat_name = quat_name 220 221 async def GetValues( # noqa: N802 222 self, 223 request: empty_pb2.Empty, 224 context: grpc.ServicerContext, 225 ) -> imu_pb2.IMUValuesResponse: 226 """Implements GetValues by reading IMU sensor data from simulator.""" 227 try: 228 if self.acc_name is None or self.gyro_name is None: 229 raise ValueError("Accelerometer or gyroscope name not set") 230 acc_data = await self.simulator.get_sensor_data(self.acc_name) 231 gyro_data = await self.simulator.get_sensor_data(self.gyro_name) 232 mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name) 233 return imu_pb2.IMUValuesResponse( 234 accel_x=float(acc_data[0]), 235 accel_y=float(acc_data[1]), 236 accel_z=float(acc_data[2]), 237 gyro_x=float(gyro_data[0]), 238 gyro_y=float(gyro_data[1]), 239 gyro_z=float(gyro_data[2]), 240 mag_x=None if mag_data is None else float(mag_data[0]), 241 mag_y=None if mag_data is None else float(mag_data[1]), 242 mag_z=None if mag_data is None else float(mag_data[2]), 243 ) 244 except Exception as e: 245 context.set_code(grpc.StatusCode.INTERNAL) 246 context.set_details(str(e)) 247 return imu_pb2.IMUValuesResponse() 248 249 async def GetQuaternion( # noqa: N802 250 self, 251 request: empty_pb2.Empty, 252 context: grpc.ServicerContext, 253 ) -> imu_pb2.QuaternionResponse: 254 """Implements GetQuaternion by reading orientation data from simulator.""" 255 try: 256 if self.quat_name is None: 257 raise ValueError("Quaternion name not set") 258 quat_data = await self.simulator.get_sensor_data(self.quat_name) 259 return imu_pb2.QuaternionResponse( 260 w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3]) 261 ) 262 except Exception as e: 263 context.set_code(grpc.StatusCode.INTERNAL) 264 context.set_details(str(e)) 265 return imu_pb2.QuaternionResponse() 266 267 async def GetEuler( # noqa: N802 268 self, 269 request: empty_pb2.Empty, 270 context: grpc.ServicerContext, 271 ) -> imu_pb2.EulerAnglesResponse: 272 """Implements GetEuler by converting orientation quaternion to Euler angles.""" 273 try: 274 if self.quat_name is None: 275 raise ValueError("Quaternion name not set") 276 quat_data = await self.simulator.get_sensor_data(self.quat_name) 277 278 # Extract quaternion components 279 w, x, y, z = [float(q) for q in quat_data] 280 281 # Convert quaternion to Euler angles (roll, pitch, yaw) 282 # Roll (x-axis rotation) 283 sinr_cosp = 2 * (w * x + y * z) 284 cosr_cosp = 1 - 2 * (x * x + y * y) 285 roll = math.atan2(sinr_cosp, cosr_cosp) 286 287 # Pitch (y-axis rotation) 288 sinp = 2 * (w * y - z * x) 289 pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp) 290 291 # Yaw (z-axis rotation) 292 siny_cosp = 2 * (w * z + x * y) 293 cosy_cosp = 1 - 2 * (y * y + z * z) 294 yaw = math.atan2(siny_cosp, cosy_cosp) 295 296 # Convert to degrees 297 roll_deg = math.degrees(roll) 298 pitch_deg = math.degrees(pitch) 299 yaw_deg = math.degrees(yaw) 300 301 return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg) 302 except Exception as e: 303 context.set_code(grpc.StatusCode.INTERNAL) 304 context.set_details(str(e)) 305 return imu_pb2.EulerAnglesResponse() 306 307 308class ProcessManagerService(process_manager_pb2_grpc.ProcessManagerServiceServicer): 309 """Implementation of ProcessManagerService that wraps a MuJoCo simulation.""" 310 311 def __init__(self, simulator: MujocoSimulator, video_recorder: VideoRecorder) -> None: 312 self.simulator = simulator 313 self.video_recorder = video_recorder 314 315 async def StartKClip( # noqa: N802 316 self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext 317 ) -> process_manager_pb2.KClipStartResponse: 318 """Implements StartKClip by starting k-clip recording.""" 319 if self.video_recorder is None: 320 return process_manager_pb2.KClipStartResponse( 321 error=common_pb2.Error( 322 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 323 message="`video_recorder` is `None`. Video recording not enabled.", 324 ) 325 ) 326 clip_id = self.video_recorder.start_recording() 327 return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id) 328 329 async def StopKClip( # noqa: N802 330 self, request: empty_pb2.Empty, context: grpc.ServicerContext 331 ) -> process_manager_pb2.KClipStopResponse: 332 """Implements StopKClip by stopping k-clip recording.""" 333 if self.video_recorder is None: 334 return process_manager_pb2.KClipStopResponse( 335 error=common_pb2.Error( 336 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 337 message="`video_recorder` is `None`. Video recording not enabled.", 338 ) 339 ) 340 self.video_recorder.stop_recording() 341 return process_manager_pb2.KClipStopResponse()
26class SimService(sim_pb2_grpc.SimulationServiceServicer): 27 """Implementation of SimService that wraps a MuJoCo simulation.""" 28 29 def __init__(self, simulator: MujocoSimulator) -> None: 30 self.simulator = simulator 31 32 async def Reset( # noqa: N802 33 self, 34 request: sim_pb2.ResetRequest, 35 context: grpc.ServicerContext, 36 ) -> common_pb2.ActionResponse: 37 """Reset the simulation to initial or specified state.""" 38 try: 39 logger.debug("Resetting simulator") 40 await self.simulator.reset( 41 xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z), 42 quat=( 43 None 44 if request.HasField("quat") is None 45 else (request.quat.w, request.quat.x, request.quat.y, request.quat.z) 46 ), 47 joint_pos=( 48 None 49 if request.joints is None 50 else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")} 51 ), 52 joint_vel=( 53 None 54 if request.joints is None 55 else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")} 56 ), 57 ) 58 return common_pb2.ActionResponse(success=True) 59 except Exception as e: 60 logger.error("Reset failed: %s", e) 61 context.set_code(grpc.StatusCode.INTERNAL) 62 context.set_details(str(e)) 63 return common_pb2.ActionResponse(success=False, error=str(e)) 64 65 async def SetPaused( # noqa: N802 66 self, 67 request: sim_pb2.SetPausedRequest, 68 context: grpc.ServicerContext, 69 ) -> common_pb2.ActionResponse: # noqa: N802 70 """Pause or unpause the simulation.""" 71 try: 72 return common_pb2.ActionResponse(success=True) 73 except Exception as e: 74 logger.error("SetPaused failed: %s", e) 75 context.set_code(grpc.StatusCode.INTERNAL) 76 context.set_details(str(e)) 77 return common_pb2.ActionResponse(success=False, error=str(e)) 78 79 async def Step( # noqa: N802 80 self, 81 request: sim_pb2.StepRequest, 82 context: grpc.ServicerContext, 83 ) -> common_pb2.ActionResponse: 84 raise NotImplementedError("Step is not implemented") 85 86 async def SetParameters( # noqa: N802 87 self, 88 request: sim_pb2.SetParametersRequest, 89 context: grpc.ServicerContext, 90 ) -> common_pb2.ActionResponse: 91 """Set simulation parameters.""" 92 try: 93 params = request.parameters 94 if params.HasField("time_scale"): 95 logger.debug("Setting time scale to %f", params.time_scale) 96 self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale 97 if params.HasField("gravity"): 98 logger.debug("Setting gravity to %f", params.gravity) 99 self.simulator._model.opt.gravity[2] = params.gravity 100 return common_pb2.ActionResponse(success=True) 101 except Exception as e: 102 logger.error("SetParameters failed: %s", e) 103 context.set_code(grpc.StatusCode.INTERNAL) 104 context.set_details(str(e)) 105 return common_pb2.ActionResponse(success=False, error=str(e)) 106 107 async def GetParameters( # noqa: N802 108 self, 109 request: empty_pb2.Empty, 110 context: grpc.ServicerContext, 111 ) -> sim_pb2.GetParametersResponse: 112 """Get current simulation parameters.""" 113 try: 114 params = sim_pb2.SimulationParameters( 115 time_scale=self.simulator._dt / self.simulator._model.opt.timestep, 116 gravity=float(self.simulator._model.opt.gravity[2]), 117 ) 118 logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity) 119 return sim_pb2.GetParametersResponse(parameters=params) 120 except Exception as e: 121 logger.error("GetParameters failed: %s", e) 122 context.set_code(grpc.StatusCode.INTERNAL) 123 context.set_details(str(e)) 124 return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e)))
Implementation of SimService that wraps a MuJoCo simulation.
32 async def Reset( # noqa: N802 33 self, 34 request: sim_pb2.ResetRequest, 35 context: grpc.ServicerContext, 36 ) -> common_pb2.ActionResponse: 37 """Reset the simulation to initial or specified state.""" 38 try: 39 logger.debug("Resetting simulator") 40 await self.simulator.reset( 41 xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z), 42 quat=( 43 None 44 if request.HasField("quat") is None 45 else (request.quat.w, request.quat.x, request.quat.y, request.quat.z) 46 ), 47 joint_pos=( 48 None 49 if request.joints is None 50 else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")} 51 ), 52 joint_vel=( 53 None 54 if request.joints is None 55 else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")} 56 ), 57 ) 58 return common_pb2.ActionResponse(success=True) 59 except Exception as e: 60 logger.error("Reset failed: %s", e) 61 context.set_code(grpc.StatusCode.INTERNAL) 62 context.set_details(str(e)) 63 return common_pb2.ActionResponse(success=False, error=str(e))
Reset the simulation to initial or specified state.
65 async def SetPaused( # noqa: N802 66 self, 67 request: sim_pb2.SetPausedRequest, 68 context: grpc.ServicerContext, 69 ) -> common_pb2.ActionResponse: # noqa: N802 70 """Pause or unpause the simulation.""" 71 try: 72 return common_pb2.ActionResponse(success=True) 73 except Exception as e: 74 logger.error("SetPaused failed: %s", e) 75 context.set_code(grpc.StatusCode.INTERNAL) 76 context.set_details(str(e)) 77 return common_pb2.ActionResponse(success=False, error=str(e))
Pause or unpause the simulation.
79 async def Step( # noqa: N802 80 self, 81 request: sim_pb2.StepRequest, 82 context: grpc.ServicerContext, 83 ) -> common_pb2.ActionResponse: 84 raise NotImplementedError("Step is not implemented")
Steps the simulation forward by a specified amount.
86 async def SetParameters( # noqa: N802 87 self, 88 request: sim_pb2.SetParametersRequest, 89 context: grpc.ServicerContext, 90 ) -> common_pb2.ActionResponse: 91 """Set simulation parameters.""" 92 try: 93 params = request.parameters 94 if params.HasField("time_scale"): 95 logger.debug("Setting time scale to %f", params.time_scale) 96 self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale 97 if params.HasField("gravity"): 98 logger.debug("Setting gravity to %f", params.gravity) 99 self.simulator._model.opt.gravity[2] = params.gravity 100 return common_pb2.ActionResponse(success=True) 101 except Exception as e: 102 logger.error("SetParameters failed: %s", e) 103 context.set_code(grpc.StatusCode.INTERNAL) 104 context.set_details(str(e)) 105 return common_pb2.ActionResponse(success=False, error=str(e))
Set simulation parameters.
107 async def GetParameters( # noqa: N802 108 self, 109 request: empty_pb2.Empty, 110 context: grpc.ServicerContext, 111 ) -> sim_pb2.GetParametersResponse: 112 """Get current simulation parameters.""" 113 try: 114 params = sim_pb2.SimulationParameters( 115 time_scale=self.simulator._dt / self.simulator._model.opt.timestep, 116 gravity=float(self.simulator._model.opt.gravity[2]), 117 ) 118 logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity) 119 return sim_pb2.GetParametersResponse(parameters=params) 120 except Exception as e: 121 logger.error("GetParameters failed: %s", e) 122 context.set_code(grpc.StatusCode.INTERNAL) 123 context.set_details(str(e)) 124 return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e)))
Get current simulation parameters.
127class ActuatorService(actuator_pb2_grpc.ActuatorServiceServicer): 128 """Implementation of ActuatorService that wraps a MuJoCo simulation.""" 129 130 def __init__(self, simulator: MujocoSimulator, step_lock: asyncio.Semaphore) -> None: 131 self.simulator = simulator 132 self.step_lock = step_lock 133 134 async def CommandActuators( # noqa: N802 135 self, 136 request: actuator_pb2.CommandActuatorsRequest, 137 context: grpc.ServicerContext, 138 ) -> actuator_pb2.CommandActuatorsResponse: 139 """Implements CommandActuators by forwarding to simulator.""" 140 try: 141 # Convert degrees to radians. 142 commands: dict[int, ActuatorCommand] = { 143 cmd.actuator_id: { 144 "position": math.radians(cmd.position), 145 "velocity": math.radians(cmd.velocity), 146 "torque": cmd.torque, 147 } 148 for cmd in request.commands 149 } 150 async with self.step_lock: 151 await self.simulator.command_actuators(commands) 152 return actuator_pb2.CommandActuatorsResponse() 153 except Exception as e: 154 context.set_code(grpc.StatusCode.INTERNAL) 155 context.set_details(str(e)) 156 return actuator_pb2.CommandActuatorsResponse() 157 158 async def GetActuatorsState( # noqa: N802 159 self, 160 request: actuator_pb2.GetActuatorsStateRequest, 161 context: grpc.ServicerContext, 162 ) -> actuator_pb2.GetActuatorsStateResponse: 163 """Implements GetActuatorsState by reading from simulator.""" 164 ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys()) 165 try: 166 states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids} 167 return actuator_pb2.GetActuatorsStateResponse( 168 states=[ 169 actuator_pb2.ActuatorStateResponse( 170 actuator_id=joint_id, 171 position=math.degrees(state.position), 172 velocity=math.degrees(state.velocity), 173 online=True, 174 ) 175 for joint_id, state in states.items() 176 ] 177 ) 178 except Exception as e: 179 logger.error("GetActuatorsState failed: %s", e) 180 context.set_code(grpc.StatusCode.INTERNAL) 181 context.set_details(str(e)) 182 return actuator_pb2.GetActuatorsStateResponse() 183 184 async def ConfigureActuator( # noqa: N802 185 self, 186 request: actuator_pb2.ConfigureActuatorRequest, 187 context: grpc.ServicerContext, 188 ) -> common_pb2.ActionResponse: 189 configuration: ConfigureActuatorRequest = {} 190 if request.HasField("torque_enabled"): 191 configuration["torque_enabled"] = request.torque_enabled 192 if request.HasField("zero_position"): 193 configuration["zero_position"] = request.zero_position 194 if request.HasField("kp"): 195 configuration["kp"] = request.kp 196 if request.HasField("kd"): 197 configuration["kd"] = request.kd 198 if request.HasField("max_torque"): 199 configuration["max_torque"] = request.max_torque 200 await self.simulator.configure_actuator(request.actuator_id, configuration) 201 202 return common_pb2.ActionResponse(success=True)
Implementation of ActuatorService that wraps a MuJoCo simulation.
134 async def CommandActuators( # noqa: N802 135 self, 136 request: actuator_pb2.CommandActuatorsRequest, 137 context: grpc.ServicerContext, 138 ) -> actuator_pb2.CommandActuatorsResponse: 139 """Implements CommandActuators by forwarding to simulator.""" 140 try: 141 # Convert degrees to radians. 142 commands: dict[int, ActuatorCommand] = { 143 cmd.actuator_id: { 144 "position": math.radians(cmd.position), 145 "velocity": math.radians(cmd.velocity), 146 "torque": cmd.torque, 147 } 148 for cmd in request.commands 149 } 150 async with self.step_lock: 151 await self.simulator.command_actuators(commands) 152 return actuator_pb2.CommandActuatorsResponse() 153 except Exception as e: 154 context.set_code(grpc.StatusCode.INTERNAL) 155 context.set_details(str(e)) 156 return actuator_pb2.CommandActuatorsResponse()
Implements CommandActuators by forwarding to simulator.
158 async def GetActuatorsState( # noqa: N802 159 self, 160 request: actuator_pb2.GetActuatorsStateRequest, 161 context: grpc.ServicerContext, 162 ) -> actuator_pb2.GetActuatorsStateResponse: 163 """Implements GetActuatorsState by reading from simulator.""" 164 ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys()) 165 try: 166 states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids} 167 return actuator_pb2.GetActuatorsStateResponse( 168 states=[ 169 actuator_pb2.ActuatorStateResponse( 170 actuator_id=joint_id, 171 position=math.degrees(state.position), 172 velocity=math.degrees(state.velocity), 173 online=True, 174 ) 175 for joint_id, state in states.items() 176 ] 177 ) 178 except Exception as e: 179 logger.error("GetActuatorsState failed: %s", e) 180 context.set_code(grpc.StatusCode.INTERNAL) 181 context.set_details(str(e)) 182 return actuator_pb2.GetActuatorsStateResponse()
Implements GetActuatorsState by reading from simulator.
184 async def ConfigureActuator( # noqa: N802 185 self, 186 request: actuator_pb2.ConfigureActuatorRequest, 187 context: grpc.ServicerContext, 188 ) -> common_pb2.ActionResponse: 189 configuration: ConfigureActuatorRequest = {} 190 if request.HasField("torque_enabled"): 191 configuration["torque_enabled"] = request.torque_enabled 192 if request.HasField("zero_position"): 193 configuration["zero_position"] = request.zero_position 194 if request.HasField("kp"): 195 configuration["kp"] = request.kp 196 if request.HasField("kd"): 197 configuration["kd"] = request.kd 198 if request.HasField("max_torque"): 199 configuration["max_torque"] = request.max_torque 200 await self.simulator.configure_actuator(request.actuator_id, configuration) 201 202 return common_pb2.ActionResponse(success=True)
Configures an actuator's parameters.
205class IMUService(imu_pb2_grpc.IMUServiceServicer): 206 """Implementation of IMUService that wraps a MuJoCo simulation.""" 207 208 def __init__( 209 self, 210 simulator: MujocoSimulator, 211 acc_name: str | None = "imu_acc", 212 gyro_name: str | None = "imu_gyro", 213 mag_name: str | None = "imu_mag", 214 quat_name: str | None = "base_link_quat", 215 ) -> None: 216 self.simulator = simulator 217 self.acc_name = acc_name 218 self.gyro_name = gyro_name 219 self.mag_name = mag_name 220 self.quat_name = quat_name 221 222 async def GetValues( # noqa: N802 223 self, 224 request: empty_pb2.Empty, 225 context: grpc.ServicerContext, 226 ) -> imu_pb2.IMUValuesResponse: 227 """Implements GetValues by reading IMU sensor data from simulator.""" 228 try: 229 if self.acc_name is None or self.gyro_name is None: 230 raise ValueError("Accelerometer or gyroscope name not set") 231 acc_data = await self.simulator.get_sensor_data(self.acc_name) 232 gyro_data = await self.simulator.get_sensor_data(self.gyro_name) 233 mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name) 234 return imu_pb2.IMUValuesResponse( 235 accel_x=float(acc_data[0]), 236 accel_y=float(acc_data[1]), 237 accel_z=float(acc_data[2]), 238 gyro_x=float(gyro_data[0]), 239 gyro_y=float(gyro_data[1]), 240 gyro_z=float(gyro_data[2]), 241 mag_x=None if mag_data is None else float(mag_data[0]), 242 mag_y=None if mag_data is None else float(mag_data[1]), 243 mag_z=None if mag_data is None else float(mag_data[2]), 244 ) 245 except Exception as e: 246 context.set_code(grpc.StatusCode.INTERNAL) 247 context.set_details(str(e)) 248 return imu_pb2.IMUValuesResponse() 249 250 async def GetQuaternion( # noqa: N802 251 self, 252 request: empty_pb2.Empty, 253 context: grpc.ServicerContext, 254 ) -> imu_pb2.QuaternionResponse: 255 """Implements GetQuaternion by reading orientation data from simulator.""" 256 try: 257 if self.quat_name is None: 258 raise ValueError("Quaternion name not set") 259 quat_data = await self.simulator.get_sensor_data(self.quat_name) 260 return imu_pb2.QuaternionResponse( 261 w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3]) 262 ) 263 except Exception as e: 264 context.set_code(grpc.StatusCode.INTERNAL) 265 context.set_details(str(e)) 266 return imu_pb2.QuaternionResponse() 267 268 async def GetEuler( # noqa: N802 269 self, 270 request: empty_pb2.Empty, 271 context: grpc.ServicerContext, 272 ) -> imu_pb2.EulerAnglesResponse: 273 """Implements GetEuler by converting orientation quaternion to Euler angles.""" 274 try: 275 if self.quat_name is None: 276 raise ValueError("Quaternion name not set") 277 quat_data = await self.simulator.get_sensor_data(self.quat_name) 278 279 # Extract quaternion components 280 w, x, y, z = [float(q) for q in quat_data] 281 282 # Convert quaternion to Euler angles (roll, pitch, yaw) 283 # Roll (x-axis rotation) 284 sinr_cosp = 2 * (w * x + y * z) 285 cosr_cosp = 1 - 2 * (x * x + y * y) 286 roll = math.atan2(sinr_cosp, cosr_cosp) 287 288 # Pitch (y-axis rotation) 289 sinp = 2 * (w * y - z * x) 290 pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp) 291 292 # Yaw (z-axis rotation) 293 siny_cosp = 2 * (w * z + x * y) 294 cosy_cosp = 1 - 2 * (y * y + z * z) 295 yaw = math.atan2(siny_cosp, cosy_cosp) 296 297 # Convert to degrees 298 roll_deg = math.degrees(roll) 299 pitch_deg = math.degrees(pitch) 300 yaw_deg = math.degrees(yaw) 301 302 return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg) 303 except Exception as e: 304 context.set_code(grpc.StatusCode.INTERNAL) 305 context.set_details(str(e)) 306 return imu_pb2.EulerAnglesResponse()
Implementation of IMUService that wraps a MuJoCo simulation.
208 def __init__( 209 self, 210 simulator: MujocoSimulator, 211 acc_name: str | None = "imu_acc", 212 gyro_name: str | None = "imu_gyro", 213 mag_name: str | None = "imu_mag", 214 quat_name: str | None = "base_link_quat", 215 ) -> None: 216 self.simulator = simulator 217 self.acc_name = acc_name 218 self.gyro_name = gyro_name 219 self.mag_name = mag_name 220 self.quat_name = quat_name
222 async def GetValues( # noqa: N802 223 self, 224 request: empty_pb2.Empty, 225 context: grpc.ServicerContext, 226 ) -> imu_pb2.IMUValuesResponse: 227 """Implements GetValues by reading IMU sensor data from simulator.""" 228 try: 229 if self.acc_name is None or self.gyro_name is None: 230 raise ValueError("Accelerometer or gyroscope name not set") 231 acc_data = await self.simulator.get_sensor_data(self.acc_name) 232 gyro_data = await self.simulator.get_sensor_data(self.gyro_name) 233 mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name) 234 return imu_pb2.IMUValuesResponse( 235 accel_x=float(acc_data[0]), 236 accel_y=float(acc_data[1]), 237 accel_z=float(acc_data[2]), 238 gyro_x=float(gyro_data[0]), 239 gyro_y=float(gyro_data[1]), 240 gyro_z=float(gyro_data[2]), 241 mag_x=None if mag_data is None else float(mag_data[0]), 242 mag_y=None if mag_data is None else float(mag_data[1]), 243 mag_z=None if mag_data is None else float(mag_data[2]), 244 ) 245 except Exception as e: 246 context.set_code(grpc.StatusCode.INTERNAL) 247 context.set_details(str(e)) 248 return imu_pb2.IMUValuesResponse()
Implements GetValues by reading IMU sensor data from simulator.
250 async def GetQuaternion( # noqa: N802 251 self, 252 request: empty_pb2.Empty, 253 context: grpc.ServicerContext, 254 ) -> imu_pb2.QuaternionResponse: 255 """Implements GetQuaternion by reading orientation data from simulator.""" 256 try: 257 if self.quat_name is None: 258 raise ValueError("Quaternion name not set") 259 quat_data = await self.simulator.get_sensor_data(self.quat_name) 260 return imu_pb2.QuaternionResponse( 261 w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3]) 262 ) 263 except Exception as e: 264 context.set_code(grpc.StatusCode.INTERNAL) 265 context.set_details(str(e)) 266 return imu_pb2.QuaternionResponse()
Implements GetQuaternion by reading orientation data from simulator.
268 async def GetEuler( # noqa: N802 269 self, 270 request: empty_pb2.Empty, 271 context: grpc.ServicerContext, 272 ) -> imu_pb2.EulerAnglesResponse: 273 """Implements GetEuler by converting orientation quaternion to Euler angles.""" 274 try: 275 if self.quat_name is None: 276 raise ValueError("Quaternion name not set") 277 quat_data = await self.simulator.get_sensor_data(self.quat_name) 278 279 # Extract quaternion components 280 w, x, y, z = [float(q) for q in quat_data] 281 282 # Convert quaternion to Euler angles (roll, pitch, yaw) 283 # Roll (x-axis rotation) 284 sinr_cosp = 2 * (w * x + y * z) 285 cosr_cosp = 1 - 2 * (x * x + y * y) 286 roll = math.atan2(sinr_cosp, cosr_cosp) 287 288 # Pitch (y-axis rotation) 289 sinp = 2 * (w * y - z * x) 290 pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp) 291 292 # Yaw (z-axis rotation) 293 siny_cosp = 2 * (w * z + x * y) 294 cosy_cosp = 1 - 2 * (y * y + z * z) 295 yaw = math.atan2(siny_cosp, cosy_cosp) 296 297 # Convert to degrees 298 roll_deg = math.degrees(roll) 299 pitch_deg = math.degrees(pitch) 300 yaw_deg = math.degrees(yaw) 301 302 return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg) 303 except Exception as e: 304 context.set_code(grpc.StatusCode.INTERNAL) 305 context.set_details(str(e)) 306 return imu_pb2.EulerAnglesResponse()
Implements GetEuler by converting orientation quaternion to Euler angles.
309class ProcessManagerService(process_manager_pb2_grpc.ProcessManagerServiceServicer): 310 """Implementation of ProcessManagerService that wraps a MuJoCo simulation.""" 311 312 def __init__(self, simulator: MujocoSimulator, video_recorder: VideoRecorder) -> None: 313 self.simulator = simulator 314 self.video_recorder = video_recorder 315 316 async def StartKClip( # noqa: N802 317 self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext 318 ) -> process_manager_pb2.KClipStartResponse: 319 """Implements StartKClip by starting k-clip recording.""" 320 if self.video_recorder is None: 321 return process_manager_pb2.KClipStartResponse( 322 error=common_pb2.Error( 323 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 324 message="`video_recorder` is `None`. Video recording not enabled.", 325 ) 326 ) 327 clip_id = self.video_recorder.start_recording() 328 return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id) 329 330 async def StopKClip( # noqa: N802 331 self, request: empty_pb2.Empty, context: grpc.ServicerContext 332 ) -> process_manager_pb2.KClipStopResponse: 333 """Implements StopKClip by stopping k-clip recording.""" 334 if self.video_recorder is None: 335 return process_manager_pb2.KClipStopResponse( 336 error=common_pb2.Error( 337 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 338 message="`video_recorder` is `None`. Video recording not enabled.", 339 ) 340 ) 341 self.video_recorder.stop_recording() 342 return process_manager_pb2.KClipStopResponse()
Implementation of ProcessManagerService that wraps a MuJoCo simulation.
316 async def StartKClip( # noqa: N802 317 self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext 318 ) -> process_manager_pb2.KClipStartResponse: 319 """Implements StartKClip by starting k-clip recording.""" 320 if self.video_recorder is None: 321 return process_manager_pb2.KClipStartResponse( 322 error=common_pb2.Error( 323 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 324 message="`video_recorder` is `None`. Video recording not enabled.", 325 ) 326 ) 327 clip_id = self.video_recorder.start_recording() 328 return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id)
Implements StartKClip by starting k-clip recording.
330 async def StopKClip( # noqa: N802 331 self, request: empty_pb2.Empty, context: grpc.ServicerContext 332 ) -> process_manager_pb2.KClipStopResponse: 333 """Implements StopKClip by stopping k-clip recording.""" 334 if self.video_recorder is None: 335 return process_manager_pb2.KClipStopResponse( 336 error=common_pb2.Error( 337 code=common_pb2.ErrorCode.INVALID_ARGUMENT, 338 message="`video_recorder` is `None`. Video recording not enabled.", 339 ) 340 ) 341 self.video_recorder.stop_recording() 342 return process_manager_pb2.KClipStopResponse()
Implements StopKClip by stopping k-clip recording.