lightweight com.atproto.sync.listReposByCollection
45
fork

Configure Feed

Select the types of activity you want to include in your feed.

set up pds host state

phil f9e47c74 4db7eeb5

+341
+76
src/storage/list_hosts_cursor.rs
··· 1 + //! listHosts pagination cursor for the upstream relay. 2 + //! 3 + //! Tracks progress through `com.atproto.sync.listHosts` for deep-crawl mode. 4 + //! 5 + //! Key: `"lhs"<upstream_host>`, value: plain string cursor 6 + 7 + use jacquard_common::url::Host; 8 + 9 + use crate::storage::{ 10 + DbRef, PREFIX_LIST_HOSTS, 11 + error::{StorageError, StorageResult}, 12 + }; 13 + 14 + fn key(host: &Host) -> Vec<u8> { 15 + let host = host.to_string(); 16 + let mut k = Vec::with_capacity(PREFIX_LIST_HOSTS.len() + host.len()); 17 + k.extend_from_slice(&PREFIX_LIST_HOSTS); 18 + k.extend_from_slice(host.as_bytes()); 19 + k 20 + } 21 + 22 + /// Read the `listHosts` cursor for `host`. 23 + /// 24 + /// Returns `None` if no walk has been started yet. 25 + pub fn get(db: &DbRef, host: &Host) -> StorageResult<Option<String>> { 26 + let k = key(host); 27 + match db.ks.get(k)? { 28 + None => Ok(None), 29 + Some(v) => { 30 + let s = std::str::from_utf8(v.as_ref()).map_err(|_| StorageError::Corrupt { 31 + key: host.to_string(), 32 + reason: "listHosts cursor not valid UTF-8", 33 + })?; 34 + Ok(Some(s.to_owned())) 35 + } 36 + } 37 + } 38 + 39 + /// Persist the `listHosts` cursor for `host`. 40 + pub fn set(db: &DbRef, host: &Host, cursor: &str) -> StorageResult<()> { 41 + db.ks.insert(key(host), cursor.as_bytes())?; 42 + Ok(()) 43 + } 44 + 45 + #[cfg(test)] 46 + mod tests { 47 + use super::*; 48 + use crate::storage::open_temporary; 49 + 50 + fn host(s: &str) -> Host { 51 + Host::parse(s).unwrap() 52 + } 53 + 54 + #[test] 55 + fn none_when_missing() { 56 + let db = open_temporary().unwrap(); 57 + assert_eq!(get(&db, &host("relay.example.com")).unwrap(), None); 58 + } 59 + 60 + #[test] 61 + fn set_and_get() { 62 + let db = open_temporary().unwrap(); 63 + let h = host("relay.example.com"); 64 + set(&db, &h, "page2cursor").unwrap(); 65 + assert_eq!(get(&db, &h).unwrap(), Some("page2cursor".to_owned())); 66 + } 67 + 68 + #[test] 69 + fn overwrite_cursor() { 70 + let db = open_temporary().unwrap(); 71 + let h = host("relay.example.com"); 72 + set(&db, &h, "first").unwrap(); 73 + set(&db, &h, "second").unwrap(); 74 + assert_eq!(get(&db, &h).unwrap(), Some("second".to_owned())); 75 + } 76 + }
+6
src/storage/mod.rs
··· 2 2 pub mod collection_index; 3 3 pub mod error; 4 4 pub mod firehose_cursor; 5 + pub mod list_hosts_cursor; 6 + pub mod pds_host; 5 7 pub mod repo; 6 8 pub mod resync_buffer; 7 9 pub mod resync_queue; ··· 35 37 pub(crate) const PREFIX_RESYNC_QUEUE: KeyPrefix = *b"rsq"; 36 38 /// Per-repo buffered firehose events during resync. See [`resync_buffer`]. 37 39 pub(crate) const PREFIX_RESYNC_BUFFER: KeyPrefix = *b"rsb"; 40 + /// Per-PDS host state (sync1.1 mode, trust, listRepos cursor/done). See [`pds_host`]. 41 + pub(crate) const PREFIX_PDS_HOST: KeyPrefix = *b"pdh"; 42 + /// listHosts walk cursor (per upstream relay host). See [`list_hosts_cursor`]. 43 + pub(crate) const PREFIX_LIST_HOSTS: KeyPrefix = *b"lhs"; 38 44 39 45 use std::path::Path; 40 46 use std::sync::Arc;
+259
src/storage/pds_host.rs
··· 1 + //! Per-PDS-host state storage 2 + //! 3 + //! Key: `"pdh"<hostname>`, value: `<sync11_mode>\0<trust>\0<list_repos_done>\0<cursor_or_empty>` 4 + 5 + use jacquard_common::url::Host; 6 + 7 + use crate::storage::{ 8 + DbRef, PREFIX_PDS_HOST, 9 + error::{StorageError, StorageResult}, 10 + }; 11 + 12 + // --------------------------------------------------------------------------- 13 + // Key encoding 14 + // --------------------------------------------------------------------------- 15 + 16 + fn key(host: &Host) -> Vec<u8> { 17 + let host = host.to_string(); 18 + let mut k = Vec::with_capacity(PREFIX_PDS_HOST.len() + host.len()); 19 + k.extend_from_slice(&PREFIX_PDS_HOST); 20 + k.extend_from_slice(host.as_bytes()); 21 + k 22 + } 23 + 24 + // --------------------------------------------------------------------------- 25 + // Types 26 + // --------------------------------------------------------------------------- 27 + 28 + /// Whether this host has been seen sending sync1.1-style events. 29 + /// 30 + /// Hosts start in `Lenient` mode and flip to `Strict` once any event with 31 + /// `prevData` is observed. 32 + #[derive(Debug, Clone, PartialEq, Eq)] 33 + pub enum Sync11Mode { 34 + Lenient, 35 + Strict, 36 + } 37 + 38 + impl Sync11Mode { 39 + fn as_str(&self) -> &'static str { 40 + match self { 41 + Sync11Mode::Lenient => "lenient", 42 + Sync11Mode::Strict => "strict", 43 + } 44 + } 45 + 46 + fn from_str(s: &str) -> Option<Self> { 47 + Some(match s { 48 + "lenient" => Sync11Mode::Lenient, 49 + "strict" => Sync11Mode::Strict, 50 + _ => return None, 51 + }) 52 + } 53 + } 54 + 55 + /// Trust level for a PDS host. 56 + #[derive(Debug, Clone, PartialEq, Eq)] 57 + pub enum PdsTrust { 58 + Ok, 59 + Cautious, 60 + Blocked, 61 + } 62 + 63 + impl PdsTrust { 64 + fn as_str(&self) -> &'static str { 65 + match self { 66 + PdsTrust::Ok => "ok", 67 + PdsTrust::Cautious => "cautious", 68 + PdsTrust::Blocked => "blocked", 69 + } 70 + } 71 + 72 + fn from_str(s: &str) -> Option<Self> { 73 + Some(match s { 74 + "ok" => PdsTrust::Ok, 75 + "cautious" => PdsTrust::Cautious, 76 + "blocked" => PdsTrust::Blocked, 77 + _ => return None, 78 + }) 79 + } 80 + } 81 + 82 + /// Stored state for a PDS host. 83 + #[derive(Debug, Clone)] 84 + pub struct PdsHostInfo { 85 + pub sync11_mode: Sync11Mode, 86 + pub trust: PdsTrust, 87 + /// Whether the `listRepos` walk for this PDS has completed. 88 + pub list_repos_done: bool, 89 + /// Current `listRepos` pagination cursor; empty when not yet started. 90 + pub list_repos_cursor: String, 91 + } 92 + 93 + impl Default for PdsHostInfo { 94 + fn default() -> Self { 95 + PdsHostInfo { 96 + sync11_mode: Sync11Mode::Lenient, 97 + trust: PdsTrust::Ok, 98 + list_repos_done: false, 99 + list_repos_cursor: String::new(), 100 + } 101 + } 102 + } 103 + 104 + // --------------------------------------------------------------------------- 105 + // Encode / decode 106 + // --------------------------------------------------------------------------- 107 + 108 + /// Wire format: `<sync11_mode>\0<trust>\0<list_repos_done>\0<cursor_or_empty>` 109 + fn encode(info: &PdsHostInfo) -> Vec<u8> { 110 + let done = if info.list_repos_done { "1" } else { "0" }; 111 + let s = format!( 112 + "{}\0{}\0{}\0{}", 113 + info.sync11_mode.as_str(), 114 + info.trust.as_str(), 115 + done, 116 + info.list_repos_cursor, 117 + ); 118 + s.into_bytes() 119 + } 120 + 121 + fn decode(bytes: &[u8], key: &str) -> StorageResult<PdsHostInfo> { 122 + let s = std::str::from_utf8(bytes).map_err(|_| StorageError::Corrupt { 123 + key: key.to_owned(), 124 + reason: "pds host info not valid UTF-8", 125 + })?; 126 + let mut parts = s.splitn(4, '\0'); 127 + let sync11_mode = parts 128 + .next() 129 + .and_then(Sync11Mode::from_str) 130 + .ok_or(StorageError::Corrupt { 131 + key: key.to_owned(), 132 + reason: "invalid sync11_mode", 133 + })?; 134 + let trust = parts 135 + .next() 136 + .and_then(PdsTrust::from_str) 137 + .ok_or(StorageError::Corrupt { 138 + key: key.to_owned(), 139 + reason: "invalid trust", 140 + })?; 141 + let list_repos_done = match parts.next() { 142 + Some("0") => false, 143 + Some("1") => true, 144 + _ => { 145 + return Err(StorageError::Corrupt { 146 + key: key.to_owned(), 147 + reason: "invalid list_repos_done", 148 + }); 149 + } 150 + }; 151 + let list_repos_cursor = parts.next().unwrap_or("").to_owned(); 152 + Ok(PdsHostInfo { 153 + sync11_mode, 154 + trust, 155 + list_repos_done, 156 + list_repos_cursor, 157 + }) 158 + } 159 + 160 + // --------------------------------------------------------------------------- 161 + // Public API 162 + // --------------------------------------------------------------------------- 163 + 164 + /// Read the [`PdsHostInfo`] for `host`. 165 + /// 166 + /// Returns the default (lenient, ok, not done, empty cursor) when the key is 167 + /// absent — matching the implicit state for any host we haven't seen yet. 168 + pub fn get(db: &DbRef, host: &Host) -> StorageResult<PdsHostInfo> { 169 + let k = key(host); 170 + match db.ks.get(&k)? { 171 + None => Ok(PdsHostInfo::default()), 172 + Some(v) => { 173 + let key_str = String::from_utf8_lossy(&k).into_owned(); 174 + decode(v.as_ref(), &key_str) 175 + } 176 + } 177 + } 178 + 179 + /// Add a [`PdsHostInfo`] write to an existing batch. 180 + pub fn put_into(batch: &mut fjall::OwnedWriteBatch, db: &DbRef, host: &Host, info: &PdsHostInfo) { 181 + batch.insert(&db.ks, key(host), encode(info)); 182 + } 183 + 184 + /// Write a [`PdsHostInfo`] for `host`. 185 + pub fn put(db: &DbRef, host: &Host, info: &PdsHostInfo) -> StorageResult<()> { 186 + let mut batch = db.database.batch(); 187 + put_into(&mut batch, db, host, info); 188 + batch.commit()?; 189 + Ok(()) 190 + } 191 + 192 + // --------------------------------------------------------------------------- 193 + // Tests 194 + // --------------------------------------------------------------------------- 195 + 196 + #[cfg(test)] 197 + mod tests { 198 + use super::*; 199 + use crate::storage::open_temporary; 200 + 201 + fn host(s: &str) -> Host { 202 + Host::parse(s).unwrap() 203 + } 204 + 205 + #[test] 206 + fn default_when_missing() { 207 + let db = open_temporary().unwrap(); 208 + let info = get(&db, &host("pds.example.com")).unwrap(); 209 + assert_eq!(info.sync11_mode, Sync11Mode::Lenient); 210 + assert_eq!(info.trust, PdsTrust::Ok); 211 + assert!(!info.list_repos_done); 212 + assert!(info.list_repos_cursor.is_empty()); 213 + } 214 + 215 + #[test] 216 + fn roundtrip_default() { 217 + let info = PdsHostInfo::default(); 218 + let encoded = encode(&info); 219 + let decoded = decode(&encoded, "test").unwrap(); 220 + assert_eq!(decoded.sync11_mode, Sync11Mode::Lenient); 221 + assert_eq!(decoded.trust, PdsTrust::Ok); 222 + assert!(!decoded.list_repos_done); 223 + assert!(decoded.list_repos_cursor.is_empty()); 224 + } 225 + 226 + #[test] 227 + fn roundtrip_strict_blocked_done_with_cursor() { 228 + let info = PdsHostInfo { 229 + sync11_mode: Sync11Mode::Strict, 230 + trust: PdsTrust::Blocked, 231 + list_repos_done: true, 232 + list_repos_cursor: "abc123".to_owned(), 233 + }; 234 + let encoded = encode(&info); 235 + let decoded = decode(&encoded, "test").unwrap(); 236 + assert_eq!(decoded.sync11_mode, Sync11Mode::Strict); 237 + assert_eq!(decoded.trust, PdsTrust::Blocked); 238 + assert!(decoded.list_repos_done); 239 + assert_eq!(decoded.list_repos_cursor, "abc123"); 240 + } 241 + 242 + #[test] 243 + fn put_and_get() { 244 + let db = open_temporary().unwrap(); 245 + let h = host("pds.example.com"); 246 + let info = PdsHostInfo { 247 + sync11_mode: Sync11Mode::Strict, 248 + trust: PdsTrust::Cautious, 249 + list_repos_done: false, 250 + list_repos_cursor: "cursor42".to_owned(), 251 + }; 252 + put(&db, &h, &info).unwrap(); 253 + let retrieved = get(&db, &h).unwrap(); 254 + assert_eq!(retrieved.sync11_mode, Sync11Mode::Strict); 255 + assert_eq!(retrieved.trust, PdsTrust::Cautious); 256 + assert!(!retrieved.list_repos_done); 257 + assert_eq!(retrieved.list_repos_cursor, "cursor42"); 258 + } 259 + }