kos_sim.video_recorder

Video recording functionality for the KOS simulator.

  1"""Video recording functionality for the KOS simulator."""
  2
  3import asyncio
  4import datetime
  5import os
  6import time
  7from pathlib import Path
  8
  9import mediapy as mp
 10import numpy as np
 11
 12from kos_sim import logger
 13from kos_sim.simulator import MujocoSimulator
 14
 15
 16class VideoRecorder:
 17    """Manages video recording for a MuJoCo simulator."""
 18
 19    def __init__(
 20        self,
 21        simulator: MujocoSimulator,
 22        output_dir: str | Path,
 23        fps: int = 30,
 24        frame_width: int = 640,
 25        frame_height: int = 480,
 26    ) -> None:
 27        self.simulator = simulator
 28        self.output_dir = Path(output_dir)
 29        self.fps = fps
 30        self.frame_width = frame_width
 31        self.frame_height = frame_height
 32
 33        self.last_frame_time = 0.0
 34        self.writer = None
 35        self.is_recording = False
 36        self.recording_task: asyncio.Task[None] | None = None
 37        self.frames: list[np.ndarray] = []
 38
 39        os.makedirs(self.output_dir, exist_ok=True)
 40
 41    def start_recording(self, custom_filename: str | None = None) -> str:
 42        """Start recording video.
 43
 44        Args:
 45            custom_filename: Optional custom filename (without extension)
 46
 47        Returns:
 48            Path to the output video file
 49        """
 50        if self.is_recording:
 51            self.stop_recording()
 52
 53        # Generate filename based on datetime if not provided
 54        if custom_filename is None:
 55            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
 56            filename = f"sim_recording_{timestamp}.mp4"
 57        else:
 58            filename = f"{custom_filename}.mp4"
 59
 60        self.filepath = self.output_dir / filename
 61
 62        self.frames = []
 63        self.last_frame_time = time.time()
 64        self.is_recording = True
 65
 66        logger.info(f"Started video recording to {self.filepath}")
 67        return str(self.filepath)
 68
 69    def stop_recording(self) -> None:
 70        """Stop recording and save the video to disk."""
 71        if not self.is_recording:
 72            return
 73
 74        self.is_recording = False
 75
 76        if self.recording_task and not self.recording_task.done():
 77            self.recording_task.cancel()
 78
 79        # This will introduce a delay server-side... there is probably a better way to do this
 80        if self.frames:
 81            try:
 82                logger.info(f"Writing {len(self.frames)} frames to {self.filepath}")
 83                mp.write_video(str(self.filepath), self.frames, fps=self.fps)
 84                logger.info("Video recording stopped and saved")
 85            except Exception as e:
 86                logger.error(f"Error saving video: {e}")
 87        else:
 88            logger.warning("No frames captured, no video file created")
 89
 90        self.frames = []
 91
 92    async def capture_frame(self) -> None:
 93        """Capture and add a single frame to the recording."""
 94        if not self.is_recording:
 95            return
 96
 97        try:
 98            frame, _ = await self.simulator.capture_frame()
 99            self.frames.append(frame)
100
101        except Exception as e:
102            logger.error(f"Error capturing frame: {e}")
class VideoRecorder:
 17class VideoRecorder:
 18    """Manages video recording for a MuJoCo simulator."""
 19
 20    def __init__(
 21        self,
 22        simulator: MujocoSimulator,
 23        output_dir: str | Path,
 24        fps: int = 30,
 25        frame_width: int = 640,
 26        frame_height: int = 480,
 27    ) -> None:
 28        self.simulator = simulator
 29        self.output_dir = Path(output_dir)
 30        self.fps = fps
 31        self.frame_width = frame_width
 32        self.frame_height = frame_height
 33
 34        self.last_frame_time = 0.0
 35        self.writer = None
 36        self.is_recording = False
 37        self.recording_task: asyncio.Task[None] | None = None
 38        self.frames: list[np.ndarray] = []
 39
 40        os.makedirs(self.output_dir, exist_ok=True)
 41
 42    def start_recording(self, custom_filename: str | None = None) -> str:
 43        """Start recording video.
 44
 45        Args:
 46            custom_filename: Optional custom filename (without extension)
 47
 48        Returns:
 49            Path to the output video file
 50        """
 51        if self.is_recording:
 52            self.stop_recording()
 53
 54        # Generate filename based on datetime if not provided
 55        if custom_filename is None:
 56            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
 57            filename = f"sim_recording_{timestamp}.mp4"
 58        else:
 59            filename = f"{custom_filename}.mp4"
 60
 61        self.filepath = self.output_dir / filename
 62
 63        self.frames = []
 64        self.last_frame_time = time.time()
 65        self.is_recording = True
 66
 67        logger.info(f"Started video recording to {self.filepath}")
 68        return str(self.filepath)
 69
 70    def stop_recording(self) -> None:
 71        """Stop recording and save the video to disk."""
 72        if not self.is_recording:
 73            return
 74
 75        self.is_recording = False
 76
 77        if self.recording_task and not self.recording_task.done():
 78            self.recording_task.cancel()
 79
 80        # This will introduce a delay server-side... there is probably a better way to do this
 81        if self.frames:
 82            try:
 83                logger.info(f"Writing {len(self.frames)} frames to {self.filepath}")
 84                mp.write_video(str(self.filepath), self.frames, fps=self.fps)
 85                logger.info("Video recording stopped and saved")
 86            except Exception as e:
 87                logger.error(f"Error saving video: {e}")
 88        else:
 89            logger.warning("No frames captured, no video file created")
 90
 91        self.frames = []
 92
 93    async def capture_frame(self) -> None:
 94        """Capture and add a single frame to the recording."""
 95        if not self.is_recording:
 96            return
 97
 98        try:
 99            frame, _ = await self.simulator.capture_frame()
