Persistent agent lifecycle manager.
Composition layer — stores agent state in SQLite, delegates all computation
to the five existing primitives (Intelligence, Agent, Tools, Engine, Learning).
Classes
AgentManager
AgentManager(db_path: str)
Persistent agent lifecycle manager with SQLite backing.
Source code in src/openjarvis/agents/manager.py
def __init__(self, db_path: str) -> None:
    """Open (or create) the SQLite database and bring its schema up to date.

    Args:
        db_path: Location of the SQLite database file; coerced with ``str()``.

    Raises:
        sqlite3.OperationalError: If a schema statement fails, or if a
            column migration fails for any reason other than the column
            already being present.
    """
    self._db_path = str(db_path)
    self._conn = sqlite3.connect(self._db_path, check_same_thread=False)
    self._conn.row_factory = sqlite3.Row
    self._conn.execute("PRAGMA journal_mode=WAL")
    self._conn.execute("PRAGMA foreign_keys=ON")
    self._conn.execute(_CREATE_AGENTS)
    self._conn.execute(_CREATE_TASKS)
    self._conn.execute(_CREATE_BINDINGS)
    self._conn.executescript(_CREATE_CHECKPOINTS)
    self._conn.executescript(_CREATE_MESSAGES)
    self._conn.executescript(_CREATE_LEARNING_LOG)
    self._conn.commit()

    # Schema migrations for runtime columns: (column name, statement).
    migrations = [
        (
            "total_tokens",
            "ALTER TABLE managed_agents ADD COLUMN total_tokens INTEGER DEFAULT 0",
        ),
        (
            "total_cost",
            "ALTER TABLE managed_agents ADD COLUMN total_cost REAL DEFAULT 0",
        ),
        (
            "total_runs",
            "ALTER TABLE managed_agents ADD COLUMN total_runs INTEGER DEFAULT 0",
        ),
        (
            "last_run_at",
            "ALTER TABLE managed_agents ADD COLUMN last_run_at REAL",
        ),
        (
            "last_activity_at",
            "ALTER TABLE managed_agents ADD COLUMN last_activity_at REAL",
        ),
        (
            "stall_retries",
            "ALTER TABLE managed_agents ADD COLUMN stall_retries INTEGER DEFAULT 0",
        ),
    ]

    # Ask SQLite which columns exist instead of treating every failure as
    # "column already exists"; that would also hide locked-database errors
    # or a missing table.
    existing = {
        row["name"]
        for row in self._conn.execute("PRAGMA table_info(managed_agents)")
    }
    for column, statement in migrations:
        if column in existing:
            continue
        try:
            self._conn.execute(statement)
        except sqlite3.OperationalError as exc:
            # Another connection may have added the column between the
            # check above and this statement; only that case is benign.
            if "duplicate column name" not in str(exc).lower():
                raise
    self._conn.commit()
|
Functions
start_tick
start_tick(agent_id: str) -> None
Mark agent as running. Raises ValueError if already running.
Source code in src/openjarvis/agents/manager.py
def start_tick(self, agent_id: str) -> None:
    """Flag the agent as running for the duration of a tick.

    Args:
        agent_id: Identifier of the agent to mark.

    Raises:
        ValueError: If the agent's stored status is already ``"running"``.
    """
    record = self.get_agent(agent_id)
    # A falsy lookup result is not treated as an error; the status is
    # still written below.
    if record:
        if record["status"] == "running":
            raise ValueError(f"Agent {agent_id} is already executing a tick")
    self._set_status(agent_id, "running")
|
find_binding_for_channel
find_binding_for_channel(channel_type: str, channel_id: str) -> Optional[Dict[str, Any]]
Find a dedicated binding for a specific channel.
Source code in src/openjarvis/agents/manager.py
def find_binding_for_channel(
    self, channel_type: str, channel_id: str
) -> Optional[Dict[str, Any]]:
    """Look up the binding dedicated to one channel.

    Rows are filtered by ``channel_type`` in SQL; the channel id is then
    compared against the ``"channel"`` entry of each binding's config.

    Args:
        channel_type: Value matched against the ``channel_type`` column.
        channel_id: Value matched against ``config["channel"]``.

    Returns:
        The first matching binding, or ``None`` when there is none.
    """
    cursor = self._conn.execute(
        "SELECT * FROM channel_bindings WHERE channel_type = ?",
        (channel_type,),
    )
    # Bindings are converted lazily, one row at a time, until a match.
    candidates = (self._row_to_binding(record) for record in cursor.fetchall())
    return next(
        (
            candidate
            for candidate in candidates
            if candidate.get("config", {}).get("channel") == channel_id
        ),
        None,
    )
