A file-based task manager
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge driver for refs/tsk/queues/* on git-pull

Concurrent pushes used to force-overwrite refs/tsk/queues/*; only the
namespace renumber and the task ref reconciler had real merging.
git-pull now does a 3-way merge of queue trees too:

- index: per-stable 3-way set merge. Entries present in base survive
only if both sides keep them; entries added on either side are
included; removals on either side are honored. Order is
remote-first, then local additions appended.
- inbox: per-key 3-way map merge. Removals on either side win;
remote wins on simultaneous-add conflicts (per-source seq keys make
that impossible in practice).
- can_pull: 3-way bool with local-change-wins.

When the two sides have no common ancestor (each clone independently
rooted its queue ref before any sync), the base is treated as empty
— same handling we should apply to other reconcile paths if their
own first-pull case ever bites.

queue::read_at_commit and queue::build_tree are now `pub` so the
merge driver can read divergent tips and write the merged tree
without going through the active-ref-only `read`/`write` API.
fast_forward_non_task_refs skips queues now that they're properly
reconciled.

Updates concurrent_pushes_dont_clobber to assert the new contract:
both alice's and bob's tasks survive the pull (the test previously
documented the absence of this driver).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+197 -17
+3
src/lib.rs
··· 483 483 ); 484 484 } 485 485 } 486 + for qr in &outcome.queues { 487 + println!("merged queue {}", qr.name); 488 + } 486 489 Ok(()) 487 490 } 488 491 Commands::Share { target, task_id } => command_share(dir, target, task_id),
+173 -2
src/merge.rs
··· 24 24 use crate::errors::Result; 25 25 use crate::namespace::{self, NS_REF_PREFIX, Namespace}; 26 26 use crate::object::{self, StableId, TASK_REF_PREFIX}; 27 + use crate::queue::{self, QUEUE_REF_PREFIX, Queue}; 27 28 use git2::{Commit, Oid, Repository}; 28 - use std::collections::BTreeSet; 29 + use std::collections::{BTreeMap, BTreeSet, HashSet}; 29 30 30 31 #[derive(Clone, Copy, Debug, Default, Eq, PartialEq)] 31 32 pub enum Strategy { ··· 60 61 pub struct PullOutcome { 61 62 pub tasks: Vec<Reconciliation>, 62 63 pub namespaces: Vec<NamespaceReconciliation>, 64 + pub queues: Vec<QueueReconciliation>, 63 65 } 64 66 65 67 pub const FETCH_PREFIX: &str = "refs/tsk-fetched/"; ··· 345 347 } 346 348 } 347 349 350 + /// One queue's reconciliation outcome at pull time. Currently we only 351 + /// surface that a non-trivial merge happened; counts/diffs could be 352 + /// added later if useful. 353 + #[derive(Debug)] 354 + pub struct QueueReconciliation { 355 + pub name: String, 356 + } 357 + 358 + /// 3-way merge each queue ref against its fetched counterpart. 359 + /// 360 + /// `index`: per-stable-id 3-way set merge — entries present in *base* 361 + /// stay only if both sides keep them; entries added on either side are 362 + /// included; entries removed on either side are dropped. Order is 363 + /// remote-first, then local additions appended. 364 + /// 365 + /// `inbox`: per-key 3-way map merge. Removals on either side win; 366 + /// adds on either side are included; if both sides set the same new key 367 + /// to different stables (shouldn't happen — keys carry a per-source 368 + /// sequence), remote wins. 369 + /// 370 + /// `can_pull`: 3-way bool. Local change wins if it differs from base; 371 + /// otherwise take remote. 372 + pub fn reconcile_queue_refs( 373 + repo: &Repository, 374 + remote: &str, 375 + ) -> Result<Vec<QueueReconciliation>> { 376 + let fetched = format!("{}queues/", fetched_prefix(remote)); 377 + let mut names: BTreeSet<String> = BTreeSet::new(); 378 + for r in repo.references_glob(&format!("{QUEUE_REF_PREFIX}*"))? { 379 + let r = r?; 380 + if let Some(n) = r.name().and_then(|n| n.strip_prefix(QUEUE_REF_PREFIX)) { 381 + names.insert(n.to_string()); 382 + } 383 + } 384 + for r in repo.references_glob(&format!("{fetched}*"))? { 385 + let r = r?; 386 + if let Some(n) = r.name().and_then(|n| n.strip_prefix(fetched.as_str())) { 387 + names.insert(n.to_string()); 388 + } 389 + } 390 + let mut out = Vec::new(); 391 + for name in names { 392 + let local = repo 393 + .find_reference(&queue::refname(&name)) 394 + .ok() 395 + .and_then(|r| r.target()); 396 + let remote_oid = repo 397 + .find_reference(&format!("{fetched}{name}")) 398 + .ok() 399 + .and_then(|r| r.target()); 400 + if let Some(rec) = reconcile_queue_one(repo, &name, local, remote_oid)? { 401 + out.push(rec); 402 + } 403 + } 404 + Ok(out) 405 + } 406 + 407 + fn reconcile_queue_one( 408 + repo: &Repository, 409 + name: &str, 410 + local: Option<Oid>, 411 + remote: Option<Oid>, 412 + ) -> Result<Option<QueueReconciliation>> { 413 + match (local, remote) { 414 + (None, None) | (Some(_), None) => Ok(None), 415 + (None, Some(r)) => { 416 + repo.reference(&queue::refname(name), r, true, "pull-import")?; 417 + Ok(None) 418 + } 419 + (Some(l), Some(r)) if l == r => Ok(None), 420 + (Some(l), Some(r)) => { 421 + if repo.graph_descendant_of(l, r).unwrap_or(false) { 422 + return Ok(None); 423 + } 424 + if repo.graph_descendant_of(r, l).unwrap_or(false) { 425 + repo.reference(&queue::refname(name), r, true, "fast-forward")?; 426 + return Ok(None); 427 + } 428 + // No common ancestor when each clone independently rooted 429 + // its queue ref; treat the base as empty in that case. 430 + let base_q = match repo.merge_base(l, r).ok() { 431 + Some(oid) => queue::read_at_commit(repo, name, oid)?, 432 + None => Queue::new(name), 433 + }; 434 + let local_q = queue::read_at_commit(repo, name, l)?; 435 + let remote_q = queue::read_at_commit(repo, name, r)?; 436 + let merged = three_way_queue_merge(&base_q, &local_q, &remote_q); 437 + let tree_oid = queue::build_tree(repo, &merged)?; 438 + let sig = object::signature(repo); 439 + let local_commit = repo.find_commit(l)?; 440 + let remote_commit = repo.find_commit(r)?; 441 + let parents: Vec<&Commit> = vec![&local_commit, &remote_commit]; 442 + let new_oid = repo.commit( 443 + None, 444 + &sig, 445 + &sig, 446 + &format!("merge-queue {name}"), 447 + &repo.find_tree(tree_oid)?, 448 + &parents, 449 + )?; 450 + repo.reference(&queue::refname(name), new_oid, true, "merge")?; 451 + Ok(Some(QueueReconciliation { name: name.to_string() })) 452 + } 453 + } 454 + } 455 + 456 + fn three_way_queue_merge(base: &Queue, local: &Queue, remote: &Queue) -> Queue { 457 + let base_set: HashSet<&StableId> = base.index.iter().collect(); 458 + let local_set: HashSet<&StableId> = local.index.iter().collect(); 459 + let remote_set: HashSet<&StableId> = remote.index.iter().collect(); 460 + let keep: HashSet<&StableId> = base_set 461 + .iter() 462 + .chain(local_set.iter()) 463 + .chain(remote_set.iter()) 464 + .copied() 465 + .filter(|s| { 466 + let in_base = base_set.contains(*s); 467 + let in_local = local_set.contains(*s); 468 + let in_remote = remote_set.contains(*s); 469 + // present in base → kept iff neither side removed it. 470 + // not in base → added by either side, keep. 471 + if in_base { in_local && in_remote } else { in_local || in_remote } 472 + }) 473 + .collect(); 474 + 475 + let mut index = Vec::new(); 476 + let mut seen: HashSet<StableId> = HashSet::new(); 477 + for s in remote.index.iter().chain(local.index.iter()) { 478 + if keep.contains(s) && seen.insert(s.clone()) { 479 + index.push(s.clone()); 480 + } 481 + } 482 + 483 + let mut inbox: BTreeMap<String, StableId> = BTreeMap::new(); 484 + let all_keys: BTreeSet<&String> = base 485 + .inbox 486 + .keys() 487 + .chain(local.inbox.keys()) 488 + .chain(remote.inbox.keys()) 489 + .collect(); 490 + for k in all_keys { 491 + let in_base = base.inbox.contains_key(k); 492 + let lv = local.inbox.get(k); 493 + let rv = remote.inbox.get(k); 494 + if in_base { 495 + // Removal on either side wins; otherwise prefer remote on conflict. 496 + if let (Some(_), Some(rv)) = (lv, rv) { 497 + inbox.insert(k.clone(), rv.clone()); 498 + } 499 + } else { 500 + // New on either side; remote wins on simultaneous-add conflict. 501 + if let Some(v) = rv.or(lv) { 502 + inbox.insert(k.clone(), v.clone()); 503 + } 504 + } 505 + } 506 + 507 + let can_pull = if local.can_pull == base.can_pull { 508 + remote.can_pull 509 + } else { 510 + local.can_pull 511 + }; 512 + 513 + Queue { index, can_pull, inbox } 514 + } 515 + 348 516 /// After task refs are reconciled, copy every other fetched ref 349 517 /// (`refs/tsk-fetched/<remote>/{namespaces,queues,properties}/*`) onto its 350 518 /// `refs/tsk/*` counterpart with force-update. Better merging for these is ··· 359 527 let Some(rest) = name.strip_prefix(prefix.as_str()) else { 360 528 continue; 361 529 }; 362 - if rest.starts_with("tasks/") || rest.starts_with("namespaces/") { 530 + if rest.starts_with("tasks/") 531 + || rest.starts_with("namespaces/") 532 + || rest.starts_with("queues/") 533 + { 363 534 continue; 364 535 } 365 536 let Some(target) = repo.find_reference(&name).ok().and_then(|r| r.target()) else {
+10 -8
src/queue.rs
··· 58 58 } 59 59 60 60 pub fn read(repo: &Repository, name: &str) -> Result<Queue> { 61 - let Ok(r) = repo.find_reference(&refname(name)) else { 61 + let Some(target) = repo.find_reference(&refname(name)).ok().and_then(|r| r.target()) else { 62 62 return Ok(Queue::new(name)); 63 63 }; 64 - let Some(target) = r.target() else { 65 - return Ok(Queue::new(name)); 66 - }; 67 - let tree = repo.find_commit(target)?.tree()?; 64 + read_at_commit(repo, name, target) 65 + } 66 + 67 + /// Read a queue from the tree of a specific commit (used by the queue 68 + /// merge driver to compare local and fetched-remote tips). 69 + pub fn read_at_commit(repo: &Repository, name: &str, commit_oid: Oid) -> Result<Queue> { 70 + let tree = repo.find_commit(commit_oid)?.tree()?; 68 71 let mut q = Queue::new(name); 69 72 if let Some(e) = tree.get_name(INDEX_FILE) { 70 73 let blob = e.to_object(repo)?.peel_to_blob()?; ··· 85 88 let blob = ie.to_object(repo)?.peel_to_blob()?; 86 89 let stable = String::from_utf8_lossy(blob.content()).trim().to_string(); 87 90 if !stable.is_empty() { 88 - q.inbox 89 - .insert(name.to_string(), StableId(stable)); 91 + q.inbox.insert(name.to_string(), StableId(stable)); 90 92 } 91 93 } 92 94 } 93 95 Ok(q) 94 96 } 95 97 96 - fn build_tree(repo: &Repository, q: &Queue) -> Result<Oid> { 98 + pub fn build_tree(repo: &Repository, q: &Queue) -> Result<Oid> { 97 99 let mut tb = repo.treebuilder(None)?; 98 100 let index_text: String = q 99 101 .index
+2 -1
src/workspace.rs
··· 1055 1055 let repo = self.repo()?; 1056 1056 let tasks = merge::reconcile_task_refs(&repo, remote, strategy)?; 1057 1057 let namespaces = merge::reconcile_namespace_refs(&repo, remote)?; 1058 + let queues = merge::reconcile_queue_refs(&repo, remote)?; 1058 1059 merge::fast_forward_non_task_refs(&repo, remote)?; 1059 - Ok(merge::PullOutcome { tasks, namespaces }) 1060 + Ok(merge::PullOutcome { tasks, namespaces, queues }) 1060 1061 } 1061 1062 } 1062 1063
+9 -6
tests/multi_user.rs
··· 143 143 "bob's push should fail (non-fast-forward); stderr={stderr}" 144 144 ); 145 145 146 - // After bob pulls, his local refs are overwritten with alice's state 147 - // (v1 has no merge driver for refs/tsk/queues/* — that's tracked for 148 - // a follow-up). The safety property we DO have is that the failed 149 - // push above didn't silently win. 150 - let (_, _, _) = tsk(&bob, &["git-pull"]); 146 + // After bob pulls, the queue merge driver merges both sides so neither 147 + // task is lost. Namespace conflict (both allocated tsk-1) auto-renumbers 148 + // the local binding (tsk-12 + tsk-34). 149 + tsk_ok(&bob, &["git-pull"]); 151 150 let listed = tsk_ok(&bob, &["list"]); 152 151 assert!( 153 152 listed.contains("alice work"), 154 - "after force-pull bob inherits alice's queue state: {listed}" 153 + "alice's task must survive the merge: {listed}" 154 + ); 155 + assert!( 156 + listed.contains("bob work"), 157 + "bob's task must survive the merge: {listed}" 155 158 ); 156 159 } 157 160