a73x

src/dag.rs

Ref:   Size: 12.0 KiB

use git2::{ObjectType, Oid, Repository, Sort};

use crate::error::Error;
use crate::event::{Action, Event};
use crate::identity::{author_signature, get_author};
use crate::signing::{self, sign_event};

/// The manifest blob content included in every event commit tree.
const MANIFEST_JSON: &[u8] = br#"{"version":1,"format":"git-collab"}"#;

/// Maximum allowed size for an event.json blob (1 MB).
pub const MAX_EVENT_BLOB_SIZE: usize = 1_048_576;

/// Build an event tree containing event.json, signature, pubkey, and manifest.json blobs.
/// Returns the tree OID.
fn build_event_tree(
    repo: &Repository,
    event_blob: Oid,
    sig_blob: Oid,
    pubkey_blob: Oid,
    manifest_blob: Oid,
) -> Result<Oid, Error> {
    let mut tb = repo.treebuilder(None)?;
    tb.insert("event.json", event_blob, 0o100644)?;
    tb.insert("signature", sig_blob, 0o100644)?;
    tb.insert("pubkey", pubkey_blob, 0o100644)?;
    tb.insert("manifest.json", manifest_blob, 0o100644)?;
    let tree_oid = tb.write()?;
    Ok(tree_oid)
}

/// Sign an event, serialize it, and build the event tree. Returns the tree OID.
fn sign_and_build_tree(
    repo: &Repository,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let detached = sign_event(event, signing_key)?;
    let event_json = serde_json::to_vec_pretty(event)?;

    let event_blob = repo.blob(&event_json)?;
    let sig_blob = repo.blob(detached.signature.as_bytes())?;
    let pubkey_blob = repo.blob(detached.pubkey.as_bytes())?;
    let manifest_blob = repo.blob(MANIFEST_JSON)?;

    build_event_tree(repo, event_blob, sig_blob, pubkey_blob, manifest_blob)
}

/// Load and deserialize the event from a commit's tree.
fn load_event_from_commit(repo: &Repository, oid: Oid) -> Result<Event, Error> {
    let commit = repo.find_commit(oid)?;
    let tree = commit.tree()?;
    let entry = tree
        .get_name("event.json")
        .ok_or_else(|| git2::Error::from_str("missing event.json in commit tree"))?;

    if entry.kind() != Some(ObjectType::Blob) {
        return Err(Error::Git(git2::Error::from_str(
            "event.json entry is not a blob",
        )));
    }

    let blob = repo.find_blob(entry.id())?;
    let content = blob.content();
    if content.len() > MAX_EVENT_BLOB_SIZE {
        return Err(Error::PayloadTooLarge {
            actual: content.len(),
            limit: MAX_EVENT_BLOB_SIZE,
        });
    }

    Ok(serde_json::from_slice(content)?)
}

/// Read the clock value from the tip commit of a DAG.
/// Since clocks are monotonically increasing, the tip always has the max clock.
/// Returns 0 if the event has clock 0 (pre-migration).
pub fn max_clock(repo: &Repository, tip: Oid) -> Result<u64, Error> {
    Ok(load_event_from_commit(repo, tip)?.clock)
}

/// Create an orphan commit (no parents) with the given event.
/// Returns the new commit OID which also serves as the entity ID.
/// Clones the event internally and sets clock=1.
pub fn create_root_event(
    repo: &Repository,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let mut event = event.clone();
    event.clock = 1;

    let tree_oid = sign_and_build_tree(repo, &event, signing_key)?;
    let tree = repo.find_tree(tree_oid)?;

    let sig = author_signature(&event.author)?;
    let message = commit_message(&event.action);

    let oid = repo.commit(None, &sig, &sig, &message, &tree, &[])?;
    Ok(oid)
}

/// Append an event to an existing DAG. The current tip is the parent.
/// Clones the event internally and sets clock = max_clock(tip) + 1.
pub fn append_event(
    repo: &Repository,
    ref_name: &str,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let tip = repo.refname_to_id(ref_name)?;
    let current_max = max_clock(repo, tip)?;

    let mut event = event.clone();
    event.clock = current_max + 1;

    let tree_oid = sign_and_build_tree(repo, &event, signing_key)?;
    let tree = repo.find_tree(tree_oid)?;

    let sig = author_signature(&event.author)?;
    let message = commit_message(&event.action);

    let parent = repo.find_commit(tip)?;

    let oid = repo.commit(Some(ref_name), &sig, &sig, &message, &tree, &[&parent])?;
    Ok(oid)
}

