Skip to content

Index

scheduler

Task scheduler module — cron/interval/once scheduling with SQLite persistence.

Classes

ScheduledTask dataclass

ScheduledTask(id: str, prompt: str, schedule_type: str, schedule_value: str, context_mode: str = 'isolated', status: str = 'active', next_run: Optional[str] = None, last_run: Optional[str] = None, agent: str = 'simple', tools: str = '', metadata: Dict[str, Any] = dict())

A task scheduled for future or recurring execution.

Functions
to_dict
to_dict() -> Dict[str, Any]

Serialize to a plain dict for store persistence.

Source code in src/openjarvis/scheduler/scheduler.py
def to_dict(self) -> Dict[str, Any]:
    """Return this task's fields as a plain dict for store persistence.

    Keys mirror the attribute names. The ``metadata`` value is the task's
    own dict object, not a copy.
    """
    field_names = (
        "id",
        "prompt",
        "schedule_type",
        "schedule_value",
        "context_mode",
        "status",
        "next_run",
        "last_run",
        "agent",
        "tools",
        "metadata",
    )
    return {name: getattr(self, name) for name in field_names}
from_dict classmethod
from_dict(d: Dict[str, Any]) -> ScheduledTask

Deserialize from a plain dict.

Source code in src/openjarvis/scheduler/scheduler.py
@classmethod
def from_dict(cls, d: Dict[str, Any]) -> ScheduledTask:
    """Build a task from a plain dict.

    ``id``, ``prompt``, ``schedule_type`` and ``schedule_value`` must be
    present (a missing one raises ``KeyError``); every other field falls
    back to its default when the key is absent.
    """
    # Required keys first, so a missing one fails before anything else.
    fields: Dict[str, Any] = {
        key: d[key]
        for key in ("id", "prompt", "schedule_type", "schedule_value")
    }
    fields["context_mode"] = d.get("context_mode", "isolated")
    fields["status"] = d.get("status", "active")
    fields["next_run"] = d.get("next_run")
    fields["last_run"] = d.get("last_run")
    fields["agent"] = d.get("agent", "simple")
    fields["tools"] = d.get("tools", "")
    fields["metadata"] = d.get("metadata", {})
    return cls(**fields)

TaskScheduler

TaskScheduler(store: SchedulerStore, system: Any = None, *, poll_interval: int = 60, bus: Any = None)

Scheduler that polls for due tasks and executes them.

PARAMETER DESCRIPTION
store

The persistence backend.

TYPE: SchedulerStore

system

Optional JarvisSystem instance for executing prompts.

TYPE: Any DEFAULT: None

poll_interval

Seconds between poll cycles (default 60).

TYPE: int DEFAULT: 60

bus

Optional event bus for publishing scheduler events.

TYPE: Any DEFAULT: None

Source code in src/openjarvis/scheduler/scheduler.py
def __init__(
    self,
    store: SchedulerStore,
    system: Any = None,
    *,
    poll_interval: int = 60,
    bus: Any = None,
) -> None:
    """Record the collaborators and prepare the threading state.

    No thread is created here; ``start()`` does that.
    """
    # Threading state: stop signal, worker handle, and the lock held
    # around store access.
    self._lock = threading.Lock()
    self._stop_event = threading.Event()
    self._thread: Optional[threading.Thread] = None

    # Collaborators and settings supplied by the caller.
    self._store = store
    self._system = system
    self._bus = bus
    self._poll_interval = poll_interval
Functions
start
start() -> None

Start the background polling daemon thread.

Source code in src/openjarvis/scheduler/scheduler.py
def start(self) -> None:
    """Launch the polling loop on a daemon thread.

    Does nothing when a previously started thread is still alive.
    """
    worker = self._thread
    already_running = worker is not None and worker.is_alive()
    if already_running:
        return

    # Reset the stop signal so a scheduler that was stopped can run again.
    self._stop_event.clear()
    worker = threading.Thread(
        name="jarvis-scheduler",
        target=self._poll_loop,
        daemon=True,
    )
    self._thread = worker
    worker.start()
    logger.info("Scheduler started (poll_interval=%ds)", self._poll_interval)
stop
stop() -> None

Signal the background thread to stop and wait for it.

Source code in src/openjarvis/scheduler/scheduler.py
def stop(self) -> None:
    """Signal the background thread to stop and wait for it.

    Waits up to ``poll_interval + 5`` seconds for the thread to exit. If
    it is still alive after that, a warning is logged and the thread
    reference is kept, so a later ``start()`` does not launch a second
    poll loop beside it. The stop signal stays set, so the thread can
    still exit on its own.
    """
    self._stop_event.set()
    worker = self._thread
    if worker is not None:
        grace = self._poll_interval + 5
        worker.join(timeout=grace)
        # join() returns None whether or not it timed out; is_alive() is
        # the only way to tell whether the thread actually finished.
        if worker.is_alive():
            logger.warning(
                "Scheduler thread did not stop within %ds", grace
            )
            return
        self._thread = None
    logger.info("Scheduler stopped")
