A Product Hunt Clone for AtProto with an emphasis on on-proto community

Snapshot before the live-stream speed run

- Save research notes in learnings.md
- Add Dockerfile, fly.toml, and .dockerignore for fly deploy
- Update main.go and dependencies for the harvester
- Remove Deno/Node count-records scratch scripts and configs;
the Go harvester is the canonical implementation

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

+1423 -414
+15
.dockerignore
···
+ .git
+ .jj
+ .gitignore
+ .dockerignore
+ *.duckdb
+ *.sqlite
+ *.sqlite3
+ *.log
+ /harvester
+ README.md
+ learnings.md
+ architecture.*
+ *.svg
+ *.mmd
+ fly.toml
+2
.gitignore
···
  /harvester
  *.log
  node_modules/
+ *.sqlite
+ *.duckdb
+32
Dockerfile
···
+ # syntax=docker/dockerfile:1.7
+
+ FROM golang:1.25-bookworm AS builder
+
+ WORKDIR /src
+
+ # Cache deps
+ COPY go.mod go.sum ./
+ RUN go mod download
+
+ # Build the harvester binary. CGO is required for duckdb-go and go-sqlite3.
+ COPY *.go ./
+ RUN CGO_ENABLED=1 GOOS=linux go build -trimpath -o /out/harvester .
+
+ # --- Runtime ----------------------------------------------------------------
+ FROM debian:bookworm-slim
+
+ RUN apt-get update \
+     && apt-get install -y --no-install-recommends ca-certificates \
+     && rm -rf /var/lib/apt/lists/*
+
+ COPY --from=builder /out/harvester /usr/local/bin/harvester
+
+ # /data is the volume mount point. The harvester writes harvest.duckdb and
+ # harvest_blue.sqlite relative to its working directory, so cwd=/data puts
+ # both files on the volume.
+ RUN mkdir -p /data
+ WORKDIR /data
+
+ EXPOSE 8080
+
+ CMD ["/usr/local/bin/harvester"]
+16 -3
README.md
···

  - The **record schemas (NSIDs) the app owns** — what the firehose pipeline should count as evidence of use.
  - The app's **canonical URLs / domains** — what counts as a mention.
- - Anything else needed to measure the app fairly.

  Nominator-declared metadata is the input to all the automatic measurement that follows.

···

  ### 1. Engagement — are people using it?

- Count of distinct DIDs creating, updating, or deleting records under the schemas the app owns. This is signal that the app is alive on-network, not just that it has a landing page.
+ Count of distinct DIDs creating or updating records containing $type fields that match your lexicons.

  ### 2. Buzz — are people talking about it?

···

  ### 3. Votes — do humans think it's cool?

- Logged-in users (via Bluesky OAuth) vote for apps they like. Votes are stored as records in the voter's own repo under a harvester.blue lexicon — portable, verifiable, and expensive to fake without real DIDs behind them.
+ Logged-in users (via Atproto OAuth) vote for apps they like. Votes are stored as records in the voter's own repo under a harvester.blue lexicon — portable, verifiable, and expensive to fake without real DIDs behind them.

  ## Citizenship badges

···
  ## Why metrics + badges, not one score

  Engagement alone rewards popular-but-poorly-built apps. Votes alone reward marketing. Citizenship alone rewards purism without traction. Showing the three metrics side-by-side, with citizenship as an orthogonal layer of attested badges, lets a viewer ask the question that fits them — *who is being used, who is being talked about, who is loved, who is doing it right* — without the site picking the answer for them.
+
+ ## Architecture
+
+ ![Architecture diagram](./architecture.svg)
+
+ The system splits cleanly into two tiers with different scaling stories:
+
+ - **Go VM (singleton).** Consumes the Jetstream firehose, writes nominations and votes straight to a local SQLite file, and aggregates engagement/buzz metrics in an in-memory HyperLogLog that periodically flushes to the same DB. Exposes read-only XRPC over Fly's private network. Cannot be horizontally scaled — only one consumer at a time, otherwise events double-count into the HLL.
+ - **Frontend VMs (stateless, autoscaled).** TanStack Start handles UI/SSR and proxies both metric XRPC calls and PDS operations so the browser only talks to one origin. Truly stateless: OAuth sessions and PKCE state live in Upstash Redis (with native TTLs), so frontends can autoscale freely and roll on every deploy without logging anyone out.
+
+ > Diagram source lives in [`architecture.mmd`](./architecture.mmd). To regenerate the SVG after editing:
+ > ```sh
+ > mmdc -i architecture.mmd -o architecture.svg --backgroundColor white
+ > ```

  ## Status

-5
deno.json
···
- {
-   "tasks": {
-     "count-records-24h-deno": "deno run --allow-net ./scripts/count-records-24h.deno.ts"
-   }
- }
+29
fly.toml
···
+ app = 'harvester-blue'
+ primary_region = 'iad'
+
+ [build]
+
+ [http_service]
+   internal_port = 8080
+   force_https = true
+   auto_stop_machines = 'off'
+   auto_start_machines = false
+   min_machines_running = 1
+   processes = ['app']
+
+ [[http_service.checks]]
+   interval = '30s'
+   timeout = '5s'
+   grace_period = '5m'
+   method = 'GET'
+   path = '/health'
+
+ [[vm]]
+   cpu_kind = 'shared'
+   cpus = 1
+   memory_mb = 1024
+
+ [[mounts]]
+   source = 'harvester_data'
+   destination = '/data'
+   initial_size = '100'
+36 -1
go.mod
···

  go 1.25.6

- require github.com/coder/websocket v1.8.14 // indirect
+ require (
+ 	github.com/bytedance/sonic v1.15.1
+ 	github.com/coder/websocket v1.8.14
+ 	github.com/duckdb/duckdb-go/v2 v2.10501.0
+ 	github.com/mattn/go-sqlite3 v1.14.44
+ )
+
+ require (
+ 	github.com/apache/arrow-go/v18 v18.5.1 // indirect
+ 	github.com/bytedance/gopkg v0.1.3 // indirect
+ 	github.com/bytedance/sonic/loader v0.5.1 // indirect
+ 	github.com/cloudwego/base64x v0.1.6 // indirect
+ 	github.com/duckdb/duckdb-go-bindings v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10501.0 // indirect
+ 	github.com/go-viper/mapstructure/v2 v2.5.0 // indirect
+ 	github.com/goccy/go-json v0.10.5 // indirect
+ 	github.com/google/flatbuffers v25.12.19+incompatible // indirect
+ 	github.com/google/uuid v1.6.0 // indirect
+ 	github.com/klauspost/compress v1.18.3 // indirect
+ 	github.com/klauspost/cpuid/v2 v2.3.0 // indirect
+ 	github.com/pierrec/lz4/v4 v4.1.25 // indirect
+ 	github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
+ 	github.com/zeebo/xxh3 v1.1.0 // indirect
+ 	golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
+ 	golang.org/x/exp v0.0.0-20260112195511-716be5621a96 // indirect
+ 	golang.org/x/mod v0.32.0 // indirect
+ 	golang.org/x/sync v0.19.0 // indirect
+ 	golang.org/x/sys v0.40.0 // indirect
+ 	golang.org/x/telemetry v0.0.0-20260116145544-c6413dc483f5 // indirect
+ 	golang.org/x/tools v0.41.0 // indirect
+ 	golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da // indirect
+ )
+113
go.sum
··· 1 + github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= 2 + github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= 3 + github.com/apache/arrow-go/v18 v18.5.1 h1:yaQ6zxMGgf9YCYw4/oaeOU3AULySDlAYDOcnr4LdHdI= 4 + github.com/apache/arrow-go/v18 v18.5.1/go.mod h1:OCCJsmdq8AsRm8FkBSSmYTwL/s4zHW9CqxeBxEytkNE= 5 + github.com/apache/thrift v0.22.0 h1:r7mTJdj51TMDe6RtcmNdQxgn9XcyfGDOzegMDRg47uc= 6 + github.com/apache/thrift v0.22.0/go.mod h1:1e7J/O1Ae6ZQMTYdy9xa3w9k+XHWPfRvdPyJeynQ+/g= 7 + github.com/bytedance/gopkg v0.1.3 h1:TPBSwH8RsouGCBcMBktLt1AymVo2TVsBVCY4b6TnZ/M= 8 + github.com/bytedance/gopkg v0.1.3/go.mod h1:576VvJ+eJgyCzdjS+c4+77QF3p7ubbtiKARP3TxducM= 9 + github.com/bytedance/sonic v1.15.1 h1:nJD5PmM0vY7J8CT6MxoqbVAAMhkSmV2HgRAUrrpLoOw= 10 + github.com/bytedance/sonic v1.15.1/go.mod h1:mT2NbXunuaEbnZ+mRIX/vYqKISmgEuHFDI4UzmKx2SA= 11 + github.com/bytedance/sonic/loader v0.5.1 h1:Ygpfa9zwRCCKSlrp5bBP/b/Xzc3VxsAW+5NIYXrOOpI= 12 + github.com/bytedance/sonic/loader v0.5.1/go.mod h1:AR4NYCk5DdzZizZ5djGqQ92eEhCCcdf5x77udYiSJRo= 13 + github.com/cloudwego/base64x v0.1.6 h1:t11wG9AECkCDk5fMSoxmufanudBtJ+/HemLstXDLI2M= 14 + github.com/cloudwego/base64x v0.1.6/go.mod h1:OFcloc187FXDaYHvrNIjxSe8ncn0OOM8gEHfghB2IPU= 1 15 github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= 2 16 github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= 17 + github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 18 + github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 19 + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= 20 + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= 21 + github.com/duckdb/duckdb-go-bindings v0.10501.0 
h1:BR21HkcALr9Lm+Ios2vEPaaB5oRRxGJHONzkS0bnOKE= 22 + github.com/duckdb/duckdb-go-bindings v0.10501.0/go.mod h1:UiTBFhbFLPI8+jX7hi3N577KlKOZGj/BW5qSN904658= 23 + github.com/duckdb/duckdb-go-bindings v0.10502.0 h1:Uhg/dfvPLQv4cH35lMD48hqUcdOh2Z7bcuykjr4qnOA= 24 + github.com/duckdb/duckdb-go-bindings v0.10502.0/go.mod h1:8KF3oEKrmYdSbZnQ1BPTdxAZDHRaM1LEv+oBvL2nSLk= 25 + github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10501.0 h1:InnDiz/iBHUzwI/4xkigTq6PRrIx+9L+eC2NfCShgWc= 26 + github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10501.0/go.mod h1:EnAvZh1kNJHp5yF+M1ZHNEvapnmt6anq1xXHVrAGqMo= 27 + github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0 h1:1GxSHSI1ef3sCdDVrJ9l8s6aTd7P1K788os9lHrs43g= 28 + github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10502.0/go.mod h1:EnAvZh1kNJHp5yF+M1ZHNEvapnmt6anq1xXHVrAGqMo= 29 + github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10501.0 h1:XLMUi/9QJcN8Bp77ML/QPwynX8f9RAg4VUiTdPzRUEU= 30 + github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10501.0/go.mod h1:IGLSeEcFhNeZF16aVjQCULD7TsFZKG5G7SyKJAXKp5c= 31 + github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0 h1:76gB6UiqKae6JptNiFLjwecD0oR87bXS5u6Lni9hSGI= 32 + github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10502.0/go.mod h1:IGLSeEcFhNeZF16aVjQCULD7TsFZKG5G7SyKJAXKp5c= 33 + github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10501.0 h1:td84w8XucSPQoxGC84RYIxTu1+RV+fjeFIHVk3GLXog= 34 + github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10501.0/go.mod h1:KAIynZ0GHCS7X5fRyuFnQMg/SZBPK/bS9OCOVojClxw= 35 + github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0 h1:fcBKRy9keR5FLxppDD7ZjQ1EwqTRcA2kPLi2jWilPDw= 36 + github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10502.0/go.mod h1:KAIynZ0GHCS7X5fRyuFnQMg/SZBPK/bS9OCOVojClxw= 37 + github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10501.0 h1:XLw03uWhdQvAFU6unJ2MtQvSFi4LNCfhiOM644MyAHw= 38 + github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 
v0.10501.0/go.mod h1:81SGOYoEUs8qaAfSk1wRfM5oobrIJ5KI7AzYhK6/bvQ= 39 + github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0 h1:pUwDWLQZIkm/v5aoGIu2cTAsgGqratxklRwP9zzsmiU= 40 + github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10502.0/go.mod h1:81SGOYoEUs8qaAfSk1wRfM5oobrIJ5KI7AzYhK6/bvQ= 41 + github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10501.0 h1:jhhOonew2VOcTN4f+BOlnOTUgHEp797Uee2Tq8xMno8= 42 + github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10501.0/go.mod h1:K25pJL26ARblGDeuAkrdblFvUen92+CwksLtPEHRqqQ= 43 + github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0 h1:CDPf2ow6pP/9zYXfBdyT8a1GZ69eBWdMt5AhAsVgvyU= 44 + github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10502.0/go.mod h1:K25pJL26ARblGDeuAkrdblFvUen92+CwksLtPEHRqqQ= 45 + github.com/duckdb/duckdb-go/v2 v2.10501.0 h1:vYgvKBfotrZqpBESqHXYF5NVlbaYHRf0VrQEXtb/jnU= 46 + github.com/duckdb/duckdb-go/v2 v2.10501.0/go.mod h1:825xmA19rJmdYWvSTd0kHWT9xq3EChSejO5RwevS9ZA= 47 + github.com/duckdb/duckdb-go/v2 v2.10502.0 h1:YfdiBlXnlRdxIKu1AtBQSRI0/tGhOkIGshKq52+uA7A= 48 + github.com/duckdb/duckdb-go/v2 v2.10502.0/go.mod h1:a/31wL2vx7dJ0isrO+E6o28DBQVaVOMbKxp2BsHTGp0= 49 + github.com/go-viper/mapstructure/v2 v2.5.0 h1:vM5IJoUAy3d7zRSVtIwQgBj7BiWtMPfmPEgAXnvj1Ro= 50 + github.com/go-viper/mapstructure/v2 v2.5.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= 51 + github.com/goccy/go-json v0.10.5 h1:Fq85nIqj+gXn/S5ahsiTlK3TmC85qgirsdTP/+DeaC4= 52 + github.com/goccy/go-json v0.10.5/go.mod h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= 53 + github.com/golang/snappy v1.0.0 h1:Oy607GVXHs7RtbggtPBnr2RmDArIsAefDwvrdWvRhGs= 54 + github.com/golang/snappy v1.0.0/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= 55 + github.com/google/flatbuffers v25.12.19+incompatible h1:haMV2JRRJCe1998HeW/p0X9UaMTK6SDo0ffLn2+DbLs= 56 + github.com/google/flatbuffers v25.12.19+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= 57 + github.com/google/go-cmp 
v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= 58 + github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= 59 + github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= 60 + github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= 61 + github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= 62 + github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= 63 + github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw= 64 + github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4= 65 + github.com/klauspost/cpuid/v2 v2.3.0 h1:S4CRMLnYUhGeDFDqkGriYKdfoFlDnMtqTiI/sFzhA9Y= 66 + github.com/klauspost/cpuid/v2 v2.3.0/go.mod h1:hqwkgyIinND0mEev00jJYCxPNVRVXFQeu1XKlok6oO0= 67 + github.com/mattn/go-sqlite3 v1.14.44 h1:3VSe+xafpbzsLbdr2AWlAZk9yRHiBhTBakioXaCKTF8= 68 + github.com/mattn/go-sqlite3 v1.14.44/go.mod h1:pjEuOr8IwzLJP2MfGeTb0A35jauH+C2kbHKBr7yXKVQ= 69 + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpspGNG7Z948v4n35fFGB3RR3G/ry4FWs= 70 + github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= 71 + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI= 72 + github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= 73 + github.com/pierrec/lz4/v4 v4.1.25 h1:kocOqRffaIbU5djlIBr7Wh+cx82C0vtFb0fOurZHqD0= 74 + github.com/pierrec/lz4/v4 v4.1.25/go.mod h1:EoQMVJgeeEOMsCqCzqFm2O0cJvljX2nGZjcRIPL34O4= 75 + github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 76 + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= 77 + github.com/pmezard/go-difflib 
v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 78 + github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= 79 + github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= 80 + github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= 81 + github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= 82 + github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 83 + github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= 84 + github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= 85 + github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 86 + github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 87 + github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 88 + github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI= 89 + github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08= 90 + github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= 91 + github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= 92 + github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= 93 + github.com/zeebo/xxh3 v1.1.0/go.mod h1:IisAie1LELR4xhVinxWS5+zf1lA4p0MW4T+w+W07F5s= 94 + golang.org/x/arch v0.0.0-20210923205945-b76863e36670 h1:18EFjUmQOcUvxNYSkA6jO9VAiXCnxFY6NyDX0bHDmkU= 95 + golang.org/x/arch v0.0.0-20210923205945-b76863e36670/go.mod h1:5om86z9Hs0C8fWVUuoMHwpExlXzs5Tkyp9hOrfG7pp8= 96 + golang.org/x/exp v0.0.0-20260112195511-716be5621a96 h1:Z/6YuSHTLOHfNFdb8zVZomZr7cqNgTJvA8+Qz75D8gU= 97 + golang.org/x/exp v0.0.0-20260112195511-716be5621a96/go.mod 
h1:nzimsREAkjBCIEFtHiYkrJyT+2uy9YZJB7H1k68CXZU= 98 + golang.org/x/mod v0.32.0 h1:9F4d3PHLljb6x//jOyokMv3eX+YDeepZSEo3mFJy93c= 99 + golang.org/x/mod v0.32.0/go.mod h1:SgipZ/3h2Ci89DlEtEXWUk/HteuRin+HHhN+WbNhguU= 100 + golang.org/x/sync v0.19.0 h1:vV+1eWNmZ5geRlYjzm2adRgW2/mcpevXNg50YZtPCE4= 101 + golang.org/x/sync v0.19.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= 102 + golang.org/x/sys v0.40.0 h1:DBZZqJ2Rkml6QMQsZywtnjnnGvHza6BTfYFWY9kjEWQ= 103 + golang.org/x/sys v0.40.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= 104 + golang.org/x/telemetry v0.0.0-20260116145544-c6413dc483f5 h1:i0p03B68+xC1kD2QUO8JzDTPXCzhN56OLJ+IhHY8U3A= 105 + golang.org/x/telemetry v0.0.0-20260116145544-c6413dc483f5/go.mod h1:b7fPSJ0pKZ3ccUh8gnTONJxhn3c/PS6tyzQvyqw4iA8= 106 + golang.org/x/tools v0.41.0 h1:a9b8iMweWG+S0OBnlU36rzLp20z1Rp10w+IY2czHTQc= 107 + golang.org/x/tools v0.41.0/go.mod h1:XSY6eDqxVNiYgezAVqqCeihT4j1U2CCsqvH3WhQpnlg= 108 + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da h1:noIWHXmPHxILtqtCOPIhSt0ABwskkZKjD3bXGnZGpNY= 109 + golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= 110 + gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= 111 + gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= 112 + gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 113 + gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= 114 + gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= 115 + gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
+594
learnings.md
···
+ # learnings.md
+
+ Notes from the morning spike (Go + DuckDB + SQLite + URLPattern + at-uris). Reference doc, not a tutorial — written for the build phase that follows.
+
+ ---
+
+ ## Go DuckDB driver
+
+ **Use `github.com/duckdb/duckdb-go/v2`.** The previously-popular `github.com/marcboeker/go-duckdb/v2` is now deprecated and points at this one — but the redirect target's `go.mod` still self-declares the old name, so you *can't* `go get github.com/duckdb/duckdb-go` (no `/v2`). The `/v2` suffix matters.
+
+ CGO is required. Apple Clang from the Xcode CLT is enough on macOS — no system DuckDB install needed (the driver ships static prebuilt libraries per platform).
+
+ ### Versioning scheme
+
+ The bindings encode the DuckDB version into the second semver component:
+
+ | Module version | DuckDB version |
+ |---|---|
+ | `v2.10500.0` / bindings `v0.10500.0` | DuckDB 1.5.0 |
+ | `v2.10501.0` / bindings `v0.10501.0` | DuckDB 1.5.1 |
+ | `v2.10502.0` / bindings `v0.10502.0` | DuckDB 1.5.2 |
+
+ Older releases used the `v0.3.x` / `v0.1.x` scheme (pre 1.5.0). Tagging changed at 1.5.0.
+
+ ### `go.mod` shape (current pinned state)
+
+ ```
+ require (
+ 	github.com/duckdb/duckdb-go/v2 v2.10501.0
+ 	github.com/mattn/go-sqlite3 v1.14.44
+ )
+
+ require (
+ 	// pin all six bindings explicitly — see "MVS gotcha" below
+ 	github.com/duckdb/duckdb-go-bindings v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/darwin-amd64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/darwin-arm64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/linux-amd64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/linux-arm64 v0.10501.0 // indirect
+ 	github.com/duckdb/duckdb-go-bindings/lib/windows-amd64 v0.10501.0 // indirect
+ )
+ ```
+
+ Note the `/lib/` segment in the per-platform binding paths. The deprecated `marcboeker/...` driver used `duckdb-go-bindings/<platform>` (no `lib/`).
+
+ ### Pinning gotcha (DuckDB version ↔ extension availability)
+
+ Community extensions are built per `(DuckDB version, platform)`. The extension server lives at:
+
+ ```
+ https://community-extensions.duckdb.org/v<DUCKDB_VERSION>/<PLATFORM>/<NAME>.duckdb_extension.gz
+ ```
+
+ `curl -I` it before pinning a DuckDB version. The latest extension build often lags the latest DuckDB by a release.
+
+ **MVS gotcha:** Downgrading the driver does *not* drag bindings down. If `duckdb-go-bindings v0.10502.0` is already in the dependency graph, `go get duckdb-go/v2@v2.10501.0` will keep the higher binding version. You have to `go get` each of the six binding modules at the matching version too.
+
+ ---
+
+ ## Go SQLite driver
+
+ `github.com/mattn/go-sqlite3` — also CGO. Standard `database/sql` interface, registers as the `"sqlite3"` driver.
+
+ Pure-Go alternative exists (`modernc.org/sqlite`) but since DuckDB already requires CGO there's no reason to prefer it here.
+
+ ---
+
+ ## ATTACHing SQLite to DuckDB
+
+ ```sql
+ INSTALL sqlite;
+ LOAD sqlite;
+ ATTACH '/path/to/file.sqlite' AS s (TYPE sqlite, READ_ONLY);
+ SELECT * FROM s.my_table;
+ ```
+
+ - DuckDB resolves `s.my_table` like any other schema-qualified table — joins, CTEs, etc. all work normally.
+ - **Always use `READ_ONLY` if there's another writer on the same file.** See concurrency notes below.
+
+ ### Concurrency: live SQLite writer + live DuckDB reader
+
+ Probed in `duckdb/coexist/main.go`. Findings:
+
+ - ✅ **READ_ONLY ATTACH coexists fine with a live writer on a separate `*sql.DB` SQLite handle.** New rows inserted through the SQLite handle are visible to subsequent DuckDB queries against `s.my_table` immediately — no DETACH/re-ATTACH, no checkpoint, no refresh. Works with or without WAL.
+ - 🚫 **Read-write ATTACH + a concurrent writer = data divergence.** If DuckDB writes through the attached connection while another handle is also writing the same file, each side ends up only seeing its own writes (each holds an independent snapshot, and DETACH+re-ATTACH does *not* refresh). The file ends up internally inconsistent. Don't do this.
+
+ Production model:
+ - One goroutine owns the SQLite `*sql.DB` and is the only writer (and any number of readers).
+ - DuckDB ATTACHes that file `READ_ONLY`. Use it freely from analytical queries.
+ - DuckDB writes only ever go to native DuckDB tables, never to attached SQLite.
+
+ ---
+
+ ## Community extensions
+
+ ```sql
+ INSTALL <name> FROM community;
+ LOAD <name>;
+ ```
+
+ Bundled-with-DuckDB list (approximate; varies by version): `parquet`, `azure`, `ducklake`, `sqlite`, `lance`. Anything else is a community extension and needs the `FROM community` clause.
+
+ ---
+
+ ## URLPattern extension (`teaguesterling/duckdb_urlpattern`)
+
+ ### Functions
+
+ | Function | Purpose |
+ |---|---|
+ | `urlpattern_test(pattern, url) → BOOL` | Match test |
+ | `urlpattern_extract(pattern, url, group_name) → VARCHAR` | Pull a named group |
+ | `urlpattern_exec(pattern, url) → STRUCT` | Full match result with `.matched`, `.pathname`, `.groups[name]` |
+ | `urlpattern_init(protocol := …, hostname := …, pathname := …, …)` | Build a pattern from components, bypassing URL string parsing of the pattern |
+ | `urlpattern(s) → URLPATTERN` | Constructor for the URLPATTERN custom type |
+ | `url_parse(url) → STRUCT` | Parse URL into components |
+ | `url_pathname(url)`, `url_hostname(url)`, `url_protocol(url)`, etc. | Component extraction |
+
+ `urlpattern_test` accepts either a `VARCHAR` pattern or a `URLPATTERN` (auto-cast). Patterns can be stored in a column typed `URLPATTERN` for parse-once semantics.
+
+ ### Pattern syntax cheat sheet
+
+ `*` is **greedy across `/`** — it's effectively `.*`, not "one segment".
+
+ | Pattern | `/test` | `/test/test` | `/a/b/c/d` | `/` |
+ |---|---|---|---|---|
+ | `/*` | ✓ | ✓ | ✓ | ✓ |
+ | `/:seg` | ✓ | ✗ | ✗ | ✗ |
+ | `/*/*` | ✗ | ✓ | ✓ | ✗ |
+
+ So:
+ - `*` — anything, including empty.
+ - `:name` — exactly one segment (stops at `/`). Captures it as a named group.
+ - `/*/*` — at least two segments, but the trailing `*` is still greedy. Use `:a/:b` for "exactly two".
+ - `(regex)` — explicit regex group. Useful when you need literal characters that URLPattern syntax would otherwise interpret (see at-uri section).
+ - `\:` — escape literal colon. Doesn't always work (parser is picky about where).
+
+ Hostname patterns follow the same rules (`*.example.com`, `:tenant.example.com`).
+
+ ### URL parsing constraints
+
+ Pattern strings AND input URLs both go through the WHATWG URL parser, which has hard rules:
+
+ - Non-bracketed `:` in the host is interpreted as `host:port`. The "port" must be numeric. `did:plc:1239234980` therefore fails to parse — `plc` isn't a number.
+ - For non-special schemes like `at://`, `url_parse(...)` returns NULL on this. `urlpattern_test` silently returns `false` rather than erroring.
+ - Percent-encoding in the URL works: `at://did%3Aplc%3A1239234980/foo/bar` parses cleanly. `url_pathname` returns `/foo/bar`, `url_hostname` returns `did%3Aplc%3A1239234980`.
+ - But percent-encoding in the **pattern** hostname doesn't work: `urlpattern_init(hostname := 'did%3Aplc%3A1239234980')` raises `Invalid URL pattern components` because `%` isn't a valid hostname-pattern character.
+ - Workaround if you must: regex group like `(did%3Aplc%3A1239234980)`. It accepts `%` and matches the percent-encoded host. **Don't do this in production** — see next section.
+
+ ---
+
+ ## at-uri matching: don't use URLPattern
+
+ at-uri format: `at://<did>/<collection>/<rkey>` (rkey is sometimes absent — collection-level URI).
+
+ Example: `at://did:plc:1239234980/harvester.blue.vote/98023432`
+
+ DIDs contain literal colons. URLPattern (and the WHATWG URL parser under it) can't natively handle them. We probed every reasonable encoding — escaped colons, brackets, percent-encoding, regex groups — and the only thing that produced any match was percent-encoding the URL *and* using a regex group in the pattern hostname. That's not a real solution.
+
+ **Use component-based matching instead.** Destructure on insert, store as `(did, collection, rkey)`, and match with plain SQL:
+
+ ```sql
+ SELECT u.id, u.uri,
+        (p.did = '*' OR p.did = u.did)
+    AND (p.collection = '*' OR p.collection = u.collection)
+    AND (p.rkey = '*' OR p.rkey = u.rkey) AS matches
+ FROM urls u
+ CROSS JOIN patterns p;
+ ```
+
+ Trivial, fast, no encoding tax, no extension dependency for this case. Reserve URLPattern for the buzz-mention side (real http URLs against domain patterns) where it earns its keep.
+
+ A minimal Go destructurer:
+
+ ```go
+ func parseAtURI(uri string) (did, coll, rkey string, ok bool) {
+ 	const prefix = "at://"
+ 	if !strings.HasPrefix(uri, prefix) {
+ 		return "", "", "", false
+ 	}
+ 	parts := strings.SplitN(uri[len(prefix):], "/", 3)
+ 	if len(parts) < 1 || parts[0] == "" {
+ 		return "", "", "", false
+ 	}
+ 	did = parts[0]
+ 	if len(parts) >= 2 {
+ 		coll = parts[1]
+ 	}
+ 	if len(parts) >= 3 {
+ 		rkey = parts[2]
+ 	}
+ 	return did, coll, rkey, true
+ }
+ ```
+
+ ---
+
+ ## JSON parsing on the hot path
+
+ Every Jetstream event has to be parsed and then walked to pull `$type` strings, backlinks, and URLs out of arbitrary record bodies. Records are heterogeneous, so the parse target is effectively `any` — every nested object becomes `map[string]any`, every array `[]any`. That's the worst case for Go's stdlib `encoding/json` (reflection + allocation per node).
+
+ Benchmarked side-by-side against the same TS script (`scripts/count-records-24h.mjs`) over the same 24h window:
+
+ | Runtime | avg_parse | avg_type (walk) |
+ |---|---|---|
+ | Go `encoding/json` | ~11.0us | ~0.7us |
+ | Deno | ~7.0us | ~2.2us |
+ | Node 22 (V8 `JSON.parse`) | ~6.5us | ~1.8us |
+ | **Go `bytedance/sonic`** | **~5.5us** | ~0.7us |
+
+ V8's `JSON.parse` is hand-tuned C++ and beats stdlib Go on raw parse. Sonic (SIMD + JIT-compiled type-specific decoders) wins outright, and the typed map-walk is already faster in Go than V8 dynamic property lookup, so the end-to-end per-record cost flips in Go's favour once sonic is in.
+
+ **Use `github.com/bytedance/sonic` with `ConfigFastest` for record-level parsing.** Drop-in for `encoding/json` — same `json:"…"` struct tags, same API.
+
+ ```go
+ import "github.com/bytedance/sonic"
+
+ var jsonAPI = sonic.ConfigFastest // skips stdlib-compat checks (key sort, strict utf-8)
+
+ if err := jsonAPI.Unmarshal(data, &evt); err != nil {
+ 	continue
+ }
+ ```
+
+ Notes:
+ - JIT warmup: first handful of decodes are slower than steady-state. Call `sonic.Pretouch(reflect.TypeOf(evt))` at startup if it matters.
+ - `ConfigFastest` skips stable map-key ordering on marshal and strict UTF-8 validation. Fine for ingest; revisit if marshalling output that a strict parser will read.
+ - JIT path is amd64 / arm64 only — covers linux-amd64, linux-arm64, darwin-arm64. Other archs fall back to a slower compat mode.
+
+ ---
+
+ ## Record verification strategy
+
+ Jetstream strips signatures. atproto signs **commits**, not records — there's no per-record signature anywhere in the protocol. The signature lives on the parent commit, with the record reachable via the MST inside that commit. The full crypto dance to verify a Jetstream-delivered record is `sync.getRecord` (returns CAR with signed commit + MST proof + record blocks) → parse CAR → resolve DID → verify commit signature → verify MST inversion → confirm record CID matches Jetstream's CID. Indigo has the primitives (`atproto/atcrypto`, `atproto/identity`, `atproto/repo`, see `cmd/relay/relay/verify.go` for the canonical implementation).
+
+ **We're not doing that.** The PDS is authoritative for records on its DIDs. If the PDS serves the record, that's verification enough for an aggregator — we don't republish, so we don't need to attest cryptographically. **Use `com.atproto.repo.getRecord(did, collection, rkey)` instead.** One HTTP round-trip, no CAR parsing, no signature verification, no MST traversal. The marginal trust gain from full crypto verify over "the PDS served it" doesn't justify the code-complexity / latency / key-rotation cost. Reserve the full dance for if we ever become a relay.
+
+ ### Two tiers of verification
+
+ **Tier 1 — synchronous, high-stakes records.** For records that drive a decision at ingest (votes, appeals, anything we'd act on directly), call `com.atproto.repo.getRecord` inline against the originating PDS before accepting. PDS endpoint comes from the DID document via `identity.Directory.LookupDID(did).PDSEndpoint()`. Cost: one HTTP round-trip + DID resolution (cached after first hit). Acceptable for low-volume / high-trust events.
+
+ **Tier 2 — sampled background verification for aggregate workloads.** For records that only matter in aggregate (counts, rates, link graph, type histograms), don't verify inline. Sample at ingest, hash the record content, queue it, and verify in a background goroutine.
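One way to make the tier-2 sampling decision is to derive it from a hash of the record's identity, so the same record is always either in or out of the sample, even across restarts. This is a sketch under assumptions: the `sampled` helper, the use of FNV-1a, and the per-mille rate parameter are all hypothetical and not taken from the harvester source, which may simply sample at random.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// sampled reports whether a record falls inside the verification sample.
// perMille is the sample rate in thousandths (10 means roughly 1%).
// Hypothetical helper: hashing (did, collection, rkey) is an assumption,
// chosen so the decision is stable for a given record.
func sampled(did, collection, rkey string, perMille uint32) bool {
	h := fnv.New32a()
	h.Write([]byte(did))
	h.Write([]byte{'/'})
	h.Write([]byte(collection))
	h.Write([]byte{'/'})
	h.Write([]byte(rkey))
	return h.Sum32()%1000 < perMille
}

func main() {
	// Illustrative identifiers only.
	for _, rkey := range []string{"a1", "b2", "c3", "d4", "e5"} {
		in := sampled("did:plc:1239234980", "harvester.blue.vote", rkey, 10)
		fmt.Printf("rkey=%s sampled=%v\n", rkey, in)
	}
}
```

A stable decision means a record that failed verification can be re-checked later without widening the sample. The trade-off is that a fixed subset of records is never verified, so a random draw may be preferable if coverage over time matters more than repeatability.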
243 + 244 + ``` 245 + ingest → sample(1%) → store (did, collection, rkey, jetstream_cid, content_hash, observed_at) 246 + → background worker → getRecord → compare → record outcome 247 + ``` 248 + 249 + Failure mode is **log and keep going** — never gate the aggregate pipeline on tier-2 results, that defeats the point of sampling. The failure log is the audit trail. 250 + 251 + ### `verify_failures` schema 252 + 253 + Track failures with enough context to act on patterns: 254 + 255 + ``` 256 + (did, collection, rkey, expected_hash, observed_hash, kind, observed_at, verified_at) 257 + ``` 258 + 259 + `kind ∈ {mismatch, not_found, pds_error, did_unresolved}`. Distinct kinds matter — `mismatch` is "Jetstream lied or the record was edited", `not_found` is "deleted between observation and verify" (legitimate), `pds_error` is operational noise, `did_unresolved` is upstream identity infra. 260 + 261 + ### Practical bits 262 + 263 + - Single `identity.DefaultDirectory()` per process — its 24h LRU (250k entries) does the heavy lifting. 264 + - `atclient.NewAPIClient(pdsURL)` per host, cached in an LRU keyed on host. Indigo doesn't pool these. 265 + - **Bound concurrency per PDS** — don't be the source of an outage. Small worker pool per host (e.g. 4) + a global cap. 266 + - Hash records canonically (DAG-CBOR, then SHA-256) — don't compare JSON byte strings, since Jetstream re-serializes and the PDS re-serializes independently. Or just compare the CIDs once you compute them the same way both sides. 267 + 268 + ## Harvester schema 269 + 270 + Two databases. SQLite holds the **cursor** (small, transactional); DuckDB holds the **observed data** (analytical, append-heavy). Both live on the same fly volume so a snapshot captures consistent state. 
271 + 272 + ### SQLite (`harvest_blue.sqlite`) 273 + 274 + ```sql 275 + CREATE TABLE IF NOT EXISTS cursor ( 276 + name TEXT PRIMARY KEY, 277 + cursor_us INTEGER NOT NULL, 278 + updated_at TEXT NOT NULL 279 + ); 280 + ``` 281 + 282 + Single row keyed `name='jetstream'`. `cursor_us` is the `time_us` of the last successfully-flushed event. Read at startup, upserted after every successful DuckDB flush. **Never advance the cursor before the flush.** 283 + 284 + ### DuckDB (`harvest.duckdb`) 285 + 286 + ```sql 287 + CREATE TABLE IF NOT EXISTS types ( 288 + did VARCHAR NOT NULL, 289 + typename VARCHAR NOT NULL, 290 + timestamp TIMESTAMP NOT NULL 291 + ); 292 + CREATE TABLE IF NOT EXISTS urls ( 293 + did VARCHAR NOT NULL, 294 + url VARCHAR NOT NULL, 295 + timestamp TIMESTAMP NOT NULL 296 + ); 297 + CREATE TABLE IF NOT EXISTS backlinks ( 298 + did VARCHAR NOT NULL, 299 + subject_did VARCHAR NOT NULL, 300 + subject_collection VARCHAR NOT NULL, 301 + subject_rkey VARCHAR NOT NULL, 302 + timestamp TIMESTAMP NOT NULL 303 + ); 304 + CREATE TABLE IF NOT EXISTS mentions ( 305 + did VARCHAR NOT NULL, 306 + subject_did VARCHAR NOT NULL, 307 + timestamp TIMESTAMP NOT NULL 308 + ); 309 + ``` 310 + 311 + `did` is the **author** (the repo the commit lives in). `subject_*` columns are the *referenced* DID/collection/rkey for backlinks, just the referenced DID for mentions. `timestamp` is the Jetstream `time_us` of the commit. Per-record dedupe so each tuple is unique within a record body — same record, repeated `$type`/url/etc. → one row. 312 + 313 + --- 314 + 315 + ## Format-based string extraction 316 + 317 + Records are heterogeneous and references can show up anywhere in the tree. **Don't trust field names** to tell you whether a value is a DID, at-uri, or URL. Walk every string value and classify by format. Strings that don't fully match a known shape are dropped. 
318 + 319 + ```go 320 + func classify(s string, ex *recordExtract) { 321 + switch { 322 + case strings.HasPrefix(s, "did:"): 323 + if isDID(s) { ex.mentions[s] = struct{}{} } 324 + case strings.HasPrefix(s, "at://"): 325 + if did, coll, rkey, ok := parseAtURI(s); ok { 326 + ex.backlinks[backlinkKey{did, coll, rkey}] = struct{}{} 327 + } else if did, ok := parseAtURIRepo(s); ok { 328 + ex.mentions[did] = struct{}{} // bare at://<did> → mention 329 + } 330 + case strings.HasPrefix(s, "http://"), strings.HasPrefix(s, "https://"): 331 + if isHTTPURL(s) { ex.urls[s] = struct{}{} } 332 + } 333 + } 334 + ``` 335 + 336 + ### DID validation 337 + 338 + The atproto spec is permissive (any `did:method:identifier`); we accept only `did:plc:` and `did:web:`: 339 + - `did:plc:` + **exactly 24** lowercase base32 chars (`a-z2-7`). 340 + - `did:web:` + a domain-shaped suffix (`a-zA-Z0-9.\-:%_`, length-bounded). 341 + 342 + Strict full-match — rejects "starts with did:". Without the `did:plc` 24-char check, garbage like `did:plc:short` would slip through. 343 + 344 + ### At-uri forms 345 + 346 + | Form | Action | 347 + |---|---| 348 + | `at://<did>/<collection>/<rkey>` | record reference → **backlinks** | 349 + | `at://<did>` (no slash after did) | repo reference → **mentions** (extract inner DID) | 350 + | `at://<did>/<collection>` (no rkey) | collection reference → **skipped** (rare; doesn't fit schema) | 351 + 352 + Treating bare `at://<did>` as a mention prevents losing references when authors write the at-uri form vs the bare DID — both routes flow into the same `mentions` set, which deduplicates. 353 + 354 + ### URL validation 355 + 356 + `net/url.Parse` + require `http`/`https` scheme + non-empty host + length ≤ 2048. Cheap and rejects bare hostnames, mailto:, javascript:, etc. 357 + 358 + --- 359 + 360 + ## At-least-once persistence: appender + cursor 361 + 362 + DuckDB appender batches in memory; SQLite holds the cursor. Per second: 363 + 364 + 1. 
**Flush all DuckDB appenders.** If any error → don't advance cursor. 365 + 2. **Upsert the cursor row** in SQLite to the latest `lastTimeUs`. 366 + 367 + Order matters: data first, cursor second. A crash between (1) and (2) leaves the cursor pointing at older data; on restart we reprocess and produce duplicates in DuckDB. That's the at-least-once tradeoff. If we needed exactly-once we'd need a 2PC or do everything in DuckDB, but for an append-heavy aggregate store duplicates are tolerable. 368 + 369 + ```go 370 + flushAndCheckpoint := func() { 371 + ok := true 372 + for _, a := range allAppenders { 373 + if err := a.Flush(); err != nil { ok = false; log.Printf("flush: %v", err) } 374 + } 375 + if !ok || lastTimeUs <= 0 { return } 376 + upsertCursor.ExecContext(ctx, cursorName, lastTimeUs, time.Now().UTC().Format(time.RFC3339Nano)) 377 + } 378 + ``` 379 + 380 + Reuse a single per-record extract (cleared via `clear()` per iteration) so the maps don't allocate per record. 381 + 382 + ### Appender setup pitfall 383 + 384 + The duckdb-go appender needs a `driver.Conn`, not a `*sql.DB`. Get it via `db.Conn(ctx).Raw(func(any) error{...})`: 385 + 386 + ```go 387 + sqlConn, _ := db.Conn(ctx) 388 + defer sqlConn.Close() 389 + var app *duckdb.Appender 390 + _ = sqlConn.Raw(func(dc any) error { 391 + var err error 392 + app, err = duckdb.NewAppenderFromConn(dc.(driver.Conn), "", "types") 393 + return err 394 + }) 395 + defer app.Close() // flushes residual on close 396 + ``` 397 + 398 + The appender holds a strong reference to the underlying conn. **Don't release `sqlConn`** until after `app.Close()`. 399 + 400 + --- 401 + 402 + ## Always-on Jetstream consumer 403 + 404 + Two non-obvious lessons running this against fly's shared CPU: 405 + 406 + ### 1. Never `os.Exit` on a websocket disconnect 407 + 408 + Jetstream drops slow consumers without ceremony — the read returns `unexpected EOF`. 
The naïve handler exits the process; fly restarts; HTTP health check fails for the ~10s restart window. Reconnect with backoff instead, and the HTTP server stays up across drops. 409 + 410 + ```go 411 + for ctx.Err() == nil { 412 + conn, _, err := websocket.Dial(ctx, urlWithCursor(lastTimeUs), nil) 413 + if err != nil { sleepWithBackoff(); continue } 414 + for { 415 + _, data, err := conn.Read(ctx) 416 + if err != nil { conn.Close(...); break } // → outer loop, reconnect 417 + // process... 418 + } 419 + } 420 + ``` 421 + 422 + Use `max(savedCursor, lastTimeUs)` on each reconnect so we resume from the latest checkpoint without re-fetching. 423 + 424 + ### 2. Slow-consumer eviction is the real symptom of CPU throttling 425 + 426 + `shared-cpu-1x` baseline is **1/16 of a vCPU** with burst credits. Backfill at 5k records/sec needs ~75% of one core sustained — way over baseline. Once burst credits are spent we get throttled, fall behind reading the websocket, Jetstream's server-side buffer fills, they evict us. Reconnect immediately, repeat. 427 + 428 + Symptoms: regular `unexpected EOF` every ~80–100s, `loadavg` ≫ 1 on a 1-CPU machine, `lag` not closing as expected. 429 + 430 + Fix options: 431 + - **`fly scale vm performance-1x`** for backfill (~$30/mo, 1 dedicated vCPU, no throttling), scale back down once caught up. 432 + - Stay shared and accept long catchup — once we're at realtime the steady-state CPU is well within baseline. 433 + 434 + The reconnect logic is what makes either path tolerable. 435 + 436 + --- 437 + 438 + ## Catchup is cheaper than a health-check failure 439 + 440 + Jetstream supports cursor-based replay within its retention window (~36h observed in `iad`). That shifts the operational tradeoff: **HTTP liveness is more valuable than current ingest throughput**, because lag is recoverable but downtime isn't. If processing falls behind, we backfill from the cursor on the next reconnect. 
If `/health` goes dark, fly stops routing traffic and treats us as broken. 441 + 442 + Practical rule: **slow record processing before letting it starve the HTTP listener.** 443 + 444 + - Cap the worst-case work per iteration. A pathologically large record body, a slow DuckDB flush (CGO, which Go's scheduler can't preempt), or a backlog of pending appender rows can all monopolize the only CPU we have. The HTTP goroutine can only respond to `/health` when the scheduler hands it a slot. 445 + - If a flush exceeds its budget (~1s today), the right move is **back off ingest**, not power through. Better to skip-and-replay later than to block `/health` for 5+ seconds and trip a check failure. 446 + - A bounded channel between the websocket reader and the record processor self-regulates this: when the channel fills, the reader stalls, Jetstream evicts us, we reconnect from the cursor when the processor is ready. The HTTP goroutine never notices. 447 + - The reconnect loop already absorbs slow-consumer evictions; **keeping `/health` answering is what makes that resilient**. If health fails, fly stops the machine, the volume unmounts (with `EBUSY` retries), and recovery takes ~10s, exactly the gap that's visible to upstream callers. 448 + 449 + In short: it's fine to be a few minutes behind. It's not fine to be unreachable. The cursor lets us accept the first to avoid the second whenever we have to. 450 + 451 + --- 452 + 453 + ## /health for fly checks 454 + 455 + Cross-goroutine state via `sync/atomic.Int64`. The HTTP handler reads atomics and synthesizes a snapshot, with no mutex on the hot path. 
456 + 457 + ```go 458 + var ( 459 + healthLastEventTimeUs atomic.Int64 // event time_us 460 + healthLastEventAtNs atomic.Int64 // wall-clock ns when last event arrived 461 + healthRecordEvents atomic.Int64 462 + healthStartedAtNs atomic.Int64 463 + ) 464 + // hot path: 465 + healthLastEventAtNs.Store(time.Now().UnixNano()) 466 + healthLastEventTimeUs.Store(evt.TimeUs) 467 + healthRecordEvents.Store(recordEvents) 468 + ``` 469 + 470 + **Healthy = received an event in the last 60s AND have a real `lastEventTimeUs`.** That second clause prevents reporting healthy at boot before any data has flowed. Status code 200 vs 503 — fly's HTTP health check uses the status directly. 471 + 472 + Body is JSON: `lag_seconds`, `lag_human`, `last_event_at`, `seconds_since_last`, `record_events`, `uptime_seconds`. Cheap to compute, lets us spot-check from a browser without ssh-ing in. 473 + 474 + --- 475 + 476 + ## Fly.io deploy 477 + 478 + ### App name constraint 479 + 480 + Fly app names match `[a-z0-9-]+` — **no dots**. `harvester.blue` (the project) is `harvester-blue` (the fly app). The .blue domain stays the project's identity, just not the fly resource name. 481 + 482 + ### Single-machine + persistent volume 483 + 484 + ```toml 485 + # fly.toml 486 + app = 'harvester-blue' 487 + primary_region = 'iad' # close to jetstream2.us-east 488 + 489 + [http_service] 490 + internal_port = 8080 491 + force_https = true 492 + auto_stop_machines = 'off' # always-on consumer, never sleep 493 + auto_start_machines = false 494 + min_machines_running = 1 495 + 496 + [[http_service.checks]] 497 + interval = '30s' 498 + timeout = '5s' 499 + grace_period = '5m' # fly silently clamps this to 1m 500 + method = 'GET' 501 + path = '/health' 502 + 503 + [[vm]] 504 + cpu_kind = 'shared' 505 + cpus = 1 506 + memory_mb = 1024 507 + 508 + [[mounts]] 509 + source = 'harvester_data' 510 + destination = '/data' 511 + initial_size = '100' # GB 512 + ``` 513 + 514 + Volumes are region-pinned and machine-pinned. 
One machine per volume. Volume snapshots are scheduled by default (5-snapshot retention) — that's the recovery story for "rebuild backend later, keep the data". 515 + 516 + ### Dockerfile + remote builder 517 + 518 + CGO is required (DuckDB and SQLite drivers). Build inside a `golang:1.25-bookworm` stage, ship from `debian:bookworm-slim` with just `ca-certificates` for TLS: 519 + 520 + ```dockerfile 521 + FROM golang:1.25-bookworm AS builder 522 + WORKDIR /src 523 + COPY go.mod go.sum ./ 524 + RUN go mod download 525 + COPY *.go ./ 526 + RUN CGO_ENABLED=1 GOOS=linux go build -trimpath -o /out/harvester . 527 + 528 + FROM debian:bookworm-slim 529 + RUN apt-get update && apt-get install -y --no-install-recommends ca-certificates && rm -rf /var/lib/apt/lists/* 530 + COPY --from=builder /out/harvester /usr/local/bin/harvester 531 + RUN mkdir -p /data 532 + WORKDIR /data 533 + EXPOSE 8080 534 + CMD ["/usr/local/bin/harvester"] 535 + ``` 536 + 537 + `fly deploy` uses fly's remote builder (Depot) by default — no local Docker required, cross-compile from any host. The DuckDB Go bindings ship static prebuilt libraries per platform so the runtime image only needs glibc + ca-certs. 538 + 539 + `WORKDIR /data` makes the binary's relative paths (`harvest.duckdb`, `harvest_blue.sqlite`) land on the mounted volume without code changes. 540 + 541 + ### Setup sequence 542 + 543 + ```bash 544 + fly apps create harvester-blue --org personal 545 + fly volumes create harvester_data --region iad --size 100 --app harvester-blue --yes 546 + fly deploy --ha=false --app harvester-blue # --ha=false keeps it to one machine 547 + ``` 548 + 549 + `--ha=false` matters: fly's default is to try for high availability (multiple machines), but with one volume only one machine can attach. 550 + 551 + ### Sizing: shared vs performance 552 + 553 + | Workload | Best fit | 554 + |---|---| 555 + | Backfill (CPU-bound, sustained) | `performance-1x` (1 dedicated vCPU). 
Scale up while catching up, scale back down. | 556 + | Steady-state (~1k records/s after caught up) | `shared-cpu-1x` 1 GB is enough — well under baseline. | 557 + | Both, no babysitting | `shared-cpu-2x` 2 GB. Slower catchup but no throttling drama. | 558 + 559 + For our toy budget, `shared-cpu-1x` 1 GB + 100 GB volume ≈ $21/mo total. Reconnect logic absorbs the slow-consumer evictions during backfill. 560 + 561 + --- 562 + 563 + ## Quick-reference snippets 564 + 565 + ### Open both DBs 566 + 567 + ```go 568 + import ( 569 + _ "github.com/duckdb/duckdb-go/v2" 570 + _ "github.com/mattn/go-sqlite3" 571 + ) 572 + 573 + ddb, _ := sql.Open("duckdb", "") // in-memory; pass a path to persist 574 + sdb, _ := sql.Open("sqlite3", "/tmp/x.db") // path required (":memory:" for in-mem) 575 + ``` 576 + 577 + ### Install a community extension and ATTACH SQLite 578 + 579 + ```go 580 + for _, stmt := range []string{ 581 + `INSTALL urlpattern FROM community`, `LOAD urlpattern`, 582 + `INSTALL sqlite`, `LOAD sqlite`, 583 + } { 584 + if _, err := ddb.Exec(stmt); err != nil { log.Fatal(err) } 585 + } 586 + ddb.Exec(fmt.Sprintf(`ATTACH '%s' AS s (TYPE sqlite, READ_ONLY)`, sqlitePath)) 587 + ``` 588 + 589 + ### Check whether an extension is built for your DuckDB version 590 + 591 + ```bash 592 + curl -sI "https://community-extensions.duckdb.org/v1.5.1/osx_arm64/urlpattern.duckdb_extension.gz" 593 + # 200 = built; 404 = pin to a different DuckDB version 594 + ```
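The same availability check can be scripted from Go, for example before pinning a DuckDB version in CI. This is a minimal sketch, assuming the URL layout in the curl example above also holds for other versions, platforms, and extension names. `extensionURL` and `extensionBuilt` are hypothetical helper names for illustration, not part of DuckDB or duckdb-go.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
)

// extensionURL builds the community-extension download URL, mirroring the
// layout of the curl example (assumed, not confirmed, for other versions).
func extensionURL(version, platform, name string) string {
	return fmt.Sprintf(
		"https://community-extensions.duckdb.org/%s/%s/%s.duckdb_extension.gz",
		version, platform, name,
	)
}

// extensionBuilt sends a HEAD request and reports whether the server answers
// 200. Any other status (typically 404) is treated as "not built".
func extensionBuilt(ctx context.Context, client *http.Client, url string) (bool, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
	if err != nil {
		return false, err
	}
	resp, err := client.Do(req)
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}
```

Calling `extensionBuilt(ctx, http.DefaultClient, extensionURL("v1.5.1", "osx_arm64", "urlpattern"))` issues the same request as the curl line. A `false` result with a nil error corresponds to the "pin to a different DuckDB version" case.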
+586 -45
main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bytes" 4 5 "context" 6 + "database/sql" 7 + "database/sql/driver" 5 8 "encoding/json" 9 + "errors" 6 10 "fmt" 7 11 "log" 12 + "net/http" 8 13 "net/url" 9 14 "os" 10 15 "os/signal" 16 + "strings" 17 + "sync/atomic" 11 18 "syscall" 12 19 "time" 13 20 21 + "github.com/bytedance/sonic" 14 22 "github.com/coder/websocket" 23 + "github.com/duckdb/duckdb-go/v2" 24 + _ "github.com/mattn/go-sqlite3" 15 25 ) 16 26 27 + // Atomics read by the /health HTTP handler. Written by the main goroutine. 28 + var ( 29 + healthLastEventTimeUs atomic.Int64 // time_us of the most recent event we processed 30 + healthLastEventAtNs atomic.Int64 // wall-clock UnixNano when we last got *any* commit 31 + healthRecordEvents atomic.Int64 // total commits processed so far 32 + healthStartedAtNs atomic.Int64 // wall-clock UnixNano at startup 33 + ) 34 + 35 + // jsonAPI is the configured JSON parser. ConfigFastest skips a couple of 36 + // stdlib-compat checks (sorting map keys, etc.) for max throughput. 37 + var jsonAPI = sonic.ConfigFastest 38 + 39 + // kindCommitToken is the fast pre-filter to skip identity / account events 40 + // without ever entering the JSON parser. 41 + var kindCommitToken = []byte(`"kind":"commit"`) 42 + 43 + // event is the subset of a Jetstream commit we look at. 17 44 type event struct { 45 + Did string `json:"did"` 18 46 Kind string `json:"kind"` 19 47 TimeUs int64 `json:"time_us"` 20 48 Commit *struct { 21 - Operation string `json:"operation"` 22 - Record json.RawMessage `json:"record"` 49 + Record any `json:"record"` 23 50 } `json:"commit"` 24 51 } 25 52 53 + const ( 54 + // duckdbPath is the analytical store. Persistent across runs. 55 + duckdbPath = "harvest.duckdb" 56 + // sqlitePath holds the cursor (and any other small state). Persistent. 57 + sqlitePath = "harvest_blue.sqlite" 58 + // cursorName keys the single-row jetstream cursor in the cursor table. 
59 + cursorName = "jetstream" 60 + ) 61 + 62 + // backlinkKey is the deduplication key for at-uri references inside a 63 + // single record body. 64 + type backlinkKey struct { 65 + did, collection, rkey string 66 + } 67 + 68 + // recordExtract collects everything we want to write out for one record. 69 + // Maps are reused across records via clear() to avoid allocations. 70 + type recordExtract struct { 71 + types map[string]struct{} 72 + urls map[string]struct{} 73 + backlinks map[backlinkKey]struct{} 74 + mentions map[string]struct{} 75 + } 76 + 77 + func newRecordExtract() *recordExtract { 78 + return &recordExtract{ 79 + types: make(map[string]struct{}), 80 + urls: make(map[string]struct{}), 81 + backlinks: make(map[backlinkKey]struct{}), 82 + mentions: make(map[string]struct{}), 83 + } 84 + } 85 + 86 + func (e *recordExtract) reset() { 87 + clear(e.types) 88 + clear(e.urls) 89 + clear(e.backlinks) 90 + clear(e.mentions) 91 + } 92 + 26 93 func main() { 27 94 ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) 28 95 defer cancel() 29 96 30 - connectTime := time.Now() 31 - startUs := connectTime.Add(-1 * time.Hour).UnixMicro() 32 - endUs := connectTime.UnixMicro() 33 - wsURL := fmt.Sprintf("wss://jetstream2.us-east.bsky.network/subscribe?cursor=%d", startUs) 97 + healthStartedAtNs.Store(time.Now().UnixNano()) 34 98 35 - conn, _, err := websocket.Dial(ctx, wsURL, nil) 99 + // --- HTTP: /health ----------------------------------------------------- 100 + httpAddr := ":" + envOr("PORT", "8080") 101 + mux := http.NewServeMux() 102 + mux.HandleFunc("/health", healthHandler) 103 + httpServer := &http.Server{Addr: httpAddr, Handler: mux, ReadHeaderTimeout: 5 * time.Second} 104 + go func() { 105 + log.Printf("health: listening on %s/health", httpAddr) 106 + if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { 107 + log.Printf("http server: %v", err) 108 + } 109 + }() 110 + defer func() { 111 + 
shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 3*time.Second) 112 + defer shutdownCancel() 113 + _ = httpServer.Shutdown(shutdownCtx) 114 + }() 115 + 116 + // --- SQLite: cursor state ---------------------------------------------- 117 + sqliteDB, err := sql.Open("sqlite3", sqlitePath) 36 118 if err != nil { 37 - log.Fatalf("dial: %v", err) 119 + log.Fatalf("sqlite open: %v", err) 38 120 } 39 - defer conn.Close(websocket.StatusNormalClosure, "") 40 - conn.SetReadLimit(4 << 20) 121 + defer sqliteDB.Close() 122 + 123 + if _, err := sqliteDB.ExecContext(ctx, ` 124 + CREATE TABLE IF NOT EXISTS cursor ( 125 + name TEXT PRIMARY KEY, 126 + cursor_us INTEGER NOT NULL, 127 + updated_at TEXT NOT NULL 128 + ); 129 + `); err != nil { 130 + log.Fatalf("sqlite cursor table: %v", err) 131 + } 132 + 133 + upsertCursor, err := sqliteDB.PrepareContext(ctx, ` 134 + INSERT INTO cursor (name, cursor_us, updated_at) VALUES (?, ?, ?) 135 + ON CONFLICT(name) DO UPDATE SET cursor_us = excluded.cursor_us, updated_at = excluded.updated_at; 136 + `) 137 + if err != nil { 138 + log.Fatalf("prepare upsert cursor: %v", err) 139 + } 140 + defer upsertCursor.Close() 41 141 42 - urls := make(map[string]struct{}) 142 + // Resume from saved cursor, default to 1µs (oldest available). 
143 + startUs := int64(1) 144 + if err := sqliteDB.QueryRowContext(ctx, 145 + `SELECT cursor_us FROM cursor WHERE name = ?`, cursorName, 146 + ).Scan(&startUs); err != nil && !errors.Is(err, sql.ErrNoRows) { 147 + log.Fatalf("read cursor: %v", err) 148 + } 149 + if startUs <= 0 { 150 + startUs = 1 151 + } 152 + requestedAt := time.UnixMicro(startUs).UTC() 43 153 44 - lastReport := time.Now() 45 - report := func() { 46 - fmt.Printf("\runique_urls=%d", len(urls)) 154 + // --- DuckDB: 4 tables, 4 appenders ------------------------------------- 155 + db, err := sql.Open("duckdb", duckdbPath) 156 + if err != nil { 157 + log.Fatalf("duckdb open: %v", err) 47 158 } 159 + defer db.Close() 48 160 161 + if _, err := db.ExecContext(ctx, ` 162 + CREATE TABLE IF NOT EXISTS types ( 163 + did VARCHAR NOT NULL, 164 + typename VARCHAR NOT NULL, 165 + timestamp TIMESTAMP NOT NULL 166 + ); 167 + CREATE TABLE IF NOT EXISTS urls ( 168 + did VARCHAR NOT NULL, 169 + url VARCHAR NOT NULL, 170 + timestamp TIMESTAMP NOT NULL 171 + ); 172 + CREATE TABLE IF NOT EXISTS backlinks ( 173 + did VARCHAR NOT NULL, 174 + subject_did VARCHAR NOT NULL, 175 + subject_collection VARCHAR NOT NULL, 176 + subject_rkey VARCHAR NOT NULL, 177 + timestamp TIMESTAMP NOT NULL 178 + ); 179 + CREATE TABLE IF NOT EXISTS mentions ( 180 + did VARCHAR NOT NULL, 181 + subject_did VARCHAR NOT NULL, 182 + timestamp TIMESTAMP NOT NULL 183 + ); 184 + `); err != nil { 185 + log.Fatalf("create tables: %v", err) 186 + } 187 + 188 + sqlConn, err := db.Conn(ctx) 189 + if err != nil { 190 + log.Fatalf("duckdb conn: %v", err) 191 + } 192 + defer sqlConn.Close() 193 + 194 + var ( 195 + typesApp *duckdb.Appender 196 + urlsApp *duckdb.Appender 197 + backlinksApp *duckdb.Appender 198 + mentionsApp *duckdb.Appender 199 + ) 200 + if err := sqlConn.Raw(func(anyConn any) error { 201 + dc := anyConn.(driver.Conn) 202 + var err error 203 + if typesApp, err = duckdb.NewAppenderFromConn(dc, "", "types"); err != nil { 204 + return err 205 + } 
206 + if urlsApp, err = duckdb.NewAppenderFromConn(dc, "", "urls"); err != nil { 207 + return err 208 + } 209 + if backlinksApp, err = duckdb.NewAppenderFromConn(dc, "", "backlinks"); err != nil { 210 + return err 211 + } 212 + if mentionsApp, err = duckdb.NewAppenderFromConn(dc, "", "mentions"); err != nil { 213 + return err 214 + } 215 + return nil 216 + }); err != nil { 217 + log.Fatalf("appenders: %v", err) 218 + } 219 + allAppenders := []*duckdb.Appender{typesApp, urlsApp, backlinksApp, mentionsApp} 220 + defer func() { 221 + for _, a := range allAppenders { 222 + if err := a.Close(); err != nil { 223 + log.Printf("appender close: %v", err) 224 + } 225 + } 226 + }() 227 + 228 + // --- Websocket connect parameters -------------------------------------- 229 + fmt.Printf("requested cursor: %s (%s ago)\n", requestedAt.Format(time.RFC3339), shortDur(time.Since(requestedAt))) 230 + 231 + // --- Loop state -------------------------------------------------------- 232 + extract := newRecordExtract() 233 + 234 + var ( 235 + recordEvents int64 236 + lastTimeUs int64 237 + ) 238 + 239 + var lastTickAt time.Time 240 + 241 + flushAndCheckpoint := func() { 242 + ok := true 243 + for _, a := range allAppenders { 244 + if err := a.Flush(); err != nil { 245 + log.Printf("flush: %v", err) 246 + ok = false 247 + } 248 + } 249 + if !ok || lastTimeUs <= 0 { 250 + return 251 + } 252 + if _, err := upsertCursor.ExecContext(ctx, cursorName, lastTimeUs, time.Now().UTC().Format(time.RFC3339Nano)); err != nil { 253 + log.Printf("cursor upsert: %v", err) 254 + } 255 + } 256 + 257 + logTick := func() { 258 + if lastTimeUs <= 0 { 259 + return 260 + } 261 + last := time.UnixMicro(lastTimeUs).UTC() 262 + log.Printf("last_event=%s lag=%s", last.Format(time.RFC3339), shortDur(time.Since(last))) 263 + } 264 + 265 + shutdown := func(reason string) { 266 + flushAndCheckpoint() 267 + log.Printf("shutdown: %s", reason) 268 + } 269 + 270 + // --- Reconnect loop 
---------------------------------------------------- 271 + // Connection drops are routine on a long-lived Jetstream stream (idle 272 + // timeouts, slow-consumer eviction, intermittent network). We never 273 + // exit the process — the HTTP server stays up so /health continues to 274 + // answer, and we resume from the latest cursor. 275 + announcedCaughtUp := false 276 + const minBackoff = 1 * time.Second 277 + const maxBackoff = 30 * time.Second 278 + backoff := minBackoff 279 + 280 + reconnect: 49 281 for { 50 - _, data, err := conn.Read(ctx) 51 - if err != nil { 52 - fmt.Println() 53 - log.Printf("read: %v", err) 54 - break 282 + if ctx.Err() != nil { 283 + shutdown("interrupted") 284 + return 55 285 } 56 286 57 - var e event 58 - if err := json.Unmarshal(data, &e); err != nil { 59 - continue 287 + // Cursor for this connect: max(saved-startUs, lastTimeUs in memory). 288 + cursor := startUs 289 + if lastTimeUs > cursor { 290 + cursor = lastTimeUs 60 291 } 292 + wsURL := fmt.Sprintf("wss://jetstream2.us-east.bsky.network/subscribe?cursor=%d", cursor) 293 + log.Printf("connecting: %s", wsURL) 61 294 62 - if e.Kind == "commit" && e.Commit != nil && len(e.Commit.Record) > 0 { 63 - var body any 64 - if err := json.Unmarshal(e.Commit.Record, &body); err == nil { 65 - walk(body, urls) 295 + dialCtx, dialCancel := context.WithTimeout(ctx, 30*time.Second) 296 + conn, _, err := websocket.Dial(dialCtx, wsURL, nil) 297 + dialCancel() 298 + if err != nil { 299 + log.Printf("dial failed: %v (retry in %s)", err, backoff) 300 + select { 301 + case <-ctx.Done(): 302 + shutdown("interrupted") 303 + return 304 + case <-time.After(backoff): 66 305 } 306 + backoff = nextBackoff(backoff, maxBackoff) 307 + continue reconnect 67 308 } 309 + conn.SetReadLimit(4 << 20) 310 + log.Printf("connected") 311 + backoff = minBackoff 312 + 313 + for { 314 + _, data, err := conn.Read(ctx) 315 + if err != nil { 316 + _ = conn.Close(websocket.StatusNormalClosure, "") 317 + if errors.Is(err, 
context.Canceled) { 318 + shutdown("interrupted") 319 + return 320 + } 321 + flushAndCheckpoint() 322 + log.Printf("socket dropped: %v (reconnect in %s)", err, backoff) 323 + select { 324 + case <-ctx.Done(): 325 + shutdown("interrupted") 326 + return 327 + case <-time.After(backoff): 328 + } 329 + backoff = nextBackoff(backoff, maxBackoff) 330 + continue reconnect 331 + } 68 332 69 - if e.TimeUs >= endUs { 70 - fmt.Println() 71 - log.Printf("reached 1h of data (cursor caught up to connect time)") 72 - break 73 - } 333 + if !bytes.Contains(data, kindCommitToken) { 334 + continue 335 + } 336 + 337 + var evt event 338 + if err := jsonAPI.Unmarshal(data, &evt); err != nil { 339 + continue 340 + } 341 + 342 + if evt.Kind != "commit" || evt.Commit == nil { 343 + continue 344 + } 345 + 346 + if recordEvents == 0 && evt.TimeUs > 0 { 347 + firstAt := time.UnixMicro(evt.TimeUs).UTC() 348 + gap := firstAt.Sub(requestedAt) 349 + log.Printf("first event: %s (%s ago, gap %s after requested cursor)", 350 + firstAt.Format(time.RFC3339), shortDur(time.Since(firstAt)), shortDur(gap)) 351 + } 352 + 353 + recordEvents++ 354 + healthLastEventAtNs.Store(time.Now().UnixNano()) 355 + healthRecordEvents.Store(recordEvents) 356 + if evt.TimeUs > 0 { 357 + healthLastEventTimeUs.Store(evt.TimeUs) 358 + lastTimeUs = evt.TimeUs 359 + if !announcedCaughtUp && time.Now().UnixMicro()-lastTimeUs < 5_000_000 { 360 + log.Printf("caught up to realtime") 361 + announcedCaughtUp = true 362 + } 363 + } 364 + 365 + if evt.Commit.Record != nil { 366 + extract.reset() 367 + extractFromRecord(evt.Commit.Record, extract) 368 + 369 + ts := time.UnixMicro(evt.TimeUs) 370 + for t := range extract.types { 371 + if err := typesApp.AppendRow(evt.Did, t, ts); err != nil { 372 + log.Printf("types append: %v", err) 373 + } 374 + } 375 + for u := range extract.urls { 376 + if err := urlsApp.AppendRow(evt.Did, u, ts); err != nil { 377 + log.Printf("urls append: %v", err) 378 + } 379 + } 380 + for k := range 
extract.backlinks { 381 + if err := backlinksApp.AppendRow(evt.Did, k.did, k.collection, k.rkey, ts); err != nil { 382 + log.Printf("backlinks append: %v", err) 383 + } 384 + } 385 + for m := range extract.mentions { 386 + if err := mentionsApp.AppendRow(evt.Did, m, ts); err != nil { 387 + log.Printf("mentions append: %v", err) 388 + } 389 + } 390 + } 74 391 75 - if time.Since(lastReport) >= time.Second { 76 - report() 77 - lastReport = time.Now() 392 + if time.Since(lastTickAt) >= time.Second { 393 + flushAndCheckpoint() 394 + logTick() 395 + lastTickAt = time.Now() 396 + } 78 397 } 79 398 } 399 + } 80 400 81 - report() 82 - fmt.Println() 83 - fmt.Printf("DONE: %d unique URLs\n", len(urls)) 401 + // nextBackoff returns the next backoff duration, doubling each time up to 402 + // the given maximum. 403 + func nextBackoff(cur, max time.Duration) time.Duration { 404 + next := cur * 2 405 + if next > max { 406 + next = max 407 + } 408 + return next 84 409 } 85 410 86 - func walk(v any, set map[string]struct{}) { 411 + // extractFromRecord walks the parsed record and populates the four 412 + // per-record sets: 413 + // - types: every $type string found on any object (deep walk). 414 + // - urls/backlinks/mentions: every string value classified by FORMAT, 415 + // not by field name. We don't trust where DIDs/URIs live in the schema 416 + // — backlinks and mentions can show up anywhere. 
417 + func extractFromRecord(v any, ex *recordExtract) { 87 418 switch x := v.(type) { 88 419 case map[string]any: 420 + if t, ok := x["$type"].(string); ok { 421 + ex.types[t] = struct{}{} 422 + } 89 423 for _, val := range x { 90 - walk(val, set) 424 + extractFromRecord(val, ex) 91 425 } 92 426 case []any: 93 - for _, val := range x { 94 - walk(val, set) 427 + for _, item := range x { 428 + extractFromRecord(item, ex) 95 429 } 96 430 case string: 97 - if isURL(x) { 98 - set[x] = struct{}{} 431 + classify(x, ex) 432 + } 433 + } 434 + 435 + // classify routes a string value into urls / backlinks / mentions based on 436 + // its format. The string must FULLY match the expected shape — we never 437 + // accept "starts with did:" or similar prefix-only matches. 438 + func classify(s string, ex *recordExtract) { 439 + switch { 440 + case strings.HasPrefix(s, "did:"): 441 + if isDID(s) { 442 + ex.mentions[s] = struct{}{} 443 + } 444 + case strings.HasPrefix(s, "at://"): 445 + // Full record at-uri: at://<did>/<collection>/<rkey> → backlink. 446 + // Bare repo at-uri: at://<did> → mention 447 + // (effectively a DID reference; we treat it the same as `did:plc:…` 448 + // so we don't lose references when authors write the at-uri form). 449 + // at://<did>/<collection> (no rkey) is skipped — rare and the 450 + // schema doesn't fit it cleanly. 451 + if did, coll, rkey, ok := parseAtURI(s); ok { 452 + ex.backlinks[backlinkKey{did, coll, rkey}] = struct{}{} 453 + } else if did, ok := parseAtURIRepo(s); ok { 454 + ex.mentions[did] = struct{}{} 455 + } 456 + case strings.HasPrefix(s, "http://"), strings.HasPrefix(s, "https://"): 457 + if isHTTPURL(s) { 458 + ex.urls[s] = struct{}{} 459 + } 460 + } 461 + } 462 + 463 + // parseAtURIRepo extracts the did from a bare repo at-uri "at://<did>" with 464 + // no slash after the did. Returns ("", false) for any other form (full 465 + // record at-uris, collection-only at-uris, or invalid input). 
466 + func parseAtURIRepo(s string) (did string, ok bool) { 467 + const prefix = "at://" 468 + if !strings.HasPrefix(s, prefix) { 469 + return "", false 470 + } 471 + rest := s[len(prefix):] 472 + if strings.IndexByte(rest, '/') >= 0 { 473 + return "", false // has at least a collection segment 474 + } 475 + if !isDID(rest) { 476 + return "", false 477 + } 478 + return rest, true 479 + } 480 + 481 + // isDID returns true iff s is exactly a did:plc:<24-base32-chars> or 482 + // did:web:<domainish> — no trailing path / fragment / query. 483 + func isDID(s string) bool { 484 + const plcPrefix = "did:plc:" 485 + const webPrefix = "did:web:" 486 + switch { 487 + case strings.HasPrefix(s, plcPrefix): 488 + suffix := s[len(plcPrefix):] 489 + if len(suffix) != 24 { 490 + return false 491 + } 492 + for i := 0; i < len(suffix); i++ { 493 + c := suffix[i] 494 + if !((c >= 'a' && c <= 'z') || (c >= '2' && c <= '7')) { 495 + return false 496 + } 99 497 } 498 + return true 499 + case strings.HasPrefix(s, webPrefix): 500 + suffix := s[len(webPrefix):] 501 + if len(suffix) == 0 || len(suffix) > 253 { 502 + return false 503 + } 504 + for i := 0; i < len(suffix); i++ { 505 + c := suffix[i] 506 + ok := (c >= 'a' && c <= 'z') || 507 + (c >= 'A' && c <= 'Z') || 508 + (c >= '0' && c <= '9') || 509 + c == '.' || c == '-' || c == ':' || c == '%' || c == '_' 510 + if !ok { 511 + return false 512 + } 513 + } 514 + return true 100 515 } 516 + return false 101 517 } 102 518 103 - func isURL(s string) bool { 104 - if len(s) < 8 || len(s) > 2048 { 519 + // parseAtURI parses an at-uri of the form at://<did>/<collection>/<rkey>. 520 + // All three components are required; collection/rkey can't be empty. The 521 + // did component is validated via isDID. 
522 + func parseAtURI(s string) (did, collection, rkey string, ok bool) { 523 + const prefix = "at://" 524 + if !strings.HasPrefix(s, prefix) { 525 + return "", "", "", false 526 + } 527 + rest := s[len(prefix):] 528 + slash1 := strings.IndexByte(rest, '/') 529 + if slash1 < 0 { 530 + return "", "", "", false 531 + } 532 + did = rest[:slash1] 533 + if !isDID(did) { 534 + return "", "", "", false 535 + } 536 + rest2 := rest[slash1+1:] 537 + slash2 := strings.IndexByte(rest2, '/') 538 + if slash2 < 0 { 539 + return "", "", "", false 540 + } 541 + collection = rest2[:slash2] 542 + rkey = rest2[slash2+1:] 543 + if collection == "" || rkey == "" { 544 + return "", "", "", false 545 + } 546 + // rkey should not contain another slash for a single-record at-uri. 547 + if strings.IndexByte(rkey, '/') >= 0 { 548 + return "", "", "", false 549 + } 550 + return did, collection, rkey, true 551 + } 552 + 553 + // isHTTPURL returns true iff s is a syntactically valid http/https URL with 554 + // a non-empty host. Length-bounded to keep walk-time predictable. 555 + func isHTTPURL(s string) bool { 556 + if len(s) > 2048 { 105 557 return false 106 558 } 107 559 u, err := url.Parse(s) ··· 116 568 } 117 569 return true 118 570 } 571 + 572 + // envOr returns the env var value, or fallback if unset/empty. 573 + func envOr(name, fallback string) string { 574 + if v := os.Getenv(name); v != "" { 575 + return v 576 + } 577 + return fallback 578 + } 579 + 580 + // healthHandler returns the current lag and basic liveness info for fly.io. 581 + // 200 if we received an event in the last 60s and have a real lastTimeUs; 582 + // 503 otherwise. 
583 + func healthHandler(w http.ResponseWriter, _ *http.Request) { 584 + lastEventTimeUs := healthLastEventTimeUs.Load() 585 + lastEventAtNs := healthLastEventAtNs.Load() 586 + records := healthRecordEvents.Load() 587 + startedAtNs := healthStartedAtNs.Load() 588 + 589 + now := time.Now() 590 + var ( 591 + lagSec float64 592 + lastEventAt string 593 + eventReceived string 594 + uptimeSec float64 595 + ) 596 + if startedAtNs > 0 { 597 + uptimeSec = now.Sub(time.Unix(0, startedAtNs)).Seconds() 598 + } 599 + if lastEventTimeUs > 0 { 600 + lastEventAtTime := time.UnixMicro(lastEventTimeUs).UTC() 601 + lastEventAt = lastEventAtTime.Format(time.RFC3339Nano) 602 + lagSec = now.Sub(lastEventAtTime).Seconds() 603 + } 604 + var sinceLast time.Duration 605 + if lastEventAtNs > 0 { 606 + recvAt := time.Unix(0, lastEventAtNs).UTC() 607 + eventReceived = recvAt.Format(time.RFC3339Nano) 608 + sinceLast = now.Sub(recvAt) 609 + } 610 + 611 + healthy := lastEventTimeUs > 0 && lastEventAtNs > 0 && sinceLast < 60*time.Second 612 + 613 + resp := map[string]any{ 614 + "healthy": healthy, 615 + "lag_seconds": lagSec, 616 + "lag_human": shortDur(time.Duration(lagSec * float64(time.Second))), 617 + "last_event_at": lastEventAt, 618 + "last_event_received": eventReceived, 619 + "seconds_since_last": sinceLast.Seconds(), 620 + "record_events": records, 621 + "now": now.UTC().Format(time.RFC3339Nano), 622 + "uptime_seconds": uptimeSec, 623 + } 624 + 625 + status := http.StatusOK 626 + if !healthy { 627 + status = http.StatusServiceUnavailable 628 + } 629 + w.Header().Set("Content-Type", "application/json") 630 + w.WriteHeader(status) 631 + _ = json.NewEncoder(w).Encode(resp) 632 + } 633 + 634 + // shortDur formats a duration compactly: "3d4h12m", "5h2m", "47s", "120ms". 
635 + func shortDur(d time.Duration) string { 636 + if d < 0 { 637 + return "-" + shortDur(-d) 638 + } 639 + if d < time.Second { 640 + return d.Truncate(time.Millisecond).String() 641 + } 642 + days := int(d / (24 * time.Hour)) 643 + d -= time.Duration(days) * 24 * time.Hour 644 + hours := int(d / time.Hour) 645 + d -= time.Duration(hours) * time.Hour 646 + minutes := int(d / time.Minute) 647 + d -= time.Duration(minutes) * time.Minute 648 + seconds := int(d / time.Second) 649 + switch { 650 + case days > 0: 651 + return fmt.Sprintf("%dd%dh%dm", days, hours, minutes) 652 + case hours > 0: 653 + return fmt.Sprintf("%dh%dm", hours, minutes) 654 + case minutes > 0: 655 + return fmt.Sprintf("%dm%ds", minutes, seconds) 656 + default: 657 + return fmt.Sprintf("%ds", seconds) 658 + } 659 + }
-37
package-lock.json
··· 1 - { 2 - "name": "harvester.blue", 3 - "lockfileVersion": 3, 4 - "requires": true, 5 - "packages": { 6 - "": { 7 - "name": "harvester.blue", 8 - "dependencies": { 9 - "ws": "^8.18.0" 10 - }, 11 - "engines": { 12 - "node": ">=18" 13 - } 14 - }, 15 - "node_modules/ws": { 16 - "version": "8.20.0", 17 - "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", 18 - "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", 19 - "license": "MIT", 20 - "engines": { 21 - "node": ">=10.0.0" 22 - }, 23 - "peerDependencies": { 24 - "bufferutil": "^4.0.1", 25 - "utf-8-validate": ">=5.0.2" 26 - }, 27 - "peerDependenciesMeta": { 28 - "bufferutil": { 29 - "optional": true 30 - }, 31 - "utf-8-validate": { 32 - "optional": true 33 - } 34 - } 35 - } 36 - } 37 - }
-14
package.json
··· 1 - { 2 - "name": "harvester.blue", 3 - "private": true, 4 - "type": "module", 5 - "scripts": { 6 - "count-records-24h": "node scripts/count-records-24h.mjs" 7 - }, 8 - "engines": { 9 - "node": ">=18" 10 - }, 11 - "dependencies": { 12 - "ws": "^8.18.0" 13 - } 14 - }
-165
scripts/count-records-24h.deno.ts
··· 1 - /** 2 - * Deno port of `count-records-24h.mjs`: Jetstream last-24h replay with parse/type timing. 3 - * 4 - * Run: deno task count-records-24h-deno 5 - * Or: deno run --allow-net ./scripts/count-records-24h.deno.ts 6 - */ 7 - const encoder = new TextEncoder(); 8 - 9 - /** Monotonic time in nanoseconds (from `performance.now()`, sub-ms resolution where available). */ 10 - const nowNs = () => performance.now() * 1e6; 11 - 12 - const nowUs = Date.now() * 1000; 13 - const startUs = nowUs - 24 * 60 * 60 * 1_000_000; 14 - const endUs = nowUs; 15 - const windowUs = endUs - startUs; 16 - 17 - const wsUrl = `wss://jetstream2.us-east.bsky.network/subscribe?cursor=${startUs}`; 18 - 19 - type JetstreamEvent = { 20 - kind?: string; 21 - time_us?: number; 22 - commit?: { 23 - operation?: string; 24 - record?: unknown; 25 - }; 26 - }; 27 - 28 - let recordEvents = 0; 29 - let lastTimeUs = 0; 30 - 31 - const startedAt = Date.now(); 32 - let lastPrintedAt = 0; 33 - 34 - const types = new Set<string>(); 35 - 36 - let parseNsTotal = 0; 37 - let typeNsTotal = 0; 38 - let handlerNsTotal = 0; 39 - let parseCount = 0; 40 - let typeCount = 0; 41 - 42 - const avgNsToUs = (totalNs: number, count: number) => 43 - count > 0 ? ((totalNs / count) / 1000).toFixed(1) : "0.0"; 44 - 45 - const printProgress = async (final = false) => { 46 - const elapsedSec = Math.max(1, Math.floor((Date.now() - startedAt) / 1000)); 47 - const recordsPerSec = recordEvents / elapsedSec; 48 - const progressedUs = Math.max(0, lastTimeUs - startUs); 49 - const catchupPct = windowUs > 0 ? (progressedUs / windowUs) * 100 : 0; 50 - const eventHoursPerSec = progressedUs / 1_000_000 / 60 / 60 / elapsedSec; 51 - const lagHours = lastTimeUs ? 
((endUs - lastTimeUs) / 1_000_000 / 60 / 60).toFixed(2) : "n/a"; 52 - const avgParseUs = avgNsToUs(parseNsTotal, parseCount); 53 - const avgTypeUs = avgNsToUs(typeNsTotal, typeCount); 54 - const avgHandlerUs = avgNsToUs(handlerNsTotal, recordEvents); 55 - const line = 56 - `records=${recordEvents} unique_types=${types.size} ` + 57 - `catchup=${catchupPct.toFixed(2)}% lag=${lagHours}h event_time_rate=${eventHoursPerSec.toFixed(3)}h/s ` + 58 - `records_rate=${recordsPerSec.toFixed(1)}/s avg_parse=${avgParseUs}us avg_type=${avgTypeUs}us avg_handler=${avgHandlerUs}us`; 59 - 60 - if (final) { 61 - await Deno.stdout.write(encoder.encode(`\n${line}\n`)); 62 - return; 63 - } 64 - 65 - await Deno.stdout.write(encoder.encode(`\r${line}`)); 66 - }; 67 - 68 - const done = async (reason: string) => { 69 - await printProgress(true); 70 - console.log(`done: ${reason}`); 71 - Deno.exit(0); 72 - }; 73 - 74 - async function messageDataToString(data: unknown): Promise<string> { 75 - if (typeof data === "string") return data; 76 - if (data instanceof Blob) return await data.text(); 77 - if (data instanceof ArrayBuffer) return new TextDecoder().decode(data); 78 - if (ArrayBuffer.isView(data)) { 79 - const view = data as ArrayBufferView; 80 - return new TextDecoder().decode( 81 - new Uint8Array(view.buffer, view.byteOffset, view.byteLength), 82 - ); 83 - } 84 - return String(data); 85 - } 86 - 87 - const get$types = (record: unknown) => { 88 - if (typeof record === "object" && record !== null) { 89 - if (Array.isArray(record)) { 90 - for (const item of record) { 91 - get$types(item); 92 - } 93 - } else if ("$type" in record && typeof (record as Record<string, unknown>)["$type"] === "string") { 94 - types.add((record as Record<string, string>)["$type"]); 95 - } 96 - } 97 - }; 98 - 99 - console.log(`connecting: ${wsUrl}`); 100 - console.log("counting records from the last 24h window..."); 101 - 102 - const ws = new WebSocket(wsUrl); 103 - 104 - ws.addEventListener("open", () => { 105 - 
console.log("connected"); 106 - }); 107 - 108 - ws.addEventListener("message", async (ev) => { 109 - const handlerStartNs = nowNs(); 110 - const raw = await messageDataToString(ev.data); 111 - 112 - let evt: JetstreamEvent; 113 - const parseStartNs = nowNs(); 114 - try { 115 - evt = JSON.parse(raw) as JetstreamEvent; 116 - } catch { 117 - return; 118 - } 119 - parseNsTotal += nowNs() - parseStartNs; 120 - parseCount++; 121 - 122 - if (evt.kind === "commit" && evt.commit) { 123 - if (evt.commit.record !== undefined && evt.commit.record !== null) { 124 - const typeStartNs = nowNs(); 125 - recordEvents++; 126 - get$types(evt.commit.record); 127 - typeNsTotal += nowNs() - typeStartNs; 128 - typeCount++; 129 - handlerNsTotal += nowNs() - handlerStartNs; 130 - } 131 - } 132 - 133 - if (typeof evt.time_us === "number") { 134 - lastTimeUs = evt.time_us; 135 - if (lastTimeUs >= endUs) { 136 - await done("caught up to last 24h window end"); 137 - return; 138 - } 139 - } 140 - 141 - if (Date.now() - lastPrintedAt >= 1000) { 142 - await printProgress(false); 143 - lastPrintedAt = Date.now(); 144 - } 145 - }); 146 - 147 - ws.addEventListener("error", async () => { 148 - await printProgress(true); 149 - console.error("websocket error"); 150 - Deno.exit(1); 151 - }); 152 - 153 - ws.addEventListener("close", async (ev) => { 154 - await printProgress(true); 155 - const reasonStr = ev.reason || "n/a"; 156 - console.log(`socket closed: code=${ev.code} reason=${reasonStr}`); 157 - if (lastTimeUs < endUs) { 158 - Deno.exit(1); 159 - } 160 - Deno.exit(0); 161 - }); 162 - 163 - Deno.addSignalListener("SIGINT", () => { 164 - void done("interrupted"); 165 - });
-144
scripts/count-records-24h.mjs
··· 1 - import WebSocket from "ws"; 2 - 3 - const nowNs = () => process.hrtime.bigint(); 4 - 5 - const nowUs = Date.now() * 1000; 6 - const startUs = nowUs - 24 * 60 * 60 * 1_000_000; 7 - const endUs = nowUs; 8 - const windowUs = endUs - startUs; 9 - 10 - const wsUrl = `wss://jetstream2.us-east.bsky.network/subscribe?cursor=${startUs}`; 11 - 12 - let recordEvents = 0; 13 - let lastTimeUs = 0; 14 - 15 - const startedAt = Date.now(); 16 - let lastPrintedAt = 0; 17 - 18 - const types = new Set(); 19 - 20 - let parseNsTotal = 0n; 21 - let typeNsTotal = 0n; 22 - let handlerNsTotal = 0n; 23 - let parseCount = 0; 24 - let typeCount = 0; 25 - 26 - /** Average nanoseconds total → microseconds per sample */ 27 - const avgNsToUs = (totalNs, count) => 28 - count > 0 ? ((Number(totalNs) / count) / 1000).toFixed(1) : "0.0"; 29 - 30 - const printProgress = (final = false) => { 31 - const elapsedSec = Math.max(1, Math.floor((Date.now() - startedAt) / 1000)); 32 - const recordsPerSec = recordEvents / elapsedSec; 33 - const progressedUs = Math.max(0, lastTimeUs - startUs); 34 - const catchupPct = windowUs > 0 ? (progressedUs / windowUs) * 100 : 0; 35 - const eventHoursPerSec = progressedUs / 1_000_000 / 60 / 60 / elapsedSec; 36 - const lagHours = lastTimeUs ? 
((endUs - lastTimeUs) / 1_000_000 / 60 / 60).toFixed(2) : "n/a"; 37 - const avgParseUs = avgNsToUs(parseNsTotal, parseCount); 38 - const avgTypeUs = avgNsToUs(typeNsTotal, typeCount); 39 - const avgHandlerUs = avgNsToUs(handlerNsTotal, recordEvents); 40 - const line = 41 - `records=${recordEvents} unique_types=${types.size} ` + 42 - `catchup=${catchupPct.toFixed(2)}% lag=${lagHours}h event_time_rate=${eventHoursPerSec.toFixed(3)}h/s ` + 43 - `records_rate=${recordsPerSec.toFixed(1)}/s avg_parse=${avgParseUs}us avg_type=${avgTypeUs}us avg_handler=${avgHandlerUs}us`; 44 - 45 - if (final) { 46 - process.stdout.write(`\n${line}\n`); 47 - return; 48 - } 49 - 50 - process.stdout.write(`\r${line}`); 51 - }; 52 - 53 - const done = (reason) => { 54 - printProgress(true); 55 - console.log(`done: ${reason}`); 56 - process.exit(0); 57 - }; 58 - 59 - console.log(`connecting: ${wsUrl}`); 60 - console.log("counting records from the last 24h window..."); 61 - 62 - const get$types = (record) => { 63 - if (typeof record === "object" && record !== null) { 64 - if (Array.isArray(record)) { 65 - for (const item of record) { 66 - get$types(item); 67 - } 68 - } else if ("$type" in record && typeof record["$type"] === "string") { 69 - types.add(record["$type"]); 70 - } 71 - } 72 - }; 73 - 74 - const messageDataToString = (data) => { 75 - if (typeof data === "string") return data; 76 - if (Buffer.isBuffer(data)) return data.toString("utf8"); 77 - if (data instanceof ArrayBuffer) return Buffer.from(data).toString("utf8"); 78 - return String(data); 79 - }; 80 - 81 - const ws = new WebSocket(wsUrl); 82 - 83 - ws.on("open", () => { 84 - console.log("connected"); 85 - }); 86 - 87 - ws.on("message", (data) => { 88 - const handlerStartNs = nowNs(); 89 - const raw = messageDataToString(data); 90 - 91 - let evt; 92 - const parseStartNs = nowNs(); 93 - try { 94 - evt = JSON.parse(raw); 95 - } catch { 96 - return; 97 - } 98 - parseNsTotal += nowNs() - parseStartNs; 99 - parseCount++; 100 - 101 - if 
(evt.kind === "commit" && evt.commit) { 102 - if (evt.commit.record !== undefined && evt.commit.record !== null) { 103 - const typeStartNs = nowNs(); 104 - recordEvents++; 105 - get$types(evt.commit.record); 106 - typeNsTotal += nowNs() - typeStartNs; 107 - typeCount++; 108 - handlerNsTotal += nowNs() - handlerStartNs; 109 - } 110 - } 111 - 112 - if (typeof evt.time_us === "number") { 113 - lastTimeUs = evt.time_us; 114 - if (lastTimeUs >= endUs) { 115 - done("caught up to last 24h window end"); 116 - return; 117 - } 118 - } 119 - 120 - if (Date.now() - lastPrintedAt >= 1000) { 121 - printProgress(false); 122 - lastPrintedAt = Date.now(); 123 - } 124 - }); 125 - 126 - ws.on("error", () => { 127 - printProgress(true); 128 - console.error("websocket error"); 129 - process.exit(1); 130 - }); 131 - 132 - ws.on("close", (code, reason) => { 133 - printProgress(true); 134 - const reasonStr = reason && reason.length ? reason.toString("utf8") : "n/a"; 135 - console.log(`socket closed: code=${code} reason=${reasonStr}`); 136 - if (lastTimeUs < endUs) { 137 - process.exit(1); 138 - } 139 - process.exit(0); 140 - }); 141 - 142 - process.on("SIGINT", () => { 143 - done("interrupted"); 144 - });
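One behavioural difference between the removed scripts and the Go harvester is worth spelling out. As written, the removed `get$types` recurses into arrays but not into the values of plain objects, so it only sees a record's top-level `$type`. The Go `extractFromRecord` in this diff checks `$type` on every object and then descends into all values. The sketch below isolates just that deep walk. `collectTypes`, `typesOf`, and the sample record are hypothetical names and data made up for this example, not part of the repository.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// collectTypes walks a decoded JSON value and records every string-valued
// "$type" it finds, at any depth. It mirrors the $type branch of
// extractFromRecord and ignores URLs, backlinks, and mentions.
func collectTypes(v any, types map[string]struct{}) {
	switch x := v.(type) {
	case map[string]any:
		if t, ok := x["$type"].(string); ok {
			types[t] = struct{}{}
		}
		for _, val := range x {
			collectTypes(val, types)
		}
	case []any:
		for _, item := range x {
			collectTypes(item, types)
		}
	}
}

// typesOf decodes raw JSON and returns the distinct $type values, sorted so
// the output is deterministic.
func typesOf(raw string) ([]string, error) {
	var v any
	if err := json.Unmarshal([]byte(raw), &v); err != nil {
		return nil, err
	}
	set := map[string]struct{}{}
	collectTypes(v, set)
	out := make([]string, 0, len(set))
	for t := range set {
		out = append(out, t)
	}
	sort.Strings(out)
	return out, nil
}

func main() {
	// Made-up sample record with a nested $type inside "embed".
	const record = `{
		"$type": "app.bsky.feed.post",
		"text": "hi",
		"embed": {
			"$type": "app.bsky.embed.external",
			"external": {"uri": "https://example.com"}
		}
	}`
	types, err := typesOf(record)
	if err != nil {
		fmt.Println("decode error:", err)
		return
	}
	fmt.Println(types) // [app.bsky.embed.external app.bsky.feed.post]
}
```

A top-level-only walk would report just `app.bsky.feed.post` for this record, which matters for the engagement metric if an app's lexicon appears only as an embedded object.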