Signed-off-by: Seongmin Lee git@boltless.me
+58
-221
Diff
round #0
+38
appview/db/db.go
+38
appview/db/db.go
···
1451
1451
return err
1452
1452
})
1453
1453
1454
+
orm.RunMigration(conn, logger, "migrate-legacy-comments", func(tx *sql.Tx) error {
1455
+
_, err := tx.Exec(`
1456
+
insert into pds_migration (name, did, collection, rkey)
1457
+
select
1458
+
'use-feed-comment',
1459
+
did,
1460
+
collection,
1461
+
rkey
1462
+
from comments
1463
+
where collection <> 'sh.tangled.feed.comment';
1464
+
`)
1465
+
return err
1466
+
})
1467
+
1468
+
orm.RunMigration(conn, logger, "unify-pds-record-migration-table", func(tx *sql.Tx) error {
1469
+
_, err := tx.Exec(`
1470
+
insert into pds_migration (
1471
+
name,
1472
+
did,
1473
+
collection,
1474
+
rkey,
1475
+
status,
1476
+
updated_at
1477
+
)
1478
+
select
1479
+
'add-repo-did',
1480
+
user_did,
1481
+
record_nsid,
1482
+
record_rkey,
1483
+
status,
1484
+
updated_at
1485
+
from pds_rewrite_status;
1486
+
1487
+
drop table pds_rewrite_status;
1488
+
`)
1489
+
return err
1490
+
})
1491
+
1454
1492
return &DB{
1455
1493
db,
1456
1494
logger,
+13
-63
appview/db/repos.go
+13
-63
appview/db/repos.go
···
1
1
package db
2
2
3
3
import (
4
+
"context"
4
5
"database/sql"
5
6
"errors"
6
7
"fmt"
···
10
11
"time"
11
12
12
13
"github.com/bluesky-social/indigo/atproto/syntax"
14
+
"tangled.org/core/api/tangled"
13
15
"tangled.org/core/appview/models"
14
16
"tangled.org/core/appview/pagination"
15
17
"tangled.org/core/orm"
···
589
591
return GetRepo(e, orm.FilterEq("repo_did", repoDid))
590
592
}
591
593
594
+
// TODO: just queue every legacy records regardless of target repo has a DID or not.
595
+
// doable after we have `repo_did` column in db for each tables.
592
596
func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error {
593
597
type record struct {
594
598
userDidCol string
595
599
table string
596
-
nsid string
600
+
nsid syntax.NSID
597
601
fkCol string
598
602
}
599
603
sources := []record{
600
-
{"did", "repos", "sh.tangled.repo", "at_uri"},
601
-
{"did", "issues", "sh.tangled.repo.issue", "repo_at"},
602
-
{"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"},
603
-
{"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"},
604
-
{"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"},
605
-
{"did", "stars", "sh.tangled.feed.star", "subject_at"},
604
+
{"did", "repos", tangled.RepoNSID, "at_uri"},
605
+
{"did", "issues", tangled.RepoIssueNSID, "repo_at"},
606
+
{"owner_did", "pulls", tangled.RepoPullNSID, "repo_at"},
607
+
{"did", "collaborators", tangled.RepoCollaboratorNSID, "repo_at"},
608
+
{"did", "artifacts", tangled.RepoArchiveNSID, "repo_at"},
609
+
{"did", "stars", tangled.FeedStarNSID, "subject_at"},
606
610
}
607
611
608
612
for _, src := range sources {
···
629
633
}
630
634
631
635
for _, p := range pairs {
632
-
if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil {
636
+
if err := EnqueuePdsRecordMigration(context.Background(), tx, "add-repo-did", syntax.DID(p.did), src.nsid, syntax.RecordKey(p.rkey)); err != nil {
633
637
return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err)
634
638
}
635
639
}
···
657
661
}
658
662
659
663
for _, d := range profileDids {
660
-
if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil {
664
+
if err := EnqueuePdsRecordMigration(context.Background(), tx, "add-repo-did", syntax.DID(d), tangled.ActorProfileNSID, "self"); err != nil {
661
665
return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err)
662
666
}
663
667
}
···
665
669
return nil
666
670
}
667
671
668
-
type PdsRewrite struct {
669
-
Id int
670
-
RepoDid string
671
-
RecordNsid string
672
-
RecordRkey string
673
-
OldRepoAt string
674
-
}
675
-
676
-
func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) {
677
-
rows, err := e.Query(
678
-
`SELECT id, repo_did, record_nsid, record_rkey, old_repo_at
679
-
FROM pds_rewrite_status
680
-
WHERE user_did = ? AND status = 'pending'`,
681
-
userDid,
682
-
)
683
-
if err != nil {
684
-
return nil, err
685
-
}
686
-
defer rows.Close()
687
-
688
-
var rewrites []PdsRewrite
689
-
for rows.Next() {
690
-
var r PdsRewrite
691
-
if err := rows.Scan(&r.Id, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil {
692
-
return nil, err
693
-
}
694
-
rewrites = append(rewrites, r)
695
-
}
696
-
return rewrites, rows.Err()
697
-
}
698
-
699
-
func CompletePdsRewrite(e Execer, id int) error {
700
-
_, err := e.Exec(
701
-
`UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`,
702
-
id,
703
-
)
704
-
return err
705
-
}
706
-
707
-
func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error {
708
-
_, err := e.Exec(
709
-
`INSERT INTO pds_rewrite_status
710
-
(user_did, repo_did, record_nsid, record_rkey, old_repo_at, status)
711
-
VALUES (?, ?, ?, ?, ?, 'pending')
712
-
ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET
713
-
status = 'pending',
714
-
repo_did = excluded.repo_did,
715
-
old_repo_at = excluded.old_repo_at,
716
-
updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`,
717
-
userDid, repoDid, recordNsid, recordRkey, oldRepoAt,
718
-
)
719
-
return err
720
-
}
721
-
722
672
func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error {
723
673
_, err := tx.Exec(
724
674
`UPDATE repos SET repo_did = ? WHERE at_uri = ?`,
+7
-7
appview/ingester.go
+7
-7
appview/ingester.go
···
70
70
case tangled.GraphVouchNSID:
71
71
err = i.ingestVouch(ctx, e)
72
72
case tangled.FeedStarNSID:
73
-
err = i.ingestStar(e)
73
+
err = i.ingestStar(ctx, e)
74
74
case tangled.PublicKeyNSID:
75
75
err = i.ingestPublicKey(e)
76
76
case tangled.RepoArtifactNSID:
77
-
err = i.ingestArtifact(e)
77
+
err = i.ingestArtifact(ctx, e)
78
78
case tangled.ActorProfileNSID:
79
79
err = i.ingestProfile(ctx, e)
80
80
case tangled.SpindleMemberNSID:
···
114
114
}
115
115
}
116
116
117
-
func (i *Ingester) ingestStar(e *jmodels.Event) error {
117
+
func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error {
118
118
var err error
119
119
did := e.Did
120
120
···
154
154
star.RepoAt = subjectUri
155
155
repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String())
156
156
if repoErr == nil && repo.RepoDid != "" {
157
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil {
157
+
if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
158
158
l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
159
159
}
160
160
}
···
325
325
return nil
326
326
}
327
327
328
-
func (i *Ingester) ingestArtifact(e *jmodels.Event) error {
328
+
func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error {
329
329
did := e.Did
330
330
var err error
331
331
···
373
373
repoDid = *record.RepoDid
374
374
}
375
375
if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil {
376
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil {
376
+
if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
377
377
l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid)
378
378
}
379
379
}
···
1011
1011
if record.Repo != nil {
1012
1012
repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
1013
1013
if repoErr == nil && repo.RepoDid != "" {
1014
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
1014
+
if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil {
1015
1015
l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1016
1016
}
1017
1017
}
-148
appview/oauth/handler.go
-148
appview/oauth/handler.go
···
13
13
"time"
14
14
15
15
comatproto "github.com/bluesky-social/indigo/api/atproto"
16
-
atpclient "github.com/bluesky-social/indigo/atproto/atclient"
17
16
"github.com/bluesky-social/indigo/atproto/auth/oauth"
18
17
lexutil "github.com/bluesky-social/indigo/lex/util"
19
18
xrpc "github.com/bluesky-social/indigo/xrpc"
···
273
272
l.Debug("successfully created empty Tangled profile on PDS and DB")
274
273
}
275
274
276
-
func (o *OAuth) PdsRewriteMiddleware(next http.Handler) http.Handler {
277
-
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
278
-
defer next.ServeHTTP(w, r)
279
-
280
-
sess, err := o.ResumeSession(r)
281
-
if err != nil {
282
-
return
283
-
}
284
-
285
-
go o.drainPdsRewrites(sess.Data)
286
-
})
287
-
}
288
-
289
-
func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) {
290
-
ctx := context.Background()
291
-
did := sessData.AccountDID.String()
292
-
l := o.Logger.With("did", did, "handler", "drainPdsRewrites")
293
-
294
-
rewrites, err := db.GetPendingPdsRewrites(o.Db, did)
295
-
if err != nil {
296
-
l.Error("failed to get pending rewrites", "err", err)
297
-
return
298
-
}
299
-
if len(rewrites) == 0 {
300
-
return
301
-
}
302
-
303
-
l.Info("draining pending PDS rewrites", "count", len(rewrites))
304
-
305
-
sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID)
306
-
if err != nil {
307
-
l.Error("failed to resume session for PDS rewrites", "err", err)
308
-
return
309
-
}
310
-
client := sess.APIClient()
311
-
312
-
for _, rw := range rewrites {
313
-
if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil {
314
-
l.Error("failed to rewrite PDS record",
315
-
"nsid", rw.RecordNsid,
316
-
"rkey", rw.RecordRkey,
317
-
"repo_did", rw.RepoDid,
318
-
"err", err)
319
-
continue
320
-
}
321
-
322
-
if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil {
323
-
l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err)
324
-
}
325
-
}
326
-
}
327
-
328
-
func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error {
329
-
ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey)
330
-
if err != nil {
331
-
return fmt.Errorf("get record: %w", err)
332
-
}
333
-
334
-
val := ex.Value.Val
335
-
repoDid := rw.RepoDid
336
-
337
-
switch rw.RecordNsid {
338
-
case tangled.RepoNSID:
339
-
rec, ok := val.(*tangled.Repo)
340
-
if !ok {
341
-
return fmt.Errorf("unexpected type for repo record")
342
-
}
343
-
rec.RepoDid = &repoDid
344
-
345
-
case tangled.RepoIssueNSID:
346
-
rec, ok := val.(*tangled.RepoIssue)
347
-
if !ok {
348
-
return fmt.Errorf("unexpected type for issue record")
349
-
}
350
-
rec.RepoDid = &repoDid
351
-
352
-
case tangled.RepoPullNSID:
353
-
rec, ok := val.(*tangled.RepoPull)
354
-
if !ok {
355
-
return fmt.Errorf("unexpected type for pull record")
356
-
}
357
-
if rec.Target != nil {
358
-
rec.Target.RepoDid = &repoDid
359
-
}
360
-
if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt {
361
-
rec.Source.RepoDid = &repoDid
362
-
}
363
-
364
-
case tangled.RepoCollaboratorNSID:
365
-
rec, ok := val.(*tangled.RepoCollaborator)
366
-
if !ok {
367
-
return fmt.Errorf("unexpected type for collaborator record")
368
-
}
369
-
rec.RepoDid = &repoDid
370
-
371
-
case tangled.RepoArtifactNSID:
372
-
rec, ok := val.(*tangled.RepoArtifact)
373
-
if !ok {
374
-
return fmt.Errorf("unexpected type for artifact record")
375
-
}
376
-
rec.RepoDid = &repoDid
377
-
378
-
case tangled.FeedStarNSID:
379
-
rec, ok := val.(*tangled.FeedStar)
380
-
if !ok {
381
-
return fmt.Errorf("unexpected type for star record")
382
-
}
383
-
rec.SubjectDid = &repoDid
384
-
385
-
case tangled.ActorProfileNSID:
386
-
rec, ok := val.(*tangled.ActorProfile)
387
-
if !ok {
388
-
return fmt.Errorf("unexpected type for profile record")
389
-
}
390
-
rewritten := make([]string, 0, len(rec.PinnedRepositories))
391
-
for _, pin := range rec.PinnedRepositories {
392
-
if strings.HasPrefix(pin, "did:") {
393
-
rewritten = append(rewritten, pin)
394
-
continue
395
-
}
396
-
repo, repoErr := db.GetRepoByAtUri(o.Db, pin)
397
-
if repoErr != nil || repo.RepoDid == "" {
398
-
rewritten = append(rewritten, pin)
399
-
continue
400
-
}
401
-
rewritten = append(rewritten, repo.RepoDid)
402
-
}
403
-
rec.PinnedRepositories = rewritten
404
-
405
-
default:
406
-
return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid)
407
-
}
408
-
409
-
_, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{
410
-
Collection: rw.RecordNsid,
411
-
Repo: userDid,
412
-
Rkey: rw.RecordRkey,
413
-
SwapRecord: ex.Cid,
414
-
Record: &lexutil.LexiconTypeDecoder{Val: val},
415
-
})
416
-
if err != nil {
417
-
return fmt.Errorf("put record: %w", err)
418
-
}
419
-
420
-
return nil
421
-
}
422
-
423
275
// create a AppPasswordSession using apppasswords
424
276
type AppPasswordSession struct {
425
277
AccessJwt string `json:"accessJwt"`
-3
appview/state/router.go
-3
appview/state/router.go
History
2 rounds
0 comments
boltless.me
submitted
#1
1 commit
expand
collapse
appview: unify pds record migration
Signed-off-by: Seongmin Lee <git@boltless.me>
merge conflicts detected
expand
collapse
expand
collapse
- appview/db/repos.go:1
- appview/ingester.go:70
- appview/oauth/handler.go:13
- appview/state/router.go:38
expand 0 comments
boltless.me
submitted
#0
1 commit
expand
collapse
appview: unify pds record migration
Signed-off-by: Seongmin Lee <git@boltless.me>