/// Build an Event from the given action, filling in timestamp and author
/// automatically. The clock field is set to 0 (callers like `create_root_event`
/// and `append_event` overwrite it).
pub fn build_event(repo: &Repository, action: Action) -> Result<Event, Error> {
    let author = get_author(repo)?;
    Ok(Event {
        timestamp: chrono::Utc::now().to_rfc3339(),
        author,
        action,
        clock: 0,
    })
}

/// Convenience wrapper: load signing key, build event, and append it to an
/// existing DAG ref in one call.
pub fn append_action(
    repo: &Repository,
    ref_name: &str,
    action: Action,
) -> Result<Oid, Error> {
    let sk = signing::load_signing_key(&signing::signing_key_dir()?)?;
    let event = build_event(repo, action)?;
    append_event(repo, ref_name, &event, &sk)
}

/// Convenience wrapper: load signing key, build event, and create a root
/// (orphan) DAG commit. Returns the new commit OID (entity ID).
pub fn create_root_action(
    repo: &Repository,
    action: Action,
) -> Result<Oid, Error> {
    let sk = signing::load_signing_key(&signing::signing_key_dir()?)?;
    let event = build_event(repo, action)?;
    create_root_event(repo, &event, &sk)
}

/// Walk the DAG from the given ref in topological order (oldest first).
/// Returns (commit_oid, event) pairs.
pub fn walk_events(repo: &Repository, ref_name: &str) -> Result<Vec<(Oid, Event)>, Error> {
    let tip = repo.refname_to_id(ref_name)?;
    let mut revwalk = repo.revwalk()?;
    revwalk.set_sorting(Sort::TOPOLOGICAL | Sort::REVERSE)?;
    revwalk.push(tip)?;

    let mut events = Vec::new();
    for oid_result in revwalk {
        let oid = oid_result?;
        let event = load_event_from_commit(repo, oid)?;
        events.push((oid, event));
    }
    Ok(events)
}

/// Outcome of reconciling a local ref with a remote ref.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutcome {
    /// Both refs already point to the same commit. No action taken.
    AlreadyCurrent,
    /// Local is ahead of remote. No action taken.
    LocalAhead,
    /// Local was fast-forwarded to the remote tip.
    FastForward,
    /// A merge commit was created to reconcile divergent histories.
    Merge,
}

