Skip to content

system

system

Composition layer -- config-driven construction of a fully wired JarvisSystem.

Classes

JarvisSystem dataclass

JarvisSystem(config: JarvisConfig, bus: EventBus, engine: InferenceEngine, engine_key: str, model: str, agent: Optional[Any] = None, agent_name: str = '', tools: List[BaseTool] = list(), tool_executor: Optional[ToolExecutor] = None, memory_backend: Optional[Any] = None, channel_backend: Optional[Any] = None, router: Optional[Any] = None, mcp_server: Optional[Any] = None, telemetry_store: Optional[Any] = None, trace_store: Optional[Any] = None, trace_collector: Optional[Any] = None, gpu_monitor: Optional[Any] = None, scheduler_store: Optional[Any] = None, scheduler: Optional[Any] = None, container_runner: Optional[Any] = None, workflow_engine: Optional[Any] = None, session_store: Optional[Any] = None, capability_policy: Optional[Any] = None, audit_logger: Optional[Any] = None, boundary_guard: Optional[Any] = None, operator_manager: Optional[Any] = None, agent_manager: Optional[Any] = None, agent_scheduler: Optional[Any] = None, agent_executor: Optional[Any] = None, speech_backend: Optional[Any] = None, _learning_orchestrator: Optional[Any] = None, _mcp_clients: List = list())

Fully wired system -- the single source of truth for primitive composition.

Functions
ask
ask(query: str, *, context: bool = True, temperature: Optional[float] = None, max_tokens: Optional[int] = None, agent: Optional[str] = None, tools: Optional[List[str]] = None, system_prompt: Optional[str] = None, operator_id: Optional[str] = None, prior_messages: Optional[List[Message]] = None) -> Dict[str, Any]

Execute a query through the system and return a result dict.

Source code in src/openjarvis/system.py
def ask(
    self,
    query: str,
    *,
    context: bool = True,
    temperature: Optional[float] = None,
    max_tokens: Optional[int] = None,
    agent: Optional[str] = None,
    tools: Optional[List[str]] = None,
    system_prompt: Optional[str] = None,
    operator_id: Optional[str] = None,
    prior_messages: Optional[List[Message]] = None,
) -> Dict[str, Any]:
    """Execute a query through the system and return a result dict.

    Parameters
    ----------
    query:
        The user's input for the current turn.
    context:
        When true (and a memory backend is configured and
        ``config.agent.context_from_memory`` is set), memory context is
        injected into the prompt.  Injection failures are logged and the
        query proceeds without context.
    temperature, max_tokens:
        Sampling overrides; ``None`` falls back to ``config.intelligence``.
    agent:
        Agent to run for this call.  Falls back to ``self.agent_name``.
        An empty value or ``"none"`` selects direct engine mode.
    tools, system_prompt, operator_id:
        Forwarded to the agent path only; direct engine mode does not
        use them.
    prior_messages:
        Earlier conversation turns.  Forwarded to the agent path, and in
        direct engine mode placed ahead of the current turn.

    Returns
    -------
    dict
        In agent mode, whatever ``_run_agent`` returns.  In direct engine
        mode, a dict with ``content``, ``usage``, ``model`` and ``engine``.
    """
    if temperature is None:
        temperature = self.config.intelligence.temperature
    if max_tokens is None:
        max_tokens = self.config.intelligence.max_tokens

    messages = [Message(role=Role.USER, content=query)]

    # Context injection from memory (best-effort: a failure must not block
    # the query itself).
    if context and self.memory_backend and self.config.agent.context_from_memory:
        try:
            from openjarvis.tools.storage.context import (
                ContextConfig,
                inject_context,
            )

            ctx_cfg = ContextConfig(
                top_k=self.config.memory.context_top_k,
                min_score=self.config.memory.context_min_score,
                max_context_tokens=self.config.memory.context_max_tokens,
            )
            messages = inject_context(
                query,
                messages,
                self.memory_backend,
                config=ctx_cfg,
            )
        except Exception as exc:
            logger.warning("Failed to inject memory context: %s", exc)

    # Agent mode
    use_agent = agent or self.agent_name
    if use_agent and use_agent != "none":
        return self._run_agent(
            query,
            messages,
            use_agent,
            tools,
            temperature,
            max_tokens,
            system_prompt=system_prompt,
            operator_id=operator_id,
            prior_messages=prior_messages,
        )

    # Direct engine mode.  The agent path is handed ``prior_messages``
    # explicitly; here the history must be spliced into the prompt itself,
    # otherwise it is silently dropped and multi-turn callers (such as the
    # channel handler) lose their conversation state.
    if prior_messages:
        # Keep the current turn last and leave anything that precedes it
        # (e.g. injected context) in front of the history.
        # NOTE(review): assumes inject_context() keeps the user query as the
        # final message -- confirm against its implementation.
        messages = [*messages[:-1], *prior_messages, *messages[-1:]]

    result = self.engine.generate(
        messages,
        model=self.model,
        temperature=temperature,
        max_tokens=max_tokens,
    )
    return {
        "content": result.get("content", ""),
        "usage": result.get("usage", {}),
        "model": self.model,
        "engine": self.engine_key,
    }
