···11+use crate::LinkEvent;
22+use jetstream::{
33+ DefaultJetstreamEndpoints, JetstreamCompression, JetstreamConfig, JetstreamConnector,
44+ events::{CommitOp, Cursor, EventKind},
55+};
66+use links::collect_links;
77+use std::error::Error;
88+use tokio::sync::broadcast;
99+1010+const MAX_LINKS_PER_EVENT: usize = 100;
1111+1212+pub async fn consume(
1313+ b: broadcast::Sender<LinkEvent>,
1414+ jetstream_endpoint: &str,
1515+ cursor: Option<Cursor>,
1616+ no_zstd: bool,
1717+) -> Result<(), Box<dyn Error>> {
1818+ let endpoint = DefaultJetstreamEndpoints::endpoint_or_shortcut(jetstream_endpoint);
1919+ if endpoint == jetstream_endpoint {
2020+ std::println!("connecting to jetstream at {endpoint}");
2121+ } else {
2222+ std::println!("connecting to jetstream at {jetstream_endpoint} => {endpoint}");
2323+ }
2424+ let config: JetstreamConfig = JetstreamConfig {
2525+ endpoint,
2626+ compression: if no_zstd {
2727+ JetstreamCompression::None
2828+ } else {
2929+ JetstreamCompression::Zstd
3030+ },
3131+ replay_on_reconnect: true,
3232+ channel_size: 1024, // buffer up to ~1s of jetstream events
3333+ ..Default::default()
3434+ };
3535+ let mut receiver = JetstreamConnector::new(config)?
3636+ .connect_cursor(cursor)
3737+ .await?;
3838+3939+ while let Some(event) = receiver.recv().await {
4040+ if event.kind != EventKind::Commit {
4141+ continue;
4242+ }
4343+ let Some(commit) = event.commit else {
4444+ eprintln!("jetstream commit event missing commit data, ignoring");
4545+ continue;
4646+ };
4747+4848+ // TODO: keep a buffer and remove quick deletes to debounce notifs
4949+ // for now we just drop all deletes eek
5050+ if commit.operation == CommitOp::Delete {
5151+ continue;
5252+ }
5353+ let Some(record) = commit.record else {
5454+ eprintln!("jetstream commit update/delete missing record, ignoring");
5555+ continue;
5656+ };
5757+5858+ let jv = record.get().parse()?;
5959+6060+ // todo: indicate if the link limit was reached (-> links omitted)
6161+ for (i, link) in collect_links(&jv).into_iter().enumerate() {
6262+ if i >= MAX_LINKS_PER_EVENT {
6363+ eprintln!("jetstream event has too many links, ignoring the rest");
6464+ break;
6565+ }
6666+ let link_ev = LinkEvent {
6767+ collection: commit.collection.to_string(),
6868+ path: link.path,
6969+ origin: format!(
7070+ "at://{}/{}/{}",
7171+ &*event.did,
7272+ &*commit.collection,
7373+ &*commit.rkey,
7474+ ),
7575+ target: link.target.into_string(),
7676+ };
7777+ let _ = b.send(link_ev); // only errors if no subscribers are connected, which is just fine.
7878+ }
7979+ }
8080+8181+ Err("jetstream consumer ended".into())
8282+}
+31
spacedust/src/lib.rs
···11+pub mod consumer;
22+pub mod server;
33+44+use serde::Serialize;
55+66+#[derive(Debug, Clone)]
77+pub struct LinkEvent {
88+ collection: String,
99+ path: String,
1010+ origin: String,
1111+ target: String,
1212+}
1313+1414+#[derive(Debug, Serialize)]
1515+struct ClientEvent {
1616+ source: String,
1717+ origin: String,
1818+ target: String,
1919+ // TODO: include the record too? would save clients a level of hydration
2020+}
2121+2222+impl From<LinkEvent> for ClientEvent {
2323+ fn from(link: LinkEvent) -> Self {
2424+ let undotted = link.path.get(1..).unwrap_or("");
2525+ Self {
2626+ source: format!("{}:{undotted}", link.collection),
2727+ origin: link.origin,
2828+ target: link.target,
2929+ }
3030+ }
3131+}
+54
spacedust/src/main.rs
···11+use spacedust::consumer;
22+use spacedust::server;
33+44+use clap::Parser;
55+use tokio::sync::broadcast;
66+77+/// Aggregate links in the at-mosphere
88+#[derive(Parser, Debug, Clone)]
99+#[command(version, about, long_about = None)]
1010+struct Args {
1111+ /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorhand value:
1212+ /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
1313+ #[arg(long)]
1414+ jetstream: String,
1515+ /// don't request zstd-compressed jetstream events
1616+ ///
1717+ /// reduces CPU at the expense of more ingress bandwidth
1818+ #[arg(long, action)]
1919+ jetstream_no_zstd: bool,
2020+}
2121+2222+#[tokio::main]
2323+async fn main() -> Result<(), String> {
2424+ let args = Args::parse();
2525+2626+ // tokio broadcast keeps a single main output queue for all subscribers.
2727+ // each subscriber clones off a copy of an individual value for each recv.
2828+ // since there's no large per-client buffer, we can make this one kind of
2929+ // big and accommodate more slow/bursty clients.
3030+ //
3131+ // in fact, we *could* even keep lagging clients alive, inserting lag-
3232+ // indicating messages to their output.... but for now we'll drop them to
3333+ // avoid accumulating zombies.
3434+ //
3535+ // events on the channel are individual links as they are discovered. a link
3636+ // contains a source and a target. the target is an at-uri, so it's up to
3737+ // ~1KB max; source is a collection + link path, which can be more but in
3838+ // practice the whole link rarely approaches 1KB total.
3939+ //
4040+ // TODO: determine if a pathological case could blow this up (eg 1MB link
4141+ // paths + slow subscriber -> 16GiB queue)
4242+ let (b, _) = broadcast::channel(16_384);
4343+4444+ let consuming = consumer::consume(b.clone(), &args.jetstream, None, args.jetstream_no_zstd);
4545+4646+ let serving = server::serve(b);
4747+4848+ tokio::select! {
4949+ e = serving => eprintln!("serving failed: {e:?}"),
5050+ e = consuming => eprintln!("consuming failed: {e:?}"),
5151+ };
5252+5353+ Ok(())
5454+}