def create_agent_manager_router(
manager: AgentManager,
) -> Tuple[APIRouter, APIRouter, APIRouter, APIRouter]:
"""Create FastAPI routers with agent management endpoints.
Returns a 4-tuple: (agents_router, templates_router, global_router, tools_router).
"""
agents_router = APIRouter(prefix="/v1/managed-agents", tags=["managed-agents"])
templates_router = APIRouter(prefix="/v1/templates", tags=["templates"])
# ── Agent lifecycle ──────────────────────────────────────
@agents_router.get("")
async def list_agents():
return {"agents": manager.list_agents()}
@agents_router.post("")
async def create_agent(req: CreateAgentRequest, request: Request):
if req.template_id:
agent = manager.create_from_template(
req.template_id, req.name, overrides=req.config
)
else:
agent = manager.create_agent(
name=req.name, agent_type=req.agent_type, config=req.config
)
# Register with scheduler if cron/interval
scheduler = getattr(request.app.state, "agent_scheduler", None)
sched_type = (req.config or {}).get("schedule_type", "manual")
if scheduler and sched_type in ("cron", "interval"):
scheduler.register_agent(agent["id"])
return agent
@agents_router.get("/{agent_id}")
async def get_agent(agent_id: str):
agent = manager.get_agent(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
return agent
@agents_router.patch("/{agent_id}")
async def update_agent(agent_id: str, req: UpdateAgentRequest):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
kwargs: Dict[str, Any] = {}
if req.name is not None:
kwargs["name"] = req.name
if req.agent_type is not None:
kwargs["agent_type"] = req.agent_type
if req.config is not None:
kwargs["config"] = req.config
return manager.update_agent(agent_id, **kwargs)
@agents_router.delete("/{agent_id}")
async def delete_agent(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
manager.delete_agent(agent_id)
return {"status": "archived"}
@agents_router.post("/{agent_id}/pause")
async def pause_agent(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
manager.pause_agent(agent_id)
return {"status": "paused"}
@agents_router.post("/{agent_id}/resume")
async def resume_agent(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
manager.resume_agent(agent_id)
return {"status": "idle"}
@agents_router.post("/{agent_id}/run")
async def run_agent(agent_id: str, request: Request):
import threading
agent = manager.get_agent(agent_id)
if not agent:
raise HTTPException(status_code=404, detail="Agent not found")
if agent["status"] == "archived":
raise HTTPException(status_code=400, detail="Agent is archived")
# Auto-recover from error/needs_attention state
if agent["status"] in ("error", "needs_attention"):
manager.update_agent(agent_id, status="idle")
# Acquire tick BEFORE spawning thread — prevents race
try:
manager.start_tick(agent_id)
except ValueError:
raise HTTPException(status_code=409, detail="Agent is already running")
# Re-use the server's engine + model so we don't pick a
# random model from Ollama's list.
server_engine = getattr(request.app.state, "engine", None)
server_model = getattr(request.app.state, "model", "")
server_config = getattr(request.app.state, "config", None)
def _run_tick():
try:
from openjarvis.agents.executor import AgentExecutor
from openjarvis.core.events import get_event_bus
executor = AgentExecutor(
manager=manager,
event_bus=get_event_bus(),
)
system = _make_lightweight_system(
server_engine,
server_model,
server_config,
)
executor.set_system(system)
executor.execute_tick(agent_id)
except Exception as exc:
logger.error(
"Run-tick failed for agent %s: %s",
agent_id,
exc,
exc_info=True,
)
try:
manager.end_tick(agent_id)
except Exception:
pass
manager.update_agent(agent_id, status="error")
manager.update_summary_memory(
agent_id,
f"ERROR: {exc}",
)
threading.Thread(target=_run_tick, daemon=True).start()
return {"status": "running", "agent_id": agent_id}
# ── Recover ──────────────────────────────────────────────
@agents_router.post("/{agent_id}/recover")
def recover_agent(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
checkpoint = manager.recover_agent(agent_id)
return {"recovered": True, "checkpoint": checkpoint}
# ── Tasks ────────────────────────────────────────────────
@agents_router.get("/{agent_id}/tasks")
async def list_tasks(agent_id: str, status: Optional[str] = None):
return {"tasks": manager.list_tasks(agent_id, status=status)}
@agents_router.post("/{agent_id}/tasks")
async def create_task(agent_id: str, req: CreateTaskRequest):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
return manager.create_task(agent_id, description=req.description)
@agents_router.get("/{agent_id}/tasks/{task_id}")
async def get_task(agent_id: str, task_id: str):
task = manager._get_task(task_id)
if not task:
raise HTTPException(status_code=404, detail="Task not found")
return task
@agents_router.patch("/{agent_id}/tasks/{task_id}")
async def update_task(agent_id: str, task_id: str, req: UpdateTaskRequest):
kwargs: Dict[str, Any] = {}
if req.description is not None:
kwargs["description"] = req.description
if req.status is not None:
kwargs["status"] = req.status
if req.progress is not None:
kwargs["progress"] = req.progress
if req.findings is not None:
kwargs["findings"] = req.findings
return manager.update_task(task_id, **kwargs)
@agents_router.delete("/{agent_id}/tasks/{task_id}")
async def delete_task(agent_id: str, task_id: str):
manager.delete_task(task_id)
return {"status": "deleted"}
# ── Channel bindings ─────────────────────────────────────
@agents_router.get("/{agent_id}/channels")
async def list_channels(agent_id: str):
return {"bindings": manager.list_channel_bindings(agent_id)}
@agents_router.post("/{agent_id}/channels")
async def bind_channel(
agent_id: str, req: BindChannelRequest, request: Request,
):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
binding = manager.bind_channel(
agent_id,
channel_type=req.channel_type,
config=req.config,
routing_mode=req.routing_mode,
)
# Start iMessage daemon if binding iMessage
if req.channel_type == "imessage":
identifier = (req.config or {}).get("identifier", "")
if identifier:
try:
from openjarvis.channels.imessage_daemon import (
is_running,
run_daemon,
)
if not is_running():
import threading
engine = getattr(request.app.state, "engine", None)
if engine:
from openjarvis.server.agent_manager_routes import (
_build_deep_research_tools,
)
tools = _build_deep_research_tools(engine=engine, model="")
if tools:
from openjarvis.agents.deep_research import (
DeepResearchAgent,
)
agent_inst = DeepResearchAgent(
engine=engine,
model=getattr(engine, "_model", ""),
tools=tools,
)
def handler(text: str) -> str:
result = agent_inst.run(text)
return result.content or "No results."
t = threading.Thread(
target=run_daemon,
kwargs={
"chat_identifier": identifier,
"handler": handler,
},
daemon=True,
)
t.start()
except Exception as exc:
logger.warning("Failed to start iMessage daemon: %s", exc)
# Initialize SendBlue channel if binding sendblue
if req.channel_type == "sendblue":
config = req.config or {}
api_key_id = config.get("api_key_id", "")
api_secret_key = config.get("api_secret_key", "")
from_number = config.get("from_number", "")
if api_key_id and api_secret_key:
try:
from openjarvis.channels.sendblue import (
SendBlueChannel,
)
sb_channel = SendBlueChannel(
api_key_id=api_key_id,
api_secret_key=api_secret_key,
from_number=from_number,
)
sb_channel.connect()
# Store on app state so webhook route can use it
request.app.state.sendblue_channel = sb_channel
# Create or update the channel bridge
bridge = getattr(
request.app.state, "channel_bridge", None
)
if bridge and hasattr(bridge, "_channels"):
bridge._channels["sendblue"] = sb_channel
else:
# Create a new ChannelBridge with DeepResearch
from openjarvis.server.channel_bridge import (
ChannelBridge,
)
from openjarvis.server.session_store import (
SessionStore,
)
session_store = SessionStore()
engine = getattr(
request.app.state, "engine", None
)
dr_agent = None
if engine:
from openjarvis.server.agent_manager_routes import (
_build_deep_research_tools as _bdr,
)
tools = _bdr(
engine=engine, model=""
)
if tools:
from openjarvis.agents.deep_research import (
DeepResearchAgent,
)
model_name = (
getattr(engine, "_model", "")
or getattr(
request.app.state,
"model",
"",
)
)
dr_agent = DeepResearchAgent(
engine=engine,
model=model_name,
tools=tools,
)
bus = getattr(
request.app.state, "bus", None
)
if bus is None:
from openjarvis.core.events import EventBus
bus = EventBus()
bridge = ChannelBridge(
channels={"sendblue": sb_channel},
session_store=session_store,
bus=bus,
agent_manager=manager,
deep_research_agent=dr_agent,
)
request.app.state.channel_bridge = bridge
logger.info(
"SendBlue channel connected: %s",
from_number,
)
except Exception as exc:
logger.warning(
"Failed to init SendBlue channel: %s", exc
)
# Start Slack via slack-bolt Socket Mode
if req.channel_type == "slack":
config = req.config or {}
bot_token = config.get("bot_token", "")
app_token = config.get("app_token", "")
if bot_token and app_token:
try:
from openjarvis.channels.slack_daemon import (
start_slack_daemon,
)
from openjarvis.channels.slack_daemon import (
stop_daemon as stop_slack,
)
# Stop any existing daemon first
stop_slack()
# Spawn as subprocess (reliable)
srv_model = getattr(
getattr(
request.app.state, "engine", None,
),
"_model",
"qwen3.5:9b",
) or "qwen3.5:9b"
pid = start_slack_daemon(
bot_token=bot_token,
app_token=app_token,
model=srv_model,
)
logger.info(
"Slack daemon started (PID %d)", pid,
)
except Exception as exc:
logger.warning(
"Failed to start Slack: %s", exc,
)
return binding
@agents_router.delete("/{agent_id}/channels/{binding_id}")
async def unbind_channel(
agent_id: str, binding_id: str, request: Request,
):
try:
binding = manager._get_binding(binding_id)
if binding:
ch_type = binding.get("channel_type")
if ch_type == "imessage":
from openjarvis.channels.imessage_daemon import (
stop_daemon,
)
stop_daemon()
elif ch_type == "slack":
from openjarvis.channels.slack_daemon import (
stop_daemon as stop_slack_daemon,
)
stop_slack_daemon()
except Exception:
pass
manager.unbind_channel(binding_id)
return {"status": "unbound"}
# ── Messaging ────────────────────────────────────────────
@agents_router.get("/{agent_id}/messages")
def list_messages(agent_id: str):
return {"messages": manager.list_messages(agent_id)}
@agents_router.post("/{agent_id}/messages")
async def send_message(agent_id: str, req: SendMessageRequest, request: Request):
agent_record = manager.get_agent(agent_id)
if not agent_record:
raise HTTPException(status_code=404, detail="Agent not found")
# Auto-recover error-state agents on immediate messages
if req.mode == "immediate" and agent_record["status"] in (
"error",
"needs_attention",
):
manager.update_agent(agent_id, status="idle")
# Store user message in DB (always, regardless of stream mode)
msg = manager.send_message(agent_id, req.content, mode=req.mode)
if not req.stream and req.mode != "immediate":
return msg
if not req.stream and req.mode == "immediate":
# Non-streaming immediate: trigger a background tick so the
# agent processes the message, then return the stored msg.
# Re-use the server's existing system (correct model/engine).
import threading
import time as _time
from openjarvis.agents.executor import AgentExecutor
from openjarvis.core.events import get_event_bus
_srv_engine = getattr(request.app.state, "engine", None)
_srv_model = getattr(request.app.state, "model", "")
_srv_config = getattr(request.app.state, "config", None)
def _immediate_tick():
_start = _time.time()
logger.info(
"Immediate tick starting for agent %s (model=%s)",
agent_id,
_srv_model,
)
try:
executor = AgentExecutor(
manager=manager,
event_bus=get_event_bus(),
)
system = _make_lightweight_system(
_srv_engine,
_srv_model,
_srv_config,
)
executor.set_system(system)
logger.info(
"Immediate tick: system ready in %.1fs, "
"executing tick for agent %s",
_time.time() - _start,
agent_id,
)
executor.execute_tick(agent_id)
logger.info(
"Immediate tick completed for agent %s in %.1fs",
agent_id,
_time.time() - _start,
)
except Exception as exc:
logger.error(
"Immediate tick failed for agent %s: %s",
agent_id,
exc,
exc_info=True,
)
try:
manager.end_tick(agent_id)
except Exception:
pass
manager.update_agent(agent_id, status="error")
manager.update_summary_memory(
agent_id,
f"ERROR: {exc}",
)
threading.Thread(
target=_immediate_tick,
daemon=True,
).start()
return msg
# --- Streaming mode: run agent and return SSE response ---
engine = getattr(request.app.state, "engine", None)
bus = getattr(request.app.state, "bus", None)
if engine is None:
raise HTTPException(
status_code=503,
detail="Engine not available for streaming",
)
return await _stream_managed_agent(
manager=manager,
agent_record=agent_record,
user_content=req.content,
message_id=msg["id"],
engine=engine,
bus=bus,
app_state=request.app.state,
)
# ── State inspection ─────────────────────────────────────
@agents_router.get("/{agent_id}/state")
def get_agent_state(agent_id: str):
agent = manager.get_agent(agent_id)
if agent is None:
raise HTTPException(status_code=404, detail="Agent not found")
return {
"agent": agent,
"tasks": manager.list_tasks(agent_id),
"channels": manager.list_channel_bindings(agent_id),
"messages": manager.list_messages(agent_id),
"checkpoint": manager.get_latest_checkpoint(agent_id),
}
# ── Learning ─────────────────────────────────────────────
@agents_router.get("/{agent_id}/learning")
def get_learning_log(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
return {"learning_log": manager.list_learning_log(agent_id)}
@agents_router.post("/{agent_id}/learning/run")
def trigger_learning(agent_id: str):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
from openjarvis.core.events import EventType, get_event_bus
bus = get_event_bus()
bus.publish(EventType.AGENT_LEARNING_STARTED, {"agent_id": agent_id})
return {"status": "triggered"}
# ── Traces ───────────────────────────────────────────────
@agents_router.get("/{agent_id}/traces")
def list_traces(agent_id: str, limit: int = 20):
if not manager.get_agent(agent_id):
raise HTTPException(status_code=404, detail="Agent not found")
try:
from openjarvis.core.config import load_config
from openjarvis.traces.store import TraceStore
config = load_config()
store = TraceStore(config.traces.db_path or "~/.openjarvis/traces.db")
traces = store.list_traces(agent=agent_id, limit=limit)
return {
"traces": [
{
"id": t.trace_id,
"outcome": t.outcome,
"duration": t.total_latency_seconds,
"started_at": t.started_at,
"steps": len(t.steps),
"metadata": t.metadata,
}
for t in traces
]
}
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
@agents_router.get("/{agent_id}/traces/{trace_id}")
def get_trace(agent_id: str, trace_id: str):
try:
from openjarvis.core.config import load_config
from openjarvis.traces.store import TraceStore
config = load_config()
store = TraceStore(config.traces.db_path or "~/.openjarvis/traces.db")
trace = store.get(trace_id)
if trace is None:
raise HTTPException(status_code=404, detail="Trace not found")
return {
"id": trace.trace_id,
"agent": trace.agent,
"outcome": trace.outcome,
"duration": trace.total_latency_seconds,
"started_at": trace.started_at,
"steps": [
{
"step_type": s.step_type.value,
"input": s.input,
"output": s.output,
"duration": s.duration_seconds,
"metadata": s.metadata,
}
for s in trace.steps
],
}
except HTTPException:
raise
except Exception as exc:
raise HTTPException(status_code=500, detail=str(exc))
# ── Templates ────────────────────────────────────────────
@templates_router.get("")
async def list_templates():
return {"templates": AgentManager.list_templates()}
@templates_router.post("/{template_id}/instantiate")
async def instantiate_template(template_id: str, req: CreateAgentRequest):
return manager.create_from_template(template_id, req.name, overrides=req.config)
# ── Global agent endpoints ───────────────────────────────
global_router = APIRouter(tags=["agents-global"])
@global_router.get("/v1/agents/errors")
def list_error_agents():
all_agents = manager.list_agents()
error_agents = [
a
for a in all_agents
if a["status"] in ("error", "needs_attention", "stalled", "budget_exceeded")
]
return {"agents": error_agents}
@global_router.get("/v1/agents/health")
def agents_health():
all_agents = manager.list_agents()
from collections import Counter
counts = Counter(a["status"] for a in all_agents)
return {
"total": len(all_agents),
"by_status": dict(counts),
}
@global_router.get("/v1/recommended-model")
def recommended_model(request: Request):
engine = getattr(request.app.state, "engine", None)
if engine is None:
return {"model": "", "reason": "No engine available"}
try:
models = engine.list_models()
except Exception:
models = []
return _pick_recommended_model(models)
# ── Tools & credentials ──────────────────────────────────
tools_router = APIRouter(prefix="/v1/tools", tags=["tools"])
@tools_router.get("")
def list_tools(request: Request):
items = build_tools_list()
try:
mcp_tools, _ = _get_mcp_tools(request.app.state)
for tool in mcp_tools:
fn = tool.get("function", {})
items.append(
{
"name": fn.get("name", ""),
"description": fn.get("description", ""),
"category": "mcp",
"source": "mcp",
"requires_credentials": False,
"credential_keys": [],
"configured": True,
}
)
except Exception:
pass
return {"tools": items}
@tools_router.post("/{tool_name}/credentials")
async def save_tool_credentials(tool_name: str, request: Request):
from openjarvis.core.credentials import save_credential
body = await request.json()
saved = []
for key, value in body.items():
save_credential(tool_name, key, value)
saved.append(key)
return {"saved": saved}
@tools_router.get("/{tool_name}/credentials/status")
def credential_status(tool_name: str):
from openjarvis.core.credentials import get_credential_status
return get_credential_status(tool_name)
# ── SendBlue auto-setup helpers ─────────────────────────
sendblue_router = APIRouter(
prefix="/v1/channels/sendblue", tags=["sendblue"]
)
@sendblue_router.post("/verify")
async def sendblue_verify(request: Request):
"""Verify SendBlue credentials and return assigned phone lines."""
body = await request.json()
api_key_id = body.get("api_key_id", "")
api_secret_key = body.get("api_secret_key", "")
if not api_key_id or not api_secret_key:
raise HTTPException(
status_code=400,
detail="api_key_id and api_secret_key are required",
)
import httpx
try:
resp = httpx.get(
"https://api.sendblue.co/api/lines",
headers={
"sb-api-key-id": api_key_id,
"sb-api-secret-key": api_secret_key,
},
timeout=15.0,
)
if resp.status_code == 401:
raise HTTPException(
status_code=401,
detail="Invalid SendBlue credentials",
)
if resp.status_code >= 400:
raise HTTPException(
status_code=resp.status_code,
detail=f"SendBlue API error: {resp.text[:200]}",
)
data = resp.json()
# data might be a list of lines or {"lines": [...]}
lines = data if isinstance(data, list) else data.get(
"lines", data.get("data", [])
)
numbers = []
for line in lines:
num = (
line.get("number")
or line.get("phone_number")
or line.get("from_number")
or (line if isinstance(line, str) else "")
)
if num:
numbers.append(num)
return {
"valid": True,
"numbers": numbers,
"raw": data,
}
except HTTPException:
raise
except Exception as exc:
raise HTTPException(
status_code=502,
detail=f"Failed to reach SendBlue: {exc}",
)
@sendblue_router.post("/register-webhook")
async def sendblue_register_webhook(request: Request):
"""Auto-register the /webhooks/sendblue endpoint with SendBlue."""
body = await request.json()
api_key_id = body.get("api_key_id", "")
api_secret_key = body.get("api_secret_key", "")
webhook_url = body.get("webhook_url", "")
if not webhook_url:
raise HTTPException(
status_code=400,
detail="webhook_url is required",
)
import httpx
try:
resp = httpx.post(
"https://api.sendblue.co/api/account/webhooks",
headers={
"sb-api-key-id": api_key_id,
"sb-api-secret-key": api_secret_key,
"Content-Type": "application/json",
},
json={
"receive": webhook_url,
},
timeout=15.0,
)
return {
"registered": resp.status_code < 300,
"status": resp.status_code,
"response": resp.json()
if resp.status_code < 300
else resp.text[:200],
}
except Exception as exc:
raise HTTPException(
status_code=502,
detail=f"Failed to register webhook: {exc}",
)
@sendblue_router.post("/test")
async def sendblue_test(request: Request):
"""Send a test message via SendBlue to verify the setup works."""
body = await request.json()
api_key_id = body.get("api_key_id", "")
api_secret_key = body.get("api_secret_key", "")
from_number = body.get("from_number", "")
to_number = body.get("to_number", "")
if not to_number:
raise HTTPException(
status_code=400,
detail="to_number is required",
)
import httpx
try:
payload: Dict[str, str] = {
"number": to_number,
"content": (
"Hello from your OpenJarvis agent! "
"Text this number anytime to search your "
"personal data. Reply with any question to try it."
),
}
if from_number:
payload["from_number"] = from_number
resp = httpx.post(
"https://api.sendblue.co/api/send-message",
headers={
"sb-api-key-id": api_key_id,
"sb-api-secret-key": api_secret_key,
"Content-Type": "application/json",
},
json=payload,
timeout=15.0,
)
return {
"sent": resp.status_code < 300,
"status": resp.status_code,
"response": resp.json()
if resp.status_code < 300
else resp.text[:200],
}
except Exception as exc:
raise HTTPException(
status_code=502,
detail=f"Failed to send test message: {exc}",
)
@sendblue_router.get("/health")
async def sendblue_health(request: Request):
"""Check if the SendBlue channel bridge is wired and ready."""
sb = getattr(request.app.state, "sendblue_channel", None)
bridge = getattr(request.app.state, "channel_bridge", None)
has_bridge = bridge is not None and (
hasattr(bridge, "_channels")
and "sendblue" in bridge._channels
)
return {
"channel_connected": sb is not None,
"bridge_wired": has_bridge,
"ready": sb is not None and has_bridge,
}
return (
agents_router,
templates_router,
global_router,
tools_router,
sendblue_router,
)