Skip to content

guardrails

guardrails

GuardrailsEngine — security-aware inference engine wrapper.

Classes

SecurityBlockError

Bases: Exception

Raised when mode is BLOCK and security findings are detected.

GuardrailsEngine

GuardrailsEngine(engine: InferenceEngine, *, scanners: Optional[List[BaseScanner]] = None, mode: RedactionMode = WARN, scan_input: bool = True, scan_output: bool = True, bus: Optional[EventBus] = None)

Bases: InferenceEngine

Wraps an existing InferenceEngine with security scanning.

Not registered in EngineRegistry — instantiated dynamically to wrap any engine at runtime.

PARAMETER DESCRIPTION
engine

The wrapped inference engine.

TYPE: InferenceEngine

scanners

List of scanners to run. Defaults to SecretScanner + PIIScanner.

TYPE: Optional[List[BaseScanner]] DEFAULT: None

mode

Action taken on findings: WARN, REDACT, or BLOCK.

TYPE: RedactionMode DEFAULT: WARN

scan_input

Whether to scan input messages.

TYPE: bool DEFAULT: True

scan_output

Whether to scan output content.

TYPE: bool DEFAULT: True

bus

Optional event bus for publishing security events.

TYPE: Optional[EventBus] DEFAULT: None

Source code in src/openjarvis/security/guardrails.py
def __init__(
    self,
    engine: InferenceEngine,
    *,
    scanners: Optional[List[BaseScanner]] = None,
    mode: RedactionMode = RedactionMode.WARN,
    scan_input: bool = True,
    scan_output: bool = True,
    bus: Optional[EventBus] = None,
) -> None:
    self._engine = engine
    self._scanners: List[BaseScanner] = scanners if scanners is not None else [
        SecretScanner(),
        PIIScanner(),
    ]
    self._mode = mode
    self._scan_input = scan_input
    self._scan_output = scan_output
    self._bus = bus
Attributes
engine_id property
engine_id: str

Delegate to the wrapped engine.

Functions
generate
generate(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> Dict[str, Any]

Scan input, call wrapped engine, scan output.

Source code in src/openjarvis/security/guardrails.py
def generate(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> Dict[str, Any]:
    """Scan input, call wrapped engine, scan output."""
    # Scan input messages
    if self._scan_input:
        processed = list(messages)
        for i, msg in enumerate(processed):
            if msg.content:
                result = self._scan_text(msg.content)
                if not result.clean:
                    processed[i] = Message(
                        role=msg.role,
                        content=self._handle_findings(
                            msg.content, result, "input",
                        ),
                        name=msg.name,
                        tool_calls=msg.tool_calls,
                        tool_call_id=msg.tool_call_id,
                        metadata=msg.metadata,
                    )
        messages = processed

    # Call wrapped engine
    response = self._engine.generate(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    )

    # Scan output
    if self._scan_output:
        content = response.get("content", "")
        if content:
            result = self._scan_text(content)
            if not result.clean:
                response["content"] = self._handle_findings(
                    content, result, "output"
                )

    return response
stream async
stream(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[str]

Yield tokens in real-time, scan accumulated output post-hoc.

Source code in src/openjarvis/security/guardrails.py
async def stream(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Yield tokens in real-time, scan accumulated output post-hoc."""
    accumulated = []
    async for token in self._engine.stream(
        messages, model=model, temperature=temperature,
        max_tokens=max_tokens, **kwargs,
    ):
        accumulated.append(token)
        yield token

    # Post-hoc scan of accumulated output for logging only
    if self._scan_output:
        full_output = "".join(accumulated)
        if full_output:
            result = self._scan_text(full_output)
            if not result.clean and self._bus:
                finding_dicts = [
                    {
                        "pattern": f.pattern_name,
                        "threat": f.threat_level.value,
                        "description": f.description,
                    }
                    for f in result.findings
                ]
                self._bus.publish(
                    EventType.SECURITY_ALERT,
                    {
                        "direction": "output",
                        "findings": finding_dicts,
                        "mode": "stream_post_hoc",
                    },
                )
list_models
list_models() -> List[str]

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def list_models(self) -> List[str]:
    """Delegate to wrapped engine."""
    return self._engine.list_models()
health
health() -> bool

Delegate to wrapped engine.

Source code in src/openjarvis/security/guardrails.py
def health(self) -> bool:
    """Delegate to wrapped engine."""
    return self._engine.health()