//! Configuration types for the Ramjet service. //! //! Includes CLI argument parsing via clap, collection pattern matching //! for routing firehose events, and service-level configuration. use std::collections::HashSet; use std::net::SocketAddr; use std::path::PathBuf; use clap::Parser; /// Collection pattern for matching ATProtocol NSIDs. /// /// Supports exact match, single-segment wildcard (`.*`), and /// glob wildcard (`.**`) patterns. #[derive(Debug, Clone)] pub enum CollectionPattern { /// Empty pattern — matches nothing. None, /// Wildcard `*` — matches every collection. MatchAll, /// Exact NSID match (e.g., `com.example`). Exact(String), /// Single-segment wildcard (e.g., `com.example.*` matches `com.example.foo`). SingleWild(String), /// Glob wildcard (e.g., `com.example.**` matches `com.example.foo.bar`). GlobWild(String), } impl CollectionPattern { /// Parse a pattern string into a `CollectionPattern`. pub fn parse(pattern: &str) -> Self { let trimmed = pattern.trim(); if trimmed.is_empty() { CollectionPattern::None } else if trimmed == "*" { CollectionPattern::MatchAll } else if let Some(prefix) = trimmed.strip_suffix(".**") { CollectionPattern::GlobWild(format!("{}.", prefix)) } else if let Some(prefix) = trimmed.strip_suffix(".*") { CollectionPattern::SingleWild(format!("{}.", prefix)) } else { CollectionPattern::Exact(trimmed.to_string()) } } /// Test whether a collection NSID matches this pattern. pub fn matches(&self, collection: &str) -> bool { match self { CollectionPattern::None => false, CollectionPattern::MatchAll => true, CollectionPattern::Exact(expected) => collection == expected, CollectionPattern::SingleWild(prefix) => { collection.starts_with(prefix.as_str()) && !collection[prefix.len()..].contains('.') && collection.len() > prefix.len() } CollectionPattern::GlobWild(prefix) => { collection.starts_with(prefix.as_str()) && collection.len() > prefix.len() } } } } /// A set of collection patterns evaluated with OR semantics. #[derive(Debug, Clone)] pub struct CollectionMatcher { patterns: Vec, } impl CollectionMatcher { /// Create a matcher from a space-separated pattern string. /// /// - `"*"` matches everything /// - `""` matches nothing /// - `"app.bsky.** community.lexicon.**"` matches multiple prefixes pub fn new(pattern_str: &str) -> Self { let trimmed = pattern_str.trim(); if trimmed.is_empty() { return Self { patterns: vec![CollectionPattern::None], }; } Self { patterns: trimmed .split_whitespace() .map(CollectionPattern::parse) .collect(), } } /// Test whether a collection NSID matches any pattern in this matcher. pub fn matches(&self, collection: &str) -> bool { self.patterns.iter().any(|p| p.matches(collection)) } } /// Ramjet service configuration, parsed from CLI arguments and environment variables. #[derive(Parser, Debug)] #[command( name = "ramjet", version, about = "ATProtocol event-stream, record, and blob service" )] pub struct CliArgs { /// Path to the fjall database directory. #[arg(long, env = "RAMJET_DB_PATH", default_value = "./data/ramjet.db")] pub db_path: PathBuf, /// Upstream relay WebSocket host. #[arg(long, env = "RAMJET_RELAY_HOST", default_value = "bsky.network")] pub relay_host: String, /// HTTP bind address. #[arg(long, env = "RAMJET_LISTEN_ADDR", default_value = "0.0.0.0:8080")] pub listen_addr: SocketAddr, /// Space-separated collection patterns to persist (e.g., `"garden.lexicon.** com.atproto.lexicon.**"`). #[arg(long, env = "RAMJET_TRACKED_COLLECTIONS", default_value = "*")] pub tracked_collections: String, /// Space-separated collection patterns to forward as low-priority events. /// `"*"` forwards everything not already tracked (default). /// `""` forwards nothing. #[arg(long, env = "RAMJET_FORWARD_COLLECTIONS", default_value = "*")] pub forward_collections: String, /// Event retention window in hours. #[arg(long, env = "RAMJET_EVENT_RETENTION_HOURS", default_value = "72")] pub event_retention_hours: u64, /// Maximum events per WriteBatch. #[arg(long, env = "RAMJET_BATCH_SIZE", default_value = "500")] pub batch_size: usize, /// Maximum wait time (ms) for batch fill before flushing. #[arg(long, env = "RAMJET_BATCH_TIMEOUT_MS", default_value = "100")] pub batch_timeout_ms: u64, /// Comma-separated list of admin DIDs. #[arg(long, env = "ADMIN_DIDS", default_value = "")] pub admin_dids: String, /// Path to a zstd dictionary file for event compression. /// When absent, events are stored uncompressed. #[arg(long, env = "RAMJET_ZSTD_DICT_PATH")] pub zstd_dict_path: Option, /// Comma-separated list of DIDs to backfill on startup (skips already-backfilled repos). #[arg(long, env = "RAMJET_BACKFILL", default_value = "")] pub backfill: String, /// Consumer group definitions (repeatable). Format: `name:partition_count`. /// Example: `--consumer-group indexers:3 --consumer-group notifiers:2` #[arg(long, env = "RAMJET_CONSUMER_GROUPS", value_delimiter = ',')] pub consumer_group: Vec, /// QUIC bind address for RIBLT set reconciliation. Server only starts if provided. #[arg(long, env = "RAMJET_QUIC_LISTEN_ADDR")] pub quic_listen_addr: Option, } /// A pre-defined consumer group with a fixed number of partitions. #[derive(Debug, Clone)] pub struct ConsumerGroup { /// Group name (e.g., "indexers"). pub name: String, /// Number of partitions. Must be >= 1. pub partition_count: u16, } impl ConsumerGroup { /// Parse a `name:count` string into a `ConsumerGroup`. pub fn parse(s: &str) -> Option { let (name, count_str) = s.split_once(':')?; let name = name.trim(); let count: u16 = count_str.trim().parse().ok()?; if name.is_empty() || count == 0 { return None; } Some(Self { name: name.to_string(), partition_count: count, }) } } /// Resolved service configuration with parsed matchers and sets. pub struct ServiceConfig { /// Path to the fjall database directory. pub db_path: PathBuf, /// Upstream relay WebSocket host. pub relay_host: String, /// HTTP bind address. pub listen_addr: SocketAddr, /// Matcher for collections to persist. pub tracked_collections: CollectionMatcher, /// Matcher for collections to forward as low-priority. pub forward_collections: CollectionMatcher, /// Event retention window in hours. pub event_retention_hours: u64, /// Maximum events per WriteBatch. pub batch_size: usize, /// Maximum wait time (ms) for batch fill. pub batch_timeout_ms: u64, /// Set of admin DIDs. pub admin_dids: HashSet, /// Optional path to a zstd dictionary file for event compression. pub zstd_dict_path: Option, /// DIDs to backfill on startup. pub backfill_dids: Vec, /// Pre-defined consumer groups. pub consumer_groups: Vec, /// QUIC bind address for RIBLT reconciliation (None = disabled). pub quic_listen_addr: Option, } impl From for ServiceConfig { fn from(args: CliArgs) -> Self { let admin_dids: HashSet = args .admin_dids .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); let backfill_dids: Vec = args .backfill .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); let consumer_groups: Vec = args .consumer_group .iter() .filter_map(|s| { let parsed = ConsumerGroup::parse(s); if parsed.is_none() { tracing::warn!(value = %s, "ignoring invalid --consumer-group value, expected name:count"); } parsed }) .collect(); Self { db_path: args.db_path, relay_host: args.relay_host, listen_addr: args.listen_addr, tracked_collections: CollectionMatcher::new(&args.tracked_collections), forward_collections: CollectionMatcher::new(&args.forward_collections), event_retention_hours: args.event_retention_hours, batch_size: args.batch_size, batch_timeout_ms: args.batch_timeout_ms, admin_dids, zstd_dict_path: args.zstd_dict_path, backfill_dids, consumer_groups, quic_listen_addr: args.quic_listen_addr, } } } #[cfg(test)] mod tests { use super::*; #[test] fn test_exact_match() { let p = CollectionPattern::parse("com.example"); assert!(p.matches("com.example")); assert!(!p.matches("com.example.foo")); assert!(!p.matches("com")); } #[test] fn test_single_wildcard() { let p = CollectionPattern::parse("com.example.*"); assert!(p.matches("com.example.foo")); assert!(p.matches("com.example.bar")); assert!(!p.matches("com.example")); assert!(!p.matches("com.example.foo.baz")); assert!(!p.matches("com")); } #[test] fn test_glob_wildcard() { let p = CollectionPattern::parse("com.example.**"); assert!(p.matches("com.example.foo")); assert!(p.matches("com.example.bar")); assert!(p.matches("com.example.foo.bar")); assert!(p.matches("com.example.bar.baz.qux")); assert!(!p.matches("com.example")); assert!(!p.matches("com")); } #[test] fn test_match_all() { let p = CollectionPattern::parse("*"); assert!(p.matches("anything")); assert!(p.matches("com.example.foo")); } #[test] fn test_none() { let p = CollectionPattern::parse(""); assert!(!p.matches("anything")); } #[test] fn test_matcher_multiple_patterns() { let m = CollectionMatcher::new("garden.lexicon.** com.atproto.lexicon.**"); assert!(m.matches("garden.lexicon.foo")); assert!(m.matches("garden.lexicon.foo.bar")); assert!(m.matches("com.atproto.lexicon.def")); assert!(!m.matches("app.bsky.feed.post")); assert!(!m.matches("garden.lexicon")); } #[test] fn test_matcher_star() { let m = CollectionMatcher::new("*"); assert!(m.matches("anything")); } #[test] fn test_matcher_empty() { let m = CollectionMatcher::new(""); assert!(!m.matches("anything")); } #[test] fn test_admin_dids_parsing() { let args = CliArgs { db_path: PathBuf::from("/tmp"), relay_host: "bsky.network".to_string(), listen_addr: "0.0.0.0:8080".parse().unwrap(), tracked_collections: "*".to_string(), forward_collections: "*".to_string(), event_retention_hours: 72, batch_size: 500, batch_timeout_ms: 100, admin_dids: "did:plc:abc, did:plc:def".to_string(), zstd_dict_path: None, backfill: String::new(), consumer_group: Vec::new(), quic_listen_addr: None, }; let config = ServiceConfig::from(args); assert!(config.admin_dids.contains("did:plc:abc")); assert!(config.admin_dids.contains("did:plc:def")); assert_eq!(config.admin_dids.len(), 2); } }