LLM-backed extraction of durable facts from a conversation turn.
The extractor takes a single (user, assistant) exchange and asks a small
local model to distill any long-term, user-specific facts worth remembering.
It is deliberately defensive: extraction runs on a background thread far from
the request path, so any failure — a dropped Ollama connection, a timeout, a
BrokenPipeError when the client went away, or simply unparseable output —
must degrade to "no facts" rather than propagate.
FactExtractor(engine: Any, model: str, *, temperature: float = 0.0, max_tokens: int = 512, max_facts_per_turn: int = 10, max_fact_chars: int = 200, system_prompt: Optional[str] = None)
Extract memory-worthy facts from a conversation turn via an engine.
Source code in src/openjarvis/memory/extractor.py
| def __init__(
self,
engine: Any,
model: str,
*,
temperature: float = 0.0,
max_tokens: int = 512,
max_facts_per_turn: int = 10,
max_fact_chars: int = 200,
system_prompt: Optional[str] = None,
) -> None:
self._engine = engine
self._model = model
self._temperature = temperature
self._max_tokens = max_tokens
self._max_facts_per_turn = max_facts_per_turn
self._max_fact_chars = max_fact_chars
self._system_prompt = system_prompt or _DEFAULT_SYSTEM_PROMPT
|
extract(user_text: str, assistant_text: str = '') -> List[str]
Return durable facts from the exchange. Never raises.
Source code in src/openjarvis/memory/extractor.py
| def extract(self, user_text: str, assistant_text: str = "") -> List[str]:
"""Return durable facts from the exchange. Never raises."""
user_text = (user_text or "").strip()
if not user_text:
return []
exchange = f"User: {user_text}"
if assistant_text and assistant_text.strip():
exchange += f"\nAssistant: {assistant_text.strip()}"
messages = [
Message(role=Role.SYSTEM, content=self._system_prompt),
Message(role=Role.USER, content=exchange),
]
try:
result = self._engine.generate(
messages,
model=self._model,
temperature=self._temperature,
max_tokens=self._max_tokens,
)
except BrokenPipeError:
# The classic failure mode: the model call's transport died.
# Extraction is best-effort, so swallow it.
logger.debug("Memory extraction aborted: broken pipe", exc_info=True)
return []
except Exception: # noqa: BLE001 — extraction must never crash the worker
logger.debug("Memory extraction failed", exc_info=True)
return []
if isinstance(result, dict):
content = result.get("content", "") or ""
else:
content = str(result)
return self._parse(content)
|