wire_channel
wire_channel(channel_bridge: Any) -> None

Register a message handler on channel_bridge that routes every incoming message through this system (agent or engine) and replies.

Sessions are isolated per "<channel>:<conversation_id>" key so each chat retains its own history.

PARAMETER DESCRIPTION
channel_bridge

A connected `openjarvis.channels._stubs.BaseChannel` instance whose `on_message` method accepts a callable.

TYPE: Any

Source code in src/openjarvis/system.py
def wire_channel(self, channel_bridge: Any) -> None:
    """Attach this system to *channel_bridge* as its message handler.

    Every incoming channel message is answered by :meth:`ask` (agent or
    direct engine, depending on ``agent_name``) and the reply is sent back
    over the same channel and conversation.

    History is kept per ``"<channel>:<conversation_id>"`` session key, so
    separate chats do not share context.  A session store is created from
    ``config.sessions`` when the system does not have one yet.

    Parameters
    ----------
    channel_bridge:
        A connected :class:`~openjarvis.channels._stubs.BaseChannel`
        instance whose ``on_message`` method accepts a callable.
    """
    from openjarvis.core.types import Message, Role
    from openjarvis.sessions.session import SessionStore

    if self.session_store is None:
        from pathlib import Path

        sessions_cfg = self.config.sessions
        self.session_store = SessionStore(
            db_path=Path(sessions_cfg.db_path).expanduser(),
            max_age_hours=sessions_cfg.max_age_hours,
            consolidation_threshold=sessions_cfg.consolidation_threshold,
        )

    jarvis = self  # bound here so the handler closes over the system

    def _as_role(raw):
        # Stored roles that do not map onto ``Role`` are treated as user turns.
        try:
            return Role(raw)
        except ValueError:
            return Role.USER

    def _handle_incoming(cm) -> None:
        session = jarvis.session_store.get_or_create(
            f"{cm.channel}:{cm.conversation_id}",
            channel=cm.channel,
            channel_user_id=cm.sender,
        )

        # Rebuild the stored conversation as Message objects.
        history = [
            Message(role=_as_role(stored.role), content=stored.content)
            for stored in session.messages
        ]

        # Produce a reply; any failure becomes a generic apology.
        try:
            ask_kwargs = {"context": False, "prior_messages": history}
            active_agent = jarvis.agent_name
            if active_agent and active_agent != "none":
                ask_kwargs["agent"] = active_agent
            reply = jarvis.ask(cm.content, **ask_kwargs).get("content", "")
        except Exception:
            logger.exception("Channel message handler error")
            reply = "Sorry, I encountered an error processing your message."

        # Persist both sides of the exchange; persistence is best-effort.
        try:
            for speaker, text in (("user", cm.content), ("assistant", reply)):
                jarvis.session_store.save_message(
                    session.session_id,
                    speaker,
                    text,
                    channel=cm.channel,
                )
        except Exception:
            logger.debug("Session save error", exc_info=True)

        # Nothing to deliver for an empty reply.
        if not reply:
            return
        try:
            channel_bridge.send(
                cm.channel,
                reply,
                conversation_id=cm.conversation_id,
            )
        except Exception:
            logger.exception("Channel send error")

    channel_bridge.on_message(_handle_incoming)
close
close() -> None

Release resources.

Source code in src/openjarvis/system.py
def close(self) -> None:
    """Release resources.

    Teardown is best-effort: every component is attempted even if an
    earlier one raises, and each failure is logged with its traceback
    instead of aborting the remaining cleanup.

    Order:

    1. Stop the task scheduler and the agent scheduler, so nothing keeps
       being driven while the things it uses are torn down.
    2. Close the stores, engine, monitors, backends and runners that
       expose ``close()``.
    3. Close the agent manager.
    4. Shut down the MCP clients.
    """

    def _release(action, label: str) -> None:
        # One failing component must not leak all the ones after it.
        try:
            action()
        except Exception:
            logger.exception("Error while releasing %s", label)

    if self.scheduler and hasattr(self.scheduler, "stop"):
        _release(self.scheduler.stop, "scheduler")

    # The agent scheduler is built around the agent manager and executor
    # (see SystemBuilder.build), so stop it *before* those are closed.
    if self.agent_scheduler is not None:
        _release(self.agent_scheduler.stop, "agent_scheduler")

    for label in (
        "scheduler_store",
        "engine",
        "gpu_monitor",
        "telemetry_store",
        "trace_store",
        "memory_backend",
        "session_store",
        "channel_backend",
        "workflow_engine",
        "container_runner",
    ):
        resource = getattr(self, label)
        if resource and hasattr(resource, "close"):
            _release(resource.close, label)

    if self.agent_manager is not None:
        _release(self.agent_manager.close, "agent_manager")

    _release(self._close_mcp_clients, "mcp_clients")

