Skip to content

Index

sandbox

Container sandbox for isolated agent execution.

Classes

AllowedRoot dataclass

AllowedRoot(path: str, read_only: bool = True)

An allowed mount root with optional read-only constraint.

MountAllowlist dataclass

MountAllowlist(roots: List[AllowedRoot] = list(), blocked_patterns: List[str] = (lambda: list(DEFAULT_BLOCKED_PATTERNS))())

Allowlist for container mounts.

ContainerRunner

ContainerRunner(*, image: str = '', timeout: int = 0, mount_allowlist_path: str = '', max_concurrent: int = 5, runtime: str = 'docker')

Manages the container lifecycle (Docker by default, or another runtime such as Podman) for sandboxed execution.

PARAMETER DESCRIPTION
image

Docker image to run. Defaults to openjarvis-sandbox:latest.

TYPE: str DEFAULT: ''

timeout

Maximum execution time in seconds. A value of 0 (the default) selects the class-level DEFAULT_TIMEOUT.

TYPE: int DEFAULT: 0

mount_allowlist_path

Path to a JSON mount-allowlist file.

TYPE: str DEFAULT: ''

max_concurrent

Maximum number of concurrent containers.

TYPE: int DEFAULT: 5

runtime

Container runtime binary name (docker or podman).

TYPE: str DEFAULT: 'docker'

Source code in src/openjarvis/sandbox/runner.py
def __init__(
    self,
    *,
    image: str = "",
    timeout: int = 0,
    mount_allowlist_path: str = "",
    max_concurrent: int = 5,
    runtime: str = "docker",
) -> None:
    """Record the runner configuration and load the mount allowlist.

    Parameters
    ----------
    image:
        Container image to run; an empty string selects ``DEFAULT_IMAGE``.
    timeout:
        Execution limit in seconds; ``0`` selects ``DEFAULT_TIMEOUT``.
    mount_allowlist_path:
        Path to the mount-allowlist file, stored as given.
    max_concurrent:
        Maximum number of concurrent containers, stored as given.
    runtime:
        Name of the container runtime binary.
    """
    # Settings that are stored exactly as supplied.
    self._runtime = runtime
    self._max_concurrent = max_concurrent
    self._mount_allowlist_path = mount_allowlist_path

    # Falsy values ("" / 0) mean "use the class-level default".
    self._image = image if image else self.DEFAULT_IMAGE
    self._timeout = timeout if timeout else self.DEFAULT_TIMEOUT

    # Loaded last so every configuration attribute is already in place.
    self._allowlist = self._load_allowlist()
Functions
run
run(input_data: Dict[str, Any], *, workspace: str = '', mounts: Optional[List[str]] = None, secrets: Optional[Dict[str, str]] = None, env: Optional[Dict[str, str]] = None) -> Dict[str, Any]

Spawn a container, send input, parse output.

PARAMETER DESCRIPTION
input_data

JSON-serializable payload sent to the container's stdin.

TYPE: Dict[str, Any]

workspace

Working directory inside the container.

TYPE: str DEFAULT: ''

mounts

Host paths to bind-mount (read-only).

TYPE: Optional[List[str]] DEFAULT: None

secrets

Key-value pairs injected into input (not env vars).

TYPE: Optional[Dict[str, str]] DEFAULT: None

env

Environment variables for the container.

TYPE: Optional[Dict[str, str]] DEFAULT: None

RETURNS DESCRIPTION
dict

Parsed JSON output from the container.

