kos_sim.services

Service implementations for MuJoCo simulation.

  1"""Service implementations for MuJoCo simulation."""
  2
  3import asyncio
  4import math
  5
  6import grpc
  7from google.protobuf import empty_pb2
  8from kos_protos import (
  9    actuator_pb2,
 10    actuator_pb2_grpc,
 11    common_pb2,
 12    imu_pb2,
 13    imu_pb2_grpc,
 14    process_manager_pb2,
 15    process_manager_pb2_grpc,
 16    sim_pb2,
 17    sim_pb2_grpc,
 18)
 19
 20from kos_sim import logger
 21from kos_sim.simulator import ActuatorCommand, ConfigureActuatorRequest, MujocoSimulator
 22from kos_sim.video_recorder import VideoRecorder
 23
 24
 25class SimService(sim_pb2_grpc.SimulationServiceServicer):
 26    """Implementation of SimService that wraps a MuJoCo simulation."""
 27
 28    def __init__(self, simulator: MujocoSimulator) -> None:
 29        self.simulator = simulator
 30
 31    async def Reset(  # noqa: N802
 32        self,
 33        request: sim_pb2.ResetRequest,
 34        context: grpc.ServicerContext,
 35    ) -> common_pb2.ActionResponse:
 36        """Reset the simulation to initial or specified state."""
 37        try:
 38            logger.debug("Resetting simulator")
 39            await self.simulator.reset(
 40                xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z),
 41                quat=(
 42                    None
 43                    if request.HasField("quat") is None
 44                    else (request.quat.w, request.quat.x, request.quat.y, request.quat.z)
 45                ),
 46                joint_pos=(
 47                    None
 48                    if request.joints is None
 49                    else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")}
 50                ),
 51                joint_vel=(
 52                    None
 53                    if request.joints is None
 54                    else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")}
 55                ),
 56            )
 57            return common_pb2.ActionResponse(success=True)
 58        except Exception as e:
 59            logger.error("Reset failed: %s", e)
 60            context.set_code(grpc.StatusCode.INTERNAL)
 61            context.set_details(str(e))
 62            return common_pb2.ActionResponse(success=False, error=str(e))
 63
 64    async def SetPaused(  # noqa: N802
 65        self,
 66        request: sim_pb2.SetPausedRequest,
 67        context: grpc.ServicerContext,
 68    ) -> common_pb2.ActionResponse:  # noqa: N802
 69        """Pause or unpause the simulation."""
 70        try:
 71            return common_pb2.ActionResponse(success=True)
 72        except Exception as e:
 73            logger.error("SetPaused failed: %s", e)
 74            context.set_code(grpc.StatusCode.INTERNAL)
 75            context.set_details(str(e))
 76            return common_pb2.ActionResponse(success=False, error=str(e))
 77
 78    async def Step(  # noqa: N802
 79        self,
 80        request: sim_pb2.StepRequest,
 81        context: grpc.ServicerContext,
 82    ) -> common_pb2.ActionResponse:
 83        raise NotImplementedError("Step is not implemented")
 84
 85    async def SetParameters(  # noqa: N802
 86        self,
 87        request: sim_pb2.SetParametersRequest,
 88        context: grpc.ServicerContext,
 89    ) -> common_pb2.ActionResponse:
 90        """Set simulation parameters."""
 91        try:
 92            params = request.parameters
 93            if params.HasField("time_scale"):
 94                logger.debug("Setting time scale to %f", params.time_scale)
 95                self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale
 96            if params.HasField("gravity"):
 97                logger.debug("Setting gravity to %f", params.gravity)
 98                self.simulator._model.opt.gravity[2] = params.gravity
 99            return common_pb2.ActionResponse(success=True)
