Skip to content

Index

storage

Storage primitive — persistent searchable storage.

Classes

MemoryBackend

Bases: ABC

Base class for all memory / retrieval backends.

Subclasses must be registered via @MemoryRegistry.register("name") to become discoverable.

Functions
store abstractmethod
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str

Persist content and return a unique document id.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Persist *content* and return a unique document id.

    Abstract: concrete backends must override this method.

    Parameters
    ----------
    content:
        The text to persist.
    source:
        Keyword-only label for the content's origin (defaults to an
        empty string). Presumably a filename or identifier — confirm
        against the concrete backends.
    metadata:
        Optional keyword-only mapping of extra data to associate with
        the content. How it is stored or used is up to the backend.

    Returns
    -------
    The id assigned to the stored document. Presumably the same id that
    ``delete`` accepts — confirm against the concrete backends.
    """
retrieve abstractmethod
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]

Search for query and return the top-k results.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search for *query* and return the top-k results.

    Abstract: concrete backends must override this method.

    Parameters
    ----------
    query:
        The search text.
    top_k:
        Keyword-only cap on how many results to return (default 5).
    **kwargs:
        Extra backend-specific options; this stub defines none.

    Returns
    -------
    A list of :class:`RetrievalResult` objects.
    NOTE(review): result ordering (e.g. best score first) is not
    specified here — confirm against the concrete backends.
    """
delete abstractmethod
delete(doc_id: str) -> bool

Delete a document by id. Return True if it existed.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def delete(self, doc_id: str) -> bool:
    """Delete a document by id. Return ``True`` if it existed.

    Abstract: concrete backends must override this method.

    Parameters
    ----------
    doc_id:
        Id of the document to remove. Presumably a value previously
        returned by ``store`` — confirm against the concrete backends.
    """
clear abstractmethod
clear() -> None

Remove all stored documents.

Source code in src/openjarvis/tools/storage/_stubs.py
@abstractmethod
def clear(self) -> None:
    """Remove all stored documents.

    Abstract: concrete backends must override this method.
    Takes no arguments and returns ``None``.
    """

RetrievalResult dataclass

RetrievalResult(content: str, score: float = 0.0, source: str = '', metadata: Dict[str, Any] = dict())

A single result returned by a memory backend query.

Chunk dataclass

Chunk(content: str, source: str = '', offset: int = 0, index: int = 0, metadata: Dict[str, Any] = dict())

A single chunk produced by the chunking pipeline.

ChunkConfig dataclass

ChunkConfig(chunk_size: int = 512, chunk_overlap: int = 64, min_chunk_size: int = 50)

Parameters controlling the chunking strategy.

ContextConfig dataclass

ContextConfig(enabled: bool = True, top_k: int = 5, min_score: float = 0.1, max_context_tokens: int = 2048)

Controls how retrieved context is injected into prompts.

Functions

chunk_text

chunk_text(text: str, *, source: str = '', config: Optional[ChunkConfig] = None) -> List[Chunk]

Split text into chunks respecting paragraph boundaries.

PARAMETER DESCRIPTION
text

The full document text.

TYPE: str

source

Originating filename or identifier.

TYPE: str DEFAULT: ''

config

Chunking parameters (uses defaults if None).

TYPE: Optional[ChunkConfig] DEFAULT: None

