this repo has no description
20
fork

Configure Feed

Select the types of activity you want to include in your feed.

feature: atproto-dasl disk storage improvements

+41 -19
+1 -1
crates/atproto-repo/src/lib.rs
··· 76 76 pub use mst::{Mst, MstDiff, MstNode, TreeEntry}; 77 77 78 78 // Re-export repository types 79 - pub use repo::{Commit, RecordPath, Repository}; 79 + pub use repo::{Commit, DiskRepository, MemoryRepository, RecordPath, Repository}; 80 80 81 81 #[cfg(test)] 82 82 mod tests {
+40 -18
crates/atproto-repo/src/repo/mod.rs
··· 44 44 use crate::mst::Mst; 45 45 use atproto_dasl::CarReader; 46 46 use atproto_dasl::Cid; 47 - use atproto_dasl::storage::{BlockStorage, MemoryStorage}; 47 + use atproto_dasl::storage::{BlockStorage, DiskStorage, MemoryStorage}; 48 48 use tokio::io::AsyncRead; 49 49 50 50 /// Read-only view of a repository backed by pluggable storage. ··· 213 213 pub fn config(&self) -> &RepoConfig { 214 214 &self.config 215 215 } 216 - } 217 216 218 - /// Convenience type for in-memory repository. 219 - pub type MemoryRepository = Repository<MemoryStorage>; 220 - 221 - impl MemoryRepository { 222 - /// Load repository from an async CAR reader into memory. 223 - /// 224 - /// # Errors 225 - /// 226 - /// Returns `RepoError` if loading fails. 227 - pub async fn from_car<R: AsyncRead + Unpin>( 217 + /// Load repository from an async CAR reader into pre-built storage. 218 + async fn from_car_with_storage<R: AsyncRead + Unpin>( 228 219 reader: R, 229 220 config: RepoConfig, 221 + mut storage: S, 230 222 ) -> Result<Self, RepoError> { 231 223 let car_reader = CarReader::with_config(reader, config.car_config()).await?; 232 224 233 - // Get root CID 234 225 let root_cid = car_reader 235 226 .root() 236 227 .ok_or_else(|| RepoError::InvalidCommit { ··· 238 229 })? 239 230 .clone(); 240 231 241 - // Stream all blocks to memory storage 242 - let mut storage = MemoryStorage::with_limits(config.limits.clone()); 243 232 let _header = car_reader.stream_to_storage(&mut storage).await?; 244 233 245 - // Load commit from root 246 234 let commit_bytes = storage 247 235 .get(&root_cid.clone().into()) 248 236 .await? ··· 255 243 reason: format!("failed to decode commit: {}", e), 256 244 })?; 257 245 258 - // Validate commit version 259 246 if commit.version != 3 { 260 247 return Err(RepoError::UnsupportedCommitVersion { 261 248 version: commit.version, 262 249 }); 263 250 } 264 251 265 - // Create MST from commit data CID 266 252 let mst = Mst::from_root(commit.data.clone().into(), storage, config.clone()); 267 253 268 254 Ok(Self { ··· 272 258 signature_verified: None, 273 259 }) 274 260 } 261 + } 262 + 263 + /// Convenience type for in-memory repository. 264 + pub type MemoryRepository = Repository<MemoryStorage>; 265 + 266 + impl MemoryRepository { 267 + /// Load repository from an async CAR reader into memory. 268 + /// 269 + /// # Errors 270 + /// 271 + /// Returns `RepoError` if loading fails. 272 + pub async fn from_car<R: AsyncRead + Unpin>( 273 + reader: R, 274 + config: RepoConfig, 275 + ) -> Result<Self, RepoError> { 276 + let storage = MemoryStorage::with_limits(config.limits.clone()); 277 + Self::from_car_with_storage(reader, config, storage).await 278 + } 275 279 276 280 /// Get the total number of records in the repository. 277 281 /// ··· 281 285 pub async fn record_count(&self) -> Result<usize, RepoError> { 282 286 let entries = self.mst.entries().await?; 283 287 Ok(entries.len()) 288 + } 289 + } 290 + 291 + /// Convenience type for disk-backed repository. 292 + pub type DiskRepository = Repository<DiskStorage>; 293 + 294 + impl DiskRepository { 295 + /// Load repository from an async CAR reader with disk-backed storage. 296 + /// 297 + /// # Errors 298 + /// 299 + /// Returns `RepoError` if loading or storage initialization fails. 300 + pub async fn from_car<R: AsyncRead + Unpin>( 301 + reader: R, 302 + config: RepoConfig, 303 + ) -> Result<Self, RepoError> { 304 + let storage = DiskStorage::new(config.limits.clone()).await?; 305 + Self::from_car_with_storage(reader, config, storage).await 284 306 } 285 307 } 286 308