feat(daemon): add critique interrupt endpoint + project-keyed run registry (Task 6.1) (#819)

Phase 6.1 of the Critique Theater rollout: a single new endpoint and the
in-process registry that backs it.

POST /api/projects/:projectId/critique/:runId/interrupt cascades an
AbortController to the orchestrator that owns the spawned CLI so the
parser can flush best-so-far state and emit critique.interrupted before
the process exits. Backed by a new in-process run registry that the
orchestrator wiring registers each run into before runOrchestrator is
invoked, and unregisters in a finally block.

The registry is keyed by (projectId, runId), not just runId. A request
to interrupt project p1's runId cannot find or abort a registry handle
that belongs to project p2 even if their ids ever collide. The HTTP
handler also performs its own DB-row projectId check before calling the
registry, so cross-project leakage is blocked at two layers.

The endpoint is idempotent on already-interrupted rows: a client that
lost the first response and retries observes 202 with prevStatus
"interrupted" rather than a 409 conflict. Other terminal statuses
(shipped, failed, timed_out, degraded, below_threshold, legacy) still
return 409 because those runs reached their real terminal state on
their own and an interrupt is no longer meaningful.

Recovery path for stale running rows: when registry.interrupt returns
false (the in-process registry has no AbortController for this
projectId/runId pair) but the DB still says 'running', the endpoint
marks the row 'interrupted' directly with recoveryReason='no_live_handle'
and returns 202 with recovered=true. This window opens after a daemon
restart, in the gap before reconcileStaleRuns considers the row old enough.
Without the recovery branch the endpoint would lie: 202 accepted, no
child signaled, no critique.interrupted event, row stuck running. The
new persistence helper markRunInterruptedRecovery mirrors the per-row
write reconcileStaleRuns already does, gated on status='running' so a
row that just transitioned terminal is not overwritten.

Task 6.2 (rerun endpoint) is intentionally not in this PR. The earlier
draft conflated row insertion between the handler and runOrchestrator
(primary key collision) and did not actually start a new agent spawn.
Rerun needs a real chat-run path with prior-art context, an artifact-id
validator, and SQL LIKE escaping that the row lookup path is missing
today; it is cleaner shipped as a follow-up than wedged into this PR.

Tests:
- critique-run-registry: 17 cases covering register, get, interrupt,
  unregister, list, plus the new (projectId, runId) composite key
  invariants (cross-project register, cross-project get/interrupt
  isolation, unregister keying).
- critique-interrupt-endpoint: 17 cases covering 202 happy path, 404 on
  unknown run, 404 on cross-project run, 404 cross-project leak guard at
  the registry layer, 409 on terminal statuses, 202 idempotent retry on
  already-interrupted, stale-handle defense, 202 + recovered on a stale
  running row with no live handle, 400 on bad params.

Incidental: apps/web/src/i18n/locales/id.ts was missing 18 fileViewer
deploy/Cloudflare keys after upstream landed PR #805 (R2 release
publishing). Without those keys the workspace web typecheck fails on
the i18n Dict equality check, blocking CI on every PR. Added Indonesian
translations for the missing keys to unblock.

Co-authored-by: Nagendhra <nagendhra405@gmail.com>
This commit is contained in:
Nagendhra Madishetti 2026-05-07 23:29:37 -04:00 committed by GitHub
parent 42ae1da03d
commit 6de802ba70
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 839 additions and 0 deletions

View file

@ -0,0 +1,129 @@
import type { Request, Response } from 'express';
import type Database from 'better-sqlite3';
import {
getCritiqueRun,
markRunInterruptedRecovery,
type CritiqueRunStatus,
} from './persistence.js';
import type { RunRegistry } from './run-registry.js';
/**
 * HTTP status codes the interrupt endpoint can answer with, listed in
 * ascending numeric order.
 */
/** Interrupt accepted (also used for idempotent retries and recovery). */
const HTTP_ACCEPTED = 202;
/** A required route parameter is missing or blank. */
const HTTP_BAD_REQUEST = 400;
/** No such run, or the run belongs to a different project. */
const HTTP_NOT_FOUND = 404;
/** The run already reached a terminal status other than 'interrupted'. */
const HTTP_CONFLICT = 409;
/**
 * Builds the Express handler for
 * POST /api/projects/:projectId/critique/:runId/interrupt
 *
 * Request flow, in order:
 *  1. 400 when either route parameter is missing or blank after trimming.
 *  2. 404 when no row exists for runId, or the row belongs to another
 *     project (404 rather than 403 so the existence of other projects'
 *     runs is not revealed).
 *  3. 202 with prevStatus='interrupted' when the row is already
 *     interrupted, so a client retrying a lost response does not see the
 *     outcome flip to a conflict. The registry is not touched on this path.
 *  4. 409 for every other non-running status: the run reached that state
 *     on its own and an interrupt is no longer meaningful.
 *  5. 202 with prevStatus='running' after signalling the registered
 *     AbortController, or, when the registry has no handle for the pair,
 *     after marking the row interrupted directly (recovered=true).
 *
 * @param db - Database handle used for the row lookup and the recovery write.
 * @param registry - In-process registry of live run handles.
 * @returns A synchronous Express request handler.
 *
 * @see specs/current/critique-theater.md § interrupt endpoint (Task 6.1)
 */
