fix: settle completed runs and clean up shutdown children (#924)

* fix: clean up completed and shutting down runs

* fix: bound daemon CLI shutdown

Generated-By: looper 0.6.0 (runner=fixer, agent=codex)

* fix: harden daemon shutdown cleanup

Generated-By: looper 0.6.0 (runner=fixer, agent=codex)

* fix: harden daemon shutdown cleanup

Generated-By: looper 0.6.0 (runner=fixer, agent=codex)

* test: align acp abort fake with typed child
This commit is contained in:
Siri-Ray 2026-05-08 21:05:22 +08:00 committed by GitHub
parent ef9ca7baff
commit 208f09c60e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 442 additions and 14 deletions

View file

@ -328,6 +328,7 @@ export function attachAcpSession({
let emittedFirstTokenStatus = false;
let finished = false;
let fatal = false;
let aborted = false;
let stageTimer: TimerHandle | null = null;
const resetStageTimer = (label: string) => {
@ -393,6 +394,7 @@ export function attachAcpSession({
};
const parser = createJsonLineStream((raw, rawLine) => {
if (aborted) return;
resetStageTimer('response');
const obj = asObject(raw);
if (!obj) return;
@ -571,5 +573,18 @@ export function attachAcpSession({
hasFatalError() {
return fatal;
},
/**
 * Marks the session as aborted and finished, stops the stage timer, and asks
 * the agent to cancel the session when one exists and stdin is still writable.
 * Calling it again, or after the session already finished, does nothing.
 */
abort() {
  const alreadySettled = aborted || finished;
  if (alreadySettled) return;

  aborted = true;
  finished = true;
  clearStageTimer();

  // Without a session id there is nothing to cancel on the agent side.
  const stdin = child.stdin;
  if (!sessionId || !stdin) return;
  // A closed or ended pipe cannot carry the cancel request.
  if (stdin.destroyed || stdin.writableEnded) return;

  try {
    sendRpc(stdin, nextId, 'session/cancel', { sessionId });
    nextId += 1;
  } catch {
    // The caller owns process-signal fallback if the ACP transport is gone.
  }
},
};
}

View file

@ -134,7 +134,43 @@ for (let i = 0; i < argv.length; i++) {
}
}
startServer({ port, host }).then(url => {
startServer({ port, host, returnServer: true }).then((started) => {
const { url, server, shutdown } = started;
const closeTimeoutMs = 5_000;
// Closes the HTTP server with a bounded wait. After at most one second idle
// connections are dropped; after `closeTimeoutMs` every remaining connection
// is force-closed and the promise resolves regardless. The returned promise
// never rejects, and a final idle-connection sweep runs once it settles.
const closeServer = () => new Promise((resolve) => {
  let resolved = false;
  const resolveOnce = () => {
    if (resolved) return;
    resolved = true;
    // Disarm both escalation timers so neither fires against a server that
    // has already finished closing (mirrors the sidecar's closeHttpServer).
    clearTimeout(idleTimer);
    clearTimeout(hardTimer);
    resolve();
  };
  // First escalation: drop keep-alive connections that are sitting idle.
  const idleTimer = setTimeout(() => {
    server.closeIdleConnections?.();
  }, Math.min(1_000, closeTimeoutMs));
  // Final escalation: force-close everything and stop waiting.
  const hardTimer = setTimeout(() => {
    server.closeAllConnections?.();
    resolveOnce();
  }, closeTimeoutMs);
  // Neither timer should keep the process alive on its own.
  idleTimer.unref?.();
  hardTimer.unref?.();
  server.close(() => resolveOnce());
}).finally(() => {
  server.closeIdleConnections?.();
});
let shuttingDown = false;
// Signal handler: the first signal starts a graceful shutdown (close the HTTP
// server and run the daemon's shutdown hook in parallel) and exits once both
// have settled; a second signal while that is in flight exits immediately.
const stop = () => {
  if (shuttingDown) process.exit(0);
  shuttingDown = true;

  const serverClosed = closeServer();
  // Deferred through a promise so a synchronous throw from the hook becomes a
  // rejection that allSettled absorbs.
  const runsStopped = Promise.resolve().then(() => shutdown?.());
  const exitCleanly = () => process.exit(0);

  void Promise.resolve()
    .then(() => Promise.allSettled([runsStopped, serverClosed]))
    .finally(exitCleanly);
};
process.on('SIGINT', stop);
process.on('SIGTERM', stop);
console.log(`[od] listening on ${url}`);
if (open) {
const opener = process.platform === 'darwin' ? 'open'

View file

@ -8,6 +8,7 @@ export function createChatRunService({
createSseErrorPayload,
maxEvents = 2_000,
ttlMs = 30 * 60 * 1000,
shutdownGraceMs = 3_000,
}) {
const runs = new Map();
@ -121,6 +122,36 @@ export function createChatRunService({
return true;
});
/**
 * Waits for a child process to exit, giving up after `timeoutMs`.
 *
 * Resolves `true` when there is no child, when the child already reports an
 * exit code or signal, or when it emits `close`/`exit` in time; resolves
 * `false` when the timeout elapses first. Never rejects.
 */
const waitForChildExit = (child, timeoutMs) => {
  const alreadyGone = !child || child.exitCode !== null || child.signalCode !== null;
  if (alreadyGone) return Promise.resolve(true);

  return new Promise((resolve) => {
    let timer;
    let pending = true;

    const settle = (exited) => {
      if (!pending) return;
      pending = false;
      clearTimeout(timer);
      // Drop whichever listener did not fire so nothing lingers on the child.
      for (const eventName of ['close', 'exit']) child.off?.(eventName, handleExit);
      resolve(exited);
    };
    function handleExit() {
      settle(true);
    }

    timer = setTimeout(() => settle(false), timeoutMs);
    // The wait must not keep the process alive by itself.
    timer.unref?.();
    for (const eventName of ['close', 'exit']) child.once?.(eventName, handleExit);
  });
};
/**
 * Sends `signal` to the run's child process if it is still alive.
 *
 * Returns whatever `child.kill` returns, or `false` when the run has no child,
 * the child already reports an exit code or signal, or `kill` throws.
 */
const killChild = (run, signal) => {
  const target = run.child;
  if (!target) return false;

  const stillRunning = target.exitCode === null && target.signalCode === null;
  if (!stillRunning) return false;

  try {
    return target.kill(signal);
  } catch {
    return false;
  }
};
const cancel = (run) => {
if (!TERMINAL_RUN_STATUSES.has(run.status)) {
run.cancelRequested = true;
@ -143,6 +174,27 @@ export function createChatRunService({
}
};
/**
 * Stops every run that has not reached a terminal status, for daemon shutdown.
 *
 * Each run is flagged as cancel-requested, its ACP session (if any) is asked to
 * abort, its child receives SIGTERM, and the run is finished as `canceled`.
 * A child that has not exited after `graceMs` is sent SIGKILL, followed by one
 * more bounded wait of 500 ms. All runs are processed concurrently.
 */
const shutdownActive = async ({ graceMs = shutdownGraceMs } = {}) => {
  const stopRun = async (run) => {
    run.cancelRequested = true;
    run.updatedAt = Date.now();

    if (run.acpSession?.abort) {
      try {
        run.acpSession.abort();
      } catch {
        // Process signals below are the shutdown fallback.
      }
    }

    killChild(run, 'SIGTERM');
    finish(run, 'canceled', null, 'SIGTERM');

    if (!run.child) return;
    const exitedInTime = await waitForChildExit(run.child, graceMs);
    if (exitedInTime) return;

    // The child ignored SIGTERM for the whole grace window; escalate.
    killChild(run, 'SIGKILL');
    await waitForChildExit(run.child, 500);
  };

  // Snapshot first so finishing one run cannot affect which runs are selected.
  const activeRuns = [...runs.values()].filter((run) => !TERMINAL_RUN_STATUSES.has(run.status));
  await Promise.all(activeRuns.map((run) => stopRun(run)));
};
const wait = (run) => {
if (TERMINAL_RUN_STATUSES.has(run.status)) return Promise.resolve(statusBody(run));
return new Promise((resolve) => run.waiters.add(resolve));
@ -155,6 +207,7 @@ export function createChatRunService({
list,
stream,
cancel,
shutdownActive,
wait,
emit,
finish,

View file

@ -1730,8 +1730,15 @@ function resolveChatRunInactivityTimeoutMs() {
return Math.max(0, Math.floor(raw));
}
/**
 * Resolves how long shutdown waits for a run's child process before escalating,
 * read from `OD_CHAT_RUN_SHUTDOWN_GRACE_MS`.
 *
 * @returns The configured value floored to a whole number and clamped to be
 *   non-negative, or 3000 when the variable is unset, blank, or not a finite
 *   number.
 */
function resolveChatRunShutdownGraceMs() {
  const configured = process.env.OD_CHAT_RUN_SHUTDOWN_GRACE_MS;
  // Number('') and Number('   ') are 0, so a blank variable would otherwise
  // silently remove the grace period instead of falling back to the default.
  if (configured == null || configured.trim() === '') return 3_000;
  const raw = Number(configured);
  if (!Number.isFinite(raw)) return 3_000;
  return Math.max(0, Math.floor(raw));
}
export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST || '127.0.0.1', returnServer = false } = {}) {
let resolvedPort = port;
let daemonShuttingDown = false;
const extraAllowedOrigins = configuredAllowedOrigins();
const app = express();
app.use(express.json({ limit: '4mb' }));
@ -5868,6 +5875,9 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
});
app.post('/api/runs', (req, res) => {
if (daemonShuttingDown) {
return sendApiError(res, 503, 'UPSTREAM_UNAVAILABLE', 'daemon is shutting down');
}
const run = design.runs.create(req.body || {});
/** @type {import('@open-design/contracts').ChatRunCreateResponse} */
const body = { runId: run.id };
@ -5905,6 +5915,9 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
});
app.post('/api/chat', (req, res) => {
if (daemonShuttingDown) {
return sendApiError(res, 503, 'UPSTREAM_UNAVAILABLE', 'daemon is shutting down');
}
const run = design.runs.create();
design.runs.stream(run, req, res);
design.runs.start(run, () => startChatRun(req.body || {}, run));
@ -6591,14 +6604,21 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
// critical when port=0 (ephemeral port) and when the embedding sidecar
// needs to advertise the port to a parent process before any request
// can flow. Three callers depend on this contract:
// - `apps/daemon/src/cli.ts` → expects a `url` string
// - `apps/daemon/src/cli.ts` → expects `{ url, server, shutdown }`
// - `apps/daemon/sidecar/server.ts` → expects `{ url, server }`
// - `apps/daemon/tests/version-route.test.ts` → expects `{ url, server }`
return await new Promise((resolve, reject) => {
let daemonShutdownStarted = false;
// The in-flight (or finished) shutdown, kept so later callers can wait on it.
/** @type {Promise<void> | null} */
let daemonShutdownPromise = null;
// Stops the connector catalog refresh loop and the orbit service.
const cleanupDaemonBackgroundWork = () => {
  composioConnectorProvider.stopCatalogRefreshLoop();
  orbitService.stop();
};
// Flags the daemon as shutting down and stops all active runs. The work starts
// once; a repeat call waits for that same shutdown to settle instead of
// returning while children may still be alive, and never rejects (only the
// first caller observes a failure).
const shutdownDaemonRuns = async () => {
  if (daemonShutdownStarted) {
    await daemonShutdownPromise?.catch(() => undefined);
    return;
  }
  daemonShutdownStarted = true;
  daemonShuttingDown = true;
  // The async wrapper turns a synchronous throw into a rejection, so the
  // promise is always recorded for later callers.
  daemonShutdownPromise = (async () => {
    await design.runs.shutdownActive({ graceMs: resolveChatRunShutdownGraceMs() });
  })();
  await daemonShutdownPromise;
};
let server;
try {
server = app.listen(port, host, () => {
@ -6627,14 +6647,16 @@ export async function startServer({ port = 7456, host = process.env.OD_BIND_HOST
console.log(`[od] daemon listening on ${url}`);
}
daemonUrl = url;
resolve(returnServer ? { url, server } : url);
resolve(returnServer ? { url, server, shutdown: shutdownDaemonRuns } : url);
});
} catch (error) {
cleanupDaemonBackgroundWork();
reject(error);
return;
}
server.once('close', cleanupDaemonBackgroundWork);
server.once('close', () => {
void shutdownDaemonRuns().finally(cleanupDaemonBackgroundWork);
});
// `app.listen` throws synchronously when the port is already in use on
// some Node versions, but emits an `error` event on others (and for
// EACCES / EADDRNOTAVAIL even on the same Node). Wire the event so the

View file

@ -18,6 +18,12 @@ import { startServer } from "../server.js";
const DAEMON_PORT_ENV = SIDECAR_ENV.DAEMON_PORT;
const TOOLS_DEV_PARENT_PID_ENV = SIDECAR_ENV.TOOLS_DEV_PARENT_PID;
/** Handle shape this module expects from `startServer` when called with `returnServer: true`. */
type StartedDaemonServer = {
// The HTTP server instance; closed when the sidecar stops.
server: Server;
// URL reported by the daemon.
url: string;
// Optional hook the sidecar invokes on stop; failures are logged, not rethrown.
shutdown?: () => Promise<void>;
};
export type DaemonSidecarHandle = {
status(): Promise<DaemonStatusSnapshot>;
stop(): Promise<void>;
@ -33,10 +39,39 @@ function parsePort(value: string | undefined): number {
return port;
}
async function closeHttpServer(server: Server): Promise<void> {
export async function closeHttpServer(
server: Server,
{ closeTimeoutMs = 5_000, idleCloseMs = 1_000 } = {},
): Promise<void> {
if (!server.listening) return;
await new Promise<void>((resolveClose, rejectClose) => {
server.close((error) => (error == null ? resolveClose() : rejectClose(error)));
let resolved = false;
const resolveOnce = () => {
if (resolved) return;
resolved = true;
clearTimeout(idleTimer);
clearTimeout(hardTimer);
resolveClose();
};
const rejectOnce = (error: Error) => {
if (resolved) return;
resolved = true;
clearTimeout(idleTimer);
clearTimeout(hardTimer);
rejectClose(error);
};
const idleTimer = setTimeout(() => {
server.closeIdleConnections?.();
}, Math.min(idleCloseMs, closeTimeoutMs));
const hardTimer = setTimeout(() => {
server.closeAllConnections?.();
resolveOnce();
}, closeTimeoutMs);
idleTimer.unref?.();
hardTimer.unref?.();
server.close((error) => (error == null ? resolveOnce() : rejectOnce(error)));
}).finally(() => {
server.closeIdleConnections?.();
});
}
@ -64,7 +99,7 @@ function attachParentMonitor(stop: () => Promise<void>): void {
export async function startDaemonSidecar(runtime: SidecarRuntimeContext<SidecarStamp>): Promise<DaemonSidecarHandle> {
const started = await startServer({ port: parsePort(process.env[DAEMON_PORT_ENV]), returnServer: true }) as
| string
| { server: Server; url: string };
| StartedDaemonServer;
if (typeof started === "string") {
throw new Error("daemon startServer did not return a server handle");
}
@ -88,8 +123,12 @@ export async function startDaemonSidecar(runtime: SidecarRuntimeContext<SidecarS
stopped = true;
state.state = "stopped";
state.updatedAt = new Date().toISOString();
const closePromise = closeHttpServer(serverHandle.server).catch(() => undefined);
const shutdownPromise = serverHandle.shutdown?.().catch((error: unknown) => {
console.error("daemon shutdown cleanup failed", error);
}) ?? Promise.resolve();
await ipcServer?.close().catch(() => undefined);
await closeHttpServer(serverHandle.server).catch(() => undefined);
await Promise.allSettled([closePromise, shutdownPromise]);
resolveStopped();
}

View file

@ -1,7 +1,9 @@
import assert from 'node:assert/strict';
import { EventEmitter } from 'node:events';
import { PassThrough } from 'node:stream';
import path from 'node:path';
import { test } from 'vitest';
import { buildAcpSessionNewParams } from '../src/acp.js';
import { attachAcpSession, buildAcpSessionNewParams } from '../src/acp.js';
test('ACP session params do not require MCP servers by default', () => {
assert.deepEqual(buildAcpSessionNewParams('/tmp/od-project'), {
@ -47,3 +49,47 @@ test('ACP session params preserve caller-provided type and env fields', () => {
assert.equal(server.name, 'http-server');
assert.deepEqual(server.env, [{ key: 'TOKEN', value: 'secret' }]);
});
// Verifies that abort() is exposed, sends exactly one `session/cancel` for the
// created session, and is a no-op when called a second time.
test('attachAcpSession exposes abort and sends session cancel after session creation', () => {
const child = new FakeAcpChild();
// Capture everything the session writes to the child's stdin.
const writes: string[] = [];
child.stdin.on('data', (chunk) => writes.push(String(chunk)));
const session = attachAcpSession({
child: child as never,
prompt: 'hello',
cwd: '/tmp/od-project',
model: null,
mcpServers: [],
send: () => {},
});
// Feed the replies for request ids 1 and 2; the second carries the session id.
// NOTE(review): presumably these answer the initialize and session/new requests — confirm.
child.stdout.write(`${JSON.stringify({ id: 1, result: {} })}\n`);
child.stdout.write(`${JSON.stringify({ id: 2, result: { sessionId: 'session-1' } })}\n`);
assert.equal(typeof session.abort, 'function');
session.abort();
// The second call must not emit another cancel request.
session.abort();
// stdin carries newline-delimited JSON; parse each non-empty line.
const parsed = writes
.join('')
.trim()
.split('\n')
.filter(Boolean)
.map((line) => JSON.parse(line));
const cancelRequests = parsed.filter((entry) => entry.method === 'session/cancel');
assert.equal(cancelRequests.length, 1);
assert.deepEqual(cancelRequests[0].params, { sessionId: 'session-1' });
});
/**
 * Minimal stand-in for a spawned agent process: an event emitter with
 * in-memory stdio streams and a `kill` that only records that it was called.
 */
class FakeAcpChild extends EventEmitter {
  stdin: PassThrough;
  stdout: PassThrough;
  stderr: PassThrough;
  killed: boolean;

  constructor() {
    super();
    this.stdin = new PassThrough();
    this.stdout = new PassThrough();
    this.stderr = new PassThrough();
    this.killed = false;
  }

  /** Records the kill request and reports success; no signal is delivered. */
  kill(): boolean {
    this.killed = true;
    return true;
  }
}

View file

@ -387,6 +387,61 @@ setInterval(() => {}, 1000);
});
});
// Verifies that POST /api/runs and POST /api/chat answer 503 once daemon
// shutdown has started, even while an active run is still being torn down.
describe('daemon run creation during shutdown', () => {
it('rejects new run creation while shutdown cleanup is still in flight', async () => {
// Shorten the SIGTERM grace window so the test does not wait the default 3 s.
// NOTE(review): the env var is set before the try block, so a startServer
// rejection would skip the restore in `finally`.
const previousGrace = process.env.OD_CHAT_RUN_SHUTDOWN_GRACE_MS;
process.env.OD_CHAT_RUN_SHUTDOWN_GRACE_MS = '100';
const started = await startServer({ port: 0, returnServer: true }) as {
url: string;
server: http.Server;
shutdown: () => Promise<void>;
};
try {
// The fake agent script ignores SIGTERM and keeps its event loop busy, so the
// run's child is still alive while the late requests below are sent.
// NOTE(review): presumably withFakeAgent installs the script as the `opencode` agent — confirm.
await withFakeAgent(
'opencode',
`
process.on('SIGTERM', () => {});
setInterval(() => {}, 1000);
`,
async () => {
// Start a run before shutdown so there is active work to clean up.
const activeResponse = await fetch(`${started.url}/api/runs`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ agentId: 'opencode', message: 'hello' }),
});
expect(activeResponse.status).toBe(202);
const { runId } = await activeResponse.json() as { runId: string };
await waitForRunStatus(started.url, runId, (status) => status === 'running');
// Begin shutdown but do not await it yet; the requests below race against it.
const shutdownPromise = started.shutdown();
const runResponse = await fetch(`${started.url}/api/runs`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ agentId: 'opencode', message: 'late run' }),
});
const chatResponse = await fetch(`${started.url}/api/chat`, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ agentId: 'opencode', message: 'late chat' }),
});
expect(runResponse.status).toBe(503);
expect(chatResponse.status).toBe(503);
await shutdownPromise;
},
);
} finally {
// Restore the env var exactly as it was (unset vs. previous value).
if (previousGrace == null) {
delete process.env.OD_CHAT_RUN_SHUTDOWN_GRACE_MS;
} else {
process.env.OD_CHAT_RUN_SHUTDOWN_GRACE_MS = previousGrace;
}
await new Promise<void>((resolve) => started.server.close(() => resolve()));
}
});
});
async function readSseUntil(response: Response, marker: string): Promise<string> {
const reader = response.body!.getReader();
const decoder = new TextDecoder();
@ -400,14 +455,18 @@ async function readSseUntil(response: Response, marker: string): Promise<string>
return body;
}
async function waitForRunStatus(baseUrl: string, runId: string): Promise<{ status: string }> {
async function waitForRunStatus(
baseUrl: string,
runId: string,
done: (status: string) => boolean = (status) => status !== 'queued' && status !== 'running',
): Promise<{ status: string }> {
for (let attempt = 0; attempt < 120; attempt += 1) {
const statusResponse = await fetch(`${baseUrl}/api/runs/${runId}`);
const statusBody = await statusResponse.json() as { status: string };
if (statusBody.status !== 'queued' && statusBody.status !== 'running') return statusBody;
if (done(statusBody.status)) return statusBody;
await new Promise((resolve) => setTimeout(resolve, 25));
}
throw new Error('run did not finish');
throw new Error('run did not reach expected status');
}
describe('chat prompt helpers', () => {

View file

@ -0,0 +1,93 @@
import { EventEmitter } from 'node:events';
import { describe, expect, it, vi } from 'vitest';
import { createChatRunService } from '../src/runs.js';
// Exercises shutdownActive() against fake child processes that exit on a
// chosen signal, covering the SIGTERM path, SIGKILL escalation, and ACP abort.
describe('chat run service shutdown', () => {
it('cancels active runs and terminates their child process during daemon shutdown', async () => {
const runs = createRuns();
// This child exits as soon as it receives SIGTERM.
const child = new FakeChildProcess({ closeOn: 'SIGTERM' });
const run = runs.create({ projectId: 'project-1', conversationId: 'conv-1' });
// Force the run into an active state with an attached child.
run.status = 'running';
(run as any).child = child;
// Register a waiter before shutdown so its resolution can be checked afterwards.
const wait = runs.wait(run);
await runs.shutdownActive({ graceMs: 10 });
expect(child.signals).toEqual(['SIGTERM']);
expect(run.status).toBe('canceled');
expect(run.cancelRequested).toBe(true);
expect(run.signal).toBe('SIGTERM');
await expect(wait).resolves.toMatchObject({ status: 'canceled', signal: 'SIGTERM' });
// The last recorded event must be the terminal `end` event.
expect(run.events.at(-1)).toMatchObject({
event: 'end',
data: { status: 'canceled', signal: 'SIGTERM' },
});
});
it('escalates to SIGKILL when a child ignores the shutdown SIGTERM grace window', async () => {
const runs = createRuns();
// This child ignores SIGTERM and only exits on SIGKILL.
const child = new FakeChildProcess({ closeOn: 'SIGKILL' });
const run = runs.create();
run.status = 'running';
(run as any).child = child;
// A 1 ms grace window forces the escalation path almost immediately.
await runs.shutdownActive({ graceMs: 1 });
expect(child.signals).toEqual(['SIGTERM', 'SIGKILL']);
expect(run.status).toBe('canceled');
});
it('uses adapter abort before process signals for ACP-style runs', async () => {
const runs = createRuns();
const child = new FakeChildProcess({ closeOn: 'SIGTERM' });
const abort = vi.fn();
const run = runs.create();
run.status = 'running';
(run as any).child = child;
// Only the `abort` member of the session is exercised by shutdownActive.
(run as any).acpSession = { abort };
await runs.shutdownActive({ graceMs: 10 });
// Abort is called once, and SIGTERM is still sent to the child afterwards.
expect(abort).toHaveBeenCalledTimes(1);
expect(child.signals).toEqual(['SIGTERM']);
expect(run.status).toBe('canceled');
});
});
/**
 * Builds a chat run service for these tests, wired to a stub SSE response
 * (mocked `send`/`end`/`cleanup`), a plain error-payload factory, a 10 ms
 * shutdown grace window, and a 60 s TTL.
 */
function createRuns() {
  const createSseResponse = () => {
    const send = vi.fn(() => true);
    const end = vi.fn();
    const cleanup = vi.fn();
    return { send, end, cleanup };
  };
  const createSseErrorPayload = (code: string, message: string) => {
    return { error: { code, message } };
  };

  return createChatRunService({
    createSseResponse,
    createSseErrorPayload,
    shutdownGraceMs: 10,
    ttlMs: 60_000,
  });
}
/**
 * Test double for a child process that records every signal it is sent and
 * "exits" only when it receives the signal named by `options.closeOn`.
 */
class FakeChildProcess extends EventEmitter {
  exitCode: number | null = null;
  signalCode: string | null = null;
  killed = false;
  signals: string[] = [];

  constructor(private readonly options: { closeOn: 'SIGTERM' | 'SIGKILL' }) {
    super();
  }

  /**
   * Records `signal` and always reports success. When the signal matches
   * `closeOn`, sets `signalCode` and emits `exit` then `close` on a microtask.
   */
  kill(signal: string): boolean {
    this.killed = true;
    this.signals.push(signal);

    const ignoresSignal = signal !== this.options.closeOn;
    if (ignoresSignal) return true;

    this.signalCode = signal;
    queueMicrotask(() => {
      for (const eventName of ['exit', 'close']) {
        this.emit(eventName, null, signal);
      }
    });
    return true;
  }
}

View file

@ -0,0 +1,53 @@
import { createServer, type Server } from 'node:http';
import { afterEach, describe, expect, it } from 'vitest';
import { closeHttpServer } from '../src/sidecar/server.js';
// Verifies that closeHttpServer() does not hang on a response that never ends.
describe('daemon sidecar HTTP shutdown', () => {
let server: Server | null = null;
// Safety net: close the server if a failing test left it listening.
afterEach(async () => {
if (!server?.listening) return;
await new Promise<void>((resolve) => server!.close(() => resolve()));
server = null;
});
it('force-closes long-lived responses when the graceful close timeout expires', async () => {
// The handler writes one SSE event and never ends the response, so the
// connection stays active until something closes it from the server side.
server = createServer((_req, res) => {
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
Connection: 'keep-alive',
});
res.write('event: open\ndata: {}\n\n');
});
await listen(server);
const response = await fetch(`http://127.0.0.1:${port(server)}/events`);
expect(response.status).toBe(200);
const startedAt = Date.now();
// With a 50 ms hard timeout the close must return long before the 1 s bound below.
await closeHttpServer(server, { closeTimeoutMs: 50, idleCloseMs: 5 });
expect(Date.now() - startedAt).toBeLessThan(1_000);
expect(server.listening).toBe(false);
});
});
/**
 * Starts `server` on an ephemeral port of 127.0.0.1 and resolves once it is
 * listening; rejects if the server emits `error` before that.
 */
async function listen(server: Server): Promise<void> {
  await new Promise<void>((resolve, reject) => {
    const onListening = () => {
      // Bound successfully: the startup error listener is no longer needed.
      server.off('error', reject);
      resolve();
    };
    server.once('error', reject);
    server.listen(0, '127.0.0.1', onListening);
  });
}
/**
 * Returns the TCP port `server` is bound to.
 *
 * @throws Error when the server has no address or reports a string address.
 */
function port(server: Server): number {
  const bound = server.address();
  if (bound !== null && typeof bound === 'object') {
    return bound.port;
  }
  throw new Error('server did not bind to a TCP port');
}

View file

@ -1191,7 +1191,7 @@ export function ProjectView({
updateAssistant((prev) => ({
...prev,
endedAt: Date.now(),
runStatus: config.mode === 'api' || prev.runId ? 'succeeded' : prev.runStatus,
runStatus: resolveSucceededRunStatus(prev.runStatus),
}));
if (commentAttachments.length > 0) {
void patchAttachedStatuses(commentAttachments, 'needs_review');
@ -1937,6 +1937,10 @@ function isActiveRunStatus(status: ChatMessage['runStatus']): boolean {
return status === 'queued' || status === 'running';
}
/**
 * Picks the run status to record when a run completes successfully: an
 * existing `failed` or `canceled` status is kept, anything else (including
 * an unset status) becomes `succeeded`.
 */
export function resolveSucceededRunStatus(status: ChatMessage['runStatus']): ChatMessage['runStatus'] {
  switch (status) {
    case 'failed':
    case 'canceled':
      return status;
    default:
      return 'succeeded';
  }
}
type BufferedTextUpdates = ReturnType<typeof createBufferedTextUpdates>;
function createBufferedTextUpdates({

View file

@ -2,7 +2,7 @@
import { cleanup, render, waitFor } from '@testing-library/react';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { ProjectView } from '../../src/components/ProjectView';
import { ProjectView, resolveSucceededRunStatus } from '../../src/components/ProjectView';
const listConversations = vi.fn();
const listMessages = vi.fn();
@ -165,4 +165,12 @@ describe('ProjectView daemon cleanup', () => {
expect((seenSignal as any).aborted).toBe(true);
expect((seenCancelSignal as any).aborted).toBe(false);
});
// Pins the status mapping applied on successful completion.
it('marks successful daemon completion as succeeded even before runId reaches message state', () => {
// Active or missing statuses are promoted to `succeeded`.
expect(resolveSucceededRunStatus('running')).toBe('succeeded');
expect(resolveSucceededRunStatus('queued')).toBe('succeeded');
expect(resolveSucceededRunStatus(undefined)).toBe('succeeded');
// `failed` and `canceled` are returned unchanged.
expect(resolveSucceededRunStatus('failed')).toBe('failed');
expect(resolveSucceededRunStatus('canceled')).toBe('canceled');
});
});