···11+pub mod consumer;
22+pub mod delay;
33+pub mod error;
44+pub mod server;
55+pub mod subscriber;
66+pub mod removable_delay_queue;
77+88+use serde::Serialize;
/// Internal link event produced by the consumer: one link discovered in a
/// record, pointing from a source record to a target at-uri or DID.
//
// NOTE(review): these fields are read from other modules (eg subscriber.rs
// reads `link.target`, `link.path`, ...) but are declared private here —
// confirm whether they should be `pub(crate)`.
#[derive(Debug, Clone)]
pub struct LinkEvent {
    /// collection NSID the linking record lives in (eg "app.bsky.feed.like")
    collection: String,
    /// JSON path of the link within the record; expected to start with '.'
    /// (the From<LinkEvent> conversion below strips that prefix)
    path: String,
    /// identifies the record containing the link — presumably its at-uri
    /// (becomes ClientLinkEvent.source_record); TODO confirm
    origin: String,
    /// link target: an at-uri or a bare DID (see subscriber filtering)
    target: String,
    /// revision of the linking record
    rev: String,
}
/// The JSON event shape sent to websocket subscribers.
#[derive(Debug, Serialize)]
#[serde(rename_all="snake_case")]
pub struct ClientEvent {
    kind: String, // "link"
    // NOTE(review): only "live" is produced in the code visible here
    // (subscriber.rs); "replay"/"backfill" appear reserved for future use.
    origin: String, // "live", "replay", "backfill"
    link: ClientLinkEvent,
}
/// Client-facing link payload nested inside a ClientEvent.
#[derive(Debug, Serialize)]
struct ClientLinkEvent {
    /// only "create" is produced by the From<LinkEvent> conversion below
    operation: String, // "create", "delete" (prob no update, though maybe for rev?)
    /// "collection:path" of the link, with the path's leading '.' stripped
    source: String,
    /// the record containing the link (LinkEvent.origin)
    source_record: String,
    /// revision of the linking record (LinkEvent.rev)
    source_rev: String,
    /// the link target (LinkEvent.target): an at-uri or DID
    subject: String,
    // TODO: include the record too? would save clients a level of hydration
}
3636+3737+impl From<LinkEvent> for ClientLinkEvent {
3838+ fn from(link: LinkEvent) -> Self {
3939+ let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
4040+ eprintln!("link path did not have expected '.' prefix: {}", link.path);
4141+ ""
4242+ });
4343+ Self {
4444+ operation: "create".to_string(),
4545+ source: format!("{}:{undotted}", link.collection),
4646+ source_record: link.origin,
4747+ source_rev: link.rev,
4848+ subject: link.target,
4949+ }
5050+ }
5151+}
+139
spacedust/src/main.rs
···11+use spacedust::error::MainTaskError;
22+use spacedust::consumer;
33+use spacedust::server;
44+use spacedust::delay;
55+use spacedust::removable_delay_queue::removable_delay_queue;
66+77+use clap::Parser;
88+use metrics_exporter_prometheus::PrometheusBuilder;
99+use tokio::sync::broadcast;
1010+use tokio_util::sync::CancellationToken;
1111+use std::time::Duration;
/// Aggregate links in the at-mosphere
#[derive(Parser, Debug, Clone)]
#[command(version, about, long_about = None)]
struct Args {
    /// Jetstream server to connect to (exclusive with --fixture). Provide either a wss:// URL, or a shorthand value:
    /// 'us-east-1', 'us-east-2', 'us-west-1', or 'us-west-2'
    // NOTE(review): no --fixture arg is defined on this struct — stale doc
    // reference, or does the flag live elsewhere? confirm.
    #[arg(long)]
    jetstream: String,
    /// don't request zstd-compressed jetstream events
    ///
    /// reduces CPU at the expense of more ingress bandwidth
    #[arg(long, action)]
    jetstream_no_zstd: bool,
}
#[tokio::main]
async fn main() -> Result<(), String> {
    env_logger::init();

    // tokio broadcast keeps a single main output queue for all subscribers.
    // each subscriber clones off a copy of an individual value for each recv.
    // since there's no large per-client buffer, we can make this one kind of
    // big and accommodate more slow/bursty clients.
    //
    // in fact, we *could* even keep lagging clients alive, inserting lag-
    // indicating messages to their output.... but for now we'll drop them to
    // avoid accumulating zombies.
    //
    // events on the channel are individual links as they are discovered. a link
    // contains a source and a target. the target is an at-uri, so it's up to
    // ~1KB max; source is a collection + link path, which can be more but in
    // practice the whole link rarely approaches 1KB total.
    //
    // TODO: determine if a pathological case could blow this up (eg 1MB link
    // paths + slow subscriber -> 16GiB queue)
    let (b, _) = broadcast::channel(16_384); // instant ("live") events
    let consumer_sender = b.clone();
    let (d, _) = broadcast::channel(16_384); // delayed events
    let consumer_delayed_sender = d.clone();

    // NOTE(review): 21s — presumably long enough for deletes to cancel
    // pending events before broadcast; confirm why this particular value.
    let delay = Duration::from_secs(21);
    let (delay_queue_sender, delay_queue_receiver) = removable_delay_queue(delay);

    let shutdown = CancellationToken::new();

    // ctrl-c requests a graceful shutdown through the cancellation token
    let ctrlc_shutdown = shutdown.clone();
    ctrlc::set_handler(move || ctrlc_shutdown.cancel()).expect("failed to set ctrl-c handler");

    let args = Args::parse();

    // metrics are best-effort: log the failure and keep running
    if let Err(e) = install_metrics_server() {
        log::error!("failed to install metrics server: {e:?}");
    };

    let mut tasks: tokio::task::JoinSet<Result<(), MainTaskError>> = tokio::task::JoinSet::new();

    // web server: fans broadcast events out to websocket subscribers
    let server_shutdown = shutdown.clone();
    tasks.spawn(async move {
        server::serve(b, d, server_shutdown).await?;
        Ok(())
    });

    // jetstream consumer: feeds the instant channel and the delay queue
    let consumer_shutdown = shutdown.clone();
    tasks.spawn(async move {
        consumer::consume(
            consumer_sender,
            delay_queue_sender,
            args.jetstream,
            None,
            args.jetstream_no_zstd,
            consumer_shutdown
        )
        .await?;
        Ok(())
    });

    // drains the delay queue into the delayed broadcast channel
    let delay_shutdown = shutdown.clone();
    tasks.spawn(async move {
        delay::to_broadcast(delay_queue_receiver, consumer_delayed_sender, delay_shutdown).await?;
        Ok(())
    });

    // run until shutdown is requested or any task exits (either way: stop all)
    tokio::select! {
        _ = shutdown.cancelled() => log::warn!("shutdown requested"),
        Some(r) = tasks.join_next() => {
            log::warn!("a task exited, shutting down: {r:?}");
            shutdown.cancel();
        }
    }

    // give the remaining tasks a 3s grace period, then abort them
    tokio::select! {
        _ = async {
            while let Some(completed) = tasks.join_next().await {
                log::info!("shutdown: task completed: {completed:?}");
            }
        } => {},
        _ = tokio::time::sleep(std::time::Duration::from_secs(3)) => {
            log::info!("shutdown: not all tasks completed on time. aborting...");
            tasks.shutdown().await;
        },
    }

    log::info!("bye!");

    Ok(())
}
119119+120120+fn install_metrics_server() -> Result<(), metrics_exporter_prometheus::BuildError> {
121121+ log::info!("installing metrics server...");
122122+ let host = [0, 0, 0, 0];
123123+ let port = 8765;
124124+ PrometheusBuilder::new()
125125+ .set_quantiles(&[0.5, 0.9, 0.99, 1.0])?
126126+ .set_bucket_duration(std::time::Duration::from_secs(300))?
127127+ .set_bucket_count(std::num::NonZero::new(12).unwrap()) // count * duration = 60 mins. stuff doesn't happen that fast here.
128128+ .set_enable_unit_suffix(false) // this seemed buggy for constellation (sometimes wouldn't engage)
129129+ .with_http_listener((host, port))
130130+ .install()?;
131131+ log::info!(
132132+ "metrics server installed! listening on http://{}.{}.{}.{}:{port}",
133133+ host[0],
134134+ host[1],
135135+ host[2],
136136+ host[3]
137137+ );
138138+ Ok(())
139139+}
+121
spacedust/src/removable_delay_queue.rs
···11+use std::ops::RangeBounds;
22+use std::collections::{BTreeMap, VecDeque};
33+use std::time::{Duration, Instant};
44+use tokio::sync::Mutex;
55+use std::sync::Arc;
66+use thiserror::Error;
77+88+#[derive(Debug, Error)]
99+pub enum EnqueueError<T> {
1010+ #[error("queue ouput dropped")]
1111+ OutputDropped(T),
1212+}
/// Marker trait for delay-queue keys: anything comparable, orderable, and
/// cloneable qualifies. Blanket-implemented for all such types, so callers
/// never implement it by hand.
pub trait Key: Eq + Ord + Clone {}
impl<T: Eq + Ord + Clone> Key for T {}
/// Shared internal state: a FIFO of (enqueue time, key) plus the live items.
/// A key may sit in `queue` while absent from `items` — that means it was
/// removed and gets skipped when popped (see `Input::remove_range`).
#[derive(Debug)]
struct Queue<K: Key, T> {
    // insertion-ordered; enqueue times are monotonically non-decreasing
    queue: VecDeque<(Instant, K)>,
    // current item per key; removal deletes from here only
    items: BTreeMap<K, T>
}
/// Producer half of the delay queue: enqueue items and remove pending ones
/// by key range. Shares state with exactly one `Output`.
pub struct Input<K: Key, T> {
    q: Arc<Mutex<Queue<K, T>>>,
}
impl<K: Key, T> Input<K, T> {
    /// if a key is already present, its previous item will be overwritten and
    /// its delay time will be reset for the new item.
    ///
    /// errors if the remover has been dropped
    ///
    /// NOTE(review): the "delay time will be reset" claim looks wrong: the
    /// earlier (Instant, key) entry is still in `queue` with its old
    /// timestamp, and `Output::next` pops entries front-to-back taking
    /// whatever item is *currently* stored for the key — so a replaced item
    /// can be released at the original deadline. confirm intended behavior.
    pub async fn enqueue(&self, key: K, item: T) -> Result<(), EnqueueError<T>> {
        // strong_count == 1 means we hold the only reference: the Output was
        // dropped and nothing would ever drain this item.
        if Arc::strong_count(&self.q) == 1 {
            return Err(EnqueueError::OutputDropped(item));
        }
        // TODO: try to push out an old element first
        // for now we just hope there's a listener
        let now = Instant::now();
        let mut q = self.q.lock().await;
        q.queue.push_back((now, key.clone()));
        q.items.insert(key, item);
        Ok(())
    }
    /// remove an item from the queue, by key
    ///
    /// the item itself is removed, but the key will remain in the queue -- it
    /// will simply be skipped over when a new output item is requested. this
    /// keeps the removal cheap (=btreemap remove), for a bit of space overhead
    pub async fn remove_range(&self, range: impl RangeBounds<K>) {
        let n = {
            let mut q = self.q.lock().await;
            // collect keys first: can't mutate the map while iterating its range
            let keys = q.items.range(range).map(|(k, _)| k).cloned().collect::<Vec<_>>();
            for k in &keys {
                q.items.remove(k);
            }
            keys.len()
        };
        if n == 0 {
            metrics::counter!("delay_queue_remove_not_found").increment(1);
        } else {
            // "records" counts calls that removed anything; "links" counts items
            metrics::counter!("delay_queue_remove_total_records").increment(1);
            metrics::counter!("delay_queue_remove_total_links").increment(n as u64);
        }
    }
}
/// Consumer half of the delay queue: yields items once their delay elapses.
pub struct Output<K: Key, T> {
    /// how long each item waits between enqueue and release
    delay: Duration,
    q: Arc<Mutex<Queue<K, T>>>,
}
impl<K: Key, T> Output<K, T> {
    /// Wait for and return the next item whose delay has elapsed.
    ///
    /// Returns `None` once the queue is drained and the `Input` has been
    /// dropped (strong_count == 1 means we hold the only reference).
    pub async fn next(&self) -> Option<T> {
        // pop the next (enqueue-time, item) pair, skipping keys whose items
        // were removed via Input::remove_range
        let get = || async {
            let mut q = self.q.lock().await;
            metrics::gauge!("delay_queue_queue_len").set(q.queue.len() as f64);
            metrics::gauge!("delay_queue_queue_capacity").set(q.queue.capacity() as f64);
            while let Some((t, k)) = q.queue.pop_front() {
                // skip over queued keys that were removed from items
                if let Some(item) = q.items.remove(&k) {
                    return Some((t, item));
                }
            }
            None
        };
        loop {
            if let Some((t, item)) = get().await {
                let now = Instant::now();
                let expected_release = t + self.delay;
                // "early" => "yes": we popped the item before its release time
                // and must sleep until then; overshoot measures lateness otherwise
                if expected_release.saturating_duration_since(now) > Duration::from_millis(1) {
                    tokio::time::sleep_until(expected_release.into()).await;
                    metrics::counter!("delay_queue_emit_total", "early" => "yes").increment(1);
                    metrics::histogram!("delay_queue_emit_overshoot").record(0);
                } else {
                    let overshoot = now.saturating_duration_since(expected_release);
                    metrics::counter!("delay_queue_emit_total", "early" => "no").increment(1);
                    metrics::histogram!("delay_queue_emit_overshoot").record(overshoot.as_secs_f64());
                }
                return Some(item)
            } else if Arc::strong_count(&self.q) == 1 {
                return None;
            }
            // the queue is *empty*, so we need to wait at least as long as the current delay
            tokio::time::sleep(self.delay).await;
            metrics::counter!("delay_queue_entirely_empty_total").increment(1);
        };
    }
}
109109+110110+pub fn removable_delay_queue<K: Key, T>(
111111+ delay: Duration,
112112+) -> (Input<K, T>, Output<K, T>) {
113113+ let q: Arc<Mutex<Queue<K, T>>> = Arc::new(Mutex::new(Queue {
114114+ queue: VecDeque::new(),
115115+ items: BTreeMap::new(),
116116+ }));
117117+118118+ let input = Input::<K, T> { q: q.clone() };
119119+ let output = Output::<K, T> { q, delay };
120120+ (input, output)
121121+}
+324
spacedust/src/server.rs
···11+use crate::error::ServerError;
22+use crate::subscriber::Subscriber;
33+use metrics::{histogram, counter};
44+use std::sync::Arc;
55+use crate::LinkEvent;
66+use http::{
77+ header::{ORIGIN, USER_AGENT},
88+ Response, StatusCode,
99+};
1010+use dropshot::{
1111+ Body,
1212+ ApiDescription, ConfigDropshot, ConfigLogging, ConfigLoggingLevel, Query, RequestContext,
1313+ ServerBuilder, WebsocketConnection, channel, endpoint, HttpResponse,
1414+ ApiEndpointBodyContentType, ExtractorMetadata, HttpError, ServerContext,
1515+ SharedExtractor,
1616+};
1717+1818+use schemars::JsonSchema;
1919+use serde::{Deserialize, Serialize};
2020+use tokio::sync::broadcast;
2121+use tokio::time::Instant;
2222+use tokio_tungstenite::tungstenite::protocol::Role;
2323+use tokio_util::sync::CancellationToken;
2424+use async_trait::async_trait;
2525+use std::collections::HashSet;
// static assets baked into the binary at compile time
const INDEX_HTML: &str = include_str!("../static/index.html");
const FAVICON: &[u8] = include_bytes!("../static/favicon.ico");
/// Run the HTTP + websocket API server until it exits on its own or
/// `shutdown` is cancelled.
///
/// `b` carries live (instant) link events and `d` carries delayed ones;
/// the `/subscribe` endpoint picks one per client (see `subscribe`).
pub async fn serve(
    b: broadcast::Sender<LinkEvent>,
    d: broadcast::Sender<LinkEvent>,
    shutdown: CancellationToken
) -> Result<(), ServerError> {
    let config_logging = ConfigLogging::StderrTerminal {
        level: ConfigLoggingLevel::Info,
    };

    // NOTE(review): logger name "example-basic" looks copied from a dropshot
    // example — rename?
    let log = config_logging
        .to_logger("example-basic")
        .map_err(ServerError::ConfigLogError)?;

    let mut api = ApiDescription::new();
    api.register(index).unwrap();
    api.register(favicon).unwrap();
    api.register(openapi).unwrap();
    api.register(subscribe).unwrap();

    // TODO: put spec in a once cell / lazy lock thing?
    // spec is rendered once here and served verbatim by the /openapi endpoint
    let spec = Arc::new(
        api.openapi(
            "Spacedust",
            env!("CARGO_PKG_VERSION")
                .parse()
                .inspect_err(|e| {
                    eprintln!("failed to parse cargo package version for openapi: {e:?}")
                })
                .unwrap_or(semver::Version::new(0, 0, 1)),
        )
        .description("A configurable ATProto notifications firehose.")
        .contact_name("part of @microcosm.blue")
        .contact_url("https://microcosm.blue")
        .json()
        .map_err(ServerError::OpenApiJsonFail)?,
    );

    let sub_shutdown = shutdown.clone();
    let ctx = Context { spec, b, d, shutdown: sub_shutdown };

    let server = ServerBuilder::new(api, ctx, log)
        .config(ConfigDropshot {
            bind_address: "0.0.0.0:9998".parse().unwrap(),
            ..Default::default()
        })
        .start()?;

    // run until the server stops by itself or we're told to shut down
    tokio::select! {
        s = server.wait_for_shutdown() => {
            s.map_err(ServerError::ServerExited)?;
            log::info!("server shut down normally.");
        },
        _ = shutdown.cancelled() => {
            log::info!("shutting down: closing server");
            server.close().await.map_err(ServerError::BadClose)?;
        },
    }
    Ok(())
}
/// Shared server state handed to every request handler.
#[derive(Debug, Clone)]
struct Context {
    /// pre-rendered openapi spec served by the /openapi endpoint
    pub spec: Arc<serde_json::Value>,
    /// live (instant) link event broadcast
    pub b: broadcast::Sender<LinkEvent>,
    /// delayed link event broadcast
    pub d: broadcast::Sender<LinkEvent>,
    /// parent token; subscribers derive child tokens from it
    pub shutdown: CancellationToken,
}
/// Run an endpoint handler future and record request metrics: a counter
/// labelled by endpoint, request origin, user-agent class, and status code,
/// plus a per-endpoint latency histogram (in microseconds).
///
/// NOTE(review): `origin` and non-browser `ua` label values come straight
/// from client headers — unbounded metric label cardinality; confirm that's
/// acceptable for the prometheus exporter.
async fn instrument_handler<T, H, R>(ctx: &RequestContext<T>, handler: H) -> Result<R, HttpError>
where
    R: HttpResponse,
    H: Future<Output = Result<R, HttpError>>,
    T: ServerContext,
{
    let start = Instant::now();
    let result = handler.await;
    let latency = start.elapsed();
    // status code from either the success response or the error
    let status_code = match &result {
        Ok(response) => response.status_code(),
        Err(e) => e.status_code.as_status(),
    }
    .as_str() // just the number (.to_string()'s Display does eg `200 OK`)
    .to_string();
    let endpoint = ctx.endpoint.operation_id.clone();
    let headers = ctx.request.headers();
    let origin = headers
        .get(ORIGIN)
        .and_then(|v| v.to_str().ok())
        .unwrap_or("")
        .to_string();
    // collapse all browser UAs into one label value to limit cardinality
    let ua = headers
        .get(USER_AGENT)
        .and_then(|v| v.to_str().ok())
        .map(|ua| {
            if ua.starts_with("Mozilla/5.0 ") {
                "browser"
            } else {
                ua
            }
        })
        .unwrap_or("")
        .to_string();
    counter!("server_requests_total",
        "endpoint" => endpoint.clone(),
        "origin" => origin,
        "ua" => ua,
        "status_code" => status_code,
    )
    .increment(1);
    histogram!("server_handler_latency", "endpoint" => endpoint).record(latency.as_micros() as f64);
    result
}
142142+143143+use dropshot::{HttpResponseHeaders, HttpResponseOk};
/// A 200 response with CORS headers attached (see `OkCors`).
pub type OkCorsResponse<T> = Result<HttpResponseHeaders<HttpResponseOk<T>>, HttpError>;