export function handleCritiqueInterrupt(
  db: Database.Database,
  registry: RunRegistry,
): (req: Request, res: Response) => void {
  return function critiqueInterruptHandler(req: Request, res: Response): void {
    const rawProjectId = req.params['projectId'];
    const rawRunId = req.params['runId'];
    const projectId = typeof rawProjectId === 'string' ? rawProjectId.trim() : '';
    const runId = typeof rawRunId === 'string' ? rawRunId.trim() : '';

    if (projectId === '' || runId === '') {
      res.status(HTTP_BAD_REQUEST).json({
        error: { code: 'BAD_REQUEST', message: 'projectId and runId are required' },
      });
      return;
    }

    // Cross-project leak guard: a row owned by another project is reported
    // exactly like a missing row.
    const row = getCritiqueRun(db, runId);
    const ownedByProject = row !== null && row.projectId === projectId;
    if (row === null || !ownedByProject) {
      res.status(HTTP_NOT_FOUND).json({
        error: { code: 'NOT_FOUND', message: 'critique run not found' },
      });
      return;
    }

    const liveStatus = row.status as CritiqueRunStatus | 'running';
    switch (liveStatus) {
      case 'interrupted':
        // Idempotent retry: an earlier interrupt already drove the run to
        // its terminal state, so report the same accepted outcome.
        res.status(HTTP_ACCEPTED).json({
          runId,
          accepted: true,
          prevStatus: 'interrupted',
        });
        return;
      case 'running':
        // The only status that can still be interrupted; handled below.
        break;
      default:
        res.status(HTTP_CONFLICT).json({
          error: {
            code: 'CONFLICT',
            message: `run is already in terminal status: ${row.status}`,
            currentStatus: row.status,
          },
        });
        return;
    }

    // The registry is keyed by (projectId, runId), so this call cannot reach
    // a handle registered under a different project.
    if (registry.interrupt(projectId, runId, 'user_requested')) {
      res.status(HTTP_ACCEPTED).json({
        runId,
        accepted: true,
        prevStatus: 'running',
      });
      return;
    }

    // Recovery path. The row says 'running' but no live AbortController is
    // registered for it (for example after a daemon restart, before
    // reconcileStaleRuns treats the row as stale). Answering 202 without
    // doing anything would be misleading: no child would be signalled and
    // the row would stay 'running'. So the row is marked 'interrupted'
    // directly with recoveryReason='no_live_handle'.
    //
    // NOTE(review): `recovered: true` is sent even when the write reports
    // that no row changed; that case is only distinguishable through the
    // extra `recoveryFailed: true` field.
    const rowUpdated = markRunInterruptedRecovery(db, runId, 'no_live_handle');
    const payload: Record<string, unknown> = {
      runId,
      accepted: true,
      prevStatus: 'running',
      recovered: true,
    };
    if (!rowUpdated) {
      payload['recoveryFailed'] = true;
    }
    res.status(HTTP_ACCEPTED).json(payload);
  };
}

View file

@ -309,6 +309,43 @@ export function deleteCritiqueRun(db: Database.Database, id: string): void {
db.prepare(`DELETE FROM critique_runs WHERE id = ?`).run(id);
}
/**
 * Flips one critique run from 'running' to 'interrupted' and records why.
 *
 * The existing rounds payload is parsed, re-serialized together with
 * `recoveryReason`, and written back along with the new status and
 * `updated_at`. The doc on the original implementation describes this as a
 * mirror of the per-row write in reconcileStaleRuns(); it is a separate
 * function so the interrupt endpoint can use it for a row that has no live
 * AbortController in the registry.
 *
 * Both the SELECT and the UPDATE are filtered on status='running', so a row
 * that has already moved to another status is never overwritten.
 *
 * @param db - Database handle to read from and write to.
 * @param id - Primary key of the critique run.
 * @param recoveryReason - Reason string stored in the rounds payload.
 * @param now - Timestamp written to updated_at; defaults to Date.now().
 * @returns true when a row was updated; false when the id is unknown or the
 *   row is not in 'running' status.
 */
