lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

storage and handler errors

phil 4e828091 534ca250

+149 -108
+1
Cargo.lock
··· 2233 2233 "thiserror 2.0.18", 2234 2234 "tokio", 2235 2235 "tokio-util", 2236 + "tracing", 2236 2237 "wiremock", 2237 2238 ] 2238 2239
+2
Cargo.toml
··· 19 19 thiserror = "2.0.18" 20 20 tokio = { version = "1.49.0", features = ["full"] } 21 21 tokio-util = { version = "0.7", features = ["rt"] } 22 + tracing = "0.1.44" 23 + serde_json = "1" 22 24 23 25 [[example]] 24 26 name = "list-repo-collections"
+2 -1
src/error.rs
··· 1 + use crate::storage::StorageError; 1 2 use thiserror::Error; 2 3 3 4 #[derive(Debug, Error)] 4 5 pub enum Error { 5 6 #[error("database: {0}")] 6 - Db(#[from] fjall::Error), 7 + Storage(#[from] StorageError), 7 8 #[error("I/O: {0}")] 8 9 Io(#[from] std::io::Error), 9 10 #[error("task panicked: {0}")]
+50 -20
src/server/handler.rs
··· 5 5 //! 6 6 //! TODO: xrpc-style error handling 7 7 8 - use axum::{Json, extract::State, http::StatusCode}; 8 + use axum::{ 9 + Json, 10 + extract::State, 11 + http::StatusCode, 12 + response::{IntoResponse, Response}, 13 + }; 9 14 use jacquard_api::com_atproto::sync::{ 10 15 get_repo_status::{GetRepoStatusOutput, GetRepoStatusRequest}, 11 16 list_repos_by_collection::{ListReposByCollectionOutput, ListReposByCollectionRequest, Repo}, 12 17 }; 13 18 use jacquard_axum::ExtractXrpc; 14 - use jacquard_common::{CowStr, types::string::Did}; 19 + use jacquard_common::types::string::Did; 20 + use serde_json::json; 15 21 16 - use crate::storage::{DbRef, repo::AccountStatus}; 22 + use crate::storage::{DbRef, error::StorageError}; 17 23 18 24 /// Handler for `GET /xrpc/com.atproto.sync.listReposByCollection`. 19 25 /// ··· 38 44 .map_err(|_| StatusCode::BAD_REQUEST)?; 39 45 40 46 // Scan one extra to detect whether a next page exists. 41 - let mut dids = crate::storage::index::scan_rbc(&db, req.collection, cursor, limit + 1) 47 + let mut dids = crate::storage::list_by::scan_rbc(&db, req.collection, cursor, limit + 1) 42 48 .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; 43 49 44 50 // If we got more than limit results, set the next-page cursor to the last ··· 66 72 })) 67 73 } 68 74 75 + pub enum GetRepoStatusError { 76 + StorageError, 77 + RepoNotFound, 78 + } 79 + 80 + impl From<StorageError> for GetRepoStatusError { 81 + fn from(e: StorageError) -> Self { 82 + tracing::error!("Storage error: {e:#}"); 83 + Self::StorageError 84 + } 85 + } 86 + 87 + impl IntoResponse for GetRepoStatusError { 88 + fn into_response(self) -> Response { 89 + match self { 90 + GetRepoStatusError::StorageError => ( 91 + StatusCode::INTERNAL_SERVER_ERROR, 92 + Json(json!({ 93 + "error": "InternalError", 94 + "message": "Storage issue", 95 + })), 96 + ), 97 + GetRepoStatusError::RepoNotFound => ( 98 + StatusCode::BAD_REQUEST, 99 + Json(json!({ 100 + "error": "RepoNotFound", 101 + "message": "the requested repo does not exist or has not been indexed.", 102 + })), 103 + ), 104 + } 105 + .into_response() 106 + } 107 + } 108 + 69 109 /// Handler for `GET /xrpc/com.atproto.sync.getRepoStatus`. 70 110 /// 71 111 /// Returns the active/status of the given repo. Returns 404 (XRPC RepoNotFound) ··· 74 114 /// `active` reflects whether the account is usable. `status` carries the 75 115 /// reason when inactive ("takendown", "suspended", "deactivated", "deleted"). 76 116 /// `rev` is currently omitted (TODO: store latest rev in RepoRecord). 77 - /// 78 - /// TODO: 400 xrpc-compatible error instead of 404 79 - /// jacquard-axum seems slightly limiting here atm 80 117 pub async fn get_repo_status( 81 118 State(db): State<DbRef>, 82 119 ExtractXrpc(req): ExtractXrpc<GetRepoStatusRequest>, 83 - ) -> Result<Json<GetRepoStatusOutput<'static>>, StatusCode> { 84 - let record = crate::storage::repo::get(&db, &req.did) 85 - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)? 86 - .ok_or(StatusCode::NOT_FOUND)?; 87 - 88 - let active = matches!(record.status, AccountStatus::Active); 89 - let status: Option<CowStr<'static>> = match record.status { 90 - AccountStatus::Active => None, 91 - s => Some(s.as_str().to_owned().into()), 92 - }; 120 + ) -> Result<Json<GetRepoStatusOutput<'static>>, GetRepoStatusError> { 121 + let (info, prev) = 122 + crate::storage::repo::get(&db, &req.did)?.ok_or(GetRepoStatusError::RepoNotFound)?; 93 123 94 124 Ok(Json(GetRepoStatusOutput { 95 - active, 96 125 did: req.did, 97 - rev: None, // TODO: store and return the latest rev from RepoRecord 98 - status, 126 + rev: prev.map(|p| p.rev), 127 + active: info.status.is_active(), 128 + status: info.status.status().map(|s| s.to_owned().into()), 99 129 extra_data: None, 100 130 })) 101 131 }
+17 -12
src/storage/cursor.rs
··· 1 1 //! Firehose cursor and listRepos backfill progress tracking. 2 2 3 - use crate::error::Result; 4 - use crate::storage::{DbRef, keys}; 3 + use crate::storage::{ 4 + DbRef, 5 + error::{StorageError, StorageResult}, 6 + keys, 7 + }; 5 8 6 9 /// Read the last-saved firehose cursor for `host`. 7 10 /// 8 11 /// Returns `None` if no cursor has been persisted yet (start from live). 9 - pub fn get_subscribe_cursor(db: &DbRef, host: &str) -> Result<Option<u64>> { 12 + pub fn get_subscribe_cursor(db: &DbRef, host: &str) -> StorageResult<Option<u64>> { 10 13 let key = keys::subscribe_cursor(host); 11 - match db.cursors.get(key)? { 14 + match db.ks.get(key)? { 12 15 None => Ok(None), 13 16 Some(v) => { 14 - let bytes: [u8; 8] = v 15 - .as_ref() 16 - .try_into() 17 - .map_err(|_| crate::error::Error::Other("corrupt cursor value".into()))?; 17 + let bytes: [u8; 8] = v.as_ref().try_into().map_err(|e| { 18 + StorageError::CorruptedCursor(format!("array from slice failed: {e}")) 19 + })?; 18 20 Ok(Some(u64::from_be_bytes(bytes))) 19 21 } 20 22 } 21 23 } 22 24 23 25 /// Persist the firehose cursor for `host`. 24 - pub fn set_subscribe_cursor(db: &DbRef, host: &str, cursor: u64) -> Result<()> { 26 + pub fn set_subscribe_cursor(db: &DbRef, host: &str, cursor: u64) -> StorageResult<()> { 25 27 let key = keys::subscribe_cursor(host); 26 - db.cursors.insert(key, cursor.to_be_bytes())?; 28 + db.ks.insert(key, cursor.to_be_bytes())?; 27 29 Ok(()) 28 30 } 29 31 ··· 37 39 } 38 40 39 41 /// Read the listRepos backfill progress for `host`. 40 - pub fn get_list_repos_progress(_db: &DbRef, _host: &str) -> Result<Option<ListReposProgress>> { 42 + pub fn get_list_repos_progress( 43 + _db: &DbRef, 44 + _host: &str, 45 + ) -> StorageResult<Option<ListReposProgress>> { 41 46 todo!("deserialize ListReposProgress") 42 47 } 43 48 ··· 46 51 _db: &DbRef, 47 52 _host: &str, 48 53 _progress: &ListReposProgress, 49 - ) -> Result<()> { 54 + ) -> StorageResult<()> { 50 55 todo!("serialize and write ListReposProgress") 51 56 }
+11
src/storage/error.rs
··· 1 + use thiserror::Error; 2 + 3 + #[derive(Debug, Error)] 4 + pub enum StorageError { 5 + #[error("fjall: {0}")] 6 + Fjall(#[from] fjall::Error), 7 + #[error("corrupted cursor: {0}")] 8 + CorruptedCursor(String), 9 + } 10 + 11 + pub type StorageResult<T> = Result<T, StorageError>;
+21 -18
src/storage/index.rs src/storage/list_by.rs
··· 5 5 use jacquard_common::types::nsid::Nsid; 6 6 use jacquard_common::types::string::Did; 7 7 8 - use crate::error::Result; 9 - use crate::storage::{DbRef, keys}; 8 + use crate::storage::{DbRef, StorageResult, keys}; 10 9 11 10 /// Add a `(collection, did)` pair to both indexes within an existing batch. 12 11 /// ··· 18 17 did: Did<'_>, 19 18 collection: Nsid<'_>, 20 19 ) { 21 - batch.insert(&db.rbc, keys::rbc(collection.clone(), did.clone()), b""); 22 - batch.insert(&db.cbr, keys::cbr(did, collection), b""); 20 + batch.insert(&db.ks, keys::rbc(collection.clone(), did.clone()), b""); 21 + batch.insert(&db.ks, keys::cbr(did, collection), b""); 23 22 } 24 23 25 24 /// Remove a `(collection, did)` pair from both indexes within an existing batch. ··· 32 31 did: Did<'_>, 33 32 collection: Nsid<'_>, 34 33 ) { 35 - batch.remove(&db.rbc, keys::rbc(collection.clone(), did.clone())); 36 - batch.remove(&db.cbr, keys::cbr(did, collection)); 34 + batch.remove(&db.ks, keys::rbc(collection.clone(), did.clone())); 35 + batch.remove(&db.ks, keys::cbr(did, collection)); 37 36 } 38 37 39 38 /// Insert a `(collection, did)` pair into both the rbc and cbr indexes. 40 - pub fn insert(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> Result<()> { 39 + pub fn insert(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 41 40 let mut batch = db.database.batch(); 42 41 insert_into(&mut batch, db, did, collection); 43 42 batch.commit()?; ··· 45 44 } 46 45 47 46 /// Remove a `(collection, did)` pair from both indexes. 48 - pub fn remove(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> Result<()> { 47 + pub fn remove(db: &DbRef, did: Did<'_>, collection: Nsid<'_>) -> StorageResult<()> { 49 48 let mut batch = db.database.batch(); 50 49 remove_into(&mut batch, db, did, collection); 51 50 batch.commit()?; ··· 55 54 /// Remove all index entries for `did` (account deletion). 56 55 /// 57 56 /// Reads all collections from the cbr index, then deletes both sides in a batch. 58 - pub fn remove_all(db: &DbRef, did: Did<'_>) -> Result<()> { 57 + pub fn remove_all(db: &DbRef, did: Did<'_>) -> StorageResult<()> { 59 58 let prefix = keys::cbr_prefix(did.clone()); 60 59 let collections: Vec<Vec<u8>> = db 61 - .cbr 60 + .ks 62 61 .prefix(prefix.clone()) 63 62 .map(|guard| guard.into_inner().map(|(k, _v)| k.to_vec())) 64 63 .collect::<fjall::Result<_>>()?; ··· 71 70 let mut batch = db.database.batch(); 72 71 for cbr_key in &collections { 73 72 if let Some(col) = keys::cbr_parse_collection(cbr_key, prefix_len) { 74 - batch.remove(&db.rbc, keys::rbc(col, did.clone())); 73 + batch.remove(&db.ks, keys::rbc(col, did.clone())); 75 74 } 76 - batch.remove(&db.cbr, cbr_key.as_slice()); 75 + batch.remove(&db.ks, cbr_key.as_slice()); 77 76 } 78 77 batch.commit()?; 79 78 Ok(()) ··· 88 87 /// in a single batch. 89 88 /// 90 89 /// Typically called as `sync_collections(db, did, &snapshot.collections)`. 91 - pub fn sync_collections(db: &DbRef, did: Did<'_>, collections: &[Nsid<'static>]) -> Result<()> { 90 + pub fn sync_collections( 91 + db: &DbRef, 92 + did: Did<'_>, 93 + collections: &[Nsid<'static>], 94 + ) -> StorageResult<()> { 92 95 // Read the current set from the cbr index. Fjall iterates in key order, and 93 96 // the cbr prefix puts the collection NSID last, so the suffix scan is already 94 97 // sorted lexicographically. 95 98 let prefix = keys::cbr_prefix(did.clone()); 96 99 let prefix_len = prefix.len(); 97 100 let existing: Vec<Nsid<'static>> = db 98 - .cbr 101 + .ks 99 102 .prefix(prefix) 100 103 .map(|guard| { 101 104 guard ··· 290 293 collection: Nsid<'_>, 291 294 cursor: Option<Did<'_>>, 292 295 limit: usize, 293 - ) -> Result<Vec<Did<'static>>> { 296 + ) -> StorageResult<Vec<Did<'static>>> { 294 297 let prefix = keys::rbc_prefix(collection.clone()); 295 298 let prefix_len = prefix.len(); 296 299 ··· 300 303 }; 301 304 302 305 let mut dids = Vec::with_capacity(limit); 303 - for guard in db.rbc.range(start_key..) { 306 + for guard in db.ks.range(start_key..) { 304 307 let (k, _v) = guard.into_inner()?; 305 308 if !k.starts_with(&prefix) { 306 309 break; ··· 333 336 did: Did<'_>, 334 337 cursor: Option<Nsid<'_>>, 335 338 limit: usize, 336 - ) -> Result<Vec<Nsid<'static>>> { 339 + ) -> StorageResult<Vec<Nsid<'static>>> { 337 340 let prefix = keys::cbr_prefix(did.clone()); 338 341 let prefix_len = prefix.len(); 339 342 ··· 343 346 }; 344 347 345 348 let mut cols = Vec::with_capacity(limit); 346 - for guard in db.cbr.range(start_key..) { 349 + for guard in db.ks.range(start_key..) { 347 350 let (k, _v) = guard.into_inner()?; 348 351 if !k.starts_with(&prefix) { 349 352 break;
+11 -29
src/storage/mod.rs
··· 1 1 pub mod cursor; 2 - pub mod index; 2 + pub mod error; 3 3 pub mod keys; 4 + pub mod list_by; 4 5 pub mod repo; 5 6 pub mod resync; 7 + 8 + pub(crate) use error::{StorageError, StorageResult}; 6 9 7 10 use std::path::Path; 8 11 use std::sync::Arc; 9 12 10 - use crate::error::Result; 11 - 12 13 /// Shared handle to the fjall database and its per-concern keyspaces. 13 14 /// 14 15 /// In fjall 3.x, `Database` is the top-level multi-keyspace container and 15 16 /// `Keyspace` is an individual column-family (the old `PartitionHandle`). 16 17 pub struct Db { 17 18 pub(crate) database: fjall::Database, 18 - /// Main collection index: `rbc\0<collection>\0<did>`. 19 - pub(crate) rbc: fjall::Keyspace, 20 - /// Reversed index: `cbr\0<did>\0<collection>`. 21 - pub(crate) cbr: fjall::Keyspace, 22 - /// Per-repo state + metadata. 23 - pub(crate) repos: fjall::Keyspace, 24 - /// Firehose cursor + listRepos progress. 25 - pub(crate) cursors: fjall::Keyspace, 26 - /// Timestamp-ordered resync queue. 27 - pub(crate) resync: fjall::Keyspace, 19 + // for now, put all keys in one keyspace 20 + pub(crate) ks: fjall::Keyspace, 28 21 } 29 22 30 23 /// Cheaply-cloneable reference to the shared database. 31 24 pub type DbRef = Arc<Db>; 32 25 33 26 /// Open (or create) the fjall database at `path` and return a shared handle. 34 - pub fn open(path: &Path) -> Result<DbRef> { 27 + pub fn open(path: &Path) -> StorageResult<DbRef> { 35 28 open_inner(path, false) 36 29 } 37 30 38 31 /// Open a temporary database that deletes itself on drop. For tests only. 39 32 #[cfg(test)] 40 - pub(crate) fn open_temporary() -> Result<DbRef> { 33 + pub(crate) fn open_temporary() -> StorageResult<DbRef> { 41 34 use std::sync::atomic::{AtomicU64, Ordering}; 42 35 static COUNTER: AtomicU64 = AtomicU64::new(0); 43 36 let n = COUNTER.fetch_add(1, Ordering::Relaxed); ··· 45 38 open_inner(&path, true) 46 39 } 47 40 48 - fn open_inner(path: &Path, temporary: bool) -> Result<DbRef> { 41 + fn open_inner(path: &Path, temporary: bool) -> StorageResult<DbRef> { 49 42 let database = fjall::Database::builder(path).temporary(temporary).open()?; 50 43 51 - let rbc = database.keyspace("rbc", fjall::KeyspaceCreateOptions::default)?; 52 - let cbr = database.keyspace("cbr", fjall::KeyspaceCreateOptions::default)?; 53 - let repos = database.keyspace("repos", fjall::KeyspaceCreateOptions::default)?; 54 - let cursors = database.keyspace("cursors", fjall::KeyspaceCreateOptions::default)?; 55 - let resync = database.keyspace("resync", fjall::KeyspaceCreateOptions::default)?; 44 + let ks = database.keyspace("default", fjall::KeyspaceCreateOptions::default)?; 56 45 57 - Ok(Arc::new(Db { 58 - database, 59 - rbc, 60 - cbr, 61 - repos, 62 - cursors, 63 - resync, 64 - })) 46 + Ok(Arc::new(Db { database, ks })) 65 47 }
+29 -22
src/storage/repo.rs
··· 1 1 //! Per-repo state storage. 2 2 3 + use jacquard_common::types::string::Cid; 3 4 use jacquard_common::types::string::Did; 5 + use jacquard_common::types::tid::Tid; 4 6 5 - use crate::error::Result; 6 - use crate::storage::{DbRef, keys}; 7 + use crate::storage::{DbRef, error::StorageResult, keys}; 7 8 8 9 /// tap's "RepoState" type 9 10 #[derive(Debug, Clone, PartialEq, Eq)] ··· 34 35 } 35 36 36 37 /// tap's "AccountStatus" type 38 + /// TODO: move to a types folder? 37 39 #[derive(Debug, Clone, PartialEq, Eq)] 38 40 pub enum AccountStatus { 39 41 Active, ··· 44 46 } 45 47 46 48 impl AccountStatus { 47 - pub fn as_str(&self) -> &str { 49 + pub fn is_active(&self) -> bool { 50 + matches!(self, AccountStatus::Active) 51 + } 52 + pub fn status(&self) -> Option<&str> { 48 53 match self { 49 - AccountStatus::Active => "active", 50 - AccountStatus::Takendown => "takendown", 51 - AccountStatus::Suspended => "suspended", 52 - AccountStatus::Deactivated => "deactivated", 53 - AccountStatus::Deleted => "deleted", 54 + AccountStatus::Active => None, 55 + AccountStatus::Takendown => Some("takendown"), 56 + AccountStatus::Suspended => Some("suspended"), 57 + AccountStatus::Deactivated => Some("deactivated"), 58 + AccountStatus::Deleted => Some("deleted"), 54 59 } 55 60 } 56 61 } 57 62 58 - /// Stored record for a repository. 63 + /// Stored info for a repository. 59 64 #[derive(Debug, Clone)] 60 - pub struct RepoRecord { 65 + pub struct RepoInfo { 61 66 pub state: RepoState, 62 67 pub status: AccountStatus, 63 68 pub error: Option<String>, 64 69 } 65 70 66 - /// Read the [`RepoRecord`] for `did`, returning `None` if no record exists. 67 - pub fn get(_db: &DbRef, _did: &Did<'_>) -> Result<Option<RepoRecord>> { 68 - todo!("deserialize RepoRecord from fjall value") 71 + /// Retrieve both [`RepoInfo`] and [`RepoPrev`] for a `did`. 72 + /// 73 + /// `None` if the repo is not indexed. 74 + pub fn get(_db: &DbRef, _did: &Did<'_>) -> StorageResult<Option<(RepoInfo, Option<RepoPrev>)>> { 75 + todo!("deserialize RepoInfo from fjall value") 69 76 } 70 77 71 - /// Write a [`RepoRecord`] for `did`. 72 - pub fn put(_db: &DbRef, _did: &Did<'_>, _record: &RepoRecord) -> Result<()> { 73 - todo!("serialize RepoRecord and write to fjall") 78 + /// Write a [`RepoInfo`] for `did`. 79 + pub fn put_info(_db: &DbRef, _did: &Did<'_>, _record: &RepoInfo) -> StorageResult<()> { 80 + todo!("serialize RepoInfo and write to fjall") 74 81 } 75 82 76 83 /// Transient sync state for proof validation. 77 84 #[derive(Debug, Clone)] 78 85 pub struct RepoPrev { 79 86 /// The last-seen `rev` string. 80 - pub rev: String, 87 + pub rev: Tid, 81 88 /// The last-seen `prevData` CID (as raw bytes). 82 - pub prev_data: Vec<u8>, 89 + pub prev_data: Cid<'static>, 83 90 } 84 91 85 92 /// Read the transient [`RepoPrev`] for `did`. 86 - pub fn get_prev(_db: &DbRef, _did: &Did<'_>) -> Result<Option<RepoPrev>> { 93 + pub fn get_prev(_db: &DbRef, _did: &Did<'_>) -> StorageResult<Option<RepoPrev>> { 87 94 todo!("deserialize RepoPrev") 88 95 } 89 96 90 97 /// Write the transient [`RepoPrev`] for `did`. 91 - pub fn put_prev(_db: &DbRef, _did: &Did<'_>, _prev: &RepoPrev) -> Result<()> { 98 + pub fn put_prev(_db: &DbRef, _did: &Did<'_>, _prev: &RepoPrev) -> StorageResult<()> { 92 99 todo!("serialize and write RepoPrev") 93 100 } 94 101 95 102 /// Delete the transient [`RepoPrev`] for `did`. 96 - pub fn delete_prev(db: &DbRef, did: &Did<'_>) -> Result<()> { 103 + pub fn delete_prev(db: &DbRef, did: &Did<'_>) -> StorageResult<()> { 97 104 let key = keys::repo_prev(did); 98 - db.repos.remove(key)?; 105 + db.ks.remove(key)?; 99 106 Ok(()) 100 107 }
+4 -5
src/storage/resync.rs
··· 5 5 6 6 use jacquard_common::types::string::Did; 7 7 8 - use crate::error::Result; 9 - use crate::storage::DbRef; 8 + use crate::storage::{DbRef, StorageResult}; 10 9 11 10 /// An item waiting in the resync queue. 12 11 #[derive(Debug, Clone)] ··· 19 18 } 20 19 21 20 /// Enqueue a repo for resync at the given Unix timestamp (seconds). 22 - pub fn enqueue(_db: &DbRef, _ts: u64, _item: &ResyncItem) -> Result<()> { 21 + pub fn enqueue(_db: &DbRef, _ts: u64, _item: &ResyncItem) -> StorageResult<()> { 23 22 todo!("serialize ResyncItem to CBOR and insert into resync partition") 24 23 } 25 24 26 25 /// Dequeue and return the next item whose timestamp is ≤ `now`. 27 26 /// 28 27 /// Removes the entry from the queue atomically before returning it. 29 - pub fn dequeue_ready(_db: &DbRef, _now: u64) -> Result<Option<ResyncItem>> { 28 + pub fn dequeue_ready(_db: &DbRef, _now: u64) -> StorageResult<Option<ResyncItem>> { 30 29 todo!("scan resync partition up to `now`, remove and return the first entry") 31 30 } 32 31 33 32 /// Remove all queue entries for `did` (e.g., after a successful resync). 34 - pub fn remove_did(_db: &DbRef, _did: &Did<'_>) -> Result<()> { 33 + pub fn remove_did(_db: &DbRef, _did: &Did<'_>) -> StorageResult<()> { 35 34 todo!("scan resync partition and remove all entries matching did") 36 35 }
+1 -1
src/sync/resync/mod.rs
··· 87 87 Err(e) => return Err(crate::error::Error::Other(e.to_string())), 88 88 }; 89 89 90 - crate::storage::index::sync_collections(db, did, &snapshot.collections)?; 90 + crate::storage::list_by::sync_collections(db, did, &snapshot.collections)?; 91 91 Ok(()) 92 92 } 93 93