Ramjet is a relay consumer that supports configurable forward and track collections, as well as record reconciliation.
event-stream relay firehose riblt atprotocol
13
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 354 lines 12 kB view raw
1//! Configuration types for the Ramjet service. 2//! 3//! Includes CLI argument parsing via clap, collection pattern matching 4//! for routing firehose events, and service-level configuration. 5 6use std::collections::HashSet; 7use std::net::SocketAddr; 8use std::path::PathBuf; 9 10use clap::Parser; 11 12/// Collection pattern for matching ATProtocol NSIDs. 13/// 14/// Supports exact match, single-segment wildcard (`.*`), and 15/// glob wildcard (`.**`) patterns. 16#[derive(Debug, Clone)] 17pub enum CollectionPattern { 18 /// Empty pattern — matches nothing. 19 None, 20 /// Wildcard `*` — matches every collection. 21 MatchAll, 22 /// Exact NSID match (e.g., `com.example`). 23 Exact(String), 24 /// Single-segment wildcard (e.g., `com.example.*` matches `com.example.foo`). 25 SingleWild(String), 26 /// Glob wildcard (e.g., `com.example.**` matches `com.example.foo.bar`). 27 GlobWild(String), 28} 29 30impl CollectionPattern { 31 /// Parse a pattern string into a `CollectionPattern`. 32 pub fn parse(pattern: &str) -> Self { 33 let trimmed = pattern.trim(); 34 if trimmed.is_empty() { 35 CollectionPattern::None 36 } else if trimmed == "*" { 37 CollectionPattern::MatchAll 38 } else if let Some(prefix) = trimmed.strip_suffix(".**") { 39 CollectionPattern::GlobWild(format!("{}.", prefix)) 40 } else if let Some(prefix) = trimmed.strip_suffix(".*") { 41 CollectionPattern::SingleWild(format!("{}.", prefix)) 42 } else { 43 CollectionPattern::Exact(trimmed.to_string()) 44 } 45 } 46 47 /// Test whether a collection NSID matches this pattern. 48 pub fn matches(&self, collection: &str) -> bool { 49 match self { 50 CollectionPattern::None => false, 51 CollectionPattern::MatchAll => true, 52 CollectionPattern::Exact(expected) => collection == expected, 53 CollectionPattern::SingleWild(prefix) => { 54 collection.starts_with(prefix.as_str()) 55 && !collection[prefix.len()..].contains('.') 56 && collection.len() > prefix.len() 57 } 58 CollectionPattern::GlobWild(prefix) => { 59 collection.starts_with(prefix.as_str()) && collection.len() > prefix.len() 60 } 61 } 62 } 63} 64 65/// A set of collection patterns evaluated with OR semantics. 66#[derive(Debug, Clone)] 67pub struct CollectionMatcher { 68 patterns: Vec<CollectionPattern>, 69} 70 71impl CollectionMatcher { 72 /// Create a matcher from a space-separated pattern string. 73 /// 74 /// - `"*"` matches everything 75 /// - `""` matches nothing 76 /// - `"app.bsky.** community.lexicon.**"` matches multiple prefixes 77 pub fn new(pattern_str: &str) -> Self { 78 let trimmed = pattern_str.trim(); 79 if trimmed.is_empty() { 80 return Self { 81 patterns: vec![CollectionPattern::None], 82 }; 83 } 84 Self { 85 patterns: trimmed 86 .split_whitespace() 87 .map(CollectionPattern::parse) 88 .collect(), 89 } 90 } 91 92 /// Test whether a collection NSID matches any pattern in this matcher. 93 pub fn matches(&self, collection: &str) -> bool { 94 self.patterns.iter().any(|p| p.matches(collection)) 95 } 96} 97 98/// Ramjet service configuration, parsed from CLI arguments and environment variables. 99#[derive(Parser, Debug)] 100#[command( 101 name = "ramjet", 102 version, 103 about = "ATProtocol event-stream, record, and blob service" 104)] 105pub struct CliArgs { 106 /// Path to the fjall database directory. 107 #[arg(long, env = "RAMJET_DB_PATH", default_value = "./data/ramjet.db")] 108 pub db_path: PathBuf, 109 110 /// Upstream relay WebSocket host. 111 #[arg(long, env = "RAMJET_RELAY_HOST", default_value = "bsky.network")] 112 pub relay_host: String, 113 114 /// HTTP bind address. 115 #[arg(long, env = "RAMJET_LISTEN_ADDR", default_value = "0.0.0.0:8080")] 116 pub listen_addr: SocketAddr, 117 118 /// Space-separated collection patterns to persist (e.g., `"garden.lexicon.** com.atproto.lexicon.**"`). 119 #[arg(long, env = "RAMJET_TRACKED_COLLECTIONS", default_value = "*")] 120 pub tracked_collections: String, 121 122 /// Space-separated collection patterns to forward as low-priority events. 123 /// `"*"` forwards everything not already tracked (default). 124 /// `""` forwards nothing. 125 #[arg(long, env = "RAMJET_FORWARD_COLLECTIONS", default_value = "*")] 126 pub forward_collections: String, 127 128 /// Event retention window in hours. 129 #[arg(long, env = "RAMJET_EVENT_RETENTION_HOURS", default_value = "72")] 130 pub event_retention_hours: u64, 131 132 /// Maximum events per WriteBatch. 133 #[arg(long, env = "RAMJET_BATCH_SIZE", default_value = "500")] 134 pub batch_size: usize, 135 136 /// Maximum wait time (ms) for batch fill before flushing. 137 #[arg(long, env = "RAMJET_BATCH_TIMEOUT_MS", default_value = "100")] 138 pub batch_timeout_ms: u64, 139 140 /// Comma-separated list of admin DIDs. 141 #[arg(long, env = "ADMIN_DIDS", default_value = "")] 142 pub admin_dids: String, 143 144 /// Path to a zstd dictionary file for event compression. 145 /// When absent, events are stored uncompressed. 146 #[arg(long, env = "RAMJET_ZSTD_DICT_PATH")] 147 pub zstd_dict_path: Option<PathBuf>, 148 149 /// Comma-separated list of DIDs to backfill on startup (skips already-backfilled repos). 150 #[arg(long, env = "RAMJET_BACKFILL", default_value = "")] 151 pub backfill: String, 152 153 /// Consumer group definitions (repeatable). Format: `name:partition_count`. 154 /// Example: `--consumer-group indexers:3 --consumer-group notifiers:2` 155 #[arg(long, env = "RAMJET_CONSUMER_GROUPS", value_delimiter = ',')] 156 pub consumer_group: Vec<String>, 157 158 /// QUIC bind address for RIBLT set reconciliation. Server only starts if provided. 159 #[arg(long, env = "RAMJET_QUIC_LISTEN_ADDR")] 160 pub quic_listen_addr: Option<SocketAddr>, 161} 162 163/// A pre-defined consumer group with a fixed number of partitions. 164#[derive(Debug, Clone)] 165pub struct ConsumerGroup { 166 /// Group name (e.g., "indexers"). 167 pub name: String, 168 /// Number of partitions. Must be >= 1. 169 pub partition_count: u16, 170} 171 172impl ConsumerGroup { 173 /// Parse a `name:count` string into a `ConsumerGroup`. 174 pub fn parse(s: &str) -> Option<Self> { 175 let (name, count_str) = s.split_once(':')?; 176 let name = name.trim(); 177 let count: u16 = count_str.trim().parse().ok()?; 178 if name.is_empty() || count == 0 { 179 return None; 180 } 181 Some(Self { 182 name: name.to_string(), 183 partition_count: count, 184 }) 185 } 186} 187 188/// Resolved service configuration with parsed matchers and sets. 189pub struct ServiceConfig { 190 /// Path to the fjall database directory. 191 pub db_path: PathBuf, 192 /// Upstream relay WebSocket host. 193 pub relay_host: String, 194 /// HTTP bind address. 195 pub listen_addr: SocketAddr, 196 /// Matcher for collections to persist. 197 pub tracked_collections: CollectionMatcher, 198 /// Matcher for collections to forward as low-priority. 199 pub forward_collections: CollectionMatcher, 200 /// Event retention window in hours. 201 pub event_retention_hours: u64, 202 /// Maximum events per WriteBatch. 203 pub batch_size: usize, 204 /// Maximum wait time (ms) for batch fill. 205 pub batch_timeout_ms: u64, 206 /// Set of admin DIDs. 207 pub admin_dids: HashSet<String>, 208 /// Optional path to a zstd dictionary file for event compression. 209 pub zstd_dict_path: Option<PathBuf>, 210 /// DIDs to backfill on startup. 211 pub backfill_dids: Vec<String>, 212 /// Pre-defined consumer groups. 213 pub consumer_groups: Vec<ConsumerGroup>, 214 /// QUIC bind address for RIBLT reconciliation (None = disabled). 215 pub quic_listen_addr: Option<SocketAddr>, 216} 217 218impl From<CliArgs> for ServiceConfig { 219 fn from(args: CliArgs) -> Self { 220 let admin_dids: HashSet<String> = args 221 .admin_dids 222 .split(',') 223 .map(|s| s.trim().to_string()) 224 .filter(|s| !s.is_empty()) 225 .collect(); 226 227 let backfill_dids: Vec<String> = args 228 .backfill 229 .split(',') 230 .map(|s| s.trim().to_string()) 231 .filter(|s| !s.is_empty()) 232 .collect(); 233 234 let consumer_groups: Vec<ConsumerGroup> = args 235 .consumer_group 236 .iter() 237 .filter_map(|s| { 238 let parsed = ConsumerGroup::parse(s); 239 if parsed.is_none() { 240 tracing::warn!(value = %s, "ignoring invalid --consumer-group value, expected name:count"); 241 } 242 parsed 243 }) 244 .collect(); 245 246 Self { 247 db_path: args.db_path, 248 relay_host: args.relay_host, 249 listen_addr: args.listen_addr, 250 tracked_collections: CollectionMatcher::new(&args.tracked_collections), 251 forward_collections: CollectionMatcher::new(&args.forward_collections), 252 event_retention_hours: args.event_retention_hours, 253 batch_size: args.batch_size, 254 batch_timeout_ms: args.batch_timeout_ms, 255 admin_dids, 256 zstd_dict_path: args.zstd_dict_path, 257 backfill_dids, 258 consumer_groups, 259 quic_listen_addr: args.quic_listen_addr, 260 } 261 } 262} 263 264#[cfg(test)] 265mod tests { 266 use super::*; 267 268 #[test] 269 fn test_exact_match() { 270 let p = CollectionPattern::parse("com.example"); 271 assert!(p.matches("com.example")); 272 assert!(!p.matches("com.example.foo")); 273 assert!(!p.matches("com")); 274 } 275 276 #[test] 277 fn test_single_wildcard() { 278 let p = CollectionPattern::parse("com.example.*"); 279 assert!(p.matches("com.example.foo")); 280 assert!(p.matches("com.example.bar")); 281 assert!(!p.matches("com.example")); 282 assert!(!p.matches("com.example.foo.baz")); 283 assert!(!p.matches("com")); 284 } 285 286 #[test] 287 fn test_glob_wildcard() { 288 let p = CollectionPattern::parse("com.example.**"); 289 assert!(p.matches("com.example.foo")); 290 assert!(p.matches("com.example.bar")); 291 assert!(p.matches("com.example.foo.bar")); 292 assert!(p.matches("com.example.bar.baz.qux")); 293 assert!(!p.matches("com.example")); 294 assert!(!p.matches("com")); 295 } 296 297 #[test] 298 fn test_match_all() { 299 let p = CollectionPattern::parse("*"); 300 assert!(p.matches("anything")); 301 assert!(p.matches("com.example.foo")); 302 } 303 304 #[test] 305 fn test_none() { 306 let p = CollectionPattern::parse(""); 307 assert!(!p.matches("anything")); 308 } 309 310 #[test] 311 fn test_matcher_multiple_patterns() { 312 let m = CollectionMatcher::new("garden.lexicon.** com.atproto.lexicon.**"); 313 assert!(m.matches("garden.lexicon.foo")); 314 assert!(m.matches("garden.lexicon.foo.bar")); 315 assert!(m.matches("com.atproto.lexicon.def")); 316 assert!(!m.matches("app.bsky.feed.post")); 317 assert!(!m.matches("garden.lexicon")); 318 } 319 320 #[test] 321 fn test_matcher_star() { 322 let m = CollectionMatcher::new("*"); 323 assert!(m.matches("anything")); 324 } 325 326 #[test] 327 fn test_matcher_empty() { 328 let m = CollectionMatcher::new(""); 329 assert!(!m.matches("anything")); 330 } 331 332 #[test] 333 fn test_admin_dids_parsing() { 334 let args = CliArgs { 335 db_path: PathBuf::from("/tmp"), 336 relay_host: "bsky.network".to_string(), 337 listen_addr: "0.0.0.0:8080".parse().unwrap(), 338 tracked_collections: "*".to_string(), 339 forward_collections: "*".to_string(), 340 event_retention_hours: 72, 341 batch_size: 500, 342 batch_timeout_ms: 100, 343 admin_dids: "did:plc:abc, did:plc:def".to_string(), 344 zstd_dict_path: None, 345 backfill: String::new(), 346 consumer_group: Vec::new(), 347 quic_listen_addr: None, 348 }; 349 let config = ServiceConfig::from(args); 350 assert!(config.admin_dids.contains("did:plc:abc")); 351 assert!(config.admin_dids.contains("did:plc:def")); 352 assert_eq!(config.admin_dids.len(), 2); 353 } 354}