stream_full(messages: Sequence[Message], *, model: str, temperature: float = 0.7, max_tokens: int = 1024, **kwargs: Any) -> AsyncIterator[StreamChunk]
Yield StreamChunks including tool_calls.
Unlike the default stream_full in the base class (which wraps
stream() and drops tools), this posts to /api/chat with
tools from kwargs and parses tool_calls out of the streamed
response. Falls back to retrying without tools on a 400 response (mirrors
generate()'s behaviour for models that don't support tools).
Source code in src/openjarvis/engine/ollama.py
async def stream_full(
    self,
    messages: Sequence[Message],
    *,
    model: str,
    temperature: float = 0.7,
    max_tokens: int = 1024,
    **kwargs: Any,
) -> AsyncIterator[StreamChunk]:
    """Stream ``StreamChunk``s for a chat request, tool calls included.

    The base-class ``stream_full`` wraps ``stream()`` and drops tools.
    This override builds a streaming chat payload that carries any
    ``tools`` passed in kwargs and hands it to ``_run_stream``, asking
    it to retry without tools whenever tools were supplied. Per the
    method's original notes, the request targets ``/api/chat`` and the
    tool-less retry is triggered by a 400, mirroring ``generate()`` for
    models that don't support tools.

    Recognised kwargs:
        num_ctx: context size placed in ``options``; when absent the
            value of ``_default_num_ctx()`` is used.
        think: forwarded as the payload's ``think`` field. Absent means
            ``False``; an explicit ``None`` leaves the field out.
        tools: tool definitions; only attached when truthy.
    """
    wire_messages = messages_to_dicts(messages)

    # Tool-call arguments may arrive JSON-encoded as a string; decode
    # them in place. Strings that fail to parse are left untouched.
    for entry in wire_messages:
        for call in entry.get("tool_calls", []):
            function = call.get("function", {})
            raw_args = function.get("arguments")
            if not isinstance(raw_args, str):
                continue
            try:
                function["arguments"] = json.loads(raw_args)
            except (json.JSONDecodeError, TypeError):
                continue

    options: Dict[str, Any] = {
        "temperature": temperature,
        "num_predict": max_tokens,
        "num_ctx": kwargs.get("num_ctx", _default_num_ctx()),
    }
    payload: Dict[str, Any] = {
        "model": model,
        "messages": wire_messages,
        "stream": True,
        "options": options,
    }

    # Missing "think" defaults to False; an explicit None omits the key.
    think = kwargs.get("think", False)
    if think is not None:
        payload["think"] = think

    tools = kwargs.get("tools")
    has_tools = bool(tools)
    if has_tools:
        payload["tools"] = tools

    stream = self._run_stream(payload, messages, retry_without_tools=has_tools)
    async for chunk in stream:
        yield chunk
|