a73x

src/sync.rs

Ref:   Size: 20.0 KiB

use std::fs;
use std::path::{Path, PathBuf};
use std::process::Command;

use git2::Repository;
use serde::{Deserialize, Serialize};

use crate::dag;
use crate::error::Error;
use crate::identity::get_author;
use crate::signing;
use crate::sync_lock::SyncLock;
use crate::trust;

// ---------------------------------------------------------------------------
// Ref name validation
// ---------------------------------------------------------------------------

/// Validate that a collab ref ID is a valid 40-character lowercase hex string.
/// This prevents path traversal, null byte injection, and other malicious ref names.
pub fn validate_collab_ref_id(id: &str) -> Result<(), Error> {
    if id.len() != 40 {
        return Err(Error::InvalidRefName(format!(
            "ref ID must be exactly 40 characters, got {}",
            id.len()
        )));
    }
    if !id.chars().all(|c| c.is_ascii_hexdigit() && !c.is_ascii_uppercase()) {
        return Err(Error::InvalidRefName(format!(
            "ref ID must contain only lowercase hex characters [0-9a-f], got {:?}",
            id
        )));
    }
    Ok(())
}

// ---------------------------------------------------------------------------
// Per-ref push types (T002a)
// ---------------------------------------------------------------------------

/// Status of pushing a single ref.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum PushStatus {
    /// `git push` for this ref exited successfully, either as part of the
    /// batched push or in the per-ref fallback.
    Pushed,
    /// The push exited unsuccessfully or `git` could not be run at all;
    /// the reason is in `RefPushResult::error`.
    Failed,
}

/// Result of pushing a single ref to the remote.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RefPushResult {
    /// Full local ref name (e.g. `refs/collab/issues/<id>`); it is pushed to
    /// the same name on the remote (`<ref>:<ref>` refspec).
    pub ref_name: String,
    /// Whether the push succeeded.
    pub status: PushStatus,
    /// `None` on success. On failure: the trimmed stderr of `git push`, or a
    /// `failed to run git push: ...` message if the process could not start.
    pub error: Option<String>,
}

/// Aggregated outcome of the entire push phase.
#[derive(Debug, Clone)]
pub struct SyncResult {
    /// One entry per ref that was attempted, in the order the refs were given
    /// to the push helpers.
    pub results: Vec<RefPushResult>,
    /// Name of the remote that was pushed to; echoed in the retry hint of the
    /// failure summary.
    pub remote: String,
}

impl SyncResult {
    /// Results for refs that were pushed successfully, in attempt order.
    pub fn succeeded(&self) -> Vec<&RefPushResult> {
        let mut pushed = Vec::new();
        for res in &self.results {
            if let PushStatus::Pushed = res.status {
                pushed.push(res);
            }
        }
        pushed
    }

    /// Results for refs whose push failed, in attempt order.
    pub fn failed(&self) -> Vec<&RefPushResult> {
        let mut rejected = Vec::new();
        for res in &self.results {
            if let PushStatus::Failed = res.status {
                rejected.push(res);
            }
        }
        rejected
    }

    /// `true` when no attempted ref failed (vacuously `true` for no results).
    pub fn is_complete(&self) -> bool {
        !self.results.iter().any(|res| res.status != PushStatus::Pushed)
    }
}

// ---------------------------------------------------------------------------
// Persistent sync state (T002b)
// ---------------------------------------------------------------------------

/// Persistent record of a partially-completed sync.
///
/// Serialized as JSON to `<git dir>/collab/sync-state.json` when a push phase
/// leaves some refs unpushed; a later `sync` to the same remote resumes from it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SyncState {
    /// Remote the failed push targeted. `sync` ignores state saved for a
    /// different remote and runs a full sync instead.
    pub remote: String,
    /// `(ref name, last error message)` for every ref that still needs pushing.
    pub pending_refs: Vec<(String, String)>,
    /// When the state was written, as an RFC 3339 UTC timestamp.
    pub timestamp: String,
}