RETURNS DESCRIPTION
A list of Chunk objects, in document order.
Source code in src/openjarvis/tools/storage/chunking.py
def chunk_text(
    text: str,
    *,
    source: str = "",
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Split *text* into chunks respecting paragraph boundaries.

    Paragraphs (separated by a blank line) are split on whitespace and
    accumulated until adding the next one would exceed
    ``config.chunk_size`` tokens; the accumulated tokens are then emitted
    as one chunk and the last ``config.chunk_overlap`` tokens are carried
    into the next chunk.  A paragraph that is larger than ``chunk_size``
    on its own is cut into fixed-size windows that advance by
    ``chunk_size - chunk_overlap`` tokens.  Candidate chunks whose
    ``_count_tokens`` value is below ``config.min_chunk_size`` are
    dropped.

    ``Chunk.offset`` is the position of the chunk's first token, counted
    in whitespace-separated tokens from the start of the non-blank
    paragraphs of *text*; ``Chunk.index`` is the chunk's position in the
    returned list.

    Parameters
    ----------
    text:
        The full document text.
    source:
        Originating filename or identifier.
    config:
        Chunking parameters (uses defaults if ``None``).

    Returns
    -------
    List of :class:`Chunk` objects, in order.  Empty if *text* is empty
    or whitespace-only.
    """
    if not text or not text.strip():
        return []

    cfg = config or ChunkConfig()

    # Split into paragraphs (double newline), ignoring blank ones.
    paragraphs = [p for p in text.split("\n\n") if p.strip()]

    chunks: List[Chunk] = []

    def _emit(tokens: List[str], offset: int) -> None:
        # Append *tokens* as a chunk unless it is below the minimum size.
        content = " ".join(tokens)
        if _count_tokens(content) >= cfg.min_chunk_size:
            chunks.append(Chunk(
                content=content,
                source=source,
                offset=offset,
                index=len(chunks),
            ))

    # Window stride for oversized paragraphs; always advances at least one
    # token so the loop terminates even when overlap >= chunk_size.
    step = max(1, cfg.chunk_size - cfg.chunk_overlap)

    pending: List[str] = []  # tokens accumulated for the next chunk
    consumed = 0             # document tokens processed so far
    # Invariant: pending[0] sits at token offset ``consumed - len(pending)``,
    # which stays true when an overlap tail is carried over.

    for para in paragraphs:
        para_tokens = para.split()
        oversized = len(para_tokens) > cfg.chunk_size

        # If adding this paragraph would exceed chunk_size and we already
        # have content, flush the pending chunk first.
        if pending and len(pending) + len(para_tokens) > cfg.chunk_size:
            _emit(pending, consumed - len(pending))

            # Keep the overlap tail for the next chunk.  Not before an
            # oversized paragraph: it is windowed on its own, so the tail
            # would only be re-emitted as a duplicate chunk.
            carry_tail = (
                not oversized
                and cfg.chunk_overlap > 0
                and len(pending) > cfg.chunk_overlap
            )
            pending = pending[-cfg.chunk_overlap:] if carry_tail else []

        if oversized:
            # Split the oversized paragraph into fixed windows.
            for start in range(0, len(para_tokens), step):
                _emit(
                    para_tokens[start:start + cfg.chunk_size],
                    consumed + start,
                )
                # Once a window reaches the end of the paragraph, any
                # further window would lie entirely inside it.
                if start + cfg.chunk_size >= len(para_tokens):
                    break

            consumed += len(para_tokens)
            continue

        pending.extend(para_tokens)
        consumed += len(para_tokens)

    # Flush remaining tokens.
    if pending:
        _emit(pending, consumed - len(pending))

    return chunks

inject_context

inject_context(query: str, messages: List[Message], backend: MemoryBackend, *, config: Optional[ContextConfig] = None) -> List[Message]

Retrieve relevant context and prepend it to messages.

Returns a new list — the original list is not mutated. If no results pass the score threshold, returns the original messages unchanged.

PARAMETER DESCRIPTION
query

The user query to search for.

TYPE: str

messages

The existing message list.

TYPE: List[Message]

backend

The memory backend to search.

TYPE: MemoryBackend

config

Context injection settings (uses defaults if None).

TYPE: Optional[ContextConfig] DEFAULT: None

Source code in src/openjarvis/tools/storage/context.py
def inject_context(
    query: str,
    messages: List[Message],
    backend: MemoryBackend,
    *,
    config: Optional[ContextConfig] = None,
) -> List[Message]:
    """Look up context for *query* and place it ahead of *messages*.

    When context is injected the result is a **new** list; *messages*
    itself is never modified.  When injection is disabled, or no result
    survives the score filter and the token budget, the original
    *messages* object is returned as-is.

    Parameters
    ----------
    query:
        The user query to search for.
    messages:
        The existing message list.
    backend:
        The memory backend to search.
    config:
        Context injection settings (uses defaults if ``None``).
    """
    settings = config or ContextConfig()
    if not settings.enabled:
        return messages

    # Retrieve, then keep only hits at or above the minimum score.
    hits = [
        hit
        for hit in backend.retrieve(query, top_k=settings.top_k)
        if hit.score >= settings.min_score
    ]
    if not hits:
        return messages

    # Take hits in order until the next one would overflow the budget.
    selected: List[RetrievalResult] = []
    used_tokens = 0
    for hit in hits:
        cost = _count_tokens(hit.content)
        if used_tokens + cost > settings.max_context_tokens:
            break
        selected.append(hit)
        used_tokens += cost

    if not selected:
        return messages

    # Announce the injection on the event bus before building the message.
    get_event_bus().publish(
        EventType.MEMORY_RETRIEVE,
        {
            "context_injection": True,
            "query": query,
            "num_results": len(selected),
            "total_tokens": used_tokens,
        },
    )

    return [build_context_message(selected), *messages]

ingest_path

ingest_path(path: Path, *, config: Optional[ChunkConfig] = None) -> List[Chunk]

Ingest a file or directory into chunks.

If path is a file, reads and chunks it. If path is a directory, recursively walks it (skipping hidden and common non-content directories) and chunks each file.

Source code in src/openjarvis/tools/storage/ingest.py
def ingest_path(
    path: Path,
    *,
    config: Optional[ChunkConfig] = None,
) -> List[Chunk]:
    """Ingest a file or directory into chunks.

    If *path* is a file, reads and chunks it.
    If *path* is a directory, recursively walks it in sorted order
    (skipping hidden and common non-content directories) and chunks each
    file that is not hidden, not flagged by ``is_sensitive_file`` and
    does not carry a known binary suffix.  Files whose reading raises
    ``ImportError`` or ``OSError`` are skipped silently.

    Parameters
    ----------
    path:
        File or directory to ingest.
    config:
        Chunking parameters, passed through to ``chunk_text``.

    Returns
    -------
    All chunks produced, in traversal order.

    Raises
    ------
    FileNotFoundError
        If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"Path not found: {path}")

    if path.is_file():
        text, _meta = read_document(path)
        return chunk_text(text, source=str(path), config=config)

    # Kept as a function-local import, as in the original (presumably to
    # avoid an import cycle — confirm), but executed once per call instead
    # of once per visited file.
    from openjarvis.security.file_policy import is_sensitive_file

    # Suffixes treated as binary content and never read.
    binary_suffixes = frozenset({
        ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico",
        ".mp3", ".mp4", ".wav", ".avi", ".mov",
        ".zip", ".tar", ".gz", ".bz2", ".7z",
        ".exe", ".dll", ".so", ".dylib", ".o",
        ".pyc", ".pyo", ".class", ".wasm",
    })

    # Directory: recursive walk
    all_chunks: List[Chunk] = []
    for child in sorted(path.rglob("*")):
        # rglob yields directories as well as files; only files are read.
        if child.is_dir():
            continue

        # Skip anything that lives under a directory that should be skipped.
        parent_parts = child.relative_to(path).parts[:-1]
        if any(_should_skip_dir(part) for part in parent_parts):
            continue

        # Skip hidden files
        if child.name.startswith("."):
            continue

        # Skip sensitive files (secrets, credentials, keys)
        if is_sensitive_file(child):
            continue

        # Skip binary-looking files
        if child.suffix.lower() in binary_suffixes:
            continue

        try:
            text, _meta = read_document(child)
            all_chunks.extend(
                chunk_text(text, source=str(child), config=config)
            )
        except (ImportError, OSError):
            # Skip files we can't read (e.g. PDF without pdfplumber)
            continue

    return all_chunks

