## What does it do

[Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap) is a [tool made by Bluesky](https://docs.bsky.app/blog/introducing-tap) that combines a firehose client/adapter with JSON output (like [Jetstream](https://github.com/bluesky-social/jetstream)) with a repository/PDS crawler and importer. It's useful if you're building a backend app that needs to import and store some set of records from the Atmosphere. It's meant to be run locally on your app's server, with only your app connecting to it, and it saves you a lot of code if you need to both backfill existing records of some kind and stream any new ones that are added after you start the import.

Basically, before Tap, your app code needed to:

1) Connect to a relay or Jetstream
2) Filter only records of selected kinds
3) Find out which repos on which PDSes have records that are relevant to you
4) Connect to those PDSes and get those records via `listRecords` or `getRepo`
5) Handle possible duplicates between the imported repos and the firehose

If you only want some kinds of records from some specific repos, that still leaves you with both a firehose client and a repo importer, and you have to merge the results from both somehow.

With Tap, you only need to:

1) Run Tap, passing it a list of record types to sync in command line parameters or in env
2) Connect to the Tap stream on localhost
3) Save everything coming from the stream

So instead of two ways of importing the records, you only have one, and it's the much less involved one. And this library also handles a lot of this for you.

**Tapfall** is an extension of [Skyfall](https://tangled.org/mackuba.eu/skyfall), a gem for streaming records from a relay/PDS firehose or Jetstream. It adds support for the event format used by Tap and for some of the additional HTTP APIs that Tap provides.


## Installation

Add this to your `Gemfile`:

    gem 'tapfall', '~> 0.1'


## Usage

Create a `Tapfall::Stream` object, specifying the address of the Tap service websocket:

```rb
require 'tapfall'

tap = Tapfall::Stream.new('ws://localhost:2480')
```

You can also pass just a hostname, but it's then interpreted as HTTPS/WSS, which might not be what you want.

Next, set up event listeners to handle incoming messages and get notified of errors. Here are all the available listeners (you will need at least `on_message`):

```rb
# this gives you a parsed message object, one of the subclasses of Tapfall::TapMessage
tap.on_message { |msg| p msg }

# lifecycle events
tap.on_connecting { |url| puts "Connecting to #{url}..." }
tap.on_connect { puts "Connected" }
tap.on_disconnect { puts "Disconnected" }
tap.on_reconnect { puts "Connection lost, trying to reconnect..." }
tap.on_timeout { puts "Connection stalled, triggering a reconnect..." }

# handling errors (there's a default error handler that does exactly this)
tap.on_error { |e| puts "ERROR: #{e}" }
```

You can also call these as setters accepting a `Proc` – e.g. to disable default error handling, you can do:

```rb
tap.on_error = nil
```

When you're ready, open the connection by calling `connect`:

```rb
tap.connect
```

The `#connect` method blocks until the connection is explicitly closed with `#disconnect` from an event or interrupt handler. Tapfall & Skyfall use [EventMachine](https://github.com/eventmachine/eventmachine) under the hood, so in order to run some things in parallel, you can use e.g. `EM::PeriodicTimer`.

Tapfall also supports Skyfall's `on_raw_message` version of the handler, but only if you use Tap in "disable acks" mode (see below), which is not recommended beyond testing, unless you're sending the acks yourself. (This is because Tapfall needs to parse the message from JSON in order to get the `id` of the event to send in the "ack".)

> [!NOTE]
> Unlike the standard firehose and Jetstream, Tap streams don't have a cursor that you store and pass when reconnecting. Tap is meant to be used by only one client, and it tracks internally which events have been sent to you and which haven't. You can think about it this way: it's not a public service like Jetstream that you can share with others, it's a microservice that you run as a component of your app.


### Acks

Tap by default runs in a mode where it expects the client to send back an "ack" after receiving and processing each event. When it gets the ack, it marks the event as processed and will not send it again. If you don't send an ack, it tries to retransmit the event after a moment.

You can also run it with acks disabled, by passing a `--disable-acks` option or `TAP_DISABLE_ACKS=true` env var, in which case it will assume an event has been processed as soon as it's sent to you. This is not recommended in production: if your process crashes while processing an event, that event will be lost (and you can't ask for an earlier cursor because there's no cursor).
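
To make the difference between the two modes more concrete, here's a toy in-memory model of "retransmit until acked" delivery. It is only an illustration of the semantics described above, not how Tap is actually implemented, and `ToyOutbox` is a made-up name:

```rb
# Toy model of at-least-once delivery with acks (hypothetical, not Tap's code).
class ToyOutbox
  def initialize(events)
    @pending = events.dup   # events that have not been acknowledged yet, in order
  end

  # Returns the oldest unacknowledged event, or nil when everything is acked.
  def next_event
    @pending.first
  end

  # Marks the event with the given id as processed, so it's never sent again.
  def ack(id)
    @pending.reject! { |e| e[:id] == id }
  end
end

outbox = ToyOutbox.new([{ id: 1 }, { id: 2 }])

first = outbox.next_event     # delivered, but the client "crashes" before acking
retried = outbox.next_event   # so the same event is delivered again
outbox.ack(retried[:id])      # this time it's processed and acknowledged
after_ack = outbox.next_event # only now does the stream move on to the next event
```

With acks disabled, the equivalent of `ack` would happen at the moment of sending, so the "crash" in the middle would simply drop event 1.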

Tapfall handles the acks for you automatically. If you want it to not send acks, pass an `:ack => false` option to the constructor:

```rb
tap = Tapfall::Stream.new(server, { ack: false })
```

### Password-protected access

Tap also lets you set an admin password, using the `--admin-password` option or `TAP_ADMIN_PASSWORD` env var. This locks the stream and the API behind HTTP Basic auth (with the user `admin`). Pass the password to the Tapfall constructor like this:

```rb
tap = Tapfall::Stream.new(server, { admin_password: 'abracadabra' })
```
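
If you ever need to call a password-protected Tap from some other tool, this is roughly what HTTP Basic auth means on the wire. It's a sketch using only core Ruby; `basic_auth_header` is a hypothetical helper, not part of Tapfall:

```rb
# Builds the value of an HTTP Basic "Authorization" header:
# the word "Basic" followed by base64("user:password").
def basic_auth_header(user, password)
  'Basic ' + ["#{user}:#{password}"].pack('m0')   # 'm0' = base64 without newlines
end

header = basic_auth_header('admin', 'abracadabra')
puts "Authorization: #{header}"
```

Note that Basic auth only encodes the password, it doesn't encrypt it, so this is best kept to localhost or a TLS connection.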

### Processing messages

Each message passed to `on_message` is an instance of a subclass of `Tapfall::TapMessage`. The main event type is `Tapfall::RecordMessage`, which includes a record operation; you will also receive `Tapfall::IdentityMessage` events, which provide info about an account change like a changed handle or migration to a new PDS. An `UnknownMessage` might be passed if new, unrecognized message types are sent in the future.

All message types share these properties:

- `type` (symbol) – the message type identifier, e.g. `:record`
- `id` (integer), aliased as `seq` – a sequential index of the message

The `:record` messages have an `operations` method, which returns an array of add/remove/edit `Operation`s done on some records. Currently the Tap event format includes only a single record operation in each event, but it's returned as an array here for symmetry with the `Skyfall::Firehose` stream version.

An `Operation` has the following fields (also matching the API of `Skyfall::Firehose::Operation` and `Skyfall::Jetstream::Operation`):

- `repo` or `did` (string) – DID of the repository (user account)
- `collection` (string) – name of the collection / record type, e.g. `app.bsky.feed.post` for posts
- `type` (symbol) – short name of the collection, e.g. `:bsky_post`
- `rkey` (string) – identifier of a record in a collection
- `path` (string) – the path part of the at:// URI – collection name + ID (rkey) of the item
- `uri` (string) – the complete at:// URI
- `action` (symbol) – `:create`, `:update` or `:delete`
- `cid` (CID) – CID of the operation/record (`nil` for delete operations)
- `live?` (boolean) – true if the record was received from the firehose, false if it was backfilled from the repo
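
To show how `repo`, `collection`, `rkey`, `path` and `uri` relate to each other, here's a small sketch that builds the last two from the first three. The helper names and the rkey value are made up for illustration; in real code you'd just read `op.path` and `op.uri`:

```rb
# Hypothetical helpers mirroring what Operation#path and Operation#uri contain.
def at_path(collection, rkey)
  "#{collection}/#{rkey}"
end

def at_uri(repo, collection, rkey)
  "at://#{repo}/#{at_path(collection, rkey)}"
end

# '3kexamplerkey' is an invented rkey, used only for this example
path = at_path('app.bsky.feed.post', '3kexamplerkey')
uri = at_uri('did:plc:uh4errluyq5thgszrwwrtpuq', 'app.bsky.feed.post', '3kexamplerkey')

puts path
puts uri
```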

Create and update operations will also have an attached record (JSON object) with the details of the record, e.g. the text of a post. The record data is currently available as a Ruby hash via the `raw_record` property (custom types may be added in the future).

So for example, in order to filter only "create post" operations and print their details, you can do something like this:

```rb
tap.on_message do |m|
  next if m.type != :record

  m.operations.each do |op|
    next unless op.action == :create && op.type == :bsky_post

    puts "#{op.repo}:"
    puts op.raw_record['text']
    puts
  end
end
```
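
For the "save everything coming from the stream" part, one possible approach is to key your storage by the at:// URI, so that handling the same event twice (which can happen when an event is retransmitted) does no harm. A minimal sketch, assuming a plain Hash as the "database"; `StoreOp` is a stand-in struct with just the fields used here, not Tapfall's real `Operation` class:

```rb
# Stand-in for an Operation, with only the fields this example needs (hypothetical).
StoreOp = Struct.new(:uri, :action, :raw_record, keyword_init: true)

# Applies one operation to a Hash keyed by at:// URI.
def apply_operation(store, op)
  case op.action
  when :create, :update
    store[op.uri] = op.raw_record   # upsert, so replaying the same event is harmless
  when :delete
    store.delete(op.uri)
  end

  store
end

store = {}
uri = 'at://did:plc:uh4errluyq5thgszrwwrtpuq/app.bsky.feed.post/3kexamplerkey'
create = StoreOp.new(uri: uri, action: :create, raw_record: { 'text' => 'Hello' })

apply_operation(store, create)
apply_operation(store, create)   # duplicate delivery, still just one record
size_after_duplicate = store.size

apply_operation(store, StoreOp.new(uri: uri, action: :delete))
```

In a real app you'd replace the Hash with your database and call something like this from inside the `m.operations.each` loop above.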


### Note on custom lexicons

Note that the `Operation` objects have two properties that tell you the kind of record they're about: `#collection`, which is a string containing the official name of the collection/lexicon, e.g. `"app.bsky.feed.post"`; and `#type`, which is a symbol meant to save you some typing, e.g. `:bsky_post`.

When Tapfall receives a message about a record type that's not on its list of known types, whether in the `app.bsky` namespace or not, the operation `type` will be `:unknown`, while the `collection` will be the original string. So if an app like e.g. "Skygram" appears with a `zz.skygram.*` namespace that lets you share photos on ATProto, the operations will have a type `:unknown` and collection names like `zz.skygram.feed.photo`, and you can check the `collection` field for record types known to you and process them in some appropriate way, even if Tapfall doesn't recognize the record type.

However, do not check first if such operations have a `type` equal to `:unknown` – just ignore the type and only check the `collection` string. The reason is that a later version might start recognizing those records and add a new `type` value for them, e.g. `:skygram_photo`, and then they won't match your condition anymore.
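
Here's what that advice looks like in code, using the made-up "Skygram" lexicon from above. `LexOp` is a stand-in struct for illustration, not Tapfall's `Operation`:

```rb
# Stand-in for an Operation with only the two fields discussed here (hypothetical).
LexOp = Struct.new(:collection, :type, keyword_init: true)

# Match on the collection string only, deliberately ignoring op.type.
def skygram_photo?(op)
  op.collection == 'zz.skygram.feed.photo'
end

today = LexOp.new(collection: 'zz.skygram.feed.photo', type: :unknown)
later = LexOp.new(collection: 'zz.skygram.feed.photo', type: :skygram_photo)
post  = LexOp.new(collection: 'app.bsky.feed.post', type: :bsky_post)

matches = [today, later, post].map { |op| skygram_photo?(op) }
```

The check keeps matching both the `today` and the `later` operation, while a condition like `op.type == :unknown && ...` would silently stop matching once the gem learns about the new type.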


### Reconnection logic

See the section in the [Skyfall readme](https://tangled.org/mackuba.eu/skyfall#reconnection-logic) about the options for handling reconnecting to a flaky firehose. Since in this case the Tap service runs under your control, likely on the same machine, this might not be as useful in practice.


## HTTP API

Apart from the `/channel` websocket endpoint, Tap also has [a few other endpoints](https://github.com/bluesky-social/indigo/tree/main/cmd/tap#http-api) for adding/removing repos and checking various stats.

You can call all of the methods below either on the `Tapfall::Stream` instance you use for connecting to the websocket, or on a separate `Tapfall::API` object if you prefer.

Currently implemented endpoints:

### /repos/add

Tap can work in three possible ways regarding the subset of repos it tracks:

1) `TAP_FULL_NETWORK`, when it tracks *all repos everywhere*
2) `TAP_SIGNAL_COLLECTION`, when it finds and tracks all repos that have some specific types of records you're interested in
3) default mode, when it only tracks repos you've added manually

In that third mode, use this to add repos to the tracking list:

```rb
@tap.add_repo('did:plc:uh4errluyq5thgszrwwrtpuq')

# or:
@tap.add_repos(did_list)
```
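
For reference, this is roughly the kind of request such a call results in, sketched with Ruby's standard `Net::HTTP`. The request is only built here, not sent. The `POST` method and the `{"dids": [...]}` JSON body are assumptions on my part, so check the Tap readme linked above for the exact format; `build_add_repos_request` is a hypothetical helper, not Tapfall's implementation:

```rb
require 'json'
require 'net/http'
require 'uri'

# Builds (but does not send) a request to Tap's /repos/add endpoint.
# Assumption: Tap expects a POST with a JSON body like {"dids": ["did:plc:..."]}.
def build_add_repos_request(base_url, dids, admin_password: nil)
  uri = URI.join(base_url, '/repos/add')

  request = Net::HTTP::Post.new(uri)
  request['Content-Type'] = 'application/json'
  request.basic_auth('admin', admin_password) if admin_password
  request.body = JSON.generate(dids: dids)
  request
end

req = build_add_repos_request('http://localhost:2480', ['did:plc:uh4errluyq5thgszrwwrtpuq'])

puts "#{req.method} #{req.path}"
puts req.body
```

To actually send it, you could pass the request to `Net::HTTP.start('localhost', 2480) { |http| http.request(req) }`.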

### /repos/remove

To remove repos from the list, use:

```rb
@tap.remove_repo('did:plc:uh4errluyq5thgszrwwrtpuq')

# or:
@tap.remove_repos(did_list)
```


## Credits