Skip to content

Index

security

Security guardrails — scanners, engine wrapper, audit, SSRF.

Classes

BaseScanner

Bases: ABC

Base class for all security scanners.

Subclasses implement pattern-based scanning for secrets, PII, or other sensitive content.

Functions
scan abstractmethod
scan(text: str) -> ScanResult

Scan text and return findings.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def scan(self, text: str) -> ScanResult:
    """Scan *text* and return findings."""
redact abstractmethod
redact(text: str) -> str

Return text with sensitive matches replaced by redaction markers.

Source code in src/openjarvis/security/_stubs.py
@abstractmethod
def redact(self, text: str) -> str:
    """Return *text* with sensitive matches replaced by redaction markers."""

AuditLogger

AuditLogger(db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / 'audit.db', bus: Optional[EventBus] = None)

Append-only SQLite audit log for security events.

PARAMETER DESCRIPTION
db_path

Path to the SQLite database file.

TYPE: Union[str, Path] DEFAULT: DEFAULT_CONFIG_DIR / 'audit.db'

bus

Optional event bus — if provided, subscribes to security events (SECURITY_SCAN, SECURITY_ALERT, SECURITY_BLOCK).

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/audit.py
def __init__(
    self,
    db_path: Union[str, Path] = DEFAULT_CONFIG_DIR / "audit.db",
    bus: Optional[EventBus] = None,
) -> None:
    self._db_path = Path(db_path)
    self._db_path.parent.mkdir(parents=True, exist_ok=True)
    self._conn = sqlite3.connect(str(self._db_path))
    self._conn.execute(
        """
        CREATE TABLE IF NOT EXISTS security_events (
            id          INTEGER PRIMARY KEY,
            timestamp   REAL,
            event_type  TEXT,
            findings_json TEXT,
            content_preview TEXT,
            action_taken TEXT,
            row_hash    TEXT DEFAULT '',
            prev_hash   TEXT DEFAULT ''
        )
        """
    )
    self._conn.commit()
    self._migrate_schema()

    if bus is not None:
        bus.subscribe(EventType.SECURITY_SCAN, self._on_event)
        bus.subscribe(EventType.SECURITY_ALERT, self._on_event)
        bus.subscribe(EventType.SECURITY_BLOCK, self._on_event)
Functions
log
log(event: SecurityEvent) -> None

Insert a security event into the audit log with Merkle hash chain.

Source code in src/openjarvis/security/audit.py
def log(self, event: SecurityEvent) -> None:
    """Insert a security event into the audit log with Merkle hash chain."""
    findings_json = json.dumps([
        {
            "pattern_name": f.pattern_name,
            "matched_text": f.matched_text,
            "threat_level": f.threat_level.value,
            "start": f.start,
            "end": f.end,
            "description": f.description,
        }
        for f in event.findings
    ])

    # Compute hash chain
    prev_hash = self.tail_hash()
    hash_input = (
        f"{prev_hash}|{event.timestamp}|{event.event_type.value}"
        f"|{findings_json}|{event.content_preview}|{event.action_taken}"
    )
    row_hash = hashlib.sha256(hash_input.encode()).hexdigest()

    self._conn.execute(
        """
        INSERT INTO security_events
            (timestamp, event_type, findings_json, content_preview,
             action_taken, row_hash, prev_hash)
        VALUES (?, ?, ?, ?, ?, ?, ?)
        """,
        (
            event.timestamp,
            event.event_type.value,
            findings_json,
            event.content_preview,
            event.action_taken,
            row_hash,
            prev_hash,
        ),
    )
    self._conn.commit()
query
query(*, event_type: Optional[str] = None, since: Optional[float] = None, limit: int = 100) -> List[SecurityEvent]

Query logged security events with optional filters.