read_document

read_document(path: Path) -> Tuple[str, DocumentMeta]

Read a file and return (text, metadata).

RAISES DESCRIPTION
ImportError

If the file is a PDF and pdfplumber is not installed.

FileNotFoundError

If path does not exist.

Source code in src/openjarvis/tools/storage/ingest.py
def read_document(path: Path) -> Tuple[str, DocumentMeta]:
    """Read a file and return ``(text, metadata)``.

    The file type comes from ``detect_file_type``; PDFs are read with
    ``_read_pdf`` and everything else with ``_read_text``.  The returned
    metadata records the path, detected type, on-disk size in bytes and
    the number of lines in the extracted text.

    Raises
    ------
    ImportError
        If the file is a PDF and ``pdfplumber`` is not installed.
    FileNotFoundError
        If *path* does not exist.
    """
    if not path.exists():
        raise FileNotFoundError(f"File not found: {path}")

    ftype = detect_file_type(path)

    if ftype == "pdf":
        # Probe for the optional dependency up front so the caller gets an
        # actionable message instead of a bare import failure.
        try:
            import pdfplumber  # noqa: F401
        except ImportError:
            raise ImportError(
                "PDF support requires pdfplumber. "
                "Install it with: uv sync --extra memory-pdf"
            ) from None

        text = _read_pdf(path)
    else:
        text = _read_text(path)

    # A trailing newline terminates the last line rather than starting a
    # new one, so "a\nb\n" is two lines, not three.
    if not text:
        line_count = 0
    else:
        line_count = text.count("\n") + (0 if text.endswith("\n") else 1)

    meta = DocumentMeta(
        path=str(path),
        file_type=ftype,
        size_bytes=path.stat().st_size,
        line_count=line_count,
    )
    return text, meta