Lewis: May this revision serve well! lewis@tangled.org
+942
-83
Diff
round #0
+5
-5
appview/indexer/issues/indexer.go
+5
-5
appview/indexer/issues/indexer.go
···
31
31
unicodeNormalizeName = "uicodeNormalize"
32
32
33
33
// Bump this when the index mapping changes to trigger a rebuild.
34
-
issueIndexerVersion = 3
34
+
issueIndexerVersion = 4
35
35
)
36
36
37
37
type Indexer struct {
···
85
85
docMapping.AddFieldMappingsAt("title", textFieldMapping)
86
86
docMapping.AddFieldMappingsAt("body", textFieldMapping)
87
87
88
-
docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping)
88
+
docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping)
89
89
docMapping.AddFieldMappingsAt("is_open", boolFieldMapping)
90
90
docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping)
91
91
docMapping.AddFieldMappingsAt("labels", keywordFieldMapping)
···
185
185
186
186
type issueData struct {
187
187
ID int64 `json:"id"`
188
-
RepoAt string `json:"repo_at"`
188
+
RepoDid string `json:"repo_did"`
189
189
IssueID int `json:"issue_id"`
190
190
Title string `json:"title"`
191
191
Body string `json:"body"`
···
200
200
func makeIssueData(issue *models.Issue) *issueData {
201
201
return &issueData{
202
202
ID: issue.Id,
203
-
RepoAt: issue.RepoAt.String(),
203
+
RepoDid: string(issue.RepoDid),
204
204
IssueID: issue.IssueId,
205
205
Title: issue.Title,
206
206
Body: issue.Body,
···
274
274
))
275
275
}
276
276
277
-
musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt))
277
+
musts = append(musts, bleveutil.KeywordFieldQuery("repo_did", opts.RepoDid))
278
278
if opts.IsOpen != nil {
279
279
musts = append(musts, bleveutil.BoolFieldQuery("is_open", *opts.IsOpen))
280
280
}
+30
-30
appview/indexer/issues/indexer_test.go
+30
-30
appview/indexer/issues/indexer_test.go
···
67
67
ctx := context.Background()
68
68
69
69
err := ix.Index(ctx,
70
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
71
-
models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")},
72
-
models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login timeout", Body: "Login takes too long", Open: false, Did: "did:plc:alice", Labels: makeLabelState("bug")},
70
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
71
+
models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")},
72
+
models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix login timeout", Body: "Login takes too long", Open: false, Did: "did:plc:alice", Labels: makeLabelState("bug")},
73
73
)
74
74
require.NoError(t, err)
75
75
76
76
opts := func() models.IssueSearchOptions {
77
77
return models.IssueSearchOptions{
78
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
79
-
IsOpen: boolPtr(true),
80
-
Page: pagination.Page{Limit: 10},
78
+
RepoDid: "did:plc:testrepo",
79
+
IsOpen: boolPtr(true),
80
+
Page: pagination.Page{Limit: 10},
81
81
}
82
82
}
83
83
···
148
148
ctx := context.Background()
149
149
150
150
err := ix.Index(ctx,
151
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue 1", Body: "Body", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
152
-
models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue 2", Body: "Body", Open: true, Did: "did:plc:bob", Labels: makeLabelState("bug", "urgent")},
151
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Issue 1", Body: "Body", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
152
+
models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Issue 2", Body: "Body", Open: true, Did: "did:plc:bob", Labels: makeLabelState("bug", "urgent")},
153
153
)
154
154
require.NoError(t, err)
155
155
156
156
result, err := ix.Search(ctx, models.IssueSearchOptions{
157
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
158
-
IsOpen: boolPtr(true),
159
-
Labels: []string{"bug", "urgent"},
160
-
Page: pagination.Page{Limit: 10},
157
+
RepoDid: "did:plc:testrepo",
158
+
IsOpen: boolPtr(true),
159
+
Labels: []string{"bug", "urgent"},
160
+
Page: pagination.Page{Limit: 10},
161
161
})
162
162
require.NoError(t, err)
163
163
assert.Equal(t, uint64(1), result.Total)
···
171
171
ctx := context.Background()
172
172
173
173
err := ix.Index(ctx,
174
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
175
-
models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")},
176
-
models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug", "urgent")},
174
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")},
175
+
models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")},
176
+
models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug", "urgent")},
177
177
)
178
178
require.NoError(t, err)
179
179
180
180
opts := func() models.IssueSearchOptions {
181
181
return models.IssueSearchOptions{
182
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
183
-
IsOpen: boolPtr(true),
184
-
Page: pagination.Page{Limit: 10},
182
+
RepoDid: "did:plc:testrepo",
183
+
IsOpen: boolPtr(true),
184
+
Page: pagination.Page{Limit: 10},
185
185
}
186
186
}
187
187
···
227
227
ctx := context.Background()
228
228
229
229
err := ix.Index(ctx,
230
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice"},
231
-
models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob"},
232
-
models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice"},
230
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice"},
231
+
models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob"},
232
+
models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice"},
233
233
)
234
234
require.NoError(t, err)
235
235
···
244
244
require.Equal(t, []string{"dark theme"}, negatedPhrases)
245
245
246
246
result, err := ix.Search(ctx, models.IssueSearchOptions{
247
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
247
+
RepoDid: "did:plc:testrepo",
248
248
IsOpen: boolPtr(true),
249
249
NegatedPhrases: negatedPhrases,
250
250
Page: pagination.Page{Limit: 10},
···
261
261
ctx := context.Background()
262
262
263
263
err := ix.Index(ctx,
264
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue", Body: "Body", Open: true, Did: "did:plc:alice"},
264
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Issue", Body: "Body", Open: true, Did: "did:plc:alice"},
265
265
)
266
266
require.NoError(t, err)
267
267
268
268
result, err := ix.Search(ctx, models.IssueSearchOptions{
269
269
Keywords: []string{"nonexistent"},
270
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
270
+
RepoDid: "did:plc:testrepo",
271
271
IsOpen: boolPtr(true),
272
272
Page: pagination.Page{Limit: 10},
273
273
})
···
283
283
ctx := context.Background()
284
284
285
285
err := ix.Index(ctx,
286
-
models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "High priority bug", Body: "Urgent", Open: true, Did: "did:plc:alice",
286
+
models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "High priority bug", Body: "Urgent", Open: true, Did: "did:plc:alice",
287
287
Labels: makeLabelState("bug", "priority=high")},
288
-
models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Low priority feature", Body: "Nice to have", Open: true, Did: "did:plc:bob",
288
+
models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Low priority feature", Body: "Nice to have", Open: true, Did: "did:plc:bob",
289
289
Labels: makeLabelState("feature", "priority=low")},
290
-
models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "High priority feature", Body: "Important", Open: true, Did: "did:plc:alice",
290
+
models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "High priority feature", Body: "Important", Open: true, Did: "did:plc:alice",
291
291
Labels: makeLabelState("feature", "priority=high")},
292
292
)
293
293
require.NoError(t, err)
294
294
295
295
opts := func() models.IssueSearchOptions {
296
296
return models.IssueSearchOptions{
297
-
RepoAt: "at://did:plc:test/sh.tangled.repo/abc",
298
-
IsOpen: boolPtr(true),
299
-
Page: pagination.Page{Limit: 10},
297
+
RepoDid: "did:plc:testrepo",
298
+
IsOpen: boolPtr(true),
299
+
Page: pagination.Page{Limit: 10},
300
300
}
301
301
}
302
302
+19
-11
appview/indexer/notifier.go
+19
-11
appview/indexer/notifier.go
···
4
4
"context"
5
5
6
6
"github.com/bluesky-social/indigo/atproto/syntax"
7
-
"tangled.org/core/api/tangled"
8
7
"tangled.org/core/appview/db"
9
8
"tangled.org/core/appview/models"
10
9
"tangled.org/core/appview/notify"
···
14
13
15
14
var _ notify.Notifier = &Indexer{}
16
15
17
-
func (ix *Indexer) getAndReindexRepo(ctx context.Context, repoAt syntax.ATURI) {
18
-
l := log.FromContext(ctx).With("notifier", "indexer", "repo_at", repoAt)
16
+
func (ix *Indexer) getAndReindexRepo(ctx context.Context, repoDid syntax.DID) {
17
+
l := log.FromContext(ctx).With("notifier", "indexer", "repo_did", repoDid)
19
18
20
-
repo, err := db.GetRepo(ix.Db, orm.FilterEq("at_uri", repoAt.String()))
19
+
repo, err := db.GetRepo(ix.Db, orm.FilterEq("repo_did", string(repoDid)))
21
20
if err != nil {
22
21
l.Error("failed to get repo for reindexing", "err", err)
23
22
return
···
39
38
}
40
39
41
40
l.Debug("reindexing repo after new issue")
42
-
ix.getAndReindexRepo(ctx, issue.RepoAt)
41
+
ix.getAndReindexRepo(ctx, issue.RepoDid)
43
42
}
44
43
45
44
func (ix *Indexer) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) {
···
61
60
}
62
61
63
62
l.Debug("reindexing repo after issue deletion")
64
-
ix.getAndReindexRepo(ctx, issue.RepoAt)
63
+
ix.getAndReindexRepo(ctx, issue.RepoDid)
65
64
}
66
65
67
66
func (ix *Indexer) NewIssueLabelOp(ctx context.Context, issue *models.Issue) {
···
92
91
}
93
92
94
93
l.Debug("reindexing repo after new pull")
95
-
ix.getAndReindexRepo(ctx, pull.RepoAt)
94
+
ix.getAndReindexRepo(ctx, pull.RepoDid)
96
95
}
97
96
98
97
func (ix *Indexer) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {
···
113
112
}
114
113
}
115
114
115
+
func (ix *Indexer) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) {
116
+
l := log.FromContext(ctx).With("notifier", "indexer", "repo", newRepo.RepoIdentifier(), "actor", actor, "old_name", oldRepo.Name, "new_name", newRepo.Name)
117
+
l.Debug("reindexing repo after rename")
118
+
err := ix.Repos.Index(ctx, *newRepo)
119
+
if err != nil {
120
+
l.Error("failed to reindex repo", "err", err)
121
+
}
122
+
}
123
+
116
124
func (ix *Indexer) DeleteRepo(ctx context.Context, repo *models.Repo) {
117
125
l := log.FromContext(ctx).With("notifier", "indexer", "repo", repo)
118
126
l.Debug("deleting repo from index")
···
125
133
func (ix *Indexer) NewStar(ctx context.Context, star *models.Star) {
126
134
l := log.FromContext(ctx).With("notifier", "indexer", "star", star)
127
135
128
-
if star.RepoAt.Collection().String() != tangled.RepoNSID {
136
+
if star.SubjectType != models.StarSubjectRepo {
129
137
return
130
138
}
131
139
132
140
l.Debug("reindexing repo after new star")
133
-
ix.getAndReindexRepo(ctx, star.RepoAt)
141
+
ix.getAndReindexRepo(ctx, syntax.DID(star.Subject))
134
142
}
135
143
136
144
func (ix *Indexer) DeleteStar(ctx context.Context, star *models.Star) {
137
145
l := log.FromContext(ctx).With("notifier", "indexer", "star", star)
138
146
139
-
if star.RepoAt.Collection().String() != tangled.RepoNSID {
147
+
if star.SubjectType != models.StarSubjectRepo {
140
148
return
141
149
}
142
150
143
151
l.Debug("reindexing repo after star deletion")
144
-
ix.getAndReindexRepo(ctx, star.RepoAt)
152
+
ix.getAndReindexRepo(ctx, syntax.DID(star.Subject))
145
153
}
+5
-5
appview/indexer/pulls/indexer.go
+5
-5
appview/indexer/pulls/indexer.go
···
30
30
unicodeNormalizeName = "uicodeNormalize"
31
31
32
32
// Bump this when the index mapping changes to trigger a rebuild.
33
-
pullIndexerVersion = 3
33
+
pullIndexerVersion = 4
34
34
)
35
35
36
36
type Indexer struct {
···
80
80
docMapping.AddFieldMappingsAt("title", textFieldMapping)
81
81
docMapping.AddFieldMappingsAt("body", textFieldMapping)
82
82
83
-
docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping)
83
+
docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping)
84
84
docMapping.AddFieldMappingsAt("state", keywordFieldMapping)
85
85
docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping)
86
86
docMapping.AddFieldMappingsAt("labels", keywordFieldMapping)
···
180
180
181
181
type pullData struct {
182
182
ID int64 `json:"id"`
183
-
RepoAt string `json:"repo_at"`
183
+
RepoDid string `json:"repo_did"`
184
184
PullID int `json:"pull_id"`
185
185
Title string `json:"title"`
186
186
Body string `json:"body"`
···
195
195
func makePullData(pull *models.Pull) *pullData {
196
196
return &pullData{
197
197
ID: int64(pull.ID),
198
-
RepoAt: pull.RepoAt.String(),
198
+
RepoDid: string(pull.RepoDid),
199
199
PullID: pull.PullId,
200
200
Title: pull.Title,
201
201
Body: pull.Body,
···
275
275
))
276
276
}
277
277
278
-
musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt))
278
+
musts = append(musts, bleveutil.KeywordFieldQuery("repo_did", opts.RepoDid))
279
279
if opts.State != nil {
280
280
musts = append(musts, bleveutil.KeywordFieldQuery("state", opts.State.String()))
281
281
}
+4
-4
appview/indexer/repos/indexer.go
+4
-4
appview/indexer/repos/indexer.go
···
34
34
unicodeNormalizeName = "unicodeNormalize"
35
35
36
36
// Bump this when the index mapping changes to trigger a rebuild.
37
-
repoIndexerVersion = 6
37
+
repoIndexerVersion = 7
38
38
)
39
39
40
40
type Indexer struct {
···
120
120
docMapping.AddFieldMappingsAt("topics_exact", caseInsensitiveKeywordMapping)
121
121
docMapping.AddFieldMappingsAt("did", keywordFieldMapping)
122
122
docMapping.AddFieldMappingsAt("knot", keywordFieldMapping)
123
-
docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping)
123
+
docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping)
124
124
125
125
// fork indicator for down-ranking
126
126
docMapping.AddFieldMappingsAt("is_fork", booleanFieldMapping)
···
258
258
259
259
type repoData struct {
260
260
ID int64 `json:"id"`
261
-
RepoAt string `json:"repo_at"`
261
+
RepoDid string `json:"repo_did"`
262
262
Did string `json:"did"`
263
263
Name string `json:"name"`
264
264
NameTrigram string `json:"name_trigram"`
···
294
294
295
295
return &repoData{
296
296
ID: repo.Id,
297
-
RepoAt: repo.RepoAt().String(),
297
+
RepoDid: repo.RepoDid,
298
298
Did: repo.Did,
299
299
Name: repo.Name,
300
300
NameTrigram: repo.Name,
+90
-28
appview/ingester.go
+90
-28
appview/ingester.go
···
12
12
"net/http"
13
13
"net/url"
14
14
"slices"
15
+
"strings"
15
16
"sync"
16
17
17
18
"time"
···
27
28
"tangled.org/core/appview/config"
28
29
"tangled.org/core/appview/db"
29
30
"tangled.org/core/appview/models"
31
+
"tangled.org/core/appview/notify"
30
32
"tangled.org/core/appview/serververify"
31
33
"tangled.org/core/appview/validator"
32
34
"tangled.org/core/idresolver"
···
42
44
Config *config.Config
43
45
Logger *slog.Logger
44
46
Validator *validator.Validator
47
+
Notifier notify.Notifier
45
48
}
46
49
47
50
type processFunc func(ctx context.Context, e *jmodels.Event) error
···
97
100
err = i.ingestLabelDefinition(e)
98
101
case tangled.LabelOpNSID:
99
102
err = i.ingestLabelOp(e)
103
+
case tangled.RepoNSID:
104
+
err = i.ingestRepo(ctx, e)
100
105
}
101
106
l = i.Logger.With("nsid", e.Commit.Collection)
102
107
}
···
114
119
}
115
120
}
116
121
122
+
func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) {
123
+
if strings.HasPrefix(ref, "did:") {
124
+
return db.GetRepoByDid(i.Db, ref)
125
+
}
126
+
return db.GetRepoByAtUri(i.Db, ref)
127
+
}
128
+
129
+
func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) {
130
+
var legacy struct {
131
+
Subject *string `json:"subject"`
132
+
SubjectDid *string `json:"subjectDid"`
133
+
}
134
+
if err := json.Unmarshal(raw, &legacy); err != nil {
135
+
return false, err
136
+
}
137
+
138
+
switch {
139
+
case legacy.SubjectDid != nil:
140
+
repo, err := i.resolveRepoRef(*legacy.SubjectDid)
141
+
if err != nil {
142
+
l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid)
143
+
return false, nil
144
+
}
145
+
star.SubjectType = models.StarSubjectRepo
146
+
star.Subject = repo.RepoDid
147
+
return true, nil
148
+
149
+
case legacy.Subject != nil:
150
+
uri, err := syntax.ParseATURI(*legacy.Subject)
151
+
if err != nil {
152
+
return false, fmt.Errorf("invalid old-format star subject: %w", err)
153
+
}
154
+
switch uri.Collection().String() {
155
+
case tangled.RepoNSID:
156
+
repo, err := db.GetRepoByAtUri(i.Db, uri.String())
157
+
if err != nil {
158
+
l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject)
159
+
return false, nil
160
+
}
161
+
star.SubjectType = models.StarSubjectRepo
162
+
star.Subject = repo.RepoDid
163
+
return true, nil
164
+
default:
165
+
star.SubjectType = models.StarSubjectString
166
+
star.Subject = *legacy.Subject
167
+
return true, nil
168
+
}
169
+
170
+
default:
171
+
return false, fmt.Errorf("old-format star has neither subject nor subjectDid")
172
+
}
173
+
}
174
+
117
175
func (i *Ingester) ingestStar(e *jmodels.Event) error {
118
176
var err error
119
177
did := e.Did
···
123
181
124
182
switch e.Commit.Operation {
125
183
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
126
-
var subjectUri syntax.ATURI
127
-
128
184
raw := json.RawMessage(e.Commit.Record)
129
185
record := tangled.FeedStar{}
130
-
err := json.Unmarshal(raw, &record)
131
-
if err != nil {
132
-
l.Error("invalid record", "err", err)
133
-
return err
134
-
}
186
+
unmarshalErr := json.Unmarshal(raw, &record)
135
187
136
188
star := &models.Star{
137
189
Did: did,
···
139
191
}
140
192
141
193
switch {
142
-
case record.SubjectDid != nil:
143
-
repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid))
144
-
if repoErr == nil {
145
-
subjectUri = repo.RepoAt()
146
-
star.RepoAt = subjectUri
194
+
case unmarshalErr != nil:
195
+
resolved, resolveErr := i.resolveOldFormatStar(raw, star, l)
196
+
if resolveErr != nil {
197
+
l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr)
198
+
return unmarshalErr
147
199
}
148
-
case record.Subject != nil:
149
-
subjectUri, err = syntax.ParseATURI(*record.Subject)
150
-
if err != nil {
151
-
l.Error("invalid record", "err", err)
152
-
return err
200
+
if !resolved {
201
+
return nil
153
202
}
154
-
star.RepoAt = subjectUri
155
-
repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
156
-
if repoErr == nil && repo.RepoDid != "" {
157
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
158
-
l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
159
-
}
203
+
204
+
case record.Subject == nil:
205
+
return fmt.Errorf("star record has nil subject")
206
+
207
+
case record.Subject.FeedStar_Repo != nil:
208
+
repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did)
209
+
if repoErr != nil {
210
+
l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did)
211
+
return nil
160
212
}
213
+
star.SubjectType = models.StarSubjectRepo
214
+
star.Subject = repo.RepoDid
215
+
216
+
case record.Subject.FeedStar_String != nil:
217
+
star.SubjectType = models.StarSubjectString
218
+
star.Subject = record.Subject.FeedStar_String.Uri
219
+
161
220
default:
162
-
l.Error("star record has neither subject nor subjectDid")
163
-
return fmt.Errorf("star record has neither subject nor subjectDid")
221
+
return fmt.Errorf("star record has empty subject union")
164
222
}
223
+
165
224
err = db.AddStar(i.Db, star)
166
225
case jmodels.CommitOperationDelete:
167
226
err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey)
···
373
432
repoDid = *record.RepoDid
374
433
}
375
434
if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
376
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
435
+
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey); enqErr != nil {
377
436
l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
378
437
}
379
438
}
···
386
445
artifact := models.Artifact{
387
446
Did: did,
388
447
Rkey: e.Commit.RKey,
389
-
RepoAt: repo.RepoAt(),
448
+
RepoDid: syntax.DID(repo.RepoDid),
390
449
Tag: plumbing.Hash(record.Tag),
391
450
CreatedAt: createdAt,
392
451
BlobCid: cid.Cid(record.Artifact.Ref),
···
955
1014
956
1015
issue := models.IssueFromRecord(did, rkey, record)
957
1016
958
-
if issue.RepoAt == "" {
1017
+
if issue.RepoDid == "" {
959
1018
return fmt.Errorf("issue record has no repo field")
960
1019
}
1020
+
if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil {
1021
+
return fmt.Errorf("issue record repo field is not a valid DID: %w", err)
1022
+
}
961
1023
962
1024
if err := i.Validator.ValidateIssue(&issue); err != nil {
963
1025
return fmt.Errorf("failed to validate issue: %w", err)
+332
appview/ingester_repo.go
+332
appview/ingester_repo.go
···
1
+
package appview
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"encoding/json"
7
+
"errors"
8
+
"fmt"
9
+
"slices"
10
+
11
+
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
jmodels "github.com/bluesky-social/jetstream/pkg/models"
13
+
"tangled.org/core/api/tangled"
14
+
"tangled.org/core/appview/db"
15
+
"tangled.org/core/appview/models"
16
+
"tangled.org/core/orm"
17
+
)
18
+
19
+
func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event) error {
20
+
l := i.Logger.With("handler", "ingestRepo", "did", e.Did, "rkey", e.Commit.RKey)
21
+
22
+
switch e.Commit.Operation {
23
+
case jmodels.CommitOperationCreate:
24
+
return i.ingestRepoCreate(ctx, e)
25
+
case jmodels.CommitOperationUpdate:
26
+
return i.ingestRepoUpdate(ctx, e)
27
+
case jmodels.CommitOperationDelete:
28
+
return i.ingestRepoDelete(ctx, e)
29
+
default:
30
+
l.Info("unknown repo operation", "op", e.Commit.Operation)
31
+
return nil
32
+
}
33
+
}
34
+
35
+
func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event) error {
36
+
l := i.Logger.With("handler", "ingestRepoCreate", "did", e.Did, "rkey", e.Commit.RKey)
37
+
38
+
record := tangled.Repo{}
39
+
if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
40
+
l.Error("invalid record", "err", err)
41
+
return err
42
+
}
43
+
44
+
if record.RepoDid == nil || *record.RepoDid == "" {
45
+
l.Info("skipping repo create from non-DID-migrated knot")
46
+
return nil
47
+
}
48
+
repoDid := *record.RepoDid
49
+
50
+
_, err := db.GetRepo(i.Db,
51
+
orm.FilterEq("did", e.Did),
52
+
orm.FilterEq("rkey", e.Commit.RKey),
53
+
)
54
+
if err == nil {
55
+
l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey)
56
+
return nil
57
+
}
58
+
if !errors.Is(err, sql.ErrNoRows) {
59
+
return fmt.Errorf("failed to check existing repo: %w", err)
60
+
}
61
+
62
+
prev, err := db.GetRepoByDid(i.Db, repoDid)
63
+
if err != nil && !errors.Is(err, sql.ErrNoRows) {
64
+
return fmt.Errorf("failed to check existing repoDid: %w", err)
65
+
}
66
+
67
+
if prev != nil {
68
+
l.Info("repoDid exists under different rkey, renaming",
69
+
"oldRkey", prev.Rkey, "newRkey", e.Commit.RKey)
70
+
71
+
oldRepo := *prev
72
+
73
+
tx, txErr := i.Db.Begin()
74
+
if txErr != nil {
75
+
return fmt.Errorf("failed to begin rename tx: %w", txErr)
76
+
}
77
+
defer tx.Rollback()
78
+
79
+
newName := derefString(record.Name)
80
+
if newName == "" {
81
+
newName = e.Commit.RKey
82
+
}
83
+
84
+
if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil {
85
+
return fmt.Errorf("failed to rename repo: %w", err)
86
+
}
87
+
if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil {
88
+
return fmt.Errorf("failed to record rename history: %w", err)
89
+
}
90
+
91
+
renamed := *prev
92
+
renamed.Rkey = e.Commit.RKey
93
+
renamed.Name = newName
94
+
desired := repoFromRecord(&renamed, &record)
95
+
if repoMetadataChanged(&renamed, &desired) {
96
+
if err := applyRepoMetadata(tx, &renamed, desired); err != nil {
97
+
return fmt.Errorf("failed to apply metadata after rename: %w", err)
98
+
}
99
+
}
100
+
101
+
if err := tx.Commit(); err != nil {
102
+
return fmt.Errorf("failed to commit rename tx: %w", err)
103
+
}
104
+
105
+
newRepo, err := db.GetRepo(i.Db,
106
+
orm.FilterEq("did", e.Did),
107
+
orm.FilterEq("rkey", e.Commit.RKey),
108
+
)
109
+
if err != nil {
110
+
l.Warn("failed to fetch repo after rename for notification", "err", err)
111
+
return nil
112
+
}
113
+
i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo)
114
+
return nil
115
+
}
116
+
117
+
rkey := e.Commit.RKey
118
+
name := derefString(record.Name)
119
+
if name == "" {
120
+
name = rkey
121
+
}
122
+
123
+
repo := &models.Repo{
124
+
Did: e.Did,
125
+
Name: name,
126
+
Knot: record.Knot,
127
+
Rkey: rkey,
128
+
Description: derefString(record.Description),
129
+
Website: derefString(record.Website),
130
+
Topics: append([]string(nil), record.Topics...),
131
+
Source: derefString(record.Source),
132
+
Spindle: derefString(record.Spindle),
133
+
Labels: append([]string(nil), record.Labels...),
134
+
RepoDid: repoDid,
135
+
}
136
+
137
+
tx, err := i.Db.Begin()
138
+
if err != nil {
139
+
return fmt.Errorf("failed to begin insert tx: %w", err)
140
+
}
141
+
defer tx.Rollback()
142
+
143
+
if err := db.AddRepo(tx, repo); err != nil {
144
+
return fmt.Errorf("failed to insert repo: %w", err)
145
+
}
146
+
if err := tx.Commit(); err != nil {
147
+
return fmt.Errorf("failed to commit insert tx: %w", err)
148
+
}
149
+
150
+
i.Notifier.NewRepo(ctx, repo)
151
+
return nil
152
+
}
153
+
154
+
func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event) error {
155
+
l := i.Logger.With("handler", "ingestRepoUpdate", "did", e.Did, "rkey", e.Commit.RKey)
156
+
157
+
record := tangled.Repo{}
158
+
if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil {
159
+
l.Error("invalid record", "err", err)
160
+
return err
161
+
}
162
+
163
+
if record.RepoDid == nil || *record.RepoDid == "" {
164
+
l.Info("skipping repo update from non-DID-migrated knot")
165
+
return nil
166
+
}
167
+
168
+
current, err := db.GetRepo(i.Db,
169
+
orm.FilterEq("did", e.Did),
170
+
orm.FilterEq("rkey", e.Commit.RKey),
171
+
)
172
+
if err != nil {
173
+
if errors.Is(err, sql.ErrNoRows) {
174
+
l.Info("skipping repo update for unknown row")
175
+
return nil
176
+
}
177
+
return fmt.Errorf("failed to fetch repo for ingest: %w", err)
178
+
}
179
+
180
+
desired := repoFromRecord(current, &record)
181
+
182
+
if current.Source != desired.Source {
183
+
l.Warn("source field changed but mutation is unsupported, ignoring",
184
+
"current", current.Source, "desired", desired.Source)
185
+
}
186
+
187
+
if !repoMetadataChanged(current, &desired) {
188
+
return nil
189
+
}
190
+
191
+
tx, err := i.Db.Begin()
192
+
if err != nil {
193
+
return fmt.Errorf("failed to begin tx: %w", err)
194
+
}
195
+
defer tx.Rollback()
196
+
197
+
if err := applyRepoMetadata(tx, current, desired); err != nil {
198
+
return fmt.Errorf("failed to apply repo metadata: %w", err)
199
+
}
200
+
return tx.Commit()
201
+
}
202
+
203
+
func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event) error {
204
+
l := i.Logger.With("handler", "ingestRepoDelete", "did", e.Did, "rkey", e.Commit.RKey)
205
+
206
+
repo, err := db.GetRepo(i.Db,
207
+
orm.FilterEq("did", e.Did),
208
+
orm.FilterEq("rkey", e.Commit.RKey),
209
+
)
210
+
if err != nil {
211
+
if errors.Is(err, sql.ErrNoRows) {
212
+
l.Info("skipping repo delete for unknown row")
213
+
return nil
214
+
}
215
+
return fmt.Errorf("failed to fetch repo for delete: %w", err)
216
+
}
217
+
218
+
if err := db.RemoveRepo(i.Db, e.Did, e.Commit.RKey); err != nil {
219
+
return fmt.Errorf("failed to delete repo: %w", err)
220
+
}
221
+
222
+
i.Notifier.DeleteRepo(ctx, repo)
223
+
l.Info("deleted repo row")
224
+
return nil
225
+
}
226
+
227
+
func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
228
+
if err := db.PutRepo(tx, desired); err != nil {
229
+
return err
230
+
}
231
+
232
+
if current.Spindle != desired.Spindle {
233
+
var spindlePtr *string
234
+
if desired.Spindle != "" {
235
+
spindlePtr = &desired.Spindle
236
+
}
237
+
if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil {
238
+
return err
239
+
}
240
+
}
241
+
242
+
if !labelsEqual(current.Labels, desired.Labels) {
243
+
if err := reconcileLabels(tx, current, desired); err != nil {
244
+
return err
245
+
}
246
+
}
247
+
248
+
return nil
249
+
}
250
+
251
+
func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error {
252
+
added := filterOut(desired.Labels, current.Labels)
253
+
removed := filterOut(current.Labels, desired.Labels)
254
+
255
+
if err := applyEach(added, func(l string) error {
256
+
return db.SubscribeLabel(tx, &models.RepoLabel{
257
+
RepoDid: syntax.DID(desired.RepoDid),
258
+
LabelAt: syntax.ATURI(l),
259
+
})
260
+
}); err != nil {
261
+
return err
262
+
}
263
+
264
+
return applyEach(removed, func(l string) error {
265
+
return db.UnsubscribeLabel(tx,
266
+
orm.FilterEq("repo_did", desired.RepoDid),
267
+
orm.FilterEq("label_at", l),
268
+
)
269
+
})
270
+
}
271
+
272
+
func filterOut(items, exclude []string) []string {
273
+
return slices.DeleteFunc(slices.Clone(items), func(s string) bool {
274
+
return slices.Contains(exclude, s)
275
+
})
276
+
}
277
+
278
+
func applyEach(items []string, fn func(string) error) error {
279
+
for _, item := range items {
280
+
if err := fn(item); err != nil {
281
+
return err
282
+
}
283
+
}
284
+
return nil
285
+
}
286
+
287
+
func labelsEqual(a, b []string) bool {
288
+
if len(a) != len(b) {
289
+
return false
290
+
}
291
+
aSorted := append([]string(nil), a...)
292
+
bSorted := append([]string(nil), b...)
293
+
slices.Sort(aSorted)
294
+
slices.Sort(bSorted)
295
+
return slices.Equal(aSorted, bSorted)
296
+
}
297
+
298
+
func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo {
299
+
out := *current
300
+
out.Name = derefString(record.Name)
301
+
if out.Name == "" {
302
+
out.Name = current.Rkey
303
+
}
304
+
out.Knot = record.Knot
305
+
out.Description = derefString(record.Description)
306
+
out.Website = derefString(record.Website)
307
+
out.Topics = append([]string(nil), record.Topics...)
308
+
out.Spindle = derefString(record.Spindle)
309
+
out.Source = derefString(record.Source)
310
+
out.Labels = append([]string(nil), record.Labels...)
311
+
if record.RepoDid != nil {
312
+
out.RepoDid = *record.RepoDid
313
+
}
314
+
return out
315
+
}
316
+
317
+
func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool {
318
+
return current.Name != desired.Name ||
319
+
current.Knot != desired.Knot ||
320
+
current.Description != desired.Description ||
321
+
current.Website != desired.Website ||
322
+
current.TopicStr() != desired.TopicStr() ||
323
+
current.Spindle != desired.Spindle ||
324
+
!labelsEqual(current.Labels, desired.Labels)
325
+
}
326
+
327
+
func derefString(s *string) string {
328
+
if s == nil {
329
+
return ""
330
+
}
331
+
return *s
332
+
}
+457
appview/ingester_repo_test.go
+457
appview/ingester_repo_test.go
···
1
+
package appview
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"encoding/json"
7
+
"errors"
8
+
"io"
9
+
"log/slog"
10
+
"path/filepath"
11
+
"testing"
12
+
13
+
"github.com/bluesky-social/indigo/atproto/syntax"
14
+
jmodels "github.com/bluesky-social/jetstream/pkg/models"
15
+
"tangled.org/core/api/tangled"
16
+
"tangled.org/core/appview/db"
17
+
"tangled.org/core/appview/models"
18
+
"tangled.org/core/appview/notify"
19
+
"tangled.org/core/orm"
20
+
)
21
+
22
+
type spyNotifier struct {
23
+
notify.BaseNotifier
24
+
creates int
25
+
deletes int
26
+
renames int
27
+
}
28
+
29
+
func (s *spyNotifier) NewRepo(_ context.Context, _ *models.Repo) { s.creates++ }
30
+
func (s *spyNotifier) DeleteRepo(_ context.Context, _ *models.Repo) { s.deletes++ }
31
+
func (s *spyNotifier) RenameRepo(_ context.Context, _ syntax.DID, _, _ *models.Repo) {
32
+
s.renames++
33
+
}
34
+
35
+
func newTestIngester(t *testing.T) (*Ingester, *spyNotifier) {
36
+
t.Helper()
37
+
path := filepath.Join(t.TempDir(), "test.db")
38
+
d, err := db.Make(context.Background(), path)
39
+
if err != nil {
40
+
t.Fatalf("db.Make: %v", err)
41
+
}
42
+
t.Cleanup(func() { d.Close() })
43
+
44
+
spy := &spyNotifier{}
45
+
ing := &Ingester{
46
+
Db: d,
47
+
Logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
48
+
Notifier: spy,
49
+
}
50
+
return ing, spy
51
+
}
52
+
53
+
func seedRepoRow(t *testing.T, ing *Ingester, did, knot, name, rkey, repoDid string) *models.Repo {
54
+
t.Helper()
55
+
tx, err := ing.Db.Begin()
56
+
if err != nil {
57
+
t.Fatalf("Begin: %v", err)
58
+
}
59
+
repo := &models.Repo{
60
+
Did: did,
61
+
Name: name,
62
+
Knot: knot,
63
+
Rkey: rkey,
64
+
RepoDid: repoDid,
65
+
}
66
+
if err := db.AddRepo(tx, repo); err != nil {
67
+
t.Fatalf("AddRepo: %v", err)
68
+
}
69
+
if err := tx.Commit(); err != nil {
70
+
t.Fatalf("Commit: %v", err)
71
+
}
72
+
return repo
73
+
}
74
+
75
+
func ptr[T any](v T) *T { return &v }
76
+
77
+
func makeEvent(t *testing.T, op string, did, rkey string, record tangled.Repo) *jmodels.Event {
78
+
t.Helper()
79
+
raw, err := json.Marshal(record)
80
+
if err != nil {
81
+
t.Fatalf("marshal record: %v", err)
82
+
}
83
+
return &jmodels.Event{
84
+
Did: did,
85
+
Kind: jmodels.EventKindCommit,
86
+
Commit: &jmodels.Commit{
87
+
Operation: op,
88
+
Collection: tangled.RepoNSID,
89
+
RKey: rkey,
90
+
Record: raw,
91
+
},
92
+
}
93
+
}
94
+
95
+
func makeDeleteEvent(did, rkey string) *jmodels.Event {
96
+
return &jmodels.Event{
97
+
Did: did,
98
+
Kind: jmodels.EventKindCommit,
99
+
Commit: &jmodels.Commit{
100
+
Operation: jmodels.CommitOperationDelete,
101
+
Collection: tangled.RepoNSID,
102
+
RKey: rkey,
103
+
},
104
+
}
105
+
}
106
+
107
+
func loadRepo(t *testing.T, ing *Ingester, did, rkey string) *models.Repo {
108
+
t.Helper()
109
+
r, err := db.GetRepo(ing.Db,
110
+
orm.FilterEq("did", did),
111
+
orm.FilterEq("rkey", rkey),
112
+
)
113
+
if err != nil {
114
+
t.Fatalf("GetRepo: %v", err)
115
+
}
116
+
return r
117
+
}
118
+
119
+
func TestIngestRepo_CreateInsertsNewRow(t *testing.T) {
120
+
ing, spy := newTestIngester(t)
121
+
122
+
e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{
123
+
Knot: "knot.example",
124
+
Name: ptr("MyRepo"),
125
+
Description: ptr("a test repo"),
126
+
RepoDid: ptr("did:plc:repo1"),
127
+
})
128
+
129
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
130
+
t.Fatalf("ingestRepo: %v", err)
131
+
}
132
+
133
+
r := loadRepo(t, ing, "did:plc:akshay", "myrepo")
134
+
if r.Name != "MyRepo" {
135
+
t.Errorf("name = %q, want %q", r.Name, "MyRepo")
136
+
}
137
+
if r.Description != "a test repo" {
138
+
t.Errorf("description = %q", r.Description)
139
+
}
140
+
if r.RepoDid != "did:plc:repo1" {
141
+
t.Errorf("repoDid = %q", r.RepoDid)
142
+
}
143
+
if spy.creates != 1 {
144
+
t.Errorf("NewRepo called %d times, want 1", spy.creates)
145
+
}
146
+
}
147
+
148
+
func TestIngestRepo_CreateSkipsIfRowExists(t *testing.T) {
149
+
ing, spy := newTestIngester(t)
150
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "myrepo", "myrepo", "did:plc:repo1")
151
+
152
+
e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{
153
+
Knot: "knot.example",
154
+
Name: ptr("myrepo"),
155
+
RepoDid: ptr("did:plc:repo1"),
156
+
})
157
+
158
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
159
+
t.Fatalf("ingestRepo: %v", err)
160
+
}
161
+
if spy.creates != 0 {
162
+
t.Errorf("row already exists, NewRepo should not be called but was called %d times", spy.creates)
163
+
}
164
+
}
165
+
166
+
func TestIngestRepo_CreateCascadesRename(t *testing.T) {
167
+
ing, spy := newTestIngester(t)
168
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "oldname", "oldname", "did:plc:repo1")
169
+
170
+
e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "newname", tangled.Repo{
171
+
Knot: "knot.example",
172
+
Name: ptr("NewName"),
173
+
RepoDid: ptr("did:plc:repo1"),
174
+
})
175
+
176
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
177
+
t.Fatalf("ingestRepo: %v", err)
178
+
}
179
+
180
+
_, err := db.GetRepo(ing.Db,
181
+
orm.FilterEq("did", "did:plc:akshay"),
182
+
orm.FilterEq("rkey", "oldname"),
183
+
)
184
+
if !errors.Is(err, sql.ErrNoRows) {
185
+
t.Errorf("old rkey row should be gone, got err = %v", err)
186
+
}
187
+
188
+
r := loadRepo(t, ing, "did:plc:akshay", "newname")
189
+
if r.Name != "NewName" {
190
+
t.Errorf("name = %q, want %q", r.Name, "NewName")
191
+
}
192
+
if r.RepoDid != "did:plc:repo1" {
193
+
t.Errorf("repoDid = %q", r.RepoDid)
194
+
}
195
+
196
+
hint, err := db.LookupRepoRename(ing.Db, "did:plc:akshay", "oldname")
197
+
if err != nil {
198
+
t.Fatalf("LookupRepoRename: %v", err)
199
+
}
200
+
if hint == nil {
201
+
t.Fatal("expected rename history, got nil")
202
+
}
203
+
204
+
if spy.renames != 1 {
205
+
t.Errorf("RenameRepo called %d times, want 1", spy.renames)
206
+
}
207
+
if spy.creates != 0 {
208
+
t.Errorf("rename should not create: NewRepo called %d times, want 0", spy.creates)
209
+
}
210
+
}
211
+
212
+
func TestIngestRepo_CreateNoRepoDidSkipped(t *testing.T) {
213
+
ing, spy := newTestIngester(t)
214
+
215
+
e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{
216
+
Knot: "knot.example",
217
+
Name: ptr("myrepo"),
218
+
})
219
+
220
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
221
+
t.Fatalf("ingestRepo: %v", err)
222
+
}
223
+
if spy.creates != 0 {
224
+
t.Errorf("NewRepo called %d times, want 0", spy.creates)
225
+
}
226
+
}
227
+
228
+
func TestIngestRepo_UpdateMetadata(t *testing.T) {
229
+
ing, _ := newTestIngester(t)
230
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1")
231
+
232
+
e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{
233
+
Knot: "knot.example",
234
+
Name: ptr("foo"),
235
+
Description: ptr("updated description"),
236
+
Website: ptr("https://example.com"),
237
+
Topics: []string{"go", "test"},
238
+
RepoDid: ptr("did:plc:repo1"),
239
+
})
240
+
241
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
242
+
t.Fatalf("ingestRepo: %v", err)
243
+
}
244
+
245
+
r := loadRepo(t, ing, "did:plc:akshay", "foo")
246
+
if r.Description != "updated description" {
247
+
t.Errorf("description = %q", r.Description)
248
+
}
249
+
if r.Website != "https://example.com" {
250
+
t.Errorf("website = %q", r.Website)
251
+
}
252
+
if got := r.TopicStr(); got != "go test" {
253
+
t.Errorf("topics = %q", got)
254
+
}
255
+
}
256
+
257
+
func TestIngestRepo_UpdateDisplayName(t *testing.T) {
258
+
ing, _ := newTestIngester(t)
259
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1")
260
+
261
+
e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{
262
+
Knot: "knot.example",
263
+
Name: ptr("Foo"),
264
+
RepoDid: ptr("did:plc:repo1"),
265
+
})
266
+
267
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
268
+
t.Fatalf("ingestRepo: %v", err)
269
+
}
270
+
271
+
r := loadRepo(t, ing, "did:plc:akshay", "foo")
272
+
if r.Name != "Foo" {
273
+
t.Errorf("name = %q, want %q", r.Name, "Foo")
274
+
}
275
+
if r.Rkey != "foo" {
276
+
t.Errorf("rkey should be unchanged but got %q, want %q", r.Rkey, "foo")
277
+
}
278
+
}
279
+
280
+
func TestIngestRepo_UpdateNothingChangedNoOp(t *testing.T) {
281
+
ing, _ := newTestIngester(t)
282
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1")
283
+
284
+
e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{
285
+
Knot: "knot.example",
286
+
Name: ptr("foo"),
287
+
RepoDid: ptr("did:plc:repo1"),
288
+
})
289
+
290
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
291
+
t.Fatalf("ingestRepo: %v", err)
292
+
}
293
+
294
+
r := loadRepo(t, ing, "did:plc:akshay", "foo")
295
+
if r.Name != "foo" {
296
+
t.Errorf("name = %q, want unchanged %q", r.Name, "foo")
297
+
}
298
+
}
299
+
300
+
func TestIngestRepo_UnknownRowSkipped(t *testing.T) {
301
+
ops := []string{jmodels.CommitOperationUpdate, jmodels.CommitOperationDelete}
302
+
for _, op := range ops {
303
+
t.Run(op, func(t *testing.T) {
304
+
ing, _ := newTestIngester(t)
305
+
306
+
var e *jmodels.Event
307
+
switch op {
308
+
case jmodels.CommitOperationUpdate:
309
+
e = makeEvent(t, op, "did:plc:nobody", "ghost", tangled.Repo{
310
+
Knot: "knot.example",
311
+
Name: ptr("ghost"),
312
+
RepoDid: ptr("did:plc:nope"),
313
+
})
314
+
case jmodels.CommitOperationDelete:
315
+
e = makeDeleteEvent("did:plc:nobody", "ghost")
316
+
}
317
+
318
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
319
+
t.Fatalf("ingestRepo: %v", err)
320
+
}
321
+
})
322
+
}
323
+
}
324
+
325
+
func TestIngestRepo_UpdateNoRepoDidSkipped(t *testing.T) {
326
+
ing, _ := newTestIngester(t)
327
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1")
328
+
329
+
e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{
330
+
Knot: "knot.example",
331
+
Name: ptr("bar"),
332
+
})
333
+
334
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
335
+
t.Fatalf("ingestRepo: %v", err)
336
+
}
337
+
338
+
r := loadRepo(t, ing, "did:plc:akshay", "foo")
339
+
if r.Name != "foo" {
340
+
t.Errorf("name = %q, want unchanged %q", r.Name, "foo")
341
+
}
342
+
}
343
+
344
+
func TestIngestRepo_DeleteRemovesRow(t *testing.T) {
345
+
ing, _ := newTestIngester(t)
346
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1")
347
+
348
+
e := makeDeleteEvent("did:plc:akshay", "foo")
349
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
350
+
t.Fatalf("ingestRepo: %v", err)
351
+
}
352
+
353
+
_, err := db.GetRepo(ing.Db,
354
+
orm.FilterEq("did", "did:plc:akshay"),
355
+
orm.FilterEq("rkey", "foo"),
356
+
)
357
+
if !errors.Is(err, sql.ErrNoRows) {
358
+
t.Errorf("expected row to be deleted, got err = %v", err)
359
+
}
360
+
}
361
+
362
+
func TestIngestRepo_MalformedRecord(t *testing.T) {
363
+
ing, _ := newTestIngester(t)
364
+
365
+
e := &jmodels.Event{
366
+
Did: "did:plc:akshay",
367
+
Kind: jmodels.EventKindCommit,
368
+
Commit: &jmodels.Commit{
369
+
Operation: jmodels.CommitOperationUpdate,
370
+
Collection: tangled.RepoNSID,
371
+
RKey: "rkey1",
372
+
Record: json.RawMessage("{not json"),
373
+
},
374
+
}
375
+
376
+
if err := ing.ingestRepo(context.Background(), e); err == nil {
377
+
t.Errorf("ingestRepo with malformed record: err = nil, want error")
378
+
}
379
+
}
380
+
381
+
func TestIngestRepo_RenameDeleteSequenceNoTornState(t *testing.T) {
382
+
ing, spy := newTestIngester(t)
383
+
seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "oldname", "oldname", "did:plc:repo1")
384
+
385
+
if _, err := ing.Db.Exec(
386
+
`insert into stars (did, rkey, subject_type, subject) values (?, ?, ?, ?)`,
387
+
"did:plc:boltless", "star1", "repo", "did:plc:repo1",
388
+
); err != nil {
389
+
t.Fatalf("seed star: %v", err)
390
+
}
391
+
392
+
createEvt := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "newname", tangled.Repo{
393
+
Knot: "knot.example",
394
+
Name: ptr("NewName"),
395
+
RepoDid: ptr("did:plc:repo1"),
396
+
})
397
+
if err := ing.ingestRepo(context.Background(), createEvt); err != nil {
398
+
t.Fatalf("ingest create: %v", err)
399
+
}
400
+
401
+
deleteEvt := makeDeleteEvent("did:plc:akshay", "oldname")
402
+
if err := ing.ingestRepo(context.Background(), deleteEvt); err != nil {
403
+
t.Fatalf("ingest delete: %v", err)
404
+
}
405
+
406
+
r := loadRepo(t, ing, "did:plc:akshay", "newname")
407
+
if r.Name != "NewName" {
408
+
t.Errorf("name = %q, want %q", r.Name, "NewName")
409
+
}
410
+
if r.RepoDid != "did:plc:repo1" {
411
+
t.Errorf("repoDid = %q, want %q", r.RepoDid, "did:plc:repo1")
412
+
}
413
+
414
+
_, err := db.GetRepo(ing.Db,
415
+
orm.FilterEq("did", "did:plc:akshay"),
416
+
orm.FilterEq("rkey", "oldname"),
417
+
)
418
+
if !errors.Is(err, sql.ErrNoRows) {
419
+
t.Errorf("old rkey should be gone, got err = %v", err)
420
+
}
421
+
422
+
var starSubject string
423
+
if err := ing.Db.QueryRow(`select subject from stars where did = ?`, "did:plc:boltless").Scan(&starSubject); err != nil {
424
+
t.Fatalf("query star: %v", err)
425
+
}
426
+
if starSubject != "did:plc:repo1" {
427
+
t.Errorf("star subject = %q, want %q", starSubject, "did:plc:repo1")
428
+
}
429
+
430
+
if spy.renames != 1 {
431
+
t.Errorf("RenameRepo called %d times, want 1", spy.renames)
432
+
}
433
+
if spy.creates != 0 {
434
+
t.Errorf("rename should not create: NewRepo called %d times, want 0", spy.creates)
435
+
}
436
+
if spy.deletes != 0 {
437
+
t.Errorf("old rkey already gone, DeleteRepo should not be called but was called %d times", spy.deletes)
438
+
}
439
+
}
440
+
441
+
func TestIngestRepo_CreateFallsBackToRkeyForName(t *testing.T) {
442
+
ing, _ := newTestIngester(t)
443
+
444
+
e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{
445
+
Knot: "knot.example",
446
+
RepoDid: ptr("did:plc:repo1"),
447
+
})
448
+
449
+
if err := ing.ingestRepo(context.Background(), e); err != nil {
450
+
t.Fatalf("ingestRepo: %v", err)
451
+
}
452
+
453
+
r := loadRepo(t, ing, "did:plc:akshay", "myrepo")
454
+
if r.Name != "myrepo" {
455
+
t.Errorf("name should fall back to rkey: got %q, want %q", r.Name, "myrepo")
456
+
}
457
+
}
History
1 round
0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview: ingester and indexers use repoDID
Lewis: May this revision serve well! <lewis@tangled.org>
no conflicts, ready to merge