···11+# SQLite database files (user data, never commit)
22+/firehose.db
33+/firehose.db-shm
44+/firehose.db-wal
55+/firehose.db.bak.*
66+77+# Local backup artifacts from scripts/migrate.sh
88+/*.bak.*
99+1010+# One-offs
1111+/scripts/migrate.sh
1212+/NOTES.md
1313+1414+# Compiled binary
1515+/bin/
1616+1717+# ent codegen output: keep only the hand-written schemas and generate.go;
1818+# everything else under /ent is regenerated by `go generate ./ent`.
1919+/ent/*
2020+!/ent/schema/
2121+!/ent/generate.go
+19
.tangled/workflows/ci.yml
···11+when:
22+ - event: ["push", "pull_request"]
33+ branch: main
44+55+engine: nixery
66+77+dependencies:
88+ nixpkgs:
99+ - go
1010+1111+steps:
1212+ - name: regenerate ent client
1313+ command: make generate
1414+1515+ - name: vet
1616+ command: make vet
1717+1818+ - name: build
1919+ command: make build
+22
Makefile
···11+DB ?= firehose.db
22+LISTEN ?= :8080
33+44+.PHONY: run-dev build generate vet tidy clean
55+66+run-dev: generate
77+ go run ./cmd/firehose --db $(DB) --listen $(LISTEN) --debug
88+99+build: generate
1010+ go build -o bin/firehose ./cmd/firehose
1111+1212+generate:
1313+ go generate ./ent/...
1414+1515+vet:
1616+ go vet ./...
1717+1818+tidy:
1919+ go mod tidy
2020+2121+clean:
2222+ rm -rf bin $(DB) $(DB)-shm $(DB)-wal
+30-1
README.md
···11# tangled-repo-firehose
2233-A little experiment with jetstream to visualize repositories and stargazers on [tangled.org](https://tangled.org).33+A little experiment with jetstream to visualize repositories and stargazers on [tangled.org](https://tangled.org).
44+55+## What is it?
66+77+I was trying to find popular repositories on tangled.org, but so far, no such method or lexicon exists. Which led me down to, how do you figure out what people are doing right now.
88+99+And here we go.
1010+1111+If you want to see a demo, check here: [https://bsky.app/profile/chown.de/post/3mkvet2xfis2q](https://bsky.app/profile/chown.de/post/3mkvet2xfis2q).
1212+1313+### How does it work?
1414+1515+> **Caveat**
1616+> There's (currently) no atproto operation to enumerate all users or repositories, so coverage grows from what the firehose shows live plus various walks across DIDs. Star counts will not match `tangled.org`.
1717+1818+The app subscribes to Bluesky's jetstream and extracts `sh.tangled.*`. From there, background workers will resolve DIDs to handles, fetch language stats from knots, discover unseen repos referenced by stars, and backfill historical stars per known DID.
1919+2020+Persistence is SQLite via [entgo.io](https://entgo.io).
2121+2222+### Run it
2323+2424+```bash
2525+make run-dev # localhost:8080
2626+```
2727+2828+Inspect:
2929+3030+ - `/`: the repositories (alternative: `/repos.json`)
3131+ - `/knots`: known knots (and some status)
3232+ - `/handles`: resolved handles
···11+package firehose
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "errors"
77+ "fmt"
88+ "log/slog"
99+ "net/http"
1010+ "time"
1111+1212+ "github.com/bluesky-social/indigo/atproto/identity"
1313+ "github.com/bluesky-social/indigo/atproto/syntax"
1414+ "github.com/bluesky-social/indigo/xrpc"
1515+1616+ "tangled.sh/chown.de/tangled-repo-firehose/store"
1717+)
1818+1919+// recordNotFoundRetry is how long we trust the discovery_failed entry before
2020+// re-attempting — gives a deleted record a chance to be re-created.
2121+const recordNotFoundRetry = 7 * 24 * time.Hour
2222+2323+// DiscoveryWorker fetches sh.tangled.repo records we don't have in the index
2424+// — typically because a star event referenced a repo we've never seen. It
2525+// resolves the owner's PDS via the identity directory and queries
2626+// com.atproto.repo.getRecord, then chains into handles, languages, and
2727+// stars-backfill enrichers.
2828+type DiscoveryWorker struct {
2929+ *Worker[string]
3030+ store *store.Store
3131+ dir identity.Directory
3232+ httpClient *http.Client
3333+ langs LanguagesEnqueuer
3434+ handles HandlesEnqueuer
3535+ starsBackfill StarsBackfillEnqueuer
3636+}
3737+3838+func NewDiscoveryWorker(s *store.Store, dir identity.Directory, logger *slog.Logger, langs LanguagesEnqueuer, handles HandlesEnqueuer, queueSize int) *DiscoveryWorker {
3939+ w := &DiscoveryWorker{
4040+ store: s,
4141+ dir: dir,
4242+ httpClient: &http.Client{Timeout: 20 * time.Second},
4343+ langs: langs,
4444+ handles: handles,
4545+ }
4646+ w.Worker = NewWorker(WorkerConfig[string]{
4747+ Name: "discovery",
4848+ Logger: logger,
4949+ Process: w.process,
5050+ QueueSize: queueSize,
5151+ SeenTTL: 6 * time.Hour,
5252+ MinDelay: 250 * time.Millisecond,
5353+ })
5454+ return w
5555+}
5656+5757+// SetStarsBackfill wires the back-edge so newly-discovered repo owners get
5858+// their historical stars pulled immediately. Late-bound to break the
5959+// construction cycle.
6060+func (w *DiscoveryWorker) SetStarsBackfill(sb StarsBackfillEnqueuer) {
6161+ w.starsBackfill = sb
6262+}
6363+6464+// EnqueueAtURI accepts a repo at-uri (typically the subject of a star).
6565+// Non-blocking: drops on full queue. Use from live firehose handlers.
6666+func (w *DiscoveryWorker) EnqueueAtURI(uri string) {
6767+ if !validRepoAtURI(uri) {
6868+ return
6969+ }
7070+ w.Worker.Enqueue(uri)
7171+}
7272+7373+// EnqueueAtURIWait blocks until accepted or ctx is cancelled. Use from chained
7474+// producers (e.g. stars-backfill bursting hundreds of subjects).
7575+func (w *DiscoveryWorker) EnqueueAtURIWait(ctx context.Context, uri string) {
7676+ if !validRepoAtURI(uri) {
7777+ return
7878+ }
7979+ w.Worker.EnqueueWait(ctx, uri)
8080+}
8181+8282+func validRepoAtURI(uri string) bool {
8383+ a, err := syntax.ParseATURI(uri)
8484+ return err == nil && a.Collection().String() == "sh.tangled.repo"
8585+}
8686+8787+func (w *DiscoveryWorker) process(ctx context.Context, atURI string) error {
8888+ if has, err := w.store.HasRepo(ctx, atURI); err != nil {
8989+ return err
9090+ } else if has {
9191+ return nil
9292+ }
9393+ if failed, err := w.store.DiscoveryFailedRecently(ctx, atURI, recordNotFoundRetry); err != nil {
9494+ return err
9595+ } else if failed {
9696+ return nil
9797+ }
9898+9999+ a, err := syntax.ParseATURI(atURI)
100100+ if err != nil {
101101+ return err
102102+ }
103103+ did, err := a.Authority().AsDID()
104104+ if err != nil {
105105+ return fmt.Errorf("authority not a DID: %w", err)
106106+ }
107107+ rkey := a.RecordKey().String()
108108+109109+ ident, err := w.dir.LookupDID(ctx, did)
110110+ if err != nil {
111111+ return fmt.Errorf("identity lookup: %w", err)
112112+ }
113113+ pds := ident.PDSEndpoint()
114114+ if pds == "" {
115115+ return fmt.Errorf("no PDS endpoint for %s", did)
116116+ }
117117+118118+ rec, err := w.fetchRepoRecord(ctx, pds, did.String(), rkey)
119119+ if err != nil {
120120+ if isRecordNotFound(err) {
121121+ // Deleted upstream — persist so we don't re-attempt for a week.
122122+ return w.store.MarkDiscoveryFailed(ctx, atURI, err.Error())
123123+ }
124124+ return err
125125+ }
126126+127127+ createdAt, err := time.Parse(time.RFC3339Nano, rec.CreatedAt)
128128+ if err != nil {
129129+ createdAt = time.Now()
130130+ }
131131+ r := store.Repo{
132132+ AtURI: atURI,
133133+ DID: did.String(),
134134+ Rkey: rkey,
135135+ Name: rec.Name,
136136+ Knot: rec.Knot,
137137+ Description: rec.Description,
138138+ Topics: rec.Topics,
139139+ Website: rec.Website,
140140+ Source: rec.Source,
141141+ Spindle: rec.Spindle,
142142+ RepoDID: rec.RepoDID,
143143+ CreatedAt: createdAt,
144144+ SeenAt: time.Now(),
145145+ }
146146+ if err := w.store.UpsertRepo(ctx, r); err != nil {
147147+ return err
148148+ }
149149+ if err := w.store.UpsertKnot(ctx, rec.Knot); err != nil {
150150+ w.logger.Warn("upsert knot", "err", err, "knot", rec.Knot)
151151+ }
152152+ w.logger.Info("discovered", "did", did, "name", rec.Name, "knot", rec.Knot)
153153+154154+ if w.langs != nil && rec.Knot != "" {
155155+ w.langs.Enqueue(atURI, rec.Knot, did.String(), rec.Name)
156156+ }
157157+ if w.handles != nil {
158158+ w.handles.Enqueue(did.String())
159159+ }
160160+ if w.starsBackfill != nil {
161161+ w.starsBackfill.Enqueue(did.String())
162162+ }
163163+ return nil
164164+}
165165+166166+// isRecordNotFound returns true for XRPC responses that indicate the record
167167+// has been deleted upstream. Bluesky PDSes return either 404 or 400 depending
168168+// on the implementation — both with the "RecordNotFound" error name.
169169+func isRecordNotFound(err error) bool {
170170+ var xe *xrpc.Error
171171+ if !errors.As(err, &xe) {
172172+ return false
173173+ }
174174+ if xe.StatusCode == http.StatusNotFound {
175175+ return true
176176+ }
177177+ if xe.StatusCode == http.StatusBadRequest {
178178+ var xrpcErr *xrpc.XRPCError
179179+ if errors.As(err, &xrpcErr) && xrpcErr.ErrStr == "RecordNotFound" {
180180+ return true
181181+ }
182182+ }
183183+ return false
184184+}
185185+186186+func (w *DiscoveryWorker) fetchRepoRecord(ctx context.Context, pds, did, rkey string) (*repoRecord, error) {
187187+ xc := &xrpc.Client{Client: w.httpClient, Host: pds}
188188+ var out struct {
189189+ URI string `json:"uri"`
190190+ CID string `json:"cid"`
191191+ Value json.RawMessage `json:"value"`
192192+ }
193193+ err := xc.Do(ctx, xrpc.Query, "", "com.atproto.repo.getRecord", map[string]any{
194194+ "repo": did,
195195+ "collection": "sh.tangled.repo",
196196+ "rkey": rkey,
197197+ }, nil, &out)
198198+ if err != nil {
199199+ return nil, err
200200+ }
201201+ var rec repoRecord
202202+ if err := json.Unmarshal(out.Value, &rec); err != nil {
203203+ return nil, err
204204+ }
205205+ if rec.Name == "" || rec.Knot == "" {
206206+ return nil, fmt.Errorf("record missing required fields")
207207+ }
208208+ return &rec, nil
209209+}
+167
firehose/handler.go
···11+package firehose
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "fmt"
77+ "log/slog"
88+ "time"
99+1010+ "github.com/bluesky-social/jetstream/pkg/models"
1111+1212+ "tangled.sh/chown.de/tangled-repo-firehose/store"
1313+)
1414+1515+// repoRecord is the on-wire shape of an sh.tangled.repo record.
1616+type repoRecord struct {
1717+ Name string `json:"name"`
1818+ Knot string `json:"knot"`
1919+ Description string `json:"description,omitempty"`
2020+ Website string `json:"website,omitempty"`
2121+ Source string `json:"source,omitempty"`
2222+ Spindle string `json:"spindle,omitempty"`
2323+ Topics []string `json:"topics,omitempty"`
2424+ RepoDID string `json:"repoDid,omitempty"`
2525+ CreatedAt string `json:"createdAt"`
2626+}
2727+2828+// starRecord is the on-wire shape of an sh.tangled.feed.star record.
2929+type starRecord struct {
3030+ Subject string `json:"subject"`
3131+ CreatedAt string `json:"createdAt"`
3232+}
3333+3434+// LanguagesEnqueuer accepts (atURI, knot, did, name) for async language
3535+// enrichment. *LanguagesWorker satisfies this; pass nil to disable enrichment.
3636+type LanguagesEnqueuer interface {
3737+ Enqueue(atURI, knot, did, name string)
3838+}
3939+4040+// HandlesEnqueuer accepts a DID for async handle resolution. *HandlesWorker
4141+// satisfies this; pass nil to disable.
4242+type HandlesEnqueuer interface {
4343+ Enqueue(did string)
4444+}
4545+4646+// DiscoveryEnqueuer accepts a repo at-uri for async discovery (PDS fetch).
4747+// *DiscoveryWorker satisfies this; pass nil to disable. Live firehose handlers
4848+// use EnqueueAtURI (non-blocking, drops on full); chained producers use
4949+// EnqueueAtURIWait (blocks for backpressure).
5050+type DiscoveryEnqueuer interface {
5151+ EnqueueAtURI(uri string)
5252+ EnqueueAtURIWait(ctx context.Context, uri string)
5353+}
5454+5555+// StarsBackfillEnqueuer accepts a DID whose historical stars should be
5656+// pulled. *StarsBackfillWorker satisfies this; pass nil to disable.
5757+type StarsBackfillEnqueuer interface {
5858+ Enqueue(did string)
5959+}
6060+6161+// Handler routes commit events to per-collection handlers. Non-commit events
6262+// and unknown collections still advance the cursor so they aren't replayed on
6363+// restart.
6464+func Handler(s *store.Store, langs LanguagesEnqueuer, handles HandlesEnqueuer, discovery DiscoveryEnqueuer, starsBackfill StarsBackfillEnqueuer, logger *slog.Logger) HandlerFunc {
6565+ return func(ctx context.Context, ev *models.Event) error {
6666+ if ev.Kind != "commit" || ev.Commit == nil {
6767+ return s.AdvanceCursor(ctx, ev.TimeUS)
6868+ }
6969+ switch ev.Commit.Collection {
7070+ case "sh.tangled.repo":
7171+ return handleRepo(ctx, s, langs, handles, starsBackfill, logger, ev)
7272+ case "sh.tangled.feed.star":
7373+ return handleStar(ctx, s, discovery, starsBackfill, logger, ev)
7474+ default:
7575+ return s.AdvanceCursor(ctx, ev.TimeUS)
7676+ }
7777+ }
7878+}
7979+8080+func handleRepo(ctx context.Context, s *store.Store, langs LanguagesEnqueuer, handles HandlesEnqueuer, starsBackfill StarsBackfillEnqueuer, logger *slog.Logger, ev *models.Event) error {
8181+ atURI := fmt.Sprintf("at://%s/%s/%s", ev.Did, ev.Commit.Collection, ev.Commit.RKey)
8282+ switch ev.Commit.Operation {
8383+ case "create", "update":
8484+ var rec repoRecord
8585+ if err := json.Unmarshal(ev.Commit.Record, &rec); err != nil {
8686+ return fmt.Errorf("decode repo record: %w", err)
8787+ }
8888+ createdAt, err := time.Parse(time.RFC3339Nano, rec.CreatedAt)
8989+ if err != nil {
9090+ createdAt = time.UnixMicro(ev.TimeUS)
9191+ }
9292+ r := store.Repo{
9393+ AtURI: atURI,
9494+ DID: ev.Did,
9595+ Rkey: ev.Commit.RKey, //nolint:misspell
9696+ Name: rec.Name,
9797+ Knot: rec.Knot,
9898+ Description: rec.Description,
9999+ Topics: rec.Topics,
100100+ Website: rec.Website,
101101+ Source: rec.Source,
102102+ Spindle: rec.Spindle,
103103+ RepoDID: rec.RepoDID,
104104+ CreatedAt: createdAt,
105105+ SeenAt: time.Now(),
106106+ }
107107+ if err := s.ApplyRepoUpsert(ctx, r, ev.TimeUS); err != nil {
108108+ return err
109109+ }
110110+ if err := s.UpsertKnot(ctx, rec.Knot); err != nil {
111111+ logger.Warn("upsert knot", "err", err, "knot", rec.Knot)
112112+ }
113113+ logger.Info("repo "+ev.Commit.Operation, "did", ev.Did, "name", rec.Name, "knot", rec.Knot)
114114+ if langs != nil && rec.Knot != "" {
115115+ langs.Enqueue(atURI, rec.Knot, ev.Did, rec.Name)
116116+ }
117117+ if handles != nil {
118118+ handles.Enqueue(ev.Did)
119119+ }
120120+ if starsBackfill != nil {
121121+ starsBackfill.Enqueue(ev.Did)
122122+ }
123123+ return nil
124124+ case "delete":
125125+ logger.Info("repo delete", "at_uri", atURI)
126126+ return s.ApplyRepoDelete(ctx, atURI, ev.TimeUS)
127127+ default:
128128+ return s.AdvanceCursor(ctx, ev.TimeUS)
129129+ }
130130+}
131131+132132+func handleStar(ctx context.Context, s *store.Store, discovery DiscoveryEnqueuer, starsBackfill StarsBackfillEnqueuer, logger *slog.Logger, ev *models.Event) error {
133133+ atURI := fmt.Sprintf("at://%s/%s/%s", ev.Did, ev.Commit.Collection, ev.Commit.RKey)
134134+ switch ev.Commit.Operation {
135135+ case "create", "update":
136136+ var rec starRecord
137137+ if err := json.Unmarshal(ev.Commit.Record, &rec); err != nil {
138138+ return fmt.Errorf("decode star record: %w", err)
139139+ }
140140+ if rec.Subject == "" {
141141+ return s.AdvanceCursor(ctx, ev.TimeUS)
142142+ }
143143+ createdAt, err := time.Parse(time.RFC3339Nano, rec.CreatedAt)
144144+ if err != nil {
145145+ createdAt = time.UnixMicro(ev.TimeUS)
146146+ }
147147+ if err := s.ApplyStarCreate(ctx, atURI, rec.Subject, createdAt, ev.TimeUS); err != nil {
148148+ return err
149149+ }
150150+ logger.Info("star", "by", ev.Did, "subject", rec.Subject)
151151+ // If the starred repo isn't in our index yet, queue discovery. The
152152+ // worker checks HasRepo before fetching, so duplicates are cheap.
153153+ if discovery != nil {
154154+ discovery.EnqueueAtURI(rec.Subject)
155155+ }
156156+ // First time we see this user — also pull their historical stars.
157157+ if starsBackfill != nil {
158158+ starsBackfill.Enqueue(ev.Did)
159159+ }
160160+ return nil
161161+ case "delete":
162162+ logger.Info("unstar", "at_uri", atURI)
163163+ return s.ApplyStarDelete(ctx, atURI, ev.TimeUS)
164164+ default:
165165+ return s.AdvanceCursor(ctx, ev.TimeUS)
166166+ }
167167+}
+54
firehose/handles.go
···11+package firehose
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "time"
88+99+ "github.com/bluesky-social/indigo/atproto/identity"
1010+ "github.com/bluesky-social/indigo/atproto/syntax"
1111+1212+ "tangled.sh/chown.de/tangled-repo-firehose/store"
1313+)
1414+1515+// HandlesWorker resolves DID → handle via indigo's identity.Directory.
1616+// Bidirectional verification is intentionally bypassed by reading the
1717+// declared handle from the DID document directly; an aggregator just needs a
1818+// human-friendly label, and the directory still gives us caching.
1919+type HandlesWorker struct {
2020+ *Worker[string]
2121+ store *store.Store
2222+ dir identity.Directory
2323+}
2424+2525+func NewHandlesWorker(s *store.Store, dir identity.Directory, logger *slog.Logger, queueSize int) *HandlesWorker {
2626+ w := &HandlesWorker{store: s, dir: dir}
2727+ w.Worker = NewWorker(WorkerConfig[string]{
2828+ Name: "handles",
2929+ Logger: logger,
3030+ Process: w.process,
3131+ QueueSize: queueSize,
3232+ SeenTTL: 6 * time.Hour,
3333+ MinDelay: 250 * time.Millisecond,
3434+ })
3535+ return w
3636+}
3737+3838+func (w *HandlesWorker) Enqueue(did string) { w.Worker.Enqueue(did) }
3939+4040+func (w *HandlesWorker) process(ctx context.Context, did string) error {
4141+ d, err := syntax.ParseDID(did)
4242+ if err != nil {
4343+ return err
4444+ }
4545+ ident, err := w.dir.LookupDID(ctx, d)
4646+ if err != nil {
4747+ return err
4848+ }
4949+ handle, err := ident.DeclaredHandle()
5050+ if err != nil {
5151+ return fmt.Errorf("no declared handle: %w", err)
5252+ }
5353+ return w.store.UpsertHandle(ctx, did, handle.String())
5454+}
+100
firehose/jetstream.go
···11+// Package firehose drives a Jetstream WebSocket subscription via the official
22+// bluesky-social/jetstream client and dispatches events to a handler.
33+package firehose
44+55+import (
66+ "context"
77+ "fmt"
88+ "log/slog"
99+ "math/rand/v2"
1010+ "sync/atomic"
1111+ "time"
1212+1313+ "github.com/bluesky-social/jetstream/pkg/client"
1414+ "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
1515+ "github.com/bluesky-social/jetstream/pkg/models"
1616+)
1717+1818+// HandlerFunc applies a Jetstream event. Errors are logged but don't tear down
1919+// the stream.
2020+type HandlerFunc func(ctx context.Context, ev *models.Event) error
2121+2222+type Consumer struct {
2323+ Endpoint string
2424+ WantedCollections []string
2525+ Handler HandlerFunc
2626+ Logger *slog.Logger
2727+ MaxBackoff time.Duration // default 60s
2828+}
2929+3030+// Run runs the consumer until ctx is canceled. Reconnects on disconnect with
3131+// jittered exponential backoff. The official client handles the WS dance —
3232+// this wrapper only adds reconnect, cursor tracking, and a heartbeat log.
3333+func (c *Consumer) Run(ctx context.Context, startCursor int64) error {
3434+ if c.MaxBackoff == 0 {
3535+ c.MaxBackoff = 60 * time.Second
3636+ }
3737+3838+ var cursor atomic.Int64
3939+ cursor.Store(startCursor)
4040+4141+ cfg := client.DefaultClientConfig()
4242+ cfg.WebsocketURL = c.Endpoint
4343+ cfg.WantedCollections = c.WantedCollections
4444+4545+ sched := sequential.NewScheduler("firehose", c.Logger, func(ctx context.Context, ev *models.Event) error {
4646+ if ev.TimeUS > cursor.Load() {
4747+ cursor.Store(ev.TimeUS)
4848+ }
4949+ return c.Handler(ctx, ev)
5050+ })
5151+5252+ cli, err := client.NewClient(cfg, c.Logger, sched)
5353+ if err != nil {
5454+ return fmt.Errorf("create jetstream client: %w", err)
5555+ }
5656+5757+ heartbeatCtx, cancelHeartbeat := context.WithCancel(ctx)
5858+ defer cancelHeartbeat()
5959+ go func() {
6060+ t := time.NewTicker(30 * time.Second)
6161+ defer t.Stop()
6262+ for {
6363+ select {
6464+ case <-heartbeatCtx.Done():
6565+ return
6666+ case <-t.C:
6767+ c.Logger.Info("heartbeat",
6868+ "events", cli.EventsRead.Load(),
6969+ "bytes", cli.BytesRead.Load(),
7070+ "cursor", cursor.Load())
7171+ }
7272+ }
7373+ }()
7474+7575+ backoff := time.Second
7676+ for {
7777+ if err := ctx.Err(); err != nil {
7878+ return err
7979+ }
8080+ var resume *int64
8181+ if v := cursor.Load(); v > 0 {
8282+ resume = &v
8383+ }
8484+ err := cli.ConnectAndRead(ctx, resume)
8585+ if ctx.Err() != nil {
8686+ return ctx.Err()
8787+ }
8888+ c.Logger.Warn("disconnected", "err", err, "backoff", backoff)
8989+ jitter := time.Duration(rand.Int64N(int64(backoff/2 + 1)))
9090+ select {
9191+ case <-ctx.Done():
9292+ return ctx.Err()
9393+ case <-time.After(backoff + jitter):
9494+ }
9595+ backoff *= 2
9696+ if backoff > c.MaxBackoff {
9797+ backoff = c.MaxBackoff
9898+ }
9999+ }
100100+}
+146
firehose/languages.go
···11+package firehose
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "errors"
77+ "fmt"
88+ "log/slog"
99+ "net"
1010+ "net/http"
1111+ "net/url"
1212+ "time"
1313+1414+ "tangled.sh/chown.de/tangled-repo-firehose/store"
1515+)
1616+1717+type langsJob struct {
1818+ atURI string
1919+ knot string
2020+ did string
2121+ name string
2222+}
2323+2424+// LanguagesWorker calls sh.tangled.repo.languages on the repo's knot and
2525+// writes the result to the languages column.
2626+type LanguagesWorker struct {
2727+ *Worker[langsJob]
2828+ store *store.Store
2929+ client *http.Client
3030+}
3131+3232+func NewLanguagesWorker(s *store.Store, logger *slog.Logger, queueSize int) *LanguagesWorker {
3333+ w := &LanguagesWorker{
3434+ store: s,
3535+ client: &http.Client{Timeout: 30 * time.Second},
3636+ }
3737+ w.Worker = NewWorker(WorkerConfig[langsJob]{
3838+ Name: "languages",
3939+ Logger: logger,
4040+ Process: w.process,
4141+ QueueSize: queueSize,
4242+ SeenTTL: 6 * time.Hour,
4343+ MinDelay: 500 * time.Millisecond,
4444+ })
4545+ return w
4646+}
4747+4848+// Enqueue is the public face of the worker; it builds the internal job struct
4949+// and pushes it through the generic worker.
5050+func (w *LanguagesWorker) Enqueue(atURI, knot, did, name string) {
5151+ w.Worker.Enqueue(langsJob{atURI: atURI, knot: knot, did: did, name: name})
5252+}
5353+5454+// Circuit breaker: after this many consecutive errors, skip the knot for the
5555+// cooldown window before trying again.
5656+const (
5757+ knotCircuitMinErrors = 3
5858+ knotCircuitCooldown = time.Hour
5959+)
6060+6161+func (w *LanguagesWorker) process(ctx context.Context, j langsJob) error {
6262+ // Some repos register knots on localhost or private IPs (devs testing).
6363+ // Those are permanently unreachable from us — stamp empty languages.
6464+ if isUnroutable(j.knot) {
6565+ return w.store.UpdateLanguages(ctx, j.atURI, map[string]int64{})
6666+ }
6767+ // Skip if the knot has been stalling/failing recently. The circuit naturally
6868+ // half-opens after the cooldown — next call either succeeds (counter
6969+ // resets) or re-extends the open window.
7070+ if open, err := w.store.KnotCircuitOpen(ctx, j.knot, knotCircuitMinErrors, knotCircuitCooldown); err != nil {
7171+ return err
7272+ } else if open {
7373+ return nil
7474+ }
7575+ u := url.URL{
7676+ Scheme: "https",
7777+ Host: j.knot,
7878+ Path: "/xrpc/sh.tangled.repo.languages",
7979+ }
8080+ q := u.Query()
8181+ q.Set("repo", j.did+"/"+j.name)
8282+ u.RawQuery = q.Encode()
8383+8484+ req, err := http.NewRequestWithContext(ctx, "GET", u.String(), nil)
8585+ if err != nil {
8686+ return err
8787+ }
8888+ resp, err := w.client.Do(req)
8989+ if err != nil {
9090+ // DNS-not-found means the knot host has vanished — treat as permanent
9191+ // (write empty so the row's language_at gets stamped and we skip it
9292+ // for the 7-day re-check window).
9393+ var dnsErr *net.DNSError
9494+ if errors.As(err, &dnsErr) && dnsErr.IsNotFound {
9595+ _ = w.store.MarkKnotError(ctx, j.knot, err.Error())
9696+ return w.store.UpdateLanguages(ctx, j.atURI, map[string]int64{})
9797+ }
9898+ _ = w.store.MarkKnotError(ctx, j.knot, err.Error())
9999+ return err
100100+ }
101101+ defer resp.Body.Close()
102102+103103+ if resp.StatusCode >= 500 {
104104+ _ = w.store.MarkKnotError(ctx, j.knot, fmt.Sprintf("status %d", resp.StatusCode))
105105+ return fmt.Errorf("status %d", resp.StatusCode)
106106+ }
107107+ if resp.StatusCode != http.StatusOK {
108108+ // 4xx — knot is reachable, just doesn't have this repo. Mark OK so
109109+ // the circuit breaker doesn't penalize the host for missing data.
110110+ _ = w.store.MarkKnotOK(ctx, j.knot)
111111+ return w.store.UpdateLanguages(ctx, j.atURI, map[string]int64{})
112112+ }
113113+114114+ var body struct {
115115+ Ref string `json:"ref"`
116116+ Languages []struct {
117117+ Name string `json:"name"`
118118+ Size int64 `json:"size"`
119119+ } `json:"languages"`
120120+ }
121121+ if err := json.NewDecoder(resp.Body).Decode(&body); err != nil {
122122+ return err
123123+ }
124124+ out := make(map[string]int64, len(body.Languages))
125125+ for _, l := range body.Languages {
126126+ out[l.Name] = l.Size
127127+ }
128128+ _ = w.store.MarkKnotOK(ctx, j.knot)
129129+ return w.store.UpdateLanguages(ctx, j.atURI, out)
130130+}
131131+132132+// isUnroutable returns true for hosts we can't reach from a remote process:
133133+// localhost, loopback IPs, RFC1918 private ranges, and unspecified addresses.
134134+func isUnroutable(hostport string) bool {
135135+ host, _, err := net.SplitHostPort(hostport)
136136+ if err != nil {
137137+ host = hostport
138138+ }
139139+ if host == "localhost" {
140140+ return true
141141+ }
142142+ if ip := net.ParseIP(host); ip != nil {
143143+ return ip.IsLoopback() || ip.IsPrivate() || ip.IsUnspecified()
144144+ }
145145+ return false
146146+}
+126
firehose/starsbackfill.go
···11+package firehose
22+33+import (
44+ "context"
55+ "fmt"
66+ "log/slog"
77+ "net/http"
88+ "time"
99+1010+ "github.com/bluesky-social/indigo/atproto/identity"
1111+ "github.com/bluesky-social/indigo/atproto/syntax"
1212+ "github.com/bluesky-social/indigo/xrpc"
1313+1414+ "tangled.sh/chown.de/tangled-repo-firehose/store"
1515+)
1616+1717+// StarsBackfillWorker pulls historical sh.tangled.feed.star records from a
1818+// user's PDS via com.atproto.repo.listRecords. Handles pagination. Chains into
1919+// DiscoveryWorker so any backfilled star pointing at an unknown repo triggers
2020+// a repo fetch.
2121+type StarsBackfillWorker struct {
2222+ *Worker[string]
2323+ store *store.Store
2424+ dir identity.Directory
2525+ httpClient *http.Client
2626+ pageSize int64
2727+ discovery DiscoveryEnqueuer
2828+}
2929+3030+func NewStarsBackfillWorker(s *store.Store, dir identity.Directory, logger *slog.Logger, discovery DiscoveryEnqueuer, queueSize int) *StarsBackfillWorker {
3131+ w := &StarsBackfillWorker{
3232+ store: s,
3333+ dir: dir,
3434+ httpClient: &http.Client{Timeout: 30 * time.Second},
3535+ pageSize: 100,
3636+ discovery: discovery,
3737+ }
3838+ w.Worker = NewWorker(WorkerConfig[string]{
3939+ Name: "stars-backfill",
4040+ Logger: logger,
4141+ Process: w.process,
4242+ QueueSize: queueSize,
4343+ SeenTTL: 6 * time.Hour,
4444+ MinDelay: 500 * time.Millisecond,
4545+ })
4646+ return w
4747+}
4848+4949+func (w *StarsBackfillWorker) Enqueue(did string) { w.Worker.Enqueue(did) }
5050+5151+func (w *StarsBackfillWorker) process(ctx context.Context, did string) error {
5252+ d, err := syntax.ParseDID(did)
5353+ if err != nil {
5454+ return err
5555+ }
5656+ ident, err := w.dir.LookupDID(ctx, d)
5757+ if err != nil {
5858+ return fmt.Errorf("identity lookup: %w", err)
5959+ }
6060+ pds := ident.PDSEndpoint()
6161+ if pds == "" {
6262+ return fmt.Errorf("no PDS endpoint")
6363+ }
6464+6565+ xc := &xrpc.Client{Client: w.httpClient, Host: pds}
6666+ cursor := ""
6767+ total := 0
6868+ for {
6969+ records, next, err := w.listStars(ctx, xc, did, cursor)
7070+ if err != nil {
7171+ return err
7272+ }
7373+ for _, r := range records {
7474+ if r.Value.Subject == "" {
7575+ continue
7676+ }
7777+ createdAt, err := time.Parse(time.RFC3339Nano, r.Value.CreatedAt)
7878+ if err != nil {
7979+ createdAt = time.Now()
8080+ }
8181+ if err := w.store.UpsertStar(ctx, r.URI, r.Value.Subject, createdAt); err != nil {
8282+ w.logger.Warn("upsert star", "err", err, "uri", r.URI)
8383+ continue
8484+ }
8585+ total++
8686+ if w.discovery != nil {
8787+ // Block on backpressure — a single user can have hundreds of
8888+ // stars and discovery is rate-limited per PDS call.
8989+ w.discovery.EnqueueAtURIWait(ctx, r.Value.Subject)
9090+ }
9191+ }
9292+ if next == "" || next == cursor || len(records) == 0 {
9393+ break
9494+ }
9595+ cursor = next
9696+ }
9797+ if total > 0 {
9898+ w.logger.Info("backfilled stars", "did", did, "count", total)
9999+ }
100100+ return nil
101101+}
102102+103103+type listRecordEntry struct {
104104+ URI string `json:"uri"`
105105+ CID string `json:"cid"`
106106+ Value starRecord `json:"value"`
107107+}
108108+109109+func (w *StarsBackfillWorker) listStars(ctx context.Context, xc *xrpc.Client, did, cursor string) ([]listRecordEntry, string, error) {
110110+ params := map[string]any{
111111+ "repo": did,
112112+ "collection": "sh.tangled.feed.star",
113113+ "limit": w.pageSize,
114114+ }
115115+ if cursor != "" {
116116+ params["cursor"] = cursor
117117+ }
118118+ var out struct {
119119+ Cursor string `json:"cursor"`
120120+ Records []listRecordEntry `json:"records"`
121121+ }
122122+ if err := xc.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil {
123123+ return nil, "", err
124124+ }
125125+ return out.Records, out.Cursor, nil
126126+}
+122
firehose/worker.go
···11+package firehose
22+33+import (
44+ "context"
55+ "log/slog"
66+ "sync"
77+ "time"
88+)
99+1010+// Worker is the shared skeleton behind every async enrichment/backfill
1111+// worker: a bounded queue, a TTL'd dedupe map, a single goroutine that
1212+// processes items with a min delay between calls, and immediate-retry on
1313+// process errors (the dedupe entry is dropped).
1414+//
1515+// SeenTTL means "skip an item that was last seen less than this ago". A short
1616+// TTL pairs with a periodic backfill seeder — together they replace the old
1717+// restart-required pattern: stale items naturally re-enter the queue without a
1818+// daemon bounce.
1919+type Worker[T comparable] struct {
2020+ name string
2121+ queue chan T
2222+ seen sync.Map // T -> time.Time
2323+ seenTTL time.Duration
2424+ minDelay time.Duration
2525+ logger *slog.Logger
2626+ process func(ctx context.Context, item T) error
2727+}
2828+2929+type WorkerConfig[T comparable] struct {
3030+ Name string // logged on errors / drops
3131+ Logger *slog.Logger // required
3232+ Process func(ctx context.Context, item T) error // required
3333+ QueueSize int // default 256
3434+ SeenTTL time.Duration // 0 = forever
3535+ MinDelay time.Duration // gap between processed items
3636+}
3737+3838+func NewWorker[T comparable](cfg WorkerConfig[T]) *Worker[T] {
3939+ if cfg.QueueSize <= 0 {
4040+ cfg.QueueSize = 1024
4141+ }
4242+ return &Worker[T]{
4343+ name: cfg.Name,
4444+ queue: make(chan T, cfg.QueueSize),
4545+ seenTTL: cfg.SeenTTL,
4646+ minDelay: cfg.MinDelay,
4747+ logger: cfg.Logger,
4848+ process: cfg.Process,
4949+ }
5050+}
5151+5252+// Enqueue is non-blocking. Returns false if the item was deduped (within TTL)
5353+// or the queue is full. Zero-valued items are rejected. Use this from live
5454+// event handlers where blocking would stall the firehose.
5555+func (w *Worker[T]) Enqueue(item T) bool {
5656+ var zero T
5757+ if item == zero {
5858+ return false
5959+ }
6060+ if !w.markSeen(item) {
6161+ return false
6262+ }
6363+ select {
6464+ case w.queue <- item:
6565+ return true
6666+ default:
6767+ w.seen.Delete(item)
6868+ w.logger.Warn("queue full, dropping", "worker", w.name, "item", item)
6969+ return false
7070+ }
7171+}
7272+7373+// EnqueueWait blocks until the queue accepts the item or ctx is cancelled.
7474+// Use this from chained producers (one worker enqueueing into another) so the
7575+// producer paces itself to the consumer's rate instead of dropping work.
7676+func (w *Worker[T]) EnqueueWait(ctx context.Context, item T) bool {
7777+ var zero T
7878+ if item == zero {
7979+ return false
8080+ }
8181+ if !w.markSeen(item) {
8282+ return false
8383+ }
8484+ select {
8585+ case w.queue <- item:
8686+ return true
8787+ case <-ctx.Done():
8888+ w.seen.Delete(item)
8989+ return false
9090+ }
9191+}
9292+9393+func (w *Worker[T]) markSeen(item T) bool {
9494+ if v, ok := w.seen.Load(item); ok {
9595+ if w.seenTTL == 0 || time.Since(v.(time.Time)) < w.seenTTL {
9696+ return false
9797+ }
9898+ }
9999+ w.seen.Store(item, time.Now())
100100+ return true
101101+}
102102+103103+func (w *Worker[T]) Run(ctx context.Context) {
104104+ for {
105105+ select {
106106+ case <-ctx.Done():
107107+ return
108108+ case item := <-w.queue:
109109+ if err := w.process(ctx, item); err != nil {
110110+ w.logger.Warn(w.name, "err", err, "item", item)
111111+ w.seen.Delete(item)
112112+ }
113113+ if w.minDelay > 0 {
114114+ select {
115115+ case <-ctx.Done():
116116+ return
117117+ case <-time.After(w.minDelay):
118118+ }
119119+ }
120120+ }
121121+ }
122122+}