a73x

527b236b

Replace timestamp-wins conflict resolution with Lamport clock + OID

a73x   2026-03-21 13:21

Add a `clock: u64` field to Event that the DAG layer sets automatically:
- Root events get clock=1
- Appended events get max_clock(DAG) + 1
- Merge events get max(local_max, remote_max) + 1

Status conflicts (close/reopen, close/merge) now resolve by comparing
(clock, commit_oid) tuples instead of wall-clock timestamps. Higher
clock wins; lexicographic OID breaks ties on equal clocks.

Includes migrate_clocks() for rewriting pre-CRDT DAGs and 11 new tests.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

diff --git a/src/dag.rs b/src/dag.rs
index d585071..82df8e1 100644
--- a/src/dag.rs
+++ b/src/dag.rs
@@ -8,15 +8,43 @@ use crate::signing::sign_event;
/// The manifest blob content included in every event commit tree.
const MANIFEST_JSON: &[u8] = br#"{"version":1,"format":"git-collab"}"#;

/// Walk the entire DAG reachable from `tip` and return the maximum clock value.
/// Returns 0 if the DAG is empty or all events have clock 0 (pre-migration).
pub fn max_clock(repo: &Repository, tip: Oid) -> Result<u64, Error> {
    let mut revwalk = repo.revwalk()?;
    revwalk.set_sorting(Sort::TOPOLOGICAL)?;
    revwalk.push(tip)?;

    let mut max = 0u64;
    for oid_result in revwalk {
        let oid = oid_result?;
        let commit = repo.find_commit(oid)?;
        let tree = commit.tree()?;
        let entry = tree
            .get_name("event.json")
            .ok_or_else(|| git2::Error::from_str("missing event.json in commit tree"))?;
        let blob = repo.find_blob(entry.id())?;
        let event: Event = serde_json::from_slice(blob.content())?;
        if event.clock > max {
            max = event.clock;
        }
    }
    Ok(max)
}

