this repo has no description
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

refactor store #2

open opened by chown.de targeting main from refactor-store

break it up into smaller pieces.

Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:x3ni2r3jgdqms5euhzt2qqdr/sh.tangled.repo.pull/3mkxjcdkn6c22
+698 -650
Diff #0
+6 -6
firehose/worker.go
··· 27 27 } 28 28 29 29 type WorkerConfig[T comparable] struct { 30 - Name string // logged on errors / drops 31 - Logger *slog.Logger // required 32 - Process func(ctx context.Context, item T) error // required 33 - QueueSize int // default 256 34 - SeenTTL time.Duration // 0 = forever 35 - MinDelay time.Duration // gap between processed items 30 + Name string // logged on errors / drops 31 + Logger *slog.Logger // required 32 + Process func(ctx context.Context, item T) error // required 33 + QueueSize int // default 256 34 + SeenTTL time.Duration // 0 = forever 35 + MinDelay time.Duration // gap between processed items 36 36 } 37 37 38 38 func NewWorker[T comparable](cfg WorkerConfig[T]) *Worker[T] {
+47
store/cursor.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "tangled.sh/chown.de/tangled-repo-firehose/ent" 8 + "tangled.sh/chown.de/tangled-repo-firehose/ent/cursor" 9 + ) 10 + 11 + // GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0 12 + // if none has been recorded yet. 13 + func (s *Store) GetCursor(ctx context.Context) (int64, error) { 14 + row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx) 15 + if ent.IsNotFound(err) { 16 + return 0, nil 17 + } 18 + if err != nil { 19 + return 0, err 20 + } 21 + return row.Cursor, nil 22 + } 23 + 24 + // AdvanceCursor moves the cursor forward without applying any record. Used 25 + // for non-tracked event kinds so we don't replay them on restart. 26 + func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error { 27 + return withTx(ctx, s.client, func(tx *ent.Tx) error { 28 + return advanceCursor(ctx, tx, cur) 29 + }) 30 + } 31 + 32 + func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error { 33 + // Never move the cursor backward — out-of-order replays would otherwise 34 + // rewind progress. 35 + existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx) 36 + now := time.Now().UnixMilli() 37 + if ent.IsNotFound(err) { 38 + return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx) 39 + } 40 + if err != nil { 41 + return err 42 + } 43 + if cur <= existing.Cursor { 44 + return nil 45 + } 46 + return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx) 47 + }
+128
store/handle.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "tangled.sh/chown.de/tangled-repo-firehose/ent" 8 + "tangled.sh/chown.de/tangled-repo-firehose/ent/handle" 9 + ) 10 + 11 + // AllKnownDIDs returns every DID we've encountered: repo owners plus star 12 + // authors parsed from existing star at-uris. Uses raw SQL because ent has no 13 + // clean expression for substring + UNION. 14 + func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) { 15 + rows, err := s.db.QueryContext(ctx, ` 16 + SELECT did FROM repos 17 + UNION 18 + SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did 19 + FROM stars 20 + WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != '' 21 + LIMIT ? 22 + `, limit) 23 + if err != nil { 24 + return nil, err 25 + } 26 + defer rows.Close() 27 + var out []string 28 + for rows.Next() { 29 + var did string 30 + if err := rows.Scan(&did); err != nil { 31 + return nil, err 32 + } 33 + if did != "" { 34 + out = append(out, did) 35 + } 36 + } 37 + return out, rows.Err() 38 + } 39 + 40 + // UpsertHandle stores or refreshes a DID→handle mapping. 41 + func (s *Store) UpsertHandle(ctx context.Context, did, h string) error { 42 + return s.client.Handle.Create(). 43 + SetID(did). 44 + SetHandle(h). 45 + SetRefreshedAt(time.Now().UnixMilli()). 46 + OnConflict(). 47 + UpdateNewValues(). 48 + Exec(ctx) 49 + } 50 + 51 + // Handle returns the cached handle for a DID, or "" if unknown. 52 + func (s *Store) Handle(ctx context.Context, did string) (string, error) { 53 + row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx) 54 + if ent.IsNotFound(err) { 55 + return "", nil 56 + } 57 + if err != nil { 58 + return "", err 59 + } 60 + return row.Handle, nil 61 + } 62 + 63 + // DIDsNeedingHandles returns owner DIDs from repos that have no cached handle 64 + // yet, or whose cached handle is older than maxAge. 65 + func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) { 66 + cutoff := time.Now().Add(-maxAge).UnixMilli() 67 + rows, err := s.db.QueryContext(ctx, ` 68 + SELECT DISTINCT r.did 69 + FROM repos r 70 + LEFT JOIN handles h ON h.did = r.did 71 + WHERE h.did IS NULL OR h.refreshed_at < ? 72 + LIMIT ? 73 + `, cutoff, limit) 74 + if err != nil { 75 + return nil, err 76 + } 77 + defer rows.Close() 78 + var out []string 79 + for rows.Next() { 80 + var did string 81 + if err := rows.Scan(&did); err != nil { 82 + return nil, err 83 + } 84 + out = append(out, did) 85 + } 86 + return out, rows.Err() 87 + } 88 + 89 + // HandleEntry is one row in the handles view. 90 + type HandleEntry struct { 91 + DID string 92 + Handle string 93 + RefreshedAt time.Time 94 + Repos int 95 + } 96 + 97 + // Handles returns every resolved DID→handle mapping with repo count. Pass 98 + // limit <= 0 for no limit. 99 + func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) { 100 + sqlLimit := -1 101 + if limit > 0 { 102 + sqlLimit = limit 103 + } 104 + rows, err := s.db.QueryContext(ctx, ` 105 + SELECT h.did, h.handle, h.refreshed_at, 106 + (SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos 107 + FROM handles h 108 + ORDER BY repos DESC, h.handle 109 + LIMIT ? 110 + `, sqlLimit) 111 + if err != nil { 112 + return nil, err 113 + } 114 + defer rows.Close() 115 + var out []HandleEntry 116 + for rows.Next() { 117 + var ( 118 + h HandleEntry 119 + refreshed int64 120 + ) 121 + if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil { 122 + return nil, err 123 + } 124 + h.RefreshedAt = time.UnixMilli(refreshed) 125 + out = append(out, h) 126 + } 127 + return out, rows.Err() 128 + }
+163
store/knot.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "tangled.sh/chown.de/tangled-repo-firehose/ent" 8 + "tangled.sh/chown.de/tangled-repo-firehose/ent/knot" 9 + "tangled.sh/chown.de/tangled-repo-firehose/ent/repo" 10 + ) 11 + 12 + // KnotEntry is one row in the knots view. 13 + type KnotEntry struct { 14 + Host string 15 + FirstSeen time.Time 16 + LastSeen time.Time 17 + LastOK *time.Time 18 + LastError *time.Time 19 + LastErrorMsg string 20 + ConsecutiveErrors int 21 + Disabled bool 22 + Repos int 23 + } 24 + 25 + // Knots returns every knot host with bookkeeping plus a count of repos 26 + // hosted on it. 27 + func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) { 28 + knots, err := s.client.Knot.Query(). 29 + Order(ent.Desc(knot.FieldLastSeen)). 30 + All(ctx) 31 + if err != nil { 32 + return nil, err 33 + } 34 + out := make([]KnotEntry, 0, len(knots)) 35 + for _, k := range knots { 36 + repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx) 37 + if err != nil { 38 + return nil, err 39 + } 40 + entry := KnotEntry{ 41 + Host: k.ID, 42 + FirstSeen: time.UnixMilli(k.FirstSeen), 43 + LastSeen: time.UnixMilli(k.LastSeen), 44 + ConsecutiveErrors: k.ConsecutiveErrors, 45 + Disabled: k.Disabled != 0, 46 + Repos: repos, 47 + } 48 + if k.LastOkAt != nil { 49 + t := time.UnixMilli(*k.LastOkAt) 50 + entry.LastOK = &t 51 + } 52 + if k.LastErrorAt != nil { 53 + t := time.UnixMilli(*k.LastErrorAt) 54 + entry.LastError = &t 55 + } 56 + if k.LastError != nil { 57 + entry.LastErrorMsg = *k.LastError 58 + } 59 + out = append(out, entry) 60 + } 61 + // Sort by repos desc, then host — done in Go since it's a derived count. 62 + for i := range out { 63 + for j := i + 1; j < len(out); j++ { 64 + if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) { 65 + out[i], out[j] = out[j], out[i] 66 + } 67 + } 68 + } 69 + return out, nil 70 + } 71 + 72 + // UpsertKnot records that we've seen this host, refreshing last_seen. 73 + func (s *Store) UpsertKnot(ctx context.Context, host string) error { 74 + if host == "" { 75 + return nil 76 + } 77 + now := time.Now().UnixMilli() 78 + return s.client.Knot.Create(). 79 + SetID(host). 80 + SetFirstSeen(now). 81 + SetLastSeen(now). 82 + OnConflict(). 83 + Update(func(u *ent.KnotUpsert) { 84 + u.SetLastSeen(now) 85 + }). 86 + Exec(ctx) 87 + } 88 + 89 + // MarkKnotOK resets consecutive_errors on a successful call. 90 + func (s *Store) MarkKnotOK(ctx context.Context, host string) error { 91 + if host == "" { 92 + return nil 93 + } 94 + now := time.Now().UnixMilli() 95 + return s.client.Knot.Create(). 96 + SetID(host). 97 + SetFirstSeen(now). 98 + SetLastSeen(now). 99 + SetLastOkAt(now). 100 + OnConflict(). 101 + Update(func(u *ent.KnotUpsert) { 102 + u.SetLastSeen(now) 103 + u.SetLastOkAt(now) 104 + u.SetConsecutiveErrors(0) 105 + }). 106 + Exec(ctx) 107 + } 108 + 109 + // MarkKnotError records a failure and increments consecutive_errors. 110 + func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error { 111 + if host == "" { 112 + return nil 113 + } 114 + if len(reason) > 500 { 115 + reason = reason[:500] 116 + } 117 + now := time.Now().UnixMilli() 118 + // ent has no SQL-side "consecutive_errors + 1" expression in upsert, so 119 + // we drop to raw SQL for the increment. Insert path uses ent. 120 + _, err := s.db.ExecContext(ctx, ` 121 + INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled) 122 + VALUES (?, ?, ?, ?, ?, 1, 0) 123 + ON CONFLICT(host) DO UPDATE SET 124 + last_error_at = excluded.last_error_at, 125 + last_error = excluded.last_error, 126 + last_seen = excluded.last_seen, 127 + consecutive_errors = consecutive_errors + 1 128 + `, host, now, now, now, reason) 129 + return err 130 + } 131 + 132 + // KnotCircuitOpen reports whether further calls to a host should be skipped. 133 + func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) { 134 + if host == "" { 135 + return false, nil 136 + } 137 + k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx) 138 + if ent.IsNotFound(err) { 139 + return false, nil 140 + } 141 + if err != nil { 142 + return false, err 143 + } 144 + if k.Disabled != 0 { 145 + return true, nil 146 + } 147 + if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil { 148 + return false, nil 149 + } 150 + return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil 151 + } 152 + 153 + // SetKnotDisabled toggles the knot's disabled flag. 154 + func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error { 155 + if host == "" { 156 + return nil 157 + } 158 + v := 0 159 + if disabled { 160 + v = 1 161 + } 162 + return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx) 163 + }
+87
store/lang.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "sort" 7 + "time" 8 + ) 9 + 10 + // UpdateLanguages writes the languages JSON map for an existing repo. 11 + func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error { 12 + raw, err := json.Marshal(langs) 13 + if err != nil { 14 + return err 15 + } 16 + return s.client.Repo.UpdateOneID(atURI). 17 + SetLanguages(string(raw)). 18 + SetLanguageAt(time.Now().UnixMilli()). 19 + Exec(ctx) 20 + } 21 + 22 + // ReposNeedingLanguages returns repos whose language enrichment is missing or 23 + // older than maxAge. 24 + func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) { 25 + cutoff := time.Now().Add(-maxAge).UnixMilli() 26 + rows, err := s.db.QueryContext(ctx, ` 27 + SELECT at_uri, did, rkey, name, knot 28 + FROM repos 29 + WHERE language_at IS NULL OR language_at < ? 30 + ORDER BY seen_at DESC 31 + LIMIT ? 32 + `, cutoff, limit) 33 + if err != nil { 34 + return nil, err 35 + } 36 + defer rows.Close() 37 + var out []Repo 38 + for rows.Next() { 39 + var r Repo 40 + if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil { 41 + return nil, err 42 + } 43 + out = append(out, r) 44 + } 45 + return out, rows.Err() 46 + } 47 + 48 + // PrimaryLanguages returns the sorted set of "primary" languages observed 49 + // across all enriched repos — i.e., the highest-bytes language per repo, 50 + // deduplicated. Used to populate the language filter dropdown. 51 + func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) { 52 + rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`) 53 + if err != nil { 54 + return nil, err 55 + } 56 + defer rows.Close() 57 + seen := make(map[string]struct{}) 58 + for rows.Next() { 59 + var raw string 60 + if err := rows.Scan(&raw); err != nil { 61 + return nil, err 62 + } 63 + var langs map[string]int64 64 + if err := json.Unmarshal([]byte(raw), &langs); err != nil { 65 + continue 66 + } 67 + var best string 68 + var bestN int64 69 + for k, v := range langs { 70 + if v > bestN { 71 + bestN, best = v, k 72 + } 73 + } 74 + if best != "" { 75 + seen[best] = struct{}{} 76 + } 77 + } 78 + if err := rows.Err(); err != nil { 79 + return nil, err 80 + } 81 + out := make([]string, 0, len(seen)) 82 + for k := range seen { 83 + out = append(out, k) 84 + } 85 + sort.Strings(out) 86 + return out, nil 87 + }
+205
store/repo.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "time" 8 + 9 + "tangled.sh/chown.de/tangled-repo-firehose/ent" 10 + "tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed" 11 + "tangled.sh/chown.de/tangled-repo-firehose/ent/repo" 12 + ) 13 + 14 + // ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in 15 + // a single transaction. Languages column is intentionally untouched so async 16 + // language enrichment isn't clobbered by a record update. 17 + func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error { 18 + return withTx(ctx, s.client, func(tx *ent.Tx) error { 19 + if err := upsertRepo(ctx, tx, r); err != nil { 20 + return err 21 + } 22 + return advanceCursor(ctx, tx, cur) 23 + }) 24 + } 25 + 26 + // ApplyRepoDelete removes a repo and advances the cursor in a single txn. 27 + func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error { 28 + return withTx(ctx, s.client, func(tx *ent.Tx) error { 29 + if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil { 30 + return err 31 + } 32 + return advanceCursor(ctx, tx, cur) 33 + }) 34 + } 35 + 36 + // UpsertRepo writes (or replaces) a repo row outside the firehose path — 37 + // used by discovery when a star references a repo we haven't seen yet. Does 38 + // NOT touch the cursor. 39 + func (s *Store) UpsertRepo(ctx context.Context, r Repo) error { 40 + tx, err := s.client.Tx(ctx) 41 + if err != nil { 42 + return err 43 + } 44 + defer func() { _ = tx.Rollback() }() 45 + if err := upsertRepo(ctx, tx, r); err != nil { 46 + return err 47 + } 48 + return tx.Commit() 49 + } 50 + 51 + func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error { 52 + topicsJSON, err := json.Marshal(r.Topics) 53 + if err != nil { 54 + return err 55 + } 56 + topics := string(topicsJSON) 57 + create := tx.Repo.Create(). 58 + SetID(r.AtURI). 59 + SetDid(r.DID). 60 + SetRkey(r.Rkey). 61 + SetName(r.Name). 62 + SetKnot(r.Knot). 63 + SetCreatedAt(r.CreatedAt.UnixMilli()). 64 + SetSeenAt(r.SeenAt.UnixMilli()). 65 + SetTopics(topics). 66 + SetDescription(r.Description). 67 + SetWebsite(r.Website). 68 + SetSource(r.Source). 69 + SetSpindle(r.Spindle). 70 + SetRepoDid(r.RepoDID) 71 + return create.OnConflict(). 72 + UpdateNewValues(). 73 + Exec(ctx) 74 + } 75 + 76 + // HasRepo reports whether a repo at-uri already exists in the index. 77 + func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) { 78 + return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx) 79 + } 80 + 81 + // MarkDiscoveryFailed records that a PDS returned RecordNotFound for this 82 + // at-uri so we don't keep retrying every backfill cycle. 83 + func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error { 84 + return s.client.DiscoveryFailed.Create(). 85 + SetID(atURI). 86 + SetFailedAt(time.Now().UnixMilli()). 87 + SetReason(reason). 88 + OnConflict(). 89 + UpdateNewValues(). 90 + Exec(ctx) 91 + } 92 + 93 + // DiscoveryFailedRecently reports whether a previous discovery attempt failed 94 + // within the given window — used to skip known-deleted records on retry. 95 + func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) { 96 + cutoff := time.Now().Add(-maxAge).UnixMilli() 97 + return s.client.DiscoveryFailed.Query(). 98 + Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)). 99 + Exist(ctx) 100 + } 101 + 102 + // ReposFilter narrows the result of RecentRepos. Zero values mean "no 103 + // filter" so callers can mix and match. 104 + type ReposFilter struct { 105 + Language string // primary language match (post-filter in Go over JSON map) 106 + Since time.Time // repos created at or after this time (zero = no time floor) 107 + ForksOnly bool // only rows with non-empty source 108 + NoForks bool // exclude rows with non-empty source 109 + } 110 + 111 + // RecentRepos returns repos matching the filter, sorted newest first. Pass 112 + // limit <= 0 for no limit. Uses raw SQL because the scalar star-count 113 + // subquery and handle LEFT JOIN are awkward to express via ent's query API. 114 + func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) { 115 + candidateLimit := -1 116 + if limit > 0 { 117 + candidateLimit = limit 118 + if f.Language != "" { 119 + candidateLimit = limit * 5 120 + } 121 + } 122 + forksOnly := 0 123 + if f.ForksOnly { 124 + forksOnly = 1 125 + } 126 + noForks := 0 127 + if f.NoForks { 128 + noForks = 1 129 + } 130 + 131 + rows, err := s.db.QueryContext(ctx, ` 132 + SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics, 133 + r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at, 134 + r.languages, r.language_at, COALESCE(h.handle, ''), 135 + (SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count 136 + FROM repos r 137 + LEFT JOIN handles h ON h.did = r.did 138 + WHERE r.created_at >= ? 139 + AND (? = '' OR r.languages IS NOT NULL) 140 + AND (? = 0 OR (r.source IS NOT NULL AND r.source != '')) 141 + AND (? = 0 OR r.source IS NULL OR r.source = '') 142 + ORDER BY r.created_at DESC 143 + LIMIT ? 144 + `, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit) 145 + if err != nil { 146 + return nil, err 147 + } 148 + defer rows.Close() 149 + 150 + var out []Repo 151 + for rows.Next() { 152 + r, err := scanRepo(rows) 153 + if err != nil { 154 + return nil, err 155 + } 156 + if f.Language != "" && r.Primary() != f.Language { 157 + continue 158 + } 159 + out = append(out, r) 160 + if limit > 0 && len(out) >= limit { 161 + break 162 + } 163 + } 164 + return out, rows.Err() 165 + } 166 + 167 + // scanRepo reads a row from the SQL projection used by RecentRepos. 168 + func scanRepo(rows *sql.Rows) (Repo, error) { 169 + var ( 170 + r Repo 171 + description sql.NullString 172 + topicsRaw sql.NullString 173 + website sql.NullString 174 + source sql.NullString 175 + spindle sql.NullString 176 + repoDID sql.NullString 177 + languages sql.NullString 178 + languageAt sql.NullInt64 179 + createdAt int64 180 + seenAt int64 181 + ) 182 + if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot, 183 + &description, &topicsRaw, &website, &source, &spindle, &repoDID, 184 + &createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil { 185 + return r, err 186 + } 187 + r.Description = description.String 188 + r.Website = website.String 189 + r.Source = source.String 190 + r.Spindle = spindle.String 191 + r.RepoDID = repoDID.String 192 + r.CreatedAt = time.UnixMilli(createdAt) 193 + r.SeenAt = time.UnixMilli(seenAt) 194 + if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" { 195 + _ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics) 196 + } 197 + if languages.Valid && languages.String != "" { 198 + _ = json.Unmarshal([]byte(languages.String), &r.Languages) 199 + } 200 + if languageAt.Valid { 201 + t := time.UnixMilli(languageAt.Int64) 202 + r.LanguageAt = &t 203 + } 204 + return r, nil 205 + }
+61
store/star.go
··· 1 + package store 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "time" 7 + 8 + "tangled.sh/chown.de/tangled-repo-firehose/ent" 9 + "tangled.sh/chown.de/tangled-repo-firehose/ent/star" 10 + ) 11 + 12 + // ApplyStarCreate inserts a star (idempotent on at_uri) and advances the 13 + // cursor in a single transaction. 14 + func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error { 15 + return withTx(ctx, s.client, func(tx *ent.Tx) error { 16 + err := tx.Star.Create(). 17 + SetID(atURI). 18 + SetSubject(subject). 19 + SetCreatedAt(createdAt.UnixMilli()). 20 + OnConflict(). 21 + DoNothing(). 22 + Exec(ctx) 23 + // ent's DoNothing returns sql.ErrNoRows when nothing was inserted — 24 + // safe to ignore; the row already exists. 25 + if err != nil && err != sql.ErrNoRows { 26 + return err 27 + } 28 + return advanceCursor(ctx, tx, cur) 29 + }) 30 + } 31 + 32 + // ApplyStarDelete removes a star row and advances the cursor in a single txn. 33 + func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error { 34 + return withTx(ctx, s.client, func(tx *ent.Tx) error { 35 + if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil { 36 + return err 37 + } 38 + return advanceCursor(ctx, tx, cur) 39 + }) 40 + } 41 + 42 + // StarCount returns the number of star records held for a repo's at-uri. 43 + func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) { 44 + return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx) 45 + } 46 + 47 + // UpsertStar inserts a star outside the firehose path — used by the stars 48 + // backfill worker. Idempotent on at-uri; does not advance the cursor. 49 + func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error { 50 + err := s.client.Star.Create(). 51 + SetID(atURI). 52 + SetSubject(subject). 53 + SetCreatedAt(createdAt.UnixMilli()). 54 + OnConflict(). 55 + DoNothing(). 56 + Exec(ctx) 57 + if err != nil && err != sql.ErrNoRows { 58 + return err 59 + } 60 + return nil 61 + }
+1 -644
store/store.go
··· 1 1 // Package store is the persistence layer for the Tangled repo firehose 2 2 // aggregator. Schema is defined in ent/schema/ and generated via 3 - // `go generate ./ent`. Auto-migration runs on Open and is additive only — 4 - // existing data is never dropped. 3 + // `go generate ./ent`. Auto-migration runs on Open. 5 4 package store 6 5 7 6 import ( 8 7 "context" 9 8 "database/sql" 10 - "encoding/json" 11 9 "fmt" 12 10 "io" 13 - "sort" 14 11 "time" 15 12 16 13 "entgo.io/ent/dialect" 17 14 entsql "entgo.io/ent/dialect/sql" 18 15 19 16 "tangled.sh/chown.de/tangled-repo-firehose/ent" 20 - "tangled.sh/chown.de/tangled-repo-firehose/ent/cursor" 21 - "tangled.sh/chown.de/tangled-repo-firehose/ent/discoveryfailed" 22 - "tangled.sh/chown.de/tangled-repo-firehose/ent/handle" 23 - "tangled.sh/chown.de/tangled-repo-firehose/ent/knot" 24 - "tangled.sh/chown.de/tangled-repo-firehose/ent/repo" 25 - "tangled.sh/chown.de/tangled-repo-firehose/ent/star" 26 17 27 18 _ "modernc.org/sqlite" 28 19 ) 29 20 30 21 // dsn assembles the modernc.org/sqlite connection string. PRAGMAs go in the 31 22 // query string so they're applied per-connection (the pool may open many). 32 - // ent's migrator checks foreign_keys at startup so it must be set here. 33 23 func dsn(path string) string { 34 24 return path + "?_pragma=journal_mode(WAL)&_pragma=synchronous(NORMAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(1)" 35 25 } ··· 48 38 return client.Schema.WriteTo(ctx, w) 49 39 } 50 40 51 - 52 41 // Repo mirrors a sh.tangled.repo record plus enrichment columns. 53 42 type Repo struct { 54 43 AtURI string ··· 103 92 104 93 func (s *Store) Close() error { return s.client.Close() } 105 94 106 - // GetCursor returns the persisted Jetstream cursor (unix microseconds), or 0 107 - // if none has been recorded yet. 108 - func (s *Store) GetCursor(ctx context.Context) (int64, error) { 109 - row, err := s.client.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx) 110 - if ent.IsNotFound(err) { 111 - return 0, nil 112 - } 113 - if err != nil { 114 - return 0, err 115 - } 116 - return row.Cursor, nil 117 - } 118 - 119 - // ApplyRepoUpsert writes (or replaces) a repo row and advances the cursor in 120 - // a single transaction. Languages column is intentionally untouched so async 121 - // language enrichment isn't clobbered by a record update. 122 - func (s *Store) ApplyRepoUpsert(ctx context.Context, r Repo, cur int64) error { 123 - return withTx(ctx, s.client, func(tx *ent.Tx) error { 124 - if err := upsertRepo(ctx, tx, r); err != nil { 125 - return err 126 - } 127 - return advanceCursor(ctx, tx, cur) 128 - }) 129 - } 130 - 131 - // ApplyRepoDelete removes a repo and advances the cursor in a single txn. 132 - func (s *Store) ApplyRepoDelete(ctx context.Context, atURI string, cur int64) error { 133 - return withTx(ctx, s.client, func(tx *ent.Tx) error { 134 - if _, err := tx.Repo.Delete().Where(repo.IDEQ(atURI)).Exec(ctx); err != nil { 135 - return err 136 - } 137 - return advanceCursor(ctx, tx, cur) 138 - }) 139 - } 140 - 141 - // AdvanceCursor moves the cursor forward without applying any record. Used 142 - // for non-tracked event kinds so we don't replay them on restart. 143 - func (s *Store) AdvanceCursor(ctx context.Context, cur int64) error { 144 - return withTx(ctx, s.client, func(tx *ent.Tx) error { 145 - return advanceCursor(ctx, tx, cur) 146 - }) 147 - } 148 - 149 - func advanceCursor(ctx context.Context, tx *ent.Tx, cur int64) error { 150 - // Never move the cursor backward — out-of-order replays would otherwise 151 - // rewind progress. 152 - existing, err := tx.Cursor.Query().Where(cursor.IDEQ(1)).Only(ctx) 153 - now := time.Now().UnixMilli() 154 - if ent.IsNotFound(err) { 155 - return tx.Cursor.Create().SetID(1).SetCursor(cur).SetUpdated(now).Exec(ctx) 156 - } 157 - if err != nil { 158 - return err 159 - } 160 - if cur <= existing.Cursor { 161 - return nil 162 - } 163 - return tx.Cursor.UpdateOneID(1).SetCursor(cur).SetUpdated(now).Exec(ctx) 164 - } 165 - 166 - // UpsertRepo writes (or replaces) a repo row outside the firehose path — 167 - // used by discovery when a star references a repo we haven't seen yet. Does 168 - // NOT touch the cursor. 169 - func (s *Store) UpsertRepo(ctx context.Context, r Repo) error { 170 - tx, err := s.client.Tx(ctx) 171 - if err != nil { 172 - return err 173 - } 174 - defer func() { _ = tx.Rollback() }() 175 - if err := upsertRepo(ctx, tx, r); err != nil { 176 - return err 177 - } 178 - return tx.Commit() 179 - } 180 - 181 - func upsertRepo(ctx context.Context, tx *ent.Tx, r Repo) error { 182 - topicsJSON, err := json.Marshal(r.Topics) 183 - if err != nil { 184 - return err 185 - } 186 - topics := string(topicsJSON) 187 - create := tx.Repo.Create(). 188 - SetID(r.AtURI). 189 - SetDid(r.DID). 190 - SetRkey(r.Rkey). 191 - SetName(r.Name). 192 - SetKnot(r.Knot). 193 - SetCreatedAt(r.CreatedAt.UnixMilli()). 194 - SetSeenAt(r.SeenAt.UnixMilli()). 195 - SetTopics(topics). 196 - SetDescription(r.Description). 197 - SetWebsite(r.Website). 198 - SetSource(r.Source). 199 - SetSpindle(r.Spindle). 200 - SetRepoDid(r.RepoDID) 201 - return create.OnConflict(). 202 - UpdateNewValues(). 203 - Exec(ctx) 204 - } 205 - 206 - // HasRepo reports whether a repo at-uri already exists in the index. 207 - func (s *Store) HasRepo(ctx context.Context, atURI string) (bool, error) { 208 - return s.client.Repo.Query().Where(repo.IDEQ(atURI)).Exist(ctx) 209 - } 210 - 211 - // MarkDiscoveryFailed records that a PDS returned RecordNotFound for this 212 - // at-uri so we don't keep retrying every backfill cycle. 213 - func (s *Store) MarkDiscoveryFailed(ctx context.Context, atURI, reason string) error { 214 - return s.client.DiscoveryFailed.Create(). 215 - SetID(atURI). 216 - SetFailedAt(time.Now().UnixMilli()). 217 - SetReason(reason). 218 - OnConflict(). 219 - UpdateNewValues(). 220 - Exec(ctx) 221 - } 222 - 223 - // DiscoveryFailedRecently reports whether a previous discovery attempt failed 224 - // within the given window — used to skip known-deleted records on retry. 225 - func (s *Store) DiscoveryFailedRecently(ctx context.Context, atURI string, maxAge time.Duration) (bool, error) { 226 - cutoff := time.Now().Add(-maxAge).UnixMilli() 227 - return s.client.DiscoveryFailed.Query(). 228 - Where(discoveryfailed.IDEQ(atURI), discoveryfailed.FailedAtGTE(cutoff)). 229 - Exist(ctx) 230 - } 231 - 232 - // ApplyStarCreate inserts a star (idempotent on at_uri) and advances the 233 - // cursor in a single transaction. 234 - func (s *Store) ApplyStarCreate(ctx context.Context, atURI, subject string, createdAt time.Time, cur int64) error { 235 - return withTx(ctx, s.client, func(tx *ent.Tx) error { 236 - err := tx.Star.Create(). 237 - SetID(atURI). 238 - SetSubject(subject). 239 - SetCreatedAt(createdAt.UnixMilli()). 240 - OnConflict(). 241 - DoNothing(). 242 - Exec(ctx) 243 - // ent's DoNothing returns sql.ErrNoRows when nothing was inserted — 244 - // safe to ignore; the row already exists. 245 - if err != nil && err != sql.ErrNoRows { 246 - return err 247 - } 248 - return advanceCursor(ctx, tx, cur) 249 - }) 250 - } 251 - 252 - // ApplyStarDelete removes a star row and advances the cursor in a single txn. 253 - func (s *Store) ApplyStarDelete(ctx context.Context, atURI string, cur int64) error { 254 - return withTx(ctx, s.client, func(tx *ent.Tx) error { 255 - if _, err := tx.Star.Delete().Where(star.IDEQ(atURI)).Exec(ctx); err != nil { 256 - return err 257 - } 258 - return advanceCursor(ctx, tx, cur) 259 - }) 260 - } 261 - 262 - // StarCount returns the number of star records held for a repo's at-uri. 263 - func (s *Store) StarCount(ctx context.Context, repoAtURI string) (int, error) { 264 - return s.client.Star.Query().Where(star.SubjectEQ(repoAtURI)).Count(ctx) 265 - } 266 - 267 - // UpsertStar inserts a star outside the firehose path — used by the stars 268 - // backfill worker. Idempotent on at-uri; does not advance the cursor. 269 - func (s *Store) UpsertStar(ctx context.Context, atURI, subject string, createdAt time.Time) error { 270 - err := s.client.Star.Create(). 271 - SetID(atURI). 272 - SetSubject(subject). 273 - SetCreatedAt(createdAt.UnixMilli()). 274 - OnConflict(). 275 - DoNothing(). 276 - Exec(ctx) 277 - if err != nil && err != sql.ErrNoRows { 278 - return err 279 - } 280 - return nil 281 - } 282 - 283 - // AllKnownDIDs returns every DID we've encountered: repo owners plus star 284 - // authors parsed from existing star at-uris. Uses raw SQL because ent has no 285 - // clean expression for substring + UNION. 286 - func (s *Store) AllKnownDIDs(ctx context.Context, limit int) ([]string, error) { 287 - rows, err := s.db.QueryContext(ctx, ` 288 - SELECT did FROM repos 289 - UNION 290 - SELECT substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) AS did 291 - FROM stars 292 - WHERE substr(at_uri, 6, instr(substr(at_uri, 6), '/') - 1) != '' 293 - LIMIT ? 294 - `, limit) 295 - if err != nil { 296 - return nil, err 297 - } 298 - defer rows.Close() 299 - var out []string 300 - for rows.Next() { 301 - var did string 302 - if err := rows.Scan(&did); err != nil { 303 - return nil, err 304 - } 305 - if did != "" { 306 - out = append(out, did) 307 - } 308 - } 309 - return out, rows.Err() 310 - } 311 - 312 - // UpsertHandle stores or refreshes a DID→handle mapping. 313 - func (s *Store) UpsertHandle(ctx context.Context, did, h string) error { 314 - return s.client.Handle.Create(). 315 - SetID(did). 316 - SetHandle(h). 317 - SetRefreshedAt(time.Now().UnixMilli()). 318 - OnConflict(). 319 - UpdateNewValues(). 320 - Exec(ctx) 321 - } 322 - 323 - // Handle returns the cached handle for a DID, or "" if unknown. 324 - func (s *Store) Handle(ctx context.Context, did string) (string, error) { 325 - row, err := s.client.Handle.Query().Where(handle.IDEQ(did)).Only(ctx) 326 - if ent.IsNotFound(err) { 327 - return "", nil 328 - } 329 - if err != nil { 330 - return "", err 331 - } 332 - return row.Handle, nil 333 - } 334 - 335 - // DIDsNeedingHandles returns owner DIDs from repos that have no cached handle 336 - // yet, or whose cached handle is older than maxAge. 337 - func (s *Store) DIDsNeedingHandles(ctx context.Context, maxAge time.Duration, limit int) ([]string, error) { 338 - cutoff := time.Now().Add(-maxAge).UnixMilli() 339 - rows, err := s.db.QueryContext(ctx, ` 340 - SELECT DISTINCT r.did 341 - FROM repos r 342 - LEFT JOIN handles h ON h.did = r.did 343 - WHERE h.did IS NULL OR h.refreshed_at < ? 344 - LIMIT ? 345 - `, cutoff, limit) 346 - if err != nil { 347 - return nil, err 348 - } 349 - defer rows.Close() 350 - var out []string 351 - for rows.Next() { 352 - var did string 353 - if err := rows.Scan(&did); err != nil { 354 - return nil, err 355 - } 356 - out = append(out, did) 357 - } 358 - return out, rows.Err() 359 - } 360 - 361 - // UpdateLanguages writes the languages JSON map for an existing repo. 362 - func (s *Store) UpdateLanguages(ctx context.Context, atURI string, langs map[string]int64) error { 363 - raw, err := json.Marshal(langs) 364 - if err != nil { 365 - return err 366 - } 367 - return s.client.Repo.UpdateOneID(atURI). 368 - SetLanguages(string(raw)). 369 - SetLanguageAt(time.Now().UnixMilli()). 370 - Exec(ctx) 371 - } 372 - 373 - // ReposNeedingLanguages returns repos whose language enrichment is missing or 374 - // older than maxAge. 375 - func (s *Store) ReposNeedingLanguages(ctx context.Context, maxAge time.Duration, limit int) ([]Repo, error) { 376 - cutoff := time.Now().Add(-maxAge).UnixMilli() 377 - rows, err := s.db.QueryContext(ctx, ` 378 - SELECT at_uri, did, rkey, name, knot 379 - FROM repos 380 - WHERE language_at IS NULL OR language_at < ? 381 - ORDER BY seen_at DESC 382 - LIMIT ? 383 - `, cutoff, limit) 384 - if err != nil { 385 - return nil, err 386 - } 387 - defer rows.Close() 388 - var out []Repo 389 - for rows.Next() { 390 - var r Repo 391 - if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot); err != nil { 392 - return nil, err 393 - } 394 - out = append(out, r) 395 - } 396 - return out, rows.Err() 397 - } 398 - 399 - // PrimaryLanguages returns the sorted set of "primary" languages observed 400 - // across all enriched repos — i.e., the highest-bytes language per repo, 401 - // deduplicated. Used to populate the language filter dropdown. 402 - func (s *Store) PrimaryLanguages(ctx context.Context) ([]string, error) { 403 - rows, err := s.db.QueryContext(ctx, `SELECT languages FROM repos WHERE languages IS NOT NULL AND languages != '' AND languages != '{}'`) 404 - if err != nil { 405 - return nil, err 406 - } 407 - defer rows.Close() 408 - seen := make(map[string]struct{}) 409 - for rows.Next() { 410 - var raw string 411 - if err := rows.Scan(&raw); err != nil { 412 - return nil, err 413 - } 414 - var langs map[string]int64 415 - if err := json.Unmarshal([]byte(raw), &langs); err != nil { 416 - continue 417 - } 418 - var best string 419 - var bestN int64 420 - for k, v := range langs { 421 - if v > bestN { 422 - bestN, best = v, k 423 - } 424 - } 425 - if best != "" { 426 - seen[best] = struct{}{} 427 - } 428 - } 429 - if err := rows.Err(); err != nil { 430 - return nil, err 431 - } 432 - out := make([]string, 0, len(seen)) 433 - for k := range seen { 434 - out = append(out, k) 435 - } 436 - sort.Strings(out) 437 - return out, nil 438 - } 439 - 440 - // ReposFilter narrows the result of RecentRepos. Zero values mean "no 441 - // filter" so callers can mix and match. 442 - type ReposFilter struct { 443 - Language string // primary language match (post-filter in Go over JSON map) 444 - Since time.Time // repos created at or after this time (zero = no time floor) 445 - ForksOnly bool // only rows with non-empty source 446 - NoForks bool // exclude rows with non-empty source 447 - } 448 - 449 - // RecentRepos returns repos matching the filter, sorted newest first. Pass 450 - // limit <= 0 for no limit. Uses raw SQL because the scalar star-count 451 - // subquery and handle LEFT JOIN are awkward to express via ent's query API. 452 - func (s *Store) RecentRepos(ctx context.Context, f ReposFilter, limit int) ([]Repo, error) { 453 - candidateLimit := -1 454 - if limit > 0 { 455 - candidateLimit = limit 456 - if f.Language != "" { 457 - candidateLimit = limit * 5 458 - } 459 - } 460 - forksOnly := 0 461 - if f.ForksOnly { 462 - forksOnly = 1 463 - } 464 - noForks := 0 465 - if f.NoForks { 466 - noForks = 1 467 - } 468 - rows, err := s.db.QueryContext(ctx, ` 469 - SELECT r.at_uri, r.did, r.rkey, r.name, r.knot, r.description, r.topics, 470 - r.website, r.source, r.spindle, r.repo_did, r.created_at, r.seen_at, 471 - r.languages, r.language_at, COALESCE(h.handle, ''), 472 - (SELECT COUNT(*) FROM stars s WHERE s.subject = r.at_uri) AS star_count 473 - FROM repos r 474 - LEFT JOIN handles h ON h.did = r.did 475 - WHERE r.created_at >= ? 476 - AND (? = '' OR r.languages IS NOT NULL) 477 - AND (? = 0 OR (r.source IS NOT NULL AND r.source != '')) 478 - AND (? = 0 OR r.source IS NULL OR r.source = '') 479 - ORDER BY r.created_at DESC 480 - LIMIT ? 481 - `, f.Since.UnixMilli(), f.Language, forksOnly, noForks, candidateLimit) 482 - if err != nil { 483 - return nil, err 484 - } 485 - defer rows.Close() 486 - 487 - var out []Repo 488 - for rows.Next() { 489 - r, err := scanRepo(rows) 490 - if err != nil { 491 - return nil, err 492 - } 493 - if f.Language != "" && r.Primary() != f.Language { 494 - continue 495 - } 496 - out = append(out, r) 497 - if limit > 0 && len(out) >= limit { 498 - break 499 - } 500 - } 501 - return out, rows.Err() 502 - } 503 - 504 - // KnotEntry is one row in the knots view. 505 - type KnotEntry struct { 506 - Host string 507 - FirstSeen time.Time 508 - LastSeen time.Time 509 - LastOK *time.Time 510 - LastError *time.Time 511 - LastErrorMsg string 512 - ConsecutiveErrors int 513 - Disabled bool 514 - Repos int 515 - } 516 - 517 - // Knots returns every knot host with bookkeeping plus a count of repos 518 - // hosted on it. 519 - func (s *Store) Knots(ctx context.Context) ([]KnotEntry, error) { 520 - knots, err := s.client.Knot.Query(). 521 - Order(ent.Desc(knot.FieldLastSeen)). 522 - All(ctx) 523 - if err != nil { 524 - return nil, err 525 - } 526 - out := make([]KnotEntry, 0, len(knots)) 527 - for _, k := range knots { 528 - repos, err := s.client.Repo.Query().Where(repo.KnotEQ(k.ID)).Count(ctx) 529 - if err != nil { 530 - return nil, err 531 - } 532 - entry := KnotEntry{ 533 - Host: k.ID, 534 - FirstSeen: time.UnixMilli(k.FirstSeen), 535 - LastSeen: time.UnixMilli(k.LastSeen), 536 - ConsecutiveErrors: k.ConsecutiveErrors, 537 - Disabled: k.Disabled != 0, 538 - Repos: repos, 539 - } 540 - if k.LastOkAt != nil { 541 - t := time.UnixMilli(*k.LastOkAt) 542 - entry.LastOK = &t 543 - } 544 - if k.LastErrorAt != nil { 545 - t := time.UnixMilli(*k.LastErrorAt) 546 - entry.LastError = &t 547 - } 548 - if k.LastError != nil { 549 - entry.LastErrorMsg = *k.LastError 550 - } 551 - out = append(out, entry) 552 - } 553 - // Sort by repos desc, then host — done in Go since it's a derived count. 554 - for i := range out { 555 - for j := i + 1; j < len(out); j++ { 556 - if out[j].Repos > out[i].Repos || (out[j].Repos == out[i].Repos && out[j].Host < out[i].Host) { 557 - out[i], out[j] = out[j], out[i] 558 - } 559 - } 560 - } 561 - return out, nil 562 - } 563 - 564 - // UpsertKnot records that we've seen this host, refreshing last_seen. 565 - func (s *Store) UpsertKnot(ctx context.Context, host string) error { 566 - if host == "" { 567 - return nil 568 - } 569 - now := time.Now().UnixMilli() 570 - return s.client.Knot.Create(). 571 - SetID(host). 572 - SetFirstSeen(now). 573 - SetLastSeen(now). 574 - OnConflict(). 575 - Update(func(u *ent.KnotUpsert) { 576 - u.SetLastSeen(now) 577 - }). 578 - Exec(ctx) 579 - } 580 - 581 - // MarkKnotOK resets consecutive_errors on a successful call. 582 - func (s *Store) MarkKnotOK(ctx context.Context, host string) error { 583 - if host == "" { 584 - return nil 585 - } 586 - now := time.Now().UnixMilli() 587 - return s.client.Knot.Create(). 588 - SetID(host). 589 - SetFirstSeen(now). 590 - SetLastSeen(now). 591 - SetLastOkAt(now). 592 - OnConflict(). 593 - Update(func(u *ent.KnotUpsert) { 594 - u.SetLastSeen(now) 595 - u.SetLastOkAt(now) 596 - u.SetConsecutiveErrors(0) 597 - }). 598 - Exec(ctx) 599 - } 600 - 601 - // MarkKnotError records a failure and increments consecutive_errors. 602 - func (s *Store) MarkKnotError(ctx context.Context, host, reason string) error { 603 - if host == "" { 604 - return nil 605 - } 606 - if len(reason) > 500 { 607 - reason = reason[:500] 608 - } 609 - now := time.Now().UnixMilli() 610 - // ent has no SQL-side "consecutive_errors + 1" expression in upsert, so 611 - // we drop to raw SQL for the increment. Insert path uses ent. 612 - _, err := s.db.ExecContext(ctx, ` 613 - INSERT INTO knots (host, first_seen, last_seen, last_error_at, last_error, consecutive_errors, disabled) 614 - VALUES (?, ?, ?, ?, ?, 1, 0) 615 - ON CONFLICT(host) DO UPDATE SET 616 - last_error_at = excluded.last_error_at, 617 - last_error = excluded.last_error, 618 - last_seen = excluded.last_seen, 619 - consecutive_errors = consecutive_errors + 1 620 - `, host, now, now, now, reason) 621 - return err 622 - } 623 - 624 - // KnotCircuitOpen reports whether further calls to a host should be skipped. 625 - func (s *Store) KnotCircuitOpen(ctx context.Context, host string, minErrors int, cooldown time.Duration) (bool, error) { 626 - if host == "" { 627 - return false, nil 628 - } 629 - k, err := s.client.Knot.Query().Where(knot.IDEQ(host)).Only(ctx) 630 - if ent.IsNotFound(err) { 631 - return false, nil 632 - } 633 - if err != nil { 634 - return false, err 635 - } 636 - if k.Disabled != 0 { 637 - return true, nil 638 - } 639 - if k.ConsecutiveErrors < minErrors || k.LastErrorAt == nil { 640 - return false, nil 641 - } 642 - return time.UnixMilli(*k.LastErrorAt).Add(cooldown).After(time.Now()), nil 643 - } 644 - 645 - // SetKnotDisabled toggles the knot's disabled flag. 646 - func (s *Store) SetKnotDisabled(ctx context.Context, host string, disabled bool) error { 647 - if host == "" { 648 - return nil 649 - } 650 - v := 0 651 - if disabled { 652 - v = 1 653 - } 654 - return s.client.Knot.UpdateOneID(host).SetDisabled(v).Exec(ctx) 655 - } 656 - 657 - // HandleEntry is one row in the handles view. 658 - type HandleEntry struct { 659 - DID string 660 - Handle string 661 - RefreshedAt time.Time 662 - Repos int 663 - } 664 - 665 - // Handles returns every resolved DID→handle mapping with repo count. Pass 666 - // limit <= 0 for no limit. 667 - func (s *Store) Handles(ctx context.Context, limit int) ([]HandleEntry, error) { 668 - sqlLimit := -1 669 - if limit > 0 { 670 - sqlLimit = limit 671 - } 672 - rows, err := s.db.QueryContext(ctx, ` 673 - SELECT h.did, h.handle, h.refreshed_at, 674 - (SELECT COUNT(*) FROM repos r WHERE r.did = h.did) AS repos 675 - FROM handles h 676 - ORDER BY repos DESC, h.handle 677 - LIMIT ? 678 - `, sqlLimit) 679 - if err != nil { 680 - return nil, err 681 - } 682 - defer rows.Close() 683 - var out []HandleEntry 684 - for rows.Next() { 685 - var ( 686 - h HandleEntry 687 - refreshed int64 688 - ) 689 - if err := rows.Scan(&h.DID, &h.Handle, &refreshed, &h.Repos); err != nil { 690 - return nil, err 691 - } 692 - h.RefreshedAt = time.UnixMilli(refreshed) 693 - out = append(out, h) 694 - } 695 - return out, rows.Err() 696 - } 697 - 698 - // scanRepo reads a row from the SQL projection used by RecentRepos. 699 - func scanRepo(rows *sql.Rows) (Repo, error) { 700 - var ( 701 - r Repo 702 - description sql.NullString 703 - topicsRaw sql.NullString 704 - website sql.NullString 705 - source sql.NullString 706 - spindle sql.NullString 707 - repoDID sql.NullString 708 - languages sql.NullString 709 - languageAt sql.NullInt64 710 - createdAt int64 711 - seenAt int64 712 - ) 713 - if err := rows.Scan(&r.AtURI, &r.DID, &r.Rkey, &r.Name, &r.Knot, 714 - &description, &topicsRaw, &website, &source, &spindle, &repoDID, 715 - &createdAt, &seenAt, &languages, &languageAt, &r.Handle, &r.StarCount); err != nil { 716 - return r, err 717 - } 718 - r.Description = description.String 719 - r.Website = website.String 720 - r.Source = source.String 721 - r.Spindle = spindle.String 722 - r.RepoDID = repoDID.String 723 - r.CreatedAt = time.UnixMilli(createdAt) 724 - r.SeenAt = time.UnixMilli(seenAt) 725 - if topicsRaw.Valid && topicsRaw.String != "" && topicsRaw.String != "null" { 726 - _ = json.Unmarshal([]byte(topicsRaw.String), &r.Topics) 727 - } 728 - if languages.Valid && languages.String != "" { 729 - _ = json.Unmarshal([]byte(languages.String), &r.Languages) 730 - } 731 - if languageAt.Valid { 732 - t := time.UnixMilli(languageAt.Int64) 733 - r.LanguageAt = &t 734 - } 735 - return r, nil 736 - } 737 - 738 95 // withTx runs fn in a transaction, committing on success or rolling back on error. 739 96 func withTx(ctx context.Context, client *ent.Client, fn func(*ent.Tx) error) error { 740 97 tx, err := client.Tx(ctx)

History

1 round 0 comments
sign up or login to add to the discussion
chown.de submitted #0
1 commit
expand
chore: refactor store
merge conflicts detected
expand
  • firehose/worker.go:27
  • store/store.go:1
expand 0 comments