···9191}
92929393/// Parse the collection NSID from a full cbr key given the prefix length.
9494-fn cbr_parse_collection(key: &[u8], prefix_len: usize) -> Option<Nsid<'static>> {
9595- // TODO: we should error on unparseable!
9696- let nsid_str = std::str::from_utf8(key.get(prefix_len..)?).ok()?;
9797- Nsid::new_owned(nsid_str).ok()
9494+fn cbr_parse_collection(key: &[u8], prefix_len: usize) -> StorageResult<Nsid<'static>> {
9595+ let key_str = String::from_utf8_lossy(key);
9696+ let suffix = key.get(prefix_len..).ok_or(StorageError::Corrupt {
9797+ key: key_str.to_string(),
9898+ reason: "invalid prefix when parsing collection in cbr",
9999+ })?;
100100+ let nsid_str = std::str::from_utf8(suffix).map_err(|_| StorageError::Corrupt {
101101+ key: key_str.to_string(),
102102+ reason: "invalid string suffix for NSID in cbr suffix",
103103+ })?;
104104+ Nsid::new_owned(nsid_str).map_err(|_| StorageError::Corrupt {
105105+ key: key_str.to_string(),
106106+ reason: "invalid NSID in cbr suffix",
107107+ })
98108}
99109100110// ---------------------------------------------------------------------------
···143153/// Iterate over collections in the cbr index for `did`, starting after `cursor`.
144154///
145155/// Returns at most `limit` NSIDs.
146146-///
147147-/// TODO: we can fjall range to the collection's next-after-max (might even be
148148-/// exposed now?) or maybe use prefix + seek for the start?
149156pub fn scan_cbr(
150157 db: &DbRef,
151158 did: &Did<'_>,
···176183 {
177184 continue;
178185 }
179179- if let Some(col) = cbr_parse_collection(&k, prefix_len) {
180180- cols.push(col);
181181- }
186186+ let col = cbr_parse_collection(&k, prefix_len)?;
187187+ cols.push(col);
182188 }
183189 Ok(cols)
184190}
···325331/// rbc and cbr removes into `batch`. Use this when the index cleanup must be
326332/// atomic with other writes (e.g. a repo state update on tombstone).
327333/// Returns the number of collections removed.
334334+///
335335+/// TODO: also pass in a db snapshot??
328336pub fn remove_all_into(
329337 batch: &mut fjall::OwnedWriteBatch,
330338 db: &DbRef,
···338346 .collect::<fjall::Result<_>>()?;
339347 let prefix_len = prefix.len();
340348 for cbr_key in &collections {
341341- if let Some(col) = cbr_parse_collection(cbr_key, prefix_len) {
342342- batch.remove(&db.index_ks, rbc(col, did));
343343- }
349349+ let col = cbr_parse_collection(cbr_key, prefix_len)?;
350350+ batch.remove(&db.index_ks, rbc(col, did));
344351 batch.remove(&db.index_ks, cbr_key.as_slice());
345352 }
346353 Ok(collections.len())
···543550 let col = nsid("app.bsky.feed.post");
544551 let key = cbr(&d, col.clone());
545552 let prefix_len = cbr_prefix(&d).len();
546546- assert_eq!(cbr_parse_collection(&key, prefix_len), Some(col));
553553+ assert_eq!(cbr_parse_collection(&key, prefix_len), Ok(col));
547554 }
548555549556 #[test]
+16-4
src/storage/meta.rs
···1212use std::sync::{Arc, Mutex};
13131414use cardinality_estimator_safe::{Element, Sketch};
1515-use tracing::error;
1515+use tracing::{error, trace};
16161717use super::{DbRef, PREFIX_META, error::StorageResult};
1818···137137}
138138139139fn read_sketch(ks: &fjall::Keyspace, suffix: &[u8]) -> Sk {
140140- let Some(bytes) = ks.get(full_key(suffix)).ok().flatten() else {
140140+ let bytes = match ks.get(full_key(suffix)) {
141141+ Ok(b) => b,
142142+ Err(e) => {
143143+ error!(key = ?suffix, error = %e, "failed to read sketch from db, using fresh sketch");
144144+ return Sk::default();
145145+ }
146146+ };
147147+ let Some(bytes) = bytes else {
148148+ // normal just didn't exist yet (should only happen once per db)
149149+ trace!(key = ?suffix, "sketch not found in db, initializing new");
141150 return Sk::default();
142151 };
143152 match postcard::from_bytes(bytes.as_ref()) {
144144- Ok(sk) => sk,
153153+ Ok(sk) => {
154154+ trace!(key = ?suffix, "successfully got sketch from db");
155155+ sk
156156+ }
145157 Err(e) => {
146146- error!(key = ?suffix, error = %e, "failed to deserialize cardinality sketch; using fresh sketch");
158158+ error!(key = ?suffix, error = %e, "failed to deserialize sketch, using fresh sketch");
147159 Sk::default()
148160 }
149161 }
+7-4
src/sync/firehose/commit_event.rs
···170170 let rev = commit.rev.clone();
171171 let since = commit.since.clone();
172172 // Extract the raw CID bytes from the event's prevData field (sync1.1).
173173- let incoming_prev_data: Option<Vec<u8>> = commit
173173+ let incoming_prev_data = commit
174174 .prev_data
175175- .as_ref()
176176- .and_then(|cl| cl.to_ipld().ok())
177177- .map(|cid| cid.to_bytes());
175175+ .map(|cid| {
176176+ cid.to_ipld()
177177+ .map_err(|e| crate::error::Error::Other(format!("bad CID format: {e}")))
178178+ })
179179+ .transpose()?
180180+ .map(|ipld_cid| ipld_cid.to_bytes());
178181179182 tokio::task::spawn_blocking(move || {
180183 process_blocking(