100        except Exception as e:
101            logger.error("SetParameters failed: %s", e)
102            context.set_code(grpc.StatusCode.INTERNAL)
103            context.set_details(str(e))
104            return common_pb2.ActionResponse(success=False, error=str(e))
105
106    async def GetParameters(  # noqa: N802
107        self,
108        request: empty_pb2.Empty,
109        context: grpc.ServicerContext,
110    ) -> sim_pb2.GetParametersResponse:
111        """Get current simulation parameters."""
112        try:
113            params = sim_pb2.SimulationParameters(
114                time_scale=self.simulator._dt / self.simulator._model.opt.timestep,
115                gravity=float(self.simulator._model.opt.gravity[2]),
116            )
117            logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity)
118            return sim_pb2.GetParametersResponse(parameters=params)
119        except Exception as e:
120            logger.error("GetParameters failed: %s", e)
121            context.set_code(grpc.StatusCode.INTERNAL)
122            context.set_details(str(e))
123            return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e)))
124
125
126class ActuatorService(actuator_pb2_grpc.ActuatorServiceServicer):
127    """Implementation of ActuatorService that wraps a MuJoCo simulation."""
128
129    def __init__(self, simulator: MujocoSimulator, step_lock: asyncio.Semaphore) -> None:
130        self.simulator = simulator
131        self.step_lock = step_lock
132
133    async def CommandActuators(  # noqa: N802
134        self,
135        request: actuator_pb2.CommandActuatorsRequest,
136        context: grpc.ServicerContext,
137    ) -> actuator_pb2.CommandActuatorsResponse:
138        """Implements CommandActuators by forwarding to simulator."""
139        try:
140            # Convert degrees to radians.
141            commands: dict[int, ActuatorCommand] = {
142                cmd.actuator_id: {
143                    "position": math.radians(cmd.position),
144                    "velocity": math.radians(cmd.velocity),
145                    "torque": cmd.torque,
146                }
147                for cmd in request.commands
148            }
149            async with self.step_lock:
150                await self.simulator.command_actuators(commands)
151            return actuator_pb2.CommandActuatorsResponse()
152        except Exception as e:
153            context.set_code(grpc.StatusCode.INTERNAL)
154            context.set_details(str(e))
155            return actuator_pb2.CommandActuatorsResponse()
156
157    async def GetActuatorsState(  # noqa: N802
158        self,
159        request: actuator_pb2.GetActuatorsStateRequest,
160        context: grpc.ServicerContext,
161    ) -> actuator_pb2.GetActuatorsStateResponse:
162        """Implements GetActuatorsState by reading from simulator."""
163        ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys())
164        try:
165            states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids}
166            return actuator_pb2.GetActuatorsStateResponse(
167                states=[
168                    actuator_pb2.ActuatorStateResponse(
169                        actuator_id=joint_id,
170                        position=math.degrees(state.position),
171                        velocity=math.degrees(state.velocity),
172                        online=True,
173                    )
174                    for joint_id, state in states.items()
175                ]
176            )
177        except Exception as e:
178            logger.error("GetActuatorsState failed: %s", e)
179            context.set_code(grpc.StatusCode.INTERNAL)
180            context.set_details(str(e))
181            return actuator_pb2.GetActuatorsStateResponse()
182
183    async def ConfigureActuator(  # noqa: N802
184        self,
185        request: actuator_pb2.ConfigureActuatorRequest,
186        context: grpc.ServicerContext,
187    ) -> common_pb2.ActionResponse:
188        configuration: ConfigureActuatorRequest = {}
189        if request.HasField("torque_enabled"):
190            configuration["torque_enabled"] = request.torque_enabled
191        if request.HasField("zero_position"):
192            configuration["zero_position"] = request.zero_position
193        if request.HasField("kp"):
194            configuration["kp"] = request.kp
195        if request.HasField("kd"):
196            configuration["kd"] = request.kd
197        if request.HasField("max_torque"):
198            configuration["max_torque"] = request.max_torque
199        await self.simulator.configure_actuator(request.actuator_id, configuration)
200
201        return common_pb2.ActionResponse(success=True)
202
203
204class IMUService(imu_pb2_grpc.IMUServiceServicer):
205    """Implementation of IMUService that wraps a MuJoCo simulation."""
206
207    def __init__(
208        self,
209        simulator: MujocoSimulator,
210        acc_name: str | None = "imu_acc",
211        gyro_name: str | None = "imu_gyro",
212        mag_name: str | None = "imu_mag",
213        quat_name: str | None = "base_link_quat",
214    ) -> None:
215        self.simulator = simulator
216        self.acc_name = acc_name
217        self.gyro_name = gyro_name
218        self.mag_name = mag_name
219        self.quat_name = quat_name
220
221    async def GetValues(  # noqa: N802
222        self,
223        request: empty_pb2.Empty,
224        context: grpc.ServicerContext,
225    ) -> imu_pb2.IMUValuesResponse:
226        """Implements GetValues by reading IMU sensor data from simulator."""
227        try:
228            if self.acc_name is None or self.gyro_name is None:
229                raise ValueError("Accelerometer or gyroscope name not set")
230            acc_data = await self.simulator.get_sensor_data(self.acc_name)
231            gyro_data = await self.simulator.get_sensor_data(self.gyro_name)
232            mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name)
233            return imu_pb2.IMUValuesResponse(
234                accel_x=float(acc_data[0]),
235                accel_y=float(acc_data[1]),
236                accel_z=float(acc_data[2]),
237                gyro_x=float(gyro_data[0]),
238                gyro_y=float(gyro_data[1]),
239                gyro_z=float(gyro_data[2]),
240                mag_x=None if mag_data is None else float(mag_data[0]),
241                mag_y=None if mag_data is None else float(mag_data[1]),
242                mag_z=None if mag_data is None else float(mag_data[2]),
243            )
244        except Exception as e:
245            context.set_code(grpc.StatusCode.INTERNAL)
246            context.set_details(str(e))
247            return imu_pb2.IMUValuesResponse()
248
249    async def GetQuaternion(  # noqa: N802
250        self,
251        request: empty_pb2.Empty,
252        context: grpc.ServicerContext,
253    ) -> imu_pb2.QuaternionResponse:
254        """Implements GetQuaternion by reading orientation data from simulator."""
255        try:
256            if self.quat_name is None:
257                raise ValueError("Quaternion name not set")
258            quat_data = await self.simulator.get_sensor_data(self.quat_name)
259            return imu_pb2.QuaternionResponse(
260                w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3])
261            )
262        except Exception as e:
263            context.set_code(grpc.StatusCode.INTERNAL)
264            context.set_details(str(e))
265            return imu_pb2.QuaternionResponse()
266
267    async def GetEuler(  # noqa: N802
268        self,
269        request: empty_pb2.Empty,
270        context: grpc.ServicerContext,
271    ) -> imu_pb2.EulerAnglesResponse:
272        """Implements GetEuler by converting orientation quaternion to Euler angles."""
273        try:
274            if self.quat_name is None:
275                raise ValueError("Quaternion name not set")
276            quat_data = await self.simulator.get_sensor_data(self.quat_name)
277
278            # Extract quaternion components
279            w, x, y, z = [float(q) for q in quat_data]
280
281            # Convert quaternion to Euler angles (roll, pitch, yaw)
282            # Roll (x-axis rotation)
283            sinr_cosp = 2 * (w * x + y * z)
284            cosr_cosp = 1 - 2 * (x * x + y * y)
285            roll = math.atan2(sinr_cosp, cosr_cosp)
286
287            # Pitch (y-axis rotation)
288            sinp = 2 * (w * y - z * x)
289            pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp)
290
291            # Yaw (z-axis rotation)
292            siny_cosp = 2 * (w * z + x * y)
293            cosy_cosp = 1 - 2 * (y * y + z * z)
294            yaw = math.atan2(siny_cosp, cosy_cosp)
295
296            # Convert to degrees
297            roll_deg = math.degrees(roll)
298            pitch_deg = math.degrees(pitch)
299            yaw_deg = math.degrees(yaw)
300
301            return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg)
302        except Exception as e:
303            context.set_code(grpc.StatusCode.INTERNAL)
304            context.set_details(str(e))
305            return imu_pb2.EulerAnglesResponse()
306
307
308class ProcessManagerService(process_manager_pb2_grpc.ProcessManagerServiceServicer):
309    """Implementation of ProcessManagerService that wraps a MuJoCo simulation."""
310
311    def __init__(self, simulator: MujocoSimulator, video_recorder: VideoRecorder) -> None:
312        self.simulator = simulator
313        self.video_recorder = video_recorder
314
315    async def StartKClip(  # noqa: N802
316        self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext
317    ) -> process_manager_pb2.KClipStartResponse:
318        """Implements StartKClip by starting k-clip recording."""
319        if self.video_recorder is None:
320            return process_manager_pb2.KClipStartResponse(
321                error=common_pb2.Error(
322                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
323                    message="`video_recorder` is `None`. Video recording not enabled.",
324                )
325            )
326        clip_id = self.video_recorder.start_recording()
327        return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id)
328
329    async def StopKClip(  # noqa: N802
330        self, request: empty_pb2.Empty, context: grpc.ServicerContext
331    ) -> process_manager_pb2.KClipStopResponse:
332        """Implements StopKClip by stopping k-clip recording."""
333        if self.video_recorder is None:
334            return process_manager_pb2.KClipStopResponse(
335                error=common_pb2.Error(
336                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
337                    message="`video_recorder` is `None`. Video recording not enabled.",
338                )
339            )
340        self.video_recorder.stop_recording()
341        return process_manager_pb2.KClipStopResponse()
class SimService(kos_protos.sim_pb2_grpc.SimulationServiceServicer):
 26class SimService(sim_pb2_grpc.SimulationServiceServicer):
 27    """Implementation of SimService that wraps a MuJoCo simulation."""
 28
 29    def __init__(self, simulator: MujocoSimulator) -> None:
 30        self.simulator = simulator
 31
 32    async def Reset(  # noqa: N802
 33        self,
 34        request: sim_pb2.ResetRequest,
 35        context: grpc.ServicerContext,
 36    ) -> common_pb2.ActionResponse:
 37        """Reset the simulation to initial or specified state."""
 38        try:
 39            logger.debug("Resetting simulator")
 40            await self.simulator.reset(
 41                xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z),
 42                quat=(
 43                    None
 44                    if request.HasField("quat") is None
 45                    else (request.quat.w, request.quat.x, request.quat.y, request.quat.z)
 46                ),
 47                joint_pos=(
 48                    None
 49                    if request.joints is None
 50                    else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")}
 51                ),
 52                joint_vel=(
 53                    None
 54                    if request.joints is None
 55                    else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")}
 56                ),
 57            )
 58            return common_pb2.ActionResponse(success=True)
 59        except Exception as e:
 60            logger.error("Reset failed: %s", e)
 61            context.set_code(grpc.StatusCode.INTERNAL)
 62            context.set_details(str(e))
 63            return common_pb2.ActionResponse(success=False, error=str(e))
 64
 65    async def SetPaused(  # noqa: N802
 66        self,
 67        request: sim_pb2.SetPausedRequest,
 68        context: grpc.ServicerContext,
 69    ) -> common_pb2.ActionResponse:  # noqa: N802
 70        """Pause or unpause the simulation."""
 71        try:
 72            return common_pb2.ActionResponse(success=True)
 73        except Exception as e:
 74            logger.error("SetPaused failed: %s", e)
 75            context.set_code(grpc.StatusCode.INTERNAL)
 76            context.set_details(str(e))
 77            return common_pb2.ActionResponse(success=False, error=str(e))
 78
 79    async def Step(  # noqa: N802
 80        self,
 81        request: sim_pb2.StepRequest,
 82        context: grpc.ServicerContext,
 83    ) -> common_pb2.ActionResponse:
 84        raise NotImplementedError("Step is not implemented")
 85
 86    async def SetParameters(  # noqa: N802
 87        self,
 88        request: sim_pb2.SetParametersRequest,
 89        context: grpc.ServicerContext,
 90    ) -> common_pb2.ActionResponse:
 91        """Set simulation parameters."""
 92        try:
 93            params = request.parameters
 94            if params.HasField("time_scale"):
 95                logger.debug("Setting time scale to %f", params.time_scale)
 96                self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale
 97            if params.HasField("gravity"):
 98                logger.debug("Setting gravity to %f", params.gravity)
 99                self.simulator._model.opt.gravity[2] = params.gravity
