Skip to content

store

store

SQLite-backed trace storage.

Classes

TraceStore

TraceStore(db_path: str | Path)

Append-only SQLite store for interaction traces.

Source code in src/openjarvis/traces/store.py
def __init__(self, db_path: str | Path) -> None:
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path)
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute(_CREATE_TRACES)
    self._conn.execute(_CREATE_STEPS)
    self._conn.execute(_CREATE_FTS)
    self._conn.execute(_FTS_SYNC_INSERT)
    self._conn.commit()
Functions
save
save(trace: Trace) -> None

Persist a complete trace with all its steps.

Source code in src/openjarvis/traces/store.py
def save(self, trace: Trace) -> None:
    """Persist a complete trace with all its steps."""
    self._conn.execute(
        _INSERT_TRACE,
        (
            trace.trace_id,
            trace.query,
            trace.agent,
            trace.model,
            trace.engine,
            trace.result,
            trace.outcome,
            trace.feedback,
            trace.started_at,
            trace.ended_at,
            trace.total_tokens,
            trace.total_latency_seconds,
            json.dumps(trace.metadata),
        ),
    )
    for idx, step in enumerate(trace.steps):
        self._conn.execute(
            _INSERT_STEP,
            (
                trace.trace_id,
                idx,
                step.step_type.value
                if isinstance(step.step_type, StepType)
                else step.step_type,
                step.timestamp,
                step.duration_seconds,
                json.dumps(step.input),
                json.dumps(step.output),
                json.dumps(step.metadata),
            ),
        )
    self._conn.commit()
get
get(trace_id: str) -> Optional[Trace]

Retrieve a trace by id, or None if not found.

Source code in src/openjarvis/traces/store.py
def get(self, trace_id: str) -> Optional[Trace]:
    """Retrieve a trace by id, or ``None`` if not found."""
    row = self._conn.execute(
        "SELECT * FROM traces WHERE trace_id = ?", (trace_id,)
    ).fetchone()
    if row is None:
        return None
    return self._row_to_trace(row)
list_traces
list_traces(*, agent: Optional[str] = None, model: Optional[str] = None, outcome: Optional[str] = None, since: Optional[float] = None, until: Optional[float] = None, limit: int = 100) -> List[Trace]

Query traces with optional filters.

Source code in src/openjarvis/traces/store.py
def list_traces(
    self,
    *,
    agent: Optional[str] = None,
    model: Optional[str] = None,
    outcome: Optional[str] = None,
    since: Optional[float] = None,
    until: Optional[float] = None,
    limit: int = 100,
) -> List[Trace]:
    """Query traces with optional filters."""
    clauses: List[str] = []
    params: List[Any] = []
    if agent is not None:
        clauses.append("agent = ?")
        params.append(agent)
    if model is not None:
        clauses.append("model = ?")
        params.append(model)
    if outcome is not None:
        clauses.append("outcome = ?")
        params.append(outcome)
    if since is not None:
        clauses.append("started_at >= ?")
        params.append(since)
    if until is not None:
        clauses.append("started_at <= ?")
        params.append(until)
    where = " AND ".join(clauses) if clauses else "1=1"
    sql = f"SELECT * FROM traces WHERE {where} ORDER BY started_at DESC LIMIT ?"
    params.append(limit)
    rows = self._conn.execute(sql, params).fetchall()
    return [self._row_to_trace(r) for r in rows]
count
count() -> int

Return the total number of stored traces.

Source code in src/openjarvis/traces/store.py
def count(self) -> int:
    """Return the total number of stored traces."""
    row = self._conn.execute("SELECT COUNT(*) FROM traces").fetchone()
    return row[0] if row else 0
search
search(query: str, *, agent: str | None = None, limit: int = 20) -> list[dict[str, Any]]

Full-text search across traces. Optionally filter by agent.

Source code in src/openjarvis/traces/store.py
def search(
    self,
    query: str,
    *,
    agent: str | None = None,
    limit: int = 20,
) -> list[dict[str, Any]]:
    """Full-text search across traces. Optionally filter by agent."""
    sql = (
        "SELECT t.trace_id, t.query, t.result, t.agent, t.model, t.outcome,"
        " t.started_at "
        "FROM traces_fts f JOIN traces t ON f.rowid = t.rowid "
        "WHERE traces_fts MATCH ?"
    )
    params: list[Any] = [query]
    if agent:
        sql += " AND t.agent = ?"
        params.append(agent)
    sql += " ORDER BY rank LIMIT ?"
    params.append(limit)
    rows = self._conn.execute(sql, params).fetchall()
    return [
        {
            "trace_id": r[0], "query": r[1], "result": r[2],
            "agent": r[3], "model": r[4], "outcome": r[5], "started_at": r[6],
        }
        for r in rows
    ]
subscribe_to_bus
subscribe_to_bus(bus: EventBus) -> None

Subscribe to TRACE_COMPLETE events on bus.

Source code in src/openjarvis/traces/store.py
def subscribe_to_bus(self, bus: EventBus) -> None:
    """Subscribe to ``TRACE_COMPLETE`` events on *bus*."""
    bus.subscribe(EventType.TRACE_COMPLETE, self._on_event)
update_feedback
update_feedback(trace_id: str, score: float) -> bool

Update the feedback score for a trace.

Returns True if the trace was found and updated, False otherwise.

Source code in src/openjarvis/traces/store.py
def update_feedback(self, trace_id: str, score: float) -> bool:
    """Update the feedback score for a trace.

    Returns True if the trace was found and updated, False otherwise.
    """
    cursor = self._conn.execute(
        "UPDATE traces SET feedback = ? WHERE trace_id = ?",
        (score, trace_id),
    )
    self._conn.commit()
    return cursor.rowcount > 0
close
close() -> None

Close the underlying SQLite connection.

Source code in src/openjarvis/traces/store.py
def close(self) -> None:
    """Close the underlying SQLite connection."""
    self._conn.close()