def run(self, trigger: Any) -> LearningSession:
    """Execute a full distillation session.

    Phases: readiness check -> benchmark-before -> diagnose -> plan ->
    execute edits (benchmark-gated when a scorer is configured) ->
    benchmark-after. The session record is persisted at each status
    transition, and a ``session.json`` artifact is written on *every*
    exit path, including early failures.

    Args:
        trigger: Object carrying ``kind`` and ``metadata`` describing what
            initiated this session.

    Returns:
        The completed (or failed) LearningSession.
    """
    ts = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S")
    session_id = f"session-{ts}_{uuid.uuid4().hex[:8]}"
    session_dir = self._home / "learning" / "sessions" / session_id
    session_dir.mkdir(parents=True, exist_ok=True)

    # Deterministic per-session seed for benchmark subsampling. The
    # built-in hash() is salted per process (PYTHONHASHSEED), which made
    # the subsample irreproducible across runs; derive a stable value
    # from the session id instead. Computed once, used for both the
    # before- and after-snapshots and the gate.
    session_seed = int.from_bytes(session_id.encode("utf-8"), "big") % (2**31)

    def _finalize(final: LearningSession) -> LearningSession:
        # Single exit point: persist the record and write the
        # session.json artifact regardless of how the session ended.
        self._session_store.save_session(final)
        artifact_path = session_dir / "session.json"
        artifact_path.write_text(final.model_dump_json(indent=2), encoding="utf-8")
        return final

    def _failed(error: str, cost_usd: float | None = None) -> LearningSession:
        # Build a FAILED copy of the current session state.
        update: dict[str, Any] = {
            "status": SessionStatus.FAILED,
            "error": error,
            "ended_at": datetime.now(timezone.utc),
        }
        if cost_usd is not None:
            update["teacher_cost_usd"] = cost_usd
        return session.model_copy(update=update)

    def _make_outcome(
        edit_id: str,
        status: str,
        *,
        delta: Any = None,
        error: str | None = None,
        applied: bool = False,
    ) -> EditOutcome:
        # Single construction point for EditOutcome records.
        return EditOutcome(
            edit_id=edit_id,
            status=status,
            benchmark_delta=delta,
            cluster_deltas={},
            error=error,
            applied_at=datetime.now(timezone.utc) if applied else None,
        )

    # Initialize the session record; benchmark_before starts as an empty
    # placeholder snapshot and is replaced below when a scorer exists.
    pre_sha = self._checkpoint_store.current_sha()
    session = LearningSession(
        id=session_id,
        trigger=trigger.kind,
        trigger_metadata=trigger.metadata,
        status=SessionStatus.INITIATED,
        autonomy_mode=self._autonomy,
        started_at=datetime.now(timezone.utc),
        diagnosis_path=session_dir / "diagnosis.md",
        plan_path=session_dir / "plan.json",
        benchmark_before=BenchmarkSnapshot(
            benchmark_version=self._bench_version,
            overall_score=0.0,
            cluster_scores={},
            task_count=0,
            elapsed_seconds=0.0,
        ),
        git_checkpoint_pre=pre_sha,
        teacher_cost_usd=0.0,
    )
    try:
        # Cold start check: refuse to learn from too few traces.
        readiness = check_readiness(self._trace_store, min_traces=self._min_traces)
        if not readiness.ready:
            return _finalize(_failed(readiness.message))

        # Capture the benchmark-before snapshot when a scorer is configured.
        if self._scorer is not None:
            before_snap = self._scorer(
                benchmark_version=self._bench_version,
                subsample_size=self._subsample_size,
                seed=session_seed,
            )
            session = session.model_copy(update={"benchmark_before": before_snap})

        # Phase 1: Diagnose — let the teacher analyze traces and cluster
        # failure modes.
        session = session.model_copy(update={"status": SessionStatus.DIAGNOSING})
        self._session_store.save_session(session)
        diagnosis_runner = DiagnosisRunner(
            teacher_engine=self._engine,
            teacher_model=self._model,
            trace_store=self._trace_store,
            benchmark_samples=self._benchmark_samples,
            student_runner=self._student_runner,
            judge=self._judge,
            session_dir=session_dir,
            session_id=session_id,
            config={
                "config_path": self._home / "config.toml",
                "openjarvis_home": self._home,
            },
            max_turns=self._max_tool_calls,
            max_cost_usd=self._max_cost,
        )
        diag_result = diagnosis_runner.run()
        cost = diag_result.cost_usd
        if not diag_result.clusters:
            return _finalize(
                _failed("diagnosis produced no actionable clusters", cost)
            )

        # Phase 2: Plan — turn the diagnosis into a concrete edit plan.
        session = session.model_copy(update={"status": SessionStatus.PLANNING})
        self._session_store.save_session(session)
        planner = LearningPlanner(
            teacher_engine=self._engine,
            teacher_model=self._model,
            session_id=session_id,
            session_dir=session_dir,
            prompt_reader=lambda t: self._read_prompt(t),
        )
        plan = planner.run(
            diagnosis_md=diag_result.diagnosis_md,
            clusters=diag_result.clusters,
        )
        cost += plan.estimated_cost_usd

        # Phase 3: Execute — apply each planned edit, subject to autonomy
        # policy, risk tier, validation, and the benchmark gate.
        session = session.model_copy(update={"status": SessionStatus.EXECUTING})
        self._session_store.save_session(session)
        ctx = ApplyContext(
            openjarvis_home=self._home,
            session_id=session_id,
        )
        registry = _build_registry()
        gate: BenchmarkGate | None = None
        if self._scorer is not None:
            gate = BenchmarkGate(
                scorer=self._scorer,
                benchmark_version=self._bench_version,
                min_improvement=self._min_improvement,
                max_regression=self._max_regression,
                subsample_size=self._subsample_size,
            )

        outcomes: list[EditOutcome] = []
        for edit in plan.edits:
            # Manual autonomy mode: everything goes to human review.
            if self._autonomy == AutonomyMode.MANUAL:
                outcomes.append(_make_outcome(edit.id, "pending_review"))
                continue
            # Manual risk tier: never auto-apply, always skip.
            if edit.risk_tier == EditRiskTier.MANUAL:
                outcomes.append(
                    _make_outcome(
                        edit.id,
                        "skipped",
                        error="manual tier, requires explicit approval",
                    )
                )
                continue
            # Review tier in tiered mode: route to the pending queue.
            if (
                edit.risk_tier == EditRiskTier.REVIEW
                and self._autonomy == AutonomyMode.TIERED
            ):
                outcomes.append(_make_outcome(edit.id, "pending_review"))
                continue
            # Skip ops the registry cannot apply yet.
            if not registry.is_supported(edit.op):
                outcomes.append(
                    _make_outcome(
                        edit.id,
                        "skipped",
                        error=f"op {edit.op.value} not implemented in v1",
                    )
                )
                continue
            # Validate before touching anything.
            applier = registry.get(edit.op)
            validation = applier.validate(edit, ctx)
            if not validation.ok:
                outcomes.append(
                    _make_outcome(
                        edit.id, "rejected_by_gate", error=validation.reason
                    )
                )
                continue
            # Apply the edit; an apply failure rejects just this edit,
            # not the whole session.
            try:
                applier.apply(edit, ctx)
            except Exception as exc:
                logger.warning("Edit %s apply failed: %s", edit.id, exc)
                outcomes.append(
                    _make_outcome(edit.id, "rejected_by_gate", error=str(exc))
                )
                continue
            # No scorer: accept directly (backward-compat with tests).
            if gate is None:
                outcomes.append(_make_outcome(edit.id, "applied", applied=True))
                continue
            # Run the benchmark gate against the pre-session snapshot.
            gate_result = gate.evaluate(
                before=session.benchmark_before,
                session_seed=session_seed,
            )
            if gate_result.accepted:
                outcomes.append(
                    _make_outcome(
                        edit.id,
                        "applied",
                        delta=gate_result.delta,
                        applied=True,
                    )
                )
            else:
                # Gate rejected — roll the edit back; a rollback failure
                # is logged but does not abort the session.
                try:
                    applier.rollback(edit, ctx)
                except Exception as rb_exc:
                    logger.warning(
                        "Edit %s rollback failed: %s", edit.id, rb_exc
                    )
                outcomes.append(
                    _make_outcome(
                        edit.id,
                        "rejected_by_gate",
                        delta=gate_result.delta,
                        error=gate_result.reason,
                    )
                )

        # Enqueue pending_review edits for human approval.
        pending_queue = PendingQueue(self._home / "learning" / "pending_review")
        has_pending = False
        for outcome, edit in zip(outcomes, plan.edits):
            if outcome.status == "pending_review":
                pending_queue.enqueue(session_id, edit)
                has_pending = True

        # Capture the benchmark-after snapshot with the same seed so the
        # before/after comparison uses the same subsample.
        after_snap = None
        if self._scorer is not None:
            after_snap = self._scorer(
                benchmark_version=self._bench_version,
                subsample_size=self._subsample_size,
                seed=session_seed,
            )

        final_status = (
            SessionStatus.AWAITING_REVIEW if has_pending else SessionStatus.COMPLETED
        )
        post_sha = self._checkpoint_store.current_sha()
        session = session.model_copy(
            update={
                "status": final_status,
                "edit_outcomes": outcomes,
                "benchmark_after": after_snap,
                "git_checkpoint_post": post_sha,
                "teacher_cost_usd": cost,
                "ended_at": datetime.now(timezone.utc),
            }
        )
    except Exception as e:
        # Top-level boundary: record the failure on the session instead
        # of propagating, so the artifact is still written below.
        logger.exception("Session %s failed: %s", session_id, e)
        session = _failed(str(e))
    return _finalize(session)