100            return common_pb2.ActionResponse(success=True)
101        except Exception as e:
102            logger.error("SetParameters failed: %s", e)
103            context.set_code(grpc.StatusCode.INTERNAL)
104            context.set_details(str(e))
105            return common_pb2.ActionResponse(success=False, error=str(e))
106
107    async def GetParameters(  # noqa: N802
108        self,
109        request: empty_pb2.Empty,
110        context: grpc.ServicerContext,
111    ) -> sim_pb2.GetParametersResponse:
112        """Get current simulation parameters."""
113        try:
114            params = sim_pb2.SimulationParameters(
115                time_scale=self.simulator._dt / self.simulator._model.opt.timestep,
116                gravity=float(self.simulator._model.opt.gravity[2]),
117            )
118            logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity)
119            return sim_pb2.GetParametersResponse(parameters=params)
120        except Exception as e:
121            logger.error("GetParameters failed: %s", e)
122            context.set_code(grpc.StatusCode.INTERNAL)
123            context.set_details(str(e))
124            return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e)))

Implementation of SimService that wraps a MuJoCo simulation.

SimService(simulator: kos_sim.simulator.MujocoSimulator)
29    def __init__(self, simulator: MujocoSimulator) -> None:
30        self.simulator = simulator
simulator
async def Reset( self, request: kos.sim_pb2.ResetRequest, context: grpc.ServicerContext) -> kos.common_pb2.ActionResponse:
32    async def Reset(  # noqa: N802
33        self,
34        request: sim_pb2.ResetRequest,
35        context: grpc.ServicerContext,
36    ) -> common_pb2.ActionResponse:
37        """Reset the simulation to initial or specified state."""
38        try:
39            logger.debug("Resetting simulator")
40            await self.simulator.reset(
41                xyz=None if request.HasField("pos") is None else (request.pos.x, request.pos.y, request.pos.z),
42                quat=(
43                    None
44                    if request.HasField("quat") is None
45                    else (request.quat.w, request.quat.x, request.quat.y, request.quat.z)
46                ),
47                joint_pos=(
48                    None
49                    if request.joints is None
50                    else {joint.name: joint.pos for joint in request.joints.values if joint.HasField("pos")}
51                ),
52                joint_vel=(
53                    None
54                    if request.joints is None
55                    else {joint.name: joint.vel for joint in request.joints.values if joint.HasField("vel")}
56                ),
57            )
58            return common_pb2.ActionResponse(success=True)
59        except Exception as e:
60            logger.error("Reset failed: %s", e)
61            context.set_code(grpc.StatusCode.INTERNAL)
62            context.set_details(str(e))
63            return common_pb2.ActionResponse(success=False, error=str(e))

