···2727}
28282929type WorkerConfig[T comparable] struct {
3030- Name string // logged on errors / drops
3131- Logger *slog.Logger // required
3232- Process func(ctx context.Context, item T) error // required
3333- QueueSize int // default 256
3434- SeenTTL time.Duration // 0 = forever
3535- MinDelay time.Duration // gap between processed items
3030+ Name string // logged on errors / drops
3131+ Logger *slog.Logger // required
3232+ Process func(ctx context.Context, item T) error // required
3333+ QueueSize int // default 256
3434+ SeenTTL time.Duration // 0 = forever
3535+ MinDelay time.Duration // gap between processed items
3636}
37373838func NewWorker[T comparable](cfg WorkerConfig[T]) *Worker[T] {
+47
store/cursor.go
···11+package store
22+33+import (
44+ "context"
55+ "time"
66+77+ "tangled.sh/chown.de/tangled-repo-firehose/ent"
88+ "tangled.sh/chown.de/tangled-repo-firehose/ent/cursor"
99+)
1010+1111+// GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0
1212+// if none has been recorded yet.
1313+func (s *Store) GetCursor(ctx context.Context) (int64, error) {
1414+ row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
1515+ if ent.IsNotFound(err) {
1616+ return 0, nil
1717+ }
1818+ if err != nil {
1919+ return 0, err
2020+ }
2121+ return row.Cursor, nil
2222+}
2323+2424+// AdvanceCursor moves the cursor forward without applying any record. Used
2525+// for non-tracked event kinds so we don't replay them on restart.
2626+func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error {
2727+ return withTx(ctx, s.client, func(tx *ent.Tx) error {
2828+ return advanceCursor(ctx, tx, cur)
2929+ })
3030+}
3131+3232+func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error {
3333+ // Never move the cursor backward — out-of-order replays would otherwise
3434+ // rewind progress.
3535+ existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
3636+ now := time.Now().UnixMilli()
3737+ if ent.IsNotFound(err) {
3838+ return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
3939+ }
4040+ if err != nil {
4141+ return err
4242+ }
4343+ if cur <= existing.Cursor {
4444+ return nil
4545+ }
4646+ return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
4747+}
+128
store/handle.go
···11+package store
22+33+import (
44+ "context"
55+ "time"
66+77+ "tangled.sh/chown.de/tangled-repo-firehose/ent"
88+ "tangled.sh/chown.de/tangled-repo-firehose/ent/handle"
99+)
1010+1111+// AllKnownDIDs returns every DID we've encountered: repo owners plus star
1212+// authors parsed from existing star at-uris. Uses raw SQL because ent has no
1313+// clean expression for substring + UNION.
1414+func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) {
1515+ rows, err := s.db.QueryContext(ctx, `
1616+ SELECT did FROM repos
1717+ UNION
1818+ SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did
1919+ FROM stars
2020+ WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != ''
2121+ LIMIT ?
2222+ `, limit)
2323+ if err != nil {
2424+ return nil, err
2525+ }
2626+ defer rows.Close()
2727+ var out []string
2828+ for rows.Next() {
2929+ var did string
3030+ if err := rows.Scan(&did); err != nil {
3131+ return nil, err
3232+ }
3333+ if did != "" {
3434+ out = append(out, did)
3535+ }
3636+ }
3737+ return out, rows.Err()
3838+}
3939+4040+// UpsertHandle stores or refreshes a DID→handle mapping.
4141+func (s *Store) UpsertHandle(ctx context.Context, did, h string) error {
4242+ return s.client.Handle.Create().
4343+ SetID(did).
4444+ SetHandle(h).
4545+ SetRefreshedAt(time.Now().UnixMilli()).
4646+ OnConflict().
4747+ UpdateNewValues().
4848+ Exec(ctx)
4949+}
5050+5151+// Handle returns the cached handle for a DID, or "" if unknown.
5252+func (s *Store) Handle(ctx context.Context, did string) (string, error) {
5353+ row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx)
5454+ if ent.IsNotFound(err) {
5555+ return "", nil
5656+ }
5757+ if err != nil {
5858+ return "", err
5959+ }
6060+ return row.Handle, nil
6161+}
6262+6363+// DIDsNeedingHandles returns owner DIDs from repos that have no cached handle
6464+// yet, or whose cached handle is older than maxAge.
6565+func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) {
6666+ cutoff := time.Now().Add(-maxAge).UnixMilli()
6767+ rows, err := s.db.QueryContext(ctx, `
6868+ SELECT DISTINCT r.did
6969+ FROM repos r
7070+ LEFT JOIN handles h ON h.did = r.did
7171+ WHERE h.did IS NULL OR h.refreshed_at < ?
7272+ LIMIT ?
7373+ `, cutoff, limit)
7474+ if err != nil {
7575+ return nil, err
7676+ }
7777+ defer rows.Close()
7878+ var out []string
7979+ for rows.Next() {
8080+ var did string
8181+ if err := rows.Scan(&did); err != nil {
8282+ return nil, err
8383+ }
8484+ out = append(out, did)
8585+ }
8686+ return out, rows.Err()
8787+}
8888+8989+// HandleEntry is one row in the handles view.
9090+type HandleEntry struct {
9191+ DID string
9292+ Handle string
9393+ RefreshedAt time.Time
9494+ Repos int
9595+}
9696+9797+// Handles returns every resolved DID→handle mapping with repo count. Pass
9898+// limit <= 0 for no limit.
9999+func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) {
100100+ sqlLimit := -1
101101+ if limit > 0 {
102102+ sqlLimit = limit
103103+ }
104104+ rows, err := s.db.QueryContext(ctx, `
105105+ SELECT h.did, h.handle, h.refreshed_at,
106106+ (SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos
107107+ FROM handles h
108108+ ORDER BY repos DESC, h.handle
109109+ LIMIT ?
110110+ `, sqlLimit)
111111+ if err != nil {
112112+ return nil, err
113113+ }
114114+ defer rows.Close()
115115+ var out []HandleEntry
116116+ for rows.Next() {
117117+ var (
118118+ h HandleEntry
119119+ refreshed int64
120120+ )
121121+ if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil {
122122+ return nil, err
123123+ }
124124+ h.RefreshedAt = time.UnixMilli(refreshed)
125125+ out = append(out, h)
126126+ }
127127+ return out, rows.Err()
128128+}
+163
store/knot.go
···11+package store
22+33+import (
44+ "context"
55+ "time"
66+77+ "tangled.sh/chown.de/tangled-repo-firehose/ent"
88+ "tangled.sh/chown.de/tangled-repo-firehose/ent/knot"
99+ "tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
1010+)
1111+1212+// KnotEntry is one row in the knots view.
1313+type KnotEntry struct {
1414+ Host string
1515+ FirstSeen time.Time
1616+ LastSeen time.Time
1717+ LastOK *time.Time
1818+ LastError *time.Time
1919+ LastErrorMsg string
2020+ ConsecutiveErrors int
2121+ Disabled bool
2222+ Repos int
2323+}
2424+2525+// Knots returns every knot host with bookkeeping plus a count of repos
2626+// hosted on it.
2727+func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) {
2828+ knots, err := s.client.Knot.Query().
2929+ Order(ent.Desc(knot.FieldLastSeen)).
3030+ All(ctx)
3131+ if err != nil {
3232+ return nil, err
3333+ }
3434+ out := make([]KnotEntry, 0, len(knots))
3535+ for _, k := range knots {
3636+ repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx)
3737+ if err != nil {
3838+ return nil, err
3939+ }
4040+ entry := KnotEntry{
4141+ Host: k.ID,
4242+ FirstSeen: time.UnixMilli(k.FirstSeen),
4343+ LastSeen: time.UnixMilli(k.LastSeen),
4444+ ConsecutiveErrors: k.ConsecutiveErrors,
4545+ Disabled: k.Disabled != 0,
4646+ Repos: repos,
4747+ }
4848+ if k.LastOkAt != nil {
4949+ t := time.UnixMilli(*k.LastOkAt)
5050+ entry.LastOK = &t
5151+ }
5252+ if k.LastErrorAt != nil {
5353+ t := time.UnixMilli(*k.LastErrorAt)
5454+ entry.LastError = &t
5555+ }
5656+ if k.LastError != nil {
5757+ entry.LastErrorMsg = *k.LastError
5858+ }
5959+ out = append(out, entry)
6060+ }
6161+ // Sort by repos desc, then host — done in Go since it's a derived count.
6262+ for i := range out {
6363+ for j := i + 1; j < len(out); j++ {
6464+ if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) {
6565+ out[i], out[j] = out[j], out[i]
6666+ }
6767+ }
6868+ }
6969+ return out, nil
7070+}
7171+7272+// UpsertKnot records that we've seen this host, refreshing last_seen.
7373+func (s *Store) UpsertKnot(ctx context.Context, host string) error {
7474+ if host == "" {
7575+ return nil
7676+ }
7777+ now := time.Now().UnixMilli()
7878+ return s.client.Knot.Create().
7979+ SetID(host).
8080+ SetFirstSeen(now).
8181+ SetLastSeen(now).
8282+ OnConflict().
8383+ Update(func(u *ent.KnotUpsert) {
8484+ u.SetLastSeen(now)
8585+ }).
8686+ Exec(ctx)
8787+}
8888+8989+// MarkKnotOK resets consecutive_errors on a successful call.
9090+func (s *Store) MarkKnotOK(ctx context.Context, host string) error {
9191+ if host == "" {
9292+ return nil
9393+ }
9494+ now := time.Now().UnixMilli()
9595+ return s.client.Knot.Create().
9696+ SetID(host).
9797+ SetFirstSeen(now).
9898+ SetLastSeen(now).
9999+ SetLastOkAt(now).
100100+ OnConflict().
101101+ Update(func(u *ent.KnotUpsert) {
102102+ u.SetLastSeen(now)
103103+ u.SetLastOkAt(now)
104104+ u.SetConsecutiveErrors(0)
105105+ }).
106106+ Exec(ctx)
107107+}
108108+109109+// MarkKnotError records a failure and increments consecutive_errors.
110110+func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error {
111111+ if host == "" {
112112+ return nil
113113+ }
114114+ if len(reason) > 500 {
115115+ reason = reason[:500]
116116+ }
117117+ now := time.Now().UnixMilli()
118118+ // ent has no SQL-side "consecutive_errors + 1" expression in upsert, so
119119+ // we drop to raw SQL for the increment. Insert path uses ent.
120120+ _, err := s.db.ExecContext(ctx, `
121121+ INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled)
122122+ VALUES (?, ?, ?, ?, ?, 1, 0)
123123+ ON CONFLICT(host) DO UPDATE SET
124124+ last_error_at = excluded.last_error_at,
125125+ last_error = excluded.last_error,
126126+ last_seen = excluded.last_seen,
127127+ consecutive_errors = consecutive_errors + 1
128128+ `, host, now, now, now, reason)
129129+ return err
130130+}
131131+132132+// KnotCircuitOpen reports whether further calls to a host should be skipped.
133133+func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) {
134134+ if host == "" {
135135+ return false, nil
136136+ }
137137+ k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx)
138138+ if ent.IsNotFound(err) {
139139+ return false, nil
140140+ }
141141+ if err != nil {
142142+ return false, err
143143+ }
144144+ if k.Disabled != 0 {
145145+ return true, nil
146146+ }
147147+ if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil {
148148+ return false, nil
149149+ }
150150+ return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil
151151+}
152152+153153+// SetKnotDisabled toggles the knot's disabled flag.
154154+func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error {
155155+ if host == "" {
156156+ return nil
157157+ }
158158+ v := 0
159159+ if disabled {
160160+ v = 1
161161+ }
162162+ return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx)
163163+}
+87
store/lang.go
···11+package store
22+33+import (
44+ "context"
55+ "encoding/json"
66+ "sort"
77+ "time"
88+)
99+1010+// UpdateLanguages writes the languages JSON map for an existing repo.
1111+func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error {
1212+ raw, err := json.Marshal(langs)
1313+ if err != nil {
1414+ return err
1515+ }
1616+ return s.client.Repo.UpdateOneID(atURI).
1717+ SetLanguages(string(raw)).
1818+ SetLanguageAt(time.Now().UnixMilli()).
1919+ Exec(ctx)
2020+}
2121+2222+// ReposNeedingLanguages returns repos whose language enrichment is missing or
2323+// older than maxAge.
2424+func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) {
2525+ cutoff := time.Now().Add(-maxAge).UnixMilli()
2626+ rows, err := s.db.QueryContext(ctx, `
2727+ SELECT at_uri, did, rkey, name, knot
2828+ FROM repos
2929+ WHERE language_at IS NULL OR language_at < ?
3030+ ORDER BY seen_at DESC
3131+ LIMIT ?
3232+ `, cutoff, limit)
3333+ if err != nil {
3434+ return nil, err
3535+ }
3636+ defer rows.Close()
3737+ var out []Repo
3838+ for rows.Next() {
3939+ var r Repo
4040+ if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil {
4141+ return nil, err
4242+ }
4343+ out = append(out, r)
4444+ }
4545+ return out, rows.Err()
4646+}
4747+4848+// PrimaryLanguages returns the sorted set of "primary" languages observed
4949+// across all enriched repos — i.e., the highest-bytes language per repo,
5050+// deduplicated. Used to populate the language filter dropdown.
5151+func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) {
5252+ rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`)
5353+ if err != nil {
5454+ return nil, err
5555+ }
5656+ defer rows.Close()
5757+ seen := make(map[string]struct{})
5858+ for rows.Next() {
5959+ var raw string
6060+ if err := rows.Scan(&raw); err != nil {
6161+ return nil, err
6262+ }
6363+ var langs map[string]int64
6464+ if err := json.Unmarshal([]byte(raw), &langs); err != nil {
6565+ continue
6666+ }
6767+ var best string
6868+ var bestN int64
6969+ for k, v := range langs {
7070+ if v > bestN {
7171+ bestN, best = v, k
7272+ }
7373+ }
7474+ if best != "" {
7575+ seen[best] = struct{}{}
7676+ }
7777+ }
7878+ if err := rows.Err(); err != nil {
7979+ return nil, err
8080+ }
8181+ out := make([]string, 0, len(seen))
8282+ for k := range seen {
8383+ out = append(out, k)
8484+ }
8585+ sort.Strings(out)
8686+ return out, nil
8787+}
+205
store/repo.go
···11+package store
22+33+import (
44+ "context"
55+ "database/sql"
66+ "encoding/json"
77+ "time"
88+99+ "tangled.sh/chown.de/tangled-repo-firehose/ent"
1010+ "tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed"
1111+ "tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
1212+)
1313+1414+// ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in
1515+// a single transaction. Languages column is intentionally untouched so async
1616+// language enrichment isn't clobbered by a record update.
1717+func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error {
1818+ return withTx(ctx, s.client, func(tx *ent.Tx) error {
1919+ if err := upsertRepo(ctx, tx, r); err != nil {
2020+ return err
2121+ }
2222+ return advanceCursor(ctx, tx, cur)
2323+ })
2424+}
2525+2626+// ApplyRepoDelete removes a repo and advances the cursor in a single txn.
2727+func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error {
2828+ return withTx(ctx, s.client, func(tx *ent.Tx) error {
2929+ if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil {
3030+ return err
3131+ }
3232+ return advanceCursor(ctx, tx, cur)
3333+ })
3434+}
3535+3636+// UpsertRepo writes (or replaces) a repo row outside the firehose path —
3737+// used by discovery when a star references a repo we haven't seen yet. Does
3838+// NOT touch the cursor.
3939+func (s *Store) UpsertRepo(ctx context.Context, r Repo) error {
4040+ tx, err := s.client.Tx(ctx)
4141+ if err != nil {
4242+ return err
4343+ }
4444+ defer func() { _ = tx.Rollback() }()
4545+ if err := upsertRepo(ctx, tx, r); err != nil {
4646+ return err
4747+ }
4848+ return tx.Commit()
4949+}
5050+5151+func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error {
5252+ topicsJSON, err := json.Marshal(r.Topics)
5353+ if err != nil {
5454+ return err
5555+ }
5656+ topics := string(topicsJSON)
5757+ create := tx.Repo.Create().
5858+ SetID(r.AtURI).
5959+ SetDid(r.DID).
6060+ SetRkey(r.Rkey).
6161+ SetName(r.Name).
6262+ SetKnot(r.Knot).
6363+ SetCreatedAt(r.CreatedAt.UnixMilli()).
6464+ SetSeenAt(r.SeenAt.UnixMilli()).
6565+ SetTopics(topics).
6666+ SetDescription(r.Description).
6767+ SetWebsite(r.Website).
6868+ SetSource(r.Source).
6969+ SetSpindle(r.Spindle).
7070+ SetRepoDid(r.RepoDID)
7171+ return create.OnConflict().
7272+ UpdateNewValues().
7373+ Exec(ctx)
7474+}
7575+7676+// HasRepo reports whether a repo at-uri already exists in the index.
7777+func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) {
7878+ return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx)
7979+}
8080+8181+// MarkDiscoveryFailed records that a PDS returned RecordNotFound for this
8282+// at-uri so we don't keep retrying every backfill cycle.
8383+func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error {
8484+ return s.client.DiscoveryFailed.Create().
8585+ SetID(atURI).
8686+ SetFailedAt(time.Now().UnixMilli()).
8787+ SetReason(reason).
8888+ OnConflict().
8989+ UpdateNewValues().
9090+ Exec(ctx)
9191+}
9292+9393+// DiscoveryFailedRecently reports whether a previous discovery attempt failed
9494+// within the given window — used to skip known-deleted records on retry.
9595+func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) {
9696+ cutoff := time.Now().Add(-maxAge).UnixMilli()
9797+ return s.client.DiscoveryFailed.Query().
9898+ Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)).
9999+ Exist(ctx)
100100+}
101101+102102+// ReposFilter narrows the result of RecentRepos. Zero values mean "no
103103+// filter" so callers can mix and match.
104104+type ReposFilter struct {
105105+ Language string // primary language match (post-filter in Go over JSON map)
106106+ Since time.Time // repos created at or after this time (zero = no time floor)
107107+ ForksOnly bool // only rows with non-empty source
108108+ NoForks bool // exclude rows with non-empty source
109109+}
110110+111111+// RecentRepos returns repos matching the filter, sorted newest first. Pass
112112+// limit <= 0 for no limit. Uses raw SQL because the scalar star-count
113113+// subquery and handle LEFT JOIN are awkward to express via ent's query API.
114114+func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) {
115115+ candidateLimit := -1
116116+ if limit > 0 {
117117+ candidateLimit = limit
118118+ if f.Language != "" {
119119+ candidateLimit = limit * 5
120120+ }
121121+ }
122122+ forksOnly := 0
123123+ if f.ForksOnly {
124124+ forksOnly = 1
125125+ }
126126+ noForks := 0
127127+ if f.NoForks {
128128+ noForks = 1
129129+ }
130130+131131+ rows, err := s.db.QueryContext(ctx, `
132132+ SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics,
133133+ r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at,
134134+ r.languages, r.language_at, COALESCE(h.handle, ''),
135135+ (SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count
136136+ FROM repos r
137137+ LEFT JOIN handles h ON h.did = r.did
138138+ WHERE r.created_at >= ?
139139+ AND (? = '' OR r.languages IS NOT NULL)
140140+ AND (? = 0 OR (r.source IS NOT NULL AND r.source != ''))
141141+ AND (? = 0 OR r.source IS NULL OR r.source = '')
142142+ ORDER BY r.created_at DESC
143143+ LIMIT ?
144144+ `, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit)
145145+ if err != nil {
146146+ return nil, err
147147+ }
148148+ defer rows.Close()
149149+150150+ var out []Repo
151151+ for rows.Next() {
152152+ r, err := scanRepo(rows)
153153+ if err != nil {
154154+ return nil, err
155155+ }
156156+ if f.Language != "" && r.Primary() != f.Language {
157157+ continue
158158+ }
159159+ out = append(out, r)
160160+ if limit > 0 && len(out) >= limit {
161161+ break
162162+ }
163163+ }
164164+ return out, rows.Err()
165165+}
166166+167167+// scanRepo reads a row from the SQL projection used by RecentRepos.
168168+func scanRepo(rows *sql.Rows) (Repo, error) {
169169+ var (
170170+ r Repo
171171+ description sql.NullString
172172+ topicsRaw sql.NullString
173173+ website sql.NullString
174174+ source sql.NullString
175175+ spindle sql.NullString
176176+ repoDID sql.NullString
177177+ languages sql.NullString
178178+ languageAt sql.NullInt64
179179+ createdAt int64
180180+ seenAt int64
181181+ )
182182+ if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot,
183183+ &description, &topicsRaw, &website, &source, &spindle, &repoDID,
184184+ &createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil {
185185+ return r, err
186186+ }
187187+ r.Description = description.String
188188+ r.Website = website.String
189189+ r.Source = source.String
190190+ r.Spindle = spindle.String
191191+ r.RepoDID = repoDID.String
192192+ r.CreatedAt = time.UnixMilli(createdAt)
193193+ r.SeenAt = time.UnixMilli(seenAt)
194194+ if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" {
195195+ _ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics)
196196+ }
197197+ if languages.Valid && languages.String != "" {
198198+ _ = json.Unmarshal([]byte(languages.String), &r.Languages)
199199+ }
200200+ if languageAt.Valid {
201201+ t := time.UnixMilli(languageAt.Int64)
202202+ r.LanguageAt = &t
203203+ }
204204+ return r, nil
205205+}
+61
store/star.go
···11+package store
22+33+import (
44+ "context"
55+ "database/sql"
66+ "time"
77+88+ "tangled.sh/chown.de/tangled-repo-firehose/ent"
99+ "tangled.sh/chown.de/tangled-repo-firehose/ent/star"
1010+)
1111+1212+// ApplyStarCreate inserts a star (idempotent on at_uri) and advances the
1313+// cursor in a single transaction.
1414+func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error {
1515+ return withTx(ctx, s.client, func(tx *ent.Tx) error {
1616+ err := tx.Star.Create().
1717+ SetID(atURI).
1818+ SetSubject(subject).
1919+ SetCreatedAt(createdAt.UnixMilli()).
2020+ OnConflict().
2121+ DoNothing().
2222+ Exec(ctx)
2323+ // ent's DoNothing returns sql.ErrNoRows when nothing was inserted —
2424+ // safe to ignore; the row already exists.
2525+ if err != nil && err != sql.ErrNoRows {
2626+ return err
2727+ }
2828+ return advanceCursor(ctx, tx, cur)
2929+ })
3030+}
3131+3232+// ApplyStarDelete removes a star row and advances the cursor in a single txn.
3333+func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error {
3434+ return withTx(ctx, s.client, func(tx *ent.Tx) error {
3535+ if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil {
3636+ return err
3737+ }
3838+ return advanceCursor(ctx, tx, cur)
3939+ })
4040+}
4141+4242+// StarCount returns the number of star records held for a repo's at-uri.
4343+func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) {
4444+ return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx)
4545+}
4646+4747+// UpsertStar inserts a star outside the firehose path — used by the stars
4848+// backfill worker. Idempotent on at-uri; does not advance the cursor.
4949+func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error {
5050+ err := s.client.Star.Create().
5151+ SetID(atURI).
5252+ SetSubject(subject).
5353+ SetCreatedAt(createdAt.UnixMilli()).
5454+ OnConflict().
5555+ DoNothing().
5656+ Exec(ctx)
5757+ if err != nil && err != sql.ErrNoRows {
5858+ return err
5959+ }
6060+ return nil
6161+}
+1-644
store/store.go
···11// Package store is the persistence layer for the Tangled repo firehose
22// aggregator. Schema is defined in ent/schema/ and generated via
33-// `go generate ./ent`. Auto-migration runs on Open and is additive only —
44-// existing data is never dropped.
33+// `go generate ./ent`. Auto-migration runs on Open.
54package store
6576import (
87 "context"
98 "database/sql"
1010- "encoding/json"
119 "fmt"
1210 "io"
1313- "sort"
1411 "time"
15121613 "entgo.io/ent/dialect"
1714 entsql "entgo.io/ent/dialect/sql"
18151916 "tangled.sh/chown.de/tangled-repo-firehose/ent"
2020- "tangled.sh/chown.de/tangled-repo-firehose/ent/cursor"
2121- "tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed"
2222- "tangled.sh/chown.de/tangled-repo-firehose/ent/handle"
2323- "tangled.sh/chown.de/tangled-repo-firehose/ent/knot"
2424- "tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
2525- "tangled.sh/chown.de/tangled-repo-firehose/ent/star"
26172718 _ "modernc.org/sqlite"
2819)
29203021// dsn assembles the modernc.org/sqlite connection string. PRAGMAs go in the
3122// query string so they're applied per-connection (the pool may open many).
3232-// ent's migrator checks foreign_keys at startup so it must be set here.
3323func dsn(path string) string {
3424 return path + "?_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(1)"
3525}
···4737 defer client.Close()
4838 return client.Schema.WriteTo(ctx, w)
4939}
5050-51405241// Repo mirrors a sh.tangled.repo record plus enrichment columns.
5342type Repo struct {
···10291}
1039210493func (s *Store) Close() error { return s.client.Close() }
105105-106106-// GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0
107107-// if none has been recorded yet.
108108-func (s *Store) GetCursor(ctx context.Context) (int64, error) {
109109- row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
110110- if ent.IsNotFound(err) {
111111- return 0, nil
112112- }
113113- if err != nil {
114114- return 0, err
115115- }
116116- return row.Cursor, nil
117117-}
118118-119119-// ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in
120120-// a single transaction. Languages column is intentionally untouched so async
121121-// language enrichment isn't clobbered by a record update.
122122-func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error {
123123- return withTx(ctx, s.client, func(tx *ent.Tx) error {
124124- if err := upsertRepo(ctx, tx, r); err != nil {
125125- return err
126126- }
127127- return advanceCursor(ctx, tx, cur)
128128- })
129129-}
130130-131131-// ApplyRepoDelete removes a repo and advances the cursor in a single txn.
132132-func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error {
133133- return withTx(ctx, s.client, func(tx *ent.Tx) error {
134134- if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil {
135135- return err
136136- }
137137- return advanceCursor(ctx, tx, cur)
138138- })
139139-}
140140-141141-// AdvanceCursor moves the cursor forward without applying any record. Used
142142-// for non-tracked event kinds so we don't replay them on restart.
143143-func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error {
144144- return withTx(ctx, s.client, func(tx *ent.Tx) error {
145145- return advanceCursor(ctx, tx, cur)
146146- })
147147-}
148148-149149-func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error {
150150- // Never move the cursor backward — out-of-order replays would otherwise
151151- // rewind progress.
152152- existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
153153- now := time.Now().UnixMilli()
154154- if ent.IsNotFound(err) {
155155- return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
156156- }
157157- if err != nil {
158158- return err
159159- }
160160- if cur <= existing.Cursor {
161161- return nil
162162- }
163163- return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
164164-}
165165-166166-// UpsertRepo writes (or replaces) a repo row outside the firehose path —
167167-// used by discovery when a star references a repo we haven't seen yet. Does
168168-// NOT touch the cursor.
169169-func (s *Store) UpsertRepo(ctx context.Context, r Repo) error {
170170- tx, err := s.client.Tx(ctx)
171171- if err != nil {
172172- return err
173173- }
174174- defer func() { _ = tx.Rollback() }()
175175- if err := upsertRepo(ctx, tx, r); err != nil {
176176- return err
177177- }
178178- return tx.Commit()
179179-}
180180-181181-func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error {
182182- topicsJSON, err := json.Marshal(r.Topics)
183183- if err != nil {
184184- return err
185185- }
186186- topics := string(topicsJSON)
187187- create := tx.Repo.Create().
188188- SetID(r.AtURI).
189189- SetDid(r.DID).
190190- SetRkey(r.Rkey).
191191- SetName(r.Name).
192192- SetKnot(r.Knot).
193193- SetCreatedAt(r.CreatedAt.UnixMilli()).
194194- SetSeenAt(r.SeenAt.UnixMilli()).
195195- SetTopics(topics).
196196- SetDescription(r.Description).
197197- SetWebsite(r.Website).
198198- SetSource(r.Source).
199199- SetSpindle(r.Spindle).
200200- SetRepoDid(r.RepoDID)
201201- return create.OnConflict().
202202- UpdateNewValues().
203203- Exec(ctx)
204204-}
205205-206206-// HasRepo reports whether a repo at-uri already exists in the index.
207207-func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) {
208208- return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx)
209209-}
210210-211211-// MarkDiscoveryFailed records that a PDS returned RecordNotFound for this
212212-// at-uri so we don't keep retrying every backfill cycle.
213213-func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error {
214214- return s.client.DiscoveryFailed.Create().
215215- SetID(atURI).
216216- SetFailedAt(time.Now().UnixMilli()).
217217- SetReason(reason).
218218- OnConflict().
219219- UpdateNewValues().
220220- Exec(ctx)
221221-}
222222-223223-// DiscoveryFailedRecently reports whether a previous discovery attempt failed
224224-// within the given window — used to skip known-deleted records on retry.
225225-func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) {
226226- cutoff := time.Now().Add(-maxAge).UnixMilli()
227227- return s.client.DiscoveryFailed.Query().
228228- Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)).
229229- Exist(ctx)
230230-}
231231-232232-// ApplyStarCreate inserts a star (idempotent on at_uri) and advances the
233233-// cursor in a single transaction.
234234-func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error {
235235- return withTx(ctx, s.client, func(tx *ent.Tx) error {
236236- err := tx.Star.Create().
237237- SetID(atURI).
238238- SetSubject(subject).
239239- SetCreatedAt(createdAt.UnixMilli()).
240240- OnConflict().
241241- DoNothing().
242242- Exec(ctx)
243243- // ent's DoNothing returns sql.ErrNoRows when nothing was inserted —
244244- // safe to ignore; the row already exists.
245245- if err != nil && err != sql.ErrNoRows {
246246- return err
247247- }
248248- return advanceCursor(ctx, tx, cur)
249249- })
250250-}
251251-252252-// ApplyStarDelete removes a star row and advances the cursor in a single txn.
253253-func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error {
254254- return withTx(ctx, s.client, func(tx *ent.Tx) error {
255255- if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil {
256256- return err
257257- }
258258- return advanceCursor(ctx, tx, cur)
259259- })
260260-}
261261-262262-// StarCount returns the number of star records held for a repo's at-uri.
263263-func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) {
264264- return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx)
265265-}
266266-267267-// UpsertStar inserts a star outside the firehose path — used by the stars
268268-// backfill worker. Idempotent on at-uri; does not advance the cursor.
269269-func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error {
270270- err := s.client.Star.Create().
271271- SetID(atURI).
272272- SetSubject(subject).
273273- SetCreatedAt(createdAt.UnixMilli()).
274274- OnConflict().
275275- DoNothing().
276276- Exec(ctx)
277277- if err != nil && err != sql.ErrNoRows {
278278- return err
279279- }
280280- return nil
281281-}
282282-283283-// AllKnownDIDs returns every DID we've encountered: repo owners plus star
284284-// authors parsed from existing star at-uris. Uses raw SQL because ent has no
285285-// clean expression for substring + UNION.
286286-func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) {
287287- rows, err := s.db.QueryContext(ctx, `
288288- SELECT did FROM repos
289289- UNION
290290- SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did
291291- FROM stars
292292- WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != ''
293293- LIMIT ?
294294- `, limit)
295295- if err != nil {
296296- return nil, err
297297- }
298298- defer rows.Close()
299299- var out []string
300300- for rows.Next() {
301301- var did string
302302- if err := rows.Scan(&did); err != nil {
303303- return nil, err
304304- }
305305- if did != "" {
306306- out = append(out, did)
307307- }
308308- }
309309- return out, rows.Err()
310310-}
311311-312312-// UpsertHandle stores or refreshes a DID→handle mapping.
313313-func (s *Store) UpsertHandle(ctx context.Context, did, h string) error {
314314- return s.client.Handle.Create().
315315- SetID(did).
316316- SetHandle(h).
317317- SetRefreshedAt(time.Now().UnixMilli()).
318318- OnConflict().
319319- UpdateNewValues().
320320- Exec(ctx)
321321-}
322322-323323-// Handle returns the cached handle for a DID, or "" if unknown.
324324-func (s *Store) Handle(ctx context.Context, did string) (string, error) {
325325- row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx)
326326- if ent.IsNotFound(err) {
327327- return "", nil
328328- }
329329- if err != nil {
330330- return "", err
331331- }
332332- return row.Handle, nil
333333-}
334334-335335-// DIDsNeedingHandles returns owner DIDs from repos that have no cached handle
336336-// yet, or whose cached handle is older than maxAge.
337337-func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) {
338338- cutoff := time.Now().Add(-maxAge).UnixMilli()
339339- rows, err := s.db.QueryContext(ctx, `
340340- SELECT DISTINCT r.did
341341- FROM repos r
342342- LEFT JOIN handles h ON h.did = r.did
343343- WHERE h.did IS NULL OR h.refreshed_at < ?
344344- LIMIT ?
345345- `, cutoff, limit)
346346- if err != nil {
347347- return nil, err
348348- }
349349- defer rows.Close()
350350- var out []string
351351- for rows.Next() {
352352- var did string
353353- if err := rows.Scan(&did); err != nil {
354354- return nil, err
355355- }
356356- out = append(out, did)
357357- }
358358- return out, rows.Err()
359359-}
360360-361361-// UpdateLanguages writes the languages JSON map for an existing repo.
362362-func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error {
363363- raw, err := json.Marshal(langs)
364364- if err != nil {
365365- return err
366366- }
367367- return s.client.Repo.UpdateOneID(atURI).
368368- SetLanguages(string(raw)).
369369- SetLanguageAt(time.Now().UnixMilli()).
370370- Exec(ctx)
371371-}
372372-373373-// ReposNeedingLanguages returns repos whose language enrichment is missing or
374374-// older than maxAge.
375375-func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) {
376376- cutoff := time.Now().Add(-maxAge).UnixMilli()
377377- rows, err := s.db.QueryContext(ctx, `
378378- SELECT at_uri, did, rkey, name, knot
379379- FROM repos
380380- WHERE language_at IS NULL OR language_at < ?
381381- ORDER BY seen_at DESC
382382- LIMIT ?
383383- `, cutoff, limit)
384384- if err != nil {
385385- return nil, err
386386- }
387387- defer rows.Close()
388388- var out []Repo
389389- for rows.Next() {
390390- var r Repo
391391- if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil {
392392- return nil, err
393393- }
394394- out = append(out, r)
395395- }
396396- return out, rows.Err()
397397-}
398398-399399-// PrimaryLanguages returns the sorted set of "primary" languages observed
400400-// across all enriched repos — i.e., the highest-bytes language per repo,
401401-// deduplicated. Used to populate the language filter dropdown.
402402-func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) {
403403- rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`)
404404- if err != nil {
405405- return nil, err
406406- }
407407- defer rows.Close()
408408- seen := make(map[string]struct{})
409409- for rows.Next() {
410410- var raw string
411411- if err := rows.Scan(&raw); err != nil {
412412- return nil, err
413413- }
414414- var langs map[string]int64
415415- if err := json.Unmarshal([]byte(raw), &langs); err != nil {
416416- continue
417417- }
418418- var best string
419419- var bestN int64
420420- for k, v := range langs {
421421- if v > bestN {
422422- bestN, best = v, k
423423- }
424424- }
425425- if best != "" {
426426- seen[best] = struct{}{}
427427- }
428428- }
429429- if err := rows.Err(); err != nil {
430430- return nil, err
431431- }
432432- out := make([]string, 0, len(seen))
433433- for k := range seen {
434434- out = append(out, k)
435435- }
436436- sort.Strings(out)
437437- return out, nil
438438-}
439439-440440-// ReposFilter narrows the result of RecentRepos. Zero values mean "no
441441-// filter" so callers can mix and match.
442442-type ReposFilter struct {
443443- Language string // primary language match (post-filter in Go over JSON map)
444444- Since time.Time // repos created at or after this time (zero = no time floor)
445445- ForksOnly bool // only rows with non-empty source
446446- NoForks bool // exclude rows with non-empty source
447447-}
448448-449449-// RecentRepos returns repos matching the filter, sorted newest first. Pass
450450-// limit <= 0 for no limit. Uses raw SQL because the scalar star-count
451451-// subquery and handle LEFT JOIN are awkward to express via ent's query API.
452452-func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) {
453453- candidateLimit := -1
454454- if limit > 0 {
455455- candidateLimit = limit
456456- if f.Language != "" {
457457- candidateLimit = limit * 5
458458- }
459459- }
460460- forksOnly := 0
461461- if f.ForksOnly {
462462- forksOnly = 1
463463- }
464464- noForks := 0
465465- if f.NoForks {
466466- noForks = 1
467467- }
468468- rows, err := s.db.QueryContext(ctx, `
469469- SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics,
470470- r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at,
471471- r.languages, r.language_at, COALESCE(h.handle, ''),
472472- (SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count
473473- FROM repos r
474474- LEFT JOIN handles h ON h.did = r.did
475475- WHERE r.created_at >= ?
476476- AND (? = '' OR r.languages IS NOT NULL)
477477- AND (? = 0 OR (r.source IS NOT NULL AND r.source != ''))
478478- AND (? = 0 OR r.source IS NULL OR r.source = '')
479479- ORDER BY r.created_at DESC
480480- LIMIT ?
481481- `, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit)
482482- if err != nil {
483483- return nil, err
484484- }
485485- defer rows.Close()
486486-487487- var out []Repo
488488- for rows.Next() {
489489- r, err := scanRepo(rows)
490490- if err != nil {
491491- return nil, err
492492- }
493493- if f.Language != "" && r.Primary() != f.Language {
494494- continue
495495- }
496496- out = append(out, r)
497497- if limit > 0 && len(out) >= limit {
498498- break
499499- }
500500- }
501501- return out, rows.Err()
502502-}
503503-504504-// KnotEntry is one row in the knots view.
505505-type KnotEntry struct {
506506- Host string
507507- FirstSeen time.Time
508508- LastSeen time.Time
509509- LastOK *time.Time
510510- LastError *time.Time
511511- LastErrorMsg string
512512- ConsecutiveErrors int
513513- Disabled bool
514514- Repos int
515515-}
516516-517517-// Knots returns every knot host with bookkeeping plus a count of repos
518518-// hosted on it.
519519-func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) {
520520- knots, err := s.client.Knot.Query().
521521- Order(ent.Desc(knot.FieldLastSeen)).
522522- All(ctx)
523523- if err != nil {
524524- return nil, err
525525- }
526526- out := make([]KnotEntry, 0, len(knots))
527527- for _, k := range knots {
528528- repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx)
529529- if err != nil {
530530- return nil, err
531531- }
532532- entry := KnotEntry{
533533- Host: k.ID,
534534- FirstSeen: time.UnixMilli(k.FirstSeen),
535535- LastSeen: time.UnixMilli(k.LastSeen),
536536- ConsecutiveErrors: k.ConsecutiveErrors,
537537- Disabled: k.Disabled != 0,
538538- Repos: repos,
539539- }
540540- if k.LastOkAt != nil {
541541- t := time.UnixMilli(*k.LastOkAt)
542542- entry.LastOK = &t
543543- }
544544- if k.LastErrorAt != nil {
545545- t := time.UnixMilli(*k.LastErrorAt)
546546- entry.LastError = &t
547547- }
548548- if k.LastError != nil {
549549- entry.LastErrorMsg = *k.LastError
550550- }
551551- out = append(out, entry)
552552- }
553553- // Sort by repos desc, then host — done in Go since it's a derived count.
554554- for i := range out {
555555- for j := i + 1; j < len(out); j++ {
556556- if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) {
557557- out[i], out[j] = out[j], out[i]
558558- }
559559- }
560560- }
561561- return out, nil
562562-}
563563-564564-// UpsertKnot records that we've seen this host, refreshing last_seen.
565565-func (s *Store) UpsertKnot(ctx context.Context, host string) error {
566566- if host == "" {
567567- return nil
568568- }
569569- now := time.Now().UnixMilli()
570570- return s.client.Knot.Create().
571571- SetID(host).
572572- SetFirstSeen(now).
573573- SetLastSeen(now).
574574- OnConflict().
575575- Update(func(u *ent.KnotUpsert) {
576576- u.SetLastSeen(now)
577577- }).
578578- Exec(ctx)
579579-}
580580-581581-// MarkKnotOK resets consecutive_errors on a successful call.
582582-func (s *Store) MarkKnotOK(ctx context.Context, host string) error {
583583- if host == "" {
584584- return nil
585585- }
586586- now := time.Now().UnixMilli()
587587- return s.client.Knot.Create().
588588- SetID(host).
589589- SetFirstSeen(now).
590590- SetLastSeen(now).
591591- SetLastOkAt(now).
592592- OnConflict().
593593- Update(func(u *ent.KnotUpsert) {
594594- u.SetLastSeen(now)
595595- u.SetLastOkAt(now)
596596- u.SetConsecutiveErrors(0)
597597- }).
598598- Exec(ctx)
599599-}
600600-601601-// MarkKnotError records a failure and increments consecutive_errors.
602602-func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error {
603603- if host == "" {
604604- return nil
605605- }
606606- if len(reason) > 500 {
607607- reason = reason[:500]
608608- }
609609- now := time.Now().UnixMilli()
610610- // ent has no SQL-side "consecutive_errors + 1" expression in upsert, so
611611- // we drop to raw SQL for the increment. Insert path uses ent.
612612- _, err := s.db.ExecContext(ctx, `
613613- INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled)
614614- VALUES (?, ?, ?, ?, ?, 1, 0)
615615- ON CONFLICT(host) DO UPDATE SET
616616- last_error_at = excluded.last_error_at,
617617- last_error = excluded.last_error,
618618- last_seen = excluded.last_seen,
619619- consecutive_errors = consecutive_errors + 1
620620- `, host, now, now, now, reason)
621621- return err
622622-}
623623-624624-// KnotCircuitOpen reports whether further calls to a host should be skipped.
625625-func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) {
626626- if host == "" {
627627- return false, nil
628628- }
629629- k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx)
630630- if ent.IsNotFound(err) {
631631- return false, nil
632632- }
633633- if err != nil {
634634- return false, err
635635- }
636636- if k.Disabled != 0 {
637637- return true, nil
638638- }
639639- if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil {
640640- return false, nil
641641- }
642642- return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil
643643-}
644644-645645-// SetKnotDisabled toggles the knot's disabled flag.
646646-func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error {
647647- if host == "" {
648648- return nil
649649- }
650650- v := 0
651651- if disabled {
652652- v = 1
653653- }
654654- return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx)
655655-}
656656-657657-// HandleEntry is one row in the handles view.
658658-type HandleEntry struct {
659659- DID string
660660- Handle string
661661- RefreshedAt time.Time
662662- Repos int
663663-}
664664-665665-// Handles returns every resolved DID→handle mapping with repo count. Pass
666666-// limit <= 0 for no limit.
667667-func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) {
668668- sqlLimit := -1
669669- if limit > 0 {
670670- sqlLimit = limit
671671- }
672672- rows, err := s.db.QueryContext(ctx, `
673673- SELECT h.did, h.handle, h.refreshed_at,
674674- (SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos
675675- FROM handles h
676676- ORDER BY repos DESC, h.handle
677677- LIMIT ?
678678- `, sqlLimit)
679679- if err != nil {
680680- return nil, err
681681- }
682682- defer rows.Close()
683683- var out []HandleEntry
684684- for rows.Next() {
685685- var (
686686- h HandleEntry
687687- refreshed int64
688688- )
689689- if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil {
690690- return nil, err
691691- }
692692- h.RefreshedAt = time.UnixMilli(refreshed)
693693- out = append(out, h)
694694- }
695695- return out, rows.Err()
696696-}
697697-698698-// scanRepo reads a row from the SQL projection used by RecentRepos.
699699-func scanRepo(rows *sql.Rows) (Repo, error) {
700700- var (
701701- r Repo
702702- description sql.NullString
703703- topicsRaw sql.NullString
704704- website sql.NullString
705705- source sql.NullString
706706- spindle sql.NullString
707707- repoDID sql.NullString
708708- languages sql.NullString
709709- languageAt sql.NullInt64
710710- createdAt int64
711711- seenAt int64
712712- )
713713- if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot,
714714- &description, &topicsRaw, &website, &source, &spindle, &repoDID,
715715- &createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil {
716716- return r, err
717717- }
718718- r.Description = description.String
719719- r.Website = website.String
720720- r.Source = source.String
721721- r.Spindle = spindle.String
722722- r.RepoDID = repoDID.String
723723- r.CreatedAt = time.UnixMilli(createdAt)
724724- r.SeenAt = time.UnixMilli(seenAt)
725725- if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" {
726726- _ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics)
727727- }
728728- if languages.Valid && languages.String != "" {
729729- _ = json.Unmarshal([]byte(languages.String), &r.Languages)
730730- }
731731- if languageAt.Valid {
732732- t := time.UnixMilli(languageAt.Int64)
733733- r.LanguageAt = &t
734734- }
735735- return r, nil
736736-}
7379473895// withTx runs fn in a transaction, committing on success or rolling back on error.
73996func withTx(ctx context.Context, client *ent.Client, fn func(*ent.Tx) error) error {