impl SyncState {
    /// Path to the sync state file: `<git dir>/collab/sync-state.json`.
    fn path(repo: &Repository) -> PathBuf {
        let git_dir = repo.path();
        git_dir.join("collab").join("sync-state.json")
    }

    /// Load sync state from disk.
    ///
    /// Returns `None` when no state file exists. If the file exists but cannot
    /// be read, a warning is printed, the file is left in place, and `None` is
    /// returned. If it cannot be parsed, a warning is printed, the file is
    /// deleted, and `None` is returned so the caller falls back to a full sync.
    pub fn load(repo: &Repository) -> Option<SyncState> {
        let path = Self::path(repo);
        // Read directly and treat NotFound as "no state" instead of probing with
        // `exists()` first, which races with a concurrent removal.
        let contents = match fs::read_to_string(&path) {
            Ok(contents) => contents,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return None,
            Err(e) => {
                eprintln!(
                    "warning: could not read sync state file {} ({}); proceeding with full sync",
                    path.display(),
                    e
                );
                return None;
            }
        };
        match serde_json::from_str(&contents) {
            Ok(state) => Some(state),
            Err(e) => {
                eprintln!(
                    "warning: corrupted sync state file ({}); ignoring and proceeding with full sync",
                    e
                );
                let _ = fs::remove_file(&path);
                None
            }
        }
    }

    /// Save sync state to disk, creating `<git dir>/collab/` if needed.
    ///
    /// The JSON is first written to a sibling temporary file and then renamed
    /// over the real path, so an interrupted write can never leave a truncated
    /// state file behind.
    ///
    /// # Errors
    ///
    /// Propagates I/O and JSON serialization failures.
    pub fn save(&self, repo: &Repository) -> Result<(), Error> {
        let path = Self::path(repo);
        if let Some(parent) = path.parent() {
            fs::create_dir_all(parent)?;
        }
        let json = serde_json::to_string_pretty(self)?;
        let tmp = path.with_extension("json.tmp");
        fs::write(&tmp, json)?;
        if let Err(e) = fs::rename(&tmp, &path) {
            // Best effort: don't leave the temp file lying around.
            let _ = fs::remove_file(&tmp);
            return Err(e.into());
        }
        Ok(())
    }

    /// Delete the sync state file. A file that is already gone is not an error.
    ///
    /// # Errors
    ///
    /// Propagates any I/O failure other than "not found".
    pub fn clear(repo: &Repository) -> Result<(), Error> {
        match fs::remove_file(Self::path(repo)) {
            Err(e) if e.kind() != std::io::ErrorKind::NotFound => Err(e.into()),
            _ => Ok(()),
        }
    }
}

// ---------------------------------------------------------------------------
// Push helpers (T003, T004)
// ---------------------------------------------------------------------------

/// Push every ref in `refs` to `remote_name` with one `git push` process.
///
/// Each ref is pushed as `<ref>:<ref>` without a `+`, i.e. non-forced: the
/// remote rejects any update that is not a fast-forward, so events another
/// pusher added after our fetch are never silently discarded.
///
/// If the batched push succeeds, every ref is reported as `Pushed`. If it
/// exits unsuccessfully or cannot be run, each ref is retried on its own via
/// `push_ref_single` so the specific failing refs can be identified.
/// An empty `refs` slice returns an empty vector without running `git`.
fn push_refs_batched(workdir: &Path, remote_name: &str, refs: &[String]) -> Vec<RefPushResult> {
    if refs.is_empty() {
        return Vec::new();
    }

    let batch_ok = Command::new("git")
        .arg("push")
        .arg(remote_name)
        .args(refs.iter().map(|name| format!("{}:{}", name, name)))
        .current_dir(workdir)
        .output()
        .map(|out| out.status.success())
        .unwrap_or(false);

    if !batch_ok {
        // Isolate which refs are actually failing.
        return refs
            .iter()
            .map(|name| push_ref_single(workdir, remote_name, name))
            .collect();
    }

    refs.iter()
        .map(|name| RefPushResult {
            ref_name: name.clone(),
            status: PushStatus::Pushed,
            error: None,
        })
        .collect()
}

