Monorepo for Tangled tangled.org
856
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: unify pds record migration

Signed-off-by: Seongmin Lee <git@boltless.me>

authored by

Seongmin Lee and committed by
Tangled
25f18960 fd69c1b7

+44 -221
+24
appview/db/db.go
··· 1451 1451 return err 1452 1452 }) 1453 1453 1454 + orm.RunMigration(conn, logger, "unify-pds-record-migration-table", func(tx *sql.Tx) error { 1455 + _, err := tx.Exec(` 1456 + insert into pds_migration ( 1457 + name, 1458 + did, 1459 + collection, 1460 + rkey, 1461 + status, 1462 + updated_at 1463 + ) 1464 + select 1465 + 'add-repo-did', 1466 + user_did, 1467 + record_nsid, 1468 + record_rkey, 1469 + status, 1470 + updated_at 1471 + from pds_rewrite_status; 1472 + 1473 + drop table pds_rewrite_status; 1474 + `) 1475 + return err 1476 + }) 1477 + 1454 1478 return &DB{ 1455 1479 db, 1456 1480 logger,
+13 -63
appview/db/repos.go
··· 1 1 package db 2 2 3 3 import ( 4 + "context" 4 5 "database/sql" 5 6 "errors" 6 7 "fmt" ··· 10 11 "time" 11 12 12 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 + "tangled.org/core/api/tangled" 13 15 "tangled.org/core/appview/models" 14 16 "tangled.org/core/appview/pagination" 15 17 "tangled.org/core/orm" ··· 589 591 return GetRepo(e, orm.FilterEq("repo_did", repoDid)) 590 592 } 591 593 594 + // TODO: just queue every legacy records regardless of target repo has a DID or not. 595 + // doable after we have `repo_did` column in db for each tables. 592 596 func EnqueuePdsRewritesForRepo(tx *sql.Tx, repoDid, repoAtUri string) error { 593 597 type record struct { 594 598 userDidCol string 595 599 table string 596 - nsid string 600 + nsid syntax.NSID 597 601 fkCol string 598 602 } 599 603 sources := []record{ 600 - {"did", "repos", "sh.tangled.repo", "at_uri"}, 601 - {"did", "issues", "sh.tangled.repo.issue", "repo_at"}, 602 - {"owner_did", "pulls", "sh.tangled.repo.pull", "repo_at"}, 603 - {"did", "collaborators", "sh.tangled.repo.collaborator", "repo_at"}, 604 - {"did", "artifacts", "sh.tangled.repo.artifact", "repo_at"}, 605 - {"did", "stars", "sh.tangled.feed.star", "subject_at"}, 604 + {"did", "repos", tangled.RepoNSID, "at_uri"}, 605 + {"did", "issues", tangled.RepoIssueNSID, "repo_at"}, 606 + {"owner_did", "pulls", tangled.RepoPullNSID, "repo_at"}, 607 + {"did", "collaborators", tangled.RepoCollaboratorNSID, "repo_at"}, 608 + {"did", "artifacts", tangled.RepoArchiveNSID, "repo_at"}, 609 + {"did", "stars", tangled.FeedStarNSID, "subject_at"}, 606 610 } 607 611 608 612 for _, src := range sources { ··· 629 633 } 630 634 631 635 for _, p := range pairs { 632 - if err := EnqueuePdsRewrite(tx, p.did, repoDid, src.nsid, p.rkey, repoAtUri); err != nil { 636 + if err := EnqueuePdsRecordMigration(context.Background(), tx, "add-repo-did", syntax.DID(p.did), src.nsid, syntax.RecordKey(p.rkey)); err != nil { 633 637 return fmt.Errorf("enqueue pds rewrite for %s/%s: %w", src.table, p.rkey, err) 634 638 } 635 639 } ··· 657 661 } 658 662 659 663 for _, d := range profileDids { 660 - if err := EnqueuePdsRewrite(tx, d, repoDid, "sh.tangled.actor.profile", "self", repoAtUri); err != nil { 664 + if err := EnqueuePdsRecordMigration(context.Background(), tx, "add-repo-did", syntax.DID(d), tangled.ActorProfileNSID, "self"); err != nil { 661 665 return fmt.Errorf("enqueue pds rewrite for profile/%s: %w", d, err) 662 666 } 663 667 } 664 668 665 669 return nil 666 - } 667 - 668 - type PdsRewrite struct { 669 - Id int 670 - RepoDid string 671 - RecordNsid string 672 - RecordRkey string 673 - OldRepoAt string 674 - } 675 - 676 - func GetPendingPdsRewrites(e Execer, userDid string) ([]PdsRewrite, error) { 677 - rows, err := e.Query( 678 - `SELECT id, repo_did, record_nsid, record_rkey, old_repo_at 679 - FROM pds_rewrite_status 680 - WHERE user_did = ? AND status = 'pending'`, 681 - userDid, 682 - ) 683 - if err != nil { 684 - return nil, err 685 - } 686 - defer rows.Close() 687 - 688 - var rewrites []PdsRewrite 689 - for rows.Next() { 690 - var r PdsRewrite 691 - if err := rows.Scan(&r.Id, &r.RepoDid, &r.RecordNsid, &r.RecordRkey, &r.OldRepoAt); err != nil { 692 - return nil, err 693 - } 694 - rewrites = append(rewrites, r) 695 - } 696 - return rewrites, rows.Err() 697 - } 698 - 699 - func CompletePdsRewrite(e Execer, id int) error { 700 - _, err := e.Exec( 701 - `UPDATE pds_rewrite_status SET status = 'done', updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now') WHERE id = ?`, 702 - id, 703 - ) 704 - return err 705 - } 706 - 707 - func EnqueuePdsRewrite(e Execer, userDid, repoDid, recordNsid, recordRkey, oldRepoAt string) error { 708 - _, err := e.Exec( 709 - `INSERT INTO pds_rewrite_status 710 - (user_did, repo_did, record_nsid, record_rkey, old_repo_at, status) 711 - VALUES (?, ?, ?, ?, ?, 'pending') 712 - ON CONFLICT(user_did, record_nsid, record_rkey) DO UPDATE SET 713 - status = 'pending', 714 - repo_did = excluded.repo_did, 715 - old_repo_at = excluded.old_repo_at, 716 - updated_at = strftime('%Y-%m-%dT%H:%M:%SZ', 'now')`, 717 - userDid, repoDid, recordNsid, recordRkey, oldRepoAt, 718 - ) 719 - return err 720 670 } 721 671 722 672 func CascadeRepoDid(tx *sql.Tx, repoAtUri, repoDid string) error {
+7 -7
appview/ingester.go
··· 70 70 case tangled.GraphVouchNSID: 71 71 err = i.ingestVouch(ctx, e) 72 72 case tangled.FeedStarNSID: 73 - err = i.ingestStar(e) 73 + err = i.ingestStar(ctx, e) 74 74 case tangled.PublicKeyNSID: 75 75 err = i.ingestPublicKey(e) 76 76 case tangled.RepoArtifactNSID: 77 - err = i.ingestArtifact(e) 77 + err = i.ingestArtifact(ctx, e) 78 78 case tangled.ActorProfileNSID: 79 79 err = i.ingestProfile(ctx, e) 80 80 case tangled.SpindleMemberNSID: ··· 114 114 } 115 115 } 116 116 117 - func (i *Ingester) ingestStar(e *jmodels.Event) error { 117 + func (i *Ingester) ingestStar(ctx context.Context, e *jmodels.Event) error { 118 118 var err error 119 119 did := e.Did 120 120 ··· 154 154 star.RepoAt = subjectUri 155 155 repo, repoErr := db.GetRepoByAtUri(i.Db, subjectUri.String()) 156 156 if repoErr == nil && repo.RepoDid != "" { 157 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.FeedStarNSID, e.Commit.RKey, *record.Subject); enqErr != nil { 157 + if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.FeedStarNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 158 158 l.Warn("failed to enqueue PDS rewrite for star", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 159 159 } 160 160 } ··· 325 325 return nil 326 326 } 327 327 328 - func (i *Ingester) ingestArtifact(e *jmodels.Event) error { 328 + func (i *Ingester) ingestArtifact(ctx context.Context, e *jmodels.Event) error { 329 329 did := e.Did 330 330 var err error 331 331 ··· 373 373 repoDid = *record.RepoDid 374 374 } 375 375 if repoDid != "" && (record.RepoDid == nil || *record.RepoDid == "") && record.Repo != nil { 376 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repoDid, tangled.RepoArtifactNSID, e.Commit.RKey, *record.Repo); enqErr != nil { 376 + if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoArtifactNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 377 377 l.Warn("failed to enqueue PDS rewrite for artifact", "err", enqErr, "did", did, "repoDid", repoDid) 378 378 } 379 379 } ··· 1011 1011 if record.Repo != nil { 1012 1012 repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 1013 1013 if repoErr == nil && repo.RepoDid != "" { 1014 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 1014 + if enqErr := db.EnqueuePdsRecordMigration(ctx, i.Db, "add-repo-did", syntax.DID(did), syntax.NSID(tangled.RepoIssueNSID), syntax.RecordKey(e.Commit.RKey)); enqErr != nil { 1015 1015 l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1016 1016 } 1017 1017 }
-148
appview/oauth/handler.go
··· 13 13 "time" 14 14 15 15 comatproto "github.com/bluesky-social/indigo/api/atproto" 16 - atpclient "github.com/bluesky-social/indigo/atproto/atclient" 17 16 "github.com/bluesky-social/indigo/atproto/auth/oauth" 18 17 lexutil "github.com/bluesky-social/indigo/lex/util" 19 18 xrpc "github.com/bluesky-social/indigo/xrpc" ··· 271 270 } 272 271 273 272 l.Debug("successfully created empty Tangled profile on PDS and DB") 274 - } 275 - 276 - func (o *OAuth) PdsRewriteMiddleware(next http.Handler) http.Handler { 277 - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { 278 - defer next.ServeHTTP(w, r) 279 - 280 - sess, err := o.ResumeSession(r) 281 - if err != nil { 282 - return 283 - } 284 - 285 - go o.drainPdsRewrites(sess.Data) 286 - }) 287 - } 288 - 289 - func (o *OAuth) drainPdsRewrites(sessData *oauth.ClientSessionData) { 290 - ctx := context.Background() 291 - did := sessData.AccountDID.String() 292 - l := o.Logger.With("did", did, "handler", "drainPdsRewrites") 293 - 294 - rewrites, err := db.GetPendingPdsRewrites(o.Db, did) 295 - if err != nil { 296 - l.Error("failed to get pending rewrites", "err", err) 297 - return 298 - } 299 - if len(rewrites) == 0 { 300 - return 301 - } 302 - 303 - l.Info("draining pending PDS rewrites", "count", len(rewrites)) 304 - 305 - sess, err := o.ClientApp.ResumeSession(ctx, sessData.AccountDID, sessData.SessionID) 306 - if err != nil { 307 - l.Error("failed to resume session for PDS rewrites", "err", err) 308 - return 309 - } 310 - client := sess.APIClient() 311 - 312 - for _, rw := range rewrites { 313 - if err := o.rewritePdsRecord(ctx, client, did, rw); err != nil { 314 - l.Error("failed to rewrite PDS record", 315 - "nsid", rw.RecordNsid, 316 - "rkey", rw.RecordRkey, 317 - "repo_did", rw.RepoDid, 318 - "err", err) 319 - continue 320 - } 321 - 322 - if err := db.CompletePdsRewrite(o.Db, rw.Id); err != nil { 323 - l.Error("failed to mark rewrite complete", "id", rw.Id, "err", err) 324 - } 325 - } 326 - } 327 - 328 - func (o *OAuth) rewritePdsRecord(ctx context.Context, client *atpclient.APIClient, userDid string, rw db.PdsRewrite) error { 329 - ex, err := comatproto.RepoGetRecord(ctx, client, "", rw.RecordNsid, userDid, rw.RecordRkey) 330 - if err != nil { 331 - return fmt.Errorf("get record: %w", err) 332 - } 333 - 334 - val := ex.Value.Val 335 - repoDid := rw.RepoDid 336 - 337 - switch rw.RecordNsid { 338 - case tangled.RepoNSID: 339 - rec, ok := val.(*tangled.Repo) 340 - if !ok { 341 - return fmt.Errorf("unexpected type for repo record") 342 - } 343 - rec.RepoDid = &repoDid 344 - 345 - case tangled.RepoIssueNSID: 346 - rec, ok := val.(*tangled.RepoIssue) 347 - if !ok { 348 - return fmt.Errorf("unexpected type for issue record") 349 - } 350 - rec.RepoDid = &repoDid 351 - 352 - case tangled.RepoPullNSID: 353 - rec, ok := val.(*tangled.RepoPull) 354 - if !ok { 355 - return fmt.Errorf("unexpected type for pull record") 356 - } 357 - if rec.Target != nil { 358 - rec.Target.RepoDid = &repoDid 359 - } 360 - if rec.Source != nil && rec.Source.Repo != nil && *rec.Source.Repo == rw.OldRepoAt { 361 - rec.Source.RepoDid = &repoDid 362 - } 363 - 364 - case tangled.RepoCollaboratorNSID: 365 - rec, ok := val.(*tangled.RepoCollaborator) 366 - if !ok { 367 - return fmt.Errorf("unexpected type for collaborator record") 368 - } 369 - rec.RepoDid = &repoDid 370 - 371 - case tangled.RepoArtifactNSID: 372 - rec, ok := val.(*tangled.RepoArtifact) 373 - if !ok { 374 - return fmt.Errorf("unexpected type for artifact record") 375 - } 376 - rec.RepoDid = &repoDid 377 - 378 - case tangled.FeedStarNSID: 379 - rec, ok := val.(*tangled.FeedStar) 380 - if !ok { 381 - return fmt.Errorf("unexpected type for star record") 382 - } 383 - rec.SubjectDid = &repoDid 384 - 385 - case tangled.ActorProfileNSID: 386 - rec, ok := val.(*tangled.ActorProfile) 387 - if !ok { 388 - return fmt.Errorf("unexpected type for profile record") 389 - } 390 - rewritten := make([]string, 0, len(rec.PinnedRepositories)) 391 - for _, pin := range rec.PinnedRepositories { 392 - if strings.HasPrefix(pin, "did:") { 393 - rewritten = append(rewritten, pin) 394 - continue 395 - } 396 - repo, repoErr := db.GetRepoByAtUri(o.Db, pin) 397 - if repoErr != nil || repo.RepoDid == "" { 398 - rewritten = append(rewritten, pin) 399 - continue 400 - } 401 - rewritten = append(rewritten, repo.RepoDid) 402 - } 403 - rec.PinnedRepositories = rewritten 404 - 405 - default: 406 - return fmt.Errorf("unsupported NSID for PDS rewrite: %s", rw.RecordNsid) 407 - } 408 - 409 - _, err = comatproto.RepoPutRecord(ctx, client, &comatproto.RepoPutRecord_Input{ 410 - Collection: rw.RecordNsid, 411 - Repo: userDid, 412 - Rkey: rw.RecordRkey, 413 - SwapRecord: ex.Cid, 414 - Record: &lexutil.LexiconTypeDecoder{Val: val}, 415 - }) 416 - if err != nil { 417 - return fmt.Errorf("put record: %w", err) 418 - } 419 - 420 - return nil 421 273 } 422 274 423 275 // create a AppPasswordSession using apppasswords
-3
appview/state/router.go
··· 38 38 s.logger, 39 39 ) 40 40 41 - // TODO(boltless): merge this into BackgroundMigrationMiddleware 42 - router.Use(s.oauth.PdsRewriteMiddleware) 43 - 44 41 m := migration.NewMigration(s.db, s.oauth, s.idResolver.Directory(), s.logger) 45 42 router.Use(m.BackgroundMigrationMiddleware) 46 43