Ramjet is an ATProtocol relay consumer that supports configurable forwarded and tracked collections, as well as record reconciliation.
event-stream
relay
firehose
riblt
atprotocol
1//! Configuration types for the Ramjet service.
2//!
3//! Includes CLI argument parsing via clap, collection pattern matching
4//! for routing firehose events, and service-level configuration.
5
6use std::collections::HashSet;
7use std::net::SocketAddr;
8use std::path::PathBuf;
9
10use clap::Parser;
11
/// Collection pattern for matching ATProtocol NSIDs.
///
/// Supports exact match, single-segment wildcard (`.*`), and
/// glob wildcard (`.**`) patterns. Wildcard variants store their prefix
/// *including* the trailing `.` (e.g. `com.example.*` is stored as
/// `SingleWild("com.example.")`), so matching reduces to a prefix test on
/// the collection NSID.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CollectionPattern {
    /// Empty pattern — matches nothing.
    None,
    /// Wildcard `*` — matches every collection.
    MatchAll,
    /// Exact NSID match (e.g., `com.example`).
    Exact(String),
    /// Single-segment wildcard (e.g., `com.example.*` matches `com.example.foo`
    /// but not `com.example.foo.bar`).
    SingleWild(String),
    /// Glob wildcard (e.g., `com.example.**` matches both `com.example.foo`
    /// and `com.example.foo.bar`).
    GlobWild(String),
}

impl CollectionPattern {
    /// Parse a pattern string into a `CollectionPattern`.
    ///
    /// Surrounding whitespace is ignored. An empty string yields
    /// [`CollectionPattern::None`]. `*` yields [`CollectionPattern::MatchAll`].
    /// A `.**` suffix yields [`CollectionPattern::GlobWild`] and a `.*` suffix
    /// yields [`CollectionPattern::SingleWild`]; in both cases the stored
    /// prefix keeps the trailing `.`. Anything else is an exact match.
    pub fn parse(pattern: &str) -> Self {
        let trimmed = pattern.trim();
        if trimmed.is_empty() {
            CollectionPattern::None
        } else if trimmed == "*" {
            CollectionPattern::MatchAll
        } else if let Some(prefix) = trimmed.strip_suffix(".**") {
            CollectionPattern::GlobWild(format!("{}.", prefix))
        } else if let Some(prefix) = trimmed.strip_suffix(".*") {
            CollectionPattern::SingleWild(format!("{}.", prefix))
        } else {
            CollectionPattern::Exact(trimmed.to_string())
        }
    }

    /// Test whether a collection NSID matches this pattern.
    ///
    /// Wildcard patterns require at least one character after the prefix, so
    /// `com.example.*` matches neither `com.example` nor `com.example.`.
    /// A single-segment wildcard additionally rejects any further `.` in the
    /// remainder.
    pub fn matches(&self, collection: &str) -> bool {
        match self {
            CollectionPattern::None => false,
            CollectionPattern::MatchAll => true,
            CollectionPattern::Exact(expected) => collection == expected.as_str(),
            // `strip_prefix` replaces manual byte slicing: no index arithmetic,
            // and the "non-empty remainder" rule is stated directly.
            CollectionPattern::SingleWild(prefix) => matches!(
                collection.strip_prefix(prefix.as_str()),
                Some(rest) if !rest.is_empty() && !rest.contains('.')
            ),
            CollectionPattern::GlobWild(prefix) => matches!(
                collection.strip_prefix(prefix.as_str()),
                Some(rest) if !rest.is_empty()
            ),
        }
    }
}
64
65/// A set of collection patterns evaluated with OR semantics.
66#[derive(Debug, Clone)]
67pub struct CollectionMatcher {
68 patterns: Vec<CollectionPattern>,
69}
70
71impl CollectionMatcher {
72 /// Create a matcher from a space-separated pattern string.
73 ///
74 /// - `"*"` matches everything
75 /// - `""` matches nothing
76 /// - `"app.bsky.** community.lexicon.**"` matches multiple prefixes
77 pub fn new(pattern_str: &str) -> Self {
78 let trimmed = pattern_str.trim();
79 if trimmed.is_empty() {
80 return Self {
81 patterns: vec![CollectionPattern::None],
82 };
83 }
84 Self {
85 patterns: trimmed
86 .split_whitespace()
87 .map(CollectionPattern::parse)
88 .collect(),
89 }
90 }
91
92 /// Test whether a collection NSID matches any pattern in this matcher.
93 pub fn matches(&self, collection: &str) -> bool {
94 self.patterns.iter().any(|p| p.matches(collection))
95 }
96}
97
/// Ramjet service configuration, parsed from CLI arguments and environment variables.
// With clap's derive API, the `///` doc comments on the fields below are
// rendered as `--help` text, so treat them as user-facing strings. Maintainer
// notes in this struct use plain `//` comments, which clap ignores.
#[derive(Parser, Debug)]
#[command(
    name = "ramjet",
    version,
    about = "ATProtocol event-stream, record, and blob service"
)]
pub struct CliArgs {
    /// Path to the fjall database directory.
    #[arg(long, env = "RAMJET_DB_PATH", default_value = "./data/ramjet.db")]
    pub db_path: PathBuf,