/// Push one ref as a non-forced `<ref>:<ref>` refspec.
///
/// Used as the fallback when the batched push fails, so each ref gets its own
/// verdict. On a non-zero exit the trimmed stderr of `git push` becomes the
/// error text; if `git` cannot be started, the spawn error is reported instead.
fn push_ref_single(workdir: &Path, remote_name: &str, ref_name: &str) -> RefPushResult {
    let refspec = format!("{}:{}", ref_name, ref_name);
    let outcome = Command::new("git")
        .args(["push", remote_name, &refspec])
        .current_dir(workdir)
        .output();

    let (status, error) = match outcome {
        Err(e) => (
            PushStatus::Failed,
            Some(format!("failed to run git push: {}", e)),
        ),
        Ok(out) if out.status.success() => (PushStatus::Pushed, None),
        Ok(out) => (
            PushStatus::Failed,
            Some(String::from_utf8_lossy(&out.stderr).trim().to_string()),
        ),
    };

    RefPushResult {
        ref_name: ref_name.to_string(),
        status,
        error,
    }
}

/// Collect the names of all local collab refs that a sync should push.
///
/// Covers active and archived issues and patches. Refs whose names are not
/// valid UTF-8 are skipped.
///
/// # Errors
///
/// Propagates git errors from globbing or iterating references.
fn collect_push_refs(repo: &Repository) -> Result<Vec<String>, Error> {
    const PUSH_GLOBS: [&str; 4] = [
        "refs/collab/issues/*",
        "refs/collab/patches/*",
        "refs/collab/archive/issues/*",
        "refs/collab/archive/patches/*",
    ];

    let mut names = Vec::new();
    for glob in PUSH_GLOBS {
        for entry in repo.references_glob(glob)? {
            let reference = entry?;
            if let Some(name) = reference.name() {
                names.push(name.to_string());
            }
        }
    }
    Ok(names)
}

// ---------------------------------------------------------------------------
// Summary printing (T012)
// ---------------------------------------------------------------------------

/// Print a failure summary to stderr.
///
/// The summary has a headline with the pushed/total counts, the failed refs
/// with their errors, and a hint to rerun `collab sync <remote>`.
fn print_sync_summary(result: &SyncResult) {
    let pushed = result.succeeded().len();
    let total = result.results.len();
    let failures = result.failed();

    if pushed > 0 {
        eprintln!(
            "\nSync partially failed: {} of {} refs pushed.",
            pushed, total
        );
    } else {
        eprintln!("\nSync failed: {} of {} refs pushed.", pushed, total);
    }

    eprintln!("Failed refs:");
    failures.iter().for_each(|f| {
        let reason = f.error.as_deref().unwrap_or("unknown error");
        eprintln!("  {}: {}", f.ref_name, reason);
    });

    let retry_count = failures.len();
    eprintln!(
        "\nRun `collab sync {}` again to retry {} failed ref(s).",
        result.remote, retry_count
    );
}

/// Delete every ref under `refs/collab/sync/`.
///
/// This covers two sets of refs:
/// - the per-kind refs written by the fetch in `sync`
///   (`refs/collab/sync/{issues,patches,archive/...}/*`);
/// - the `refs/collab/sync/<remote>/*` refs produced by the refspec that
///   `init` installs.
///
/// Refs are selected by string prefix over the full ref list instead of by
/// glob. Nested names are therefore matched whether or not the glob `*` would
/// cross `/`, and each ref is visited exactly once.
///
/// # Errors
///
/// Propagates git errors from listing, looking up, or deleting refs.
fn cleanup_sync_refs(repo: &Repository) -> Result<(), Error> {
    const SYNC_PREFIX: &str = "refs/collab/sync/";

    // Collect names first so nothing is deleted while the iterator that
    // produced it is still live.
    let doomed: Vec<String> = repo
        .references()?
        .filter_map(|r| r.ok())
        .filter_map(|r| r.name().map(str::to_owned))
        .filter(|name| name.starts_with(SYNC_PREFIX))
        .collect();

    for ref_name in doomed {
        repo.find_reference(&ref_name)?.delete()?;
    }
    Ok(())
}