Source code in src/openjarvis/security/audit.py
def query(
    self,
    *,
    event_type: Optional[str] = None,
    since: Optional[float] = None,
    limit: int = 100,
) -> List[SecurityEvent]:
    """Query logged security events with optional filters."""
    sql = (
        "SELECT timestamp, event_type, findings_json,"
        " content_preview, action_taken"
        " FROM security_events WHERE 1=1"
    )
    params: list = []

    if event_type is not None:
        sql += " AND event_type = ?"
        params.append(event_type)
    if since is not None:
        sql += " AND timestamp >= ?"
        params.append(since)

    sql += " ORDER BY timestamp DESC LIMIT ?"
    params.append(limit)

    rows = self._conn.execute(sql, params).fetchall()
    events: List[SecurityEvent] = []
    for row in rows:
        ts, etype, findings_json, preview, action = row
        findings_raw = json.loads(findings_json) if findings_json else []
        findings = [
            ScanFinding(
                pattern_name=f["pattern_name"],
                matched_text=f["matched_text"],
                threat_level=ThreatLevel(f["threat_level"]),
                start=f["start"],
                end=f["end"],
                description=f.get("description", ""),
            )
            for f in findings_raw
        ]
        events.append(
            SecurityEvent(
                event_type=SecurityEventType(etype),
                timestamp=ts,
                findings=findings,
                content_preview=preview or "",
                action_taken=action or "",
            )
        )
    return events
tail_hash
tail_hash() -> str

Return the hash of the last row in the chain, or empty string.

Source code in src/openjarvis/security/audit.py
def tail_hash(self) -> str:
    """Return the hash of the last row in the chain, or empty string."""
    row = self._conn.execute(
        "SELECT row_hash FROM security_events ORDER BY id DESC LIMIT 1"
    ).fetchone()
    return row[0] if row and row[0] else ""
verify_chain
verify_chain() -> Tuple[bool, Optional[int]]

Verify the Merkle hash chain integrity.

RETURNS DESCRIPTION
tuple

(True, None) if the chain is valid, or (False, row_id) where row_id is the first broken link.

Source code in src/openjarvis/security/audit.py
def verify_chain(self) -> Tuple[bool, Optional[int]]:
    """Verify the Merkle hash chain integrity.

    Returns
    -------
    tuple
        ``(True, None)`` if the chain is valid, or
        ``(False, row_id)`` where *row_id* is the first broken link.
    """
    rows = self._conn.execute(
        "SELECT id, timestamp, event_type, findings_json,"
        " content_preview, action_taken, row_hash, prev_hash"
        " FROM security_events ORDER BY id"
    ).fetchall()

    expected_prev = ""
    for row in rows:
        rid, ts, etype, fj, preview, action, stored_hash, stored_prev = row
        # Skip rows that predate the Merkle upgrade
        if not stored_hash:
            continue
        # Verify prev_hash link
        if stored_prev != expected_prev:
            return False, rid
        # Verify row_hash
        hash_input = (
            f"{stored_prev}|{ts}|{etype}"
            f"|{fj}|{preview}|{action}"
        )
        computed = hashlib.sha256(hash_input.encode()).hexdigest()
        if computed != stored_hash:
            return False, rid
        expected_prev = stored_hash

    return True, None
count
count() -> int

Return the total number of logged security events.

Source code in src/openjarvis/security/audit.py
def count(self) -> int:
    """Return the total number of logged security events."""
    row = self._conn.execute(
        "SELECT COUNT(*) FROM security_events"
    ).fetchone()
    return row[0] if row else 0
close
close() -> None

Close the SQLite connection.

Source code in src/openjarvis/security/audit.py
def close(self) -> None:
    """Close the SQLite connection."""
    self._conn.close()

GuardrailsEngine

GuardrailsEngine(engine: InferenceEngine, *, scanners: Optional[List[BaseScanner]] = None, mode: RedactionMode = WARN, scan_input: bool = True, scan_output: bool = True, bus: Optional[EventBus] = None)

Bases: InferenceEngine

Wraps an existing InferenceEngine with security scanning.

Not registered in EngineRegistry — instantiated dynamically to wrap any engine at runtime.

PARAMETER DESCRIPTION
engine

