lightweight com.atproto.sync.listReposByCollection

fix up the readme

phil 389c3f54 1dd0c4a7

+170 -39
+16 -12
hacking.md
··· 93 93 - [ ] lenient pre-sync1.1 94 94 - [ ] *don't* allow non-validating commits that look like sync1.1 95 95 - [ ] ratchet by PDS host: be lenient if we have never seen a sync1.1-looking commit, always strict after we see one. 96 - - [ ] boooo we probably need *even more* special handling for pre-sync1.1 repos since they don't include adjacent keys!!! 96 + - [ ] boooo we might need more handling for pre-sync1.1 repos if they don't include adjacent keys 97 + - [ ] resync free hints from first phony getRecord 98 + - [ ] short-circuit: tiny repos may incidentally return their entire CAR for getRecord 99 + - [ ] estimate CAR size and `getRecord` if it's likely very small (bypass `describeRepo`) 100 + - [ ] add a `--heavy` mode that always uses `getRepo` and never `describeRepo` 101 + - [ ] commit CAR handling: generate a list of keys with gaps noted, to reliably detect missing adjacent keys 97 102 - [ ] account status convergence: if we receive commits from apparently-inactive accounts, should we check upstream status to make sure we're not stale? 98 103 - [ ] split the keyspace: put the rbc/cbr indexes on a second keyspace with larger block size, expect hits on main keyspace 99 104 - [ ] websocket ping/pong (unless jacquard is already doing it) ··· 106 111 - [ ] admin view of backfill state etc 107 112 - [ ] vanity stats for optimizations, like how many in-flight repos were saved from resync due to high-water-mark firehose cursor persistence 108 113 - [ ] if the upstream is a PDS (check with describeServer?) 
then only accept events for DIDs that have it as their PDS 114 + - [ ] use `since` on getRepo for resync to get a smaller partial export in many cases (and then more-carefully do the actual resync) 109 115 110 116 111 117 ### special-casing ··· 116 122 ## some choices 117 123 118 124 - tokio for async runtime: works good 119 - - iroh-car: robust, simple, async 120 - - manual CAR processing: since we need access to adjacent keys 121 - - TODO: repo-stream will expose this soon probably 122 - - TODO: right now we use jacquard_repo but i think it's easier in our case to handle it more manually. 125 + - jacquard almost everywhere: makes things *so much* easier 126 + - repo-stream for CAR processing 123 127 - fjall: workload is write-heavy so LSM is a good fit, space efficiency also very desirable 124 128 125 129 ··· 150 154 151 155 taking [inspiration from tap](https://github.com/bluesky-social/indigo/blob/main/cmd/tap/models/models.go) here! 152 156 153 - TODO: fix outdated prefixes here 157 + see [src/storage/mod.rs](./src/storage/mod.rs) for an accurate key summary. rough overview: 154 158 155 159 ``` 156 160 main index: ··· 169 173 170 174 subscribeRepos (firehose) cursor: 171 175 172 - "subscribeRepos"||<subscribe_host>||"cursor" => u64 176 + "sub"||<subscribe_host>||"cursor" => u64 173 177 174 178 175 179 subscribeRepos' host listRepos progress: 176 180 177 - "listRepos"||<subscribe_host> => { 181 + "lsr"||<subscribe_host> => { 178 182 cursor: String, 179 183 completed: Option<DateTime>, 180 184 } ··· 194 198 195 199 per-repo transient sync state: 196 200 197 - "repoPrev"||<did> => <rev:string>||<prevData:cid> 201 + "rev"||<did> => <rev:string>||<prevData:cid> 198 202 199 203 note: kept separate and small because it very frequently updates! 
200 204 ··· 206 210 207 211 resync queue: 208 212 209 - "repoResyncQueue"||<after:timestamp/u64_be>||<did> => { 213 + "rsq"||<after:timestamp/u64_be>||<did> => { 210 214 commit: cbor, 211 215 retryCount: u16, 212 216 retryReason: string, ··· 217 221 218 222 resync buffer: 219 223 220 - "resyncBuffer"||<did>||<seq_be:u64> => <raw firehose event:cbor> 224 + "rsb"||<did>||<seq_be:u64> => <raw firehose event:cbor> 221 225 222 226 ``` 223 227 ··· 248 252 249 253 ## parallel work 250 254 251 - there are two implementations of worker pools: one for backfill, and one for firehose commits. they work slightly differently from Bluesky's parallel scheduler (used in tap, relay, jetstream, ..): 255 + there are several implementations of worker pools: one for backfill, one for firehose commits, etc. they work slightly differently from Bluesky's parallel scheduler (used in tap, relay, jetstream, ..): 252 256 253 257 Bluesky's parallel scheduler assigns work by sharding on the associated DID: each worker is essentially assigned a subset of DIDs it's responsible for. This is really nice and pretty simple, and upholds the important thing: work for a specific DID is never assigned to more than one worker, so all events for any specific DID are always handled sequentially. 254 258
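A minimal sketch of the DID-sharding idea described above, assuming a fixed worker count and the standard library's `DefaultHasher`. The `shard_for_did` helper is hypothetical: it is neither Bluesky's scheduler code nor lightrail's worker pools, just an illustration of why hashing on the DID keeps each repo's events on a single worker.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical helper: pick the worker responsible for a DID.
///
/// The same DID always hashes to the same index within one build,
/// so every event for that DID lands on one worker and is handled
/// sequentially there.
fn shard_for_did(did: &str, workers: usize) -> usize {
    assert!(workers > 0, "need at least one worker");
    // `DefaultHasher::new()` uses fixed keys, unlike `RandomState`,
    // so the mapping is stable across calls.
    let mut hasher = DefaultHasher::new();
    did.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}
```

A dispatcher built on this would send each event to the channel at index `shard_for_did(did, n)`. Changing the worker count reshuffles the assignment, so it should only change while no work is in flight.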
+147 -20
readme.md
··· 1 1 # lightrail: `listReposByCollection` service 2 2 3 - **status: almost working well but not stable yet!!** 3 + **status: almost working well but _not stable yet!!_** 4 4 5 - lightrail uses the adjacent keys included in CAR slices from firehose commits to detect the first record added and last record removed from a collection in an atproto repo. 5 + Lightrail uses the _adjacent keys_ in firehose commit CAR slices to detect first-record-added-to and last-record-removed-from collections in atproto repos, _statelessly_. Since most commits don't change repos' collection lists, this eliminates most of the work to maintain an accurate repos-by-collection index. 6 6 7 - compared to Bluesky's [`collectiondir`](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir) service, lightrail: 7 + Compared to Bluesky's [`collectiondir`](https://github.com/bluesky-social/indigo/tree/main/cmd/collectiondir) service, lightrail: 8 8 9 - - applies sync1.1 inductive proof validation to firehose commits 10 - - handles sync1.1 `#sync` events 11 - - avoids updating its index unless commits actually add or remove collections 12 - - removes repos from the index when the last record from a repo's collection is removed 9 + - validates sync1.1 commit proofs for index integrity 10 + - handles sync1.1 `#sync` events, catching significant repo changes 11 + - actually _removes repos from the index_ when their last record from a collection is removed 12 + - while doing less work overall 13 13 14 - lightrail's CAR slice techniques enable its lightweight implementation, but its primary focus is on accuracy and correctness. 14 + Lightrail's main priorities are accuracy and correctness. 15 15 16 - for backfill, lightrail currently uses `com.atproto.repo.describeRepo`, like Bluesky's `collectiondir`. 
This is not a robust approach, and will hopefully be replaced by probing that authenticated repo contents (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) soon. 17 16 17 + ## Backfill-by-collection assister 18 18 19 - ### wishlist features (probably doable?): 19 + Sync utilities in atproto like [Tap](https://github.com/bluesky-social/indigo/tree/main/cmd/tap#tap-atproto-sync-utility) and [Hydrant](https://tangled.org/ptr.pet/hydrant#hydrant) can typically synchronize subsets of the atmosphere, filtering repositories by collection. The `com.atproto.sync.listReposByCollection` query answers _"which repos already have content relevant to the filtered subset?"_, so the sync utility can backfill existing relevant network data. 20 20 21 - - [x] DONE accept multiple collections for `listReposbyCollection` (merge + dedup by DID; works bc key is `<collection>||<did>`) 22 - - [x] DONE "wilcard" fo `listReposbyCollection` by omitting the `collection` query param entirely 23 - - ~~`listReposByCollectionPrefix`, either with additional indexes up the NSID hierarchy, or via merge+dedup.~~ not doing 24 - - subscribe to multiple relays 25 - - use authenticated repo contents for backfill instead of `com.atproto.repo.describeRepo` (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) 21 + You usually want to call `listReposByCollection` on the [relay](https://atproto.com/guides/glossary#relay) you subscribe to, to filter the same view of the network that your firehose delivers. But relays don't usually implement `listReposByCollection` themselves: instead they proxy the request to a helper service, like lightrail! 22 + 23 + ``` 24 + ___________ 25 + ___________ [ lightrail ]...... 26 + [ your app ] ‾‾‾‾^‾‾‾‾‾‾ : 27 + ‾‾‾‾‾^‾‾‾‾‾ __|____ (subscribeRepos) 28 + | .-listReposByCollection-->|--+ | : 29 + __|__ ___/ | relay |<....... 
30 + [ tap ]<--------subscribeRepos-------| | 31 + ‾‾‾‾‾ ‾‾‾‾‾‾‾ 32 + ``` 26 33 34 + Subscribing lightrail to the same relay it's assisting keeps its network view consistent. 27 35 28 - ### quirks 29 36 30 - if you see a log line like 37 + ### API 31 38 39 + #### `com.atproto.sync.listReposByCollection` 40 + 41 + [Query docs](https://docs.bsky.app/docs/api/com-atproto-sync-list-repos-by-collection) 42 + 43 + Lightrail implements some [proposed changes](https://github.com/bluesky-social/atproto/pull/4733) to this query: 44 + 45 + - `collection` parameter with zero values (absent) returns *all* repos 46 + - repeated `collection` parameter returns repos from *any* of the specified collections 47 + 48 + Quirks: 49 + 50 + - `limit` can be up to 10,000 (lexicon specifies 2,000 max). This matches `collectiondir`'s limit. 51 + 52 + 53 + #### `com.atproto.sync.listRepos` 54 + 55 + [Query docs](https://docs.bsky.app/docs/api/com-atproto-sync-list-repos) 56 + 57 + 58 + #### `com.atproto.sync.getRepoStatus` 59 + 60 + [Query docs](https://docs.bsky.app/docs/api/com-atproto-sync-get-repo-status) 61 + 62 + 63 + ## Lightrail server quick start 64 + 65 + _(one day we'll have pre-built binaries)_ 66 + 67 + Lightrail is written in rust. Installing [rustup](https://rustup.rs/) will get you everything you need to build and run it. 68 + 69 + ```bash 70 + cargo run --release -- --upstream relay.fire.hose.cam 32 71 ``` 33 - ... WARN ... error=identity resolution failed: jacquard: unsupported DID method: did:web:... 72 + 73 + [`relay.fire.hose.cam`](https://relay.fire.hose.cam/) is one of [microcosm](https://www.microcosm.blue/)'s full-network relays. 
Lightrail works with a relay or PDS host upstream, or any other service that implements at least: 74 + 75 + - `com.atproto.sync.subscribeRepos` and 76 + - `com.atproto.sync.listRepos` 77 + 78 + 79 + ### Key configs 80 + 81 + ```bash 82 + # you can list all config options with: 83 + cargo run -- --help 34 84 ``` 35 85 36 - it just means the did:web resolution failed. lightrail supports did:web, but a [tiny current bug in jacquard](https://tangled.org/nonbinary.computer/jacquard/issues/31) surfaces this message 86 + - **`--db-path`**, default `./lightrail.db`: where to write lightrail's [fjall](https://fjall-rs.github.io/) db 87 + - **`--listen`**, default `0.0.0.0:2511`: host and port to bind 88 + 89 + 90 + #### Atmosphere configs 91 + 92 + - **`--plc-url`**, default: `https://plc.directory`: where to resolve `did:plc` identities. To use microcosm's mirror: `--plc-url https://plc.wtf`. 93 + - **`--slingshot-url`**, default: `https://slingshot.microcosm.blue`: enables slingshot for identity resolution (PLC directory acts as fallback). 94 + - **`--deep-crawl`**, default: `[unset]`. enumerate hosts from upstream with `com.atproto.sync.listHosts` and then crawl those hosts each directly with `com.atproto.sync.listRepos`. 95 + 96 + 97 + #### Operational configs 98 + 99 + - **`--metrics-listen`**, default: `0.0.0.0:6789`: enable prometheus-style metrics collection and serving at this address 100 + - **`--max-resync-workers`**, default: `16`: max backfill and repo resync concurrency. increase to use more resources to speed up backfill. 101 + 102 + 103 + more knobs you can twist: 104 + 105 + - **`--ident-cache-size`**, default: `2_000_000`: identity resolution provides repo signing keys and PDS hostnames. a larger cache reduces outbound resolution requests at the cost of more memory used. 106 + - **`--max-firehose-workers`**, default: `6`: max firehose event processing concurrency. 
107 + - **`--cursor-save-interval-secs`**, default `1` 108 + - **`--describe-repo-fetch-timeout-secs`**, default `30` 109 + - **`--get-repo-fetch-timeout-secs`**, default `300` 110 + - **`--max-deep-crawl-workers`**, default `4`: host-crawling concurrency for `--deep-crawl` 111 + 112 + 113 + ### quirks 114 + 115 + - Lightrail's ordering of DIDs in the `listReposByCollection` response is different from `collectiondir` 116 + 117 + - `collectiondir` always inserts new DIDs at the end of the paginated response 118 + - Lightrail makes no guarantee except that the response will not contain duplicates 119 + 120 + 121 + - If you see a log line like 122 + 123 + ``` 124 + ... WARN ... error=identity resolution failed: jacquard: unsupported DID method: did:web:... 125 + ``` 126 + 127 + it just means the did:web resolution failed. lightrail supports did:web, but a [tiny current bug in jacquard](https://tangled.org/nonbinary.computer/jacquard/issues/31) surfaces this message 128 + 129 + 130 + ### Backfill 131 + 132 + Lightrail currently uses `com.atproto.repo.describeRepo`, like Bluesky's `collectiondir`. This is not as robust as we wish it was, and could be replaced by probing that authenticated repo contents (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) soon. 133 + 134 + The two reasons `describeRepo` isn't robust: 135 + 136 + 1. the results are not authenticated (PDS bugs or quirks could lead to incorrect index) 137 + 2. the response lacks the repo `rev`, so even if the list is accurate, it's not possible to prove that the next firehose commit follows without gaps 138 + 139 + To mitigate the second, we always call `com.atproto.sync.getRecord` *before* `describeRepo`. This establishes a `rev` prior to the list, for eventual-(usually fast)-consistency after cutting over to the firehose. 
140 + 141 + The `sync.getRecord` response also includes a CAR slice that we can use: for very small repos, it might actually include a full repository export, in which case we can resync directly (and robustly!) from that and exit early. If it's a partial CAR, it will still include some keys whose presence we can assert when processing the `describeRepo` response to *maybe* catch a PDS bug. 142 + 143 + Future `sync.getRecord` work: since every provable partial CAR must contain at least the MST root node, we can make a very rough estimate of the full-repo export size, and go ahead and `sync.getRepo` instead of `describeRepo` when it's expected to be very small, for better accuracy without much additional bandwidth overhead. 144 + 145 + When we call `sync.getRecord`, we provide a made-up collection and rkey, which works for our purposes because the response will contain a _proof of absence_ if the key doesn't exist in the repo: a CAR slice (with rev + data from the commit object!) containing adjacent keys (that we'll use!). Unfortunately, not every PDS implements proof of absence responses, notably **bridgy** currently returns an error for non-existent keys. 146 + 37 147 148 + #### `sync.getRepo` resync fallback 149 + 150 + If the `describeRepo` approach fails for any reason, lightrail attempts to resync from a full repo export. 151 + 152 + 153 + ### Sync1.1 154 + 155 + plz remind fig to write this up: the strictness ratchet, any handling of lenient hosts we end up needing, and proof re: correctness of the adjacent keys approach. 
156 + 157 + 158 + ### wishlist features (probably doable?): 159 + 160 + - [x] DONE accept multiple collections for `listReposByCollection` (merge + dedup by DID; works bc key is `<collection>||<did>`) 161 + - [x] DONE "wildcard" for `listReposByCollection` by omitting the `collection` query param entirely 162 + - ~~`listReposByCollectionPrefix`, either with additional indexes up the NSID hierarchy, or via merge+dedup.~~ not doing 163 + - subscribe to multiple relays 164 + - use authenticated repo contents for backfill instead of `com.atproto.repo.describeRepo` (see [./authenticated-collection-list.md](./authenticated-collection-list.md)) 38 165 39 166 40 167 ## contributing 41 168 42 - see ['./hacking.md'](./hacking.md) for style, implementation, and architecture notes. 43 170 44 171 45 172 ## license
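The wishlist's "merge + dedup by DID" note for multi-collection queries can be sketched as a k-way merge. This assumes that scanning each `<collection>||<did>` prefix yields that collection's DIDs in sorted order (an inference from the key layout, not something the readme states), and `merge_dedup_dids` is a hypothetical name, not lightrail's actual function.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Hypothetical helper: merge several individually sorted DID lists
/// (one per requested collection) into one sorted list without duplicates.
fn merge_dedup_dids(lists: &[Vec<String>]) -> Vec<String> {
    // Min-heap entries: (next DID, index of source list, position in it).
    let mut heap = BinaryHeap::new();
    for (list_idx, list) in lists.iter().enumerate() {
        if let Some(first) = list.first() {
            heap.push(Reverse((first.as_str(), list_idx, 0usize)));
        }
    }

    let mut merged: Vec<String> = Vec::new();
    while let Some(Reverse((did, list_idx, pos))) = heap.pop() {
        // Inputs are sorted, so equal DIDs are popped back to back.
        if merged.last().map(String::as_str) != Some(did) {
            merged.push(did.to_owned());
        }
        // Refill the heap from the list we just consumed from.
        if let Some(next) = lists[list_idx].get(pos + 1) {
            heap.push(Reverse((next.as_str(), list_idx, pos + 1)));
        }
    }
    merged
}
```

A real implementation would more likely merge lazy iterators over the database and stop at the page `limit` rather than materializing whole lists; the heap-based shape stays the same.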
+7 -7
src/main.rs
··· 24 24 db_path: PathBuf, 25 25 26 26 /// TCP address for the XRPC API server. 27 - #[arg(long, env = "LIGHTRAIL_LISTEN", default_value = "0.0.0.0:3000")] 27 + #[arg(long, env = "LIGHTRAIL_LISTEN", default_value = "0.0.0.0:2511")] 28 28 listen: SocketAddr, 29 29 30 30 /// PLC directory URL for did:plc resolution. ··· 49 49 ident_cache_size: u64, 50 50 51 51 /// Maximum concurrent firehose commit worker tasks. 52 - #[arg(long, env = "LIGHTRAIL_MAX_FIREHOSE_WORKERS", default_value_t = 10)] 52 + #[arg(long, env = "LIGHTRAIL_MAX_FIREHOSE_WORKERS", default_value_t = 6)] 53 53 max_firehose_workers: usize, 54 54 55 55 /// Maximum concurrent resync worker tasks. ··· 74 74 75 75 /// TCP address for the Prometheus metrics HTTP endpoint. 76 76 /// If not set, metrics are not exported. 77 - #[arg(long, env = "LIGHTRAIL_METRICS_BIND", num_args = 0..=1, default_missing_value = "0.0.0.0:6789")] 78 - metrics_bind: Option<SocketAddr>, 77 + #[arg(long, env = "LIGHTRAIL_METRICS_LISTEN", num_args = 0..=1, default_missing_value = "0.0.0.0:6789")] 78 + metrics_listen: Option<SocketAddr>, 79 79 80 80 /// Admin password for privileged API endpoints. 81 81 #[arg(long, env = "LIGHTRAIL_ADMIN_PASSWORD")] 82 82 admin_password: Option<String>, 83 83 84 84 /// Enable deep crawl: discover PDS hosts via listHosts and crawl each one's repos. 85 - #[arg(long, env = "LIGHTRAIL_DEEP_CRAWL")] 85 + #[arg(long, action, env = "LIGHTRAIL_DEEP_CRAWL")] 86 86 deep_crawl: bool, 87 87 88 88 /// Max concurrent per-PDS listRepos workers during deep crawl. 89 - #[arg(long, env = "LIGHTRAIL_MAX_DEEP_CRAWL_WORKERS", default_value_t = 4)] 89 + #[arg(long, env = "LIGHTRAIL_MAX_DEEP_CRAWL_WORKERS", requires("deep_crawl"), default_value_t = 4)] 90 90 max_deep_crawl_workers: usize, 91 91 } 92 92 ··· 119 119 ident_cache_size, 120 120 )); 121 121 122 - if let Some(addr) = args.metrics_bind { 122 + if let Some(addr) = args.metrics_listen { 123 123 install_metrics(addr)?; 124 124 } 125 125