/// Helper for constructing Ok responses: return OkCors(T).into()
/// (not happy with this yet)
pub struct OkCors<T: Serialize + JsonSchema + Send + Sync>(pub T);

impl<T> From<OkCors<T>> for OkCorsResponse<T>
where
    T: Serialize + JsonSchema + Send + Sync,
{
    /// Wrap the payload in a 200 response with a permissive
    /// `access-control-allow-origin: *` header.
    fn from(ok: OkCors<T>) -> OkCorsResponse<T> {
        let mut res = HttpResponseHeaders::new_unnamed(HttpResponseOk(ok.0));
        res.headers_mut()
            .insert("access-control-allow-origin", "*".parse().unwrap());
        Ok(res)
    }
}
162162+163163+// TODO: cors for HttpError
164164+165165+166166+/// Serve index page as html
167167+#[endpoint {
168168+ method = GET,
169169+ path = "/",
170170+ /*
171171+ * not useful to have this in openapi
172172+ */
173173+ unpublished = true,
174174+}]
175175+async fn index(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
176176+ instrument_handler(&ctx, async {
177177+ Ok(Response::builder()
178178+ .status(StatusCode::OK)
179179+ .header(http::header::CONTENT_TYPE, "text/html")
180180+ .body(INDEX_HTML.into())?)
181181+ })
182182+ .await
183183+}
/// Serve the site favicon as an .ico image
// (previous doc comment was copy-pasted from the index handler)
#[endpoint {
    method = GET,
    path = "/favicon.ico",
    /*
     * not useful to have this in openapi
     */
    unpublished = true,
}]
async fn favicon(ctx: RequestContext<Context>) -> Result<Response<Body>, HttpError> {
    instrument_handler(&ctx, async {
        Ok(Response::builder()
            .status(StatusCode::OK)
            .header(http::header::CONTENT_TYPE, "image/x-icon")
            .body(FAVICON.to_vec().into())?)
    })
    .await
}
/// Meta: get the openapi spec for this api
#[endpoint {
    method = GET,
    path = "/openapi",
    /*
     * not useful to have this in openapi
     */
    unpublished = true,
}]
async fn openapi(ctx: RequestContext<Context>) -> OkCorsResponse<serde_json::Value> {
    instrument_handler(&ctx, async {
        // the spec is pre-rendered once at startup (see serve); just clone it
        let spec = (*ctx.context().spec).clone();
        OkCors(spec).into()
    })
    .await
}
/// The real type that gets deserialized
///
/// Parsed with serde_qs in its SharedExtractor impl below so repeated query
/// parameters collect into the HashSets; MultiSubscribeQueryForDocs is the
/// matching docs-only facade.
#[derive(Debug, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct MultiSubscribeQuery {
    /// at-uris to receive links about (empty = no subject-uri filter)
    #[serde(default)]
    pub wanted_subjects: HashSet<String>,
    /// DIDs to receive links about (OR'd with wanted_subjects)
    #[serde(default)]
    pub wanted_subject_dids: HashSet<String>,
    /// "collection:path" link sources to allow (empty = all sources)
    #[serde(default)]
    pub wanted_sources: HashSet<String>,
}
/// The fake corresponding type for docs that dropshot won't freak out about a
/// vec for
///
/// (never actually deserialized — see the `metadata` hack in the
/// SharedExtractor impl below)
#[derive(Deserialize, JsonSchema)]
#[allow(dead_code)]
#[serde(rename_all = "camelCase")]
struct MultiSubscribeQueryForDocs {
    /// One or more at-uris to receive links about
    ///
    /// The at-uri must be url-encoded
    ///
    /// Pass this parameter multiple times to specify multiple subjects, like
    /// `wantedSubjects=[...]&wantedSubjects=[...]`
    pub wanted_subjects: String,
    /// One or more DIDs to receive links about
    ///
    /// Pass this parameter multiple times to specify multiple DIDs
    pub wanted_subject_dids: String,
    /// One or more link sources to receive links about
    ///
    /// TODO: docs about link sources
    ///
    /// eg, a bluesky like's link source: `app.bsky.feed.like:subject.uri`
    ///
    /// Pass this parameter multiple times to specify multiple sources
    pub wanted_sources: String,
}
// The `SharedExtractor` implementation for Query<QueryType> describes how to
// construct an instance of `Query<QueryType>` from an HTTP request: namely, by
// parsing the query string to an instance of `QueryType`.
#[async_trait]
impl SharedExtractor for MultiSubscribeQuery {
    // parse with serde_qs (not dropshot's Query) so the repeated-parameter
    // style documented on MultiSubscribeQueryForDocs lands in the HashSets
    async fn from_request<Context: ServerContext>(
        ctx: &RequestContext<Context>,
    ) -> Result<MultiSubscribeQuery, HttpError> {
        let raw_query = ctx.request.uri().query().unwrap_or("");
        let q = serde_qs::from_str(raw_query).map_err(|e| {
            HttpError::for_bad_request(None, format!("unable to parse query string: {}", e))
        })?;
        Ok(q)
    }

