6149c147
Harden sync correctness and tidy up edge cases
alex emery 2026-04-12 04:57
- state.rs: use chrono for RFC3339 timestamp comparison instead of lexicographic strings, and log skipped corrupt refs via inspect_err rather than silently dropping them
- sync.rs: batch pushes into a single git invocation with non-force refspecs so concurrent pushers can't silently discard each other's events; fall back to per-ref pushes on failure for precise error attribution
- dag.rs: map merge_base failures to a typed DisjointHistories error with local/remote ref context; document migrate_clocks' OID-rewrite hazard for shared refs
- signing.rs: add a regression test pinning canonical_json key order so a transitive preserve_order activation can't silently break signature determinism
- server/http/repo_list.rs: gate test-only read_description helper behind cfg(test) to silence dead_code
- tests: ensure init_repo sets HEAD to main so system-default branch names don't bleed into assertions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
diff --git a/src/dag.rs b/src/dag.rs index 063d825..e1ac63e 100644 --- a/src/dag.rs +++ b/src/dag.rs @@ -212,7 +212,13 @@ pub fn reconcile( return Ok((local_oid, ReconcileOutcome::AlreadyCurrent)); } let merge_base = repo.merge_base(local_oid, remote_oid)?; let merge_base = repo.merge_base(local_oid, remote_oid).map_err(|e| { Error::DisjointHistories { local_ref: local_ref.to_string(), remote_ref: remote_ref.to_string(), detail: e.message().to_string(), } })?; if merge_base == remote_oid { // Remote is ancestor of local — local is ahead @@ -258,7 +264,11 @@ pub fn reconcile( /// Migrate a DAG ref so that every event with clock=0 gets a sequential clock /// assigned in topological order. Events that already have clock>0 are left as-is. /// This rewrites the commit chain (new OIDs) and updates the ref. /// /// **WARNING**: This rewrites the commit chain, producing new OIDs for every /// commit. If the ref has already been pushed to a remote, other users who /// fetched the old OIDs will encounter disjoint histories on their next sync. /// Only call this on refs that have not been shared. 
pub fn migrate_clocks( repo: &Repository, ref_name: &str, diff --git a/src/error.rs b/src/error.rs index 9f3ea51..4443e62 100644 --- a/src/error.rs +++ b/src/error.rs @@ -40,4 +40,11 @@ pub enum Error { #[error("ambiguous id prefix '{prefix}': {count} matches")] AmbiguousId { prefix: String, count: usize }, #[error("disjoint histories: local ref '{local_ref}' and remote ref '{remote_ref}' share no common ancestor ({detail})")] DisjointHistories { local_ref: String, remote_ref: String, detail: String, }, } diff --git a/src/server/http/repo_list.rs b/src/server/http/repo_list.rs index 111ea63..8c21d30 100644 --- a/src/server/http/repo_list.rs +++ b/src/server/http/repo_list.rs @@ -100,6 +100,7 @@ fn resolve_description(entry: &crate::repos::RepoEntry, repo: Option<&git2::Repo .unwrap_or_default() } #[cfg(test)] fn read_description(entry: &crate::repos::RepoEntry) -> String { let repo = crate::repos::open(entry).ok(); resolve_description(entry, repo.as_ref()) diff --git a/src/signing.rs b/src/signing.rs index 883f1ff..b4eb9b9 100644 --- a/src/signing.rs +++ b/src/signing.rs @@ -139,11 +139,11 @@ pub fn load_verifying_key(config_dir: &Path) -> Result<VerifyingKey, Error> { .map_err(|e| Error::Verification(format!("invalid verifying key: {}", e))) } /// Serialize an Event to canonical JSON bytes. /// Serialize an Event to canonical JSON bytes with deterministic key ordering. /// /// Uses `serde_json::Value` as an intermediate step. Since serde_json uses /// BTreeMap-backed Map (no `preserve_order` feature), keys are sorted /// alphabetically, ensuring deterministic output. /// Relies on `serde_json::Map` being backed by `BTreeMap` (sorted keys) when /// the `preserve_order` feature is **not** enabled. A test below verifies this /// invariant so we catch breakage from transitive feature activation. 
pub fn canonical_json(event: &Event) -> Result<Vec<u8>, Error> { let value = serde_json::to_value(event)?; let json = serde_json::to_string(&value)?; @@ -335,4 +335,33 @@ mod tests { let status = verify_detached(&event, &sig).unwrap(); assert_eq!(status, VerifyStatus::Valid); } #[test] fn canonical_json_keys_are_sorted() { // Guard against serde_json's `preserve_order` feature being activated // by a transitive dependency, which would break signature determinism. let event = Event { timestamp: "2026-03-21T00:00:00Z".to_string(), author: Author { name: "Alice".to_string(), email: "alice@example.com".to_string(), }, action: Action::IssueOpen { title: "Test".to_string(), body: "Body".to_string(), relates_to: None, }, clock: 1, }; let json = canonical_json(&event).unwrap(); let parsed: serde_json::Value = serde_json::from_slice(&json).unwrap(); if let serde_json::Value::Object(map) = parsed { let keys: Vec<&String> = map.keys().collect(); let mut sorted = keys.clone(); sorted.sort(); assert_eq!(keys, sorted, "top-level JSON keys must be alphabetically sorted"); } else { panic!("expected JSON object"); } } } diff --git a/src/state.rs b/src/state.rs index a57edd5..c642eed 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,5 +1,6 @@ use std::fmt; use chrono::{DateTime, Utc}; use git2::{Oid, Repository}; use serde::{Deserialize, Serialize}; @@ -7,6 +8,14 @@ use crate::cache; use crate::dag; use crate::event::{Action, Author, ReviewVerdict}; /// Parse an RFC3339 timestamp, returning the earliest representable time on /// failure so that unparseable timestamps sort before any valid one. 
fn parse_timestamp(s: &str) -> DateTime<Utc> { DateTime::parse_from_rfc3339(s) .map(|dt| dt.with_timezone(&Utc)) .unwrap_or(DateTime::<Utc>::MIN_UTC) } fn serialize_oid<S: serde::Serializer>(oid: &Oid, s: S) -> Result<S::Ok, S::Error> { s.serialize_str(&oid.to_string()) } @@ -232,15 +241,16 @@ impl IssueState { ) -> Result<Self, crate::error::Error> { let events = dag::walk_events(repo, ref_name)?; let mut state: Option<IssueState> = None; let mut max_timestamp = String::new(); let mut latest: Option<(DateTime<Utc>, String)> = None; // Track the (clock, commit_oid_hex) of the latest status-changing event. // Higher clock wins; on tie, lexicographically higher OID wins. let mut status_key: Option<(u64, String)> = None; for (oid, event) in events { if event.timestamp > max_timestamp { max_timestamp = event.timestamp.clone(); let ts = parse_timestamp(&event.timestamp); if latest.as_ref().is_none_or(|(prev, _)| ts > *prev) { latest = Some((ts, event.timestamp.clone())); } match event.action { Action::IssueOpen { title, body, relates_to } => { @@ -326,13 +336,12 @@ impl IssueState { } } } Action::Merge => {} _ => {} } } if let Some(ref mut s) = state { s.last_updated = max_timestamp; s.last_updated = latest.map(|(_, raw)| raw).unwrap_or_default(); } state.ok_or_else(|| git2::Error::from_str("no IssueOpen event found in DAG").into()) } @@ -427,13 +436,14 @@ impl PatchState { ) -> Result<Self, crate::error::Error> { let events = dag::walk_events(repo, ref_name)?; let mut state: Option<PatchState> = None; let mut max_timestamp = String::new(); let mut latest: Option<(DateTime<Utc>, String)> = None; let mut status_key: Option<(u64, String)> = None; for (oid, event) in events { if event.timestamp > max_timestamp { max_timestamp = event.timestamp.clone(); let ts = parse_timestamp(&event.timestamp); if latest.as_ref().is_none_or(|(prev, _)| ts > *prev) { latest = Some((ts, event.timestamp.clone())); } match event.action { Action::PatchCreate { @@ -538,13 +548,12 @@ impl 
PatchState { } } } Action::Merge => {} _ => {} } } if let Some(ref mut s) = state { s.last_updated = max_timestamp; s.last_updated = latest.map(|(_, raw)| raw).unwrap_or_default(); s.check_auto_merge(repo); } state.ok_or_else(|| git2::Error::from_str("no PatchCreate event found in DAG").into()) @@ -635,7 +644,11 @@ pub fn list_issues(repo: &Repository) -> Result<Vec<IssueState>, crate::error::E let items = collab_refs(repo, "issues")? .into_iter() .filter(|(_, id)| !archived_ids.contains(id)) .filter_map(|(ref_name, id)| IssueState::from_ref(repo, &ref_name, &id).ok()) .filter_map(|(ref_name, id)| { IssueState::from_ref(repo, &ref_name, &id) .inspect_err(|e| eprintln!("warning: skipping issue {:.8}: {}", id, e)) .ok() }) .collect(); Ok(items) } @@ -649,7 +662,11 @@ pub fn list_patches(repo: &Repository) -> Result<Vec<PatchState>, crate::error:: let items = collab_refs(repo, "patches")? .into_iter() .filter(|(_, id)| !archived_ids.contains(id)) .filter_map(|(ref_name, id)| PatchState::from_ref(repo, &ref_name, &id).ok()) .filter_map(|(ref_name, id)| { PatchState::from_ref(repo, &ref_name, &id) .inspect_err(|e| eprintln!("warning: skipping patch {:.8}: {}", id, e)) .ok() }) .collect(); Ok(items) } @@ -663,15 +680,17 @@ pub fn list_issues_with_archived(repo: &Repository) -> Result<Vec<IssueState>, c // Archived first so they take priority for (ref_name, id) in collab_archive_refs(repo, "issues")? { if seen.insert(id.clone()) { if let Ok(state) = IssueState::from_ref(repo, &ref_name, &id) { items.push(state); match IssueState::from_ref(repo, &ref_name, &id) { Ok(state) => items.push(state), Err(e) => eprintln!("warning: skipping issue {:.8}: {}", id, e), } } } for (ref_name, id) in collab_refs(repo, "issues")? 
{ if seen.insert(id.clone()) { if let Ok(state) = IssueState::from_ref(repo, &ref_name, &id) { items.push(state); match IssueState::from_ref(repo, &ref_name, &id) { Ok(state) => items.push(state), Err(e) => eprintln!("warning: skipping issue {:.8}: {}", id, e), } } } @@ -686,15 +705,17 @@ pub fn list_patches_with_archived(repo: &Repository) -> Result<Vec<PatchState>, for (ref_name, id) in collab_archive_refs(repo, "patches")? { if seen.insert(id.clone()) { if let Ok(state) = PatchState::from_ref(repo, &ref_name, &id) { items.push(state); match PatchState::from_ref(repo, &ref_name, &id) { Ok(state) => items.push(state), Err(e) => eprintln!("warning: skipping patch {:.8}: {}", id, e), } } } for (ref_name, id) in collab_refs(repo, "patches")? { if seen.insert(id.clone()) { if let Ok(state) = PatchState::from_ref(repo, &ref_name, &id) { items.push(state); match PatchState::from_ref(repo, &ref_name, &id) { Ok(state) => items.push(state), Err(e) => eprintln!("warning: skipping patch {:.8}: {}", id, e), } } } diff --git a/src/sync.rs b/src/sync.rs index da0c0ed..bf2cbe5 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -147,9 +147,50 @@ impl SyncState { // Push helpers (T003, T004) // --------------------------------------------------------------------------- /// Push a single ref to the remote. Returns a RefPushResult. fn push_ref(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult { let refspec = format!("+{}:{}", ref_name, ref_name); /// Push multiple refs to the remote in a single `git push` invocation. /// /// Uses non-force refspecs so the remote will reject the push if it has /// commits we haven't reconciled. This prevents silently discarding events /// that another pusher added between our fetch and push. /// /// On success, all refs are marked as Pushed. On failure, falls back to /// per-ref pushes to determine which specific refs failed. 
fn push_refs_batched(workdir: &Path, remote_name: &str, refs: &[String]) -> Vec<RefPushResult> { if refs.is_empty() { return Vec::new(); } let refspecs: Vec<String> = refs.iter().map(|r| format!("{}:{}", r, r)).collect(); let mut args = vec!["push", remote_name]; args.extend(refspecs.iter().map(|s| s.as_str())); match Command::new("git") .args(&args) .current_dir(workdir) .output() { Ok(output) if output.status.success() => { // All refs pushed successfully refs.iter() .map(|r| RefPushResult { ref_name: r.clone(), status: PushStatus::Pushed, error: None, }) .collect() } Ok(_) | Err(_) => { // Batch failed — retry each ref individually to isolate failures refs.iter() .map(|ref_name| push_ref_single(workdir, remote_name, ref_name)) .collect() } } } /// Push a single ref to the remote. Used as a fallback when batched push fails. fn push_ref_single(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult { let refspec = format!("{}:{}", ref_name, ref_name); match Command::new("git") .args(["push", remote_name, &refspec]) .current_dir(workdir) @@ -314,7 +355,10 @@ pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> { } // Step 2: Reconcile // Re-open repo to see the fetched refs (git2 caches ref state) // Re-open the repo because git2 caches the ref list at open time. // The `git fetch` above wrote new refs to disk, but the original // `repo` handle won't see them. `repo.path()` returns the `.git` // directory, which is the correct argument to `Repository::open`. 
let repo = Repository::open(repo.path())?; let sk = signing::load_signing_key(&signing::signing_key_dir()?)?; reconcile_refs(&repo, "issues", &author, &sk)?; @@ -327,7 +371,7 @@ pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> { if refs_to_push.is_empty() { println!("Nothing to push."); } else { let sync_result = push_refs_individually(&workdir, remote_name, &refs_to_push); let sync_result = push_refs(&workdir, remote_name, &refs_to_push); if !sync_result.is_complete() { // Save state for resume @@ -393,7 +437,7 @@ fn sync_resume( // Push the pending refs let ref_names: Vec<String> = state.pending_refs.iter().map(|(r, _)| r.clone()).collect(); let sync_result = push_refs_individually(workdir, remote_name, &ref_names); let sync_result = push_refs(workdir, remote_name, &ref_names); if sync_result.is_complete() { // All pending refs pushed successfully @@ -437,22 +481,20 @@ fn sync_resume( } } /// Push refs one at a time, printing per-ref status, and return aggregated results. fn push_refs_individually(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult { let mut results = Vec::new(); for ref_name in refs { let result = push_ref(workdir, remote_name, ref_name); /// Push all refs in a single batch, printing per-ref status, and return aggregated results. 
fn push_refs(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult { let results = push_refs_batched(workdir, remote_name, refs); for result in &results { match &result.status { PushStatus::Pushed => println!(" Pushed {}", ref_name), PushStatus::Pushed => println!(" Pushed {}", result.ref_name), PushStatus::Failed => { eprintln!( " FAILED {}: {}", ref_name, result.ref_name, result.error.as_deref().unwrap_or("unknown error") ); } } results.push(result); } SyncResult { results, diff --git a/tests/collab_test.rs b/tests/collab_test.rs index 81401da..6f2f3d1 100644 --- a/tests/collab_test.rs +++ b/tests/collab_test.rs @@ -10,7 +10,7 @@ use git_collab::state::{self, IssueState, IssueStatus, PatchState, PatchStatus}; use common::{ add_comment, add_review, alice, bob, close_issue, create_patch, init_repo, now, open_issue, reopen_issue, setup_signing_key, test_signing_key, reopen_issue, setup_signing_key, test_signing_key, ScopedTestConfig, }; // --------------------------------------------------------------------------- @@ -752,6 +752,8 @@ use git_collab::patch; #[test] fn test_create_patch_from_branch_populates_branch_field() { // T009: creating a patch from current branch populates `branch` field let cfg = ScopedTestConfig::new(); cfg.ensure_signing_key(); let tmp = TempDir::new().unwrap(); let repo = init_repo(tmp.path(), &alice()); make_initial_commit(&repo, "main"); @@ -772,6 +774,8 @@ fn test_create_patch_from_branch_populates_branch_field() { #[test] fn test_create_duplicate_patch_for_same_branch_returns_error() { // T011: creating a duplicate patch for same branch returns error let cfg = ScopedTestConfig::new(); cfg.ensure_signing_key(); let tmp = TempDir::new().unwrap(); let repo = init_repo(tmp.path(), &alice()); make_initial_commit(&repo, "main"); @@ -939,6 +943,8 @@ fn test_auto_detect_merged_patch_via_git_merge() { // When a user merges the patch branch into the base branch manually // (using git merge), PatchState should auto-detect that the 
patch // is merged without needing `patch merge`. let cfg = ScopedTestConfig::new(); cfg.ensure_signing_key(); let tmp = TempDir::new().unwrap(); let repo = init_repo(tmp.path(), &alice()); make_initial_commit(&repo, "main"); @@ -968,6 +974,8 @@ fn test_auto_detect_merged_patch_via_git_merge() { fn test_auto_detect_merged_patch_deleted_branch() { // If the patch branch was deleted after a manual merge, // auto-detection should not crash — patch stays Open. let cfg = ScopedTestConfig::new(); cfg.ensure_signing_key(); let tmp = TempDir::new().unwrap(); let repo = init_repo(tmp.path(), &alice()); make_initial_commit(&repo, "main"); @@ -992,6 +1000,8 @@ fn test_cache_does_not_defeat_auto_detect_merge() { // Regression: from_ref() returned cached Open status even after the // patch branch was merged into main, because the cache hit bypassed // the auto-detect merge logic that only ran in from_ref_uncached(). let cfg = ScopedTestConfig::new(); cfg.ensure_signing_key(); let tmp = TempDir::new().unwrap(); let repo = init_repo(tmp.path(), &alice()); make_initial_commit(&repo, "main"); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 14be07c..8801e5d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -177,6 +177,8 @@ impl Drop for ScopedTestConfig { /// and an initial empty commit on `main`. pub fn init_repo(dir: &Path, author: &Author) -> Repository { let repo = Repository::init(dir).expect("init repo"); // Ensure HEAD points to main regardless of the system's default branch name repo.set_head("refs/heads/main").expect("set HEAD to main"); { let mut config = repo.config().unwrap(); config.set_str("user.name", &author.name).unwrap();