6df0725b
Add advisory lockfile to prevent concurrent sync operations
a73x 2026-03-21 13:36
New sync_lock module with RAII-based locking: - Atomic lock creation via OpenOptions::create_new - PID + timestamp stored in .git/collab/sync.lock - Stale lock detection via kill -0 with 10-minute threshold - Automatic cleanup on drop, race-safe retry on stale removal - Human-readable lock age in error messages Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
diff --git a/src/error.rs b/src/error.rs index 2295ef2..ce300ea 100644 --- a/src/error.rs +++ b/src/error.rs @@ -25,4 +25,7 @@ pub enum Error { #[error("untrusted key: {0}")] UntrustedKey(String), #[error("another sync is in progress (pid {pid}, started {since})")] SyncLocked { pid: u32, since: String }, } diff --git a/src/lib.rs b/src/lib.rs index 56dd0d2..08b1c3e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod patch; pub mod state; pub mod signing; pub mod sync; pub mod sync_lock; pub mod trust; pub mod tui; diff --git a/src/sync.rs b/src/sync.rs index 8cd7db1..f6d9b10 100644 --- a/src/sync.rs +++ b/src/sync.rs @@ -6,6 +6,7 @@ use crate::dag; use crate::error::Error; use crate::identity::get_author; use crate::signing; use crate::sync_lock::SyncLock; use crate::trust; /// Add collab refspecs to all remotes. @@ -26,6 +27,9 @@ pub fn init(repo: &Repository) -> Result<(), Error> { /// Sync with a specific remote: fetch, reconcile, push. pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> { // Acquire advisory lock — held until _lock is dropped (RAII) let _lock = SyncLock::acquire(repo)?; let author = get_author(repo)?; let workdir = repo.path().parent().unwrap_or(repo.path()).to_path_buf(); diff --git a/src/sync_lock.rs b/src/sync_lock.rs new file mode 100644 index 0000000..ed06770 --- /dev/null +++ b/src/sync_lock.rs @@ -0,0 +1,203 @@ use std::fs::{self, OpenOptions}; use std::io::Write; use std::path::PathBuf; use std::process::Command; use serde::{Deserialize, Serialize}; use crate::error::Error; /// Default staleness threshold in minutes. const STALE_THRESHOLD_MINUTES: u64 = 10; /// Metadata stored in the lockfile for stale detection and error reporting. #[derive(Debug, Serialize, Deserialize)] pub struct SyncLockInfo { pub pid: u32, pub timestamp: String, } impl SyncLockInfo { /// Serialize to JSON string. pub fn to_json(&self) -> String { serde_json::to_string(self).expect("SyncLockInfo serialization should not fail") } /// Deserialize from JSON string. pub fn from_json(json: &str) -> Result<Self, Error> { Ok(serde_json::from_str(json)?) } } /// RAII guard that owns the lockfile. Deletes the lockfile on drop. #[derive(Debug)] pub struct SyncLock { pub lock_path: PathBuf, } impl SyncLock { /// Attempt to acquire the sync lock for the given repository. /// /// Creates `.git/collab/sync.lock` atomically using `create_new(true)`. /// If the lockfile already exists, checks for staleness before returning an error. pub fn acquire(repo: &git2::Repository) -> Result<Self, Error> { let collab_dir = repo.path().join("collab"); fs::create_dir_all(&collab_dir)?; let lock_path = collab_dir.join("sync.lock"); match Self::try_create_lock(&lock_path) { Ok(lock) => Ok(lock), Err(_) => { // Lockfile exists — check if stale Self::handle_existing_lock(&lock_path) } } } /// Try to atomically create the lockfile. Returns Err on any failure. fn try_create_lock(lock_path: &PathBuf) -> Result<Self, Error> { let info = SyncLockInfo { pid: std::process::id(), timestamp: chrono::Utc::now().to_rfc3339(), }; let json = info.to_json(); let mut file = OpenOptions::new() .write(true) .create_new(true) .open(lock_path)?; file.write_all(json.as_bytes())?; file.flush()?; Ok(SyncLock { lock_path: lock_path.clone(), }) } /// Handle the case where a lockfile already exists. /// Check staleness and either clean up and retry, or return SyncLocked error. fn handle_existing_lock(lock_path: &PathBuf) -> Result<Self, Error> { let content = match fs::read_to_string(lock_path) { Ok(c) => c, Err(_) => { // Can't read — treat as corrupted/stale Self::remove_stale_and_retry(lock_path)?; return Self::try_create_or_contention(lock_path); } }; let info = match SyncLockInfo::from_json(&content) { Ok(info) => info, Err(_) => { // Corrupted JSON — treat as stale Self::remove_stale_and_retry(lock_path)?; return Self::try_create_or_contention(lock_path); } }; if is_stale(&info, STALE_THRESHOLD_MINUTES) { Self::remove_stale_and_retry(lock_path)?; return Self::try_create_or_contention(lock_path); } // Lock is held by a live process — return error with enhanced message Err(Error::SyncLocked { pid: info.pid, since: format!( "{} — wait for it to finish, or remove .git/collab/sync.lock if the process is no longer running", format_lock_age(&info.timestamp) ), }) } /// Remove the stale lockfile. Ignores errors if the file is already gone. fn remove_stale_and_retry(lock_path: &PathBuf) -> Result<(), Error> { eprintln!("Removing stale sync lock..."); let _ = fs::remove_file(lock_path); Ok(()) } /// Try to create the lock; if it fails (race with another process), read the /// new holder's info and return SyncLocked. fn try_create_or_contention(lock_path: &PathBuf) -> Result<Self, Error> { match Self::try_create_lock(lock_path) { Ok(lock) => Ok(lock), Err(_) => { // Another process won the race — read the new lock info let content = fs::read_to_string(lock_path).unwrap_or_default(); let info = SyncLockInfo::from_json(&content).unwrap_or(SyncLockInfo { pid: 0, timestamp: String::new(), }); Err(Error::SyncLocked { pid: info.pid, since: format!( "{} — wait for it to finish, or remove .git/collab/sync.lock if the process is no longer running", format_lock_age(&info.timestamp) ), }) } } } } impl Drop for SyncLock { fn drop(&mut self) { let _ = fs::remove_file(&self.lock_path); } } /// Check if a process with the given PID is alive using `kill -0`. pub fn is_process_alive(pid: u32) -> bool { Command::new("kill") .args(["-0", &pid.to_string()]) .stdout(std::process::Stdio::null()) .stderr(std::process::Stdio::null()) .status() .map(|s| s.success()) .unwrap_or(false) } /// Check if a lock is stale based on PID liveness and timestamp age. pub fn is_stale(info: &SyncLockInfo, threshold_minutes: u64) -> bool { // If PID is dead, it's stale if !is_process_alive(info.pid) { return true; } // If the lock is older than the threshold, treat as stale (PID reuse protection) if let Ok(lock_time) = chrono::DateTime::parse_from_rfc3339(&info.timestamp) { let age = chrono::Utc::now() - lock_time.with_timezone(&chrono::Utc); if age.num_minutes() >= threshold_minutes as i64 { return true; } } else { // Can't parse timestamp — treat as stale return true; } false } /// Convert an RFC 3339 timestamp to a human-readable duration (e.g., "3 seconds ago"). pub fn format_lock_age(timestamp: &str) -> String { let lock_time = match chrono::DateTime::parse_from_rfc3339(timestamp) { Ok(t) => t, Err(_) => return format!("unknown time ({})", timestamp), }; let age = chrono::Utc::now() - lock_time.with_timezone(&chrono::Utc); let secs = age.num_seconds(); if secs < 0 { "just now".to_string() } else if secs < 60 { format!("{} second{} ago", secs, if secs == 1 { "" } else { "s" }) } else if secs < 3600 { let mins = secs / 60; format!("{} minute{} ago", mins, if mins == 1 { "" } else { "s" }) } else { let hours = secs / 3600; format!("{} hour{} ago", hours, if hours == 1 { "" } else { "s" }) } } diff --git a/tests/sync_lock_test.rs b/tests/sync_lock_test.rs new file mode 100644 index 0000000..923adfd --- /dev/null +++ b/tests/sync_lock_test.rs @@ -0,0 +1,256 @@ mod common; use std::fs; use std::path::{Path, PathBuf}; use tempfile::TempDir; use git_collab::error::Error; use git_collab::sync_lock::{SyncLock, SyncLockInfo}; // =========================================================================== // Test helpers // =========================================================================== /// Return the `.git/collab/` path for a temp repo, creating it if needed. fn collab_dir(repo_path: &Path) -> PathBuf { let dir = repo_path.join(".git").join("collab"); fs::create_dir_all(&dir).unwrap(); dir } /// Write a lockfile with the given PID and timestamp. fn write_lockfile(collab_dir: &Path, pid: u32, timestamp: &str) { let info = SyncLockInfo { pid, timestamp: timestamp.to_string(), }; let lock_path = collab_dir.join("sync.lock"); fs::write(&lock_path, info.to_json()).unwrap(); } // =========================================================================== // Phase 2: Foundational tests — serialization // =========================================================================== #[test] fn test_sync_lock_info_roundtrip() { let info = SyncLockInfo { pid: 12345, timestamp: "2026-03-21T10:30:00+00:00".to_string(), }; let json = info.to_json(); let parsed = SyncLockInfo::from_json(&json).unwrap(); assert_eq!(parsed.pid, 12345); assert_eq!(parsed.timestamp, "2026-03-21T10:30:00+00:00"); } #[test] fn test_sync_lock_info_from_invalid_json() { let result = SyncLockInfo::from_json("not json"); assert!(result.is_err()); } // =========================================================================== // Phase 3: User Story 1 — Prevent Concurrent Sync Corruption // =========================================================================== // T006: Test that SyncLock::acquire creates lockfile with correct PID and timestamp #[test] fn test_acquire_creates_lockfile() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let lock = SyncLock::acquire(&repo).unwrap(); assert!(lock.lock_path.exists(), "lockfile should exist after acquire"); let content = fs::read_to_string(&lock.lock_path).unwrap(); let info = SyncLockInfo::from_json(&content).unwrap(); assert_eq!(info.pid, std::process::id()); // Timestamp should be a valid RFC 3339 string chrono::DateTime::parse_from_rfc3339(&info.timestamp) .expect("timestamp should be valid RFC 3339"); } // T007: Test that SyncLock::acquire returns SyncLocked when lockfile exists with live PID #[test] fn test_acquire_returns_sync_locked_when_lock_held() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); // Write a lockfile with our own PID (guaranteed alive) let collab = collab_dir(dir.path()); let current_pid = std::process::id(); write_lockfile(&collab, current_pid, &chrono::Utc::now().to_rfc3339()); let result = SyncLock::acquire(&repo); assert!(result.is_err(), "acquire should fail when lock is held"); match result.unwrap_err() { Error::SyncLocked { pid, since } => { assert_eq!(pid, current_pid); assert!(!since.is_empty()); } other => panic!("expected SyncLocked error, got: {}", other), } } // T008: Test that dropping SyncLock deletes the lockfile #[test] fn test_drop_deletes_lockfile() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let lock = SyncLock::acquire(&repo).unwrap(); let lock_path = lock.lock_path.clone(); assert!(lock_path.exists()); drop(lock); assert!(!lock_path.exists(), "lockfile should be deleted after drop"); } // =========================================================================== // Phase 4: User Story 2 — Clear Error Message // =========================================================================== // T015: Test that SyncLocked error message includes PID and human-readable lock age #[test] fn test_sync_locked_error_message_includes_pid_and_age() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let collab = collab_dir(dir.path()); let current_pid = std::process::id(); // Lock from 3 seconds ago let ts = (chrono::Utc::now() - chrono::Duration::seconds(3)).to_rfc3339(); write_lockfile(&collab, current_pid, &ts); let err = SyncLock::acquire(&repo).unwrap_err(); let msg = err.to_string(); assert!( msg.contains(¤t_pid.to_string()), "error should contain PID, got: {}", msg ); // Should contain human-readable age assert!( msg.contains("ago"), "error should contain human-readable age, got: {}", msg ); } // T016: Test that error message suggests waiting or checking process #[test] fn test_sync_locked_error_message_suggests_action() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let collab = collab_dir(dir.path()); let current_pid = std::process::id(); write_lockfile(&collab, current_pid, &chrono::Utc::now().to_rfc3339()); let err = SyncLock::acquire(&repo).unwrap_err(); let msg = err.to_string(); assert!( msg.contains("wait") || msg.contains("remove"), "error should suggest action, got: {}", msg ); } // =========================================================================== // Phase 5: User Story 3 — Stale Lock Recovery // =========================================================================== // T019: Test that acquire succeeds when lockfile exists with a dead PID #[test] fn test_acquire_succeeds_with_dead_pid() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let collab = collab_dir(dir.path()); // PID 999999 is almost certainly not running write_lockfile(&collab, 999999, &chrono::Utc::now().to_rfc3339()); let lock = SyncLock::acquire(&repo); assert!(lock.is_ok(), "acquire should succeed with dead PID: {:?}", lock.err()); // The new lock should be ours let lock = lock.unwrap(); let content = fs::read_to_string(&lock.lock_path).unwrap(); let info = SyncLockInfo::from_json(&content).unwrap(); assert_eq!(info.pid, std::process::id()); } // T020: Test that acquire succeeds when lockfile is older than 10 minutes #[test] fn test_acquire_succeeds_with_old_lock() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let collab = collab_dir(dir.path()); // Lock from 11 minutes ago with our own PID (alive but stale by age) let old_ts = (chrono::Utc::now() - chrono::Duration::minutes(11)).to_rfc3339(); write_lockfile(&collab, std::process::id(), &old_ts); let lock = SyncLock::acquire(&repo); assert!( lock.is_ok(), "acquire should succeed with stale (old) lock: {:?}", lock.err() ); } // T021: Test that acquire succeeds when lockfile contains invalid JSON #[test] fn test_acquire_succeeds_with_corrupted_lockfile() { let dir = TempDir::new().unwrap(); let repo = git2::Repository::init(dir.path()).unwrap(); let collab = collab_dir(dir.path()); let lock_path = collab.join("sync.lock"); fs::write(&lock_path, "this is not json at all").unwrap(); let lock = SyncLock::acquire(&repo); assert!( lock.is_ok(), "acquire should succeed with corrupted lockfile: {:?}", lock.err() ); } // T022: Test format_lock_age helper #[test] fn test_format_lock_age() { use git_collab::sync_lock::format_lock_age; let now = chrono::Utc::now(); let three_sec_ago = (now - chrono::Duration::seconds(3)).to_rfc3339(); let result = format_lock_age(&three_sec_ago); assert!(result.contains("second"), "expected 'second' in: {}", result); assert!(result.contains("ago"), "expected 'ago' in: {}", result); let two_min_ago = (now - chrono::Duration::minutes(2)).to_rfc3339(); let result = format_lock_age(&two_min_ago); assert!(result.contains("minute"), "expected 'minute' in: {}", result); // Invalid timestamp should fall back gracefully let result = format_lock_age("not-a-timestamp"); assert!(!result.is_empty()); } // T023: Test is_process_alive #[test] fn test_is_process_alive() { use git_collab::sync_lock::is_process_alive; // Our own process should be alive assert!(is_process_alive(std::process::id())); // PID 999999 should not be alive (almost certainly) assert!(!is_process_alive(999999)); }