527b236b
Replace timestamp-wins conflict resolution with Lamport clock + OID
a73x 2026-03-21 13:21
Add a `clock: u64` field to Event that the DAG layer sets automatically: - Root events get clock=1 - Appended events get max_clock(DAG) + 1 - Merge events get max(local_max, remote_max) + 1 Status conflicts (close/reopen, close/merge) now resolve by comparing (clock, commit_oid) tuples instead of wall-clock timestamps. Higher clock wins; lexicographic OID breaks ties on equal clocks. Includes migrate_clocks() for rewriting pre-CRDT DAGs and 11 new tests. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
diff --git a/src/dag.rs b/src/dag.rs index d585071..82df8e1 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -8,15 +8,43 @@ use crate::signing::sign_event; /// The manifest blob content included in every event commit tree. const MANIFEST_JSON: &[u8] = br#"{"version":1,"format":"git-collab"}"#; /// Walk the entire DAG reachable from `tip` and return the maximum clock value. /// Returns 0 if the DAG is empty or all events have clock 0 (pre-migration). pub fn max_clock(repo: &Repository, tip: Oid) -> Result<u64, Error> { let mut revwalk = repo.revwalk()?; revwalk.set_sorting(Sort::TOPOLOGICAL)?; revwalk.push(tip)?; let mut max = 0u64; for oid_result in revwalk { let oid = oid_result?; let commit = repo.find_commit(oid)?; let tree = commit.tree()?; let entry = tree .get_name("event.json") .ok_or_else(|| git2::Error::from_str("missing event.json in commit tree"))?; let blob = repo.find_blob(entry.id())?; let event: Event = serde_json::from_slice(blob.content())?; if event.clock > max { max = event.clock; } } Ok(max) } /// Create an orphan commit (no parents) with the given event. /// Returns the new commit OID which also serves as the entity ID. /// Clones the event internally and sets clock=1. pub fn create_root_event( repo: &Repository, event: &Event, signing_key: &ed25519_dalek::SigningKey, ) -> Result<Oid, Error> { let detached = sign_event(event, signing_key)?; let event_json = serde_json::to_vec_pretty(event)?; let mut event = event.clone(); event.clock = 1; let detached = sign_event(&event, signing_key)?; let event_json = serde_json::to_vec_pretty(&event)?; let event_blob = repo.blob(&event_json)?; let sig_blob = repo.blob(detached.signature.as_bytes())?; @@ -39,14 +67,21 @@ pub fn create_root_event( } /// Append an event to an existing DAG. The current tip is the parent. /// Clones the event internally and sets clock = max_clock(tip) + 1. pub fn append_event( repo: &Repository, ref_name: &str, event: &Event, signing_key: &ed25519_dalek::SigningKey, ) -> Result<Oid, Error> { let detached = sign_event(event, signing_key)?; let event_json = serde_json::to_vec_pretty(event)?; let tip = repo.refname_to_id(ref_name)?; let current_max = max_clock(repo, tip)?; let mut event = event.clone(); event.clock = current_max + 1; let detached = sign_event(&event, signing_key)?; let event_json = serde_json::to_vec_pretty(&event)?; let event_blob = repo.blob(&event_json)?; let sig_blob = repo.blob(detached.signature.as_bytes())?; @@ -127,11 +162,16 @@ pub fn reconcile( return Ok(remote_oid); } // True fork — create merge commit // True fork — create merge commit with clock = max(local, remote) + 1 let local_max = max_clock(repo, local_oid)?; let remote_max = max_clock(repo, remote_oid)?; let merge_clock = std::cmp::max(local_max, remote_max) + 1; let merge_event = Event { timestamp: chrono::Utc::now().to_rfc3339(), author: merge_author.clone(), action: Action::Merge, clock: merge_clock, }; let detached = sign_event(&merge_event, signing_key)?; @@ -165,6 +205,72 @@ pub fn reconcile( Ok(oid) } /// Migrate a DAG ref so that every event with clock=0 gets a sequential clock /// assigned in topological order. Events that already have clock>0 are left as-is. /// This rewrites the commit chain (new OIDs) and updates the ref. pub fn migrate_clocks( repo: &Repository, ref_name: &str, signing_key: &ed25519_dalek::SigningKey, ) -> Result<(), Error> { let events = walk_events(repo, ref_name)?; // Check if migration is needed: any event with clock=0? let needs_migration = events.iter().any(|(_, e)| e.clock == 0); if !needs_migration { return Ok(()); } // Rebuild the chain with sequential clocks let mut clock = 0u64; let mut prev_oid: Option<Oid> = None; for (_old_oid, event) in &events { clock += 1; let mut migrated = event.clone(); if migrated.clock == 0 { migrated.clock = clock; } else { clock = migrated.clock; // respect existing clocks } let detached = sign_event(&migrated, signing_key)?; let event_json = serde_json::to_vec_pretty(&migrated)?; let event_blob = repo.blob(&event_json)?; let sig_blob = repo.blob(detached.signature.as_bytes())?; let pubkey_blob = repo.blob(detached.pubkey.as_bytes())?; let manifest_blob = repo.blob(MANIFEST_JSON)?; let mut tb = repo.treebuilder(None)?; tb.insert("event.json", event_blob, 0o100644)?; tb.insert("signature", sig_blob, 0o100644)?; tb.insert("pubkey", pubkey_blob, 0o100644)?; tb.insert("manifest.json", manifest_blob, 0o100644)?; let tree_oid = tb.write()?; let tree = repo.find_tree(tree_oid)?; let sig = author_signature(&migrated.author)?; let message = commit_message(&migrated.action); let parents: Vec<git2::Commit> = if let Some(pid) = prev_oid { vec![repo.find_commit(pid)?] } else { vec![] }; let parent_refs: Vec<&git2::Commit> = parents.iter().collect(); let new_oid = repo.commit(None, &sig, &sig, &message, &tree, &parent_refs)?; prev_oid = Some(new_oid); } // Update the ref to point to the new tip if let Some(new_tip) = prev_oid { repo.reference(ref_name, new_tip, true, "migrate clocks")?; } Ok(()) } fn commit_message(action: &Action) -> String { match action { Action::IssueOpen { title, .. } => format!("issue: open \"{}\"", title), diff --git a/src/event.rs b/src/event.rs index ec1a83c..135c19b 100644 --- a/src/event.rs +++ b/src/event.rs @@ -11,6 +11,7 @@ pub struct Event { pub timestamp: String, pub author: Author, pub action: Action, pub clock: u64, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/src/issue.rs b/src/issue.rs index fce8fbe..c4e0cf8 100644 --- a/src/issue.rs +++ b/src/issue.rs @@ -16,6 +16,7 @@ pub fn open(repo: &Repository, title: &str, body: &str) -> Result<String, crate: title: title.to_string(), body: body.to_string(), }, clock: 0, }; let oid = dag::create_root_event(repo, &event, &sk)?; let id = oid.to_string(); @@ -82,6 +83,7 @@ pub fn label(repo: &Repository, id_prefix: &str, label: &str) -> Result<(), crat action: Action::IssueLabel { label: label.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -97,6 +99,7 @@ pub fn unlabel(repo: &Repository, id_prefix: &str, label: &str) -> Result<(), cr action: Action::IssueUnlabel { label: label.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -116,6 +119,7 @@ pub fn assign( action: Action::IssueAssign { assignee: assignee.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -135,6 +139,7 @@ pub fn unassign( action: Action::IssueUnassign { assignee: assignee.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -161,6 +166,7 @@ pub fn edit( title: title.map(|s| s.to_string()), body: body.map(|s| s.to_string()), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -176,6 +182,7 @@ pub fn comment(repo: &Repository, id_prefix: &str, body: &str) -> Result<(), cra action: Action::IssueComment { body: body.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -195,6 +202,7 @@ pub fn close( action: Action::IssueClose { reason: reason.map(|s| s.to_string()), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -208,6 +216,7 @@ pub fn reopen(repo: &Repository, id_prefix: &str) -> Result<(), crate::error::Er timestamp: chrono::Utc::now().to_rfc3339(), author, action: Action::IssueReopen, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) diff --git a/src/patch.rs b/src/patch.rs index cfae1b3..43bb9ae 100644 --- a/src/patch.rs +++ b/src/patch.rs @@ -50,6 +50,7 @@ pub fn create( branch: branch.to_string(), fixes: fixes.map(|s| s.to_string()), }, clock: 0, }; let oid = dag::create_root_event(repo, &event, &sk)?; let id = oid.to_string(); @@ -104,6 +105,7 @@ pub fn comment( timestamp: chrono::Utc::now().to_rfc3339(), author, action, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -125,6 +127,7 @@ pub fn review( verdict, body: body.to_string(), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -144,6 +147,7 @@ pub fn revise( action: Action::PatchRevise { body: body.map(|s| s.to_string()), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) @@ -207,6 +211,7 @@ pub fn merge(repo: &Repository, id_prefix: &str) -> Result<PatchState, crate::er timestamp: chrono::Utc::now().to_rfc3339(), author: author.clone(), action: Action::PatchMerge, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; @@ -219,6 +224,7 @@ pub fn merge(repo: &Repository, id_prefix: &str) -> Result<PatchState, crate::er action: Action::IssueClose { reason: Some(format!("Fixed by patch {:.8}", p.id)), }, clock: 0, }; dag::append_event(repo, &issue_ref, &close_event, &sk)?; } @@ -297,6 +303,7 @@ pub fn close( action: Action::PatchClose { reason: reason.map(|s| s.to_string()), }, clock: 0, }; dag::append_event(repo, &ref_name, &event, &sk)?; Ok(()) diff --git a/src/signing.rs b/src/signing.rs index 089a249..76a2790 100644 --- a/src/signing.rs +++ b/src/signing.rs @@ -322,6 +322,7 @@ mod tests { title: "Test".to_string(), body: "Body".to_string(), }, clock: 0, }; let sk = SigningKey::generate(&mut rand_core::OsRng); diff --git a/src/state.rs b/src/state.rs index 261d8b6..8028f0b 100644 --- a/src/state.rs +++ b/src/state.rs @@ -83,10 +83,9 @@ impl IssueState { let events = dag::walk_events(repo, ref_name)?; let mut state: Option<IssueState> = None; // Track the timestamp of the latest status-changing event so that // concurrent close/reopen conflicts resolve deterministically: // the event with the later timestamp wins. let mut status_ts: Option<String> = None; // Track the (clock, commit_oid_hex) of the latest status-changing event. // Higher clock wins; on tie, lexicographically higher OID wins. let mut status_key: Option<(u64, String)> = None; for (oid, event) in events { match event.action { @@ -117,11 +116,12 @@ impl IssueState { } Action::IssueClose { reason } => { if let Some(ref mut s) = state { if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) { let key = (event.clock, oid.to_string()); if status_key.as_ref().is_none_or(|k| key >= *k) { s.status = IssueStatus::Closed; s.close_reason = reason; s.closed_by = Some(oid); status_ts = Some(event.timestamp.clone()); status_key = Some(key); } } } @@ -161,11 +161,12 @@ impl IssueState { } Action::IssueReopen => { if let Some(ref mut s) = state { if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) { let key = (event.clock, oid.to_string()); if status_key.as_ref().is_none_or(|k| key >= *k) { s.status = IssueStatus::Open; s.close_reason = None; s.closed_by = None; status_ts = Some(event.timestamp.clone()); status_key = Some(key); } } } @@ -209,7 +210,7 @@ impl PatchState { let events = dag::walk_events(repo, ref_name)?; let mut state: Option<PatchState> = None; let mut status_ts: Option<String> = None; let mut status_key: Option<(u64, String)> = None; for (oid, event) in events { match event.action { @@ -275,17 +276,19 @@ impl PatchState { } Action::PatchClose { .. } => { if let Some(ref mut s) = state { if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) { let key = (event.clock, oid.to_string()); if status_key.as_ref().is_none_or(|k| key >= *k) { s.status = PatchStatus::Closed; status_ts = Some(event.timestamp.clone()); status_key = Some(key); } } } Action::PatchMerge => { if let Some(ref mut s) = state { if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) { let key = (event.clock, oid.to_string()); if status_key.as_ref().is_none_or(|k| key >= *k) { s.status = PatchStatus::Merged; status_ts = Some(event.timestamp.clone()); status_key = Some(key); } } } diff --git a/src/tui.rs b/src/tui.rs index 930b3da..9af74f2 100644 --- a/src/tui.rs +++ b/src/tui.rs @@ -1991,6 +1991,7 @@ mod tests { title: "Test Issue".to_string(), body: "This is the body".to_string(), }, clock: 0, }, ), ( @@ -2004,6 +2005,7 @@ mod tests { action: Action::IssueComment { body: "A comment on the issue".to_string(), }, clock: 0, }, ), ( @@ -2014,6 +2016,7 @@ mod tests { action: Action::IssueClose { reason: Some("fixed".to_string()), }, clock: 0, }, ), ] @@ -2097,6 +2100,7 @@ mod tests { title: "My Issue".to_string(), body: "Description here".to_string(), }, clock: 0, }; let detail = format_event_detail(&oid, &event); assert!(detail.contains("aaaaaaa")); @@ -2116,6 +2120,7 @@ mod tests { action: Action::IssueClose { reason: Some("resolved".to_string()), }, clock: 0, }; let detail = format_event_detail(&oid, &event); assert!(detail.contains("Issue Close")); @@ -2132,6 +2137,7 @@ mod tests { verdict: ReviewVerdict::Approve, body: "Looks good!".to_string(), }, clock: 0, }; let detail = format_event_detail(&oid, &event); assert!(detail.contains("Patch Review")); @@ -2146,6 +2152,7 @@ mod tests { timestamp: "2026-01-01T00:00:00Z".to_string(), author: test_author(), action: Action::IssueReopen, clock: 0, }; let detail = format_event_detail(&oid, &event); assert!(detail.contains("1234567")); diff --git a/tests/collab_test.rs b/tests/collab_test.rs index d68835b..1a7461f 100644 --- a/tests/collab_test.rs +++ b/tests/collab_test.rs @@ -95,6 +95,7 @@ fn test_issue_edit_updates_title_and_body() { title: Some("New title".to_string()), body: Some("New body".to_string()), }, clock: 0, }; dag::append_event(&repo, &ref_name, &event, &sk).unwrap(); @@ -119,6 +120,7 @@ fn test_issue_edit_partial_update() { title: None, body: Some("Added body".to_string()), }, clock: 0, }; dag::append_event(&repo, &ref_name, &event, &sk).unwrap(); @@ -329,6 +331,7 @@ fn test_patch_review_workflow() { action: Action::PatchRevise { body: Some("Updated implementation".to_string()), }, clock: 0, }; dag::append_event(&repo, &ref_name, &event, &sk).unwrap(); @@ -504,6 +507,7 @@ fn test_signed_event_in_dag() { title: "Signed issue".to_string(), body: "".to_string(), }, clock: 0, }; let oid = dag::create_root_event(&repo, &event, &sk).unwrap(); let id = oid.to_string(); @@ -629,6 +633,7 @@ fn create_branch_patch( branch: branch.to_string(), fixes: None, }, clock: 0, }; let oid = dag::create_root_event(repo, &event, &sk).unwrap(); let id = oid.to_string(); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index f5343aa..cdfcf0c 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -66,6 +66,7 @@ pub fn open_issue(repo: &Repository, author: &Author, title: &str) -> (String, S title: title.to_string(), body: "".to_string(), }, clock: 0, }; let oid = dag::create_root_event(repo, &event, &sk).unwrap(); let id = oid.to_string(); @@ -83,6 +84,7 @@ pub fn add_comment(repo: &Repository, ref_name: &str, author: &Author, body: &st action: Action::IssueComment { body: body.to_string(), }, clock: 0, }; dag::append_event(repo, ref_name, &event, &sk).unwrap(); } @@ -94,6 +96,7 @@ pub fn close_issue(repo: &Repository, ref_name: &str, author: &Author) { timestamp: now(), author: author.clone(), action: Action::IssueClose { reason: None }, clock: 0, }; dag::append_event(repo, ref_name, &event, &sk).unwrap(); } @@ -105,6 +108,7 @@ pub fn reopen_issue(repo: &Repository, ref_name: &str, author: &Author) { timestamp: now(), author: author.clone(), action: Action::IssueReopen, clock: 0, }; dag::append_event(repo, ref_name, &event, &sk).unwrap(); } @@ -122,6 +126,7 @@ pub fn create_patch(repo: &Repository, author: &Author, title: &str) -> (String, branch: "test-branch".to_string(), fixes: None, }, clock: 0, }; let oid = dag::create_root_event(repo, &event, &sk).unwrap(); let id = oid.to_string(); @@ -140,6 +145,7 @@ pub fn add_review(repo: &Repository, ref_name: &str, author: &Author, verdict: R verdict, body: "review comment".to_string(), }, clock: 0, }; dag::append_event(repo, ref_name, &event, &sk).unwrap(); } diff --git a/tests/crdt_test.rs b/tests/crdt_test.rs new file mode 100644 index 0000000..82000d5 --- /dev/null +++ b/tests/crdt_test.rs @@ -0,0 +1,576 @@ mod common; use ed25519_dalek::SigningKey; use git2::{Oid, Repository}; use rand_core::OsRng; use tempfile::TempDir; use git_collab::dag; use git_collab::event::{Action, Author, Event}; use git_collab::state::{IssueState, IssueStatus, PatchState, PatchStatus}; fn alice() -> Author { Author { name: "Alice".to_string(), email: "alice@example.com".to_string(), } } fn bob() -> Author { Author { name: "Bob".to_string(), email: "bob@example.com".to_string(), } } fn test_sk() -> SigningKey { SigningKey::generate(&mut OsRng) } fn init_repo(dir: &std::path::Path) -> Repository { let repo = Repository::init(dir).unwrap(); { let mut config = repo.config().unwrap(); config.set_str("user.name", "Test").unwrap(); config.set_str("user.email", "test@test.com").unwrap(); } repo } fn read_event_clock(repo: &Repository, oid: Oid) -> u64 { let commit = repo.find_commit(oid).unwrap(); let tree = commit.tree().unwrap(); let entry = tree.get_name("event.json").unwrap(); let blob = repo.find_blob(entry.id()).unwrap(); let event: Event = serde_json::from_slice(blob.content()).unwrap(); event.clock } // ── Phase 2: Clock propagation tests ───────────────────────────── #[test] fn create_root_event_sets_clock_to_1() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); let event = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, // caller passes 0, DAG should override to 1 }; let oid = dag::create_root_event(&repo, &event, &sk).unwrap(); assert_eq!(read_event_clock(&repo, oid), 1); } #[test] fn append_event_increments_clock() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); let open_event = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &open_event, &sk).unwrap(); let ref_name = format!("refs/collab/issues/{}", root_oid); repo.reference(&ref_name, root_oid, false, "test").unwrap(); // First append should get clock=2 let comment_event = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "comment 1".to_string(), }, clock: 0, }; let oid2 = dag::append_event(&repo, &ref_name, &comment_event, &sk).unwrap(); assert_eq!(read_event_clock(&repo, oid2), 2); // Second append should get clock=3 let comment_event2 = Event { timestamp: "2026-01-03T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "comment 2".to_string(), }, clock: 0, }; let oid3 = dag::append_event(&repo, &ref_name, &comment_event2, &sk).unwrap(); assert_eq!(read_event_clock(&repo, oid3), 3); } #[test] fn max_clock_returns_highest_clock_in_dag() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); let event = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &event, &sk).unwrap(); let ref_name = format!("refs/collab/issues/{}", root_oid); repo.reference(&ref_name, root_oid, false, "test").unwrap(); assert_eq!(dag::max_clock(&repo, root_oid).unwrap(), 1); let comment = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "comment".to_string(), }, clock: 0, }; let tip = dag::append_event(&repo, &ref_name, &comment, &sk).unwrap(); assert_eq!(dag::max_clock(&repo, tip).unwrap(), 2); } // ── Phase 3: Reconcile merge clock test ────────────────────────── #[test] fn reconcile_merge_clock_is_max_of_both_branches_plus_one() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); // Create root event (clock=1) let open = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap(); let local_ref = "refs/collab/issues/test-reconcile"; let remote_ref = "refs/collab/sync/issues/test-reconcile"; repo.reference(local_ref, root_oid, false, "test").unwrap(); repo.reference(remote_ref, root_oid, false, "test").unwrap(); // Append 2 events on local (clock=2,3) let comment1 = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "local 1".to_string(), }, clock: 0, }; dag::append_event(&repo, local_ref, &comment1, &sk).unwrap(); let comment2 = Event { timestamp: "2026-01-03T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "local 2".to_string(), }, clock: 0, }; dag::append_event(&repo, local_ref, &comment2, &sk).unwrap(); // Append 4 events on remote (clock=2,3,4,5) for i in 1..=4 { let comment = Event { timestamp: format!("2026-02-{:02}T00:00:00Z", i), author: bob(), action: Action::IssueComment { body: format!("remote {}", i), }, clock: 0, }; dag::append_event(&repo, remote_ref, &comment, &sk).unwrap(); } let merge_oid = dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap(); // Remote max is 5, local max is 3, so merge should be 6 assert_eq!(read_event_clock(&repo, merge_oid), 6); } // ── Phase 5: Concurrent status resolution using clock+OID ─────── #[test] fn concurrent_issue_close_reopen_higher_clock_wins() { // When one branch closes and another reopens at the same clock, // the OID tiebreaker should produce a deterministic result. // But when clocks differ, the higher clock always wins. let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); // Create root issue (clock=1) let open = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap(); let local_ref = "refs/collab/issues/test-concurrent"; let remote_ref = "refs/collab/sync/issues/test-concurrent"; repo.reference(local_ref, root_oid, false, "test").unwrap(); repo.reference(remote_ref, root_oid, false, "test").unwrap(); // Local: close (clock=2) let close = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueClose { reason: None }, clock: 0, }; dag::append_event(&repo, local_ref, &close, &sk).unwrap(); // Remote: add comment (clock=2) then reopen (clock=3) let comment = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: bob(), action: Action::IssueComment { body: "comment".to_string(), }, clock: 0, }; dag::append_event(&repo, remote_ref, &comment, &sk).unwrap(); let reopen = Event { timestamp: "2026-01-03T00:00:00Z".to_string(), author: bob(), action: Action::IssueReopen, clock: 0, }; dag::append_event(&repo, remote_ref, &reopen, &sk).unwrap(); // Reconcile dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap(); // After reconcile: close had clock=2, reopen had clock=3, so reopen wins let state = IssueState::from_ref(&repo, local_ref, "test-concurrent").unwrap(); assert_eq!(state.status, IssueStatus::Open); } #[test] fn concurrent_issue_same_clock_oid_breaks_tie() { // When two status changes have the same clock, higher OID (lexicographic) wins. let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); // Create root issue (clock=1) let open = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap(); let local_ref = "refs/collab/issues/test-tie"; let remote_ref = "refs/collab/sync/issues/test-tie"; repo.reference(local_ref, root_oid, false, "test").unwrap(); repo.reference(remote_ref, root_oid, false, "test").unwrap(); // Both branches: close at clock=2 (both directly from root) let close1 = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueClose { reason: Some("alice close".to_string()), }, clock: 0, }; dag::append_event(&repo, local_ref, &close1, &sk).unwrap(); let close2 = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: bob(), action: Action::IssueClose { reason: Some("bob close".to_string()), }, clock: 0, }; dag::append_event(&repo, remote_ref, &close2, &sk).unwrap(); // Reconcile dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap(); // Both are closes with clock=2, so the issue should be closed. // The one with the higher OID wins and determines the close reason. let state = IssueState::from_ref(&repo, local_ref, "test-tie").unwrap(); assert_eq!(state.status, IssueStatus::Closed); // We can't predict which OID wins, but one of the reasons should be present assert!( state.close_reason == Some("alice close".to_string()) || state.close_reason == Some("bob close".to_string()) ); } #[test] fn concurrent_patch_close_merge_higher_clock_wins() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); // Need a branch for the patch let sig = git2::Signature::now("Test", "test@test.com").unwrap(); let tree_oid = repo.treebuilder(None).unwrap().write().unwrap(); let tree = repo.find_tree(tree_oid).unwrap(); let initial = repo .commit(Some("refs/heads/main"), &sig, &sig, "initial", &tree, &[]) .unwrap(); let initial_commit = repo.find_commit(initial).unwrap(); repo.branch("test-branch", &initial_commit, false).unwrap(); // Create root patch (clock=1) let create = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::PatchCreate { title: "test patch".to_string(), body: "body".to_string(), base_ref: "main".to_string(), branch: "test-branch".to_string(), fixes: None, }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &create, &sk).unwrap(); let local_ref = "refs/collab/patches/test-patch"; let remote_ref = "refs/collab/sync/patches/test-patch"; repo.reference(local_ref, root_oid, false, "test").unwrap(); repo.reference(remote_ref, root_oid, false, "test") .unwrap(); // Local: close (clock=2) let close = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::PatchClose { reason: None }, clock: 0, }; dag::append_event(&repo, local_ref, &close, &sk).unwrap(); // Remote: comment (clock=2), then merge (clock=3) let comment = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: bob(), action: Action::PatchComment { body: "lgtm".to_string(), }, clock: 0, }; dag::append_event(&repo, remote_ref, &comment, &sk).unwrap(); let merge = Event { timestamp: "2026-01-03T00:00:00Z".to_string(), author: bob(), action: Action::PatchMerge, clock: 0, }; dag::append_event(&repo, remote_ref, &merge, &sk).unwrap(); // Reconcile dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap(); // Merge at clock=3 should win over close at clock=2 let state = PatchState::from_ref(&repo, local_ref, "test-patch").unwrap(); assert_eq!(state.status, PatchStatus::Merged); } // ── Phase 7: Migration tests ──────────────────────────────────── #[test] fn migrate_clocks_assigns_sequential_clocks() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); // Create a DAG with events that have clock=0 (simulating pre-migration data) // We need to create commits directly to simulate old format let open = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap(); let ref_name = "refs/collab/issues/test-migrate"; repo.reference(ref_name, root_oid, false, "test").unwrap(); // Append a couple events let comment = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "comment".to_string(), }, clock: 0, }; dag::append_event(&repo, ref_name, &comment, &sk).unwrap(); // After creation through the DAG functions, clocks should already be set // Let's verify the walk returns correct clocks let events = dag::walk_events(&repo, ref_name).unwrap(); assert_eq!(events.len(), 2); assert_eq!(events[0].1.clock, 1); assert_eq!(events[1].1.clock, 2); // Now test migrate_clocks on a ref that has correct clocks (should be a no-op effectively) dag::migrate_clocks(&repo, ref_name, &sk).unwrap(); let events_after = dag::walk_events(&repo, ref_name).unwrap(); assert_eq!(events_after.len(), 2); assert!(events_after[0].1.clock >= 1); assert!(events_after[1].1.clock >= 2); } #[test] fn migrate_clocks_on_zero_clock_events() { // Build a DAG with clock=0 events by writing directly (bypassing DAG functions) let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); let event1 = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, }; // Write directly with clock=0 (simulating pre-CRDT data) let oid1 = write_raw_event(&repo, &event1, &sk, &[]); let ref_name = "refs/collab/issues/test-migrate-zero"; repo.reference(ref_name, oid1, false, "test").unwrap(); let event2 = Event { timestamp: "2026-01-02T00:00:00Z".to_string(), author: alice(), action: Action::IssueComment { body: "comment".to_string(), }, clock: 0, }; let parent = repo.find_commit(oid1).unwrap(); let oid2 = write_raw_event(&repo, &event2, &sk, &[&parent]); repo.reference(ref_name, oid2, true, "test append").unwrap(); // Verify clocks are 0 assert_eq!(read_event_clock(&repo, oid1), 0); assert_eq!(read_event_clock(&repo, oid2), 0); // Migrate dag::migrate_clocks(&repo, ref_name, &sk).unwrap(); // After migration, clocks should be sequential (1, 2) let events = dag::walk_events(&repo, ref_name).unwrap(); assert_eq!(events.len(), 2); assert_eq!(events[0].1.clock, 1); assert_eq!(events[1].1.clock, 2); } // ── Phase 8: Serialization preservation test ───────────────────── #[test] fn clock_field_survives_serialization_round_trip() { let event = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 42, }; let json = serde_json::to_string(&event).unwrap(); assert!(json.contains("\"clock\":42")); let deserialized: Event = serde_json::from_str(&json).unwrap(); assert_eq!(deserialized.clock, 42); } #[test] fn clock_field_in_dag_round_trip() { let dir = TempDir::new().unwrap(); let repo = init_repo(dir.path()); let sk = test_sk(); let event = Event { timestamp: "2026-01-01T00:00:00Z".to_string(), author: alice(), action: Action::IssueOpen { title: "test".to_string(), body: "body".to_string(), }, clock: 0, // Caller passes 0 }; let oid = dag::create_root_event(&repo, &event, &sk).unwrap(); let ref_name = format!("refs/collab/issues/{}", oid); repo.reference(&ref_name, oid, false, "test").unwrap(); // Read back and verify clock was set to 1 and persisted let events = dag::walk_events(&repo, &ref_name).unwrap(); assert_eq!(events.len(), 1); assert_eq!(events[0].1.clock, 1); } // ── Helper: write a raw event commit bypassing DAG clock logic ─── fn write_raw_event( repo: &Repository, event: &Event, sk: &SigningKey, parents: &[&git2::Commit], ) -> Oid { use git_collab::signing::sign_event; let detached = sign_event(event, sk).unwrap(); let json = serde_json::to_vec_pretty(event).unwrap(); let event_blob = repo.blob(&json).unwrap(); let sig_blob = repo.blob(detached.signature.as_bytes()).unwrap(); let pubkey_blob = repo.blob(detached.pubkey.as_bytes()).unwrap(); let manifest = br#"{"version":1,"format":"git-collab"}"#; let manifest_blob = repo.blob(manifest).unwrap(); let mut tb = repo.treebuilder(None).unwrap(); tb.insert("event.json", event_blob, 0o100644).unwrap(); tb.insert("signature", sig_blob, 0o100644).unwrap(); tb.insert("pubkey", pubkey_blob, 0o100644).unwrap(); tb.insert("manifest.json", manifest_blob, 0o100644).unwrap(); let tree_oid = tb.write().unwrap(); let tree = repo.find_tree(tree_oid).unwrap(); let sig = git2::Signature::now(&event.author.name, &event.author.email).unwrap(); repo.commit(None, &sig, &sig, "raw event", &tree, parents) .unwrap() } diff --git a/tests/signing_test.rs b/tests/signing_test.rs index c56c581..89eb86e 100644 --- a/tests/signing_test.rs +++ b/tests/signing_test.rs @@ -16,6 +16,7 @@ fn make_event() -> Event { title: "Test issue".to_string(), body: "This is a test".to_string(), }, clock: 0, } } @@ -209,6 +210,7 @@ fn event_json_uses_namespaced_action_types() { branch: "feature/fix-bug".to_string(), fixes: Some("deadbeef".to_string()), }, clock: 0, }; let json = serde_json::to_string(&event).unwrap(); diff --git a/tests/sync_test.rs b/tests/sync_test.rs index b73adb5..48a7445 100644 --- a/tests/sync_test.rs +++ b/tests/sync_test.rs @@ -286,6 +286,7 @@ fn test_patch_review_across_repos() { branch: "feature/x".to_string(), fixes: None, }, clock: 0, }; let sk = test_signing_key(); let oid = dag::create_root_event(&alice_repo, &event, &sk).unwrap(); @@ -306,6 +307,7 @@ fn test_patch_review_across_repos() { verdict: ReviewVerdict::Approve, body: "LGTM!".to_string(), }, clock: 0, }; dag::append_event(&bob_repo, &bob_ref, &review_event, &sk).unwrap(); sync::sync(&bob_repo, "origin").unwrap(); @@ -333,6 +335,7 @@ fn test_concurrent_review_and_revise() { branch: "feature/wip".to_string(), fixes: None, }, clock: 0, }; let sk = test_signing_key(); let oid = dag::create_root_event(&alice_repo, &event, &sk).unwrap(); @@ -350,6 +353,7 @@ fn test_concurrent_review_and_revise() { action: Action::PatchRevise { body: Some("Updated description".to_string()), }, clock: 0, }; dag::append_event(&alice_repo, &alice_ref, &revise_event, &sk).unwrap(); @@ -361,6 +365,7 @@ fn test_concurrent_review_and_revise() { verdict: ReviewVerdict::RequestChanges, body: "Needs work".to_string(), }, clock: 0, }; dag::append_event(&bob_repo, &bob_ref, &review_event, &sk).unwrap(); @@ -464,6 +469,7 @@ fn test_unsigned_event_sync_rejected() { title: "Unsigned issue".to_string(), body: "No signature".to_string(), }, clock: 0, }; let oid = create_unsigned_event(&alice_repo, &event); let id = oid.to_string(); @@ -511,6 +517,7 @@ fn test_tampered_event_sync_rejected() { title: "Tampered issue".to_string(), body: "Will be tampered".to_string(), }, clock: 0, }; let oid = create_tampered_event(&repo, &event); let id = oid.to_string();