Skip to content

Index

workflow

Workflow engine — DAG-based multi-agent pipelines.

Classes

WorkflowBuilder

WorkflowBuilder(name: str = '')

Fluent API for building workflow graphs.

Example:

    wf = (
        WorkflowBuilder("research_pipeline")
        .add_agent("researcher", agent="orchestrator", tools=["web_search"])
        .add_agent("summarizer", agent="simple")
        .connect("researcher", "summarizer")
        .build()
    )

Source code in src/openjarvis/workflow/builder.py
def __init__(self, name: str = "") -> None:
    """Start a builder for a workflow called *name* with no nodes and no edges.

    Args:
        name: Label stored on the builder; defaults to the empty string.
    """
    # The three assignments are independent of each other; the containers
    # are created fresh per instance so builders never share state.
    self._edges: List[WorkflowEdge] = []
    self._nodes: List[WorkflowNode] = []
    self._name = name
Functions
sequential
sequential(*node_ids: str) -> WorkflowBuilder

Connect nodes in sequential order.

Source code in src/openjarvis/workflow/builder.py
def sequential(self, *node_ids: str) -> WorkflowBuilder:
    """Chain the given nodes one after another.

    For ``sequential("a", "b", "c")`` the edges ``a -> b`` and ``b -> c``
    are appended, in that order. Fewer than two ids adds nothing.

    Returns:
        The builder itself, so calls can be chained.
    """
    # Pair every id with its successor; zip stops at the shorter sequence,
    # so the last id never becomes a source.
    for upstream, downstream in zip(node_ids, node_ids[1:]):
        link = WorkflowEdge(source=upstream, target=downstream)
        self._edges.append(link)
    return self

WorkflowEngine

WorkflowEngine(*, bus: Optional[EventBus] = None, max_parallel: int = 4, default_node_timeout: int = 300)

Execute DAG-based workflows.

Sequential nodes run in topological order. Parallel-eligible nodes (same execution stage, no inter-dependencies) run via ThreadPoolExecutor. Condition nodes evaluate expressions against prior step outputs. Loop nodes use LoopGuard from Phase 14.3.

Source code in src/openjarvis/workflow/engine.py
def __init__(
    self,
    *,
    bus: Optional[EventBus] = None,
    max_parallel: int = 4,
    default_node_timeout: int = 300,
) -> None:
    """Store the engine configuration; all arguments are keyword-only.

    Args:
        bus: Optional event bus. When set, ``run`` publishes workflow
            start and end events on it.
        max_parallel: Upper bound on the worker threads ``run`` uses for a
            stage containing several nodes.
        default_node_timeout: Timeout ``run`` passes when collecting the
            result of a node executed in a parallel stage.
    """
    # Plain attribute storage; nothing is validated or started here.
    self._default_node_timeout = default_node_timeout
    self._max_parallel = max_parallel
    self._bus = bus
Functions
run
run(graph: WorkflowGraph, system: Any = None, *, initial_input: str = '', context: Optional[Dict[str, Any]] = None) -> WorkflowResult

Execute a workflow graph end-to-end.

