service

Persistent memory service: async fact extraction integrated into core.

MemoryService runs fact extraction on a dedicated background thread so that it never blocks jarvis serve request handling or the jarvis chat REPL. Callers hand off an exchange via submit(), which enqueues the work and returns immediately; the slow model call and the disk write happen out of band. The worker swallows every per-job error (including BrokenPipeError when a client disconnects mid-extraction), so a flaky extraction model cannot take down the host process.
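The worker's `_loop` method is not reproduced on this page. As a rough illustration of the pattern described above, the following sketch assumes a sentinel-terminated queue and a catch-all around each job; `run_worker`, `flaky_extract`, and this `_STOP` object are hypothetical names, not the module's actual implementation.

```python
import queue
import threading

_STOP = object()  # hypothetical sentinel; the real module defines its own


def run_worker(jobs: queue.Queue, handle, results: list) -> None:
    """Process jobs until the sentinel arrives, swallowing per-job errors."""
    while True:
        job = jobs.get()
        if job is _STOP:
            break
        try:
            results.append(handle(job))
        except Exception as exc:  # BrokenPipeError is an OSError, so it lands here
            results.append(f"skipped: {type(exc).__name__}")


def flaky_extract(job: str) -> str:
    # Stand-in for the slow model call; fails on one particular input.
    if job == "bad":
        raise BrokenPipeError("client went away")
    return f"facts from {job}"


jobs: queue.Queue = queue.Queue()
results: list = []
worker = threading.Thread(
    target=run_worker, args=(jobs, flaky_extract, results), daemon=True
)
worker.start()

for item in ("first", "bad", "second"):
    jobs.put(item)
jobs.put(_STOP)
worker.join(timeout=2.0)
```

The failing job is recorded and skipped, and the jobs queued behind it are still processed, which is the property that keeps one bad extraction from stopping the worker.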

The service is started and stopped as part of the OpenJarvis lifecycle (see cli/serve.py and cli/chat_cmd.py) and is configured through the [memory] section of config.toml.
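How cli/serve.py and cli/chat_cmd.py wire the service in is not shown here. One plausible shape, assuming only the start() and stop() methods documented on this page, is a small context manager that tolerates a missing service; `memory_lifecycle` and `StubService` are hypothetical names used for illustration.

```python
from contextlib import contextmanager


class StubService:
    """Stand-in exposing the same start/stop surface as MemoryService."""

    def __init__(self) -> None:
        self.events: list = []

    def start(self) -> None:
        self.events.append("start")

    def stop(self, timeout: float = 2.0) -> None:
        self.events.append("stop")


@contextmanager
def memory_lifecycle(service):
    """Start the service if there is one, and always stop it on exit."""
    if service is not None:
        service.start()
    try:
        yield service
    finally:
        if service is not None:
            service.stop()


stub = StubService()
with memory_lifecycle(stub) as svc:
    svc.events.append("serve")  # the host's main loop would run here

with memory_lifecycle(None) as absent:
    absent_value = absent  # memory disabled: nothing to start or stop
```

Accepting `None` matches the documented behavior of build_memory_service, which returns None when memory is disabled.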

Classes

MemoryService

MemoryService(store: FactStore, extractor: FactExtractor, *, max_queue: int = 256)

Background long-term-memory extraction and persistence service.

Source code in src/openjarvis/memory/service.py
def __init__(
    self,
    store: FactStore,
    extractor: FactExtractor,
    *,
    max_queue: int = 256,
) -> None:
    self._store = store
    self._extractor = extractor
    self._queue: "queue.Queue[Any]" = queue.Queue(maxsize=max(1, max_queue))
    self._thread: Optional[threading.Thread] = None
    self._running = threading.Event()
Functions
start
start() -> None

Start the background worker thread (idempotent).

Source code in src/openjarvis/memory/service.py
def start(self) -> None:
    """Start the background worker thread (idempotent)."""
    if self._running.is_set():
        return
    self._running.set()
    self._thread = threading.Thread(
        target=self._loop,
        name="memory-service",
        daemon=True,
    )
    self._thread.start()
    logger.debug("Memory service started")
stop
stop(timeout: float = 2.0) -> None

Signal the worker to drain and stop, then join it (idempotent).

Source code in src/openjarvis/memory/service.py
def stop(self, timeout: float = 2.0) -> None:
    """Signal the worker to drain and stop, then join it (idempotent)."""
    if not self._running.is_set():
        return
    self._running.clear()
    try:
        self._queue.put_nowait(_STOP)
    except queue.Full:
        pass  # worker will notice the cleared flag on its next loop
    thread = self._thread
    if thread is not None:
        thread.join(timeout=timeout)
    self._thread = None
    logger.debug("Memory service stopped")
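The `queue.Full` branch in stop() relies on the worker noticing the cleared flag by itself. The sketch below shows one way a loop could do that, assuming it polls the queue with a timeout and keeps draining while jobs remain; the real `_loop` is not shown on this page, and `loop`, `picked`, and `gate` are hypothetical names.

```python
import queue
import threading

_STOP = object()            # hypothetical sentinel
running = threading.Event()
picked = threading.Event()  # set once the worker has taken a job
gate = threading.Event()    # holds the worker inside a "slow" job
jobs: queue.Queue = queue.Queue(maxsize=2)
processed: list = []


def loop() -> None:
    # Keep going while running, and afterwards until the backlog is drained.
    while running.is_set() or not jobs.empty():
        try:
            job = jobs.get(timeout=0.05)
        except queue.Empty:
            continue
        if job is _STOP:
            break
        picked.set()
        gate.wait(timeout=2.0)  # simulate a slow extraction
        processed.append(job)


running.set()
worker = threading.Thread(target=loop, daemon=True)
worker.start()

jobs.put("a")
picked.wait(timeout=2.0)  # the worker is now busy with "a"
jobs.put("b")
jobs.put("c")             # the queue is now full (maxsize=2)

# Same shape as stop(): clear the flag, then try to enqueue the sentinel.
running.clear()
try:
    jobs.put_nowait(_STOP)
    sentinel_sent = True
except queue.Full:
    sentinel_sent = False

gate.set()                # let the slow job finish
worker.join(timeout=2.0)
```

Note that `Thread.join(timeout=...)` returns once the timeout elapses even if the thread is still alive, so a job that takes longer than the stop() timeout can outlive the call; the thread being a daemon keeps it from blocking interpreter exit.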
submit
submit(user_text: str, assistant_text: str = '') -> bool

Queue an exchange for extraction. Non-blocking; never raises.

Returns True if the job was enqueued. Returns False if the service is not running, if user_text is empty or whitespace-only, or if the queue is full; in the last case the exchange is dropped rather than blocking the caller, because extraction is best-effort.

Source code in src/openjarvis/memory/service.py
def submit(self, user_text: str, assistant_text: str = "") -> bool:
    """Queue an exchange for extraction. Non-blocking; never raises.

    Returns True if the job was enqueued, False if the service is not
    running, ``user_text`` is empty or whitespace-only, or the queue is
    full (in which case the exchange is dropped rather than blocking the
    caller, since extraction is best-effort).
    """
    if not self._running.is_set():
        return False
    if not user_text or not user_text.strip():
        return False
    try:
        self._queue.put_nowait((user_text, assistant_text))
        return True
    except queue.Full:
        logger.debug("Memory service queue full; dropping exchange")
        return False
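The three rejection paths in submit() can be exercised in isolation. This is a minimal sketch that mirrors the logic above as a free function; `try_submit` is a hypothetical helper and a plain boolean stands in for the `_running` event.

```python
import queue


def try_submit(jobs: queue.Queue, running: bool, user_text: str,
               assistant_text: str = "") -> bool:
    """Mirror of submit(): never block, report whether the job was queued."""
    if not running:
        return False
    if not user_text or not user_text.strip():
        return False
    try:
        jobs.put_nowait((user_text, assistant_text))
        return True
    except queue.Full:
        return False  # best-effort: drop instead of blocking the caller


jobs: queue.Queue = queue.Queue(maxsize=1)
outcomes = [
    try_submit(jobs, True, "My name is Ada"),   # accepted
    try_submit(jobs, True, "   "),              # whitespace-only text
    try_submit(jobs, True, "I live in Paris"),  # queue already full
    try_submit(jobs, False, "ignored"),         # service not running
]
```

Because a False return does not say which path was taken, a caller that needs to distinguish a full queue from a stopped service would have to check that state separately.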

Functions

build_memory_service

build_memory_service(config: Any, engine: Any, default_model: str = '') -> Optional[MemoryService]

Build a MemoryService from config, or None if disabled.

Reads the [memory] section (config.memory / config.tools.storage) for enabled, backend, extraction_model, max_facts and facts_path. Returns None when memory is disabled or no engine / extraction model is available, so callers can simply do:

svc = build_memory_service(config, engine, model)
if svc is not None:
    svc.start()
Source code in src/openjarvis/memory/service.py
def build_memory_service(
    config: Any,
    engine: Any,
    default_model: str = "",
) -> Optional[MemoryService]:
    """Build a :class:`MemoryService` from config, or ``None`` if disabled.

    Reads the ``[memory]`` section (``config.memory`` / ``config.tools.storage``)
    for ``enabled``, ``backend``, ``extraction_model``, ``max_facts`` and
    ``facts_path``.  Returns ``None`` when memory is disabled or no engine /
    extraction model is available, so callers can simply do::

        svc = build_memory_service(config, engine, model)
        if svc is not None:
            svc.start()
    """
    mem = getattr(config, "memory", None)
    if mem is None or not getattr(mem, "enabled", False):
        return None
    if engine is None:
        return None

    model = getattr(mem, "extraction_model", "") or default_model
    if not model:
        logger.debug("Memory service disabled: no extraction model available")
        return None

    store = create_fact_store(
        getattr(mem, "backend", "local"),
        path=getattr(mem, "facts_path", "~/.openjarvis/memory_facts.jsonl"),
        max_facts=getattr(mem, "max_facts", 1000),
    )
    extractor = FactExtractor(engine, model)
    return MemoryService(store, extractor)
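The gating and defaulting rules above can be checked without the rest of OpenJarvis. The sketch below copies the `getattr` logic into a hypothetical `resolve_memory_settings` helper that returns the resolved settings as a dict instead of constructing a store; the `SimpleNamespace` configs and the model name `"small-model"` are illustrative stand-ins.

```python
from types import SimpleNamespace


def resolve_memory_settings(config, engine, default_model: str = ""):
    """Return the settings build_memory_service would use, or None."""
    mem = getattr(config, "memory", None)
    if mem is None or not getattr(mem, "enabled", False):
        return None
    if engine is None:
        return None
    model = getattr(mem, "extraction_model", "") or default_model
    if not model:
        return None
    return {
        "backend": getattr(mem, "backend", "local"),
        "path": getattr(mem, "facts_path", "~/.openjarvis/memory_facts.jsonl"),
        "max_facts": getattr(mem, "max_facts", 1000),
        "model": model,
    }


engine = object()  # any non-None engine passes the gate

disabled = resolve_memory_settings(
    SimpleNamespace(memory=SimpleNamespace(enabled=False)), engine, "small-model"
)
no_model = resolve_memory_settings(
    SimpleNamespace(memory=SimpleNamespace(enabled=True)), engine
)
fallback = resolve_memory_settings(
    SimpleNamespace(memory=SimpleNamespace(enabled=True, max_facts=50)),
    engine,
    "small-model",
)
```

An explicit `extraction_model` in the config takes precedence; `default_model` is used only when that field is missing or empty.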