/// Create an orphan commit (no parents) with the given event.
/// Returns the new commit OID which also serves as the entity ID.
/// Clones the event internally and sets clock=1.
pub fn create_root_event(
    repo: &Repository,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let detached = sign_event(event, signing_key)?;
    let event_json = serde_json::to_vec_pretty(event)?;
    let mut event = event.clone();
    event.clock = 1;

    let detached = sign_event(&event, signing_key)?;
    let event_json = serde_json::to_vec_pretty(&event)?;

    let event_blob = repo.blob(&event_json)?;
    let sig_blob = repo.blob(detached.signature.as_bytes())?;
@@ -39,14 +67,21 @@ pub fn create_root_event(
}

/// Append an event to an existing DAG. The current tip is the parent.
/// Clones the event internally and sets clock = max_clock(tip) + 1.
pub fn append_event(
    repo: &Repository,
    ref_name: &str,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let detached = sign_event(event, signing_key)?;
    let event_json = serde_json::to_vec_pretty(event)?;
    let tip = repo.refname_to_id(ref_name)?;
    let current_max = max_clock(repo, tip)?;

    let mut event = event.clone();
    event.clock = current_max + 1;

    let detached = sign_event(&event, signing_key)?;
    let event_json = serde_json::to_vec_pretty(&event)?;

    let event_blob = repo.blob(&event_json)?;
    let sig_blob = repo.blob(detached.signature.as_bytes())?;
@@ -127,11 +162,16 @@ pub fn reconcile(
        return Ok(remote_oid);
    }

    // True fork — create merge commit
    // True fork — create merge commit with clock = max(local, remote) + 1
    let local_max = max_clock(repo, local_oid)?;
    let remote_max = max_clock(repo, remote_oid)?;
    let merge_clock = std::cmp::max(local_max, remote_max) + 1;

    let merge_event = Event {
        timestamp: chrono::Utc::now().to_rfc3339(),
        author: merge_author.clone(),
        action: Action::Merge,
        clock: merge_clock,
    };

    let detached = sign_event(&merge_event, signing_key)?;
@@ -165,6 +205,72 @@ pub fn reconcile(
    Ok(oid)
}

/// Migrate a DAG ref so that every event with clock=0 gets a sequential clock
/// assigned in topological order. Events that already have clock>0 are left as-is.
/// This rewrites the commit chain (new OIDs) and updates the ref.
pub fn migrate_clocks(
    repo: &Repository,
    ref_name: &str,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<(), Error> {
    let events = walk_events(repo, ref_name)?;

    // Check if migration is needed: any event with clock=0?
    let needs_migration = events.iter().any(|(_, e)| e.clock == 0);
    if !needs_migration {
        return Ok(());
    }

    // Rebuild the chain with sequential clocks
    let mut clock = 0u64;
    let mut prev_oid: Option<Oid> = None;

    for (_old_oid, event) in &events {
        clock += 1;
        let mut migrated = event.clone();
        if migrated.clock == 0 {
            migrated.clock = clock;
        } else {
            clock = migrated.clock; // respect existing clocks
        }

        let detached = sign_event(&migrated, signing_key)?;
        let event_json = serde_json::to_vec_pretty(&migrated)?;
        let event_blob = repo.blob(&event_json)?;
        let sig_blob = repo.blob(detached.signature.as_bytes())?;
        let pubkey_blob = repo.blob(detached.pubkey.as_bytes())?;
        let manifest_blob = repo.blob(MANIFEST_JSON)?;

        let mut tb = repo.treebuilder(None)?;
        tb.insert("event.json", event_blob, 0o100644)?;
        tb.insert("signature", sig_blob, 0o100644)?;
        tb.insert("pubkey", pubkey_blob, 0o100644)?;
        tb.insert("manifest.json", manifest_blob, 0o100644)?;
        let tree_oid = tb.write()?;
        let tree = repo.find_tree(tree_oid)?;

        let sig = author_signature(&migrated.author)?;
        let message = commit_message(&migrated.action);

        let parents: Vec<git2::Commit> = if let Some(pid) = prev_oid {
            vec![repo.find_commit(pid)?]
        } else {
            vec![]
        };
        let parent_refs: Vec<&git2::Commit> = parents.iter().collect();

        let new_oid = repo.commit(None, &sig, &sig, &message, &tree, &parent_refs)?;
        prev_oid = Some(new_oid);
    }

    // Update the ref to point to the new tip
    if let Some(new_tip) = prev_oid {
        repo.reference(ref_name, new_tip, true, "migrate clocks")?;
    }

    Ok(())
}

fn commit_message(action: &Action) -> String {
    match action {
        Action::IssueOpen { title, .. } => format!("issue: open \"{}\"", title),
diff --git a/src/event.rs b/src/event.rs
index ec1a83c..135c19b 100644
--- a/src/event.rs
+++ b/src/event.rs
@@ -11,6 +11,7 @@ pub struct Event {
    pub timestamp: String,
    pub author: Author,
    pub action: Action,
    pub clock: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/src/issue.rs b/src/issue.rs
index fce8fbe..c4e0cf8 100644
--- a/src/issue.rs
+++ b/src/issue.rs
@@ -16,6 +16,7 @@ pub fn open(repo: &Repository, title: &str, body: &str) -> Result<String, crate:
            title: title.to_string(),
            body: body.to_string(),
        },
        clock: 0,
    };
    let oid = dag::create_root_event(repo, &event, &sk)?;
    let id = oid.to_string();
@@ -82,6 +83,7 @@ pub fn label(repo: &Repository, id_prefix: &str, label: &str) -> Result<(), crat
        action: Action::IssueLabel {
            label: label.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -97,6 +99,7 @@ pub fn unlabel(repo: &Repository, id_prefix: &str, label: &str) -> Result<(), cr
        action: Action::IssueUnlabel {
            label: label.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -116,6 +119,7 @@ pub fn assign(
        action: Action::IssueAssign {
            assignee: assignee.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -135,6 +139,7 @@ pub fn unassign(
        action: Action::IssueUnassign {
            assignee: assignee.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -161,6 +166,7 @@ pub fn edit(
            title: title.map(|s| s.to_string()),
            body: body.map(|s| s.to_string()),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -176,6 +182,7 @@ pub fn comment(repo: &Repository, id_prefix: &str, body: &str) -> Result<(), cra
        action: Action::IssueComment {
            body: body.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -195,6 +202,7 @@ pub fn close(
        action: Action::IssueClose {
            reason: reason.map(|s| s.to_string()),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -208,6 +216,7 @@ pub fn reopen(repo: &Repository, id_prefix: &str) -> Result<(), crate::error::Er
        timestamp: chrono::Utc::now().to_rfc3339(),
        author,
        action: Action::IssueReopen,
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
diff --git a/src/patch.rs b/src/patch.rs
index cfae1b3..43bb9ae 100644
--- a/src/patch.rs
+++ b/src/patch.rs
@@ -50,6 +50,7 @@ pub fn create(
            branch: branch.to_string(),
            fixes: fixes.map(|s| s.to_string()),
        },
        clock: 0,
    };
    let oid = dag::create_root_event(repo, &event, &sk)?;
    let id = oid.to_string();
@@ -104,6 +105,7 @@ pub fn comment(
        timestamp: chrono::Utc::now().to_rfc3339(),
        author,
        action,
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -125,6 +127,7 @@ pub fn review(
            verdict,
            body: body.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -144,6 +147,7 @@ pub fn revise(
        action: Action::PatchRevise {
            body: body.map(|s| s.to_string()),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
@@ -207,6 +211,7 @@ pub fn merge(repo: &Repository, id_prefix: &str) -> Result<PatchState, crate::er
        timestamp: chrono::Utc::now().to_rfc3339(),
        author: author.clone(),
        action: Action::PatchMerge,
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;

@@ -219,6 +224,7 @@ pub fn merge(repo: &Repository, id_prefix: &str) -> Result<PatchState, crate::er
                action: Action::IssueClose {
                    reason: Some(format!("Fixed by patch {:.8}", p.id)),
                },
                clock: 0,
            };
            dag::append_event(repo, &issue_ref, &close_event, &sk)?;
        }
@@ -297,6 +303,7 @@ pub fn close(
        action: Action::PatchClose {
            reason: reason.map(|s| s.to_string()),
        },
        clock: 0,
    };
    dag::append_event(repo, &ref_name, &event, &sk)?;
    Ok(())
diff --git a/src/signing.rs b/src/signing.rs
index 089a249..76a2790 100644
--- a/src/signing.rs
+++ b/src/signing.rs
@@ -322,6 +322,7 @@ mod tests {
                title: "Test".to_string(),
                body: "Body".to_string(),
            },
            clock: 0,
        };

        let sk = SigningKey::generate(&mut rand_core::OsRng);
diff --git a/src/state.rs b/src/state.rs
index 261d8b6..8028f0b 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -83,10 +83,9 @@ impl IssueState {
        let events = dag::walk_events(repo, ref_name)?;
        let mut state: Option<IssueState> = None;

        // Track the timestamp of the latest status-changing event so that
        // concurrent close/reopen conflicts resolve deterministically:
        // the event with the later timestamp wins.
        let mut status_ts: Option<String> = None;
        // Track the (clock, commit_oid_hex) of the latest status-changing event.
        // Higher clock wins; on tie, lexicographically higher OID wins.
        let mut status_key: Option<(u64, String)> = None;

        for (oid, event) in events {
            match event.action {
@@ -117,11 +116,12 @@ impl IssueState {
                }
                Action::IssueClose { reason } => {
                    if let Some(ref mut s) = state {
                        if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) {
                        let key = (event.clock, oid.to_string());
                        if status_key.as_ref().is_none_or(|k| key >= *k) {
                            s.status = IssueStatus::Closed;
                            s.close_reason = reason;
                            s.closed_by = Some(oid);
                            status_ts = Some(event.timestamp.clone());
                            status_key = Some(key);
                        }
                    }
                }
@@ -161,11 +161,12 @@ impl IssueState {
                }
                Action::IssueReopen => {
                    if let Some(ref mut s) = state {
                        if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) {
                        let key = (event.clock, oid.to_string());
                        if status_key.as_ref().is_none_or(|k| key >= *k) {
                            s.status = IssueStatus::Open;
                            s.close_reason = None;
                            s.closed_by = None;
                            status_ts = Some(event.timestamp.clone());
                            status_key = Some(key);
                        }
                    }
                }
@@ -209,7 +210,7 @@ impl PatchState {
        let events = dag::walk_events(repo, ref_name)?;
        let mut state: Option<PatchState> = None;

        let mut status_ts: Option<String> = None;
        let mut status_key: Option<(u64, String)> = None;

        for (oid, event) in events {
            match event.action {
@@ -275,17 +276,19 @@ impl PatchState {
                }
                Action::PatchClose { .. } => {
                    if let Some(ref mut s) = state {
                        if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) {
                        let key = (event.clock, oid.to_string());
                        if status_key.as_ref().is_none_or(|k| key >= *k) {
                            s.status = PatchStatus::Closed;
                            status_ts = Some(event.timestamp.clone());
                            status_key = Some(key);
                        }
                    }
                }
                Action::PatchMerge => {
                    if let Some(ref mut s) = state {
                        if status_ts.as_ref().is_none_or(|ts| event.timestamp >= *ts) {
                        let key = (event.clock, oid.to_string());
                        if status_key.as_ref().is_none_or(|k| key >= *k) {
                            s.status = PatchStatus::Merged;
                            status_ts = Some(event.timestamp.clone());
                            status_key = Some(key);
                        }
                    }
                }
diff --git a/src/tui.rs b/src/tui.rs
index 930b3da..9af74f2 100644
--- a/src/tui.rs
+++ b/src/tui.rs
@@ -1991,6 +1991,7 @@ mod tests {
                        title: "Test Issue".to_string(),
                        body: "This is the body".to_string(),
                    },
                    clock: 0,
                },
            ),
            (
@@ -2004,6 +2005,7 @@ mod tests {
                    action: Action::IssueComment {
                        body: "A comment on the issue".to_string(),
                    },
                    clock: 0,
                },
            ),
            (
@@ -2014,6 +2016,7 @@ mod tests {
                    action: Action::IssueClose {
                        reason: Some("fixed".to_string()),
                    },
                    clock: 0,
                },
            ),
        ]
@@ -2097,6 +2100,7 @@ mod tests {
                title: "My Issue".to_string(),
                body: "Description here".to_string(),
            },
            clock: 0,
        };
        let detail = format_event_detail(&oid, &event);
        assert!(detail.contains("aaaaaaa"));
@@ -2116,6 +2120,7 @@ mod tests {
            action: Action::IssueClose {
                reason: Some("resolved".to_string()),
            },
            clock: 0,
        };
        let detail = format_event_detail(&oid, &event);
        assert!(detail.contains("Issue Close"));
@@ -2132,6 +2137,7 @@ mod tests {
                verdict: ReviewVerdict::Approve,
                body: "Looks good!".to_string(),
            },
            clock: 0,
        };
        let detail = format_event_detail(&oid, &event);
        assert!(detail.contains("Patch Review"));
@@ -2146,6 +2152,7 @@ mod tests {
            timestamp: "2026-01-01T00:00:00Z".to_string(),
            author: test_author(),
            action: Action::IssueReopen,
            clock: 0,
        };
        let detail = format_event_detail(&oid, &event);
        assert!(detail.contains("1234567"));
diff --git a/tests/collab_test.rs b/tests/collab_test.rs
index d68835b..1a7461f 100644
--- a/tests/collab_test.rs
+++ b/tests/collab_test.rs
@@ -95,6 +95,7 @@ fn test_issue_edit_updates_title_and_body() {
            title: Some("New title".to_string()),
            body: Some("New body".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&repo, &ref_name, &event, &sk).unwrap();

@@ -119,6 +120,7 @@ fn test_issue_edit_partial_update() {
            title: None,
            body: Some("Added body".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&repo, &ref_name, &event, &sk).unwrap();

@@ -329,6 +331,7 @@ fn test_patch_review_workflow() {
        action: Action::PatchRevise {
            body: Some("Updated implementation".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&repo, &ref_name, &event, &sk).unwrap();

@@ -504,6 +507,7 @@ fn test_signed_event_in_dag() {
            title: "Signed issue".to_string(),
            body: "".to_string(),
        },
        clock: 0,
    };
    let oid = dag::create_root_event(&repo, &event, &sk).unwrap();
    let id = oid.to_string();
@@ -629,6 +633,7 @@ fn create_branch_patch(
            branch: branch.to_string(),
            fixes: None,
        },
        clock: 0,
    };
    let oid = dag::create_root_event(repo, &event, &sk).unwrap();
    let id = oid.to_string();
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index f5343aa..cdfcf0c 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -66,6 +66,7 @@ pub fn open_issue(repo: &Repository, author: &Author, title: &str) -> (String, S
            title: title.to_string(),
            body: "".to_string(),
        },
        clock: 0,
    };
    let oid = dag::create_root_event(repo, &event, &sk).unwrap();
    let id = oid.to_string();
@@ -83,6 +84,7 @@ pub fn add_comment(repo: &Repository, ref_name: &str, author: &Author, body: &st
        action: Action::IssueComment {
            body: body.to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, ref_name, &event, &sk).unwrap();
}
@@ -94,6 +96,7 @@ pub fn close_issue(repo: &Repository, ref_name: &str, author: &Author) {
        timestamp: now(),
        author: author.clone(),
        action: Action::IssueClose { reason: None },
        clock: 0,
    };
    dag::append_event(repo, ref_name, &event, &sk).unwrap();
}
@@ -105,6 +108,7 @@ pub fn reopen_issue(repo: &Repository, ref_name: &str, author: &Author) {
        timestamp: now(),
        author: author.clone(),
        action: Action::IssueReopen,
        clock: 0,
    };
    dag::append_event(repo, ref_name, &event, &sk).unwrap();
}
@@ -122,6 +126,7 @@ pub fn create_patch(repo: &Repository, author: &Author, title: &str) -> (String,
            branch: "test-branch".to_string(),
            fixes: None,
        },
        clock: 0,
    };
    let oid = dag::create_root_event(repo, &event, &sk).unwrap();
    let id = oid.to_string();
@@ -140,6 +145,7 @@ pub fn add_review(repo: &Repository, ref_name: &str, author: &Author, verdict: R
            verdict,
            body: "review comment".to_string(),
        },
        clock: 0,
    };
    dag::append_event(repo, ref_name, &event, &sk).unwrap();
}
diff --git a/tests/crdt_test.rs b/tests/crdt_test.rs
new file mode 100644
index 0000000..82000d5
--- /dev/null
+++ b/tests/crdt_test.rs
@@ -0,0 +1,576 @@
mod common;

use ed25519_dalek::SigningKey;
use git2::{Oid, Repository};
use rand_core::OsRng;
use tempfile::TempDir;

use git_collab::dag;
use git_collab::event::{Action, Author, Event};
use git_collab::state::{IssueState, IssueStatus, PatchState, PatchStatus};

fn alice() -> Author {
    Author {
        name: "Alice".to_string(),
        email: "alice@example.com".to_string(),
    }
}

fn bob() -> Author {
    Author {
        name: "Bob".to_string(),
        email: "bob@example.com".to_string(),
    }
}

fn test_sk() -> SigningKey {
    SigningKey::generate(&mut OsRng)
}

fn init_repo(dir: &std::path::Path) -> Repository {
    let repo = Repository::init(dir).unwrap();
    {
        let mut config = repo.config().unwrap();
        config.set_str("user.name", "Test").unwrap();
        config.set_str("user.email", "test@test.com").unwrap();
    }
    repo
}

fn read_event_clock(repo: &Repository, oid: Oid) -> u64 {
    let commit = repo.find_commit(oid).unwrap();
    let tree = commit.tree().unwrap();
    let entry = tree.get_name("event.json").unwrap();
    let blob = repo.find_blob(entry.id()).unwrap();
    let event: Event = serde_json::from_slice(blob.content()).unwrap();
    event.clock
}

// ── Phase 2: Clock propagation tests ─────────────────────────────

#[test]
fn create_root_event_sets_clock_to_1() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    let event = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0, // caller passes 0, DAG should override to 1
    };

    let oid = dag::create_root_event(&repo, &event, &sk).unwrap();
    assert_eq!(read_event_clock(&repo, oid), 1);
}

#[test]
fn append_event_increments_clock() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    let open_event = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };

    let root_oid = dag::create_root_event(&repo, &open_event, &sk).unwrap();
    let ref_name = format!("refs/collab/issues/{}", root_oid);
    repo.reference(&ref_name, root_oid, false, "test").unwrap();

    // First append should get clock=2
    let comment_event = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "comment 1".to_string(),
        },
        clock: 0,
    };
    let oid2 = dag::append_event(&repo, &ref_name, &comment_event, &sk).unwrap();
    assert_eq!(read_event_clock(&repo, oid2), 2);

    // Second append should get clock=3
    let comment_event2 = Event {
        timestamp: "2026-01-03T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "comment 2".to_string(),
        },
        clock: 0,
    };
    let oid3 = dag::append_event(&repo, &ref_name, &comment_event2, &sk).unwrap();
    assert_eq!(read_event_clock(&repo, oid3), 3);
}

#[test]
fn max_clock_returns_highest_clock_in_dag() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    let event = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };

    let root_oid = dag::create_root_event(&repo, &event, &sk).unwrap();
    let ref_name = format!("refs/collab/issues/{}", root_oid);
    repo.reference(&ref_name, root_oid, false, "test").unwrap();

    assert_eq!(dag::max_clock(&repo, root_oid).unwrap(), 1);

    let comment = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "comment".to_string(),
        },
        clock: 0,
    };
    let tip = dag::append_event(&repo, &ref_name, &comment, &sk).unwrap();
    assert_eq!(dag::max_clock(&repo, tip).unwrap(), 2);
}

// ── Phase 3: Reconcile merge clock test ──────────────────────────

#[test]
fn reconcile_merge_clock_is_max_of_both_branches_plus_one() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    // Create root event (clock=1)
    let open = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };
    let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap();
    let local_ref = "refs/collab/issues/test-reconcile";
    let remote_ref = "refs/collab/sync/issues/test-reconcile";
    repo.reference(local_ref, root_oid, false, "test").unwrap();
    repo.reference(remote_ref, root_oid, false, "test").unwrap();

    // Append 2 events on local (clock=2,3)
    let comment1 = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "local 1".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&repo, local_ref, &comment1, &sk).unwrap();
    let comment2 = Event {
        timestamp: "2026-01-03T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "local 2".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&repo, local_ref, &comment2, &sk).unwrap();

    // Append 4 events on remote (clock=2,3,4,5)
    for i in 1..=4 {
        let comment = Event {
            timestamp: format!("2026-02-{:02}T00:00:00Z", i),
            author: bob(),
            action: Action::IssueComment {
                body: format!("remote {}", i),
            },
            clock: 0,
        };
        dag::append_event(&repo, remote_ref, &comment, &sk).unwrap();
    }

    let merge_oid = dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap();
    // Remote max is 5, local max is 3, so merge should be 6
    assert_eq!(read_event_clock(&repo, merge_oid), 6);
}