Reset the simulation to initial or specified state.

async def SetPaused( self, request: kos.sim_pb2.SetPausedRequest, context: grpc.ServicerContext) -> kos.common_pb2.ActionResponse:
65    async def SetPaused(  # noqa: N802
66        self,
67        request: sim_pb2.SetPausedRequest,
68        context: grpc.ServicerContext,
69    ) -> common_pb2.ActionResponse:  # noqa: N802
70        """Pause or unpause the simulation."""
71        try:
72            return common_pb2.ActionResponse(success=True)
73        except Exception as e:
74            logger.error("SetPaused failed: %s", e)
75            context.set_code(grpc.StatusCode.INTERNAL)
76            context.set_details(str(e))
77            return common_pb2.ActionResponse(success=False, error=str(e))

Pause or unpause the simulation.

async def Step( self, request: kos.sim_pb2.StepRequest, context: grpc.ServicerContext) -> kos.common_pb2.ActionResponse:
79    async def Step(  # noqa: N802
80        self,
81        request: sim_pb2.StepRequest,
82        context: grpc.ServicerContext,
83    ) -> common_pb2.ActionResponse:
84        raise NotImplementedError("Step is not implemented")

Steps the simulation forward by a specified amount.

async def SetParameters( self, request: kos.sim_pb2.SetParametersRequest, context: grpc.ServicerContext) -> kos.common_pb2.ActionResponse:
 86    async def SetParameters(  # noqa: N802
 87        self,
 88        request: sim_pb2.SetParametersRequest,
 89        context: grpc.ServicerContext,
 90    ) -> common_pb2.ActionResponse:
 91        """Set simulation parameters."""
 92        try:
 93            params = request.parameters
 94            if params.HasField("time_scale"):
 95                logger.debug("Setting time scale to %f", params.time_scale)
 96                self.simulator._model.opt.timestep = self.simulator._dt / params.time_scale
 97            if params.HasField("gravity"):
 98                logger.debug("Setting gravity to %f", params.gravity)
 99                self.simulator._model.opt.gravity[2] = params.gravity
100            return common_pb2.ActionResponse(success=True)
101        except Exception as e:
102            logger.error("SetParameters failed: %s", e)
103            context.set_code(grpc.StatusCode.INTERNAL)
104            context.set_details(str(e))
105            return common_pb2.ActionResponse(success=False, error=str(e))

Set simulation parameters.