Source code in src/openjarvis/sandbox/runner.py
def run(
    self,
    input_data: Dict[str, Any],
    *,
    workspace: str = "",
    mounts: Optional[List[str]] = None,
    secrets: Optional[Dict[str, str]] = None,
    env: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Run one request in a freshly named container and return its reply.

    Parameters
    ----------
    input_data:
        JSON-serializable payload; a shallow copy is written to the
        container's stdin.
    workspace:
        When non-empty, added to the payload under ``_workspace``.
    mounts:
        Host paths to mount; checked by ``_validate_mounts`` first.
    secrets:
        When non-empty, added to the payload under ``_secrets`` (they
        travel via stdin, not via environment variables).
    env:
        Environment variables handed to ``_build_docker_args``.

    Returns
    -------
    dict
        The result of ``_parse_output`` on the container's stdout, or an
        error dict (``"error": True``) on timeout or non-zero exit.
    """
    # Validation runs before anything else so a rejected mount aborts
    # the call before a container name is even generated.
    approved_mounts = self._validate_mounts(mounts)
    name = f"oj-sandbox-{uuid.uuid4().hex[:12]}"

    # Work on a copy: the caller's dict must not gain the private keys.
    request = dict(input_data)
    if secrets:
        request["_secrets"] = secrets
    if workspace:
        request["_workspace"] = workspace

    command = self._build_docker_args(name, approved_mounts, env)
    stdin_text = json.dumps(request)

    try:
        completed = subprocess.run(
            command,
            input=stdin_text,
            capture_output=True,
            text=True,
            timeout=self._timeout,
        )
    except subprocess.TimeoutExpired:
        # Only the client process was killed by the timeout; remove the
        # container itself as well.
        self.stop(name)
        return {
            "content": f"Container timed out after {self._timeout}s.",
            "error": True,
            "error_type": "timeout",
        }

    # Success path first; everything below handles a failed exit.
    if completed.returncode == 0:
        return self._parse_output(completed.stdout)

    err_text = (completed.stderr or "").strip()
    logger.error(
        "Container %s exited %d: %s",
        name,
        completed.returncode,
        err_text,
    )
    return {
        "content": f"Container failed: {err_text}",
        "error": True,
        "returncode": completed.returncode,
    }
stop
stop(container_name: str) -> None

Force-remove a container (runs `<runtime> rm -f <container_name>`). Failures are logged at debug level and otherwise ignored.

Source code in src/openjarvis/sandbox/runner.py
def stop(self, container_name: str) -> None:
    """Best-effort forced removal of a container.

    Runs ``<runtime> rm -f <container_name>`` with a 30 second limit.
    The command's exit status is not inspected, and any exception is
    logged at debug level and suppressed.
    """
    try:
        # Prefer the absolute path from PATH; otherwise use the bare name.
        binary = shutil.which(self._runtime)
        if not binary:
            binary = self._runtime
        command = [binary, "rm", "-f", container_name]
        subprocess.run(command, capture_output=True, timeout=30)
    except Exception:
        logger.debug(
            "Failed to stop container %s",
            container_name,
            exc_info=True,
        )
cleanup_orphans
cleanup_orphans() -> None

Remove orphaned sandbox containers.

Source code in src/openjarvis/sandbox/runner.py
def cleanup_orphans(self) -> None:
    """Force-remove every container labelled as a sandbox container.

    Lists all containers (running or not) carrying the label
    ``openjarvis-sandbox=true`` and removes them with ``rm -f``.  Each
    command has a 30 second limit.  Any exception is logged at debug
    level and suppressed.
    """
    try:
        located = shutil.which(self._runtime)
        binary = located if located else self._runtime

        list_cmd = [
            binary, "ps", "-aq",
            "--filter", "label=openjarvis-sandbox=true",
        ]
        listing = subprocess.run(
            list_cmd, capture_output=True, text=True, timeout=30,
        )

        # One id per whitespace-separated token of the listing output.
        orphan_ids = listing.stdout.strip().split()
        if not orphan_ids:
            return

        subprocess.run(
            [binary, "rm", "-f"] + orphan_ids,
            capture_output=True,
            timeout=30,
        )
        logger.info("Cleaned up %d orphaned containers", len(orphan_ids))
    except Exception:
        logger.debug("Orphan cleanup failed", exc_info=True)

SandboxedAgent

SandboxedAgent(agent: BaseAgent, runner: ContainerRunner, *, engine: Optional[InferenceEngine] = None, model: str = '', workspace: str = '', mounts: Optional[List[str]] = None, secrets: Optional[Dict[str, str]] = None, bus: Optional[EventBus] = None)

Bases: BaseAgent

Transparent wrapper that runs any BaseAgent in a container.

Follows the GuardrailsEngine wrapper pattern — the wrapped agent's configuration is serialized and sent to the container.

Source code in src/openjarvis/sandbox/runner.py
def __init__(
    self,
    agent: BaseAgent,
    runner: ContainerRunner,
    *,
    engine: Optional[InferenceEngine] = None,
    model: str = "",
    workspace: str = "",
    mounts: Optional[List[str]] = None,
    secrets: Optional[Dict[str, str]] = None,
    bus: Optional[EventBus] = None,
) -> None:
    """Wrap ``agent`` so that its turns are executed through ``runner``.

    Parameters
    ----------
    agent:
        The agent being wrapped.
    runner:
        Container runner that performs the actual execution.
    engine:
        Engine passed to the base initializer; when falsy, the wrapped
        agent's ``_engine`` attribute is used if it has one.
    model:
        Model passed to the base initializer; when empty, the wrapped
        agent's ``_model`` attribute is used if it has one.
    workspace, mounts, secrets:
        Stored and forwarded to the runner on every ``run`` call.
    bus:
        Event bus passed through to the base initializer.
    """
    # Explicit arguments win; otherwise borrow from the wrapped agent.
    base_engine = engine if engine else getattr(agent, "_engine", None)
    base_model = model if model else getattr(agent, "_model", "")
    super().__init__(
        base_engine,  # type: ignore[arg-type]
        base_model,
        bus=bus,
    )

    self._wrapped_agent = agent
    self._runner = runner
    self._workspace = workspace
    # Normalise "not provided" to empty containers.
    self._mounts = mounts if mounts else []
    self._secrets = secrets if secrets else {}
Functions
run
run(input: str, context: Optional[AgentContext] = None, **kwargs: Any) -> AgentResult

Delegate execution to the container runner.

Source code in src/openjarvis/sandbox/runner.py
def run(
    self,
    input: str,
    context: Optional[AgentContext] = None,
    **kwargs: Any,
) -> AgentResult:
    """Run a single turn inside the container and wrap the reply.

    The request sent to the runner carries the prompt plus the wrapped
    agent's ``agent_id`` and ``_model``.  ``context`` and ``**kwargs``
    are accepted but not used by this method.
    """

    def _as_tool_result(entry):
        # Missing fields fall back to neutral defaults.
        return ToolResult(
            tool_name=entry.get("tool_name", "unknown"),
            content=entry.get("content", ""),
            success=entry.get("success", True),
        )

    self._emit_turn_start(input)

    wrapped = self._wrapped_agent
    request = {
        "prompt": input,
        "agent_id": wrapped.agent_id,
        "model": getattr(wrapped, "_model", ""),
    }
    reply = self._runner.run(
        request,
        workspace=self._workspace,
        mounts=self._mounts,
        secrets=self._secrets,
    )

    text = reply.get("content", "")
    failed = reply.get("error", False)

    # Tool results are converted before the turn-end event is emitted.
    parsed_tools = [
        _as_tool_result(entry)
        for entry in reply.get("tool_results", [])
    ]

    self._emit_turn_end(turns=1, error=failed)
    return AgentResult(
        content=text,
        tool_results=parsed_tools,
        turns=1,
        metadata=reply.get("metadata", {}),
    )

Functions

validate_mount

validate_mount(mount_path: str, allowlist: MountAllowlist) -> bool

Validate a single mount path against the allowlist.

Returns True if the mount is allowed, False otherwise.

Source code in src/openjarvis/sandbox/mount_security.py
def validate_mount(
    mount_path: str,
    allowlist: MountAllowlist,
) -> bool:
    """Report whether ``mount_path`` may be mounted under ``allowlist``.

    The path is resolved first; it is accepted only if the resolved form
    matches no blocked pattern and lies under an allowed root.  A path
    that cannot be resolved is rejected.  Never raises for a rejected
    path; returns ``False`` instead.
    """
    try:
        canonical = str(Path(mount_path).resolve())
    except (OSError, ValueError):
        # An unresolvable path is treated as not allowed.
        return False

    # The blocklist is consulted before the allowed roots.
    if _is_blocked(canonical, allowlist.blocked_patterns):
        logger.debug("Mount blocked by pattern: %s", mount_path)
        return False

    if _is_under_allowed_root(canonical, allowlist.roots):
        return True

    logger.debug("Mount not under any allowed root: %s", mount_path)
    return False

validate_mounts

validate_mounts(mounts: List[str], allowlist: MountAllowlist) -> List[str]

Validate a list of mount paths and return them if every one is allowed.

Raises ValueError if any mount is blocked or is not under an allowed root.

Source code in src/openjarvis/sandbox/mount_security.py
def validate_mounts(
    mounts: List[str],
    allowlist: MountAllowlist,
) -> List[str]:
    """Validate every mount path, raising on the first one rejected.

    Each path is resolved (symlinks and ``..`` segments collapsed) before
    it is checked, in the same way ``validate_mount`` does, so a path
    such as ``<allowed-root>/../elsewhere`` is judged by where it really
    points rather than by how it is spelled.

    Parameters
    ----------
    mounts:
        Host paths requested for mounting.
    allowlist:
        Allowed roots and blocked patterns to check against.

    Returns
    -------
    list of str
        The input paths, unchanged and in order, when all are accepted.

    Raises
    ------
    ValueError
        If a path cannot be resolved, matches a blocked pattern, or is
        not under any allowed root.
    """
    valid: List[str] = []
    for mount in mounts:
        try:
            resolved = str(Path(mount).resolve())
        except (OSError, ValueError) as exc:
            raise ValueError(
                f"Mount path could not be resolved: {mount}"
            ) from exc

        # Check both spellings against the blocklist so that nothing the
        # raw-path check used to reject slips through after resolution.
        if _is_blocked(mount, allowlist.blocked_patterns) or _is_blocked(
            resolved, allowlist.blocked_patterns
        ):
            raise ValueError(
                f"Mount path blocked by security policy: {mount}"
            )

        # Root membership is decided on the resolved path only.
        if not _is_under_allowed_root(resolved, allowlist.roots):
            raise ValueError(
                f"Mount path not under any allowed root: {mount}"
            )

        valid.append(mount)
    return valid