Monorepo for Tangled tangled.org
758
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: drop DbWrapper, move jetstream helpers onto *DB #273

open opened by oyster.cafe targeting master from lt/repo-rename-by-rkey
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:3fwecdnvtcscjnrx2p4n7alz/sh.tangled.repo.pull/3mjm6w2jgzj22
+29 -108
Diff #0
+2 -6
appview/db/jetstream.go
··· 1 1 package db 2 2 3 - type DbWrapper struct { 4 - Execer 5 - } 6 - 7 - func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error { 3 + func (db *DB) SaveLastTimeUs(lastTimeUs int64) error { 8 4 _, err := db.Exec(` 9 5 insert into _jetstream (id, last_time_us) 10 6 values (1, ?) ··· 13 9 return err 14 10 } 15 11 16 - func (db DbWrapper) GetLastTimeUs() (int64, error) { 12 + func (db *DB) GetLastTimeUs() (int64, error) { 17 13 var lastTimeUs int64 18 14 row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`) 19 15 err := row.Scan(&lastTimeUs)
+26 -100
appview/ingester.go
··· 35 35 ) 36 36 37 37 type Ingester struct { 38 - Db db.DbWrapper 38 + Db *db.DB 39 39 Enforcer *rbac.Enforcer 40 40 IdResolver *idresolver.Resolver 41 41 Cache *cache.Cache ··· 494 494 PreferredHandle: preferredHandle, 495 495 } 496 496 497 - ddb, ok := i.Db.Execer.(*db.DB) 498 - if !ok { 499 - return fmt.Errorf("failed to index profile record, invalid db cast") 500 - } 501 - 502 - tx, err := ddb.Begin() 497 + tx, err := i.Db.Begin() 503 498 if err != nil { 504 499 return fmt.Errorf("failed to start transaction") 505 500 } ··· 566 561 return err 567 562 } 568 563 569 - ddb, ok := i.Db.Execer.(*db.DB) 570 - if !ok { 571 - return fmt.Errorf("invalid db cast") 572 - } 573 - 574 - err = db.AddSpindleMember(ddb, models.SpindleMember{ 564 + err = db.AddSpindleMember(i.Db, models.SpindleMember{ 575 565 Did: syntax.DID(did), 576 566 Rkey: e.Commit.RKey, 577 567 Instance: record.Instance, ··· 590 580 case jmodels.CommitOperationDelete: 591 581 rkey := e.Commit.RKey 592 582 593 - ddb, ok := i.Db.Execer.(*db.DB) 594 - if !ok { 595 - return fmt.Errorf("failed to index profile record, invalid db cast") 596 - } 597 - 598 583 // get record from db first 599 584 members, err := db.GetSpindleMembers( 600 - ddb, 585 + i.Db, 601 586 orm.FilterEq("did", did), 602 587 orm.FilterEq("rkey", rkey), 603 588 ) ··· 606 591 } 607 592 member := members[0] 608 593 609 - tx, err := ddb.Begin() 594 + tx, err := i.Db.Begin() 610 595 if err != nil { 611 596 return fmt.Errorf("failed to start txn: %w", err) 612 597 } ··· 659 644 660 645 instance := e.Commit.RKey 661 646 662 - ddb, ok := i.Db.Execer.(*db.DB) 663 - if !ok { 664 - return fmt.Errorf("failed to index profile record, invalid db cast") 665 - } 666 - 667 - err := db.AddSpindle(ddb, models.Spindle{ 647 + err := db.AddSpindle(i.Db, models.Spindle{ 668 648 Owner: syntax.DID(did), 669 649 Instance: instance, 670 650 }) ··· 683 663 return err 684 664 } 685 665 686 - _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 666 + _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 687 667 if err != nil { 688 668 return fmt.Errorf("failed to mark verified: %w", err) 689 669 } ··· 693 673 case jmodels.CommitOperationDelete: 694 674 instance := e.Commit.RKey 695 675 696 - ddb, ok := i.Db.Execer.(*db.DB) 697 - if !ok { 698 - return fmt.Errorf("failed to index profile record, invalid db cast") 699 - } 700 - 701 676 // get record from db first 702 677 spindles, err := db.GetSpindles( 703 678 ctx, 704 - ddb, 679 + i.Db, 705 680 orm.FilterEq("owner", did), 706 681 orm.FilterEq("instance", instance), 707 682 ) ··· 710 685 } 711 686 spindle := spindles[0] 712 687 713 - tx, err := ddb.Begin() 688 + tx, err := i.Db.Begin() 714 689 if err != nil { 715 690 return err 716 691 } ··· 768 743 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 769 744 l.Info("ingesting record") 770 745 771 - ddb, ok := i.Db.Execer.(*db.DB) 772 - if !ok { 773 - return fmt.Errorf("failed to index string record, invalid db cast") 774 - } 775 - 776 746 switch e.Commit.Operation { 777 747 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 778 748 raw := json.RawMessage(e.Commit.Record) ··· 790 760 return err 791 761 } 792 762 793 - if err = db.AddString(ddb, string); err != nil { 763 + if err = db.AddString(i.Db, string); err != nil { 794 764 l.Error("failed to add string", "err", err) 795 765 return err 796 766 } ··· 799 769 800 770 case jmodels.CommitOperationDelete: 801 771 if err := db.DeleteString( 802 - ddb, 772 + i.Db, 803 773 orm.FilterEq("did", did), 804 774 orm.FilterEq("rkey", rkey), 805 775 ); err != nil { ··· 884 854 885 855 domain := e.Commit.RKey 886 856 887 - ddb, ok := i.Db.Execer.(*db.DB) 888 - if !ok { 889 - return fmt.Errorf("failed to index profile record, invalid db cast") 890 - } 891 - 892 - err := db.AddKnot(ddb, domain, did) 857 + err := db.AddKnot(i.Db, domain, did) 893 858 if err != nil { 894 859 l.Error("failed to add knot to db", "err", err, "domain", domain) 895 860 return err ··· 907 872 return err 908 873 } 909 874 910 - err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 875 + err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 911 876 if err != nil { 912 877 return fmt.Errorf("failed to mark verified: %w", err) 913 878 } ··· 917 882 case jmodels.CommitOperationDelete: 918 883 domain := e.Commit.RKey 919 884 920 - ddb, ok := i.Db.Execer.(*db.DB) 921 - if !ok { 922 - return fmt.Errorf("failed to index knot record, invalid db cast") 923 - } 924 - 925 885 // get record from db first 926 886 registrations, err := db.GetRegistrations( 927 - ddb, 887 + i.Db, 928 888 orm.FilterEq("domain", domain), 929 889 orm.FilterEq("did", did), 930 890 ) ··· 936 896 } 937 897 registration := registrations[0] 938 898 939 - tx, err := ddb.Begin() 899 + tx, err := i.Db.Begin() 940 900 if err != nil { 941 901 return err 942 902 } ··· 983 943 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 984 944 l.Info("ingesting record") 985 945 986 - ddb, ok := i.Db.Execer.(*db.DB) 987 - if !ok { 988 - return fmt.Errorf("failed to index issue record, invalid db cast") 989 - } 990 - 991 946 switch e.Commit.Operation { 992 947 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 993 948 raw := json.RawMessage(e.Commit.Record) ··· 1008 963 return fmt.Errorf("failed to validate issue: %w", err) 1009 964 } 1010 965 1011 - if record.Repo != nil { 1012 - repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo) 1013 - if repoErr == nil && repo.RepoDid != "" { 1014 - if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil { 1015 - l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid) 1016 - } 1017 - } 1018 - } 1019 - 1020 - tx, err := ddb.BeginTx(ctx, nil) 966 + tx, err := i.Db.BeginTx(ctx, nil) 1021 967 if err != nil { 1022 968 l.Error("failed to begin transaction", "err", err) 1023 969 return err ··· 1039 985 return nil 1040 986 1041 987 case jmodels.CommitOperationDelete: 1042 - tx, err := ddb.BeginTx(ctx, nil) 988 + tx, err := i.Db.BeginTx(ctx, nil) 1043 989 if err != nil { 1044 990 l.Error("failed to begin transaction", "err", err) 1045 991 return err ··· 1074 1020 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1075 1021 l.Info("ingesting record") 1076 1022 1077 - ddb, ok := i.Db.Execer.(*db.DB) 1078 - if !ok { 1079 - return fmt.Errorf("failed to index pull record, invalid db cast") 1080 - } 1081 - 1082 1023 switch e.Commit.Operation { 1083 1024 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1084 1025 raw := json.RawMessage(e.Commit.Record) ··· 1161 1102 return fmt.Errorf("failed to validate pull: %w", err) 1162 1103 } 1163 1104 1164 - tx, err := ddb.BeginTx(ctx, nil) 1105 + tx, err := i.Db.BeginTx(ctx, nil) 1165 1106 if err != nil { 1166 1107 l.Error("failed to begin transaction", "err", err) 1167 1108 return err ··· 1183 1124 return nil 1184 1125 1185 1126 case jmodels.CommitOperationDelete: 1186 - tx, err := ddb.BeginTx(ctx, nil) 1127 + tx, err := i.Db.BeginTx(ctx, nil) 1187 1128 if err != nil { 1188 1129 l.Error("failed to begin transaction", "err", err) 1189 1130 return err ··· 1218 1159 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1219 1160 l.Info("ingesting record") 1220 1161 1221 - ddb, ok := i.Db.Execer.(*db.DB) 1222 - if !ok { 1223 - return fmt.Errorf("failed to index issue comment record, invalid db cast") 1224 - } 1225 - 1226 1162 switch e.Commit.Operation { 1227 1163 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1228 1164 raw := json.RawMessage(e.Commit.Record) ··· 1241 1177 return fmt.Errorf("failed to validate comment: %w", err) 1242 1178 } 1243 1179 1244 - tx, err := ddb.Begin() 1180 + tx, err := i.Db.Begin() 1245 1181 if err != nil { 1246 1182 return fmt.Errorf("failed to start transaction: %w", err) 1247 1183 } ··· 1256 1192 1257 1193 case jmodels.CommitOperationDelete: 1258 1194 if err := db.DeleteIssueComments( 1259 - ddb, 1195 + i.Db, 1260 1196 orm.FilterEq("did", did), 1261 1197 orm.FilterEq("rkey", rkey), 1262 1198 ); err != nil { ··· 1278 1214 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1279 1215 l.Info("ingesting record") 1280 1216 1281 - ddb, ok := i.Db.Execer.(*db.DB) 1282 - if !ok { 1283 - return fmt.Errorf("failed to index label definition, invalid db cast") 1284 - } 1285 - 1286 1217 switch e.Commit.Operation { 1287 1218 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1288 1219 raw := json.RawMessage(e.Commit.Record) ··· 1301 1232 return fmt.Errorf("failed to validate labeldef: %w", err) 1302 1233 } 1303 1234 1304 - _, err = db.AddLabelDefinition(ddb, def) 1235 + _, err = db.AddLabelDefinition(i.Db, def) 1305 1236 if err != nil { 1306 1237 return fmt.Errorf("failed to create labeldef: %w", err) 1307 1238 } ··· 1310 1241 1311 1242 case jmodels.CommitOperationDelete: 1312 1243 if err := db.DeleteLabelDefinition( 1313 - ddb, 1244 + i.Db, 1314 1245 orm.FilterEq("did", did), 1315 1246 orm.FilterEq("rkey", rkey), 1316 1247 ); err != nil { ··· 1332 1263 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1333 1264 l.Info("ingesting record") 1334 1265 1335 - ddb, ok := i.Db.Execer.(*db.DB) 1336 - if !ok { 1337 - return fmt.Errorf("failed to index label op, invalid db cast") 1338 - } 1339 - 1340 1266 switch e.Commit.Operation { 1341 1267 case jmodels.CommitOperationCreate: 1342 1268 raw := json.RawMessage(e.Commit.Record) ··· 1352 1278 var repo *models.Repo 1353 1279 switch collection { 1354 1280 case tangled.RepoIssueNSID: 1355 - i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1281 + i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1356 1282 if err != nil || len(i) != 1 { 1357 1283 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1358 1284 } ··· 1361 1287 return fmt.Errorf("unsupported label subject: %s", collection) 1362 1288 } 1363 1289 1364 - actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1290 + actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1365 1291 if err != nil { 1366 1292 return fmt.Errorf("failed to build label application ctx: %w", err) 1367 1293 } ··· 1378 1304 } 1379 1305 } 1380 1306 1381 - tx, err := ddb.Begin() 1307 + tx, err := i.Db.Begin() 1382 1308 if err != nil { 1383 1309 return err 1384 1310 }
+1 -2
appview/state/state.go
··· 118 118 119 119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 120 120 121 - wrapper := db.DbWrapper{Execer: d} 122 121 jc, err := jetstream.NewJetstreamClient( 123 122 config.Jetstream.Endpoint, 124 123 "appview", ··· 142 141 }, 143 142 nil, 144 143 tlog.SubLogger(logger, "jetstream"), 145 - wrapper, 144 + d, 146 145 false, 147 146 148 147 // in-memory filter is inapplicable to appview so

History

1 round 0 comments
sign up or login to add to the discussion
oyster.cafe submitted #0
1 commit
expand
appview: drop DbWrapper, move jetstream helpers onto *DB
no conflicts, ready to merge
expand 0 comments