Skip to content

manager

manager

Persistent agent lifecycle manager.

Composition layer — stores agent state in SQLite, delegates all computation to the five existing primitives (Intelligence, Agent, Tools, Engine, Learning).

Classes

AgentManager

AgentManager(db_path: str)

Persistent agent lifecycle manager with SQLite backing.

Source code in src/openjarvis/agents/manager.py
def __init__(self, db_path: str) -> None:
    """Open the SQLite database and ensure the schema is present.

    Connects to ``db_path``, enables WAL journaling and foreign-key
    enforcement, runs the ``_CREATE_*`` schema statements, and then applies
    additive column migrations to ``managed_agents``.

    Args:
        db_path: Location of the SQLite database; coerced with ``str()``.

    Raises:
        sqlite3.OperationalError: If a migration fails for any reason
            other than the column already being present.
    """
    self._db_path = str(db_path)
    # check_same_thread=False disables sqlite3's same-thread check so the
    # connection may be used from threads other than the creating one.
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute("PRAGMA foreign_keys=ON")
    self._conn.execute(_CREATE_AGENTS)
    self._conn.execute(_CREATE_TASKS)
    self._conn.execute(_CREATE_BINDINGS)
    self._conn.executescript(_CREATE_CHECKPOINTS)
    self._conn.executescript(_CREATE_MESSAGES)
    self._conn.executescript(_CREATE_LEARNING_LOG)
    self._conn.commit()

    # Schema migrations for runtime columns
    migrations = (
        "ALTER TABLE managed_agents ADD COLUMN total_tokens INTEGER DEFAULT 0",
        "ALTER TABLE managed_agents ADD COLUMN total_cost REAL DEFAULT 0",
        "ALTER TABLE managed_agents ADD COLUMN total_runs INTEGER DEFAULT 0",
        "ALTER TABLE managed_agents ADD COLUMN last_run_at REAL",
        "ALTER TABLE managed_agents ADD COLUMN last_activity_at REAL",
        "ALTER TABLE managed_agents ADD COLUMN stall_retries INTEGER DEFAULT 0",
    )
    for statement in migrations:
        try:
            self._conn.execute(statement)
        except sqlite3.OperationalError as exc:
            # Re-adding an existing column is the only expected failure;
            # anything else (locked database, I/O error, ...) must surface
            # instead of leaving the schema silently half-migrated.
            if "duplicate column name" not in str(exc).lower():
                raise
    self._conn.commit()
Functions
start_tick
start_tick(agent_id: str) -> None

Mark agent as running. Raises ValueError if already running.

Source code in src/openjarvis/agents/manager.py
def start_tick(self, agent_id: str) -> None:
    """Flag the agent as running.

    Looks the agent up via ``get_agent`` and, unless its status is already
    ``"running"``, sets the status to ``"running"`` through ``_set_status``.
    An agent that ``get_agent`` reports as falsy is still passed on to
    ``_set_status``.

    Raises:
        ValueError: If the agent's current status is ``"running"``.
    """
    current = self.get_agent(agent_id)
    if not current or current["status"] != "running":
        self._set_status(agent_id, "running")
        return
    raise ValueError(f"Agent {agent_id} is already executing a tick")
find_binding_for_channel
find_binding_for_channel(channel_type: str, channel_id: str) -> Optional[Dict[str, Any]]

Find a dedicated binding for a specific channel.

Source code in src/openjarvis/agents/manager.py
def find_binding_for_channel(
    self, channel_type: str, channel_id: str
) -> Optional[Dict[str, Any]]:
    """Find a dedicated binding for a specific channel.

    Fetches every ``channel_bindings`` row with the given ``channel_type``,
    converts each row with ``_row_to_binding`` and returns the first binding
    whose ``config["channel"]`` equals ``channel_id``.

    Args:
        channel_type: Value matched against the ``channel_type`` column.
        channel_id: Value compared with the binding config's ``"channel"``.

    Returns:
        The first matching binding, or ``None`` when there is none.
    """
    rows = self._conn.execute(
        "SELECT * FROM channel_bindings WHERE channel_type = ?",
        (channel_type,),
    ).fetchall()
    for row in rows:
        binding = self._row_to_binding(row)
        # ``get("config", {})`` only covers a missing key; a binding whose
        # config is present but None would break the ``.get`` call below.
        config = binding.get("config") or {}
        if config.get("channel") == channel_id:
            return binding
    return None
list_templates staticmethod
list_templates() -> List[Dict[str, Any]]

Discover built-in and user templates.

