Skip to content

Index

security

Security guardrails — scanners, engine wrapper, audit, SSRF.

Classes

BaseScanner

Bases: ABC

Base class for all security scanners.

Subclasses implement pattern-based scanning for secrets, PII, or other sensitive content.

Functions
scan abstractmethod
scan(text: str) -> ScanResult

Scan text and return findings.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def scan(self, text: str) -> ScanResult:
    """Scan *text* and return findings.

    Abstract hook: there is no implementation here, so every concrete
    scanner has to provide its own.
    """
redact abstractmethod
redact(text: str) -> str

Return text with sensitive matches replaced by redaction markers.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def redact(self, text: str) -> str:
    """Return *text* with sensitive matches replaced by redaction markers.

    Abstract hook: there is no implementation here, so every concrete
    scanner has to provide its own.
    """

AuditLogger

AuditLogger(db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / 'audit.db', bus: Optional[EventBus] = None)

Append-only SQLite audit log for security events.

PARAMETER DESCRIPTION
db_path

Path to the SQLite database file.

TYPE: Union[str, Path] DEFAULT: DEFAULT_CONFIG_DIR / 'audit.db'

bus

Optional event bus — if provided, subscribes to security events (SECURITY_SCAN, SECURITY_ALERT, SECURITY_BLOCK).

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/audit.py
def __init__(
    self,
    db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / "audit.db",
    bus: Optional[EventBus] = None,
) -> None:
    """Open the audit database, ensure its schema, and attach to *bus*.

    Parameters
    ----------
    db_path:
        Location of the SQLite database file.
    bus:
        Optional event bus. When given, ``self._on_event`` is subscribed
        to the SECURITY_SCAN, SECURITY_ALERT and SECURITY_BLOCK events.
    """
    self._db_path = Path(db_path)
    from openjarvis.security.file_utils import secure_create

    # secure_create runs before sqlite3 touches the path.
    # NOTE(review): presumably it creates the file with restrictive
    # permissions -- confirm in file_utils.
    secure_create(self._db_path)
    connection = sqlite3.connect(str(self._db_path))
    self._conn = connection
    connection.execute(
        """
        CREATE TABLE IF NOT EXISTS security_events (
            id          INTEGER PRIMARY KEY,
            timestamp   REAL,
            event_type  TEXT,
            findings_json TEXT,
            content_preview TEXT,
            action_taken TEXT,
            row_hash    TEXT DEFAULT '',
            prev_hash   TEXT DEFAULT ''
        )
        """
    )
    connection.commit()
    self._migrate_schema()

    if bus is None:
        return

    # One handler serves all three security event kinds.
    watched = (
        EventType.SECURITY_SCAN,
        EventType.SECURITY_ALERT,
        EventType.SECURITY_BLOCK,
    )
    for topic in watched:
        bus.subscribe(topic, self._on_event)
Functions
log
log(event: SecurityEvent) -> None

Insert a security event into the audit log and extend its hash chain: each row stores the SHA-256 of its own fields combined with the previous row's hash.

