feat(web): Critique Theater Phase 7 — reducer + useCritiqueStream + useCritiqueReplay (#1307)

* feat(web): pure reducer for Critique Theater states (Phase 7.1)

Pure CritiqueState reducer driven by the contracts-level PanelEvent
(the same shape both the live SSE stream and the recorded transcript
emit), so a single reducer powers both the in-flight panel and the
rerun replay. Lifecycle covers run_started → running → (shipped /
degraded / interrupted / failed), with panelist_open / dim /
must_fix / close / round_end events building per-round
CritiquePanelistView entries as they arrive.

Defensive behaviour that surfaced while writing the spec tests:
- Terminal phases (shipped / degraded / interrupted / failed) are
  sticky against further lifecycle events for the same run, except
  for parser_warning which can land late and is recorded in a side
  channel without changing phase.
- A new run_started for a different runId at any time discards the
  prior state and reboots, so the UI can launch consecutive runs
  without an explicit reset action.
- Events whose runId does not match the active run return the same
  state reference, so React's useReducer doesn't re-render
  subscribers on stray traffic.
- Round bookkeeping keys by round number rather than "always last",
  so an out-of-order panelist_dim for round 1 arriving after a
  round 2 dim does not corrupt the round 2 bucket.

Test coverage: 18 cases covering each transition, the runId guard,
sticky-terminal behaviour, the out-of-order round invariant, and
the stable-identity guarantee. Sets up Phase 7.2 and 7.3 to wire
SSE + replay into the same reducer.

* feat(web): useCritiqueStream hook subscribes to SSE and feeds reducer (Phase 7.2)

createCritiqueEventsConnection is a pure connection manager that
mirrors apps/web/src/providers/project-events.ts: opens an
EventSource at /api/projects/:id/events, listens for every name in
CRITIQUE_SSE_EVENT_NAMES, decodes each frame back into a PanelEvent
(stripping the critique. prefix and merging the data payload), and
hands it to the caller's onEvent. Reconnect uses exponential
backoff (1s → 30s) and resets on `ready`; malformed payloads drop
with a dev-mode warning rather than tearing the stream.

useCritiqueStream wraps the manager in a useReducer that owns the
CritiqueState. enabled=false or a null projectId tears down the
connection cleanly; switching projectId closes the old connection
and opens a fresh one. The returned dispatch lets local UI
synthesise actions (e.g. an Esc keypress firing a synthetic
interrupted while a kill request is in flight); production traffic
comes from the SSE stream.

Test coverage:
- sse.test.ts (10 cases, node env): subscription set covers every
  CRITIQUE_SSE_EVENT_NAMES channel; payload decoding lifts the wire
  shape back to PanelEvent; malformed JSON is swallowed and does
  not stop the stream; exponential backoff schedule and ready-reset
  semantics are pinned with a setTimeout seam; close() cancels
  pending reconnects and shuts the live source; no-op fallback
  when EventSource is unavailable.
- useCritiqueStream.test.tsx (6 cases, jsdom env): idle pre-event,
  reducer driven by synthetic actions, no connection when disabled
  or projectId is null, clean close on unmount, projectId change
  reopens cleanly.

* feat(web): useCritiqueReplay hook drives reducer from transcript file (Phase 7.3)

Fetches the per-run NDJSON transcript (one PanelEvent per line),
parses every line via the shared isPanelEvent predicate, and
dispatches into the same CritiqueState reducer the live SSE stream
uses. A single reducer means the UI rendering a replay can be
identical to the live panel, and a UI mounting both
useCritiqueStream and useCritiqueReplay in parallel does not have
to reconcile two state shapes.

speed knob is `paused | instant | live | { intervalMs: N }`.
- instant flushes every event synchronously, useful for opening a
  finished run already at its terminal state.
- intervalMs paces dispatches at a fixed cadence so the reviewer
  can watch the run unfold.
- paused parses the transcript but holds events back until the
  caller advances speed (consumers can drive a scrubber later).
- live is reserved for the future "playback at original cadence"
  feature, currently treated as instant; replay timestamps are not
  yet persisted with each event so honest pacing requires a
  follow-up Phase 7+ task.

gunzip seam handles `.ndjson.gz` transcripts via
DecompressionStream when present; the production fetch path picks
between text and arrayBuffer based on the URL extension. Both seams
are injectable so the unit tests don't need to spin up a real
network or a real gzip pipeline.

Test coverage (8 cases, jsdom env):
- Idle status before any URL is provided.
- speed=instant flushes the full transcript synchronously to
  shipped state.
- speed={intervalMs:N} paces with the setTimeout seam, reaching
  done after the last tick.
- speed=paused leaves status=playing with no dispatches.
- Empty transcript reports done with state still idle.
- Fetch rejection surfaces an error status with the message.
- Malformed NDJSON lines are skipped; valid events around them
  still land.
- .gz transcripts route through the gunzip seam.

Closes the Phase 7 plan tasks 7.1 / 7.2 / 7.3 (reducer + stream +
replay), all on one branch ready for review. Phases 8+ (Theater
components) consume these from this PR.

* fix(web): close payload-override gap + paused-resume bug in Critique Theater hooks (Phase 7 review)

Two P1 fixes from lefarcen's review on PR #1307:

SSE payload override

`sseToPanelEvent` previously spread `data` after the channel-derived
`type`, so a payload-provided `type` could override the channel and
route a `critique.run_started` frame into the reducer as a `ship`
action. Reversed the spread so the channel-derived `type` is
authoritative, and revalidated the resulting object through the
contracts-level `isPanelEvent` predicate before returning. Frames
that fail validation (missing runId, empty runId, unknown type) are
dropped, so a malformed or compromised SSE frame can no longer
dispatch a wrong-shape action into the reducer.

Three new sse.test.ts cases pin the regression: hostile `type:'ship'`
in the payload still resolves to `run_started`, missing runId is
dropped, empty runId is dropped.

Replay pause/resume

`useCritiqueReplay` had one big effect keyed on `transcriptUrl`
only, so flipping `speed` from `paused` to `instant` never re-fired
and the held events sat undispatched. Split into a parse effect
(depends on URL, fetches and stores events in state) and a pace
effect (depends on parsed-events + speed, owns the cursor + timers).
The playback cursor lives in a ref that survives pause/resume
cycles, so flipping `paused` -> `instant` flushes from the current
position rather than restarting (which would double-dispatch
`run_started` and reset the reducer).

Two new useCritiqueReplay.test.tsx cases:
- paused-then-instant transitions from `playing` to `done` and
  reaches the shipped terminal phase
- intervalMs paced playback dispatches one event, pauses to drain
  the next scheduled timer, flips to instant, and confirms the
  remaining transcript drains exactly once (cursor was preserved)

Doc consistency

The earlier source comment in useCritiqueReplay.ts claimed `live`
"paces by recorded timestamps" while the impl used zero-delay
timers and the PR body said it behaves like `instant`. Aligned to
reality: `live` currently behaves like `{ intervalMs: 0 }` (events
drain on successive microtasks via setTimeoutFn) because transcripts
do not yet carry per-event timestamps. Honest timestamp-driven
pacing is queued as a Phase 7+ follow-up.

Validated: pnpm guard, pnpm --filter @open-design/web typecheck,
Theater suite 47/47 (up from 42, +3 sse + 2 replay), full web suite
96 files / 888 tests.

---------

Co-authored-by: Nagendhra <nagendhra405@gmail.com>
This commit is contained in:
Nagendhra Madishetti 2026-05-11 22:45:07 -04:00 committed by GitHub
parent 64510b790b
commit 1df3eca161
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 1930 additions and 0 deletions

View file

@ -0,0 +1,256 @@
import { useEffect, useReducer, useRef, useState } from 'react';
import type { Dispatch } from 'react';
import { isPanelEvent, type PanelEvent } from '@open-design/contracts/critique';
import {
initialState,
reduce,
type CritiqueAction,
type CritiqueState,
} from '../state/reducer';
export type ReplaySpeed = 'paused' | 'instant' | 'live' | { intervalMs: number };
export interface UseCritiqueReplayOptions {
/**
* Resolve the transcript bytes for a given URL. Tests stub this; production
* passes through `fetch`. Returns either a UTF-8 string or a binary buffer
* (for `.gz` payloads we decompress below).
*/
fetchTranscript?: (url: string) => Promise<string | ArrayBuffer>;
/**
* Decompress a gzip ArrayBuffer to a UTF-8 string. Defaults to
* `DecompressionStream('gzip')` when the runtime exposes it, with a
* test-time injection for jsdom which doesn't.
*/
gunzip?: (buffer: ArrayBuffer) => Promise<string>;
/**
* Test seam: substitute setTimeout for fake timers. Defaults to the
* platform `setTimeout`. The hook never uses `setInterval` because the
* per-event delay can vary in future (recorded-timestamp pacing for
* `live` once transcripts carry timestamps).
*/
setTimeoutFn?: typeof setTimeout;
clearTimeoutFn?: typeof clearTimeout;
}
export interface UseCritiqueReplayResult {
state: CritiqueState;
dispatch: Dispatch<CritiqueAction>;
status: ReplayStatus;
error: string | null;
}
export type ReplayStatus = 'idle' | 'loading' | 'playing' | 'done' | 'error';
/**
* Drive the Critique Theater reducer from a recorded transcript so users can
* scrub through a finished run. The transcript is a `.ndjson` (or
* `.ndjson.gz`) stream of one `PanelEvent` per line; we fetch it once, parse
* every line into a `PanelEvent`, then dispatch each one at the cadence
* selected by `speed`:
*
* - `'instant'` flush every remaining event synchronously, useful for
* test fixtures and for opening a finished run already at its terminal
* state.
* - `{ intervalMs: N }` fixed N-ms delay between events (debug mode).
* - `'live'` placeholder for the future "playback at the original
* recorded cadence" path. Current transcripts don't carry per-event
* timestamps, so for now this behaves like `{ intervalMs: 0 }` (events
* drained on successive microtasks via setTimeoutFn). The honest
* timestamp-driven implementation is queued as a Phase 7+ follow-up.
* - `'paused'` load the transcript but hold every event until the speed
* is changed to one of the values above; the cursor is preserved
* across pause/resume cycles so flipping to `instant` flushes the
* remaining events from wherever playback was suspended.
*
* The hook owns its own reducer (separate from `useCritiqueStream`) so a UI
* can mount both in parallel: live next to a replay of a prior run.
*/
export function useCritiqueReplay(
transcriptUrl: string | null,
speed: ReplaySpeed,
options: UseCritiqueReplayOptions = {},
): UseCritiqueReplayResult {
const [state, dispatch] = useReducer(reduce, initialState);
const [meta, setMeta] = useStableMeta();
const [events, setEvents] = useState<PanelEvent[] | null>(null);
const dispatchRef = useRef(dispatch);
useEffect(() => {
dispatchRef.current = dispatch;
}, [dispatch]);
// Playback cursor lives in a ref so pause -> resume picks up where the
// last tick left off instead of replaying from the start. A fresh
// transcript URL resets it to zero (see the parse effect below).
const cursorRef = useRef(0);
// Parse effect: own the fetch + parse. Runs only when the URL changes.
// Stores the parsed events in component state so the pace effect can
// react to them; cursor is reset because a new transcript is a new run.
useEffect(() => {
if (!transcriptUrl) {
setMeta({ status: 'idle', error: null });
setEvents(null);
cursorRef.current = 0;
return;
}
let cancelled = false;
const fetcher = options.fetchTranscript ?? defaultFetch;
const gunzip = options.gunzip ?? defaultGunzip;
setMeta({ status: 'loading', error: null });
cursorRef.current = 0;
setEvents(null);
(async () => {
let raw: string;
try {
const fetched = await fetcher(transcriptUrl);
if (cancelled) return;
if (typeof fetched === 'string') {
raw = fetched;
} else {
raw = transcriptUrl.endsWith('.gz')
? await gunzip(fetched)
: new TextDecoder('utf-8').decode(fetched);
}
} catch (err) {
if (cancelled) return;
setMeta({
status: 'error',
error: err instanceof Error ? err.message : String(err),
});
return;
}
if (cancelled) return;
const parsed = parseTranscript(raw);
setEvents(parsed);
})().catch(() => {
// Already surfaced via setMeta inside the async block.
});
return () => {
cancelled = true;
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [transcriptUrl]);
// Pace effect: react to both the parsed-events list AND speed changes.
// Cleanup cancels any in-flight setTimeout, but the cursor ref survives
// so toggling speed from `paused` to `instant` resumes from the current
// position instead of restarting from zero.
useEffect(() => {
if (!events) return;
if (events.length === 0) {
setMeta({ status: 'done', error: null });
return;
}
if (cursorRef.current >= events.length) {
setMeta({ status: 'done', error: null });
return;
}
if (speed === 'paused') {
// Holding: surface a non-terminal status so the UI can distinguish
// "fetched and ready" from "fetching" or "done". The reducer is
// untouched; the user picks a non-paused speed to advance.
setMeta({ status: 'playing', error: null });
return;
}
setMeta({ status: 'playing', error: null });
if (speed === 'instant') {
while (cursorRef.current < events.length) {
dispatchRef.current(events[cursorRef.current]!);
cursorRef.current += 1;
}
setMeta({ status: 'done', error: null });
return;
}
const setT = options.setTimeoutFn ?? setTimeout;
const clearT = options.clearTimeoutFn ?? clearTimeout;
const timers: Array<ReturnType<typeof setTimeout>> = [];
let cancelled = false;
// `live` is reserved for recorded-cadence playback; until transcripts
// carry per-event timestamps it behaves like a zero-delay tick.
const baseDelay = speed === 'live'
? 0
: typeof speed === 'object' ? speed.intervalMs : 0;
const step = () => {
if (cancelled) return;
if (cursorRef.current >= events.length) {
setMeta({ status: 'done', error: null });
return;
}
dispatchRef.current(events[cursorRef.current]!);
cursorRef.current += 1;
if (cursorRef.current < events.length) {
timers.push(setT(step, baseDelay) as ReturnType<typeof setTimeout>);
} else {
setMeta({ status: 'done', error: null });
}
};
// First event fires synchronously so `playing` is visibly distinct
// from the parse-effect's `loading`; subsequent events pace via the
// timer seam.
step();
return () => {
cancelled = true;
for (const id of timers) clearT(id);
};
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [events, speed]);
return { state, dispatch, status: meta.status, error: meta.error };
}
interface ReplayMeta {
status: ReplayStatus;
error: string | null;
}
function useStableMeta(): [ReplayMeta, (next: ReplayMeta) => void] {
const ref = useRef<ReplayMeta>({ status: 'idle', error: null });
const [, setTick] = useReducer((n: number) => n + 1, 0);
const set = (next: ReplayMeta) => {
if (ref.current.status === next.status && ref.current.error === next.error) return;
ref.current = next;
setTick();
};
return [ref.current, set];
}
function parseTranscript(raw: string): PanelEvent[] {
const out: PanelEvent[] = [];
for (const line of raw.split(/\r?\n/)) {
const trimmed = line.trim();
if (!trimmed) continue;
try {
const parsed = JSON.parse(trimmed);
if (isPanelEvent(parsed)) out.push(parsed);
} catch {
// Tolerate stray lines; the orchestrator writes one event per line so
// a bad line is recoverable. Production loggers should record the
// discard but the hook stays pure.
}
}
return out;
}
async function defaultFetch(url: string): Promise<string | ArrayBuffer> {
const res = await fetch(url);
if (!res.ok) throw new Error(`transcript fetch failed: ${res.status}`);
return url.endsWith('.gz') ? await res.arrayBuffer() : await res.text();
}
async function defaultGunzip(buffer: ArrayBuffer): Promise<string> {
// DecompressionStream is available in Node 18+ and modern browsers.
const ds = new (globalThis as { DecompressionStream?: typeof DecompressionStream }).DecompressionStream!('gzip');
const stream = new Response(buffer).body!.pipeThrough(ds);
return await new Response(stream).text();
}

View file

@ -0,0 +1,78 @@
import { useEffect, useReducer, useRef } from 'react';
import type { Dispatch } from 'react';
import {
initialState,
reduce,
type CritiqueAction,
type CritiqueState,
} from '../state/reducer';
import {
createCritiqueEventsConnection,
type CritiqueEventsConnection,
type CritiqueEventsConnectionOptions,
} from '../state/sse';
export interface UseCritiqueStreamOptions extends CritiqueEventsConnectionOptions {
/**
* Test seam: substitute the connection factory. Lets tests drive the
* reducer without spinning up a real EventSource. Defaults to
* `createCritiqueEventsConnection`.
*/
connectionFactory?: (
projectId: string,
onEvent: (action: CritiqueAction) => void,
opts: CritiqueEventsConnectionOptions,
) => CritiqueEventsConnection;
}
export interface UseCritiqueStreamResult {
state: CritiqueState;
dispatch: Dispatch<CritiqueAction>;
}
/**
* Subscribe a Critique Theater reducer to the project-scoped SSE bus. The
* hook owns the reducer (`useReducer`) and a single live connection while
* `enabled` is true and `projectId` is non-null. Tear-down (project change,
* disable, unmount) closes the connection cleanly.
*
* The returned `dispatch` lets local UI synthesise actions (e.g. a confirm
* button that fires a synthetic `interrupted` while a kill request is in
* flight); production traffic comes from the SSE stream.
*/
export function useCritiqueStream(
projectId: string | null | undefined,
enabled: boolean,
options: UseCritiqueStreamOptions = {},
): UseCritiqueStreamResult {
const [state, dispatch] = useReducer(reduce, initialState);
const dispatchRef = useRef(dispatch);
useEffect(() => {
dispatchRef.current = dispatch;
}, [dispatch]);
const factory = options.connectionFactory ?? createCritiqueEventsConnection;
useEffect(() => {
if (!enabled || !projectId) return;
if (typeof window === 'undefined' && !options.EventSourceCtor) return;
const conn = factory(
projectId,
(action) => dispatchRef.current(action),
{
EventSourceCtor: options.EventSourceCtor,
initialBackoffMs: options.initialBackoffMs,
maxBackoffMs: options.maxBackoffMs,
setTimeoutFn: options.setTimeoutFn,
clearTimeoutFn: options.clearTimeoutFn,
},
);
return () => conn.close();
// factory identity is intentionally captured at mount; rest are the
// configurable knobs a parent might tweak.
// eslint-disable-next-line react-hooks/exhaustive-deps
}, [projectId, enabled, options.EventSourceCtor, options.initialBackoffMs, options.maxBackoffMs]);
return { state, dispatch };
}

View file

@ -0,0 +1,322 @@
import type {
DegradedReason,
FailedCause,
PanelEvent,
PanelistRole,
ParserWarningKind,
RoundDecision,
ShipStatus,
} from '@open-design/contracts/critique';
export type CritiqueAction = PanelEvent;
export interface CritiqueDimScore {
name: string;
score: number;
note: string;
}
export interface CritiquePanelistView {
dims: CritiqueDimScore[];
mustFixes: string[];
score?: number;
}
export interface CritiqueRound {
n: number;
composite?: number;
/** Cumulative must-fix count across panelists for this round. */
mustFix: number;
decision?: RoundDecision;
decisionReason?: string;
panelists: Partial<Record<PanelistRole, CritiquePanelistView>>;
}
export interface CritiqueArtifactRef {
projectId: string;
artifactId: string;
}
export interface CritiqueRunConfig {
cast: PanelistRole[];
maxRounds: number;
threshold: number;
scale: number;
protocolVersion: number;
}
export interface CritiqueParserWarning {
kind: ParserWarningKind;
position: number;
}
export interface CritiqueShipped {
composite: number;
round: number;
status: ShipStatus;
artifactRef: CritiqueArtifactRef;
summary: string;
}
export interface CritiqueDegradedInfo {
reason: DegradedReason;
adapter: string;
}
export type CritiqueState =
| { phase: 'idle' }
| {
phase: 'running';
runId: string;
config: CritiqueRunConfig;
rounds: CritiqueRound[];
activeRound: number;
activePanelist: PanelistRole | null;
warnings: CritiqueParserWarning[];
}
| {
phase: 'shipped';
runId: string;
config: CritiqueRunConfig;
rounds: CritiqueRound[];
warnings: CritiqueParserWarning[];
final: CritiqueShipped;
}
| {
phase: 'degraded';
runId: string;
config: CritiqueRunConfig | null;
rounds: CritiqueRound[];
warnings: CritiqueParserWarning[];
degraded: CritiqueDegradedInfo;
}
| {
phase: 'interrupted';
runId: string;
config: CritiqueRunConfig | null;
rounds: CritiqueRound[];
warnings: CritiqueParserWarning[];
bestRound: number;
composite: number;
}
| {
phase: 'failed';
runId: string;
config: CritiqueRunConfig | null;
rounds: CritiqueRound[];
warnings: CritiqueParserWarning[];
cause: FailedCause;
};
export const initialState: CritiqueState = { phase: 'idle' };
function blankPanelist(): CritiquePanelistView {
return { dims: [], mustFixes: [] };
}
/**
* Look up an existing round by number, or seed a new one at the end of the
* list. Panelists arrive interleaved with `round_end`, so we can't assume the
* round we want is always at `rounds[rounds.length - 1]` (a stray late event
* from a previous round would otherwise corrupt the in-flight round).
*/
function withRound(
rounds: CritiqueRound[],
n: number,
mutate: (round: CritiqueRound) => void,
): CritiqueRound[] {
const idx = rounds.findIndex((r) => r.n === n);
if (idx === -1) {
const seeded: CritiqueRound = { n, mustFix: 0, panelists: {} };
mutate(seeded);
return [...rounds, seeded].sort((a, b) => a.n - b.n);
}
const next = rounds.slice();
const cloned: CritiqueRound = {
...next[idx]!,
panelists: { ...next[idx]!.panelists },
};
mutate(cloned);
next[idx] = cloned;
return next;
}
function ensurePanelist(
round: CritiqueRound,
role: PanelistRole,
): CritiquePanelistView {
const current = round.panelists[role] ?? blankPanelist();
const cloned: CritiquePanelistView = {
dims: current.dims.slice(),
mustFixes: current.mustFixes.slice(),
score: current.score,
};
round.panelists[role] = cloned;
return cloned;
}
/**
* Pure reducer for the Critique Theater run lifecycle. Driven by `PanelEvent`
* (the contracts-level event shape that both the live SSE stream and the
* replay path emit), so a single reducer powers both the in-flight panel and
* the rerun replay. Terminal phases (`shipped`, `degraded`, `interrupted`,
* `failed`) are sticky: further events for the same run are ignored. A
* `run_started` for a NEW runId at any time resets the reducer so the UI can
* launch consecutive runs without an explicit reset action.
*/
export function reduce(state: CritiqueState, action: CritiqueAction): CritiqueState {
// `run_started` is always accepted: from idle it boots a new run, and
// mid-stream it discards any prior state and reboots cleanly (the daemon
// does not multiplex two runs onto one SSE channel, so this only fires on
// an intentional rerun).
if (action.type === 'run_started') {
const config: CritiqueRunConfig = {
cast: action.cast.slice(),
maxRounds: action.maxRounds,
threshold: action.threshold,
scale: action.scale,
protocolVersion: action.protocolVersion,
};
return {
phase: 'running',
runId: action.runId,
config,
rounds: [],
activeRound: 1,
activePanelist: null,
warnings: [],
};
}
if (state.phase === 'idle') return state;
if (state.runId !== action.runId) return state;
// Terminal phases are sticky except for `run_started` (handled above) and
// `parser_warning`, which is informational and can land late.
if (
state.phase === 'shipped'
|| state.phase === 'degraded'
|| state.phase === 'interrupted'
|| state.phase === 'failed'
) {
if (action.type === 'parser_warning') {
return {
...state,
warnings: [...state.warnings, { kind: action.kind, position: action.position }],
};
}
return state;
}
switch (action.type) {
case 'panelist_open':
return {
...state,
activePanelist: action.role,
activeRound: action.round,
};
case 'panelist_dim': {
const rounds = withRound(state.rounds, action.round, (round) => {
const panelist = ensurePanelist(round, action.role);
panelist.dims.push({
name: action.dimName,
score: action.dimScore,
note: action.dimNote,
});
});
return { ...state, rounds };
}
case 'panelist_must_fix': {
const rounds = withRound(state.rounds, action.round, (round) => {
const panelist = ensurePanelist(round, action.role);
panelist.mustFixes.push(action.text);
round.mustFix += 1;
});
return { ...state, rounds };
}
case 'panelist_close': {
const rounds = withRound(state.rounds, action.round, (round) => {
const panelist = ensurePanelist(round, action.role);
panelist.score = action.score;
});
// Closing a panelist clears the active highlight; the next `panelist_open`
// will set it again.
return { ...state, rounds, activePanelist: null };
}
case 'round_end': {
const rounds = withRound(state.rounds, action.round, (round) => {
round.composite = action.composite;
round.decision = action.decision;
round.decisionReason = action.reason;
});
// After a `continue` decision the next round is implicitly active. After
// a `ship` the orchestrator will follow with `ship`, so leaving
// `activeRound` pointing at the same round is fine — the terminal
// transition will overwrite the state anyway.
const nextActive
= action.decision === 'continue' ? action.round + 1 : action.round;
return {
...state,
rounds,
activeRound: nextActive,
activePanelist: null,
};
}
case 'ship':
return {
phase: 'shipped',
runId: state.runId,
config: state.config,
rounds: state.rounds,
warnings: state.warnings,
final: {
composite: action.composite,
round: action.round,
status: action.status,
artifactRef: action.artifactRef,
summary: action.summary,
},
};
case 'degraded':
return {
phase: 'degraded',
runId: state.runId,
config: state.config,
rounds: state.rounds,
warnings: state.warnings,
degraded: { reason: action.reason, adapter: action.adapter },
};
case 'interrupted':
return {
phase: 'interrupted',
runId: state.runId,
config: state.config,
rounds: state.rounds,
warnings: state.warnings,
bestRound: action.bestRound,
composite: action.composite,
};
case 'failed':
return {
phase: 'failed',
runId: state.runId,
config: state.config,
rounds: state.rounds,
warnings: state.warnings,
cause: action.cause,
};
case 'parser_warning':
return {
...state,
warnings: [
...state.warnings,
{ kind: action.kind, position: action.position },
],
};
default: {
const _exhaustive: never = action;
void _exhaustive;
return state;
}
}
}

View file

@ -0,0 +1,138 @@
import {
CRITIQUE_SSE_EVENT_NAMES,
isPanelEvent,
type CritiqueSseEvent,
type CritiqueSseEventName,
type PanelEvent,
} from '@open-design/contracts/critique';
import type { CritiqueAction } from './reducer';
export interface CritiqueEventsConnection {
close(): void;
}
export interface CritiqueEventsConnectionOptions {
/** Test seam: substitute a mock EventSource constructor. */
EventSourceCtor?: typeof EventSource;
/** Initial backoff in ms. Defaults to 1000. */
initialBackoffMs?: number;
/** Max backoff in ms. Defaults to 30000. */
maxBackoffMs?: number;
/** Test seam: setTimeout substitutes for fake timers. */
setTimeoutFn?: typeof setTimeout;
clearTimeoutFn?: typeof clearTimeout;
}
const DEFAULT_INITIAL_BACKOFF = 1000;
const DEFAULT_MAX_BACKOFF = 30_000;
export function critiqueEventsUrl(projectId: string): string {
return `/api/projects/${encodeURIComponent(projectId)}/events`;
}
/**
* Lift an SSE-wire `CritiqueSseEvent` back into the flat `PanelEvent` shape
* the reducer consumes. The daemon emits one channel per event name with
* the payload (sans `type`) as JSON; this is the inverse of
* `panelEventToSse` in the contracts package.
*
* Two defensive moves matter here:
*
* 1. The SSE channel name is authoritative for `type`. A payload-provided
* `type` (malformed or compromised frame) must NOT override the
* channel-derived value, so we spread `data` first and pin `type`
* last. Otherwise a daemon bug or a man-in-the-middle could route a
* `critique.run_started` channel into a `ship` action shape.
*
* 2. The result has to pass `isPanelEvent` before it leaves this
* function. That predicate is the contract-level source of truth for
* "this is a recognised event with a non-empty runId"; if the cast
* fails (missing runId, unknown type), we drop the frame and the
* reducer never sees it.
*/
export function sseToPanelEvent(eventName: CritiqueSseEventName, data: unknown): PanelEvent | null {
if (data === null || typeof data !== 'object') return null;
const type = eventName.slice('critique.'.length);
const candidate = { ...(data as Record<string, unknown>), type };
return isPanelEvent(candidate) ? candidate : null;
}
/**
* Pure connection manager for a project's critique SSE channels. Mirrors the
* shape of `createProjectEventsConnection` in `apps/web/src/providers/
* project-events.ts` so tests can drive it under a node environment without
* React + JSDOM. The two managers run side-by-side on the same
* `/api/projects/:id/events` endpoint, each listening for its own event
* names.
*
* Reconnects with exponential backoff (default 1s -> 30s cap). A successful
* `ready` event resets the backoff so a flaky network doesn't permanently
* stretch the gap between events. Malformed payloads are dropped with a dev
* warning so a single bad frame doesn't tear the stream.
*/
export function createCritiqueEventsConnection(
projectId: string,
onEvent: (action: CritiqueAction) => void,
options: CritiqueEventsConnectionOptions = {},
): CritiqueEventsConnection {
const Ctor = options.EventSourceCtor
?? (typeof EventSource === 'undefined' ? null : EventSource);
if (!Ctor) return { close() { /* noop */ } };
const initialBackoff = options.initialBackoffMs ?? DEFAULT_INITIAL_BACKOFF;
const maxBackoff = options.maxBackoffMs ?? DEFAULT_MAX_BACKOFF;
const setT = options.setTimeoutFn ?? setTimeout;
const clearT = options.clearTimeoutFn ?? clearTimeout;
let cancelled = false;
let backoff = initialBackoff;
let source: EventSource | null = null;
let reconnectTimer: ReturnType<typeof setTimeout> | null = null;
const handleCritiqueFrame = (eventName: CritiqueSseEventName) => (raw: Event) => {
try {
const parsed = JSON.parse((raw as MessageEvent).data) as CritiqueSseEvent['data'];
const action = sseToPanelEvent(eventName, parsed);
if (action) onEvent(action);
} catch (err) {
if (
typeof process !== 'undefined'
&& process.env?.NODE_ENV === 'development'
) {
// eslint-disable-next-line no-console
console.warn(`[critique-events] malformed payload on ${eventName}`, err);
}
}
};
const connect = (): void => {
if (cancelled) return;
const es = new Ctor(critiqueEventsUrl(projectId));
source = es;
es.addEventListener('ready', () => {
backoff = initialBackoff;
});
for (const name of CRITIQUE_SSE_EVENT_NAMES) {
es.addEventListener(name, handleCritiqueFrame(name));
}
es.addEventListener('error', () => {
if (cancelled) return;
es.close();
if (source === es) source = null;
const delay = backoff;
backoff = Math.min(backoff * 2, maxBackoff);
reconnectTimer = setT(connect, delay) as ReturnType<typeof setTimeout>;
});
};
connect();
return {
close(): void {
cancelled = true;
if (reconnectTimer) clearT(reconnectTimer);
if (source) source.close();
},
};
}

View file

@ -0,0 +1,356 @@
// @vitest-environment jsdom
/**
* jsdom coverage for `useCritiqueReplay` (Phase 7, Task 7.3). The hook
* fetches a recorded NDJSON transcript, parses each line into a PanelEvent
* via the shared `isPanelEvent` predicate, and dispatches into the
* reducer at the chosen speed.
*/
import { act, cleanup, render, waitFor } from '@testing-library/react';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { PanelEvent } from '@open-design/contracts/critique';
import { useCritiqueReplay } from '../../../../src/components/Theater/hooks/useCritiqueReplay';
import type { CritiqueState } from '../../../../src/components/Theater/state/reducer';
import type { ReplaySpeed, UseCritiqueReplayOptions } from '../../../../src/components/Theater/hooks/useCritiqueReplay';
afterEach(() => {
cleanup();
});
interface Sink {
state: CritiqueState;
status: string;
error: string | null;
}
function Probe({ url, speed, options, sink }: {
url: string | null;
speed: ReplaySpeed;
options: UseCritiqueReplayOptions;
sink: Sink;
}) {
const { state, status, error } = useCritiqueReplay(url, speed, options);
sink.state = state;
sink.status = status;
sink.error = error;
return null;
}
const RUN_ID = 'run_replay';
function ndjson(events: PanelEvent[]): string {
return events.map((e) => JSON.stringify(e)).join('\n') + '\n';
}
const TRANSCRIPT: PanelEvent[] = [
{
type: 'run_started', runId: RUN_ID, protocolVersion: 1,
cast: ['critic'], maxRounds: 3, threshold: 8, scale: 10,
},
{ type: 'panelist_open', runId: RUN_ID, round: 1, role: 'critic' },
{
type: 'panelist_dim', runId: RUN_ID, round: 1, role: 'critic',
dimName: 'contrast', dimScore: 8, dimNote: 'ok',
},
{ type: 'panelist_close', runId: RUN_ID, round: 1, role: 'critic', score: 8 },
{
type: 'round_end', runId: RUN_ID, round: 1,
composite: 8.2, mustFix: 0, decision: 'ship', reason: 'threshold met',
},
{
type: 'ship', runId: RUN_ID, round: 1, composite: 8.2, status: 'shipped',
artifactRef: { projectId: 'p', artifactId: 'a' }, summary: 'looks good',
},
];
describe('useCritiqueReplay (Phase 7.3)', () => {
it('reports idle status when no transcriptUrl is provided', () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
render(<Probe url={null} speed="instant" options={{}} sink={sink} />);
expect(sink.status).toBe('idle');
expect(sink.state.phase).toBe('idle');
});
it('parses NDJSON and dispatches every event when speed=instant', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
const fetchTranscript = vi.fn(async () => ndjson(TRANSCRIPT));
render(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{ fetchTranscript }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(fetchTranscript).toHaveBeenCalledWith('/api/replay.ndjson');
expect(sink.state.phase).toBe('shipped');
if (sink.state.phase !== 'shipped') return;
expect(sink.state.final.composite).toBe(8.2);
expect(sink.state.rounds).toHaveLength(1);
});
it('paces events with intervalMs and reaches done after the last tick', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
const queue: Array<{ delay: number; fn: () => void }> = [];
const setTimeoutFn = ((fn: () => void, delay: number) => {
queue.push({ delay, fn });
return queue.length as unknown as ReturnType<typeof setTimeout>;
}) as typeof setTimeout;
const clearTimeoutFn = (() => undefined) as typeof clearTimeout;
render(
<Probe
url="/api/replay.ndjson"
speed={{ intervalMs: 250 }}
options={{
fetchTranscript: async () => ndjson(TRANSCRIPT),
setTimeoutFn,
clearTimeoutFn,
}}
sink={sink}
/>,
);
// After the async load resolves the first event fires synchronously and
// the hook schedules the second event via setTimeoutFn at delay 250.
await waitFor(() => {
expect(queue.length).toBeGreaterThan(0);
});
expect(queue[0]!.delay).toBe(250);
expect(sink.state.phase).toBe('running');
// Drain the rest of the queue. Each scheduled callback dispatches the
// next event AND schedules the one after it, so we keep firing until
// the queue empties.
await act(async () => {
while (queue.length > 0) {
const next = queue.shift()!;
next.fn();
}
});
expect(sink.status).toBe('done');
expect(sink.state.phase).toBe('shipped');
});
it('holds in playing state when speed=paused without dispatching', async () => {
// `paused` is still a playing status (the transcript is parsed but
// events are held back). The state stays at idle because we never
// dispatch the run_started.
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
render(
<Probe
url="/api/replay.ndjson"
speed="paused"
options={{ fetchTranscript: async () => ndjson(TRANSCRIPT) }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('playing');
});
expect(sink.state.phase).toBe('idle');
});
it('resumes dispatch when speed flips from paused to instant', async () => {
// Lefarcen P1 (#1307 review): the previous revision tied the
// entire playback path to the parse effect's deps, so a
// `paused` -> `instant` flip never re-fired and the held events
// sat undispatched. Splitting parse/pace effects fixes this; the
// test pins the regression.
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
const { rerender } = render(
<Probe
url="/api/replay.ndjson"
speed="paused"
options={{ fetchTranscript: async () => ndjson(TRANSCRIPT) }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('playing');
});
expect(sink.state.phase).toBe('idle');
rerender(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{ fetchTranscript: async () => ndjson(TRANSCRIPT) }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(sink.state.phase).toBe('shipped');
});
it('preserves the playback cursor across pause/resume cycles in intervalMs mode', async () => {
// Drive one event manually, pause, then resume: the second tick
// must dispatch event #2 (not event #1). Cursor-survival across
// speed flips is the load-bearing property of the parse/pace
// split.
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
const queue: Array<{ delay: number; fn: () => void }> = [];
const setTimeoutFn = ((fn: () => void, delay: number) => {
queue.push({ delay, fn });
return queue.length as unknown as ReturnType<typeof setTimeout>;
}) as typeof setTimeout;
const clearTimeoutFn = (() => undefined) as typeof clearTimeout;
const { rerender } = render(
<Probe
url="/api/replay.ndjson"
speed={{ intervalMs: 500 }}
options={{
fetchTranscript: async () => ndjson(TRANSCRIPT),
setTimeoutFn,
clearTimeoutFn,
}}
sink={sink}
/>,
);
// First event dispatches synchronously after parse; the second
// is scheduled in the queue.
await waitFor(() => {
expect(queue.length).toBeGreaterThan(0);
});
expect(sink.state.phase).toBe('running');
// Flip to paused: pending timer is cancelled, status stays
// playing, no dispatches happen.
rerender(
<Probe
url="/api/replay.ndjson"
speed="paused"
options={{
fetchTranscript: async () => ndjson(TRANSCRIPT),
setTimeoutFn,
clearTimeoutFn,
}}
sink={sink}
/>,
);
// Pause cleanup drops timers but the cursor stays at 1.
queue.length = 0;
// Flip to instant: drains from event index 1 to end (NOT index 0
// again — that would double-dispatch the run_started and reset
// the reducer).
rerender(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{
fetchTranscript: async () => ndjson(TRANSCRIPT),
setTimeoutFn,
clearTimeoutFn,
}}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(sink.state.phase).toBe('shipped');
});
it('reports done with idle state on an empty transcript', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
render(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{ fetchTranscript: async () => '' }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(sink.state.phase).toBe('idle');
});
it('reports error status when the fetch rejects', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
render(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{
fetchTranscript: async () => {
throw new Error('boom');
},
}}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('error');
});
expect(sink.error).toBe('boom');
expect(sink.state.phase).toBe('idle');
});
it('skips malformed lines and dispatches valid events around them', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
const raw
= JSON.stringify(TRANSCRIPT[0]) + '\n'
+ 'not-valid-json\n'
+ JSON.stringify({ type: 'something_invalid', runId: RUN_ID }) + '\n'
+ JSON.stringify(TRANSCRIPT[1]) + '\n';
render(
<Probe
url="/api/replay.ndjson"
speed="instant"
options={{ fetchTranscript: async () => raw }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(sink.state.phase).toBe('running');
if (sink.state.phase !== 'running') return;
expect(sink.state.activePanelist).toBe('critic');
});
it('decodes a .gz transcript through the gunzip seam', async () => {
const sink: Sink = { state: { phase: 'idle' }, status: 'idle', error: null };
// The fetch returns binary bytes; gunzip resolves to the decompressed
// string. We don't care that the bytes are real gzip; the seam is the
// contract.
const fetchTranscript = vi.fn(async () => new ArrayBuffer(8));
const gunzip = vi.fn(async () => ndjson(TRANSCRIPT));
render(
<Probe
url="/api/replay.ndjson.gz"
speed="instant"
options={{ fetchTranscript, gunzip }}
sink={sink}
/>,
);
await waitFor(() => {
expect(sink.status).toBe('done');
});
expect(gunzip).toHaveBeenCalledTimes(1);
expect(sink.state.phase).toBe('shipped');
});
});

View file

@ -0,0 +1,156 @@
// @vitest-environment jsdom
/**
* jsdom coverage for `useCritiqueStream` (Phase 7, Task 7.2). The reducer
* itself is exercised under a node environment; this suite uses a tiny
* harness so we can dispatch synthetic SSE events into the hook's lifecycle
* (mount, project change, unmount) and assert the resulting state.
*/
import { act, cleanup, render } from '@testing-library/react';
import { afterEach, describe, expect, it } from 'vitest';
import { useCritiqueStream } from '../../../../src/components/Theater/hooks/useCritiqueStream';
import type {
CritiqueAction,
CritiqueState,
} from '../../../../src/components/Theater/state/reducer';
import type {
CritiqueEventsConnection,
CritiqueEventsConnectionOptions,
} from '../../../../src/components/Theater/state/sse';
afterEach(() => {
cleanup();
});
interface FactoryHandle {
send: (action: CritiqueAction) => void;
closed: boolean;
}
function makeFactory(): {
factory: (
projectId: string,
onEvent: (action: CritiqueAction) => void,
opts: CritiqueEventsConnectionOptions,
) => CritiqueEventsConnection;
handles: FactoryHandle[];
} {
const handles: FactoryHandle[] = [];
const factory = (
_projectId: string,
onEvent: (action: CritiqueAction) => void,
_opts: CritiqueEventsConnectionOptions,
): CritiqueEventsConnection => {
const handle: FactoryHandle = {
send: (action) => onEvent(action),
closed: false,
};
handles.push(handle);
return {
close(): void {
handle.closed = true;
},
};
};
return { factory, handles };
}
interface Harness {
state: CritiqueState;
}
function Probe({ projectId, enabled, factory, sink }: {
projectId: string | null;
enabled: boolean;
factory: ReturnType<typeof makeFactory>['factory'];
sink: Harness;
}) {
const { state } = useCritiqueStream(projectId, enabled, { connectionFactory: factory });
sink.state = state;
return null;
}
describe('useCritiqueStream (Phase 7.2)', () => {
it('returns idle state before any event lands', () => {
const { factory } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
render(<Probe projectId="proj-1" enabled factory={factory} sink={sink} />);
expect(sink.state.phase).toBe('idle');
});
it('drives the reducer from synthetic SSE events through the factory', () => {
const { factory, handles } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
render(<Probe projectId="proj-1" enabled factory={factory} sink={sink} />);
expect(handles).toHaveLength(1);
act(() => {
handles[0]!.send({
type: 'run_started',
runId: 'r',
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
});
});
expect(sink.state.phase).toBe('running');
if (sink.state.phase !== 'running') return;
expect(sink.state.runId).toBe('r');
act(() => {
handles[0]!.send({
type: 'panelist_must_fix',
runId: 'r',
round: 1,
role: 'critic',
text: 'low contrast',
});
});
if (sink.state.phase !== 'running') return;
expect(sink.state.rounds[0]?.panelists.critic?.mustFixes).toEqual(['low contrast']);
});
it('does NOT open a connection when enabled=false', () => {
const { factory, handles } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
render(<Probe projectId="proj-1" enabled={false} factory={factory} sink={sink} />);
expect(handles).toHaveLength(0);
expect(sink.state.phase).toBe('idle');
});
it('does NOT open a connection when projectId is null', () => {
const { factory, handles } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
render(<Probe projectId={null} enabled factory={factory} sink={sink} />);
expect(handles).toHaveLength(0);
});
it('closes the connection on unmount', () => {
const { factory, handles } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
const { unmount } = render(
<Probe projectId="proj-1" enabled factory={factory} sink={sink} />,
);
expect(handles).toHaveLength(1);
expect(handles[0]!.closed).toBe(false);
unmount();
expect(handles[0]!.closed).toBe(true);
});
it('reopens the connection when projectId changes (closes prior, opens fresh)', () => {
const { factory, handles } = makeFactory();
const sink: Harness = { state: { phase: 'idle' } };
const { rerender } = render(
<Probe projectId="proj-1" enabled factory={factory} sink={sink} />,
);
expect(handles).toHaveLength(1);
rerender(<Probe projectId="proj-2" enabled factory={factory} sink={sink} />);
expect(handles).toHaveLength(2);
expect(handles[0]!.closed).toBe(true);
expect(handles[1]!.closed).toBe(false);
});
});

View file

@ -0,0 +1,314 @@
/**
* Pure-state coverage for the Critique Theater reducer (Phase 7, Task 7.1).
*
* The reducer is driven by `PanelEvent` (the contracts-level event shape) so
* both the live SSE stream and the rerun replay land the same way. These
* tests pin the lifecycle transitions documented in the spec and the
* defensive behaviours (event for a different `runId` is ignored, terminal
* phases are sticky except for late parser warnings).
*/
import { describe, expect, it } from 'vitest';
import type { PanelEvent, PanelistRole } from '@open-design/contracts/critique';
import {
initialState,
reduce,
type CritiqueRound,
type CritiqueState,
} from '../../../../src/components/Theater/state/reducer';
const RUN_ID = 'run_abc';
function start(
runId = RUN_ID,
overrides: Partial<Extract<PanelEvent, { type: 'run_started' }>> = {},
): Extract<PanelEvent, { type: 'run_started' }> {
return {
type: 'run_started',
runId,
protocolVersion: 1,
cast: ['designer', 'critic', 'brand', 'a11y', 'copy'],
maxRounds: 3,
threshold: 8,
scale: 10,
...overrides,
};
}
function runningState(): Extract<CritiqueState, { phase: 'running' }> {
const next = reduce(initialState, start());
if (next.phase !== 'running') throw new Error('expected running phase');
return next;
}
function close(role: PanelistRole, round = 1, score = 7.5): Extract<PanelEvent, { type: 'panelist_close' }> {
return { type: 'panelist_close', runId: RUN_ID, round, role, score };
}
describe('Critique Theater reducer (Phase 7)', () => {
it('boots from idle to running on run_started', () => {
const next = reduce(initialState, start());
expect(next.phase).toBe('running');
if (next.phase !== 'running') return;
expect(next.runId).toBe(RUN_ID);
expect(next.config.cast).toEqual(['designer', 'critic', 'brand', 'a11y', 'copy']);
expect(next.config.maxRounds).toBe(3);
expect(next.config.threshold).toBe(8);
expect(next.config.scale).toBe(10);
expect(next.activeRound).toBe(1);
expect(next.activePanelist).toBeNull();
expect(next.rounds).toEqual([]);
expect(next.warnings).toEqual([]);
});
it('discards a fresh state copy on a new run_started even mid-run (consecutive runs)', () => {
const s1 = runningState();
const s2 = reduce(s1, {
type: 'panelist_must_fix',
runId: RUN_ID,
round: 1,
role: 'critic',
text: 'contrast too low',
});
expect(s2.phase).toBe('running');
if (s2.phase !== 'running') return;
expect(s2.rounds[0]?.mustFix).toBe(1);
const s3 = reduce(s2, start('run_xyz'));
expect(s3.phase).toBe('running');
if (s3.phase !== 'running') return;
expect(s3.runId).toBe('run_xyz');
expect(s3.rounds).toEqual([]);
});
it('ignores events whose runId does not match the active run', () => {
const s1 = runningState();
const s2 = reduce(s1, {
type: 'panelist_must_fix',
runId: 'wrong_run',
round: 1,
role: 'critic',
text: 'should not land',
});
// State reference must be identical (no allocation).
expect(s2).toBe(s1);
});
it('tracks panelist_open updates to activePanelist and activeRound', () => {
const s1 = runningState();
const s2 = reduce(s1, { type: 'panelist_open', runId: RUN_ID, round: 1, role: 'designer' });
expect(s2.phase).toBe('running');
if (s2.phase !== 'running') return;
expect(s2.activePanelist).toBe('designer');
expect(s2.activeRound).toBe(1);
});
it('accumulates panelist_dim entries grouped by role + round', () => {
let s: CritiqueState = runningState();
s = reduce(s, { type: 'panelist_open', runId: RUN_ID, round: 1, role: 'critic' });
s = reduce(s, {
type: 'panelist_dim', runId: RUN_ID, round: 1, role: 'critic',
dimName: 'hierarchy', dimScore: 8, dimNote: 'clear',
});
s = reduce(s, {
type: 'panelist_dim', runId: RUN_ID, round: 1, role: 'critic',
dimName: 'contrast', dimScore: 6, dimNote: 'borderline',
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
const critic = s.rounds[0]?.panelists.critic;
expect(critic?.dims).toHaveLength(2);
expect(critic?.dims[0]).toEqual({ name: 'hierarchy', score: 8, note: 'clear' });
expect(critic?.dims[1]).toEqual({ name: 'contrast', score: 6, note: 'borderline' });
});
it('counts must-fix entries cumulatively per round', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'panelist_must_fix', runId: RUN_ID, round: 1, role: 'a11y',
text: 'missing alt on hero image',
});
s = reduce(s, {
type: 'panelist_must_fix', runId: RUN_ID, round: 1, role: 'a11y',
text: 'focus ring missing on primary button',
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
expect(s.rounds[0]?.mustFix).toBe(2);
expect(s.rounds[0]?.panelists.a11y?.mustFixes).toEqual([
'missing alt on hero image',
'focus ring missing on primary button',
]);
});
it('records panelist_close score and clears activePanelist', () => {
let s: CritiqueState = runningState();
s = reduce(s, { type: 'panelist_open', runId: RUN_ID, round: 1, role: 'brand' });
s = reduce(s, close('brand', 1, 8.2));
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
expect(s.rounds[0]?.panelists.brand?.score).toBe(8.2);
expect(s.activePanelist).toBeNull();
});
it('round_end with decision=continue advances activeRound by one', () => {
let s: CritiqueState = runningState();
s = reduce(s, close('critic', 1, 7));
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 7.4, mustFix: 2, decision: 'continue', reason: 'below threshold',
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
expect(s.activeRound).toBe(2);
expect(s.rounds[0]?.composite).toBe(7.4);
expect(s.rounds[0]?.decision).toBe('continue');
expect(s.rounds[0]?.decisionReason).toBe('below threshold');
});
it('round_end with decision=ship keeps activeRound pointing at the shipped round', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 8.6, mustFix: 0, decision: 'ship', reason: 'threshold met',
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
expect(s.activeRound).toBe(1);
expect(s.rounds[0]?.decision).toBe('ship');
});
it('transitions running -> shipped on ship with final composite + artifact ref', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 8.6, mustFix: 0, decision: 'ship', reason: 'threshold met',
});
s = reduce(s, {
type: 'ship', runId: RUN_ID, round: 1, composite: 8.6, status: 'shipped',
artifactRef: { projectId: 'p1', artifactId: 'a1' }, summary: 'looks good',
});
expect(s.phase).toBe('shipped');
if (s.phase !== 'shipped') return;
expect(s.final.composite).toBe(8.6);
expect(s.final.round).toBe(1);
expect(s.final.status).toBe('shipped');
expect(s.final.artifactRef).toEqual({ projectId: 'p1', artifactId: 'a1' });
expect(s.final.summary).toBe('looks good');
expect(s.rounds[0]?.composite).toBe(8.6);
});
it('transitions running -> degraded with reason + adapter', () => {
const s1 = runningState();
const s2 = reduce(s1, {
type: 'degraded', runId: RUN_ID, reason: 'malformed_block', adapter: 'pi-rpc',
});
expect(s2.phase).toBe('degraded');
if (s2.phase !== 'degraded') return;
expect(s2.degraded.reason).toBe('malformed_block');
expect(s2.degraded.adapter).toBe('pi-rpc');
});
it('transitions running -> interrupted carrying rounds + bestRound + composite', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 7.9, mustFix: 1, decision: 'continue', reason: 'below threshold',
});
s = reduce(s, {
type: 'interrupted', runId: RUN_ID, bestRound: 1, composite: 7.9,
});
expect(s.phase).toBe('interrupted');
if (s.phase !== 'interrupted') return;
expect(s.rounds).toHaveLength(1);
expect(s.bestRound).toBe(1);
expect(s.composite).toBe(7.9);
});
it('transitions running -> failed with cause and preserves rounds for inspection', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 7.2, mustFix: 3, decision: 'continue', reason: 'below threshold',
});
s = reduce(s, { type: 'failed', runId: RUN_ID, cause: 'cli_exit_nonzero' });
expect(s.phase).toBe('failed');
if (s.phase !== 'failed') return;
expect(s.cause).toBe('cli_exit_nonzero');
expect(s.rounds).toHaveLength(1);
});
it('records parser_warning events in a side channel without changing phase', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'parser_warning', runId: RUN_ID, kind: 'weak_debate', position: 412,
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
expect(s.warnings).toEqual([{ kind: 'weak_debate', position: 412 }]);
});
it('accepts late parser_warning even after a terminal transition (shipped)', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'round_end', runId: RUN_ID, round: 1,
composite: 8.6, mustFix: 0, decision: 'ship', reason: 'threshold met',
});
s = reduce(s, {
type: 'ship', runId: RUN_ID, round: 1, composite: 8.6, status: 'shipped',
artifactRef: { projectId: 'p1', artifactId: 'a1' }, summary: 'ok',
});
s = reduce(s, {
type: 'parser_warning', runId: RUN_ID, kind: 'composite_mismatch', position: 9001,
});
expect(s.phase).toBe('shipped');
if (s.phase !== 'shipped') return;
expect(s.warnings).toEqual([{ kind: 'composite_mismatch', position: 9001 }]);
});
it('terminal phases are sticky against non-warning events (no double-shipping)', () => {
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'ship', runId: RUN_ID, round: 1, composite: 8.6, status: 'shipped',
artifactRef: { projectId: 'p1', artifactId: 'a1' }, summary: 'ok',
});
const shipped = s;
s = reduce(s, { type: 'failed', runId: RUN_ID, cause: 'orchestrator_internal' });
expect(s).toBe(shipped);
s = reduce(s, { type: 'degraded', runId: RUN_ID, reason: 'missing_artifact', adapter: 'pi-rpc' });
expect(s).toBe(shipped);
});
it('places out-of-order panelist_dim events into the right round (round 2 lands before round 1 closes)', () => {
// Defends against a stray late event from round 1 corrupting round 2's
// bucket. The reducer keys rounds by number, not by "always the last".
let s: CritiqueState = runningState();
s = reduce(s, {
type: 'panelist_dim', runId: RUN_ID, round: 2, role: 'copy',
dimName: 'voice', dimScore: 9, dimNote: 'on brand',
});
s = reduce(s, {
type: 'panelist_dim', runId: RUN_ID, round: 1, role: 'critic',
dimName: 'contrast', dimScore: 7, dimNote: 'ok',
});
expect(s.phase).toBe('running');
if (s.phase !== 'running') return;
const round1 = s.rounds.find((r: CritiqueRound) => r.n === 1);
const round2 = s.rounds.find((r: CritiqueRound) => r.n === 2);
expect(round1?.panelists.critic?.dims).toHaveLength(1);
expect(round1?.panelists.copy).toBeUndefined();
expect(round2?.panelists.copy?.dims).toHaveLength(1);
expect(round2?.panelists.critic).toBeUndefined();
});
it('returns identical state reference for events that have no effect (no allocation churn)', () => {
// The hook will dispatch into useReducer; stable identity for inert
// events keeps React from re-rendering subscribers unnecessarily.
const s1 = runningState();
const s2 = reduce(s1, {
type: 'panelist_must_fix', runId: 'wrong', round: 1, role: 'critic', text: 'x',
});
expect(s2).toBe(s1);
});
});

