a73x

6df0725b

Add advisory lockfile to prevent concurrent sync operations

a73x   2026-03-21 13:36

New sync_lock module with RAII-based locking:
- Atomic lock creation via OpenOptions::create_new
- PID + timestamp stored in .git/collab/sync.lock
- Stale lock detection via kill -0 with 10-minute threshold
- Automatic cleanup on drop, race-safe retry on stale removal
- Human-readable lock age in error messages

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

diff --git a/src/error.rs b/src/error.rs
index 2295ef2..ce300ea 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -25,4 +25,7 @@ pub enum Error {

    #[error("untrusted key: {0}")]
    UntrustedKey(String),

    #[error("another sync is in progress (pid {pid}, started {since})")]
    SyncLocked { pid: u32, since: String },
}
diff --git a/src/lib.rs b/src/lib.rs
index 56dd0d2..08b1c3e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -9,6 +9,7 @@ pub mod patch;
pub mod state;
pub mod signing;
pub mod sync;
pub mod sync_lock;
pub mod trust;
pub mod tui;

diff --git a/src/sync.rs b/src/sync.rs
index 8cd7db1..f6d9b10 100644
--- a/src/sync.rs
+++ b/src/sync.rs
@@ -6,6 +6,7 @@ use crate::dag;
use crate::error::Error;
use crate::identity::get_author;
use crate::signing;
use crate::sync_lock::SyncLock;
use crate::trust;

