src/sync.rs
Ref: Size: 20.0 KiB
use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;
use git2::Repository;
use serde::{Deserialize, Serialize};
use crate::dag;
use crate::error::Error;
use crate::identity::get_author;
use crate::signing;
use crate::sync_lock::SyncLock;
use crate::trust;
// ---------------------------------------------------------------------------
// Ref name validation
// ---------------------------------------------------------------------------
/// Validate that a collab ref ID is a valid 40-character lowercase hex string.
/// This prevents path traversal, null byte injection, and other malicious ref names.
pub fn validate_collab_ref_id(id: &str) -> Result<(), Error> {
    // Length is checked first so the error message can report the actual size.
    if id.len() != 40 {
        return Err(Error::InvalidRefName(format!(
            "ref ID must be exactly 40 characters, got {}",
            id.len()
        )));
    }
    // Exactly the lowercase hex alphabet; uppercase hex is rejected.
    let is_lower_hex = |c: char| matches!(c, '0'..='9' | 'a'..='f');
    if id.chars().any(|c| !is_lower_hex(c)) {
        return Err(Error::InvalidRefName(format!(
            "ref ID must contain only lowercase hex characters [0-9a-f], got {:?}",
            id
        )));
    }
    Ok(())
}
// ---------------------------------------------------------------------------
// Per-ref push types (T002a)
// ---------------------------------------------------------------------------
/// Status of pushing a single ref.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PushStatus {
    /// The ref was accepted by the remote.
    Pushed,
    /// The push was rejected by the remote or the git command failed to run.
    Failed,
}
/// Result of pushing a single ref to the remote.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefPushResult {
    /// Fully-qualified ref name (e.g. `refs/collab/issues/<id>`).
    pub ref_name: String,
    /// Whether this ref was pushed or failed.
    pub status: PushStatus,
    /// Error detail when status is `Failed` (git stderr or spawn error); `None` on success.
    pub error: Option<String>,
}
/// Aggregated outcome of the entire push phase.
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// One entry per ref that a push was attempted for.
    pub results: Vec<RefPushResult>,
    /// Name of the remote the refs were pushed to.
    pub remote: String,
}
impl SyncResult {
    /// Refs that were successfully pushed.
    pub fn succeeded(&self) -> Vec<&RefPushResult> {
        let mut ok = Vec::new();
        for entry in &self.results {
            if entry.status == PushStatus::Pushed {
                ok.push(entry);
            }
        }
        ok
    }
    /// Refs whose push failed.
    pub fn failed(&self) -> Vec<&RefPushResult> {
        let mut bad = Vec::new();
        for entry in &self.results {
            if entry.status == PushStatus::Failed {
                bad.push(entry);
            }
        }
        bad
    }
    /// True when no ref failed (vacuously true for an empty result set).
    pub fn is_complete(&self) -> bool {
        !self.results.iter().any(|r| r.status == PushStatus::Failed)
    }
}
// ---------------------------------------------------------------------------
// Persistent sync state (T002b)
// ---------------------------------------------------------------------------
/// Persistent record of a partially-completed sync.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
    /// Remote the interrupted sync was pushing to.
    pub remote: String,
    /// `(ref_name, last_error)` pairs for refs that still need pushing.
    pub pending_refs: Vec<(String, String)>,
    /// RFC 3339 timestamp of when this state was saved.
    pub timestamp: String,
}
impl SyncState {
    /// Location of the sync state file: `<git-dir>/collab/sync-state.json`.
    fn path(repo: &Repository) -> PathBuf {
        repo.path().join("collab").join("sync-state.json")
    }
    /// Load sync state from disk. Returns None if the file does not exist.
    /// A corrupted file is warned about, deleted, and treated as absent;
    /// read errors are silently treated as absent as well.
    pub fn load(repo: &Repository) -> Option<SyncState> {
        let state_file = Self::path(repo);
        if !state_file.exists() {
            return None;
        }
        let contents = fs::read_to_string(&state_file).ok()?;
        match serde_json::from_str(&contents) {
            Ok(state) => Some(state),
            Err(e) => {
                eprintln!(
                    "warning: corrupted sync state file ({}); ignoring and proceeding with full sync",
                    e
                );
                // Best-effort delete: a second failure here is not actionable.
                let _ = fs::remove_file(&state_file);
                None
            }
        }
    }
    /// Persist this state as pretty-printed JSON, creating parent dirs as needed.
    pub fn save(&self, repo: &Repository) -> Result<(), Error> {
        let state_file = Self::path(repo);
        if let Some(dir) = state_file.parent() {
            fs::create_dir_all(dir)?;
        }
        fs::write(&state_file, serde_json::to_string_pretty(self)?)?;
        Ok(())
    }
    /// Remove the sync state file if present.
    pub fn clear(repo: &Repository) -> Result<(), Error> {
        let state_file = Self::path(repo);
        if state_file.exists() {
            fs::remove_file(&state_file)?;
        }
        Ok(())
    }
}
// ---------------------------------------------------------------------------
// Push helpers (T003, T004)
// ---------------------------------------------------------------------------
/// Push multiple refs to the remote in a single `git push` invocation.
///
/// Uses non-force refspecs so the remote will reject the push if it has
/// commits we haven't reconciled. This prevents silently discarding events
/// that another pusher added between our fetch and push.
///
/// On success, all refs are marked as Pushed. On failure, falls back to
/// per-ref pushes to determine which specific refs failed.
fn push_refs_batched(workdir: &Path, remote_name: &str, refs: &[String]) -> Vec<RefPushResult> {
    if refs.is_empty() {
        return Vec::new();
    }
    // Build `git push <remote> ref:ref ref:ref ...`
    let refspecs: Vec<String> = refs.iter().map(|r| format!("{0}:{0}", r)).collect();
    let outcome = Command::new("git")
        .arg("push")
        .arg(remote_name)
        .args(&refspecs)
        .current_dir(workdir)
        .output();
    if matches!(&outcome, Ok(out) if out.status.success()) {
        // One invocation covered everything — mark every ref as pushed.
        return refs
            .iter()
            .map(|name| RefPushResult {
                ref_name: name.clone(),
                status: PushStatus::Pushed,
                error: None,
            })
            .collect();
    }
    // Batch failed (or git could not be spawned) — retry one ref at a time
    // so we can report exactly which refs are stuck.
    refs.iter()
        .map(|name| push_ref_single(workdir, remote_name, name))
        .collect()
}
/// Push a single ref to the remote. Used as a fallback when batched push fails.
fn push_ref_single(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult {
    // Non-force refspec: remote rejects if it has history we haven't merged.
    let refspec = format!("{0}:{0}", ref_name);
    let outcome = Command::new("git")
        .args(["push", remote_name, &refspec])
        .current_dir(workdir)
        .output();
    match outcome {
        Ok(output) if output.status.success() => RefPushResult {
            ref_name: ref_name.to_string(),
            status: PushStatus::Pushed,
            error: None,
        },
        Ok(output) => RefPushResult {
            ref_name: ref_name.to_string(),
            status: PushStatus::Failed,
            // git writes the rejection reason to stderr.
            error: Some(String::from_utf8_lossy(&output.stderr).trim().to_string()),
        },
        Err(e) => RefPushResult {
            ref_name: ref_name.to_string(),
            status: PushStatus::Failed,
            error: Some(format!("failed to run git push: {}", e)),
        },
    }
}
/// Collect all collab ref names that should be pushed.
fn collect_push_refs(repo: &Repository) -> Result<Vec<String>, Error> {
    // The four namespaces that hold pushable collab state.
    const PATTERNS: [&str; 4] = [
        "refs/collab/issues/*",
        "refs/collab/patches/*",
        "refs/collab/archive/issues/*",
        "refs/collab/archive/patches/*",
    ];
    let mut names = Vec::new();
    for pattern in PATTERNS {
        for entry in repo.references_glob(pattern)? {
            // Iteration errors propagate; refs with non-UTF-8 names are skipped.
            let reference = entry?;
            if let Some(name) = reference.name() {
                names.push(name.to_string());
            }
        }
    }
    Ok(names)
}
// ---------------------------------------------------------------------------
// Summary printing (T012)
// ---------------------------------------------------------------------------
/// Print a failure summary to stderr.
fn print_sync_summary(result: &SyncResult) {
    let pushed_count = result.succeeded().len();
    let total = result.results.len();
    // "failed" when nothing went through, "partially failed" otherwise.
    if pushed_count == 0 {
        eprintln!("\nSync failed: {} of {} refs pushed.", pushed_count, total);
    } else {
        eprintln!(
            "\nSync partially failed: {} of {} refs pushed.",
            pushed_count, total
        );
    }
    let failures = result.failed();
    eprintln!("Failed refs:");
    for failure in &failures {
        eprintln!(
            " {}: {}",
            failure.ref_name,
            failure.error.as_deref().unwrap_or("unknown error")
        );
    }
    eprintln!(
        "\nRun `collab sync {}` again to retry {} failed ref(s).",
        result.remote,
        failures.len()
    );
}
/// Clean up refs/collab/sync/* refs.
///
/// These temporary refs are written by the fetch step in `sync` (including
/// the `archive/issues` and `archive/patches` namespaces) and must be
/// deleted afterwards so stale remote data never leaks into a later run.
fn cleanup_sync_refs(repo: &Repository) -> Result<(), Error> {
    // BUGFIX: the archive namespaces were missing from this list. If the
    // glob `*` does not match across `/`, the catch-all below never sees
    // `refs/collab/sync/archive/issues/<id>` (two levels deep), so those
    // refs — created by the fetch refspecs in `sync` — were never deleted.
    for prefix in &[
        "refs/collab/sync/issues/",
        "refs/collab/sync/patches/",
        "refs/collab/sync/archive/issues/",
        "refs/collab/sync/archive/patches/",
    ] {
        let refs: Vec<String> = repo
            .references_glob(&format!("{}*", prefix))?
            .filter_map(|r| r.ok()?.name().map(|n| n.to_string()))
            .collect();
        for ref_name in refs {
            let mut r = repo.find_reference(&ref_name)?;
            r.delete()?;
        }
    }
    // Catch-all for any other refs under refs/collab/sync/ (e.g. refs
    // fetched with a remote-name prefix by the `init` refspec).
    let refs: Vec<String> = repo
        .references_glob("refs/collab/sync/*")?
        .filter_map(|r| r.ok()?.name().map(|n| n.to_string()))
        .collect();
    for ref_name in refs {
        let mut r = repo.find_reference(&ref_name)?;
        r.delete()?;
    }
    Ok(())
}
/// Add collab refspecs to all remotes.
///
/// Idempotent: a remote that already carries the collab fetch refspec is
/// skipped, so running `init` repeatedly no longer duplicates the refspec
/// in the git config (the previous behavior appended it unconditionally).
pub fn init(repo: &Repository) -> Result<(), Error> {
    let remotes = repo.remotes()?;
    if remotes.is_empty() {
        println!("No remotes configured.");
        return Ok(());
    }
    for remote_name in remotes.iter().flatten() {
        let fetch_spec = format!("+refs/collab/*:refs/collab/sync/{}/*", remote_name);
        // Skip remotes that already have the refspec (keeps init idempotent).
        let already_configured = repo
            .find_remote(remote_name)?
            .fetch_refspecs()?
            .iter()
            .flatten()
            .any(|spec| spec == fetch_spec);
        if already_configured {
            println!("Remote '{}' already configured; skipping.", remote_name);
            continue;
        }
        repo.remote_add_fetch(remote_name, &fetch_spec)?;
        println!("Configured remote '{}'", remote_name);
    }
    println!("Collab refspecs initialized.");
    Ok(())
}
/// Sync with a specific remote: fetch, reconcile, push.
///
/// High-level flow:
///   1. fetch the remote's collab refs into temporary `refs/collab/sync/...` refs
///   2. verify signatures and reconcile them into local `refs/collab/...`
///   3. scan local branches for `Issue:` trailers and emit link events (best-effort)
///   4. push all local collab refs; on partial failure, persist a `SyncState`
///      so a later run can resume, and return `Error::PartialSync`
///   5. delete the temporary sync refs
///
/// Holds an advisory `SyncLock` for the entire operation.
pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> {
    // Acquire advisory lock — held until _lock is dropped (RAII)
    let _lock = SyncLock::acquire(repo)?;
    // repo.path() is the git dir; its parent is used as the working
    // directory for the git subprocesses, falling back to the git dir
    // itself when there is no parent.
    let workdir = repo.path().parent().unwrap_or(repo.path()).to_path_buf();
    // Check for existing sync state (resume mode)
    if let Some(state) = SyncState::load(repo) {
        if state.remote == remote_name {
            return sync_resume(repo, remote_name, &workdir, &state);
        }
        // State is for a different remote — ignore it, run full sync
    }
    let author = get_author(repo)?;
    // Step 1: Fetch collab refs using system git (handles SSH agent, credentials, etc.)
    println!("Fetching from '{}'...", remote_name);
    let fetch_status = Command::new("git")
        .args([
            "fetch",
            remote_name,
            "+refs/collab/issues/*:refs/collab/sync/issues/*",
            "+refs/collab/patches/*:refs/collab/sync/patches/*",
            "+refs/collab/archive/issues/*:refs/collab/sync/archive/issues/*",
            "+refs/collab/archive/patches/*:refs/collab/sync/archive/patches/*",
        ])
        .current_dir(&workdir)
        .status()
        .map_err(|e| Error::Cmd(format!("failed to run git fetch: {}", e)))?;
    if !fetch_status.success() {
        return Err(Error::Cmd(format!(
            "git fetch exited with status {}",
            fetch_status
        )));
    }
    // Step 2: Reconcile
    // Re-open the repo because git2 caches the ref list at open time.
    // The `git fetch` above wrote new refs to disk, but the original
    // `repo` handle won't see them. `repo.path()` returns the `.git`
    // directory, which is the correct argument to `Repository::open`.
    let repo = Repository::open(repo.path())?;
    let sk = signing::load_signing_key(&signing::signing_key_dir()?)?;
    reconcile_refs(&repo, "issues", &author, &sk)?;
    reconcile_refs(&repo, "patches", &author, &sk)?;
    // Step 2.5: Scan local branches for Issue: trailers and emit link events.
    // Never breaks sync — scan_and_link absorbs per-commit/per-issue errors.
    match crate::commit_link::scan_and_link(&repo, &author, &sk) {
        Ok(n) if n > 0 => println!("Linked {} commit(s) to issues.", n),
        Ok(_) => {}
        Err(e) => eprintln!("warning: commit link scan failed: {}", e),
    }
    // Step 3: Push collab refs individually
    println!("Pushing to '{}'...", remote_name);
    let refs_to_push = collect_push_refs(&repo)?;
    if refs_to_push.is_empty() {
        println!("Nothing to push.");
    } else {
        let sync_result = push_refs(&workdir, remote_name, &refs_to_push);
        if !sync_result.is_complete() {
            // Save state for resume: record each failed ref with its last
            // error so `sync_resume` can retry exactly these refs next run.
            let now = chrono::Utc::now().to_rfc3339();
            let pending: Vec<(String, String)> = sync_result
                .failed()
                .iter()
                .map(|r| {
                    (
                        r.ref_name.clone(),
                        r.error.clone().unwrap_or_else(|| "unknown error".to_string()),
                    )
                })
                .collect();
            let state = SyncState {
                remote: remote_name.to_string(),
                pending_refs: pending,
                timestamp: now,
            };
            state.save(&repo)?;
            print_sync_summary(&sync_result);
            // Clean up sync refs before returning error
            cleanup_sync_refs(&repo)?;
            let succeeded = sync_result.succeeded().len();
            let total = sync_result.results.len();
            return Err(Error::PartialSync {
                succeeded,
                total,
            });
        }
    }
    // Step 4: Clean up sync refs
    cleanup_sync_refs(&repo)?;
    println!("Sync complete.");
    Ok(())
}
/// Resume a partially-failed sync by retrying only the pending refs.
///
/// Called by `sync` when a `SyncState` for the same remote exists on disk.
/// Does not re-fetch or reconcile — it only retries the pushes recorded in
/// `state.pending_refs`. On full success the state file is cleared; on
/// continued failure the state is rewritten with the refs still pending and
/// `Error::PartialSync` is returned.
fn sync_resume(
    repo: &Repository,
    remote_name: &str,
    workdir: &Path,
    state: &SyncState,
) -> Result<(), Error> {
    println!(
        "Resuming sync to '{}' ({} refs pending from previous failure)...",
        remote_name,
        state.pending_refs.len()
    );
    // Print pending refs with their last error
    for (ref_name, last_error) in &state.pending_refs {
        println!(" Pending: {} (last error: {})", ref_name, last_error);
    }
    // Clean up stale sync refs (T018b)
    cleanup_sync_refs(repo)?;
    // Push the pending refs
    let ref_names: Vec<String> = state.pending_refs.iter().map(|(r, _)| r.clone()).collect();
    let sync_result = push_refs(workdir, remote_name, &ref_names);
    if sync_result.is_complete() {
        // All pending refs pushed successfully — the saved state is done.
        SyncState::clear(repo)?;
        if sync_result.results.is_empty() {
            // State file existed but listed no refs (e.g. left behind stale).
            println!("All pending refs are already up to date. Clearing stale sync state.");
        } else {
            // BUGFIX: this branch previously printed "Sync complete." both
            // here and below, duplicating the message on a successful resume.
            println!("\nAll previously-failed refs pushed.");
        }
        println!("Sync complete.");
        Ok(())
    } else {
        // Some refs still failing — rewrite the state with what's left.
        let now = chrono::Utc::now().to_rfc3339();
        let pending: Vec<(String, String)> = sync_result
            .failed()
            .iter()
            .map(|r| {
                (
                    r.ref_name.clone(),
                    r.error.clone().unwrap_or_else(|| "unknown error".to_string()),
                )
            })
            .collect();
        let new_state = SyncState {
            remote: remote_name.to_string(),
            pending_refs: pending,
            timestamp: now,
        };
        new_state.save(repo)?;
        print_sync_summary(&sync_result);
        eprintln!("To force a full sync, delete .git/collab/sync-state.json");
        let succeeded = sync_result.succeeded().len();
        let total = sync_result.results.len();
        Err(Error::PartialSync {
            succeeded,
            total,
        })
    }
}
/// Push all refs in a single batch, printing per-ref status, and return aggregated results.
fn push_refs(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult {
    let results = push_refs_batched(workdir, remote_name, refs);
    // Report each ref's outcome: successes to stdout, failures to stderr.
    for entry in &results {
        if entry.status == PushStatus::Pushed {
            println!(" Pushed {}", entry.ref_name);
        } else {
            eprintln!(
                " FAILED {}: {}",
                entry.ref_name,
                entry.error.as_deref().unwrap_or("unknown error")
            );
        }
    }
    SyncResult {
        results,
        remote: remote_name.to_string(),
    }
}
/// Reconcile all refs of a given kind (issues or patches) from sync refs.
///
/// For each `refs/collab/sync/<kind>/<id>` ref fetched from the remote:
///   - skip refs whose `<id>` is not 40 lowercase hex characters
///   - verify every commit's signature on the ref and check the results
///     against the trust policy, skipping the ref if any commit fails
///   - if a local `refs/collab/<kind>/<id>` already exists, merge the remote
///     history into it via `dag::reconcile`; otherwise create the local ref
///     pointing at the remote tip
///
/// Per-ref failures are reported on stderr and do not abort the loop;
/// only infrastructure errors (glob, trust load, ref creation) propagate.
fn reconcile_refs(
    repo: &Repository,
    kind: &str,
    author: &crate::event::Author,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<(), Error> {
    let sync_prefix = format!("refs/collab/sync/{}/", kind);
    // Snapshot (ref_name, id) pairs up front, before any refs are
    // created or updated in the loop below.
    let sync_refs: Vec<(String, String)> = {
        let refs = repo.references_glob(&format!("{}*", sync_prefix))?;
        refs.filter_map(|r| {
            let r = r.ok()?;
            let name = r.name()?.to_string();
            // The id is everything after the sync prefix.
            let id = name.strip_prefix(&sync_prefix)?.to_string();
            Some((name, id))
        })
        .collect()
    };
    // Load trust policy once for all refs of this kind
    let trust_policy = trust::load_trust_policy(repo)?;
    // Emit the "no trusted keys" warning at most once per call.
    let mut warned_unconfigured = false;
    for (remote_ref, id) in &sync_refs {
        // Validate the ref ID format before processing
        if let Err(e) = validate_collab_ref_id(id) {
            eprintln!(" Skipping {} with invalid ref ID {:.8}: {}", kind, id, e);
            continue;
        }
        // Verify all commits on the remote ref before reconciling
        match signing::verify_ref(repo, remote_ref) {
            Ok(results) => {
                // Apply trust checking
                let results = trust::check_trust(&results, &trust_policy);
                if matches!(trust_policy, trust::TrustPolicy::Unconfigured) && !warned_unconfigured {
                    eprintln!("warning: no trusted keys configured — all valid signatures accepted. Run 'collab key add --self' to start.");
                    warned_unconfigured = true;
                }
                // Any commit that is not Valid disqualifies the whole ref.
                let failures: Vec<_> = results
                    .iter()
                    .filter(|r| r.status != signing::VerifyStatus::Valid)
                    .collect();
                if !failures.is_empty() {
                    for f in &failures {
                        eprintln!(
                            " Rejecting {} {:.8}: commit {} — {}",
                            kind,
                            id,
                            f.commit_id,
                            f.error.as_deref().unwrap_or("unknown error")
                        );
                    }
                    continue;
                }
            }
            Err(e) => {
                eprintln!(
                    " Failed to verify {} {:.8}: {}",
                    kind, id, e
                );
                continue;
            }
        }
        let local_ref = format!("refs/collab/{}/{}", kind, id);
        if repo.refname_to_id(&local_ref).is_ok() {
            // Local ref exists — merge the remote history into it.
            match dag::reconcile(repo, &local_ref, remote_ref, author, signing_key) {
                Ok((_oid, outcome)) => {
                    let action = match outcome {
                        dag::ReconcileOutcome::AlreadyCurrent => "already current",
                        dag::ReconcileOutcome::LocalAhead => "local ahead",
                        dag::ReconcileOutcome::FastForward => "fast-forwarded",
                        dag::ReconcileOutcome::Merge => "merged",
                    };
                    println!(" Reconciled {} {:.8} ({})", kind, id, action);
                }
                Err(e) => eprintln!(" Failed to reconcile {} {:.8}: {}", kind, id, e),
            }
        } else {
            // No local ref yet — adopt the remote tip as-is (force=false,
            // so creation fails if the ref appears concurrently).
            let oid = repo.refname_to_id(remote_ref)?;
            repo.reference(&local_ref, oid, false, "sync: new from remote")?;
            println!(" New {} {:.8} from remote", kind, id);
        }
    }
    Ok(())
}