at-then#
Promise-based one-shot waits for matching AT Protocol records from Jetstream.
Wait for one matching AT Protocol record from Jetstream, then stop.
at-then is a one-shot promise API over the AT Protocol firehose. You give it Jetstream pushdown filters plus a local predicate. It opens a Jetstream subscription, waits until a matching record arrives, resolves with that event, and closes the stream.
Status#
This repository is in README-driven development. The API below is the first implementation target.
Why this exists#
Jetstream already does the cheap filtering well. You can push down wantedCollections, wantedDids, and a cursor before any events reach your process.
But most real waits still need application logic after that. Examples:
- The next post from this DID whose text contains
#launch. - The first like whose subject URI matches a specific post.
- The first record that passes a schema check or custom predicate.
You can build that with raw WebSocket code, but the control flow is awkward for a one-shot wait. A promise is the right shape.
Install#
pnpm add at-then
Basic use#
import { atThen } from "at-then"
const event = await atThen<{ text?: string }>({
endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
filter: {
wantedCollections: ["app.bsky.feed.post"],
wantedDids: ["did:plc:aliceexampledid"],
},
match({ commit }) {
if (commit.operation !== "create") return false
return typeof commit.record?.text === "string"
&& commit.record.text.includes("#launch")
},
timeout: 20_000,
})
console.log(event.did)
console.log(event.commit.record)
atThen() resolves with the matching Jetstream commit event and closes the subscription immediately.
API#
type JetstreamCommitEvent<TRecord = unknown> = {
did: string
time_us: number
kind: "commit"
commit: {
rev: string
operation: "create" | "update" | "delete"
collection: string
rkey: string
cid?: string
record?: TRecord
}
}
type AtThenOptions<TRecord = unknown> = {
endpoint?: string | URL
filter?: {
wantedCollections?: string[]
wantedDids?: string[]
cursor?: number
compress?: boolean
}
match?: (event: JetstreamCommitEvent<TRecord>) => boolean | Promise<boolean>
signal?: AbortSignal
timeout?: number | {
ms: number
reason?: unknown
}
}
declare function atThen<TRecord = unknown>(
options: AtThenOptions<TRecord>
): Promise<JetstreamCommitEvent<TRecord>>
Options#
endpointis the Jetstream/subscribeURL.filtermaps directly to Jetstream subscription query parameters.matchruns after Jetstream pushdown filtering. If omitted, the first record event that survives the pushdown filter wins.signalaborts the wait.timeoutis shorthand for an internal abort timer.
Matching semantics#
at-thenonly resolves on Jetstreamcommitevents that include arecord.- Identity events, account events, and delete commits without a record are ignored.
matchmay be sync or async.- Events are evaluated in arrival order. The first event in stream order whose
matchresult is truthy wins. - If
matchthrows or rejects,atThen()rejects and closes the subscription. - When a match is found, the promise resolves with that same event object.
The ordering rule matters. If match is async, at-then should still mean "first matching event from the stream," not "first predicate promise to settle."
Abort and timeout#
atThen() should support normal AbortSignal cancellation and a built-in timeout.
const controller = new AbortController()
const targetUri = "at://did:plc:someone/app.bsky.feed.post/3kxyz"
const event = await atThen<{ subject?: { uri?: string } }>({
filter: {
wantedCollections: ["app.bsky.feed.like"],
},
match({ commit }) {
return commit.record?.subject?.uri === targetUri
},
signal: controller.signal,
timeout: {
ms: 10_000,
reason: new Error("timed out waiting for first matching like"),
},
})
Timeout should be implemented as an internal abort. That keeps cancellation behavior consistent:
- If the caller aborts, reject with
signal.reasonwhen available. - If the timeout fires, abort with
timeout.reasonwhen provided. - If the timeout fires without a custom reason, reject with an
AtThenTimeoutError.
Yes, abort can carry a specific cause. Modern runtimes support AbortController.abort(reason), and the reason is available as signal.reason.
Non-goals#
- Historical search over already-indexed data.
- Durable cursor management or automatic reconnect loops.
- Multi-match subscriptions or async iterators.
- Full schema decoding for every AT Protocol record type.
This package is for one live wait on the firehose. If you want a long-running consumer, this is the wrong abstraction.
First implementation target#
The first version should do exactly this:
- Open a Jetstream subscription at
/subscribewith the requested pushdown filters. - Listen for incoming messages and ignore anything that is not a record-bearing
commitevent. - Run
matchfor each candidate event, in arrival order. - Resolve on the first truthy match and close the subscription.
- Reject on socket failure, predicate failure, abort, or timeout.
- Merge external cancellation and internal timeout into one abort path.
If this README feels right, the next step is to make at-then.ts implement this contract and then write tests against the semantics above.