/// Add collab refspecs to all remotes.
@@ -26,6 +27,9 @@ pub fn init(repo: &Repository) -> Result<(), Error> {

/// Sync with a specific remote: fetch, reconcile, push.
pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> {
    // Acquire advisory lock — held until _lock is dropped (RAII)
    let _lock = SyncLock::acquire(repo)?;

    let author = get_author(repo)?;
    let workdir = repo.path().parent().unwrap_or(repo.path()).to_path_buf();

diff --git a/src/sync_lock.rs b/src/sync_lock.rs
new file mode 100644
index 0000000..ed06770
--- /dev/null
+++ b/src/sync_lock.rs
@@ -0,0 +1,203 @@
use std::fs::{self, OpenOptions};
use std::io::Write;
use std::path::PathBuf;
use std::process::Command;

use serde::{Deserialize, Serialize};

use crate::error::Error;

/// Default staleness threshold in minutes.
const STALE_THRESHOLD_MINUTES: u64 = 10;

/// Metadata stored in the lockfile for stale detection and error reporting.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncLockInfo {
    pub pid: u32,
    pub timestamp: String,
}

impl SyncLockInfo {
    /// Serialize to JSON string.
    pub fn to_json(&self) -> String {
        serde_json::to_string(self).expect("SyncLockInfo serialization should not fail")
    }

    /// Deserialize from JSON string.
    pub fn from_json(json: &str) -> Result<Self, Error> {
        Ok(serde_json::from_str(json)?)
    }
}

/// RAII guard that owns the lockfile. Deletes the lockfile on drop.
#[derive(Debug)]
pub struct SyncLock {
    pub lock_path: PathBuf,
}

impl SyncLock {
    /// Attempt to acquire the sync lock for the given repository.
    ///
    /// Creates `.git/collab/sync.lock` atomically using `create_new(true)`.
    /// If the lockfile already exists, checks for staleness before returning an error.
    pub fn acquire(repo: &git2::Repository) -> Result<Self, Error> {
        let collab_dir = repo.path().join("collab");
        fs::create_dir_all(&collab_dir)?;
        let lock_path = collab_dir.join("sync.lock");

        match Self::try_create_lock(&lock_path) {
            Ok(lock) => Ok(lock),
            Err(_) => {
                // Lockfile exists — check if stale
                Self::handle_existing_lock(&lock_path)
            }
        }
    }

    /// Try to atomically create the lockfile. Returns Err on any failure.
    fn try_create_lock(lock_path: &PathBuf) -> Result<Self, Error> {
        let info = SyncLockInfo {
            pid: std::process::id(),
            timestamp: chrono::Utc::now().to_rfc3339(),
        };
        let json = info.to_json();

        let mut file = OpenOptions::new()
            .write(true)
            .create_new(true)
            .open(lock_path)?;
        file.write_all(json.as_bytes())?;
        file.flush()?;

        Ok(SyncLock {
            lock_path: lock_path.clone(),
        })
    }

    /// Handle the case where a lockfile already exists.
    /// Check staleness and either clean up and retry, or return SyncLocked error.
    fn handle_existing_lock(lock_path: &PathBuf) -> Result<Self, Error> {
        let content = match fs::read_to_string(lock_path) {
            Ok(c) => c,
            Err(_) => {
                // Can't read — treat as corrupted/stale
                Self::remove_stale_and_retry(lock_path)?;
                return Self::try_create_or_contention(lock_path);
            }
        };

        let info = match SyncLockInfo::from_json(&content) {
            Ok(info) => info,
            Err(_) => {
                // Corrupted JSON — treat as stale
                Self::remove_stale_and_retry(lock_path)?;
                return Self::try_create_or_contention(lock_path);
            }
        };

        if is_stale(&info, STALE_THRESHOLD_MINUTES) {
            Self::remove_stale_and_retry(lock_path)?;
            return Self::try_create_or_contention(lock_path);
        }

        // Lock is held by a live process — return error with enhanced message
        Err(Error::SyncLocked {
            pid: info.pid,
            since: format!(
                "{} — wait for it to finish, or remove .git/collab/sync.lock if the process is no longer running",
                format_lock_age(&info.timestamp)
            ),
        })
    }

    /// Remove the stale lockfile. Ignores errors if the file is already gone.
    fn remove_stale_and_retry(lock_path: &PathBuf) -> Result<(), Error> {
        eprintln!("Removing stale sync lock...");
        let _ = fs::remove_file(lock_path);
        Ok(())
    }

    /// Try to create the lock; if it fails (race with another process), read the
    /// new holder's info and return SyncLocked.
    fn try_create_or_contention(lock_path: &PathBuf) -> Result<Self, Error> {
        match Self::try_create_lock(lock_path) {
            Ok(lock) => Ok(lock),
            Err(_) => {
                // Another process won the race — read the new lock info
                let content = fs::read_to_string(lock_path).unwrap_or_default();
                let info = SyncLockInfo::from_json(&content).unwrap_or(SyncLockInfo {
                    pid: 0,
                    timestamp: String::new(),
                });
                Err(Error::SyncLocked {
                    pid: info.pid,
                    since: format!(
                        "{} — wait for it to finish, or remove .git/collab/sync.lock if the process is no longer running",
                        format_lock_age(&info.timestamp)
                    ),
                })
            }
        }
    }
}

impl Drop for SyncLock {
    fn drop(&mut self) {
        let _ = fs::remove_file(&self.lock_path);
    }
}

/// Check if a process with the given PID is alive using `kill -0`.
pub fn is_process_alive(pid: u32) -> bool {
    Command::new("kill")
        .args(["-0", &pid.to_string()])
        .stdout(std::process::Stdio::null())
        .stderr(std::process::Stdio::null())
        .status()
        .map(|s| s.success())
        .unwrap_or(false)
}

/// Check if a lock is stale based on PID liveness and timestamp age.
pub fn is_stale(info: &SyncLockInfo, threshold_minutes: u64) -> bool {
    // If PID is dead, it's stale
    if !is_process_alive(info.pid) {
        return true;
    }

    // If the lock is older than the threshold, treat as stale (PID reuse protection)
    if let Ok(lock_time) = chrono::DateTime::parse_from_rfc3339(&info.timestamp) {
        let age = chrono::Utc::now() - lock_time.with_timezone(&chrono::Utc);
        if age.num_minutes() >= threshold_minutes as i64 {
            return true;
        }
    } else {
        // Can't parse timestamp — treat as stale
        return true;
    }

    false
}

/// Convert an RFC 3339 timestamp to a human-readable duration (e.g., "3 seconds ago").
pub fn format_lock_age(timestamp: &str) -> String {
    let lock_time = match chrono::DateTime::parse_from_rfc3339(timestamp) {
        Ok(t) => t,
        Err(_) => return format!("unknown time ({})", timestamp),
    };

    let age = chrono::Utc::now() - lock_time.with_timezone(&chrono::Utc);
    let secs = age.num_seconds();

    if secs < 0 {
        "just now".to_string()
    } else if secs < 60 {
        format!("{} second{} ago", secs, if secs == 1 { "" } else { "s" })
    } else if secs < 3600 {
        let mins = secs / 60;
        format!("{} minute{} ago", mins, if mins == 1 { "" } else { "s" })
    } else {
        let hours = secs / 3600;
        format!("{} hour{} ago", hours, if hours == 1 { "" } else { "s" })
    }
}
diff --git a/tests/sync_lock_test.rs b/tests/sync_lock_test.rs
new file mode 100644
index 0000000..923adfd
--- /dev/null
+++ b/tests/sync_lock_test.rs
@@ -0,0 +1,256 @@
mod common;

use std::fs;
use std::path::{Path, PathBuf};

use tempfile::TempDir;

use git_collab::error::Error;
use git_collab::sync_lock::{SyncLock, SyncLockInfo};

// ===========================================================================
// Test helpers
// ===========================================================================

/// Return the `.git/collab/` path for a temp repo, creating it if needed.
fn collab_dir(repo_path: &Path) -> PathBuf {
    let dir = repo_path.join(".git").join("collab");
    fs::create_dir_all(&dir).unwrap();
    dir
}

/// Write a lockfile with the given PID and timestamp.
fn write_lockfile(collab_dir: &Path, pid: u32, timestamp: &str) {
    let info = SyncLockInfo {
        pid,
        timestamp: timestamp.to_string(),
    };
    let lock_path = collab_dir.join("sync.lock");
    fs::write(&lock_path, info.to_json()).unwrap();
}

// ===========================================================================
// Phase 2: Foundational tests — serialization
// ===========================================================================

#[test]
fn test_sync_lock_info_roundtrip() {
    let info = SyncLockInfo {
        pid: 12345,
        timestamp: "2026-03-21T10:30:00+00:00".to_string(),
    };
    let json = info.to_json();
    let parsed = SyncLockInfo::from_json(&json).unwrap();
    assert_eq!(parsed.pid, 12345);
    assert_eq!(parsed.timestamp, "2026-03-21T10:30:00+00:00");
}

#[test]
fn test_sync_lock_info_from_invalid_json() {
    let result = SyncLockInfo::from_json("not json");
    assert!(result.is_err());
}

// ===========================================================================
// Phase 3: User Story 1 — Prevent Concurrent Sync Corruption
// ===========================================================================

// T006: Test that SyncLock::acquire creates lockfile with correct PID and timestamp
#[test]
fn test_acquire_creates_lockfile() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let lock = SyncLock::acquire(&repo).unwrap();

    assert!(lock.lock_path.exists(), "lockfile should exist after acquire");

    let content = fs::read_to_string(&lock.lock_path).unwrap();
    let info = SyncLockInfo::from_json(&content).unwrap();
    assert_eq!(info.pid, std::process::id());
    // Timestamp should be a valid RFC 3339 string
    chrono::DateTime::parse_from_rfc3339(&info.timestamp)
        .expect("timestamp should be valid RFC 3339");
}