/// Add the collab fetch refspec to every configured remote.
///
/// For each remote `<name>`, this adds `+refs/collab/*:refs/collab/sync/<name>/*`
/// to its fetch refspecs. Remotes that already carry the refspec are left
/// alone, so running `init` repeatedly does not append duplicate config entries.
/// Prints a note and does nothing if there are no remotes.
///
/// # Errors
///
/// Propagates git errors from listing remotes, reading a remote's refspecs,
/// or writing the new refspec to the config.
pub fn init(repo: &Repository) -> Result<(), Error> {
    let remotes = repo.remotes()?;
    if remotes.is_empty() {
        println!("No remotes configured.");
        return Ok(());
    }
    for remote_name in remotes.iter().flatten() {
        let fetch_spec = format!("+refs/collab/*:refs/collab/sync/{}/*", remote_name);
        // `remote_add_fetch` appends unconditionally; check first to stay idempotent.
        let already_configured = repo
            .find_remote(remote_name)?
            .fetch_refspecs()?
            .iter()
            .flatten()
            .any(|spec| spec == fetch_spec);
        if already_configured {
            println!("Remote '{}' already configured", remote_name);
            continue;
        }
        repo.remote_add_fetch(remote_name, &fetch_spec)?;
        println!("Configured remote '{}'", remote_name);
    }
    println!("Collab refspecs initialized.");
    Ok(())
}

