Skip to content

Index

openjarvis

OpenJarvis — modular AI assistant backend with composable intelligence primitives.

Classes

Jarvis

Jarvis(*, config: Optional[JarvisConfig] = None, config_path: Optional[str] = None, engine_key: Optional[str] = None, model: Optional[str] = None)

High-level OpenJarvis SDK.

Usage::

from openjarvis import Jarvis

with Jarvis() as j:
    response = j.ask("Hello, what can you do?")
    print(response)

# Streaming:
import asyncio

async def main():
    j = Jarvis()
    async for token in j.ask_stream("Tell me a joke"):
        print(token, end="", flush=True)
    j.close()

asyncio.run(main())

# Or without context manager:
j = Jarvis()
response = j.ask("Hello")
j.close()
Source code in src/openjarvis/sdk.py
def __init__(
    self,
    *,
    config: Optional[JarvisConfig] = None,
    config_path: Optional[str] = None,
    engine_key: Optional[str] = None,
    model: Optional[str] = None,
) -> None:
    """Create the SDK handle and wire up its bus, telemetry and audit log.

    Args:
        config: Ready-made configuration; takes precedence when given.
        config_path: Path of a config file, used only when ``config`` is
            ``None``.
        engine_key: Engine key, stored as given on the instance.
        model: Model name, stored as the per-instance model override.

    When neither ``config`` nor ``config_path`` is supplied,
    ``load_config()`` is called without arguments. Telemetry and audit
    logger start-up are best-effort: any exception is logged as a warning
    and construction continues.
    """
    # Choose the configuration source: object > file path > defaults.
    if config is None:
        if config_path is None:
            config = load_config()
        else:
            config = load_config(Path(config_path))
    self._config = config

    # Caller preferences, stored as given.
    self._engine_key = engine_key
    self._model_override = model

    # Slots that start out empty.
    self._engine: Any = None
    self._energy_monitor: Any = None
    self._resolved_engine_key: Optional[str] = None
    self._telem_store: Optional[TelemetryStore] = None
    self._audit_logger: Any = None

    self._bus = EventBus()
    self.memory = MemoryHandle(self._config)

    # Telemetry store, subscribed to the bus when enabled in the config.
    telemetry_cfg = self._config.telemetry
    if telemetry_cfg.enabled:
        try:
            store = TelemetryStore(telemetry_cfg.db_path)
            # Keep the reference before subscribing so it survives a
            # failing subscribe call.
            self._telem_store = store
            store.subscribe_to_bus(self._bus)
        except Exception as exc:
            logger.warning("Failed to initialize telemetry store: %s", exc)

    # Security audit logger, attached to the same bus when enabled.
    security_cfg = self._config.security
    if security_cfg.enabled:
        try:
            from openjarvis.security.audit import AuditLogger

            audit_kwargs = {
                "db_path": security_cfg.audit_log_path,
                "bus": self._bus,
            }
            self._audit_logger = AuditLogger(**audit_kwargs)
        except Exception as exc:
            logger.warning("Failed to initialize security audit logger: %s", exc)
Attributes
config property
config: JarvisConfig

Return the active configuration.

version property
version: str

Return the OpenJarvis version string.

Functions
ask
ask(query: str, *, model: Optional[str] = None, agent: Optional[str] = None, tools: Optional[List[str]] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: bool = True) -> str

Send a query and return the response text.

Source code in src/openjarvis/sdk.py
def ask(
    self,
    query: str,
    *,
    model: Optional[str] = None,
    agent: Optional[str] = None,
    tools: Optional[List[str]] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    context: bool = True,
) -> str:
    """Send a query and return only the response text.

    All keyword options are forwarded unchanged to ``ask_full``; the
    ``"content"`` entry of its result dict is returned.
    """
    forwarded = {
        "model": model,
        "agent": agent,
        "tools": tools,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "context": context,
    }
    full_result = self.ask_full(query, **forwarded)
    return full_result["content"]
ask_full
ask_full(query: str, *, model: Optional[str] = None, agent: Optional[str] = None, tools: Optional[List[str]] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: bool = True) -> Dict[str, Any]

Send a query and return the full result dict.

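In direct engine mode, returns a dict with keys: content, usage, model, engine. In agent mode, the agent runner's result dict is returned as-is (it may additionally carry tool_results).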

Source code in src/openjarvis/sdk.py
def ask_full(
    self,
    query: str,
    *,
    model: Optional[str] = None,
    agent: Optional[str] = None,
    tools: Optional[List[str]] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    context: bool = True,
) -> Dict[str, Any]:
    """Run ``query`` and return the full result dictionary.

    In direct engine mode the dict carries ``content``, ``usage``,
    ``model`` and ``engine``. When ``agent`` is given, whatever
    ``self._run_agent`` returns is passed back unchanged.

    ``temperature`` and ``max_tokens`` default to the values in
    ``config.intelligence`` when left as ``None``.
    """
    self._ensure_engine()

    # Sampling parameters: explicit argument wins over the config value.
    temperature = (
        self._config.intelligence.temperature
        if temperature is None
        else temperature
    )
    max_tokens = (
        self._config.intelligence.max_tokens
        if max_tokens is None
        else max_tokens
    )

    # Model choice: argument, then instance override, then the router,
    # then the engine's first listed model, then the literal "default".
    chosen_model = model or self._model_override
    if chosen_model is None:
        chosen_model = self._resolve_model(query)
    if not chosen_model:
        available = self._engine.list_models()
        chosen_model = available[0] if available else "default"

    # Agent mode delegates everything to the agent runner.
    if agent is not None:
        return self._run_agent(
            agent,
            query,
            chosen_model,
            tools=tools or [],
            temperature=temperature,
            max_tokens=max_tokens,
            context=context,
        )

    # Direct engine mode: a single user message, optionally enriched
    # with memory context.
    conversation = [Message(role=Role.USER, content=query)]
    if context and self._config.agent.context_from_memory:
        conversation = self._inject_context(query, conversation)

    generation = self._engine.generate(
        conversation,
        model=chosen_model,
        temperature=temperature,
        max_tokens=max_tokens,
    )

    response: Dict[str, Any] = {
        "content": generation.get("content", ""),
        "usage": generation.get("usage", {}),
        "model": chosen_model,
        "engine": self._resolved_engine_key,
    }
    return response
ask_stream async
ask_stream(query: str, *, model: Optional[str] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: bool = True) -> AsyncIterator[str]

Stream tokens as they are generated. Yields token strings.

Source code in src/openjarvis/sdk.py
async def ask_stream(
    self,
    query: str,
    *,
    model: Optional[str] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    context: bool = True,
) -> AsyncIterator[str]:
    """Yield response tokens one by one as the engine produces them.

    ``temperature`` and ``max_tokens`` fall back to ``config.intelligence``
    when left as ``None``. The model is taken from the argument, then the
    instance override, then ``_resolve_model``, then the engine's first
    listed model, and finally the literal ``"default"``.
    """
    self._ensure_engine()

    temperature = (
        self._config.intelligence.temperature
        if temperature is None
        else temperature
    )
    max_tokens = (
        self._config.intelligence.max_tokens
        if max_tokens is None
        else max_tokens
    )

    chosen_model = model or self._model_override
    if chosen_model is None:
        chosen_model = self._resolve_model(query)
    if not chosen_model:
        available = self._engine.list_models()
        chosen_model = available[0] if available else "default"

    # Single user message, optionally enriched with memory context.
    conversation = [Message(role=Role.USER, content=query)]
    if context and self._config.agent.context_from_memory:
        conversation = self._inject_context(query, conversation)

    stream_options = {
        "model": chosen_model,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    async for piece in self._engine.stream(conversation, **stream_options):
        yield piece
ask_full_stream async
ask_full_stream(query: str, *, model: Optional[str] = None, temperature: Optional[float] = None, max_tokens: Optional[int] = None, context: bool = True) -> AsyncIterator[Dict[str, Any]]

Stream token dicts with metadata.

Yields dicts with token and index keys for each token. The final dict has done: True along with the full concatenated content, model, and engine keys.

Source code in src/openjarvis/sdk.py
async def ask_full_stream(
    self,
    query: str,
    *,
    model: Optional[str] = None,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    context: bool = True,
) -> AsyncIterator[Dict[str, Any]]:
    """Stream per-token dicts followed by one summary dict.

    Each streamed item has ``token`` and a zero-based ``index``. After
    the engine stream ends, a last item is yielded with ``done: True``,
    the concatenated ``content``, and the ``model`` and ``engine`` used.
    """
    self._ensure_engine()

    temperature = (
        self._config.intelligence.temperature
        if temperature is None
        else temperature
    )
    max_tokens = (
        self._config.intelligence.max_tokens
        if max_tokens is None
        else max_tokens
    )

    chosen_model = model or self._model_override
    if chosen_model is None:
        chosen_model = self._resolve_model(query)
    if not chosen_model:
        available = self._engine.list_models()
        chosen_model = available[0] if available else "default"

    conversation = [Message(role=Role.USER, content=query)]
    if context and self._config.agent.context_from_memory:
        conversation = self._inject_context(query, conversation)

    stream_options = {
        "model": chosen_model,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }

    collected: List[str] = []
    async for piece in self._engine.stream(conversation, **stream_options):
        collected.append(piece)
        # The position of the piece just stored is its running index.
        yield {"token": piece, "index": len(collected) - 1}

    summary: Dict[str, Any] = {
        "done": True,
        "content": "".join(collected),
        "model": chosen_model,
        "engine": self._resolved_engine_key,
    }
    yield summary
list_models
list_models() -> List[str]

Return a list of available model identifiers.

Source code in src/openjarvis/sdk.py
def list_models(self) -> List[str]:
    """Return the model identifiers reported by the engine.

    ``_ensure_engine`` is called first, then the engine's own
    ``list_models`` result is returned.
    """
    self._ensure_engine()
    engine = self._engine
    return engine.list_models()
list_engines
list_engines() -> List[str]

Return a list of registered engine keys.

Source code in src/openjarvis/sdk.py
def list_engines(self) -> List[str]:
    """Return the keys currently held by ``EngineRegistry`` as a list."""
    # Imported lazily, inside the method.
    from openjarvis.core.registry import EngineRegistry

    registered = EngineRegistry.keys()
    return list(registered)
close
close() -> None

Release all resources.

Source code in src/openjarvis/sdk.py
def close(self) -> None:
    """Release all resources.

    Every step is best-effort: an exception raised while closing one
    resource is logged at debug level and the remaining resources are
    still released. This includes the memory handle, so a failing memory
    backend can no longer leave the energy monitor, telemetry store or
    audit logger open.

    After the call the monitor/store/logger attributes and the engine
    reference are all ``None``.
    """
    # Memory is released first; a failure here must not keep the
    # monitors and stores below from being released.
    try:
        self.memory.close()
    except Exception as exc:
        logger.debug("Error closing memory backend: %s", exc)

    for attr, label in (
        ("_energy_monitor", "energy monitor"),
        ("_telem_store", "telemetry store"),
        ("_audit_logger", "audit logger"),
    ):
        resource = getattr(self, attr)
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as exc:
            logger.debug("Error closing %s: %s", label, exc)
        # Drop the reference even when close() failed.
        setattr(self, attr, None)

    self._engine = None

JarvisSystem dataclass

JarvisSystem(config: JarvisConfig, bus: EventBus, engine: InferenceEngine, engine_key: str, model: str, agent: Optional[Any] = None, agent_name: str = '', tools: List[BaseTool] = list(), tool_executor: Optional[ToolExecutor] = None, memory_backend: Optional[Any] = None, channel_backend: Optional[Any] = None, router: Optional[Any] = None, mcp_server: Optional[Any] = None, telemetry_store: Optional[Any] = None, trace_store: Optional[Any] = None, trace_collector: Optional[Any] = None, gpu_monitor: Optional[Any] = None, scheduler_store: Optional[Any] = None, scheduler: Optional[Any] = None, container_runner: Optional[Any] = None, workflow_engine: Optional[Any] = None, session_store: Optional[Any] = None, capability_policy: Optional[Any] = None, operator_manager: Optional[Any] = None, agent_manager: Optional[Any] = None, agent_scheduler: Optional[Any] = None, agent_executor: Optional[Any] = None, speech_backend: Optional[Any] = None, _learning_orchestrator: Optional[Any] = None)

Fully wired system -- the single source of truth for primitive composition.

Functions
ask
ask(query: str, *, context: bool = True, temperature: Optional[float] = None, max_tokens: Optional[int] = None, agent: Optional[str] = None, tools: Optional[List[str]] = None, system_prompt: Optional[str] = None, operator_id: Optional[str] = None) -> Dict[str, Any]

Execute a query through the system and return a result dict.

Source code in src/openjarvis/system.py
def ask(
    self,
    query: str,
    *,
    context: bool = True,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    agent: Optional[str] = None,
    tools: Optional[List[str]] = None,
    system_prompt: Optional[str] = None,
    operator_id: Optional[str] = None,
) -> Dict[str, Any]:
    """Execute a query through the system and return a result dict.

    ``temperature`` and ``max_tokens`` fall back to ``config.intelligence``
    when ``None``. Memory context is injected on a best-effort basis (a
    failure is logged as a warning and the plain query is used). The
    query then goes to the agent named by ``agent`` or, failing that,
    ``self.agent_name`` -- unless that name is empty or ``"none"``, in
    which case the engine is called directly and a dict with
    ``content``, ``usage``, ``model`` and ``engine`` is returned.
    """
    temperature = (
        self.config.intelligence.temperature
        if temperature is None
        else temperature
    )
    max_tokens = (
        self.config.intelligence.max_tokens
        if max_tokens is None
        else max_tokens
    )

    conversation = [Message(role=Role.USER, content=query)]

    # Optional memory-context injection; never fatal.
    if context and self.memory_backend:
        if self.config.agent.context_from_memory:
            try:
                from openjarvis.tools.storage.context import (
                    ContextConfig,
                    inject_context,
                )

                memory_cfg = self.config.memory
                context_settings = ContextConfig(
                    top_k=memory_cfg.context_top_k,
                    min_score=memory_cfg.context_min_score,
                    max_context_tokens=memory_cfg.context_max_tokens,
                )
                conversation = inject_context(
                    query,
                    conversation,
                    self.memory_backend,
                    config=context_settings,
                )
            except Exception as exc:
                logger.warning("Failed to inject memory context: %s", exc)

    selected_agent = agent or self.agent_name
    if not (selected_agent and selected_agent != "none"):
        # Direct engine mode.
        generation = self.engine.generate(
            conversation,
            model=self.model,
            temperature=temperature,
            max_tokens=max_tokens,
        )
        return {
            "content": generation.get("content", ""),
            "usage": generation.get("usage", {}),
            "model": self.model,
            "engine": self.engine_key,
        }

    # Agent mode.
    return self._run_agent(
        query,
        conversation,
        selected_agent,
        tools,
        temperature,
        max_tokens,
        system_prompt=system_prompt,
        operator_id=operator_id,
    )
close
close() -> None

Release resources.

Source code in src/openjarvis/system.py
def close(self) -> None:
    """Release resources.

    Shutdown is best-effort and ordered:

    1. Both schedulers are stopped first, so nothing scheduled keeps
       running against components that are about to be closed (the
       agent scheduler is built on top of the agent manager).
    2. Every store/backend that exposes ``close`` is closed.
    3. The agent manager is closed last.

    A failure in one step is logged as a warning and does not prevent
    the remaining resources from being released. Resources are checked
    with ``is not None`` rather than truthiness, so an object that
    happens to be falsy (for example an empty container-like backend) is
    still closed.
    """
    def _release(label, action):
        # Run one shutdown action without letting it abort the rest.
        try:
            action()
        except Exception as exc:
            logger.warning("Error while releasing %s: %s", label, exc)

    # 1. Stop background activity before tearing anything down.
    if self.scheduler is not None and hasattr(self.scheduler, "stop"):
        _release("scheduler", self.scheduler.stop)
    if self.agent_scheduler is not None:
        _release("agent scheduler", self.agent_scheduler.stop)

    # 2. Close stores and backends, in the established order.
    closables = (
        ("scheduler store", self.scheduler_store),
        ("engine", self.engine),
        ("gpu monitor", self.gpu_monitor),
        ("telemetry store", self.telemetry_store),
        ("trace store", self.trace_store),
        ("memory backend", self.memory_backend),
        ("session store", self.session_store),
        ("channel backend", self.channel_backend),
        ("workflow engine", self.workflow_engine),
        ("container runner", self.container_runner),
    )
    for label, resource in closables:
        if resource is not None and hasattr(resource, "close"):
            _release(label, resource.close)

    # 3. The manager goes last, after its scheduler has been stopped.
    if self.agent_manager is not None:
        _release("agent manager", self.agent_manager.close)

MemoryHandle

MemoryHandle(config: JarvisConfig)

Proxy for memory operations. Lazily initializes backend.

Source code in src/openjarvis/sdk.py
def __init__(self, config: JarvisConfig) -> None:
    """Remember ``config``; the backend slot starts out as ``None``."""
    # No backend is created here.
    self._backend: Any = None
    self._config = config
Functions
index
index(path: str, *, chunk_size: int = 512, chunk_overlap: int = 64) -> Dict[str, Any]

Index a file or directory into memory.

Source code in src/openjarvis/sdk.py
def index(
    self,
    path: str,
    *,
    chunk_size: int = 512,
    chunk_overlap: int = 64,
) -> Dict[str, Any]:
    """Chunk the file or directory at ``path`` and store it in memory.

    Returns a dict with the number of ``chunks``, the ``doc_ids`` the
    backend's ``store`` call returned for them (in chunk order), and the
    ``path`` exactly as it was passed in.
    """
    from openjarvis.tools.storage.chunking import ChunkConfig
    from openjarvis.tools.storage.ingest import ingest_path

    store = self._get_backend().store
    chunking = ChunkConfig(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
    pieces = ingest_path(Path(path), config=chunking)

    # One store call per chunk; the chunk's position goes into metadata.
    stored_ids: List[str] = [
        store(
            piece.content,
            source=piece.source,
            metadata={"index": piece.index},
        )
        for piece in pieces
    ]

    return {"chunks": len(pieces), "doc_ids": stored_ids, "path": path}
search
search(query: str, *, top_k: int = 5) -> List[Dict[str, Any]]

Search memory for relevant chunks.

Source code in src/openjarvis/sdk.py
def search(self, query: str, *, top_k: int = 5) -> List[Dict[str, Any]]:
    """Search memory and return the hits as plain dicts.

    Each hit from the backend's ``retrieve`` call is flattened into a
    dict with ``content``, ``score``, ``source`` and ``metadata``.
    """
    hits = self._get_backend().retrieve(query, top_k=top_k)

    flattened: List[Dict[str, Any]] = []
    for hit in hits:
        flattened.append(
            {
                "content": hit.content,
                "score": hit.score,
                "source": hit.source,
                "metadata": hit.metadata,
            }
        )
    return flattened
stats
stats() -> Dict[str, Any]

Return memory backend statistics.

Source code in src/openjarvis/sdk.py
def stats(self) -> Dict[str, Any]:
    """Return memory backend statistics.

    The result always names the configured default backend under
    ``backend``; a ``count`` entry is included only when the backend
    object has a ``count`` attribute.
    """
    backend = self._get_backend()

    if not hasattr(backend, "count"):
        return {"backend": self._config.memory.default_backend}

    total = backend.count()
    return {
        "count": total,
        "backend": self._config.memory.default_backend,
    }
close
close() -> None

Release the memory backend.

Source code in src/openjarvis/sdk.py
def close(self) -> None:
    """Release the memory backend, if one is currently held.

    The backend's ``close`` is called when it has one, and the stored
    reference is then cleared. Nothing happens when no backend is held.
    """
    backend = self._backend
    if backend is None:
        return
    if hasattr(backend, "close"):
        backend.close()
    self._backend = None

SystemBuilder

SystemBuilder(config: Optional[JarvisConfig] = None, *, config_path: Optional[Any] = None)

Config-driven fluent builder for JarvisSystem.

Source code in src/openjarvis/system.py
def __init__(
    self,
    config: Optional[JarvisConfig] = None,
    *,
    config_path: Optional[Any] = None,
) -> None:
    """Start a builder from a config object, a config file, or defaults.

    An explicit ``config`` wins; otherwise ``config_path`` is loaded via
    ``load_config(Path(config_path))``; otherwise ``load_config()`` is
    called with no arguments. Every override slot starts as ``None``.
    """
    if config is None:
        if config_path is None:
            config = load_config()
        else:
            from pathlib import Path

            config = load_config(Path(config_path))
    self._config = config

    # Component selection overrides.
    self._engine_key: Optional[str] = None
    self._model: Optional[str] = None
    self._agent_name: Optional[str] = None
    self._tool_names: Optional[List[str]] = None
    self._bus: Optional[EventBus] = None

    # Feature toggles.
    self._telemetry: Optional[bool] = None
    self._traces: Optional[bool] = None
    self._sandbox: Optional[bool] = None
    self._scheduler: Optional[bool] = None
    self._workflow: Optional[bool] = None
    self._sessions: Optional[bool] = None
    self._speech: Optional[bool] = None
Functions
build
build() -> JarvisSystem

Construct a fully wired JarvisSystem.

Source code in src/openjarvis/system.py
def build(self) -> JarvisSystem:
    """Construct a fully wired JarvisSystem.

    The steps run in this order:

    1. Pick the event bus (the builder's own, else ``get_event_bus()``),
       then resolve the engine and the model.
    2. When telemetry is on and GPU metrics are configured, create an
       energy monitor, falling back to the legacy ``GpuMonitor``.
    3. Wrap the engine: security guardrails first, then (telemetry only)
       ``InstrumentedEngine`` around that.
    4. Resolve the telemetry store, memory and channel backends, tools,
       tool executor, agent name, sandbox runner, scheduler, workflow
       engine, session store, capability policy and learning orchestrator
       through the builder's private helpers.
    5. Create the agent manager, agent executor/scheduler and the speech
       backend on a best-effort basis: failures are logged as warnings
       and the component is left as ``None``.
    6. Assemble the ``JarvisSystem`` and hand it to the agent executor.

    Returns:
        The assembled ``JarvisSystem``.
    """
    config = self._config
    # A bus set on the builder wins over the one from get_event_bus().
    bus = self._bus or get_event_bus()

    # Resolve engine
    engine, engine_key = self._resolve_engine(config)

    # Resolve model
    model = self._resolve_model(config, engine)

    # Compute telemetry_enabled once: the builder-level flag, when set,
    # overrides the value from the configuration.
    telemetry_enabled = (
        self._telemetry if self._telemetry is not None
        else config.telemetry.enabled
    )
    gpu_monitor = None
    energy_monitor = None
    if telemetry_enabled and config.telemetry.gpu_metrics:
        # Try new multi-vendor EnergyMonitor first
        try:
            from openjarvis.telemetry.energy_monitor import (
                create_energy_monitor,
            )

            energy_monitor = create_energy_monitor(
                poll_interval_ms=config.telemetry.gpu_poll_interval_ms,
                prefer_vendor=config.telemetry.energy_vendor or None,
            )
        except ImportError:
            # Module not importable: continue with the legacy fallback.
            pass

        # Fall back to legacy GpuMonitor
        if energy_monitor is None:
            try:
                from openjarvis.telemetry.gpu_monitor import GpuMonitor

                if GpuMonitor.available():
                    gpu_monitor = GpuMonitor(
                        poll_interval_ms=config.telemetry.gpu_poll_interval_ms,
                    )
            except ImportError:
                # Neither monitor is importable: run without one.
                pass

    # Apply security guardrails FIRST (innermost wrapper)
    engine = self._apply_security(config, engine, bus)

    # Then wrap with InstrumentedEngine (outermost wrapper)
    if telemetry_enabled:
        from openjarvis.telemetry.instrumented_engine import (
            InstrumentedEngine,
        )

        engine = InstrumentedEngine(
            engine, bus,
            gpu_monitor=gpu_monitor,
            energy_monitor=energy_monitor,
        )

    # Set up telemetry store
    telemetry_store = None
    if telemetry_enabled:
        telemetry_store = self._setup_telemetry(config, bus)

    # Resolve memory backend
    memory_backend = self._resolve_memory(config)

    # Resolve channel backend
    channel_backend = self._resolve_channel(config, bus)

    # Resolve tools (receives the already-wrapped engine)
    tool_list = self._resolve_tools(
        config, engine, model, memory_backend, channel_backend,
    )

    # Build tool executor; stays None when no tools were resolved
    tool_executor = ToolExecutor(tool_list, bus) if tool_list else None

    # Resolve agent name: builder override, else the configured default
    agent_name = self._agent_name or config.agent.default_agent

    # Set up container sandbox runner
    container_runner = self._setup_sandbox(config)

    # Set up scheduler
    scheduler_store, task_scheduler = self._setup_scheduler(config, bus)

    # Set up workflow engine
    workflow_engine = self._setup_workflow(config, bus)

    # Set up session store
    session_store = self._setup_sessions(config)

    # Set up capability policy
    capability_policy = self._setup_capabilities(config)

    # Set up learning orchestrator (when training is enabled)
    learning_orchestrator = self._setup_learning_orchestrator(config)

    # Agent Manager (best-effort: a failure only logs a warning)
    agent_manager = None
    if config.agent_manager.enabled:
        try:
            from pathlib import Path

            from openjarvis.agents.manager import AgentManager

            # Fall back to ~/.openjarvis/agents.db when no path is configured.
            am_db = config.agent_manager.db_path or str(
                Path("~/.openjarvis/agents.db").expanduser()
            )
            agent_manager = AgentManager(db_path=am_db)
        except Exception as exc:
            logger.warning("Failed to initialize agent manager: %s", exc)

    # Executor + Scheduler (depend on agent_manager)
    agent_executor = None
    agent_scheduler = None
    if agent_manager is not None:
        try:
            from openjarvis.agents.executor import AgentExecutor
            from openjarvis.agents.scheduler import AgentScheduler

            agent_executor = AgentExecutor(
                manager=agent_manager,
                event_bus=bus,
            )
            agent_scheduler = AgentScheduler(
                manager=agent_manager,
                executor=agent_executor,
            )
        except Exception:
            # NOTE(review): if AgentScheduler() raises, agent_executor is
            # already set and is still passed to JarvisSystem below.
            logger.warning("Failed to initialize agent scheduler", exc_info=True)

    # Set up speech backend: on unless the builder's _speech flag is
    # explicitly False (no config switch is consulted here).
    speech_backend = None
    speech_enabled = self._speech if self._speech is not None else True
    if speech_enabled:
        try:
            from openjarvis.speech._discovery import get_speech_backend
            speech_backend = get_speech_backend(config)
        except Exception as exc:
            logger.warning("Failed to initialize speech backend: %s", exc)

    # energy_monitor is not passed here; it was only handed to
    # InstrumentedEngine above.
    system = JarvisSystem(
        config=config,
        bus=bus,
        engine=engine,
        engine_key=engine_key,
        model=model,
        agent_name=agent_name,
        tools=tool_list,
        tool_executor=tool_executor,
        memory_backend=memory_backend,
        channel_backend=channel_backend,
        telemetry_store=telemetry_store,
        gpu_monitor=gpu_monitor,
        scheduler_store=scheduler_store,
        scheduler=task_scheduler,
        container_runner=container_runner,
        workflow_engine=workflow_engine,
        session_store=session_store,
        capability_policy=capability_policy,
        agent_manager=agent_manager,
        agent_scheduler=agent_scheduler,
        agent_executor=agent_executor,
        speech_backend=speech_backend,
    )
    # Assigned after construction rather than through the constructor.
    system._learning_orchestrator = learning_orchestrator
    # Wire system reference — must happen before scheduler.start()
    if system.agent_executor is not None:
        system.agent_executor.set_system(system)
    return system