// T007: Test that SyncLock::acquire returns SyncLocked when lockfile exists with live PID
#[test]
fn test_acquire_returns_sync_locked_when_lock_held() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    // Write a lockfile with our own PID (guaranteed alive)
    let collab = collab_dir(dir.path());
    let current_pid = std::process::id();
    write_lockfile(&collab, current_pid, &chrono::Utc::now().to_rfc3339());

    let result = SyncLock::acquire(&repo);
    assert!(result.is_err(), "acquire should fail when lock is held");

    match result.unwrap_err() {
        Error::SyncLocked { pid, since } => {
            assert_eq!(pid, current_pid);
            assert!(!since.is_empty());
        }
        other => panic!("expected SyncLocked error, got: {}", other),
    }
}

// T008: Test that dropping SyncLock deletes the lockfile
#[test]
fn test_drop_deletes_lockfile() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let lock = SyncLock::acquire(&repo).unwrap();
    let lock_path = lock.lock_path.clone();
    assert!(lock_path.exists());

    drop(lock);
    assert!(!lock_path.exists(), "lockfile should be deleted after drop");
}

// ===========================================================================
// Phase 4: User Story 2 — Clear Error Message
// ===========================================================================

// T015: Test that SyncLocked error message includes PID and human-readable lock age
#[test]
fn test_sync_locked_error_message_includes_pid_and_age() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let collab = collab_dir(dir.path());
    let current_pid = std::process::id();
    // Lock from 3 seconds ago
    let ts = (chrono::Utc::now() - chrono::Duration::seconds(3)).to_rfc3339();
    write_lockfile(&collab, current_pid, &ts);

    let err = SyncLock::acquire(&repo).unwrap_err();
    let msg = err.to_string();

    assert!(
        msg.contains(&current_pid.to_string()),
        "error should contain PID, got: {}",
        msg
    );
    // Should contain human-readable age
    assert!(
        msg.contains("ago"),
        "error should contain human-readable age, got: {}",
        msg
    );
}