/// Sync with a specific remote: fetch, reconcile, push.
///
/// The sync lock is held for the whole call. If a saved `SyncState` exists for
/// this same remote, only its pending refs are retried via `sync_resume`; no
/// fetch or reconcile happens in that case. Otherwise a full sync runs:
///
/// 1. `git fetch` the remote's collab refs into `refs/collab/sync/...`;
/// 2. verify and reconcile the fetched issues and patches into local refs;
/// 3. scan local branches for `Issue:` trailers (failures only warn);
/// 4. push every local collab ref, saving a `SyncState` if any push fails;
/// 5. delete the temporary `refs/collab/sync/*` refs.
///
/// # Errors
///
/// - `Error::Cmd` if `git fetch` cannot be started or exits unsuccessfully.
/// - `Error::PartialSync` if some refs could not be pushed.
/// - Otherwise propagates lock, git, signing-key and I/O errors.
pub fn sync(repo: &Repository, remote_name: &str) -> Result<(), Error> {
    // Acquire advisory lock — held until _lock is dropped (RAII)
    let _lock = SyncLock::acquire(repo)?;

    // Run git subprocesses in the working tree, or in the git directory itself
    // for bare repositories. Taking the parent of `repo.path()` is wrong in
    // two cases: for a bare repo it is the directory *containing* the
    // repository, and for a linked worktree it is `.git/worktrees`.
    let workdir = repo
        .workdir()
        .unwrap_or_else(|| repo.path())
        .to_path_buf();

    // Check for existing sync state (resume mode)
    if let Some(state) = SyncState::load(repo) {
        if state.remote == remote_name {
            return sync_resume(repo, remote_name, &workdir, &state);
        }
        // State is for a different remote — ignore it, run full sync
    }

    let author = get_author(repo)?;

    // Step 1: Fetch collab refs using system git (handles SSH agent, credentials, etc.)
    println!("Fetching from '{}'...", remote_name);
    let fetch_status = Command::new("git")
        .args([
            "fetch",
            remote_name,
            "+refs/collab/issues/*:refs/collab/sync/issues/*",
            "+refs/collab/patches/*:refs/collab/sync/patches/*",
            "+refs/collab/archive/issues/*:refs/collab/sync/archive/issues/*",
            "+refs/collab/archive/patches/*:refs/collab/sync/archive/patches/*",
        ])
        .current_dir(&workdir)
        .status()
        .map_err(|e| Error::Cmd(format!("failed to run git fetch: {}", e)))?;

    if !fetch_status.success() {
        return Err(Error::Cmd(format!(
            "git fetch exited with status {}",
            fetch_status
        )));
    }

    // Step 2: Reconcile
    // Re-open the repo because git2 caches the ref list at open time.
    // The `git fetch` above wrote new refs to disk, but the original
    // `repo` handle won't see them. `repo.path()` returns the `.git`
    // directory, which is the correct argument to `Repository::open`.
    let repo = Repository::open(repo.path())?;
    let sk = signing::load_signing_key(&signing::signing_key_dir()?)?;
    // NOTE(review): archived refs fetched into refs/collab/sync/archive/* are
    // never reconciled; cleanup_sync_refs only deletes them. Confirm intended.
    reconcile_refs(&repo, "issues", &author, &sk)?;
    reconcile_refs(&repo, "patches", &author, &sk)?;

    // Step 2.5: Scan local branches for Issue: trailers and emit link events.
    // Never breaks sync — scan_and_link absorbs per-commit/per-issue errors.
    match crate::commit_link::scan_and_link(&repo, &author, &sk) {
        Ok(n) if n > 0 => println!("Linked {} commit(s) to issues.", n),
        Ok(_) => {}
        Err(e) => eprintln!("warning: commit link scan failed: {}", e),
    }

    // Step 3: Push collab refs in one batch (push_refs falls back to per-ref
    // pushes if the batch fails, to pinpoint the failing refs).
    println!("Pushing to '{}'...", remote_name);
    let refs_to_push = collect_push_refs(&repo)?;

    if refs_to_push.is_empty() {
        println!("Nothing to push.");
    } else {
        let sync_result = push_refs(&workdir, remote_name, &refs_to_push);

        if !sync_result.is_complete() {
            // Save state so the next `sync` to this remote retries only the failures.
            let now = chrono::Utc::now().to_rfc3339();
            let pending: Vec<(String, String)> = sync_result
                .failed()
                .iter()
                .map(|r| {
                    (
                        r.ref_name.clone(),
                        r.error.clone().unwrap_or_else(|| "unknown error".to_string()),
                    )
                })
                .collect();
            let state = SyncState {
                remote: remote_name.to_string(),
                pending_refs: pending,
                timestamp: now,
            };
            state.save(&repo)?;

            print_sync_summary(&sync_result);

            // Clean up sync refs before returning error
            cleanup_sync_refs(&repo)?;

            let succeeded = sync_result.succeeded().len();
            let total = sync_result.results.len();
            return Err(Error::PartialSync {
                succeeded,
                total,
            });
        }
    }

    // Step 4: Clean up sync refs
    cleanup_sync_refs(&repo)?;

    println!("Sync complete.");
    Ok(())
}

