// diff --git a/apps/daemon/src/critique/config.ts b/apps/daemon/src/critique/config.ts new file mode 100644 index 000000000..e35f6d833 --- /dev/null +++ b/apps/daemon/src/critique/config.ts @@ -0,0 +1,88 @@
import { defaultCritiqueConfig, FALLBACK_POLICIES } from '@open-design/contracts/critique';
import type { CritiqueConfig } from '@open-design/contracts/critique';

/**
 * Largest delay (in ms) that Node.js timers honour. `setTimeout` replaces any
 * larger delay with 1 ms, so a "very long" timeout would fire immediately.
 * Timeout settings are therefore rejected above this bound.
 */
const MAX_TIMER_DELAY_MS = 2147483647;

/** Tokens (after trim + lowercase) accepted as "enabled". */
const ENABLED_TOKENS: ReadonlySet<string> = new Set(['true', '1', 'yes', 'on']);

/**
 * Tokens (after trim + lowercase) accepted as "disabled". The empty string is
 * included because an explicitly blank variable has always disabled the
 * feature; keeping that avoids breaking deployments that export `VAR=`.
 */
const DISABLED_TOKENS: ReadonlySet<string> = new Set(['false', '0', 'no', 'off', '']);

/**
 * Load CritiqueConfig from process.env. Keys map 1:1 to OD_CRITIQUE_*.
 * Missing values fall back to defaultCritiqueConfig(). Invalid values
 * (non-numeric, blank, negative, out-of-range, unknown tokens) throw
 * RangeError so misconfig surfaces at boot, never silently.
 *
 * @param env Environment to read; defaults to `process.env`.
 * @returns The defaults overlaid with every OD_CRITIQUE_* value that was set.
 * @throws RangeError when any variable is set to an invalid value, or when
 *   the score threshold exceeds the score scale.
 *
 * @see specs/current/critique-theater.md § Configuration (env vars)
 */
export function loadCritiqueConfigFromEnv(env: NodeJS.ProcessEnv = process.env): CritiqueConfig {
  const defaults = defaultCritiqueConfig();

  const enabled = parseEnabled(env['OD_CRITIQUE_ENABLED'], defaults.enabled);
  const maxRounds = parsePositiveInt('OD_CRITIQUE_MAX_ROUNDS', env['OD_CRITIQUE_MAX_ROUNDS'], defaults.maxRounds);
  const scoreThreshold = parseNonNegativeFloat('OD_CRITIQUE_SCORE_THRESHOLD', env['OD_CRITIQUE_SCORE_THRESHOLD'], defaults.scoreThreshold);
  const scoreScale = parsePositiveInt('OD_CRITIQUE_SCORE_SCALE', env['OD_CRITIQUE_SCORE_SCALE'], defaults.scoreScale);
  // Both timeouts end up as timer delays, so they are capped at what timers can represent.
  const perRoundTimeoutMs = parsePositiveInt(
    'OD_CRITIQUE_PER_ROUND_TIMEOUT_MS',
    env['OD_CRITIQUE_PER_ROUND_TIMEOUT_MS'],
    defaults.perRoundTimeoutMs,
    MAX_TIMER_DELAY_MS,
  );
  const totalTimeoutMs = parsePositiveInt(
    'OD_CRITIQUE_TOTAL_TIMEOUT_MS',
    env['OD_CRITIQUE_TOTAL_TIMEOUT_MS'],
    defaults.totalTimeoutMs,
    MAX_TIMER_DELAY_MS,
  );
  const parserMaxBlockBytes = parsePositiveInt('OD_CRITIQUE_PARSER_MAX_BLOCK_BYTES', env['OD_CRITIQUE_PARSER_MAX_BLOCK_BYTES'], defaults.parserMaxBlockBytes);
  const fallbackPolicy = parseFallbackPolicy(env['OD_CRITIQUE_FALLBACK_POLICY'], defaults.fallbackPolicy);

  // Cross-field validation: threshold cannot exceed scale. The epsilon keeps
  // a threshold that equals the scale up to float noise from being rejected.
  if (scoreThreshold > scoreScale + 1e-9) {
    throw new RangeError(
      `OD_CRITIQUE_SCORE_THRESHOLD (${scoreThreshold}) must be <= OD_CRITIQUE_SCORE_SCALE (${scoreScale})`,
    );
  }

  return {
    ...defaults,
    enabled,
    maxRounds,
    scoreThreshold,
    scoreScale,
    perRoundTimeoutMs,
    totalTimeoutMs,
    parserMaxBlockBytes,
    fallbackPolicy,
  };
}

// ---------------------------------------------------------------------------
// Parsing helpers
// ---------------------------------------------------------------------------

/**
 * Parse the OD_CRITIQUE_ENABLED flag.
 *
 * @returns `fallback` when unset; `true` for true/1/yes/on; `false` for
 *   false/0/no/off or a blank value (case-insensitive, surrounding whitespace
 *   ignored).
 * @throws RangeError for any other token, so a typo such as "tru" fails at
 *   boot instead of silently disabling the feature.
 */
function parseEnabled(raw: string | undefined, fallback: boolean): boolean {
  if (raw === undefined) return fallback;
  const token = raw.trim().toLowerCase();
  if (ENABLED_TOKENS.has(token)) return true;
  if (DISABLED_TOKENS.has(token)) return false;
  throw new RangeError(
    `OD_CRITIQUE_ENABLED must be one of true, 1, yes, on, false, 0, no, off, got "${raw}"`,
  );
}

/**
 * Parse an integer >= 1.
 *
 * @param key Variable name, used only in error messages.
 * @param raw Raw environment value; `undefined` means "not set".
 * @param fallback Value returned when `raw` is `undefined`.
 * @param max Inclusive upper bound; defaults to Number.MAX_SAFE_INTEGER so
 *   values that cannot be represented exactly are rejected.
 * @throws RangeError when `raw` is not an integer in `[1, max]`. A blank
 *   string is rejected too, because `Number('')` is 0.
 */
function parsePositiveInt(
  key: string,
  raw: string | undefined,
  fallback: number,
  max: number = Number.MAX_SAFE_INTEGER,
): number {
  if (raw === undefined) return fallback;
  const n = Number(raw);
  // Number.isInteger is already false for NaN and +/-Infinity.
  if (!Number.isInteger(n) || n < 1) {
    throw new RangeError(
      `${key} must be a positive integer, got "${raw}"`,
    );
  }
  if (n > max) {
    throw new RangeError(
      `${key} must be <= ${max}, got "${raw}"`,
    );
  }
  return n;
}

/**
 * Parse a finite number >= 0.
 *
 * @throws RangeError when `raw` is blank, non-numeric, infinite or negative.
 *   The blank check is explicit because `Number('')` and `Number('  ')` both
 *   evaluate to 0 and would otherwise be accepted as a valid threshold.
 */
function parseNonNegativeFloat(key: string, raw: string | undefined, fallback: number): number {
  if (raw === undefined) return fallback;
  const n = raw.trim() === '' ? Number.NaN : Number(raw);
  if (!Number.isFinite(n) || n < 0) {
    throw new RangeError(
      `${key} must be a non-negative finite number, got "${raw}"`,
    );
  }
  return n;
}

/**
 * Parse OD_CRITIQUE_FALLBACK_POLICY against the FALLBACK_POLICIES list.
 *
 * @returns `fallback` when unset, otherwise the trimmed value if it is one
 *   of the known policies.
 * @throws RangeError when the value is not a known policy.
 */
function parseFallbackPolicy(
  raw: string | undefined,
  fallback: CritiqueConfig['fallbackPolicy'],
): CritiqueConfig['fallbackPolicy'] {
  if (raw === undefined) return fallback;
  const trimmed = raw.trim();
  if (FALLBACK_POLICIES.includes(trimmed as CritiqueConfig['fallbackPolicy'])) {
    return trimmed as CritiqueConfig['fallbackPolicy'];
  }
  throw new RangeError(
    `OD_CRITIQUE_FALLBACK_POLICY must be one of ${FALLBACK_POLICIES.join(', ')}, got "${raw}"`,
  );
}
// diff --git a/apps/daemon/src/critique/orchestrator.ts b/apps/daemon/src/critique/orchestrator.ts new file mode 100644 index
// 000000000..1a8d5e1fe --- /dev/null +++ b/apps/daemon/src/critique/orchestrator.ts @@ -0,0 +1,710 @@
import type { ChildProcess } from 'node:child_process';
import type Database from 'better-sqlite3';
import type { CritiqueConfig, PanelEvent } from '@open-design/contracts/critique';
import { panelEventToSse } from '@open-design/contracts/critique';
import type { CritiqueSseEvent } from '@open-design/contracts/critique';
import { parseCritiqueStream } from './parser.js';
import {
  computeComposite,
  decideRound,
  selectFallbackRound,
  type RoundState,
} from './scoreboard.js';
import {
  insertCritiqueRun,
  updateCritiqueRun,
  type CritiqueRunRow,
} from './persistence.js';
import { writeTranscript } from './transcript.js';
import {
  MalformedBlockError,
  OversizeBlockError,
  MissingArtifactError,
} from './errors.js';

/**
 * Tolerance used when comparing the agent-supplied composite attribute (on
 * round-end and ship blocks) against the daemon's computed composite.
 * Composites are weighted floats so a tiny FP delta is normal; anything larger
 * than this is reported as a composite_mismatch parser warning.
 */
const COMPOSITE_TOLERANCE = 0.01;

/**
 * Largest delay (in ms) that Node.js timers honour; `setTimeout` replaces any
 * larger delay with 1 ms. Timer delays are clamped to this so an oversized
 * deadline cannot degenerate into an immediate timeout.
 */
const MAX_SET_TIMEOUT_DELAY_MS = 2147483647;

/** How long teardown waits for the source iterator's return() before moving on. */
const ITERATOR_RETURN_GRACE_MS = 200;

// Narrowed PanelEvent variants built by the orchestrator itself.
type ShipEvent = Extract<PanelEvent, { type: 'ship' }>;
type ParserWarningEvent = Extract<PanelEvent, { type: 'parser_warning' }>;
type FailedEvent = Extract<PanelEvent, { type: 'failed' }>;
type InterruptedEvent = Extract<PanelEvent, { type: 'interrupted' }>;
type DegradedEvent = Extract<PanelEvent, { type: 'degraded' }>;

/**
 * SSE bus contract: the orchestrator emits CritiqueSseEvent variants here so
 * the existing /api/projects/:id/events stream can fan them out unchanged.
 * Implementations should be non-blocking; backpressure is the caller's job.
 */
export interface CritiqueSseBus {
  emit(event: CritiqueSseEvent): void;
}

export interface OrchestratorParams {
  runId: string;
  projectId: string;
  conversationId: string | null;
  artifactId: string;
  artifactDir: string;
  adapter: string;
  cfg: CritiqueConfig;
  db: Database.Database;
  bus: CritiqueSseBus;
  /**
   * Source of CLI stdout. The orchestrator is transport-agnostic: a real
   * spawn wrapper passes the child process stdout, tests pass a synthetic
   * iterable.
   */
  stdout: AsyncIterable<string>;
  /**
   * Optional abort signal. Aborting causes the orchestrator to flush
   * best-so-far state and emit critique.interrupted before returning.
   */
  signal?: AbortSignal;
  /**
   * Optional handle to the spawned child process. When provided the
   * orchestrator calls child.kill('SIGTERM') on every non-clean termination
   * path (timeout, abort, parser error, child non-zero exit).
   */
  child?: Pick<ChildProcess, 'kill'>;
  /**
   * Resolves when the child process exits. Used to race parser completion
   * against an early child exit so a non-zero exit code is classified as
   * 'failed' rather than waiting for the parser to time out.
   */
  childExitPromise?: Promise<{ code: number | null; signal: string | null }>;
}

export interface OrchestratorResult {
  status: CritiqueRunRow['status'];
  composite: number | null;
  rounds: CritiqueRunRow['rounds'];
  transcriptPath: string | null;
  artifactPath: string | null;
}

/**
 * Rejects a CritiqueConfig whose numeric fields cannot drive a run.
 * Called before the orchestrator performs any side effect.
 *
 * @throws RangeError naming the first offending field.
 */
function assertValidConfig(cfg: CritiqueConfig): void {
  const atLeastOne = (n: number): boolean => Number.isFinite(n) && n >= 1;

  // The two "positive integer" fields are also checked for integrality, as
  // their error messages have always promised.
  if (!atLeastOne(cfg.maxRounds) || !Number.isInteger(cfg.maxRounds)) {
    throw new RangeError(`runOrchestrator: cfg.maxRounds must be a positive integer, got ${cfg.maxRounds}`);
  }
  if (!atLeastOne(cfg.scoreScale) || !Number.isInteger(cfg.scoreScale)) {
    throw new RangeError(`runOrchestrator: cfg.scoreScale must be a positive integer, got ${cfg.scoreScale}`);
  }
  if (!Number.isFinite(cfg.scoreThreshold) || cfg.scoreThreshold < 0) {
    throw new RangeError(`runOrchestrator: cfg.scoreThreshold must be >= 0, got ${cfg.scoreThreshold}`);
  }
  if (!atLeastOne(cfg.perRoundTimeoutMs)) {
    throw new RangeError(`runOrchestrator: cfg.perRoundTimeoutMs must be positive, got ${cfg.perRoundTimeoutMs}`);
  }
  if (!atLeastOne(cfg.totalTimeoutMs)) {
    throw new RangeError(`runOrchestrator: cfg.totalTimeoutMs must be positive, got ${cfg.totalTimeoutMs}`);
  }
  if (!atLeastOne(cfg.parserMaxBlockBytes)) {
    throw new RangeError(`runOrchestrator: cfg.parserMaxBlockBytes must be positive, got ${cfg.parserMaxBlockBytes}`);
  }
}

/**
 * Drives one Critique Theater run end-to-end:
 *   parse stdout -> collect events -> score per round -> persist -> emit SSE.
 *
 * Every failure after the initial row insert is converted into a terminal
 * status (interrupted / timed_out / degraded / failed) that is persisted and
 * returned; the function only throws for an invalid `cfg` or if persistence
 * itself throws.
 *
 * @param params Run identity, config, DB handle, SSE bus, stdout source and
 *   optional abort signal / child-process handles.
 * @returns Final status, composite score, per-round summary and transcript path.
 * @throws RangeError when a numeric field of `params.cfg` is invalid.
 *
 * @see specs/current/critique-theater.md § Wire protocol parser invariants
 *      and § Failure modes (recovery)
 */
export async function runOrchestrator(
  params: OrchestratorParams,
): Promise<OrchestratorResult> {
  const { runId, projectId, conversationId, artifactDir, adapter, cfg, db, bus, stdout } = params;
  const { signal, child, childExitPromise } = params;

  // Defensive entry: validate every CritiqueConfig numeric field before any side effect.
  assertValidConfig(cfg);

  // 1. Insert a 'running' row.
  insertCritiqueRun(db, {
    id: runId,
    projectId,
    conversationId,
    status: 'running',
    protocolVersion: cfg.protocolVersion,
  });

  const collectedEvents: PanelEvent[] = [];
  const roundStates = new Map<number, RoundState>();
  const completedRounds: RoundState[] = [];
  // artifactPath stays null until a future phase actually extracts the
  // artifact body and writes it to disk. Persisting a synthesized path that no
  // file occupies would let UI/replay/export code dereference a missing file.
  // The transcript still carries the ship event with the artifact reference.
  const artifactPath: string | null = null;
  let shipEvent: ShipEvent | null = null;
  let finalStatus: CritiqueRunRow['status'] = 'failed';
  let finalComposite: number | null = null;
  let transcriptPath: string | null = null;

  // Total deadline.
  const totalDeadline = Date.now() + cfg.totalTimeoutMs;

  // Helper: SIGTERM the child on non-clean termination paths.
  const killChild = (): void => { child?.kill('SIGTERM'); };

  // Helper: append to the transcript buffer and fan out over SSE in one step.
  const record = (event: PanelEvent): void => {
    collectedEvents.push(event);
    bus.emit(panelEventToSse(event));
  };

  // Helper: fresh artifactRef per event so no two events share one object.
  const makeArtifactRef = (): ShipEvent['artifactRef'] => ({ projectId, artifactId: params.artifactId });

  // Helper: on an abnormal stop, emit a synthetic ship for the best closed
  // round (per the fallback policy). Returns that round, or null when nothing
  // could be shipped.
  const shipBestSoFar = (
    status: 'interrupted' | 'timed_out',
    summarize: (best: RoundState) => string,
  ): RoundState | null => {
    const best = completedRounds.length > 0
      ? selectFallbackRound(completedRounds, cfg.fallbackPolicy)
      : null;
    if (best === null) return null;
    const syntheticShip: ShipEvent = {
      type: 'ship',
      runId,
      round: best.n,
      composite: best.composite,
      status,
      artifactRef: makeArtifactRef(),
      summary: summarize(best),
    };
    record(syntheticShip);
    return best;
  };

  // Helper: emit the 'interrupted' event. bestRound and composite describe the
  // same round (the one just shipped as best-so-far). When no round was
  // shipped, the last closed round number is still reported, with composite 0.
  const recordInterrupted = (best: RoundState | null): void => {
    const lastClosed = completedRounds[completedRounds.length - 1];
    const interruptedEvent: InterruptedEvent = {
      type: 'interrupted',
      runId,
      bestRound: best?.n ?? lastClosed?.n ?? 0,
      composite: best?.composite ?? 0,
    };
    record(interruptedEvent);
  };

  // Helper: emit a terminal 'failed' event with the given cause.
  const recordFailed = (cause: FailedEvent['cause']): void => {
    const failedEvent: FailedEvent = { type: 'failed', runId, cause };
    record(failedEvent);
  };

  // Build a rejection promise for early child exit with non-zero code or
  // signal-terminated exit. It stays pending forever for a clean code 0 exit
  // with no signal so the parser loop can finish naturally. A non-null signal
  // means the child was killed (by us, by the user via /cancel, by the OS,
  // etc.) and is treated as terminal so the orchestrator can persist
  // 'interrupted' instead of falling through to the no-SHIP fallback path and
  // reporting below_threshold for a user-cancelled run.
  const childExitRace: Promise<never> | null = childExitPromise
    ? childExitPromise.then(({ code, signal: exitSignal }) => {
        if (exitSignal !== null) {
          throw new ChildSignaledError(exitSignal);
        }
        if (code !== 0 && code !== null) {
          throw new ChildExitError(code);
        }
        // Clean exit with no signal: let the parser finish naturally.
        return new Promise<never>(() => { /* intentionally pending */ });
      })
    : null;
  // The race in applyTimeouts is the real consumer. This no-op handler only
  // guarantees the rejection is never "unhandled" when the child exits while
  // nobody is racing (e.g. the run was aborted before the first read).
  childExitRace?.catch(() => { /* observed via Promise.race when relevant */ });

  try {
    // Per-round timeout tracking.
    let roundDeadline: number | null = null;
    let currentRoundN: number | null = null;

    // Wrap parser with abort + total-timeout awareness.
    const timedSource = applyTimeouts(stdout, {
      signal,
      totalDeadline,
      getPerRoundDeadline: () => roundDeadline,
      childExitRace,
    });

    const parserOpts = {
      runId,
      adapter,
      parserMaxBlockBytes: cfg.parserMaxBlockBytes,
      projectId,
      artifactId: params.artifactId,
    };

    // 2. Consume parser events.
    for await (const event of parseCritiqueStream(timedSource, parserOpts)) {
      // Ship events are buffered, not emitted raw. The normalized ship event
      // (with daemon-authoritative status/composite from decideRound(...))
      // is emitted after the loop so SSE clients and the transcript only
      // ever see daemon-scored ship payloads, not the agent's raw claim.
      if (event.type !== 'ship') {
        record(event);
      }

      switch (event.type) {
        case 'panelist_open': {
          if (!roundStates.has(event.round)) {
            roundStates.set(event.round, {
              n: event.round,
              scores: {},
              mustFix: 0,
              composite: 0,
            });
          }
          // The first panelist of a new round starts that round's clock.
          if (event.round !== currentRoundN) {
            currentRoundN = event.round;
            roundDeadline = Date.now() + cfg.perRoundTimeoutMs;
          }
          break;
        }

        case 'panelist_close': {
          const rs = roundStates.get(event.round);
          if (rs !== undefined) {
            rs.scores[event.role] = event.score;
            rs.composite = computeComposite(rs.scores, cfg.weights);
          }
          break;
        }

        case 'panelist_must_fix': {
          const rs = roundStates.get(event.round);
          if (rs !== undefined) {
            rs.mustFix += 1;
          }
          break;
        }

        case 'round_end': {
          const rs = roundStates.get(event.round);
          if (rs !== undefined) {
            // Daemon-side composite (computed via configured weights from
            // panelist_close events) is the source of truth. The agent's
            // composite attribute is advisory: if it diverges beyond
            // COMPOSITE_TOLERANCE we emit a composite_mismatch
            // parser_warning, but the daemon value is what scores and
            // persists. Same policy for mustFix, which is tallied from
            // panelist_must_fix events.
            const compositeDiverges = Math.abs(event.composite - rs.composite) > COMPOSITE_TOLERANCE;
            if (compositeDiverges || event.mustFix !== rs.mustFix) {
              const warning: ParserWarningEvent = {
                type: 'parser_warning',
                runId,
                kind: 'composite_mismatch',
                position: 0,
              };
              record(warning);
            }
            completedRounds.push({ ...rs });
          }
          // The round is over; stop its clock until the next round opens.
          roundDeadline = null;
          break;
        }

        case 'ship': {
          shipEvent = event;
          break;
        }

        // run_started and panelist_dim carry nothing the scoreboard needs.
        // The artifact reference comes from the run identity (params), not
        // from dimension notes.
        case 'run_started':
        case 'panelist_dim':
        default:
          break;
      }
    }

    // 3. Determine final status and composite.
    //
    // The agent's raw ship block was buffered (not emitted) by the parser loop
    // above. We resolve it here against the daemon scoreboard, then emit a
    // single normalized ship event so the transcript and SSE bus reflect the
    // daemon-authoritative status/composite, not the agent's claim.
    const agentShip: ShipEvent | null = shipEvent;
    const shippedRound = agentShip !== null
      ? completedRounds.find((r) => r.n === agentShip.round)
      : undefined;

    if (agentShip !== null && shippedRound === undefined) {
      // The agent claimed a SHIP for a round that was never closed by the
      // daemon. Trusting it would re-open the scoring-integrity hole this
      // patch is meant to close, so we drop the agent ship, emit a
      // parser_warning, and fall through to the no-SHIP fallback policy.
      // NOTE(review): the warning kind 'duplicate_ship' is reused here for
      // "ship references an unclosed round" — confirm that is intended.
      const warning: ParserWarningEvent = {
        type: 'parser_warning',
        runId,
        kind: 'duplicate_ship',
        position: 0,
      };
      record(warning);
    }

    if (agentShip !== null && shippedRound !== undefined) {
      // Daemon-authoritative scoring: derive status from decideRound(...)
      // using the daemon's computed composite/mustFix rather than the
      // agent's attributes. A composite divergence larger than
      // COMPOSITE_TOLERANCE emits composite_mismatch.
      if (Math.abs(agentShip.composite - shippedRound.composite) > COMPOSITE_TOLERANCE) {
        const warning: ParserWarningEvent = {
          type: 'parser_warning',
          runId,
          kind: 'composite_mismatch',
          position: 0,
        };
        record(warning);
      }
      const decision = decideRound(shippedRound.composite, shippedRound.mustFix, cfg);
      const outcome = decision === 'ship' ? 'shipped' : 'below_threshold';
      finalStatus = outcome;
      finalComposite = shippedRound.composite;

      // Emit the daemon-authoritative ship event. SSE clients and the
      // transcript see this single normalized payload, never the raw agent
      // claim from the buffered shipEvent.
      const normalizedShip: ShipEvent = {
        type: 'ship',
        runId,
        round: shippedRound.n,
        composite: shippedRound.composite,
        status: outcome,
        artifactRef: makeArtifactRef(),
        summary: agentShip.summary,
      };
      record(normalizedShip);
    } else {
      // No SHIP arrived (or the agent SHIP was rejected as malformed above).
      // Apply fallback policy over the daemon's closed rounds.
      killChild();
      const fallback = selectFallbackRound(completedRounds, cfg.fallbackPolicy);
      if (fallback !== null) {
        finalStatus = 'below_threshold';
        finalComposite = fallback.composite;
        // Emit a synthetic ship event.
        const syntheticShip: ShipEvent = {
          type: 'ship',
          runId,
          round: fallback.n,
          composite: fallback.composite,
          status: 'below_threshold',
          artifactRef: makeArtifactRef(),
          summary: `Fallback: best round ${fallback.n} composite ${fallback.composite.toFixed(2)}`,
        };
        record(syntheticShip);
      } else {
        finalStatus = 'failed';
        finalComposite = null;
        recordFailed('orchestrator_internal');
      }
    }
  } catch (err) {
    // All non-clean termination paths: SIGTERM the child.
    killChild();

    // Classify the error.
    if (err instanceof AbortError) {
      finalStatus = 'interrupted';
      // Defect 7: ship best-so-far when at least one round completed.
      const best = shipBestSoFar(
        'interrupted',
        (r) => `Interrupted after round ${r.n}, best composite ${r.composite.toFixed(2)}`,
      );
      if (best !== null) finalComposite = best.composite;
      recordInterrupted(best);
    } else if (err instanceof TimeoutError) {
      finalStatus = 'timed_out';
      // Defect 7: ship best-so-far when at least one round completed.
      const best = shipBestSoFar(
        'timed_out',
        (r) => `Timed out after round ${r.n}, best composite ${r.composite.toFixed(2)}`,
      );
      if (best !== null) finalComposite = best.composite;
      recordFailed(err.cause);
    } else if (err instanceof ChildExitError) {
      finalStatus = 'failed';
      recordFailed('cli_exit_nonzero');
    } else if (err instanceof ChildSignaledError) {
      // Signal-terminated child (e.g. SIGTERM from /api/runs/:id/cancel)
      // is classified as 'interrupted' so the persisted critique row
      // reflects the actual cause (user/operator interruption) rather
      // than getting flushed through the no-SHIP fallback as
      // 'below_threshold'. If at least one round closed cleanly, ship
      // the best-so-far via selectFallbackRound, mirroring the abort path.
      finalStatus = 'interrupted';
      const terminatingSignal = err.signal;
      const best = shipBestSoFar(
        'interrupted',
        (r) => `Child terminated by signal ${terminatingSignal} after round ${r.n}, best composite ${r.composite.toFixed(2)}`,
      );
      if (best !== null) finalComposite = best.composite;
      recordInterrupted(best);
    } else if (
      err instanceof MalformedBlockError ||
      err instanceof OversizeBlockError ||
      err instanceof MissingArtifactError
    ) {
      finalStatus = 'degraded';
      const reason =
        err instanceof MalformedBlockError ? 'malformed_block' :
        err instanceof OversizeBlockError ? 'oversize_block' :
        'missing_artifact';
      const degradedEvent: DegradedEvent = {
        type: 'degraded',
        runId,
        reason,
        adapter,
      };
      record(degradedEvent);
    } else {
      finalStatus = 'failed';
      recordFailed('orchestrator_internal');
    }
  }

  // Write transcript for all non-trivially-failed runs.
  if (finalStatus !== 'failed' || collectedEvents.length > 0) {
    try {
      const result = await writeTranscript(artifactDir, collectedEvents);
      transcriptPath = result.path;
    } catch {
      // Transcript write failure must not mask the primary outcome.
      transcriptPath = null;
    }
  }

  // Build rounds summary for persistence.
  const roundsSummary = completedRounds.map((r) => ({
    n: r.n,
    composite: r.composite,
    mustFix: r.mustFix,
    decision: decideRound(r.composite, r.mustFix, cfg) as 'continue' | 'ship',
  }));

  // Persist final state.
  updateCritiqueRun(db, runId, {
    status: finalStatus,
    score: finalComposite,
    rounds: roundsSummary,
    transcriptPath,
    artifactPath,
  });

  return {
    status: finalStatus,
    composite: finalComposite,
    rounds: roundsSummary,
    transcriptPath,
    artifactPath,
  };
}

// ---------------------------------------------------------------------------
// Internal timeout / abort utilities
// ---------------------------------------------------------------------------

/** Thrown when the caller's AbortSignal fires (or was already aborted). */
class AbortError extends Error {
  constructor() {
    super('run aborted');
    this.name = 'AbortError';
  }
}

/** Thrown when the per-round or the total deadline elapses; `cause` says which. */
class TimeoutError extends Error {
  constructor(
    message: string,
    public readonly cause: 'per_round_timeout' | 'total_timeout',
  ) {
    super(message);
    this.name = 'TimeoutError';
  }
}

/** Thrown when the child process exits with a non-zero code before the parser finishes. */
class ChildExitError extends Error {
  constructor(public readonly code: number) {
    super(`child exited with code ${code}`);
    this.name = 'ChildExitError';
  }
}

/**
 * Thrown when the child process is signal-terminated (SIGTERM, SIGINT, etc.)
 * before the parser finishes. From the orchestrator's perspective this is
 * always treated as 'interrupted': the daemon kills the child via
 * /api/runs/:id/cancel, the user kills it manually, or the OS terminates it.
 * Either way the run was cut short externally and shouldn't fall through to
 * the no-SHIP fallback path that would persist below_threshold.
 */
class ChildSignaledError extends Error {
  constructor(public readonly signal: string) {
    super(`child terminated by signal ${signal}`);
    this.name = 'ChildSignaledError';
  }
}

interface TimeoutOptions {
  signal: AbortSignal | undefined;
  totalDeadline: number;
  getPerRoundDeadline: () => number | null;
  /** When provided, races each iteration against a child-exit rejection. */
  childExitRace: Promise<never> | null;
}

/** A promise that can only reject, plus a function releasing what backs it. */
interface CancellableRace {
  promise: Promise<never>;
  cancel: () => void;
}

/**
 * Builds a Promise that rejects with TimeoutError after delayMs, or rejects
 * immediately when delayMs <= 0. It never resolves. `cancel` clears the
 * timer; a cancelled race simply stays pending.
 *
 * @param delayMs Milliseconds until the timeout fires; clamped to the largest
 *   delay timers support.
 * @param cause Which deadline this race guards; copied onto the TimeoutError.
 */
function makeTimeoutRace(
  delayMs: number,
  cause: 'per_round_timeout' | 'total_timeout',
): CancellableRace {
  let timerId: ReturnType<typeof setTimeout> | undefined;
  const promise = new Promise<never>((_, reject) => {
    const fire = (): void => reject(new TimeoutError(`${cause} exceeded`, cause));
    if (delayMs <= 0) {
      fire();
      return;
    }
    timerId = setTimeout(fire, Math.min(delayMs, MAX_SET_TIMEOUT_DELAY_MS));
  });
  // Racers observe the rejection through Promise.race; this handler only keeps
  // a rejection that fires while nobody is racing from being "unhandled".
  promise.catch(() => { /* intentionally swallowed */ });
  const cancel = (): void => {
    clearTimeout(timerId);
  };
  return { promise, cancel };
}

/**
 * Builds a Promise that rejects with AbortError when `signal` aborts (at once
 * if it already has). Returns null when there is no signal. A single listener
 * is registered for the whole lifetime of the race and removed by `cancel`,
 * so long streams do not pile up one listener per chunk.
 */
function makeAbortRace(signal: AbortSignal | undefined): CancellableRace | null {
  if (signal === undefined) return null;
  let onAbort: (() => void) | undefined;
  const promise = new Promise<never>((_, reject) => {
    if (signal.aborted) {
      reject(new AbortError());
      return;
    }
    onAbort = () => reject(new AbortError());
    signal.addEventListener('abort', onAbort, { once: true });
  });
  // Same reasoning as in makeTimeoutRace: never surface as unhandled.
  promise.catch(() => { /* intentionally swallowed */ });
  const cancel = (): void => {
    if (onAbort !== undefined) signal.removeEventListener('abort', onAbort);
  };
  return { promise, cancel };
}

/**
 * Wraps a source AsyncIterable with abort and real-timer timeout
 * enforcement. Each call to iterator.next() is raced against the total-
 * deadline timer, the current per-round deadline timer, the abort signal and
 * the child-exit promise, so stalling sources (no chunks arriving) are caught
 * even when the source never yields.
 *
 * @throws AbortError when the signal aborts before or during iteration, or is
 *   found aborted once the source is exhausted.
 * @throws TimeoutError when the total or per-round deadline elapses.
 * @throws whatever `opts.childExitRace` rejects with.
 */
async function* applyTimeouts(
  source: AsyncIterable<string>,
  opts: TimeoutOptions,
): AsyncIterable<string> {
  const iter = source[Symbol.asyncIterator]();

  // Keep a single total timer and a single abort listener alive for the full
  // lifetime of the source.
  const totalTimer = makeTimeoutRace(opts.totalDeadline - Date.now(), 'total_timeout');
  const abortRace = makeAbortRace(opts.signal);

  try {
    while (true) {
      // Check abort eagerly before each iteration.
      if (opts.signal?.aborted) {
        throw new AbortError();
      }

      // Build per-round timer for this iteration. The deadline is re-read
      // every time because the consumer moves it as rounds open and close.
      const roundDeadline = opts.getPerRoundDeadline();
      const roundTimer = roundDeadline !== null
        ? makeTimeoutRace(roundDeadline - Date.now(), 'per_round_timeout')
        : null;

      let iterResult: IteratorResult<string>;
      try {
        // Order matters only when several entries are already settled: the
        // earliest entry in the array wins.
        const races: Promise<unknown>[] = [iter.next(), totalTimer.promise];
        if (roundTimer !== null) races.push(roundTimer.promise);
        if (abortRace !== null) races.push(abortRace.promise);

        // Child-exit race: if the child exits non-zero (or by signal) before
        // the parser finishes, surface that error so the run is classified
        // right away rather than waiting for the total timeout.
        if (opts.childExitRace !== null) races.push(opts.childExitRace);

        // Only iter.next() can fulfil; every other entry can only reject.
        iterResult = (await Promise.race(races)) as IteratorResult<string>;
      } finally {
        roundTimer?.cancel();
      }

      if (iterResult.done) {
        break;
      }
      yield iterResult.value;
    }
  } finally {
    totalTimer.cancel();
    abortRace?.cancel();
    // Give the underlying iterator a chance to clean up, but wait at most
    // ITERATOR_RETURN_GRACE_MS so a stalling generator (e.g. one stuck in
    // await new Promise(() => {})) never blocks the orchestrator teardown
    // path indefinitely.
    if (typeof iter.return === 'function') {
      let graceTimer: ReturnType<typeof setTimeout> | undefined;
      const grace = new Promise<void>((resolve) => {
        graceTimer = setTimeout(resolve, ITERATOR_RETURN_GRACE_MS);
      });
      try {
        await Promise.race([
          iter.return().catch(() => { /* ignore cleanup errors */ }),
          grace,
        ]);
      } finally {
        // Do not leave a live timer behind when return() won the race.
        clearTimeout(graceTimer);
      }
    }
  }

  // Final abort check after source exhausted.
  if (opts.signal?.aborted) {
    throw new AbortError();
  }
}

// ---------------------------------------------------------------------------
// Patch hunks for other files that share these source lines (kept verbatim)
// ---------------------------------------------------------------------------
// diff --git a/apps/daemon/src/critique/parser.ts b/apps/daemon/src/critique/parser.ts
// index 57ea1fd57..d79637cd6 100644
// --- a/apps/daemon/src/critique/parser.ts
// +++ b/apps/daemon/src/critique/parser.ts
// @@ -5,6 +5,10 @@ export interface ParserOptions {
//    runId: string;
//    adapter: string;
//    parserMaxBlockBytes: number;
// +  /** Project identity threaded into ship event artifactRef. */
// +  projectId?: string;
// +  /** Artifact identity threaded into ship event artifactRef. */
// +  artifactId?: string;
//  }
//
//  export async function* parseCritiqueStream(
// diff --git a/apps/daemon/src/critique/parsers/v1.ts b/apps/daemon/src/critique/parsers/v1.ts
// index 27d44cdce..c6e4cf7c3 100644
// --- a/apps/daemon/src/critique/parsers/v1.ts
// +++ b/apps/daemon/src/critique/parsers/v1.ts
// @@ -23,8 +23,16 @@ interface State {
//    // arrives intact in one chunk is rejected before its body is sliced and emitted.
//    // The post-drain check on state.buf only catches *unclosed* runaway blocks.
//    parserMaxBlockBytes: number;
// +  // Threaded from parser options into ship event artifactRef so downstream
// +  // consumers see the real run identity instead of empty placeholders.
// +  projectId: string;
// +  artifactId: string;
//    inRun: boolean;
//    currentRound: number | null;
// +  // Count of events fired since the last opener.
// +  // Used by the SHIP envelope guard: a SHIP that arrives before any round
// +  // completes is malformed and must be rejected.
+ roundsClosed: number; shipSeen: boolean; designerArtifactInRound1: boolean; lastAdvance: number; @@ -32,7 +40,13 @@ interface State { export async function* parseV1( source: AsyncIterable, - opts: { runId: string; adapter: string; parserMaxBlockBytes: number }, + opts: { + runId: string; + adapter: string; + parserMaxBlockBytes: number; + projectId?: string; + artifactId?: string; + }, ): AsyncIterable { const state: State = { buf: '', @@ -42,8 +56,11 @@ export async function* parseV1( protocolVersion: 1, scoreScale: DEFAULT_SCORE_SCALE, parserMaxBlockBytes: opts.parserMaxBlockBytes, + projectId: opts.projectId ?? '', + artifactId: opts.artifactId ?? '', inRun: false, currentRound: null, + roundsClosed: 0, shipSeen: false, designerArtifactInRound1: false, lastAdvance: 0, @@ -275,6 +292,7 @@ function* drain(state: State): Generator { reason, }; state.currentRound = null; + state.roundsClosed += 1; cursor += closeIdx + ''.length; state.lastAdvance = state.consumed + cursor; continue; @@ -295,6 +313,15 @@ function* drain(state: State): Generator { state.consumed + cursor, ); } + // Envelope guard: SHIP must not arrive before at least one round has + // completed. A stream that skips directly from to + // bypasses the round-1 designer-artifact invariant. + if (state.roundsClosed === 0) { + throw new MalformedBlockError( + ` at position ${state.consumed + cursor} appeared before any `, + state.consumed + cursor, + ); + } const closeIdx = slice.indexOf(''); if (closeIdx < 0) break; const blockText = slice.slice(0, closeIdx + ''.length); @@ -329,6 +356,15 @@ function* drain(state: State): Generator { } const attrs = parseAttrs(slice.slice(' block is present inside . 
+ const artifactMatch = inner.match(/]*>([\s\S]*?)<\/ARTIFACT>/); + if (!artifactMatch || artifactMatch[1] === undefined || artifactMatch[1].trim().length === 0) { + throw new MissingArtifactError( + ` at position ${state.consumed + cursor} contains no block or the block is empty`, + ); + } + const summary = (inner.match(/([\s\S]*?)<\/SUMMARY>/)?.[1] ?? '').trim(); const rawStatus = attrs['status'] ?? ''; @@ -345,7 +381,7 @@ function* drain(state: State): Generator { round: Number(attrs['round'] ?? '0'), composite: Number(attrs['composite'] ?? '0'), status, - artifactRef: { projectId: '', artifactId: '' }, + artifactRef: { projectId: state.projectId, artifactId: state.artifactId }, summary, }; cursor += closeIdx + ''.length; diff --git a/apps/daemon/src/critique/persistence.ts b/apps/daemon/src/critique/persistence.ts new file mode 100644 index 000000000..24466dfb0 --- /dev/null +++ b/apps/daemon/src/critique/persistence.ts @@ -0,0 +1,354 @@ +import type Database from 'better-sqlite3'; +import type { ShipStatus } from '@open-design/contracts/critique'; + +/** + * Final critique status persisted with each run. Mirrors the spec's CHECK + * constraint on critique_status. 'failed' covers orchestrator-level errors, + * 'legacy' marks rows produced before the feature shipped (reserved for the + * artifacts-on-disk backfill in Phase 15). + */ +export type CritiqueRunStatus = + | ShipStatus + | 'degraded' + | 'failed' + | 'legacy'; + +export const CRITIQUE_RUN_STATUSES: readonly CritiqueRunStatus[] = [ + 'shipped', + 'below_threshold', + 'timed_out', + 'interrupted', + 'degraded', + 'failed', + 'legacy', +]; + +// All values accepted by the DB CHECK constraint, including the in-flight value +// that the public type union deliberately omits. 
+const ALL_VALID_STATUSES: ReadonlySet = new Set([ + ...CRITIQUE_RUN_STATUSES, + 'running', +]); + +export interface CritiqueRoundSummary { + n: number; + composite: number; + mustFix: number; + decision: 'continue' | 'ship'; +} + +export interface CritiqueRunRow { + id: string; + projectId: string; + conversationId: string | null; + artifactPath: string | null; + status: CritiqueRunStatus; + score: number | null; + rounds: CritiqueRoundSummary[]; + transcriptPath: string | null; + protocolVersion: number; + createdAt: number; + updatedAt: number; +} + +export interface CritiqueRunInsert { + id: string; + projectId: string; + conversationId?: string | null; + artifactPath?: string | null; + /** Accepts 'running' in addition to the terminal statuses so callers can + * create in-flight rows without a type cast. */ + status: CritiqueRunStatus | 'running'; + score?: number | null; + rounds?: CritiqueRoundSummary[]; + transcriptPath?: string | null; + protocolVersion: number; + createdAt?: number; + updatedAt?: number; +} + +export interface CritiqueRunPatch { + status?: CritiqueRunStatus; + score?: number | null; + rounds?: CritiqueRoundSummary[]; + transcriptPath?: string | null; + artifactPath?: string | null; + updatedAt?: number; +} + +// Internal envelope stored in the rounds_json column. The rounds array is the +// primary payload; recoveryReason is written by reconcileStaleRuns. +interface RoundsPayload { + rounds: CritiqueRoundSummary[]; + recoveryReason?: string; +} + +function serializeRoundsPayload( + rounds: CritiqueRoundSummary[], + recoveryReason?: string, +): string { + if (recoveryReason === undefined) { + // Store a plain array when no envelope fields are needed, so reads + // handle both formats gracefully. 
/**
 * Runtime shape check for one persisted round summary.
 *
 * rounds_json is free-form text as far as SQLite is concerned, so a row that
 * was hand-edited, truncated, or written by a different build can contain
 * anything. Only entries that carry every CritiqueRoundSummary field with the
 * right primitive type are accepted.
 */
function isRoundSummary(value: unknown): value is CritiqueRoundSummary {
  if (value === null || typeof value !== 'object') return false;
  const candidate = value as Record<string, unknown>;
  const isFiniteNumber = (x: unknown): boolean =>
    typeof x === 'number' && Number.isFinite(x);
  return (
    isFiniteNumber(candidate['n']) &&
    isFiniteNumber(candidate['composite']) &&
    isFiniteNumber(candidate['mustFix']) &&
    (candidate['decision'] === 'continue' || candidate['decision'] === 'ship')
  );
}

/**
 * Decodes the rounds_json column.
 *
 * Two on-disk formats are accepted:
 *  - a bare JSON array of round summaries (no envelope fields), and
 *  - an object envelope `{ rounds, recoveryReason? }`.
 *
 * Never throws: unparseable JSON, a non-array/non-object payload, or a
 * missing `rounds` field all yield `{ rounds: [] }`. Individual entries that
 * do not have the CritiqueRoundSummary shape are dropped instead of being
 * cast through, so callers never receive objects that merely claim the type.
 *
 * @param json Raw rounds_json column value.
 * @returns The valid rounds, plus `recoveryReason` when the envelope carries
 *   a string one.
 */
function parseRoundsPayload(json: string): { rounds: CritiqueRoundSummary[]; recoveryReason?: string } {
  let parsed: unknown;
  try {
    parsed = JSON.parse(json);
  } catch {
    return { rounds: [] };
  }

  // Legacy/plain format: the column holds the array itself.
  if (Array.isArray(parsed)) {
    return { rounds: parsed.filter(isRoundSummary) };
  }

  if (parsed === null || typeof parsed !== 'object') {
    return { rounds: [] };
  }

  // Envelope format.
  const envelope = parsed as Record<string, unknown>;
  const rawRounds = envelope['rounds'];
  const rounds = Array.isArray(rawRounds) ? rawRounds.filter(isRoundSummary) : [];
  const recoveryReason = envelope['recoveryReason'];
  return typeof recoveryReason === 'string' ? { rounds, recoveryReason } : { rounds };
}
Creates the critique_runs table and the supporting indexes if + * they don't exist. Safe to call from the existing migrate(db) flow on every + * daemon boot. + */ +export function migrateCritique(db: Database.Database): void { + db.exec(` + CREATE TABLE IF NOT EXISTS critique_runs ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + conversation_id TEXT, + artifact_path TEXT, + status TEXT NOT NULL CHECK (status IN + ('shipped','below_threshold','timed_out','interrupted','degraded','failed','legacy','running')), + score REAL, + rounds_json TEXT NOT NULL DEFAULT '[]', + transcript_path TEXT, + protocol_version INTEGER NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE, + FOREIGN KEY(conversation_id) REFERENCES conversations(id) ON DELETE SET NULL + ); + + CREATE INDEX IF NOT EXISTS idx_critique_runs_project + ON critique_runs(project_id, updated_at DESC); + + CREATE INDEX IF NOT EXISTS idx_critique_runs_status + ON critique_runs(status); + `); +} + +export function insertCritiqueRun( + db: Database.Database, + input: CritiqueRunInsert, +): CritiqueRunRow { + if (!ALL_VALID_STATUSES.has(input.status)) { + throw new RangeError( + `Invalid critique run status: "${input.status}". Must be one of: ${[...ALL_VALID_STATUSES].join(', ')}`, + ); + } + const now = Date.now(); + const rounds = input.rounds ?? []; + db.prepare( + `INSERT INTO critique_runs + (id, project_id, conversation_id, artifact_path, status, score, + rounds_json, transcript_path, protocol_version, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + ).run( + input.id, + input.projectId, + input.conversationId ?? null, + input.artifactPath ?? null, + input.status, + input.score ?? null, + serializeRoundsPayload(rounds), + input.transcriptPath ?? null, + input.protocolVersion, + input.createdAt ?? now, + input.updatedAt ?? 
/**
 * Applies `patch` to an existing run and returns the refreshed row, or null
 * when no row has that id. `updated_at` is always written (patch value when
 * given, otherwise the current time).
 *
 * Field semantics:
 *  - `status`: replaced only when the patch provides one.
 *  - `score`, `transcriptPath`, `artifactPath`: a key that is present in the
 *    patch overwrites the column (an explicit `undefined`/`null` stores NULL);
 *    an absent key keeps the stored value.
 *  - `rounds`: when provided, the column is rewritten as a plain array. When
 *    omitted, rounds_json is left untouched so an envelope previously written
 *    by reconcileStaleRuns (with its recoveryReason) is not erased by an
 *    unrelated update.
 */
export function updateCritiqueRun(
  db: Database.Database,
  id: string,
  patch: CritiqueRunPatch,
): CritiqueRunRow | null {
  const existing = getCritiqueRun(db, id);
  if (existing === null) return null;

  const updatedAt = patch.updatedAt ?? Date.now();
  const status = patch.status ?? existing.status;
  const score = 'score' in patch ? patch.score ?? null : existing.score;
  const transcriptPath =
    'transcriptPath' in patch
      ? patch.transcriptPath ?? null
      : existing.transcriptPath;
  const artifactPath =
    'artifactPath' in patch
      ? patch.artifactPath ?? null
      : existing.artifactPath;

  // NULL here means "keep whatever is stored": re-serializing the rounds read
  // back through normalizeRow would drop the recoveryReason envelope field.
  const roundsJson = patch.rounds == null ? null : serializeRoundsPayload(patch.rounds);

  db.prepare(
    `UPDATE critique_runs
        SET status = ?,
            score = ?,
            rounds_json = COALESCE(?, rounds_json),
            transcript_path = ?,
            artifact_path = ?,
            updated_at = ?
      WHERE id = ?`,
  ).run(
    status,
    score,
    roundsJson,
    transcriptPath,
    artifactPath,
    updatedAt,
    id,
  );

  return getCritiqueRun(db, id);
}
/**
 * Boot-time recovery sweep. Every row whose status is still 'running' and
 * whose updated_at is older than `now - staleAfterMs` is rewritten to
 * 'interrupted', its rounds are re-wrapped in an envelope carrying
 * recoveryReason = 'daemon_restart', and updated_at is set to `now`.
 *
 * The select and all updates run inside one transaction.
 *
 * @param options.staleAfterMs Age in milliseconds beyond which a running row
 *   is considered abandoned.
 * @param options.now Clock override; defaults to Date.now().
 * @returns How many rows were flipped.
 */
export function reconcileStaleRuns(
  db: Database.Database,
  options: { staleAfterMs: number; now?: number },
): number {
  const now = options.now ?? Date.now();
  const cutoff = now - options.staleAfterMs;

  const flipStaleRows = db.transaction((): number => {
    const abandoned = db
      .prepare(
        `SELECT ${COLS}
           FROM critique_runs
          WHERE status = 'running'
            AND updated_at < ?`,
      )
      .all(cutoff) as RawCritiqueRunRow[];

    // Nothing to do: skip preparing the UPDATE statement entirely.
    if (abandoned.length === 0) return 0;

    const markInterrupted = db.prepare(
      `UPDATE critique_runs
          SET status = 'interrupted',
              rounds_json = ?,
              updated_at = ?
        WHERE id = ?`,
    );

    abandoned.forEach((row) => {
      const recoveredRounds = parseRoundsPayload(row.roundsJson).rounds;
      markInterrupted.run(
        serializeRoundsPayload(recoveredRounds, 'daemon_restart'),
        now,
        row.id,
      );
    });

    return abandoned.length;
  });

  return flipStaleRows() as number;
}
/**
 * Computes the weighted composite score for a set of panelist scores.
 *
 * Only roles that have BOTH a finite numeric score and a finite numeric
 * weight take part; their weights are renormalized to sum to 1, so absent
 * roles redistribute their share proportionally over the present ones.
 *
 * Scores originate from parsed agent output, so the function is defensive:
 * a role key with no configured weight, or a NaN/Infinity score, is ignored
 * instead of turning the whole composite into NaN (which would make every
 * later threshold and best-round comparison silently false).
 *
 * @param scores Per-role scores for one round; missing roles are skipped.
 * @param weights Configured weight per panelist role.
 * @returns The weighted mean over usable roles, or 0 when no role is usable
 *   or the usable weights sum to (almost) zero.
 *
 * @see specs/current/critique-theater.md § Composite score formula
 */
export function computeComposite(
  scores: RoleScores,
  weights: CritiqueConfig['weights'],
): number {
  const usable: Array<{ weight: number; score: number }> = [];
  for (const role of Object.keys(scores) as PanelistRole[]) {
    const score: unknown = scores[role];
    const weight: unknown = weights[role];
    if (typeof score !== 'number' || !Number.isFinite(score)) continue;
    if (typeof weight !== 'number' || !Number.isFinite(weight)) continue;
    usable.push({ weight, score });
  }
  if (usable.length === 0) return 0;

  const totalWeight = usable.reduce((sum, entry) => sum + entry.weight, 0);
  // Guard the division below; also covers an all-zero weight table.
  if (totalWeight < 1e-9) return 0;

  return usable.reduce(
    (sum, entry) => sum + (entry.weight / totalWeight) * entry.score,
    0,
  );
}
/**
 * Picks the round to ship when the run ended without an explicit ship
 * decision, according to the configured fallback policy.
 *
 *  - 'fail'      → never elects a round.
 *  - 'ship_last' → the final entry of `rounds`.
 *  - otherwise   → the round with the highest composite; composites within
 *                  1e-9 of each other count as tied and the higher round
 *                  number wins the tie.
 *
 * @returns The elected round, or null for an empty list or the 'fail' policy.
 *
 * @see specs/current/critique-theater.md § Failure modes (recovery)
 */
export function selectFallbackRound(
  rounds: RoundState[],
  policy: CritiqueConfig['fallbackPolicy'],
): RoundState | null {
  if (policy === 'fail' || rounds.length === 0) return null;

  if (policy === 'ship_last') {
    return rounds[rounds.length - 1] ?? null;
  }

  // ship_best
  return rounds.reduce<RoundState | null>((leader, candidate) => {
    if (leader === null) return candidate;
    const clearlyHigher = candidate.composite > leader.composite + 1e-9;
    const tiedButLater =
      Math.abs(candidate.composite - leader.composite) < 1e-9 &&
      candidate.n > leader.n;
    return clearlyHigher || tiedButLater ? candidate : leader;
  }, null);
}
/**
 * Resolves on the stream's next 'drain'. Rejects if the stream errors or is
 * closed first. All three listeners are removed as soon as one fires, so
 * repeated backpressure waits do not accumulate listeners on the stream.
 */
function waitForDrain(ws: ReturnType<typeof createWriteStream>): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const detach = (): void => {
      ws.off('drain', onDrain);
      ws.off('error', onError);
      ws.off('close', onClose);
    };
    const onDrain = (): void => {
      detach();
      resolve();
    };
    const onError = (err: Error): void => {
      detach();
      reject(err);
    };
    const onClose = (): void => {
      detach();
      reject(new Error('writeTranscript: stream closed before drain'));
    };
    ws.once('drain', onDrain);
    ws.once('error', onError);
    ws.once('close', onClose);
  });
}

/** Flushes OS write buffers for `filePath` so a following rename is durable. */
async function fsyncFile(filePath: string): Promise<void> {
  const fh = await open(filePath, 'r+');
  try {
    await fh.sync();
  } finally {
    await fh.close();
  }
}

/**
 * Write a sequence of PanelEvents as newline-delimited JSON to a transcript
 * file under the artifact directory. When the cumulative UTF-8 byte size of
 * the serialized lines exceeds `gzipThresholdBytes` the result is stored as
 * transcript.ndjson.gz, otherwise as plain transcript.ndjson.
 *
 * Events are streamed to a uniquely named temp file in the same directory
 * (honouring write backpressure), fsynced, and then renamed into place, so a
 * final transcript path never refers to a partially written file. On any
 * failure the temp files are removed and the original error is rethrown.
 *
 * @param artifactDir Directory to write into; created if missing.
 * @param events Sync or async iterable of events, one JSON line each.
 * @param opts.gzipThresholdBytes Size above which the output is gzipped.
 * @returns `path` relative to artifactDir, the uncompressed byte count, and
 *   whether the output was gzipped.
 * @throws RangeError when artifactDir is empty or events is not iterable.
 *
 * @see specs/current/critique-theater.md § Persistence (transcript files)
 */
export async function writeTranscript(
  artifactDir: string,
  events: AsyncIterable<PanelEvent> | Iterable<PanelEvent>,
  opts?: { gzipThresholdBytes?: number },
): Promise<{ path: string; bytes: number; gzipped: boolean }> {
  if (typeof artifactDir !== 'string' || artifactDir.length === 0) {
    throw new RangeError('writeTranscript: artifactDir must be a non-empty string');
  }
  if (
    events === null ||
    events === undefined ||
    (typeof events !== 'object' && typeof events !== 'function')
  ) {
    throw new RangeError('writeTranscript: events must be iterable');
  }
  // Validate that the value is actually iterable / async-iterable.
  const hasAsyncIter = Symbol.asyncIterator in (events as object);
  const hasSyncIter = Symbol.iterator in (events as object);
  if (!hasAsyncIter && !hasSyncIter) {
    throw new RangeError('writeTranscript: events must be iterable');
  }

  const threshold = opts?.gzipThresholdBytes ?? DEFAULT_GZIP_THRESHOLD_BYTES;

  await mkdir(artifactDir, { recursive: true });

  // pid + timestamp alone collide when two writers start in the same
  // millisecond; the random suffix keeps concurrent temp files distinct.
  const unique = `${process.pid}.${Date.now()}.${Math.random().toString(36).slice(2, 10)}`;
  const tempPath = join(artifactDir, `transcript.tmp.${unique}.ndjson`);
  const gzTempPath = join(artifactDir, `transcript.tmp.${unique}.ndjson.gz.tmp`);
  const finalNdjson = join(artifactDir, 'transcript.ndjson');
  const finalGz = join(artifactDir, 'transcript.ndjson.gz');

  let totalBytes = 0;

  const ws = createWriteStream(tempPath, { encoding: 'utf8' });
  const sink = { failed: false };
  // Resolves once the fd is released; cleanup waits on it so the temp file is
  // not unlinked while a pending open could still (re)create it.
  const closed = new Promise<void>((resolve) => {
    ws.once('close', () => resolve());
  });
  const flushed = new Promise<void>((resolve, reject) => {
    ws.once('error', (err) => {
      sink.failed = true;
      reject(err);
    });
    ws.once('finish', () => resolve());
  });
  // `flushed` is awaited below; this only prevents an unhandled-rejection
  // report when the producer loop throws before we get there.
  flushed.catch(() => undefined);

  try {
    for await (const event of events as AsyncIterable<PanelEvent>) {
      // Stop pulling from the producer once the sink is dead.
      if (sink.failed) break;
      const line = JSON.stringify(event) + '\n';
      totalBytes += Buffer.byteLength(line, 'utf8');
      if (!ws.write(line)) {
        // Backpressure: wait for drain before continuing.
        await waitForDrain(ws);
      }
    }
    if (!sink.failed) ws.end();
    await flushed;

    const gzipped = totalBytes > threshold;

    if (!gzipped) {
      // Same durability guarantee as the gzip branch: flush before rename.
      await fsyncFile(tempPath);
      await rename(tempPath, finalNdjson);
      return { path: 'transcript.ndjson', bytes: totalBytes, gzipped: false };
    }

    // Compress into a second temp file, fsync, then atomic-rename, so a crash
    // mid-compression never leaves a truncated transcript.ndjson.gz behind.
    try {
      await pipeline(
        createReadStream(tempPath),
        createGzip(),
        createWriteStream(gzTempPath),
      );
      await fsyncFile(gzTempPath);
      await rename(gzTempPath, finalGz);
    } catch (gzErr) {
      await rm(gzTempPath, { force: true }).catch(() => undefined);
      throw gzErr;
    }
    await rm(tempPath, { force: true });
    return { path: 'transcript.ndjson.gz', bytes: totalBytes, gzipped: true };
  } catch (err) {
    // Release the fd first, then remove the temp file; cleanup failures must
    // not mask the original error.
    ws.destroy();
    await closed;
    await rm(tempPath, { force: true }).catch(() => undefined);
    throw err;
  }
}
/**
 * Inverse of writeTranscript. Streams a transcript file (.ndjson or
 * .ndjson.gz) back out as PanelEvents, one per non-blank line.
 *
 * Stream failures (missing file, unreadable file, corrupt gzip data) surface
 * as a rejection of the iteration instead of an unhandled 'error' event, and
 * the underlying file stream is released when iteration finishes, throws, or
 * is abandoned early by the consumer.
 *
 * @param artifactDir Directory containing the transcript.
 * @param fileName Transcript file name; must end in .ndjson or .ndjson.gz.
 * @throws RangeError when fileName has neither extension.
 * @throws SyntaxError (from JSON.parse) when a line is not valid JSON.
 *
 * @see specs/current/critique-theater.md § Persistence (transcript files)
 */
export async function* readTranscript(
  artifactDir: string,
  fileName: string,
): AsyncIterable<PanelEvent> {
  if (!fileName.endsWith('.ndjson') && !fileName.endsWith('.ndjson.gz')) {
    throw new RangeError(
      `readTranscript: unknown extension on "${fileName}", expected .ndjson or .ndjson.gz`,
    );
  }

  const isGz = fileName.endsWith('.ndjson.gz');
  const fileStream = createReadStream(join(artifactDir, fileName));

  let source: NodeJS.ReadableStream = fileStream;
  if (isGz) {
    const gunzip = createGunzip();
    // pipe() does not forward errors: without this a missing/unreadable file
    // raises an unhandled 'error' on fileStream while the gunzip side idles.
    fileStream.once('error', (err) => gunzip.destroy(err));
    source = fileStream.pipe(gunzip);
  }

  const rl = createInterface({ input: source, crlfDelay: Infinity });

  // Record input failures ourselves so they are reported even if readline
  // ends the iteration quietly instead of rejecting it.
  const failure: { error: Error | null } = { error: null };
  source.once('error', (err: Error) => {
    failure.error = err;
    rl.close();
  });

  try {
    for await (const line of rl) {
      const trimmed = line.trim();
      if (trimmed.length === 0) continue;
      const event = JSON.parse(trimmed) as PanelEvent;
      yield event;
    }
    if (failure.error !== null) throw failure.error;
  } finally {
    // Runs on normal completion, on error, and when the consumer breaks out
    // of its loop early; closes the fd instead of waiting for GC.
    rl.close();
    fileStream.destroy();
  }
}
attachPiRpcSession } from './pi-rpc.js'; import { createClaudeStreamHandler } from './claude-stream.js'; +import { loadCritiqueConfigFromEnv } from './critique/config.js'; +import { reconcileStaleRuns } from './critique/persistence.js'; +import { runOrchestrator } from './critique/orchestrator.js'; import { createCopilotStreamHandler } from './copilot-stream.js'; import { createJsonEventStreamHandler } from './json-event-stream.js'; import { subscribe as subscribeFileEvents } from './project-watchers.js'; @@ -346,6 +349,16 @@ const ARTIFACTS_DIR = path.join(RUNTIME_DATA_DIR, 'artifacts'); const PROJECTS_DIR = path.join(RUNTIME_DATA_DIR, 'projects'); fs.mkdirSync(PROJECTS_DIR, { recursive: true }); +// Load Critique Theater config once at startup so a bad OD_CRITIQUE_* value +// surfaces immediately as a boot-time RangeError instead of silently at +// run time. Default: enabled=false (M0 dark launch). +const critiqueCfg = loadCritiqueConfigFromEnv(); +// Tracks adapter streamFormat values that have already received a one-time +// warning explaining why the Critique Theater orchestrator was bypassed. +// Adapter denylist for orchestrator routing is implicit: anything that is +// not the 'plain' streamFormat falls through to legacy single-pass. +const critiqueWarnedAdapters = new Set(); + export const SSE_KEEPALIVE_INTERVAL_MS = 25_000; export function normalizeProjectDisplayStatus(status) { @@ -736,6 +749,16 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST }); const db = openDatabase(PROJECT_ROOT, { dataDir: RUNTIME_DATA_DIR }); + // Boot reconcile: any critique_runs row left in 'running' state by a prior + // daemon crash gets flipped to 'interrupted' with rounds_json.recoveryReason + // = 'daemon_restart' so the spec's daemon-restart-mid-run failure mode is + // honored on every boot. staleAfterMs comes from CritiqueConfig, not a + // hardcoded constant. 
+ const reconciledStaleRuns = reconcileStaleRuns(db, { staleAfterMs: critiqueCfg.totalTimeoutMs }); + if (reconciledStaleRuns > 0) { + console.warn(`[critique] reconcileStaleRuns flipped ${reconciledStaleRuns} stale running row(s) to interrupted`); + } + if (process.env.OD_CODEX_DISABLE_PLUGINS === '1') { console.log('[od] Codex plugins disabled via OD_CODEX_DISABLE_PLUGINS=1'); } @@ -2997,6 +3020,87 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST child.stdout.setEncoding('utf8'); child.stderr.setEncoding('utf8'); + // Critique Theater branch (M0 dark launch, default disabled). + // Only plain-stream adapters are routed through runOrchestrator in v1. + // Adapters that emit structured wrappers (claude-stream-json, + // copilot-stream-json, json-event-stream, acp-json-rpc, pi-rpc) fall + // through to the legacy single-pass code path below with a one-time + // stderr warning so the parser never sees wrapper bytes. Per-format + // decoding into the orchestrator is a v2 concern. + if (critiqueCfg.enabled) { + const adapterStreamFormat: string = def.streamFormat ?? 'plain'; + if (adapterStreamFormat !== 'plain') { + if (!critiqueWarnedAdapters.has(adapterStreamFormat)) { + critiqueWarnedAdapters.add(adapterStreamFormat); + console.warn(`[critique] adapter format=${adapterStreamFormat} is not plain-stream; skipping orchestrator and falling through to legacy generation`); + } + } else { + const critiqueRunId = run.id; + // Per-run artifact directory keeps concurrent or sequential runs in the + // same project from overwriting each other's transcript or final HTML. + // Spec: artifacts///transcript.ndjson(.gz). + const critiqueProjectKey = typeof projectId === 'string' && projectId ? 
projectId : critiqueRunId; + const critiqueArtifactDir = path.join(ARTIFACTS_DIR, critiqueProjectKey, critiqueRunId); + const stdoutIterable = (async function* () { + for await (const chunk of child.stdout) yield String(chunk); + })(); + const critiqueBus = { emit: (e) => send('agent', e) }; + + // Stderr forwarding and child.on('error') must be wired BEFORE the + // orchestrator awaits stdout. Otherwise a CLI that floods stderr can + // fill the OS pipe and deadlock the run until the total timeout, and + // an early child error fired before the orchestrator returns has no + // listener. Both registrations are idempotent and the run lifecycle + // is owned solely by the orchestrator's awaited result below. + child.stderr.on('data', (chunk) => send('stderr', { chunk })); + child.on('error', (err) => { + send('error', createSseErrorPayload('AGENT_EXECUTION_FAILED', err.message)); + }); + + // Wrap the child's close event so the orchestrator can race child + // exit against parser completion, abort, and timeouts in one awaited + // flow. Without this the orchestrator can't tell a non-zero exit + // apart from a clean ship and may misclassify failures. + const childExitPromise = new Promise<{ code: number | null; signal: NodeJS.Signals | null }>((resolve) => { + child.once('close', (code, signal) => resolve({ code, signal })); + }); + try { + const orchestratorResult = await runOrchestrator({ + runId: critiqueRunId, + projectId: typeof projectId === 'string' ? projectId : '', + conversationId: typeof conversationId === 'string' ? conversationId : null, + artifactId: critiqueRunId, + artifactDir: critiqueArtifactDir, + adapter: typeof agentId === 'string' ? agentId : 'unknown', + cfg: critiqueCfg, + db, + bus: critiqueBus, + stdout: stdoutIterable, + child, + childExitPromise, + }); + // Map the critique terminal status to the chat run lifecycle. 
+ // 'shipped' and 'below_threshold' both ran to a ship decision and + // finalize as 'succeeded'; every other status (timed_out, + // interrupted, degraded, failed, legacy) is a failure path so the + // run reflects the real outcome instead of a misleading success. + const succeeded = orchestratorResult.status === 'shipped' + || orchestratorResult.status === 'below_threshold'; + if (run.cancelRequested) { + design.runs.finish(run, 'canceled', 1, null); + } else if (succeeded) { + design.runs.finish(run, 'succeeded', 0, null); + } else { + design.runs.finish(run, 'failed', 1, null); + } + } catch (err) { + send('error', createSseErrorPayload('AGENT_EXECUTION_FAILED', err instanceof Error ? err.message : String(err))); + design.runs.finish(run, 'failed', 1, null); + } + return; + } + } + // Structured streams (Claude Code) go through a line-delimited JSON // parser that turns stream_event objects into UI-friendly events. For // plain streams (most other CLIs) we forward raw chunks unchanged so diff --git a/apps/daemon/tests/critique-authority.test.ts b/apps/daemon/tests/critique-authority.test.ts new file mode 100644 index 000000000..49bc2ee63 --- /dev/null +++ b/apps/daemon/tests/critique-authority.test.ts @@ -0,0 +1,296 @@ +/** + * Regression tests for the round 2 review feedback on PR #481: + * - Daemon-computed composite is authoritative; agent-supplied + * and are advisory. + * - When SHIP refers to a round whose daemon composite is below the + * configured threshold, the run finalizes as below_threshold even when + * the agent claimed status="shipped". + * - A composite divergence beyond COMPOSITE_TOLERANCE emits a + * composite_mismatch parser_warning event. 
/**
 * Test helper: replays `text` as an async stream of string chunks, each at
 * most `chunkSize` characters long, to mimic an agent CLI writing to stdout
 * in pieces.
 *
 * @param text Full payload to replay; an empty string yields nothing.
 * @param chunkSize Characters per chunk; must be a positive integer.
 * @throws RangeError (on first pull) when chunkSize is not a positive
 *   integer. Zero or a negative size would otherwise never advance and loop
 *   forever, and NaN would silently truncate the stream.
 */
async function* streamOf(text: string, chunkSize = 64): AsyncIterable<string> {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new RangeError(`streamOf: chunkSize must be a positive integer, got ${chunkSize}`);
  }
  for (let i = 0; i < text.length; i += chunkSize) {
    yield text.slice(i, i + chunkSize);
  }
}
all panelists score ~6.0 so the daemon-computed composite is + * well below the default threshold of 8.0. The agent lies in both + * and to + * try to force a ship despite low panelist scores. + */ +function lyingShipStream(): string { + return ` + + + v1 + ]]> + + + ok + + + ok + + + ok + + + ok + + + liar + + + + fake]]> + Pretending we shipped. + +`; +} + +/** Agent claims a slightly different composite than the daemon will compute, + * but well within threshold. Used to exercise composite_mismatch warning + * without flipping the ship decision. */ +function nearMissCompositeStream(): string { + return ` + + + v1 + ]]> + + + good + + + good + + + good + + + good + + + arithmetic skipped + + + + x]]> + Wrong composite reported but real scores are high. + +`; +} + +describe('orchestrator daemon-authoritative scoring (PR #481 round 2 review)', () => { + it('SHIP claiming shipped is downgraded to below_threshold when daemon composite is below threshold', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'authority-1'); + + const result = await runOrchestrator({ + runId: 'r-lying', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(lyingShipStream()), + }); + + expect(result.status).toBe('below_threshold'); + expect(result.composite).not.toBeNull(); + expect(result.composite!).toBeLessThan(8.0); + const row = getCritiqueRun(db, 'r-lying'); + expect(row?.status).toBe('below_threshold'); + expect(row?.score).toBeLessThan(8.0); + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + // Round 4 review: the SSE bus must only see the daemon-authoritative + // ship payload, never the agent's raw claim. 
+ const shipPayload = shipEvents[0]?.data as { status: string; composite: number } | undefined; + expect(shipPayload?.status).toBe('below_threshold'); + expect(shipPayload?.composite).toBeLessThan(8.0); + }); + + it('SHIP referencing an unclosed round is dropped, parser_warning emitted, fallback selected', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'authority-4'); + + // Round 1 closes with low scores. Round 2 is opened but never closed. + // The agent then ships round 2 with a high composite. The daemon must + // refuse to score against an unclosed round and fall back to round 1. + const stream = ` + + + v1 + ]]> + + ok + ok + ok + ok + continue + + + ]]> + Forged ship for an unclosed round. + +`; + + const result = await runOrchestrator({ + runId: 'r-unclosed', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(stream), + }); + + expect(result.status).toBe('below_threshold'); + expect(result.composite).not.toBeNull(); + expect(result.composite!).toBeLessThan(8.0); + + // Exactly one synthetic ship from the fallback path. The agent's forged + // ship for the unclosed round must NOT appear on the SSE bus. + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + const shipPayload = shipEvents[0]?.data as { round: number; status: string } | undefined; + expect(shipPayload?.round).toBe(1); + expect(shipPayload?.status).toBe('below_threshold'); + + // A parser_warning must have been emitted to flag the rejected SHIP. 
+ const warnings = events.filter((e) => e.event === 'critique.parser_warning'); + expect(warnings.length).toBeGreaterThanOrEqual(1); + }); + + it('emits composite_mismatch parser_warning when ROUND_END/SHIP composite diverges beyond tolerance', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'authority-2'); + + await runOrchestrator({ + runId: 'r-mismatch', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(nearMissCompositeStream()), + }); + + const warnings = events.filter((e) => e.event === 'critique.parser_warning'); + expect(warnings.length).toBeGreaterThanOrEqual(1); + const mismatch = warnings.find((e) => 'kind' in e.data && e.data.kind === 'composite_mismatch'); + expect(mismatch).toBeDefined(); + }); + + it('does not emit composite_mismatch when agent and daemon agree within tolerance', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'authority-3'); + + // Build a stream where ROUND_END composite matches the weighted sum exactly. + // Default weights: critic=0.4, brand=0.2, a11y=0.2, copy=0.2; all 9.0 -> 9.0. 
+ const aligned = ` + + + v1 + ]]> + + ok + ok + ok + ok + ok + + + ]]> + aligned + +`; + + await runOrchestrator({ + runId: 'r-aligned', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(aligned), + }); + + const compositeWarnings = events.filter( + (e) => e.event === 'critique.parser_warning' + && 'kind' in e.data + && e.data.kind === 'composite_mismatch', + ); + expect(compositeWarnings).toHaveLength(0); + }); +}); diff --git a/apps/daemon/tests/critique-boot-reconcile.test.ts b/apps/daemon/tests/critique-boot-reconcile.test.ts new file mode 100644 index 000000000..69a1d1c04 --- /dev/null +++ b/apps/daemon/tests/critique-boot-reconcile.test.ts @@ -0,0 +1,136 @@ +/** + * Boot-reconcile tests for Critique Theater (Defect 6). + * + * Verifies that reconcileStaleRuns is called on daemon boot (simulated here + * by calling it directly as the server would) and that it flips old 'running' + * rows to 'interrupted' with recoveryReason='daemon_restart'. 
+ */ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { mkdtempSync } from 'node:fs'; +import { rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { + migrateCritique, + insertCritiqueRun, + getCritiqueRun, + reconcileStaleRuns, +} from '../src/critique/persistence.js'; +import { defaultCritiqueConfig } from '@open-design/contracts/critique'; + +function freshDb(): Database.Database { + const db = new Database(':memory:'); + db.pragma('journal_mode = WAL'); + db.pragma('foreign_keys = ON'); + db.exec(` + CREATE TABLE projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE conversations ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE + ); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0); + `); + migrateCritique(db); + return db; +} + +let tmpDir: string; +let db: Database.Database; + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 'od-boot-reconcile-test-')); + db = freshDb(); +}); +afterEach(async () => { + db.close(); + await rm(tmpDir, { recursive: true, force: true }); +}); + +describe('boot reconcile (Defect 6)', () => { + it('seeds an old running row then flips it to interrupted on simulated boot', () => { + const cfg = defaultCritiqueConfig(); + const staleAfterMs = cfg.totalTimeoutMs; + + // Insert a 'running' row whose updated_at is older than staleAfterMs. + const oldTs = Date.now() - staleAfterMs - 10_000; + insertCritiqueRun(db, { + id: 'stale-run-1', + projectId: 'p1', + conversationId: null, + status: 'running', + protocolVersion: 1, + createdAt: oldTs, + updatedAt: oldTs, + }); + + // Simulate what the daemon boot path does after openDatabase. 
+ const flipped = reconcileStaleRuns(db, { staleAfterMs }); + expect(flipped).toBe(1); + + // The row should now be 'interrupted' with recoveryReason='daemon_restart'. + const row = getCritiqueRun(db, 'stale-run-1'); + expect(row?.status).toBe('interrupted'); + // rounds_json is accessible via row.rounds; the recoveryReason is an internal + // field not exposed on CritiqueRunRow. Access the raw value via the DB directly. + const raw = db + .prepare(`SELECT rounds_json FROM critique_runs WHERE id = ?`) + .get('stale-run-1') as { rounds_json: string } | undefined; + const payload = raw ? (JSON.parse(raw.rounds_json) as { recoveryReason?: string }) : {}; + expect(payload.recoveryReason).toBe('daemon_restart'); + }); + + it('does not flip a recently-running row (within staleAfterMs)', () => { + const cfg = defaultCritiqueConfig(); + const staleAfterMs = cfg.totalTimeoutMs; + + // Insert a 'running' row whose updated_at is recent (not stale). + const recentTs = Date.now() - 100; + insertCritiqueRun(db, { + id: 'fresh-run-1', + projectId: 'p1', + conversationId: null, + status: 'running', + protocolVersion: 1, + createdAt: recentTs, + updatedAt: recentTs, + }); + + const flipped = reconcileStaleRuns(db, { staleAfterMs }); + expect(flipped).toBe(0); + + const row = getCritiqueRun(db, 'fresh-run-1'); + expect(row?.status).toBe('running'); + }); + + it('is idempotent: a second call on the same db flips 0 rows', () => { + const cfg = defaultCritiqueConfig(); + const staleAfterMs = cfg.totalTimeoutMs; + + const oldTs = Date.now() - staleAfterMs - 10_000; + insertCritiqueRun(db, { + id: 'stale-run-2', + projectId: 'p1', + conversationId: null, + status: 'running', + protocolVersion: 1, + createdAt: oldTs, + updatedAt: oldTs, + }); + + const first = reconcileStaleRuns(db, { staleAfterMs }); + expect(first).toBe(1); + + // Second call: the row is now 'interrupted', not 'running', so nothing more to flip. 
+ const second = reconcileStaleRuns(db, { staleAfterMs }); + expect(second).toBe(0); + }); +}); diff --git a/apps/daemon/tests/critique-config.test.ts b/apps/daemon/tests/critique-config.test.ts new file mode 100644 index 000000000..39e0f187b --- /dev/null +++ b/apps/daemon/tests/critique-config.test.ts @@ -0,0 +1,154 @@ +import { describe, it, expect } from 'vitest'; +import { defaultCritiqueConfig } from '@open-design/contracts/critique'; +import { loadCritiqueConfigFromEnv } from '../src/critique/config.js'; + +describe('loadCritiqueConfigFromEnv', () => { + it('returns defaults when env is empty', () => { + const cfg = loadCritiqueConfigFromEnv({}); + const defaults = defaultCritiqueConfig(); + expect(cfg).toEqual(defaults); + }); + + it('OD_CRITIQUE_ENABLED=true enables the feature', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: 'true' }); + expect(cfg.enabled).toBe(true); + }); + + it('OD_CRITIQUE_ENABLED=1 enables the feature', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '1' }); + expect(cfg.enabled).toBe(true); + }); + + it('OD_CRITIQUE_ENABLED=yes enables the feature', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: 'yes' }); + expect(cfg.enabled).toBe(true); + }); + + it('OD_CRITIQUE_ENABLED=false keeps feature disabled', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: 'false' }); + expect(cfg.enabled).toBe(false); + }); + + it('OD_CRITIQUE_ENABLED=0 keeps feature disabled', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '0' }); + expect(cfg.enabled).toBe(false); + }); + + it('OD_CRITIQUE_ENABLED=anything-else keeps feature disabled', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: 'enabled' }); + expect(cfg.enabled).toBe(false); + }); + + it('OD_CRITIQUE_MAX_ROUNDS maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_MAX_ROUNDS: '5' }); + expect(cfg.maxRounds).toBe(5); + }); + + 
it('OD_CRITIQUE_SCORE_THRESHOLD maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_SCORE_THRESHOLD: '7.5' }); + expect(cfg.scoreThreshold).toBeCloseTo(7.5); + }); + + it('OD_CRITIQUE_SCORE_SCALE maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_SCORE_SCALE: '20' }); + expect(cfg.scoreScale).toBe(20); + }); + + it('OD_CRITIQUE_PER_ROUND_TIMEOUT_MS maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_PER_ROUND_TIMEOUT_MS: '60000' }); + expect(cfg.perRoundTimeoutMs).toBe(60000); + }); + + it('OD_CRITIQUE_TOTAL_TIMEOUT_MS maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_TOTAL_TIMEOUT_MS: '300000' }); + expect(cfg.totalTimeoutMs).toBe(300000); + }); + + it('OD_CRITIQUE_PARSER_MAX_BLOCK_BYTES maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_PARSER_MAX_BLOCK_BYTES: '131072' }); + expect(cfg.parserMaxBlockBytes).toBe(131072); + }); + + it('OD_CRITIQUE_FALLBACK_POLICY=ship_last maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_FALLBACK_POLICY: 'ship_last' }); + expect(cfg.fallbackPolicy).toBe('ship_last'); + }); + + it('OD_CRITIQUE_FALLBACK_POLICY=fail maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_FALLBACK_POLICY: 'fail' }); + expect(cfg.fallbackPolicy).toBe('fail'); + }); + + it('OD_CRITIQUE_FALLBACK_POLICY=ship_best maps correctly', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_FALLBACK_POLICY: 'ship_best' }); + expect(cfg.fallbackPolicy).toBe('ship_best'); + }); + + // Invalid values throw RangeError at boot. 
+ it('non-numeric OD_CRITIQUE_MAX_ROUNDS throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_MAX_ROUNDS: 'abc' })).toThrow(RangeError); + }); + + it('negative OD_CRITIQUE_MAX_ROUNDS throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_MAX_ROUNDS: '-1' })).toThrow(RangeError); + }); + + it('zero OD_CRITIQUE_MAX_ROUNDS throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_MAX_ROUNDS: '0' })).toThrow(RangeError); + }); + + it('non-numeric OD_CRITIQUE_SCORE_THRESHOLD throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_SCORE_THRESHOLD: 'high' })).toThrow(RangeError); + }); + + it('negative OD_CRITIQUE_SCORE_THRESHOLD throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_SCORE_THRESHOLD: '-1' })).toThrow(RangeError); + }); + + it('non-numeric OD_CRITIQUE_PER_ROUND_TIMEOUT_MS throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_PER_ROUND_TIMEOUT_MS: 'fast' })).toThrow(RangeError); + }); + + it('invalid OD_CRITIQUE_FALLBACK_POLICY throws RangeError', () => { + expect(() => loadCritiqueConfigFromEnv({ OD_CRITIQUE_FALLBACK_POLICY: 'maybe' })).toThrow(RangeError); + }); + + it('threshold exceeding scale throws RangeError', () => { + expect(() => + loadCritiqueConfigFromEnv({ + OD_CRITIQUE_SCORE_THRESHOLD: '15', + OD_CRITIQUE_SCORE_SCALE: '10', + }), + ).toThrow(RangeError); + }); + + it('valid threshold equal to scale passes', () => { + const cfg = loadCritiqueConfigFromEnv({ + OD_CRITIQUE_SCORE_THRESHOLD: '10', + OD_CRITIQUE_SCORE_SCALE: '10', + }); + expect(cfg.scoreThreshold).toBe(10); + expect(cfg.scoreScale).toBe(10); + }); + + it('all valid OD_CRITIQUE_* values map correctly together', () => { + const cfg = loadCritiqueConfigFromEnv({ + OD_CRITIQUE_ENABLED: '1', + OD_CRITIQUE_MAX_ROUNDS: '4', + OD_CRITIQUE_SCORE_THRESHOLD: '7', + OD_CRITIQUE_SCORE_SCALE: '10', + 
OD_CRITIQUE_PER_ROUND_TIMEOUT_MS: '45000', + OD_CRITIQUE_TOTAL_TIMEOUT_MS: '180000', + OD_CRITIQUE_PARSER_MAX_BLOCK_BYTES: '524288', + OD_CRITIQUE_FALLBACK_POLICY: 'ship_last', + }); + expect(cfg.enabled).toBe(true); + expect(cfg.maxRounds).toBe(4); + expect(cfg.scoreThreshold).toBeCloseTo(7); + expect(cfg.scoreScale).toBe(10); + expect(cfg.perRoundTimeoutMs).toBe(45000); + expect(cfg.totalTimeoutMs).toBe(180000); + expect(cfg.parserMaxBlockBytes).toBe(524288); + expect(cfg.fallbackPolicy).toBe('ship_last'); + }); +}); diff --git a/apps/daemon/tests/critique-lifecycle.test.ts b/apps/daemon/tests/critique-lifecycle.test.ts new file mode 100644 index 000000000..28ad5c1d0 --- /dev/null +++ b/apps/daemon/tests/critique-lifecycle.test.ts @@ -0,0 +1,172 @@ +/** + * Regression tests for round 3 review feedback on PR #481: + * - A signal-terminated child (e.g. SIGTERM from /api/runs/:id/cancel) + * finalizes the critique row as 'interrupted', not 'below_threshold'. + * The synthetic ship event for the best-so-far round carries + * status='interrupted' so transcripts and SSE clients see the real cause. + * - artifactPath persisted with the row stays null on shipped runs until a + * future phase actually writes the SHIP body to disk. The + * transcript still records the ship event so consumers can find the run. 
+ */ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { mkdtempSync } from 'node:fs'; +import { rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { migrateCritique, getCritiqueRun } from '../src/critique/persistence.js'; +import { runOrchestrator, type CritiqueSseBus } from '../src/critique/orchestrator.js'; +import type { CritiqueSseEvent } from '@open-design/contracts/critique'; +import { defaultCritiqueConfig } from '@open-design/contracts/critique'; + +function freshDb(): Database.Database { + const db = new Database(':memory:'); + db.pragma('journal_mode = WAL'); + db.pragma('foreign_keys = ON'); + db.exec(` + CREATE TABLE projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE conversations ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE + ); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0); + INSERT INTO conversations (id, project_id, created_at, updated_at) VALUES ('c1', 'p1', 0, 0); + `); + migrateCritique(db); + return db; +} + +function makeBus(): { bus: CritiqueSseBus; events: CritiqueSseEvent[] } { + const events: CritiqueSseEvent[] = []; + const bus: CritiqueSseBus = { emit: (e) => { events.push(e); } }; + return { bus, events }; +} + +let tmpDir: string; +let db: Database.Database; + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 'od-lifecycle-test-')); + db = freshDb(); +}); + +afterEach(async () => { + db.close(); + await rm(tmpDir, { recursive: true, force: true }); +}); + +/** A stream that yields a complete round 1 then awaits forever, emulating a + * CLI that produced partial output before being killed. 
*/ +async function* roundOneThenStall(): AsyncIterable { + yield ` + + + v1 + ]]> + + ok + ok + ok + ok + continue + +`; + // Stall indefinitely so the orchestrator must rely on the child-exit race. + await new Promise(() => { /* never resolves */ }); +} + +describe('orchestrator lifecycle (PR #481 round 3 review)', () => { + it('child killed with SIGTERM after 1 closed round persists interrupted, not below_threshold', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'sigterm-1'); + + let resolveExit!: (v: { code: number | null; signal: string | null }) => void; + const childExitPromise = new Promise<{ code: number | null; signal: string | null }>((r) => { resolveExit = r; }); + const child = { kill: (): boolean => true }; + + // Schedule the SIGTERM to arrive shortly after the parser closes round 1. + setTimeout(() => resolveExit({ code: null, signal: 'SIGTERM' }), 75); + + const result = await runOrchestrator({ + runId: 'r-sigterm', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: roundOneThenStall(), + child, + childExitPromise, + }); + + expect(result.status).toBe('interrupted'); + const row = getCritiqueRun(db, 'r-sigterm'); + expect(row?.status).toBe('interrupted'); + + // Synthetic ship event must carry status='interrupted' (not below_threshold). + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + const shipPayload = shipEvents[0]?.data as { status: string } | undefined; + expect(shipPayload?.status).toBe('interrupted'); + + // Round 1 closed with composite ~9.0, so the fallback round should hold. 
+ expect(result.composite).not.toBeNull(); + expect(result.composite!).toBeGreaterThan(8.0); + }); + + it('shipped run persists artifactPath=null until artifact extraction lands', async () => { + const { bus } = makeBus(); + const artifactDir = join(tmpDir, 'no-artifact'); + + const stream = ` + + + v1 + ]]> + + ok + ok + ok + ok + ok + + + final]]> + Done. + + `; + + async function* streamOf(text: string): AsyncIterable { + for (let i = 0; i < text.length; i += 64) yield text.slice(i, i + 64); + } + + const result = await runOrchestrator({ + runId: 'r-shipped', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(stream), + }); + + expect(result.status).toBe('shipped'); + expect(result.artifactPath).toBeNull(); + const row = getCritiqueRun(db, 'r-shipped'); + expect(row?.artifactPath).toBeNull(); + }); +}); diff --git a/apps/daemon/tests/critique-orchestrator.test.ts b/apps/daemon/tests/critique-orchestrator.test.ts new file mode 100644 index 000000000..9d9983261 --- /dev/null +++ b/apps/daemon/tests/critique-orchestrator.test.ts @@ -0,0 +1,777 @@ +import { describe, it, expect, beforeEach, afterEach, vi } from 'vitest'; +import { mkdtempSync, existsSync, readFileSync } from 'node:fs'; +import { rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { migrateCritique, getCritiqueRun } from '../src/critique/persistence.js'; +import { runOrchestrator, type CritiqueSseBus, type OrchestratorParams } from '../src/critique/orchestrator.js'; +import type { CritiqueSseEvent } from '@open-design/contracts/critique'; +import { defaultCritiqueConfig, type CritiqueConfig } from '@open-design/contracts/critique'; + +// --------------------------------------------------------------------------- +// DB fixture +// 
--------------------------------------------------------------------------- + +function freshDb(): Database.Database { + const db = new Database(':memory:'); + db.pragma('journal_mode = WAL'); + db.pragma('foreign_keys = ON'); + db.exec(` + CREATE TABLE projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE conversations ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE + ); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0); + INSERT INTO conversations (id, project_id, created_at, updated_at) VALUES ('c1', 'p1', 0, 0); + `); + migrateCritique(db); + return db; +} + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeBus(): { bus: CritiqueSseBus; events: CritiqueSseEvent[] } { + const events: CritiqueSseEvent[] = []; + const bus: CritiqueSseBus = { emit: (e) => { events.push(e); } }; + return { bus, events }; +} + +/** + * Builds a minimal 3-round happy-path wire protocol stream. Uses a threshold + * low enough (1.0) so every round passes, meaning SHIP with status=shipped. + */ +function happyStream3Rounds(): string { + return ` + + + + Design intent v1. + ]]> + + + Good layout. + + + Strong brand. + + + Passes AA. + + + Clear copy. + + + Composite 9.0 but continuing per test. + + + + + + Design intent v2. + + + Better. + + + Consistent. + + + Still passes. + + + Still clear. + + + Continuing to round 3. + + + + + + Design intent v3. + + + Excellent. + + + Perfect. + + + Excellent. + + + Great. + + + Threshold met. + + + + + final]]> + Design converged in 3 rounds. 
+ + +`; +} + +async function* streamOf(text: string, chunkSize = 64): AsyncIterable { + for (let i = 0; i < text.length; i += chunkSize) { + yield text.slice(i, i + chunkSize); + } +} + +// --------------------------------------------------------------------------- +// Setup / Teardown +// --------------------------------------------------------------------------- + +let tmpDir: string; +let db: Database.Database; + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 'od-orch-test-')); + db = freshDb(); +}); + +afterEach(async () => { + db.close(); + await rm(tmpDir, { recursive: true, force: true }); +}); + +// --------------------------------------------------------------------------- +// Happy path +// --------------------------------------------------------------------------- + +describe('runOrchestrator - happy path', () => { + it('3-round shipped run: row reflects shipped + composite + rounds + transcript path', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run1'); + const cfg = defaultCritiqueConfig(); + + const result = await runOrchestrator({ + runId: 'r1', + projectId: 'p1', + conversationId: 'c1', + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: streamOf(happyStream3Rounds()), + }); + + expect(result.status).toBe('shipped'); + expect(result.composite).toBeCloseTo(9.45, 1); + expect(result.rounds).toHaveLength(3); + expect(result.transcriptPath).toBeTruthy(); + + const row = getCritiqueRun(db, 'r1'); + expect(row?.status).toBe('shipped'); + expect(row?.rounds).toHaveLength(3); + expect(row?.transcriptPath).toBeTruthy(); + + // Transcript file exists on disk. + const transcriptFile = join(artifactDir, result.transcriptPath!); + expect(existsSync(transcriptFile)).toBe(true); + + // SSE events emitted: should include run_started and ship. 
+ const eventNames = events.map((e) => e.event); + expect(eventNames).toContain('critique.run_started'); + expect(eventNames).toContain('critique.ship'); + }); + + it('SSE events are emitted in source order', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-order'); + + await runOrchestrator({ + runId: 'r-order', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(happyStream3Rounds()), + }); + + const names = events.map((e) => e.event); + const runStartedIdx = names.indexOf('critique.run_started'); + const shipIdx = names.lastIndexOf('critique.ship'); + expect(runStartedIdx).toBe(0); + expect(shipIdx).toBeGreaterThan(runStartedIdx); + }); +}); + +// --------------------------------------------------------------------------- +// Malformed / degraded +// --------------------------------------------------------------------------- + +describe('runOrchestrator - degraded', () => { + it('malformed input: row is degraded, critique.degraded emitted, no transcript path in row (transcript may still be written)', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-malformed'); + + // Malformed: ROUND before CRITIQUE_RUN. 
+ const malformedText = ``; + + const result = await runOrchestrator({ + runId: 'r-malformed', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: streamOf(malformedText), + }); + + expect(result.status).toBe('degraded'); + const row = getCritiqueRun(db, 'r-malformed'); + expect(row?.status).toBe('degraded'); + + const degradedEvents = events.filter((e) => e.event === 'critique.degraded'); + expect(degradedEvents).toHaveLength(1); + }); +}); + +// --------------------------------------------------------------------------- +// Fallback policy +// --------------------------------------------------------------------------- + +describe('runOrchestrator - fallback policy', () => { + it('below threshold: stream ends without SHIP, ship_best selects highest composite', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-below'); + + // 2 rounds but no SHIP - scores below threshold (default 8.0). + const noShipText = ` + + + v1 + ]]> + + + needs work + Fix hierarchy + + ok + ok + ok + + Below threshold. + + + + v2 + better + ok + ok + ok + + Still below threshold. + + +`; + + const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), fallbackPolicy: 'ship_best' }; + const result = await runOrchestrator({ + runId: 'r-below', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: streamOf(noShipText), + }); + + expect(result.status).toBe('below_threshold'); + // ship_best should select round 2 (composite 7.0 > 6.0). 
+ expect(result.composite).toBeGreaterThan(6.0); + + const row = getCritiqueRun(db, 'r-below'); + expect(row?.status).toBe('below_threshold'); + + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + }); + + it('fallback policy fail: row is failed, no synthetic ship event', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-failpolicy'); + + const noShipText = ` + + + v1 + ]]> + + ok + ok + ok + ok + + Below threshold. + + +`; + + const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), fallbackPolicy: 'fail' }; + const result = await runOrchestrator({ + runId: 'r-failpolicy', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: streamOf(noShipText), + }); + + expect(result.status).toBe('failed'); + + const row = getCritiqueRun(db, 'r-failpolicy'); + expect(row?.status).toBe('failed'); + + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// Timeout +// --------------------------------------------------------------------------- + +describe('runOrchestrator - timeouts', () => { + it('per-round timeout: stalled stream causes timed_out row', async () => { + const { bus } = makeBus(); + const artifactDir = join(tmpDir, 'run-round-timeout'); + + // Source that yields initial data then stalls past the per-round timeout. + async function* stallingSource(): AsyncIterable { + yield '\n'; + yield ' \n'; + yield ' \n'; + yield ' v1\n'; + yield ' ]]>\n'; + yield ' \n'; + // Stall: never send ROUND_END, timeout will fire. 
+ await new Promise((_, reject) => setTimeout(() => reject(new Error('stall')), 200)); + } + + const cfg: CritiqueConfig = { + ...defaultCritiqueConfig(), + perRoundTimeoutMs: 50, + totalTimeoutMs: 60_000, + }; + + const result = await runOrchestrator({ + runId: 'r-round-timeout', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: stallingSource(), + }); + + expect(result.status).toBe('timed_out'); + const row = getCritiqueRun(db, 'r-round-timeout'); + expect(row?.status).toBe('timed_out'); + }, 5000); + + it('total timeout: wall-clock deadline exceeded causes timed_out row', async () => { + const { bus } = makeBus(); + const artifactDir = join(tmpDir, 'run-total-timeout'); + + async function* slowSource(): AsyncIterable { + yield '\n'; + await new Promise((_, reject) => setTimeout(() => reject(new Error('total stall')), 200)); + } + + const cfg: CritiqueConfig = { + ...defaultCritiqueConfig(), + perRoundTimeoutMs: 60_000, + totalTimeoutMs: 50, + }; + + const result = await runOrchestrator({ + runId: 'r-total-timeout', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: slowSource(), + }); + + expect(result.status).toBe('timed_out'); + const row = getCritiqueRun(db, 'r-total-timeout'); + expect(row?.status).toBe('timed_out'); + }, 5000); +}); + +// --------------------------------------------------------------------------- +// Abort signal +// --------------------------------------------------------------------------- + +describe('runOrchestrator - abort signal', () => { + it('abort mid-run: row is interrupted, transcript captures events seen so far', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-abort'); + const controller = new AbortController(); + + async function* abortingSource(): AsyncIterable { + yield '\n'; + yield ' \n'; + yield ' \n'; + yield ' v1\n'; + yield 
' ]]>\n'; + yield ' \n'; + // Abort mid-stream. + controller.abort(); + yield ' \n'; + } + + const result = await runOrchestrator({ + runId: 'r-abort', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: abortingSource(), + signal: controller.signal, + }); + + expect(result.status).toBe('interrupted'); + const row = getCritiqueRun(db, 'r-abort'); + expect(row?.status).toBe('interrupted'); + + // Transcript should exist with partial events. + if (result.transcriptPath) { + expect(existsSync(join(artifactDir, result.transcriptPath))).toBe(true); + } + + const interruptedEvents = events.filter((e) => e.event === 'critique.interrupted'); + expect(interruptedEvents).toHaveLength(1); + }); +}); + +// --------------------------------------------------------------------------- +// Defensive entry validation +// --------------------------------------------------------------------------- + +describe('runOrchestrator - defensive entry', () => { + it('throws RangeError on invalid cfg (negative scoreThreshold) before any side effects', async () => { + const { bus } = makeBus(); + const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), scoreThreshold: -1 }; + + await expect( + runOrchestrator({ + runId: 'r-invalid', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir: join(tmpDir, 'run-invalid'), + adapter: 'claude', + cfg, + db, + bus, + stdout: streamOf(''), + }), + ).rejects.toThrow(RangeError); + + // No row should have been inserted. 
+ expect(getCritiqueRun(db, 'r-invalid')).toBeNull(); + }); + + it('throws RangeError on invalid cfg (zero perRoundTimeoutMs)', async () => { + const { bus } = makeBus(); + const cfg: CritiqueConfig = { ...defaultCritiqueConfig(), perRoundTimeoutMs: 0 }; + + await expect( + runOrchestrator({ + runId: 'r-invalid2', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir: join(tmpDir, 'run-invalid2'), + adapter: 'claude', + cfg, + db, + bus, + stdout: streamOf(''), + }), + ).rejects.toThrow(RangeError); + + expect(getCritiqueRun(db, 'r-invalid2')).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// Child exit races (Defect 4) +// --------------------------------------------------------------------------- + +describe('runOrchestrator - child exit race (Defect 4)', () => { + it('child exits non-zero mid-stream: result is failed with cli_exit_nonzero', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-child-exit'); + + // Stub child that exits with code 1 immediately. + let killCalled = false; + const stubChild = { kill: (_sig?: number | NodeJS.Signals) => { killCalled = true; return true as boolean; } }; + + // childExitPromise resolves with code=1 after a short delay. + const childExitPromise = new Promise<{ code: number | null; signal: string | null }>( + (resolve) => setTimeout(() => resolve({ code: 1, signal: null }), 20), + ); + + // Stdout that emits the run header then stalls. + // Uses a short delay (longer than childExitPromise's 20ms) so the child + // exit race wins before the stall promise resolves, but the generator + // itself does eventually resolve so iter.return() cleanup doesn't hang. + async function* stallingStdout(): AsyncIterable { + yield '\n'; + yield ' \n'; + // Stall for longer than the child exit delay (20ms) but eventually resolve + // so the generator can be cleaned up by iter.return() in applyTimeouts. 
+ await new Promise((resolve) => setTimeout(resolve, 5000)); + } + + const cfg: CritiqueConfig = { + ...defaultCritiqueConfig(), + // Long timeouts so only the child exit race wins; we don't want the + // per-round or total timer to fire before childExitPromise resolves. + perRoundTimeoutMs: 30_000, + totalTimeoutMs: 30_000, + }; + + const result = await runOrchestrator({ + runId: 'r-child-exit', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: stallingStdout(), + child: stubChild, + childExitPromise, + }); + + expect(result.status).toBe('failed'); + const row = getCritiqueRun(db, 'r-child-exit'); + expect(row?.status).toBe('failed'); + expect(killCalled).toBe(true); + + const failedEvents = events.filter((e) => e.event === 'critique.failed'); + expect(failedEvents).toHaveLength(1); + }, 10000); + + it('child exits zero before parser completes: parser continues until stream ends', async () => { + const { bus } = makeBus(); + const artifactDir = join(tmpDir, 'run-child-exit-zero'); + + // Child exits with code 0 (zero is not an error). + const childExitPromise = new Promise<{ code: number | null; signal: string | null }>( + (resolve) => setTimeout(() => resolve({ code: 0, signal: null }), 10), + ); + + // A complete valid 1-round stream that finishes after the child exit. + async function* delayedStream(): AsyncIterable { + await new Promise((r) => setTimeout(r, 30)); + yield '\n'; + yield ' \n'; + yield ' v1v1

]]>
\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield '
\n'; + yield ' \n'; + yield ' final

]]>
\n'; + yield ' done\n'; + yield '
\n'; + yield '
\n'; + } + + const result = await runOrchestrator({ + runId: 'r-child-exit-zero', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg: defaultCritiqueConfig(), + db, + bus, + stdout: delayedStream(), + childExitPromise, + }); + + // Zero exit does not disrupt the parser; it should complete as shipped. + expect(result.status).toBe('shipped'); + }, 10000); +}); + +// --------------------------------------------------------------------------- +// Timeout / abort best-so-far fallback (Defect 7) +// --------------------------------------------------------------------------- + +describe('runOrchestrator - fallback on timeout/abort (Defect 7)', () => { + it('timeout after 2 completed rounds: status=timed_out, score=max(composite)', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-timeout-fallback'); + + // 2 complete rounds then stall. + // Two complete rounds. After emitting both ROUND_END events, stall so the + // total-timeout fires and the orchestrator elects a fallback round. + const twoRounds = ` + + v1v1

]]>
+ ok + ok + ok + ok + continue +
+ + v2 + better + ok + ok + ok + continue + `; + + async function* stallingAfterTwoRounds(): AsyncIterable { + yield* streamOf(twoRounds, 64); + // After both rounds land, stall so the total-timeout fires. + await new Promise((resolve) => setTimeout(resolve, 10_000)); + } + + const cfg: CritiqueConfig = { + ...defaultCritiqueConfig(), + // Per-round timeout starts when the first panelist_open of a new round + // fires. Set it to 200ms so it fires quickly once the stall begins. + // Total timeout is long so only the per-round timer fires. + // But we yield the stall after ROUND_END so no panelist_open is active. + // Use total timeout of 300ms which fires after the stall begins. + perRoundTimeoutMs: 60_000, + totalTimeoutMs: 300, + fallbackPolicy: 'ship_best', + }; + + const result = await runOrchestrator({ + runId: 'r-timeout-fallback', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: stallingAfterTwoRounds(), + }); + + expect(result.status).toBe('timed_out'); + // Best round is 2 with composite 7.5. + expect(result.composite).toBeCloseTo(7.5, 1); + + const row = getCritiqueRun(db, 'r-timeout-fallback'); + expect(row?.status).toBe('timed_out'); + expect(row?.score).toBeCloseTo(7.5, 1); + + // A synthetic ship event should have been emitted. + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + }, 15000); + + it('abort after 1 completed round: status=interrupted, score matches that round', async () => { + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-abort-fallback'); + const controller = new AbortController(); + + const oneRound = ` + + v1v1

]]>
+ ok + ok + ok + ok + continue +
`; + + async function* abortAfterRound(): AsyncIterable { + yield* streamOf(oneRound, 64); + controller.abort(); + // One more yield after abort to ensure the abort is caught. + yield ' \n'; + } + + const cfg: CritiqueConfig = { + ...defaultCritiqueConfig(), + fallbackPolicy: 'ship_best', + }; + + const result = await runOrchestrator({ + runId: 'r-abort-fallback', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: abortAfterRound(), + signal: controller.signal, + }); + + expect(result.status).toBe('interrupted'); + expect(result.composite).toBeCloseTo(8.0, 1); + + const row = getCritiqueRun(db, 'r-abort-fallback'); + expect(row?.status).toBe('interrupted'); + expect(row?.score).toBeCloseTo(8.0, 1); + + const shipEvents = events.filter((e) => e.event === 'critique.ship'); + expect(shipEvents).toHaveLength(1); + }); +}); diff --git a/apps/daemon/tests/critique-persistence.test.ts b/apps/daemon/tests/critique-persistence.test.ts new file mode 100644 index 000000000..2207a0a23 --- /dev/null +++ b/apps/daemon/tests/critique-persistence.test.ts @@ -0,0 +1,180 @@ +import { describe, expect, it, beforeEach } from 'vitest'; +import Database from 'better-sqlite3'; +import { + migrateCritique, + insertCritiqueRun, + getCritiqueRun, + updateCritiqueRun, + listCritiqueRunsByProject, + deleteCritiqueRun, + reconcileStaleRuns, + CRITIQUE_RUN_STATUSES, + type CritiqueRunRow, +} from '../src/critique/persistence.js'; + +function freshDb(): Database.Database { + const db = new Database(':memory:'); + db.pragma('journal_mode = WAL'); + db.pragma('foreign_keys = ON'); + // The persistence module has FKs into projects/conversations; create stubs + // with the columns the FK references actually need. 
+ db.exec(` + CREATE TABLE projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE conversations ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE + ); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p2', 'p2', 0, 0); + INSERT INTO conversations (id, project_id, created_at, updated_at) VALUES ('c1', 'p1', 0, 0); + `); + migrateCritique(db); + return db; +} + +describe('critique persistence', () => { + let db: Database.Database; + beforeEach(() => { db = freshDb(); }); + + it('migrate is idempotent', () => { + expect(() => { migrateCritique(db); migrateCritique(db); }).not.toThrow(); + const tables = db.prepare( + `SELECT name FROM sqlite_master WHERE type='table' AND name='critique_runs'`, + ).all() as Array<{ name: string }>; + expect(tables.length).toBe(1); + }); + + it('insert + get round-trips a row with rounds payload preserved', () => { + const now = 1700000000000; + const row = insertCritiqueRun(db, { + id: 'crun_1', + projectId: 'p1', + conversationId: 'c1', + artifactPath: '.od/artifacts/crun_1/v1.html', + status: 'shipped', + score: 8.6, + rounds: [ + { n: 1, composite: 6.18, mustFix: 7, decision: 'continue' }, + { n: 2, composite: 7.86, mustFix: 3, decision: 'continue' }, + { n: 3, composite: 8.62, mustFix: 0, decision: 'ship' }, + ], + transcriptPath: '.od/artifacts/crun_1/transcript.ndjson', + protocolVersion: 1, + createdAt: now, + updatedAt: now, + }); + expect(row.id).toBe('crun_1'); + expect(row.rounds).toHaveLength(3); + expect(row.rounds[2]?.decision).toBe('ship'); + const fetched = getCritiqueRun(db, 'crun_1'); + expect(fetched).toEqual(row); + }); + + it('default rounds is an empty array when not provided', () => { + 
insertCritiqueRun(db, { + id: 'crun_empty', + projectId: 'p1', + status: 'failed', + protocolVersion: 1, + }); + const row = getCritiqueRun(db, 'crun_empty'); + expect(row?.rounds).toEqual([]); + }); + + it('rejects an invalid status at insert time', () => { + expect(() => insertCritiqueRun(db, { + id: 'crun_bad', + projectId: 'p1', + status: 'not_a_status' as never, + protocolVersion: 1, + })).toThrow(RangeError); + }); + + it('updateCritiqueRun bumps updated_at and applies the patch', async () => { + const r1 = insertCritiqueRun(db, { + id: 'crun_upd', + projectId: 'p1', + status: 'shipped', + protocolVersion: 1, + createdAt: 1, + updatedAt: 1, + }); + expect(r1.updatedAt).toBe(1); + const r2 = updateCritiqueRun(db, 'crun_upd', { + score: 9.1, + status: 'shipped', + updatedAt: 1234, + }); + expect(r2?.score).toBe(9.1); + expect(r2?.updatedAt).toBe(1234); + }); + + it('updateCritiqueRun returns null for unknown id', () => { + expect(updateCritiqueRun(db, 'crun_missing', { score: 1 })).toBeNull(); + }); + + it('listCritiqueRunsByProject returns rows ordered by updated_at DESC', () => { + insertCritiqueRun(db, { id: 'a', projectId: 'p1', status: 'shipped', protocolVersion: 1, createdAt: 100, updatedAt: 100 }); + insertCritiqueRun(db, { id: 'b', projectId: 'p1', status: 'shipped', protocolVersion: 1, createdAt: 200, updatedAt: 200 }); + insertCritiqueRun(db, { id: 'c', projectId: 'p2', status: 'shipped', protocolVersion: 1, createdAt: 300, updatedAt: 300 }); + const rows = listCritiqueRunsByProject(db, 'p1'); + expect(rows.map(r => r.id)).toEqual(['b', 'a']); + }); + + it('deleteCritiqueRun removes the row', () => { + insertCritiqueRun(db, { id: 'gone', projectId: 'p1', status: 'shipped', protocolVersion: 1 }); + deleteCritiqueRun(db, 'gone'); + expect(getCritiqueRun(db, 'gone')).toBeNull(); + }); + + it('CRITIQUE_RUN_STATUSES exposes every public status', () => { + expect(CRITIQUE_RUN_STATUSES).toEqual([ + 'shipped', 'below_threshold', 'timed_out', 'interrupted', + 
'degraded', 'failed', 'legacy', + ]); + }); + + it('reconcileStaleRuns flips stale running rows to interrupted with recoveryReason', () => { + db.prepare( + `INSERT INTO critique_runs + (id, project_id, status, rounds_json, protocol_version, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + ).run('stuck1', 'p1', 'running', '[]', 1, 0, 100); + db.prepare( + `INSERT INTO critique_runs + (id, project_id, status, rounds_json, protocol_version, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + ).run('stuck2', 'p1', 'running', '[]', 1, 0, 200); + db.prepare( + `INSERT INTO critique_runs + (id, project_id, status, rounds_json, protocol_version, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + ).run('fresh', 'p1', 'running', '[]', 1, 0, 1_000_000); + + const now = 1_000_500; + const flipped = reconcileStaleRuns(db, { staleAfterMs: 1000, now }); + expect(flipped).toBe(2); + const r1 = getCritiqueRun(db, 'stuck1'); + expect(r1?.status).toBe('interrupted'); + const fresh = getCritiqueRun(db, 'fresh'); + expect(fresh?.status).toBe('running'); + // recoveryReason is on rounds_json (top-level alongside the round entries). + const raw = db.prepare(`SELECT rounds_json AS j FROM critique_runs WHERE id = 'stuck1'`).get() as { j: string }; + const parsed = JSON.parse(raw.j); + expect(parsed.recoveryReason).toBe('daemon_restart'); + }); + + it('CASCADEs critique_runs deletion when project is deleted', () => { + insertCritiqueRun(db, { id: 'doomed', projectId: 'p2', status: 'shipped', protocolVersion: 1 }); + db.prepare(`DELETE FROM projects WHERE id = ?`).run('p2'); + expect(getCritiqueRun(db, 'doomed')).toBeNull(); + }); +}); diff --git a/apps/daemon/tests/critique-spawn-wiring.test.ts b/apps/daemon/tests/critique-spawn-wiring.test.ts new file mode 100644 index 000000000..f250b424a --- /dev/null +++ b/apps/daemon/tests/critique-spawn-wiring.test.ts @@ -0,0 +1,252 @@ +/** + * Smoke tests for the Critique Theater spawn-path branch. 
+ * + * These tests exercise the loadCritiqueConfigFromEnv gate and the + * runOrchestrator integration point without actually spawning a child process. + * The spawn wiring lives in server.ts (ts-nocheck), so we test the seam + * through the public module APIs: config loading and orchestrator execution + * with a synthetic stdout iterable. + */ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { mkdtempSync } from 'node:fs'; +import { rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import Database from 'better-sqlite3'; +import { migrateCritique, getCritiqueRun } from '../src/critique/persistence.js'; +import { loadCritiqueConfigFromEnv } from '../src/critique/config.js'; +import { runOrchestrator, type CritiqueSseBus } from '../src/critique/orchestrator.js'; +import type { CritiqueSseEvent } from '@open-design/contracts/critique'; +import { defaultCritiqueConfig } from '@open-design/contracts/critique'; + +function freshDb(): Database.Database { + const db = new Database(':memory:'); + db.pragma('journal_mode = WAL'); + db.pragma('foreign_keys = ON'); + db.exec(` + CREATE TABLE projects ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + CREATE TABLE conversations ( + id TEXT PRIMARY KEY, + project_id TEXT NOT NULL, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL, + FOREIGN KEY(project_id) REFERENCES projects(id) ON DELETE CASCADE + ); + INSERT INTO projects (id, name, created_at, updated_at) VALUES ('p1', 'p1', 0, 0); + `); + migrateCritique(db); + return db; +} + +function makeBus(): { bus: CritiqueSseBus; events: CritiqueSseEvent[] } { + const events: CritiqueSseEvent[] = []; + const bus: CritiqueSseBus = { emit: (e) => { events.push(e); } }; + return { bus, events }; +} + +let tmpDir: string; +let db: Database.Database; + +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 
'od-spawn-wiring-test-')); + db = freshDb(); +}); +afterEach(async () => { + db.close(); + await rm(tmpDir, { recursive: true, force: true }); +}); + +// --------------------------------------------------------------------------- +// Config gate: OD_CRITIQUE_ENABLED=false (legacy path unchanged) +// --------------------------------------------------------------------------- + +describe('spawn wiring - cfg.enabled=false (M0 default)', () => { + it('loadCritiqueConfigFromEnv with empty env returns enabled=false', () => { + const cfg = loadCritiqueConfigFromEnv({}); + expect(cfg.enabled).toBe(false); + }); + + it('loadCritiqueConfigFromEnv with OD_CRITIQUE_ENABLED=false returns enabled=false', () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: 'false' }); + expect(cfg.enabled).toBe(false); + }); + + it('when cfg.enabled=false, runOrchestrator is not invoked (legacy path)', async () => { + // Simulate what the spawn branch does: check cfg.enabled before calling orchestrator. + const cfg = loadCritiqueConfigFromEnv({}); + expect(cfg.enabled).toBe(false); + // No orchestrator call means no row inserted. + expect(getCritiqueRun(db, 'legacy-run')).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// Config gate: OD_CRITIQUE_ENABLED=true (orchestrator path) +// --------------------------------------------------------------------------- + +describe('spawn wiring - cfg.enabled=true (orchestrator path)', () => { + it('with OD_CRITIQUE_ENABLED=true, runOrchestrator is invoked with the spawn stdout', async () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '1' }); + expect(cfg.enabled).toBe(true); + + const { bus, events } = makeBus(); + const artifactDir = join(tmpDir, 'run-enabled'); + + // Synthetic stdout that matches a minimal valid critique run. 
+ async function* mockStdout(): AsyncIterable { + yield '\n'; + yield ' \n'; + yield ' \n'; + yield ' v1\n'; + yield ' ]]>\n'; + yield ' \n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' \n'; + yield ' Ship on round 1.\n'; + yield ' \n'; + yield ' \n'; + yield ' \n'; + yield ' ]]>\n'; + yield ' Shipped.\n'; + yield ' \n'; + yield '\n'; + } + + const result = await runOrchestrator({ + runId: 'enabled-run', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'claude', + cfg, + db, + bus, + stdout: mockStdout(), + }); + + // Orchestrator ran and returned a shipped result. + expect(result.status).toBe('shipped'); + const row = getCritiqueRun(db, 'enabled-run'); + expect(row?.status).toBe('shipped'); + + // SSE events were emitted on the bus. + const eventNames = events.map((e) => e.event); + expect(eventNames).toContain('critique.run_started'); + expect(eventNames).toContain('critique.ship'); + }); + + it('errors thrown by the orchestrator surface to the caller', async () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '1' }); + + // Invalid cfg to force a RangeError before any side effect. + const badCfg = { ...cfg, perRoundTimeoutMs: -1 }; + const { bus } = makeBus(); + + await expect( + runOrchestrator({ + runId: 'error-run', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir: join(tmpDir, 'run-error'), + adapter: 'claude', + cfg: badCfg, + db, + bus, + stdout: (async function* () { yield ''; })(), + }), + ).rejects.toThrow(RangeError); + + // No row inserted because error fires before insertCritiqueRun. + expect(getCritiqueRun(db, 'error-run')).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// Stream format gating (Defect 1) +// --------------------------------------------------------------------------- +// The server gates the orchestrator path on streamFormat === 'plain'. 
+// We test the logic inline: simulate the server branch condition and verify +// that non-plain adapters skip the orchestrator entirely. + +describe('spawn wiring - stream format gating (Defect 1)', () => { + const NON_PLAIN_FORMATS = [ + 'claude-stream-json', + 'copilot-stream-json', + 'json-event-stream', + 'acp-json-rpc', + ] as const; + + for (const fmt of NON_PLAIN_FORMATS) { + it(`format="${fmt}" skips the orchestrator (no run row inserted)`, async () => { + // Simulate the server branch: if streamFormat !== 'plain', skip orchestrator. + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '1' }); + const adapterStreamFormat: string = fmt; + + if (cfg.enabled && adapterStreamFormat !== 'plain') { + // Legacy path: orchestrator NOT called. + // Nothing should be inserted. + expect(getCritiqueRun(db, `skip-${fmt}`)).toBeNull(); + return; + } + + // If we reach here, the test scenario is wrong. + throw new Error(`Expected ${fmt} to skip orchestrator but did not`); + }); + } + + it('format="plain" routes through the orchestrator', async () => { + const cfg = loadCritiqueConfigFromEnv({ OD_CRITIQUE_ENABLED: '1' }); + const adapterStreamFormat = 'plain'; + + // Simulate: only call orchestrator when format is plain. + if (!cfg.enabled || adapterStreamFormat !== 'plain') { + throw new Error('Expected plain format to be routed through orchestrator'); + } + + const { bus } = makeBus(); + const artifactDir = join(tmpDir, 'run-plain-format'); + + async function* mockStdout(): AsyncIterable { + yield '\n'; + yield ' \n'; + yield ' v1v1

]]>
\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield ' ok\n'; + yield '
\n'; + yield ' \n'; + yield ' final

]]>
\n'; + yield ' done\n'; + yield '
\n'; + yield '
\n'; + } + + const result = await runOrchestrator({ + runId: 'plain-format-run', + projectId: 'p1', + conversationId: null, + artifactId: 'a1', + artifactDir, + adapter: 'plain-adapter', + cfg, + db, + bus, + stdout: mockStdout(), + }); + + expect(result.status).toBe('shipped'); + expect(getCritiqueRun(db, 'plain-format-run')?.status).toBe('shipped'); + }); +}); diff --git a/apps/daemon/tests/critique-transcript.test.ts b/apps/daemon/tests/critique-transcript.test.ts new file mode 100644 index 000000000..99f5f2773 --- /dev/null +++ b/apps/daemon/tests/critique-transcript.test.ts @@ -0,0 +1,239 @@ +import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { mkdtempSync, existsSync, readdirSync } from 'node:fs'; +import { rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; +import { createGunzip } from 'node:zlib'; +import { createReadStream } from 'node:fs'; +import { createInterface } from 'node:readline'; +import type { PanelEvent } from '@open-design/contracts/critique'; +import { writeTranscript, readTranscript } from '../src/critique/transcript.js'; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +function makeRunStarted(runId = 'r1'): PanelEvent { + return { + type: 'run_started', + runId, + protocolVersion: 1, + cast: ['designer', 'critic', 'brand', 'a11y', 'copy'], + maxRounds: 3, + threshold: 8.0, + scale: 10, + }; +} + +function makeShip(runId = 'r1'): PanelEvent { + return { + type: 'ship', + runId, + round: 1, + composite: 9.0, + status: 'shipped', + artifactRef: { projectId: 'p1', artifactId: 'a1' }, + summary: 'done', + }; +} + +async function collect(iter: AsyncIterable): Promise { + const out: PanelEvent[] = []; + for await (const e of iter) out.push(e); + return out; +} + +// --------------------------------------------------------------------------- +// 
Setup / Teardown +// --------------------------------------------------------------------------- + +let tmpDir: string; +beforeEach(() => { + tmpDir = mkdtempSync(join(tmpdir(), 'od-transcript-test-')); +}); +afterEach(async () => { + await rm(tmpDir, { recursive: true, force: true }); +}); + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +describe('writeTranscript + readTranscript', () => { + it('writes plain .ndjson for small input and events round-trip', async () => { + const events: PanelEvent[] = [makeRunStarted(), makeShip()]; + const artifactDir = join(tmpDir, 'run1'); + const result = await writeTranscript(artifactDir, events, { gzipThresholdBytes: 1_000_000 }); + + expect(result.path).toBe('transcript.ndjson'); + expect(result.gzipped).toBe(false); + expect(result.bytes).toBeGreaterThan(0); + expect(existsSync(join(artifactDir, 'transcript.ndjson'))).toBe(true); + + const roundTripped = await collect(readTranscript(artifactDir, 'transcript.ndjson')); + expect(roundTripped).toEqual(events); + }); + + it('writes .ndjson.gz for large input (over threshold) and events round-trip', async () => { + // Use a very low threshold to force gzip. + const events: PanelEvent[] = [makeRunStarted(), makeShip()]; + const artifactDir = join(tmpDir, 'run2'); + const result = await writeTranscript(artifactDir, events, { gzipThresholdBytes: 1 }); + + expect(result.path).toBe('transcript.ndjson.gz'); + expect(result.gzipped).toBe(true); + expect(existsSync(join(artifactDir, 'transcript.ndjson.gz'))).toBe(true); + + // Verify gzip integrity by gunzipping manually and confirming it parses. 
+ const lines: string[] = []; + const rl = createInterface({ + input: createReadStream(join(artifactDir, 'transcript.ndjson.gz')).pipe(createGunzip()), + crlfDelay: Infinity, + }); + for await (const line of rl) { + if (line.trim()) lines.push(line.trim()); + } + expect(lines).toHaveLength(2); + expect(JSON.parse(lines[0]!)).toEqual(events[0]); + + const roundTripped = await collect(readTranscript(artifactDir, 'transcript.ndjson.gz')); + expect(roundTripped).toEqual(events); + }); + + it('empty events iterable writes a file with 0 bytes and round-trip yields nothing', async () => { + const artifactDir = join(tmpDir, 'run-empty'); + const result = await writeTranscript(artifactDir, [], { gzipThresholdBytes: 1_000_000 }); + + expect(result.bytes).toBe(0); + expect(result.gzipped).toBe(false); + expect(existsSync(join(artifactDir, 'transcript.ndjson'))).toBe(true); + + const roundTripped = await collect(readTranscript(artifactDir, 'transcript.ndjson')); + expect(roundTripped).toHaveLength(0); + }); + + it('multibyte CJK content sizes correctly under UTF-8 byte cap', async () => { + const cjkEvent: PanelEvent = { + type: 'panelist_dim', + runId: 'r1', + round: 1, + role: 'critic', + dimName: 'hierarchy', + dimScore: 7, + dimNote: '字体层次不清晰,标题与正文对比不足', + }; + const artifactDir = join(tmpDir, 'run-cjk'); + // Threshold below CJK content byte count to force gzip. + const threshold = 10; + const result = await writeTranscript(artifactDir, [cjkEvent], { gzipThresholdBytes: threshold }); + + const serialized = JSON.stringify(cjkEvent) + '\n'; + const expected = Buffer.byteLength(serialized, 'utf8'); + expect(result.bytes).toBe(expected); + // CJK chars are multi-byte, so bytes > string length. 
+ expect(result.bytes).toBeGreaterThan(serialized.length); + expect(result.gzipped).toBe(true); + + const roundTripped = await collect(readTranscript(artifactDir, 'transcript.ndjson.gz')); + expect(roundTripped).toEqual([cjkEvent]); + }); + + it('temp file is cleaned up on success', async () => { + const artifactDir = join(tmpDir, 'run-cleanup'); + await writeTranscript(artifactDir, [makeRunStarted()], { gzipThresholdBytes: 1_000_000 }); + + const files = readdirSync(artifactDir); + const tempFiles = files.filter((f) => f.includes('.tmp.')); + expect(tempFiles).toHaveLength(0); + }); + + it('temp file is cleaned up on failure and error propagates', async () => { + const artifactDir = join(tmpDir, 'run-fail'); + await mkdirIfNeeded(artifactDir); + + async function* failingSource(): AsyncIterable { + yield makeRunStarted(); + throw new Error('mid-stream failure'); + } + + await expect( + writeTranscript(artifactDir, failingSource(), { gzipThresholdBytes: 1_000_000 }), + ).rejects.toThrow('mid-stream failure'); + + // No temp file should remain. + const files = existsSync(artifactDir) ? readdirSync(artifactDir) : []; + const tempFiles = files.filter((f) => f.includes('.tmp.')); + expect(tempFiles).toHaveLength(0); + }); + + it('readTranscript detects .gz vs .ndjson by extension', async () => { + const artifactDir = join(tmpDir, 'run-ext'); + const events = [makeRunStarted(), makeShip()]; + + // Write both plain and gzipped. + await writeTranscript(artifactDir, events, { gzipThresholdBytes: 1_000_000 }); + // Write gzipped version too by using a low threshold. 
+ const artifactDir2 = join(tmpDir, 'run-ext2'); + await writeTranscript(artifactDir2, events, { gzipThresholdBytes: 1 }); + + const plain = await collect(readTranscript(artifactDir, 'transcript.ndjson')); + const gz = await collect(readTranscript(artifactDir2, 'transcript.ndjson.gz')); + + expect(plain).toEqual(events); + expect(gz).toEqual(events); + }); + + it('readTranscript throws on unknown extension', async () => { + const artifactDir = join(tmpDir, 'run-badext'); + await expect( + collect(readTranscript(artifactDir, 'transcript.json')), + ).rejects.toThrow(RangeError); + }); + + it('writeTranscript throws RangeError on empty artifactDir', async () => { + await expect(writeTranscript('', [])).rejects.toThrow(RangeError); + }); + + it('writeTranscript throws RangeError on non-iterable events', async () => { + // Pass a plain object that has neither Symbol.iterator nor Symbol.asyncIterator. + await expect( + writeTranscript( + join(tmpDir, 'run-badevents'), + {} as unknown as Iterable, + ), + ).rejects.toThrow(RangeError); + }); + + it('gzip crash leaves no final .gz or .gz.tmp on disk (Defect 8)', async () => { + const artifactDir = join(tmpDir, 'run-gz-crash'); + await mkdirIfNeeded(artifactDir); + + // Source that throws mid-stream to simulate a crash during write. + async function* failingGzipSource(): AsyncIterable { + yield makeRunStarted(); + throw new Error('simulated gzip crash'); + } + + // Use threshold=1 to force gzip path. + await expect( + writeTranscript(artifactDir, failingGzipSource(), { gzipThresholdBytes: 1 }), + ).rejects.toThrow('simulated gzip crash'); + + // The final .gz must not exist. + expect(existsSync(join(artifactDir, 'transcript.ndjson.gz'))).toBe(false); + + // No .gz.tmp should remain. + const files = existsSync(artifactDir) ? 
readdirSync(artifactDir) : []; + const gzTmpFiles = files.filter((f) => f.endsWith('.gz.tmp')); + expect(gzTmpFiles).toHaveLength(0); + }); +}); + +// --------------------------------------------------------------------------- +// Utility +// --------------------------------------------------------------------------- + +async function mkdirIfNeeded(dir: string): Promise { + const { mkdir } = await import('node:fs/promises'); + await mkdir(dir, { recursive: true }); +} diff --git a/apps/daemon/tests/parser.test.ts b/apps/daemon/tests/parser.test.ts index ccd59f84a..9a0ece948 100644 --- a/apps/daemon/tests/parser.test.ts +++ b/apps/daemon/tests/parser.test.ts @@ -341,3 +341,75 @@ describe('parseCritiqueStream -- v1 envelope and shape invariants (mrcfps review ).rejects.toBeInstanceOf(MalformedBlockError); }); }); + +describe('parseCritiqueStream -- Defects 3+5 regressions', () => { + async function* oneChunk(s: string): AsyncGenerator { yield s; } + + it('SHIP before any ROUND_END throws MalformedBlockError (Defect 5)', async () => { + const stream = ` + + x

]]>
+ skipped rounds +
+
`; + await expect( + collect(parseCritiqueStream(oneChunk(stream), { + runId: 't', adapter: 'test', parserMaxBlockBytes: 262_144, + })), + ).rejects.toBeInstanceOf(MalformedBlockError); + }); + + it('SHIP without inner throws MissingArtifactError (Defect 5)', async () => { + const stream = ` + + + v1 + v1

]]>
+
+ ok + ok + ok + ok + ok +
+ + no artifact block here + +
`; + await expect( + collect(parseCritiqueStream(oneChunk(stream), { + runId: 't', adapter: 'test', parserMaxBlockBytes: 262_144, + })), + ).rejects.toBeInstanceOf(MissingArtifactError); + }); + + it('artifactRef is populated from parser options projectId+artifactId (Defect 3)', async () => { + const stream = ` + + + v1 + v1

]]>
+
+ ok + ok + ok + ok + ok +
+ + final

]]>
+ done +
+
`; + const events = await collect(parseCritiqueStream(oneChunk(stream), { + runId: 't', adapter: 'test', parserMaxBlockBytes: 262_144, + projectId: 'p1', artifactId: 'a1', + })); + const ship = events.find(e => e.type === 'ship'); + expect(ship).toBeDefined(); + if (ship && ship.type === 'ship') { + expect(ship.artifactRef.projectId).toBe('p1'); + expect(ship.artifactRef.artifactId).toBe('a1'); + } + }); +}); diff --git a/packages/contracts/src/critique.ts b/packages/contracts/src/critique.ts index 90e598d1a..2fd7e8c31 100644 --- a/packages/contracts/src/critique.ts +++ b/packages/contracts/src/critique.ts @@ -1,5 +1,19 @@ import { z } from 'zod'; +/** + * Local mirror of SseTransportEvent from './sse/common'. Re-defining the + * three-field interface avoids a cross-file relative import inside this leaf + * module: the daemon walks this file via the './critique' subpath export + * under NodeNext (which requires explicit '.js' extensions), while the web + * Turbopack build refuses to rewrite '.js' to '.ts' on the same source. + * Keeping the type local makes the file self-contained for both consumers. + */ +interface SseTransportEvent { + id?: string; + event: Name; + data: Payload; +} + export const PANELIST_ROLES = ['designer', 'critic', 'brand', 'a11y', 'copy'] as const; export type PanelistRole = typeof PANELIST_ROLES[number]; @@ -110,3 +124,52 @@ export function isPanelEvent(value: unknown): value is PanelEvent { if (typeof t !== 'string' || !PANEL_EVENT_TYPES.has(t as PanelEvent['type'])) return false; return typeof obj['runId'] === 'string' && (obj['runId'] as string).length > 0; } + +// --------------------------------------------------------------------------- +// SSE wire mapping. Inlined here so the contracts package has zero relative +// imports inside the leaf module the daemon walks via the './critique' +// subpath export. 
The daemon's NodeNext resolution requires explicit .js +// extensions on relative imports while the web Turbopack build refuses to +// rewrite .js -> .ts on the same source, so a re-export across files is +// the worst of both worlds. Keeping the definitions self-contained here +// avoids the conflict entirely. +// --------------------------------------------------------------------------- + +type PayloadOf = Omit, 'type'>; + +export type CritiqueSseEvent = + | SseTransportEvent<'critique.run_started', PayloadOf<'run_started'>> + | SseTransportEvent<'critique.panelist_open', PayloadOf<'panelist_open'>> + | SseTransportEvent<'critique.panelist_dim', PayloadOf<'panelist_dim'>> + | SseTransportEvent<'critique.panelist_must_fix', PayloadOf<'panelist_must_fix'>> + | SseTransportEvent<'critique.panelist_close', PayloadOf<'panelist_close'>> + | SseTransportEvent<'critique.round_end', PayloadOf<'round_end'>> + | SseTransportEvent<'critique.ship', PayloadOf<'ship'>> + | SseTransportEvent<'critique.degraded', PayloadOf<'degraded'>> + | SseTransportEvent<'critique.interrupted', PayloadOf<'interrupted'>> + | SseTransportEvent<'critique.failed', PayloadOf<'failed'>> + | SseTransportEvent<'critique.parser_warning', PayloadOf<'parser_warning'>>; + +export const CRITIQUE_SSE_EVENT_NAMES = [ + 'critique.run_started', + 'critique.panelist_open', + 'critique.panelist_dim', + 'critique.panelist_must_fix', + 'critique.panelist_close', + 'critique.round_end', + 'critique.ship', + 'critique.degraded', + 'critique.interrupted', + 'critique.failed', + 'critique.parser_warning', +] as const satisfies readonly CritiqueSseEvent['event'][]; + +export type CritiqueSseEventName = typeof CRITIQUE_SSE_EVENT_NAMES[number]; + +export function panelEventToSse(e: PanelEvent): CritiqueSseEvent { + const { type, ...payload } = e; + // Each PanelEvent variant maps 1:1 to a CritiqueSseEvent variant by + // prefixing the type with 'critique.' and moving every other field into + // data. 
The cast is safe by construction. + return { event: `critique.${type}`, data: payload } as CritiqueSseEvent; +} diff --git a/packages/contracts/src/index.ts b/packages/contracts/src/index.ts index a6d20955c..a136cc210 100644 --- a/packages/contracts/src/index.ts +++ b/packages/contracts/src/index.ts @@ -13,6 +13,5 @@ export * from './api/version'; export * from './sse/common'; export * from './sse/chat'; export * from './sse/proxy'; -export * from './sse/critique'; export * from './prompts/system'; export * from './critique'; diff --git a/packages/contracts/src/sse/critique.ts b/packages/contracts/src/sse/critique.ts deleted file mode 100644 index 0599315e7..000000000 --- a/packages/contracts/src/sse/critique.ts +++ /dev/null @@ -1,40 +0,0 @@ -import type { PanelEvent } from '../critique'; -import type { SseTransportEvent } from './common'; - -type PayloadOf<T extends PanelEvent['type']> = Omit<Extract<PanelEvent, { type: T }>, 'type'>; - -export type CritiqueSseEvent = - | SseTransportEvent<'critique.run_started', PayloadOf<'run_started'>> - | SseTransportEvent<'critique.panelist_open', PayloadOf<'panelist_open'>> - | SseTransportEvent<'critique.panelist_dim', PayloadOf<'panelist_dim'>> - | SseTransportEvent<'critique.panelist_must_fix', PayloadOf<'panelist_must_fix'>> - | SseTransportEvent<'critique.panelist_close', PayloadOf<'panelist_close'>> - | SseTransportEvent<'critique.round_end', PayloadOf<'round_end'>> - | SseTransportEvent<'critique.ship', PayloadOf<'ship'>> - | SseTransportEvent<'critique.degraded', PayloadOf<'degraded'>> - | SseTransportEvent<'critique.interrupted', PayloadOf<'interrupted'>> - | SseTransportEvent<'critique.failed', PayloadOf<'failed'>> - | SseTransportEvent<'critique.parser_warning', PayloadOf<'parser_warning'>>; - -export const CRITIQUE_SSE_EVENT_NAMES = [ - 'critique.run_started', - 'critique.panelist_open', - 'critique.panelist_dim', - 'critique.panelist_must_fix', - 'critique.panelist_close', - 'critique.round_end', - 'critique.ship', - 'critique.degraded', - 'critique.interrupted', -
'critique.failed', - 'critique.parser_warning', -] as const satisfies readonly CritiqueSseEvent['event'][]; - -export type CritiqueSseEventName = typeof CRITIQUE_SSE_EVENT_NAMES[number]; - -export function panelEventToSse(e: PanelEvent): CritiqueSseEvent { - const { type, ...payload } = e; - // The cast is safe: each PanelEvent variant maps 1:1 to a CritiqueSseEvent variant - // by prefixing the type with 'critique.' and moving every other field into data. - return { event: `critique.${type}`, data: payload } as CritiqueSseEvent; -} diff --git a/packages/contracts/tests/critique.test.ts b/packages/contracts/tests/critique.test.ts index 91e0e6e05..f5b6c69f5 100644 --- a/packages/contracts/tests/critique.test.ts +++ b/packages/contracts/tests/critique.test.ts @@ -1,78 +1,53 @@ import { describe, expect, it } from 'vitest'; import { - CritiqueConfigSchema, - PANELIST_ROLES, - defaultCritiqueConfig, - isPanelEvent, + panelEventToSse, + CRITIQUE_SSE_EVENT_NAMES, type PanelEvent, } from '../src/critique'; -describe('CritiqueConfig', () => { - it('defaults validate against the schema', () => { - expect(() => CritiqueConfigSchema.parse(defaultCritiqueConfig())).not.toThrow(); +describe('CritiqueSseEvent', () => { + it('panelEventToSse maps PanelEvent.type "run_started" to event "critique.run_started"', () => { + const e: PanelEvent = { + type: 'run_started', runId: 'r1', protocolVersion: 1, + cast: ['designer','critic','brand','a11y','copy'], + maxRounds: 3, threshold: 8, scale: 10, + }; + const sse = panelEventToSse(e); + expect(sse.event).toBe('critique.run_started'); + expect(sse.data).toMatchObject({ + runId: 'r1', protocolVersion: 1, maxRounds: 3, threshold: 8, scale: 10, + }); + // No 'type' field on the SSE payload. 
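+ expect((sse.data as Record<string, unknown>).type).toBeUndefined(); }); - it('weights default to designer=0, critic=0.4, brand=0.2, a11y=0.2, copy=0.2', () => { - const cfg = defaultCritiqueConfig(); - expect(cfg.weights.designer).toBe(0); - expect(cfg.weights.critic).toBe(0.4); - expect(cfg.weights.brand).toBe(0.2); - expect(cfg.weights.a11y).toBe(0.2); - expect(cfg.weights.copy).toBe(0.2); - const sum = Object.values(cfg.weights).reduce((a, b) => a + b, 0); - expect(sum).toBeCloseTo(1.0, 5); - }); - - it('cast lists every panelist role exactly once by default', () => { - expect(defaultCritiqueConfig().cast.sort()).toEqual([...PANELIST_ROLES].sort()); - }); - - it('rejects scoreThreshold outside [0, scoreScale]', () => { - expect(() => CritiqueConfigSchema.parse({ - ...defaultCritiqueConfig(), - scoreThreshold: -1, - })).toThrow(); - expect(() => CritiqueConfigSchema.parse({ - ...defaultCritiqueConfig(), - scoreThreshold: 11, - })).toThrow(); - }); - - it('rejects fallbackPolicy outside the allowed set', () => { - expect(() => CritiqueConfigSchema.parse({ - ...defaultCritiqueConfig(), - fallbackPolicy: 'silent_fail', - })).toThrow(); - }); -}); - -describe('PanelEvent', () => { - it('isPanelEvent recognises every variant', () => { + it('panelEventToSse round-trips every PanelEvent type', () => { const samples: PanelEvent[] = [ - { type: 'run_started', runId: 'r1', protocolVersion: 1, cast: ['designer','critic','brand','a11y','copy'], maxRounds: 3, threshold: 8, scale: 10 }, - { type: 'panelist_open', runId: 'r1', round: 1, role: 'designer' }, - { type: 'panelist_dim', runId: 'r1', round: 1, role: 'critic', dimName: 'contrast', dimScore: 4, dimNote: 'fails AA' }, - { type: 'panelist_must_fix', runId: 'r1', round: 1, role: 'a11y', text: 'restore focus ring' }, - { type: 'panelist_close', runId: 'r1', round: 1, role: 'critic', score: 6.4 }, - { type: 'round_end', runId: 'r1', round: 1, composite: 6.18, mustFix: 7, decision: 'continue', reason: 'below threshold' }, - { type: 'ship', runId: 'r1', round: 3, composite: 8.6, status: 'shipped', artifactRef: { projectId: 'p1', artifactId: 'a1' }, summary: 'shipped after 3 rounds' }, - { type: 'degraded', runId: 'r1', reason: 'malformed_block', adapter: 'pi-rpc' }, - { type: 'interrupted', runId: 'r1', bestRound: 2, composite: 7.86 }, - { type: 'failed', runId: 'r1', cause: 'cli_exit_nonzero' }, - { type: 'parser_warning', runId: 'r1', kind: 'weak_debate', position: 1024 }, + { type: 'run_started', runId: 'r', protocolVersion: 1, cast: ['critic'], maxRounds: 3, threshold: 8, scale: 10 }, + { type: 'panelist_open', runId: 'r', round: 1, role: 'designer' }, + { type: 'panelist_dim', runId: 'r', round: 1, role: 'critic', dimName: 'contrast', dimScore: 4, dimNote: '' }, + { type: 'panelist_must_fix', runId: 'r', round: 1, role: 'a11y', text: '' }, + { type: 'panelist_close', runId: 'r', round: 1, role: 'critic', score: 6 }, + { type: 'round_end', runId: 'r', round: 1, composite: 6, mustFix: 7, decision: 'continue', reason: '' }, + { type: 'ship',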
runId: 'r1', round: 3, composite: 8.6, status: 'shipped', artifactRef: { projectId: 'p1', artifactId: 'a1' }, summary: 'shipped after 3 rounds' }, - { type: 'degraded', runId: 'r1', reason: 'malformed_block', adapter: 'pi-rpc' }, - { type: 'interrupted', runId: 'r1', bestRound: 2, composite: 7.86 }, - { type: 'failed', runId: 'r1', cause: 'cli_exit_nonzero' }, - { type: 'parser_warning', runId: 'r1', kind: 'weak_debate', position: 1024 }, + { type: 'run_started', runId: 'r', protocolVersion: 1, cast: ['critic'], maxRounds: 3, threshold: 8, scale: 10 }, + { type: 'panelist_open', runId: 'r', round: 1, role: 'designer' }, + { type: 'panelist_dim', runId: 'r', round: 1, role: 'critic', dimName: 'contrast', dimScore: 4, dimNote: '' }, + { type: 'panelist_must_fix', runId: 'r', round: 1, role: 'a11y', text: '' }, + { type: 'panelist_close', runId: 'r', round: 1, role: 'critic', score: 6 }, + { type: 'round_end', runId: 'r', round: 1, composite: 6, mustFix: 7, decision: 'continue', reason: '' }, + { type: 'ship', runId: 'r', round: 3, composite: 8.6, status: 'shipped', artifactRef: { projectId: 'p', artifactId: 'a' }, summary: '' }, + { type: 'degraded', runId: 'r', reason: 'malformed_block', adapter: 'pi-rpc' }, + { type: 'interrupted', runId: 'r', bestRound: 2, composite: 7.86 }, + { type: 'failed', runId: 'r', cause: 'cli_exit_nonzero' }, + { type: 'parser_warning', runId: 'r', kind: 'weak_debate', position: 0 }, ]; - for (const s of samples) expect(isPanelEvent(s)).toBe(true); + for (const e of samples) { + const sse = panelEventToSse(e); + expect(sse.event).toBe(`critique.${e.type}`); + } }); - it('isPanelEvent rejects non-event objects', () => { - expect(isPanelEvent({})).toBe(false); - expect(isPanelEvent({ type: 'unknown', runId: 'r1' })).toBe(false); - expect(isPanelEvent(null)).toBe(false); - expect(isPanelEvent(undefined)).toBe(false); - expect(isPanelEvent('string')).toBe(false); - expect(isPanelEvent(42)).toBe(false); - // New: type valid but runId missing 
-> reject - expect(isPanelEvent({ type: 'failed' })).toBe(false); - expect(isPanelEvent({ type: 'failed', runId: '' })).toBe(false); + it('CRITIQUE_SSE_EVENT_NAMES contains all 11 critique.* names', () => { + expect(CRITIQUE_SSE_EVENT_NAMES).toContain('critique.run_started'); + expect(CRITIQUE_SSE_EVENT_NAMES).toContain('critique.parser_warning'); + expect(CRITIQUE_SSE_EVENT_NAMES.length).toBe(11); + // Each name has the 'critique.' prefix. + for (const name of CRITIQUE_SSE_EVENT_NAMES) { + expect(name.startsWith('critique.')).toBe(true); + } }); }); diff --git a/packages/contracts/tests/sse/critique.test.ts b/packages/contracts/tests/sse/critique.test.ts deleted file mode 100644 index 47cc128db..000000000 --- a/packages/contracts/tests/sse/critique.test.ts +++ /dev/null @@ -1,54 +0,0 @@ -import { describe, expect, it } from 'vitest'; -import type { PanelEvent } from '../../src/critique'; -import { - panelEventToSse, - type CritiqueSseEvent, - CRITIQUE_SSE_EVENT_NAMES, -} from '../../src/sse/critique'; - -describe('CritiqueSseEvent', () => { - it('panelEventToSse maps PanelEvent.type "run_started" to event "critique.run_started"', () => { - const e: PanelEvent = { - type: 'run_started', runId: 'r1', protocolVersion: 1, - cast: ['designer','critic','brand','a11y','copy'], - maxRounds: 3, threshold: 8, scale: 10, - }; - const sse = panelEventToSse(e); - expect(sse.event).toBe('critique.run_started'); - expect(sse.data).toMatchObject({ - runId: 'r1', protocolVersion: 1, maxRounds: 3, threshold: 8, scale: 10, - }); - // No 'type' field on the SSE payload. 
- expect((sse.data as Record<string, unknown>).type).toBeUndefined(); - }); - - it('panelEventToSse round-trips every PanelEvent type', () => { - const samples: PanelEvent[] = [ - { type: 'run_started', runId: 'r', protocolVersion: 1, cast: ['critic'], maxRounds: 3, threshold: 8, scale: 10 }, - { type: 'panelist_open', runId: 'r', round: 1, role: 'designer' }, - { type: 'panelist_dim', runId: 'r', round: 1, role: 'critic', dimName: 'contrast', dimScore: 4, dimNote: '' }, - { type: 'panelist_must_fix', runId: 'r', round: 1, role: 'a11y', text: '' }, - { type: 'panelist_close', runId: 'r', round: 1, role: 'critic', score: 6 }, - { type: 'round_end', runId: 'r', round: 1, composite: 6, mustFix: 7, decision: 'continue', reason: '' }, - { type: 'ship', runId: 'r', round: 3, composite: 8.6, status: 'shipped', artifactRef: { projectId: 'p', artifactId: 'a' }, summary: '' }, - { type: 'degraded', runId: 'r', reason: 'malformed_block', adapter: 'pi-rpc' }, - { type: 'interrupted', runId: 'r', bestRound: 2, composite: 7.86 }, - { type: 'failed', runId: 'r', cause: 'cli_exit_nonzero' }, - { type: 'parser_warning', runId: 'r', kind: 'weak_debate', position: 0 }, - ]; - for (const e of samples) { - const sse = panelEventToSse(e); - expect(sse.event).toBe(`critique.${e.type}`); - } - }); - - it('CRITIQUE_SSE_EVENT_NAMES contains all 11 critique.* names', () => { - expect(CRITIQUE_SSE_EVENT_NAMES).toContain('critique.run_started'); - expect(CRITIQUE_SSE_EVENT_NAMES).toContain('critique.parser_warning'); - expect(CRITIQUE_SSE_EVENT_NAMES.length).toBe(11); - // Each name has the 'critique.' prefix. - for (const name of CRITIQUE_SSE_EVENT_NAMES) { - expect(name.startsWith('critique.')).toBe(true); - } - }); -});