Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: drop DbWrapper, move jetstream helpers onto *DB

Lewis: May this revision serve well! <lewis@tangled.org>

Lewis b4d966cc 2d8f99a3

+29 -108
+2 -6
appview/db/jetstream.go
··· 1 1 package db 2 2 3 - type DbWrapper struct { 4 - Execer 5 - } 6 - 7 - func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error { 3 + func (db *DB) SaveLastTimeUs(lastTimeUs int64) error { 8 4 _, err := db.Exec(` 9 5 insert into _jetstream (id, last_time_us) 10 6 values (1, ?) ··· 13 9 return err 14 10 } 15 11 16 - func (db DbWrapper) GetLastTimeUs() (int64, error) { 12 + func (db *DB) GetLastTimeUs() (int64, error) { 17 13 var lastTimeUs int64 18 14 row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`) 19 15 err := row.Scan(&lastTimeUs)
+26 -100
appview/ingester.go
··· 34 34 ) 35 35 36 36 type Ingester struct { 37 - Db db.DbWrapper 37 + Db *db.DB 38 38 Enforcer *rbac.Enforcer 39 39 IdResolver *idresolver.Resolver 40 40 Config *config.Config ··· 418 418 PreferredHandle: preferredHandle, 419 419 } 420 420 421 - ddb, ok := i.Db.Execer.(*db.DB) 422 - if !ok { 423 - return fmt.Errorf("failed to index profile record, invalid db cast") 424 - } 425 - 426 - tx, err := ddb.Begin() 421 + tx, err := i.Db.Begin() 427 422 if err != nil { 428 423 return fmt.Errorf("failed to start transaction") 429 424 } ··· 477 472 return err 478 473 } 479 474 480 - ddb, ok := i.Db.Execer.(*db.DB) 481 - if !ok { 482 - return fmt.Errorf("invalid db cast") 483 - } 484 - 485 - err = db.AddSpindleMember(ddb, models.SpindleMember{ 475 + err = db.AddSpindleMember(i.Db, models.SpindleMember{ 486 476 Did: syntax.DID(did), 487 477 Rkey: e.Commit.RKey, 488 478 Instance: record.Instance, ··· 501 491 case jmodels.CommitOperationDelete: 502 492 rkey := e.Commit.RKey 503 493 504 - ddb, ok := i.Db.Execer.(*db.DB) 505 - if !ok { 506 - return fmt.Errorf("failed to index profile record, invalid db cast") 507 - } 508 - 509 494 // get record from db first 510 495 members, err := db.GetSpindleMembers( 511 - ddb, 496 + i.Db, 512 497 orm.FilterEq("did", did), 513 498 orm.FilterEq("rkey", rkey), 514 499 ) ··· 517 502 } 518 503 member := members[0] 519 504 520 - tx, err := ddb.Begin() 505 + tx, err := i.Db.Begin() 521 506 if err != nil { 522 507 return fmt.Errorf("failed to start txn: %w", err) 523 508 } ··· 570 555 571 556 instance := e.Commit.RKey 572 557 573 - ddb, ok := i.Db.Execer.(*db.DB) 574 - if !ok { 575 - return fmt.Errorf("failed to index profile record, invalid db cast") 576 - } 577 - 578 - err := db.AddSpindle(ddb, models.Spindle{ 558 + err := db.AddSpindle(i.Db, models.Spindle{ 579 559 Owner: syntax.DID(did), 580 560 Instance: instance, 581 561 }) ··· 594 574 return err 595 575 } 596 576 597 - _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 577 + _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 598 578 if err != nil { 599 579 return fmt.Errorf("failed to mark verified: %w", err) 600 580 } ··· 604 584 case jmodels.CommitOperationDelete: 605 585 instance := e.Commit.RKey 606 586 607 - ddb, ok := i.Db.Execer.(*db.DB) 608 - if !ok { 609 - return fmt.Errorf("failed to index profile record, invalid db cast") 610 - } 611 - 612 587 // get record from db first 613 588 spindles, err := db.GetSpindles( 614 589 ctx, 615 - ddb, 590 + i.Db, 616 591 orm.FilterEq("owner", did), 617 592 orm.FilterEq("instance", instance), 618 593 ) ··· 621 596 } 622 597 spindle := spindles[0] 623 598 624 - tx, err := ddb.Begin() 599 + tx, err := i.Db.Begin() 625 600 if err != nil { 626 601 return err 627 602 } ··· 678 653 679 654 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 680 655 l.Info("ingesting record") 681 - 682 - ddb, ok := i.Db.Execer.(*db.DB) 683 - if !ok { 684 - return fmt.Errorf("failed to index string record, invalid db cast") 685 - } 686 656 687 657 switch e.Commit.Operation { 688 658 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: ··· 701 671 return err 702 672 } 703 673 704 - if err = db.AddString(ddb, string); err != nil { 674 + if err = db.AddString(i.Db, string); err != nil { 705 675 l.Error("failed to add string", "err", err) 706 676 return err 707 677 } ··· 710 680 711 681 case jmodels.CommitOperationDelete: 712 682 if err := db.DeleteString( 713 - ddb, 683 + i.Db, 714 684 orm.FilterEq("did", did), 715 685 orm.FilterEq("rkey", rkey), 716 686 ); err != nil { ··· 795 765 796 766 domain := e.Commit.RKey 797 767 798 - ddb, ok := i.Db.Execer.(*db.DB) 799 - if !ok { 800 - return fmt.Errorf("failed to index profile record, invalid db cast") 801 - } 802 - 803 - err := db.AddKnot(ddb, domain, did) 768 + err := db.AddKnot(i.Db, domain, did) 804 769 if err != nil { 805 770 l.Error("failed to add knot to db", "err", err, "domain", domain) 806 771 return err ··· 818 783 return err 819 784 } 820 785 821 - err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 786 + err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 822 787 if err != nil { 823 788 return fmt.Errorf("failed to mark verified: %w", err) 824 789 } ··· 828 793 case jmodels.CommitOperationDelete: 829 794 domain := e.Commit.RKey 830 795 831 - ddb, ok := i.Db.Execer.(*db.DB) 832 - if !ok { 833 - return fmt.Errorf("failed to index knot record, invalid db cast") 834 - } 835 - 836 796 // get record from db first 837 797 registrations, err := db.GetRegistrations( 838 - ddb, 798 + i.Db, 839 799 orm.FilterEq("domain", domain), 840 800 orm.FilterEq("did", did), 841 801 ) ··· 847 807 } 848 808 registration := registrations[0] 849 809 850 - tx, err := ddb.Begin() 810 + tx, err := i.Db.Begin() 851 811 if err != nil { 852 812 return err 853 813 } ··· 894 854 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 895 855 l.Info("ingesting record") 896 856 897 - ddb, ok := i.Db.Execer.(*db.DB) 898 - if !ok { 899 - return fmt.Errorf("failed to index issue record, invalid db cast") 900 - } 901 - 902 857 switch e.Commit.Operation { 903 858 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 904 859 raw := json.RawMessage(e.Commit.Record) ··· 919 874 return fmt.Errorf("failed to validate issue: %w", err) 920 875 } 921 876 922 - if record.Repo != nil { 923 - repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 924 - if repoErr == nil && repo.RepoDid != "" { 925 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 926 - l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 927 - } 928 - } 929 - } 930 - 931 - tx, err := ddb.BeginTx(ctx, nil) 877 + tx, err := i.Db.BeginTx(ctx, nil) 932 878 if err != nil { 933 879 l.Error("failed to begin transaction", "err", err) 934 880 return err ··· 950 896 return nil 951 897 952 898 case jmodels.CommitOperationDelete: 953 - tx, err := ddb.BeginTx(ctx, nil) 899 + tx, err := i.Db.BeginTx(ctx, nil) 954 900 if err != nil { 955 901 l.Error("failed to begin transaction", "err", err) 956 902 return err ··· 984 930 985 931 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 986 932 l.Info("ingesting record") 987 - 988 - ddb, ok := i.Db.Execer.(*db.DB) 989 - if !ok { 990 - return fmt.Errorf("failed to index pull record, invalid db cast") 991 - } 992 933 993 934 switch e.Commit.Operation { 994 935 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: ··· 1072 1013 return fmt.Errorf("failed to validate pull: %w", err) 1073 1014 } 1074 1015 1075 - tx, err := ddb.BeginTx(ctx, nil) 1016 + tx, err := i.Db.BeginTx(ctx, nil) 1076 1017 if err != nil { 1077 1018 l.Error("failed to begin transaction", "err", err) 1078 1019 return err ··· 1094 1035 return nil 1095 1036 1096 1037 case jmodels.CommitOperationDelete: 1097 - tx, err := ddb.BeginTx(ctx, nil) 1038 + tx, err := i.Db.BeginTx(ctx, nil) 1098 1039 if err != nil { 1099 1040 l.Error("failed to begin transaction", "err", err) 1100 1041 return err ··· 1129 1070 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1130 1071 l.Info("ingesting record") 1131 1072 1132 - ddb, ok := i.Db.Execer.(*db.DB) 1133 - if !ok { 1134 - return fmt.Errorf("failed to index issue comment record, invalid db cast") 1135 - } 1136 - 1137 1073 switch e.Commit.Operation { 1138 1074 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1139 1075 raw := json.RawMessage(e.Commit.Record) ··· 1152 1088 return fmt.Errorf("failed to validate comment: %w", err) 1153 1089 } 1154 1090 1155 - tx, err := ddb.Begin() 1091 + tx, err := i.Db.Begin() 1156 1092 if err != nil { 1157 1093 return fmt.Errorf("failed to start transaction: %w", err) 1158 1094 } ··· 1167 1103 1168 1104 case jmodels.CommitOperationDelete: 1169 1105 if err := db.DeleteIssueComments( 1170 - ddb, 1106 + i.Db, 1171 1107 orm.FilterEq("did", did), 1172 1108 orm.FilterEq("rkey", rkey), 1173 1109 ); err != nil { ··· 1188 1124 1189 1125 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1190 1126 l.Info("ingesting record") 1191 - 1192 - ddb, ok := i.Db.Execer.(*db.DB) 1193 - if !ok { 1194 - return fmt.Errorf("failed to index label definition, invalid db cast") 1195 - } 1196 1127 1197 1128 switch e.Commit.Operation { 1198 1129 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: ··· 1212 1143 return fmt.Errorf("failed to validate labeldef: %w", err) 1213 1144 } 1214 1145 1215 - _, err = db.AddLabelDefinition(ddb, def) 1146 + _, err = db.AddLabelDefinition(i.Db, def) 1216 1147 if err != nil { 1217 1148 return fmt.Errorf("failed to create labeldef: %w", err) 1218 1149 } ··· 1221 1152 1222 1153 case jmodels.CommitOperationDelete: 1223 1154 if err := db.DeleteLabelDefinition( 1224 - ddb, 1155 + i.Db, 1225 1156 orm.FilterEq("did", did), 1226 1157 orm.FilterEq("rkey", rkey), 1227 1158 ); err != nil { ··· 1243 1174 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1244 1175 l.Info("ingesting record") 1245 1176 1246 - ddb, ok := i.Db.Execer.(*db.DB) 1247 - if !ok { 1248 - return fmt.Errorf("failed to index label op, invalid db cast") 1249 - } 1250 - 1251 1177 switch e.Commit.Operation { 1252 1178 case jmodels.CommitOperationCreate: 1253 1179 raw := json.RawMessage(e.Commit.Record) ··· 1263 1189 var repo *models.Repo 1264 1190 switch collection { 1265 1191 case tangled.RepoIssueNSID: 1266 - i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1192 + i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1267 1193 if err != nil || len(i) != 1 { 1268 1194 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1269 1195 } ··· 1272 1198 return fmt.Errorf("unsupported label subject: %s", collection) 1273 1199 } 1274 1200 1275 - actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1201 + actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1276 1202 if err != nil { 1277 1203 return fmt.Errorf("failed to build label application ctx: %w", err) 1278 1204 } ··· 1289 1215 } 1290 1216 } 1291 1217 1292 - tx, err := ddb.Begin() 1218 + tx, err := i.Db.Begin() 1293 1219 if err != nil { 1294 1220 return err 1295 1221 }
+1 -2
appview/state/state.go
··· 110 110 111 111 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 112 112 113 - wrapper := db.DbWrapper{Execer: d} 114 113 jc, err := jetstream.NewJetstreamClient( 115 114 config.Jetstream.Endpoint, 116 115 "appview", ··· 133 132 }, 134 133 nil, 135 134 tlog.SubLogger(logger, "jetstream"), 136 - wrapper, 135 + d, 137 136 false, 138 137 139 138 // in-memory filter is inapplicable to appview so