/// Resume a partially-failed sync by retrying only the pending refs.
///
/// Does not fetch or reconcile. It first removes stale `refs/collab/sync/*`
/// refs, then pushes each ref recorded in `state` that still exists locally.
/// Refs deleted locally since the failed sync are skipped with a warning: a
/// `<ref>:<ref>` push of a missing ref can never succeed and would otherwise
/// keep every resume failing.
///
/// On full success the state file is removed. Otherwise the state is
/// rewritten with the refs that still fail.
///
/// # Errors
///
/// Returns `Error::PartialSync` if some refs still fail, and propagates git
/// and I/O errors from cleanup and from saving or clearing the state file.
fn sync_resume(
    repo: &Repository,
    remote_name: &str,
    workdir: &Path,
    state: &SyncState,
) -> Result<(), Error> {
    println!(
        "Resuming sync to '{}' ({} refs pending from previous failure)...",
        remote_name,
        state.pending_refs.len()
    );

    // Print pending refs with their last error
    for (ref_name, last_error) in &state.pending_refs {
        println!("  Pending: {} (last error: {})", ref_name, last_error);
    }

    // Clean up stale sync refs (T018b)
    cleanup_sync_refs(repo)?;

    // Only retry refs that still resolve locally (invalid names also fail to
    // resolve and are skipped).
    let ref_names: Vec<String> = state
        .pending_refs
        .iter()
        .map(|(ref_name, _)| ref_name)
        .filter(|ref_name| {
            let exists = repo.refname_to_id(ref_name.as_str()).is_ok();
            if !exists {
                eprintln!(
                    "warning: skipping {}: ref no longer exists locally",
                    ref_name
                );
            }
            exists
        })
        .cloned()
        .collect();
    let sync_result = push_refs(workdir, remote_name, &ref_names);

    if sync_result.is_complete() {
        // All pending refs pushed (or none left to push)
        SyncState::clear(repo)?;
        if sync_result.results.is_empty() {
            println!("All pending refs are already up to date. Clearing stale sync state.");
            println!("Sync complete.");
        } else {
            // This line already announces completion; don't print it twice.
            println!("\nSync complete. All previously-failed refs pushed.");
        }
        return Ok(());
    }

    // Some refs still failing - update state
    let now = chrono::Utc::now().to_rfc3339();
    let pending: Vec<(String, String)> = sync_result
        .failed()
        .iter()
        .map(|r| {
            (
                r.ref_name.clone(),
                r.error.clone().unwrap_or_else(|| "unknown error".to_string()),
            )
        })
        .collect();
    let new_state = SyncState {
        remote: remote_name.to_string(),
        pending_refs: pending,
        timestamp: now,
    };
    new_state.save(repo)?;

    print_sync_summary(&sync_result);
    // Report the real state path: it lives under the git dir, which is not
    // `.git/` for bare repos or linked worktrees.
    eprintln!(
        "To force a full sync, delete {}",
        SyncState::path(repo).display()
    );

    let succeeded = sync_result.succeeded().len();
    let total = sync_result.results.len();
    Err(Error::PartialSync {
        succeeded,
        total,
    })
}

/// Push `refs` to `remote_name` (batched, with per-ref fallback), report each
/// outcome, and bundle the results with the remote name.
///
/// Successes are reported on stdout as `Pushed <ref>`. Failures are reported
/// on stderr as `FAILED <ref>: <error>`.
fn push_refs(workdir: &Path, remote_name: &str, refs: &[String]) -> SyncResult {
    let results = push_refs_batched(workdir, remote_name, refs);

    for outcome in &results {
        let name = &outcome.ref_name;
        match outcome.status {
            PushStatus::Failed => {
                let reason = outcome.error.as_deref().unwrap_or("unknown error");
                eprintln!("  FAILED {}: {}", name, reason);
            }
            PushStatus::Pushed => println!("  Pushed {}", name),
        }
    }

    SyncResult {
        remote: remote_name.to_string(),
        results,
    }
}

