mirror of
https://github.com/zed-industries/zed.git
synced 2026-05-31 19:05:00 +07:00
client: Reconnect cloud websocket after it disconnects (#57078)
The cloud websocket was established once during sign-in and never re-established. On any server restart or transient network drop the connection task exited. yawc itself does not reconnect. This wraps `connect_to_cloud` in a long-lived task that re-establishes the websocket with exponential backoff and jitter, reusing `INITIAL_RECONNECTION_DELAY` and `MAX_RECONNECTION_DELAY` so the behavior matches the Collab reconnect loop in the same module. Part of CLO-713. Release Notes: - N/A
This commit is contained in:
parent
78b5c0e774
commit
8d28ca5c01
1 changed files with 47 additions and 15 deletions
|
|
@ -334,6 +334,7 @@ struct ClientState {
|
|||
credentials: Option<Credentials>,
|
||||
status: (watch::Sender<Status>, watch::Receiver<Status>),
|
||||
_reconnect_task: Option<Task<()>>,
|
||||
_cloud_connection_task: Option<Task<()>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Eq, PartialEq)]
|
||||
|
|
@ -435,6 +436,7 @@ impl Default for ClientState {
|
|||
credentials: None,
|
||||
status: watch::channel_with(Status::SignedOut),
|
||||
_reconnect_task: None,
|
||||
_cloud_connection_task: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -607,6 +609,7 @@ impl Client {
|
|||
pub fn teardown(&self) {
|
||||
let mut state = self.state.write();
|
||||
state._reconnect_task.take();
|
||||
state._cloud_connection_task.take();
|
||||
self.handler_set.lock().clear();
|
||||
self.peer.teardown();
|
||||
}
|
||||
|
|
@ -724,6 +727,7 @@ impl Client {
|
|||
Status::SignedOut | Status::UpgradeRequired => {
|
||||
self.telemetry.set_authenticated_user_info(None, false);
|
||||
state._reconnect_task.take();
|
||||
state._cloud_connection_task.take();
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
|
@ -957,28 +961,56 @@ impl Client {
|
|||
}
|
||||
}
|
||||
|
||||
/// Establishes a WebSocket connection with Cloud for receiving updates from the server.
|
||||
async fn connect_to_cloud(self: &Arc<Self>, cx: &AsyncApp) -> Result<()> {
|
||||
/// Maintains a WebSocket connection with Cloud for receiving updates from the server.
|
||||
///
|
||||
/// The connection is re-established with exponential backoff if it drops or fails to
|
||||
/// establish.
|
||||
fn connect_to_cloud(self: &Arc<Self>, cx: &AsyncApp) {
|
||||
let this = self.clone();
|
||||
let task = cx.spawn(async move |cx| {
|
||||
#[cfg(any(test, feature = "test-support"))]
|
||||
let mut rng = StdRng::seed_from_u64(0);
|
||||
#[cfg(not(any(test, feature = "test-support")))]
|
||||
let mut rng = StdRng::from_os_rng();
|
||||
|
||||
let mut delay = INITIAL_RECONNECTION_DELAY;
|
||||
loop {
|
||||
match Self::run_cloud_connection(&this, cx).await {
|
||||
Ok(()) => {
|
||||
log::info!("cloud websocket disconnected, will reconnect");
|
||||
delay = INITIAL_RECONNECTION_DELAY;
|
||||
}
|
||||
Err(err) => {
|
||||
log::warn!(
|
||||
"cloud websocket connect failed: {err:#}; retrying in {delay:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let jitter = Duration::from_millis(rng.random_range(0..delay.as_millis() as u64));
|
||||
cx.background_executor().timer(delay + jitter).await;
|
||||
delay = cmp::min(delay * 2, MAX_RECONNECTION_DELAY);
|
||||
}
|
||||
});
|
||||
self.state.write()._cloud_connection_task = Some(task);
|
||||
}
|
||||
|
||||
/// Runs a single attempt of the cloud websocket connection, returning once the connection
|
||||
/// closes (cleanly or otherwise) or fails to establish.
|
||||
async fn run_cloud_connection(self: &Arc<Self>, cx: &mut AsyncApp) -> Result<()> {
|
||||
let connect_task = cx.update({
|
||||
let cloud_client = self.cloud_client.clone();
|
||||
move |cx| cloud_client.connect(cx)
|
||||
})?;
|
||||
let connection = connect_task.await?;
|
||||
|
||||
let (mut messages, task) = cx.update(|cx| connection.spawn(cx));
|
||||
task.detach();
|
||||
let (mut messages, _cloud_io_task) = cx.update(|cx| connection.spawn(cx));
|
||||
|
||||
cx.spawn({
|
||||
let this = self.clone();
|
||||
async move |cx| {
|
||||
while let Some(message) = messages.next().await {
|
||||
if let Some(message) = message.log_err() {
|
||||
this.handle_message_to_client(message, cx);
|
||||
}
|
||||
}
|
||||
while let Some(message) = messages.next().await {
|
||||
if let Some(message) = message.log_err() {
|
||||
self.handle_message_to_client(message, cx);
|
||||
}
|
||||
})
|
||||
.detach();
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
@ -1009,7 +1041,7 @@ impl Client {
|
|||
|
||||
let credentials = self.sign_in(try_provider, cx).await?;
|
||||
|
||||
self.connect_to_cloud(cx).await.log_err();
|
||||
self.connect_to_cloud(cx);
|
||||
|
||||
cx.update(move |cx| {
|
||||
cx.spawn({
|
||||
|
|
|
|||
Loading…
Reference in a new issue