kos_sim.video_recorder
Video recording functionality for the KOS simulator.
1"""Video recording functionality for the KOS simulator.""" 2 3import asyncio 4import datetime 5import os 6import time 7from pathlib import Path 8 9import mediapy as mp 10import numpy as np 11 12from kos_sim import logger 13from kos_sim.simulator import MujocoSimulator 14 15 16class VideoRecorder: 17 """Manages video recording for a MuJoCo simulator.""" 18 19 def __init__( 20 self, 21 simulator: MujocoSimulator, 22 output_dir: str | Path, 23 fps: int = 30, 24 frame_width: int = 640, 25 frame_height: int = 480, 26 ) -> None: 27 self.simulator = simulator 28 self.output_dir = Path(output_dir) 29 self.fps = fps 30 self.frame_width = frame_width 31 self.frame_height = frame_height 32 33 self.last_frame_time = 0.0 34 self.writer = None 35 self.is_recording = False 36 self.recording_task: asyncio.Task[None] | None = None 37 self.frames: list[np.ndarray] = [] 38 39 os.makedirs(self.output_dir, exist_ok=True) 40 41 def start_recording(self, custom_filename: str | None = None) -> str: 42 """Start recording video. 43 44 Args: 45 custom_filename: Optional custom filename (without extension) 46 47 Returns: 48 Path to the output video file 49 """ 50 if self.is_recording: 51 self.stop_recording() 52 53 # Generate filename based on datetime if not provided 54 if custom_filename is None: 55 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 56 filename = f"sim_recording_{timestamp}.mp4" 57 else: 58 filename = f"{custom_filename}.mp4" 59 60 self.filepath = self.output_dir / filename 61 62 self.frames = [] 63 self.last_frame_time = time.time() 64 self.is_recording = True 65 66 logger.info(f"Started video recording to {self.filepath}") 67 return str(self.filepath) 68 69 def stop_recording(self) -> None: 70 """Stop recording and save the video to disk.""" 71 if not self.is_recording: 72 return 73 74 self.is_recording = False 75 76 if self.recording_task and not self.recording_task.done(): 77 self.recording_task.cancel() 78 79 # This will introduce a delay server-side... there is probably a better way to do this 80 if self.frames: 81 try: 82 logger.info(f"Writing {len(self.frames)} frames to {self.filepath}") 83 mp.write_video(str(self.filepath), self.frames, fps=self.fps) 84 logger.info("Video recording stopped and saved") 85 except Exception as e: 86 logger.error(f"Error saving video: {e}") 87 else: 88 logger.warning("No frames captured, no video file created") 89 90 self.frames = [] 91 92 async def capture_frame(self) -> None: 93 """Capture and add a single frame to the recording.""" 94 if not self.is_recording: 95 return 96 97 try: 98 frame, _ = await self.simulator.capture_frame() 99 self.frames.append(frame) 100 101 except Exception as e: 102 logger.error(f"Error capturing frame: {e}")
class
VideoRecorder:
17class VideoRecorder: 18 """Manages video recording for a MuJoCo simulator.""" 19 20 def __init__( 21 self, 22 simulator: MujocoSimulator, 23 output_dir: str | Path, 24 fps: int = 30, 25 frame_width: int = 640, 26 frame_height: int = 480, 27 ) -> None: 28 self.simulator = simulator 29 self.output_dir = Path(output_dir) 30 self.fps = fps 31 self.frame_width = frame_width 32 self.frame_height = frame_height 33 34 self.last_frame_time = 0.0 35 self.writer = None 36 self.is_recording = False 37 self.recording_task: asyncio.Task[None] | None = None 38 self.frames: list[np.ndarray] = [] 39 40 os.makedirs(self.output_dir, exist_ok=True) 41 42 def start_recording(self, custom_filename: str | None = None) -> str: 43 """Start recording video. 44 45 Args: 46 custom_filename: Optional custom filename (without extension) 47 48 Returns: 49 Path to the output video file 50 """ 51 if self.is_recording: 52 self.stop_recording() 53 54 # Generate filename based on datetime if not provided 55 if custom_filename is None: 56 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 57 filename = f"sim_recording_{timestamp}.mp4" 58 else: 59 filename = f"{custom_filename}.mp4" 60 61 self.filepath = self.output_dir / filename 62 63 self.frames = [] 64 self.last_frame_time = time.time() 65 self.is_recording = True 66 67 logger.info(f"Started video recording to {self.filepath}") 68 return str(self.filepath) 69 70 def stop_recording(self) -> None: 71 """Stop recording and save the video to disk.""" 72 if not self.is_recording: 73 return 74 75 self.is_recording = False 76 77 if self.recording_task and not self.recording_task.done(): 78 self.recording_task.cancel() 79 80 # This will introduce a delay server-side... there is probably a better way to do this 81 if self.frames: 82 try: 83 logger.info(f"Writing {len(self.frames)} frames to {self.filepath}") 84 mp.write_video(str(self.filepath), self.frames, fps=self.fps) 85 logger.info("Video recording stopped and saved") 86 except Exception as e: 87 logger.error(f"Error saving video: {e}") 88 else: 89 logger.warning("No frames captured, no video file created") 90 91 self.frames = [] 92 93 async def capture_frame(self) -> None: 94 """Capture and add a single frame to the recording.""" 95 if not self.is_recording: 96 return 97 98 try: 99 frame, _ = await self.simulator.capture_frame() 100 self.frames.append(frame) 101 102 except Exception as e: 103 logger.error(f"Error capturing frame: {e}")
Manages video recording for a MuJoCo simulator.
VideoRecorder( simulator: kos_sim.simulator.MujocoSimulator, output_dir: str | pathlib.Path, fps: int = 30, frame_width: int = 640, frame_height: int = 480)
20 def __init__( 21 self, 22 simulator: MujocoSimulator, 23 output_dir: str | Path, 24 fps: int = 30, 25 frame_width: int = 640, 26 frame_height: int = 480, 27 ) -> None: 28 self.simulator = simulator 29 self.output_dir = Path(output_dir) 30 self.fps = fps 31 self.frame_width = frame_width 32 self.frame_height = frame_height 33 34 self.last_frame_time = 0.0 35 self.writer = None 36 self.is_recording = False 37 self.recording_task: asyncio.Task[None] | None = None 38 self.frames: list[np.ndarray] = [] 39 40 os.makedirs(self.output_dir, exist_ok=True)
def
start_recording(self, custom_filename: str | None = None) -> str:
42 def start_recording(self, custom_filename: str | None = None) -> str: 43 """Start recording video. 44 45 Args: 46 custom_filename: Optional custom filename (without extension) 47 48 Returns: 49 Path to the output video file 50 """ 51 if self.is_recording: 52 self.stop_recording() 53 54 # Generate filename based on datetime if not provided 55 if custom_filename is None: 56 timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") 57 filename = f"sim_recording_{timestamp}.mp4" 58 else: 59 filename = f"{custom_filename}.mp4" 60 61 self.filepath = self.output_dir / filename 62 63 self.frames = [] 64 self.last_frame_time = time.time() 65 self.is_recording = True 66 67 logger.info(f"Started video recording to {self.filepath}") 68 return str(self.filepath)
Start recording video.
Args: custom_filename: Optional custom filename (without extension)
Returns: Path to the output video file
def
stop_recording(self) -> None:
70 def stop_recording(self) -> None: 71 """Stop recording and save the video to disk.""" 72 if not self.is_recording: 73 return 74 75 self.is_recording = False 76 77 if self.recording_task and not self.recording_task.done(): 78 self.recording_task.cancel() 79 80 # This will introduce a delay server-side... there is probably a better way to do this 81 if self.frames: 82 try: 83 logger.info(f"Writing {len(self.frames)} frames to {self.filepath}") 84 mp.write_video(str(self.filepath), self.frames, fps=self.fps) 85 logger.info("Video recording stopped and saved") 86 except Exception as e: 87 logger.error(f"Error saving video: {e}") 88 else: 89 logger.warning("No frames captured, no video file created") 90 91 self.frames = []
Stop recording and save the video to disk.
async def
capture_frame(self) -> None:
93 async def capture_frame(self) -> None: 94 """Capture and add a single frame to the recording.""" 95 if not self.is_recording: 96 return 97 98 try: 99 frame, _ = await self.simulator.capture_frame() 100 self.frames.append(frame) 101 102 except Exception as e: 103 logger.error(f"Error capturing frame: {e}")
Capture and add a single frame to the recording.