The wrapped inference engine.

TYPE: InferenceEngine

scanners

List of scanners to run. Defaults to SecretScanner + PIIScanner.

TYPE: Optional[List[BaseScanner]] DEFAULT: None

mode

Action taken on findings: WARN, REDACT, or BLOCK.

TYPE: RedactionMode DEFAULT: WARN

scan_input

Whether to scan input messages.

TYPE: bool DEFAULT: True

scan_output

Whether to scan output content.

TYPE: bool DEFAULT: True

bus

Optional event bus for publishing security events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/guardrails.py
def __init__(
    self,
    engine: InferenceEngine,
    *,
    scanners: Optional[List[BaseScanner]] = None,
    mode: RedactionMode = RedactionMode.WARN,
    scan_input: bool = True,
    scan_output: bool = True,
    bus: Optional[EventBus] = None,
) -> None:
    self._engine = engine
    self._scanners: List[BaseScanner] = scanners if scanners is not None else [
        SecretScanner(),
        PIIScanner(),
    ]
    self._mode = mode
    self._scan_input = scan_input
    self._scan_output = scan_output
    self._bus = bus
Attributes
engine_id property
engine_id: str

Delegate to the wrapped engine.

Functions
generate
generate(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Scan input, call wrapped engine, scan output.

Source code in src/openjarvis/security/guardrails.py
def generate(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Scan input, call wrapped engine, scan output."""
    # Scan input messages
    if self._scan_input:
        processed = list(messages)
        for i, msg in enumerate(processed):
            if msg.content:
                result = self._scan_text(msg.content)
                if not result.clean:
                    processed[i] = Message(
                        role=msg.role,
                        content=self._handle_findings(
                            msg.content, result, "input",
                        ),
                        name=msg.name,
                        tool_calls=msg.tool_calls,
                        tool_call_id=msg.tool_call_id,
                        metadata=msg.metadata,
                    )
        messages = processed

    # Call wrapped engine
    response = self._engine.generate(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    )

    # Scan output
    if self._scan_output:
        content = response.get("content", "")
        if content:
            result = self._scan_text(content)
            if not result.clean:
                response["content"] = self._handle_findings(
                    content, result, "output"
                )

    return response
stream async
stream(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[str]

Yield tokens in real-time, scan accumulated output post-hoc.

Source code in src/openjarvis/security/guardrails.py
async def stream(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Yield tokens in real-time, scan accumulated output post-hoc."""
    accumulated = []
    async for token in self._engine.stream(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    ):
        accumulated.append(token)
        yield token

    # Post-hoc scan of accumulated output for logging only
    if self._scan_output:
        full_output = "".join(accumulated)
        if full_output:
            result = self._scan_text(full_output)
            if not result.clean and self._bus:
                finding_dicts = [
                    {
                        "pattern": f.pattern_name,
                        "threat": f.threat_level.value,
                        "description": f.description,
                    }
                    for f in result.findings
                ]
                self._bus.publish(
                    EventType.SECURITY_ALERT,
                    {
                        "direction": "output",
                        "findings": finding_dicts,
                        "mode": "stream_post_hoc",
                    },
                )
list_models
list_models() -> List[str]

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def list_models(self) -> List[str]:
    """Delegate to wrapped engine."""
    return self._engine.list_models()
health
health() -> bool

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def health(self) -> bool:
    """Delegate to wrapped engine."""
    return self._engine.health()

SecurityBlockError

Bases: Exception

Raised when mode is BLOCK and security findings are detected.

PIIScanner

PIIScanner()

Bases: BaseScanner

Detect personally identifiable information in text.

Source code in src/openjarvis/security/scanner.py
def __init__(self) -> None:
    _rust = get_rust_module()
    self._rust_impl = _rust.PIIScanner()
Functions
scan
scan(text: str) -> ScanResult

Scan text for PII patterns — always via Rust backend.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for PII patterns — always via Rust backend."""
    return scan_result_from_json(self._rust_impl.scan(text))
redact
redact(text: str) -> str

Replace PII matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Replace PII matches with ``[REDACTED:{pattern_name}]``."""
    return self._rust_impl.redact(text)

SecretScanner

SecretScanner()

Bases: BaseScanner

Detect API keys, tokens, passwords, and other secrets in text.

Source code in src/openjarvis/security/scanner.py
def __init__(self) -> None:
    _rust = get_rust_module()
    self._rust_impl = _rust.SecretScanner()
Functions
scan
scan(text: str) -> ScanResult

Scan text for secret patterns — always via Rust backend.

Source code in src/openjarvis/security/scanner.py
def scan(self, text: str) -> ScanResult:
    """Scan *text* for secret patterns — always via Rust backend."""
    return scan_result_from_json(self._rust_impl.scan(text))
redact
redact(text: str) -> str

Replace secret matches with [REDACTED:{pattern_name}].

Source code in src/openjarvis/security/scanner.py
def redact(self, text: str) -> str:
    """Replace secret matches with ``[REDACTED:{pattern_name}]``."""
    return self._rust_impl.redact(text)

RedactionMode

Bases: str, Enum

Action mode when findings are detected.

ScanFinding dataclass

ScanFinding(pattern_name: str, matched_text: str, threat_level: ThreatLevel, start: int, end: int, description: str = '')

A single finding from a security scanner.

ScanResult dataclass

ScanResult(findings: List[ScanFinding] = list())

Aggregated result from one or more scanners.

Attributes
clean property
clean: bool

Return True if no findings were detected.

highest_threat property
highest_threat: Optional[ThreatLevel]

Return the highest threat level among findings, or None.

SecurityEvent dataclass

SecurityEvent(event_type: SecurityEventType, timestamp: float, findings: List[ScanFinding] = list(), content_preview: str = '', action_taken: str = '')

A recorded security event for audit logging.

SecurityEventType

Bases: str, Enum

Categories of security events.

ThreatLevel

Bases: str, Enum

Severity classification for security findings.

Functions

filter_sensitive_paths

filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]

Return only non-sensitive paths from paths.

Source code in src/openjarvis/security/file_policy.py
def filter_sensitive_paths(paths: Iterable[Union[str, Path]]) -> List[Path]:
    """Return only non-sensitive paths from *paths*."""
    return [Path(p) for p in paths if not is_sensitive_file(p)]

is_sensitive_file

is_sensitive_file(path: Union[str, Path]) -> bool

Return True if path matches a sensitive file pattern.

Checks both the filename and the full name against DEFAULT_SENSITIVE_PATTERNS using :func:fnmatch.fnmatch.

Source code in src/openjarvis/security/file_policy.py
def is_sensitive_file(path: Union[str, Path]) -> bool:
    """Return ``True`` if *path* matches a sensitive file pattern.

    Checks both the filename and the full name against
    ``DEFAULT_SENSITIVE_PATTERNS`` using :func:`fnmatch.fnmatch`.
    """
    from openjarvis._rust_bridge import get_rust_module

    _rust = get_rust_module()
    return _rust.is_sensitive_file(str(path))

check_ssrf

check_ssrf(url: str) -> Optional[str]

Check a URL for SSRF vulnerabilities — always via Rust backend.

Source code in src/openjarvis/security/ssrf.py
def check_ssrf(url: str) -> Optional[str]:
    """Check a URL for SSRF vulnerabilities — always via Rust backend."""
    from openjarvis._rust_bridge import get_rust_module

    _rust = get_rust_module()
    return _rust.check_ssrf(url)

is_private_ip

is_private_ip(ip_str: str) -> bool

Check if an IP address is private/reserved.

Source code in src/openjarvis/security/ssrf.py
def is_private_ip(ip_str: str) -> bool:
    """Check if an IP address is private/reserved."""
    try:
        addr = ipaddress.ip_address(ip_str)
        return any(addr in net for net in _BLOCKED_CIDR)
    except ValueError:
        return False