Schedules managed agent ticks based on cron/interval configs.
Runs a background thread that checks for due agents and dispatches
ticks to the executor.
Source code in src/openjarvis/agents/scheduler.py
def __init__(
    self,
    manager: AgentManager,
    executor: AgentExecutor | Any,
    tick_interval: float = 1.0,
    event_bus: Any = None,
) -> None:
    """Store the collaborators and create empty scheduling state.

    No thread is created here; ``start`` does that.

    Args:
        manager: Queried with ``get_agent`` when an agent is registered.
        executor: Kept for dispatching ticks.
        tick_interval: Kept for the background loop
            (presumably seconds between checks; confirm against ``_loop``).
        event_bus: Optional bus. When truthy, ``start`` subscribes a
            tick-end handler to it and ``stop`` unsubscribes it.
    """
    # Collaborators handed in by the caller.
    self._manager, self._executor = manager, executor
    self._tick_interval = tick_interval
    self._bus = event_bus

    # Schedule table keyed by agent id; each record carries the keys
    # "schedule_type", "schedule_value" and "next_fire".
    self._agents: dict[str, dict] = {}
    self._tick_counts: dict[str, int] = {}

    # Threading state: the lock guards writes to ``_agents``; the event
    # is set by ``stop`` and cleared by ``start``.
    self._lock = threading.Lock()
    self._stop_event = threading.Event()
    self._thread: threading.Thread | None = None
|
Functions
register_agent
register_agent(agent_id: str) -> None
Register an agent for scheduling.
Source code in src/openjarvis/agents/scheduler.py
def register_agent(self, agent_id: str) -> None:
    """Look up *agent_id* via the manager and record its schedule.

    The agent's ``"config"`` mapping supplies ``"schedule_type"``
    (default ``"manual"``) and ``"schedule_value"`` (default ``0``).
    The first fire time is derived from the current wall-clock time:

    * ``"cron"``: whatever ``_next_cron_fire`` returns for the value.
    * ``"interval"``: now plus the value converted to ``float``.
    * anything else: infinity, so the agent never fires automatically.

    Registering an id that is already present overwrites its record.

    Args:
        agent_id: Identifier passed to the manager's ``get_agent``.

    Raises:
        ValueError: If the manager returns ``None`` for *agent_id*.
    """
    record = self._manager.get_agent(agent_id)
    if record is None:
        raise ValueError(f"Agent {agent_id} not found")

    settings = record.get("config", {})
    kind = settings.get("schedule_type", "manual")
    value = settings.get("schedule_value", 0)

    registered_at = time.time()
    if kind == "interval":
        due_at = registered_at + float(value)
    elif kind == "cron":
        due_at = _next_cron_fire(str(value), registered_at)
    else:
        # Manual (or unrecognised) schedules never auto-fire.
        due_at = float("inf")

    # Build the record outside the lock; only the table write needs it.
    entry = {
        "schedule_type": kind,
        "schedule_value": value,
        "next_fire": due_at,
    }
    with self._lock:
        self._agents[agent_id] = entry

    logger.info(
        "Registered agent %s (%s), next fire: %s", agent_id, kind, due_at
    )
|
deregister_agent
deregister_agent(agent_id: str) -> None
Remove an agent from scheduling.
Source code in src/openjarvis/agents/scheduler.py
def deregister_agent(self, agent_id: str) -> None:
    """Drop *agent_id* from the schedule table.

    Ids that are not registered are ignored; the log line is emitted
    either way.
    """
    with self._lock:
        # Membership test and delete happen under the same lock, so this
        # is equivalent to a pop with a default.
        if agent_id in self._agents:
            del self._agents[agent_id]
    logger.info("Deregistered agent %s", agent_id)
|
start
Start the scheduler background thread.
Source code in src/openjarvis/agents/scheduler.py
def start(self) -> None:
    """Launch the background scheduler thread.

    Does nothing when ``is_running`` is already true. Otherwise it
    subscribes the tick-end handler to the event bus (if one was given),
    clears the stop flag, and starts a daemon thread named
    ``"agent-scheduler"`` that runs ``_loop``.
    """
    if self.is_running:
        return

    bus = self._bus
    if bus:
        bus.subscribe(EventType.AGENT_TICK_END, self._on_tick_event)

    # Clear any stop request left over from a previous stop().
    self._stop_event.clear()

    worker = threading.Thread(
        name="agent-scheduler",
        target=self._loop,
        daemon=True,
    )
    # Publish the handle before starting, as the thread may outpace us.
    self._thread = worker
    worker.start()
    logger.info("Agent scheduler started")
|
stop
Stop the scheduler background thread.
Source code in src/openjarvis/agents/scheduler.py
def stop(self) -> None:
    """Stop the scheduler background thread.

    Sets the stop flag, unsubscribes the tick-end handler from the event
    bus (if one was given), and waits up to 10 seconds for the thread to
    exit. The thread handle is cleared in every case. If the thread is
    still alive after the wait, a warning is logged instead of the usual
    "stopped" message, because ``Thread.join`` gives no indication that
    its timeout expired.
    """
    join_timeout = 10

    self._stop_event.set()
    if self._bus:
        self._bus.unsubscribe(EventType.AGENT_TICK_END, self._on_tick_event)

    worker = self._thread
    if worker is not None:
        worker.join(timeout=join_timeout)
        self._thread = None
        # join() returns None whether or not the thread finished, so the
        # only way to detect a timeout is to ask the thread afterwards.
        if worker.is_alive():
            logger.warning(
                "Agent scheduler thread did not stop within %s seconds",
                join_timeout,
            )
            return

    logger.info("Agent scheduler stopped")
|