Source code in src/openjarvis/security/audit.py
def log(self, event: SecurityEvent) -> None:
    """Append *event* to the audit log and extend the hash chain.

    Each row records ``prev_hash`` (the current tail hash) and ``row_hash``,
    the SHA-256 hex digest of
    ``prev_hash|timestamp|event_type|findings_json|content_preview|action_taken``.

    Parameters
    ----------
    event:
        The security event to persist. Its findings are serialised to a
        JSON list of dicts before being stored.
    """
    findings_json = json.dumps(
        [
            {
                "pattern_name": f.pattern_name,
                "matched_text": f.matched_text,
                "threat_level": f.threat_level.value,
                "start": f.start,
                "end": f.end,
                "description": f.description,
            }
            for f in event.findings
        ]
    )

    # The timestamp column is REAL, so SQLite hands an integer back as a
    # float (1700000000 -> 1700000000.0).  Hashing the raw int would yield
    # a digest that can never be reproduced from the stored row, and the
    # chain would be reported as broken.  Normalise before hashing so the
    # hashed text matches what is read back.
    timestamp = event.timestamp
    if isinstance(timestamp, int):
        timestamp = float(timestamp)

    # Compute hash chain: link to the current tail, then hash this row.
    prev_hash = self.tail_hash()
    hash_input = (
        f"{prev_hash}|{timestamp}|{event.event_type.value}"
        f"|{findings_json}|{event.content_preview}|{event.action_taken}"
    )
    row_hash = hashlib.sha256(hash_input.encode()).hexdigest()

    self._conn.execute(
        """
        INSERT INTO security_events
            (timestamp, event_type, findings_json, content_preview,
             action_taken, row_hash, prev_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (
            timestamp,
            event.event_type.value,
            findings_json,
            event.content_preview,
            event.action_taken,
            row_hash,
            prev_hash,
        ),
    )
    self._conn.commit()
query
query(*, event_type: Optional[str] = None, since: Optional[float] = None, limit: int = 100) -> List[SecurityEvent]

Query logged security events with optional filters.

Source code in src/openjarvis/security/audit.py
def query(
    self,
    *,
    event_type: Optional[str] = None,
    since: Optional[float] = None,
    limit: int = 100,
) -> List[SecurityEvent]:
    """Fetch stored security events, newest first.

    Parameters
    ----------
    event_type:
        When not ``None``, only rows with exactly this event type match.
    since:
        When not ``None``, only rows with ``timestamp >= since`` match.
    limit:
        Bound passed to the SQL ``LIMIT`` clause.

    Returns
    -------
    list
        Reconstructed ``SecurityEvent`` objects ordered by descending
        timestamp.
    """
    base = (
        "SELECT timestamp, event_type, findings_json,"
        " content_preview, action_taken"
        " FROM security_events WHERE 1=1"
    )
    filters: list = []
    bindings: list = []

    if event_type is not None:
        filters.append(" AND event_type = ?")
        bindings.append(event_type)
    if since is not None:
        filters.append(" AND timestamp >= ?")
        bindings.append(since)

    # The limit is always the last placeholder in the statement.
    bindings.append(limit)
    statement = base + "".join(filters) + " ORDER BY timestamp DESC LIMIT ?"

    def _decode(record):
        """Turn one result row back into a SecurityEvent."""
        ts, etype, findings_json, preview, action = record
        # NULL / empty findings column means "no findings".
        raw_items = json.loads(findings_json) if findings_json else []
        findings = [
            ScanFinding(
                pattern_name=item["pattern_name"],
                matched_text=item["matched_text"],
                threat_level=ThreatLevel(item["threat_level"]),
                start=item["start"],
                end=item["end"],
                description=item.get("description", ""),
            )
            for item in raw_items
        ]
        return SecurityEvent(
            event_type=SecurityEventType(etype),
            timestamp=ts,
            findings=findings,
            content_preview=preview or "",
            action_taken=action or "",
        )

    records = self._conn.execute(statement, bindings).fetchall()
    return [_decode(record) for record in records]
tail_hash
tail_hash() -> str

Return the hash of the last row in the chain, or empty string.

Source code in src/openjarvis/security/audit.py
def tail_hash(self) -> str:
    """Return the ``row_hash`` of the highest-id row, or ``""``.

    The empty string is returned when the table has no rows or when the
    newest row carries no hash (empty or NULL).
    """
    newest = self._conn.execute(
        "SELECT row_hash FROM security_events ORDER BY id DESC LIMIT 1"
    ).fetchone()
    if not newest:
        return ""
    return newest[0] or ""
verify_chain
verify_chain() -> Tuple[bool, Optional[int]]

Verify the Merkle hash chain integrity.

RETURNS DESCRIPTION
tuple

(True, None) if the chain is valid, or (False, row_id) where row_id is the first broken link.

Source code in src/openjarvis/security/audit.py
def verify_chain(self) -> Tuple[bool, Optional[int]]:
    """Walk the stored rows in id order and check every hash link.

    Rows with an empty ``row_hash`` are ignored (they predate the hash
    chain).  For every other row, its ``prev_hash`` must equal the hash of
    the previously accepted row (``""`` for the first one) and its
    ``row_hash`` must equal the SHA-256 recomputed from its columns.

    Returns
    -------
    tuple
        ``(True, None)`` if the chain is valid, or
        ``(False, row_id)`` where *row_id* is the first broken link.
    """
    cursor = self._conn.execute(
        "SELECT id, timestamp, event_type, findings_json,"
        " content_preview, action_taken, row_hash, prev_hash"
        " FROM security_events ORDER BY id"
    )

    last_accepted = ""
    for rid, ts, etype, fj, preview, action, stored_hash, stored_prev in (
        cursor.fetchall()
    ):
        if not stored_hash:
            # Unhashed legacy row: not part of the chain.
            continue

        link_intact = stored_prev == last_accepted
        payload = f"{stored_prev}|{ts}|{etype}|{fj}|{preview}|{action}"
        # The digest is only computed when the back-link already matches.
        if (
            not link_intact
            or hashlib.sha256(payload.encode()).hexdigest() != stored_hash
        ):
            return False, rid

        last_accepted = stored_hash

    return True, None
count
count() -> int

Return the total number of logged security events.

Source code in src/openjarvis/security/audit.py
def count(self) -> int:
    """Return how many rows the ``security_events`` table holds."""
    result = self._conn.execute("SELECT COUNT(*) FROM security_events").fetchone()
    if not result:
        return 0
    return result[0]
close
close() -> None

Close the SQLite connection.

Source code in src/openjarvis/security/audit.py
def close(self) -> None:
    """Shut down the underlying SQLite connection."""
    connection = self._conn
    connection.close()

GuardrailsEngine

GuardrailsEngine(engine: InferenceEngine, *, scanners: Optional[List[BaseScanner]] = None, mode: RedactionMode = WARN, scan_input: bool = True, scan_output: bool = True, bus: Optional[EventBus] = None)

Bases: InferenceEngine

Wraps an existing InferenceEngine with security scanning.

Not registered in EngineRegistry — instantiated dynamically to wrap any engine at runtime.

PARAMETER DESCRIPTION
engine

The wrapped inference engine.

TYPE: InferenceEngine

scanners

List of scanners to run. Defaults to SecretScanner + PIIScanner.

TYPE: Optional[List[BaseScanner]] DEFAULT: None

mode

Action taken on findings: WARN, REDACT, or BLOCK.

TYPE: RedactionMode DEFAULT: WARN

scan_input

Whether to scan input messages.

TYPE: bool DEFAULT: True

scan_output

Whether to scan output content.

TYPE: bool DEFAULT: True

bus

Optional event bus for publishing security events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/guardrails.py
def __init__(
    self,
    engine: InferenceEngine,
    *,
    scanners: Optional[List[BaseScanner]] = None,
    mode: RedactionMode = RedactionMode.WARN,
    scan_input: bool = True,
    scan_output: bool = True,
    bus: Optional[EventBus] = None,
) -> None:
    """Store the wrapped engine and the scanning configuration.

    Parameters
    ----------
    engine:
        The inference engine every call is delegated to.
    scanners:
        Scanners to run. ``None`` selects a fresh ``SecretScanner`` plus a
        fresh ``PIIScanner``; an explicit (even empty) list is kept as-is.
    mode:
        Action taken on findings.
    scan_input:
        Whether input messages are scanned.
    scan_output:
        Whether output content is scanned.
    bus:
        Optional event bus used for publishing security events.
    """
    if scanners is None:
        # Default pair, built per instance.
        scanners = [
            SecretScanner(),
            PIIScanner(),
        ]

    self._engine = engine
    self._scanners: List[BaseScanner] = scanners
    self._mode = mode
    self._scan_input = scan_input
    self._scan_output = scan_output
    self._bus = bus
Attributes
engine_id property
engine_id: str

Delegate to the wrapped engine.

Functions
generate
generate(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Scan input, call wrapped engine, scan output.

Source code in src/openjarvis/security/guardrails.py
def generate(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Run the wrapped engine with scanning on both sides of the call.

    When input scanning is on, every message with non-empty content is
    scanned; a message with findings is rebuilt with its content replaced
    by the result of ``self._handle_findings(content, result, "input")``.
    When output scanning is on, the response's ``"content"`` entry gets the
    same treatment with direction ``"output"``.

    Returns
    -------
    dict
        The wrapped engine's response; its ``"content"`` is overwritten in
        place when output findings were handled.
    """
    if self._scan_input:
        screened = []
        for original in messages:
            replacement = original
            if original.content:
                verdict = self._scan_text(original.content)
                if not verdict.clean:
                    # Copy every other field; only the content changes.
                    replacement = Message(
                        role=original.role,
                        content=self._handle_findings(
                            original.content,
                            verdict,
                            "input",
                        ),
                        name=original.name,
                        tool_calls=original.tool_calls,
                        tool_call_id=original.tool_call_id,
                        metadata=original.metadata,
                    )
            screened.append(replacement)
        messages = screened

    response = self._engine.generate(
        messages,
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        **kwargs,
    )

    if not self._scan_output:
        return response

    content = response.get("content", "")
    if not content:
        # Nothing to scan (missing or empty content).
        return response

    verdict = self._scan_text(content)
    if not verdict.clean:
        response["content"] = self._handle_findings(content, verdict, "output")
    return response
stream async
stream(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[str]

Yield tokens in real-time, scan accumulated output post-hoc.

Source code in src/openjarvis/security/guardrails.py
async def stream(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Relay tokens from the wrapped engine, then scan what was emitted.

    Tokens are yielded as soon as they arrive, so the scan that follows
    can only report: when it finds something and a bus is configured, a
    ``SECURITY_ALERT`` event is published.  The input *messages* are
    forwarded to the engine without being scanned here.
    """
    pieces = []
    upstream = self._engine.stream(
        messages,
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        **kwargs,
    )
    async for piece in upstream:
        pieces.append(piece)
        yield piece

    # Everything below is reporting only; the tokens are already out.
    if not self._scan_output:
        return

    emitted = "".join(pieces)
    if not emitted:
        return

    verdict = self._scan_text(emitted)
    if verdict.clean or not self._bus:
        return

    summaries = []
    for finding in verdict.findings:
        summaries.append(
            {
                "pattern": finding.pattern_name,
                "threat": finding.threat_level.value,
                "description": finding.description,
            }
        )
    self._bus.publish(
        EventType.SECURITY_ALERT,
        {
            "direction": "output",
            "findings": summaries,
            "mode": "stream_post_hoc",
        },
    )
stream_full async
stream_full(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator['StreamChunk']

Delegate to wrapped engine, scan accumulated output post-hoc.

Source code in src/openjarvis/security/guardrails.py
async def stream_full(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator["StreamChunk"]:
    """Relay chunks from the wrapped engine, then scan their joined content.

    Every chunk is yielded unchanged as it arrives; only chunks with
    truthy ``content`` contribute to the scanned text.  Findings are
    reported as a ``SECURITY_ALERT`` event when a bus is configured; the
    chunks themselves are never altered.
    """
    texts: list[str] = []
    upstream = self._engine.stream_full(
        messages,
        model=model,
        temperature=temperature,
        max_tokens=max_tokens,
        **kwargs,
    )
    async for item in upstream:
        if item.content:
            texts.append(item.content)
        yield item

    # Reporting only from here on; the chunks have already been delivered.
    if not self._scan_output:
        return

    emitted = "".join(texts)
    if not emitted:
        return

    verdict = self._scan_text(emitted)
    if verdict.clean or not self._bus:
        return

    summaries = []
    for finding in verdict.findings:
        summaries.append(
            {
                "pattern": finding.pattern_name,
                "threat": finding.threat_level.value,
                "description": finding.description,
            }
        )
    self._bus.publish(
        EventType.SECURITY_ALERT,
        {
            "direction": "output",
            "findings": summaries,
            "mode": "stream_full_post_hoc",
        },
    )
list_models
list_models() -> List[str]

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def list_models(self) -> List[str]:
    """Return the wrapped engine's model list unchanged."""
    wrapped = self._engine
    return wrapped.list_models()
health
health() -> bool

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def health(self) -> bool:
    """Return the wrapped engine's health status unchanged."""
    wrapped = self._engine
    return wrapped.health()

SecurityBlockError

Bases: Exception

Raised when mode is BLOCK and security findings are detected.

PIIScanner

PIIScanner()

Bases: BaseScanner

Detect personally identifiable information in text.

Source code in src/openjarvis/security/scanner.py
def __init__(self) -> None:
    """Create the Rust-side ``PIIScanner`` this object delegates to."""
    self._rust_impl = get_rust_module().PIIScanner()
Functions
scan
scan(text: str) -> ScanResult

Scan text for PII patterns — always via Rust backend.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for PII patterns -- always via Rust backend.

    The backend's output is converted with ``scan_result_from_json``.
    """
    backend_output = self._rust_impl.scan(text)
    return scan_result_from_json(backend_output)
redact
redact(text: str) -> str

Replace PII matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Return the Rust backend's redaction of *text*.

    PII matches are replaced with ``[REDACTED:{pattern_name}]``.
    """
    backend = self._rust_impl
    return backend.redact(text)

SecretScanner

SecretScanner()

Bases: BaseScanner

Detect API keys, tokens, passwords, and other secrets in text.

Source code in src/openjarvis/security/scanner.py
def __init__(self) -> None:
    """Create the Rust-side ``SecretScanner`` this object delegates to."""
    self._rust_impl = get_rust_module().SecretScanner()
Functions
scan
scan(text: str) -> ScanResult

Scan text for secret patterns — always via Rust backend.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for secret patterns -- always via Rust backend.

    The backend's output is converted with ``scan_result_from_json``.
    """
    backend_output = self._rust_impl.scan(text)
    return scan_result_from_json(backend_output)
redact
redact(text: str) -> str

Replace secret matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Return the Rust backend's redaction of *text*.

    Secret matches are replaced with ``[REDACTED:{pattern_name}]``.
    """
    backend = self._rust_impl
    return backend.redact(text)

RedactionMode

Bases: str, Enum

Action mode when findings are detected.

ScanFinding dataclass

ScanFinding(pattern_name: str, matched_text: str, threat_level: ThreatLevel, start: int, end: int, description: str = '')

A single finding from a security scanner.

ScanResult dataclass

ScanResult(findings: List[ScanFinding] = list())

Aggregated result from one or more scanners.

Attributes
clean property
clean: bool

Return True if no findings were detected.

highest_threat property
highest_threat: Optional[ThreatLevel]

Return the highest threat level among findings, or None.

SecurityEvent dataclass

SecurityEvent(event_type: SecurityEventType, timestamp: float, findings: List[ScanFinding] = list(), content_preview: str = '', action_taken: str = '')

A recorded security event for audit logging.

SecurityEventType

Bases: str, Enum

Categories of security events.

ThreatLevel

Bases: str, Enum

Severity classification for security findings.

SecurityContext dataclass

SecurityContext(engine: Any, capability_policy: Any = None, audit_logger: Any = None)

Result of setup_security() — wrapped engine, policy, audit.

Functions

filter_sensitive_paths

filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]

Return only non-sensitive paths from paths.

Source code in src/openjarvis/security/file_policy.py
def filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]:
    """Drop every entry of *paths* that ``is_sensitive_file`` flags.

    The surviving entries are returned as ``Path`` objects, in their
    original order.
    """
    kept: List[Path] = []
    for candidate in paths:
        if is_sensitive_file(candidate):
            continue
        kept.append(Path(candidate))
    return kept

is_sensitive_file

is_sensitive_file(path: Union[str, Path]) -> bool

Return True if path matches a sensitive file pattern.

Checks both the filename and the full name against DEFAULT_SENSITIVE_PATTERNS using `fnmatch.fnmatch`. Uses the Rust implementation when it can be imported, and falls back to the Python implementation otherwise.

Source code in src/openjarvis/security/file_policy.py
def is_sensitive_file(path: Union[str, Path]) -> bool:
    """Return ``True`` if *path* matches a sensitive file pattern.

    The check is delegated to the Rust module's ``is_sensitive_file``.
    If an ``ImportError`` is raised while doing so, the pure-Python
    ``_is_sensitive_file_py`` is used instead.  Either way the path is
    passed along as a string.
    """
    as_text = str(path)
    try:
        from openjarvis._rust_bridge import get_rust_module

        return get_rust_module().is_sensitive_file(as_text)
    except ImportError:
        # Rust extension unavailable: use the Python implementation.
        return _is_sensitive_file_py(as_text)

check_ssrf

check_ssrf(url: str) -> Optional[str]

Check a URL for SSRF vulnerabilities — always via Rust backend.

Source code in src/openjarvis/security/ssrf.py
def check_ssrf(url: str) -> Optional[str]:
    """Run the Rust backend's SSRF check on *url* and return its result.

    There is no Python fallback: the Rust module is always used.
    """
    from openjarvis._rust_bridge import get_rust_module

    return get_rust_module().check_ssrf(url)

is_private_ip

is_private_ip(ip_str: str) -> bool

Check if an IP address is private/reserved.

Source code in src/openjarvis/security/ssrf.py
def is_private_ip(ip_str: str) -> bool:
    """Report whether *ip_str* falls inside any network in ``_BLOCKED_CIDR``.

    A string that does not parse as an IP address yields ``False``.
    """
    try:
        candidate = ipaddress.ip_address(ip_str)
        for network in _BLOCKED_CIDR:
            if candidate in network:
                return True
        return False
    except ValueError:
        # Not an IP literal at all.
        return False

setup_security

setup_security(config: Any, engine: Any, bus: Optional[EventBus] = None) -> SecurityContext

Apply security guardrails to an engine based on config.

Returns a SecurityContext. No-ops if config.security.enabled is False.

Source code in src/openjarvis/security/__init__.py
def setup_security(
    config: Any,
    engine: Any,
    bus: Optional[EventBus] = None,
) -> SecurityContext:
    """Apply security guardrails to an engine based on config.

    Three parts are set up independently and on a best-effort basis: the
    scanner-wrapped engine, the capability policy, and the audit logger.
    A failure in one part is logged as a warning and leaves that part out
    of the returned context; it never aborts the others.

    Parameters
    ----------
    config:
        Configuration object; only its ``security`` section is read.
    engine:
        The engine to protect.
    bus:
        Optional event bus handed to the guardrails wrapper and the audit
        logger.

    Returns
    -------
    SecurityContext
        With ``config.security.enabled`` false, a context holding the
        untouched *engine* and nothing else.  Otherwise the (possibly
        wrapped) engine plus the capability policy and audit logger, each
        ``None`` when disabled or when its setup failed.
    """
    if not config.security.enabled:
        return SecurityContext(engine=engine)

    # Scanners + engine wrapping
    try:
        scanners: list[BaseScanner] = []
        if config.security.secret_scanner:
            scanners.append(SecretScanner())
        if config.security.pii_scanner:
            scanners.append(PIIScanner())

        # With no scanner enabled the engine is left unwrapped.
        if scanners:
            mode = RedactionMode(config.security.mode)
            engine = GuardrailsEngine(
                engine,
                scanners=scanners,
                mode=mode,
                scan_input=config.security.scan_input,
                scan_output=config.security.scan_output,
                bus=bus,
            )
    except Exception as exc:
        # Security was requested but the engine stays unguarded: this must
        # be visible at default log levels, not hidden at DEBUG.
        logger.warning("Failed to set up security scanners: %s", exc)

    # Capability policy
    cap_policy = None
    if config.security.capabilities.enabled:
        try:
            from openjarvis.security.capabilities import CapabilityPolicy

            cap_policy = CapabilityPolicy(
                # An empty/falsy configured path is passed on as None.
                policy_path=config.security.capabilities.policy_path or None,
            )
        except Exception as exc:
            logger.warning("Failed to set up capability policy: %s", exc)

    # Audit logger
    audit = None
    try:
        audit = AuditLogger(
            db_path=config.security.audit_log_path,
            bus=bus,
        )
    except Exception as exc:
        logger.warning("Failed to set up audit logger: %s", exc)

    return SecurityContext(
        engine=engine,
        capability_policy=cap_policy,
        audit_logger=audit,
    )