Skip to content

store

store

SQLite-backed persistence for scheduled tasks and run logs.

Classes

SchedulerStore

SchedulerStore(db_path: str | Path)

SQLite CRUD store for scheduled tasks and their run logs.

Source code in src/openjarvis/scheduler/store.py
def __init__(self, db_path: str | Path) -> None:
    """Open the SQLite database at *db_path* and run the schema statements.

    The path is stored as a string, the connection is opened with
    ``check_same_thread=False`` and configured to return ``sqlite3.Row``
    rows, and the ``_CREATE_TASKS_TABLE`` / ``_CREATE_LOGS_TABLE``
    statements are executed and committed.
    """
    self._db_path = str(db_path)
    self._conn = connection = sqlite3.connect(
        self._db_path, check_same_thread=False
    )
    # sqlite3.Row lets later code address columns by name.
    connection.row_factory = sqlite3.Row
    for schema_statement in (_CREATE_TASKS_TABLE, _CREATE_LOGS_TABLE):
        connection.execute(schema_statement)
    connection.commit()
Functions
save_task
save_task(task: Dict[str, Any]) -> None

Insert or replace a scheduled task record.

Source code in src/openjarvis/scheduler/store.py
def save_task(self, task: Dict[str, Any]) -> None:
    """Insert or replace a scheduled task record and commit.

    *task* must contain ``id``, ``prompt``, ``schedule_type`` and
    ``schedule_value``; a missing one raises ``KeyError``. The remaining
    fields are optional and fall back to the defaults shown below.
    ``metadata`` is serialised to a JSON string before being stored.
    """
    # Mandatory fields: looked up with [] so an absent key fails loudly.
    mandatory = tuple(
        task[key]
        for key in ("id", "prompt", "schedule_type", "schedule_value")
    )
    # Optional fields, in the column order expected by _INSERT_TASK.
    optional = (
        task.get("context_mode", "isolated"),
        task.get("status", "active"),
        task.get("next_run"),
        task.get("last_run"),
        task.get("agent", "simple"),
        task.get("tools", ""),
        json.dumps(task.get("metadata", {})),
    )
    self._conn.execute(_INSERT_TASK, mandatory + optional)
    self._conn.commit()
get_task
get_task(task_id: str) -> Optional[Dict[str, Any]]

Retrieve a single task by ID, or None if not found.

Source code in src/openjarvis/scheduler/store.py
def get_task(self, task_id: str) -> Optional[Dict[str, Any]]:
    """Look up one task in ``scheduled_tasks`` by its *task_id*.

    Returns the row converted by ``_row_to_dict``, or ``None`` when no row
    has that id.
    """
    cursor = self._conn.execute(
        "SELECT * FROM scheduled_tasks WHERE id = ?", (task_id,)
    )
    found = cursor.fetchone()
    return self._row_to_dict(found) if found is not None else None
list_tasks
list_tasks(status: Optional[str] = None) -> List[Dict[str, Any]]

Return all tasks, optionally filtered by status.

Source code in src/openjarvis/scheduler/store.py
def list_tasks(self, status: Optional[str] = None) -> List[Dict[str, Any]]:
    """List stored tasks, restricted to *status* when one is given.

    With ``status=None`` every row of ``scheduled_tasks`` is returned. Each
    row is converted with ``_row_to_dict``.
    """
    if status is None:
        cursor = self._conn.execute("SELECT * FROM scheduled_tasks")
    else:
        cursor = self._conn.execute(
            "SELECT * FROM scheduled_tasks WHERE status = ?", (status,)
        )
    return list(map(self._row_to_dict, cursor.fetchall()))
get_due_tasks
get_due_tasks(now_iso: str) -> List[Dict[str, Any]]

Return active tasks whose next_run is at or before now_iso.

Source code in src/openjarvis/scheduler/store.py
def get_due_tasks(self, now_iso: str) -> List[Dict[str, Any]]:
    """Fetch the tasks that are due to run as of *now_iso*.

    A task is due when its status is ``'active'`` and it has a non-NULL
    ``next_run`` that compares less than or equal to *now_iso*. Each
    matching row is converted with ``_row_to_dict``.
    """
    # NOTE(review): the comparison is done by SQLite on the stored values;
    # presumably both sides are ISO-8601 strings in the same format -- confirm.
    due_query = (
        "SELECT * FROM scheduled_tasks WHERE status = 'active' "
        "AND next_run IS NOT NULL AND next_run <= ?"
    )
    due_rows = self._conn.execute(due_query, (now_iso,)).fetchall()
    return [self._row_to_dict(due_row) for due_row in due_rows]
update_task
update_task(task: Dict[str, Any]) -> None

Update an existing task (same as save_task — uses INSERT OR REPLACE).

Source code in src/openjarvis/scheduler/store.py
def update_task(self, task: Dict[str, Any]) -> None:
    """Write *task* back to the store by delegating to ``save_task``.

    There is no separate update path: the record is passed unchanged to
    ``save_task``, which uses INSERT OR REPLACE.
    """
    # Pure delegation; save_task performs the write and the commit.
    self.save_task(task)
delete_task
delete_task(task_id: str) -> None

Delete a task by ID.

Source code in src/openjarvis/scheduler/store.py
def delete_task(self, task_id: str) -> None:
    """Remove the ``scheduled_tasks`` row whose id is *task_id* and commit."""
    connection = self._conn
    connection.execute("DELETE FROM scheduled_tasks WHERE id = ?", (task_id,))
    connection.commit()
log_run
log_run(task_id: str, started_at: str, finished_at: str, success: bool, result: str = '', error: str = '') -> None

Record a single execution of a task.

Source code in src/openjarvis/scheduler/store.py
def log_run(
    self,
    task_id: str,
    started_at: str,
    finished_at: str,
    success: bool,
    result: str = "",
    error: str = "",
) -> None:
    """Append one run-log entry for *task_id* and commit.

    The values are passed to the ``_INSERT_LOG`` statement in the order
    task id, start time, finish time, success flag, result, error.
    *success* is converted with ``int()`` before being stored.
    """
    log_values = (
        task_id,
        started_at,
        finished_at,
        int(success),
        result,
        error,
    )
    self._conn.execute(_INSERT_LOG, log_values)
    self._conn.commit()
get_run_logs
get_run_logs(task_id: str, limit: int = 10) -> List[Dict[str, Any]]

Return the most recent run logs for task_id.

Source code in src/openjarvis/scheduler/store.py
def get_run_logs(
    self, task_id: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Fetch up to *limit* run-log rows for *task_id*, highest ``id`` first.

    Each row of ``task_run_logs`` is returned as a plain ``dict`` keyed by
    column name.
    """
    logs_query = (
        "SELECT * FROM task_run_logs WHERE task_id = ? "
        "ORDER BY id DESC LIMIT ?"
    )
    cursor = self._conn.execute(logs_query, (task_id, limit))
    return list(map(dict, cursor.fetchall()))
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/scheduler/store.py
def close(self) -> None:
    """Release the SQLite connection held by this store."""
    connection = self._conn
    connection.close()