A very fast AT Protocol indexer with flexible filtering, XRPC queries, a cursor-backed event stream, and more, built on fjall.
rust fjall at-protocol atproto indexer
58
fork

Configure Feed

Select the types of activity you want to include in your feed.

[db] rework ephemeral mode, allow setting any ttl

dawn b7be2730 6d735f19

+194 -212
+1
Cargo.lock
··· 1516 1516 "serde_ipld_dagcbor", 1517 1517 "serde_json", 1518 1518 "serde_urlencoded", 1519 + "sha2", 1519 1520 "smol_str", 1520 1521 "tempfile", 1521 1522 "thiserror 2.0.18",
+1
Cargo.toml
··· 50 50 rustls = { version = "0.23", features = ["aws-lc-rs"] } 51 51 tokio-tungstenite = { version = "0.28.0", features = ["rustls-tls-native-roots"] } 52 52 multibase = "0.9.2" 53 + sha2 = "0.10.9" 53 54 54 55 [dev-dependencies] 55 56 tempfile = "3.26.0"
+4 -3
README.md
··· 46 46 | `RELAY_HOST` | `wss://relay.fire.hose.cam/` | URL of the relay. | 47 47 | `RELAY_HOSTS` | | comma-separated list of relay URLs. if unset, falls back to `RELAY_HOST`. | 48 48 | `PLC_URL` | `https://plc.wtf`, `https://plc.directory` if full network | base URL(s) of the PLC directory (comma-separated for multiple). | 49 - | `EPHEMERAL` | `false` | if enabled, no records are stored. events are only stored up to 10 minutes for playback. | 49 + | `EPHEMERAL` | `false` | if enabled, no records are stored. events are deleted after a certain duration (`EPHEMERAL_TTL`). | 50 + | `EPHEMERAL_TTL` | `60min` | decides after how long events should be deleted. | 50 51 | `FULL_NETWORK` | `false` | if `true`, discovers and indexes all repositories in the network. | 51 52 | `FILTER_SIGNALS` | | comma-separated list of NSID patterns to use for the filter (e.g. `app.bsky.feed.post,app.bsky.graph.*`). | 52 53 | `FILTER_COLLECTIONS` | | comma-separated list of NSID patterns to use for the collections filter. | ··· 54 55 | `FIREHOSE_WORKERS` | `8` (`24` if full network) | number of concurrent workers for firehose events. | 55 56 | `BACKFILL_CONCURRENCY_LIMIT` | `16` (`64` if full network) | maximum number of concurrent backfill tasks. | 56 57 | `VERIFY_SIGNATURES` | `full` | signature verification level: `full`, `backfill-only`, or `none`. | 57 - | `CURSOR_SAVE_INTERVAL` | `3` | interval (in seconds) to save the firehose cursor. | 58 - | `REPO_FETCH_TIMEOUT` | `300` | timeout (in seconds) for fetching repositories. | 58 + | `CURSOR_SAVE_INTERVAL` | `3sec` | interval (in seconds) to save the firehose cursor. | 59 + | `REPO_FETCH_TIMEOUT` | `5min` | timeout (in seconds) for fetching repositories. | 59 60 | `CACHE_SIZE` | `256` | size of the database cache in MB. | 60 61 | `IDENTITY_CACHE_SIZE` | `100000` | number of identity entries to cache. | 61 62 | `API_PORT` | `3000` | port for the API server. |
+6 -4
src/api/debug.rs
··· 302 302 pub async fn handle_debug_ephemeral_ttl_tick( 303 303 State(state): State<Arc<AppState>>, 304 304 ) -> Result<StatusCode, StatusCode> { 305 - tokio::task::spawn_blocking(move || crate::db::ephemeral::ephemeral_ttl_tick(&state.db)) 306 - .await 307 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 308 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 305 + tokio::task::spawn_blocking(move || { 306 + crate::db::ephemeral::ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl) 307 + }) 308 + .await 309 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 310 + .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 309 311 310 312 Ok(StatusCode::OK) 311 313 }
+70 -48
src/api/stream.rs
··· 1 1 use crate::api::AppState; 2 2 use crate::db::keys; 3 - use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredEvent}; 3 + use crate::types::{BroadcastEvent, MarshallableEvt, RecordEvt, StoredData, StoredEvent}; 4 4 use axum::Router; 5 5 use axum::routing::get; 6 6 use axum::{ ··· 10 10 }, 11 11 response::IntoResponse, 12 12 }; 13 + use cid::multihash::Multihash; 14 + use jacquard_common::types::cid::{ATP_CID_HASH, IpldCid}; 13 15 use jacquard_common::{CowStr, RawData}; 16 + use jacquard_repo::DAG_CBOR_CID_CODEC; 14 17 use miette::{Context, IntoDiagnostic}; 15 18 use serde::Deserialize; 19 + use sha2::{Digest, Sha256}; 16 20 use std::sync::Arc; 17 21 use tokio::sync::{broadcast, mpsc, oneshot}; 18 - use tracing::{error, info_span, warn}; 22 + use tracing::{error, info_span}; 19 23 20 24 pub fn router() -> Router<Arc<AppState>> { 21 25 Router::new().route("/", get(handle_stream)) ··· 121 125 collection, 122 126 rkey, 123 127 action, 124 - cid, 128 + data, 125 129 } = match rmp_serde::from_slice(&v) { 126 130 Ok(e) => e, 127 131 Err(e) => { ··· 130 134 } 131 135 }; 132 136 133 - let _entered = info_span!("record", cid = ?cid.map(|c| c.to_string())).entered(); 137 + let _entered = info_span!("record", data = ?data).entered(); 134 138 135 - let marshallable = { 136 - let mut record_val = None; 137 - let block_bytes = cid 138 - .map(|cid| { 139 - db.blocks 140 - .get(&keys::block_key(collection.as_str(), &cid.to_bytes())) 141 - }) 142 - .transpose(); 143 - match block_bytes { 144 - Ok(Some(Some(block_bytes))) => { 145 - match serde_ipld_dagcbor::from_slice::<RawData>(&block_bytes) { 146 - Ok(val) => record_val = serde_json::to_value(val).ok(), 147 - Err(e) => { 148 - error!(err = %e, "cant parse block, must be corrupted?"); 149 - return; 139 + let record = match data { 140 + StoredData::Ptr(cid) => { 141 + let block = db 142 + .blocks 143 + .get(&keys::block_key(collection.as_str(), &cid.to_bytes())); 144 + match block { 145 + Ok(Some(bytes)) => { 
146 + match serde_ipld_dagcbor::from_slice::<RawData>(&bytes) { 147 + Ok(val) => Some(( 148 + cid, 149 + serde_json::to_value(val) 150 + .expect("that cbor raw data is valid json"), 151 + )), 152 + Err(e) => { 153 + error!(err = %e, "cant parse block, must be corrupted?"); 154 + return; 155 + } 150 156 } 151 157 } 158 + Ok(None) => { 159 + error!("block not found? this is a bug!!"); 160 + continue; 161 + } 162 + Err(e) => { 163 + error!(err = %e, "can't get block"); 164 + crate::db::check_poisoned(&e); 165 + return; 166 + } 152 167 } 153 - Ok(Some(None)) => { 154 - warn!( 155 - "block not found, possibly repo deleted but events not evicted yet?" 156 - ); 157 - continue; 158 - } 159 - Ok(None) => { 160 - // cid not found, its ok, delete event 161 - } 162 - Err(e) => { 163 - error!(err = %e, "can't get block"); 164 - crate::db::check_poisoned(&e); 165 - return; 168 + } 169 + StoredData::Block(block) => { 170 + let digest = Sha256::digest(&block); 171 + let hash = 172 + Multihash::wrap(ATP_CID_HASH, &digest).expect("that its valid sha256"); 173 + let cid = IpldCid::new_v1(DAG_CBOR_CID_CODEC, hash); 174 + match serde_ipld_dagcbor::from_slice::<RawData>(&block) { 175 + Ok(val) => Some(( 176 + cid, 177 + serde_json::to_value(val) 178 + .expect("that cbor raw data is valid json"), 179 + )), 180 + Err(e) => { 181 + error!(err = %e, "cant parse block, must be corrupted?"); 182 + return; 183 + } 166 184 } 167 185 } 186 + StoredData::Nothing => None, 187 + }; 168 188 169 - MarshallableEvt { 170 - id, 171 - event_type: "record".into(), 172 - record: Some(RecordEvt { 173 - live, 174 - did: did.to_did(), 175 - rev: CowStr::Owned(rev.to_tid().into()), 176 - collection, 177 - rkey: CowStr::Owned(rkey.to_smolstr().into()), 178 - action: CowStr::Borrowed(action.as_str()), 179 - record: record_val, 180 - cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()), 181 - }), 182 - identity: None, 183 - account: None, 184 - } 189 + let (cid, record) = record 190 + .map(|(c, r)| 
(Some(c), Some(r))) 191 + .unwrap_or((None, None)); 192 + let marshallable = MarshallableEvt { 193 + id, 194 + event_type: "record".into(), 195 + record: Some(RecordEvt { 196 + live, 197 + did: did.to_did(), 198 + rev: CowStr::Owned(rev.to_tid().into()), 199 + collection, 200 + rkey: CowStr::Owned(rkey.to_smolstr().into()), 201 + action: CowStr::Borrowed(action.as_str()), 202 + record, 203 + cid: cid.map(|c| jacquard_common::types::cid::Cid::ipld(c).into()), 204 + }), 205 + identity: None, 206 + account: None, 185 207 }; 186 208 187 209 let json_str = match serde_json::to_string(&marshallable) {
+4 -10
src/backfill/mod.rs
··· 6 6 use crate::state::AppState; 7 7 use crate::types::{ 8 8 AccountEvt, BroadcastEvent, GaugeState, RepoState, RepoStatus, ResyncErrorKind, ResyncState, 9 - StoredEvent, 9 + StoredData, StoredEvent, 10 10 }; 11 11 12 12 use fjall::Slice; ··· 624 624 625 625 let cid_raw = cid.to_bytes(); 626 626 let block_key = Slice::from(keys::block_key(collection, &cid_raw)); 627 - batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref()); 628 627 if !ephemeral { 628 + batch.insert(&app_state.db.blocks, block_key.clone(), val.as_ref()); 629 629 batch.insert(&app_state.db.records, db_key, cid_raw); 630 - } else { 631 - // ephemeral: track refcount for this event's CID 632 - let mut entry = app_state.db.block_refcounts.entry_sync(block_key).or_insert(0); 633 - *entry += 1; 634 630 } 635 631 636 632 added_blocks += 1; ··· 647 643 collection: CowStr::Borrowed(collection), 648 644 rkey, 649 645 action, 650 - cid: Some(cid_obj.to_ipld().expect("valid cid")), 646 + data: ephemeral.then_some(StoredData::Block(val)).unwrap_or_else(|| StoredData::Ptr(cid_obj.to_ipld().expect("valid cid"))), 651 647 }; 652 648 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 653 649 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes); 654 - 655 650 656 651 count += 1; 657 652 } ··· 668 663 keys::record_key(&did, &collection, &rkey), 669 664 ); 670 665 671 - 672 666 let event_id = app_state.db.next_event_id.fetch_add(1, Ordering::SeqCst); 673 667 let evt = StoredEvent { 674 668 live: false, ··· 677 671 collection: CowStr::Borrowed(&collection), 678 672 rkey, 679 673 action: DbAction::Delete, 680 - cid: None, 674 + data: StoredData::Nothing, 681 675 }; 682 676 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?; 683 677 batch.insert(&app_state.db.events, keys::event_key(event_id), bytes);
+3
src/config.rs
··· 70 70 pub plc_urls: Vec<Url>, 71 71 pub full_network: bool, 72 72 pub ephemeral: bool, 73 + pub ephemeral_ttl: Duration, 73 74 pub cursor_save_interval: Duration, 74 75 pub repo_fetch_timeout: Duration, 75 76 pub api_port: u16, ··· 160 161 let repo_fetch_timeout = cfg!("REPO_FETCH_TIMEOUT", 300, sec); 161 162 162 163 let ephemeral: bool = cfg!("EPHEMERAL", false); 164 + let ephemeral_ttl = cfg!("EPHEMERAL_TTL", 60 * 60, sec); 163 165 let database_path = cfg!("DATABASE_PATH", "./hydrant.db"); 164 166 let cache_size = cfg!("CACHE_SIZE", 256u64); 165 167 let data_compression = cfg!("DATA_COMPRESSION", Compression::Lz4); ··· 239 241 relays: relay_hosts, 240 242 plc_urls, 241 243 ephemeral, 244 + ephemeral_ttl, 242 245 full_network, 243 246 cursor_save_interval, 244 247 repo_fetch_timeout,
+5 -61
src/db/ephemeral.rs
··· 1 - //! ephemeral mode block lifecycle: in-memory refcounting and TTL-based event expiry. 2 - //! 3 - //! ## model 4 - //! 5 - //! every event that references a block CID holds one refcount entry. the TTL worker 6 - //! decrements the count when the event expires. when the count hits zero the block is 7 - //! deleted inline in the same batch as the event deletion. 8 - //! 9 - //! ## correctness 10 - //! 11 - //! - refcounts are rebuilt from `db.events` on startup before the server accepts requests. 12 - //! - shared CIDs are handled correctly: two events referencing the same block each 13 - //! increment the counter; the block is deleted only when the second one expires. 14 - 15 1 use crate::db::{Db, keys}; 16 - use crate::types::StoredEvent; 17 - use fjall::Slice; 18 2 use miette::{IntoDiagnostic, WrapErr}; 19 3 use std::sync::Arc; 20 4 use std::sync::atomic::Ordering; 21 5 use std::time::Duration; 22 - use tracing::{debug, error, info, trace}; 23 - 24 - pub const EVENT_TTL_SECS: u64 = 60 * 10; 25 - 26 - /// rebuilds `db.block_refcounts` by scanning all stored events. 27 - /// must be called on startup in ephemeral mode before accepting requests. 
28 - pub fn ephemeral_startup_load_refcounts(db: &Db) -> miette::Result<()> { 29 - info!("rebuilding block refcounts from events (ephemeral mode)"); 30 - for guard in db.events.iter() { 31 - let v = guard.value().into_diagnostic()?; 32 - let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 33 - let Some(cid) = evt.cid else { continue }; 34 - let block_key = Slice::from(keys::block_key(evt.collection.as_str(), &cid.to_bytes())); 35 - let mut entry = db.block_refcounts.entry_sync(block_key).or_insert(0); 36 - *entry += 1; 37 - } 38 - trace!("ephemeral block refcounts ready"); 39 - Ok(()) 40 - } 6 + use tracing::{debug, error, info}; 41 7 42 8 pub fn ephemeral_ttl_worker(state: Arc<crate::state::AppState>) { 43 9 info!("ephemeral TTL worker started"); 44 10 loop { 45 11 std::thread::sleep(Duration::from_secs(60)); 46 - if let Err(e) = ephemeral_ttl_tick(&state.db) { 12 + if let Err(e) = ephemeral_ttl_tick(&state.db, &state.ephemeral_ttl) { 47 13 error!(err = %e, "ephemeral TTL tick failed"); 48 14 } 49 15 } 50 16 } 51 17 52 - pub fn ephemeral_ttl_tick(db: &Db) -> miette::Result<()> { 18 + pub fn ephemeral_ttl_tick(db: &Db, ttl: &Duration) -> miette::Result<()> { 53 19 let now = chrono::Utc::now().timestamp() as u64; 54 - let cutoff_ts = now.saturating_sub(EVENT_TTL_SECS); 20 + let cutoff_ts = now.saturating_sub(ttl.as_secs()); 55 21 56 22 // write current watermark 57 23 let current_event_id = db.next_event_id.load(Ordering::SeqCst); ··· 90 56 let mut pruned = 0usize; 91 57 92 58 for guard in db.events.range(..cutoff_key_events) { 93 - let (k, v) = guard.into_inner().into_diagnostic()?; 94 - let evt = rmp_serde::from_slice::<StoredEvent>(&v).into_diagnostic()?; 95 - 96 - if let Some(cid) = evt.cid { 97 - let block_key = Slice::from(keys::block_key(evt.collection.as_str(), &cid.to_bytes())); 98 - 99 - let remove_block = { 100 - let count = db 101 - .block_refcounts 102 - .entry_sync(block_key.clone()) 103 - .and_modify(|c| { 104 - *c = 
c.saturating_sub(1); 105 - }) 106 - .or_default(); 107 - *count == 0 108 - }; 109 - 110 - if remove_block { 111 - db.block_refcounts.remove_sync(&block_key); 112 - batch.remove(&db.blocks, block_key); 113 - } 114 - } 115 - 59 + let k = guard.key().into_diagnostic()?; 116 60 batch.remove(&db.events, k); 117 61 pruned += 1; 118 62 }
+23 -10
src/db/mod.rs
··· 48 48 pub counts: Keyspace, 49 49 pub filter: Keyspace, 50 50 pub crawler: Keyspace, 51 - // only meaningful in ephemeral mode; empty and unused in non-ephemeral 52 - pub block_refcounts: scc::HashMap<Slice, u32>, 53 51 pub event_tx: broadcast::Sender<BroadcastEvent>, 54 52 pub next_event_id: Arc<AtomicU64>, 55 53 pub counts_map: HashMap<SmolStr, u64>, ··· 226 224 .data_block_compression_policy(CompressionPolicy::disabled()) 227 225 .data_block_restart_interval_policy(RestartIntervalPolicy::all(4)), 228 226 )?; 227 + // this is used in non-ephemeral mode 229 228 let blocks = open_ks( 230 229 "blocks", 231 230 opts() ··· 296 295 // eg. by many different repos and different records etc. 297 296 // since its sequential we should still go with bigger block size though 298 297 // backfills will be sequential though... 299 - .data_block_size_policy(BlockSizePolicy::new([kb(16), kb(64)])) 298 + .data_block_size_policy( 299 + cfg.ephemeral 300 + .then(|| BlockSizePolicy::new([kb(64), kb(128), kb(256)])) 301 + .unwrap_or_else(|| BlockSizePolicy::new([kb(16), kb(64)])), 302 + ) 300 303 // we are streaming the new events to consumers so we dont want to compress them 301 - .data_block_compression_policy(CompressionPolicy::new([ 302 - CompressionType::None, 303 - get_compression("events", 3), 304 - get_compression("events", 3), 305 - get_compression("events", 5), 306 - ])) 304 + .data_block_compression_policy( 305 + cfg.ephemeral 306 + .then(|| { 307 + CompressionPolicy::new([ 308 + CompressionType::None, 309 + get_compression("events", 3), 310 + ]) 311 + }) 312 + .unwrap_or_else(|| { 313 + CompressionPolicy::new([ 314 + CompressionType::None, 315 + get_compression("events", 3), 316 + get_compression("events", 3), 317 + get_compression("events", 5), 318 + ]) 319 + }), 320 + ) 307 321 // ids are int, we can prefix truncate a lot 308 322 .data_block_restart_interval_policy(RestartIntervalPolicy::new([64, 128])), 309 323 )?; ··· 400 414 counts, 401 415 filter, 402 416 crawler, 
403 - block_refcounts: scc::HashMap::default(), 404 417 event_tx, 405 418 counts_map, 406 419 next_event_id: Arc::new(AtomicU64::new(last_id + 1)),
+3 -5
src/main.rs
··· 79 79 let state = Arc::new(state); 80 80 81 81 if cfg.ephemeral { 82 - db::ephemeral::ephemeral_startup_load_refcounts(&state.db)?; 83 - 84 - let state_ttl = state.clone(); 82 + let state = state.clone(); 85 83 std::thread::Builder::new() 86 - .name("ephemeral-ttl".into()) 87 - .spawn(move || db::ephemeral::ephemeral_ttl_worker(state_ttl)) 84 + .name("ephemeral-gc".into()) 85 + .spawn(move || db::ephemeral::ephemeral_ttl_worker(state)) 88 86 .into_diagnostic()?; 89 87 } 90 88
+20 -12
src/ops.rs
··· 18 18 use crate::db::{self, Db, keys, ser_repo_state}; 19 19 use crate::filter::FilterConfig; 20 20 use crate::ingest::stream::Commit; 21 + use crate::types::StoredData; 21 22 use crate::types::{ 22 23 AccountEvt, BroadcastEvent, IdentityEvt, MarshallableEvt, RepoState, RepoStatus, ResyncState, 23 24 StoredEvent, ··· 282 283 let event_id = db.next_event_id.fetch_add(1, Ordering::SeqCst); 283 284 284 285 let action = DbAction::try_from(op.action.as_str())?; 285 - match action { 286 + let block = match action { 286 287 DbAction::Create | DbAction::Update => { 287 288 let Some(cid) = &op.cid else { 288 289 continue; ··· 299 300 }; 300 301 let cid_raw = cid_ipld.to_bytes(); 301 302 let block_key = Slice::from(keys::block_key(collection, &cid_raw)); 302 - batch.insert(&db.blocks, block_key.clone(), bytes.to_vec()); 303 - blocks_count += 1; 304 303 304 + blocks_count += 1; 305 305 if !ephemeral { 306 + batch.insert(&db.blocks, block_key.clone(), bytes.as_ref()); 306 307 batch.insert(&db.records, db_key.clone(), cid_raw); 307 308 // accumulate counts 308 309 if action == DbAction::Create { 309 310 records_delta += 1; 310 311 *collection_deltas.entry(collection).or_default() += 1; 311 312 } 313 + None 312 314 } else if action == DbAction::Create || action == DbAction::Update { 313 - // ephemeral: track refcount for this event's CID so the TTL worker can 314 - // delete the block when the last referencing event expires 315 - let mut entry = db 316 - .block_refcounts 317 - .entry_sync(block_key.clone()) 318 - .or_insert(0); 319 - *entry += 1; 315 + Some(bytes.clone()) 316 + } else { 317 + unreachable!("we tested if we are in create or update action") 320 318 } 321 319 } 322 320 DbAction::Delete => { ··· 327 325 records_delta -= 1; 328 326 *collection_deltas.entry(collection).or_default() -= 1; 329 327 } 328 + 329 + None 330 330 } 331 - } 331 + }; 332 332 333 333 let evt = StoredEvent { 334 334 live: true, ··· 337 337 collection: CowStr::Borrowed(collection), 338 338 
rkey, 339 339 action, 340 - cid: op.cid.as_ref().map(|c| c.to_ipld().expect("valid cid")), 340 + data: block 341 + .map(StoredData::Block) 342 + .or_else(|| { 343 + op.cid 344 + .as_ref() 345 + .map(|c| c.to_ipld().expect("valid cid")) 346 + .map(StoredData::Ptr) 347 + }) 348 + .unwrap_or(StoredData::Nothing), 341 349 }; 342 350 343 351 let bytes = rmp_serde::to_vec(&evt).into_diagnostic()?;
+3 -4
src/state.rs
··· 1 - use std::collections::HashMap; 2 1 use std::sync::atomic::AtomicI64; 2 + use std::{collections::HashMap, time::Duration}; 3 3 4 4 use miette::Result; 5 5 use tokio::sync::{Notify, watch}; ··· 18 18 pub filter: FilterHandle, 19 19 pub relay_cursors: HashMap<Url, AtomicI64>, 20 20 pub backfill_notify: Notify, 21 - /// Controls whether the crawler is running. Receivers are held by crawler tasks. 22 21 pub crawler_enabled: watch::Sender<bool>, 23 - /// Controls whether firehose ingestion is running. Receivers are held by ingestor tasks. 24 22 pub firehose_enabled: watch::Sender<bool>, 25 - /// Controls whether the backfill worker picks up new tasks. Receiver is held by the backfill worker. 26 23 pub backfill_enabled: watch::Sender<bool>, 24 + pub ephemeral_ttl: Duration, 27 25 } 28 26 29 27 impl AppState { ··· 59 57 crawler_enabled, 60 58 firehose_enabled, 61 59 backfill_enabled, 60 + ephemeral_ttl: config.ephemeral_ttl.clone(), 62 61 }) 63 62 } 64 63
+48 -3
src/types.rs
··· 1 - use std::fmt::Display; 1 + use std::fmt::{Debug, Display}; 2 2 3 3 use jacquard_common::types::cid::IpldCid; 4 4 use jacquard_common::types::string::Did; ··· 218 218 pub status: Option<CowStr<'i>>, 219 219 } 220 220 221 + use jacquard_common::bytes::Bytes; 222 + 223 + #[derive(Serialize, Deserialize, Clone)] 224 + pub enum StoredData { 225 + Nothing, 226 + Ptr(IpldCid), 227 + #[serde(with = "serde_bytes_squared")] 228 + Block(Bytes), 229 + } 230 + 231 + impl StoredData { 232 + pub fn is_nothing(&self) -> bool { 233 + matches!(self, StoredData::Nothing) 234 + } 235 + } 236 + 237 + impl Default for StoredData { 238 + fn default() -> Self { 239 + Self::Nothing 240 + } 241 + } 242 + 243 + impl Debug for StoredData { 244 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 245 + match self { 246 + Self::Nothing => f.write_str("nothing"), 247 + Self::Block(_) => f.write_str("<block>"), 248 + Self::Ptr(cid) => write!(f, "{cid}"), 249 + } 250 + } 251 + } 252 + 221 253 #[derive(Debug, Serialize, Deserialize, Clone)] 222 254 #[serde(bound(deserialize = "'i: 'de"))] 223 255 pub struct StoredEvent<'i> { ··· 231 263 pub rkey: DbRkey, 232 264 pub action: DbAction, 233 265 #[serde(default)] 234 - #[serde(skip_serializing_if = "Option::is_none")] 235 - pub cid: Option<IpldCid>, 266 + #[serde(skip_serializing_if = "StoredData::is_nothing")] 267 + pub data: StoredData, 268 + } 269 + 270 + mod serde_bytes_squared { 271 + use jacquard_common::bytes::Bytes; 272 + use serde::{Deserialize, Deserializer, Serializer}; 273 + 274 + pub fn serialize<S: Serializer>(v: impl AsRef<[u8]>, s: S) -> Result<S::Ok, S::Error> { 275 + s.serialize_bytes(serde_bytes::Bytes::new(v.as_ref())) 276 + } 277 + 278 + pub fn deserialize<'de, D: Deserializer<'de>>(d: D) -> Result<Bytes, D::Error> { 279 + serde_bytes::ByteBuf::deserialize(d).map(|b| b.into_vec().into()) 280 + } 236 281 } 237 282 238 283 #[derive(Debug, PartialEq, Eq, Clone, Copy)]
+3 -52
tests/ephemeral_gc_test.nu tests/ephemeral_gc.nu
··· 13 13 print $"database path: ($db_path)" 14 14 15 15 let binary = build-hydrant 16 - let instance = (with-env { HYDRANT_EPHEMERAL: "true" } { 16 + let instance = (with-env { HYDRANT_EPHEMERAL: "true", HYDRANT_EPHEMERAL_TTL: "60min" } { 17 17 start-hydrant $binary $db_path $port 18 18 }) 19 19 ··· 34 34 35 35 try { kill --force $instance.pid } 36 36 sleep 2sec 37 - } 38 - 39 - def check-block-count [debug_url: string, expected: int] { 40 - let response = (http get -f -e $"($debug_url)/debug/iter?partition=blocks&limit=1000000") 41 - if $response.status != 200 { 42 - error make {msg: $"FAILED: debug/iter returned ($response.status)"} 43 - } 44 - let count = ($response.body.items | length) 45 - if $count != $expected { 46 - error make {msg: $"FAILED: expected ($expected) blocks, found ($count)"} 47 - } 48 - print $"block count verified: ($count)" 49 37 } 50 38 51 39 def trigger-ttl-tick [debug_url: string] { ··· 57 45 print "TTL tick complete" 58 46 } 59 47 60 - # in ephemeral mode, records are never written to the records keyspace, so 61 - # common.nu's wait-for-backfill (which checks records > 0) hangs forever. 62 - # poll pending == 0 and blocks > 0 instead. 63 - def wait-for-ephemeral-backfill [url: string, debug_url: string] { 64 - print "waiting for ephemeral backfill to complete..." 65 - for i in 1..120 { 66 - let stats = (http get $"($url)/stats?accurate=true").counts 67 - let pending = ($stats.pending | into int) 68 - let blocks = ((http get -f -e $"($debug_url)/debug/iter?partition=blocks&limit=1").body.items | length) 69 - 70 - print $"[($i)/120] pending: ($pending), has_blocks: ($blocks > 0)" 71 - 72 - if ($pending == 0) and ($blocks > 0) { 73 - print "ephemeral backfill complete." 74 - return true 75 - } 76 - sleep 1sec 77 - } 78 - false 79 - } 80 - 81 48 def main [] { 82 49 let repo1 = "did:web:guestbook.gaze.systems" 83 50 ··· 86 53 print $"adding repo ($repo1)..." 
87 54 http put -t application/json $"($url)/repos" [{ did: ($repo1) }] 88 55 89 - if not (wait-for-ephemeral-backfill $url $debug_url) { 56 + if not (wait-for-backfill $url) { 90 57 error make {msg: "backfill did not complete"} 91 58 } 92 59 93 60 let event_count = ((http get -f -e $"($debug_url)/debug/iter?partition=events&limit=1000").body.items | length) 94 61 print $"found ($event_count) events after backfill" 95 - 96 - let before_blocks = ((http get -f -e $"($debug_url)/debug/iter?partition=blocks&limit=1000000").body.items | length) 97 62 98 63 # immediately trigger TTL tick, watermarks are too recent, nothing should be pruned 99 64 trigger-ttl-tick $debug_url ··· 103 68 error make {msg: $"FAILED: expected ($event_count) events after TTL tick, got ($after_events)"} 104 69 } 105 70 print "event count unchanged after TTL tick (no events eligible)" 106 - 107 - # blocks must also be unchanged 108 - check-block-count $debug_url $before_blocks 109 - print "block count unchanged after TTL tick (correct)" 110 71 } 111 72 112 73 # plant a past watermark, trigger the real TTL path, and verify all events and blocks are gone ··· 114 75 print $"adding repo ($repo1)..." 
115 76 http put -t application/json $"($url)/repos" [{ did: ($repo1) }] 116 77 117 - if not (wait-for-ephemeral-backfill $url $debug_url) { 78 + if not (wait-for-backfill $url) { 118 79 error make {msg: "backfill did not complete"} 119 80 } 120 81 ··· 125 86 } 126 87 print $"found ($event_count) events after backfill" 127 88 128 - let block_count = ((http get -f -e $"($debug_url)/debug/iter?partition=blocks&limit=1000000").body.items | length) 129 - if $block_count == 0 { 130 - error make {msg: "FAILED: expected blocks after backfill, found none"} 131 - } 132 - print $"found ($block_count) blocks after backfill" 133 - 134 89 # get the highest event id so we can set the cutoff just above it 135 90 let max_event_id = ($events | each { |item| ($item | first | into int) } | math max) 136 91 print $"max event id: ($max_event_id)" ··· 154 109 error make {msg: $"FAILED: expected 0 events after TTL expiry, got ($remaining_events)"} 155 110 } 156 111 print "all events pruned" 157 - 158 - # all blocks should be deleted (refcounts all hit zero) 159 - check-block-count $debug_url 0 160 - print "all blocks deleted" 161 112 } 162 113 163 114 print "all ephemeral gc tests passed!"