Monorepo for Tangled tangled.org
761
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: ingester and indexers use repoDID

Lewis: May this revision serve well! <lewis@tangled.org>

Lewis 10650582 00097e1d

+942 -83
+5 -5
appview/indexer/issues/indexer.go
··· 31 31 unicodeNormalizeName = "uicodeNormalize" 32 32 33 33 // Bump this when the index mapping changes to trigger a rebuild. 34 - issueIndexerVersion = 3 34 + issueIndexerVersion = 4 35 35 ) 36 36 37 37 type Indexer struct { ··· 85 85 docMapping.AddFieldMappingsAt("title", textFieldMapping) 86 86 docMapping.AddFieldMappingsAt("body", textFieldMapping) 87 87 88 - docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 88 + docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping) 89 89 docMapping.AddFieldMappingsAt("is_open", boolFieldMapping) 90 90 docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping) 91 91 docMapping.AddFieldMappingsAt("labels", keywordFieldMapping) ··· 185 185 186 186 type issueData struct { 187 187 ID int64 `json:"id"` 188 - RepoAt string `json:"repo_at"` 188 + RepoDid string `json:"repo_did"` 189 189 IssueID int `json:"issue_id"` 190 190 Title string `json:"title"` 191 191 Body string `json:"body"` ··· 200 200 func makeIssueData(issue *models.Issue) *issueData { 201 201 return &issueData{ 202 202 ID: issue.Id, 203 - RepoAt: issue.RepoAt.String(), 203 + RepoDid: string(issue.RepoDid), 204 204 IssueID: issue.IssueId, 205 205 Title: issue.Title, 206 206 Body: issue.Body, ··· 274 274 )) 275 275 } 276 276 277 - musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 277 + musts = append(musts, bleveutil.KeywordFieldQuery("repo_did", opts.RepoDid)) 278 278 if opts.IsOpen != nil { 279 279 musts = append(musts, bleveutil.BoolFieldQuery("is_open", *opts.IsOpen)) 280 280 }
+30 -30
appview/indexer/issues/indexer_test.go
··· 67 67 ctx := context.Background() 68 68 69 69 err := ix.Index(ctx, 70 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 71 - models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")}, 72 - models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login timeout", Body: "Login takes too long", Open: false, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 70 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 71 + models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")}, 72 + models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix login timeout", Body: "Login takes too long", Open: false, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 73 73 ) 74 74 require.NoError(t, err) 75 75 76 76 opts := func() models.IssueSearchOptions { 77 77 return models.IssueSearchOptions{ 78 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 79 - IsOpen: boolPtr(true), 80 - Page: pagination.Page{Limit: 10}, 78 + RepoDid: "did:plc:testrepo", 79 + IsOpen: boolPtr(true), 80 + Page: pagination.Page{Limit: 10}, 81 81 } 82 82 } 83 83 ··· 148 148 ctx := context.Background() 149 149 150 150 err := ix.Index(ctx, 151 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue 1", Body: "Body", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 152 - models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue 2", Body: "Body", Open: true, Did: "did:plc:bob", Labels: makeLabelState("bug", "urgent")}, 151 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Issue 1", Body: "Body", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 152 + models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Issue 2", Body: "Body", Open: true, Did: "did:plc:bob", Labels: makeLabelState("bug", "urgent")}, 153 153 ) 154 154 require.NoError(t, err) 155 155 156 156 result, err := ix.Search(ctx, models.IssueSearchOptions{ 157 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 158 - IsOpen: boolPtr(true), 159 - Labels: []string{"bug", "urgent"}, 160 - Page: pagination.Page{Limit: 10}, 157 + RepoDid: "did:plc:testrepo", 158 + IsOpen: boolPtr(true), 159 + Labels: []string{"bug", "urgent"}, 160 + Page: pagination.Page{Limit: 10}, 161 161 }) 162 162 require.NoError(t, err) 163 163 assert.Equal(t, uint64(1), result.Total) ··· 171 171 ctx := context.Background() 172 172 173 173 err := ix.Index(ctx, 174 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 175 - models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")}, 176 - models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug", "urgent")}, 174 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug")}, 175 + models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob", Labels: makeLabelState("feature")}, 176 + models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice", Labels: makeLabelState("bug", "urgent")}, 177 177 ) 178 178 require.NoError(t, err) 179 179 180 180 opts := func() models.IssueSearchOptions { 181 181 return models.IssueSearchOptions{ 182 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 183 - IsOpen: boolPtr(true), 184 - Page: pagination.Page{Limit: 10}, 182 + RepoDid: "did:plc:testrepo", 183 + IsOpen: boolPtr(true), 184 + Page: pagination.Page{Limit: 10}, 185 185 } 186 186 } 187 187 ··· 227 227 ctx := context.Background() 228 228 229 229 err := ix.Index(ctx, 230 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice"}, 231 - models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob"}, 232 - models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice"}, 230 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Fix login bug", Body: "Users cannot login", Open: true, Did: "did:plc:alice"}, 231 + models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Add dark mode", Body: "Implement dark theme", Open: true, Did: "did:plc:bob"}, 232 + models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "Fix timeout bug", Body: "Timeout on save", Open: true, Did: "did:plc:alice"}, 233 233 ) 234 234 require.NoError(t, err) 235 235 ··· 244 244 require.Equal(t, []string{"dark theme"}, negatedPhrases) 245 245 246 246 result, err := ix.Search(ctx, models.IssueSearchOptions{ 247 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 247 + RepoDid: "did:plc:testrepo", 248 248 IsOpen: boolPtr(true), 249 249 NegatedPhrases: negatedPhrases, 250 250 Page: pagination.Page{Limit: 10}, ··· 261 261 ctx := context.Background() 262 262 263 263 err := ix.Index(ctx, 264 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Issue", Body: "Body", Open: true, Did: "did:plc:alice"}, 264 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "Issue", Body: "Body", Open: true, Did: "did:plc:alice"}, 265 265 ) 266 266 require.NoError(t, err) 267 267 268 268 result, err := ix.Search(ctx, models.IssueSearchOptions{ 269 269 Keywords: []string{"nonexistent"}, 270 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 270 + RepoDid: "did:plc:testrepo", 271 271 IsOpen: boolPtr(true), 272 272 Page: pagination.Page{Limit: 10}, 273 273 }) ··· 283 283 ctx := context.Background() 284 284 285 285 err := ix.Index(ctx, 286 - models.Issue{Id: 1, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "High priority bug", Body: "Urgent", Open: true, Did: "did:plc:alice", 286 + models.Issue{Id: 1, RepoDid: "did:plc:testrepo", Title: "High priority bug", Body: "Urgent", Open: true, Did: "did:plc:alice", 287 287 Labels: makeLabelState("bug", "priority=high")}, 288 - models.Issue{Id: 2, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "Low priority feature", Body: "Nice to have", Open: true, Did: "did:plc:bob", 288 + models.Issue{Id: 2, RepoDid: "did:plc:testrepo", Title: "Low priority feature", Body: "Nice to have", Open: true, Did: "did:plc:bob", 289 289 Labels: makeLabelState("feature", "priority=low")}, 290 - models.Issue{Id: 3, RepoAt: "at://did:plc:test/sh.tangled.repo/abc", Title: "High priority feature", Body: "Important", Open: true, Did: "did:plc:alice", 290 + models.Issue{Id: 3, RepoDid: "did:plc:testrepo", Title: "High priority feature", Body: "Important", Open: true, Did: "did:plc:alice", 291 291 Labels: makeLabelState("feature", "priority=high")}, 292 292 ) 293 293 require.NoError(t, err) 294 294 295 295 opts := func() models.IssueSearchOptions { 296 296 return models.IssueSearchOptions{ 297 - RepoAt: "at://did:plc:test/sh.tangled.repo/abc", 298 - IsOpen: boolPtr(true), 299 - Page: pagination.Page{Limit: 10}, 297 + RepoDid: "did:plc:testrepo", 298 + IsOpen: boolPtr(true), 299 + Page: pagination.Page{Limit: 10}, 300 300 } 301 301 } 302 302
+19 -11
appview/indexer/notifier.go
··· 4 4 "context" 5 5 6 6 "github.com/bluesky-social/indigo/atproto/syntax" 7 - "tangled.org/core/api/tangled" 8 7 "tangled.org/core/appview/db" 9 8 "tangled.org/core/appview/models" 10 9 "tangled.org/core/appview/notify" ··· 14 13 15 14 var _ notify.Notifier = &Indexer{} 16 15 17 - func (ix *Indexer) getAndReindexRepo(ctx context.Context, repoAt syntax.ATURI) { 18 - l := log.FromContext(ctx).With("notifier", "indexer", "repo_at", repoAt) 16 + func (ix *Indexer) getAndReindexRepo(ctx context.Context, repoDid syntax.DID) { 17 + l := log.FromContext(ctx).With("notifier", "indexer", "repo_did", repoDid) 19 18 20 - repo, err := db.GetRepo(ix.Db, orm.FilterEq("at_uri", repoAt.String())) 19 + repo, err := db.GetRepo(ix.Db, orm.FilterEq("repo_did", string(repoDid))) 21 20 if err != nil { 22 21 l.Error("failed to get repo for reindexing", "err", err) 23 22 return ··· 39 38 } 40 39 41 40 l.Debug("reindexing repo after new issue") 42 - ix.getAndReindexRepo(ctx, issue.RepoAt) 41 + ix.getAndReindexRepo(ctx, issue.RepoDid) 43 42 } 44 43 45 44 func (ix *Indexer) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { ··· 61 60 } 62 61 63 62 l.Debug("reindexing repo after issue deletion") 64 - ix.getAndReindexRepo(ctx, issue.RepoAt) 63 + ix.getAndReindexRepo(ctx, issue.RepoDid) 65 64 } 66 65 67 66 func (ix *Indexer) NewIssueLabelOp(ctx context.Context, issue *models.Issue) { ··· 92 91 } 93 92 94 93 l.Debug("reindexing repo after new pull") 95 - ix.getAndReindexRepo(ctx, pull.RepoAt) 94 + ix.getAndReindexRepo(ctx, pull.RepoDid) 96 95 } 97 96 98 97 func (ix *Indexer) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) { ··· 113 112 } 114 113 } 115 114 115 + func (ix *Indexer) RenameRepo(ctx context.Context, actor syntax.DID, oldRepo, newRepo *models.Repo) { 116 + l := log.FromContext(ctx).With("notifier", "indexer", "repo", newRepo.RepoIdentifier(), "actor", actor, "old_name", oldRepo.Name, "new_name", newRepo.Name) 117 + l.Debug("reindexing repo after rename") 118 + err := ix.Repos.Index(ctx, *newRepo) 119 + if err != nil { 120 + l.Error("failed to reindex repo", "err", err) 121 + } 122 + } 123 + 116 124 func (ix *Indexer) DeleteRepo(ctx context.Context, repo *models.Repo) { 117 125 l := log.FromContext(ctx).With("notifier", "indexer", "repo", repo) 118 126 l.Debug("deleting repo from index") ··· 125 133 func (ix *Indexer) NewStar(ctx context.Context, star *models.Star) { 126 134 l := log.FromContext(ctx).With("notifier", "indexer", "star", star) 127 135 128 - if star.RepoAt.Collection().String() != tangled.RepoNSID { 136 + if star.SubjectType != models.StarSubjectRepo { 129 137 return 130 138 } 131 139 132 140 l.Debug("reindexing repo after new star") 133 - ix.getAndReindexRepo(ctx, star.RepoAt) 141 + ix.getAndReindexRepo(ctx, syntax.DID(star.Subject)) 134 142 } 135 143 136 144 func (ix *Indexer) DeleteStar(ctx context.Context, star *models.Star) { 137 145 l := log.FromContext(ctx).With("notifier", "indexer", "star", star) 138 146 139 - if star.RepoAt.Collection().String() != tangled.RepoNSID { 147 + if star.SubjectType != models.StarSubjectRepo { 140 148 return 141 149 } 142 150 143 151 l.Debug("reindexing repo after star deletion") 144 - ix.getAndReindexRepo(ctx, star.RepoAt) 152 + ix.getAndReindexRepo(ctx, syntax.DID(star.Subject)) 145 153 }
+5 -5
appview/indexer/pulls/indexer.go
··· 30 30 unicodeNormalizeName = "uicodeNormalize" 31 31 32 32 // Bump this when the index mapping changes to trigger a rebuild. 33 - pullIndexerVersion = 3 33 + pullIndexerVersion = 4 34 34 ) 35 35 36 36 type Indexer struct { ··· 80 80 docMapping.AddFieldMappingsAt("title", textFieldMapping) 81 81 docMapping.AddFieldMappingsAt("body", textFieldMapping) 82 82 83 - docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 83 + docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping) 84 84 docMapping.AddFieldMappingsAt("state", keywordFieldMapping) 85 85 docMapping.AddFieldMappingsAt("author_did", keywordFieldMapping) 86 86 docMapping.AddFieldMappingsAt("labels", keywordFieldMapping) ··· 180 180 181 181 type pullData struct { 182 182 ID int64 `json:"id"` 183 - RepoAt string `json:"repo_at"` 183 + RepoDid string `json:"repo_did"` 184 184 PullID int `json:"pull_id"` 185 185 Title string `json:"title"` 186 186 Body string `json:"body"` ··· 195 195 func makePullData(pull *models.Pull) *pullData { 196 196 return &pullData{ 197 197 ID: int64(pull.ID), 198 - RepoAt: pull.RepoAt.String(), 198 + RepoDid: string(pull.RepoDid), 199 199 PullID: pull.PullId, 200 200 Title: pull.Title, 201 201 Body: pull.Body, ··· 275 275 )) 276 276 } 277 277 278 - musts = append(musts, bleveutil.KeywordFieldQuery("repo_at", opts.RepoAt)) 278 + musts = append(musts, bleveutil.KeywordFieldQuery("repo_did", opts.RepoDid)) 279 279 if opts.State != nil { 280 280 musts = append(musts, bleveutil.KeywordFieldQuery("state", opts.State.String())) 281 281 }
+4 -4
appview/indexer/repos/indexer.go
··· 34 34 unicodeNormalizeName = "unicodeNormalize" 35 35 36 36 // Bump this when the index mapping changes to trigger a rebuild. 37 - repoIndexerVersion = 6 37 + repoIndexerVersion = 7 38 38 ) 39 39 40 40 type Indexer struct { ··· 120 120 docMapping.AddFieldMappingsAt("topics_exact", caseInsensitiveKeywordMapping) 121 121 docMapping.AddFieldMappingsAt("did", keywordFieldMapping) 122 122 docMapping.AddFieldMappingsAt("knot", keywordFieldMapping) 123 - docMapping.AddFieldMappingsAt("repo_at", keywordFieldMapping) 123 + docMapping.AddFieldMappingsAt("repo_did", keywordFieldMapping) 124 124 125 125 // fork indicator for down-ranking 126 126 docMapping.AddFieldMappingsAt("is_fork", booleanFieldMapping) ··· 258 258 259 259 type repoData struct { 260 260 ID int64 `json:"id"` 261 - RepoAt string `json:"repo_at"` 261 + RepoDid string `json:"repo_did"` 262 262 Did string `json:"did"` 263 263 Name string `json:"name"` 264 264 NameTrigram string `json:"name_trigram"` ··· 294 294 295 295 return &repoData{ 296 296 ID: repo.Id, 297 - RepoAt: repo.RepoAt().String(), 297 + RepoDid: repo.RepoDid, 298 298 Did: repo.Did, 299 299 Name: repo.Name, 300 300 NameTrigram: repo.Name,
+90 -28
appview/ingester.go
··· 12 12 "net/http" 13 13 "net/url" 14 14 "slices" 15 + "strings" 15 16 "sync" 16 17 17 18 "time" ··· 27 28 "tangled.org/core/appview/config" 28 29 "tangled.org/core/appview/db" 29 30 "tangled.org/core/appview/models" 31 + "tangled.org/core/appview/notify" 30 32 "tangled.org/core/appview/serververify" 31 33 "tangled.org/core/appview/validator" 32 34 "tangled.org/core/idresolver" ··· 42 44 Config *config.Config 43 45 Logger *slog.Logger 44 46 Validator *validator.Validator 47 + Notifier notify.Notifier 45 48 } 46 49 47 50 type processFunc func(ctx context.Context, e *jmodels.Event) error ··· 97 100 err = i.ingestLabelDefinition(e) 98 101 case tangled.LabelOpNSID: 99 102 err = i.ingestLabelOp(e) 103 + case tangled.RepoNSID: 104 + err = i.ingestRepo(ctx, e) 100 105 } 101 106 l = i.Logger.With("nsid", e.Commit.Collection) 102 107 } ··· 114 119 } 115 120 } 116 121 122 + func (i *Ingester) resolveRepoRef(ref string) (*models.Repo, error) { 123 + if strings.HasPrefix(ref, "did:") { 124 + return db.GetRepoByDid(i.Db, ref) 125 + } 126 + return db.GetRepoByAtUri(i.Db, ref) 127 + } 128 + 129 + func (i *Ingester) resolveOldFormatStar(raw json.RawMessage, star *models.Star, l *slog.Logger) (bool, error) { 130 + var legacy struct { 131 + Subject *string `json:"subject"` 132 + SubjectDid *string `json:"subjectDid"` 133 + } 134 + if err := json.Unmarshal(raw, &legacy); err != nil { 135 + return false, err 136 + } 137 + 138 + switch { 139 + case legacy.SubjectDid != nil: 140 + repo, err := i.resolveRepoRef(*legacy.SubjectDid) 141 + if err != nil { 142 + l.Warn("skipping old-format star for unknown repo", "subjectDid", *legacy.SubjectDid) 143 + return false, nil 144 + } 145 + star.SubjectType = models.StarSubjectRepo 146 + star.Subject = repo.RepoDid 147 + return true, nil 148 + 149 + case legacy.Subject != nil: 150 + uri, err := syntax.ParseATURI(*legacy.Subject) 151 + if err != nil { 152 + return false, fmt.Errorf("invalid old-format star subject: %w", err) 153 + } 154 + switch uri.Collection().String() { 155 + case tangled.RepoNSID: 156 + repo, err := db.GetRepoByAtUri(i.Db, uri.String()) 157 + if err != nil { 158 + l.Warn("skipping old-format star for unknown repo", "subject", *legacy.Subject) 159 + return false, nil 160 + } 161 + star.SubjectType = models.StarSubjectRepo 162 + star.Subject = repo.RepoDid 163 + return true, nil 164 + default: 165 + star.SubjectType = models.StarSubjectString 166 + star.Subject = *legacy.Subject 167 + return true, nil 168 + } 169 + 170 + default: 171 + return false, fmt.Errorf("old-format star has neither subject nor subjectDid") 172 + } 173 + } 174 + 117 175 func (i *Ingester) ingestStar(e *jmodels.Event) error { 118 176 var err error 119 177 did := e.Did ··· 123 181 124 182 switch e.Commit.Operation { 125 183 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 126 - var subjectUri syntax.ATURI 127 - 128 184 raw := json.RawMessage(e.Commit.Record) 129 185 record := tangled.FeedStar{} 130 - err := json.Unmarshal(raw, &record) 131 - if err != nil { 132 - l.Error("invalid record", "err", err) 133 - return err 134 - } 186 + unmarshalErr := json.Unmarshal(raw, &record) 135 187 136 188 star := &models.Star{ 137 189 Did: did, ··· 139 191 } 140 192 141 193 switch { 142 - case record.SubjectDid != nil: 143 - repo, repoErr := db.GetRepo(i.Db, orm.FilterEq("repo_did", *record.SubjectDid)) 144 - if repoErr == nil { 145 - subjectUri = repo.RepoAt() 146 - star.RepoAt = subjectUri 194 + case unmarshalErr != nil: 195 + resolved, resolveErr := i.resolveOldFormatStar(raw, star, l) 196 + if resolveErr != nil { 197 + l.Error("invalid record", "newFmtErr", unmarshalErr, "oldFmtErr", resolveErr) 198 + return unmarshalErr 147 199 } 148 - case record.Subject != nil: 149 - subjectUri, err = syntax.ParseATURI(*record.Subject) 150 - if err != nil { 151 - l.Error("invalid record", "err", err) 152 - return err 200 + if !resolved { 201 + return nil 153 202 } 154 - star.RepoAt = subjectUri 155 - repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 156 - if repoErr == nil && repo.RepoDid != "" { 157 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 158 - l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 159 - } 203 + 204 + case record.Subject == nil: 205 + return fmt.Errorf("star record has nil subject") 206 + 207 + case record.Subject.FeedStar_Repo != nil: 208 + repo, repoErr := i.resolveRepoRef(record.Subject.FeedStar_Repo.Did) 209 + if repoErr != nil { 210 + l.Warn("skipping star for unknown repo", "did", record.Subject.FeedStar_Repo.Did) 211 + return nil 160 212 } 213 + star.SubjectType = models.StarSubjectRepo 214 + star.Subject = repo.RepoDid 215 + 216 + case record.Subject.FeedStar_String != nil: 217 + star.SubjectType = models.StarSubjectString 218 + star.Subject = record.Subject.FeedStar_String.Uri 219 + 161 220 default: 162 - l.Error("star record has neither subject nor subjectDid") 163 - return fmt.Errorf("star record has neither subject nor subjectDid") 221 + return fmt.Errorf("star record has empty subject union") 164 222 } 223 + 165 224 err = db.AddStar(i.Db, star) 166 225 case jmodels.CommitOperationDelete: 167 226 err = db.DeleteStarByRkey(i.Db, did, e.Commit.RKey) ··· 373 432 repoDid = *record.RepoDid 374 433 } 375 434 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 376 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 435 + if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey); enqErr != nil { 377 436 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 378 437 } 379 438 } ··· 386 445 artifact := models.Artifact{ 387 446 Did: did, 388 447 Rkey: e.Commit.RKey, 389 - RepoAt: repo.RepoAt(), 448 + RepoDid: syntax.DID(repo.RepoDid), 390 449 Tag: plumbing.Hash(record.Tag), 391 450 CreatedAt: createdAt, 392 451 BlobCid: cid.Cid(record.Artifact.Ref), ··· 955 1014 956 1015 issue := models.IssueFromRecord(did, rkey, record) 957 1016 958 - if issue.RepoAt == "" { 1017 + if issue.RepoDid == "" { 959 1018 return fmt.Errorf("issue record has no repo field") 1019 + } 1020 + if _, err := syntax.ParseDID(string(issue.RepoDid)); err != nil { 1021 + return fmt.Errorf("issue record repo field is not a valid DID: %w", err) 960 1022 } 961 1023 962 1024 if err := i.Validator.ValidateIssue(&issue); err != nil {
+332
appview/ingester_repo.go
··· 1 + package appview 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "errors" 8 + "fmt" 9 + "slices" 10 + 11 + "github.com/bluesky-social/indigo/atproto/syntax" 12 + jmodels "github.com/bluesky-social/jetstream/pkg/models" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/appview/db" 15 + "tangled.org/core/appview/models" 16 + "tangled.org/core/orm" 17 + ) 18 + 19 + func (i *Ingester) ingestRepo(ctx context.Context, e *jmodels.Event) error { 20 + l := i.Logger.With("handler", "ingestRepo", "did", e.Did, "rkey", e.Commit.RKey) 21 + 22 + switch e.Commit.Operation { 23 + case jmodels.CommitOperationCreate: 24 + return i.ingestRepoCreate(ctx, e) 25 + case jmodels.CommitOperationUpdate: 26 + return i.ingestRepoUpdate(ctx, e) 27 + case jmodels.CommitOperationDelete: 28 + return i.ingestRepoDelete(ctx, e) 29 + default: 30 + l.Info("unknown repo operation", "op", e.Commit.Operation) 31 + return nil 32 + } 33 + } 34 + 35 + func (i *Ingester) ingestRepoCreate(ctx context.Context, e *jmodels.Event) error { 36 + l := i.Logger.With("handler", "ingestRepoCreate", "did", e.Did, "rkey", e.Commit.RKey) 37 + 38 + record := tangled.Repo{} 39 + if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 40 + l.Error("invalid record", "err", err) 41 + return err 42 + } 43 + 44 + if record.RepoDid == nil || *record.RepoDid == "" { 45 + l.Info("skipping repo create from non-DID-migrated knot") 46 + return nil 47 + } 48 + repoDid := *record.RepoDid 49 + 50 + _, err := db.GetRepo(i.Db, 51 + orm.FilterEq("did", e.Did), 52 + orm.FilterEq("rkey", e.Commit.RKey), 53 + ) 54 + if err == nil { 55 + l.Info("repo row already exists, skipping create", "did", e.Did, "rkey", e.Commit.RKey) 56 + return nil 57 + } 58 + if !errors.Is(err, sql.ErrNoRows) { 59 + return fmt.Errorf("failed to check existing repo: %w", err) 60 + } 61 + 62 + prev, err := db.GetRepoByDid(i.Db, repoDid) 63 + if err != nil && !errors.Is(err, sql.ErrNoRows) { 64 + return fmt.Errorf("failed to check existing repoDid: %w", err) 65 + } 66 + 67 + if prev != nil { 68 + l.Info("repoDid exists under different rkey, renaming", 69 + "oldRkey", prev.Rkey, "newRkey", e.Commit.RKey) 70 + 71 + oldRepo := *prev 72 + 73 + tx, txErr := i.Db.Begin() 74 + if txErr != nil { 75 + return fmt.Errorf("failed to begin rename tx: %w", txErr) 76 + } 77 + defer tx.Rollback() 78 + 79 + newName := derefString(record.Name) 80 + if newName == "" { 81 + newName = e.Commit.RKey 82 + } 83 + 84 + if err := db.RenameRepo(tx, e.Did, prev.Rkey, e.Commit.RKey, newName); err != nil { 85 + return fmt.Errorf("failed to rename repo: %w", err) 86 + } 87 + if err := db.RecordRepoRename(tx, e.Did, prev.Rkey, repoDid); err != nil { 88 + return fmt.Errorf("failed to record rename history: %w", err) 89 + } 90 + 91 + renamed := *prev 92 + renamed.Rkey = e.Commit.RKey 93 + renamed.Name = newName 94 + desired := repoFromRecord(&renamed, &record) 95 + if repoMetadataChanged(&renamed, &desired) { 96 + if err := applyRepoMetadata(tx, &renamed, desired); err != nil { 97 + return fmt.Errorf("failed to apply metadata after rename: %w", err) 98 + } 99 + } 100 + 101 + if err := tx.Commit(); err != nil { 102 + return fmt.Errorf("failed to commit rename tx: %w", err) 103 + } 104 + 105 + newRepo, err := db.GetRepo(i.Db, 106 + orm.FilterEq("did", e.Did), 107 + orm.FilterEq("rkey", e.Commit.RKey), 108 + ) 109 + if err != nil { 110 + l.Warn("failed to fetch repo after rename for notification", "err", err) 111 + return nil 112 + } 113 + i.Notifier.RenameRepo(ctx, syntax.DID(e.Did), &oldRepo, newRepo) 114 + return nil 115 + } 116 + 117 + rkey := e.Commit.RKey 118 + name := derefString(record.Name) 119 + if name == "" { 120 + name = rkey 121 + } 122 + 123 + repo := &models.Repo{ 124 + Did: e.Did, 125 + Name: name, 126 + Knot: record.Knot, 127 + Rkey: rkey, 128 + Description: derefString(record.Description), 129 + Website: derefString(record.Website), 130 + Topics: append([]string(nil), record.Topics...), 131 + Source: derefString(record.Source), 132 + Spindle: derefString(record.Spindle), 133 + Labels: append([]string(nil), record.Labels...), 134 + RepoDid: repoDid, 135 + } 136 + 137 + tx, err := i.Db.Begin() 138 + if err != nil { 139 + return fmt.Errorf("failed to begin insert tx: %w", err) 140 + } 141 + defer tx.Rollback() 142 + 143 + if err := db.AddRepo(tx, repo); err != nil { 144 + return fmt.Errorf("failed to insert repo: %w", err) 145 + } 146 + if err := tx.Commit(); err != nil { 147 + return fmt.Errorf("failed to commit insert tx: %w", err) 148 + } 149 + 150 + i.Notifier.NewRepo(ctx, repo) 151 + return nil 152 + } 153 + 154 + func (i *Ingester) ingestRepoUpdate(ctx context.Context, e *jmodels.Event) error { 155 + l := i.Logger.With("handler", "ingestRepoUpdate", "did", e.Did, "rkey", e.Commit.RKey) 156 + 157 + record := tangled.Repo{} 158 + if err := json.Unmarshal(json.RawMessage(e.Commit.Record), &record); err != nil { 159 + l.Error("invalid record", "err", err) 160 + return err 161 + } 162 + 163 + if record.RepoDid == nil || *record.RepoDid == "" { 164 + l.Info("skipping repo update from non-DID-migrated knot") 165 + return nil 166 + } 167 + 168 + current, err := db.GetRepo(i.Db, 169 + orm.FilterEq("did", e.Did), 170 + orm.FilterEq("rkey", e.Commit.RKey), 171 + ) 172 + if err != nil { 173 + if errors.Is(err, sql.ErrNoRows) { 174 + l.Info("skipping repo update for unknown row") 175 + return nil 176 + } 177 + return fmt.Errorf("failed to fetch repo for ingest: %w", err) 178 + } 179 + 180 + desired := repoFromRecord(current, &record) 181 + 182 + if current.Source != desired.Source { 183 + l.Warn("source field changed but mutation is unsupported, ignoring", 184 + "current", current.Source, "desired", desired.Source) 185 + } 186 + 187 + if !repoMetadataChanged(current, &desired) { 188 + return nil 189 + } 190 + 191 + tx, err := i.Db.Begin() 192 + if err != nil { 193 + return fmt.Errorf("failed to begin tx: %w", err) 194 + } 195 + defer tx.Rollback() 196 + 197 + if err := applyRepoMetadata(tx, current, desired); err != nil { 198 + return fmt.Errorf("failed to apply repo metadata: %w", err) 199 + } 200 + return tx.Commit() 201 + } 202 + 203 + func (i *Ingester) ingestRepoDelete(ctx context.Context, e *jmodels.Event) error { 204 + l := i.Logger.With("handler", "ingestRepoDelete", "did", e.Did, "rkey", e.Commit.RKey) 205 + 206 + repo, err := db.GetRepo(i.Db, 207 + orm.FilterEq("did", e.Did), 208 + orm.FilterEq("rkey", e.Commit.RKey), 209 + ) 210 + if err != nil { 211 + if errors.Is(err, sql.ErrNoRows) { 212 + l.Info("skipping repo delete for unknown row") 213 + return nil 214 + } 215 + return fmt.Errorf("failed to fetch repo for delete: %w", err) 216 + } 217 + 218 + if err := db.RemoveRepo(i.Db, e.Did, e.Commit.RKey); err != nil { 219 + return fmt.Errorf("failed to delete repo: %w", err) 220 + } 221 + 222 + i.Notifier.DeleteRepo(ctx, repo) 223 + l.Info("deleted repo row") 224 + return nil 225 + } 226 + 227 + func applyRepoMetadata(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 228 + if err := db.PutRepo(tx, desired); err != nil { 229 + return err 230 + } 231 + 232 + if current.Spindle != desired.Spindle { 233 + var spindlePtr *string 234 + if desired.Spindle != "" { 235 + spindlePtr = &desired.Spindle 236 + } 237 + if err := db.UpdateSpindle(tx, desired.RepoDid, spindlePtr); err != nil { 238 + return err 239 + } 240 + } 241 + 242 + if !labelsEqual(current.Labels, desired.Labels) { 243 + if err := reconcileLabels(tx, current, desired); err != nil { 244 + return err 245 + } 246 + } 247 + 248 + return nil 249 + } 250 + 251 + func reconcileLabels(tx *sql.Tx, current *models.Repo, desired models.Repo) error { 252 + added := filterOut(desired.Labels, current.Labels) 253 + removed := filterOut(current.Labels, desired.Labels) 254 + 255 + if err := applyEach(added, func(l string) error { 256 + return db.SubscribeLabel(tx, &models.RepoLabel{ 257 + RepoDid: syntax.DID(desired.RepoDid), 258 + LabelAt: syntax.ATURI(l), 259 + }) 260 + }); err != nil { 261 + return err 262 + } 263 + 264 + return applyEach(removed, func(l string) error { 265 + return db.UnsubscribeLabel(tx, 266 + orm.FilterEq("repo_did", desired.RepoDid), 267 + orm.FilterEq("label_at", l), 268 + ) 269 + }) 270 + } 271 + 272 + func filterOut(items, exclude []string) []string { 273 + return slices.DeleteFunc(slices.Clone(items), func(s string) bool { 274 + return slices.Contains(exclude, s) 275 + }) 276 + } 277 + 278 + func applyEach(items []string, fn func(string) error) error { 279 + for _, item := range items { 280 + if err := fn(item); err != nil { 281 + return err 282 + } 283 + } 284 + return nil 285 + } 286 + 287 + func labelsEqual(a, b []string) bool { 288 + if len(a) != len(b) { 289 + return false 290 + } 291 + aSorted := append([]string(nil), a...) 292 + bSorted := append([]string(nil), b...) 293 + slices.Sort(aSorted) 294 + slices.Sort(bSorted) 295 + return slices.Equal(aSorted, bSorted) 296 + } 297 + 298 + func repoFromRecord(current *models.Repo, record *tangled.Repo) models.Repo { 299 + out := *current 300 + out.Name = derefString(record.Name) 301 + if out.Name == "" { 302 + out.Name = current.Rkey 303 + } 304 + out.Knot = record.Knot 305 + out.Description = derefString(record.Description) 306 + out.Website = derefString(record.Website) 307 + out.Topics = append([]string(nil), record.Topics...) 308 + out.Spindle = derefString(record.Spindle) 309 + out.Source = derefString(record.Source) 310 + out.Labels = append([]string(nil), record.Labels...) 311 + if record.RepoDid != nil { 312 + out.RepoDid = *record.RepoDid 313 + } 314 + return out 315 + } 316 + 317 + func repoMetadataChanged(current *models.Repo, desired *models.Repo) bool { 318 + return current.Name != desired.Name || 319 + current.Knot != desired.Knot || 320 + current.Description != desired.Description || 321 + current.Website != desired.Website || 322 + current.TopicStr() != desired.TopicStr() || 323 + current.Spindle != desired.Spindle || 324 + !labelsEqual(current.Labels, desired.Labels) 325 + } 326 + 327 + func derefString(s *string) string { 328 + if s == nil { 329 + return "" 330 + } 331 + return *s 332 + }
+457
appview/ingester_repo_test.go
··· 1 + package appview 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "errors" 8 + "io" 9 + "log/slog" 10 + "path/filepath" 11 + "testing" 12 + 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + jmodels "github.com/bluesky-social/jetstream/pkg/models" 15 + "tangled.org/core/api/tangled" 16 + "tangled.org/core/appview/db" 17 + "tangled.org/core/appview/models" 18 + "tangled.org/core/appview/notify" 19 + "tangled.org/core/orm" 20 + ) 21 + 22 + type spyNotifier struct { 23 + notify.BaseNotifier 24 + creates int 25 + deletes int 26 + renames int 27 + } 28 + 29 + func (s *spyNotifier) NewRepo(_ context.Context, _ *models.Repo) { s.creates++ } 30 + func (s *spyNotifier) DeleteRepo(_ context.Context, _ *models.Repo) { s.deletes++ } 31 + func (s *spyNotifier) RenameRepo(_ context.Context, _ syntax.DID, _, _ *models.Repo) { 32 + s.renames++ 33 + } 34 + 35 + func newTestIngester(t *testing.T) (*Ingester, *spyNotifier) { 36 + t.Helper() 37 + path := filepath.Join(t.TempDir(), "test.db") 38 + d, err := db.Make(context.Background(), path) 39 + if err != nil { 40 + t.Fatalf("db.Make: %v", err) 41 + } 42 + t.Cleanup(func() { d.Close() }) 43 + 44 + spy := &spyNotifier{} 45 + ing := &Ingester{ 46 + Db: d, 47 + Logger: slog.New(slog.NewTextHandler(io.Discard, nil)), 48 + Notifier: spy, 49 + } 50 + return ing, spy 51 + } 52 + 53 + func seedRepoRow(t *testing.T, ing *Ingester, did, knot, name, rkey, repoDid string) *models.Repo { 54 + t.Helper() 55 + tx, err := ing.Db.Begin() 56 + if err != nil { 57 + t.Fatalf("Begin: %v", err) 58 + } 59 + repo := &models.Repo{ 60 + Did: did, 61 + Name: name, 62 + Knot: knot, 63 + Rkey: rkey, 64 + RepoDid: repoDid, 65 + } 66 + if err := db.AddRepo(tx, repo); err != nil { 67 + t.Fatalf("AddRepo: %v", err) 68 + } 69 + if err := tx.Commit(); err != nil { 70 + t.Fatalf("Commit: %v", err) 71 + } 72 + return repo 73 + } 74 + 75 + func ptr[T any](v T) *T { return &v } 76 + 77 + func makeEvent(t *testing.T, op string, did, rkey string, record tangled.Repo) *jmodels.Event { 78 + t.Helper() 79 + raw, err := json.Marshal(record) 80 + if err != nil { 81 + t.Fatalf("marshal record: %v", err) 82 + } 83 + return &jmodels.Event{ 84 + Did: did, 85 + Kind: jmodels.EventKindCommit, 86 + Commit: &jmodels.Commit{ 87 + Operation: op, 88 + Collection: tangled.RepoNSID, 89 + RKey: rkey, 90 + Record: raw, 91 + }, 92 + } 93 + } 94 + 95 + func makeDeleteEvent(did, rkey string) *jmodels.Event { 96 + return &jmodels.Event{ 97 + Did: did, 98 + Kind: jmodels.EventKindCommit, 99 + Commit: &jmodels.Commit{ 100 + Operation: jmodels.CommitOperationDelete, 101 + Collection: tangled.RepoNSID, 102 + RKey: rkey, 103 + }, 104 + } 105 + } 106 + 107 + func loadRepo(t *testing.T, ing *Ingester, did, rkey string) *models.Repo { 108 + t.Helper() 109 + r, err := db.GetRepo(ing.Db, 110 + orm.FilterEq("did", did), 111 + orm.FilterEq("rkey", rkey), 112 + ) 113 + if err != nil { 114 + t.Fatalf("GetRepo: %v", err) 115 + } 116 + return r 117 + } 118 + 119 + func TestIngestRepo_CreateInsertsNewRow(t *testing.T) { 120 + ing, spy := newTestIngester(t) 121 + 122 + e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{ 123 + Knot: "knot.example", 124 + Name: ptr("MyRepo"), 125 + Description: ptr("a test repo"), 126 + RepoDid: ptr("did:plc:repo1"), 127 + }) 128 + 129 + if err := ing.ingestRepo(context.Background(), e); err != nil { 130 + t.Fatalf("ingestRepo: %v", err) 131 + } 132 + 133 + r := loadRepo(t, ing, "did:plc:akshay", "myrepo") 134 + if r.Name != "MyRepo" { 135 + t.Errorf("name = %q, want %q", r.Name, "MyRepo") 136 + } 137 + if r.Description != "a test repo" { 138 + t.Errorf("description = %q", r.Description) 139 + } 140 + if r.RepoDid != "did:plc:repo1" { 141 + t.Errorf("repoDid = %q", r.RepoDid) 142 + } 143 + if spy.creates != 1 { 144 + t.Errorf("NewRepo called %d times, want 1", spy.creates) 145 + } 146 + } 147 + 148 + func TestIngestRepo_CreateSkipsIfRowExists(t *testing.T) { 149 + ing, spy := newTestIngester(t) 150 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "myrepo", "myrepo", "did:plc:repo1") 151 + 152 + e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{ 153 + Knot: "knot.example", 154 + Name: ptr("myrepo"), 155 + RepoDid: ptr("did:plc:repo1"), 156 + }) 157 + 158 + if err := ing.ingestRepo(context.Background(), e); err != nil { 159 + t.Fatalf("ingestRepo: %v", err) 160 + } 161 + if spy.creates != 0 { 162 + t.Errorf("row already exists, NewRepo should not be called but was called %d times", spy.creates) 163 + } 164 + } 165 + 166 + func TestIngestRepo_CreateCascadesRename(t *testing.T) { 167 + ing, spy := newTestIngester(t) 168 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "oldname", "oldname", "did:plc:repo1") 169 + 170 + e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "newname", tangled.Repo{ 171 + Knot: "knot.example", 172 + Name: ptr("NewName"), 173 + RepoDid: ptr("did:plc:repo1"), 174 + }) 175 + 176 + if err := ing.ingestRepo(context.Background(), e); err != nil { 177 + t.Fatalf("ingestRepo: %v", err) 178 + } 179 + 180 + _, err := db.GetRepo(ing.Db, 181 + orm.FilterEq("did", "did:plc:akshay"), 182 + orm.FilterEq("rkey", "oldname"), 183 + ) 184 + if !errors.Is(err, sql.ErrNoRows) { 185 + t.Errorf("old rkey row should be gone, got err = %v", err) 186 + } 187 + 188 + r := loadRepo(t, ing, "did:plc:akshay", "newname") 189 + if r.Name != "NewName" { 190 + t.Errorf("name = %q, want %q", r.Name, "NewName") 191 + } 192 + if r.RepoDid != "did:plc:repo1" { 193 + t.Errorf("repoDid = %q", r.RepoDid) 194 + } 195 + 196 + hint, err := db.LookupRepoRename(ing.Db, "did:plc:akshay", "oldname") 197 + if err != nil { 198 + t.Fatalf("LookupRepoRename: %v", err) 199 + } 200 + if hint == nil { 201 + t.Fatal("expected rename history, got nil") 202 + } 203 + 204 + if spy.renames != 1 { 205 + t.Errorf("RenameRepo called %d times, want 1", spy.renames) 206 + } 207 + if spy.creates != 0 { 208 + t.Errorf("rename should not create: NewRepo called %d times, want 0", spy.creates) 209 + } 210 + } 211 + 212 + func TestIngestRepo_CreateNoRepoDidSkipped(t *testing.T) { 213 + ing, spy := newTestIngester(t) 214 + 215 + e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{ 216 + Knot: "knot.example", 217 + Name: ptr("myrepo"), 218 + }) 219 + 220 + if err := ing.ingestRepo(context.Background(), e); err != nil { 221 + t.Fatalf("ingestRepo: %v", err) 222 + } 223 + if spy.creates != 0 { 224 + t.Errorf("NewRepo called %d times, want 0", spy.creates) 225 + } 226 + } 227 + 228 + func TestIngestRepo_UpdateMetadata(t *testing.T) { 229 + ing, _ := newTestIngester(t) 230 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 231 + 232 + e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{ 233 + Knot: "knot.example", 234 + Name: ptr("foo"), 235 + Description: ptr("updated description"), 236 + Website: ptr("https://example.com"), 237 + Topics: []string{"go", "test"}, 238 + RepoDid: ptr("did:plc:repo1"), 239 + }) 240 + 241 + if err := ing.ingestRepo(context.Background(), e); err != nil { 242 + t.Fatalf("ingestRepo: %v", err) 243 + } 244 + 245 + r := loadRepo(t, ing, "did:plc:akshay", "foo") 246 + if r.Description != "updated description" { 247 + t.Errorf("description = %q", r.Description) 248 + } 249 + if r.Website != "https://example.com" { 250 + t.Errorf("website = %q", r.Website) 251 + } 252 + if got := r.TopicStr(); got != "go test" { 253 + t.Errorf("topics = %q", got) 254 + } 255 + } 256 + 257 + func TestIngestRepo_UpdateDisplayName(t *testing.T) { 258 + ing, _ := newTestIngester(t) 259 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 260 + 261 + e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{ 262 + Knot: "knot.example", 263 + Name: ptr("Foo"), 264 + RepoDid: ptr("did:plc:repo1"), 265 + }) 266 + 267 + if err := ing.ingestRepo(context.Background(), e); err != nil { 268 + t.Fatalf("ingestRepo: %v", err) 269 + } 270 + 271 + r := loadRepo(t, ing, "did:plc:akshay", "foo") 272 + if r.Name != "Foo" { 273 + t.Errorf("name = %q, want %q", r.Name, "Foo") 274 + } 275 + if r.Rkey != "foo" { 276 + t.Errorf("rkey should be unchanged but got %q, want %q", r.Rkey, "foo") 277 + } 278 + } 279 + 280 + func TestIngestRepo_UpdateNothingChangedNoOp(t *testing.T) { 281 + ing, _ := newTestIngester(t) 282 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 283 + 284 + e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{ 285 + Knot: "knot.example", 286 + Name: ptr("foo"), 287 + RepoDid: ptr("did:plc:repo1"), 288 + }) 289 + 290 + if err := ing.ingestRepo(context.Background(), e); err != nil { 291 + t.Fatalf("ingestRepo: %v", err) 292 + } 293 + 294 + r := loadRepo(t, ing, "did:plc:akshay", "foo") 295 + if r.Name != "foo" { 296 + t.Errorf("name = %q, want unchanged %q", r.Name, "foo") 297 + } 298 + } 299 + 300 + func TestIngestRepo_UnknownRowSkipped(t *testing.T) { 301 + ops := []string{jmodels.CommitOperationUpdate, jmodels.CommitOperationDelete} 302 + for _, op := range ops { 303 + t.Run(op, func(t *testing.T) { 304 + ing, _ := newTestIngester(t) 305 + 306 + var e *jmodels.Event 307 + switch op { 308 + case jmodels.CommitOperationUpdate: 309 + e = makeEvent(t, op, "did:plc:nobody", "ghost", tangled.Repo{ 310 + Knot: "knot.example", 311 + Name: ptr("ghost"), 312 + RepoDid: ptr("did:plc:nope"), 313 + }) 314 + case jmodels.CommitOperationDelete: 315 + e = makeDeleteEvent("did:plc:nobody", "ghost") 316 + } 317 + 318 + if err := ing.ingestRepo(context.Background(), e); err != nil { 319 + t.Fatalf("ingestRepo: %v", err) 320 + } 321 + }) 322 + } 323 + } 324 + 325 + func TestIngestRepo_UpdateNoRepoDidSkipped(t *testing.T) { 326 + ing, _ := newTestIngester(t) 327 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 328 + 329 + e := makeEvent(t, jmodels.CommitOperationUpdate, "did:plc:akshay", "foo", tangled.Repo{ 330 + Knot: "knot.example", 331 + Name: ptr("bar"), 332 + }) 333 + 334 + if err := ing.ingestRepo(context.Background(), e); err != nil { 335 + t.Fatalf("ingestRepo: %v", err) 336 + } 337 + 338 + r := loadRepo(t, ing, "did:plc:akshay", "foo") 339 + if r.Name != "foo" { 340 + t.Errorf("name = %q, want unchanged %q", r.Name, "foo") 341 + } 342 + } 343 + 344 + func TestIngestRepo_DeleteRemovesRow(t *testing.T) { 345 + ing, _ := newTestIngester(t) 346 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "foo", "foo", "did:plc:repo1") 347 + 348 + e := makeDeleteEvent("did:plc:akshay", "foo") 349 + if err := ing.ingestRepo(context.Background(), e); err != nil { 350 + t.Fatalf("ingestRepo: %v", err) 351 + } 352 + 353 + _, err := db.GetRepo(ing.Db, 354 + orm.FilterEq("did", "did:plc:akshay"), 355 + orm.FilterEq("rkey", "foo"), 356 + ) 357 + if !errors.Is(err, sql.ErrNoRows) { 358 + t.Errorf("expected row to be deleted, got err = %v", err) 359 + } 360 + } 361 + 362 + func TestIngestRepo_MalformedRecord(t *testing.T) { 363 + ing, _ := newTestIngester(t) 364 + 365 + e := &jmodels.Event{ 366 + Did: "did:plc:akshay", 367 + Kind: jmodels.EventKindCommit, 368 + Commit: &jmodels.Commit{ 369 + Operation: jmodels.CommitOperationUpdate, 370 + Collection: tangled.RepoNSID, 371 + RKey: "rkey1", 372 + Record: json.RawMessage("{not json"), 373 + }, 374 + } 375 + 376 + if err := ing.ingestRepo(context.Background(), e); err == nil { 377 + t.Errorf("ingestRepo with malformed record: err = nil, want error") 378 + } 379 + } 380 + 381 + func TestIngestRepo_RenameDeleteSequenceNoTornState(t *testing.T) { 382 + ing, spy := newTestIngester(t) 383 + seedRepoRow(t, ing, "did:plc:akshay", "knot.example", "oldname", "oldname", "did:plc:repo1") 384 + 385 + if _, err := ing.Db.Exec( 386 + `insert into stars (did, rkey, subject_type, subject) values (?, ?, ?, ?)`, 387 + "did:plc:boltless", "star1", "repo", "did:plc:repo1", 388 + ); err != nil { 389 + t.Fatalf("seed star: %v", err) 390 + } 391 + 392 + createEvt := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "newname", tangled.Repo{ 393 + Knot: "knot.example", 394 + Name: ptr("NewName"), 395 + RepoDid: ptr("did:plc:repo1"), 396 + }) 397 + if err := ing.ingestRepo(context.Background(), createEvt); err != nil { 398 + t.Fatalf("ingest create: %v", err) 399 + } 400 + 401 + deleteEvt := makeDeleteEvent("did:plc:akshay", "oldname") 402 + if err := ing.ingestRepo(context.Background(), deleteEvt); err != nil { 403 + t.Fatalf("ingest delete: %v", err) 404 + } 405 + 406 + r := loadRepo(t, ing, "did:plc:akshay", "newname") 407 + if r.Name != "NewName" { 408 + t.Errorf("name = %q, want %q", r.Name, "NewName") 409 + } 410 + if r.RepoDid != "did:plc:repo1" { 411 + t.Errorf("repoDid = %q, want %q", r.RepoDid, "did:plc:repo1") 412 + } 413 + 414 + _, err := db.GetRepo(ing.Db, 415 + orm.FilterEq("did", "did:plc:akshay"), 416 + orm.FilterEq("rkey", "oldname"), 417 + ) 418 + if !errors.Is(err, sql.ErrNoRows) { 419 + t.Errorf("old rkey should be gone, got err = %v", err) 420 + } 421 + 422 + var starSubject string 423 + if err := ing.Db.QueryRow(`select subject from stars where did = ?`, "did:plc:boltless").Scan(&starSubject); err != nil { 424 + t.Fatalf("query star: %v", err) 425 + } 426 + if starSubject != "did:plc:repo1" { 427 + t.Errorf("star subject = %q, want %q", starSubject, "did:plc:repo1") 428 + } 429 + 430 + if spy.renames != 1 { 431 + t.Errorf("RenameRepo called %d times, want 1", spy.renames) 432 + } 433 + if spy.creates != 0 { 434 + t.Errorf("rename should not create: NewRepo called %d times, want 0", spy.creates) 435 + } 436 + if spy.deletes != 0 { 437 + t.Errorf("old rkey already gone, DeleteRepo should not be called but was called %d times", spy.deletes) 438 + } 439 + } 440 + 441 + func TestIngestRepo_CreateFallsBackToRkeyForName(t *testing.T) { 442 + ing, _ := newTestIngester(t) 443 + 444 + e := makeEvent(t, jmodels.CommitOperationCreate, "did:plc:akshay", "myrepo", tangled.Repo{ 445 + Knot: "knot.example", 446 + RepoDid: ptr("did:plc:repo1"), 447 + }) 448 + 449 + if err := ing.ingestRepo(context.Background(), e); err != nil { 450 + t.Fatalf("ingestRepo: %v", err) 451 + } 452 + 453 + r := loadRepo(t, ing, "did:plc:akshay", "myrepo") 454 + if r.Name != "myrepo" { 455 + t.Errorf("name should fall back to rkey: got %q, want %q", r.Name, "myrepo") 456 + } 457 + }