Source code in src/openjarvis/agents/manager.py
@staticmethod
def list_templates() -> List[Dict[str, Any]]:
    """Discover built-in and user templates.

    Built-in templates are read from the ``templates`` directory of the
    ``openjarvis.agents`` package, user templates from
    ``~/.openjarvis/templates``. Every ``*.toml`` file contributes its
    ``[template]`` table (an empty dict when the table is absent), tagged
    with a ``"source"`` key of ``"built-in"`` or ``"user"``.

    Discovery is best-effort: a file that cannot be read or parsed is
    logged and skipped without affecting the remaining files. Within each
    source, files are processed in name order so the result does not
    depend on filesystem iteration order.

    Returns:
        Built-in templates followed by user templates.
    """
    import importlib.resources
    import logging

    try:
        import tomllib
    except ModuleNotFoundError:
        import tomli as tomllib  # type: ignore[no-redef]

    log = logging.getLogger(__name__)
    templates: List[Dict[str, Any]] = []

    def _load(entry: Any, source: str) -> None:
        """Parse one TOML file and append its tagged ``[template]`` table."""
        try:
            data = tomllib.loads(entry.read_text(encoding="utf-8"))
            tpl = data.get("template", {})
            tpl["source"] = source
        except Exception:
            # Deliberately broad: one bad file must never break discovery.
            log.warning(
                "Skipping unreadable %s template %s", source, entry, exc_info=True
            )
            return
        templates.append(tpl)

    # Built-in templates
    try:
        tpl_dir = importlib.resources.files("openjarvis.agents") / "templates"
        builtin_files = sorted(
            (item for item in tpl_dir.iterdir() if item.name.endswith(".toml")),
            key=lambda item: item.name,
        )
    except Exception:
        # Package or its templates directory is unavailable; not fatal.
        log.debug("Built-in templates unavailable", exc_info=True)
        builtin_files = []
    for item in builtin_files:
        _load(item, "built-in")

    # User templates
    user_dir = Path("~/.openjarvis/templates").expanduser()
    if user_dir.is_dir():
        for f in sorted(user_dir.glob("*.toml")):
            _load(f, "user")

    return templates
create_from_template
create_from_template(template_id: str, name: str, overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Create an agent from a template with optional overrides.

Source code in src/openjarvis/agents/manager.py
def create_from_template(
    self, template_id: str, name: str, overrides: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Create an agent from a template with optional overrides.

    The first template returned by ``list_templates`` whose ``"id"`` equals
    ``template_id`` is used. Its descriptive keys (``id``, ``name``,
    ``description``, ``source``) are dropped, ``overrides`` are layered on
    top, and ``agent_type`` is pulled out of the resulting config
    (defaulting to ``"monitor_operative"``) before delegating to
    ``create_agent``.

    Raises:
        ValueError: If no template matches ``template_id``.
    """
    chosen = None
    for candidate in self.list_templates():
        if candidate.get("id") == template_id:
            chosen = candidate
            break
    if not chosen:
        raise ValueError(f"Template not found: {template_id}")

    # Keys that describe the template itself rather than the agent config.
    descriptive_keys = ("id", "name", "description", "source")
    settings = {}
    for key, value in chosen.items():
        if key in descriptive_keys:
            continue
        settings[key] = value

    if overrides:
        settings.update(overrides)

    kind = settings.pop("agent_type", "monitor_operative")
    return self.create_agent(name=name, agent_type=kind, config=settings)
store_agent_response
store_agent_response(agent_id: str, content: str) -> dict

Store an agent-to-user response message.

Source code in src/openjarvis/agents/manager.py
def store_agent_response(self, agent_id: str, content: str) -> dict:
    """Persist an agent-to-user message and return it as a dict.

    A 16-character hex id and the current ``time.time()`` timestamp are
    generated, the row is inserted into ``agent_messages`` with direction
    ``agent_to_user``, mode ``immediate`` and status ``delivered``, and the
    connection is committed.

    Returns:
        The stored message, keyed by column name.
    """
    record = {
        "id": uuid4().hex[:16],
        "agent_id": agent_id,
        "direction": "agent_to_user",
        "content": content,
        "mode": "immediate",
        "status": "delivered",
        "created_at": time.time(),
    }
    statement = (
        "INSERT INTO agent_messages"
        " (id, agent_id, direction, content, mode, status, created_at)"
        " VALUES (?, ?, 'agent_to_user', ?, 'immediate', 'delivered', ?)"
    )
    # Only the variable columns are bound; the constant ones live in the SQL.
    bound = (record["id"], agent_id, content, record["created_at"])
    self._conn.execute(statement, bound)
    self._conn.commit()
    return record