clock: Cleanup ReplicaId, Lamport and Global (#40600)

- Notable change is the use of a newtype for `ReplicaId`
- Fixes `WorktreeStore::create_remote_worktree` creating a remote
worktree with the local replica id, though this is not currently used
- Fixes observing the `Agent` (that is following the agent) causing
global clocks to allocate 65535 elements
- Shrinks the size of `Global` a bit. In a local or non-collab remote
session it won't ever allocate still.

Release Notes:

- N/A *or* Added/Fixed/Improved ...
This commit is contained in:
Lukas Wirth 2025-10-20 13:26:20 +02:00 committed by GitHub
parent 37e264ab99
commit 43a9368dff
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 427 additions and 284 deletions

View file

@ -236,21 +236,21 @@ impl PendingDiff {
fn finalize(&self, cx: &mut Context<Diff>) -> FinalizedDiff {
let ranges = self.excerpt_ranges(cx);
let base_text = self.base_text.clone();
let language_registry = self.new_buffer.read(cx).language_registry();
let new_buffer = self.new_buffer.read(cx);
let language_registry = new_buffer.language_registry();
let path = self
.new_buffer
.read(cx)
let path = new_buffer
.file()
.map(|file| file.path().display(file.path_style(cx)))
.unwrap_or("untitled".into())
.into();
let replica_id = new_buffer.replica_id();
// Replace the buffer in the multibuffer with the snapshot
let buffer = cx.new(|cx| {
let language = self.new_buffer.read(cx).language().cloned();
let buffer = TextBuffer::new_normalized(
0,
replica_id,
cx.entity_id().as_non_zero_u64().into(),
self.new_buffer.read(cx).line_ending(),
self.new_buffer.read(cx).as_rope().clone(),

View file

@ -308,12 +308,13 @@ mod tests {
use indoc::indoc;
use language::{BufferId, TextBuffer};
use rand::prelude::*;
use text::ReplicaId;
use util::test::{generate_marked_text, marked_text_ranges};
#[test]
fn test_empty_query() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
"Hello world\nThis is a test\nFoo bar baz",
);
@ -327,7 +328,7 @@ mod tests {
#[test]
fn test_streaming_exact_match() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
"Hello world\nThis is a test\nFoo bar baz",
);
@ -351,7 +352,7 @@ mod tests {
#[test]
fn test_streaming_fuzzy_match() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
indoc! {"
function foo(a, b) {
@ -385,7 +386,7 @@ mod tests {
#[test]
fn test_incremental_improvement() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
"Line 1\nLine 2\nLine 3\nLine 4\nLine 5",
);
@ -410,7 +411,7 @@ mod tests {
#[test]
fn test_incomplete_lines_buffering() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
indoc! {"
The quick brown fox
@ -437,7 +438,7 @@ mod tests {
#[test]
fn test_multiline_fuzzy_match() {
let buffer = TextBuffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
indoc! {r#"
impl Display for User {
@ -691,7 +692,11 @@ mod tests {
}
"#};
let buffer = TextBuffer::new(0, BufferId::new(1).unwrap(), text.to_string());
let buffer = TextBuffer::new(
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
text.to_string(),
);
let snapshot = buffer.snapshot();
let mut matcher = StreamingFuzzyMatcher::new(snapshot.clone());
@ -724,7 +729,7 @@ mod tests {
#[track_caller]
fn assert_location_resolution(text_with_expected_range: &str, query: &str, rng: &mut StdRng) {
let (text, expected_ranges) = marked_text_ranges(text_with_expected_range, false);
let buffer = TextBuffer::new(0, BufferId::new(1).unwrap(), text.clone());
let buffer = TextBuffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text.clone());
let snapshot = buffer.snapshot();
let mut matcher = StreamingFuzzyMatcher::new(snapshot);

View file

@ -486,7 +486,7 @@ pub enum ContextSummary {
Error,
}
#[derive(Default, Clone, Debug, Eq, PartialEq)]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct ContextSummaryContent {
pub text: String,
pub done: bool,
@ -523,7 +523,11 @@ impl ContextSummary {
match self {
ContextSummary::Content(content) => content,
ContextSummary::Pending | ContextSummary::Error => {
let content = ContextSummaryContent::default();
let content = ContextSummaryContent {
text: "".to_string(),
done: false,
timestamp: clock::Lamport::MIN,
};
*self = ContextSummary::Content(content);
self.content_as_mut().unwrap()
}
@ -796,7 +800,7 @@ impl AssistantContext {
};
let first_message_id = MessageId(clock::Lamport {
replica_id: 0,
replica_id: ReplicaId::LOCAL,
value: 0,
});
let message = MessageAnchor {
@ -2692,7 +2696,7 @@ impl AssistantContext {
self.summary = ContextSummary::Content(ContextSummaryContent {
text: "".to_string(),
done: false,
timestamp: clock::Lamport::default(),
timestamp: clock::Lamport::MIN,
});
replace_old = true;
}
@ -3117,7 +3121,7 @@ impl SavedContext {
let mut first_message_metadata = None;
for message in self.messages {
if message.id == MessageId(clock::Lamport::default()) {
if message.id == MessageId(clock::Lamport::MIN) {
first_message_metadata = Some(message.metadata);
} else {
operations.push(ContextOperation::InsertMessage {
@ -3141,7 +3145,7 @@ impl SavedContext {
if let Some(metadata) = first_message_metadata {
let timestamp = next_timestamp.tick();
operations.push(ContextOperation::UpdateMessage {
message_id: MessageId(clock::Lamport::default()),
message_id: MessageId(clock::Lamport::MIN),
metadata: MessageMetadata {
role: metadata.role,
status: metadata.status,

View file

@ -741,7 +741,7 @@ async fn test_serialization(cx: &mut TestAppContext) {
);
}
#[gpui::test(iterations = 100)]
#[gpui::test(iterations = 25)]
async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: StdRng) {
cx.update(init_test);
@ -771,7 +771,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
let context = cx.new(|cx| {
AssistantContext::new(
context_id.clone(),
i as ReplicaId,
ReplicaId::new(i as u16),
language::Capability::ReadWrite,
registry.clone(),
prompt_builder.clone(),
@ -789,7 +789,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
if let ContextEvent::Operation(op) = event {
network
.lock()
.broadcast(i as ReplicaId, vec![op.to_proto()]);
.broadcast(ReplicaId::new(i as u16), vec![op.to_proto()]);
}
}
})
@ -797,7 +797,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
});
contexts.push(context);
network.lock().add_peer(i as ReplicaId);
network.lock().add_peer(ReplicaId::new(i as u16));
}
let mut mutation_count = operations;
@ -943,9 +943,9 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
mutation_count -= 1;
}
_ => {
let replica_id = context_index as ReplicaId;
let replica_id = ReplicaId::new(context_index as u16);
if network.lock().is_disconnected(replica_id) {
network.lock().reconnect_peer(replica_id, 0);
network.lock().reconnect_peer(replica_id, ReplicaId::new(0));
let (ops_to_send, ops_to_receive) = cx.read(|cx| {
let host_context = &contexts[0].read(cx);
@ -971,7 +971,7 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
network.lock().broadcast(replica_id, ops_to_send);
context.update(cx, |context, cx| context.apply_ops(ops_to_receive, cx));
} else if rng.random_bool(0.1) && replica_id != 0 {
} else if rng.random_bool(0.1) && replica_id != ReplicaId::new(0) {
log::info!("Context {}: disconnecting", context_index);
network.lock().disconnect_peer(replica_id);
} else if network.lock().has_unreceived(replica_id) {
@ -996,25 +996,25 @@ async fn test_random_context_collaboration(cx: &mut TestAppContext, mut rng: Std
assert_eq!(
context.buffer.read(cx).text(),
first_context.buffer.read(cx).text(),
"Context {} text != Context 0 text",
"Context {:?} text != Context 0 text",
context.buffer.read(cx).replica_id()
);
assert_eq!(
context.message_anchors,
first_context.message_anchors,
"Context {} messages != Context 0 messages",
"Context {:?} messages != Context 0 messages",
context.buffer.read(cx).replica_id()
);
assert_eq!(
context.messages_metadata,
first_context.messages_metadata,
"Context {} message metadata != Context 0 message metadata",
"Context {:?} message metadata != Context 0 message metadata",
context.buffer.read(cx).replica_id()
);
assert_eq!(
context.slash_command_output_sections,
first_context.slash_command_output_sections,
"Context {} slash command output sections != Context 0 slash command output sections",
"Context {:?} slash command output sections != Context 0 slash command output sections",
context.buffer.read(cx).replica_id()
);
}

View file

@ -85,7 +85,7 @@ struct PendingHunk {
new_status: DiffHunkSecondaryStatus,
}
#[derive(Debug, Default, Clone)]
#[derive(Debug, Clone)]
pub struct DiffHunkSummary {
buffer_range: Range<Anchor>,
}
@ -114,7 +114,9 @@ impl sum_tree::Summary for DiffHunkSummary {
type Context<'a> = &'a text::BufferSnapshot;
fn zero(_cx: Self::Context<'_>) -> Self {
Default::default()
DiffHunkSummary {
buffer_range: Anchor::MIN..Anchor::MIN,
}
}
fn add_summary(&mut self, other: &Self, buffer: Self::Context<'_>) {
@ -937,7 +939,9 @@ impl BufferDiff {
pub fn clear_pending_hunks(&mut self, cx: &mut Context<Self>) {
if self.secondary_diff.is_some() {
self.inner.pending_hunks = SumTree::from_summary(DiffHunkSummary::default());
self.inner.pending_hunks = SumTree::from_summary(DiffHunkSummary {
buffer_range: Anchor::MIN..Anchor::MIN,
});
cx.emit(BufferDiffEvent::DiffChanged {
changed_range: Some(Anchor::MIN..Anchor::MAX),
});
@ -1368,7 +1372,7 @@ mod tests {
use gpui::TestAppContext;
use pretty_assertions::{assert_eq, assert_ne};
use rand::{Rng as _, rngs::StdRng};
use text::{Buffer, BufferId, Rope};
use text::{Buffer, BufferId, ReplicaId, Rope};
use unindent::Unindent as _;
use util::test::marked_text_ranges;
@ -1393,7 +1397,7 @@ mod tests {
"
.unindent();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text);
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text);
let mut diff = BufferDiffSnapshot::new_sync(buffer.clone(), diff_base.clone(), cx);
assert_hunks(
diff.hunks_intersecting_range(Anchor::MIN..Anchor::MAX, &buffer),
@ -1467,7 +1471,7 @@ mod tests {
"
.unindent();
let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text);
let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text);
let unstaged_diff = BufferDiffSnapshot::new_sync(buffer.clone(), index_text, cx);
let mut uncommitted_diff =
BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx);
@ -1536,7 +1540,7 @@ mod tests {
"
.unindent();
let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text);
let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text);
let diff = cx
.update(|cx| {
BufferDiffSnapshot::new_with_base_text(
@ -1799,7 +1803,7 @@ mod tests {
for example in table {
let (buffer_text, ranges) = marked_text_ranges(&example.buffer_marked_text, false);
let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text);
let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text);
let hunk_range =
buffer.anchor_before(ranges[0].start)..buffer.anchor_before(ranges[0].end);
@ -1872,7 +1876,11 @@ mod tests {
"
.unindent();
let buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text.clone());
let buffer = Buffer::new(
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
buffer_text.clone(),
);
let unstaged = BufferDiffSnapshot::new_sync(buffer.clone(), index_text, cx);
let uncommitted = BufferDiffSnapshot::new_sync(buffer.clone(), head_text.clone(), cx);
let unstaged_diff = cx.new(|cx| {
@ -1945,7 +1953,7 @@ mod tests {
"
.unindent();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), buffer_text_1);
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), buffer_text_1);
let empty_diff = cx.update(|cx| BufferDiffSnapshot::empty(&buffer, cx));
let diff_1 = BufferDiffSnapshot::new_sync(buffer.clone(), base_text.clone(), cx);

View file

@ -9,7 +9,7 @@ use rpc::{
proto::{self, PeerId},
};
use std::{sync::Arc, time::Duration};
use text::BufferId;
use text::{BufferId, ReplicaId};
use util::ResultExt;
pub const ACKNOWLEDGE_DEBOUNCE_INTERVAL: Duration = Duration::from_millis(250);
@ -65,7 +65,12 @@ impl ChannelBuffer {
let buffer = cx.new(|cx| {
let capability = channel_store.read(cx).channel_capability(channel.id);
language::Buffer::remote(buffer_id, response.replica_id as u16, capability, base_text)
language::Buffer::remote(
buffer_id,
ReplicaId::new(response.replica_id as u16),
capability,
base_text,
)
})?;
buffer.update(cx, |buffer, cx| buffer.apply_ops(operations, cx))?;
@ -272,7 +277,7 @@ impl ChannelBuffer {
self.connected
}
pub fn replica_id(&self, cx: &App) -> u16 {
pub fn replica_id(&self, cx: &App) -> ReplicaId {
self.buffer.read(cx).replica_id()
}
}

View file

@ -943,7 +943,7 @@ impl Collaborator {
pub fn from_proto(message: proto::Collaborator) -> Result<Self> {
Ok(Self {
peer_id: message.peer_id.context("invalid peer id")?,
replica_id: message.replica_id as ReplicaId,
replica_id: ReplicaId::new(message.replica_id as u16),
user_id: message.user_id as UserId,
is_host: message.is_host,
committer_name: message.committer_name,

View file

@ -4,33 +4,73 @@ use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use std::{
cmp::{self, Ordering},
fmt, iter,
fmt,
};
pub use system_clock::*;
pub const LOCAL_BRANCH_REPLICA_ID: u16 = u16::MAX;
pub const AGENT_REPLICA_ID: u16 = u16::MAX - 1;
/// A unique identifier for each distributed node.
pub type ReplicaId = u16;
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Ord, PartialOrd, Serialize, Deserialize)]
pub struct ReplicaId(u16);
impl ReplicaId {
/// The local replica
pub const LOCAL: ReplicaId = ReplicaId(0);
/// The remote replica of the connected remote server.
pub const REMOTE_SERVER: ReplicaId = ReplicaId(1);
/// The agent's unique identifier.
pub const AGENT: ReplicaId = ReplicaId(2);
/// A local branch.
pub const LOCAL_BRANCH: ReplicaId = ReplicaId(3);
/// The first collaborative replica ID, any replica equal or greater than this is a collaborative replica.
pub const FIRST_COLLAB_ID: ReplicaId = ReplicaId(Self::LOCAL_BRANCH.0 + 1);
pub fn new(id: u16) -> Self {
ReplicaId(id)
}
pub fn as_u16(&self) -> u16 {
self.0
}
pub fn is_remote(self) -> bool {
self == ReplicaId::REMOTE_SERVER || self >= ReplicaId::FIRST_COLLAB_ID
}
}
impl fmt::Debug for ReplicaId {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if *self == ReplicaId::LOCAL {
write!(f, "<local>")
} else if *self == ReplicaId::REMOTE_SERVER {
write!(f, "<remote>")
} else if *self == ReplicaId::AGENT {
write!(f, "<agent>")
} else if *self == ReplicaId::LOCAL_BRANCH {
write!(f, "<branch>")
} else {
write!(f, "{}", self.0)
}
}
}
/// A [Lamport sequence number](https://en.wikipedia.org/wiki/Lamport_timestamp).
pub type Seq = u32;
/// A [Lamport timestamp](https://en.wikipedia.org/wiki/Lamport_timestamp),
/// used to determine the ordering of events in the editor.
#[derive(Clone, Copy, Default, Eq, Hash, PartialEq, Serialize, Deserialize)]
#[derive(Clone, Copy, Eq, Hash, PartialEq, Serialize, Deserialize)]
pub struct Lamport {
pub replica_id: ReplicaId,
pub value: Seq,
}
/// A [vector clock](https://en.wikipedia.org/wiki/Vector_clock).
/// A [version vector](https://en.wikipedia.org/wiki/Version_vector).
#[derive(Clone, Default, Hash, Eq, PartialEq)]
pub struct Global {
values: SmallVec<[u32; 8]>,
local_branch_value: u32,
// 4 is chosen as it is the biggest count that does not increase the size of the field itself.
// Coincidentally, it also covers all the important non-collab replica ids.
values: SmallVec<[u32; 4]>,
}
impl Global {
@ -38,30 +78,31 @@ impl Global {
Self::default()
}
/// Fetches the sequence number for the given replica ID.
pub fn get(&self, replica_id: ReplicaId) -> Seq {
if replica_id == LOCAL_BRANCH_REPLICA_ID {
self.local_branch_value
} else {
self.values.get(replica_id as usize).copied().unwrap_or(0) as Seq
}
self.values.get(replica_id.0 as usize).copied().unwrap_or(0) as Seq
}
/// Observe the lamport timestampe.
///
/// This sets the current sequence number of the observed replica ID to the maximum of this global's observed sequence and the observed timestamp.
pub fn observe(&mut self, timestamp: Lamport) {
debug_assert_ne!(timestamp.replica_id, Lamport::MAX.replica_id);
if timestamp.value > 0 {
if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
self.local_branch_value = cmp::max(self.local_branch_value, timestamp.value);
} else {
let new_len = timestamp.replica_id as usize + 1;
if new_len > self.values.len() {
self.values.resize(new_len, 0);
}
let entry = &mut self.values[timestamp.replica_id as usize];
*entry = cmp::max(*entry, timestamp.value);
let new_len = timestamp.replica_id.0 as usize + 1;
if new_len > self.values.len() {
self.values.resize(new_len, 0);
}
let entry = &mut self.values[timestamp.replica_id.0 as usize];
*entry = cmp::max(*entry, timestamp.value);
}
}
/// Join another global.
///
/// This observes all timestamps from the other global.
#[doc(alias = "synchronize")]
pub fn join(&mut self, other: &Self) {
if other.values.len() > self.values.len() {
self.values.resize(other.values.len(), 0);
@ -70,34 +111,36 @@ impl Global {
for (left, right) in self.values.iter_mut().zip(&other.values) {
*left = cmp::max(*left, *right);
}
self.local_branch_value = cmp::max(self.local_branch_value, other.local_branch_value);
}
/// Meet another global.
///
/// Sets all unobserved timestamps of this global to the sequences of other and sets all observed timestamps of this global to the minimum observed of both globals.
pub fn meet(&mut self, other: &Self) {
if other.values.len() > self.values.len() {
self.values.resize(other.values.len(), 0);
}
let mut new_len = 0;
for (ix, (left, right)) in self
.values
.iter_mut()
.zip(other.values.iter().chain(iter::repeat(&0)))
.enumerate()
{
if *left == 0 {
*left = *right;
} else if *right > 0 {
*left = cmp::min(*left, *right);
for (ix, (left, &right)) in self.values.iter_mut().zip(&other.values).enumerate() {
match (*left, right) {
// left has not observed the replica
(0, _) => *left = right,
// right has not observed the replica
(_, 0) => (),
(_, _) => *left = cmp::min(*left, right),
}
if *left != 0 {
new_len = ix + 1;
}
}
self.values.resize(new_len, 0);
self.local_branch_value = cmp::min(self.local_branch_value, other.local_branch_value);
if other.values.len() == self.values.len() {
// only truncate if other was equal or shorter (which at this point
// cant be due to the resize above) to `self` as otherwise we would
// truncate the unprocessed tail that is guaranteed to contain
// non-null timestamps
self.values.truncate(new_len);
}
}
pub fn observed(&self, timestamp: Lamport) -> bool {
@ -105,20 +148,18 @@ impl Global {
}
pub fn observed_any(&self, other: &Self) -> bool {
self.values
.iter()
.zip(other.values.iter())
.any(|(left, right)| *right > 0 && left >= right)
|| (other.local_branch_value > 0 && self.local_branch_value >= other.local_branch_value)
self.iter()
.zip(other.iter())
.any(|(left, right)| right.value > 0 && left.value >= right.value)
}
pub fn observed_all(&self, other: &Self) -> bool {
let mut rhs = other.values.iter();
self.values.iter().all(|left| match rhs.next() {
Some(right) => left >= right,
None => true,
}) && rhs.next().is_none()
&& self.local_branch_value >= other.local_branch_value
if self.values.len() < other.values.len() {
return false;
}
self.iter()
.zip(other.iter())
.all(|(left, right)| left.value >= right.value)
}
pub fn changed_since(&self, other: &Self) -> bool {
@ -128,21 +169,21 @@ impl Global {
.iter()
.zip(other.values.iter())
.any(|(left, right)| left > right)
|| self.local_branch_value > other.local_branch_value
}
pub fn most_recent(&self) -> Option<Lamport> {
self.iter().max_by_key(|timestamp| timestamp.value)
}
/// Iterates all replicas observed by this global as well as any unobserved replicas whose ID is lower than the highest observed replica.
pub fn iter(&self) -> impl Iterator<Item = Lamport> + '_ {
self.values
.iter()
.enumerate()
.map(|(replica_id, seq)| Lamport {
replica_id: replica_id as ReplicaId,
replica_id: ReplicaId(replica_id as u16),
value: *seq,
})
.chain((self.local_branch_value > 0).then_some(Lamport {
replica_id: LOCAL_BRANCH_REPLICA_ID,
value: self.local_branch_value,
}))
}
}
@ -173,12 +214,12 @@ impl PartialOrd for Lamport {
impl Lamport {
pub const MIN: Self = Self {
replica_id: ReplicaId::MIN,
replica_id: ReplicaId(u16::MIN),
value: Seq::MIN,
};
pub const MAX: Self = Self {
replica_id: ReplicaId::MAX,
replica_id: ReplicaId(u16::MAX),
value: Seq::MAX,
};
@ -190,7 +231,7 @@ impl Lamport {
}
pub fn as_u64(self) -> u64 {
((self.value as u64) << 32) | (self.replica_id as u64)
((self.value as u64) << 32) | (self.replica_id.0 as u64)
}
pub fn tick(&mut self) -> Self {
@ -211,7 +252,7 @@ impl fmt::Debug for Lamport {
} else if *self == Self::MIN {
write!(f, "Lamport {{MIN}}")
} else {
write!(f, "Lamport {{{}: {}}}", self.replica_id, self.value)
write!(f, "Lamport {{{:?}: {}}}", self.replica_id, self.value)
}
}
}
@ -220,16 +261,10 @@ impl fmt::Debug for Global {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "Global {{")?;
for timestamp in self.iter() {
if timestamp.replica_id > 0 {
if timestamp.replica_id.0 > 0 {
write!(f, ", ")?;
}
if timestamp.replica_id == LOCAL_BRANCH_REPLICA_ID {
write!(f, "<branch>: {}", timestamp.value)?;
} else if timestamp.replica_id == AGENT_REPLICA_ID {
write!(f, "<agent>: {}", timestamp.value)?;
} else {
write!(f, "{}: {}", timestamp.replica_id, timestamp.value)?;
}
write!(f, "{:?}: {}", timestamp.replica_id, timestamp.value)?;
}
write!(f, "}}")
}

View file

@ -62,9 +62,9 @@ impl Database {
.iter()
.map(|c| c.replica_id)
.collect::<HashSet<_>>();
let mut replica_id = ReplicaId(0);
let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
while replica_ids.contains(&replica_id) {
replica_id.0 += 1;
replica_id = ReplicaId(replica_id.0 + 1);
}
let collaborator = channel_buffer_collaborator::ActiveModel {
channel_id: ActiveValue::Set(channel_id),
@ -203,7 +203,7 @@ impl Database {
while let Some(row) = rows.next().await {
let row = row?;
let timestamp = clock::Lamport {
replica_id: row.replica_id as u16,
replica_id: clock::ReplicaId::new(row.replica_id as u16),
value: row.lamport_timestamp as u32,
};
server_version.observe(timestamp);
@ -701,7 +701,11 @@ impl Database {
return Ok(());
}
let mut text_buffer = text::Buffer::new(0, text::BufferId::new(1).unwrap(), base_text);
let mut text_buffer = text::Buffer::new(
clock::ReplicaId::LOCAL,
text::BufferId::new(1).unwrap(),
base_text,
);
text_buffer.apply_ops(operations.into_iter().filter_map(operation_from_wire));
let base_text = text_buffer.text();
@ -934,7 +938,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operatio
match operation.variant? {
proto::operation::Variant::Edit(edit) => Some(text::Operation::Edit(EditOperation {
timestamp: clock::Lamport {
replica_id: edit.replica_id as text::ReplicaId,
replica_id: clock::ReplicaId::new(edit.replica_id as u16),
value: edit.lamport_timestamp,
},
version: version_from_wire(&edit.version),
@ -949,7 +953,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operatio
})),
proto::operation::Variant::Undo(undo) => Some(text::Operation::Undo(UndoOperation {
timestamp: clock::Lamport {
replica_id: undo.replica_id as text::ReplicaId,
replica_id: clock::ReplicaId::new(undo.replica_id as u16),
value: undo.lamport_timestamp,
},
version: version_from_wire(&undo.version),
@ -959,7 +963,7 @@ pub fn operation_from_wire(operation: proto::Operation) -> Option<text::Operatio
.map(|c| {
(
clock::Lamport {
replica_id: c.replica_id as text::ReplicaId,
replica_id: clock::ReplicaId::new(c.replica_id as u16),
value: c.lamport_timestamp,
},
c.count,
@ -975,7 +979,7 @@ fn version_from_wire(message: &[proto::VectorClockEntry]) -> clock::Global {
let mut version = clock::Global::new();
for entry in message {
version.observe(clock::Lamport {
replica_id: entry.replica_id as text::ReplicaId,
replica_id: clock::ReplicaId::new(entry.replica_id as u16),
value: entry.timestamp,
});
}
@ -986,7 +990,7 @@ fn version_to_wire(version: &clock::Global) -> Vec<proto::VectorClockEntry> {
let mut message = Vec::new();
for entry in version.iter() {
message.push(proto::VectorClockEntry {
replica_id: entry.replica_id as u32,
replica_id: entry.replica_id.as_u16() as u32,
timestamp: entry.value,
});
}

View file

@ -91,14 +91,18 @@ impl Database {
.await?;
}
let replica_id = if is_ssh_project { 1 } else { 0 };
let replica_id = if is_ssh_project {
clock::ReplicaId::REMOTE_SERVER
} else {
clock::ReplicaId::LOCAL
};
project_collaborator::ActiveModel {
project_id: ActiveValue::set(project.id),
connection_id: ActiveValue::set(connection.id as i32),
connection_server_id: ActiveValue::set(ServerId(connection.owner_id as i32)),
user_id: ActiveValue::set(participant.user_id),
replica_id: ActiveValue::set(ReplicaId(replica_id)),
replica_id: ActiveValue::set(ReplicaId(replica_id.as_u16() as i32)),
is_host: ActiveValue::set(true),
id: ActiveValue::NotSet,
committer_name: ActiveValue::Set(None),
@ -841,7 +845,7 @@ impl Database {
.iter()
.map(|c| c.replica_id)
.collect::<HashSet<_>>();
let mut replica_id = ReplicaId(1);
let mut replica_id = ReplicaId(clock::ReplicaId::FIRST_COLLAB_ID.as_u16() as i32);
while replica_ids.contains(&replica_id) {
replica_id.0 += 1;
}

View file

@ -1,7 +1,7 @@
use super::*;
use crate::test_both_dbs;
use language::proto::{self, serialize_version};
use text::Buffer;
use text::{Buffer, ReplicaId};
test_both_dbs!(
test_channel_buffers,
@ -70,7 +70,11 @@ async fn test_channel_buffers(db: &Arc<Database>) {
.await
.unwrap();
let mut buffer_a = Buffer::new(0, text::BufferId::new(1).unwrap(), "".to_string());
let mut buffer_a = Buffer::new(
ReplicaId::new(0),
text::BufferId::new(1).unwrap(),
"".to_string(),
);
let operations = vec![
buffer_a.edit([(0..0, "hello world")]),
buffer_a.edit([(5..5, ", cruel")]),
@ -95,7 +99,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
.unwrap();
let mut buffer_b = Buffer::new(
0,
ReplicaId::new(0),
text::BufferId::new(1).unwrap(),
buffer_response_b.base_text,
);
@ -124,7 +128,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
rpc::proto::Collaborator {
user_id: a_id.to_proto(),
peer_id: Some(rpc::proto::PeerId { id: 1, owner_id }),
replica_id: 0,
replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32,
is_host: false,
committer_name: None,
committer_email: None,
@ -132,7 +136,7 @@ async fn test_channel_buffers(db: &Arc<Database>) {
rpc::proto::Collaborator {
user_id: b_id.to_proto(),
peer_id: Some(rpc::proto::PeerId { id: 2, owner_id }),
replica_id: 1,
replica_id: ReplicaId::FIRST_COLLAB_ID.as_u16() as u32 + 1,
is_host: false,
committer_name: None,
committer_email: None,
@ -228,7 +232,8 @@ async fn test_channel_buffers_last_operations(db: &Database) {
.await
.unwrap();
db.join_channel_buffer(channel, user_id, connection_id)
let res = db
.join_channel_buffer(channel, user_id, connection_id)
.await
.unwrap();
@ -239,7 +244,7 @@ async fn test_channel_buffers_last_operations(db: &Database) {
);
text_buffers.push(Buffer::new(
0,
ReplicaId::new(res.replica_id as u16),
text::BufferId::new(1).unwrap(),
"".to_string(),
));
@ -276,7 +281,12 @@ async fn test_channel_buffers_last_operations(db: &Database) {
db.join_channel_buffer(buffers[1].channel_id, user_id, connection_id)
.await
.unwrap();
text_buffers[1] = Buffer::new(1, text::BufferId::new(1).unwrap(), "def".to_string());
let replica_id = text_buffers[1].replica_id();
text_buffers[1] = Buffer::new(
replica_id,
text::BufferId::new(1).unwrap(),
"def".to_string(),
);
update_buffer(
buffers[1].channel_id,
user_id,
@ -304,20 +314,32 @@ async fn test_channel_buffers_last_operations(db: &Database) {
rpc::proto::ChannelBufferVersion {
channel_id: buffers[0].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[0].version()),
version: serialize_version(&text_buffers[0].version())
.into_iter()
.filter(
|vector| vector.replica_id == text_buffers[0].replica_id().as_u16() as u32
)
.collect::<Vec<_>>(),
},
rpc::proto::ChannelBufferVersion {
channel_id: buffers[1].channel_id.to_proto(),
epoch: 1,
version: serialize_version(&text_buffers[1].version())
.into_iter()
.filter(|vector| vector.replica_id == text_buffers[1].replica_id() as u32)
.filter(
|vector| vector.replica_id == text_buffers[1].replica_id().as_u16() as u32
)
.collect::<Vec<_>>(),
},
rpc::proto::ChannelBufferVersion {
channel_id: buffers[2].channel_id.to_proto(),
epoch: 0,
version: serialize_version(&text_buffers[2].version()),
version: serialize_version(&text_buffers[2].version())
.into_iter()
.filter(
|vector| vector.replica_id == text_buffers[2].replica_id().as_u16() as u32
)
.collect::<Vec<_>>(),
},
]
);

View file

@ -1278,7 +1278,7 @@ mod tests {
Anchor::min(),
&InlayHint {
label: InlayHintLabel::String("a".to_string()),
position: text::Anchor::default(),
position: text::Anchor::MIN,
padding_left: false,
padding_right: false,
tooltip: None,
@ -1298,7 +1298,7 @@ mod tests {
Anchor::min(),
&InlayHint {
label: InlayHintLabel::String("a".to_string()),
position: text::Anchor::default(),
position: text::Anchor::MIN,
padding_left: true,
padding_right: true,
tooltip: None,
@ -1318,7 +1318,7 @@ mod tests {
Anchor::min(),
&InlayHint {
label: InlayHintLabel::String(" a ".to_string()),
position: text::Anchor::default(),
position: text::Anchor::MIN,
padding_left: false,
padding_right: false,
tooltip: None,
@ -1338,7 +1338,7 @@ mod tests {
Anchor::min(),
&InlayHint {
label: InlayHintLabel::String(" a ".to_string()),
position: text::Anchor::default(),
position: text::Anchor::MIN,
padding_left: true,
padding_right: true,
tooltip: None,
@ -1361,7 +1361,7 @@ mod tests {
Anchor::min(),
&InlayHint {
label: InlayHintLabel::String("🎨".to_string()),
position: text::Anchor::default(),
position: text::Anchor::MIN,
padding_left: true,
padding_right: true,
tooltip: None,

View file

@ -82,7 +82,7 @@ use anyhow::{Context as _, Result, anyhow};
use blink_manager::BlinkManager;
use buffer_diff::DiffHunkStatus;
use client::{Collaborator, ParticipantIndex, parse_zed_link};
use clock::{AGENT_REPLICA_ID, ReplicaId};
use clock::ReplicaId;
use code_context_menus::{
AvailableCodeAction, CodeActionContents, CodeActionsItem, CodeActionsMenu, CodeContextMenu,
CompletionsMenu, ContextMenuOrigin,
@ -1301,7 +1301,7 @@ enum SelectionHistoryMode {
#[derive(Clone, PartialEq, Eq, Hash)]
struct HoveredCursor {
replica_id: u16,
replica_id: ReplicaId,
selection_id: usize,
}
@ -23482,7 +23482,7 @@ impl EditorSnapshot {
self.buffer_snapshot()
.selections_in_range(range, false)
.filter_map(move |(replica_id, line_mode, cursor_shape, selection)| {
if replica_id == AGENT_REPLICA_ID {
if replica_id == ReplicaId::AGENT {
Some(RemoteSelection {
replica_id,
selection,

View file

@ -8,7 +8,7 @@ use gpui::{
};
use language::{
Anchor, Buffer, Capability, DiskState, File, LanguageRegistry, LineEnding, OffsetRangeExt as _,
Point, Rope, TextBuffer,
Point, ReplicaId, Rope, TextBuffer,
};
use multi_buffer::PathKey;
use project::{Project, WorktreeId, git_store::Repository};
@ -135,7 +135,7 @@ impl CommitView {
});
let buffer = cx.new(|cx| {
let buffer = TextBuffer::new_normalized(
0,
ReplicaId::LOCAL,
cx.entity_id().as_non_zero_u64().into(),
LineEnding::default(),
format_commit(&commit).into(),
@ -316,7 +316,7 @@ async fn build_buffer(
};
let buffer = cx.new(|cx| {
let buffer = TextBuffer::new_normalized(
0,
ReplicaId::LOCAL,
cx.entity_id().as_non_zero_u64().into(),
line_ending,
text,

View file

@ -18,8 +18,8 @@ pub use crate::{
proto,
};
use anyhow::{Context as _, Result};
use clock::Lamport;
pub use clock::ReplicaId;
use clock::{AGENT_REPLICA_ID, Lamport};
use collections::HashMap;
use fs::MTime;
use futures::channel::oneshot;
@ -828,7 +828,11 @@ impl Buffer {
/// Create a new buffer with the given base text.
pub fn local<T: Into<String>>(base_text: T, cx: &Context<Self>) -> Self {
Self::build(
TextBuffer::new(0, cx.entity_id().as_non_zero_u64().into(), base_text.into()),
TextBuffer::new(
ReplicaId::LOCAL,
cx.entity_id().as_non_zero_u64().into(),
base_text.into(),
),
None,
Capability::ReadWrite,
)
@ -842,7 +846,7 @@ impl Buffer {
) -> Self {
Self::build(
TextBuffer::new_normalized(
0,
ReplicaId::LOCAL,
cx.entity_id().as_non_zero_u64().into(),
line_ending,
base_text_normalized,
@ -991,10 +995,10 @@ impl Buffer {
language: None,
remote_selections: Default::default(),
diagnostics: Default::default(),
diagnostics_timestamp: Default::default(),
diagnostics_timestamp: Lamport::MIN,
completion_triggers: Default::default(),
completion_triggers_per_language_server: Default::default(),
completion_triggers_timestamp: Default::default(),
completion_triggers_timestamp: Lamport::MIN,
deferred_ops: OperationQueue::new(),
has_conflict: false,
change_bits: Default::default(),
@ -1012,7 +1016,8 @@ impl Buffer {
let buffer_id = entity_id.as_non_zero_u64().into();
async move {
let text =
TextBuffer::new_normalized(0, buffer_id, Default::default(), text).snapshot();
TextBuffer::new_normalized(ReplicaId::LOCAL, buffer_id, Default::default(), text)
.snapshot();
let mut syntax = SyntaxMap::new(&text).snapshot();
if let Some(language) = language.clone() {
let language_registry = language_registry.clone();
@ -1033,8 +1038,13 @@ impl Buffer {
pub fn build_empty_snapshot(cx: &mut App) -> BufferSnapshot {
let entity_id = cx.reserve_entity::<Self>().entity_id();
let buffer_id = entity_id.as_non_zero_u64().into();
let text =
TextBuffer::new_normalized(0, buffer_id, Default::default(), Rope::new()).snapshot();
let text = TextBuffer::new_normalized(
ReplicaId::LOCAL,
buffer_id,
Default::default(),
Rope::new(),
)
.snapshot();
let syntax = SyntaxMap::new(&text).snapshot();
BufferSnapshot {
text,
@ -1056,7 +1066,9 @@ impl Buffer {
) -> BufferSnapshot {
let entity_id = cx.reserve_entity::<Self>().entity_id();
let buffer_id = entity_id.as_non_zero_u64().into();
let text = TextBuffer::new_normalized(0, buffer_id, Default::default(), text).snapshot();
let text =
TextBuffer::new_normalized(ReplicaId::LOCAL, buffer_id, Default::default(), text)
.snapshot();
let mut syntax = SyntaxMap::new(&text).snapshot();
if let Some(language) = language.clone() {
syntax.reparse(&text, language_registry, language);
@ -2260,7 +2272,7 @@ impl Buffer {
) {
let lamport_timestamp = self.text.lamport_clock.tick();
self.remote_selections.insert(
AGENT_REPLICA_ID,
ReplicaId::AGENT,
SelectionSet {
selections,
lamport_timestamp,
@ -2917,7 +2929,7 @@ impl Buffer {
edits.push((range, new_text));
}
log::info!("mutating buffer {} with {:?}", self.replica_id(), edits);
log::info!("mutating buffer {:?} with {:?}", self.replica_id(), edits);
self.edit(edits, None, cx);
}

View file

@ -70,7 +70,13 @@ fn test_line_endings(cx: &mut gpui::App) {
fn test_set_line_ending(cx: &mut TestAppContext) {
let base = cx.new(|cx| Buffer::local("one\ntwo\nthree\n", cx));
let base_replica = cx.new(|cx| {
Buffer::from_proto(1, Capability::ReadWrite, base.read(cx).to_proto(cx), None).unwrap()
Buffer::from_proto(
ReplicaId::new(1),
Capability::ReadWrite,
base.read(cx).to_proto(cx),
None,
)
.unwrap()
});
base.update(cx, |_buffer, cx| {
cx.subscribe(&base_replica, |this, _, event, cx| {
@ -397,7 +403,7 @@ fn test_edit_events(cx: &mut gpui::App) {
let buffer2 = cx.new(|cx| {
Buffer::remote(
BufferId::from(cx.entity_id().as_non_zero_u64()),
1,
ReplicaId::new(1),
Capability::ReadWrite,
"abcdef",
)
@ -2775,7 +2781,8 @@ fn test_serialization(cx: &mut gpui::App) {
.background_executor()
.block(buffer1.read(cx).serialize_ops(None, cx));
let buffer2 = cx.new(|cx| {
let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap();
let mut buffer =
Buffer::from_proto(ReplicaId::new(1), Capability::ReadWrite, state, None).unwrap();
buffer.apply_ops(
ops.into_iter()
.map(|op| proto::deserialize_operation(op).unwrap()),
@ -2794,7 +2801,13 @@ fn test_branch_and_merge(cx: &mut TestAppContext) {
// Create a remote replica of the base buffer.
let base_replica = cx.new(|cx| {
Buffer::from_proto(1, Capability::ReadWrite, base.read(cx).to_proto(cx), None).unwrap()
Buffer::from_proto(
ReplicaId::new(1),
Capability::ReadWrite,
base.read(cx).to_proto(cx),
None,
)
.unwrap()
});
base.update(cx, |_buffer, cx| {
cx.subscribe(&base_replica, |this, _, event, cx| {
@ -3108,7 +3121,8 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
.background_executor()
.block(base_buffer.read(cx).serialize_ops(None, cx));
let mut buffer =
Buffer::from_proto(i as ReplicaId, Capability::ReadWrite, state, None).unwrap();
Buffer::from_proto(ReplicaId::new(i as u16), Capability::ReadWrite, state, None)
.unwrap();
buffer.apply_ops(
ops.into_iter()
.map(|op| proto::deserialize_operation(op).unwrap()),
@ -3133,9 +3147,9 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
});
buffers.push(buffer);
replica_ids.push(i as ReplicaId);
network.lock().add_peer(i as ReplicaId);
log::info!("Adding initial peer with replica id {}", i);
replica_ids.push(ReplicaId::new(i as u16));
network.lock().add_peer(ReplicaId::new(i as u16));
log::info!("Adding initial peer with replica id {:?}", replica_ids[i]);
}
log::info!("initial text: {:?}", base_text);
@ -3155,14 +3169,14 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
buffer.start_transaction_at(now);
buffer.randomly_edit(&mut rng, 5, cx);
buffer.end_transaction_at(now, cx);
log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
log::info!("buffer {:?} text: {:?}", buffer.replica_id(), buffer.text());
});
mutation_count -= 1;
}
30..=39 if mutation_count != 0 => {
buffer.update(cx, |buffer, cx| {
if rng.random_bool(0.2) {
log::info!("peer {} clearing active selections", replica_id);
log::info!("peer {:?} clearing active selections", replica_id);
active_selections.remove(&replica_id);
buffer.remove_active_selections(cx);
} else {
@ -3179,7 +3193,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
}
let selections: Arc<[Selection<Anchor>]> = selections.into();
log::info!(
"peer {} setting active selections: {:?}",
"peer {:?} setting active selections: {:?}",
replica_id,
selections
);
@ -3189,7 +3203,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
});
mutation_count -= 1;
}
40..=49 if mutation_count != 0 && replica_id == 0 => {
40..=49 if mutation_count != 0 && replica_id == ReplicaId::REMOTE_SERVER => {
let entry_count = rng.random_range(1..=5);
buffer.update(cx, |buffer, cx| {
let diagnostics = DiagnosticSet::new(
@ -3207,7 +3221,11 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
}),
buffer,
);
log::info!("peer {} setting diagnostics: {:?}", replica_id, diagnostics);
log::info!(
"peer {:?} setting diagnostics: {:?}",
replica_id,
diagnostics
);
buffer.update_diagnostics(LanguageServerId(0), diagnostics, cx);
});
mutation_count -= 1;
@ -3217,12 +3235,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
let old_buffer_ops = cx
.background_executor()
.block(buffer.read(cx).serialize_ops(None, cx));
let new_replica_id = (0..=replica_ids.len() as ReplicaId)
let new_replica_id = (0..=replica_ids.len() as u16)
.map(ReplicaId::new)
.filter(|replica_id| *replica_id != buffer.read(cx).replica_id())
.choose(&mut rng)
.unwrap();
log::info!(
"Adding new replica {} (replicating from {})",
"Adding new replica {:?} (replicating from {:?})",
new_replica_id,
replica_id
);
@ -3241,7 +3260,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
cx,
);
log::info!(
"New replica {} text: {:?}",
"New replica {:?} text: {:?}",
new_buffer.replica_id(),
new_buffer.text()
);
@ -3264,7 +3283,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
}));
network.lock().replicate(replica_id, new_replica_id);
if new_replica_id as usize == replica_ids.len() {
if new_replica_id.as_u16() as usize == replica_ids.len() {
replica_ids.push(new_replica_id);
} else {
let new_buffer = new_buffer.take().unwrap();
@ -3276,7 +3295,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
.map(|op| proto::deserialize_operation(op).unwrap());
if ops.len() > 0 {
log::info!(
"peer {} (version: {:?}) applying {} ops from the network. {:?}",
"peer {:?} (version: {:?}) applying {} ops from the network. {:?}",
new_replica_id,
buffer.read(cx).version(),
ops.len(),
@ -3287,13 +3306,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
});
}
}
buffers[new_replica_id as usize] = new_buffer;
buffers[new_replica_id.as_u16() as usize] = new_buffer;
}
}
60..=69 if mutation_count != 0 => {
buffer.update(cx, |buffer, cx| {
buffer.randomly_undo_redo(&mut rng, cx);
log::info!("buffer {} text: {:?}", buffer.replica_id(), buffer.text());
log::info!("buffer {:?} text: {:?}", buffer.replica_id(), buffer.text());
});
mutation_count -= 1;
}
@ -3305,7 +3324,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
.map(|op| proto::deserialize_operation(op).unwrap());
if ops.len() > 0 {
log::info!(
"peer {} (version: {:?}) applying {} ops from the network. {:?}",
"peer {:?} (version: {:?}) applying {} ops from the network. {:?}",
replica_id,
buffer.read(cx).version(),
ops.len(),
@ -3335,13 +3354,13 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
assert_eq!(
buffer.version(),
first_buffer.version(),
"Replica {} version != Replica 0 version",
"Replica {:?} version != Replica 0 version",
buffer.replica_id()
);
assert_eq!(
buffer.text(),
first_buffer.text(),
"Replica {} text != Replica 0 text",
"Replica {:?} text != Replica 0 text",
buffer.replica_id()
);
assert_eq!(
@ -3351,7 +3370,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
first_buffer
.diagnostics_in_range::<_, usize>(0..first_buffer.len(), false)
.collect::<Vec<_>>(),
"Replica {} diagnostics != Replica 0 diagnostics",
"Replica {:?} diagnostics != Replica 0 diagnostics",
buffer.replica_id()
);
}
@ -3370,7 +3389,7 @@ fn test_random_collaboration(cx: &mut App, mut rng: StdRng) {
assert_eq!(
actual_remote_selections,
expected_remote_selections,
"Replica {} remote selections != expected selections",
"Replica {:?} remote selections != expected selections",
buffer.replica_id()
);
}

View file

@ -39,14 +39,14 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
crate::Operation::Buffer(text::Operation::Undo(undo)) => {
proto::operation::Variant::Undo(proto::operation::Undo {
replica_id: undo.timestamp.replica_id as u32,
replica_id: undo.timestamp.replica_id.as_u16() as u32,
lamport_timestamp: undo.timestamp.value,
version: serialize_version(&undo.version),
counts: undo
.counts
.iter()
.map(|(edit_id, count)| proto::UndoCount {
replica_id: edit_id.replica_id as u32,
replica_id: edit_id.replica_id.as_u16() as u32,
lamport_timestamp: edit_id.value,
count: *count,
})
@ -60,7 +60,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
lamport_timestamp,
cursor_shape,
} => proto::operation::Variant::UpdateSelections(proto::operation::UpdateSelections {
replica_id: lamport_timestamp.replica_id as u32,
replica_id: lamport_timestamp.replica_id.as_u16() as u32,
lamport_timestamp: lamport_timestamp.value,
selections: serialize_selections(selections),
line_mode: *line_mode,
@ -72,7 +72,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
server_id,
diagnostics,
} => proto::operation::Variant::UpdateDiagnostics(proto::UpdateDiagnostics {
replica_id: lamport_timestamp.replica_id as u32,
replica_id: lamport_timestamp.replica_id.as_u16() as u32,
lamport_timestamp: lamport_timestamp.value,
server_id: server_id.0 as u64,
diagnostics: serialize_diagnostics(diagnostics.iter()),
@ -84,7 +84,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
server_id,
} => proto::operation::Variant::UpdateCompletionTriggers(
proto::operation::UpdateCompletionTriggers {
replica_id: lamport_timestamp.replica_id as u32,
replica_id: lamport_timestamp.replica_id.as_u16() as u32,
lamport_timestamp: lamport_timestamp.value,
triggers: triggers.clone(),
language_server_id: server_id.to_proto(),
@ -95,7 +95,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
line_ending,
lamport_timestamp,
} => proto::operation::Variant::UpdateLineEnding(proto::operation::UpdateLineEnding {
replica_id: lamport_timestamp.replica_id as u32,
replica_id: lamport_timestamp.replica_id.as_u16() as u32,
lamport_timestamp: lamport_timestamp.value,
line_ending: serialize_line_ending(*line_ending) as i32,
}),
@ -106,7 +106,7 @@ pub fn serialize_operation(operation: &crate::Operation) -> proto::Operation {
/// Serializes an [`EditOperation`] to be sent over RPC.
pub fn serialize_edit_operation(operation: &EditOperation) -> proto::operation::Edit {
proto::operation::Edit {
replica_id: operation.timestamp.replica_id as u32,
replica_id: operation.timestamp.replica_id.as_u16() as u32,
lamport_timestamp: operation.timestamp.value,
version: serialize_version(&operation.version),
ranges: operation.ranges.iter().map(serialize_range).collect(),
@ -123,12 +123,12 @@ pub fn serialize_undo_map_entry(
(edit_id, counts): (&clock::Lamport, &[(clock::Lamport, u32)]),
) -> proto::UndoMapEntry {
proto::UndoMapEntry {
replica_id: edit_id.replica_id as u32,
replica_id: edit_id.replica_id.as_u16() as u32,
local_timestamp: edit_id.value,
counts: counts
.iter()
.map(|(undo_id, count)| proto::UndoCount {
replica_id: undo_id.replica_id as u32,
replica_id: undo_id.replica_id.as_u16() as u32,
lamport_timestamp: undo_id.value,
count: *count,
})
@ -246,7 +246,7 @@ pub fn serialize_diagnostics<'a>(
/// Serializes an [`Anchor`] to be sent over RPC.
pub fn serialize_anchor(anchor: &Anchor) -> proto::Anchor {
proto::Anchor {
replica_id: anchor.timestamp.replica_id as u32,
replica_id: anchor.timestamp.replica_id.as_u16() as u32,
timestamp: anchor.timestamp.value,
offset: anchor.offset as u64,
bias: match anchor.bias {
@ -283,7 +283,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
proto::operation::Variant::Undo(undo) => {
crate::Operation::Buffer(text::Operation::Undo(UndoOperation {
timestamp: clock::Lamport {
replica_id: undo.replica_id as ReplicaId,
replica_id: ReplicaId::new(undo.replica_id as u16),
value: undo.lamport_timestamp,
},
version: deserialize_version(&undo.version),
@ -293,7 +293,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
.map(|c| {
(
clock::Lamport {
replica_id: c.replica_id as ReplicaId,
replica_id: ReplicaId::new(c.replica_id as u16),
value: c.lamport_timestamp,
},
c.count,
@ -319,7 +319,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
crate::Operation::UpdateSelections {
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
replica_id: ReplicaId::new(message.replica_id as u16),
value: message.lamport_timestamp,
},
selections: Arc::from(selections),
@ -333,7 +333,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
proto::operation::Variant::UpdateDiagnostics(message) => {
crate::Operation::UpdateDiagnostics {
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
replica_id: ReplicaId::new(message.replica_id as u16),
value: message.lamport_timestamp,
},
server_id: LanguageServerId(message.server_id as usize),
@ -344,7 +344,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
crate::Operation::UpdateCompletionTriggers {
triggers: message.triggers,
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
replica_id: ReplicaId::new(message.replica_id as u16),
value: message.lamport_timestamp,
},
server_id: LanguageServerId::from_proto(message.language_server_id),
@ -353,7 +353,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
proto::operation::Variant::UpdateLineEnding(message) => {
crate::Operation::UpdateLineEnding {
lamport_timestamp: clock::Lamport {
replica_id: message.replica_id as ReplicaId,
replica_id: ReplicaId::new(message.replica_id as u16),
value: message.lamport_timestamp,
},
line_ending: deserialize_line_ending(
@ -370,7 +370,7 @@ pub fn deserialize_operation(message: proto::Operation) -> Result<crate::Operati
pub fn deserialize_edit_operation(edit: proto::operation::Edit) -> EditOperation {
EditOperation {
timestamp: clock::Lamport {
replica_id: edit.replica_id as ReplicaId,
replica_id: ReplicaId::new(edit.replica_id as u16),
value: edit.lamport_timestamp,
},
version: deserialize_version(&edit.version),
@ -385,7 +385,7 @@ pub fn deserialize_undo_map_entry(
) -> (clock::Lamport, Vec<(clock::Lamport, u32)>) {
(
clock::Lamport {
replica_id: entry.replica_id as u16,
replica_id: ReplicaId::new(entry.replica_id as u16),
value: entry.local_timestamp,
},
entry
@ -394,7 +394,7 @@ pub fn deserialize_undo_map_entry(
.map(|undo_count| {
(
clock::Lamport {
replica_id: undo_count.replica_id as u16,
replica_id: ReplicaId::new(undo_count.replica_id as u16),
value: undo_count.lamport_timestamp,
},
undo_count.count,
@ -480,7 +480,7 @@ pub fn deserialize_anchor(anchor: proto::Anchor) -> Option<Anchor> {
};
Some(Anchor {
timestamp: clock::Lamport {
replica_id: anchor.replica_id as ReplicaId,
replica_id: ReplicaId::new(anchor.replica_id as u16),
value: anchor.timestamp,
},
offset: anchor.offset as usize,
@ -524,7 +524,7 @@ pub fn lamport_timestamp_for_operation(operation: &proto::Operation) -> Option<c
}
Some(clock::Lamport {
replica_id: replica_id as ReplicaId,
replica_id: ReplicaId::new(replica_id as u16),
value,
})
}
@ -559,7 +559,7 @@ pub fn deserialize_transaction(transaction: proto::Transaction) -> Result<Transa
/// Serializes a [`clock::Lamport`] timestamp to be sent over RPC.
pub fn serialize_timestamp(timestamp: clock::Lamport) -> proto::LamportTimestamp {
proto::LamportTimestamp {
replica_id: timestamp.replica_id as u32,
replica_id: timestamp.replica_id.as_u16() as u32,
value: timestamp.value,
}
}
@ -567,7 +567,7 @@ pub fn serialize_timestamp(timestamp: clock::Lamport) -> proto::LamportTimestamp
/// Deserializes a [`clock::Lamport`] timestamp from the RPC representation.
pub fn deserialize_timestamp(timestamp: proto::LamportTimestamp) -> clock::Lamport {
clock::Lamport {
replica_id: timestamp.replica_id as ReplicaId,
replica_id: ReplicaId::new(timestamp.replica_id as u16),
value: timestamp.value,
}
}
@ -590,7 +590,7 @@ pub fn deserialize_version(message: &[proto::VectorClockEntry]) -> clock::Global
let mut version = clock::Global::new();
for entry in message {
version.observe(clock::Lamport {
replica_id: entry.replica_id as ReplicaId,
replica_id: ReplicaId::new(entry.replica_id as u16),
value: entry.timestamp,
});
}
@ -602,7 +602,7 @@ pub fn serialize_version(version: &clock::Global) -> Vec<proto::VectorClockEntry
version
.iter()
.map(|entry| proto::VectorClockEntry {
replica_id: entry.replica_id as u32,
replica_id: entry.replica_id.as_u16() as u32,
timestamp: entry.value,
})
.collect()

View file

@ -6,7 +6,7 @@ use crate::{
use gpui::App;
use rand::rngs::StdRng;
use std::{env, ops::Range, sync::Arc};
use text::{Buffer, BufferId};
use text::{Buffer, BufferId, ReplicaId};
use tree_sitter::Node;
use unindent::Unindent as _;
use util::test::marked_text_ranges;
@ -88,7 +88,7 @@ fn test_syntax_map_layers_for_range(cx: &mut App) {
registry.add(language.clone());
let mut buffer = Buffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
r#"
fn a() {
@ -189,7 +189,7 @@ fn test_dynamic_language_injection(cx: &mut App) {
registry.add(Arc::new(ruby_lang()));
let mut buffer = Buffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
r#"
This is a code block:
@ -811,7 +811,7 @@ fn test_syntax_map_languages_loading_with_erb(cx: &mut App) {
.unindent();
let registry = Arc::new(LanguageRegistry::test(cx.background_executor().clone()));
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), text);
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text);
let mut syntax_map = SyntaxMap::new(&buffer);
syntax_map.set_language_registry(registry.clone());
@ -978,7 +978,7 @@ fn test_random_edits(
.map(|i| i.parse().expect("invalid `OPERATIONS` variable"))
.unwrap_or(10);
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), text);
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text);
let mut syntax_map = SyntaxMap::new(&buffer);
syntax_map.set_language_registry(registry.clone());
@ -1159,7 +1159,7 @@ fn test_edit_sequence(language_name: &str, steps: &[&str], cx: &mut App) -> (Buf
.now_or_never()
.unwrap()
.unwrap();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
let mut mutated_syntax_map = SyntaxMap::new(&buffer);
mutated_syntax_map.set_language_registry(registry.clone());

View file

@ -666,7 +666,7 @@ impl MultiBuffer {
paths_by_excerpt: Default::default(),
buffer_changed_since_sync: Default::default(),
history: History {
next_transaction_id: clock::Lamport::default(),
next_transaction_id: clock::Lamport::MIN,
undo_stack: Vec::new(),
redo_stack: Vec::new(),
transaction_depth: 0,

View file

@ -78,7 +78,9 @@ fn test_remote(cx: &mut App) {
let ops = cx
.background_executor()
.block(host_buffer.read(cx).serialize_ops(None, cx));
let mut buffer = Buffer::from_proto(1, Capability::ReadWrite, state, None).unwrap();
let mut buffer =
Buffer::from_proto(ReplicaId::REMOTE_SERVER, Capability::ReadWrite, state, None)
.unwrap();
buffer.apply_ops(
ops.into_iter()
.map(|op| language::proto::deserialize_operation(op).unwrap()),
@ -3636,7 +3638,7 @@ fn assert_position_translation(snapshot: &MultiBufferSnapshot) {
fn assert_line_indents(snapshot: &MultiBufferSnapshot) {
let max_row = snapshot.max_point().row;
let buffer_id = snapshot.excerpts().next().unwrap().1.remote_id();
let text = text::Buffer::new(0, buffer_id, snapshot.text());
let text = text::Buffer::new(ReplicaId::LOCAL, buffer_id, snapshot.text());
let mut line_indents = text
.line_indents_in_row_range(0..max_row + 1)
.collect::<Vec<_>>();

View file

@ -25,7 +25,7 @@ use rpc::{
};
use smol::channel::Receiver;
use std::{io, pin::pin, sync::Arc, time::Instant};
use text::BufferId;
use text::{BufferId, ReplicaId};
use util::{ResultExt as _, TryFutureExt, debug_panic, maybe, rel_path::RelPath};
use worktree::{File, PathChange, ProjectEntryId, Worktree, WorktreeId};
@ -158,7 +158,7 @@ impl RemoteBufferStore {
pub fn handle_create_buffer_for_peer(
&mut self,
envelope: TypedEnvelope<proto::CreateBufferForPeer>,
replica_id: u16,
replica_id: ReplicaId,
capability: Capability,
cx: &mut Context<BufferStore>,
) -> Result<Option<Entity<Buffer>>> {
@ -626,7 +626,9 @@ impl LocalBufferStore {
cx.spawn(async move |_, cx| {
let loaded = load_file.await?;
let text_buffer = cx
.background_spawn(async move { text::Buffer::new(0, buffer_id, loaded.text) })
.background_spawn(async move {
text::Buffer::new(ReplicaId::LOCAL, buffer_id, loaded.text)
})
.await;
cx.insert_entity(reservation, |_| {
Buffer::build(text_buffer, Some(loaded.file), Capability::ReadWrite)
@ -639,7 +641,7 @@ impl LocalBufferStore {
Ok(buffer) => Ok(buffer),
Err(error) if is_not_found_error(&error) => cx.new(|cx| {
let buffer_id = BufferId::from(cx.entity_id().as_non_zero_u64());
let text_buffer = text::Buffer::new(0, buffer_id, "");
let text_buffer = text::Buffer::new(ReplicaId::LOCAL, buffer_id, "");
Buffer::build(
text_buffer,
Some(Arc::new(File {
@ -917,7 +919,7 @@ impl BufferStore {
path: file.path.clone(),
worktree_id: file.worktree_id(cx),
});
let is_remote = buffer.replica_id() != 0;
let is_remote = buffer.replica_id().is_remote();
let open_buffer = OpenBuffer::Complete {
buffer: buffer_entity.downgrade(),
};
@ -1317,7 +1319,7 @@ impl BufferStore {
pub fn handle_create_buffer_for_peer(
&mut self,
envelope: TypedEnvelope<proto::CreateBufferForPeer>,
replica_id: u16,
replica_id: ReplicaId,
capability: Capability,
cx: &mut Context<Self>,
) -> Result<()> {

View file

@ -271,7 +271,7 @@ mod tests {
use language::language_settings::AllLanguageSettings;
use serde_json::json;
use settings::Settings as _;
use text::{Buffer, BufferId, Point, ToOffset as _};
use text::{Buffer, BufferId, Point, ReplicaId, ToOffset as _};
use unindent::Unindent as _;
use util::{path, rel_path::rel_path};
use worktree::WorktreeSettings;
@ -299,7 +299,7 @@ mod tests {
.unindent();
let buffer_id = BufferId::new(1).unwrap();
let buffer = Buffer::new(0, buffer_id, test_content);
let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content);
let snapshot = buffer.snapshot();
let conflict_snapshot = ConflictSet::parse(&snapshot);
@ -374,7 +374,7 @@ mod tests {
.unindent();
let buffer_id = BufferId::new(1).unwrap();
let buffer = Buffer::new(0, buffer_id, test_content);
let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content);
let snapshot = buffer.snapshot();
let conflict_snapshot = ConflictSet::parse(&snapshot);
@ -405,7 +405,7 @@ mod tests {
>>>>>>> "#
.unindent();
let buffer_id = BufferId::new(1).unwrap();
let buffer = Buffer::new(0, buffer_id, test_content);
let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content);
let snapshot = buffer.snapshot();
let conflict_snapshot = ConflictSet::parse(&snapshot);
@ -447,7 +447,7 @@ mod tests {
.unindent();
let buffer_id = BufferId::new(1).unwrap();
let buffer = Buffer::new(0, buffer_id, test_content.clone());
let buffer = Buffer::new(ReplicaId::LOCAL, buffer_id, test_content.clone());
let snapshot = buffer.snapshot();
let conflict_snapshot = ConflictSet::parse(&snapshot);

View file

@ -3025,9 +3025,8 @@ impl LocalLspStore {
Some(buffer_to_edit.read(cx).saved_version().clone())
};
let most_recent_edit = version.and_then(|version| {
version.iter().max_by_key(|timestamp| timestamp.value)
});
let most_recent_edit =
version.and_then(|version| version.most_recent());
// Check if the edit that triggered that edit has been made by this participant.
if let Some(most_recent_edit) = most_recent_edit {

View file

@ -1560,7 +1560,7 @@ impl Project {
})?;
let agent_server_store = cx.new(|cx| AgentServerStore::collab(cx))?;
let replica_id = response.payload.replica_id as ReplicaId;
let replica_id = ReplicaId::new(response.payload.replica_id as u16);
let project = cx.new(|cx| {
let snippets = SnippetProvider::new(fs.clone(), BTreeSet::from_iter([]), cx);
@ -1975,9 +1975,9 @@ impl Project {
ProjectClientState::Remote { replica_id, .. } => replica_id,
_ => {
if self.remote_client.is_some() {
1
ReplicaId::REMOTE_SERVER
} else {
0
ReplicaId::LOCAL
}
}
}

View file

@ -4307,7 +4307,7 @@ async fn test_rescan_and_remote_updates(cx: &mut gpui::TestAppContext) {
let remote = cx.update(|cx| {
Worktree::remote(
0,
1,
ReplicaId::REMOTE_SERVER,
metadata,
project.read(cx).client().into(),
project.read(cx).path_style(cx),

View file

@ -551,7 +551,7 @@ impl WorktreeStore {
let worktree = cx.update(|cx| {
Worktree::remote(
REMOTE_SERVER_PROJECT_ID,
0,
ReplicaId::REMOTE_SERVER,
proto::WorktreeMetadata {
id: response.worktree_id,
root_name,

View file

@ -6,7 +6,7 @@ use std::{cmp::Ordering, fmt::Debug, ops::Range};
use sum_tree::{Bias, Dimensions};
/// A timestamped position in a buffer
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash, Default)]
#[derive(Copy, Clone, Eq, PartialEq, Debug, Hash)]
pub struct Anchor {
pub timestamp: clock::Lamport,
/// The byte offset in the buffer

View file

@ -1,3 +1,4 @@
use clock::Lamport;
use std::{fmt::Debug, ops::Add};
use sum_tree::{ContextLessSummary, Dimension, Edit, Item, KeyedItem, SumTree};
@ -11,10 +12,10 @@ struct OperationItem<T>(T);
#[derive(Clone, Debug)]
pub struct OperationQueue<T: Operation>(SumTree<OperationItem<T>>);
#[derive(Clone, Copy, Debug, Default, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct OperationKey(clock::Lamport);
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct OperationSummary {
pub key: OperationKey,
pub len: usize,
@ -69,7 +70,10 @@ impl<T: Operation> OperationQueue<T> {
impl ContextLessSummary for OperationSummary {
fn zero() -> Self {
Default::default()
OperationSummary {
key: OperationKey::new(Lamport::MIN),
len: 0,
}
}
fn add_summary(&mut self, other: &Self) {
@ -93,7 +97,7 @@ impl Add<&Self> for OperationSummary {
impl Dimension<'_, OperationSummary> for OperationKey {
fn zero(_cx: ()) -> Self {
Default::default()
OperationKey::new(Lamport::MIN)
}
fn add_summary(&mut self, summary: &OperationSummary, _: ()) {
@ -123,11 +127,13 @@ impl<T: Operation> KeyedItem for OperationItem<T> {
#[cfg(test)]
mod tests {
use clock::ReplicaId;
use super::*;
#[test]
fn test_len() {
let mut clock = clock::Lamport::new(0);
let mut clock = clock::Lamport::new(ReplicaId::LOCAL);
let mut queue = OperationQueue::new();
assert_eq!(queue.len(), 0);

View file

@ -16,7 +16,7 @@ fn init_logger() {
#[test]
fn test_edit() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "abc");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "abc");
assert_eq!(buffer.text(), "abc");
buffer.edit([(3..3, "def")]);
assert_eq!(buffer.text(), "abcdef");
@ -40,7 +40,11 @@ fn test_random_edits(mut rng: StdRng) {
let mut reference_string = RandomCharIter::new(&mut rng)
.take(reference_string_len)
.collect::<String>();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), reference_string.clone());
let mut buffer = Buffer::new(
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
reference_string.clone(),
);
LineEnding::normalize(&mut reference_string);
buffer.set_group_interval(Duration::from_millis(rng.random_range(0..=200)));
@ -176,7 +180,11 @@ fn test_line_endings() {
LineEnding::Windows
);
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "one\r\ntwo\rthree");
let mut buffer = Buffer::new(
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
"one\r\ntwo\rthree",
);
assert_eq!(buffer.text(), "one\ntwo\nthree");
assert_eq!(buffer.line_ending(), LineEnding::Windows);
buffer.check_invariants();
@ -190,7 +198,7 @@ fn test_line_endings() {
#[test]
fn test_line_len() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
buffer.edit([(0..0, "abcd\nefg\nhij")]);
buffer.edit([(12..12, "kl\nmno")]);
buffer.edit([(18..18, "\npqrs\n")]);
@ -207,7 +215,7 @@ fn test_line_len() {
#[test]
fn test_common_prefix_at_position() {
let text = "a = str; b = δα";
let buffer = Buffer::new(0, BufferId::new(1).unwrap(), text);
let buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), text);
let offset1 = offset_after(text, "str");
let offset2 = offset_after(text, "δα");
@ -256,7 +264,7 @@ fn test_common_prefix_at_position() {
#[test]
fn test_text_summary_for_range() {
let buffer = Buffer::new(
0,
ReplicaId::LOCAL,
BufferId::new(1).unwrap(),
"ab\nefg\nhklm\nnopqrs\ntuvwxyz",
);
@ -348,7 +356,7 @@ fn test_text_summary_for_range() {
#[test]
fn test_chars_at() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
buffer.edit([(0..0, "abcd\nefgh\nij")]);
buffer.edit([(12..12, "kl\nmno")]);
buffer.edit([(18..18, "\npqrs")]);
@ -370,7 +378,7 @@ fn test_chars_at() {
assert_eq!(chars.collect::<String>(), "PQrs");
// Regression test:
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
buffer.edit([(0..0, "[workspace]\nmembers = [\n \"xray_core\",\n \"xray_server\",\n \"xray_cli\",\n \"xray_wasm\",\n]\n")]);
buffer.edit([(60..60, "\n")]);
@ -380,7 +388,7 @@ fn test_chars_at() {
#[test]
fn test_anchors() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
buffer.edit([(0..0, "abc")]);
let left_anchor = buffer.anchor_before(2);
let right_anchor = buffer.anchor_after(2);
@ -498,7 +506,7 @@ fn test_anchors() {
#[test]
fn test_anchors_at_start_and_end() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "");
let before_start_anchor = buffer.anchor_before(0);
let after_end_anchor = buffer.anchor_after(0);
@ -521,7 +529,7 @@ fn test_anchors_at_start_and_end() {
#[test]
fn test_undo_redo() {
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "1234");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "1234");
// Set group interval to zero so as to not group edits in the undo stack.
buffer.set_group_interval(Duration::from_secs(0));
@ -558,7 +566,7 @@ fn test_undo_redo() {
#[test]
fn test_history() {
let mut now = Instant::now();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "123456");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "123456");
buffer.set_group_interval(Duration::from_millis(300));
let transaction_1 = buffer.start_transaction_at(now).unwrap();
@ -625,7 +633,7 @@ fn test_history() {
#[test]
fn test_finalize_last_transaction() {
let now = Instant::now();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "123456");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "123456");
buffer.history.group_interval = Duration::from_millis(1);
buffer.start_transaction_at(now);
@ -661,7 +669,7 @@ fn test_finalize_last_transaction() {
#[test]
fn test_edited_ranges_for_transaction() {
let now = Instant::now();
let mut buffer = Buffer::new(0, BufferId::new(1).unwrap(), "1234567");
let mut buffer = Buffer::new(ReplicaId::LOCAL, BufferId::new(1).unwrap(), "1234567");
buffer.start_transaction_at(now);
buffer.edit([(2..4, "cd")]);
@ -700,9 +708,9 @@ fn test_edited_ranges_for_transaction() {
fn test_concurrent_edits() {
let text = "abcdef";
let mut buffer1 = Buffer::new(1, BufferId::new(1).unwrap(), text);
let mut buffer2 = Buffer::new(2, BufferId::new(1).unwrap(), text);
let mut buffer3 = Buffer::new(3, BufferId::new(1).unwrap(), text);
let mut buffer1 = Buffer::new(ReplicaId::new(1), BufferId::new(1).unwrap(), text);
let mut buffer2 = Buffer::new(ReplicaId::new(2), BufferId::new(1).unwrap(), text);
let mut buffer3 = Buffer::new(ReplicaId::new(3), BufferId::new(1).unwrap(), text);
let buf1_op = buffer1.edit([(1..2, "12")]);
assert_eq!(buffer1.text(), "a12cdef");
@ -741,11 +749,15 @@ fn test_random_concurrent_edits(mut rng: StdRng) {
let mut network = Network::new(rng.clone());
for i in 0..peers {
let mut buffer = Buffer::new(i as ReplicaId, BufferId::new(1).unwrap(), base_text.clone());
let mut buffer = Buffer::new(
ReplicaId::new(i as u16),
BufferId::new(1).unwrap(),
base_text.clone(),
);
buffer.history.group_interval = Duration::from_millis(rng.random_range(0..=200));
buffers.push(buffer);
replica_ids.push(i as u16);
network.add_peer(i as u16);
replica_ids.push(ReplicaId::new(i as u16));
network.add_peer(ReplicaId::new(i as u16));
}
log::info!("initial text: {:?}", base_text);
@ -759,7 +771,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) {
0..=50 if mutation_count != 0 => {
let op = buffer.randomly_edit(&mut rng, 5).1;
network.broadcast(buffer.replica_id, vec![op]);
log::info!("buffer {} text: {:?}", buffer.replica_id, buffer.text());
log::info!("buffer {:?} text: {:?}", buffer.replica_id, buffer.text());
mutation_count -= 1;
}
51..=70 if mutation_count != 0 => {
@ -771,7 +783,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) {
let ops = network.receive(replica_id);
if !ops.is_empty() {
log::info!(
"peer {} applying {} ops from the network.",
"peer {:?} applying {} ops from the network.",
replica_id,
ops.len()
);
@ -792,7 +804,7 @@ fn test_random_concurrent_edits(mut rng: StdRng) {
assert_eq!(
buffer.text(),
first_buffer.text(),
"Replica {} text != Replica 0 text",
"Replica {:?} text != Replica 0 text",
buffer.replica_id
);
buffer.check_invariants();

View file

@ -12,7 +12,7 @@ mod undo_map;
pub use anchor::*;
use anyhow::{Context as _, Result};
use clock::LOCAL_BRANCH_REPLICA_ID;
use clock::Lamport;
pub use clock::ReplicaId;
use collections::{HashMap, HashSet};
use locator::Locator;
@ -573,7 +573,7 @@ struct InsertionFragment {
fragment_id: Locator,
}
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct InsertionFragmentKey {
timestamp: clock::Lamport,
split_offset: usize,
@ -709,7 +709,7 @@ impl FromIterator<char> for LineIndent {
}
impl Buffer {
pub fn new(replica_id: u16, remote_id: BufferId, base_text: impl Into<String>) -> Buffer {
pub fn new(replica_id: ReplicaId, remote_id: BufferId, base_text: impl Into<String>) -> Buffer {
let mut base_text = base_text.into();
let line_ending = LineEnding::detect(&base_text);
LineEnding::normalize(&mut base_text);
@ -717,7 +717,7 @@ impl Buffer {
}
pub fn new_normalized(
replica_id: u16,
replica_id: ReplicaId,
remote_id: BufferId,
line_ending: LineEnding,
normalized: Rope,
@ -731,10 +731,7 @@ impl Buffer {
let visible_text = history.base_text.clone();
if !visible_text.is_empty() {
let insertion_timestamp = clock::Lamport {
replica_id: 0,
value: 1,
};
let insertion_timestamp = clock::Lamport::new(ReplicaId::LOCAL);
lamport_clock.observe(insertion_timestamp);
version.observe(insertion_timestamp);
let fragment_id = Locator::between(&Locator::min(), &Locator::max());
@ -788,7 +785,7 @@ impl Buffer {
history: History::new(self.base_text().clone()),
deferred_ops: OperationQueue::new(),
deferred_replicas: HashSet::default(),
lamport_clock: clock::Lamport::new(LOCAL_BRANCH_REPLICA_ID),
lamport_clock: clock::Lamport::new(ReplicaId::LOCAL_BRANCH),
subscriptions: Default::default(),
edit_id_resolvers: Default::default(),
wait_for_version_txs: Default::default(),
@ -1254,7 +1251,7 @@ impl Buffer {
for edit_id in edit_ids {
let insertion_slice = InsertionSlice {
edit_id: *edit_id,
insertion_id: clock::Lamport::default(),
insertion_id: clock::Lamport::MIN,
range: 0..0,
};
let slices = self
@ -1858,7 +1855,7 @@ impl Buffer {
T: rand::Rng,
{
let mut edits = self.get_random_edits(rng, edit_count);
log::info!("mutating buffer {} with {:?}", self.replica_id, edits);
log::info!("mutating buffer {:?} with {:?}", self.replica_id, edits);
let op = self.edit(edits.iter().cloned());
if let Operation::Edit(edit) = &op {
@ -1881,7 +1878,7 @@ impl Buffer {
if let Some(entry) = self.history.undo_stack.choose(rng) {
let transaction = entry.transaction.clone();
log::info!(
"undoing buffer {} transaction {:?}",
"undoing buffer {:?} transaction {:?}",
self.replica_id,
transaction
);
@ -2918,7 +2915,10 @@ impl InsertionFragment {
impl sum_tree::ContextLessSummary for InsertionFragmentKey {
fn zero() -> Self {
Default::default()
InsertionFragmentKey {
timestamp: Lamport::MIN,
split_offset: 0,
}
}
fn add_summary(&mut self, summary: &Self) {

View file

@ -1,4 +1,5 @@
use crate::UndoOperation;
use clock::Lamport;
use std::cmp;
use sum_tree::{Bias, SumTree};
@ -24,7 +25,7 @@ impl sum_tree::KeyedItem for UndoMapEntry {
}
}
#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct UndoMapKey {
edit_id: clock::Lamport,
undo_id: clock::Lamport,
@ -32,7 +33,10 @@ struct UndoMapKey {
impl sum_tree::ContextLessSummary for UndoMapKey {
fn zero() -> Self {
Default::default()
UndoMapKey {
edit_id: Lamport::MIN,
undo_id: Lamport::MIN,
}
}
fn add_summary(&mut self, summary: &Self) {
@ -69,7 +73,7 @@ impl UndoMap {
cursor.seek(
&UndoMapKey {
edit_id,
undo_id: Default::default(),
undo_id: Lamport::MIN,
},
Bias::Left,
);
@ -93,7 +97,7 @@ impl UndoMap {
cursor.seek(
&UndoMapKey {
edit_id,
undo_id: Default::default(),
undo_id: Lamport::MIN,
},
Bias::Left,
);

View file

@ -656,7 +656,7 @@ impl Worktree {
pub fn replica_id(&self) -> ReplicaId {
match self {
Worktree::Local(_) => 0,
Worktree::Local(_) => ReplicaId::LOCAL,
Worktree::Remote(worktree) => worktree.replica_id,
}
}

View file

@ -1581,7 +1581,7 @@ fn guess_token_count(bytes: usize) -> usize {
#[cfg(test)]
mod tests {
use client::test::FakeServer;
use clock::FakeSystemClock;
use clock::{FakeSystemClock, ReplicaId};
use cloud_api_types::{CreateLlmTokenResponse, LlmToken};
use gpui::TestAppContext;
use http_client::FakeHttpClient;
@ -1839,7 +1839,7 @@ mod tests {
let buffer = cx.new(|_cx| {
Buffer::remote(
language::BufferId::new(1).unwrap(),
1,
ReplicaId::new(1),
language::Capability::ReadWrite,
"fn main() {\n println!(\"Hello\");\n}",
)