a73x

6149c147

Harden sync correctness and tidy up edge cases

alex emery   2026-04-12 04:57

- state.rs: use chrono for RFC3339 timestamp comparison instead of
  lexicographic strings, and log skipped corrupt refs via inspect_err
  rather than silently dropping them
- sync.rs: batch pushes into a single git invocation with non-force
  refspecs so concurrent pushers can't silently discard each other's
  events; fall back to per-ref pushes on failure for precise error
  attribution
- dag.rs: map merge_base failures to a typed DisjointHistories error
  with local/remote ref context; document migrate_clocks' OID-rewrite
  hazard for shared refs
- signing.rs: add a regression test pinning canonical_json key order
  so a transitive preserve_order activation can't silently break
  signature determinism
- server/http/repo_list.rs: gate test-only read_description helper
  behind cfg(test) to silence dead_code
- tests: ensure init_repo sets HEAD to main so system-default
  branch names don't bleed into assertions

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

diff --git a/src/dag.rs b/src/dag.rs
index 063d825..e1ac63e 100644
--- a/src/dag.rs
+++ b/src/dag.rs
@@ -212,7 +212,13 @@ pub fn reconcile(
        return Ok((local_oid, ReconcileOutcome::AlreadyCurrent));
    }

    let merge_base = repo.merge_base(local_oid, remote_oid)?;
    let merge_base = repo.merge_base(local_oid, remote_oid).map_err(|e| {
        Error::DisjointHistories {
            local_ref: local_ref.to_string(),
            remote_ref: remote_ref.to_string(),
            detail: e.message().to_string(),
        }
    })?;

    if merge_base == remote_oid {
        // Remote is ancestor of local — local is ahead
@@ -258,7 +264,11 @@ pub fn reconcile(

/// Migrate a DAG ref so that every event with clock=0 gets a sequential clock
/// assigned in topological order. Events that already have clock>0 are left as-is.
/// This rewrites the commit chain (new OIDs) and updates the ref.
///
/// **WARNING**: This rewrites the commit chain, producing new OIDs for every
/// commit. If the ref has already been pushed to a remote, other users who
/// fetched the old OIDs will encounter disjoint histories on their next sync.
/// Only call this on refs that have not been shared.
pub fn migrate_clocks(
    repo: &Repository,
    ref_name: &str,
diff --git a/src/error.rs b/src/error.rs
index 9f3ea51..4443e62 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -40,4 +40,11 @@ pub enum Error {

    #[error("ambiguous id prefix '{prefix}': {count} matches")]
    AmbiguousId { prefix: String, count: usize },

    #[error("disjoint histories: local ref '{local_ref}' and remote ref '{remote_ref}' share no common ancestor ({detail})")]
    DisjointHistories {
        local_ref: String,
        remote_ref: String,
        detail: String,
    },
}
diff --git a/src/server/http/repo_list.rs b/src/server/http/repo_list.rs
index 111ea63..8c21d30 100644
--- a/src/server/http/repo_list.rs
+++ b/src/server/http/repo_list.rs
@@ -100,6 +100,7 @@ fn resolve_description(entry: &crate::repos::RepoEntry, repo: Option<&git2::Repo
        .unwrap_or_default()
}

#[cfg(test)]
fn read_description(entry: &crate::repos::RepoEntry) -> String {
    let repo = crate::repos::open(entry).ok();
    resolve_description(entry, repo.as_ref())
diff --git a/src/signing.rs b/src/signing.rs
index 883f1ff..b4eb9b9 100644
--- a/src/signing.rs
+++ b/src/signing.rs
@@ -139,11 +139,11 @@ pub fn load_verifying_key(config_dir: &Path) -> Result<VerifyingKey, Error> {
        .map_err(|e| Error::Verification(format!("invalid verifying key: {}", e)))
}

/// Serialize an Event to canonical JSON bytes.
/// Serialize an Event to canonical JSON bytes with deterministic key ordering.
///
/// Uses `serde_json::Value` as an intermediate step. Since serde_json uses
/// BTreeMap-backed Map (no `preserve_order` feature), keys are sorted
/// alphabetically, ensuring deterministic output.
/// Relies on `serde_json::Map` being backed by `BTreeMap` (sorted keys) when
/// the `preserve_order` feature is **not** enabled. A test below verifies this
/// invariant so we catch breakage from transitive feature activation.
pub fn canonical_json(event: &Event) -> Result<Vec<u8>, Error> {
    let value = serde_json::to_value(event)?;
    let json = serde_json::to_string(&value)?;
@@ -335,4 +335,33 @@ mod tests {
        let status = verify_detached(&event, &sig).unwrap();
        assert_eq!(status, VerifyStatus::Valid);
    }

    #[test]
    fn canonical_json_keys_are_sorted() {
        // Guard against serde_json's `preserve_order` feature being activated
        // by a transitive dependency, which would break signature determinism.
        // Check every object in the tree, not just the top level: the
        // signature covers nested maps (author, action payload) too, so a
        // nested ordering regression is just as fatal as a top-level one.
        fn assert_keys_sorted(value: &serde_json::Value, path: &str) {
            match value {
                serde_json::Value::Object(map) => {
                    let keys: Vec<&String> = map.keys().collect();
                    let mut sorted = keys.clone();
                    sorted.sort();
                    assert_eq!(
                        keys, sorted,
                        "JSON keys at '{}' must be alphabetically sorted",
                        path
                    );
                    for (k, v) in map {
                        assert_keys_sorted(v, &format!("{}.{}", path, k));
                    }
                }
                serde_json::Value::Array(items) => {
                    for (i, v) in items.iter().enumerate() {
                        assert_keys_sorted(v, &format!("{}[{}]", path, i));
                    }
                }
                _ => {}
            }
        }

        let event = Event {
            timestamp: "2026-03-21T00:00:00Z".to_string(),
            author: Author {
                name: "Alice".to_string(),
                email: "alice@example.com".to_string(),
            },
            action: Action::IssueOpen {
                title: "Test".to_string(),
                body: "Body".to_string(),
                relates_to: None,
            },
            clock: 1,
        };
        let json = canonical_json(&event).unwrap();
        let parsed: serde_json::Value = serde_json::from_slice(&json).unwrap();
        assert!(parsed.is_object(), "expected JSON object");
        assert_keys_sorted(&parsed, "$");
    }
}
diff --git a/src/state.rs b/src/state.rs
index a57edd5..c642eed 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,5 +1,6 @@
use std::fmt;

use chrono::{DateTime, Utc};
use git2::{Oid, Repository};
use serde::{Deserialize, Serialize};

@@ -7,6 +8,14 @@ use crate::cache;
use crate::dag;
use crate::event::{Action, Author, ReviewVerdict};

/// Parse an RFC3339 timestamp, returning the earliest representable time on
/// failure so that unparseable timestamps sort before any valid one.
fn parse_timestamp(s: &str) -> DateTime<Utc> {
    match DateTime::parse_from_rfc3339(s) {
        Ok(dt) => dt.with_timezone(&Utc),
        // Corrupt or non-RFC3339 input collapses to MIN_UTC so it never
        // wins a "latest timestamp" comparison against a valid event.
        Err(_) => DateTime::<Utc>::MIN_UTC,
    }
}

fn serialize_oid<S: serde::Serializer>(oid: &Oid, s: S) -> Result<S::Ok, S::Error> {
    s.serialize_str(&oid.to_string())
}
@@ -232,15 +241,16 @@ impl IssueState {
    ) -> Result<Self, crate::error::Error> {
        let events = dag::walk_events(repo, ref_name)?;
        let mut state: Option<IssueState> = None;
        let mut max_timestamp = String::new();
        let mut latest: Option<(DateTime<Utc>, String)> = None;

        // Track the (clock, commit_oid_hex) of the latest status-changing event.
        // Higher clock wins; on tie, lexicographically higher OID wins.
        let mut status_key: Option<(u64, String)> = None;

        for (oid, event) in events {
            if event.timestamp > max_timestamp {
                max_timestamp = event.timestamp.clone();
            let ts = parse_timestamp(&event.timestamp);
            if latest.as_ref().is_none_or(|(prev, _)| ts > *prev) {
                latest = Some((ts, event.timestamp.clone()));
            }
            match event.action {
                Action::IssueOpen { title, body, relates_to } => {
@@ -326,13 +336,12 @@ impl IssueState {
                        }
                    }
                }
                Action::Merge => {}
                _ => {}
            }
        }

        if let Some(ref mut s) = state {
            s.last_updated = max_timestamp;
            s.last_updated = latest.map(|(_, raw)| raw).unwrap_or_default();
        }
        state.ok_or_else(|| git2::Error::from_str("no IssueOpen event found in DAG").into())
    }
@@ -427,13 +436,14 @@ impl PatchState {
    ) -> Result<Self, crate::error::Error> {
        let events = dag::walk_events(repo, ref_name)?;
        let mut state: Option<PatchState> = None;
        let mut max_timestamp = String::new();
        let mut latest: Option<(DateTime<Utc>, String)> = None;

        let mut status_key: Option<(u64, String)> = None;

        for (oid, event) in events {
            if event.timestamp > max_timestamp {
                max_timestamp = event.timestamp.clone();
            let ts = parse_timestamp(&event.timestamp);
            if latest.as_ref().is_none_or(|(prev, _)| ts > *prev) {
                latest = Some((ts, event.timestamp.clone()));
            }
            match event.action {
                Action::PatchCreate {
@@ -538,13 +548,12 @@ impl PatchState {
                        }
                    }
                }
                Action::Merge => {}
                _ => {}
            }
        }

        if let Some(ref mut s) = state {
            s.last_updated = max_timestamp;
            s.last_updated = latest.map(|(_, raw)| raw).unwrap_or_default();
            s.check_auto_merge(repo);
        }
        state.ok_or_else(|| git2::Error::from_str("no PatchCreate event found in DAG").into())
@@ -635,7 +644,11 @@ pub fn list_issues(repo: &Repository) -> Result<Vec<IssueState>, crate::error::E
    let items = collab_refs(repo, "issues")?
        .into_iter()
        .filter(|(_, id)| !archived_ids.contains(id))
        .filter_map(|(ref_name, id)| IssueState::from_ref(repo, &ref_name, &id).ok())
        .filter_map(|(ref_name, id)| {
            IssueState::from_ref(repo, &ref_name, &id)
                .inspect_err(|e| eprintln!("warning: skipping issue {:.8}: {}", id, e))
                .ok()
        })
        .collect();
    Ok(items)
}
@@ -649,7 +662,11 @@ pub fn list_patches(repo: &Repository) -> Result<Vec<PatchState>, crate::error::
    let items = collab_refs(repo, "patches")?
        .into_iter()
        .filter(|(_, id)| !archived_ids.contains(id))
        .filter_map(|(ref_name, id)| PatchState::from_ref(repo, &ref_name, &id).ok())
        .filter_map(|(ref_name, id)| {
            PatchState::from_ref(repo, &ref_name, &id)
                .inspect_err(|e| eprintln!("warning: skipping patch {:.8}: {}", id, e))
                .ok()
        })
        .collect();
    Ok(items)
}
@@ -663,15 +680,17 @@ pub fn list_issues_with_archived(repo: &Repository) -> Result<Vec<IssueState>, c
    // Archived first so they take priority
    for (ref_name, id) in collab_archive_refs(repo, "issues")? {
        if seen.insert(id.clone()) {
            if let Ok(state) = IssueState::from_ref(repo, &ref_name, &id) {
                items.push(state);
            match IssueState::from_ref(repo, &ref_name, &id) {
                Ok(state) => items.push(state),
                Err(e) => eprintln!("warning: skipping issue {:.8}: {}", id, e),
            }
        }
    }
    for (ref_name, id) in collab_refs(repo, "issues")? {
        if seen.insert(id.clone()) {
            if let Ok(state) = IssueState::from_ref(repo, &ref_name, &id) {
                items.push(state);
            match IssueState::from_ref(repo, &ref_name, &id) {
                Ok(state) => items.push(state),
                Err(e) => eprintln!("warning: skipping issue {:.8}: {}", id, e),
            }
        }
    }
@@ -686,15 +705,17 @@ pub fn list_patches_with_archived(repo: &Repository) -> Result<Vec<PatchState>, 

    for (ref_name, id) in collab_archive_refs(repo, "patches")? {
        if seen.insert(id.clone()) {
            if let Ok(state) = PatchState::from_ref(repo, &ref_name, &id) {
                items.push(state);
            match PatchState::from_ref(repo, &ref_name, &id) {
                Ok(state) => items.push(state),
                Err(e) => eprintln!("warning: skipping patch {:.8}: {}", id, e),
            }
        }
    }
    for (ref_name, id) in collab_refs(repo, "patches")? {
        if seen.insert(id.clone()) {
            if let Ok(state) = PatchState::from_ref(repo, &ref_name, &id) {
                items.push(state);
            match PatchState::from_ref(repo, &ref_name, &id) {
                Ok(state) => items.push(state),
                Err(e) => eprintln!("warning: skipping patch {:.8}: {}", id, e),
            }
        }
    }
diff --git a/src/sync.rs b/src/sync.rs
index da0c0ed..bf2cbe5 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -147,9 +147,50 @@ impl SyncState {
// Push helpers (T003, T004)
// ---------------------------------------------------------------------------

/// Push a single ref to the remote. Returns a RefPushResult.
fn push_ref(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult {
    let refspec = format!("+{}:{}", ref_name, ref_name);
/// Push multiple refs to the remote in a single `git push` invocation.
///
/// Uses non-force refspecs so the remote will reject the push if it has
/// commits we haven't reconciled. This prevents silently discarding events
/// that another pusher added between our fetch and push.
///
/// On success, all refs are marked as Pushed. On failure, falls back to
/// per-ref pushes to determine which specific refs failed.
fn push_refs_batched(workdir: &Path, remote_name: &str, refs: &[String]) -> Vec<RefPushResult> {
    if refs.is_empty() {
        return Vec::new();
    }

    // Build the full argv up front: `push <remote> <src:dst>...`.
    // No leading '+' on the refspecs — these are non-force pushes.
    let mut args: Vec<String> = Vec::with_capacity(refs.len() + 2);
    args.push("push".to_string());
    args.push(remote_name.to_string());
    args.extend(refs.iter().map(|r| format!("{}:{}", r, r)));

    let batch = Command::new("git")
        .args(&args)
        .current_dir(workdir)
        .output();

    let batch_succeeded = matches!(&batch, Ok(out) if out.status.success());
    if batch_succeeded {
        // Every ref made it in one round trip.
        refs.iter()
            .map(|r| RefPushResult {
                ref_name: r.clone(),
                status: PushStatus::Pushed,
                error: None,
            })
            .collect()
    } else {
        // Batch failed (non-zero exit, or git could not be spawned) — retry
        // each ref on its own so failures attribute to specific refs.
        refs.iter()
            .map(|r| push_ref_single(workdir, remote_name, r))
            .collect()
    }
}

/// Push a single ref to the remote. Used as a fallback when batched push fails.
fn push_ref_single(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult {
    let refspec = format!("{}:{}", ref_name, ref_name);
    match Command::new("git")
        .args(["push", remote_name, &refspec])
        .current_dir(workdir)
@@ -314,7 +355,10 @@ pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> {
    }

    // Step 2: Reconcile
    // Re-open repo to see the fetched refs (git2 caches ref state)
    // Re-open the repo because git2 caches the ref list at open time.
    // The `git fetch` above wrote new refs to disk, but the original
    // `repo` handle won't see them. `repo.path()` returns the `.git`
    // directory, which is the correct argument to `Repository::open`.
    let repo = Repository::open(repo.path())?;
    let sk = signing::load_signing_key(&signing::signing_key_dir()?)?;
    reconcile_refs(&repo, "issues", &author, &sk)?;
@@ -327,7 +371,7 @@ pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> {
    if refs_to_push.is_empty() {
        println!("Nothing to push.");
    } else {
        let sync_result = push_refs_individually(&workdir, remote_name, &refs_to_push);
        let sync_result = push_refs(&workdir, remote_name, &refs_to_push);

        if !sync_result.is_complete() {
            // Save state for resume
@@ -393,7 +437,7 @@ fn sync_resume(

    // Push the pending refs
    let ref_names: Vec<String> = state.pending_refs.iter().map(|(r, _)| r.clone()).collect();
    let sync_result = push_refs_individually(workdir, remote_name, &ref_names);
    let sync_result = push_refs(workdir, remote_name, &ref_names);

    if sync_result.is_complete() {
        // All pending refs pushed successfully
@@ -437,22 +481,20 @@ fn sync_resume(
    }
}

/// Push refs one at a time, printing per-ref status, and return aggregated results.
fn push_refs_individually(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult {
    let mut results = Vec::new();
    for ref_name in refs {
        let result = push_ref(workdir, remote_name, ref_name);
/// Push all refs in a single batch, printing per-ref status, and return aggregated results.
fn push_refs(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult {
    let results = push_refs_batched(workdir, remote_name, refs);
    for result in &results {
        match &result.status {
            PushStatus::Pushed => println!("  Pushed {}", ref_name),
            PushStatus::Pushed => println!("  Pushed {}", result.ref_name),
            PushStatus::Failed => {
                eprintln!(
                    "  FAILED {}: {}",
                    ref_name,
                    result.ref_name,
                    result.error.as_deref().unwrap_or("unknown error")
                );
            }
        }
        results.push(result);
    }
    SyncResult {
        results,
diff --git a/tests/collab_test.rs b/tests/collab_test.rs
index 81401da..6f2f3d1 100644
--- a/tests/collab_test.rs
+++ b/tests/collab_test.rs
@@ -10,7 +10,7 @@ use git_collab::state::{self, IssueState, IssueStatus, PatchState, PatchStatus};

use common::{
    add_comment, add_review, alice, bob, close_issue, create_patch, init_repo, now, open_issue,
    reopen_issue, setup_signing_key, test_signing_key,
    reopen_issue, setup_signing_key, test_signing_key, ScopedTestConfig,
};

// ---------------------------------------------------------------------------
@@ -752,6 +752,8 @@ use git_collab::patch;
#[test]
fn test_create_patch_from_branch_populates_branch_field() {
    // T009: creating a patch from current branch populates `branch` field
    let cfg = ScopedTestConfig::new();
    cfg.ensure_signing_key();
    let tmp = TempDir::new().unwrap();
    let repo = init_repo(tmp.path(), &alice());
    make_initial_commit(&repo, "main");
@@ -772,6 +774,8 @@ fn test_create_patch_from_branch_populates_branch_field() {
#[test]
fn test_create_duplicate_patch_for_same_branch_returns_error() {
    // T011: creating a duplicate patch for same branch returns error
    let cfg = ScopedTestConfig::new();
    cfg.ensure_signing_key();
    let tmp = TempDir::new().unwrap();
    let repo = init_repo(tmp.path(), &alice());
    make_initial_commit(&repo, "main");
@@ -939,6 +943,8 @@ fn test_auto_detect_merged_patch_via_git_merge() {
    // When a user merges the patch branch into the base branch manually
    // (using git merge), PatchState should auto-detect that the patch
    // is merged without needing `patch merge`.
    let cfg = ScopedTestConfig::new();
    cfg.ensure_signing_key();
    let tmp = TempDir::new().unwrap();
    let repo = init_repo(tmp.path(), &alice());
    make_initial_commit(&repo, "main");
@@ -968,6 +974,8 @@ fn test_auto_detect_merged_patch_via_git_merge() {
fn test_auto_detect_merged_patch_deleted_branch() {
    // If the patch branch was deleted after a manual merge,
    // auto-detection should not crash — patch stays Open.
    let cfg = ScopedTestConfig::new();
    cfg.ensure_signing_key();
    let tmp = TempDir::new().unwrap();
    let repo = init_repo(tmp.path(), &alice());
    make_initial_commit(&repo, "main");
@@ -992,6 +1000,8 @@ fn test_cache_does_not_defeat_auto_detect_merge() {
    // Regression: from_ref() returned cached Open status even after the
    // patch branch was merged into main, because the cache hit bypassed
    // the auto-detect merge logic that only ran in from_ref_uncached().
    let cfg = ScopedTestConfig::new();
    cfg.ensure_signing_key();
    let tmp = TempDir::new().unwrap();
    let repo = init_repo(tmp.path(), &alice());
    make_initial_commit(&repo, "main");
diff --git a/tests/common/mod.rs b/tests/common/mod.rs
index 14be07c..8801e5d 100644
--- a/tests/common/mod.rs
+++ b/tests/common/mod.rs
@@ -177,6 +177,8 @@ impl Drop for ScopedTestConfig {
/// and an initial empty commit on `main`.
pub fn init_repo(dir: &Path, author: &Author) -> Repository {
    let repo = Repository::init(dir).expect("init repo");
    // Ensure HEAD points to main regardless of the system's default branch name
    repo.set_head("refs/heads/main").expect("set HEAD to main");
    {
        let mut config = repo.config().unwrap();
        config.set_str("user.name", &author.name).unwrap();