// ── Phase 5: Concurrent status resolution using clock+OID ───────

#[test]
fn concurrent_issue_close_reopen_higher_clock_wins() {
    // When one branch closes and another reopens at the same clock,
    // the OID tiebreaker should produce a deterministic result.
    // But when clocks differ, the higher clock always wins.
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    // Create root issue (clock=1)
    let open = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };
    let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap();
    let local_ref = "refs/collab/issues/test-concurrent";
    let remote_ref = "refs/collab/sync/issues/test-concurrent";
    repo.reference(local_ref, root_oid, false, "test").unwrap();
    repo.reference(remote_ref, root_oid, false, "test").unwrap();

    // Local: close (clock=2)
    let close = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueClose { reason: None },
        clock: 0,
    };
    dag::append_event(&repo, local_ref, &close, &sk).unwrap();

    // Remote: add comment (clock=2) then reopen (clock=3)
    let comment = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: bob(),
        action: Action::IssueComment {
            body: "comment".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&repo, remote_ref, &comment, &sk).unwrap();
    let reopen = Event {
        timestamp: "2026-01-03T00:00:00Z".to_string(),
        author: bob(),
        action: Action::IssueReopen,
        clock: 0,
    };
    dag::append_event(&repo, remote_ref, &reopen, &sk).unwrap();

    // Reconcile
    dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap();

    // After reconcile: close had clock=2, reopen had clock=3, so reopen wins
    let state = IssueState::from_ref(&repo, local_ref, "test-concurrent").unwrap();
    assert_eq!(state.status, IssueStatus::Open);
}