create_task
create_task(prompt: str, schedule_type: str, schedule_value: str, **kwargs: Any) -> ScheduledTask

Create and persist a new scheduled task.

Source code in src/openjarvis/scheduler/scheduler.py
def create_task(
    self,
    prompt: str,
    schedule_type: str,
    schedule_value: str,
    **kwargs: Any,
) -> ScheduledTask:
    """Build a new task, compute its first run time, and persist it.

    Only ``agent``, ``tools``, ``context_mode`` and ``metadata`` are read
    from *kwargs*; any other keyword is ignored.
    """
    overrides = {
        "agent": kwargs.get("agent", "simple"),
        "tools": kwargs.get("tools", ""),
        "context_mode": kwargs.get("context_mode", "isolated"),
        "metadata": kwargs.get("metadata", {}),
    }
    new_task = ScheduledTask(
        # The ID is the first 16 hex characters of a random UUID.
        id=uuid.uuid4().hex[:16],
        prompt=prompt,
        schedule_type=schedule_type,
        schedule_value=schedule_value,
        **overrides,
    )
    new_task.next_run = self._compute_next_run(new_task)
    with self._lock:
        self._store.save_task(new_task.to_dict())
    return new_task
list_tasks
list_tasks(*, status: Optional[str] = None) -> List[ScheduledTask]

Return tasks, optionally filtered by status.

Source code in src/openjarvis/scheduler/scheduler.py
def list_tasks(self, *, status: Optional[str] = None) -> List[ScheduledTask]:
    """Fetch stored tasks as ``ScheduledTask`` objects.

    When *status* is given it is passed to the store as a filter.
    """
    # Only the store read happens under the lock; conversion runs after.
    with self._lock:
        records = self._store.list_tasks(status=status)
    return [ScheduledTask.from_dict(record) for record in records]
pause_task
pause_task(task_id: str) -> None

Pause an active task.

Source code in src/openjarvis/scheduler/scheduler.py
def pause_task(self, task_id: str) -> None:
    """Mark the task with ID *task_id* as paused.

    Raises ``KeyError`` when the store has no such task.
    """
    with self._lock:
        record = self._store.get_task(task_id)
        if record is None:
            raise KeyError(f"Task not found: {task_id}")
        # Only the status changes; next_run is left as stored.
        record["status"] = "paused"
        self._store.update_task(record)
resume_task
resume_task(task_id: str) -> None

Resume a paused task.

Source code in src/openjarvis/scheduler/scheduler.py
def resume_task(self, task_id: str) -> None:
    """Set the task back to active and schedule its next run afresh.

    Raises ``KeyError`` when the store has no such task.
    """
    with self._lock:
        record = self._store.get_task(task_id)
        if record is None:
            raise KeyError(f"Task not found: {task_id}")
        record["status"] = "active"
        # The stored next_run is discarded and recomputed for the
        # reactivated task.
        resumed = ScheduledTask.from_dict(record)
        resumed.next_run = self._compute_next_run(resumed)
        self._store.update_task(resumed.to_dict())
cancel_task
cancel_task(task_id: str) -> None

Cancel a task (sets status to cancelled).

Source code in src/openjarvis/scheduler/scheduler.py
def cancel_task(self, task_id: str) -> None:
    """Mark the task as cancelled and clear its next run time.

    The record stays in the store. Raises ``KeyError`` when the store has
    no such task.
    """
    with self._lock:
        record = self._store.get_task(task_id)
        if record is None:
            raise KeyError(f"Task not found: {task_id}")
        record["status"] = "cancelled"
        record["next_run"] = None
        self._store.update_task(record)

SchedulerStore

SchedulerStore(db_path: str | Path)

SQLite CRUD store for scheduled tasks and their run logs.

