break it up into smaller pieces.
+698
-650
Diff
round #0
+6
-6
firehose/worker.go
+6
-6
firehose/worker.go
···
27
27
}
28
28
29
29
type WorkerConfig[T comparable] struct {
30
-
Name string // logged on errors / drops
31
-
Logger *slog.Logger // required
32
-
Process func(ctx context.Context, item T) error // required
33
-
QueueSize int // default 256
34
-
SeenTTL time.Duration // 0 = forever
35
-
MinDelay time.Duration // gap between processed items
30
+
Name string // logged on errors / drops
31
+
Logger *slog.Logger // required
32
+
Process func(ctx context.Context, item T) error // required
33
+
QueueSize int // default 256
34
+
SeenTTL time.Duration // 0 = forever
35
+
MinDelay time.Duration // gap between processed items
36
36
}
37
37
38
38
func NewWorker[T comparable](cfg WorkerConfig[T]) *Worker[T] {
+47
store/cursor.go
+47
store/cursor.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"time"
6
+
7
+
"tangled.sh/chown.de/tangled-repo-firehose/ent"
8
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/cursor"
9
+
)
10
+
11
+
// GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0
12
+
// if none has been recorded yet.
13
+
func (s *Store) GetCursor(ctx context.Context) (int64, error) {
14
+
row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
15
+
if ent.IsNotFound(err) {
16
+
return 0, nil
17
+
}
18
+
if err != nil {
19
+
return 0, err
20
+
}
21
+
return row.Cursor, nil
22
+
}
23
+
24
+
// AdvanceCursor moves the cursor forward without applying any record. Used
25
+
// for non-tracked event kinds so we don't replay them on restart.
26
+
func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error {
27
+
return withTx(ctx, s.client, func(tx *ent.Tx) error {
28
+
return advanceCursor(ctx, tx, cur)
29
+
})
30
+
}
31
+
32
+
func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error {
33
+
// Never move the cursor backward — out-of-order replays would otherwise
34
+
// rewind progress.
35
+
existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
36
+
now := time.Now().UnixMilli()
37
+
if ent.IsNotFound(err) {
38
+
return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
39
+
}
40
+
if err != nil {
41
+
return err
42
+
}
43
+
if cur <= existing.Cursor {
44
+
return nil
45
+
}
46
+
return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
47
+
}
+128
store/handle.go
+128
store/handle.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"time"
6
+
7
+
"tangled.sh/chown.de/tangled-repo-firehose/ent"
8
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/handle"
9
+
)
10
+
11
+
// AllKnownDIDs returns every DID we've encountered: repo owners plus star
12
+
// authors parsed from existing star at-uris. Uses raw SQL because ent has no
13
+
// clean expression for substring + UNION.
14
+
func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) {
15
+
rows, err := s.db.QueryContext(ctx, `
16
+
SELECT did FROM repos
17
+
UNION
18
+
SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did
19
+
FROM stars
20
+
WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != ''
21
+
LIMIT ?
22
+
`, limit)
23
+
if err != nil {
24
+
return nil, err
25
+
}
26
+
defer rows.Close()
27
+
var out []string
28
+
for rows.Next() {
29
+
var did string
30
+
if err := rows.Scan(&did); err != nil {
31
+
return nil, err
32
+
}
33
+
if did != "" {
34
+
out = append(out, did)
35
+
}
36
+
}
37
+
return out, rows.Err()
38
+
}
39
+
40
+
// UpsertHandle stores or refreshes a DID→handle mapping.
41
+
func (s *Store) UpsertHandle(ctx context.Context, did, h string) error {
42
+
return s.client.Handle.Create().
43
+
SetID(did).
44
+
SetHandle(h).
45
+
SetRefreshedAt(time.Now().UnixMilli()).
46
+
OnConflict().
47
+
UpdateNewValues().
48
+
Exec(ctx)
49
+
}
50
+
51
+
// Handle returns the cached handle for a DID, or "" if unknown.
52
+
func (s *Store) Handle(ctx context.Context, did string) (string, error) {
53
+
row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx)
54
+
if ent.IsNotFound(err) {
55
+
return "", nil
56
+
}
57
+
if err != nil {
58
+
return "", err
59
+
}
60
+
return row.Handle, nil
61
+
}
62
+
63
+
// DIDsNeedingHandles returns owner DIDs from repos that have no cached handle
64
+
// yet, or whose cached handle is older than maxAge.
65
+
func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) {
66
+
cutoff := time.Now().Add(-maxAge).UnixMilli()
67
+
rows, err := s.db.QueryContext(ctx, `
68
+
SELECT DISTINCT r.did
69
+
FROM repos r
70
+
LEFT JOIN handles h ON h.did = r.did
71
+
WHERE h.did IS NULL OR h.refreshed_at < ?
72
+
LIMIT ?
73
+
`, cutoff, limit)
74
+
if err != nil {
75
+
return nil, err
76
+
}
77
+
defer rows.Close()
78
+
var out []string
79
+
for rows.Next() {
80
+
var did string
81
+
if err := rows.Scan(&did); err != nil {
82
+
return nil, err
83
+
}
84
+
out = append(out, did)
85
+
}
86
+
return out, rows.Err()
87
+
}
88
+
89
+
// HandleEntry is one row in the handles view.
90
+
type HandleEntry struct {
91
+
DID string
92
+
Handle string
93
+
RefreshedAt time.Time
94
+
Repos int
95
+
}
96
+
97
+
// Handles returns every resolved DID→handle mapping with repo count. Pass
98
+
// limit <= 0 for no limit.
99
+
func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) {
100
+
sqlLimit := -1
101
+
if limit > 0 {
102
+
sqlLimit = limit
103
+
}
104
+
rows, err := s.db.QueryContext(ctx, `
105
+
SELECT h.did, h.handle, h.refreshed_at,
106
+
(SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos
107
+
FROM handles h
108
+
ORDER BY repos DESC, h.handle
109
+
LIMIT ?
110
+
`, sqlLimit)
111
+
if err != nil {
112
+
return nil, err
113
+
}
114
+
defer rows.Close()
115
+
var out []HandleEntry
116
+
for rows.Next() {
117
+
var (
118
+
h HandleEntry
119
+
refreshed int64
120
+
)
121
+
if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil {
122
+
return nil, err
123
+
}
124
+
h.RefreshedAt = time.UnixMilli(refreshed)
125
+
out = append(out, h)
126
+
}
127
+
return out, rows.Err()
128
+
}
+163
store/knot.go
+163
store/knot.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"time"
6
+
7
+
"tangled.sh/chown.de/tangled-repo-firehose/ent"
8
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/knot"
9
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
10
+
)
11
+
12
+
// KnotEntry is one row in the knots view.
13
+
type KnotEntry struct {
14
+
Host string
15
+
FirstSeen time.Time
16
+
LastSeen time.Time
17
+
LastOK *time.Time
18
+
LastError *time.Time
19
+
LastErrorMsg string
20
+
ConsecutiveErrors int
21
+
Disabled bool
22
+
Repos int
23
+
}
24
+
25
+
// Knots returns every knot host with bookkeeping plus a count of repos
26
+
// hosted on it.
27
+
func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) {
28
+
knots, err := s.client.Knot.Query().
29
+
Order(ent.Desc(knot.FieldLastSeen)).
30
+
All(ctx)
31
+
if err != nil {
32
+
return nil, err
33
+
}
34
+
out := make([]KnotEntry, 0, len(knots))
35
+
for _, k := range knots {
36
+
repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx)
37
+
if err != nil {
38
+
return nil, err
39
+
}
40
+
entry := KnotEntry{
41
+
Host: k.ID,
42
+
FirstSeen: time.UnixMilli(k.FirstSeen),
43
+
LastSeen: time.UnixMilli(k.LastSeen),
44
+
ConsecutiveErrors: k.ConsecutiveErrors,
45
+
Disabled: k.Disabled != 0,
46
+
Repos: repos,
47
+
}
48
+
if k.LastOkAt != nil {
49
+
t := time.UnixMilli(*k.LastOkAt)
50
+
entry.LastOK = &t
51
+
}
52
+
if k.LastErrorAt != nil {
53
+
t := time.UnixMilli(*k.LastErrorAt)
54
+
entry.LastError = &t
55
+
}
56
+
if k.LastError != nil {
57
+
entry.LastErrorMsg = *k.LastError
58
+
}
59
+
out = append(out, entry)
60
+
}
61
+
// Sort by repos desc, then host — done in Go since it's a derived count.
62
+
for i := range out {
63
+
for j := i + 1; j < len(out); j++ {
64
+
if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) {
65
+
out[i], out[j] = out[j], out[i]
66
+
}
67
+
}
68
+
}
69
+
return out, nil
70
+
}
71
+
72
+
// UpsertKnot records that we've seen this host, refreshing last_seen.
73
+
func (s *Store) UpsertKnot(ctx context.Context, host string) error {
74
+
if host == "" {
75
+
return nil
76
+
}
77
+
now := time.Now().UnixMilli()
78
+
return s.client.Knot.Create().
79
+
SetID(host).
80
+
SetFirstSeen(now).
81
+
SetLastSeen(now).
82
+
OnConflict().
83
+
Update(func(u *ent.KnotUpsert) {
84
+
u.SetLastSeen(now)
85
+
}).
86
+
Exec(ctx)
87
+
}
88
+
89
+
// MarkKnotOK resets consecutive_errors on a successful call.
90
+
func (s *Store) MarkKnotOK(ctx context.Context, host string) error {
91
+
if host == "" {
92
+
return nil
93
+
}
94
+
now := time.Now().UnixMilli()
95
+
return s.client.Knot.Create().
96
+
SetID(host).
97
+
SetFirstSeen(now).
98
+
SetLastSeen(now).
99
+
SetLastOkAt(now).
100
+
OnConflict().
101
+
Update(func(u *ent.KnotUpsert) {
102
+
u.SetLastSeen(now)
103
+
u.SetLastOkAt(now)
104
+
u.SetConsecutiveErrors(0)
105
+
}).
106
+
Exec(ctx)
107
+
}
108
+
109
+
// MarkKnotError records a failure and increments consecutive_errors.
110
+
func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error {
111
+
if host == "" {
112
+
return nil
113
+
}
114
+
if len(reason) > 500 {
115
+
reason = reason[:500]
116
+
}
117
+
now := time.Now().UnixMilli()
118
+
// ent has no SQL-side "consecutive_errors + 1" expression in upsert, so
119
+
// we drop to raw SQL for the increment. Insert path uses ent.
120
+
_, err := s.db.ExecContext(ctx, `
121
+
INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled)
122
+
VALUES (?, ?, ?, ?, ?, 1, 0)
123
+
ON CONFLICT(host) DO UPDATE SET
124
+
last_error_at = excluded.last_error_at,
125
+
last_error = excluded.last_error,
126
+
last_seen = excluded.last_seen,
127
+
consecutive_errors = consecutive_errors + 1
128
+
`, host, now, now, now, reason)
129
+
return err
130
+
}
131
+
132
+
// KnotCircuitOpen reports whether further calls to a host should be skipped.
133
+
func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) {
134
+
if host == "" {
135
+
return false, nil
136
+
}
137
+
k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx)
138
+
if ent.IsNotFound(err) {
139
+
return false, nil
140
+
}
141
+
if err != nil {
142
+
return false, err
143
+
}
144
+
if k.Disabled != 0 {
145
+
return true, nil
146
+
}
147
+
if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil {
148
+
return false, nil
149
+
}
150
+
return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil
151
+
}
152
+
153
+
// SetKnotDisabled toggles the knot's disabled flag.
154
+
func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error {
155
+
if host == "" {
156
+
return nil
157
+
}
158
+
v := 0
159
+
if disabled {
160
+
v = 1
161
+
}
162
+
return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx)
163
+
}
+87
store/lang.go
+87
store/lang.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"sort"
7
+
"time"
8
+
)
9
+
10
+
// UpdateLanguages writes the languages JSON map for an existing repo.
11
+
func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error {
12
+
raw, err := json.Marshal(langs)
13
+
if err != nil {
14
+
return err
15
+
}
16
+
return s.client.Repo.UpdateOneID(atURI).
17
+
SetLanguages(string(raw)).
18
+
SetLanguageAt(time.Now().UnixMilli()).
19
+
Exec(ctx)
20
+
}
21
+
22
+
// ReposNeedingLanguages returns repos whose language enrichment is missing or
23
+
// older than maxAge.
24
+
func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) {
25
+
cutoff := time.Now().Add(-maxAge).UnixMilli()
26
+
rows, err := s.db.QueryContext(ctx, `
27
+
SELECT at_uri, did, rkey, name, knot
28
+
FROM repos
29
+
WHERE language_at IS NULL OR language_at < ?
30
+
ORDER BY seen_at DESC
31
+
LIMIT ?
32
+
`, cutoff, limit)
33
+
if err != nil {
34
+
return nil, err
35
+
}
36
+
defer rows.Close()
37
+
var out []Repo
38
+
for rows.Next() {
39
+
var r Repo
40
+
if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil {
41
+
return nil, err
42
+
}
43
+
out = append(out, r)
44
+
}
45
+
return out, rows.Err()
46
+
}
47
+
48
+
// PrimaryLanguages returns the sorted set of "primary" languages observed
49
+
// across all enriched repos — i.e., the highest-bytes language per repo,
50
+
// deduplicated. Used to populate the language filter dropdown.
51
+
func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) {
52
+
rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`)
53
+
if err != nil {
54
+
return nil, err
55
+
}
56
+
defer rows.Close()
57
+
seen := make(map[string]struct{})
58
+
for rows.Next() {
59
+
var raw string
60
+
if err := rows.Scan(&raw); err != nil {
61
+
return nil, err
62
+
}
63
+
var langs map[string]int64
64
+
if err := json.Unmarshal([]byte(raw), &langs); err != nil {
65
+
continue
66
+
}
67
+
var best string
68
+
var bestN int64
69
+
for k, v := range langs {
70
+
if v > bestN {
71
+
bestN, best = v, k
72
+
}
73
+
}
74
+
if best != "" {
75
+
seen[best] = struct{}{}
76
+
}
77
+
}
78
+
if err := rows.Err(); err != nil {
79
+
return nil, err
80
+
}
81
+
out := make([]string, 0, len(seen))
82
+
for k := range seen {
83
+
out = append(out, k)
84
+
}
85
+
sort.Strings(out)
86
+
return out, nil
87
+
}
+205
store/repo.go
+205
store/repo.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"encoding/json"
7
+
"time"
8
+
9
+
"tangled.sh/chown.de/tangled-repo-firehose/ent"
10
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed"
11
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
12
+
)
13
+
14
+
// ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in
15
+
// a single transaction. Languages column is intentionally untouched so async
16
+
// language enrichment isn't clobbered by a record update.
17
+
func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error {
18
+
return withTx(ctx, s.client, func(tx *ent.Tx) error {
19
+
if err := upsertRepo(ctx, tx, r); err != nil {
20
+
return err
21
+
}
22
+
return advanceCursor(ctx, tx, cur)
23
+
})
24
+
}
25
+
26
+
// ApplyRepoDelete removes a repo and advances the cursor in a single txn.
27
+
func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error {
28
+
return withTx(ctx, s.client, func(tx *ent.Tx) error {
29
+
if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil {
30
+
return err
31
+
}
32
+
return advanceCursor(ctx, tx, cur)
33
+
})
34
+
}
35
+
36
+
// UpsertRepo writes (or replaces) a repo row outside the firehose path —
37
+
// used by discovery when a star references a repo we haven't seen yet. Does
38
+
// NOT touch the cursor.
39
+
func (s *Store) UpsertRepo(ctx context.Context, r Repo) error {
40
+
tx, err := s.client.Tx(ctx)
41
+
if err != nil {
42
+
return err
43
+
}
44
+
defer func() { _ = tx.Rollback() }()
45
+
if err := upsertRepo(ctx, tx, r); err != nil {
46
+
return err
47
+
}
48
+
return tx.Commit()
49
+
}
50
+
51
+
func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error {
52
+
topicsJSON, err := json.Marshal(r.Topics)
53
+
if err != nil {
54
+
return err
55
+
}
56
+
topics := string(topicsJSON)
57
+
create := tx.Repo.Create().
58
+
SetID(r.AtURI).
59
+
SetDid(r.DID).
60
+
SetRkey(r.Rkey).
61
+
SetName(r.Name).
62
+
SetKnot(r.Knot).
63
+
SetCreatedAt(r.CreatedAt.UnixMilli()).
64
+
SetSeenAt(r.SeenAt.UnixMilli()).
65
+
SetTopics(topics).
66
+
SetDescription(r.Description).
67
+
SetWebsite(r.Website).
68
+
SetSource(r.Source).
69
+
SetSpindle(r.Spindle).
70
+
SetRepoDid(r.RepoDID)
71
+
return create.OnConflict().
72
+
UpdateNewValues().
73
+
Exec(ctx)
74
+
}
75
+
76
+
// HasRepo reports whether a repo at-uri already exists in the index.
77
+
func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) {
78
+
return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx)
79
+
}
80
+
81
+
// MarkDiscoveryFailed records that a PDS returned RecordNotFound for this
82
+
// at-uri so we don't keep retrying every backfill cycle.
83
+
func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error {
84
+
return s.client.DiscoveryFailed.Create().
85
+
SetID(atURI).
86
+
SetFailedAt(time.Now().UnixMilli()).
87
+
SetReason(reason).
88
+
OnConflict().
89
+
UpdateNewValues().
90
+
Exec(ctx)
91
+
}
92
+
93
+
// DiscoveryFailedRecently reports whether a previous discovery attempt failed
94
+
// within the given window — used to skip known-deleted records on retry.
95
+
func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) {
96
+
cutoff := time.Now().Add(-maxAge).UnixMilli()
97
+
return s.client.DiscoveryFailed.Query().
98
+
Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)).
99
+
Exist(ctx)
100
+
}
101
+
102
+
// ReposFilter narrows the result of RecentRepos. Zero values mean "no
103
+
// filter" so callers can mix and match.
104
+
type ReposFilter struct {
105
+
Language string // primary language match (post-filter in Go over JSON map)
106
+
Since time.Time // repos created at or after this time (zero = no time floor)
107
+
ForksOnly bool // only rows with non-empty source
108
+
NoForks bool // exclude rows with non-empty source
109
+
}
110
+
111
+
// RecentRepos returns repos matching the filter, sorted newest first. Pass
112
+
// limit <= 0 for no limit. Uses raw SQL because the scalar star-count
113
+
// subquery and handle LEFT JOIN are awkward to express via ent's query API.
114
+
func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) {
115
+
candidateLimit := -1
116
+
if limit > 0 {
117
+
candidateLimit = limit
118
+
if f.Language != "" {
119
+
candidateLimit = limit * 5
120
+
}
121
+
}
122
+
forksOnly := 0
123
+
if f.ForksOnly {
124
+
forksOnly = 1
125
+
}
126
+
noForks := 0
127
+
if f.NoForks {
128
+
noForks = 1
129
+
}
130
+
131
+
rows, err := s.db.QueryContext(ctx, `
132
+
SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics,
133
+
r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at,
134
+
r.languages, r.language_at, COALESCE(h.handle, ''),
135
+
(SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count
136
+
FROM repos r
137
+
LEFT JOIN handles h ON h.did = r.did
138
+
WHERE r.created_at >= ?
139
+
AND (? = '' OR r.languages IS NOT NULL)
140
+
AND (? = 0 OR (r.source IS NOT NULL AND r.source != ''))
141
+
AND (? = 0 OR r.source IS NULL OR r.source = '')
142
+
ORDER BY r.created_at DESC
143
+
LIMIT ?
144
+
`, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit)
145
+
if err != nil {
146
+
return nil, err
147
+
}
148
+
defer rows.Close()
149
+
150
+
var out []Repo
151
+
for rows.Next() {
152
+
r, err := scanRepo(rows)
153
+
if err != nil {
154
+
return nil, err
155
+
}
156
+
if f.Language != "" && r.Primary() != f.Language {
157
+
continue
158
+
}
159
+
out = append(out, r)
160
+
if limit > 0 && len(out) >= limit {
161
+
break
162
+
}
163
+
}
164
+
return out, rows.Err()
165
+
}
166
+
167
+
// scanRepo reads a row from the SQL projection used by RecentRepos.
168
+
func scanRepo(rows *sql.Rows) (Repo, error) {
169
+
var (
170
+
r Repo
171
+
description sql.NullString
172
+
topicsRaw sql.NullString
173
+
website sql.NullString
174
+
source sql.NullString
175
+
spindle sql.NullString
176
+
repoDID sql.NullString
177
+
languages sql.NullString
178
+
languageAt sql.NullInt64
179
+
createdAt int64
180
+
seenAt int64
181
+
)
182
+
if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot,
183
+
&description, &topicsRaw, &website, &source, &spindle, &repoDID,
184
+
&createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil {
185
+
return r, err
186
+
}
187
+
r.Description = description.String
188
+
r.Website = website.String
189
+
r.Source = source.String
190
+
r.Spindle = spindle.String
191
+
r.RepoDID = repoDID.String
192
+
r.CreatedAt = time.UnixMilli(createdAt)
193
+
r.SeenAt = time.UnixMilli(seenAt)
194
+
if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" {
195
+
_ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics)
196
+
}
197
+
if languages.Valid && languages.String != "" {
198
+
_ = json.Unmarshal([]byte(languages.String), &r.Languages)
199
+
}
200
+
if languageAt.Valid {
201
+
t := time.UnixMilli(languageAt.Int64)
202
+
r.LanguageAt = &t
203
+
}
204
+
return r, nil
205
+
}
+61
store/star.go
+61
store/star.go
···
1
+
package store
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"time"
7
+
8
+
"tangled.sh/chown.de/tangled-repo-firehose/ent"
9
+
"tangled.sh/chown.de/tangled-repo-firehose/ent/star"
10
+
)
11
+
12
+
// ApplyStarCreate inserts a star (idempotent on at_uri) and advances the
13
+
// cursor in a single transaction.
14
+
func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error {
15
+
return withTx(ctx, s.client, func(tx *ent.Tx) error {
16
+
err := tx.Star.Create().
17
+
SetID(atURI).
18
+
SetSubject(subject).
19
+
SetCreatedAt(createdAt.UnixMilli()).
20
+
OnConflict().
21
+
DoNothing().
22
+
Exec(ctx)
23
+
// ent's DoNothing returns sql.ErrNoRows when nothing was inserted —
24
+
// safe to ignore; the row already exists.
25
+
if err != nil && err != sql.ErrNoRows {
26
+
return err
27
+
}
28
+
return advanceCursor(ctx, tx, cur)
29
+
})
30
+
}
31
+
32
+
// ApplyStarDelete removes a star row and advances the cursor in a single txn.
33
+
func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error {
34
+
return withTx(ctx, s.client, func(tx *ent.Tx) error {
35
+
if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil {
36
+
return err
37
+
}
38
+
return advanceCursor(ctx, tx, cur)
39
+
})
40
+
}
41
+
42
+
// StarCount returns the number of star records held for a repo's at-uri.
43
+
func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) {
44
+
return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx)
45
+
}
46
+
47
+
// UpsertStar inserts a star outside the firehose path — used by the stars
48
+
// backfill worker. Idempotent on at-uri; does not advance the cursor.
49
+
func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error {
50
+
err := s.client.Star.Create().
51
+
SetID(atURI).
52
+
SetSubject(subject).
53
+
SetCreatedAt(createdAt.UnixMilli()).
54
+
OnConflict().
55
+
DoNothing().
56
+
Exec(ctx)
57
+
if err != nil && err != sql.ErrNoRows {
58
+
return err
59
+
}
60
+
return nil
61
+
}
+1
-644
store/store.go
+1
-644
store/store.go
···
1
1
// Package store is the persistence layer for the Tangled repo firehose
2
2
// aggregator. Schema is defined in ent/schema/ and generated via
3
-
// `go generate ./ent`. Auto-migration runs on Open and is additive only —
4
-
// existing data is never dropped.
3
+
// `go generate ./ent`. Auto-migration runs on Open.
5
4
package store
6
5
7
6
import (
8
7
"context"
9
8
"database/sql"
10
-
"encoding/json"
11
9
"fmt"
12
10
"io"
13
-
"sort"
14
11
"time"
15
12
16
13
"entgo.io/ent/dialect"
17
14
entsql "entgo.io/ent/dialect/sql"
18
15
19
16
"tangled.sh/chown.de/tangled-repo-firehose/ent"
20
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/cursor"
21
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed"
22
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/handle"
23
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/knot"
24
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/repo"
25
-
"tangled.sh/chown.de/tangled-repo-firehose/ent/star"
26
17
27
18
_ "modernc.org/sqlite"
28
19
)
29
20
30
21
// dsn assembles the modernc.org/sqlite connection string. PRAGMAs go in the
31
22
// query string so they're applied per-connection (the pool may open many).
32
-
// ent's migrator checks foreign_keys at startup so it must be set here.
33
23
func dsn(path string) string {
34
24
return path + "?_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(1)"
35
25
}
···
48
38
return client.Schema.WriteTo(ctx, w)
49
39
}
50
40
51
-
52
41
// Repo mirrors a sh.tangled.repo record plus enrichment columns.
53
42
type Repo struct {
54
43
AtURI string
···
103
92
104
93
func (s *Store) Close() error { return s.client.Close() }
105
94
106
-
// GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0
107
-
// if none has been recorded yet.
108
-
func (s *Store) GetCursor(ctx context.Context) (int64, error) {
109
-
row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
110
-
if ent.IsNotFound(err) {
111
-
return 0, nil
112
-
}
113
-
if err != nil {
114
-
return 0, err
115
-
}
116
-
return row.Cursor, nil
117
-
}
118
-
119
-
// ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in
120
-
// a single transaction. Languages column is intentionally untouched so async
121
-
// language enrichment isn't clobbered by a record update.
122
-
func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error {
123
-
return withTx(ctx, s.client, func(tx *ent.Tx) error {
124
-
if err := upsertRepo(ctx, tx, r); err != nil {
125
-
return err
126
-
}
127
-
return advanceCursor(ctx, tx, cur)
128
-
})
129
-
}
130
-
131
-
// ApplyRepoDelete removes a repo and advances the cursor in a single txn.
132
-
func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error {
133
-
return withTx(ctx, s.client, func(tx *ent.Tx) error {
134
-
if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil {
135
-
return err
136
-
}
137
-
return advanceCursor(ctx, tx, cur)
138
-
})
139
-
}
140
-
141
-
// AdvanceCursor moves the cursor forward without applying any record. Used
142
-
// for non-tracked event kinds so we don't replay them on restart.
143
-
func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error {
144
-
return withTx(ctx, s.client, func(tx *ent.Tx) error {
145
-
return advanceCursor(ctx, tx, cur)
146
-
})
147
-
}
148
-
149
-
func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error {
150
-
// Never move the cursor backward — out-of-order replays would otherwise
151
-
// rewind progress.
152
-
existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx)
153
-
now := time.Now().UnixMilli()
154
-
if ent.IsNotFound(err) {
155
-
return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
156
-
}
157
-
if err != nil {
158
-
return err
159
-
}
160
-
if cur <= existing.Cursor {
161
-
return nil
162
-
}
163
-
return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx)
164
-
}
165
-
166
-
// UpsertRepo writes (or replaces) a repo row outside the firehose path —
167
-
// used by discovery when a star references a repo we haven't seen yet. Does
168
-
// NOT touch the cursor.
169
-
func (s *Store) UpsertRepo(ctx context.Context, r Repo) error {
170
-
tx, err := s.client.Tx(ctx)
171
-
if err != nil {
172
-
return err
173
-
}
174
-
defer func() { _ = tx.Rollback() }()
175
-
if err := upsertRepo(ctx, tx, r); err != nil {
176
-
return err
177
-
}
178
-
return tx.Commit()
179
-
}
180
-
181
-
func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error {
182
-
topicsJSON, err := json.Marshal(r.Topics)
183
-
if err != nil {
184
-
return err
185
-
}
186
-
topics := string(topicsJSON)
187
-
create := tx.Repo.Create().
188
-
SetID(r.AtURI).
189
-
SetDid(r.DID).
190
-
SetRkey(r.Rkey).
191
-
SetName(r.Name).
192
-
SetKnot(r.Knot).
193
-
SetCreatedAt(r.CreatedAt.UnixMilli()).
194
-
SetSeenAt(r.SeenAt.UnixMilli()).
195
-
SetTopics(topics).
196
-
SetDescription(r.Description).
197
-
SetWebsite(r.Website).
198
-
SetSource(r.Source).
199
-
SetSpindle(r.Spindle).
200
-
SetRepoDid(r.RepoDID)
201
-
return create.OnConflict().
202
-
UpdateNewValues().
203
-
Exec(ctx)
204
-
}
205
-
206
-
// HasRepo reports whether a repo at-uri already exists in the index.
207
-
func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) {
208
-
return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx)
209
-
}
210
-
211
-
// MarkDiscoveryFailed records that a PDS returned RecordNotFound for this
212
-
// at-uri so we don't keep retrying every backfill cycle.
213
-
func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error {
214
-
return s.client.DiscoveryFailed.Create().
215
-
SetID(atURI).
216
-
SetFailedAt(time.Now().UnixMilli()).
217
-
SetReason(reason).
218
-
OnConflict().
219
-
UpdateNewValues().
220
-
Exec(ctx)
221
-
}
222
-
223
-
// DiscoveryFailedRecently reports whether a previous discovery attempt failed
224
-
// within the given window — used to skip known-deleted records on retry.
225
-
func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) {
226
-
cutoff := time.Now().Add(-maxAge).UnixMilli()
227
-
return s.client.DiscoveryFailed.Query().
228
-
Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)).
229
-
Exist(ctx)
230
-
}
231
-
232
-
// ApplyStarCreate inserts a star (idempotent on at_uri) and advances the
233
-
// cursor in a single transaction.
234
-
func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error {
235
-
return withTx(ctx, s.client, func(tx *ent.Tx) error {
236
-
err := tx.Star.Create().
237
-
SetID(atURI).
238
-
SetSubject(subject).
239
-
SetCreatedAt(createdAt.UnixMilli()).
240
-
OnConflict().
241
-
DoNothing().
242
-
Exec(ctx)
243
-
// ent's DoNothing returns sql.ErrNoRows when nothing was inserted —
244
-
// safe to ignore; the row already exists.
245
-
if err != nil && err != sql.ErrNoRows {
246
-
return err
247
-
}
248
-
return advanceCursor(ctx, tx, cur)
249
-
})
250
-
}
251
-
252
-
// ApplyStarDelete removes a star row and advances the cursor in a single txn.
253
-
func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error {
254
-
return withTx(ctx, s.client, func(tx *ent.Tx) error {
255
-
if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil {
256
-
return err
257
-
}
258
-
return advanceCursor(ctx, tx, cur)
259
-
})
260
-
}
261
-
262
-
// StarCount returns the number of star records held for a repo's at-uri.
263
-
func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) {
264
-
return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx)
265
-
}
266
-
267
-
// UpsertStar inserts a star outside the firehose path — used by the stars
268
-
// backfill worker. Idempotent on at-uri; does not advance the cursor.
269
-
func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error {
270
-
err := s.client.Star.Create().
271
-
SetID(atURI).
272
-
SetSubject(subject).
273
-
SetCreatedAt(createdAt.UnixMilli()).
274
-
OnConflict().
275
-
DoNothing().
276
-
Exec(ctx)
277
-
if err != nil && err != sql.ErrNoRows {
278
-
return err
279
-
}
280
-
return nil
281
-
}
282
-
283
-
// AllKnownDIDs returns every DID we've encountered: repo owners plus star
284
-
// authors parsed from existing star at-uris. Uses raw SQL because ent has no
285
-
// clean expression for substring + UNION.
286
-
func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) {
287
-
rows, err := s.db.QueryContext(ctx, `
288
-
SELECT did FROM repos
289
-
UNION
290
-
SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did
291
-
FROM stars
292
-
WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != ''
293
-
LIMIT ?
294
-
`, limit)
295
-
if err != nil {
296
-
return nil, err
297
-
}
298
-
defer rows.Close()
299
-
var out []string
300
-
for rows.Next() {
301
-
var did string
302
-
if err := rows.Scan(&did); err != nil {
303
-
return nil, err
304
-
}
305
-
if did != "" {
306
-
out = append(out, did)
307
-
}
308
-
}
309
-
return out, rows.Err()
310
-
}
311
-
312
-
// UpsertHandle stores or refreshes a DID→handle mapping.
313
-
func (s *Store) UpsertHandle(ctx context.Context, did, h string) error {
314
-
return s.client.Handle.Create().
315
-
SetID(did).
316
-
SetHandle(h).
317
-
SetRefreshedAt(time.Now().UnixMilli()).
318
-
OnConflict().
319
-
UpdateNewValues().
320
-
Exec(ctx)
321
-
}
322
-
323
-
// Handle returns the cached handle for a DID, or "" if unknown.
324
-
func (s *Store) Handle(ctx context.Context, did string) (string, error) {
325
-
row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx)
326
-
if ent.IsNotFound(err) {
327
-
return "", nil
328
-
}
329
-
if err != nil {
330
-
return "", err
331
-
}
332
-
return row.Handle, nil
333
-
}
334
-
335
-
// DIDsNeedingHandles returns owner DIDs from repos that have no cached handle
336
-
// yet, or whose cached handle is older than maxAge.
337
-
func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) {
338
-
cutoff := time.Now().Add(-maxAge).UnixMilli()
339
-
rows, err := s.db.QueryContext(ctx, `
340
-
SELECT DISTINCT r.did
341
-
FROM repos r
342
-
LEFT JOIN handles h ON h.did = r.did
343
-
WHERE h.did IS NULL OR h.refreshed_at < ?
344
-
LIMIT ?
345
-
`, cutoff, limit)
346
-
if err != nil {
347
-
return nil, err
348
-
}
349
-
defer rows.Close()
350
-
var out []string
351
-
for rows.Next() {
352
-
var did string
353
-
if err := rows.Scan(&did); err != nil {
354
-
return nil, err
355
-
}
356
-
out = append(out, did)
357
-
}
358
-
return out, rows.Err()
359
-
}
360
-
361
-
// UpdateLanguages writes the languages JSON map for an existing repo.
362
-
func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error {
363
-
raw, err := json.Marshal(langs)
364
-
if err != nil {
365
-
return err
366
-
}
367
-
return s.client.Repo.UpdateOneID(atURI).
368
-
SetLanguages(string(raw)).
369
-
SetLanguageAt(time.Now().UnixMilli()).
370
-
Exec(ctx)
371
-
}
372
-
373
-
// ReposNeedingLanguages returns repos whose language enrichment is missing or
374
-
// older than maxAge.
375
-
func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) {
376
-
cutoff := time.Now().Add(-maxAge).UnixMilli()
377
-
rows, err := s.db.QueryContext(ctx, `
378
-
SELECT at_uri, did, rkey, name, knot
379
-
FROM repos
380
-
WHERE language_at IS NULL OR language_at < ?
381
-
ORDER BY seen_at DESC
382
-
LIMIT ?
383
-
`, cutoff, limit)
384
-
if err != nil {
385
-
return nil, err
386
-
}
387
-
defer rows.Close()
388
-
var out []Repo
389
-
for rows.Next() {
390
-
var r Repo
391
-
if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil {
392
-
return nil, err
393
-
}
394
-
out = append(out, r)
395
-
}
396
-
return out, rows.Err()
397
-
}
398
-
399
-
// PrimaryLanguages returns the sorted set of "primary" languages observed
400
-
// across all enriched repos — i.e., the highest-bytes language per repo,
401
-
// deduplicated. Used to populate the language filter dropdown.
402
-
func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) {
403
-
rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`)
404
-
if err != nil {
405
-
return nil, err
406
-
}
407
-
defer rows.Close()
408
-
seen := make(map[string]struct{})
409
-
for rows.Next() {
410
-
var raw string
411
-
if err := rows.Scan(&raw); err != nil {
412
-
return nil, err
413
-
}
414
-
var langs map[string]int64
415
-
if err := json.Unmarshal([]byte(raw), &langs); err != nil {
416
-
continue
417
-
}
418
-
var best string
419
-
var bestN int64
420
-
for k, v := range langs {
421
-
if v > bestN {
422
-
bestN, best = v, k
423
-
}
424
-
}
425
-
if best != "" {
426
-
seen[best] = struct{}{}
427
-
}
428
-
}
429
-
if err := rows.Err(); err != nil {
430
-
return nil, err
431
-
}
432
-
out := make([]string, 0, len(seen))
433
-
for k := range seen {
434
-
out = append(out, k)
435
-
}
436
-
sort.Strings(out)
437
-
return out, nil
438
-
}
439
-
440
-
// ReposFilter narrows the result of RecentRepos. Zero values mean "no
441
-
// filter" so callers can mix and match.
442
-
type ReposFilter struct {
443
-
Language string // primary language match (post-filter in Go over JSON map)
444
-
Since time.Time // repos created at or after this time (zero = no time floor)
445
-
ForksOnly bool // only rows with non-empty source
446
-
NoForks bool // exclude rows with non-empty source
447
-
}
448
-
449
-
// RecentRepos returns repos matching the filter, sorted newest first. Pass
450
-
// limit <= 0 for no limit. Uses raw SQL because the scalar star-count
451
-
// subquery and handle LEFT JOIN are awkward to express via ent's query API.
452
-
func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) {
453
-
candidateLimit := -1
454
-
if limit > 0 {
455
-
candidateLimit = limit
456
-
if f.Language != "" {
457
-
candidateLimit = limit * 5
458
-
}
459
-
}
460
-
forksOnly := 0
461
-
if f.ForksOnly {
462
-
forksOnly = 1
463
-
}
464
-
noForks := 0
465
-
if f.NoForks {
466
-
noForks = 1
467
-
}
468
-
rows, err := s.db.QueryContext(ctx, `
469
-
SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics,
470
-
r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at,
471
-
r.languages, r.language_at, COALESCE(h.handle, ''),
472
-
(SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count
473
-
FROM repos r
474
-
LEFT JOIN handles h ON h.did = r.did
475
-
WHERE r.created_at >= ?
476
-
AND (? = '' OR r.languages IS NOT NULL)
477
-
AND (? = 0 OR (r.source IS NOT NULL AND r.source != ''))
478
-
AND (? = 0 OR r.source IS NULL OR r.source = '')
479
-
ORDER BY r.created_at DESC
480
-
LIMIT ?
481
-
`, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit)
482
-
if err != nil {
483
-
return nil, err
484
-
}
485
-
defer rows.Close()
486
-
487
-
var out []Repo
488
-
for rows.Next() {
489
-
r, err := scanRepo(rows)
490
-
if err != nil {
491
-
return nil, err
492
-
}
493
-
if f.Language != "" && r.Primary() != f.Language {
494
-
continue
495
-
}
496
-
out = append(out, r)
497
-
if limit > 0 && len(out) >= limit {
498
-
break
499
-
}
500
-
}
501
-
return out, rows.Err()
502
-
}
503
-
504
-
// KnotEntry is one row in the knots view.
505
-
type KnotEntry struct {
506
-
Host string
507
-
FirstSeen time.Time
508
-
LastSeen time.Time
509
-
LastOK *time.Time
510
-
LastError *time.Time
511
-
LastErrorMsg string
512
-
ConsecutiveErrors int
513
-
Disabled bool
514
-
Repos int
515
-
}
516
-
517
-
// Knots returns every knot host with bookkeeping plus a count of repos
518
-
// hosted on it.
519
-
func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) {
520
-
knots, err := s.client.Knot.Query().
521
-
Order(ent.Desc(knot.FieldLastSeen)).
522
-
All(ctx)
523
-
if err != nil {
524
-
return nil, err
525
-
}
526
-
out := make([]KnotEntry, 0, len(knots))
527
-
for _, k := range knots {
528
-
repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx)
529
-
if err != nil {
530
-
return nil, err
531
-
}
532
-
entry := KnotEntry{
533
-
Host: k.ID,
534
-
FirstSeen: time.UnixMilli(k.FirstSeen),
535
-
LastSeen: time.UnixMilli(k.LastSeen),
536
-
ConsecutiveErrors: k.ConsecutiveErrors,
537
-
Disabled: k.Disabled != 0,
538
-
Repos: repos,
539
-
}
540
-
if k.LastOkAt != nil {
541
-
t := time.UnixMilli(*k.LastOkAt)
542
-
entry.LastOK = &t
543
-
}
544
-
if k.LastErrorAt != nil {
545
-
t := time.UnixMilli(*k.LastErrorAt)
546
-
entry.LastError = &t
547
-
}
548
-
if k.LastError != nil {
549
-
entry.LastErrorMsg = *k.LastError
550
-
}
551
-
out = append(out, entry)
552
-
}
553
-
// Sort by repos desc, then host — done in Go since it's a derived count.
554
-
for i := range out {
555
-
for j := i + 1; j < len(out); j++ {
556
-
if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) {
557
-
out[i], out[j] = out[j], out[i]
558
-
}
559
-
}
560
-
}
561
-
return out, nil
562
-
}
563
-
564
-
// UpsertKnot records that we've seen this host, refreshing last_seen.
565
-
func (s *Store) UpsertKnot(ctx context.Context, host string) error {
566
-
if host == "" {
567
-
return nil
568
-
}
569
-
now := time.Now().UnixMilli()
570
-
return s.client.Knot.Create().
571
-
SetID(host).
572
-
SetFirstSeen(now).
573
-
SetLastSeen(now).
574
-
OnConflict().
575
-
Update(func(u *ent.KnotUpsert) {
576
-
u.SetLastSeen(now)
577
-
}).
578
-
Exec(ctx)
579
-
}
580
-
581
-
// MarkKnotOK resets consecutive_errors on a successful call.
582
-
func (s *Store) MarkKnotOK(ctx context.Context, host string) error {
583
-
if host == "" {
584
-
return nil
585
-
}
586
-
now := time.Now().UnixMilli()
587
-
return s.client.Knot.Create().
588
-
SetID(host).
589
-
SetFirstSeen(now).
590
-
SetLastSeen(now).
591
-
SetLastOkAt(now).
592
-
OnConflict().
593
-
Update(func(u *ent.KnotUpsert) {
594
-
u.SetLastSeen(now)
595
-
u.SetLastOkAt(now)
596
-
u.SetConsecutiveErrors(0)
597
-
}).
598
-
Exec(ctx)
599
-
}
600
-
601
-
// MarkKnotError records a failure and increments consecutive_errors.
602
-
func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error {
603
-
if host == "" {
604
-
return nil
605
-
}
606
-
if len(reason) > 500 {
607
-
reason = reason[:500]
608
-
}
609
-
now := time.Now().UnixMilli()
610
-
// ent has no SQL-side "consecutive_errors + 1" expression in upsert, so
611
-
// we drop to raw SQL for the increment. Insert path uses ent.
612
-
_, err := s.db.ExecContext(ctx, `
613
-
INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled)
614
-
VALUES (?, ?, ?, ?, ?, 1, 0)
615
-
ON CONFLICT(host) DO UPDATE SET
616
-
last_error_at = excluded.last_error_at,
617
-
last_error = excluded.last_error,
618
-
last_seen = excluded.last_seen,
619
-
consecutive_errors = consecutive_errors + 1
620
-
`, host, now, now, now, reason)
621
-
return err
622
-
}
623
-
624
-
// KnotCircuitOpen reports whether further calls to a host should be skipped.
625
-
func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) {
626
-
if host == "" {
627
-
return false, nil
628
-
}
629
-
k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx)
630
-
if ent.IsNotFound(err) {
631
-
return false, nil
632
-
}
633
-
if err != nil {
634
-
return false, err
635
-
}
636
-
if k.Disabled != 0 {
637
-
return true, nil
638
-
}
639
-
if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil {
640
-
return false, nil
641
-
}
642
-
return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil
643
-
}
644
-
645
-
// SetKnotDisabled toggles the knot's disabled flag.
646
-
func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error {
647
-
if host == "" {
648
-
return nil
649
-
}
650
-
v := 0
651
-
if disabled {
652
-
v = 1
653
-
}
654
-
return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx)
655
-
}
656
-
657
-
// HandleEntry is one row in the handles view.
658
-
type HandleEntry struct {
659
-
DID string
660
-
Handle string
661
-
RefreshedAt time.Time
662
-
Repos int
663
-
}
664
-
665
-
// Handles returns every resolved DID→handle mapping with repo count. Pass
666
-
// limit <= 0 for no limit.
667
-
func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) {
668
-
sqlLimit := -1
669
-
if limit > 0 {
670
-
sqlLimit = limit
671
-
}
672
-
rows, err := s.db.QueryContext(ctx, `
673
-
SELECT h.did, h.handle, h.refreshed_at,
674
-
(SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos
675
-
FROM handles h
676
-
ORDER BY repos DESC, h.handle
677
-
LIMIT ?
678
-
`, sqlLimit)
679
-
if err != nil {
680
-
return nil, err
681
-
}
682
-
defer rows.Close()
683
-
var out []HandleEntry
684
-
for rows.Next() {
685
-
var (
686
-
h HandleEntry
687
-
refreshed int64
688
-
)
689
-
if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil {
690
-
return nil, err
691
-
}
692
-
h.RefreshedAt = time.UnixMilli(refreshed)
693
-
out = append(out, h)
694
-
}
695
-
return out, rows.Err()
696
-
}
697
-
698
-
// scanRepo reads a row from the SQL projection used by RecentRepos.
699
-
func scanRepo(rows *sql.Rows) (Repo, error) {
700
-
var (
701
-
r Repo
702
-
description sql.NullString
703
-
topicsRaw sql.NullString
704
-
website sql.NullString
705
-
source sql.NullString
706
-
spindle sql.NullString
707
-
repoDID sql.NullString
708
-
languages sql.NullString
709
-
languageAt sql.NullInt64
710
-
createdAt int64
711
-
seenAt int64
712
-
)
713
-
if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot,
714
-
&description, &topicsRaw, &website, &source, &spindle, &repoDID,
715
-
&createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil {
716
-
return r, err
717
-
}
718
-
r.Description = description.String
719
-
r.Website = website.String
720
-
r.Source = source.String
721
-
r.Spindle = spindle.String
722
-
r.RepoDID = repoDID.String
723
-
r.CreatedAt = time.UnixMilli(createdAt)
724
-
r.SeenAt = time.UnixMilli(seenAt)
725
-
if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" {
726
-
_ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics)
727
-
}
728
-
if languages.Valid && languages.String != "" {
729
-
_ = json.Unmarshal([]byte(languages.String), &r.Languages)
730
-
}
731
-
if languageAt.Valid {
732
-
t := time.UnixMilli(languageAt.Int64)
733
-
r.LanguageAt = &t
734
-
}
735
-
return r, nil
736
-
}
737
-
738
95
// withTx runs fn in a transaction, committing on success or rolling back on error.
739
96
func withTx(ctx context.Context, client *ent.Client, fn func(*ent.Tx) error) error {
740
97
tx, err := client.Tx(ctx)