View file

@ -0,0 +1,310 @@
/**
* Node-environment coverage for the critique SSE connection manager
* (Phase 7, Task 7.2). Mirrors the shape of
* `apps/web/tests/providers/project-events.test.ts` so a future PR that
* unifies the two managers behind one EventSource has a single test pattern
* to merge against.
*/
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import {
createCritiqueEventsConnection,
critiqueEventsUrl,
sseToPanelEvent,
} from '../../../../src/components/Theater/state/sse';
import { CRITIQUE_SSE_EVENT_NAMES } from '@open-design/contracts/critique';
import type { CritiqueSseEventName } from '@open-design/contracts/critique';
type Listener = (evt: Event) => void;
/**
* Hand-rolled EventSource stub. The web platform's EventSource is missing
* under the default `node` vitest environment we use here, and JSDOM's
* implementation isn't friendly to deterministic event dispatch.
*/
class StubEventSource implements EventTarget {
static instances: StubEventSource[] = [];
readonly url: string;
readonly listeners = new Map<string, Set<Listener>>();
closed = false;
constructor(url: string) {
this.url = url;
StubEventSource.instances.push(this);
}
addEventListener(type: string, listener: EventListenerOrEventListenerObject | null): void {
if (typeof listener !== 'function') return;
const set = this.listeners.get(type) ?? new Set<Listener>();
set.add(listener as Listener);
this.listeners.set(type, set);
}
removeEventListener(type: string, listener: EventListenerOrEventListenerObject | null): void {
if (typeof listener !== 'function') return;
this.listeners.get(type)?.delete(listener as Listener);
}
dispatchEvent(evt: Event): boolean {
this.listeners.get(evt.type)?.forEach((l) => l(evt));
return true;
}
close(): void {
this.closed = true;
}
// Test helpers --------------------------------------------------------------
emit(type: string, data: unknown): void {
const evt = new MessageEvent(type, { data: JSON.stringify(data) });
this.dispatchEvent(evt);
}
emitRaw(type: string, raw: string): void {
const evt = new MessageEvent(type, { data: raw });
this.dispatchEvent(evt);
}
fireError(): void {
this.dispatchEvent(new Event('error'));
}
}
const RUN_ID = 'run_sse';
beforeEach(() => {
StubEventSource.instances = [];
});
afterEach(() => {
vi.restoreAllMocks();
});
describe('critique SSE connection manager (Phase 7.2)', () => {
it('opens an EventSource at /api/projects/:id/events', () => {
const conn = createCritiqueEventsConnection('proj-1', () => undefined, {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
});
expect(StubEventSource.instances).toHaveLength(1);
expect(StubEventSource.instances[0]!.url).toBe(critiqueEventsUrl('proj-1'));
conn.close();
});
it('subscribes to every CRITIQUE_SSE_EVENT_NAMES channel', () => {
createCritiqueEventsConnection('proj-1', () => undefined, {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
});
const es = StubEventSource.instances[0]!;
for (const name of CRITIQUE_SSE_EVENT_NAMES) {
expect(es.listeners.has(name)).toBe(true);
}
});
it('decodes critique.run_started and dispatches a PanelEvent with the unwrapped type', () => {
const seen: unknown[] = [];
createCritiqueEventsConnection('proj-1', (action) => seen.push(action), {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
});
const es = StubEventSource.instances[0]!;
es.emit('critique.run_started', {
runId: RUN_ID,
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
});
expect(seen).toHaveLength(1);
expect(seen[0]).toMatchObject({
type: 'run_started',
runId: RUN_ID,
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
});
});
it('decodes critique.ship preserving artifactRef + status', () => {
const seen: unknown[] = [];
createCritiqueEventsConnection('proj-1', (a) => seen.push(a), {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
});
StubEventSource.instances[0]!.emit('critique.ship', {
runId: RUN_ID,
round: 1,
composite: 8.6,
status: 'shipped',
artifactRef: { projectId: 'p1', artifactId: 'a1' },
summary: 'ok',
});
expect(seen[0]).toEqual({
type: 'ship',
runId: RUN_ID,
round: 1,
composite: 8.6,
status: 'shipped',
artifactRef: { projectId: 'p1', artifactId: 'a1' },
summary: 'ok',
});
});
it('drops a malformed payload (parse error) instead of throwing the stream', () => {
const seen: unknown[] = [];
createCritiqueEventsConnection('proj-1', (a) => seen.push(a), {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
});
const es = StubEventSource.instances[0]!;
// `not json` triggers JSON.parse to throw; the handler must swallow it.
es.emitRaw('critique.run_started', 'not json');
// The next valid frame still lands.
es.emit('critique.panelist_open', { runId: RUN_ID, round: 1, role: 'critic' });
expect(seen).toHaveLength(1);
expect(seen[0]).toMatchObject({ type: 'panelist_open' });
});
it('reconnects with exponential backoff on error and resets backoff on ready', () => {
let now = 0;
const queue: Array<{ at: number; fn: () => void }> = [];
const setTimeoutFn = ((fn: () => void, delay: number) => {
queue.push({ at: now + delay, fn });
return queue.length as unknown as ReturnType<typeof setTimeout>;
}) as typeof setTimeout;
createCritiqueEventsConnection('proj-1', () => undefined, {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
initialBackoffMs: 1000,
maxBackoffMs: 30_000,
setTimeoutFn,
});
expect(StubEventSource.instances).toHaveLength(1);
// First failure -> schedule reconnect at +1000ms with backoff doubling
// to 2000 for next time.
StubEventSource.instances[0]!.fireError();
expect(queue).toHaveLength(1);
expect(queue[0]!.at).toBe(1000);
// Drain the scheduled reconnect.
now = queue[0]!.at;
queue.shift()!.fn();
expect(StubEventSource.instances).toHaveLength(2);
// A successful `ready` resets the backoff to the initial value.
StubEventSource.instances[1]!.dispatchEvent(new Event('ready'));
// Next failure should re-arm at +1000ms, not at +2000ms.
StubEventSource.instances[1]!.fireError();
expect(queue).toHaveLength(1);
expect(queue[0]!.at - now).toBe(1000);
});
it('close() cancels pending reconnects and stops the live source', () => {
const queue: Array<{ at: number; fn: () => void }> = [];
const cancelled: Array<number> = [];
const setTimeoutFn = ((fn: () => void, delay: number) => {
queue.push({ at: delay, fn });
return queue.length as unknown as ReturnType<typeof setTimeout>;
}) as typeof setTimeout;
const clearTimeoutFn = ((id: number) => {
cancelled.push(id);
}) as typeof clearTimeout;
const conn = createCritiqueEventsConnection('proj-1', () => undefined, {
EventSourceCtor: StubEventSource as unknown as typeof EventSource,
setTimeoutFn,
clearTimeoutFn,
});
StubEventSource.instances[0]!.fireError();
expect(queue).toHaveLength(1);
conn.close();
expect(cancelled).toHaveLength(1);
expect(StubEventSource.instances[0]!.closed).toBe(true);
});
it('returns a no-op connection when EventSource is unavailable', () => {
// No EventSource constructor in node by default; the manager must not
// crash, just return a closeable no-op so the hook can guard with
// `enabled=false` without an extra check.
const conn = createCritiqueEventsConnection('proj-1', () => undefined, {
EventSourceCtor: undefined,
});
expect(typeof conn.close).toBe('function');
// close() on a no-op must not throw.
conn.close();
});
});
describe('sseToPanelEvent (Phase 7.2)', () => {
it('strips the `critique.` prefix and merges the data payload', () => {
const action = sseToPanelEvent('critique.degraded' as CritiqueSseEventName, {
runId: RUN_ID,
reason: 'malformed_block',
adapter: 'pi-rpc',
});
expect(action).toEqual({
type: 'degraded',
runId: RUN_ID,
reason: 'malformed_block',
adapter: 'pi-rpc',
});
});
it('returns null for non-object payloads (defensive, never thrown into the reducer)', () => {
expect(sseToPanelEvent('critique.run_started' as CritiqueSseEventName, null)).toBeNull();
expect(sseToPanelEvent('critique.run_started' as CritiqueSseEventName, 'oops')).toBeNull();
expect(sseToPanelEvent('critique.run_started' as CritiqueSseEventName, 42)).toBeNull();
});
it('refuses to let a payload-provided `type` override the channel-derived one', () => {
// Lefarcen P1 (#1307 review): without this guard a malformed or
// compromised SSE frame on the `critique.run_started` channel
// could carry { type: 'ship', ... } in its payload and end up
// dispatched as a ship action. The spread order pins `type` last
// AND we re-validate via isPanelEvent before returning.
const action = sseToPanelEvent('critique.run_started' as CritiqueSseEventName, {
type: 'ship', // hostile field
runId: RUN_ID,
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
});
expect(action).not.toBeNull();
expect(action?.type).toBe('run_started');
});
it('drops a payload that does not carry the required runId (isPanelEvent guard)', () => {
expect(
sseToPanelEvent('critique.run_started' as CritiqueSseEventName, {
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
}),
).toBeNull();
});
it('drops a payload with an empty runId string', () => {
expect(
sseToPanelEvent('critique.run_started' as CritiqueSseEventName, {
runId: '',
protocolVersion: 1,
cast: ['critic'],
maxRounds: 3,
threshold: 8,
scale: 10,
}),
).toBeNull();
});
});