SystemBuilder

SystemBuilder(config: Optional[JarvisConfig] = None, *, config_path: Optional[Any] = None)

Config-driven fluent builder for JarvisSystem.

Source code in src/openjarvis/system.py
def __init__(
    self,
    config: Optional[JarvisConfig] = None,
    *,
    config_path: Optional[Any] = None,
) -> None:
    """Create a builder around a resolved configuration.

    Resolution order: an explicit *config* wins; otherwise the file at
    *config_path* is loaded; otherwise ``load_config()`` is called with no
    arguments.
    """
    if config is None:
        if config_path is None:
            config = load_config()
        else:
            from pathlib import Path

            config = load_config(Path(config_path))
    self._config = config

    # Per-builder overrides.  ``None`` means "not overridden".
    self._engine_key: Optional[str] = None
    self._model: Optional[str] = None
    self._agent_name: Optional[str] = None
    self._tool_names: Optional[List[str]] = None

    # Optional subsystem switches, likewise ``None`` until overridden.
    self._telemetry: Optional[bool] = None
    self._traces: Optional[bool] = None
    self._bus: Optional[EventBus] = None
    self._sandbox: Optional[bool] = None
    self._scheduler: Optional[bool] = None
    self._workflow: Optional[bool] = None
    self._sessions: Optional[bool] = None
    self._speech: Optional[bool] = None

    # MCP clients held by the builder; build() copies them to the system.
    self._mcp_clients: List = []
Functions
build
build() -> JarvisSystem

Construct a fully wired JarvisSystem.

