Improve ChatGPT subscription response resilience (#57891)

## Summary

This started from #57636, after we saw ChatGPT subscription/Codex
requests stall over the past week. OpenCode v1.15.11 shipped related
resilience fixes for the same class of Codex subscription endpoint
issues, so this ports the relevant pieces into Zed's native ChatGPT
subscription provider.

When Zed asks ChatGPT/Codex for a response, sometimes the server
connection can get stuck before it even sends the first response
headers. Before this PR, Zed could wait indefinitely, which looks like
OpenCode/Zed “stalling.”

This PR makes Zed:

- Wait up to 10 seconds for the server to start responding.
- If nothing comes back in that window, treat it as a temporary
network/API failure.
- Let the existing retry logic try again instead of leaving the user
stuck.
- Send a stable session-id header so OpenAI’s Codex backend can
associate requests with the same Zed agent thread.
- Add tests to make sure:
  - stuck-before-response requests time out,
  - normal slow streaming responses are not cut off,
  - ChatGPT subscription requests send the right session header,
  - the agent retries this kind of failure.

The intended user-facing result is fewer “the assistant is just sitting
there forever” failures when using ChatGPT subscription models.

## Verification

- cargo test -p open_ai responses
- cargo test -p language_models openai_subscribed
- cargo test -p agent test_send_retry_on_http_send_error
- cargo check -p open_ai
- cargo check -p language_models
- cargo check -p agent

Release Notes:

- Fixed ChatGPT subscription requests stalling indefinitely before
response headers arrive.
This commit is contained in:
morgankrey 2026-05-28 11:41:26 -05:00 committed by GitHub
parent 5abe4bcbc6
commit ef5606bb61
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 557 additions and 15 deletions

View file

@ -4069,6 +4069,63 @@ async fn test_send_retry_on_error(cx: &mut TestAppContext) {
});
}
// Checks that a `LanguageModelCompletionError::HttpSend` reported on the
// completion stream produces exactly one retry, and that the text sent on the
// following completion stream ends up in the thread's markdown.
#[gpui::test]
async fn test_send_retry_on_http_send_error(cx: &mut TestAppContext) {
let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;
let fake_model = model.as_fake();
// Start a user turn and keep the event stream so retry events can be counted.
let mut events = thread
.update(cx, |thread, cx| {
thread.send(UserMessageId::new(), ["Hello!"], cx)
})
.expect("thread send should start");
cx.run_until_parked();
// Fail the first completion with the same error shape used for a response
// header timeout, then close that stream.
fake_model.send_last_completion_stream_error(LanguageModelCompletionError::HttpSend {
provider: LanguageModelProviderName::new("OpenAI"),
error: anyhow::anyhow!("response headers timed out after 10s"),
});
fake_model.end_last_completion_stream();
// Advance the test clock by the base retry delay before driving the executor
// again, then answer the next completion stream successfully.
cx.executor().advance_clock(BASE_RETRY_DELAY);
cx.run_until_parked();
fake_model.send_last_completion_stream_text_chunk("Recovered!");
fake_model.end_last_completion_stream();
cx.run_until_parked();
// Collect every retry event emitted before the thread reports `Stop`.
let mut retry_events = Vec::new();
while let Some(Ok(event)) = events.next().await {
match event {
ThreadEvent::Retry(retry_status) => {
retry_events.push(retry_status);
}
ThreadEvent::Stop(..) => break,
_ => {}
}
}
// Exactly one retry is expected, and it must be reported as attempt 1.
assert_eq!(retry_events.len(), 1);
assert!(matches!(
retry_events[0],
acp_thread::RetryStatus { attempt: 1, .. }
));
// The rendered thread contains only the user message and the recovered reply.
thread.read_with(cx, |thread, _cx| {
assert_eq!(
thread.to_markdown(),
indoc! {"
## User
Hello!
## Assistant
Recovered!
"}
)
});
}
#[gpui::test]
async fn test_send_retry_finishes_tool_calls_on_error(cx: &mut TestAppContext) {
let ThreadTest { thread, model, .. } = setup(cx, TestModel::Fake).await;

View file

@ -12,12 +12,15 @@ use language_model::{
LanguageModelProviderName, LanguageModelProviderState, LanguageModelRequest,
LanguageModelToolChoice, RateLimiter,
};
use open_ai::{ReasoningEffort, responses::stream_response};
use open_ai::{
ReasoningEffort,
responses::{StreamResponseOptions, stream_response_with_options},
};
use rand::RngCore as _;
use serde::{Deserialize, Serialize};
use sha2::{Digest, Sha256};
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use ui::{ConfiguredApiCard, prelude::*};
use url::form_urlencoded;
use util::ResultExt as _;
@ -35,6 +38,31 @@ const CLIENT_ID: &str = "app_EMoamEEZ73f0CkXaXp7hrann";
const CREDENTIALS_KEY: &str = "https://chatgpt.com/backend-api/codex";
const TOKEN_REFRESH_BUFFER_MS: u64 = 5 * 60 * 1000;
const CODEX_RESPONSE_HEADER_TIMEOUT: Duration = Duration::from_secs(10);
/// Builds the extra HTTP headers attached to a Codex backend request.
///
/// The `originator` and `OpenAI-Beta` headers are always present. The
/// `ChatGPT-Account-Id` and `session-id` headers follow, in that order, and
/// each is included only when its argument is `Some` and non-empty.
fn codex_extra_headers(
    account_id: Option<&str>,
    session_id: Option<&str>,
) -> Vec<(String, String)> {
    let header = |name: &str, value: &str| (name.to_owned(), value.to_owned());

    let mut headers = vec![
        header("originator", "zed"),
        header("OpenAI-Beta", "responses=experimental"),
    ];

    // Optional identifiers, listed in the order they must appear in the output.
    let optional_ids = [
        ("ChatGPT-Account-Id", account_id),
        ("session-id", session_id),
    ];
    headers.extend(optional_ids.iter().filter_map(|&(name, id)| {
        id.filter(|id| !id.is_empty())
            .map(|id| header(name, id))
    }));

    headers
}
#[derive(Serialize, Deserialize, Clone, Debug)]
struct CodexCredentials {
@ -472,6 +500,7 @@ impl LanguageModel for OpenAiSubscribedLanguageModel {
// The Codex backend rejects `max_output_tokens` (`Unsupported parameter`),
// unlike the public OpenAI Responses API. Pass `None` so the field is
// omitted from the serialized request body entirely.
let session_id = request.thread_id.clone();
let mut responses_request = into_open_ai_response(
request,
self.model.id(),
@ -510,26 +539,24 @@ impl LanguageModel for OpenAiSubscribedLanguageModel {
let future = cx.spawn(async move |cx| {
let creds = get_fresh_credentials(&state, &http_client, cx).await?;
let mut extra_headers: Vec<(String, String)> = vec![
("originator".into(), "zed".into()),
("OpenAI-Beta".into(), "responses=experimental".into()),
];
if let Some(ref id) = creds.account_id {
if !id.is_empty() {
extra_headers.push(("ChatGPT-Account-Id".into(), id.clone()));
}
}
let extra_headers =
codex_extra_headers(creds.account_id.as_deref(), session_id.as_deref());
let access_token = creds.access_token.clone();
let background_executor = cx.background_executor().clone();
request_limiter
.stream(async move {
stream_response(
stream_response_with_options(
http_client.as_ref(),
PROVIDER_NAME.0.as_str(),
CODEX_BASE_URL,
&access_token,
responses_request,
extra_headers,
StreamResponseOptions::response_header_timeout(
CODEX_RESPONSE_HEADER_TIMEOUT,
background_executor.timer(CODEX_RESPONSE_HEADER_TIMEOUT),
),
)
.await
.map_err(LanguageModelCompletionError::from)
@ -1108,6 +1135,7 @@ mod tests {
use super::*;
use gpui::TestAppContext;
use http_client::FakeHttpClient;
use language_model::{LanguageModelRequestMessage, Role};
use parking_lot::Mutex;
use std::future::Future;
use std::pin::Pin;
@ -1157,6 +1185,30 @@ mod tests {
}
}
/// With both identifiers present, the account and session headers follow the
/// two fixed headers in that order.
#[test]
fn test_codex_extra_headers_include_session_id() {
    let expected: Vec<(String, String)> = [
        ("originator", "zed"),
        ("OpenAI-Beta", "responses=experimental"),
        ("ChatGPT-Account-Id", "account-1"),
        ("session-id", "thread-1"),
    ]
    .iter()
    .map(|&(name, value)| (name.to_owned(), value.to_owned()))
    .collect();

    let actual = codex_extra_headers(Some("account-1"), Some("thread-1"));

    assert_eq!(actual, expected);
}
/// Empty-string identifiers are treated like absent ones: only the two fixed
/// headers are produced.
#[test]
fn test_codex_extra_headers_omit_empty_optional_ids() {
    let expected: Vec<(String, String)> = [
        ("originator", "zed"),
        ("OpenAI-Beta", "responses=experimental"),
    ]
    .iter()
    .map(|&(name, value)| (name.to_owned(), value.to_owned()))
    .collect();

    let actual = codex_extra_headers(Some(""), Some(""));

    assert_eq!(actual, expected);
}
fn make_expired_credentials() -> CodexCredentials {
CodexCredentials {
access_token: "old_access".to_string(),
@ -1177,6 +1229,13 @@ mod tests {
}
}
/// Returns the credentials produced by `make_fresh_credentials`, with
/// `account_id` replaced by `"account-1"`.
fn make_fresh_credentials_with_account() -> CodexCredentials {
    let mut credentials = make_fresh_credentials();
    credentials.account_id = Some(String::from("account-1"));
    credentials
}
fn fake_token_response() -> String {
serde_json::json!({
"access_token": "fresh_access",
@ -1186,6 +1245,127 @@ mod tests {
.to_string()
}
// Checks that `stream_completion` sends the request's `thread_id` as the
// `session-id` header and the stored account id as `ChatGPT-Account-Id`.
#[gpui::test]
async fn test_stream_completion_sends_codex_session_header(cx: &mut TestAppContext) {
// Shared slot the fake HTTP client writes the outgoing request headers into.
let captured_headers = Arc::new(Mutex::new(None::<http_client::http::HeaderMap>));
let captured_headers_clone = captured_headers.clone();
// The fake client records the headers and replies with a single
// `response.completed` server-sent event.
let http_client = FakeHttpClient::create(move |request| {
*captured_headers_clone.lock() = Some(request.headers().clone());
async move {
let body = r#"data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}"#;
Ok(http_client::Response::builder()
.status(200)
.body(http_client::AsyncBody::from(format!("{body}\n\n")))?)
}
});
// Provider state seeded with credentials that carry an account id.
let state = cx.new(|_cx| State {
credentials: Some(make_fresh_credentials_with_account()),
sign_in_task: None,
refresh_task: None,
load_task: None,
credentials_provider: Arc::new(FakeCredentialsProvider::new()),
auth_generation: 0,
last_auth_error: None,
});
let model = OpenAiSubscribedLanguageModel {
id: LanguageModelId::from(ChatGptModel::Gpt55.id().to_string()),
model: ChatGptModel::Gpt55,
state,
http_client,
request_limiter: RateLimiter::new(4),
};
// `thread_id` is the value expected to appear in the `session-id` header.
let request = LanguageModelRequest {
thread_id: Some("thread-1".to_string()),
prompt_id: Some("prompt-1".to_string()),
messages: vec![LanguageModelRequestMessage {
role: Role::User,
content: vec!["Hello".into()],
cache: false,
reasoning_details: None,
}],
..Default::default()
};
let mut stream = model
.stream_completion(request, &cx.to_async())
.await
.expect("stream should start");
// Pull one event so the canned response body is read and parsed.
stream
.next()
.await
.expect("stream should emit event")
.expect("event should parse");
let captured_headers = captured_headers
.lock()
.clone()
.expect("request headers should be captured");
// The session header mirrors the request's thread id.
assert_eq!(
captured_headers
.get("session-id")
.and_then(|value| value.to_str().ok()),
Some("thread-1")
);
// The account header mirrors the account id stored in the credentials.
assert_eq!(
captured_headers
.get("ChatGPT-Account-Id")
.and_then(|value| value.to_str().ok()),
Some("account-1")
);
}
// Checks that `stream_completion` fails with `HttpSend` for this provider when
// the HTTP client never produces a response and the test clock is advanced by
// `CODEX_RESPONSE_HEADER_TIMEOUT`.
#[gpui::test]
async fn test_stream_completion_times_out_before_codex_headers(cx: &mut TestAppContext) {
// The fake client returns a future that never resolves, so no response
// (and therefore no headers) is ever delivered.
let http_client = FakeHttpClient::create(|_request| {
futures::future::pending::<anyhow::Result<http_client::Response<AsyncBody>>>()
});
let state = cx.new(|_cx| State {
credentials: Some(make_fresh_credentials()),
sign_in_task: None,
refresh_task: None,
load_task: None,
credentials_provider: Arc::new(FakeCredentialsProvider::new()),
auth_generation: 0,
last_auth_error: None,
});
let model = OpenAiSubscribedLanguageModel {
id: LanguageModelId::from(ChatGptModel::Gpt55.id().to_string()),
model: ChatGptModel::Gpt55,
state,
http_client,
request_limiter: RateLimiter::new(4),
};
let request = LanguageModelRequest {
thread_id: Some("thread-1".to_string()),
prompt_id: Some("prompt-1".to_string()),
messages: vec![LanguageModelRequestMessage {
role: Role::User,
content: vec!["Hello".into()],
cache: false,
reasoning_details: None,
}],
..Default::default()
};
// Create the completion future, drive the executor until it is idle, then
// move the test clock forward by the header timeout before awaiting it.
let stream_completion = model.stream_completion(request, &cx.to_async());
cx.run_until_parked();
cx.executor().advance_clock(CODEX_RESPONSE_HEADER_TIMEOUT);
let error = match stream_completion.await {
Ok(_) => panic!("stream should time out before headers arrive"),
Err(error) => error,
};
// The timeout must surface as an `HttpSend` error attributed to this provider.
assert!(matches!(
error,
LanguageModelCompletionError::HttpSend { provider, .. }
if provider == PROVIDER_NAME
));
}
#[gpui::test]
async fn test_concurrent_refresh_deduplicates(cx: &mut TestAppContext) {
let refresh_count = Arc::new(AtomicUsize::new(0));

View file

@ -316,6 +316,12 @@ fn map_open_ai_error(error: open_ai::RequestError) -> LanguageModelCompletionErr
retry_after,
)
}
open_ai::RequestError::ResponseHeaderTimeout { timeout, .. } => {
LanguageModelCompletionError::HttpSend {
provider: PROVIDER_NAME,
error: anyhow::anyhow!("response headers timed out after {timeout:?}"),
}
}
open_ai::RequestError::Other(error) => LanguageModelCompletionError::Other(error),
}
}

View file

@ -11,7 +11,7 @@ use http_client::{
pub use language_model_core::ReasoningEffort;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{convert::TryFrom, future::Future};
use std::{convert::TryFrom, future::Future, time::Duration};
use strum::EnumIter;
use thiserror::Error;
@ -684,6 +684,8 @@ pub enum RequestError {
body: String,
headers: HeaderMap<HeaderValue>,
},
#[error("response headers from {provider}'s API timed out after {timeout:?}")]
ResponseHeaderTimeout { provider: String, timeout: Duration },
#[error(transparent)]
Other(#[from] anyhow::Error),
}
@ -903,6 +905,10 @@ impl From<RequestError> for language_model_core::LanguageModelCompletionError {
Self::from_http_status(provider.into(), status_code, body, retry_after)
}
RequestError::ResponseHeaderTimeout { provider, timeout } => Self::HttpSend {
provider: provider.into(),
error: anyhow!("response headers timed out after {timeout:?}"),
},
RequestError::Other(e) => Self::Other(e),
}
}

View file

@ -1,11 +1,266 @@
use anyhow::{Result, anyhow};
use futures::{AsyncBufReadExt, AsyncReadExt, StreamExt, io::BufReader, stream::BoxStream};
use futures::{
AsyncBufReadExt, AsyncReadExt, FutureExt, StreamExt, future::BoxFuture, io::BufReader,
stream::BoxStream,
};
use http_client::{AsyncBody, HttpClient, Method, Request as HttpRequest};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::{future::Future, time::Duration};
use crate::{ReasoningEffort, RequestError, Role, ServiceTier, ToolChoice};
#[derive(Default)]
pub struct StreamResponseOptions {
response_header_timeout: Option<(Duration, BoxFuture<'static, ()>)>,
}
impl StreamResponseOptions {
pub fn response_header_timeout(
timeout: Duration,
timer: impl Future<Output = ()> + Send + 'static,
) -> Self {
Self {
response_header_timeout: Some((timeout, timer.boxed())),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::{FutureExt, StreamExt, future};
use http_client::{
AsyncBody, HttpClient, Request as HttpRequest, Response as HttpResponse, Url,
};
use std::{
io::{Cursor, Read},
pin::Pin,
sync::{
Arc, Mutex,
atomic::{AtomicBool, Ordering},
},
task::{Context, Poll, Waker},
};
/// Future returned by a `TestHttpClient` handler.
type TestResponseFuture = BoxFuture<'static, anyhow::Result<HttpResponse<AsyncBody>>>;

/// Closure type that answers every request sent through a `TestHttpClient`.
type TestRequestHandler = dyn Fn(HttpRequest<AsyncBody>) -> TestResponseFuture + Send + Sync;

/// `HttpClient` implementation for tests: `send` forwards each request to a
/// caller-supplied closure, and no user agent or proxy is reported.
struct TestHttpClient {
    handler: Arc<TestRequestHandler>,
}

impl TestHttpClient {
    /// Wraps `handler` so it is invoked for each request passed to `send`.
    fn new<F>(handler: F) -> Self
    where
        F: Fn(HttpRequest<AsyncBody>) -> TestResponseFuture + Send + Sync + 'static,
    {
        let handler: Arc<TestRequestHandler> = Arc::new(handler);
        Self { handler }
    }
}

impl HttpClient for TestHttpClient {
    fn user_agent(&self) -> Option<&http_client::http::HeaderValue> {
        None
    }

    fn proxy(&self) -> Option<&Url> {
        None
    }

    fn send(&self, request: HttpRequest<AsyncBody>) -> TestResponseFuture {
        (self.handler)(request)
    }
}
/// In-memory response body whose reads stay pending until the paired
/// `DelayedBodyHandle` releases it.
struct DelayedBody {
    state: Arc<DelayedBodyState>,
    bytes: Cursor<Vec<u8>>,
}

/// State shared between a `DelayedBody` and its `DelayedBodyHandle`.
struct DelayedBodyState {
    /// Set once the handle has released the body.
    released: AtomicBool,
    /// Waker stored by the most recent pending read, if any.
    waker: Mutex<Option<Waker>>,
}

/// Handle used by a test to release the paired `DelayedBody`.
struct DelayedBodyHandle {
    state: Arc<DelayedBodyState>,
}

impl DelayedBody {
    /// Creates an unreleased body over `bytes` together with its handle.
    fn new(bytes: Vec<u8>) -> (Self, DelayedBodyHandle) {
        let shared = Arc::new(DelayedBodyState {
            released: AtomicBool::new(false),
            waker: Mutex::new(None),
        });
        let body = Self {
            state: Arc::clone(&shared),
            bytes: Cursor::new(bytes),
        };
        (body, DelayedBodyHandle { state: shared })
    }
}

impl DelayedBodyHandle {
    /// Marks the body as released and wakes the stored waker, if there is one.
    fn release(&self) {
        self.state.released.store(true, Ordering::SeqCst);
        // The guard stays alive while the waker is woken.
        let mut parked_waker = self.state.waker.lock().expect("lock poisoned");
        if let Some(waker) = parked_waker.take() {
            waker.wake();
        }
    }
}

impl futures::AsyncRead for DelayedBody {
    /// Reads from the in-memory cursor once released; before that, stores the
    /// caller's waker and reports `Poll::Pending`.
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buffer: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        if self.state.released.load(Ordering::SeqCst) {
            return Poll::Ready(self.bytes.read(buffer));
        }

        let mut parked_waker = self.state.waker.lock().expect("lock poisoned");
        *parked_waker = Some(cx.waker().clone());
        Poll::Pending
    }
}
/// Builds a streaming `Request` for model `"gpt-test"` with empty collections
/// and every optional field unset.
fn test_request() -> Request {
    Request {
        model: String::from("gpt-test"),
        stream: true,
        // Collections start empty.
        input: Vec::new(),
        include: Vec::new(),
        tools: Vec::new(),
        // Every optional setting is left unset.
        instructions: None,
        temperature: None,
        top_p: None,
        max_output_tokens: None,
        parallel_tool_calls: None,
        tool_choice: None,
        prompt_cache_key: None,
        reasoning: None,
        store: None,
        service_tier: None,
    }
}
/// A request whose response never arrives must fail with
/// `RequestError::ResponseHeaderTimeout`, carrying the provider name and the
/// configured timeout, once the timer future completes.
#[test]
fn stream_response_times_out_before_headers() {
    let header_timeout = Duration::from_secs(10);

    // The HTTP future never resolves, so only the timer can finish.
    let never_responds = TestHttpClient::new(|_| {
        future::pending::<anyhow::Result<HttpResponse<AsyncBody>>>().boxed()
    });

    // An already-completed timer stands in for an elapsed timeout.
    let options =
        StreamResponseOptions::response_header_timeout(header_timeout, future::ready(()));

    let result = futures::executor::block_on(stream_response_with_options(
        &never_responds,
        "Test Provider",
        "https://api.test/v1",
        "test-key",
        test_request(),
        Vec::new(),
        options,
    ));

    assert!(matches!(
        result,
        Err(RequestError::ResponseHeaderTimeout {
            provider,
            timeout
        }) if provider == "Test Provider" && timeout == Duration::from_secs(10)
    ));
}
// Checks that the header timeout stops applying once a response has been
// returned: the timer future is dropped, and the event stream keeps waiting
// for body bytes instead of failing.
#[test]
fn stream_response_does_not_timeout_after_headers_arrive() {
futures::executor::block_on(async {
let body = r#"data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}"#;
// The body bytes are withheld until `delayed_body_handle.release()` is called.
let (delayed_body, delayed_body_handle) =
DelayedBody::new(format!("{body}\n\n").into_bytes());
// The handler is `Fn`, so the single body is moved out through a mutex on
// the first (and only) request.
let delayed_body = Mutex::new(Some(delayed_body));
let client = TestHttpClient::new(move |_| {
let delayed_body = delayed_body
.lock()
.expect("lock poisoned")
.take()
.expect("test sends only one request");
async {
Ok(HttpResponse::builder()
.status(200)
.body(AsyncBody::from_reader(delayed_body))?)
}
.boxed()
});
// The timer future waits on this channel; the sender half is used below
// to detect whether the timer future (and its receiver) was dropped.
let (timeout_tx, timeout_rx) = futures::channel::oneshot::channel::<()>();
let mut stream = stream_response_with_options(
&client,
"Test Provider",
"https://api.test/v1",
"test-key",
test_request(),
Vec::new(),
StreamResponseOptions::response_header_timeout(
Duration::from_secs(10),
async move {
assert!(
timeout_rx.await.is_ok(),
"timer should be dropped after headers arrive"
);
},
),
)
.await
.expect("headers should arrive before timeout");
// Sending fails only if the receiver inside the timer future is gone.
assert!(
timeout_tx.send(()).is_err(),
"timeout future should be dropped after headers arrive"
);
// With the body still unreleased, polling the stream once yields nothing.
assert!(
stream.next().now_or_never().is_none(),
"stream should wait for delayed body bytes"
);
// Releasing the body lets the stream read and parse the completed event.
delayed_body_handle.release();
let event = stream
.next()
.await
.expect("stream should produce an event")
.expect("event should parse");
assert!(matches!(event, StreamEvent::Completed { .. }));
});
}
}
#[derive(Serialize, Debug)]
pub struct Request {
pub model: String,
@ -440,6 +695,27 @@ pub async fn stream_response(
api_key: &str,
request: Request,
extra_headers: Vec<(String, String)>,
) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
stream_response_with_options(
client,
provider_name,
api_url,
api_key,
request,
extra_headers,
StreamResponseOptions::default(),
)
.await
}
pub async fn stream_response_with_options(
client: &dyn HttpClient,
provider_name: &str,
api_url: &str,
api_key: &str,
request: Request,
extra_headers: Vec<(String, String)>,
options: StreamResponseOptions,
) -> Result<BoxStream<'static, Result<StreamEvent>>, RequestError> {
let uri = format!("{api_url}/responses");
let mut request_builder = HttpRequest::builder()
@ -458,7 +734,24 @@ pub async fn stream_response(
))
.map_err(|e| RequestError::Other(e.into()))?;
let mut response = client.send(request).await?;
let mut response = if let Some((timeout, timer)) = options.response_header_timeout {
let send_request = client.send(request).fuse();
let timer = timer.fuse();
futures::pin_mut!(send_request);
futures::pin_mut!(timer);
futures::select! {
response = send_request => response?,
() = timer => {
return Err(RequestError::ResponseHeaderTimeout {
provider: provider_name.to_owned(),
timeout,
});
}
}
} else {
client.send(request).await?
};
if response.status().is_success() {
if is_streaming {
let reader = BufReader::new(response.into_body());