|
list_templates
staticmethod
list_templates() -> List[Dict[str, Any]]
Discover built-in and user templates.
Source code in src/openjarvis/agents/manager.py
| @staticmethod
def list_templates() -> List[Dict[str, Any]]:
"""Discover built-in and user templates."""
import importlib.resources
try:
import tomllib
except ModuleNotFoundError:
import tomli as tomllib # type: ignore[no-redef]
templates: List[Dict[str, Any]] = []
# Built-in templates
try:
tpl_dir = importlib.resources.files("openjarvis.agents") / "templates"
for item in tpl_dir.iterdir():
if str(item).endswith(".toml"):
data = tomllib.loads(item.read_text(encoding="utf-8"))
tpl = data.get("template", {})
tpl["source"] = "built-in"
templates.append(tpl)
except Exception:
pass
# User templates
user_dir = Path("~/.openjarvis/templates").expanduser()
if user_dir.is_dir():
for f in user_dir.glob("*.toml"):
try:
data = tomllib.loads(f.read_text(encoding="utf-8"))
tpl = data.get("template", {})
tpl["source"] = "user"
templates.append(tpl)
except Exception:
pass
return templates
|
create_from_template
create_from_template(template_id: str, name: str, overrides: Optional[Dict[str, Any]] = None) -> Dict[str, Any]
Create an agent from a template with optional overrides.
Source code in src/openjarvis/agents/manager.py
def create_from_template(
    self, template_id: str, name: str, overrides: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Instantiate an agent from a named template.

    Args:
        template_id: The ``id`` of the template to use.
        name: Name given to the new agent.
        overrides: Optional values layered on top of the template's
            settings; may also override ``agent_type``.

    Returns:
        Whatever ``create_agent`` returns for the new agent.

    Raises:
        ValueError: If no template with ``template_id`` is found.
    """
    matched = None
    for candidate in self.list_templates():
        if candidate.get("id") == template_id:
            matched = candidate
            break
    if not matched:
        raise ValueError(f"Template not found: {template_id}")

    # Descriptive template fields are not part of the agent configuration.
    metadata_keys = {"id", "name", "description", "source"}
    config = {}
    for key, value in matched.items():
        if key not in metadata_keys:
            config[key] = value
    if overrides:
        config.update(overrides)

    # The agent type is passed separately, so take it out of the config.
    agent_type = config.pop("agent_type", "monitor_operative")
    return self.create_agent(name=name, agent_type=agent_type, config=config)
|
store_agent_response
store_agent_response(agent_id: str, content: str) -> dict
Store an agent-to-user response message.
Source code in src/openjarvis/agents/manager.py
def store_agent_response(self, agent_id: str, content: str) -> dict:
    """Persist a message sent from the agent to the user.

    The row is written to ``agent_messages`` with direction
    ``agent_to_user``, mode ``immediate`` and status ``delivered``, and
    the transaction is committed before returning.

    Args:
        agent_id: Identifier of the agent that produced the message.
        content: Message text.

    Returns:
        A dict with the stored fields, including the generated ``id``
        (16 hex characters) and the ``created_at`` timestamp.
    """
    record = {
        "id": uuid4().hex[:16],
        "agent_id": agent_id,
        "direction": "agent_to_user",
        "content": content,
        "mode": "immediate",
        "status": "delivered",
        "created_at": time.time(),
    }
    insert_sql = (
        "INSERT INTO agent_messages"
        " (id, agent_id, direction, content, mode, status, created_at)"
        " VALUES (?, ?, 'agent_to_user', ?, 'immediate', 'delivered', ?)"
    )
    # Only the variable fields are bound; the fixed ones are SQL literals.
    params = (
        record["id"],
        record["agent_id"],
        record["content"],
        record["created_at"],
    )
    self._conn.execute(insert_sql, params)
    self._conn.commit()
    return record
|