Skip to content

core

core

JarvisSystem — the fully wired system dataclass.

Classes

JarvisSystem dataclass

JarvisSystem(config: JarvisConfig, bus: EventBus, engine: InferenceEngine, engine_key: str, model: str, agent: Optional[BaseAgent] = None, agent_name: str = '', tools: List[BaseTool] = list(), tool_executor: Optional[ToolExecutor] = None, memory_backend: Optional[MemoryBackend] = None, channel_backend: Optional[BaseChannel] = None, router: Optional[RouterPolicy] = None, mcp_server: Optional[MCPServer] = None, telemetry_store: Optional[TelemetryStore] = None, trace_store: Optional[TraceStore] = None, trace_collector: Optional[TraceCollector] = None, gpu_monitor: Optional[GpuMonitor] = None, scheduler_store: Optional[SchedulerStore] = None, scheduler: Optional[TaskScheduler] = None, container_runner: Optional[ContainerRunner] = None, workflow_engine: Optional[WorkflowEngine] = None, session_store: Optional[SessionStore] = None, capability_policy: Optional[CapabilityPolicy] = None, audit_logger: Optional[AuditLogger] = None, boundary_guard: Optional[BoundaryGuard] = None, operator_manager: Optional[OperatorManager] = None, agent_manager: Optional[AgentManager] = None, agent_scheduler: Optional[AgentScheduler] = None, agent_executor: Optional[AgentExecutor] = None, speech_backend: Optional[SpeechBackend] = None, skill_manager: Optional[SkillManager] = None, _learning_orchestrator: Optional[LearningOrchestrator] = None, _mcp_clients: List[MCPClient] = list())

Fully wired system -- the single source of truth for primitive composition.

Functions
wire_channel
wire_channel(channel_bridge: Any) -> None

Register a message handler on channel_bridge that routes every incoming message through this system (agent or engine) and replies.

Sessions are isolated per "<channel>:<conversation_id>" key so each chat retains its own history.

PARAMETER DESCRIPTION
channel_bridge

A connected `openjarvis.channels._stubs.BaseChannel` instance whose `on_message` method accepts a callable.

TYPE: Any

Source code in src/openjarvis/system/core.py
def wire_channel(self, channel_bridge: Any) -> None:
    """Hook this system up to *channel_bridge* as its message handler.

    Each incoming message is answered via :meth:`ask` (through the
    configured agent when ``agent_name`` is set and is not ``"none"``,
    otherwise without an explicit agent) and the answer is sent back on
    the same channel/conversation.

    History is kept per ``"<channel>:<conversation_id>"`` session key, so
    separate chats do not share context.  If no ``session_store`` is
    attached yet, one is created from ``self.config.sessions``.

    Parameters
    ----------
    channel_bridge:
        A connected :class:`~openjarvis.channels._stubs.BaseChannel`
        instance whose ``on_message`` method accepts a callable.
    """
    from openjarvis.core.types import Message
    from openjarvis.sessions.session import SessionStore

    # Lazily create the session store the first time a channel is wired.
    if self.session_store is None:
        from pathlib import Path

        self.session_store = SessionStore(
            db_path=Path(self.config.sessions.db_path).expanduser(),
            max_age_hours=self.config.sessions.max_age_hours,
            consolidation_threshold=self.config.sessions.consolidation_threshold,
        )

    system = self  # explicit name for the instance captured by the closures

    def _coerce_role(raw_role):
        # Stored roles that are not valid Role values are treated as USER.
        try:
            return Role(raw_role)
        except ValueError:
            return Role.USER

    def _handle_channel_message(cm) -> None:
        store = system.session_store
        session = store.get_or_create(
            f"{cm.channel}:{cm.conversation_id}",
            channel=cm.channel,
            channel_user_id=cm.sender,
        )

        # Rebuild the conversation so far as Message objects for ask().
        history = [
            Message(role=_coerce_role(stored.role), content=stored.content)
            for stored in session.messages
        ]

        reply = ""
        try:
            ask_kwargs = {"context": False}
            # Only name an agent when one is actually configured.
            if system.agent_name and system.agent_name != "none":
                ask_kwargs["agent"] = system.agent_name
            ask_kwargs["prior_messages"] = history
            answer = system.ask(cm.content, **ask_kwargs)
            reply = answer.get("content", "")
        except Exception:
            logger.exception("Channel message handler error")
            reply = "Sorry, I encountered an error processing your message."

        # Persist both sides of the exchange; a failure here must not
        # prevent the reply from being delivered.
        try:
            for speaker, text in (("user", cm.content), ("assistant", reply)):
                system.session_store.save_message(
                    session.session_id,
                    speaker,
                    text,
                    channel=cm.channel,
                )
        except Exception:
            logger.debug("Session save error", exc_info=True)

        if not reply:
            return  # nothing to send back

        try:
            channel_bridge.send(
                cm.channel,
                reply,
                conversation_id=cm.conversation_id,
            )
        except Exception:
            logger.exception("Channel send error")

    channel_bridge.on_message(_handle_channel_message)
close
close() -> None

Release resources.

Source code in src/openjarvis/system/core.py
def close(self) -> None:
    """Release resources.

    Shutdown order: the task scheduler is stopped (if it has ``stop``),
    then every attached store/backend/engine that exposes ``close`` is
    closed, then the agent manager is closed, the agent scheduler is
    stopped, and finally the MCP clients are closed.

    Every step is attempted even if an earlier one fails, so one broken
    resource cannot leak all the others.  Each failure is logged, and the
    first one is re-raised once the whole sequence has run.

    Raises
    ------
    Exception
        The first exception raised by any ``stop()``/``close()`` call.
    """
    import logging

    log = logging.getLogger(__name__)
    failures = []

    def _release(label, owner, method_name):
        # Run one shutdown step; record (rather than propagate) a failure
        # so the remaining steps still get their turn.
        try:
            getattr(owner, method_name)()
        except Exception as exc:
            log.exception("Error while releasing %s", label)
            failures.append(exc)

    # Presence is tested with ``is not None`` rather than truthiness: a
    # live resource may evaluate falsy (e.g. it defines ``__len__`` and is
    # currently empty) and must still be shut down.
    if self.scheduler is not None and hasattr(self.scheduler, "stop"):
        _release("scheduler", self.scheduler, "stop")

    for attr_name in (
        "scheduler_store",
        "engine",
        "gpu_monitor",
        "telemetry_store",
        "trace_store",
        "memory_backend",
        "session_store",
        "channel_backend",
        "workflow_engine",
        "container_runner",
    ):
        resource = getattr(self, attr_name)
        if resource is not None and hasattr(resource, "close"):
            _release(attr_name, resource, "close")

    if self.agent_manager is not None:
        _release("agent_manager", self.agent_manager, "close")
    if self.agent_scheduler is not None:
        _release("agent_scheduler", self.agent_scheduler, "stop")
    _release("MCP clients", self, "_close_mcp_clients")

    if failures:
        # Surface the first problem to the caller, as before, but only
        # after every resource has had its chance to shut down.
        raise failures[0]