Source code in src/openjarvis/workflow/engine.py
def run(
    self,
    graph: WorkflowGraph,
    system: Any = None,  # JarvisSystem
    *,
    initial_input: str = "",
    context: Optional[Dict[str, Any]] = None,
) -> WorkflowResult:
    """Execute a workflow graph end-to-end.

    The graph is validated first; an invalid graph yields a failed
    ``WorkflowResult`` without running anything or publishing events.
    Otherwise the stages returned by ``graph.execution_stages()`` are run
    in order. A stage with one node runs inline; a stage with several
    nodes runs on a thread pool of at most ``max_parallel`` workers, each
    node receiving its own copy of the outputs and context dictionaries.
    Execution stops after the first stage that contains a failed step.

    Args:
        graph: Workflow to execute.
        system: Opaque handle forwarded unchanged to every node execution.
        initial_input: Seed value stored under the ``"_input"`` key of the
            outputs mapping.
        context: Optional mapping, copied before use, forwarded to nodes.

    Returns:
        A ``WorkflowResult`` carrying every recorded step, the overall
        success flag, the elapsed time, and the output of the last
        recorded step as ``final_output`` (empty string if no step ran).
    """
    valid, msg = graph.validate()
    if not valid:
        return WorkflowResult(
            workflow_name=graph.name,
            success=False,
            final_output=f"Invalid workflow: {msg}",
        )

    t0 = time.time()
    if self._bus:
        self._bus.publish(
            EventType.WORKFLOW_START,
            {"workflow": graph.name},
        )

    # State: outputs keyed by node_id
    outputs: Dict[str, str] = {"_input": initial_input}
    ctx = dict(context or {})
    all_steps: List[WorkflowStepResult] = []
    success = True

    stages = graph.execution_stages()
    for stage in stages:
        if len(stage) == 1:
            # Sequential execution
            step = self._execute_node(
                graph.get_node(stage[0]),  # type: ignore[arg-type]
                outputs,
                ctx,
                system,
                graph,
            )
            all_steps.append(step)
            outputs[stage[0]] = step.output
            if not step.success:
                success = False
        else:
            # Parallel execution
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=min(len(stage), self._max_parallel),
            ) as pool:
                # Keep the futures in stage order so that the recorded
                # steps -- and therefore final_output -- do not depend on
                # which thread happens to finish last.
                submitted = [
                    (
                        nid,
                        pool.submit(
                            self._execute_node,
                            graph.get_node(nid),
                            dict(outputs),
                            dict(ctx),
                            system,
                            graph,
                        ),
                    )
                    for nid in stage
                ]
                for nid, future in submitted:
                    try:
                        # Waiting on a possibly unfinished future is what
                        # makes the timeout effective. The pool starts
                        # work in submission order, so by the time we wait
                        # on a node every earlier node is done and this
                        # node has a worker: the wait never exceeds the
                        # node's own running time.
                        step = future.result(
                            timeout=self._default_node_timeout,
                        )
                    except Exception as exc:
                        timed_out = (
                            isinstance(exc, concurrent.futures.TimeoutError)
                            and not future.done()
                        )
                        if timed_out:
                            # Only a not-yet-started future can really be
                            # cancelled; a running node keeps its thread
                            # and the pool still joins it on exit.
                            future.cancel()
                            detail = (
                                f"timed out after "
                                f"{self._default_node_timeout}s"
                            )
                        else:
                            detail = f"{exc}"
                        step = WorkflowStepResult(
                            node_id=nid,
                            success=False,
                            output=f"Node execution error: {detail}",
                        )
                    all_steps.append(step)
                    outputs[nid] = step.output
                    if not step.success:
                        # Keep collecting the rest of the stage so every
                        # node of the stage has a recorded step.
                        success = False

        if not success:
            break

    total = time.time() - t0
    # Final output is the output of the last recorded step
    final_output = all_steps[-1].output if all_steps else ""

    if self._bus:
        self._bus.publish(
            EventType.WORKFLOW_END,
            {"workflow": graph.name, "success": success, "duration": total},
        )

    return WorkflowResult(
        workflow_name=graph.name,
        success=success,
        steps=all_steps,
        final_output=final_output,
        total_duration_seconds=total,
    )

WorkflowGraph

WorkflowGraph(name: str = '')

Directed acyclic graph of workflow nodes.

Supports DAG validation (cycle detection), topological sort, and execution_stages() for parallel-ready ordering.

Source code in src/openjarvis/workflow/graph.py
def __init__(self, name: str = "") -> None:
    """Create an empty graph labelled *name*.

    Args:
        name: Public label of the workflow; defaults to the empty string.
    """
    # Neighbor indexes default to an empty list for unknown keys.
    # _adjacency maps a node id to the ids it points at;
    # _reverse is presumably the opposite direction -- confirm in add_edge.
    self._reverse: Dict[str, List[str]] = defaultdict(list)
    self._adjacency: Dict[str, List[str]] = defaultdict(list)
    # Flat storage for the edges and the nodes (nodes keyed by id).
    self._edges: List[WorkflowEdge] = []
    self._nodes: Dict[str, WorkflowNode] = {}
    self.name = name
Functions
validate
validate() -> Tuple[bool, str]

Validate the graph by checking for cycles (the implementation shown below does not check for orphan nodes).

Source code in src/openjarvis/workflow/graph.py
def validate(self) -> Tuple[bool, str]:
    """Validate the graph by checking that it contains no cycle.

    A depth-first search is started from every node that has not been
    visited yet. While a search is in progress the nodes on the current
    path are tracked; an edge that leads back into that path closes a
    cycle.

    NOTE(review): only cycles are detected here. Orphan nodes are not
    checked by this method.

    Returns:
        ``(True, "")`` when no cycle is found, otherwise
        ``(False, message)`` where the message names a node that lies on
        the detected cycle.
    """
    visited: Set[str] = set()
    in_stack: Set[str] = set()
    # Filled with the node at which a back edge closed a cycle. The DFS
    # start node is not necessarily part of the cycle (a -> b -> c -> b
    # starts at "a"), so it must not be used for the error message.
    cycle_nodes: List[str] = []

    def _dfs(node_id: str) -> bool:
        visited.add(node_id)
        in_stack.add(node_id)
        for neighbor in self._adjacency.get(node_id, []):
            if neighbor in in_stack:
                cycle_nodes.append(neighbor)
                return True  # cycle detected
            if neighbor not in visited and _dfs(neighbor):
                return True
        # Fully explored: the node leaves the current path.
        in_stack.discard(node_id)
        return False

    for node_id in self._nodes:
        if node_id not in visited and _dfs(node_id):
            return False, f"Cycle detected involving node '{cycle_nodes[0]}'"

    return True, ""
