//! src/dag.rs — event DAG storage and reconciliation (ref size: 12.0 KiB).
use git2::{ObjectType, Oid, Repository, Sort};
use crate::error::Error;
use crate::event::{Action, Event};
use crate::identity::{author_signature, get_author};
use crate::signing::{self, sign_event};
/// The manifest blob content included in every event commit tree.
/// NOTE(review): presumably read back by verification/consumer code to
/// identify the tree format — that reader is not visible in this file.
const MANIFEST_JSON: &[u8] = br#"{"version":1,"format":"git-collab"}"#;
/// Maximum allowed size for an event.json blob (1 MB).
/// Enforced by `load_event_from_commit` before JSON deserialization.
pub const MAX_EVENT_BLOB_SIZE: usize = 1_048_576;
/// Assemble the four blobs (event.json, signature, pubkey, manifest.json)
/// into a single git tree and write it. Returns the new tree's OID.
fn build_event_tree(
    repo: &Repository,
    event_blob: Oid,
    sig_blob: Oid,
    pubkey_blob: Oid,
    manifest_blob: Oid,
) -> Result<Oid, Error> {
    let mut builder = repo.treebuilder(None)?;
    // Every entry is a regular file (mode 0o100644).
    for &(name, blob) in &[
        ("event.json", event_blob),
        ("signature", sig_blob),
        ("pubkey", pubkey_blob),
        ("manifest.json", manifest_blob),
    ] {
        builder.insert(name, blob, 0o100644)?;
    }
    Ok(builder.write()?)
}
/// Produce a detached signature over `event`, serialize it to pretty JSON,
/// write all four blobs, and assemble them into an event tree.
/// Returns the tree OID.
fn sign_and_build_tree(
    repo: &Repository,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let detached = sign_event(event, signing_key)?;
    let serialized = serde_json::to_vec_pretty(event)?;
    build_event_tree(
        repo,
        repo.blob(&serialized)?,
        repo.blob(detached.signature.as_bytes())?,
        repo.blob(detached.pubkey.as_bytes())?,
        repo.blob(MANIFEST_JSON)?,
    )
}
/// Read the `event.json` blob out of a commit's tree and deserialize it.
///
/// Errors if the entry is missing, is not a blob, exceeds
/// [`MAX_EVENT_BLOB_SIZE`], or fails to parse as an [`Event`].
fn load_event_from_commit(repo: &Repository, oid: Oid) -> Result<Event, Error> {
    let tree = repo.find_commit(oid)?.tree()?;
    let entry = match tree.get_name("event.json") {
        Some(entry) => entry,
        None => {
            return Err(Error::Git(git2::Error::from_str(
                "missing event.json in commit tree",
            )))
        }
    };
    if !matches!(entry.kind(), Some(ObjectType::Blob)) {
        return Err(Error::Git(git2::Error::from_str(
            "event.json entry is not a blob",
        )));
    }
    let blob = repo.find_blob(entry.id())?;
    let bytes = blob.content();
    // Reject oversized payloads before handing them to serde.
    if bytes.len() > MAX_EVENT_BLOB_SIZE {
        return Err(Error::PayloadTooLarge {
            actual: bytes.len(),
            limit: MAX_EVENT_BLOB_SIZE,
        });
    }
    Ok(serde_json::from_slice(bytes)?)
}
/// Clock value carried by the tip commit of a DAG.
/// Clocks increase monotonically along the chain, so the tip always holds
/// the maximum. A pre-migration event yields 0.
pub fn max_clock(repo: &Repository, tip: Oid) -> Result<u64, Error> {
    let tip_event = load_event_from_commit(repo, tip)?;
    Ok(tip_event.clock)
}
/// Create an orphan commit (no parents) carrying the given event.
/// The event is cloned internally and its clock forced to 1.
/// The returned commit OID doubles as the entity ID.
pub fn create_root_event(
    repo: &Repository,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let mut root = event.clone();
    root.clock = 1; // a root is by definition the first event in its DAG
    let tree = repo.find_tree(sign_and_build_tree(repo, &root, signing_key)?)?;
    let sig = author_signature(&root.author)?;
    let message = commit_message(&root.action);
    // No ref is updated here (first arg None); the caller owns the OID.
    Ok(repo.commit(None, &sig, &sig, &message, &tree, &[])?)
}
/// Append an event onto an existing DAG; the current tip becomes the parent.
/// The event is cloned internally and its clock set to `max_clock(tip) + 1`.
pub fn append_event(
    repo: &Repository,
    ref_name: &str,
    event: &Event,
    signing_key: &ed25519_dalek::SigningKey,
) -> Result<Oid, Error> {
    let tip = repo.refname_to_id(ref_name)?;
    let mut next = event.clone();
    next.clock = max_clock(repo, tip)? + 1;
    let tree = repo.find_tree(sign_and_build_tree(repo, &next, signing_key)?)?;
    let sig = author_signature(&next.author)?;
    let parent = repo.find_commit(tip)?;
    let new_oid = repo.commit(
        Some(ref_name), // advance the ref to the new commit
        &sig,
        &sig,
        &commit_message(&next.action),
        &tree,
        &[&parent],
    )?;
    Ok(new_oid)
}
/// Wrap an [`Action`] into an [`Event`], filling in the current RFC 3339
/// timestamp and the repo-configured author. The clock is left at 0 —
/// `create_root_event` / `append_event` assign the real value.
pub fn build_event(repo: &Repository, action: Action) -> Result<Event, Error> {
    let author = get_author(repo)?;
    let timestamp = chrono::Utc::now().to_rfc3339();
    Ok(Event {
        timestamp,
        author,
        action,
        clock: 0, // placeholder, overwritten by the commit helpers
    })
}
/// Convenience wrapper: load the local signing key, wrap `action` in an
/// event, and append it to the existing DAG at `ref_name` in one call.
pub fn append_action(
    repo: &Repository,
    ref_name: &str,
    action: Action,
) -> Result<Oid, Error> {
    let key_dir = signing::signing_key_dir()?;
    let key = signing::load_signing_key(&key_dir)?;
    let event = build_event(repo, action)?;
    append_event(repo, ref_name, &event, &key)
}
/// Convenience wrapper: load the local signing key, wrap `action` in an
/// event, and create a root (orphan) DAG commit.
/// Returns the new commit OID, which serves as the entity ID.
pub fn create_root_action(
    repo: &Repository,
    action: Action,
) -> Result<Oid, Error> {
    let key_dir = signing::signing_key_dir()?;
    let key = signing::load_signing_key(&key_dir)?;
    create_root_event(repo, &build_event(repo, action)?, &key)
}
/// Walk the DAG reachable from `ref_name` in topological order, oldest
/// first, yielding `(commit_oid, event)` pairs.
pub fn walk_events(repo: &Repository, ref_name: &str) -> Result<Vec<(Oid, Event)>, Error> {
    let tip = repo.refname_to_id(ref_name)?;
    let mut walk = repo.revwalk()?;
    // REVERSE flips the walk so parents come before children.
    walk.set_sorting(Sort::TOPOLOGICAL | Sort::REVERSE)?;
    walk.push(tip)?;
    walk.map(|step| {
        let oid = step?;
        let event = load_event_from_commit(repo, oid)?;
        Ok((oid, event))
    })
    .collect()
}
/// Outcome of reconciling a local ref with a remote ref.
/// Returned (with the final tip OID) by [`reconcile`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReconcileOutcome {
    /// Both refs already point to the same commit. No action taken.
    AlreadyCurrent,
    /// Local is ahead of remote (remote is an ancestor). No action taken.
    LocalAhead,
    /// Local was fast-forwarded to the remote tip.
    FastForward,
    /// A merge commit was created to reconcile divergent histories.
    Merge,
}
/// Reconcile a local ref with a remote ref. Returns the outcome and final tip OID.
///
/// - If they're the same: no-op
/// - If remote is ancestor of local: local is ahead, no-op
/// - If local is ancestor of remote: fast-forward
/// - Otherwise: create a merge commit
pub fn reconcile(
repo: &Repository,
local_ref: &str,
remote_ref: &str,
merge_author: &crate::event::Author,
signing_key: &ed25519_dalek::SigningKey,
) -> Result<(Oid, ReconcileOutcome), Error> {
let local_oid = repo.refname_to_id(local_ref)?;
let remote_oid = repo.refname_to_id(remote_ref)?;
if local_oid == remote_oid {
return Ok((local_oid, ReconcileOutcome::AlreadyCurrent));
}
let merge_base = repo.merge_base(local_oid, remote_oid).map_err(|e| {
Error::DisjointHistories {
local_ref: local_ref.to_string(),
remote_ref: remote_ref.to_string(),
detail: e.message().to_string(),
}
})?;
if merge_base == remote_oid {
// Remote is ancestor of local — local is ahead
return Ok((local_oid, ReconcileOutcome::LocalAhead));
}
if merge_base == local_oid {
// Local is ancestor of remote — fast-forward
repo.reference(local_ref, remote_oid, true, "fast-forward reconcile")?;
return Ok((remote_oid, ReconcileOutcome::FastForward));
}
// True fork — create merge commit with clock = max(local, remote) + 1
let local_max = max_clock(repo, local_oid)?;
let remote_max = max_clock(repo, remote_oid)?;
let merge_clock = std::cmp::max(local_max, remote_max) + 1;
let merge_event = Event {
timestamp: chrono::Utc::now().to_rfc3339(),
author: merge_author.clone(),
action: Action::Merge,
clock: merge_clock,
};
let tree_oid = sign_and_build_tree(repo, &merge_event, signing_key)?;
let tree = repo.find_tree(tree_oid)?;
let sig = author_signature(merge_author)?;
let local_commit = repo.find_commit(local_oid)?;
let remote_commit = repo.find_commit(remote_oid)?;
let oid = repo.commit(
Some(local_ref),
&sig,
&sig,
"collab: reconcile merge",
&tree,
&[&local_commit, &remote_commit],
)?;
Ok((oid, ReconcileOutcome::Merge))
}
/// Migrate a DAG ref so that every event with clock=0 gets a sequential clock
/// assigned in topological order. Events that already have clock>0 are left as-is.
///
/// **WARNING**: This rewrites the commit chain, producing new OIDs for every
/// commit. If the ref has already been pushed to a remote, other users who
/// fetched the old OIDs will encounter disjoint histories on their next sync.
/// Only call this on refs that have not been shared.
pub fn migrate_clocks(
repo: &Repository,
ref_name: &str,
signing_key: &ed25519_dalek::SigningKey,
) -> Result<(), Error> {
let events = walk_events(repo, ref_name)?;
// Check if migration is needed: any event with clock=0?
let needs_migration = events.iter().any(|(_, e)| e.clock == 0);
if !needs_migration {
return Ok(());
}
// Rebuild the chain with sequential clocks
let mut clock = 0u64;
let mut prev_oid: Option<Oid> = None;
for (_old_oid, event) in &events {
clock += 1;
let mut migrated = event.clone();
if migrated.clock == 0 {
migrated.clock = clock;
} else {
clock = migrated.clock; // respect existing clocks
}
let tree_oid = sign_and_build_tree(repo, &migrated, signing_key)?;
let tree = repo.find_tree(tree_oid)?;
let sig = author_signature(&migrated.author)?;
let message = commit_message(&migrated.action);
let parents: Vec<git2::Commit> = if let Some(pid) = prev_oid {
vec![repo.find_commit(pid)?]
} else {
vec![]
};
let parent_refs: Vec<&git2::Commit> = parents.iter().collect();
let new_oid = repo.commit(None, &sig, &sig, &message, &tree, &parent_refs)?;
prev_oid = Some(new_oid);
}
// Update the ref to point to the new tip
if let Some(new_tip) = prev_oid {
repo.reference(ref_name, new_tip, true, "migrate clocks")?;
}
Ok(())
}
/// One-line human-readable git commit summary for an action. Purely
/// cosmetic: the authoritative data lives in the event.json blob.
fn commit_message(action: &Action) -> String {
    // Matching on `&Action` already binds fields by reference, so the
    // previous explicit `ref` modifiers were redundant (clippy: toplevel_ref_arg
    // style) and have been dropped.
    match action {
        Action::IssueOpen { title, .. } => format!("issue: open \"{}\"", title),
        Action::IssueEdit { .. } => "issue: edit".to_string(),
        Action::IssueLabel { label } => format!("issue: label \"{}\"", label),
        Action::IssueUnlabel { label } => format!("issue: unlabel \"{}\"", label),
        Action::IssueAssign { assignee } => format!("issue: assign \"{}\"", assignee),
        Action::IssueUnassign { assignee } => format!("issue: unassign \"{}\"", assignee),
        Action::IssueComment { .. } => "issue: comment".to_string(),
        Action::IssueClose { .. } => "issue: close".to_string(),
        Action::IssueReopen => "issue: reopen".to_string(),
        Action::IssueCommitLink { commit } => {
            // `get(..7)` is panic-safe where direct byte slicing was not:
            // a short or non-ASCII id falls back to the whole string instead
            // of panicking on a char boundary. Hex OIDs behave identically.
            format!("issue: commit link {}", commit.get(..7).unwrap_or(commit.as_str()))
        }
        Action::PatchCreate { title, .. } => format!("patch: create \"{}\"", title),
        Action::PatchRevision { .. } => "patch: revision".to_string(),
        Action::PatchReview { verdict, .. } => format!("patch: review ({})", verdict),
        Action::PatchComment { .. } => "patch: comment".to_string(),
        Action::PatchInlineComment { file, line, .. } => {
            format!("patch: inline comment on {}:{}", file, line)
        }
        Action::PatchClose { .. } => "patch: close".to_string(),
        Action::PatchMerge => "patch: merge".to_string(),
        Action::Merge => "collab: merge".to_string(),
    }
}