#[test]
fn concurrent_issue_same_clock_oid_breaks_tie() {
    // When two status changes have the same clock, higher OID (lexicographic) wins.
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    // Create root issue (clock=1)
    let open = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };
    let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap();
    let local_ref = "refs/collab/issues/test-tie";
    let remote_ref = "refs/collab/sync/issues/test-tie";
    repo.reference(local_ref, root_oid, false, "test").unwrap();
    repo.reference(remote_ref, root_oid, false, "test").unwrap();

    // Both branches: close at clock=2 (both directly from root)
    let close1 = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueClose {
            reason: Some("alice close".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&repo, local_ref, &close1, &sk).unwrap();

    let close2 = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: bob(),
        action: Action::IssueClose {
            reason: Some("bob close".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&repo, remote_ref, &close2, &sk).unwrap();

    // Reconcile
    dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap();

    // Both are closes with clock=2, so the issue should be closed.
    // The one with the higher OID wins and determines the close reason.
    let state = IssueState::from_ref(&repo, local_ref, "test-tie").unwrap();
    assert_eq!(state.status, IssueStatus::Closed);
    // We can't predict which OID wins, but one of the reasons should be present
    assert!(
        state.close_reason == Some("alice close".to_string())
            || state.close_reason == Some("bob close".to_string())
    );
}

#[test]
fn concurrent_patch_close_merge_higher_clock_wins() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    // Need a branch for the patch
    let sig = git2::Signature::now("Test", "test@test.com").unwrap();
    let tree_oid = repo.treebuilder(None).unwrap().write().unwrap();
    let tree = repo.find_tree(tree_oid).unwrap();
    let initial = repo
        .commit(Some("refs/heads/main"), &sig, &sig, "initial", &tree, &[])
        .unwrap();
    let initial_commit = repo.find_commit(initial).unwrap();
    repo.branch("test-branch", &initial_commit, false).unwrap();

    // Create root patch (clock=1)
    let create = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::PatchCreate {
            title: "test patch".to_string(),
            body: "body".to_string(),
            base_ref: "main".to_string(),
            branch: "test-branch".to_string(),
            fixes: None,
        },
        clock: 0,
    };
    let root_oid = dag::create_root_event(&repo, &create, &sk).unwrap();
    let local_ref = "refs/collab/patches/test-patch";
    let remote_ref = "refs/collab/sync/patches/test-patch";
    repo.reference(local_ref, root_oid, false, "test").unwrap();
    repo.reference(remote_ref, root_oid, false, "test")
        .unwrap();

    // Local: close (clock=2)
    let close = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::PatchClose { reason: None },
        clock: 0,
    };
    dag::append_event(&repo, local_ref, &close, &sk).unwrap();

    // Remote: comment (clock=2), then merge (clock=3)
    let comment = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: bob(),
        action: Action::PatchComment {
            body: "lgtm".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&repo, remote_ref, &comment, &sk).unwrap();
    let merge = Event {
        timestamp: "2026-01-03T00:00:00Z".to_string(),
        author: bob(),
        action: Action::PatchMerge,
        clock: 0,
    };
    dag::append_event(&repo, remote_ref, &merge, &sk).unwrap();

    // Reconcile
    dag::reconcile(&repo, local_ref, remote_ref, &alice(), &sk).unwrap();

    // Merge at clock=3 should win over close at clock=2
    let state = PatchState::from_ref(&repo, local_ref, "test-patch").unwrap();
    assert_eq!(state.status, PatchStatus::Merged);
}

