Skip to content

graph

graph

WorkflowGraph — DAG with validation and topological sort.

Classes

WorkflowGraph

WorkflowGraph(name: str = '')

Directed acyclic graph of workflow nodes.

Supports DAG validation (cycle detection), topological sort, and execution_stages() for parallel-ready ordering.

Source code in src/openjarvis/workflow/graph.py
def __init__(self, name: str = "") -> None:
    """Create an empty workflow graph.

    Args:
        name: Optional label stored on the graph; defaults to ``""``.
    """
    # Edge bookkeeping: the flat edge list plus two list-valued lookup
    # tables that hand back a fresh empty list for a key on first access.
    self._edges: List[WorkflowEdge] = []
    # Node ID -> IDs reachable over one outgoing edge.
    self._adjacency: Dict[str, List[str]] = defaultdict(list)
    # NOTE(review): presumably node ID -> predecessor IDs; confirm against
    # the code that records edges.
    self._reverse: Dict[str, List[str]] = defaultdict(list)

    # Node registry keyed by node ID.
    self._nodes: Dict[str, WorkflowNode] = {}

    self.name = name
Functions
validate
validate() -> Tuple[bool, str]

Validate the graph: check for cycles and orphan nodes.

Source code in src/openjarvis/workflow/graph.py
def validate(self) -> Tuple[bool, str]:
    """Validate the graph by checking that it contains no cycles.

    The check is a depth-first search over the adjacency lists, started
    from every registered node that has not been reached yet. The search
    uses an explicit stack rather than recursion, so long dependency
    chains cannot exhaust the interpreter's recursion limit.

    Note:
        Only cycle detection is performed; orphan (disconnected) nodes
        are not reported.

    Returns:
        ``(True, "")`` when the graph is acyclic, otherwise
        ``(False, message)`` where *message* names a node that lies on
        the detected cycle.
    """
    visited: Set[str] = set()
    # Nodes on the current DFS path; an edge into this set is a back edge.
    in_stack: Set[str] = set()

    for root in self._nodes:
        if root in visited:
            continue

        visited.add(root)
        in_stack.add(root)
        # Each frame pairs a node with an iterator over its remaining
        # successors, so the walk resumes where it left off after a
        # descendant has been fully explored.
        stack = [(root, iter(self._adjacency.get(root, [])))]

        while stack:
            current, successors = stack[-1]
            for neighbor in successors:
                if neighbor in in_stack:
                    # The back edge closes on `neighbor`, so that node is
                    # guaranteed to be part of the cycle.
                    return False, f"Cycle detected involving node '{neighbor}'"
                if neighbor not in visited:
                    visited.add(neighbor)
                    in_stack.add(neighbor)
                    stack.append(
                        (neighbor, iter(self._adjacency.get(neighbor, [])))
                    )
                    break
            else:
                # All successors explored: `current` leaves the DFS path.
                in_stack.discard(current)
                stack.pop()

    return True, ""
topological_sort
topological_sort() -> List[str]

Return node IDs in topological order (Kahn's algorithm).

Source code in src/openjarvis/workflow/graph.py
def topological_sort(self) -> List[str]:
    """Return node IDs in topological order (Kahn's algorithm).

    Returns:
        Node IDs ordered so that every edge's source appears before its
        target.

    Raises:
        ValueError: If the number of ordered nodes differs from the
            number of registered nodes, which is what a cycle produces.
    """
    # Count incoming edges per node; every registered node starts at zero.
    pending_inputs: Dict[str, int] = dict.fromkeys(self._nodes, 0)
    for link in self._edges:
        pending_inputs[link.target] = pending_inputs.get(link.target, 0) + 1

    # Seed the work queue with the nodes that have nothing upstream.
    frontier = deque()
    for nid, count in pending_inputs.items():
        if count == 0:
            frontier.append(nid)

    result: List[str] = []
    while frontier:
        current = frontier.popleft()
        result.append(current)
        # Emitting `current` satisfies one input of each of its successors.
        for successor in self._adjacency.get(current, []):
            remaining = pending_inputs[successor] - 1
            pending_inputs[successor] = remaining
            if remaining == 0:
                frontier.append(successor)

    if len(result) == len(self._nodes):
        return result
    raise ValueError("Graph contains a cycle; topological sort is impossible")
execution_stages
execution_stages() -> List[List[str]]

Group nodes into parallel execution stages.

Nodes in the same stage have no dependencies on each other and can be executed concurrently.

Source code in src/openjarvis/workflow/graph.py
def execution_stages(self) -> List[List[str]]:
    """Group nodes into parallel execution stages.

    Nodes in the same stage have no dependencies on each other and
    can be executed concurrently. A node is placed in the first stage
    that comes after all of its predecessors.

    Returns:
        A list of stages in execution order; each stage is a sorted
        list of node IDs.

    Raises:
        ValueError: If some nodes can never be scheduled because the
            graph contains a cycle. Without this check those nodes would
            be silently missing from the result.
    """
    # Count incoming edges per node; every registered node starts at zero.
    in_degree: Dict[str, int] = {nid: 0 for nid in self._nodes}
    for edge in self._edges:
        in_degree[edge.target] = in_degree.get(edge.target, 0) + 1

    stages: List[List[str]] = []
    ready = [nid for nid, deg in in_degree.items() if deg == 0]
    scheduled = 0

    while ready:
        # Sorting makes the order inside a stage deterministic.
        stages.append(sorted(ready))
        scheduled += len(ready)
        next_ready: List[str] = []
        for node_id in ready:
            for neighbor in self._adjacency.get(node_id, []):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    next_ready.append(neighbor)
        ready = next_ready

    # Nodes on or behind a cycle never reach in-degree zero.
    if scheduled != len(in_degree):
        raise ValueError(
            "Graph contains a cycle; execution stages cannot be computed"
        )
    return stages