Source code in src/openjarvis/system.py
def build(self) -> JarvisSystem:
    """Construct a fully wired JarvisSystem.

    Resolves the engine and model, wraps the engine with security
    guardrails and (when telemetry is enabled) instrumentation, then sets
    up the optional subsystems: memory, channel, tools, sandbox, scheduler,
    workflow, sessions, traces, learning, agent manager/executor/scheduler
    and speech.  Optional subsystems that fail to initialise are logged and
    left as ``None`` rather than aborting the build.

    Side effect: ``config.traces.enabled`` is overwritten with the
    effective traces flag (builder override, else the config value).

    Returns
    -------
    JarvisSystem
        The assembled system, with the agent executor (if any) already
        pointing back at it.
    """
    config = self._config
    bus = self._bus or get_event_bus()

    # Resolve engine
    engine, engine_key = self._resolve_engine(config)

    # Resolve model
    model = self._resolve_model(config, engine)

    # Compute telemetry_enabled and traces_enabled once
    telemetry_enabled = (
        self._telemetry if self._telemetry is not None else config.telemetry.enabled
    )
    traces_enabled = (
        self._traces if self._traces is not None else config.traces.enabled
    )
    # Apply traces flag to config so downstream code respects it
    config.traces.enabled = traces_enabled
    gpu_monitor = None
    energy_monitor = None
    if telemetry_enabled and config.telemetry.gpu_metrics:
        # Try new multi-vendor EnergyMonitor first
        try:
            from openjarvis.telemetry.energy_monitor import (
                create_energy_monitor,
            )

            energy_monitor = create_energy_monitor(
                poll_interval_ms=config.telemetry.gpu_poll_interval_ms,
                prefer_vendor=config.telemetry.energy_vendor or None,
            )
        except ImportError:
            # Optional module: absence just means the legacy path is tried.
            pass

        # Fall back to legacy GpuMonitor
        if energy_monitor is None:
            try:
                from openjarvis.telemetry.gpu_monitor import GpuMonitor

                if GpuMonitor.available():
                    gpu_monitor = GpuMonitor(
                        poll_interval_ms=config.telemetry.gpu_poll_interval_ms,
                    )
            except ImportError:
                # Optional module: run without GPU metrics.
                pass

    # Apply security guardrails FIRST (innermost wrapper)
    from openjarvis.security import setup_security

    sec = setup_security(config, engine, bus)
    engine = sec.engine

    # Then wrap with InstrumentedEngine (outermost wrapper)
    if telemetry_enabled:
        from openjarvis.telemetry.instrumented_engine import (
            InstrumentedEngine,
        )

        engine = InstrumentedEngine(
            engine,
            bus,
            gpu_monitor=gpu_monitor,
            energy_monitor=energy_monitor,
        )

    # Set up telemetry store
    telemetry_store = None
    if telemetry_enabled:
        telemetry_store = self._setup_telemetry(config, bus)

    # Resolve memory backend
    memory_backend = self._resolve_memory(config)

    # Resolve channel backend
    channel_backend = self._resolve_channel(config, bus)

    # Resolve tools
    tool_list = self._resolve_tools(
        config,
        engine,
        model,
        memory_backend,
        channel_backend,
    )

    # Build tool executor
    tool_executor = ToolExecutor(tool_list, bus) if tool_list else None

    # Resolve agent name
    agent_name = self._agent_name or config.agent.default_agent

    # Set up container sandbox runner
    container_runner = self._setup_sandbox(config)

    # Set up scheduler
    scheduler_store, task_scheduler = self._setup_scheduler(config, bus)

    # Set up workflow engine
    workflow_engine = self._setup_workflow(config, bus)

    # Set up session store
    session_store = self._setup_sessions(config)

    # Set up trace store (a single instance, shared with the agent executor)
    trace_store = None
    if traces_enabled:
        try:
            from openjarvis.traces.store import TraceStore

            trace_store = TraceStore(config.traces.db_path)
        except Exception:
            logger.warning("Failed to initialize TraceStore", exc_info=True)

    # Set up capability policy
    capability_policy = sec.capability_policy

    # Set up learning orchestrator (when training is enabled)
    learning_orchestrator = self._setup_learning_orchestrator(config)

    # Agent Manager
    agent_manager = None
    if config.agent_manager.enabled:
        try:
            from pathlib import Path

            from openjarvis.agents.manager import AgentManager

            am_db = config.agent_manager.db_path or str(
                Path("~/.openjarvis/agents.db").expanduser()
            )
            agent_manager = AgentManager(db_path=am_db)
        except Exception as exc:
            logger.warning("Failed to initialize agent manager: %s", exc)

    # Executor + Scheduler (depend on agent_manager)
    agent_executor = None
    agent_scheduler = None
    if agent_manager is not None:
        try:
            from openjarvis.agents.executor import AgentExecutor
            from openjarvis.agents.scheduler import AgentScheduler

            # Reuse the system's TraceStore instead of opening a second one
            # on the same db_path: a second instance would never be stored
            # on the system and so would never be closed by
            # JarvisSystem.close().  ``trace_store`` is None when tracing is
            # disabled or its initialisation failed above.
            agent_executor = AgentExecutor(
                manager=agent_manager,
                event_bus=bus,
                trace_store=trace_store,
            )
            agent_scheduler = AgentScheduler(
                manager=agent_manager,
                executor=agent_executor,
            )
        except Exception:
            logger.warning("Failed to initialize agent scheduler", exc_info=True)

    # Set up speech backend (on unless the builder explicitly disabled it)
    speech_backend = None
    speech_enabled = self._speech if self._speech is not None else True
    if speech_enabled:
        try:
            from openjarvis.speech._discovery import get_speech_backend

            speech_backend = get_speech_backend(config)
        except Exception as exc:
            logger.warning("Failed to initialize speech backend: %s", exc)

    system = JarvisSystem(
        config=config,
        bus=bus,
        engine=engine,
        engine_key=engine_key,
        model=model,
        agent_name=agent_name,
        tools=tool_list,
        tool_executor=tool_executor,
        memory_backend=memory_backend,
        channel_backend=channel_backend,
        telemetry_store=telemetry_store,
        trace_store=trace_store,
        gpu_monitor=gpu_monitor,
        scheduler_store=scheduler_store,
        scheduler=task_scheduler,
        container_runner=container_runner,
        workflow_engine=workflow_engine,
        session_store=session_store,
        capability_policy=capability_policy,
        audit_logger=sec.audit_logger,
        agent_manager=agent_manager,
        agent_scheduler=agent_scheduler,
        agent_executor=agent_executor,
        speech_backend=speech_backend,
    )
    system._learning_orchestrator = learning_orchestrator
    # Transfer MCP clients so JarvisSystem.close() can shut them down
    system._mcp_clients = list(getattr(self, "_mcp_clients", []))
    # Wire system reference — must happen before scheduler.start()
    if system.agent_executor is not None:
        system.agent_executor.set_system(system)
    return system

Functions