export function markRunInterruptedRecovery(
  db: Database.Database,
  id: string,
  recoveryReason: string,
  now: number = Date.now(),
): boolean {
  const selectRunning = db.prepare(
    `SELECT ${COLS} FROM critique_runs WHERE id = ? AND status = 'running'`,
  );
  const current = selectRunning.get(id) as RawCritiqueRunRow | undefined;
  if (current === undefined) {
    return false;
  }

  // Keep the rounds already recorded; only the recovery reason is new.
  const parsed = parseRoundsPayload(current.roundsJson);
  const updatedPayload = serializeRoundsPayload(parsed.rounds, recoveryReason);

  const markInterrupted = db.prepare(
    `UPDATE critique_runs
SET status = 'interrupted',
rounds_json = ?,
updated_at = ?
WHERE id = ? AND status = 'running'`,
  );
  const { changes } = markInterrupted.run(updatedPayload, now, id);
  return changes > 0;
}
/**
* Recovery scan called on daemon boot: any run still in a non-terminal status
* older than staleAfterMs is marked 'interrupted' with rounds_json.recoveryReason

View file

@ -0,0 +1,108 @@
/**
* In-process registry of in-flight critique runs. The daemon process is the
* single owner of all critique state; the registry exists so the interrupt
* endpoint can cascade an AbortController to the orchestrator that owns the
* spawned CLI. The registry is intentionally NOT persisted: a daemon restart
* mid-run is handled by reconcileStaleRuns on boot, not by recovering live
* AbortControllers.
*
* All lookup operations require BOTH projectId and runId. The composite key
* prevents a request to interrupt project p1's runId from accidentally
* aborting project p2's run that happens to share the same id (defense in
* depth on top of the HTTP handler's own DB-row projectId check).
*
* @see specs/current/critique-theater.md § Failure modes (interrupt)
*/
/**
 * Everything the registry tracks about one in-flight critique run.
 */
export interface RunHandle {
  /** Project the run belongs to; one half of the registry's lookup key. */
  projectId: string;
  /** Id of the critique run; the other half of the lookup key. */
  runId: string;
  /** Controller whose signal is aborted when the run is interrupted. */
  abort: AbortController;
  /** Start time supplied by the caller (Date.now() at the visible call sites). */
  startedAt: number;
}
/**
 * Contract of the in-process run registry. Every lookup takes both
 * projectId and runId; a runId alone never identifies an entry.
 */
export interface RunRegistry {
  /**
   * Adds a handle for an in-flight run.
   *
   * @throws Error when the same (projectId, runId) pair is already
   *   registered. That signals a bug in the caller rather than a user
   *   error; unregister the old handle first.
   */
  register(handle: RunHandle): void;
  /**
   * Drops the entry for the pair. Meant to be called by the server once the
   * orchestrator has settled. Does nothing when the pair is not registered.
   */
  unregister(projectId: string, runId: string): void;
  /**
   * Looks up the handle registered for the pair.
   *
   * @returns The handle, or null when the pair is not registered. A
   *   matching runId under a different projectId does not count.
   */
  get(projectId: string, runId: string): RunHandle | null;
  /**
   * Aborts the AbortController registered for the pair, passing `reason`
   * through to it.
   *
   * @returns true when the pair was found and its controller aborted;
   *   false otherwise. A handle that shares only the runId with a different
   *   project is never aborted.
   */
  interrupt(projectId: string, runId: string, reason?: string): boolean;
  /**
   * Diagnostic snapshot of the registered handles. The returned array is a
   * copy, so adding or removing array elements does not change the registry.
   */
  list(): RunHandle[];
}
/**
 * Builds the internal Map key for a (projectId, runId) pair.
 *
 * The pair is encoded as a JSON array instead of being joined with a
 * separator character. A plain `projectId|runId` join is ambiguous as soon
 * as either id contains the separator: ("a|b", "c") and ("a", "b|c") would
 * share one key. The ids reaching the registry can originate from URL
 * parameters, so the key must stay collision-free for arbitrary strings
 * rather than rely on id-generation conventions enforced elsewhere.
 * JSON string escaping guarantees that two different pairs never produce
 * the same key.
 *
 * @param projectId - Project half of the key.
 * @param runId - Run half of the key.
 * @returns An opaque key, unique per distinct (projectId, runId) pair.
 */
function compositeKey(projectId: string, runId: string): string {
  return JSON.stringify([projectId, runId]);
}
/**
 * Creates a RunRegistry that keeps its handles in a Map held in memory.
 * Entries are keyed by compositeKey(projectId, runId); no locking is used.
 *
 * @returns A fresh, empty registry.
 *
 * @see specs/current/critique-theater.md § interrupt endpoint (Task 6.1)
 */