/// Reconcile a local ref with a remote ref. Returns the outcome and final tip OID.
///
/// - If they're the same: no-op
/// - If remote is ancestor of local: local is ahead, no-op
/// - If local is ancestor of remote: fast-forward
/// - Otherwise: create a merge commit
pub fn reconcile(
    repo: &Repository,
    local_ref: &str,
    remote_ref: &str,
    merge_author: &crate::event::Author,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<(Oid, ReconcileOutcome), Error> {
    let local_oid = repo.refname_to_id(local_ref)?;
    let remote_oid = repo.refname_to_id(remote_ref)?;

    if local_oid == remote_oid {
        return Ok((local_oid, ReconcileOutcome::AlreadyCurrent));
    }

    let merge_base = repo.merge_base(local_oid, remote_oid).map_err(|e| {
        Error::DisjointHistories {
            local_ref: local_ref.to_string(),
            remote_ref: remote_ref.to_string(),
            detail: e.message().to_string(),
        }
    })?;

    if merge_base == remote_oid {
        // Remote is ancestor of local — local is ahead
        return Ok((local_oid, ReconcileOutcome::LocalAhead));
    }

    if merge_base == local_oid {
        // Local is ancestor of remote — fast-forward
        repo.reference(local_ref, remote_oid, true, "fast-forward reconcile")?;
        return Ok((remote_oid, ReconcileOutcome::FastForward));
    }

    // True fork — create merge commit with clock = max(local, remote) + 1
    let local_max = max_clock(repo, local_oid)?;
    let remote_max = max_clock(repo, remote_oid)?;
    let merge_clock = std::cmp::max(local_max, remote_max) + 1;

    let merge_event = Event {
        timestamp: chrono::Utc::now().to_rfc3339(),
        author: merge_author.clone(),
        action: Action::Merge,
        clock: merge_clock,
    };

    let tree_oid = sign_and_build_tree(repo, &merge_event, signing_key)?;
    let tree = repo.find_tree(tree_oid)?;

    let sig = author_signature(merge_author)?;
    let local_commit = repo.find_commit(local_oid)?;
    let remote_commit = repo.find_commit(remote_oid)?;

    let oid = repo.commit(
        Some(local_ref),
        &sig,
        &sig,
        "collab: reconcile merge",
        &tree,
        &[&local_commit, &remote_commit],
    )?;

    Ok((oid, ReconcileOutcome::Merge))
}

/// Migrate a DAG ref so that every event with clock=0 gets a sequential clock
/// assigned in topological order. Events that already have clock>0 are left as-is.
///
/// **WARNING**: This rewrites the commit chain, producing new OIDs for every
/// commit. If the ref has already been pushed to a remote, other users who
/// fetched the old OIDs will encounter disjoint histories on their next sync.
/// Only call this on refs that have not been shared.
pub fn migrate_clocks(
    repo: &Repository,
    ref_name: &str,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<(), Error> {
    let events = walk_events(repo, ref_name)?;

    // Check if migration is needed: any event with clock=0?
    let needs_migration = events.iter().any(|(_, e)| e.clock == 0);
    if !needs_migration {
        return Ok(());
    }

    // Rebuild the chain with sequential clocks
    let mut clock = 0u64;
    let mut prev_oid: Option<Oid> = None;

    for (_old_oid, event) in &events {
        clock += 1;
        let mut migrated = event.clone();
        if migrated.clock == 0 {
            migrated.clock = clock;
        } else {
            clock = migrated.clock; // respect existing clocks
        }

        let tree_oid = sign_and_build_tree(repo, &migrated, signing_key)?;
        let tree = repo.find_tree(tree_oid)?;

        let sig = author_signature(&migrated.author)?;
        let message = commit_message(&migrated.action);

        let parents: Vec<git2::Commit> = if let Some(pid) = prev_oid {
            vec![repo.find_commit(pid)?]
        } else {
            vec![]
        };
        let parent_refs: Vec<&git2::Commit> = parents.iter().collect();

        let new_oid = repo.commit(None, &sig, &sig, &message, &tree, &parent_refs)?;
        prev_oid = Some(new_oid);
    }

    // Update the ref to point to the new tip
    if let Some(new_tip) = prev_oid {
        repo.reference(ref_name, new_tip, true, "migrate clocks")?;
    }

    Ok(())
}

fn commit_message(action: &Action) -> String {
    match action {
        Action::IssueOpen { title, .. } => format!("issue: open \"{}\"", title),
        Action::IssueEdit { .. } => "issue: edit".to_string(),
        Action::IssueLabel { ref label } => format!("issue: label \"{}\"", label),
        Action::IssueUnlabel { ref label } => format!("issue: unlabel \"{}\"", label),
        Action::IssueAssign { ref assignee } => format!("issue: assign \"{}\"", assignee),
        Action::IssueUnassign { ref assignee } => format!("issue: unassign \"{}\"", assignee),
        Action::IssueComment { .. } => "issue: comment".to_string(),
        Action::IssueClose { .. } => "issue: close".to_string(),
        Action::IssueReopen => "issue: reopen".to_string(),
        Action::IssueCommitLink { commit } => format!("issue: commit link {}", &commit[..commit.len().min(7)]),
        Action::PatchCreate { title, .. } => format!("patch: create \"{}\"", title),
        Action::PatchRevision { .. } => "patch: revision".to_string(),
        Action::PatchReview { verdict, .. } => format!("patch: review ({})", verdict),
        Action::PatchComment { .. } => "patch: comment".to_string(),
        Action::PatchInlineComment { ref file, line, .. } => {
            format!("patch: inline comment on {}:{}", file, line)
        }
        Action::PatchClose { .. } => "patch: close".to_string(),
        Action::PatchMerge => "patch: merge".to_string(),
        Action::Merge => "collab: merge".to_string(),
    }
}