Skip to content

extractor

extractor

LLM-backed extraction of durable facts from a conversation turn.

The extractor takes a single (user, assistant) exchange and asks a small local model to distill any long-term, user-specific facts worth remembering. It is deliberately defensive: extraction runs on a background thread far from the request path, so any failure — a dropped Ollama connection, a timeout, a BrokenPipeError when the client went away, or simply unparseable output — must degrade to "no facts" rather than propagate.

Classes

FactExtractor

FactExtractor(engine: Any, model: str, *, temperature: float = 0.0, max_tokens: int = 512, max_facts_per_turn: int = 10, max_fact_chars: int = 200, system_prompt: Optional[str] = None)

Extract memory-worthy facts from a conversation turn via an engine.

Source code in src/openjarvis/memory/extractor.py
def __init__(
    self,
    engine: Any,
    model: str,
    *,
    temperature: float = 0.0,
    max_tokens: int = 512,
    max_facts_per_turn: int = 10,
    max_fact_chars: int = 200,
    system_prompt: Optional[str] = None,
) -> None:
    self._engine = engine
    self._model = model
    self._temperature = temperature
    self._max_tokens = max_tokens
    self._max_facts_per_turn = max_facts_per_turn
    self._max_fact_chars = max_fact_chars
    self._system_prompt = system_prompt or _DEFAULT_SYSTEM_PROMPT
Functions
extract
extract(user_text: str, assistant_text: str = '') -> List[str]

Return durable facts from the exchange. Never raises.

Source code in src/openjarvis/memory/extractor.py
def extract(self, user_text: str, assistant_text: str = "") -> List[str]:
    """Return durable facts from the exchange. Never raises."""
    user_text = (user_text or "").strip()
    if not user_text:
        return []

    exchange = f"User: {user_text}"
    if assistant_text and assistant_text.strip():
        exchange += f"\nAssistant: {assistant_text.strip()}"

    messages = [
        Message(role=Role.SYSTEM, content=self._system_prompt),
        Message(role=Role.USER, content=exchange),
    ]

    try:
        result = self._engine.generate(
            messages,
            model=self._model,
            temperature=self._temperature,
            max_tokens=self._max_tokens,
        )
    except BrokenPipeError:
        # The classic failure mode: the model call's transport died.
        # Extraction is best-effort, so swallow it.
        logger.debug("Memory extraction aborted: broken pipe", exc_info=True)
        return []
    except Exception:  # noqa: BLE001 — extraction must never crash the worker
        logger.debug("Memory extraction failed", exc_info=True)
        return []

    if isinstance(result, dict):
        content = result.get("content", "") or ""
    else:
        content = str(result)

    return self._parse(content)