/// Reconcile all refs of a given kind (issues or patches) from sync refs.
///
/// Looks at every ref under `refs/collab/sync/<kind>/`, which is where the
/// fetch in `sync` puts them. Each ref whose trailing ID passes
/// `validate_collab_ref_id` goes through these steps:
///
/// 1. `signing::verify_ref` verifies the commits on the fetched ref, and
///    `trust::check_trust` applies the trust policy. Any result that is not
///    `VerifyStatus::Valid` rejects the whole ref.
/// 2. If `refs/collab/<kind>/<id>` already exists locally, it is reconciled
///    with the fetched ref through `dag::reconcile`, which receives `author`
///    and `signing_key`.
/// 3. Otherwise the local ref is created, pointing at the fetched ref's target.
///
/// Per-ref problems are printed to stderr and only that ref is skipped:
/// invalid ID, verification error, rejected commits, or reconcile failure.
/// IDs are displayed truncated to their first 8 characters (`{:.8}`).
///
/// # Errors
///
/// Returns an error, abandoning the remaining refs, if any of these fail:
/// listing the sync refs, loading the trust policy, resolving a fetched ref,
/// or creating a new local ref.
fn reconcile_refs(
    repo: &Repository,
    kind: &str,
    author: &crate::event::Author,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<(), Error> {
    let sync_prefix = format!("refs/collab/sync/{}/", kind);
    // (full sync ref name, id) pairs. `id` is everything after the prefix and
    // is untrusted remote input until validated in the loop below.
    let sync_refs: Vec<(String, String)> = {
        let refs = repo.references_glob(&format!("{}*", sync_prefix))?;
        refs.filter_map(|r| {
            let r = r.ok()?;
            let name = r.name()?.to_string();
            let id = name.strip_prefix(&sync_prefix)?.to_string();
            Some((name, id))
        })
        .collect()
    };

    // Load trust policy once for all refs of this kind
    let trust_policy = trust::load_trust_policy(repo)?;
    // The "no trusted keys" warning is emitted at most once per call
    // (so once per kind, since `sync` calls this per kind).
    let mut warned_unconfigured = false;

    for (remote_ref, id) in &sync_refs {
        // Validate the ref ID format before processing
        if let Err(e) = validate_collab_ref_id(id) {
            eprintln!("  Skipping {} with invalid ref ID {:.8}: {}", kind, id, e);
            continue;
        }

        // Verify all commits on the remote ref before reconciling
        match signing::verify_ref(repo, remote_ref) {
            Ok(results) => {
                // Apply trust checking
                let results = trust::check_trust(&results, &trust_policy);

                if matches!(trust_policy, trust::TrustPolicy::Unconfigured) && !warned_unconfigured {
                    eprintln!("warning: no trusted keys configured — all valid signatures accepted. Run 'collab key add --self' to start.");
                    warned_unconfigured = true;
                }

                // A single non-Valid commit rejects the entire ref; every
                // offending commit is reported before skipping it.
                let failures: Vec<_> = results
                    .iter()
                    .filter(|r| r.status != signing::VerifyStatus::Valid)
                    .collect();
                if !failures.is_empty() {
                    for f in &failures {
                        eprintln!(
                            "  Rejecting {} {:.8}: commit {} — {}",
                            kind,
                            id,
                            f.commit_id,
                            f.error.as_deref().unwrap_or("unknown error")
                        );
                    }
                    continue;
                }
            }
            Err(e) => {
                eprintln!(
                    "  Failed to verify {} {:.8}: {}",
                    kind, id, e
                );
                continue;
            }
        }

        let local_ref = format!("refs/collab/{}/{}", kind, id);
        if repo.refname_to_id(&local_ref).is_ok() {
            // Local copy exists: let the DAG layer decide fast-forward/merge.
            match dag::reconcile(repo, &local_ref, remote_ref, author, signing_key) {
                Ok((_oid, outcome)) => {
                    let action = match outcome {
                        dag::ReconcileOutcome::AlreadyCurrent => "already current",
                        dag::ReconcileOutcome::LocalAhead => "local ahead",
                        dag::ReconcileOutcome::FastForward => "fast-forwarded",
                        dag::ReconcileOutcome::Merge => "merged",
                    };
                    println!("  Reconciled {} {:.8} ({})", kind, id, action);
                }
                Err(e) => eprintln!("  Failed to reconcile {} {:.8}: {}", kind, id, e),
            }
        } else {
            // New from remote: adopt the fetched tip. force = false, so this
            // errors rather than overwriting if the local ref appeared meanwhile.
            let oid = repo.refname_to_id(remote_ref)?;
            repo.reference(&local_ref, oid, false, "sync: new from remote")?;
            println!("  New {} {:.8} from remote", kind, id);
        }
    }
    Ok(())
}