Source code in src/openjarvis/scheduler/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open the database at *db_path* and run the table-creation statements."""
    self._db_path = str(db_path)
    # check_same_thread=False lets threads other than the creating one
    # use this connection.
    connection = sqlite3.connect(self._db_path, check_same_thread=False)
    connection.row_factory = sqlite3.Row
    self._conn = connection
    for ddl in (_CREATE_TASKS_TABLE, _CREATE_LOGS_TABLE):
        connection.execute(ddl)
    connection.commit()
Functions
save_task
save_task(task: Dict[str, Any]) -> None

Insert or replace a scheduled task record.

Source code in src/openjarvis/scheduler/store.py
def save_task(self, task: Dict[str, Any]) -> None:
    """Write *task* to the tasks table and commit.

    ``id``, ``prompt``, ``schedule_type`` and ``schedule_value`` are
    required keys; the remaining columns use defaults when their key is
    absent. ``metadata`` is stored as a JSON string.
    """
    lookup = task.get
    params = (
        task["id"],
        task["prompt"],
        task["schedule_type"],
        task["schedule_value"],
        lookup("context_mode", "isolated"),
        lookup("status", "active"),
        lookup("next_run"),
        lookup("last_run"),
        lookup("agent", "simple"),
        lookup("tools", ""),
        json.dumps(lookup("metadata", {})),
    )
    self._conn.execute(_INSERT_TASK, params)
    self._conn.commit()
get_task
get_task(task_id: str) -> Optional[Dict[str, Any]]

Retrieve a single task by ID, or None if not found.

Source code in src/openjarvis/scheduler/store.py
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
    """Look up one task by ID; return ``None`` when no row matches."""
    cursor = self._conn.execute(
        "SELECT * FROM scheduled_tasks WHERE id = ?", (task_id,)
    )
    match = cursor.fetchone()
    return None if match is None else self._row_to_dict(match)
list_tasks
list_tasks(status: Optional[str] = None) -> List[Dict[str, Any]]

Return all tasks, optionally filtered by status.

Source code in src/openjarvis/scheduler/store.py
def list_tasks(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
    """Return every stored task, or only those whose status equals *status*."""
    if status is None:
        cursor = self._conn.execute("SELECT * FROM scheduled_tasks")
    else:
        cursor = self._conn.execute(
            "SELECT * FROM scheduled_tasks WHERE status = ?", (status,)
        )
    return [self._row_to_dict(record) for record in cursor.fetchall()]
get_due_tasks
get_due_tasks(now_iso: str) -> List[Dict[str, Any]]

Return active tasks whose next_run is at or before now_iso.

Source code in src/openjarvis/scheduler/store.py
def get_due_tasks(self, now_iso: str) -> List[Dict[str, Any]]:
    """Return active tasks with a ``next_run`` that is not later than *now_iso*.

    Tasks without a ``next_run`` are never returned.
    """
    query = (
        "SELECT * FROM scheduled_tasks WHERE status = 'active' "
        "AND next_run IS NOT NULL AND next_run <= ?"
    )
    due_rows = self._conn.execute(query, (now_iso,)).fetchall()
    return [self._row_to_dict(record) for record in due_rows]
update_task
update_task(task: Dict[str, Any]) -> None

Update an existing task (same as save_task — uses INSERT OR REPLACE).

Source code in src/openjarvis/scheduler/store.py
def update_task(self, task: Dict[str, Any]) -> None:
    """Update an existing task (same as save_task — uses INSERT OR REPLACE).

    The call is forwarded unchanged to ``save_task``, so *task* must be a
    full record carrying the keys ``save_task`` requires. Optional keys
    missing from *task* are written with ``save_task``'s defaults.
    """
    # There is no separate UPDATE path: the record goes through save_task.
    self.save_task(task)
delete_task
delete_task(task_id: str) -> None

Delete a task by ID.

Source code in src/openjarvis/scheduler/store.py
def delete_task(self, task_id: str) -> None:
    """Remove the task row whose ID is *task_id* and commit.

    Only the ``scheduled_tasks`` table is touched here.
    """
    statement = "DELETE FROM scheduled_tasks WHERE id = ?"
    self._conn.execute(statement, (task_id,))
    self._conn.commit()
log_run
log_run(task_id: str, started_at: str, finished_at: str, success: bool, result: str = '', error: str = '') -> None

Record a single execution of a task.

Source code in src/openjarvis/scheduler/store.py
def log_run(
    self,
    task_id: str,
    started_at: str,
    finished_at: str,
    success: bool,
    result: str = "",
    error: str = "",
) -> None:
    """Append one run-log entry for *task_id* and commit.

    *success* is converted with ``int()`` before it is stored.
    """
    entry = (
        task_id,
        started_at,
        finished_at,
        int(success),
        result,
        error,
    )
    self._conn.execute(_INSERT_LOG, entry)
    self._conn.commit()
get_run_logs
get_run_logs(task_id: str, limit: int = 10) -> List[Dict[str, Any]]

Return the most recent run logs for task_id.

Source code in src/openjarvis/scheduler/store.py
def get_run_logs(
    self, task_id: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Return up to *limit* log entries for *task_id*, highest log ID first."""
    query = (
        "SELECT * FROM task_run_logs WHERE task_id = ? "
        "ORDER BY id DESC LIMIT ?"
    )
    cursor = self._conn.execute(query, (task_id, limit))
    return [dict(entry) for entry in cursor.fetchall()]
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/scheduler/store.py
def close(self) -> None:
    """Close the underlying SQLite connection.

    This is the connection opened in ``__init__``. Every other method of
    the store goes through it, so the store should not be used after
    this call.
    """
    self._conn.close()