# skyline

A TypeScript toolkit for consuming the Bluesky network in real time. Provides WebSocket streaming via Jetstream, HTTP API bindings for the Bluesky public API, and typed parsing/formatting utilities for building CLI tools and bots.

**This library is read-only.** It can observe the network (stream posts, fetch profiles, search, read threads) but cannot create posts, follow users, or perform any write operations. For writes, use the official [`@atproto/api`](https://github.com/bluesky-social/atproto/tree/main/packages/api) package, which provides a full-featured agent with `createPost`, `like`, `follow`, etc.

## Quick start

```sh
npm install
npx tsx examples/hello-jetstream.ts
```

That's it: no API keys, no auth tokens, no configuration. You'll see live Bluesky posts scrolling by within seconds. Press `Ctrl+C` to stop.

## ATProto concepts

This section covers the AT Protocol concepts you'll encounter throughout the codebase. If you've used Bluesky the app but never touched ATProto code, start here.

### DIDs and handles

Every Bluesky user has two identifiers:

- **DID** (Decentralized Identifier): A permanent, opaque ID like `did:plc:oky5czdrnfjpqslsw2a5iclo`. This never changes, even if the user changes their handle or moves to a different server. All data in ATProto is addressed by DID.
- **Handle**: A human-readable name like `jay.bsky.team`. Handles can change at any time; a user might switch from `alice.bsky.social` to `alice.com`. Handles are resolved to DIDs via DNS or the `resolveHandle` API.

The Jetstream firehose only gives you DIDs. If you need a display name or handle, you must look it up via the HTTP API (`fetchGetProfile`; `fetchResolveHandle` goes the other way, from handle to DID). This is why many examples in this repo work with raw DIDs: the stream doesn't include profile data.
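Because the stream only carries DIDs, a long-running consumer usually wants to avoid fetching the same profile repeatedly. The following is a minimal sketch of a DID-to-profile cache, not part of this library: `ProfileSummary`, `ProfileFetcher`, and `createProfileCache` are hypothetical names, and the default fetcher calls the public `app.bsky.actor.getProfile` endpoint directly rather than going through `fetchGetProfile`, whose exact signature is not shown here.

```typescript
// Hypothetical shape: only the fields this sketch needs from a profile response.
interface ProfileSummary {
  did: string;
  handle: string;
  displayName?: string;
}

type ProfileFetcher = (did: string) => Promise<ProfileSummary>;

// Default fetcher (assumption): calls the public getProfile endpoint directly.
const fetchProfileFromPublicApi: ProfileFetcher = async (did) => {
  const url = new URL("https://public.api.bsky.app/xrpc/app.bsky.actor.getProfile");
  url.searchParams.set("actor", did);
  const res = await fetch(url);
  if (!res.ok) throw new Error(`getProfile failed with HTTP ${res.status}`);
  return (await res.json()) as ProfileSummary;
};

// Returns a lookup function that fetches each DID at most once.
function createProfileCache(fetcher: ProfileFetcher = fetchProfileFromPublicApi) {
  const cache = new Map<string, Promise<ProfileSummary>>();
  return (did: string): Promise<ProfileSummary> => {
    let pending = cache.get(did);
    if (!pending) {
      pending = fetcher(did).catch((err) => {
        cache.delete(did); // do not cache failures, so a later call can retry
        throw err;
      });
      cache.set(did, pending);
    }
    return pending;
  };
}

// Usage inside an event handler (not executed here):
//   const lookup = createProfileCache();
//   const profile = await lookup(event.did);
```

Since handles can change at any time, a real cache would probably also expire entries after a while; this sketch keeps them for the lifetime of the process.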
### AT URIs

AT URIs are the canonical way to address any record in the AT Protocol:

```
at://did:plc:oky5czdrnfjpqslsw2a5iclo/app.bsky.feed.post/3mffccqrpx22t
     └─── DID (who) ────────────────┘ └─ collection ───┘ └─ rkey ────┘
```

- **DID**: The repo owner
- **Collection**: The Lexicon type (e.g. `app.bsky.feed.post`, `app.bsky.graph.follow`)
- **rkey** (record key): A unique ID within the collection, usually a TID (timestamp-based ID)

AT URIs are what the protocol uses internally. The `bsky.app` URLs you see in your browser (`https://bsky.app/profile/jay.bsky.team/post/3mffccqrpx22t`) are a web-app convenience. This library provides helpers to convert between the two: `buildAtUri`, `parseAtUri`, `toBskyUrl`, `atUriToBskyUrl`, `parseBskyUrl`.

### Facets (rich text)

ATProto does **not** use markdown. Post text is stored as plain UTF-8 with a separate `facets` array that annotates byte ranges with features like links or mentions. Ranges are counted in UTF-8 bytes (not JavaScript string indices), with `byteStart` inclusive and `byteEnd` exclusive:

```json
{
  "text": "Check out @jay.bsky.team's work at https://bsky.social",
  "facets": [
    {
      "index": { "byteStart": 10, "byteEnd": 24 },
      "features": [{ "$type": "app.bsky.richtext.facet#mention", "did": "did:plc:..." }]
    },
    {
      "index": { "byteStart": 35, "byteEnd": 54 },
      "features": [{ "$type": "app.bsky.richtext.facet#link", "uri": "https://bsky.social" }]
    }
  ]
}
```

This design keeps the text human-readable while supporting rich rendering. The `extractLinks()` and `extractMentions()` helpers pull these features out of the facets array so you don't have to navigate the structure yourself.

### Lexicons and the ATProto ecosystem

All record types in ATProto are defined by [Lexicons](https://atproto.com/specs/lexicon), which are schema definitions identified by reverse-DNS names like `app.bsky.feed.post`. Bluesky's lexicons all live under `app.bsky.*`, but the protocol itself is generic. A different app could define `com.example.recipes.recipe` and use the same infrastructure.
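To make the relationship between AT URIs and Lexicon namespaces concrete, here is a small standalone sketch that splits an AT URI into its three parts and checks whether the collection sits under `app.bsky.*`. The names `splitAtUri`, `AtUriParts`, and `isBlueskyCollection` are hypothetical and are not the library's `parseAtUri`; the sketch also assumes the authority is a DID and the URI points at a single record.

```typescript
// Hypothetical result shape for this sketch (the library's parseAtUri may differ).
interface AtUriParts {
  did: string;
  collection: string;
  rkey: string;
}

// Split "at://<did>/<collection>/<rkey>" into its parts, or return null if malformed.
function splitAtUri(uri: string): AtUriParts | null {
  const match = /^at:\/\/(did:[a-z]+:[^/]+)\/([A-Za-z0-9.-]+)\/([^/]+)$/.exec(uri);
  if (!match) return null;
  const [, did, collection, rkey] = match;
  return { did, collection, rkey };
}

// True when a collection name belongs to Bluesky's app.bsky.* namespace.
function isBlueskyCollection(collection: string): boolean {
  return collection.startsWith("app.bsky.");
}

const parts = splitAtUri(
  "at://did:plc:oky5czdrnfjpqslsw2a5iclo/app.bsky.feed.post/3mffccqrpx22t",
);
console.log(parts?.collection, parts ? isBlueskyCollection(parts.collection) : false);
```

A check like `isBlueskyCollection` is one way to decide whether the Bluesky-specific parsers apply to a record or whether it needs a parser of your own.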

This library is tightly coupled to `app.bsky.*` lexicons: the post parsers, type guards, and API bindings all reference Bluesky-specific schemas. However, the low-level WebSocket layer (`createJetstreamConnection`, `onMessage`) receives events for all collections, so you could use it to observe record types published by other ATProto apps on the network. You'd just need to write your own parsers for those record types.

## Jetstream vs the AT Protocol firehose

This library supports two ways to consume the real-time event stream:

### Jetstream (`lib/websocket.ts`, `examples/hello-jetstream.ts`)

[Jetstream](https://github.com/bluesky-social/jetstream) is an **official Bluesky service** that provides a simplified, JSON-based WebSocket stream. It pre-processes the raw firehose into clean events:

- **Format**: JSON messages with `kind` (`commit`, `identity`, `account`), pre-parsed records, microsecond timestamps
- **Filtering**: Server-side filtering by collection and DID via URL params or `options_update` messages
- **Reconnection**: Cursor-based replay from a microsecond timestamp (see [Cursors](#cursors) below)
- **What you lose**: Cryptographic proofs (CIDs). Jetstream strips the content-addressed hashes that let you verify data integrity. For most applications, this doesn't matter.
- **Endpoint**: `wss://jetstream2.us-east.bsky.network/subscribe`

This is what 90% of the library uses.

### AT Protocol firehose (`examples/hello-firehose.ts`)

The raw firehose via `@atproto/sync` gives you the full commit stream with cryptographic proofs:

- **Format**: CBOR-encoded repo commits with CID verification
- **What you gain**: Content-addressed integrity proofs: you can verify that the data hasn't been tampered with
- **What you lose**: Convenience. You're working with raw repo operations, not pre-parsed events. No server-side filtering; you process everything client-side.

Use the raw firehose only if you need cryptographic verification or are building infrastructure-level tooling.

## Cursors

Every Jetstream event includes a `time_us` field, a **microsecond Unix timestamp** (not milliseconds, not an opaque token). This is the cursor.

The library tracks the `time_us` of the last event it processed. When your WebSocket disconnects and reconnects, it subtracts a safety buffer (default: 5 seconds / `5_000_000` microseconds) from that value and passes the result as the `cursor` URL parameter. Jetstream replays all events from that point forward.

**Retention window**: Jetstream retains roughly **72 hours** of events. If you disconnect for longer than that, you'll resume from the oldest available event, not from where you left off. There is no "I missed 4 days of data" recovery; you'd need to use the relay firehose or a backfill service for that.

**Duplicates**: Because of the safety buffer, you may receive some events twice after reconnecting. The library does not deduplicate. If your application needs exactly-once processing, you'll need to track seen events yourself, for example by DID, collection, and `rkey` together, since an `rkey` on its own is only unique within one repo's collection.

The `startStreamWithReconnect()` function handles cursor tracking and replay automatically. If you're using the lower-level `createJetstreamConnection` + `attachLifecycleWithCursor`, you manage the `CursorRef` yourself.

## Stream volume and backpressure

An unfiltered Jetstream subscription receives **every event on the entire Bluesky network**. As of early 2025, that's roughly:

- **~50–150 post creates per second** (varies by time of day)
- Plus likes, follows, reposts, blocks, list operations, profile updates, etc.
- Total event throughput can exceed **500 events/second** during peak hours

The examples in this repo process events synchronously in the `onEvent` callback. For a CLI tool printing to stdout, this is fine: the bottleneck is your terminal, not the event rate. But if you're doing anything expensive per event (HTTP calls, database writes), you need to buffer or batch.
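One simple way to batch is to collect events in memory and hand them off in groups, so that an expensive operation runs once per group instead of once per event. The sketch below is illustrative only: `EventBatcher` is a hypothetical class that does not exist in this library, and the batch size of 100 is an arbitrary choice.

```typescript
// Collects items and hands them to `onFlush` in groups of at most `maxSize`.
class EventBatcher<T> {
  private buffer: T[] = [];
  private readonly maxSize: number;
  private readonly onFlush: (batch: T[]) => void;

  constructor(maxSize: number, onFlush: (batch: T[]) => void) {
    this.maxSize = maxSize;
    this.onFlush = onFlush;
  }

  // Called once per event; flushes as soon as the buffer is full.
  push(item: T): void {
    this.buffer.push(item);
    if (this.buffer.length >= this.maxSize) this.flush();
  }

  // Hands off whatever is buffered; safe to call on a timer or at shutdown.
  flush(): void {
    if (this.buffer.length === 0) return;
    const batch = this.buffer;
    this.buffer = [];
    this.onFlush(batch);
  }
}

// Example wiring: one write per 100 events instead of one write per event.
const batcher = new EventBatcher<string>(100, (batch) => {
  console.log(`writing ${batch.length} events in one round trip`);
});
// In onEvent:                  batcher.push(someEventKey);
// On an interval and on exit:  batcher.flush();
```

This reduces per-event cost but is not true backpressure: if the flush handler is slower than the incoming event rate, work still piles up, so a production consumer would likely also cap the queue or drop events deliberately.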

The library does not provide built-in backpressure handling. Filtering helps: subscribing to only `app.bsky.feed.post` cuts volume significantly, and filtering by specific DIDs (via `wantedDids`) can reduce it to near-zero if you're watching a small set of users.

## Dynamic stream filtering with `sendOptionsUpdate`

You can change your subscription filters on a live WebSocket **without reconnecting**:

```typescript
import { startStreamWithReconnect, sendOptionsUpdate } from "./lib/index.js";

const { getWs } = startStreamWithReconnect({
  config: { wantedCollections: ["app.bsky.feed.post"] },
  onEvent: (event) => { /* ... */ },
});

// Later, narrow the stream to specific users:
sendOptionsUpdate(
  getWs(),
  ["app.bsky.feed.post"],                // collections
  ["did:plc:abc...", "did:plc:def..."],  // DIDs
);
```

This sends a JSON `options_update` message over the existing WebSocket. The server immediately starts filtering to your new criteria. The `user-feed` example uses this to progressively narrow the stream as it discovers users from search results.

Limits: you can filter by collection and/or DID. There's no server-side keyword or language filter; those happen client-side in the `onEvent` callback.

## Authentication and the HTTP API

Most Bluesky API endpoints work without authentication through the public API at `https://public.api.bsky.app/xrpc`. This includes profiles, feeds, threads, followers, and social graph queries.

**`searchPosts` is the exception**: it requires a Bearer token. This is a Bluesky policy decision; the search index is more expensive to operate than basic data lookups.

The token is a **JWT access token** obtained by creating an authenticated session:

```sh
# 1. Create an App Password at https://bsky.app/settings/app-passwords
# 2. Exchange it for a JWT:
curl -s -X POST https://bsky.social/xrpc/com.atproto.server.createSession \
  -H "Content-Type: application/json" \
  -d '{"identifier":"your.handle","password":"your-app-password"}' \
  | jq -r .accessJwt
# 3. Export it:
export BSKY_AUTH_TOKEN="eyJ..."
```

The JWT expires after a few hours. This library does not handle token refresh; if your token expires, you'll get a 401 and need to create a new session.

## Rate limits

The Bluesky public API enforces rate limits per IP:

- **Unauthenticated**: ~3,000 requests per 5 minutes
- **Authenticated**: ~5,000 requests per 5 minutes

When you hit the limit, the API returns HTTP 429. The `isRateLimitError()` helper detects this, and `delay()` is a simple sleep utility. **The library does not implement automatic retry or backoff**; you need to handle this yourself. The `user-feed` example shows a basic pattern: catch the error, sleep 30 seconds, retry.

The Jetstream WebSocket has no rate limit on receiving events: it pushes to you as fast as events happen.

## Filtering the stream to specific users

To watch posts from specific accounts, you pass their DIDs to `wantedDids`:

```typescript
startStreamWithReconnect({
  config: {
    wantedCollections: ["app.bsky.feed.post"],
    wantedDids: ["did:plc:abc...", "did:plc:def..."],
  },
  onEvent: (event) => { /* only events from these DIDs */ },
});
```

There is **no shortcut for "my followers"**. You must:

1. Resolve your handle to a DID via `fetchResolveHandle`
2. Paginate through `fetchGetFollows` (the people *you* follow) or `fetchGetFollowers` (people who follow *you*)
3. Collect all their DIDs
4. Pass them to `wantedDids` or `sendOptionsUpdate`

The `user-feed` example demonstrates this pattern with search results instead of followers.

## Reconnection behavior

`startStreamWithReconnect` handles reconnection automatically:

1. WebSocket `close` event fires
2. Wait `reconnectDelay` (default: 5 seconds)
3. Reconnect with `cursor = lastEventTimestamp - cursorBufferUs` (default buffer: 5 seconds)
4. Jetstream replays events from the cursor forward

This is fully automatic; you don't need to wire anything up.
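The cursor arithmetic in step 3, and the duplicate events it produces, can be sketched in a few lines. This is an illustration under stated assumptions rather than the library's implementation: `resumeCursor`, `buildResumeUrl`, and `createReplayFilter` are hypothetical names, and the replay filter assumes that `time_us` values coming from a single Jetstream instance are strictly increasing.

```typescript
const JETSTREAM_URL = "wss://jetstream2.us-east.bsky.network/subscribe";
const DEFAULT_BUFFER_US = 5_000_000; // 5 seconds, expressed in microseconds

// Step 3: rewind the last seen time_us by the safety buffer (never below zero).
function resumeCursor(lastTimeUs: number, bufferUs: number = DEFAULT_BUFFER_US): number {
  return Math.max(0, lastTimeUs - bufferUs);
}

// Build the reconnect URL; the cursor parameter is omitted on a first connection.
function buildResumeUrl(collections: string[], lastTimeUs?: number): string {
  const url = new URL(JETSTREAM_URL);
  for (const c of collections) url.searchParams.append("wantedCollections", c);
  if (lastTimeUs !== undefined) {
    url.searchParams.set("cursor", String(resumeCursor(lastTimeUs)));
  }
  return url.toString();
}

// Drops replayed events: anything at or before the newest time_us already handled.
// Assumption: time_us is strictly increasing within one Jetstream instance.
function createReplayFilter(): (timeUs: number) => boolean {
  let newest = -Infinity;
  return (timeUs) => {
    if (timeUs <= newest) return false;
    newest = timeUs;
    return true;
  };
}
```

A handler could call the filter with each event's `time_us` and skip the event when it returns `false`. If the strict-ordering assumption does not hold for your setup (for example, when failing over between Jetstream instances), a keyed set of recently seen records is the safer choice.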

The lower-level functions (`createJetstreamConnection` + `attachLifecycleWithCursor`) give you manual control if you need it.

**Guarantees**: You will not miss events during brief disconnects (under 72 hours). You may receive duplicates after reconnecting due to the safety buffer. The library does not deduplicate.

## Project structure

```
lib/
├── types.ts        Zod schemas and TypeScript types for all data structures
├── websocket.ts    Jetstream WebSocket connection, cursor tracking, reconnection
├── parsing.ts      Event type guards, post/identity/account parsers, facet extraction
├── formatting.ts   Terminal output: box drawing, keyword highlighting, URL conversion
├── api.ts          HTTP API bindings for the Bluesky public API
├── index.ts        Re-exports everything from the above modules
└── __tests__/      Unit tests (vitest)

examples/
├── hello-jetstream.ts     Minimal Jetstream connection (no auth needed)
├── hello-firehose.ts      Raw AT Protocol firehose with CID proofs
├── keyword-stream.ts      Real-time keyword filtering with cursor reconnection
├── post-lifecycle.ts      Track creates, edits, and deletes with throughput stats
├── identity-monitor.ts    Handle changes and account status events
├── profile-dashboard.ts   Fetch and display a user profile (HTTP API)
├── thread-explorer.ts     Render a post thread as an indented tree
├── search-posts.ts        Search posts with engagement analysis (requires auth)
├── user-feed.ts           Search users → stream their posts (API + WebSocket)
└── README.md              Detailed guide for each example
```

## API reference

### WebSocket

| Function | Purpose |
|----------|---------|
| `startStreamWithReconnect(options)` | High-level: connect, handle events, auto-reconnect with cursor. This is what you want. |
| `createJetstreamConnection(config)` | Low-level: open a raw WebSocket to Jetstream |
| `onMessage(ws, handler)` | Attach a typed event handler (no cursor tracking) |
| `onMessageWithCursor(ws, handler, cursorRef)` | Attach a handler that updates a cursor ref on each event |
| `attachLifecycleWithCursor(ws, options)` | Wire up open/error/close handlers with cursor-aware reconnection |
| `sendOptionsUpdate(ws, collections, dids)` | Change subscription filters on a live connection without reconnecting |
| `buildJetstreamUrl(config)` | Build the WebSocket URL with query parameters |
| `waitForOpen(ws)` | Promise that resolves when the WebSocket opens |
| `createCursorRef()` | Create a `{ current: number \| undefined }` ref for cursor tracking |

### Parsing

| Function | Purpose |
|----------|---------|
| `isCommitEvent(event)` | Type guard: is this a commit (create/update/delete)? |
| `isIdentityEvent(event)` | Type guard: is this a handle change? |
| `isAccountEvent(event)` | Type guard: is this an account status change? |
| `parsePost(event)` | Extract a `PostData` from a post create event, or `null` |
| `parsePostUpdate(event)` | Extract a `PostUpdateData` from a post update event, or `null` |
| `parsePostDelete(event)` | Extract a `PostDeleteData` from a post delete event, or `null` |
| `parseKeywordPost(event, keywords)` | Parse + keyword-match in one step. Returns `null` if no match |
| `parseUserPost(event, registry)` | Parse + user-match against a `UserRegistry`. Returns `null` if user not in registry |
| `extractLinks(facets)` | Pull all link URIs from a post's facets array |
| `extractMentions(facets)` | Pull all mentioned DIDs from a post's facets array |
| `buildAtUri(did, collection, rkey)` | Construct an `at://` URI |
| `parseAtUri(uri)` | Decompose an `at://` URI into `{ did, collection, rkey }` |
| `parseBskyUrl(url)` | Decompose a `bsky.app` URL into `{ id, rkey }` |

### HTTP API

| Function | Auth required | Purpose |
|----------|:---:|---------|
| `fetchResolveHandle(handle)` | No | Resolve a handle to a DID |
| `fetchGetProfile(actor)` | No | Fetch a user's full profile |
| `fetchGetProfiles(actors)` | No | Batch-fetch up to 25 profiles |
| `fetchSearchActors(query)` | No | Search for users by name/handle |
| `fetchGetAuthorFeed(actor, options)` | No | Fetch a user's recent posts |
| `fetchGetPostThread(uri, options)` | No | Fetch a thread (parents + replies) |
| `fetchGetFollowers(actor, options)` | No | Paginate through a user's followers |
| `fetchGetFollows(actor, options)` | No | Paginate through who a user follows |
| `fetchSearchPosts(query, options)` | **Yes** | Full-text search across all posts |
| `fetchGetLikes(uri, options)` | No | Who liked a post |
| `fetchGetRepostedBy(uri, options)` | No | Who reposted a post |
| `fetchGetQuotes(uri, options)` | No | Posts that quote a given post |

### Formatting

| Function | Purpose |
|----------|---------|
| `toBskyUrl(did, rkey)` | Build a `https://bsky.app/profile/...` URL |
| `atUriToBskyUrl(atUri)` | Convert an AT URI to a bsky.app URL |
| `formatPost(post, header)` | Box-drawing formatted post for terminal output |
| `formatKeywordPost(post)` | Format with keyword highlighting |
| `formatUserPost(post)` | Format with user display name and handle |
| `highlightKeywords(text, keywords)` | Wrap matched keywords in `**bold**` markers |
| `formatTruncated(text, maxLength)` | Truncate with `...` |
| `formatEngagement(counts)` | Compact stats: `L:5 R:2 Q:1 Re:3` |
## Running tests

```sh
npm test
```

Tests use [Vitest](https://vitest.dev/) and cover parsing, formatting, WebSocket URL building, and API URL construction.