Bases: MemoryBackend
In-memory ColBERTv2 late interaction retrieval backend.
Encodes queries and documents into token-level embeddings using a
ColBERT checkpoint, then scores via MaxSim (for each query token,
take the maximum cosine similarity across all document tokens and
sum the results).
The checkpoint is lazily loaded on first use to avoid heavy model
loading during import or instantiation.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def __init__(
    self,
    *,
    checkpoint: str = "colbert-ir/colbertv2.0",
    device: str = "cpu",
) -> None:
    """Create an empty backend; no checkpoint is loaded here.

    Args:
        checkpoint: Checkpoint name, recorded for later use.
        device: Device string, recorded for later use.
    """
    # Configuration only -- the checkpoint object starts out unset.
    self._checkpoint_name = checkpoint
    self._device = device
    self._checkpoint_obj: Any = None
    self._checkpoint_loaded: bool = False
    # Document id -> (content, source, metadata).
    self._documents: Dict[str, Tuple[str, str, Dict[str, Any]]] = {}
    # Document id -> token-level embedding tensor.
    self._embeddings: Dict[str, Any] = {}
|
store(content: str, *, source: str = '', metadata: Optional[Dict[str, Any]] = None) -> str
Persist content and return a unique document id.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def store(
    self,
    content: str,
    *,
    source: str = "",
    metadata: Optional[Dict[str, Any]] = None,
) -> str:
    """Persist *content* and return a unique document id.

    The content is encoded before anything is recorded, so an exception
    raised by ``_encode`` leaves the backend unchanged instead of
    leaving a document registered without an embedding.

    Args:
        content: Text to encode and keep.
        source: Origin label stored alongside the content.
        metadata: Optional mapping stored with the document. It is
            shallow-copied so that later changes to the caller's dict
            do not alter the stored record.

    Returns:
        The hex id generated for the new document.
    """
    # Encode first so a failure here cannot leave a half-stored document.
    embedding = self._encode(content)
    doc_id = uuid.uuid4().hex
    self._documents[doc_id] = (
        content,
        source,
        dict(metadata) if metadata else {},
    )
    self._embeddings[doc_id] = embedding
    get_event_bus().publish(EventType.MEMORY_STORE, {
        "backend": self.backend_id,
        "doc_id": doc_id,
        "source": source,
    })
    return doc_id
|
retrieve(query: str, *, top_k: int = 5, **kwargs: Any) -> List[RetrievalResult]
Search for query and return the top-k results.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def retrieve(
    self,
    query: str,
    *,
    top_k: int = 5,
    **kwargs: Any,
) -> List[RetrievalResult]:
    """Search for *query* and return the top-k results.

    Every stored embedding is scored against the encoded query with
    ``_maxsim``; the highest-scoring documents are returned first. A
    ``MEMORY_RETRIEVE`` event is published on every call, including
    calls that return nothing.

    Args:
        query: Search text. A blank query yields no results.
        top_k: Maximum number of results. Zero or a negative value
            yields no results.
        **kwargs: Accepted but not used by this method.

    Returns:
        Up to *top_k* results in descending score order, each carrying
        a copy of the stored metadata.
    """
    results: List[RetrievalResult] = []
    # A non-positive top_k is rejected explicitly: used as a slice bound,
    # a negative value would return all but the last few hits.
    if top_k > 0 and query.strip() and self._documents:
        query_embs = self._encode(query)
        scored = [
            (doc_id, self._maxsim(query_embs, doc_embs))
            for doc_id, doc_embs in self._embeddings.items()
        ]
        scored.sort(key=lambda pair: pair[1], reverse=True)
        for doc_id, score in scored[:top_k]:
            content, source, metadata = self._documents[doc_id]
            results.append(RetrievalResult(
                content=content,
                score=score,
                source=source,
                metadata=dict(metadata),
            ))
    get_event_bus().publish(EventType.MEMORY_RETRIEVE, {
        "backend": self.backend_id,
        "query": query,
        "num_results": len(results),
    })
    return results
|
delete(doc_id: str) -> bool
Delete a document by id. Return True if it existed.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def delete(self, doc_id: str) -> bool:
    """Delete a document by id. Return ``True`` if it existed.

    Args:
        doc_id: Id of the document to remove.

    Returns:
        ``True`` when the id was present in the document table,
        ``False`` otherwise.
    """
    if doc_id not in self._documents:
        return False
    del self._documents[doc_id]
    # Tolerate a missing embedding so a document that was registered
    # without one can still be removed instead of raising KeyError.
    self._embeddings.pop(doc_id, None)
    return True
|
clear() -> None
Remove all stored documents.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def clear(self) -> None:
    """Drop every stored document together with its embedding."""
    # Both tables are emptied in place, documents first.
    for table in (self._documents, self._embeddings):
        table.clear()
|
count() -> int
Return the number of stored documents.
Source code in src/openjarvis/tools/storage/colbert_backend.py
def count(self) -> int:
    """Report how many documents are currently held."""
    # The document table is the one consulted for the size.
    total = len(self._documents)
    return total
|