A Product Hunt Clone for AtProto with an emphasis on on-proto community
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

Dim-table the DID columns; document the migration learnings

Schema change for storage and query efficiency:
- Add `dids` dim table (BIGINT id PK, VARCHAR did)
- Fact tables (types, urls, backlinks, mentions) now reference DIDs
through BIGINT did_id / subject_did_id columns
- In-process map[string]int64, hydrated from `dids` at startup, source
of truth for ID assignment; new IDs append to a dedicated dim
appender that flushes before the fact appenders so a fact row is
never visible without its referenced dim row

Required dropping the existing 5.35 GB DuckDB file (DROP TABLE +
CHECKPOINT doesn't return space to the OS in 1.5.x). Cursor in sqlite
preserved, harvester replays back to Jetstream's retention boundary
on reconnect.

Also captured today's findings in learnings.md: DuckDB's process-level
exclusive lock (read-only opens fail too), the file-shrink limitation,
the live-machine entrypoint-swap pattern for ad-hoc queries, the
shared-cpu-1x:1024 OOM, and per-day storage growth / pruning strategy.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+175 -27
+96 -15
learnings.md
··· 91 91 92 92 --- 93 93 94 + ## DuckDB: one process at a time 95 + 96 + A DuckDB database file can only be opened by **one process** at a time, full stop. **Read-only doesn't help.** Verified on 1.5.x — opening from a second process returns: 97 + 98 + ``` 99 + IO Error: Could not set lock on file "harvest.duckdb": Conflicting lock is held in /usr/local/bin/harvester (PID 654). See also https://duckdb.org/docs/stable/connect/concurrency 100 + ``` 101 + 102 + This is documented behavior but easy to miss because most other embedded DBs (sqlite, leveldb) allow multi-reader / single-writer across processes. DuckDB does multi-connection within a single process (and recommends that as the concurrency model), not multi-process. 103 + 104 + Implications for an always-on writer: 105 + - The `duckdb` CLI cannot connect to a live `harvest.duckdb` while the harvester holds it. Pause the writer first. 106 + - An external query service has to either (a) cohabit the harvester process via an HTTP/RPC endpoint that runs SQL on the live connection, or (b) work off a snapshot/copy. 107 + - Copying the file while a writer is open isn't safe either: DuckDB writes to the main file during checkpoints, so a `cp` started mid-checkpoint can produce a torn copy. (A copy started between checkpoints is *probably* fine since between-checkpoint writes only go to the .wal — but "probably" isn't the contract.) 108 + 109 + ### Pausing the harvester for an ad-hoc query 110 + 111 + `fly machine update --command "sleep infinity"` swaps the entrypoint, restarts the machine, and the volume stays mounted. The CLI hangs forever waiting for `/health` to come back (it never will — sleep doesn't serve HTTP) — that's expected. Cancel the CLI once the machine reaches `started`; the machine is up, SSH-able, and the duckdb file lock is released. Run the query, then restore with `fly machine update --command "/usr/local/bin/harvester"`. Total disruption: ~1 minute of firehose lag, recovered automatically by the cursor. 112 + 113 + Keep the duckdb CLI binary on the volume (`/data/duckdb`), not in the container's overlay FS, so it survives across the entrypoint swaps. 114 + 115 + ### DuckDB doesn't shrink files 116 + 117 + `DROP TABLE`, `DELETE`, and `CHECKPOINT` mark pages free for reuse but **do not return space to the OS**. The file size only ever grows or stays flat. The 1.5 docs are explicit about this: "DuckDB does not currently shrink the database file when data is deleted." 118 + 119 + To actually compact, the supported paths are: 120 + 121 + - **Recreate the file.** Stop the writer, `rm harvest.duckdb harvest.duckdb.wal`, restart. Cleanest if you can re-ingest (we can — Jetstream replays back ~36h, the cursor in sqlite makes this transparent on reconnect). 122 + - **Export + import.** `EXPORT DATABASE 'tmp_dir'` → drop the file → `IMPORT DATABASE 'tmp_dir'`. Preserves data without re-ingest. 123 + 124 + For "I just want to roll a schema change," the recreate path took us from 5.35 GB → 12 KB instantly. The cursor stays in sqlite (untouched), the harvester replays from there on next reconnect, schema migration done. 125 + 126 + --- 127 + 94 128 ## Community extensions 95 129 96 130 ```sql ··· 284 318 ### DuckDB (`harvest.duckdb`) 285 319 286 320 ```sql 321 + -- Dim table: every distinct DID gets a stable BIGINT id, used everywhere 322 + -- a DID is referenced (author or subject). Both saves disk and makes 323 + -- COUNT(DISTINCT did_id) cheap (8-byte int compare vs 32-byte varchar). 324 + CREATE TABLE IF NOT EXISTS dids ( 325 + id BIGINT PRIMARY KEY, 326 + did VARCHAR NOT NULL 327 + ); 328 + 287 329 CREATE TABLE IF NOT EXISTS types ( 288 - did VARCHAR NOT NULL, 330 + did_id BIGINT NOT NULL, 289 331 typename VARCHAR NOT NULL, 290 332 timestamp TIMESTAMP NOT NULL 291 333 ); 292 334 CREATE TABLE IF NOT EXISTS urls ( 293 - did VARCHAR NOT NULL, 335 + did_id BIGINT NOT NULL, 294 336 url VARCHAR NOT NULL, 295 337 timestamp TIMESTAMP NOT NULL 296 338 ); 297 339 CREATE TABLE IF NOT EXISTS backlinks ( 298 - did VARCHAR NOT NULL, 299 - subject_did VARCHAR NOT NULL, 340 + did_id BIGINT NOT NULL, 341 + subject_did_id BIGINT NOT NULL, 300 342 subject_collection VARCHAR NOT NULL, 301 343 subject_rkey VARCHAR NOT NULL, 302 344 timestamp TIMESTAMP NOT NULL 303 345 ); 304 346 CREATE TABLE IF NOT EXISTS mentions ( 305 - did VARCHAR NOT NULL, 306 - subject_did VARCHAR NOT NULL, 307 - timestamp TIMESTAMP NOT NULL 347 + did_id BIGINT NOT NULL, 348 + subject_did_id BIGINT NOT NULL, 349 + timestamp TIMESTAMP NOT NULL 308 350 ); 309 351 ``` 310 352 311 - `did` is the **author** (the repo the commit lives in). `subject_*` columns are the *referenced* DID/collection/rkey for backlinks, just the referenced DID for mentions. `timestamp` is the Jetstream `time_us` of the commit. Per-record dedupe so each tuple is unique within a record body — same record, repeated `$type`/url/etc. → one row. 353 + `did_id` is the **author** (the repo the commit lives in). `subject_did_id` is the referenced DID for backlinks/mentions; `subject_collection`/`subject_rkey` are kept as VARCHAR (rkeys don't repeat enough to dim, and collections are a bounded enum that DuckDB's dictionary encoding already compresses well). `timestamp` is the Jetstream `time_us` of the commit. Per-record dedupe so each tuple is unique within a record body — same record, repeated `$type`/url/etc. → one row. 354 + 355 + ### DID dim table: in-process map + appender 356 + 357 + DuckDB has no `INSERT … RETURNING` for the appender path. Cheapest model: **the harvester process owns the assignment**, the appender just records it. 358 + 359 + ```go 360 + didMap := make(map[string]int64) // hydrated from `dids` at startup 361 + var nextDidID int64 = 1 362 + 363 + idForDid := func(did string) int64 { 364 + if id, ok := didMap[did]; ok { return id } 365 + id := nextDidID; nextDidID++ 366 + didMap[did] = id 367 + didsApp.AppendRow(id, did) // dim-table appender 368 + return id 369 + } 370 + ``` 371 + 372 + Single-goroutine ingest path → no locking. Hydrate `didMap` on startup with `SELECT id, did FROM dids` and set `nextDidID = MAX(id)+1`. Flush the dim appender **before** the fact appenders so a fact row is never visible without its referenced dim row (no FK, so it'd just be dangling, but flush-order discipline is free). 373 + 374 + **RAM cost**: `map[string]int64` entry ≈ 96 bytes including map overhead and the DID string. Bluesky has ~30M total accounts; a steady-state harvester that observes most of them holds ~3 GB just for the map. Tolerable on 4 GB if nothing else is greedy; switch to an LRU + on-miss SQLite/DuckDB lookup if it gets tight (sqlite with `INSERT … ON CONFLICT(did) DO NOTHING RETURNING id` is the natural fit — sqlite handles dedup, the LRU absorbs the hot path). 375 + 376 + **Crash semantics**: appenders flush dim before facts, then the cursor is upserted. If facts flush but cursor doesn't, replay re-encounters the same DIDs → `idForDid` returns the same persisted id (already in the map) → idempotent. If dim was assigned but not flushed before crash, the unflushed fact rows are also lost (same atomic flush group), so no orphan dim ids and no fact-pointing-at-missing-dim either. 312 377 313 378 --- 314 379 ··· 501 566 path = '/health' 502 567 503 568 [[vm]] 504 - cpu_kind = 'shared' 505 - cpus = 1 506 - memory_mb = 1024 569 + size = 'shared-cpu-2x' # see "Sizing" below — 1x at 1 GB OOM-kills 570 + memory = '4gb' 507 571 508 572 [[mounts]] 509 573 source = 'harvester_data' 510 574 destination = '/data' 511 - initial_size = '100' # GB 575 + initial_size = '100' # GB — see "Volume sizing" below 512 576 ``` 577 + 578 + Use the `size` / `memory` shortcut in `[[vm]]` over the long-form `cpu_kind` / `cpus` / `memory_mb`. The long form parses into a "size" string internally and the round-trip can flag a spurious mismatch warning ("fly.toml: size=shared-cpu-1x" even when `cpus = 2`); the shortcut avoids it. 513 579 514 580 Volumes are region-pinned and machine-pinned. One machine per volume. Volume snapshots are scheduled by default (5-snapshot retention) — that's the recovery story for "rebuild backend later, keep the data". 515 581 ··· 553 619 | Workload | Best fit | 554 620 |---|---| 555 621 | Backfill (CPU-bound, sustained) | `performance-1x` (1 dedicated vCPU). Scale up while catching up, scale back down. | 556 - | Steady-state (~1k records/s after caught up) | `shared-cpu-1x` 1 GB is enough — well under baseline. | 557 - | Both, no babysitting | `shared-cpu-2x` 2 GB. Slower catchup but no throttling drama. | 622 + | Steady-state, no babysitting | **`shared-cpu-2x` 4 GB**. Holds the duckdb working set + the in-process DID dim map without thrashing. | 623 + 624 + **`shared-cpu-1x:1024MB` will OOM-kill** on the firehose. Confirmed in the field: the harvester ran ~75 minutes before `exit_code=137, oom_killed=true`. DuckDB appender batches + sqlite cursor + the websocket buffer + the DID dim map don't fit in 1 GB. `shared-cpu-1x` also caps at 2 GB — to get more memory you have to bump CPU class, hence shared-cpu-2x as the floor. **Do not provision below this** for an always-on harvester. 625 + 626 + `fly scale memory` and `fly scale vm` apply to the live machine without redeploying — useful mid-incident. Update fly.toml at the same time so the next `fly deploy` doesn't quietly downgrade. 627 + 628 + Reconnect logic absorbs slow-consumer evictions during backfill on shared CPU; the failure mode you can't recover from is OOM, not throttling. 629 + 630 + ### Volume sizing: storage grows unbounded 631 + 632 + Per-day ingest produces roughly **3–7 GB/day** of duckdb file growth in steady state (rough — backfill bursts skew this; needs a multi-day sample to confirm). The volume is bounded; the data is not. Two paths: 558 633 559 - For our toy budget, `shared-cpu-1x` 1 GB + 100 GB volume ≈ $21/mo total. Reconnect logic absorbs the slow-consumer evictions during backfill. 634 + - **Bigger volume.** `fly volumes extend <vol_id> -s <new-size-gb>`. Online, no machine restart, no FS resize step needed. At $0.15/GB/mo: 250 GB ≈ $37, 500 GB ≈ $75, 1 TB ≈ $150. Buys runway, doesn't solve the problem. 635 + - **Rolling-window pruning.** `DELETE FROM <fact-tables> WHERE timestamp < now() - INTERVAL N DAY` on a daily cron, where N matches your query window plus a buffer. With a 40-day cutoff and ~5 GB/day, steady-state usage stabilizes around 200 GB. Combine with a 250 GB volume and the system is bounded forever. 636 + 637 + Caveats on pruning: 638 + - DuckDB doesn't return deleted pages to the OS (see "DuckDB doesn't shrink files"). After the first prune cycle the file size plateaus rather than shrinks; new inserts reuse the freed pages. Size the volume for the steady-state plateau, not for "post-prune". 639 + - Prune in time chunks (e.g. one day at a time) rather than one giant DELETE. Full-table scans on a 200 GB DuckDB on shared-cpu-2x are minutes-long and can spike memory enough to trip the health check. 640 + - Run the prune from a goroutine *inside* the harvester process (multi-connection within one process is supported) — not from a separate process (see "DuckDB: one process at a time"). 560 641 561 642 --- 562 643
+79 -12
main.go
··· 158 158 } 159 159 defer db.Close() 160 160 161 + // `dids` is a dim table: every distinct DID we've ever seen gets a unique 162 + // BIGINT id, and every fact-table row references that id. Rationale: 163 + // - DIDs are 28-32 bytes each and repeat heavily across the fact tables. 164 + // A BIGINT is 8 bytes. For tables where each DID appears many times 165 + // (mentions, backlinks especially), this is a real disk savings even 166 + // after DuckDB's own dictionary encoding. 167 + // - Common analytical queries care about COUNT DISTINCT and joins, both 168 + // of which are faster on integers than on long strings. 169 + // `subject_did_id` in backlinks/mentions reuses the same dim table — DIDs 170 + // are DIDs regardless of which side of the relation they appear on. 161 171 if _, err := db.ExecContext(ctx, ` 172 + CREATE TABLE IF NOT EXISTS dids ( 173 + id BIGINT PRIMARY KEY, 174 + did VARCHAR NOT NULL 175 + ); 162 176 CREATE TABLE IF NOT EXISTS types ( 163 - did VARCHAR NOT NULL, 177 + did_id BIGINT NOT NULL, 164 178 typename VARCHAR NOT NULL, 165 179 timestamp TIMESTAMP NOT NULL 166 180 ); 167 181 CREATE TABLE IF NOT EXISTS urls ( 168 - did VARCHAR NOT NULL, 182 + did_id BIGINT NOT NULL, 169 183 url VARCHAR NOT NULL, 170 184 timestamp TIMESTAMP NOT NULL 171 185 ); 172 186 CREATE TABLE IF NOT EXISTS backlinks ( 173 - did VARCHAR NOT NULL, 174 - subject_did VARCHAR NOT NULL, 187 + did_id BIGINT NOT NULL, 188 + subject_did_id BIGINT NOT NULL, 175 189 subject_collection VARCHAR NOT NULL, 176 190 subject_rkey VARCHAR NOT NULL, 177 191 timestamp TIMESTAMP NOT NULL 178 192 ); 179 193 CREATE TABLE IF NOT EXISTS mentions ( 180 - did VARCHAR NOT NULL, 181 - subject_did VARCHAR NOT NULL, 182 - timestamp TIMESTAMP NOT NULL 194 + did_id BIGINT NOT NULL, 195 + subject_did_id BIGINT NOT NULL, 196 + timestamp TIMESTAMP NOT NULL 183 197 ); 184 198 `); err != nil { 185 199 log.Fatalf("create tables: %v", err) 186 200 } 187 201 202 + // Hydrate the dim cache from the persisted dim table. The cache is the 203 + // source of truth for ID assignment while the harvester runs; on restart 204 + // we rebuild it by scanning the table. Per-process map (no locking — the 205 + // ingest path is single-goroutine). 206 + didMap := make(map[string]int64) 207 + var nextDidID int64 = 1 208 + { 209 + rows, err := db.QueryContext(ctx, `SELECT id, did FROM dids`) 210 + if err != nil { 211 + log.Fatalf("dids load: %v", err) 212 + } 213 + for rows.Next() { 214 + var id int64 215 + var did string 216 + if err := rows.Scan(&id, &did); err != nil { 217 + rows.Close() 218 + log.Fatalf("dids scan: %v", err) 219 + } 220 + didMap[did] = id 221 + if id >= nextDidID { 222 + nextDidID = id + 1 223 + } 224 + } 225 + if err := rows.Close(); err != nil { 226 + log.Fatalf("dids rows close: %v", err) 227 + } 228 + log.Printf("dids: loaded %d existing entries (next id %d)", len(didMap), nextDidID) 229 + } 230 + 188 231 sqlConn, err := db.Conn(ctx) 189 232 if err != nil { 190 233 log.Fatalf("duckdb conn: %v", err) ··· 192 235 defer sqlConn.Close() 193 236 194 237 var ( 238 + didsApp *duckdb.Appender 195 239 typesApp *duckdb.Appender 196 240 urlsApp *duckdb.Appender 197 241 backlinksApp *duckdb.Appender ··· 200 244 if err := sqlConn.Raw(func(anyConn any) error { 201 245 dc := anyConn.(driver.Conn) 202 246 var err error 247 + if didsApp, err = duckdb.NewAppenderFromConn(dc, "", "dids"); err != nil { 248 + return err 249 + } 203 250 if typesApp, err = duckdb.NewAppenderFromConn(dc, "", "types"); err != nil { 204 251 return err 205 252 } ··· 216 263 }); err != nil { 217 264 log.Fatalf("appenders: %v", err) 218 265 } 219 - allAppenders := []*duckdb.Appender{typesApp, urlsApp, backlinksApp, mentionsApp} 266 + // dids first: the fact appenders reference dim ids, so on flush we want 267 + // the dim rows visible before the fact rows that point at them. 268 + allAppenders := []*duckdb.Appender{didsApp, typesApp, urlsApp, backlinksApp, mentionsApp} 220 269 defer func() { 221 270 for _, a := range allAppenders { 222 271 if err := a.Close(); err != nil { ··· 237 286 ) 238 287 239 288 var lastTickAt time.Time 289 + 290 + // idForDid returns the dim-table id for `did`, allocating a new one and 291 + // appending to didsApp on first sight. Single-goroutine, no locking. 292 + idForDid := func(did string) int64 { 293 + if id, ok := didMap[did]; ok { 294 + return id 295 + } 296 + id := nextDidID 297 + nextDidID++ 298 + didMap[did] = id 299 + if err := didsApp.AppendRow(id, did); err != nil { 300 + log.Printf("dids append: %v", err) 301 + } 302 + return id 303 + } 240 304 241 305 flushAndCheckpoint := func() { 242 306 ok := true ··· 367 431 extractFromRecord(evt.Commit.Record, extract) 368 432 369 433 ts := time.UnixMicro(evt.TimeUs) 434 + actorID := idForDid(evt.Did) 370 435 for t := range extract.types { 371 - if err := typesApp.AppendRow(evt.Did, t, ts); err != nil { 436 + if err := typesApp.AppendRow(actorID, t, ts); err != nil { 372 437 log.Printf("types append: %v", err) 373 438 } 374 439 } 375 440 for u := range extract.urls { 376 - if err := urlsApp.AppendRow(evt.Did, u, ts); err != nil { 441 + if err := urlsApp.AppendRow(actorID, u, ts); err != nil { 377 442 log.Printf("urls append: %v", err) 378 443 } 379 444 } 380 445 for k := range extract.backlinks { 381 - if err := backlinksApp.AppendRow(evt.Did, k.did, k.collection, k.rkey, ts); err != nil { 446 + subjectID := idForDid(k.did) 447 + if err := backlinksApp.AppendRow(actorID, subjectID, k.collection, k.rkey, ts); err != nil { 382 448 log.Printf("backlinks append: %v", err) 383 449 } 384 450 } 385 451 for m := range extract.mentions { 386 - if err := mentionsApp.AppendRow(evt.Did, m, ts); err != nil { 452 + subjectID := idForDid(m) 453 + if err := mentionsApp.AppendRow(actorID, subjectID, ts); err != nil { 387 454 log.Printf("mentions append: %v", err) 388 455 } 389 456 }