    /// Upstream relay WebSocket host.
    #[arg(long, env = "RAMJET_RELAY_HOST", default_value = "bsky.network")]
    pub relay_host: String,

    // Parsed by clap into a `SocketAddr`, so malformed values are rejected
    // during argument parsing rather than at bind time.
    /// HTTP bind address.
    #[arg(long, env = "RAMJET_LISTEN_ADDR", default_value = "0.0.0.0:8080")]
    pub listen_addr: SocketAddr,

    // Kept as a raw string here; `ServiceConfig::from` compiles it into a
    // `CollectionMatcher`.
    /// Space-separated collection patterns to persist (e.g., `"garden.lexicon.** com.atproto.lexicon.**"`).
    #[arg(long, env = "RAMJET_TRACKED_COLLECTIONS", default_value = "*")]
    pub tracked_collections: String,

    // Also compiled into a `CollectionMatcher` by `ServiceConfig::from`. The
    // "not already tracked" exclusion in the help text is not applied in this
    // file; presumably the event consumer checks the tracked matcher first,
    // so confirm against the caller.
    /// Space-separated collection patterns to forward as low-priority events.
    /// `"*"` forwards everything not already tracked (default).
    /// `""` forwards nothing.
    #[arg(long, env = "RAMJET_FORWARD_COLLECTIONS", default_value = "*")]
    pub forward_collections: String,

    /// Event retention window in hours.
    #[arg(long, env = "RAMJET_EVENT_RETENTION_HOURS", default_value = "72")]
    pub event_retention_hours: u64,

    // NOTE(review): no lower bound is enforced here; confirm that a value of 0
    // is rejected or handled by the batching code.
    /// Maximum events per WriteBatch.
    #[arg(long, env = "RAMJET_BATCH_SIZE", default_value = "500")]
    pub batch_size: usize,

    /// Maximum wait time (ms) for batch fill before flushing.
    #[arg(long, env = "RAMJET_BATCH_TIMEOUT_MS", default_value = "100")]
    pub batch_timeout_ms: u64,

    // `ServiceConfig::from` splits this on ',', trims each entry, drops empty
    // entries, and collects the rest into a `HashSet`.
    // NOTE(review): unlike every other option, the env var has no `RAMJET_`
    // prefix; confirm this is intentional (e.g. shared with another service).
    /// Comma-separated list of admin DIDs.
    #[arg(long, env = "ADMIN_DIDS", default_value = "")]
    pub admin_dids: String,

