at-then

Promise-based one-shot waits for matching AT Protocol records from Jetstream.

Wait for one matching AT Protocol record from Jetstream, then stop.

at-then is a one-shot promise API over the AT Protocol firehose. You give it Jetstream pushdown filters plus a local predicate. It opens a Jetstream subscription, waits until a matching record arrives, resolves with that event, and closes the stream.

Status

This repository is in README-driven development. The API below is the first implementation target.

Why this exists

Jetstream already does the cheap filtering well. You can push down wantedCollections, wantedDids, and a cursor before any events reach your process.

But most real waits still need application logic after that. Examples:

  • The next post from this DID whose text contains #launch.
  • The first like whose subject URI matches a specific post.
  • The first record that passes a schema check or custom predicate.

You can build that with raw WebSocket code, but the control flow is awkward for a one-shot wait. A promise is the right shape.

Install

pnpm add at-then

Basic use

import { atThen } from "at-then"

const event = await atThen<{ text?: string }>({
  endpoint: "wss://jetstream2.us-east.bsky.network/subscribe",
  filter: {
    wantedCollections: ["app.bsky.feed.post"],
    wantedDids: ["did:plc:aliceexampledid"],
  },
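  match({ commit }) {
    // Only newly created posts count; updates and deletes are skipped.
    if (commit.operation !== "create") return false
    // The record is untyped at runtime, so check the field before using it.
    return typeof commit.record?.text === "string"
      && commit.record.text.includes("#launch")
  },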
  timeout: 20_000,
})

console.log(event.did)
console.log(event.commit.record)

atThen() resolves with the matching Jetstream commit event and closes the subscription immediately.

API

type JetstreamCommitEvent<TRecord = unknown> = {
  did: string
  time_us: number
  kind: "commit"
  commit: {
    rev: string
    operation: "create" | "update" | "delete"
    collection: string
    rkey: string
    cid?: string
    record?: TRecord
  }
}

type AtThenOptions<TRecord = unknown> = {
  endpoint?: string | URL
  filter?: {
    wantedCollections?: string[]
    wantedDids?: string[]
    cursor?: number
    compress?: boolean
  }
  match?: (event: JetstreamCommitEvent<TRecord>) => boolean | Promise<boolean>
  signal?: AbortSignal
  timeout?: number | {
    ms: number
    reason?: unknown
  }
}

declare function atThen<TRecord = unknown>(
  options: AtThenOptions<TRecord>
): Promise<JetstreamCommitEvent<TRecord>>
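
The types above describe events at compile time only. One way an implementation might enforce the "commit event with a record" shape at runtime is a type guard over the parsed JSON. This is a sketch under assumptions: isRecordBearingCommit is a hypothetical helper, not part of the API declared above, and it checks only a few fields rather than validating the whole event.

```typescript
// Copy of the JetstreamCommitEvent type from the API section, so the sketch is self-contained.
type JetstreamCommitEvent<TRecord = unknown> = {
  did: string
  time_us: number
  kind: "commit"
  commit: {
    rev: string
    operation: "create" | "update" | "delete"
    collection: string
    rkey: string
    cid?: string
    record?: TRecord
  }
}

// Hypothetical helper: narrows parsed JSON to a commit event that carries a record.
// It checks only kind, did, and commit.record; the remaining fields are trusted, not validated.
function isRecordBearingCommit(value: unknown): value is JetstreamCommitEvent {
  if (typeof value !== "object" || value === null) return false
  const event = value as { kind?: unknown; did?: unknown; commit?: unknown }
  if (event.kind !== "commit" || typeof event.did !== "string") return false
  if (typeof event.commit !== "object" || event.commit === null) return false
  const commit = event.commit as { record?: unknown }
  // Delete commits arrive without a record, so they fail this check.
  return typeof commit.record === "object" && commit.record !== null
}
```

Identity events, account events, and delete commits all fail the guard, so they would never reach match.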

Options

  • endpoint is the Jetstream /subscribe URL.
  • filter maps directly to Jetstream subscription query parameters.
  • match runs after Jetstream pushdown filtering. If omitted, the first record-bearing commit event that survives the pushdown filter wins.
  • signal aborts the wait.
  • timeout is shorthand for an internal abort timer. Pass a number of milliseconds, or an object with ms and an optional custom reason.
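
To illustrate how filter could map onto the subscription query string, here is a minimal sketch. buildSubscribeUrl is a hypothetical helper, not part of the API above. It assumes Jetstream's query parameter names match the filter keys and that list values are sent as repeated parameters. Decompressing frames when compress is set is out of scope here.

```typescript
type JetstreamFilter = {
  wantedCollections?: string[]
  wantedDids?: string[]
  cursor?: number
  compress?: boolean
}

// Hypothetical helper: turns the filter option into a /subscribe URL.
function buildSubscribeUrl(endpoint: string | URL, filter: JetstreamFilter = {}): URL {
  // Copy the endpoint so a caller-owned URL object is never mutated.
  const url = new URL(String(endpoint))
  // Assumption: each list entry becomes its own repeated query parameter.
  for (const collection of filter.wantedCollections ?? []) {
    url.searchParams.append("wantedCollections", collection)
  }
  for (const did of filter.wantedDids ?? []) {
    url.searchParams.append("wantedDids", did)
  }
  if (filter.cursor !== undefined) {
    url.searchParams.set("cursor", String(filter.cursor))
  }
  if (filter.compress) {
    url.searchParams.set("compress", "true")
  }
  return url
}

const exampleUrl = buildSubscribeUrl("wss://jetstream2.us-east.bsky.network/subscribe", {
  wantedCollections: ["app.bsky.feed.post"],
  wantedDids: ["did:plc:aliceexampledid"],
})
console.log(exampleUrl.toString())
```

Omitted filter keys add nothing to the query string, so an empty filter yields the bare endpoint URL.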

Matching semantics

  • at-then only resolves on Jetstream commit events that include a record.
  • Identity events, account events, and delete commits without a record are ignored.
  • match may be sync or async.
  • Events are evaluated in arrival order. The first event in stream order whose match result is truthy wins.
  • If match throws or rejects, atThen() rejects and closes the subscription.
  • When a match is found, the promise resolves with that same event object.

The ordering rule matters. If match is async, at-then should still mean "first matching event from the stream," not "first predicate promise to settle."
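
One possible way to get that ordering guarantee is to chain predicate calls on a single promise, so each event is evaluated only after every earlier one has finished. The sketch below is illustrative, not the implementation: createOrderedMatcher and its callback parameters are hypothetical names.

```typescript
// Hypothetical helper: feeds events through `match` one at a time, in arrival order.
function createOrderedMatcher<T>(
  match: (event: T) => boolean | Promise<boolean>,
  onMatch: (event: T) => void,
  onError: (error: unknown) => void,
): (event: T) => void {
  let settled = false
  // Each pushed event waits for every earlier predicate call to finish.
  let tail: Promise<void> = Promise.resolve()

  return (event) => {
    tail = tail
      .then(async () => {
        if (settled) return // an earlier event already won or failed
        if (await match(event)) {
          settled = true
          onMatch(event)
        }
      })
      .catch((error: unknown) => {
        // A throwing or rejecting predicate ends the wait.
        if (settled) return
        settled = true
        onError(error)
      })
  }
}
```

The trade-off is throughput: a slow predicate delays every event queued behind it. For a one-shot wait that seems acceptable, since the pushdown filter should already have narrowed the stream.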

Abort and timeout

atThen() should support normal AbortSignal cancellation and a built-in timeout.

// Call controller.abort(reason) to cancel the wait from the caller's side.
const controller = new AbortController()
const targetUri = "at://did:plc:someone/app.bsky.feed.post/3kxyz"

const event = await atThen<{ subject?: { uri?: string } }>({
  filter: {
    wantedCollections: ["app.bsky.feed.like"],
  },
  match({ commit }) {
    return commit.record?.subject?.uri === targetUri
  },
  signal: controller.signal,
  timeout: {
    ms: 10_000,
    reason: new Error("timed out waiting for first matching like"),
  },
})

Timeout should be implemented as an internal abort. That keeps cancellation behavior consistent:

  • If the caller aborts, reject with signal.reason when available.
  • If the timeout fires, reject with timeout.reason when provided.
  • If the timeout fires without a custom reason, reject with an AtThenTimeoutError.

An abort can carry a specific cause. Modern runtimes support AbortController.abort(reason), and the reason is then available as signal.reason.
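
A minimal sketch of how the caller's signal and the timeout might be merged into one abort path follows. createAbortPath is a hypothetical helper, and the AtThenTimeoutError class is an assumed shape: the README names that error but does not define it.

```typescript
// Assumed shape for the error named above.
class AtThenTimeoutError extends Error {
  constructor(ms: number) {
    super(`at-then timed out after ${ms}ms`)
    this.name = "AtThenTimeoutError"
  }
}

type TimeoutOption = number | { ms: number; reason?: unknown }

// Hypothetical helper: returns one signal that aborts on caller abort or on timeout.
function createAbortPath(signal?: AbortSignal, timeout?: TimeoutOption) {
  const controller = new AbortController()
  let timer: ReturnType<typeof setTimeout> | undefined

  // Forward the caller's reason unchanged.
  const onExternalAbort = () => controller.abort(signal?.reason)

  // Stop the timer and detach from the caller's signal.
  const dispose = () => {
    if (timer !== undefined) clearTimeout(timer)
    signal?.removeEventListener("abort", onExternalAbort)
  }

  if (signal?.aborted) {
    controller.abort(signal.reason)
  } else {
    signal?.addEventListener("abort", onExternalAbort, { once: true })
    if (timeout !== undefined) {
      const ms = typeof timeout === "number" ? timeout : timeout.ms
      const custom = typeof timeout === "number" ? undefined : timeout.reason
      // Use the custom reason when given, otherwise fall back to the timeout error.
      timer = setTimeout(() => controller.abort(custom ?? new AtThenTimeoutError(ms)), ms)
    }
  }

  // Whichever source aborts first also cleans up the other.
  controller.signal.addEventListener("abort", dispose, { once: true })
  return { signal: controller.signal, dispose }
}
```

The rest of the implementation would then watch only the returned signal and reject with its reason, calling dispose once the wait settles for any other cause.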

Non-goals

  • Historical search over already-indexed data.
  • Durable cursor management or automatic reconnect loops.
  • Multi-match subscriptions or async iterators.
  • Full schema decoding for every AT Protocol record type.

This package is for one live wait on the firehose. If you want a long-running consumer, this is the wrong abstraction.

First implementation target

The first version should do exactly this:

  1. Open a Jetstream subscription at /subscribe with the requested pushdown filters.
  2. Listen for incoming messages and ignore anything that is not a record-bearing commit event.
  3. Run match for each candidate event, in arrival order.
  4. Resolve on the first truthy match and close the subscription.
  5. Reject on socket failure, predicate failure, abort, or timeout.
  6. Merge external cancellation and internal timeout into one abort path.
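
Steps 2 through 5 can be sketched against an injected socket, which keeps the control flow testable without a network connection. Everything here is illustrative: waitForMatch, SocketLike, and the trimmed CommitEvent type are hypothetical names, and the sketch assumes the caller has already opened the subscription (step 1) and passes in a single merged signal (step 6). Skipping malformed frames is also an assumption, not something the contract above specifies.

```typescript
// Trimmed-down event type for this sketch; JetstreamCommitEvent in the API section has more fields.
type CommitEvent = { kind: "commit"; commit: { record: unknown } }

// Hypothetical stand-in for the parts of a WebSocket this sketch touches.
type SocketLike = {
  onmessage: ((message: { data: unknown }) => void) | null
  onerror: ((cause: unknown) => void) | null
  onclose: ((cause: unknown) => void) | null
  close(): void
}

// Step 2: keep only commit events that carry a record.
function isCandidate(value: unknown): value is CommitEvent {
  if (typeof value !== "object" || value === null) return false
  const { kind, commit } = value as { kind?: unknown; commit?: { record?: unknown } | null }
  return kind === "commit" && typeof commit === "object" && commit !== null && commit.record !== undefined
}

function parseJson(data: unknown): unknown {
  try {
    return JSON.parse(String(data))
  } catch {
    return undefined // malformed frames are skipped (assumption)
  }
}

function waitForMatch(
  socket: SocketLike,
  match: (event: CommitEvent) => boolean | Promise<boolean>,
  signal?: AbortSignal,
): Promise<CommitEvent> {
  return new Promise((resolve, reject) => {
    let settled = false
    let queue: Promise<void> = Promise.resolve()

    // Single exit path: mark settled, close the socket, then resolve or reject.
    const finish = (settle: () => void) => {
      if (settled) return
      settled = true
      signal?.removeEventListener("abort", onAbort)
      socket.close()
      settle()
    }
    const onAbort = () => finish(() => reject(signal?.reason))

    if (signal?.aborted) {
      onAbort()
      return
    }
    signal?.addEventListener("abort", onAbort, { once: true })

    // Step 5: socket failure or an early close rejects.
    socket.onerror = () => finish(() => reject(new Error("Jetstream socket failed")))
    socket.onclose = () => finish(() => reject(new Error("Jetstream socket closed before a match")))

    socket.onmessage = ({ data }) => {
      const parsed = parseJson(data)
      if (!isCandidate(parsed)) return
      // Step 3: chain predicate calls so they run in arrival order.
      queue = queue
        .then(async () => {
          if (settled) return
          // Step 4: the first truthy match resolves and closes the socket.
          if (await match(parsed)) finish(() => resolve(parsed))
        })
        .catch((error: unknown) => finish(() => reject(error))) // predicate failure
    }
  })
}
```

Routing every outcome through finish is the design choice worth noting: it guarantees the promise settles once and the socket is closed on every path.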

If this README feels right, the next step is to make at-then.ts implement this contract and then write tests against the semantics above.