    fn metadata(body_content_type: ApiEndpointBodyContentType) -> ExtractorMetadata {
        // HACK: query type switcheroo: passing MultiSubscribeQuery to
        // `metadata` would "helpfully" panic because dropshot believes we can
        // only have scalar types in a query.
        //
        // so instead we have a fake second type whose only job is to look the
        // same as MultiSubscribeQuery except that it has `String` instead of
        // `Vec<String>`, which dropshot will accept, and generate ~close-enough
        // docs for.
        <Query<MultiSubscribeQueryForDocs> as SharedExtractor>::metadata(body_content_type)
    }
}
/// Scalar (dropshot-friendly) query parameters for /subscribe.
#[derive(Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
struct ScalarSubscribeQuery {
    /// when true, subscribe to the instant (live) broadcast instead of the
    /// delayed one (see `subscribe`); defaults to false
    #[serde(default)]
    pub instant: bool,
}
/// Subscribe to link events over a websocket
///
/// Filters come from the query string (MultiSubscribeQuery); `?instant=true`
/// selects the live broadcast, otherwise the delayed channel is used.
#[channel {
    protocol = WEBSOCKETS,
    path = "/subscribe",
}]
async fn subscribe(
    reqctx: RequestContext<Context>,
    query: MultiSubscribeQuery,
    scalar_query: Query<ScalarSubscribeQuery>,
    upgraded: WebsocketConnection,
) -> dropshot::WebsocketChannelResult {
    // adopt the already-upgraded connection as a server-side websocket
    let ws = tokio_tungstenite::WebSocketStream::from_raw_socket(
        upgraded.into_inner(),
        Role::Server,
        None,
    )
    .await;

    let Context { b, d, shutdown, .. } = reqctx.context();
    // child token: server shutdown cancels every subscriber, but a subscriber
    // cancelling itself does not take down the server
    let sub_token = shutdown.child_token();

    let q = scalar_query.into_inner();
    let subscription = if q.instant { b } else { d }.subscribe();
    log::info!("starting subscriber with broadcast: instant={}", q.instant);

    Subscriber::new(query, sub_token)
        .start(ws, subscription)
        .await
        .map_err(|e| format!("boo: {e:?}"))?;

    Ok(())
}
+163
spacedust/src/subscriber.rs
···11+use tokio::time::interval;
22+use std::time::Duration;
33+use futures::StreamExt;
44+use crate::ClientEvent;
55+use crate::LinkEvent;
66+use crate::server::MultiSubscribeQuery;
77+use futures::SinkExt;
88+use std::error::Error;
99+use tokio::sync::broadcast::{self, error::RecvError};
1010+use tokio_tungstenite::{WebSocketStream, tungstenite::Message};
1111+use tokio_util::sync::CancellationToken;
1212+use dropshot::WebsocketConnectionRaw;
/// how often we ping each connected subscriber; a pong that doesn't arrive
/// within one period gets the subscriber dropped (see Subscriber::start)
const PING_PERIOD: Duration = Duration::from_secs(30);
/// One connected websocket client: its query filters plus a cancellation
/// token scoped to this subscriber (a child of the server shutdown token).
pub struct Subscriber {
    query: MultiSubscribeQuery,
    shutdown: CancellationToken,
}
2020+2121+impl Subscriber {
2222+ pub fn new(
2323+ query: MultiSubscribeQuery,
2424+ shutdown: CancellationToken,
2525+ ) -> Self {
2626+ Self { query, shutdown }
2727+ }
2828+2929+ pub async fn start(
3030+ self,
3131+ ws: WebSocketStream<WebsocketConnectionRaw>,
3232+ mut receiver: broadcast::Receiver<LinkEvent>
3333+ ) -> Result<(), Box<dyn Error>> {
3434+ let mut ping_state = None;
3535+ let (mut ws_sender, mut ws_receiver) = ws.split();
3636+ let mut ping_interval = interval(PING_PERIOD);
3737+ let _guard = self.shutdown.clone().drop_guard();
3838+3939+ // TODO: do we need to timeout ws sends??
4040+4141+ metrics::counter!("subscribers_connected_total").increment(1);
4242+ metrics::gauge!("subscribers_connected").increment(1);
4343+4444+ loop {
4545+ tokio::select! {
4646+ l = receiver.recv() => match l {
4747+ Ok(link) => if let Some(message) = self.filter(link) {
4848+ if let Err(e) = ws_sender.send(message).await {
4949+ log::warn!("failed to send link, dropping subscriber: {e:?}");
5050+ break;
5151+ }
5252+ },
5353+ Err(RecvError::Closed) => self.shutdown.cancel(),
5454+ Err(RecvError::Lagged(n)) => {
5555+ log::warn!("dropping lagging subscriber (missed {n} messages already)");
5656+ self.shutdown.cancel();
5757+ }
5858+ },
5959+ cm = ws_receiver.next() => match cm {
6060+ Some(Ok(Message::Ping(state))) => {
6161+ if let Err(e) = ws_sender.send(Message::Pong(state)).await {
6262+ log::error!("failed to reply pong to subscriber: {e:?}");
6363+ break;
6464+ }
6565+ }
6666+ Some(Ok(Message::Pong(state))) => {
6767+ if let Some(expected_state) = ping_state {
6868+ if *state == expected_state {
6969+ ping_state = None; // good
7070+ } else {
7171+ log::error!("subscriber returned a pong with the wrong state, dropping");
7272+ self.shutdown.cancel();
7373+ }
7474+ } else {
7575+ log::error!("subscriber sent a pong when none was expected");
7676+ self.shutdown.cancel();
7777+ }
7878+ }
7979+ Some(Ok(m)) => log::trace!("subscriber sent an unexpected message: {m:?}"),
8080+ Some(Err(e)) => {
8181+ log::error!("failed to receive subscriber message: {e:?}");
8282+ break;
8383+ }
8484+ None => {
8585+ log::trace!("end of subscriber messages. bye!");
8686+ break;
8787+ }
8888+ },
8989+ _ = ping_interval.tick() => {
9090+ if ping_state.is_some() {
9191+ log::warn!("did not recieve pong within {PING_PERIOD:?}, dropping subscriber");
9292+ self.shutdown.cancel();
9393+ } else {
9494+ let new_state: [u8; 8] = rand::random();
9595+ let ping = new_state.to_vec().into();
9696+ ping_state = Some(new_state);
9797+ if let Err(e) = ws_sender.send(Message::Ping(ping)).await {
9898+ log::error!("failed to send ping to subscriber, dropping: {e:?}");
9999+ self.shutdown.cancel();
100100+ }
101101+ }
102102+ }
103103+ _ = self.shutdown.cancelled() => {
104104+ log::info!("subscriber shutdown requested, bye!");
105105+ if let Err(e) = ws_sender.close().await {
106106+ log::warn!("failed to close subscriber: {e:?}");
107107+ }
108108+ break;
109109+ },
110110+ }
111111+ }
112112+ log::trace!("end of subscriber. bye!");
113113+ metrics::gauge!("subscribers_connected").decrement(1);
114114+ Ok(())
115115+ }
116116+117117+ fn filter(
118118+ &self,
119119+ link: LinkEvent,
120120+ // mut sender: impl Sink<Message> + Unpin
121121+ ) -> Option<Message> {
122122+ let query = &self.query;
123123+124124+ // subject + subject DIDs are logical OR
125125+ let target_did = if link.target.starts_with("did:") {
126126+ link.target.clone()
127127+ } else {
128128+ let rest = link.target.strip_prefix("at://")?;
129129+ if let Some((did, _)) = rest.split_once("/") {
130130+ did
131131+ } else {
132132+ rest
133133+ }.to_string()
134134+ };
135135+ if !(query.wanted_subjects.contains(&link.target) || query.wanted_subject_dids.contains(&target_did) || query.wanted_subjects.is_empty() && query.wanted_subject_dids.is_empty()) {
136136+ // wowwww ^^ fix that
137137+ return None
138138+ }
139139+140140+ // subjects together with sources are logical AND
141141+142142+ if !query.wanted_sources.is_empty() {
143143+ let undotted = link.path.strip_prefix('.').unwrap_or_else(|| {
144144+ eprintln!("link path did not have expected '.' prefix: {}", link.path);
145145+ ""
146146+ });
147147+ let source = format!("{}:{undotted}", link.collection);
148148+ if !query.wanted_sources.contains(&source) {
149149+ return None
150150+ }
151151+ }
152152+153153+ let ev = ClientEvent {
154154+ kind: "link".to_string(),
155155+ origin: "live".to_string(),
156156+ link: link.into(),
157157+ };
158158+159159+ let json = serde_json::to_string(&ev).unwrap();
160160+161161+ Some(Message::Text(json.into()))
162162+ }
163163+}
···22<html lang="en">
33 <head>
44 <meta charset="utf-8" />
55- <title>UFOs API Documentation</title>
55+ <title>UFOs API documentation</title>
66 <meta name="viewport" content="width=device-width, initial-scale=1" />
77 <meta name="description" content="API Documentation for UFOs: Samples and stats for all atproto lexicons." />
88 <style>
+1-1
ufos/src/storage_fjall.rs
···4040///
4141/// new data format, roughly:
4242///
4343-/// Partion: 'global'
4343+/// Partition: 'global'
4444///
4545/// - Global sequence counter (is the jetstream cursor -- monotonic with many gaps)
4646/// - key: "js_cursor" (literal)