    /// Path to a zstd dictionary file for event compression.
    /// When absent, events are stored uncompressed.
    #[arg(long, env = "RAMJET_ZSTD_DICT_PATH")]
    pub zstd_dict_path: Option<PathBuf>,

    // Split the same way as `admin_dids`, but collected into an ordered `Vec`
    // (duplicates are not removed in this file).
    /// Comma-separated list of DIDs to backfill on startup (skips already-backfilled repos).
    #[arg(long, env = "RAMJET_BACKFILL", default_value = "")]
    pub backfill: String,

    // `value_delimiter = ','` lets one flag occurrence (or the env var) carry
    // several comma-separated groups. Each entry is parsed by
    // `ConsumerGroup::parse` in `ServiceConfig::from`. Invalid entries are
    // logged with a warning and skipped, not treated as fatal.
    /// Consumer group definitions (repeatable). Format: `name:partition_count`.
    /// Example: `--consumer-group indexers:3 --consumer-group notifiers:2`
    #[arg(long, env = "RAMJET_CONSUMER_GROUPS", value_delimiter = ',')]
    pub consumer_group: Vec<String>,

    /// QUIC bind address for RIBLT set reconciliation. Server only starts if provided.
    #[arg(long, env = "RAMJET_QUIC_LISTEN_ADDR")]
    pub quic_listen_addr: Option<SocketAddr>,
}
162
/// A pre-defined consumer group with a fixed number of partitions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConsumerGroup {
    /// Group name (e.g., "indexers").
    pub name: String,
    /// Number of partitions. Must be >= 1.
    pub partition_count: u16,
}

impl ConsumerGroup {
    /// Parse a `name:count` string into a `ConsumerGroup`.
    ///
    /// Whitespace around the name and around the count is ignored. The input
    /// is split on the first `:` only. Returns `None` in any of these cases:
    /// - there is no `:`
    /// - the name is empty
    /// - the count is not an integer in `1..=u16::MAX` (so `"a:b:3"` and
    ///   `"a:0"` are both rejected)
    pub fn parse(s: &str) -> Option<Self> {
        let (name, count_str) = s.split_once(':')?;
        let name = name.trim();
        let partition_count: u16 = count_str.trim().parse().ok()?;
        if name.is_empty() || partition_count == 0 {
            return None;
        }
        Some(Self {
            name: name.to_string(),
            partition_count,
        })
    }
}
187
188/// Resolved service configuration with parsed matchers and sets.
189pub struct ServiceConfig {
190 /// Path to the fjall database directory.
191 pub db_path: PathBuf,
192 /// Upstream relay WebSocket host.
193 pub relay_host: String,
194 /// HTTP bind address.
195 pub listen_addr: SocketAddr,
196 /// Matcher for collections to persist.
197 pub tracked_collections: CollectionMatcher,
198 /// Matcher for collections to forward as low-priority.
199 pub forward_collections: CollectionMatcher,
200 /// Event retention window in hours.
201 pub event_retention_hours: u64,
202 /// Maximum events per WriteBatch.
203 pub batch_size: usize,
204 /// Maximum wait time (ms) for batch fill.
205 pub batch_timeout_ms: u64,
206 /// Set of admin DIDs.
207 pub admin_dids: HashSet<String>,
208 /// Optional path to a zstd dictionary file for event compression.
209 pub zstd_dict_path: Option<PathBuf>,
210 /// DIDs to backfill on startup.
211 pub backfill_dids: Vec<String>,
212 /// Pre-defined consumer groups.
213 pub consumer_groups: Vec<ConsumerGroup>,
214 /// QUIC bind address for RIBLT reconciliation (None = disabled).
215 pub quic_listen_addr: Option<SocketAddr>,
216}
217
218impl From<CliArgs> for ServiceConfig {
219 fn from(args: CliArgs) -> Self {
220 let admin_dids: HashSet<String> = args
221 .admin_dids
222 .split(',')
223 .map(|s| s.trim().to_string())
224 .filter(|s| !s.is_empty())
225 .collect();
226
227 let backfill_dids: Vec<String> = args
228 .backfill
229 .split(',')
230 .map(|s| s.trim().to_string())
231 .filter(|s| !s.is_empty())
232 .collect();
233
234 let consumer_groups: Vec<ConsumerGroup> = args
235 .consumer_group
236 .iter()
237 .filter_map(|s| {
238 let parsed = ConsumerGroup::parse(s);
239 if parsed.is_none() {
240 tracing::warn!(value = %s, "ignoring invalid --consumer-group value, expected name:count");
241 }
242 parsed
243 })
244 .collect();
245
246 Self {
247 db_path: args.db_path,
248 relay_host: args.relay_host,
249 listen_addr: args.listen_addr,
250 tracked_collections: CollectionMatcher::new(&args.tracked_collections),
251 forward_collections: CollectionMatcher::new(&args.forward_collections),
252 event_retention_hours: args.event_retention_hours,
253 batch_size: args.batch_size,
254 batch_timeout_ms: args.batch_timeout_ms,
255 admin_dids,
256 zstd_dict_path: args.zstd_dict_path,
257 backfill_dids,
258 consumer_groups,
259 quic_listen_addr: args.quic_listen_addr,
260 }
261 }
262}
263
#[cfg(test)]
mod tests {
    use super::*;
    use clap::CommandFactory;