async def GetParameters( self, request: google.protobuf.empty_pb2.Empty, context: grpc.ServicerContext) -> kos.sim_pb2.GetParametersResponse:
107    async def GetParameters(  # noqa: N802
108        self,
109        request: empty_pb2.Empty,
110        context: grpc.ServicerContext,
111    ) -> sim_pb2.GetParametersResponse:
112        """Get current simulation parameters."""
113        try:
114            params = sim_pb2.SimulationParameters(
115                time_scale=self.simulator._dt / self.simulator._model.opt.timestep,
116                gravity=float(self.simulator._model.opt.gravity[2]),
117            )
118            logger.debug("Current parameters: time_scale=%f, gravity=%f", params.time_scale, params.gravity)
119            return sim_pb2.GetParametersResponse(parameters=params)
120        except Exception as e:
121            logger.error("GetParameters failed: %s", e)
122            context.set_code(grpc.StatusCode.INTERNAL)
123            context.set_details(str(e))
124            return sim_pb2.GetParametersResponse(error=common_pb2.Error(message=str(e)))

Get current simulation parameters.

class ActuatorService(kos_protos.actuator_pb2_grpc.ActuatorServiceServicer):
127class ActuatorService(actuator_pb2_grpc.ActuatorServiceServicer):
128    """Implementation of ActuatorService that wraps a MuJoCo simulation."""
129
130    def __init__(self, simulator: MujocoSimulator, step_lock: asyncio.Semaphore) -> None:
131        self.simulator = simulator
132        self.step_lock = step_lock
133
134    async def CommandActuators(  # noqa: N802
135        self,
136        request: actuator_pb2.CommandActuatorsRequest,
137        context: grpc.ServicerContext,
138    ) -> actuator_pb2.CommandActuatorsResponse:
139        """Implements CommandActuators by forwarding to simulator."""
140        try:
141            # Convert degrees to radians.
142            commands: dict[int, ActuatorCommand] = {
143                cmd.actuator_id: {
144                    "position": math.radians(cmd.position),
145                    "velocity": math.radians(cmd.velocity),
146                    "torque": cmd.torque,
147                }
148                for cmd in request.commands
149            }
150            async with self.step_lock:
151                await self.simulator.command_actuators(commands)
152            return actuator_pb2.CommandActuatorsResponse()
153        except Exception as e:
154            context.set_code(grpc.StatusCode.INTERNAL)
155            context.set_details(str(e))
156            return actuator_pb2.CommandActuatorsResponse()
157
158    async def GetActuatorsState(  # noqa: N802
159        self,
160        request: actuator_pb2.GetActuatorsStateRequest,
161        context: grpc.ServicerContext,
162    ) -> actuator_pb2.GetActuatorsStateResponse:
163        """Implements GetActuatorsState by reading from simulator."""
164        ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys())
165        try:
166            states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids}
167            return actuator_pb2.GetActuatorsStateResponse(
168                states=[
169                    actuator_pb2.ActuatorStateResponse(
170                        actuator_id=joint_id,
171                        position=math.degrees(state.position),
172                        velocity=math.degrees(state.velocity),
173                        online=True,
174                    )
175                    for joint_id, state in states.items()
176                ]
177            )
178        except Exception as e:
179            logger.error("GetActuatorsState failed: %s", e)
180            context.set_code(grpc.StatusCode.INTERNAL)
181            context.set_details(str(e))
182            return actuator_pb2.GetActuatorsStateResponse()
183
184    async def ConfigureActuator(  # noqa: N802
185        self,
186        request: actuator_pb2.ConfigureActuatorRequest,
187        context: grpc.ServicerContext,
188    ) -> common_pb2.ActionResponse:
189        configuration: ConfigureActuatorRequest = {}
190        if request.HasField("torque_enabled"):
191            configuration["torque_enabled"] = request.torque_enabled
192        if request.HasField("zero_position"):
193            configuration["zero_position"] = request.zero_position
194        if request.HasField("kp"):
195            configuration["kp"] = request.kp
196        if request.HasField("kd"):
197            configuration["kd"] = request.kd
198        if request.HasField("max_torque"):
199            configuration["max_torque"] = request.max_torque
200        await self.simulator.configure_actuator(request.actuator_id, configuration)
201
202        return common_pb2.ActionResponse(success=True)

Implementation of ActuatorService that wraps a MuJoCo simulation.

ActuatorService( simulator: kos_sim.simulator.MujocoSimulator, step_lock: asyncio.locks.Semaphore)
130    def __init__(self, simulator: MujocoSimulator, step_lock: asyncio.Semaphore) -> None:
131        self.simulator = simulator
132        self.step_lock = step_lock
simulator
step_lock
async def CommandActuators( self, request: kos.actuator_pb2.CommandActuatorsRequest, context: grpc.ServicerContext) -> kos.actuator_pb2.CommandActuatorsResponse:
134    async def CommandActuators(  # noqa: N802
135        self,
136        request: actuator_pb2.CommandActuatorsRequest,
137        context: grpc.ServicerContext,
138    ) -> actuator_pb2.CommandActuatorsResponse:
139        """Implements CommandActuators by forwarding to simulator."""
140        try:
141            # Convert degrees to radians.
142            commands: dict[int, ActuatorCommand] = {
143                cmd.actuator_id: {
144                    "position": math.radians(cmd.position),
145                    "velocity": math.radians(cmd.velocity),
146                    "torque": cmd.torque,
147                }
148                for cmd in request.commands
149            }
150            async with self.step_lock:
151                await self.simulator.command_actuators(commands)
152            return actuator_pb2.CommandActuatorsResponse()
153        except Exception as e:
154            context.set_code(grpc.StatusCode.INTERNAL)
155            context.set_details(str(e))
156            return actuator_pb2.CommandActuatorsResponse()

