Skip to content

manager

manager

Operator manager — lifecycle management for autonomous operators.

Classes

OperatorManager

OperatorManager(system: Any)

Manages operator manifests and their lifecycle via the TaskScheduler.

PARAMETER DESCRIPTION
system

A JarvisSystem instance (used to access scheduler, session_store, memory_backend, and to run operators via system.ask()).

TYPE: Any

Source code in src/openjarvis/operators/manager.py
def __init__(self, system: Any) -> None:
    self._system = system
    self._manifests: Dict[str, OperatorManifest] = {}
Attributes
manifests property
manifests: Dict[str, OperatorManifest]

All registered manifests.

Functions
register
register(manifest: OperatorManifest) -> None

Register an operator manifest.

Source code in src/openjarvis/operators/manager.py
def register(self, manifest: OperatorManifest) -> None:
    """Register an operator manifest."""
    self._manifests[manifest.id] = manifest
    logger.info("Registered operator: %s", manifest.id)
discover
discover(directory: str | Path) -> List[OperatorManifest]

Discover and register operator manifests from a directory.

Scans for *.toml files in directory and loads each as an operator manifest.

Source code in src/openjarvis/operators/manager.py
def discover(self, directory: str | Path) -> List[OperatorManifest]:
    """Discover and register operator manifests from a directory.

    Scans for ``*.toml`` files in *directory* and loads each as an
    operator manifest.
    """
    directory = Path(directory).expanduser()
    found: List[OperatorManifest] = []
    if not directory.is_dir():
        return found
    for toml_path in sorted(directory.glob("*.toml")):
        try:
            manifest = load_operator(toml_path)
            self.register(manifest)
            found.append(manifest)
        except Exception:
            logger.warning("Failed to load operator from %s", toml_path)
    return found
activate
activate(operator_id: str) -> str

Activate an operator by creating a scheduler task.

Returns the scheduler task ID (deterministic: operator:{id}).

Raises KeyError if the operator is not registered, or RuntimeError if the scheduler is not available.

Source code in src/openjarvis/operators/manager.py
def activate(self, operator_id: str) -> str:
    """Activate an operator by creating a scheduler task.

    Returns the scheduler task ID (deterministic: ``operator:{id}``).

    Raises ``KeyError`` if the operator is not registered, or
    ``RuntimeError`` if the scheduler is not available.
    """
    manifest = self._manifests.get(operator_id)
    if manifest is None:
        raise KeyError(f"Operator not registered: {operator_id}")

    scheduler = self._system.scheduler
    if scheduler is None:
        raise RuntimeError(
            "TaskScheduler not available. Enable [scheduler] in config."
        )

    task_id = f"operator:{operator_id}"

    # Check if already active
    try:
        existing = scheduler.list_tasks()
        for t in existing:
            if t.id == task_id and t.status == "active":
                logger.info("Operator %s already active", operator_id)
                return task_id
    except Exception:
        pass

    tools_str = ",".join(manifest.tools) if manifest.tools else ""

    metadata: Dict[str, Any] = {
        "operator_id": operator_id,
        "system_prompt": manifest.system_prompt,
        "temperature": manifest.temperature,
        "max_turns": manifest.max_turns,
    }

    # Use the scheduler's create_task but with a deterministic ID
    task = scheduler.create_task(
        prompt=_TICK_PROMPT,
        schedule_type=manifest.schedule_type,
        schedule_value=manifest.schedule_value,
        agent="operative",
        tools=tools_str,
        metadata=metadata,
    )
    # Override the random ID with our deterministic one
    task_dict = task.to_dict()
    task_dict["id"] = task_id
    scheduler._store.save_task(task_dict)

    logger.info("Activated operator %s (task_id=%s)", operator_id, task_id)
    return task_id
deactivate
deactivate(operator_id: str) -> None

Deactivate an operator by cancelling its scheduler task.

Source code in src/openjarvis/operators/manager.py
def deactivate(self, operator_id: str) -> None:
    """Deactivate an operator by cancelling its scheduler task."""
    scheduler = self._system.scheduler
    if scheduler is None:
        raise RuntimeError("TaskScheduler not available.")
    task_id = f"operator:{operator_id}"
    try:
        scheduler.cancel_task(task_id)
        logger.info("Deactivated operator %s", operator_id)
    except KeyError:
        logger.warning("No active task for operator %s", operator_id)
pause
pause(operator_id: str) -> None

Pause an active operator.

Source code in src/openjarvis/operators/manager.py
def pause(self, operator_id: str) -> None:
    """Pause an active operator."""
    scheduler = self._system.scheduler
    if scheduler is None:
        raise RuntimeError("TaskScheduler not available.")
    scheduler.pause_task(f"operator:{operator_id}")
    logger.info("Paused operator %s", operator_id)
resume
resume(operator_id: str) -> None

Resume a paused operator.

Source code in src/openjarvis/operators/manager.py
def resume(self, operator_id: str) -> None:
    """Resume a paused operator."""
    scheduler = self._system.scheduler
    if scheduler is None:
        raise RuntimeError("TaskScheduler not available.")
    scheduler.resume_task(f"operator:{operator_id}")
    logger.info("Resumed operator %s", operator_id)
status
status() -> List[Dict[str, Any]]

Return status of all registered operators.

Merges manifest info with scheduler task state.

Source code in src/openjarvis/operators/manager.py
def status(self) -> List[Dict[str, Any]]:
    """Return status of all registered operators.

    Merges manifest info with scheduler task state.
    """
    results: List[Dict[str, Any]] = []
    scheduler = self._system.scheduler

    for op_id, manifest in self._manifests.items():
        info: Dict[str, Any] = {
            "id": op_id,
            "name": manifest.name,
            "description": manifest.description,
            "schedule_type": manifest.schedule_type,
            "schedule_value": manifest.schedule_value,
            "tools": manifest.tools,
            "status": "registered",
            "next_run": None,
            "last_run": None,
        }

        if scheduler is not None:
            task_id = f"operator:{op_id}"
            try:
                tasks = scheduler.list_tasks()
                for t in tasks:
                    if t.id == task_id:
                        info["status"] = t.status
                        info["next_run"] = t.next_run
                        info["last_run"] = t.last_run
                        break
            except Exception:
                pass

        results.append(info)
    return results
run_once
run_once(operator_id: str) -> str

Execute a single tick of an operator immediately.

Useful for development and testing. Returns the agent's response.

Source code in src/openjarvis/operators/manager.py
def run_once(self, operator_id: str) -> str:
    """Execute a single tick of an operator immediately.

    Useful for development and testing. Returns the agent's response.
    """
    manifest = self._manifests.get(operator_id)
    if manifest is None:
        raise KeyError(f"Operator not registered: {operator_id}")

    tools_list = manifest.tools if manifest.tools else None
    result = self._system.ask(
        _TICK_PROMPT,
        agent="operative",
        tools=tools_list,
        system_prompt=manifest.system_prompt,
        operator_id=operator_id,
        temperature=manifest.temperature,
    )
    if isinstance(result, dict):
        return result.get("content", str(result))
    return str(result)
get_manifest
get_manifest(operator_id: str) -> Optional[OperatorManifest]

Return the manifest for an operator, or None.

Source code in src/openjarvis/operators/manager.py
def get_manifest(self, operator_id: str) -> Optional[OperatorManifest]:
    """Return the manifest for an operator, or None."""
    return self._manifests.get(operator_id)

Functions