    /// Baseline CLI arguments for tests that exercise `ServiceConfig::from`.
    /// Individual tests override only the fields they care about.
    fn base_args() -> CliArgs {
        CliArgs {
            db_path: PathBuf::from("/tmp"),
            relay_host: "bsky.network".to_string(),
            listen_addr: "0.0.0.0:8080".parse().unwrap(),
            tracked_collections: "*".to_string(),
            forward_collections: "*".to_string(),
            event_retention_hours: 72,
            batch_size: 500,
            batch_timeout_ms: 100,
            admin_dids: String::new(),
            zstd_dict_path: None,
            backfill: String::new(),
            consumer_group: Vec::new(),
            quic_listen_addr: None,
        }
    }

    #[test]
    fn test_cli_definition_is_valid() {
        // Runs clap's built-in consistency checks on the derived command.
        CliArgs::command().debug_assert();
    }

    #[test]
    fn test_exact_match() {
        let p = CollectionPattern::parse("com.example");
        assert!(p.matches("com.example"));
        assert!(!p.matches("com.example.foo"));
        assert!(!p.matches("com"));
    }

    #[test]
    fn test_single_wildcard() {
        let p = CollectionPattern::parse("com.example.*");
        assert!(p.matches("com.example.foo"));
        assert!(p.matches("com.example.bar"));
        assert!(!p.matches("com.example"));
        assert!(!p.matches("com.example.foo.baz"));
        assert!(!p.matches("com"));
    }

    #[test]
    fn test_wildcards_require_a_segment() {
        let single = CollectionPattern::parse("com.example.*");
        assert!(!single.matches("com.example."));
        assert!(!single.matches("com.exampleX.foo"));

        let glob = CollectionPattern::parse("com.example.**");
        assert!(!glob.matches("com.example."));
        assert!(!glob.matches("com.exampleX.foo"));
    }

    #[test]
    fn test_glob_wildcard() {
        let p = CollectionPattern::parse("com.example.**");
        assert!(p.matches("com.example.foo"));
        assert!(p.matches("com.example.bar"));
        assert!(p.matches("com.example.foo.bar"));
        assert!(p.matches("com.example.bar.baz.qux"));
        assert!(!p.matches("com.example"));
        assert!(!p.matches("com"));
    }

    #[test]
    fn test_match_all() {
        let p = CollectionPattern::parse("*");
        assert!(p.matches("anything"));
        assert!(p.matches("com.example.foo"));
    }

    #[test]
    fn test_none() {
        let p = CollectionPattern::parse("");
        assert!(!p.matches("anything"));
    }

