engine

WorkflowEngine — executes a WorkflowGraph against a JarvisSystem.

Classes

WorkflowEngine

WorkflowEngine(*, bus: Optional[EventBus] = None, max_parallel: int = 4, default_node_timeout: int = 300)

Execute DAG-based workflows.

Sequential nodes run in topological order. Parallel-eligible nodes (same execution stage, no inter-dependencies) run via ThreadPoolExecutor. Condition nodes evaluate expressions against prior step outputs. Loop nodes use LoopGuard from Phase 14.3.

Source code in src/openjarvis/workflow/engine.py
def __init__(
    self,
    *,
    bus: Optional[EventBus] = None,
    max_parallel: int = 4,
    default_node_timeout: int = 300,
) -> None:
    """Configure a workflow engine.

    Args:
        bus: Optional event bus used to publish workflow start/end events;
            when ``None``, no events are published.
        max_parallel: Upper bound on worker threads used for a parallel
            execution stage.
        default_node_timeout: Seconds allotted for node execution before it
            is treated as failed.
    """
    # Assignments are independent; store configuration as private state.
    self._default_node_timeout = default_node_timeout
    self._max_parallel = max_parallel
    self._bus = bus
Functions
run
run(graph: WorkflowGraph, system: Any = None, *, initial_input: str = '', context: Optional[Dict[str, Any]] = None) -> WorkflowResult

Execute a workflow graph end-to-end.

Source code in src/openjarvis/workflow/engine.py
def run(
    self,
    graph: WorkflowGraph,
    system: Any = None,  # JarvisSystem
    *,
    initial_input: str = "",
    context: Optional[Dict[str, Any]] = None,
) -> WorkflowResult:
    """Execute a workflow graph end-to-end.

    Nodes run stage by stage in topological order. A stage containing a
    single node runs inline; a stage with several independent nodes fans
    out to a thread pool bounded by ``max_parallel``. Execution stops at
    the first stage that produced a failed node.

    Args:
        graph: DAG of workflow nodes; validated before execution.
        system: Host system handed to each node (expected: JarvisSystem).
        initial_input: Seed value exposed to nodes under the ``_input`` key.
        context: Optional extra state shared with every node. The dict is
            copied, so the caller's mapping is never mutated.

    Returns:
        WorkflowResult with per-node step results, overall success flag,
        the last executed node's output, and total wall-clock duration.
    """
    valid, msg = graph.validate()
    if not valid:
        return WorkflowResult(
            workflow_name=graph.name,
            success=False,
            final_output=f"Invalid workflow: {msg}",
        )

    t0 = time.time()
    if self._bus:
        self._bus.publish(
            EventType.WORKFLOW_START,
            {"workflow": graph.name},
        )

    # State: outputs keyed by node_id; "_input" seeds the first node(s).
    outputs: Dict[str, str] = {"_input": initial_input}
    ctx = dict(context or {})
    all_steps: List[WorkflowStepResult] = []
    success = True

    for stage in graph.execution_stages():
        if len(stage) == 1:
            # Sequential execution: run inline, no pool overhead.
            step = self._execute_node(
                graph.get_node(stage[0]),  # type: ignore[arg-type]
                outputs,
                ctx,
                system,
                graph,
            )
            all_steps.append(step)
            outputs[stage[0]] = step.output
            if not step.success:
                success = False
        else:
            # Parallel execution: each node gets snapshot copies of the
            # shared dicts so concurrent nodes cannot race on them.
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=min(len(stage), self._max_parallel),
            ) as pool:
                futures = {
                    pool.submit(
                        self._execute_node,
                        graph.get_node(nid),
                        dict(outputs),
                        dict(ctx),
                        system,
                        graph,
                    ): nid
                    for nid in stage
                }
                try:
                    # BUG FIX: previously the timeout was passed to
                    # future.result() on a future that as_completed() had
                    # already waited on indefinitely, so it could never
                    # fire. Bound the wait itself instead.
                    for future in concurrent.futures.as_completed(
                        futures,
                        timeout=self._default_node_timeout,
                    ):
                        nid = futures[future]
                        try:
                            step = future.result()
                        except Exception as exc:
                            step = WorkflowStepResult(
                                node_id=nid,
                                success=False,
                                output=f"Node execution error: {exc}",
                            )
                        all_steps.append(step)
                        outputs[nid] = step.output
                        if not step.success:
                            success = False
                except concurrent.futures.TimeoutError:
                    # Deadline expired: record a failed step for every node
                    # that has not finished. NOTE: already-running threads
                    # cannot be interrupted; pool shutdown still waits for
                    # them, but their results are discarded.
                    for future, nid in futures.items():
                        if not future.done():
                            future.cancel()
                            step = WorkflowStepResult(
                                node_id=nid,
                                success=False,
                                output=(
                                    "Node execution error: timed out after "
                                    f"{self._default_node_timeout}s"
                                ),
                            )
                            all_steps.append(step)
                            outputs[nid] = step.output
                    success = False

        # Stop at the first stage containing any failed node.
        if not success:
            break

    total = time.time() - t0
    # Final output is the output of the last executed node.
    final_output = all_steps[-1].output if all_steps else ""

    if self._bus:
        self._bus.publish(
            EventType.WORKFLOW_END,
            {"workflow": graph.name, "success": success, "duration": total},
        )

    return WorkflowResult(
        workflow_name=graph.name,
        success=success,
        steps=all_steps,
        final_output=final_output,
        total_duration_seconds=total,
    )