Implements CommandActuators by forwarding to simulator.

async def GetActuatorsState( self, request: kos.actuator_pb2.GetActuatorsStateRequest, context: grpc.ServicerContext) -> kos.actuator_pb2.GetActuatorsStateResponse:
158    async def GetActuatorsState(  # noqa: N802
159        self,
160        request: actuator_pb2.GetActuatorsStateRequest,
161        context: grpc.ServicerContext,
162    ) -> actuator_pb2.GetActuatorsStateResponse:
163        """Implements GetActuatorsState by reading from simulator."""
164        ids = request.actuator_ids or list(self.simulator._joint_id_to_name.keys())
165        try:
166            states = {joint_id: await self.simulator.get_actuator_state(joint_id) for joint_id in ids}
167            return actuator_pb2.GetActuatorsStateResponse(
168                states=[
169                    actuator_pb2.ActuatorStateResponse(
170                        actuator_id=joint_id,
171                        position=math.degrees(state.position),
172                        velocity=math.degrees(state.velocity),
173                        online=True,
174                    )
175                    for joint_id, state in states.items()
176                ]
177            )
178        except Exception as e:
179            logger.error("GetActuatorsState failed: %s", e)
180            context.set_code(grpc.StatusCode.INTERNAL)
181            context.set_details(str(e))
182            return actuator_pb2.GetActuatorsStateResponse()

Implements GetActuatorsState by reading from simulator.

async def ConfigureActuator( self, request: kos.actuator_pb2.ConfigureActuatorRequest, context: grpc.ServicerContext) -> kos.common_pb2.ActionResponse:
184    async def ConfigureActuator(  # noqa: N802
185        self,
186        request: actuator_pb2.ConfigureActuatorRequest,
187        context: grpc.ServicerContext,
188    ) -> common_pb2.ActionResponse:
189        configuration: ConfigureActuatorRequest = {}
190        if request.HasField("torque_enabled"):
191            configuration["torque_enabled"] = request.torque_enabled
192        if request.HasField("zero_position"):
193            configuration["zero_position"] = request.zero_position
194        if request.HasField("kp"):
195            configuration["kp"] = request.kp
196        if request.HasField("kd"):
197            configuration["kd"] = request.kd
198        if request.HasField("max_torque"):
199            configuration["max_torque"] = request.max_torque
200        await self.simulator.configure_actuator(request.actuator_id, configuration)
201
202        return common_pb2.ActionResponse(success=True)

Configures an actuator's parameters.

class IMUService(kos_protos.imu_pb2_grpc.IMUServiceServicer):
205class IMUService(imu_pb2_grpc.IMUServiceServicer):
206    """Implementation of IMUService that wraps a MuJoCo simulation."""
207
208    def __init__(
209        self,
210        simulator: MujocoSimulator,
211        acc_name: str | None = "imu_acc",
212        gyro_name: str | None = "imu_gyro",
213        mag_name: str | None = "imu_mag",
214        quat_name: str | None = "base_link_quat",
215    ) -> None:
216        self.simulator = simulator
217        self.acc_name = acc_name
218        self.gyro_name = gyro_name
219        self.mag_name = mag_name
220        self.quat_name = quat_name
221
222    async def GetValues(  # noqa: N802
223        self,
224        request: empty_pb2.Empty,
225        context: grpc.ServicerContext,
226    ) -> imu_pb2.IMUValuesResponse:
227        """Implements GetValues by reading IMU sensor data from simulator."""
228        try:
229            if self.acc_name is None or self.gyro_name is None:
230                raise ValueError("Accelerometer or gyroscope name not set")
231            acc_data = await self.simulator.get_sensor_data(self.acc_name)
232            gyro_data = await self.simulator.get_sensor_data(self.gyro_name)
233            mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name)
234            return imu_pb2.IMUValuesResponse(
235                accel_x=float(acc_data[0]),
236                accel_y=float(acc_data[1]),
237                accel_z=float(acc_data[2]),
238                gyro_x=float(gyro_data[0]),
239                gyro_y=float(gyro_data[1]),
240                gyro_z=float(gyro_data[2]),
241                mag_x=None if mag_data is None else float(mag_data[0]),
242                mag_y=None if mag_data is None else float(mag_data[1]),
243                mag_z=None if mag_data is None else float(mag_data[2]),
244            )
245        except Exception as e:
246            context.set_code(grpc.StatusCode.INTERNAL)
247            context.set_details(str(e))
248            return imu_pb2.IMUValuesResponse()
249
250    async def GetQuaternion(  # noqa: N802
251        self,
252        request: empty_pb2.Empty,
253        context: grpc.ServicerContext,
254    ) -> imu_pb2.QuaternionResponse:
255        """Implements GetQuaternion by reading orientation data from simulator."""
256        try:
257            if self.quat_name is None:
258                raise ValueError("Quaternion name not set")
259            quat_data = await self.simulator.get_sensor_data(self.quat_name)
260            return imu_pb2.QuaternionResponse(
261                w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3])
262            )
263        except Exception as e:
264            context.set_code(grpc.StatusCode.INTERNAL)
265            context.set_details(str(e))
266            return imu_pb2.QuaternionResponse()
267
268    async def GetEuler(  # noqa: N802
269        self,
270        request: empty_pb2.Empty,
271        context: grpc.ServicerContext,
272    ) -> imu_pb2.EulerAnglesResponse:
273        """Implements GetEuler by converting orientation quaternion to Euler angles."""
274        try:
275            if self.quat_name is None:
276                raise ValueError("Quaternion name not set")
277            quat_data = await self.simulator.get_sensor_data(self.quat_name)
278
279            # Extract quaternion components
280            w, x, y, z = [float(q) for q in quat_data]
281
282            # Convert quaternion to Euler angles (roll, pitch, yaw)
283            # Roll (x-axis rotation)
284            sinr_cosp = 2 * (w * x + y * z)
285            cosr_cosp = 1 - 2 * (x * x + y * y)
286            roll = math.atan2(sinr_cosp, cosr_cosp)
287
288            # Pitch (y-axis rotation)
289            sinp = 2 * (w * y - z * x)
290            pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp)
291
292            # Yaw (z-axis rotation)
293            siny_cosp = 2 * (w * z + x * y)
294            cosy_cosp = 1 - 2 * (y * y + z * z)
295            yaw = math.atan2(siny_cosp, cosy_cosp)
296
297            # Convert to degrees
298            roll_deg = math.degrees(roll)
299            pitch_deg = math.degrees(pitch)
300            yaw_deg = math.degrees(yaw)
301
302            return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg)
303        except Exception as e:
304            context.set_code(grpc.StatusCode.INTERNAL)
305            context.set_details(str(e))
306            return imu_pb2.EulerAnglesResponse()