topological_sort
topological_sort() -> List[str]

Return node IDs in topological order (Kahn's algorithm).

Source code in src/openjarvis/workflow/graph.py
def topological_sort(self) -> List[str]:
    """Return the node ids ordered so every edge points forward (Kahn).

    Raises:
        ValueError: If the produced order does not contain exactly as many
            ids as the graph has nodes, which is reported as a cycle.
    """
    # Count the incoming edges of every node; nodes start at zero.
    remaining: Dict[str, int] = dict.fromkeys(self._nodes, 0)
    for edge in self._edges:
        dst = edge.target
        remaining[dst] = remaining.get(dst, 0) + 1

    # Seed the work queue with the nodes nothing points at.
    pending = deque()
    for nid, count in remaining.items():
        if count == 0:
            pending.append(nid)

    result: List[str] = []
    while pending:
        current = pending.popleft()
        result.append(current)
        # Emitting a node satisfies one prerequisite of each successor.
        for successor in self._adjacency.get(current, []):
            remaining[successor] -= 1
            if remaining[successor] == 0:
                pending.append(successor)

    if len(result) == len(self._nodes):
        return result
    raise ValueError("Graph contains a cycle; topological sort is impossible")
execution_stages
execution_stages() -> List[List[str]]

Group nodes into parallel execution stages.

Nodes in the same stage have no dependencies on each other and can be executed concurrently.

Source code in src/openjarvis/workflow/graph.py
def execution_stages(self) -> List[List[str]]:
    """Split the nodes into successive layers that can run concurrently.

    A node lands in the first layer in which all of its incoming edges
    come from earlier layers, so nodes sharing a layer do not depend on
    one another. Each layer is returned sorted by node id.
    """
    # Number of incoming edges still unsatisfied, per node.
    pending: Dict[str, int] = dict.fromkeys(self._nodes, 0)
    for edge in self._edges:
        pending[edge.target] = pending.get(edge.target, 0) + 1

    layers: List[List[str]] = []
    frontier = [nid for nid, count in pending.items() if count == 0]

    while frontier:
        layers.append(sorted(frontier))
        released: List[str] = []
        # Completing the current layer frees every successor whose last
        # outstanding prerequisite was in it.
        for nid in frontier:
            for successor in self._adjacency.get(nid, []):
                pending[successor] -= 1
                if pending[successor] == 0:
                    released.append(successor)
        frontier = released

    return layers

Functions

load_workflow

load_workflow(path: str | Path) -> WorkflowGraph

Load a workflow definition from a TOML file.

Expected format:

[workflow]
name = "my_workflow"

[[workflow.nodes]]
id = "researcher"
type = "agent"
agent = "orchestrator"
tools = ["web_search"]

[[workflow.nodes]]
id = "summarizer"
type = "agent"
agent = "simple"

[[workflow.edges]]
source = "researcher"
target = "summarizer"

Source code in src/openjarvis/workflow/loader.py
def load_workflow(path: str | Path) -> WorkflowGraph:
    """Build a ``WorkflowGraph`` from a TOML workflow definition.

    Everything is read from the top-level ``[workflow]`` table:

    ```toml
    [workflow]
    name = "my_workflow"          # optional; defaults to the file stem

    [[workflow.nodes]]            # one table per node
    id = "researcher"
    type = "agent"
    agent = "orchestrator"
    tools = ["web_search"]

    [[workflow.nodes]]
    id = "summarizer"
    type = "agent"
    agent = "simple"

    [[workflow.edges]]            # one table per edge
    source = "researcher"
    target = "summarizer"
    # condition = "..."           # optional; defaults to ""
    ```

    Args:
        path: Location of the TOML file.

    Returns:
        The populated graph, after it passed ``validate()``.

    Raises:
        ValueError: If the assembled graph fails validation.
        KeyError: If an edge table lacks ``source`` or ``target``.
    """
    toml_path = Path(path)
    with open(toml_path, "rb") as handle:
        document = tomllib.load(handle)

    section = document.get("workflow", {})
    graph = WorkflowGraph(name=section.get("name", toml_path.stem))

    # Nodes first, then edges, in file order.
    for node_table in section.get("nodes", []):
        graph.add_node(_parse_node(node_table))

    for edge_table in section.get("edges", []):
        graph.add_edge(
            WorkflowEdge(
                source=edge_table["source"],
                target=edge_table["target"],
                condition=edge_table.get("condition", ""),
            )
        )

    valid, msg = graph.validate()
    if valid:
        return graph
    raise ValueError(f"Invalid workflow in {toml_path}: {msg}")