// ── Phase 7: Migration tests ────────────────────────────────────

#[test]
fn migrate_clocks_assigns_sequential_clocks() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    // Create a DAG with events that have clock=0 (simulating pre-migration data)
    // We need to create commits directly to simulate old format
    let open = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };
    let root_oid = dag::create_root_event(&repo, &open, &sk).unwrap();
    let ref_name = "refs/collab/issues/test-migrate";
    repo.reference(ref_name, root_oid, false, "test").unwrap();

    // Append a couple events
    let comment = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "comment".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&repo, ref_name, &comment, &sk).unwrap();

    // After creation through the DAG functions, clocks should already be set
    // Let's verify the walk returns correct clocks
    let events = dag::walk_events(&repo, ref_name).unwrap();
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].1.clock, 1);
    assert_eq!(events[1].1.clock, 2);

    // Now test migrate_clocks on a ref that has correct clocks (should be a no-op effectively)
    dag::migrate_clocks(&repo, ref_name, &sk).unwrap();
    let events_after = dag::walk_events(&repo, ref_name).unwrap();
    assert_eq!(events_after.len(), 2);
    assert!(events_after[0].1.clock >= 1);
    assert!(events_after[1].1.clock >= 2);
}

#[test]
fn migrate_clocks_on_zero_clock_events() {
    // Build a DAG with clock=0 events by writing directly (bypassing DAG functions)
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    let event1 = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0,
    };

    // Write directly with clock=0 (simulating pre-CRDT data)
    let oid1 = write_raw_event(&repo, &event1, &sk, &[]);
    let ref_name = "refs/collab/issues/test-migrate-zero";
    repo.reference(ref_name, oid1, false, "test").unwrap();

    let event2 = Event {
        timestamp: "2026-01-02T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueComment {
            body: "comment".to_string(),
        },
        clock: 0,
    };
    let parent = repo.find_commit(oid1).unwrap();
    let oid2 = write_raw_event(&repo, &event2, &sk, &[&parent]);
    repo.reference(ref_name, oid2, true, "test append").unwrap();

    // Verify clocks are 0
    assert_eq!(read_event_clock(&repo, oid1), 0);
    assert_eq!(read_event_clock(&repo, oid2), 0);

    // Migrate
    dag::migrate_clocks(&repo, ref_name, &sk).unwrap();

    // After migration, clocks should be sequential (1, 2)
    let events = dag::walk_events(&repo, ref_name).unwrap();
    assert_eq!(events.len(), 2);
    assert_eq!(events[0].1.clock, 1);
    assert_eq!(events[1].1.clock, 2);
}