export function createRunRegistry(): RunRegistry {
  const handles = new Map<string, RunHandle>();

  /** Shared lookup used by get() and interrupt(). */
  const find = (projectId: string, runId: string): RunHandle | undefined =>
    handles.get(compositeKey(projectId, runId));

  return {
    register(handle: RunHandle): void {
      const key = compositeKey(handle.projectId, handle.runId);
      if (!handles.has(key)) {
        handles.set(key, handle);
        return;
      }
      // Re-registering a live pair is a caller bug, so fail loudly.
      throw new Error(
        `RunRegistry: duplicate (projectId="${handle.projectId}", runId="${handle.runId}"); unregister before re-registering`,
      );
    },
    get(projectId: string, runId: string): RunHandle | null {
      const found = find(projectId, runId);
      return found ?? null;
    },
    interrupt(projectId: string, runId: string, reason?: string): boolean {
      const found = find(projectId, runId);
      if (found !== undefined) {
        found.abort.abort(reason);
        return true;
      }
      return false;
    },
    unregister(projectId: string, runId: string): void {
      handles.delete(compositeKey(projectId, runId));
    },
    list(): RunHandle[] {
      // New array on every call; the handle objects themselves are shared.
      return Array.from(handles.values());
    },
  };
}

View file

@ -42,6 +42,8 @@ import { createClaudeStreamHandler } from './claude-stream.js';
import { loadCritiqueConfigFromEnv } from './critique/config.js';
import { reconcileStaleRuns } from './critique/persistence.js';
import { runOrchestrator } from './critique/orchestrator.js';
import { createRunRegistry } from './critique/run-registry.js';
import { handleCritiqueInterrupt } from './critique/interrupt-handler.js';
import { createCopilotStreamHandler } from './copilot-stream.js';
import { createJsonEventStreamHandler } from './json-event-stream.js';
import { createQoderStreamHandler } from './qoder-stream.js';
@ -822,6 +824,11 @@ const critiqueCfg = loadCritiqueConfigFromEnv();
// Adapter denylist for orchestrator routing is implicit: anything that is
// not the 'plain' streamFormat falls through to legacy single-pass.
// NOTE(review): presumably this set records adapters that have already been
// warned about so the warning is not repeated; confirm at the use sites.
const critiqueWarnedAdapters = new Set<string>();
// In-process registry of in-flight critique runs so the interrupt endpoint
// can cascade an AbortController to the matching orchestrator invocation.
// Created once per process; not persisted across daemon restarts.
const critiqueRunRegistry = createRunRegistry();
// 25 seconds, expressed in milliseconds.
export const SSE_KEEPALIVE_INTERVAL_MS = 25_000;
export function createAgentRuntimeEnv(
@ -4694,6 +4701,19 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
// contract payload as event.data.
const critiqueBus = { emit: (e) => send(e.event, e.data) };
// Register this run with the in-process registry so the interrupt
// endpoint can cascade an AbortController to the orchestrator. The
// register call must run BEFORE runOrchestrator is invoked, so a
// request that arrives between spawn and orchestrator-start cannot
// miss a runId that already has a live child process.
const critiqueAbort = new AbortController();
critiqueRunRegistry.register({
runId: critiqueRunId,
projectId: critiqueProjectKey,
abort: critiqueAbort,
startedAt: Date.now(),
});
// Stderr forwarding and child.on('error') must be wired BEFORE the
// orchestrator awaits stdout. Otherwise a CLI that floods stderr can
// fill the OS pipe and deadlock the run until the total timeout, and
@ -4726,6 +4746,7 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
stdout: stdoutIterable,
child,
childExitPromise,
signal: critiqueAbort.signal,
});
// Map the critique terminal status to the chat run lifecycle.
// 'shipped' and 'below_threshold' both ran to a ship decision and
@ -4744,6 +4765,8 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
} catch (err) {
send('error', createSseErrorPayload('AGENT_EXECUTION_FAILED', err instanceof Error ? err.message : String(err)));
design.runs.finish(run, 'failed', 1, null);
} finally {
critiqueRunRegistry.unregister(critiqueProjectKey, critiqueRunId);
}
return;
}
@ -5092,6 +5115,15 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
}
});
// ---- Critique Theater endpoints (Phase 6) --------------------------------
// POST /api/projects/:projectId/critique/:runId/interrupt
// Cascades an AbortController to the in-flight orchestrator for the given run.
app.post(
'/api/projects/:projectId/critique/:runId/interrupt',
handleCritiqueInterrupt(db, critiqueRunRegistry),
);
// ---- API Proxy (SSE) for API-compatible endpoints ------------------------
// Browser → daemon → external API. Avoids CORS issues with third-party
// providers. This keeps BYOK setup zero-config for local users at the cost of

View file

