Monorepo for Tangled tangled.org
854
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: replace `PullComment` to `Comment`

Including db migration to migrate `issue_comments` and `pull_comments`
to unified `comments` table.

Signed-off-by: Seongmin Lee <git@boltless.me>

+767 -242
+268
appview/db/comments.go
··· 1 + package db 2 + 3 + import ( 4 + "database/sql" 5 + "encoding/json" 6 + "fmt" 7 + "sort" 8 + "strings" 9 + "time" 10 + 11 + "github.com/bluesky-social/indigo/api/atproto" 12 + "github.com/bluesky-social/indigo/atproto/syntax" 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/appview/models" 15 + "tangled.org/core/orm" 16 + ) 17 + 18 + func PutComment(tx *sql.Tx, c *models.Comment, references []syntax.ATURI) error { 19 + if c.Collection == "" { 20 + c.Collection = tangled.FeedCommentNSID 21 + } 22 + 23 + var bodyBlobs, replyToUri, replyToCid *string 24 + if len(c.Body.Blobs) > 0 { 25 + encoded, err := json.Marshal(c.Body.Blobs) 26 + if err != nil { 27 + return fmt.Errorf("encoding blobs to json: %w", err) 28 + } 29 + encodedStr := string(encoded) 30 + bodyBlobs = &encodedStr 31 + } 32 + if c.ReplyTo != nil { 33 + replyToUri = &c.ReplyTo.Uri 34 + replyToCid = &c.ReplyTo.Cid 35 + } 36 + result, err := tx.Exec( 37 + // users can change the 'created' date. 38 + // skip update entirely if cid is unchanged. 39 + `insert into comments ( 40 + did, 41 + collection, 42 + rkey, 43 + cid, 44 + subject_uri, 45 + subject_cid, 46 + body_text, 47 + body_original, 48 + body_blobs, 49 + created, 50 + reply_to_uri, 51 + reply_to_cid, 52 + pull_round_idx 53 + ) 54 + values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 55 + on conflict(did, collection, rkey) 56 + do update set 57 + cid = excluded.cid, 58 + subject_uri = excluded.subject_uri, 59 + subject_cid = excluded.subject_cid, 60 + body_text = excluded.body_text, 61 + body_original = excluded.body_original, 62 + body_blobs = excluded.body_blobs, 63 + created = excluded.created, 64 + reply_to_uri = excluded.reply_to_uri, 65 + reply_to_cid = excluded.reply_to_cid, 66 + pull_round_idx = excluded.pull_round_idx, 67 + edited = ? 68 + where comments.cid is not excluded.cid`, 69 + c.Did, 70 + c.Collection, 71 + c.Rkey, 72 + c.Cid, 73 + c.Subject.Uri, 74 + c.Subject.Cid, 75 + c.Body.Text, 76 + c.Body.Original, 77 + bodyBlobs, 78 + c.Created.Format(time.RFC3339), 79 + replyToUri, 80 + replyToCid, 81 + c.PullRoundIdx, 82 + time.Now().Format(time.RFC3339), 83 + ) 84 + if err != nil { 85 + return err 86 + } 87 + 88 + c.Id, err = result.LastInsertId() 89 + if err != nil { 90 + return err 91 + } 92 + 93 + affected, err := result.RowsAffected() 94 + if err != nil { 95 + return err 96 + } 97 + 98 + if affected > 0 { 99 + // update references when comment is updated 100 + if err := putReferences(tx, c.AtUri(), references); err != nil { 101 + return fmt.Errorf("put reference_links: %w", err) 102 + } 103 + } 104 + 105 + return nil 106 + } 107 + 108 + // PurgeComments actually purges a comment row from db instead of marking it as "deleted" 109 + func PurgeComments(e Execer, filters ...orm.Filter) error { 110 + var conditions []string 111 + var args []any 112 + for _, filter := range filters { 113 + conditions = append(conditions, filter.Condition()) 114 + args = append(args, filter.Arg()...) 115 + } 116 + 117 + whereClause := "" 118 + if conditions != nil { 119 + whereClause = " where " + strings.Join(conditions, " and ") 120 + } 121 + 122 + _, err := e.Exec(fmt.Sprintf(`delete from comments %s`, whereClause), args...) 123 + return err 124 + } 125 + 126 + func DeleteComments(e Execer, filters ...orm.Filter) error { 127 + var conditions []string 128 + var args []any 129 + for _, filter := range filters { 130 + conditions = append(conditions, filter.Condition()) 131 + args = append(args, filter.Arg()...) 132 + } 133 + 134 + whereClause := "" 135 + if conditions != nil { 136 + whereClause = " where " + strings.Join(conditions, " and ") 137 + } 138 + 139 + query := fmt.Sprintf( 140 + `update comments 141 + set body_text = "", 142 + body_original = null, 143 + body_blobs = null, 144 + deleted = strftime('%%Y-%%m-%%dT%%H:%%M:%%SZ', 'now') 145 + %s`, 146 + whereClause, 147 + ) 148 + 149 + _, err := e.Exec(query, args...) 150 + return err 151 + } 152 + 153 + func GetComments(e Execer, filters ...orm.Filter) ([]models.Comment, error) { 154 + var comments []models.Comment 155 + 156 + var conditions []string 157 + var args []any 158 + for _, filter := range filters { 159 + conditions = append(conditions, filter.Condition()) 160 + args = append(args, filter.Arg()...) 161 + } 162 + 163 + whereClause := "" 164 + if conditions != nil { 165 + whereClause = " where " + strings.Join(conditions, " and ") 166 + } 167 + 168 + query := fmt.Sprintf(` 169 + select 170 + id, 171 + did, 172 + collection, 173 + rkey, 174 + cid, 175 + subject_uri, 176 + subject_cid, 177 + body_text, 178 + body_original, 179 + body_blobs, 180 + created, 181 + reply_to_uri, 182 + reply_to_cid, 183 + pull_round_idx, 184 + edited, 185 + deleted 186 + from 187 + comments 188 + %s 189 + `, whereClause) 190 + 191 + rows, err := e.Query(query, args...) 192 + if err != nil { 193 + return nil, err 194 + } 195 + defer rows.Close() 196 + 197 + for rows.Next() { 198 + var comment models.Comment 199 + var created string 200 + var cid, bodyBlobs, replyToUri, replyToCid, edited, deleted sql.Null[string] 201 + err := rows.Scan( 202 + &comment.Id, 203 + &comment.Did, 204 + &comment.Collection, 205 + &comment.Rkey, 206 + &cid, 207 + &comment.Subject.Uri, 208 + &comment.Subject.Cid, 209 + &comment.Body.Text, 210 + &comment.Body.Original, 211 + &bodyBlobs, 212 + &created, 213 + &replyToUri, 214 + &replyToCid, 215 + &comment.PullRoundIdx, 216 + &edited, 217 + &deleted, 218 + ) 219 + if err != nil { 220 + return nil, err 221 + } 222 + 223 + if cid.Valid && cid.V != "" { 224 + comment.Cid = syntax.CID(cid.V) 225 + } 226 + 227 + if bodyBlobs.Valid && bodyBlobs.V != "" { 228 + if err := json.Unmarshal([]byte(bodyBlobs.V), &comment.Body.Blobs); err != nil { 229 + return nil, fmt.Errorf("decoding blobs: %w", err) 230 + } 231 + } 232 + 233 + if t, err := time.Parse(time.RFC3339, created); err == nil { 234 + comment.Created = t 235 + } 236 + 237 + if replyToUri.Valid && replyToCid.Valid { 238 + comment.ReplyTo = &atproto.RepoStrongRef{ 239 + Uri: replyToUri.V, 240 + Cid: replyToCid.V, 241 + } 242 + } 243 + 244 + if edited.Valid { 245 + if t, err := time.Parse(time.RFC3339, edited.V); err == nil { 246 + comment.Edited = &t 247 + } 248 + } 249 + 250 + if deleted.Valid { 251 + if t, err := time.Parse(time.RFC3339, deleted.V); err == nil { 252 + comment.Deleted = &t 253 + } 254 + } 255 + 256 + comments = append(comments, comment) 257 + } 258 + 259 + if err := rows.Err(); err != nil { 260 + return nil, err 261 + } 262 + 263 + sort.Slice(comments, func(i, j int) bool { 264 + return comments[i].Created.Before(comments[j].Created) 265 + }) 266 + 267 + return comments, nil 268 + }
+96
appview/db/db.go
··· 1519 1519 }) 1520 1520 conn.ExecContext(ctx, "pragma foreign_keys = on;") 1521 1521 1522 + orm.RunMigration(conn, logger, "add-comments-table", func(tx *sql.Tx) error { 1523 + _, err := tx.Exec(` 1524 + drop table if exists comments; 1525 + 1526 + create table comments ( 1527 + -- identifiers 1528 + id integer primary key autoincrement, 1529 + 1530 + did text not null, 1531 + collection text not null default 'sh.tangled.feed.comment', 1532 + rkey text not null, 1533 + at_uri text generated always as ('at://' || did || '/' || collection || '/' || rkey) stored, 1534 + cid text, 1535 + 1536 + -- content 1537 + subject_uri text not null, -- at_uri of subject (issue, pr, string) 1538 + subject_cid text not null, -- cid of subject 1539 + 1540 + body_text text not null, 1541 + body_original text, 1542 + body_blobs text, -- json 1543 + 1544 + created text not null default (strftime('%Y-%m-%dT%H:%M:%SZ', 'now')), 1545 + 1546 + reply_to_uri text, -- at_uri of parent comment 1547 + reply_to_cid text, -- cid of parent comment 1548 + 1549 + pull_round_idx integer, -- pull round index. required when subject is sh.tangled.repo.pull 1550 + 1551 + -- appview-local information 1552 + edited text, 1553 + deleted text, 1554 + 1555 + unique(did, collection, rkey) 1556 + ); 1557 + 1558 + insert into comments ( 1559 + did, 1560 + collection, 1561 + rkey, 1562 + subject_uri, 1563 + subject_cid, -- we need to know cid 1564 + body_text, 1565 + created, 1566 + reply_to_uri, 1567 + reply_to_cid, -- we need to know cid 1568 + edited, 1569 + deleted 1570 + ) 1571 + select 1572 + did, 1573 + 'sh.tangled.repo.issue.comment', 1574 + rkey, 1575 + issue_at, 1576 + '', 1577 + body, 1578 + created, 1579 + reply_to, 1580 + '', 1581 + edited, 1582 + deleted 1583 + from issue_comments 1584 + where rkey is not null; 1585 + 1586 + insert into comments ( 1587 + did, 1588 + collection, 1589 + rkey, 1590 + subject_uri, 1591 + subject_cid, -- we need to know cid 1592 + body_text, 1593 + created, 1594 + pull_round_idx 1595 + ) 1596 + select 1597 + c.owner_did, 1598 + 'sh.tangled.repo.pull.comment', 1599 + substr( 1600 + substr(c.comment_at, 6 + instr(substr(c.comment_at, 6), '/')), -- nsid/rkey 1601 + instr( 1602 + substr(c.comment_at, 6 + instr(substr(c.comment_at, 6), '/')), -- nsid/rkey 1603 + '/' 1604 + ) + 1 1605 + ), -- rkey 1606 + p.at_uri, 1607 + '', 1608 + c.body, 1609 + c.created, 1610 + s.round_number 1611 + from pull_comments c 1612 + join pulls p on c.repo_at = p.repo_at and c.pull_id = p.pull_id 1613 + join pull_submissions s on s.id = c.submission_id; 1614 + `) 1615 + return err 1616 + }) 1617 + 1522 1618 return &DB{ 1523 1619 db, 1524 1620 logger,
+15 -132
appview/db/pulls.go
··· 524 524 } 525 525 defer rows.Close() 526 526 527 - submissionMap := make(map[int]*models.PullSubmission) 527 + pullMap := make(map[syntax.ATURI][]*models.PullSubmission) 528 528 529 529 for rows.Next() { 530 530 var submission models.PullSubmission ··· 572 572 submission.Blob.Size = patchBlobSize.V 573 573 } 574 574 575 - submissionMap[submission.ID] = &submission 575 + pullMap[submission.PullAt] = append(pullMap[submission.PullAt], &submission) 576 576 } 577 577 578 578 if err := rows.Err(); err != nil { 579 579 return nil, err 580 580 } 581 581 582 - // Get comments for all submissions using GetPullComments 583 - submissionIds := slices.Collect(maps.Keys(submissionMap)) 584 - comments, err := GetPullComments(e, orm.FilterIn("submission_id", submissionIds)) 582 + // Get comments for all submissions using GetComments 583 + pullAts := slices.Collect(maps.Keys(pullMap)) 584 + comments, err := GetComments(e, orm.FilterIn("subject_uri", pullAts)) 585 585 if err != nil { 586 586 return nil, fmt.Errorf("failed to get pull comments: %w", err) 587 587 } 588 588 for _, comment := range comments { 589 - if submission, ok := submissionMap[comment.SubmissionId]; ok { 590 - submission.Comments = append(submission.Comments, comment) 589 + if comment.PullRoundIdx != nil { 590 + roundIdx := *comment.PullRoundIdx 591 + if submissions, ok := pullMap[syntax.ATURI(comment.Subject.Uri)]; ok { 592 + if roundIdx < len(submissions) { 593 + submission := submissions[roundIdx] 594 + submission.Comments = append(submission.Comments, comment) 595 + } 596 + } 591 597 } 592 598 } 593 599 594 - // group the submissions by pull_at 595 - m := make(map[syntax.ATURI][]*models.PullSubmission) 596 - for _, s := range submissionMap { 597 - m[s.PullAt] = append(m[s.PullAt], s) 598 - } 599 - 600 600 // sort each one by round number 601 - for _, s := range m { 601 + for _, s := range pullMap { 602 602 slices.SortFunc(s, func(a, b *models.PullSubmission) int { 603 603 return cmp.Compare(a.RoundNumber, b.RoundNumber) 604 604 }) 605 605 } 606 606 607 - return m, nil 608 - } 609 - 610 - func GetPullComments(e Execer, filters ...orm.Filter) ([]models.PullComment, error) { 611 - var conditions []string 612 - var args []any 613 - for _, filter := range filters { 614 - conditions = append(conditions, filter.Condition()) 615 - args = append(args, filter.Arg()...) 616 - } 617 - 618 - whereClause := "" 619 - if conditions != nil { 620 - whereClause = " where " + strings.Join(conditions, " and ") 621 - } 622 - 623 - query := fmt.Sprintf(` 624 - select 625 - id, 626 - pull_id, 627 - submission_id, 628 - repo_at, 629 - owner_did, 630 - comment_at, 631 - body, 632 - created 633 - from 634 - pull_comments 635 - %s 636 - order by 637 - created asc 638 - `, whereClause) 639 - 640 - rows, err := e.Query(query, args...) 641 - if err != nil { 642 - return nil, err 643 - } 644 - defer rows.Close() 645 - 646 - commentMap := make(map[string]*models.PullComment) 647 - for rows.Next() { 648 - var comment models.PullComment 649 - var createdAt string 650 - err := rows.Scan( 651 - &comment.ID, 652 - &comment.PullId, 653 - &comment.SubmissionId, 654 - &comment.RepoAt, 655 - &comment.OwnerDid, 656 - &comment.CommentAt, 657 - &comment.Body, 658 - &createdAt, 659 - ) 660 - if err != nil { 661 - return nil, err 662 - } 663 - 664 - if t, err := time.Parse(time.RFC3339, createdAt); err == nil { 665 - comment.Created = t 666 - } 667 - 668 - atUri := comment.AtUri().String() 669 - commentMap[atUri] = &comment 670 - } 671 - 672 - if err := rows.Err(); err != nil { 673 - return nil, err 674 - } 675 - 676 - // collect references for each comments 677 - commentAts := slices.Collect(maps.Keys(commentMap)) 678 - allReferences, err := GetReferencesAll(e, orm.FilterIn("from_at", commentAts)) 679 - if err != nil { 680 - return nil, fmt.Errorf("failed to query reference_links: %w", err) 681 - } 682 - for commentAt, references := range allReferences { 683 - if comment, ok := commentMap[commentAt.String()]; ok { 684 - comment.References = references 685 - } 686 - } 687 - 688 - var comments []models.PullComment 689 - for _, c := range commentMap { 690 - comments = append(comments, *c) 691 - } 692 - 693 - sort.Slice(comments, func(i, j int) bool { 694 - return comments[i].Created.Before(comments[j].Created) 695 - }) 696 - 697 - return comments, nil 607 + return pullMap, nil 698 608 } 699 609 700 610 // timeframe here is directly passed into the sql query filter, and any ··· 771 681 } 772 682 773 683 return pulls, nil 774 - } 775 - 776 - func NewPullComment(tx *sql.Tx, comment *models.PullComment) (int64, error) { 777 - query := `insert into pull_comments (owner_did, repo_at, submission_id, comment_at, pull_id, body) values (?, ?, ?, ?, ?, ?)` 778 - res, err := tx.Exec( 779 - query, 780 - comment.OwnerDid, 781 - comment.RepoAt, 782 - comment.SubmissionId, 783 - comment.CommentAt, 784 - comment.PullId, 785 - comment.Body, 786 - ) 787 - if err != nil { 788 - return 0, err 789 - } 790 - 791 - i, err := res.LastInsertId() 792 - if err != nil { 793 - return 0, err 794 - } 795 - 796 - if err := putReferences(tx, comment.AtUri(), comment.References); err != nil { 797 - return 0, fmt.Errorf("put reference_links: %w", err) 798 - } 799 - 800 - return i, nil 801 684 } 802 685 803 686 // use with transaction
+7 -8
appview/db/reference.go
··· 124 124 values %s 125 125 ) 126 126 select 127 - p.owner_did, p.rkey, 128 - c.comment_at 127 + p.owner_did, p.rkey, c.at_uri 129 128 from input inp 130 129 join repos r 131 130 on r.did = inp.owner_did ··· 133 132 join pulls p 134 133 on p.repo_at = r.at_uri 135 134 and p.pull_id = inp.pull_id 136 - left join pull_comments c 135 + left join comments c 137 136 on inp.comment_id is not null 138 - and c.repo_at = r.at_uri and c.pull_id = p.pull_id 137 + and c.subject_uri = ('at://' || p.owner_did || '/' || 'sh.tangled.repo.pull' || '/' || p.rkey) 139 138 and c.id = inp.comment_id 140 139 `, 141 140 strings.Join(vals, ","), ··· 293 292 return nil, fmt.Errorf("get pull backlinks: %w", err) 294 293 } 295 294 backlinks = append(backlinks, ls...) 296 - ls, err = getPullCommentBacklinks(e, target, backlinksMap[tangled.RepoPullCommentNSID]) 295 + ls, err = getPullCommentBacklinks(e, target, backlinksMap[tangled.FeedCommentNSID]) 297 296 if err != nil { 298 297 return nil, fmt.Errorf("get pull_comment backlinks: %w", err) 299 298 } ··· 430 429 if len(aturis) == 0 { 431 430 return nil, nil 432 431 } 433 - filter := orm.FilterIn("c.comment_at", aturis) 432 + filter := orm.FilterIn("c.at_uri", aturis) 434 433 exclude := orm.FilterNotEq("p.at_uri", target) 435 434 rows, err := e.Query( 436 435 fmt.Sprintf( ··· 438 437 from repos r 439 438 join pulls p 440 439 on r.at_uri = p.repo_at 441 - join pull_comments c 442 - on r.at_uri = c.repo_at and p.pull_id = c.pull_id 440 + join comments c 441 + on ('at://' || p.owner_did || '/' || 'sh.tangled.repo.pull' || '/' || p.rkey) = c.subject_uri 443 442 where %s and %s`, 444 443 filter.Condition(), 445 444 exclude.Condition(),
+109 -7
appview/ingester.go
··· 26 26 "tangled.org/core/appview/cache" 27 27 "tangled.org/core/appview/config" 28 28 "tangled.org/core/appview/db" 29 + "tangled.org/core/appview/mentions" 29 30 "tangled.org/core/appview/models" 31 + "tangled.org/core/appview/notify" 30 32 "tangled.org/core/appview/serververify" 31 33 "tangled.org/core/appview/validator" 32 34 "tangled.org/core/idresolver" ··· 35 37 ) 36 38 37 39 type Ingester struct { 38 - Db db.DbWrapper 39 - Enforcer *rbac.Enforcer 40 - IdResolver *idresolver.Resolver 41 - Cache *cache.Cache 42 - Config *config.Config 43 - Logger *slog.Logger 44 - Validator *validator.Validator 40 + Db db.DbWrapper 41 + Enforcer *rbac.Enforcer 42 + IdResolver *idresolver.Resolver 43 + Cache *cache.Cache 44 + Config *config.Config 45 + Logger *slog.Logger 46 + Validator *validator.Validator 47 + MentionsResolver *mentions.Resolver 48 + Notifier notify.Notifier 45 49 } 46 50 47 51 type processFunc func(ctx context.Context, e *jmodels.Event) error ··· 91 95 err = i.ingestIssue(ctx, e) 92 96 case tangled.RepoPullNSID: 93 97 err = i.ingestPull(ctx, e) 98 + case tangled.FeedCommentNSID: 99 + err = i.ingestComment(e) 94 100 case tangled.RepoIssueCommentNSID: 95 101 err = i.ingestIssueComment(e) 102 + case tangled.RepoPullCommentNSID: 103 + err = i.ingestPullComment(e) 96 104 case tangled.LabelDefinitionNSID: 97 105 err = i.ingestLabelDefinition(e) 98 106 case tangled.LabelOpNSID: ··· 1288 1296 orm.FilterEq("rkey", rkey), 1289 1297 ); err != nil { 1290 1298 return fmt.Errorf("failed to delete issue comment record: %w", err) 1299 + } 1300 + 1301 + return nil 1302 + } 1303 + 1304 + return nil 1305 + } 1306 + 1307 + // ingestPullComment ingests legacy sh.tangled.repo.pull.comment deletions 1308 + func (i *Ingester) ingestPullComment(e *jmodels.Event) error { 1309 + l := i.Logger.With("handler", "ingestPullComment", "nsid", e.Commit.Collection, "did", e.Did, "rkey", e.Commit.RKey) 1310 + l.Info("ingesting record") 1311 + 1312 + switch e.Commit.Operation { 1313 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1314 + // no-op. sh.tangled.repo.pull.comment is deprecated 1315 + 1316 + case jmodels.CommitOperationDelete: 1317 + if err := db.PurgeComments( 1318 + i.Db, 1319 + orm.FilterEq("did", e.Did), 1320 + orm.FilterEq("collection", e.Commit.Collection), 1321 + orm.FilterEq("rkey", e.Commit.RKey), 1322 + ); err != nil { 1323 + return fmt.Errorf("failed to delete comment record: %w", err) 1324 + } 1325 + } 1326 + 1327 + return nil 1328 + } 1329 + 1330 + func (i *Ingester) ingestComment(e *jmodels.Event) error { 1331 + did := e.Did 1332 + rkey := e.Commit.RKey 1333 + cid := e.Commit.CID 1334 + 1335 + var err error 1336 + 1337 + l := i.Logger.With("handler", "ingestComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1338 + l.Info("ingesting record") 1339 + 1340 + ddb, ok := i.Db.Execer.(*db.DB) 1341 + if !ok { 1342 + return fmt.Errorf("failed to index issue comment record, invalid db cast") 1343 + } 1344 + 1345 + ctx := context.Background() 1346 + 1347 + switch e.Commit.Operation { 1348 + case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1349 + raw := json.RawMessage(e.Commit.Record) 1350 + record := tangled.FeedComment{} 1351 + err = json.Unmarshal(raw, &record) 1352 + if err != nil { 1353 + return fmt.Errorf("invalid record: %w", err) 1354 + } 1355 + 1356 + comment, err := models.CommentFromRecord(syntax.DID(did), syntax.RecordKey(rkey), syntax.CID(cid), record) 1357 + if err != nil { 1358 + return fmt.Errorf("failed to parse comment from record: %w", err) 1359 + } 1360 + 1361 + if err := comment.Validate(); err != nil { 1362 + return fmt.Errorf("failed to validate comment: %w", err) 1363 + } 1364 + 1365 + var references []syntax.ATURI 1366 + if comment.Body.Original != nil { 1367 + _, references = i.MentionsResolver.Resolve(ctx, *comment.Body.Original) 1368 + } 1369 + 1370 + tx, err := ddb.Begin() 1371 + if err != nil { 1372 + return fmt.Errorf("failed to start transaction: %w", err) 1373 + } 1374 + defer tx.Rollback() 1375 + 1376 + err = db.PutComment(tx, comment, references) 1377 + if err != nil { 1378 + return fmt.Errorf("failed to create comment: %w", err) 1379 + } 1380 + 1381 + if err := tx.Commit(); err != nil { 1382 + return err 1383 + } 1384 + 1385 + case jmodels.CommitOperationDelete: 1386 + if err := db.DeleteComments( 1387 + ddb, 1388 + orm.FilterEq("did", did), 1389 + orm.FilterEq("collection", e.Commit.Collection), 1390 + orm.FilterEq("rkey", rkey), 1391 + ); err != nil { 1392 + return fmt.Errorf("failed to delete comment record: %w", err) 1291 1393 } 1292 1394 1293 1395 return nil
+148
appview/models/comment.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + "strings" 6 + "time" 7 + 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + typegen "github.com/whyrusleeping/cbor-gen" 11 + "tangled.org/core/api/tangled" 12 + ) 13 + 14 + type Comment struct { 15 + Id int64 16 + 17 + Did syntax.DID 18 + Collection syntax.NSID 19 + Rkey syntax.RecordKey 20 + Cid syntax.CID 21 + 22 + // record content 23 + Subject comatproto.RepoStrongRef 24 + Body tangled.MarkupMarkdown // markup body type. only markdown is supported right now 25 + Created time.Time 26 + ReplyTo *comatproto.RepoStrongRef // (optional) parent comment 27 + PullRoundIdx *int // (optional) pull round number used when subject is sh.tangled.repo.pull 28 + 29 + // store on db, but not on PDS 30 + Edited *time.Time 31 + Deleted *time.Time 32 + } 33 + 34 + func (c *Comment) AtUri() syntax.ATURI { 35 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", c.Did, c.Collection, c.Rkey)) 36 + } 37 + 38 + func (c *Comment) StrongRef() comatproto.RepoStrongRef { 39 + return comatproto.RepoStrongRef{ 40 + Uri: c.AtUri().String(), 41 + Cid: c.Cid.String(), 42 + } 43 + } 44 + 45 + func (c *Comment) AsRecord() typegen.CBORMarshaler { 46 + // can't convert to record for legacy types 47 + if c.Collection != tangled.FeedCommentNSID { 48 + return nil 49 + } 50 + var pullRoundIdx *int64 51 + if c.PullRoundIdx != nil { 52 + pullRoundIdx = new(int64) 53 + *pullRoundIdx = int64(*c.PullRoundIdx) 54 + } 55 + return &tangled.FeedComment{ 56 + Subject: &c.Subject, 57 + Body: &tangled.FeedComment_Body{MarkupMarkdown: &c.Body}, 58 + CreatedAt: c.Created.Format(time.RFC3339), 59 + ReplyTo: c.ReplyTo, 60 + PullRoundIdx: pullRoundIdx, 61 + } 62 + } 63 + 64 + func (c *Comment) IsTopLevel() bool { 65 + return c.ReplyTo == nil 66 + } 67 + 68 + func (c *Comment) IsReply() bool { 69 + return c.ReplyTo != nil 70 + } 71 + 72 + func (c *Comment) Validate() error { 73 + // TODO: sanitize the body and then trim space 74 + if sb := strings.TrimSpace(c.Body.Text); sb == "" { 75 + return fmt.Errorf("body is empty after HTML sanitization") 76 + } 77 + 78 + // if it's for PR, PullSubmissionId should not be nil 79 + subjectAt, err := syntax.ParseATURI(c.Subject.Uri) 80 + if err != nil { 81 + return fmt.Errorf("subject.uri is not valid at-uri: %w", err) 82 + } 83 + if subjectAt.Collection().String() == tangled.RepoPullNSID { 84 + if c.PullRoundIdx == nil { 85 + return fmt.Errorf("pullSubmissionId should not be nil when subject is sh.tangled.repo.pull") 86 + } 87 + } 88 + return nil 89 + } 90 + 91 + func CommentFromRecord(did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, record tangled.FeedComment) (*Comment, error) { 92 + created, err := time.Parse(time.RFC3339, record.CreatedAt) 93 + if err != nil { 94 + created = time.Now() 95 + } 96 + 97 + if record.Subject == nil { 98 + return nil, fmt.Errorf("subject can't be nil") 99 + } 100 + subjectAt, err := syntax.ParseATURI(record.Subject.Uri) 101 + if err != nil { 102 + return nil, fmt.Errorf("invalid subject uri: %w", err) 103 + } 104 + if _, err = syntax.ParseCID(record.Subject.Cid); err != nil { 105 + return nil, fmt.Errorf("invalid subject cid: %w", err) 106 + } 107 + 108 + if subjectAt.Collection() == tangled.RepoPullNSID { 109 + if record.PullRoundIdx == nil { 110 + return nil, fmt.Errorf("pullRoundIdx can't be nil when subject is sh.tangled.repo.pull") 111 + } 112 + } 113 + 114 + if record.Body == nil { 115 + return nil, fmt.Errorf("body can't be nil") 116 + } 117 + if record.Body.MarkupMarkdown == nil { 118 + return nil, fmt.Errorf("body should be markdown type") 119 + } 120 + 121 + if record.ReplyTo != nil { 122 + if _, err = syntax.ParseATURI(record.ReplyTo.Uri); err != nil { 123 + return nil, fmt.Errorf("invalid replyTo uri: %w", err) 124 + } 125 + if _, err = syntax.ParseCID(record.ReplyTo.Cid); err != nil { 126 + return nil, fmt.Errorf("invalid replyTo cid: %w", err) 127 + } 128 + } 129 + 130 + var pullRoundIdx *int 131 + if record.PullRoundIdx != nil { 132 + pullRoundIdx = new(int) 133 + *pullRoundIdx = int(*record.PullRoundIdx) 134 + } 135 + 136 + return &Comment{ 137 + Did: did, 138 + Collection: tangled.FeedCommentNSID, 139 + Rkey: rkey, 140 + Cid: cid, 141 + 142 + Subject: *record.Subject, 143 + Body: *record.Body.MarkupMarkdown, 144 + Created: created, 145 + ReplyTo: record.ReplyTo, 146 + PullRoundIdx: pullRoundIdx, 147 + }, nil 148 + }
+2 -28
appview/models/pull.go
··· 299 299 Blob lexutil.LexBlob 300 300 Patch string 301 301 Combined string 302 - Comments []PullComment 302 + Comments []Comment 303 303 SourceRev string // include the rev that was used to create this submission: only for branch/fork PRs 304 304 305 305 // meta 306 306 Created time.Time 307 - } 308 - 309 - type PullComment struct { 310 - // ids 311 - ID int 312 - PullId int 313 - SubmissionId int 314 - 315 - // at ids 316 - RepoAt string 317 - OwnerDid string 318 - CommentAt string 319 - 320 - // content 321 - Body string 322 - 323 - // meta 324 - Mentions []syntax.DID 325 - References []syntax.ATURI 326 - 327 - // meta 328 - Created time.Time 329 - } 330 - 331 - func (p *PullComment) AtUri() syntax.ATURI { 332 - return syntax.ATURI(p.CommentAt) 333 307 } 334 308 335 309 func (p *Pull) TotalComments() int { ··· 451 425 addParticipant(s.PullAt.Authority().String()) 452 426 453 427 for _, c := range s.Comments { 454 - addParticipant(c.OwnerDid) 428 + addParticipant(c.Did.String()) 455 429 } 456 430 457 431 return participants
+13 -7
appview/notify/db/db.go
··· 281 281 ) 282 282 } 283 283 284 - func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 284 + func (n *databaseNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 285 285 l := log.FromContext(ctx) 286 286 287 - pull, err := db.GetPull(n.db, 288 - orm.FilterEq("repo_at", syntax.ATURI(comment.RepoAt)), 289 - orm.FilterEq("pull_id", comment.PullId), 287 + subjectAt := syntax.ATURI(comment.Subject.Uri) 288 + pulls, err := db.GetPulls(n.db, 289 + orm.FilterEq("owner_did", subjectAt.Authority()), 290 + orm.FilterEq("rkey", subjectAt.RecordKey()), 290 291 ) 291 292 if err != nil { 292 - l.Error("failed to get pulls", "err", err) 293 + l.Error("failed to get pull", "err", err) 293 294 return 294 295 } 296 + if len(pulls) == 0 { 297 + l.Error("NewPullComment: no pull found", "aturi", comment.Subject) 298 + return 299 + } 300 + pull := pulls[0] 295 301 296 - repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", comment.RepoAt)) 302 + repo, err := db.GetRepo(n.db, orm.FilterEq("at_uri", pull.RepoAt)) 297 303 if err != nil { 298 304 l.Error("failed to get repos", "err", err) 299 305 return ··· 311 317 recipients.Remove(m) 312 318 } 313 319 314 - actorDid := syntax.DID(comment.OwnerDid) 320 + actorDid := comment.Did 315 321 eventType := models.NotificationTypePullCommented 316 322 entityType := "pull" 317 323 entityId := pull.AtUri().String()
+1 -1
appview/notify/logging/notifier.go
··· 86 86 l.inner.NewPull(ctx, pull) 87 87 } 88 88 89 - func (l *loggingNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 89 + func (l *loggingNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 90 90 ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "NewPullComment")) 91 91 l.inner.NewPullComment(ctx, comment, mentions) 92 92 }
+1 -1
appview/notify/merged_notifier.go
··· 82 82 m.fanout(func(n Notifier) { n.NewPull(ctx, pull) }) 83 83 } 84 84 85 - func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 85 + func (m *mergedNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 86 86 m.fanout(func(n Notifier) { n.NewPullComment(ctx, comment, mentions) }) 87 87 } 88 88
+2 -2
appview/notify/notifier.go
··· 23 23 DeleteFollow(ctx context.Context, follow *models.Follow) 24 24 25 25 NewPull(ctx context.Context, pull *models.Pull) 26 - NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) 26 + NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) 27 27 NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) 28 28 29 29 NewIssueLabelOp(ctx context.Context, issue *models.Issue) ··· 64 64 func (m *BaseNotifier) DeleteFollow(ctx context.Context, follow *models.Follow) {} 65 65 66 66 func (m *BaseNotifier) NewPull(ctx context.Context, pull *models.Pull) {} 67 - func (m *BaseNotifier) NewPullComment(ctx context.Context, models *models.PullComment, mentions []syntax.DID) { 67 + func (m *BaseNotifier) NewPullComment(ctx context.Context, models *models.Comment, mentions []syntax.DID) { 68 68 } 69 69 func (m *BaseNotifier) NewPullState(ctx context.Context, actor syntax.DID, pull *models.Pull) {} 70 70
+3 -4
appview/notify/posthog/notifier.go
··· 86 86 } 87 87 } 88 88 89 - func (n *posthogNotifier) NewPullComment(ctx context.Context, comment *models.PullComment, mentions []syntax.DID) { 89 + func (n *posthogNotifier) NewPullComment(ctx context.Context, comment *models.Comment, mentions []syntax.DID) { 90 90 err := n.client.Enqueue(posthog.Capture{ 91 - DistinctId: comment.OwnerDid, 91 + DistinctId: comment.Did.String(), 92 92 Event: "new_pull_comment", 93 93 Properties: posthog.Properties{ 94 - "repo_at": comment.RepoAt, 95 - "pull_id": comment.PullId, 94 + "pull_at": comment.Subject, 96 95 "mentions": mentions, 97 96 }, 98 97 })
+1
appview/oauth/scopes.go
··· 4 4 "atproto", 5 5 6 6 "repo:sh.tangled.actor.profile", 7 + "repo:sh.tangled.feed.comment", 7 8 "repo:sh.tangled.feed.reaction", 8 9 "repo:sh.tangled.feed.star", 9 10 "repo:sh.tangled.graph.follow",
+5 -5
appview/pages/templates/repo/pulls/pull.html
··· 630 630 {{ define "submissionComment" }} 631 631 {{ $comment := index . 0 }} 632 632 {{ $root := index . 1 }} 633 - <div id="comment-{{$comment.ID}}" class="flex gap-2 -ml-4 py-4 w-full mx-auto group/comment"> 633 + <div id="comment-{{$comment.Id}}" class="flex gap-2 -ml-4 py-4 w-full mx-auto group/comment"> 634 634 <!-- left column: profile picture --> 635 635 <div class="flex-shrink-0 h-fit relative"> 636 - {{ template "user/fragments/picLink" (list $comment.OwnerDid "size-8" (index $root.VouchRelationships (did $comment.OwnerDid))) }} 636 + {{ template "user/fragments/picLink" (list $comment.Did.String "size-8" (index $root.VouchRelationships (did $comment.OwnerDid))) }} 637 637 </div> 638 638 <!-- right column: name and body in two rows --> 639 639 <div class="flex-1 min-w-0"> 640 640 <!-- Row 1: Author and timestamp --> 641 641 <div class="text-sm text-gray-500 dark:text-gray-400 flex items-center gap-1 group-target/comment:bg-yellow-200/30 group-target/comment:dark:bg-yellow-600/30"> 642 - {{ $handle := resolve $comment.OwnerDid }} 642 + {{ $handle := resolve $comment.Did.String }} 643 643 <a class="text-gray-500 dark:text-gray-400 hover:text-gray-500 dark:hover:text-gray-300" href="/{{ $handle }}">{{ $handle }}</a> 644 644 <span class="before:content-['·']"></span> 645 - <a class="text-gray-500 dark:text-gray-400 hover:text-gray-500 dark:hover:text-gray-300" href="#comment-{{$comment.ID}}"> 645 + <a class="text-gray-500 dark:text-gray-400 hover:text-gray-500 dark:hover:text-gray-300" href="#comment-{{$comment.Id}}"> 646 646 {{ template "repo/fragments/shortTime" $comment.Created }} 647 647 </a> 648 648 </div> 649 649 <!-- Row 2: Body text --> 650 650 <div class="prose dark:prose-invert mt-1"> 651 - {{ $comment.Body | markdown }} 651 + {{ $comment.Body.Text | markdown }} 652 652 </div> 653 653 </div> 654 654 </div>
+78 -33
appview/pulls/comment.go
··· 14 14 "tangled.org/core/tid" 15 15 16 16 comatproto "github.com/bluesky-social/indigo/api/atproto" 17 + "github.com/bluesky-social/indigo/atproto/syntax" 17 18 lexutil "github.com/bluesky-social/indigo/lex/util" 19 + indigoxrpc "github.com/bluesky-social/indigo/xrpc" 18 20 "github.com/go-chi/chi/v5" 19 21 ) 20 22 ··· 60 62 case http.MethodPost: 61 63 body := r.FormValue("body") 62 64 if body == "" { 63 - s.pages.Notice(w, "pull", "Comment body is required") 65 + s.pages.Notice(w, "pull-comment", "Comment body is required") 64 66 return 65 67 } 66 68 69 + // TODO(boltless): normalize markdown body 70 + normalizedBody := body 67 71 mentions, references := s.mentionsResolver.Resolve(r.Context(), body) 68 72 69 - // Start a transaction 70 - tx, err := s.db.BeginTx(r.Context(), nil) 73 + markdownBody := tangled.MarkupMarkdown{ 74 + Text: normalizedBody, 75 + Original: &body, 76 + Blobs: nil, 77 + } 78 + 79 + // ingest CID of PR record on-demand. 80 + // TODO(boltless): appview should ingest CID of atproto records 81 + cid, err := func() (syntax.CID, error) { 82 + ident, err := s.idResolver.ResolveIdent(r.Context(), pull.OwnerDid) 83 + if err != nil { 84 + return "", err 85 + } 86 + 87 + xrpcc := indigoxrpc.Client{Host: ident.PDSEndpoint()} 88 + out, err := comatproto.RepoGetRecord(r.Context(), &xrpcc, "", tangled.RepoPullNSID, pull.OwnerDid, pull.Rkey) 89 + if err != nil { 90 + return "", err 91 + } 92 + if out.Cid == nil { 93 + return "", fmt.Errorf("record CID is empty") 94 + } 95 + 96 + cid, err := syntax.ParseCID(*out.Cid) 97 + if err != nil { 98 + return "", err 99 + } 100 + 101 + return cid, nil 102 + }() 71 103 if err != nil { 72 - l.Error("failed to start transaction", "err", err) 104 + s.logger.Error("failed to backfill subject PR record", "err", err) 105 + s.pages.Notice(w, "pull-comment", "failed to backfill subject record") 106 + return 107 + } 108 + pullStrongRef := comatproto.RepoStrongRef{ 109 + Uri: pull.AtUri().String(), 110 + Cid: cid.String(), 111 + } 112 + 113 + comment := models.Comment{ 114 + Did: syntax.DID(user.Did), 115 + Collection: tangled.FeedCommentNSID, 116 + Rkey: syntax.RecordKey(tid.TID()), 117 + 118 + Subject: pullStrongRef, 119 + Body: markdownBody, 120 + Created: time.Now(), 121 + ReplyTo: nil, 122 + PullRoundIdx: &roundNumber, 123 + } 124 + if err = comment.Validate(); err != nil { 125 + s.logger.Error("failed to validate comment", "err", err) 73 126 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 74 127 return 75 128 } 76 - defer tx.Rollback() 77 - 78 - createdAt := time.Now().Format(time.RFC3339) 79 129 80 130 client, err := s.oauth.AuthorizedClient(r) 81 131 if err != nil { 82 - l.Error("failed to get authorized client", "err", err) 132 + s.logger.Error("failed to get authorized client", "err", err) 83 133 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 84 134 return 85 135 } 86 - atResp, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 87 - Collection: tangled.RepoPullCommentNSID, 88 - Repo: user.Did, 89 - Rkey: tid.TID(), 90 - Record: &lexutil.LexiconTypeDecoder{ 91 - Val: &tangled.RepoPullComment{ 92 - Pull: pull.AtUri().String(), 93 - Body: body, 94 - CreatedAt: createdAt, 95 - }, 96 - }, 136 + 137 + out, err := comatproto.RepoPutRecord(r.Context(), client, &comatproto.RepoPutRecord_Input{ 138 + Collection: comment.Collection.String(), 139 + Repo: comment.Did.String(), 140 + Rkey: comment.Rkey.String(), 141 + Record: &lexutil.LexiconTypeDecoder{Val: comment.AsRecord()}, 97 142 }) 98 143 if err != nil { 99 - l.Error("failed to create pull comment", "err", err) 144 + s.logger.Error("failed to create pull comment", "err", err) 100 145 s.pages.Notice(w, "pull-comment", "Failed to create comment.") 101 146 return 102 147 } 103 148 104 - comment := &models.PullComment{ 105 - OwnerDid: user.Did, 106 - RepoAt: f.RepoAt().String(), 107 - PullId: pull.PullId, 108 - Body: body, 109 - CommentAt: atResp.Uri, 110 - SubmissionId: pull.Submissions[roundNumber].ID, 111 - Mentions: mentions, 112 - References: references, 149 + comment.Cid = syntax.CID(out.Cid) 150 + 151 + // Start a transaction 152 + tx, err := s.db.BeginTx(r.Context(), nil) 153 + if err != nil { 154 + l.Error("failed to start transaction", "err", err) 155 + s.pages.Notice(w, "pull-comment", "Failed to create comment.") 156 + return 113 157 } 158 + defer tx.Rollback() 114 159 115 - // Create the pull comment in the database with the commentAt field 116 - commentId, err := db.NewPullComment(tx, comment) 160 + // Create the pull comment in the database 161 + err = db.PutComment(tx, &comment, references) 117 162 if err != nil { 118 163 l.Error("failed to create pull comment in database", "err", err) 119 164 s.pages.Notice(w, "pull-comment", "Failed to create comment.") ··· 127 172 return 128 173 } 129 174 130 - s.notifier.NewPullComment(r.Context(), comment, mentions) 175 + s.notifier.NewPullComment(r.Context(), &comment, mentions) 131 176 132 177 ownerSlashRepo := reporesolver.GetBaseRepoPath(r, f) 133 - s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, commentId)) 178 + s.pages.HxLocation(w, fmt.Sprintf("/%s/pulls/%d#comment-%d", ownerSlashRepo, pull.PullId, comment.Id)) 134 179 return 135 180 } 136 181 }
+18 -14
appview/state/state.go
··· 125 125 []string{ 126 126 tangled.ActorProfileNSID, 127 127 tangled.FeedStarNSID, 128 + tangled.FeedCommentNSID, 128 129 tangled.GraphFollowNSID, 129 130 tangled.GraphVouchNSID, 130 131 tangled.KnotMemberNSID, ··· 136 137 tangled.RepoIssueCommentNSID, 137 138 tangled.RepoIssueNSID, 138 139 tangled.RepoPullNSID, 140 + tangled.RepoPullCommentNSID, 139 141 tangled.SpindleMemberNSID, 140 142 tangled.SpindleNSID, 141 143 tangled.StringNSID, ··· 155 157 156 158 if err := BackfillDefaultDefs(d, res, config.Label.DefaultLabelDefs); err != nil { 157 159 return nil, fmt.Errorf("failed to backfill default label defs: %w", err) 158 - } 159 - 160 - ingester := appview.Ingester{ 161 - Db: wrapper, 162 - Enforcer: enforcer, 163 - IdResolver: res, 164 - Cache: rdb, 165 - Config: config, 166 - Logger: log.SubLogger(logger, "ingester"), 167 - Validator: validator, 168 - } 169 - err = jc.StartJetstream(ctx, ingester.Ingest()) 170 - if err != nil { 171 - return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 172 160 } 173 161 174 162 var notifiers []notify.Notifier ··· 186 174 187 175 notifier := notify.NewMergedNotifier(notifiers) 188 176 notifier = lognotify.NewLoggingNotifier(notifier, tlog.SubLogger(logger, "notify")) 177 + 178 + ingester := appview.Ingester{ 179 + Db: wrapper, 180 + Enforcer: enforcer, 181 + IdResolver: res, 182 + Cache: rdb, 183 + Config: config, 184 + Logger: log.SubLogger(logger, "ingester"), 185 + Validator: validator, 186 + MentionsResolver: mentionsResolver, 187 + Notifier: notifier, 188 + } 189 + err = jc.StartJetstream(ctx, ingester.Ingest()) 190 + if err != nil { 191 + return nil, fmt.Errorf("failed to start jetstream watcher: %w", err) 192 + } 189 193 190 194 var cfClient *cloudflare.Client 191 195 if config.Cloudflare.ApiToken != "" {