100            self.frames.append(frame)
101
102        except Exception as e:
103            logger.error(f"Error capturing frame: {e}")

Manages video recording for a MuJoCo simulator.

VideoRecorder( simulator: kos_sim.simulator.MujocoSimulator, output_dir: str | pathlib.Path, fps: int = 30, frame_width: int = 640, frame_height: int = 480)
20    def __init__(
21        self,
22        simulator: MujocoSimulator,
23        output_dir: str | Path,
24        fps: int = 30,
25        frame_width: int = 640,
26        frame_height: int = 480,
27    ) -> None:
28        self.simulator = simulator
29        self.output_dir = Path(output_dir)
30        self.fps = fps
31        self.frame_width = frame_width
32        self.frame_height = frame_height
33
34        self.last_frame_time = 0.0
35        self.writer = None
36        self.is_recording = False
37        self.recording_task: asyncio.Task[None] | None = None
38        self.frames: list[np.ndarray] = []
39
40        os.makedirs(self.output_dir, exist_ok=True)
simulator
output_dir
fps
frame_width
frame_height
last_frame_time
writer
is_recording
recording_task: _asyncio.Task[None] | None
frames: list[numpy.ndarray]
def start_recording(self, custom_filename: str | None = None) -> str:
42    def start_recording(self, custom_filename: str | None = None) -> str:
43        """Start recording video.
44
45        Args:
46            custom_filename: Optional custom filename (without extension)
47
48        Returns:
49            Path to the output video file
50        """
51        if self.is_recording:
52            self.stop_recording()
53
54        # Generate filename based on datetime if not provided
55        if custom_filename is None:
56            timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
57            filename = f"sim_recording_{timestamp}.mp4"
58        else:
59            filename = f"{custom_filename}.mp4"
60
61        self.filepath = self.output_dir / filename
62
63        self.frames = []
64        self.last_frame_time = time.time()
65        self.is_recording = True
66
67        logger.info(f"Started video recording to {self.filepath}")
68        return str(self.filepath)

Start recording video.

Args: custom_filename: Optional custom filename (without extension)

Returns: Path to the output video file

def stop_recording(self) -> None:
70    def stop_recording(self) -> None:
71        """Stop recording and save the video to disk."""
72        if not self.is_recording:
73            return
74
75        self.is_recording = False
76
77        if self.recording_task and not self.recording_task.done():
78            self.recording_task.cancel()
79
80        # This will introduce a delay server-side... there is probably a better way to do this
81        if self.frames:
82            try:
83                logger.info(f"Writing {len(self.frames)} frames to {self.filepath}")
84                mp.write_video(str(self.filepath), self.frames, fps=self.fps)
85                logger.info("Video recording stopped and saved")
86            except Exception as e:
87                logger.error(f"Error saving video: {e}")
88        else:
89            logger.warning("No frames captured, no video file created")
90
91        self.frames = []

Stop recording and save the video to disk.

async def capture_frame(self) -> None:
 93    async def capture_frame(self) -> None:
 94        """Capture and add a single frame to the recording."""
 95        if not self.is_recording:
 96            return
 97
 98        try:
 99            frame, _ = await self.simulator.capture_frame()
100            self.frames.append(frame)
101
102        except Exception as e:
103            logger.error(f"Error capturing frame: {e}")

Capture and add a single frame to the recording.