@ -0,0 +1,359 @@
/**
* Tests for POST /api/projects/:projectId/critique/:runId/interrupt
*
* Each test mounts the handler on a fresh express mini-app with an in-memory
* SQLite database and a real RunRegistry so the full handler logic is exercised
* without starting the full daemon server.
*
* @see specs/current/critique-theater.md § interrupt endpoint (Task 6.1)
*/
import http from 'node:http';
import { describe, it, expect, beforeEach, afterEach } from 'vitest';
import express from 'express';
import Database from 'better-sqlite3';
import {
getCritiqueRun,
insertCritiqueRun,
migrateCritique,
type CritiqueRunStatus,
} from '../src/critique/persistence.js';
import { createRunRegistry } from '../src/critique/run-registry.js';
import { handleCritiqueInterrupt } from '../src/critique/interrupt-handler.js';
// ---------------------------------------------------------------------------
// Test infrastructure
// ---------------------------------------------------------------------------
/**
 * Opens a brand-new in-memory SQLite database for one test.
 *
 * Creates the `projects` and `conversations` tables, seeds projects 'p1'
 * and 'p2', then runs migrateCritique() on the connection.
 */
function freshDb(): Database.Database {
  const bootstrapSql = `
CREATE TABLE projects (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE TABLE conversations (
id TEXT PRIMARY KEY,
project_id TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE
);
INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'Project 1', 0, 0);
INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p2', 'Project 2', 0, 0);
`;
  const connection = new Database(':memory:');
  for (const pragma of ['journal_mode = WAL', 'foreign_keys = ON']) {
    connection.pragma(pragma);
  }
  connection.exec(bootstrapSql);
  migrateCritique(connection);
  return connection;
}
/**
 * Starts a minimal Express app that exposes only the interrupt route,
 * bound to 127.0.0.1 on an OS-assigned port.
 *
 * @param db - Database the handler reads from and writes to.
 * @param registry - Run registry the handler interrupts through.
 * @returns Resolves with the base URL and the server once it is listening;
 *   rejects on a server 'error' event or when no address object is available.
 */
function startMiniServer(
  db: Database.Database,
  registry: ReturnType<typeof createRunRegistry>,
): Promise<{ baseUrl: string; server: http.Server }> {
  const miniApp = express();
  miniApp.use(express.json());
  miniApp.post(
    '/api/projects/:projectId/critique/:runId/interrupt',
    handleCritiqueInterrupt(db, registry),
  );

  return new Promise((resolve, reject) => {
    const server = miniApp.listen(0, '127.0.0.1', function onListening() {
      const bound = server.address();
      if (bound !== null && typeof bound === 'object') {
        resolve({ baseUrl: `http://127.0.0.1:${bound.port}`, server });
        return;
      }
      reject(new Error('could not bind'));
    });
    server.on('error', reject);
  });
}
/**
 * Sends a POST request and returns the status plus the parsed JSON body.
 *
 * @param url - Absolute URL to post to.
 * @param body - Optional payload; when given it is sent as JSON with a
 *   matching Content-Type header.
 * @returns The response status and the parsed body, or `json: null` when
 *   the body cannot be parsed as JSON.
 */
