Skip to content

ollama

ollama

Ollama inference engine backend.

Classes

OllamaEngine

OllamaEngine(host: str | None = None, *, timeout: float = 1800.0)

Bases: InferenceEngine

Ollama backend via its native HTTP API.

Source code in src/openjarvis/engine/ollama.py
def __init__(
    self,
    host: str | None = None,
    *,
    timeout: float = 1800.0,
) -> None:
    # Priority: explicit host (from config.toml) > OLLAMA_HOST env var > default
    if host is None:
        env_host = os.environ.get("OLLAMA_HOST")
        host = env_host or self._DEFAULT_HOST
    self._host = host.rstrip("/")
    self._client = httpx.Client(base_url=self._host, timeout=timeout)
    # Last stream usage — captured from Ollama's final chunk
    self._last_stream_usage: Dict[str, int] = {}
Functions
stream_full async
stream_full(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[StreamChunk]

Yield StreamChunks including tool_calls.

Unlike the default stream_full in the base class (which wraps stream() and drops tools), this posts to /api/chat with tools from kwargs and parses tool_calls out of the streamed response. Falls back to a tools-less retry on 400 (mirrors generate()'s behaviour for models that don't support tools).

Source code in src/openjarvis/engine/ollama.py
async def stream_full(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[StreamChunk]:
    """Yield ``StreamChunk``s including tool_calls.

    Unlike the default ``stream_full`` in the base class (which wraps
    ``stream()`` and drops tools), this posts to ``/api/chat`` with
    ``tools`` from kwargs and parses tool_calls out of the streamed
    response. Falls back to a tools-less retry on 400 (mirrors
    ``generate()``'s behaviour for models that don't support tools).
    """
    msg_dicts = messages_to_dicts(messages)
    for md in msg_dicts:
        for tc in md.get("tool_calls", []):
            fn = tc.get("function", {})
            args = fn.get("arguments")
            if isinstance(args, str):
                try:
                    fn["arguments"] = json.loads(args)
                except (json.JSONDecodeError, TypeError):
                    pass

    payload: Dict[str, Any] = {
        "model": model,
        "messages": msg_dicts,
        "stream": True,
        "options": {
            "temperature": temperature,
            "num_predict": max_tokens,
            "num_ctx": kwargs.get("num_ctx", 8192),
        },
    }
    if "think" not in kwargs:
        payload["think"] = False
    elif kwargs["think"] is not None:
        payload["think"] = kwargs["think"]

    tools = kwargs.get("tools")
    if tools:
        payload["tools"] = tools

    async for chunk in self._run_stream(
        payload, messages, retry_without_tools=bool(tools)
    ):
        yield chunk

Functions