Skip to content

scheduler

scheduler

AgentScheduler — cron/interval tick scheduling for managed agents.

Classes

AgentScheduler

AgentScheduler(manager: AgentManager, executor: AgentExecutor | Any, tick_interval: float = 1.0, event_bus: Any = None)

Schedules managed agent ticks based on cron/interval configs.

Runs a background thread that checks for due agents and dispatches ticks to the executor.

Source code in src/openjarvis/agents/scheduler.py
def __init__(
    self,
    manager: AgentManager,
    executor: AgentExecutor | Any,
    tick_interval: float = 1.0,
    event_bus: Any = None,
) -> None:
    """Store the collaborators and create empty scheduling state.

    No thread is created or started here; that happens in ``start()``.

    Args:
        manager: Source of agent records; ``register_agent`` looks agents
            up through ``manager.get_agent(agent_id)``.
        executor: Object that due ticks are dispatched to.
        tick_interval: Stored as ``_tick_interval``; presumably the polling
            period of the background loop in seconds (confirm in ``_loop``).
        event_bus: Optional bus. When truthy, ``start()`` subscribes to
            ``EventType.AGENT_TICK_END`` on it and ``stop()`` unsubscribes.
    """
    # Collaborators and settings handed in by the caller.
    self._manager, self._executor = manager, executor
    self._bus = event_bus
    self._tick_interval = tick_interval

    # Background-thread plumbing: stop signal, worker handle, state lock.
    self._stop_event = threading.Event()
    self._thread: threading.Thread | None = None
    self._lock = threading.Lock()

    # Per-agent bookkeeping.
    # agent_id -> {schedule_type, schedule_value, next_fire}
    self._agents: dict[str, dict] = {}
    self._tick_counts: dict[str, int] = {}
Functions
register_agent
register_agent(agent_id: str) -> None

Register an agent for scheduling.

Source code in src/openjarvis/agents/scheduler.py
def register_agent(self, agent_id: str) -> None:
    """Register an agent for scheduling.

    Reads ``schedule_type`` (default ``"manual"``) and ``schedule_value``
    (default ``0``) from the agent's ``config`` mapping and records when the
    agent should next fire. Registering an id that is already present
    replaces its entry.

    Args:
        agent_id: Identifier passed to ``manager.get_agent``.

    Raises:
        ValueError: If the manager returns ``None`` for ``agent_id``.
    """
    record = self._manager.get_agent(agent_id)
    if record is None:
        raise ValueError(f"Agent {agent_id} not found")

    settings = record.get("config", {})
    kind = settings.get("schedule_type", "manual")
    value = settings.get("schedule_value", 0)

    # A single timestamp is the reference point for every schedule kind.
    reference = time.time()
    if kind == "interval":
        # The value is added to a time.time() stamp, i.e. it is in seconds.
        due_at = reference + float(value)
    elif kind == "cron":
        due_at = _next_cron_fire(str(value), reference)
    else:
        # Manual: never auto-fires
        due_at = float("inf")

    entry = {
        "schedule_type": kind,
        "schedule_value": value,
        "next_fire": due_at,
    }
    with self._lock:
        self._agents[agent_id] = entry

    logger.info(
        "Registered agent %s (%s), next fire: %s",
        agent_id,
        kind,
        due_at,
    )
deregister_agent
deregister_agent(agent_id: str) -> None

Remove an agent from scheduling.

Source code in src/openjarvis/agents/scheduler.py
def deregister_agent(self, agent_id: str) -> None:
    """Remove an agent from scheduling.

    Unknown ids are ignored; the deregistration is logged either way.

    Args:
        agent_id: Identifier of the agent whose schedule entry is dropped.
    """
    with self._lock:
        # Check and delete under the same lock so the pair is atomic.
        if agent_id in self._agents:
            del self._agents[agent_id]
    logger.info("Deregistered agent %s", agent_id)
start
start() -> None

Start the scheduler background thread.

Source code in src/openjarvis/agents/scheduler.py
def start(self) -> None:
    """Start the scheduler background thread.

    Does nothing when ``is_running`` is already truthy. Otherwise it
    subscribes ``_on_tick_event`` to ``EventType.AGENT_TICK_END`` (only if
    an event bus was supplied), clears the stop signal and launches a
    daemon thread named ``agent-scheduler`` that runs ``_loop``.
    """
    if self.is_running:
        return

    bus = self._bus
    if bus:
        bus.subscribe(EventType.AGENT_TICK_END, self._on_tick_event)

    # Reset the signal left set by a previous stop() before the loop runs.
    self._stop_event.clear()

    worker = threading.Thread(
        name="agent-scheduler",
        target=self._loop,
        daemon=True,
    )
    self._thread = worker
    worker.start()
    logger.info("Agent scheduler started")
stop
stop() -> None

Stop the scheduler background thread.

Source code in src/openjarvis/agents/scheduler.py
def stop(self) -> None:
    """Stop the scheduler background thread.

    Sets the stop signal, unsubscribes ``_on_tick_event`` from
    ``EventType.AGENT_TICK_END`` (only if an event bus was supplied), then
    waits up to 10 seconds for the background thread to exit and drops the
    reference to it.

    If the thread is still alive after the wait, a warning is logged
    instead of the "stopped" message, so the log never reports a shutdown
    that did not actually complete.
    """
    join_timeout = 10

    self._stop_event.set()
    if self._bus:
        self._bus.unsubscribe(EventType.AGENT_TICK_END, self._on_tick_event)

    worker = self._thread
    if worker is not None:
        worker.join(timeout=join_timeout)
        self._thread = None
        # join() returns None whether or not it timed out; is_alive() is
        # the only way to learn that the thread is still running.
        if worker.is_alive():
            logger.warning(
                "Agent scheduler thread did not stop within %s seconds",
                join_timeout,
            )
            return
    logger.info("Agent scheduler stopped")