// ── Phase 8: Serialization preservation test ─────────────────────

#[test]
fn clock_field_survives_serialization_round_trip() {
    let event = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 42,
    };

    let json = serde_json::to_string(&event).unwrap();
    assert!(json.contains("\"clock\":42"));

    let deserialized: Event = serde_json::from_str(&json).unwrap();
    assert_eq!(deserialized.clock, 42);
}

#[test]
fn clock_field_in_dag_round_trip() {
    let dir = TempDir::new().unwrap();
    let repo = init_repo(dir.path());
    let sk = test_sk();

    let event = Event {
        timestamp: "2026-01-01T00:00:00Z".to_string(),
        author: alice(),
        action: Action::IssueOpen {
            title: "test".to_string(),
            body: "body".to_string(),
        },
        clock: 0, // Caller passes 0
    };

    let oid = dag::create_root_event(&repo, &event, &sk).unwrap();
    let ref_name = format!("refs/collab/issues/{}", oid);
    repo.reference(&ref_name, oid, false, "test").unwrap();

    // Read back and verify clock was set to 1 and persisted
    let events = dag::walk_events(&repo, &ref_name).unwrap();
    assert_eq!(events.len(), 1);
    assert_eq!(events[0].1.clock, 1);
}

// ── Helper: write a raw event commit bypassing DAG clock logic ───