// T016: Test that error message suggests waiting or checking process
#[test]
fn test_sync_locked_error_message_suggests_action() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let collab = collab_dir(dir.path());
    let current_pid = std::process::id();
    write_lockfile(&collab, current_pid, &chrono::Utc::now().to_rfc3339());

    let err = SyncLock::acquire(&repo).unwrap_err();
    let msg = err.to_string();

    assert!(
        msg.contains("wait") || msg.contains("remove"),
        "error should suggest action, got: {}",
        msg
    );
}

// ===========================================================================
// Phase 5: User Story 3 — Stale Lock Recovery
// ===========================================================================

// T019: Test that acquire succeeds when lockfile exists with a dead PID
#[test]
fn test_acquire_succeeds_with_dead_pid() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let collab = collab_dir(dir.path());
    // PID 999999 is almost certainly not running
    write_lockfile(&collab, 999999, &chrono::Utc::now().to_rfc3339());

    let lock = SyncLock::acquire(&repo);
    assert!(lock.is_ok(), "acquire should succeed with dead PID: {:?}", lock.err());

    // The new lock should be ours
    let lock = lock.unwrap();
    let content = fs::read_to_string(&lock.lock_path).unwrap();
    let info = SyncLockInfo::from_json(&content).unwrap();
    assert_eq!(info.pid, std::process::id());
}

// T020: Test that acquire succeeds when lockfile is older than 10 minutes
#[test]
fn test_acquire_succeeds_with_old_lock() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let collab = collab_dir(dir.path());
    // Lock from 11 minutes ago with our own PID (alive but stale by age)
    let old_ts = (chrono::Utc::now() - chrono::Duration::minutes(11)).to_rfc3339();
    write_lockfile(&collab, std::process::id(), &old_ts);

    let lock = SyncLock::acquire(&repo);
    assert!(
        lock.is_ok(),
        "acquire should succeed with stale (old) lock: {:?}",
        lock.err()
    );
}

// T021: Test that acquire succeeds when lockfile contains invalid JSON
#[test]
fn test_acquire_succeeds_with_corrupted_lockfile() {
    let dir = TempDir::new().unwrap();
    let repo = git2::Repository::init(dir.path()).unwrap();

    let collab = collab_dir(dir.path());
    let lock_path = collab.join("sync.lock");
    fs::write(&lock_path, "this is not json at all").unwrap();

    let lock = SyncLock::acquire(&repo);
    assert!(
        lock.is_ok(),
        "acquire should succeed with corrupted lockfile: {:?}",
        lock.err()
    );
}

// T022: Test format_lock_age helper
#[test]
fn test_format_lock_age() {
    use git_collab::sync_lock::format_lock_age;

    let now = chrono::Utc::now();
    let three_sec_ago = (now - chrono::Duration::seconds(3)).to_rfc3339();
    let result = format_lock_age(&three_sec_ago);
    assert!(result.contains("second"), "expected 'second' in: {}", result);
    assert!(result.contains("ago"), "expected 'ago' in: {}", result);

    let two_min_ago = (now - chrono::Duration::minutes(2)).to_rfc3339();
    let result = format_lock_age(&two_min_ago);
    assert!(result.contains("minute"), "expected 'minute' in: {}", result);

    // Invalid timestamp should fall back gracefully
    let result = format_lock_age("not-a-timestamp");
    assert!(!result.is_empty());
}

// T023: Test is_process_alive
#[test]
fn test_is_process_alive() {
    use git_collab::sync_lock::is_process_alive;

    // Our own process should be alive
    assert!(is_process_alive(std::process::id()));

    // PID 999999 should not be alive (almost certainly)
    assert!(!is_process_alive(999999));
}