Directed acyclic graph of workflow nodes.
Supports DAG validation (cycle detection), topological sort,
and execution_stages() for parallel-ready ordering.
Source code in src/openjarvis/workflow/graph.py
def __init__(self, name: str = "") -> None:
    """Create an empty workflow graph.

    Args:
        name: Label stored on the graph; defaults to the empty string.
    """
    # Node registry and flat edge list start out empty.
    self._nodes: Dict[str, WorkflowNode] = {}
    self._edges: List[WorkflowEdge] = []
    # Forward and reverse lookup tables; defaultdict(list) means a missing
    # key yields a fresh empty list on item access.
    self._reverse: Dict[str, List[str]] = defaultdict(list)
    self._adjacency: Dict[str, List[str]] = defaultdict(list)
    self.name = name
|
Functions
validate
validate() -> Tuple[bool, str]
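Validate the graph by checking for cycles. Returns `(True, "")` when the graph is acyclic, otherwise `(False, message)` describing the cycle. Orphan nodes are not checked.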
Source code in src/openjarvis/workflow/graph.py
def validate(self) -> Tuple[bool, str]:
    """Validate the graph by checking that it contains no cycles.

    The search is an iterative depth-first traversal, so long dependency
    chains cannot exhaust Python's recursion limit. Orphan nodes are not
    checked.

    Returns:
        ``(True, "")`` when no cycle is found, otherwise ``(False, message)``
        where ``message`` names a node that lies on the detected cycle.
    """
    visited: Set[str] = set()
    on_path: Set[str] = set()  # nodes on the current DFS path

    for root in self._nodes:
        if root in visited:
            continue
        visited.add(root)
        on_path.add(root)
        # Each frame keeps a partially consumed iterator so the scan of a
        # node's successors resumes where it left off after descending.
        stack = [(root, iter(self._adjacency.get(root, ())))]
        while stack:
            current, successors = stack[-1]
            for successor in successors:
                if successor in on_path:
                    # Back edge: ``successor`` is an ancestor of ``current``
                    # on the active path, so it is part of the cycle.
                    return False, f"Cycle detected involving node '{successor}'"
                if successor not in visited:
                    visited.add(successor)
                    on_path.add(successor)
                    stack.append(
                        (successor, iter(self._adjacency.get(successor, ())))
                    )
                    break
            else:
                # All successors explored: leave the active path.
                on_path.discard(current)
                stack.pop()
    return True, ""
|
topological_sort
topological_sort() -> List[str]
Return node IDs in topological order (Kahn's algorithm). Raises `ValueError` if the graph contains a cycle.
Source code in src/openjarvis/workflow/graph.py
def topological_sort(self) -> List[str]:
    """Produce a topological ordering of node IDs via Kahn's algorithm.

    Returns:
        Node IDs ordered so that each edge's source precedes its target.

    Raises:
        ValueError: If the number of ordered nodes differs from the number
            of registered nodes, which is reported as a cycle.
    """
    # Count incoming edges per node; every registered node starts at zero.
    pending_inputs: Dict[str, int] = dict.fromkeys(self._nodes, 0)
    for link in self._edges:
        pending_inputs[link.target] = pending_inputs.get(link.target, 0) + 1

    frontier = deque(
        nid for nid, count in pending_inputs.items() if count == 0
    )
    ordered: List[str] = []
    while frontier:
        current = frontier.popleft()
        ordered.append(current)
        for successor in self._adjacency.get(current, []):
            remaining = pending_inputs[successor] - 1
            pending_inputs[successor] = remaining
            if remaining == 0:
                frontier.append(successor)

    if len(ordered) == len(self._nodes):
        return ordered
    raise ValueError("Graph contains a cycle; topological sort is impossible")
|
execution_stages
execution_stages() -> List[List[str]]
Group nodes into parallel execution stages.
Nodes in the same stage have no dependencies on each other and
can be executed concurrently.
Source code in src/openjarvis/workflow/graph.py
def execution_stages(self) -> List[List[str]]:
    """Group nodes into parallel execution stages.

    Nodes in the same stage have no dependencies on each other and
    can be executed concurrently. Each stage is sorted by node ID so the
    result is deterministic.

    Returns:
        A list of stages; each stage is a sorted list of node IDs whose
        predecessors all appear in earlier stages.

    Raises:
        ValueError: If some nodes can never become ready because the graph
            contains a cycle. Without this check those nodes would be
            silently left out of the returned stages.
    """
    in_degree: Dict[str, int] = {nid: 0 for nid in self._nodes}
    for edge in self._edges:
        in_degree[edge.target] = in_degree.get(edge.target, 0) + 1

    stages: List[List[str]] = []
    ready = [nid for nid, deg in in_degree.items() if deg == 0]
    while ready:
        stages.append(sorted(ready))
        next_ready: List[str] = []
        for node_id in ready:
            for neighbor in self._adjacency.get(node_id, []):
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    next_ready.append(neighbor)
        ready = next_ready

    # Any node still waiting on an input was never scheduled; that only
    # happens when it sits on, or downstream of, a cycle.
    stuck = sorted(nid for nid, deg in in_degree.items() if deg > 0)
    if stuck:
        raise ValueError(
            "Graph contains a cycle; execution stages cannot be computed "
            f"for nodes: {', '.join(stuck)}"
        )
    return stages
|