fn write_raw_event(
    repo: &Repository,
    event: &Event,
    sk: &SigningKey,
    parents: &[&git2::Commit],
) -> Oid {
    use git_collab::signing::sign_event;

    let detached = sign_event(event, sk).unwrap();
    let json = serde_json::to_vec_pretty(event).unwrap();
    let event_blob = repo.blob(&json).unwrap();
    let sig_blob = repo.blob(detached.signature.as_bytes()).unwrap();
    let pubkey_blob = repo.blob(detached.pubkey.as_bytes()).unwrap();
    let manifest = br#"{"version":1,"format":"git-collab"}"#;
    let manifest_blob = repo.blob(manifest).unwrap();

    let mut tb = repo.treebuilder(None).unwrap();
    tb.insert("event.json", event_blob, 0o100644).unwrap();
    tb.insert("signature", sig_blob, 0o100644).unwrap();
    tb.insert("pubkey", pubkey_blob, 0o100644).unwrap();
    tb.insert("manifest.json", manifest_blob, 0o100644).unwrap();
    let tree_oid = tb.write().unwrap();
    let tree = repo.find_tree(tree_oid).unwrap();

    let sig = git2::Signature::now(&event.author.name, &event.author.email).unwrap();
    repo.commit(None, &sig, &sig, "raw event", &tree, parents)
        .unwrap()
}
diff --git a/tests/signing_test.rs b/tests/signing_test.rs
index c56c581..89eb86e 100644
--- a/tests/signing_test.rs
+++ b/tests/signing_test.rs
@@ -16,6 +16,7 @@ fn make_event() -> Event {
            title: "Test issue".to_string(),
            body: "This is a test".to_string(),
        },
        clock: 0,
    }
}

