search for standard sites pub-search.waow.tech
search zig blog atproto
11
fork

Configure Feed

Select the types of activity you want to include in your feed.

tap (firehose sync)#

leaflet-search uses tap from bluesky-social/indigo to receive real-time events from the ATProto firehose.

what is tap?#

tap subscribes to the ATProto firehose, filters for specific collections (e.g., site.standard.document), and broadcasts matching events to websocket clients. it also does initial crawling/backfilling of existing records.

key behavior: tap backfills historical data when repos are added. when a repo is added to tracking:

  1. tap fetches the full repo from the account's PDS using com.atproto.sync.getRepo
  2. live firehose events during backfill are buffered in memory
  3. historical events (marked live: false) are delivered first
  4. after historical events complete, buffered live events are released
  5. subsequent firehose events arrive immediately marked as live: true

tap enforces strict per-repo ordering - live events are synchronization barriers that require all prior events to complete first.

message format#

tap sends JSON messages over websocket. record events look like:

{
  "type": "record",
  "record": {
    "live": true,
    "did": "did:plc:abc123...",
    "rev": "3mbspmpaidl2a",
    "collection": "site.standard.document",
    "rkey": "3lzyrj6q6gs27",
    "action": "create",
    "record": { ... },
    "cid": "bafyrei..."
  }
}

field types (important!)#

field type values notes
type string "record", "identity", "account" message type
action string "create", "update", "delete" NOT an enum!
live bool true/false true = firehose, false = resync
collection string e.g., "site.standard.document" lexicon collection

gotchas#

  1. action is a string, not an enum - tap sends "action": "create" as a JSON string. if your parser expects an enum type, extraction will silently fail. use string comparison.

  2. collection filters apply during processing - TAP_COLLECTION_FILTERS controls which records tap processes and sends to clients, both during live commits and resync CAR walks. records from other collections are skipped entirely.

  3. signal collection vs collection filters - TAP_SIGNAL_COLLECTION controls auto-discovery of repos (which repos to track), while TAP_COLLECTION_FILTERS controls which records from those repos to output. a repo must either be auto-discovered via signal collection OR manually added via /repos/add.

  4. silent extraction failures - if using zat's extractAt, enable debug logging to see why parsing fails:

    pub const std_options = .{
        .log_scope_levels = &.{.{ .scope = .zat, .level = .debug }},
    };
    

    this will show messages like:

    debug(zat): extractAt: parse failed for Op at path { "op" }: InvalidEnumTag
    

memory and performance tuning#

tap loads entire repo CARs into memory during resync. some bsky users have repos that are 100-300MB+. this causes spiky memory usage that can OOM the machine.

[[vm]]
  memory = '2gb'  # 1gb is not enough

[env]
  TAP_RELAY_URL = 'https://relay.waow.tech'  # custom relay (not default bsky.network)
  TAP_RESYNC_PARALLELISM = '1'               # only one repo CAR in memory at a time (default: 5)
  TAP_FIREHOSE_PARALLELISM = '5'             # concurrent event processors (default: 10)
  TAP_OUTBOX_CAPACITY = '10000'              # event buffer size (default: 100000)
  TAP_IDENT_CACHE_SIZE = '10000'             # identity cache entries (default: 2000000)
  TAP_CURSOR_SAVE_INTERVAL = '5s'            # how often to persist firehose cursor
  TAP_REPO_FETCH_TIMEOUT = '600s'            # timeout for repo CAR fetches

why these values?#

  • 2GB memory: 1GB causes OOM kills when resyncing large repos
  • resync parallelism 1: prevents multiple large CARs in memory simultaneously
  • lower firehose/outbox: we track ~1000 repos, not millions - defaults are overkill
  • smaller ident cache: we don't need 2M cached identities

if tap keeps OOM'ing, check logs for large repo resyncs:

fly logs -a leaflet-search-tap | grep "parsing repo CAR" | grep -E "size\":[0-9]{8,}"

quick status check#

from the tap/ directory:

just check

shows tap machine state, most recent indexed date, and 7-day timeline. useful for verifying indexing is working after restarts.

example output:

=== tap status ===
app    781417db604d48  23  ewr  started  ...

=== Recent Indexing Activity ===
Last indexed: 2026-01-08 (14 docs)
Today: 2026-01-11
Docs: 3742 | Pubs: 1231

=== Timeline (last 7 days) ===
2026-01-08: 14 docs
2026-01-07: 29 docs
...

if "Last indexed" is more than a day behind "Today", tap may be down or catching up.

checking catch-up progress#

when tap restarts after downtime, it replays the firehose from its saved cursor. to check progress:

# see current firehose position (look for timestamps in log messages)
fly logs -a leaflet-search-tap | grep -E '"time".*"seq"' | tail -3

the "time" field in log messages shows how far behind tap is. compare to current time to estimate catch-up.

catch-up speed varies:

  • ~0.3x when resync queue is full (large repos being fetched)
  • ~1x or faster once resyncs clear

debugging#

check tap connection#

fly logs -a leaflet-search-tap --no-tail | tail -30

look for:

  • "connected to firehose" - successfully connected to bsky relay
  • "websocket connected" - backend connected to tap
  • "dialing failed" / "i/o timeout" - network issues

check backend is receiving#

fly logs -a leaflet-search-backend --no-tail | grep -E "(tap|indexed)"

look for:

  • tap connected! - connected to tap
  • tap: msg_type=record - receiving messages
  • indexed document: - successfully processing

common issues#

symptom cause fix
tap machine stopped, oom_killed=true large repo CARs exhausted memory increase memory to 2GB, reduce TAP_RESYNC_PARALLELISM to 1
websocket handshake failed: error.Timeout tap not running or network issue restart tap, check regions match
dialing failed: lookup ... i/o timeout DNS issues reaching bsky relay restart tap, transient network issue
messages received but not indexed extraction failing (type mismatch) enable zat debug logging, check field types
repo shows records: 0 after adding resync failed or collection not in filters check tap logs for resync errors, verify TAP_COLLECTION_FILTERS
new platform records not appearing platform's collection not in TAP_COLLECTION_FILTERS add collection to filters, restart tap
indexing stopped, tap shows "started" tap catching up from downtime check firehose position in logs, wait for catch-up

tap API endpoints#

tap exposes HTTP endpoints for monitoring and control:

endpoint description
/health health check
/stats/repo-count number of tracked repos
/stats/record-count total records processed
/stats/outbox-buffer events waiting to be sent
/stats/resync-buffer buffered commits for repos currently resyncing (NOT the resync queue)
/stats/cursors firehose cursor position
/info/:did repo status: {"did":"...","state":"active","records":N}
/repos/add POST with {"dids":["did:plc:..."]} to add repos
/repos/remove POST with {"dids":["did:plc:..."]} to remove repos

note: the tap container has no curl — use wget instead.

example: check repo status

fly ssh console -a leaflet-search-tap -C "wget -qO- http://localhost:2480/info/did:plc:abc123"

example: manually add a repo for backfill

fly ssh console -a leaflet-search-tap -C 'wget -qO- --post-data="{\"dids\":[\"did:plc:abc123\"]}" --header="Content-Type: application/json" http://localhost:2480/repos/add'

fly.io deployment#

both tap and backend should be in the same region for internal networking:

# check current regions
fly status -a leaflet-search-tap
fly status -a leaflet-search-backend

# restart tap if needed
fly machine restart -a leaflet-search-tap <machine-id>

note: changing primary_region in fly.toml only affects new machines. to move existing machines, clone to new region and destroy old one.

references#