Module kos_sim.stepping

Stepping controller for the simulation.

Classes

class StepController (steppable: Steppable,
mode: StepMode = StepMode.CONTINUOUS)
Expand source code
class StepController:
    """Controls the stepping behavior of a simulation."""

    def __init__(self, steppable: Steppable, mode: StepMode = StepMode.CONTINUOUS) -> None:
        self.steppable = steppable
        self.mode = mode
        self._paused = False
        self._lock = asyncio.Lock()
        self._step_request = False
        self._num_steps = 0

    @property
    def paused(self) -> bool:
        return self._paused

    async def set_paused(self, paused: bool) -> None:
        """Pause or unpause the simulation."""
        async with self._lock:
            self._paused = paused

    async def request_steps(self, num_steps: int = 1) -> None:
        """Request a number of steps to be taken."""
        async with self._lock:
            self._step_request = True
            self._num_steps = num_steps

    async def should_step(self) -> bool:
        """Check if a step should be taken."""
        async with self._lock:
            if self.mode == StepMode.CONTINUOUS:
                return not self._paused

            if self._step_request and self._num_steps > 0:
                self._num_steps -= 1
                if self._num_steps == 0:
                    self._step_request = False
                return True

            return False

Controls the stepping behavior of a simulation.

Instance variables

prop paused : bool
Expand source code
@property
def paused(self) -> bool:
    return self._paused

Methods

async def request_steps(self, num_steps: int = 1) ‑> None
Expand source code
async def request_steps(self, num_steps: int = 1) -> None:
    """Request a number of steps to be taken."""
    async with self._lock:
        self._step_request = True
        self._num_steps = num_steps

Request a number of steps to be taken.

async def set_paused(self, paused: bool) ‑> None
Expand source code
async def set_paused(self, paused: bool) -> None:
    """Pause or unpause the simulation."""
    async with self._lock:
        self._paused = paused

Pause or unpause the simulation.

async def should_step(self) ‑> bool
Expand source code
async def should_step(self) -> bool:
    """Check if a step should be taken."""
    async with self._lock:
        if self.mode == StepMode.CONTINUOUS:
            return not self._paused

        if self._step_request and self._num_steps > 0:
            self._num_steps -= 1
            if self._num_steps == 0:
                self._step_request = False
            return True

        return False

Check if a step should be taken.

class StepMode (*args, **kwds)
Expand source code
class StepMode(Enum):
    """Defines how the simulation stepping should behave."""

    CONTINUOUS = auto()  # Run continuously in real-time
    MANUAL = auto()  # Only step when explicitly called

Defines how the simulation stepping should behave.

Ancestors

  • enum.Enum

Class variables

var CONTINUOUS
var MANUAL
class Steppable (*args, **kwargs)
Expand source code
class Steppable(Protocol):
    """Protocol for objects that can be stepped."""

    async def step(self) -> None: ...

Protocol for objects that can be stepped.

Ancestors

  • typing.Protocol
  • typing.Generic

Methods

async def step(self) ‑> None
Expand source code
async def step(self) -> None: ...