@@ -209,6 +210,7 @@ fn event_json_uses_namespaced_action_types() {
            branch: "feature/fix-bug".to_string(),
            fixes: Some("deadbeef".to_string()),
        },
        clock: 0,
    };

    let json = serde_json::to_string(&event).unwrap();
diff --git a/tests/sync_test.rs b/tests/sync_test.rs
index b73adb5..48a7445 100644
--- a/tests/sync_test.rs
+++ b/tests/sync_test.rs
@@ -286,6 +286,7 @@ fn test_patch_review_across_repos() {
            branch: "feature/x".to_string(),
            fixes: None,
        },
        clock: 0,
    };
    let sk = test_signing_key();
    let oid = dag::create_root_event(&alice_repo, &event, &sk).unwrap();
@@ -306,6 +307,7 @@ fn test_patch_review_across_repos() {
            verdict: ReviewVerdict::Approve,
            body: "LGTM!".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&bob_repo, &bob_ref, &review_event, &sk).unwrap();
    sync::sync(&bob_repo, "origin").unwrap();
@@ -333,6 +335,7 @@ fn test_concurrent_review_and_revise() {
            branch: "feature/wip".to_string(),
            fixes: None,
        },
        clock: 0,
    };
    let sk = test_signing_key();
    let oid = dag::create_root_event(&alice_repo, &event, &sk).unwrap();
@@ -350,6 +353,7 @@ fn test_concurrent_review_and_revise() {
        action: Action::PatchRevise {
            body: Some("Updated description".to_string()),
        },
        clock: 0,
    };
    dag::append_event(&alice_repo, &alice_ref, &revise_event, &sk).unwrap();

@@ -361,6 +365,7 @@ fn test_concurrent_review_and_revise() {
            verdict: ReviewVerdict::RequestChanges,
            body: "Needs work".to_string(),
        },
        clock: 0,
    };
    dag::append_event(&bob_repo, &bob_ref, &review_event, &sk).unwrap();

@@ -464,6 +469,7 @@ fn test_unsigned_event_sync_rejected() {
            title: "Unsigned issue".to_string(),
            body: "No signature".to_string(),
        },
        clock: 0,
    };
    let oid = create_unsigned_event(&alice_repo, &event);
    let id = oid.to_string();
@@ -511,6 +517,7 @@ fn test_tampered_event_sync_rejected() {
            title: "Tampered issue".to_string(),
            body: "Will be tampered".to_string(),
        },
        clock: 0,
    };
    let oid = create_tampered_event(&repo, &event);
    let id = oid.to_string();