Implementation of IMUService that wraps a MuJoCo simulation.

IMUService( simulator: kos_sim.simulator.MujocoSimulator, acc_name: str | None = 'imu_acc', gyro_name: str | None = 'imu_gyro', mag_name: str | None = 'imu_mag', quat_name: str | None = 'base_link_quat')
208    def __init__(
209        self,
210        simulator: MujocoSimulator,
211        acc_name: str | None = "imu_acc",
212        gyro_name: str | None = "imu_gyro",
213        mag_name: str | None = "imu_mag",
214        quat_name: str | None = "base_link_quat",
215    ) -> None:
216        self.simulator = simulator
217        self.acc_name = acc_name
218        self.gyro_name = gyro_name
219        self.mag_name = mag_name
220        self.quat_name = quat_name
simulator
acc_name
gyro_name
mag_name
quat_name
async def GetValues( self, request: google.protobuf.empty_pb2.Empty, context: grpc.ServicerContext) -> kos.imu_pb2.IMUValuesResponse:
222    async def GetValues(  # noqa: N802
223        self,
224        request: empty_pb2.Empty,
225        context: grpc.ServicerContext,
226    ) -> imu_pb2.IMUValuesResponse:
227        """Implements GetValues by reading IMU sensor data from simulator."""
228        try:
229            if self.acc_name is None or self.gyro_name is None:
230                raise ValueError("Accelerometer or gyroscope name not set")
231            acc_data = await self.simulator.get_sensor_data(self.acc_name)
232            gyro_data = await self.simulator.get_sensor_data(self.gyro_name)
233            mag_data = None if self.mag_name is None else await self.simulator.get_sensor_data(self.mag_name)
234            return imu_pb2.IMUValuesResponse(
235                accel_x=float(acc_data[0]),
236                accel_y=float(acc_data[1]),
237                accel_z=float(acc_data[2]),
238                gyro_x=float(gyro_data[0]),
239                gyro_y=float(gyro_data[1]),
240                gyro_z=float(gyro_data[2]),
241                mag_x=None if mag_data is None else float(mag_data[0]),
242                mag_y=None if mag_data is None else float(mag_data[1]),
243                mag_z=None if mag_data is None else float(mag_data[2]),
244            )
245        except Exception as e:
246            context.set_code(grpc.StatusCode.INTERNAL)
247            context.set_details(str(e))
248            return imu_pb2.IMUValuesResponse()

Implements GetValues by reading IMU sensor data from simulator.

async def GetQuaternion( self, request: google.protobuf.empty_pb2.Empty, context: grpc.ServicerContext) -> kos.imu_pb2.QuaternionResponse:
250    async def GetQuaternion(  # noqa: N802
251        self,
252        request: empty_pb2.Empty,
253        context: grpc.ServicerContext,
254    ) -> imu_pb2.QuaternionResponse:
255        """Implements GetQuaternion by reading orientation data from simulator."""
256        try:
257            if self.quat_name is None:
258                raise ValueError("Quaternion name not set")
259            quat_data = await self.simulator.get_sensor_data(self.quat_name)
260            return imu_pb2.QuaternionResponse(
261                w=float(quat_data[0]), x=float(quat_data[1]), y=float(quat_data[2]), z=float(quat_data[3])
262            )
263        except Exception as e:
264            context.set_code(grpc.StatusCode.INTERNAL)
265            context.set_details(str(e))
266            return imu_pb2.QuaternionResponse()

Implements GetQuaternion by reading orientation data from simulator.

