Our Personal Data Server from scratch!
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(tranquil-store/gauntlet): op surface, oracle, workload for eventlog & reads

Lewis: May this revision serve well! <lu5a@proton.me>

+321 -31
+20 -8
crates/tranquil-store/src/gauntlet/mod.rs
··· 2 2 pub mod invariants; 3 3 pub mod op; 4 4 pub mod oracle; 5 + pub mod overrides; 6 + pub mod regression; 5 7 pub mod runner; 6 8 pub mod scenarios; 9 + pub mod shrink; 7 10 pub mod workload; 8 11 9 - pub use invariants::{Invariant, InvariantSet, InvariantViolation, invariants_for}; 10 - pub use op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; 11 - pub use oracle::Oracle; 12 + pub use invariants::{ 13 + EventLogSnapshot, Invariant, InvariantSet, InvariantViolation, SnapshotEvent, invariants_for, 14 + }; 15 + pub use op::{ 16 + CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed, 17 + ValueSeed, 18 + }; 19 + pub use oracle::{EventExpectation, Oracle}; 20 + pub use overrides::{ConfigOverrides, GroupCommitOverrides, StoreOverrides}; 21 + pub use regression::{RegressionRecord, RegressionViolation, default_root as regression_root}; 12 22 pub use runner::{ 13 - Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, IoBackend, MaxFileSize, OpIndex, 14 - OpInterval, OpsExecuted, RestartCount, RestartPolicy, RunLimits, ShardCount, StoreConfig, 15 - WallMs, 23 + EventLogConfig, Gauntlet, GauntletBuildError, GauntletConfig, GauntletReport, Harness, 24 + IoBackend, MaxFileSize, MaxSegmentSize, OpErrorCount, OpIndex, OpInterval, OpsExecuted, 25 + RestartCount, RestartPolicy, RunLimits, ShardCount, StoreConfig, WallMs, WriterConcurrency, 16 26 }; 17 - pub use scenarios::{Scenario, config_for}; 27 + pub use scenarios::{Scenario, UnknownScenario, config_for}; 28 + pub use shrink::{ShrinkOutcome, shrink_failure}; 18 29 pub use workload::{ 19 - ByteRange, KeySpaceSize, OpCount, OpWeights, SizeDistribution, ValueBytes, WorkloadModel, 30 + ByteRange, DidSpaceSize, KeySpaceSize, OpCount, OpWeights, RetentionMaxSecs, SizeDistribution, 31 + ValueBytes, WorkloadModel, 20 32 };
+148 -10
crates/tranquil-store/src/gauntlet/op.rs
··· 1 - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 1 + use serde::{Deserialize, Serialize}; 2 + 3 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 2 4 pub struct Seed(pub u64); 3 5 4 - #[derive(Debug, Clone, PartialEq, Eq, Hash)] 6 + #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] 5 7 pub struct CollectionName(pub String); 6 8 7 - #[derive(Debug, Clone, PartialEq, Eq, Hash)] 9 + #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] 8 10 pub struct RecordKey(pub String); 9 11 10 - #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 12 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 11 13 pub struct ValueSeed(pub u32); 12 14 13 - #[derive(Debug, Clone)] 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 16 + pub struct DidSeed(pub u32); 17 + 18 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 19 + pub struct PayloadSeed(pub u32); 20 + 21 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 22 + pub struct RetentionSecs(pub u32); 23 + 24 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)] 25 + pub enum EventKind { 26 + Commit, 27 + Identity, 28 + Account, 29 + Sync, 30 + } 31 + 32 + #[derive(Debug, Clone, Serialize, Deserialize)] 14 33 pub enum Op { 15 34 AddRecord { 16 35 collection: CollectionName, ··· 23 42 }, 24 43 Compact, 25 44 Checkpoint, 45 + AppendEvent { 46 + did_seed: DidSeed, 47 + event_kind: EventKind, 48 + payload_seed: PayloadSeed, 49 + }, 50 + SyncEventLog, 51 + RunRetention { 52 + max_age_secs: RetentionSecs, 53 + }, 54 + ReadRecord { 55 + collection: CollectionName, 56 + rkey: RecordKey, 57 + }, 58 + ReadBlock { 59 + value_seed: ValueSeed, 60 + }, 26 61 } 27 62 28 - #[derive(Debug, Clone)] 63 + impl Op { 64 + pub const fn is_read_only(&self) -> bool { 65 + matches!(self, Op::ReadRecord { .. } | Op::ReadBlock { .. }) 66 + } 67 + } 68 + 69 + #[derive(Debug, Clone, Default, Serialize, Deserialize)] 29 70 pub struct OpStream { 30 71 ops: Vec<Op>, 31 72 } ··· 35 76 Self { ops } 36 77 } 37 78 79 + pub fn empty() -> Self { 80 + Self { ops: Vec::new() } 81 + } 82 + 83 + pub fn as_slice(&self) -> &[Op] { 84 + &self.ops 85 + } 86 + 38 87 pub fn into_vec(self) -> Vec<Op> { 39 88 self.ops 40 89 } ··· 51 100 self.ops.is_empty() 52 101 } 53 102 54 - pub fn shrink(&self) -> Option<OpStream> { 55 - (self.ops.len() >= 2).then(|| { 56 - let half = self.ops.len() / 2; 57 - OpStream::from_vec(self.ops[..half].to_vec()) 103 + pub fn shrink_candidates(&self) -> impl Iterator<Item = OpStream> + '_ { 104 + let len = self.ops.len(); 105 + let chunk_sizes: Vec<usize> = std::iter::successors((len >= 2).then_some(len / 2), |&s| { 106 + (s >= 2).then_some(s / 2) 58 107 }) 108 + .collect(); 109 + 110 + let chunk_candidates = chunk_sizes.into_iter().flat_map(move |chunk_size| { 111 + let count = len.div_ceil(chunk_size); 112 + (0..count).map(move |i| { 113 + let start = i * chunk_size; 114 + let end = (start + chunk_size).min(len); 115 + let mut reduced = Vec::with_capacity(len - (end - start)); 116 + reduced.extend_from_slice(&self.ops[..start]); 117 + reduced.extend_from_slice(&self.ops[end..]); 118 + OpStream::from_vec(reduced) 119 + }) 120 + }); 121 + 122 + let single_candidates = (0..len).map(move |i| { 123 + let mut reduced = self.ops.clone(); 124 + reduced.remove(i); 125 + OpStream::from_vec(reduced) 126 + }); 127 + 128 + chunk_candidates.chain(single_candidates) 129 + } 130 + 131 + pub fn shrink_to_fixpoint(mut self, mut fails: impl FnMut(&OpStream) -> bool) -> OpStream { 132 + loop { 133 + let next = self.shrink_candidates().find(|c| !c.is_empty() && fails(c)); 134 + match next { 135 + Some(smaller) => self = smaller, 136 + None => return self, 137 + } 138 + } 139 + } 140 + } 141 + 142 + #[cfg(test)] 143 + mod tests { 144 + use super::*; 145 + 146 + fn stream(n: usize) -> OpStream { 147 + OpStream::from_vec( 148 + (0..n) 149 + .map(|i| Op::AddRecord { 150 + collection: CollectionName("c".into()), 151 + rkey: RecordKey(format!("{i:04}")), 152 + value_seed: ValueSeed(i as u32), 153 + }) 154 + .collect(), 155 + ) 156 + } 157 + 158 + fn contains_index(s: &OpStream, target: u32) -> bool { 159 + s.iter() 160 + .any(|op| matches!(op, Op::AddRecord { value_seed, .. } if value_seed.0 == target)) 161 + } 162 + 163 + #[test] 164 + fn shrink_candidates_nonempty_for_len_ge_2() { 165 + let s = stream(8); 166 + let count = s.shrink_candidates().count(); 167 + assert!(count > 0); 168 + } 169 + 170 + #[test] 171 + fn shrink_candidates_empty_for_len_0() { 172 + let s = OpStream::from_vec(Vec::new()); 173 + assert_eq!(s.shrink_candidates().count(), 0); 174 + } 175 + 176 + #[test] 177 + fn shrink_candidates_includes_every_single_removal() { 178 + let s = stream(5); 179 + let singles: Vec<_> = s.shrink_candidates().filter(|c| c.len() == 4).collect(); 180 + assert!( 181 + singles.len() >= 5, 182 + "expected at least 5 size-4 candidates, got {}", 183 + singles.len() 184 + ); 185 + } 186 + 187 + #[test] 188 + fn shrink_to_fixpoint_converges_to_culprit() { 189 + let s = stream(64); 190 + let shrunk = s.shrink_to_fixpoint(|c| contains_index(c, 17)); 191 + assert!(contains_index(&shrunk, 17)); 192 + assert!( 193 + shrunk.len() < 4, 194 + "expected shrink to close on culprit, got {} ops", 195 + shrunk.len() 196 + ); 59 197 } 60 198 }
+57 -1
crates/tranquil-store/src/gauntlet/oracle.rs
··· 2 2 3 3 use cid::Cid; 4 4 5 - use super::op::{CollectionName, RecordKey}; 5 + use super::op::{CollectionName, EventKind, RecordKey}; 6 6 use crate::blockstore::CidBytes; 7 + use crate::eventlog::EventSequence; 7 8 8 9 #[derive(Debug, thiserror::Error, PartialEq, Eq)] 9 10 #[error("unexpected CID encoding: got {actual} bytes, expected 36 for sha256 CIDv1")] ··· 11 12 pub actual: usize, 12 13 } 13 14 15 + #[derive(Debug, Clone, Copy, PartialEq, Eq)] 16 + pub struct EventExpectation { 17 + pub seq: EventSequence, 18 + pub timestamp_us: u64, 19 + pub kind: EventKind, 20 + pub did_hash: u32, 21 + } 22 + 14 23 #[derive(Debug, Default)] 15 24 pub struct Oracle { 16 25 live: HashMap<(CollectionName, RecordKey), CidBytes>, 17 26 current_root: Option<Cid>, 18 27 mst_node_cids: Vec<CidBytes>, 28 + synced_events: Vec<EventExpectation>, 29 + unsynced_events: Vec<EventExpectation>, 30 + last_synced_seq: Option<EventSequence>, 31 + last_retention_cutoff_us: Option<u64>, 19 32 } 20 33 21 34 impl Oracle { ··· 36 49 self.live.remove(&(coll.clone(), rkey.clone())) 37 50 } 38 51 52 + pub fn contains_record(&self, coll: &CollectionName, rkey: &RecordKey) -> bool { 53 + self.live.contains_key(&(coll.clone(), rkey.clone())) 54 + } 55 + 39 56 pub fn set_root(&mut self, root: Cid) { 40 57 self.current_root = Some(root); 41 58 } ··· 70 87 .live_records() 71 88 .map(|(c, r, v)| (format!("record {}/{}", c.0, r.0), *v)); 72 89 nodes.chain(records).collect() 90 + } 91 + 92 + pub fn record_event_append(&mut self, event: EventExpectation) { 93 + self.unsynced_events.push(event); 94 + } 95 + 96 + pub fn record_event_sync(&mut self, synced_through: EventSequence) { 97 + let (promoted, remaining): (Vec<_>, Vec<_>) = self 98 + .unsynced_events 99 + .drain(..) 100 + .partition(|e| e.seq <= synced_through); 101 + self.synced_events.extend(promoted); 102 + self.unsynced_events = remaining; 103 + self.last_synced_seq = Some(synced_through); 104 + } 105 + 106 + pub fn record_crash(&mut self) { 107 + self.unsynced_events.clear(); 108 + } 109 + 110 + pub fn record_retention(&mut self, cutoff_us: u64) { 111 + self.synced_events.retain(|e| e.timestamp_us >= cutoff_us); 112 + self.last_retention_cutoff_us = Some(cutoff_us); 113 + } 114 + 115 + pub fn synced_events(&self) -> &[EventExpectation] { 116 + &self.synced_events 117 + } 118 + 119 + pub fn unsynced_events(&self) -> &[EventExpectation] { 120 + &self.unsynced_events 121 + } 122 + 123 + pub fn last_synced_seq(&self) -> Option<EventSequence> { 124 + self.last_synced_seq 125 + } 126 + 127 + pub fn last_retention_cutoff_us(&self) -> Option<u64> { 128 + self.last_retention_cutoff_us 73 129 } 74 130 } 75 131
+96 -12
crates/tranquil-store/src/gauntlet/workload.rs
··· 1 - use super::op::{CollectionName, Op, OpStream, RecordKey, Seed, ValueSeed}; 1 + use super::op::{ 2 + CollectionName, DidSeed, EventKind, Op, OpStream, PayloadSeed, RecordKey, RetentionSecs, Seed, 3 + ValueSeed, 4 + }; 2 5 3 6 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 4 7 pub struct ValueBytes(pub u32); ··· 9 12 #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 10 13 pub struct OpCount(pub usize); 11 14 12 - #[derive(Debug, Clone, Copy)] 15 + #[derive(Debug, Clone, Copy, Default)] 13 16 pub struct OpWeights { 14 17 pub add: u32, 15 18 pub delete: u32, 16 19 pub compact: u32, 17 20 pub checkpoint: u32, 21 + pub append_event: u32, 22 + pub sync_event_log: u32, 23 + pub run_retention: u32, 24 + pub read_record: u32, 25 + pub read_block: u32, 18 26 } 19 27 20 28 impl OpWeights { 21 29 pub const fn total(&self) -> u32 { 22 - self.add + self.delete + self.compact + self.checkpoint 30 + self.add 31 + + self.delete 32 + + self.compact 33 + + self.checkpoint 34 + + self.append_event 35 + + self.sync_event_log 36 + + self.run_retention 37 + + self.read_record 38 + + self.read_block 39 + } 40 + 41 + pub const fn touches_eventlog(&self) -> bool { 42 + self.append_event > 0 || self.sync_event_log > 0 || self.run_retention > 0 23 43 } 24 44 } 25 45 ··· 51 71 pub enum SizeDistribution { 52 72 Fixed(ValueBytes), 53 73 Uniform(ByteRange), 74 + HeavyTail(ByteRange), 54 75 } 55 76 77 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 78 + pub struct DidSpaceSize(pub u32); 79 + 80 + #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] 81 + pub struct RetentionMaxSecs(pub u32); 82 + 56 83 #[derive(Debug, Clone)] 57 84 pub struct WorkloadModel { 58 85 pub weights: OpWeights, 59 86 pub size_distribution: SizeDistribution, 60 87 pub collections: Vec<CollectionName>, 61 88 pub key_space: KeySpaceSize, 89 + pub did_space: DidSpaceSize, 90 + pub retention_max_secs: RetentionMaxSecs, 91 + } 92 + 93 + impl Default for WorkloadModel { 94 + fn default() -> Self { 95 + Self { 96 + weights: OpWeights { 97 + add: 80, 98 + delete: 10, 99 + compact: 5, 100 + checkpoint: 5, 101 + append_event: 0, 102 + sync_event_log: 0, 103 + run_retention: 0, 104 + read_record: 0, 105 + read_block: 0, 106 + }, 107 + size_distribution: SizeDistribution::Fixed(ValueBytes(64)), 108 + collections: vec![CollectionName("app.bsky.feed.post".to_string())], 109 + key_space: KeySpaceSize(200), 110 + did_space: DidSpaceSize(32), 111 + retention_max_secs: RetentionMaxSecs(3600), 112 + } 113 + } 62 114 } 63 115 64 116 impl WorkloadModel { ··· 77 129 let coll = self.collections[rng.next_usize() % self.collections.len()].clone(); 78 130 let rkey = RecordKey(format!("{:06}", rng.next_u32() % self.key_space.0.max(1))); 79 131 80 - let (a, d, c) = ( 81 - self.weights.add, 82 - self.weights.add + self.weights.delete, 83 - self.weights.add + self.weights.delete + self.weights.compact, 84 - ); 132 + let w = &self.weights; 133 + let t1 = w.add; 134 + let t2 = t1 + w.delete; 135 + let t3 = t2 + w.compact; 136 + let t4 = t3 + w.checkpoint; 137 + let t5 = t4 + w.append_event; 138 + let t6 = t5 + w.sync_event_log; 139 + let t7 = t6 + w.run_retention; 140 + let t8 = t7 + w.read_record; 141 + 85 142 match bucket { 86 - b if b < a => Op::AddRecord { 143 + b if b < t1 => Op::AddRecord { 87 144 collection: coll, 88 145 rkey, 89 146 value_seed: ValueSeed(rng.next_u32()), 90 147 }, 91 - b if b < d => Op::DeleteRecord { 148 + b if b < t2 => Op::DeleteRecord { 92 149 collection: coll, 93 150 rkey, 94 151 }, 95 - b if b < c => Op::Compact, 96 - _ => Op::Checkpoint, 152 + b if b < t3 => Op::Compact, 153 + b if b < t4 => Op::Checkpoint, 154 + b if b < t5 => Op::AppendEvent { 155 + did_seed: DidSeed(rng.next_u32() % self.did_space.0.max(1)), 156 + event_kind: event_kind_for(rng.next_u32()), 157 + payload_seed: PayloadSeed(rng.next_u32()), 158 + }, 159 + b if b < t6 => Op::SyncEventLog, 160 + b if b < t7 => Op::RunRetention { 161 + max_age_secs: RetentionSecs( 162 + rng.next_u32() % self.retention_max_secs.0.max(1), 163 + ), 164 + }, 165 + b if b < t8 => Op::ReadRecord { 166 + collection: coll, 167 + rkey, 168 + }, 169 + _ => Op::ReadBlock { 170 + value_seed: ValueSeed(rng.next_u32()), 171 + }, 97 172 } 98 173 }) 99 174 .collect(); 100 175 OpStream::from_vec(ops) 176 + } 177 + } 178 + 179 + fn event_kind_for(n: u32) -> EventKind { 180 + match n & 0b11 { 181 + 0 => EventKind::Commit, 182 + 1 => EventKind::Identity, 183 + 2 => EventKind::Account, 184 + _ => EventKind::Sync, 101 185 } 102 186 } 103 187