async function post(url: string, body?: unknown): Promise<{ status: number; json: unknown }> {
  const init: RequestInit = { method: 'POST' };
  if (body !== undefined) {
    init.headers = { 'Content-Type': 'application/json' };
    init.body = JSON.stringify(body);
  }

  const res = await fetch(url, init);
  let json: unknown;
  try {
    json = await res.json();
  } catch {
    json = null;
  }
  return { status: res.status, json };
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
// End-to-end tests for the interrupt handler. Most cases go through a real
// HTTP round trip against the mini server; the two 400 cases call the
// handler directly with mock req/res objects.
describe('POST /api/projects/:projectId/critique/:runId/interrupt', () => {
let db: Database.Database;
let registry: ReturnType<typeof createRunRegistry>;
let baseUrl: string;
let server: http.Server;
// Every test gets its own database, registry and listening server.
beforeEach(async () => {
db = freshDb();
registry = createRunRegistry();
({ baseUrl, server } = await startMiniServer(db, registry));
});
// Close the database, then wait for the server to finish closing.
afterEach(() => {
db.close();
return new Promise<void>((resolve) => server.close(() => resolve()));
});
// ---- 202: happy path -------------------------------------------------------
it('returns 202 and fires the AbortController for a registered running run', async () => {
const abort = new AbortController();
insertCritiqueRun(db, {
id: 'crun_aaa',
projectId: 'p1',
status: 'running',
protocolVersion: 1,
});
registry.register({ runId: 'crun_aaa', projectId: 'p1', abort, startedAt: Date.now() });
const { status, json } = await post(`${baseUrl}/api/projects/p1/critique/crun_aaa/interrupt`);
expect(status).toBe(202);
expect(json).toMatchObject({ runId: 'crun_aaa', accepted: true, prevStatus: 'running' });
expect(abort.signal.aborted).toBe(true);
});
it('aborts with reason "user_requested"', async () => {
const abort = new AbortController();
insertCritiqueRun(db, {
id: 'crun_bbb',
projectId: 'p1',
status: 'running',
protocolVersion: 1,
});
registry.register({ runId: 'crun_bbb', projectId: 'p1', abort, startedAt: Date.now() });
await post(`${baseUrl}/api/projects/p1/critique/crun_bbb/interrupt`);
expect(abort.signal.reason).toBe('user_requested');
});
// ---- 404: unknown run ------------------------------------------------------
it('returns 404 when runId does not exist in the DB', async () => {
const { status } = await post(`${baseUrl}/api/projects/p1/critique/ghost/interrupt`);
expect(status).toBe(404);
});
it('returns 404 when runId exists but belongs to a different project', async () => {
insertCritiqueRun(db, {
id: 'crun_ccc',
projectId: 'p2',
status: 'running',
protocolVersion: 1,
});
const { status } = await post(`${baseUrl}/api/projects/p1/critique/crun_ccc/interrupt`);
expect(status).toBe(404);
});
// ---- 202 + recovered: stale running row, no live handle ------------------
it('marks the row interrupted as a recovery path when no live handle is registered', async () => {
// Scenario: after a daemon restart, but before reconcileStaleRuns treats
// the row as old enough, the DB still says 'running' while the
// in-process registry has no AbortController for the run. Without a
// recovery path the endpoint would answer 202 accepted even though no
// child is signalled and no critique.interrupted event fires, and the row
// would stay 'running' until reconcileStaleRuns eventually catches it.
// Expected behaviour: the handler marks the row 'interrupted' directly
// with recoveryReason='no_live_handle'.
insertCritiqueRun(db, {
id: 'crun_no_handle',
projectId: 'p1',
status: 'running',
protocolVersion: 1,
});
// Deliberately no registry.register call: the row exists in the DB but
// has no live handle in the registry.
const { status, json } = await post(
`${baseUrl}/api/projects/p1/critique/crun_no_handle/interrupt`,
);
expect(status).toBe(202);
expect(json).toMatchObject({
runId: 'crun_no_handle',
accepted: true,
prevStatus: 'running',
recovered: true,
});
expect((json as Record<string, unknown>)['recoveryFailed']).toBeUndefined();
// Most important post-condition: the row is now terminal so it does
// not sit in 'running' forever and a subsequent rerun is not blocked.
const row = getCritiqueRun(db, 'crun_no_handle');
expect(row?.status).toBe('interrupted');
});
it('does not abort a same-runId registry handle from a different project (cross-project leak guard)', async () => {
// A request to interrupt project p1's runId must NOT find or abort a
// registry handle from project p2 even if the runIds collide. The
// registry is keyed by (projectId, runId), so this is enforced at the
// registry layer in addition to the handler's DB-row projectId check.
const sharedRunId = 'crun_shared_id';
const p2Abort = new AbortController();
insertCritiqueRun(db, {
id: sharedRunId,
projectId: 'p2',
status: 'running',
protocolVersion: 1,
});
registry.register({
runId: sharedRunId,
projectId: 'p2',
abort: p2Abort,
startedAt: Date.now(),
});
// Request to p1 for that same runId: must 404 and must NOT abort p2's signal.
const { status } = await post(
`${baseUrl}/api/projects/p1/critique/${sharedRunId}/interrupt`,
);
expect(status).toBe(404);
expect(p2Abort.signal.aborted).toBe(false);
});
// ---- 409: terminal status --------------------------------------------------
// Terminal statuses where the run reached its real outcome on its own and
// an interrupt is no longer meaningful: must return 409. Note that
// 'interrupted' is intentionally excluded here and handled by the
// idempotency tests below.
const TERMINAL_STATUSES: CritiqueRunStatus[] = [
'shipped',
'below_threshold',
'timed_out',
'degraded',
'failed',
'legacy',
];
// One generated test case per terminal status.
for (const terminalStatus of TERMINAL_STATUSES) {
it(`returns 409 for run already in status "${terminalStatus}"`, async () => {
insertCritiqueRun(db, {
id: `crun_t_${terminalStatus}`,
projectId: 'p1',
status: terminalStatus,
protocolVersion: 1,
});
const { status, json } = await post(
`${baseUrl}/api/projects/p1/critique/crun_t_${terminalStatus}/interrupt`,
);
expect(status).toBe(409);
const err = (json as Record<string, unknown>)['error'] as Record<string, unknown>;
expect(err['currentStatus']).toBe(terminalStatus);
});
}
// ---- 202: idempotent retry on already-interrupted run --------------------
it('returns 202 (not 409) when the run is already in status "interrupted"', async () => {
// Idempotency contract: a client that lost the first interrupt response
// and retries should observe the same accepted outcome rather than a
// 409 conflict. The interrupt has already done its job; the retry is a
// no-op against a row whose terminal state was reached *because of*
// interrupt itself.
insertCritiqueRun(db, {
id: 'crun_interrupted_retry',
projectId: 'p1',
status: 'interrupted',
protocolVersion: 1,
});
const { status, json } = await post(
`${baseUrl}/api/projects/p1/critique/crun_interrupted_retry/interrupt`,
);
expect(status).toBe(202);
expect(json).toMatchObject({
runId: 'crun_interrupted_retry',
accepted: true,
prevStatus: 'interrupted',
});
});
it('idempotent retry does not re-fire a registry abort even if a stale handle is still registered', async () => {
// Defense in depth: if the registry handle was somehow not cleaned up
// after the first interrupt drove the row terminal, a retry must not
// double-abort or otherwise invoke registry side effects.
const abort = new AbortController();
insertCritiqueRun(db, {
id: 'crun_interrupted_stale',
projectId: 'p1',
status: 'interrupted',
protocolVersion: 1,
});
registry.register({
runId: 'crun_interrupted_stale',
projectId: 'p1',
abort,
startedAt: Date.now(),
});
await post(
`${baseUrl}/api/projects/p1/critique/crun_interrupted_stale/interrupt`,
);
expect(abort.signal.aborted).toBe(false);
});
// ---- 400: bad params -------------------------------------------------------
it('returns 400 when projectId param is missing (route mismatch becomes 404 at express)', async () => {
// Over HTTP an empty :projectId segment
// (/api/projects//critique/crun_x/interrupt) does not match the route, so
// Express answers 404 before the handler runs. To cover the handler's own
// guard, this test invokes the handler directly with mock req/res objects
// and an empty projectId, which must produce a 400.
const mockReq = {
params: { projectId: '', runId: 'crun_x' },
body: {},
} as unknown as import('express').Request;
let capturedStatus = 0;
let capturedBody: unknown = null;
// Minimal chainable stand-in for the Express response.
const mockRes = {
status(s: number) { capturedStatus = s; return mockRes; },
json(b: unknown) { capturedBody = b; return mockRes; },
} as unknown as import('express').Response;
handleCritiqueInterrupt(db, registry)(mockReq, mockRes);
expect(capturedStatus).toBe(400);
expect((capturedBody as Record<string, unknown>)['error']).toBeTruthy();
});
it('returns 400 when runId param is empty string', () => {
// The runId here is whitespace-only rather than literally empty; the
// handler trims route params before its emptiness check, so both forms
// take the same 400 path.
const mockReq = {
params: { projectId: 'p1', runId: ' ' },
body: {},
} as unknown as import('express').Request;
let capturedStatus = 0;
const mockRes = {
status(s: number) { capturedStatus = s; return mockRes; },
json() { return mockRes; },
} as unknown as import('express').Response;
handleCritiqueInterrupt(db, registry)(mockReq, mockRes);
expect(capturedStatus).toBe(400);
});
// ---- no unhandled throws ---------------------------------------------------
it('does not throw to the framework error handler on any valid input path', async () => {
// Interrupt a non-existent run; the handler should answer 404 cleanly
// instead of surfacing an error response from the framework.
const { status } = await post(`${baseUrl}/api/projects/p1/critique/no-such-run/interrupt`);
expect(status).toBe(404);
});
});

View file

@ -0,0 +1,174 @@
import { describe, it, expect, beforeEach } from 'vitest';
import {
createRunRegistry,
type RunHandle,
type RunRegistry,
} from '../src/critique/run-registry.js';
/**
 * Test helper: builds a RunHandle with a fresh AbortController and the
 * current time as startedAt.
 *
 * @param runId - Run id for the handle.
 * @param projectId - Owning project; defaults to 'p1'.
 */
function makeHandle(runId: string, projectId = 'p1'): RunHandle {
  const abort = new AbortController();
  const startedAt = Date.now();
  return { runId, projectId, abort, startedAt };
}
// Unit tests for the in-memory RunRegistry: register/get, interrupt,
// unregister and list, including the (projectId, runId) composite-key
// isolation between projects.
describe('RunRegistry', () => {
let registry: RunRegistry;
// Each test starts from an empty registry.
beforeEach(() => {
registry = createRunRegistry();
});
// ---------------------------------------------------------------------------
// register / get
// ---------------------------------------------------------------------------
it('returns null for an unregistered (projectId, runId)', () => {
expect(registry.get('p1', 'unknown')).toBeNull();
});
it('register + get round-trips the handle', () => {
const h = makeHandle('crun_01');
registry.register(h);
// Identity check: the registry hands back the very object it was given.
expect(registry.get('p1', 'crun_01')).toBe(h);
});
it('register throws on duplicate (projectId, runId)', () => {
const h = makeHandle('crun_dup');
registry.register(h);
expect(() => registry.register(makeHandle('crun_dup'))).toThrow(
/duplicate \(projectId="p1", runId="crun_dup"\)/,
);
});
it('register accepts the same runId across different projects', () => {
// The composite key is (projectId, runId), so two different projects
// legitimately can carry runs that share an id without colliding.
const a = makeHandle('crun_shared', 'p1');
const b = makeHandle('crun_shared', 'p2');
expect(() => {
registry.register(a);
registry.register(b);
}).not.toThrow();
expect(registry.get('p1', 'crun_shared')).toBe(a);
expect(registry.get('p2', 'crun_shared')).toBe(b);
});
it('get does not return a handle from a different project even when the runId matches', () => {
// Cross-project leak guard: looking up a runId that is registered only
// under project p2 while asking for project p1 must return null, not
// p2's handle.
const inP2 = makeHandle('crun_only_in_p2', 'p2');
registry.register(inP2);
expect(registry.get('p1', 'crun_only_in_p2')).toBeNull();
expect(registry.get('p2', 'crun_only_in_p2')).toBe(inP2);
});
// ---------------------------------------------------------------------------
// interrupt
// ---------------------------------------------------------------------------
it('interrupt returns false for an unknown (projectId, runId)', () => {
expect(registry.interrupt('p1', 'not-there')).toBe(false);
});
it('interrupt fires the AbortController and returns true', () => {
const h = makeHandle('crun_02');
registry.register(h);
const aborted = registry.interrupt('p1', 'crun_02', 'user_requested');
expect(aborted).toBe(true);
expect(h.abort.signal.aborted).toBe(true);
});
it('interrupt passes the reason string to the abort signal', () => {
const h = makeHandle('crun_03');
registry.register(h);
registry.interrupt('p1', 'crun_03', 'test_reason');
expect(h.abort.signal.reason).toBe('test_reason');
});
it('interrupt with no reason still aborts the signal', () => {
const h = makeHandle('crun_04');
registry.register(h);
registry.interrupt('p1', 'crun_04');
expect(h.abort.signal.aborted).toBe(true);
});
it('interrupt does not abort a same-runId handle that belongs to a different project', () => {
// Cross-project leak guard for interrupt: a request to abort project p1's
// runId must NOT abort project p2's handle if their ids ever collide.
const sharedRunId = 'crun_shared_id';
const inP1 = makeHandle(sharedRunId, 'p1');
const inP2 = makeHandle(sharedRunId, 'p2');
registry.register(inP1);
registry.register(inP2);
const aborted = registry.interrupt('p1', sharedRunId);
expect(aborted).toBe(true);
expect(inP1.abort.signal.aborted).toBe(true);
expect(inP2.abort.signal.aborted).toBe(false);
});
// ---------------------------------------------------------------------------
// unregister
// ---------------------------------------------------------------------------
it('unregister removes the entry so get returns null', () => {
const h = makeHandle('crun_05');
registry.register(h);
registry.unregister('p1', 'crun_05');
expect(registry.get('p1', 'crun_05')).toBeNull();
});
it('unregister is a no-op for an unknown (projectId, runId)', () => {
expect(() => registry.unregister('p1', 'ghost')).not.toThrow();
});
it('unregister keys by project too: removing p1\'s runId does not remove p2\'s same-id handle', () => {
const sharedRunId = 'crun_shared_for_unregister';
const inP1 = makeHandle(sharedRunId, 'p1');
const inP2 = makeHandle(sharedRunId, 'p2');
registry.register(inP1);
registry.register(inP2);
registry.unregister('p1', sharedRunId);
expect(registry.get('p1', sharedRunId)).toBeNull();
expect(registry.get('p2', sharedRunId)).toBe(inP2);
});
// ---------------------------------------------------------------------------
// list
// ---------------------------------------------------------------------------
it('list returns empty array when no runs are registered', () => {
expect(registry.list()).toEqual([]);
});
it('list returns a snapshot of all registered handles across projects', () => {
const h1 = makeHandle('crun_10', 'p1');
const h2 = makeHandle('crun_11', 'p2');
registry.register(h1);
registry.register(h2);
const snap = registry.list();
expect(snap).toHaveLength(2);
expect(snap).toContain(h1);
expect(snap).toContain(h2);
});
it('list returns a defensive copy (mutations do not affect registry)', () => {
registry.register(makeHandle('crun_12'));
const snap = registry.list();
// Emptying the returned array must leave the registry's entry in place.
snap.splice(0);
expect(registry.list()).toHaveLength(1);
});
it('list excludes unregistered handles', () => {
registry.register(makeHandle('crun_13'));
registry.register(makeHandle('crun_14'));
registry.unregister('p1', 'crun_13');
const ids = registry.list().map((h) => h.runId);
expect(ids).not.toContain('crun_13');
expect(ids).toContain('crun_14');
});
});