async def GetEuler( self, request: google.protobuf.empty_pb2.Empty, context: grpc.ServicerContext) -> kos.imu_pb2.EulerAnglesResponse:
268    async def GetEuler(  # noqa: N802
269        self,
270        request: empty_pb2.Empty,
271        context: grpc.ServicerContext,
272    ) -> imu_pb2.EulerAnglesResponse:
273        """Implements GetEuler by converting orientation quaternion to Euler angles."""
274        try:
275            if self.quat_name is None:
276                raise ValueError("Quaternion name not set")
277            quat_data = await self.simulator.get_sensor_data(self.quat_name)
278
279            # Extract quaternion components
280            w, x, y, z = [float(q) for q in quat_data]
281
282            # Convert quaternion to Euler angles (roll, pitch, yaw)
283            # Roll (x-axis rotation)
284            sinr_cosp = 2 * (w * x + y * z)
285            cosr_cosp = 1 - 2 * (x * x + y * y)
286            roll = math.atan2(sinr_cosp, cosr_cosp)
287
288            # Pitch (y-axis rotation)
289            sinp = 2 * (w * y - z * x)
290            pitch = math.asin(sinp) if abs(sinp) < 1 else math.copysign(math.pi / 2, sinp)
291
292            # Yaw (z-axis rotation)
293            siny_cosp = 2 * (w * z + x * y)
294            cosy_cosp = 1 - 2 * (y * y + z * z)
295            yaw = math.atan2(siny_cosp, cosy_cosp)
296
297            # Convert to degrees
298            roll_deg = math.degrees(roll)
299            pitch_deg = math.degrees(pitch)
300            yaw_deg = math.degrees(yaw)
301
302            return imu_pb2.EulerAnglesResponse(roll=roll_deg, pitch=pitch_deg, yaw=yaw_deg)
303        except Exception as e:
304            context.set_code(grpc.StatusCode.INTERNAL)
305            context.set_details(str(e))
306            return imu_pb2.EulerAnglesResponse()

Implements GetEuler by converting orientation quaternion to Euler angles.

class ProcessManagerService(kos_protos.process_manager_pb2_grpc.ProcessManagerServiceServicer):
309class ProcessManagerService(process_manager_pb2_grpc.ProcessManagerServiceServicer):
310    """Implementation of ProcessManagerService that wraps a MuJoCo simulation."""
311
312    def __init__(self, simulator: MujocoSimulator, video_recorder: VideoRecorder) -> None:
313        self.simulator = simulator
314        self.video_recorder = video_recorder
315
316    async def StartKClip(  # noqa: N802
317        self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext
318    ) -> process_manager_pb2.KClipStartResponse:
319        """Implements StartKClip by starting k-clip recording."""
320        if self.video_recorder is None:
321            return process_manager_pb2.KClipStartResponse(
322                error=common_pb2.Error(
323                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
324                    message="`video_recorder` is `None`. Video recording not enabled.",
325                )
326            )
327        clip_id = self.video_recorder.start_recording()
328        return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id)
329
330    async def StopKClip(  # noqa: N802
331        self, request: empty_pb2.Empty, context: grpc.ServicerContext
332    ) -> process_manager_pb2.KClipStopResponse:
333        """Implements StopKClip by stopping k-clip recording."""
334        if self.video_recorder is None:
335            return process_manager_pb2.KClipStopResponse(
336                error=common_pb2.Error(
337                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
338                    message="`video_recorder` is `None`. Video recording not enabled.",
339                )
340            )
341        self.video_recorder.stop_recording()
342        return process_manager_pb2.KClipStopResponse()

Implementation of ProcessManagerService that wraps a MuJoCo simulation.

ProcessManagerService( simulator: kos_sim.simulator.MujocoSimulator, video_recorder: kos_sim.video_recorder.VideoRecorder)
312    def __init__(self, simulator: MujocoSimulator, video_recorder: VideoRecorder) -> None:
313        self.simulator = simulator
314        self.video_recorder = video_recorder
simulator
video_recorder
async def StartKClip( self, request: kos.process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext) -> kos.process_manager_pb2.KClipStartResponse:
316    async def StartKClip(  # noqa: N802
317        self, request: process_manager_pb2.KClipStartRequest, context: grpc.ServicerContext
318    ) -> process_manager_pb2.KClipStartResponse:
319        """Implements StartKClip by starting k-clip recording."""
320        if self.video_recorder is None:
321            return process_manager_pb2.KClipStartResponse(
322                error=common_pb2.Error(
323                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
324                    message="`video_recorder` is `None`. Video recording not enabled.",
325                )
326            )
327        clip_id = self.video_recorder.start_recording()
328        return process_manager_pb2.KClipStartResponse(clip_uuid=clip_id)

Implements StartKClip by starting k-clip recording.

async def StopKClip( self, request: google.protobuf.empty_pb2.Empty, context: grpc.ServicerContext) -> kos.process_manager_pb2.KClipStopResponse:
330    async def StopKClip(  # noqa: N802
331        self, request: empty_pb2.Empty, context: grpc.ServicerContext
332    ) -> process_manager_pb2.KClipStopResponse:
333        """Implements StopKClip by stopping k-clip recording."""
334        if self.video_recorder is None:
335            return process_manager_pb2.KClipStopResponse(
336                error=common_pb2.Error(
337                    code=common_pb2.ErrorCode.INVALID_ARGUMENT,
338                    message="`video_recorder` is `None`. Video recording not enabled.",
339                )
340            )
341        self.video_recorder.stop_recording()
342        return process_manager_pb2.KClipStopResponse()

Implements StopKClip by stopping k-clip recording.