    #[test]
    fn test_parse_ignores_surrounding_whitespace() {
        assert!(CollectionPattern::parse("  com.example  ").matches("com.example"));
        assert!(CollectionPattern::parse(" * ").matches("anything"));
        assert!(!CollectionPattern::parse("   ").matches("anything"));
    }

    #[test]
    fn test_matcher_multiple_patterns() {
        let m = CollectionMatcher::new("garden.lexicon.** com.atproto.lexicon.**");
        assert!(m.matches("garden.lexicon.foo"));
        assert!(m.matches("garden.lexicon.foo.bar"));
        assert!(m.matches("com.atproto.lexicon.def"));
        assert!(!m.matches("app.bsky.feed.post"));
        assert!(!m.matches("garden.lexicon"));
    }

    #[test]
    fn test_matcher_mixed_pattern_kinds() {
        let m = CollectionMatcher::new("app.bsky.feed.post  com.example.*");
        assert!(m.matches("app.bsky.feed.post"));
        assert!(m.matches("com.example.foo"));
        assert!(!m.matches("com.example.foo.bar"));
        assert!(!m.matches("app.bsky.feed.like"));
    }

    #[test]
    fn test_matcher_star() {
        let m = CollectionMatcher::new("*");
        assert!(m.matches("anything"));
    }

    #[test]
    fn test_matcher_empty() {
        let m = CollectionMatcher::new("");
        assert!(!m.matches("anything"));
    }

    #[test]
    fn test_matcher_whitespace_only() {
        let m = CollectionMatcher::new(" \t ");
        assert!(!m.matches("anything"));
    }

    #[test]
    fn test_consumer_group_parse() {
        let g = ConsumerGroup::parse(" indexers : 3 ").expect("valid group");
        assert_eq!(g.name, "indexers");
        assert_eq!(g.partition_count, 3);
        assert!(ConsumerGroup::parse("indexers").is_none());
        assert!(ConsumerGroup::parse(":3").is_none());
        assert!(ConsumerGroup::parse("indexers:0").is_none());
        assert!(ConsumerGroup::parse("indexers:abc").is_none());
        assert!(ConsumerGroup::parse("indexers:70000").is_none());
    }

    #[test]
    fn test_admin_dids_parsing() {
        let args = CliArgs {
            admin_dids: "did:plc:abc, did:plc:def".to_string(),
            ..base_args()
        };
        let config = ServiceConfig::from(args);
        assert!(config.admin_dids.contains("did:plc:abc"));
        assert!(config.admin_dids.contains("did:plc:def"));
        assert_eq!(config.admin_dids.len(), 2);
    }

    #[test]
    fn test_backfill_parsing_keeps_order_and_drops_blanks() {
        let args = CliArgs {
            backfill: " did:plc:abc, ,did:plc:def,".to_string(),
            ..base_args()
        };
        let config = ServiceConfig::from(args);
        assert_eq!(config.backfill_dids, vec!["did:plc:abc", "did:plc:def"]);
    }

    #[test]
    fn test_invalid_consumer_groups_are_skipped() {
        let args = CliArgs {
            consumer_group: vec![
                "indexers:3".to_string(),
                "bogus".to_string(),
                "notifiers:0".to_string(),
                "notifiers:2".to_string(),
            ],
            ..base_args()
        };
        let config = ServiceConfig::from(args);
        let groups: Vec<(&str, u16)> = config
            .consumer_groups
            .iter()
            .map(|g| (g.name.as_str(), g.partition_count))
            .collect();
        assert_eq!(groups, vec![("indexers", 3), ("notifiers", 2)]);
    }

    #[test]
    fn test_empty_lists_resolve_to_empty_collections() {
        let config = ServiceConfig::from(base_args());
        assert!(config.admin_dids.is_empty());
        assert!(config.backfill_dids.is_empty());
        assert!(config.consumer_groups.is_empty());
    }
}