Monorepo for Tangled tangled.org
859
fork

Configure Feed

Select the types of activity you want to include in your feed.

appview: drop DbWrapper, move jetstream helpers onto *DB

Lewis: May this revision serve well! <lewis@tangled.org>

Lewis 3103feed 224471fe

+30 -105
+2 -6
appview/db/jetstream.go
··· 1 1 package db 2 2 3 - type DbWrapper struct { 4 - Execer 5 - } 6 - 7 - func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error { 3 + func (db *DB) SaveLastTimeUs(lastTimeUs int64) error { 8 4 _, err := db.Exec(` 9 5 insert into _jetstream (id, last_time_us) 10 6 values (1, ?) ··· 13 9 return err 14 10 } 15 11 16 - func (db DbWrapper) GetLastTimeUs() (int64, error) { 12 + func (db *DB) GetLastTimeUs() (int64, error) { 17 13 var lastTimeUs int64 18 14 row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`) 19 15 err := row.Scan(&lastTimeUs)
+27 -97
appview/ingester.go
··· 35 35 ) 36 36 37 37 type Ingester struct { 38 - Db db.DbWrapper 38 + Db *db.DB 39 39 Enforcer *rbac.Enforcer 40 40 IdResolver *idresolver.Resolver 41 41 Cache *cache.Cache ··· 270 270 evidences = append(evidences, uri) 271 271 } 272 272 273 - ddb, ok := i.Db.Execer.(*db.DB) 274 - if !ok { 275 - return fmt.Errorf("failed to ingest vouch record, invalid db cast") 276 - } 277 - 278 - tx, txErr := ddb.Begin() 273 + tx, txErr := i.Db.Begin() 279 274 if txErr != nil { 280 275 return fmt.Errorf("failed to start transaction: %w", txErr) 281 276 } ··· 521 516 PreferredHandle: preferredHandle, 522 517 } 523 518 524 - ddb, ok := i.Db.Execer.(*db.DB) 525 - if !ok { 526 - return fmt.Errorf("failed to index profile record, invalid db cast") 527 - } 528 - 529 - tx, err := ddb.Begin() 519 + tx, err := i.Db.Begin() 530 520 if err != nil { 531 521 return fmt.Errorf("failed to start transaction") 532 522 } ··· 593 583 return err 594 584 } 595 585 596 - ddb, ok := i.Db.Execer.(*db.DB) 597 - if !ok { 598 - return fmt.Errorf("invalid db cast") 599 - } 600 - 601 - err = db.AddSpindleMember(ddb, models.SpindleMember{ 586 + err = db.AddSpindleMember(i.Db, models.SpindleMember{ 602 587 Did: syntax.DID(did), 603 588 Rkey: e.Commit.RKey, 604 589 Instance: record.Instance, ··· 617 602 case jmodels.CommitOperationDelete: 618 603 rkey := e.Commit.RKey 619 604 620 - ddb, ok := i.Db.Execer.(*db.DB) 621 - if !ok { 622 - return fmt.Errorf("failed to index profile record, invalid db cast") 623 - } 624 - 625 605 // get record from db first 626 606 members, err := db.GetSpindleMembers( 627 - ddb, 607 + i.Db, 628 608 orm.FilterEq("did", did), 629 609 orm.FilterEq("rkey", rkey), 630 610 ) ··· 633 613 } 634 614 member := members[0] 635 615 636 - tx, err := ddb.Begin() 616 + tx, err := i.Db.Begin() 637 617 if err != nil { 638 618 return fmt.Errorf("failed to start txn: %w", err) 639 619 } ··· 686 666 687 667 instance := e.Commit.RKey 688 668 689 - ddb, ok := i.Db.Execer.(*db.DB) 690 - if !ok { 691 - return fmt.Errorf("failed to index profile record, invalid db cast") 692 - } 693 - 694 - err := db.AddSpindle(ddb, models.Spindle{ 669 + err := db.AddSpindle(i.Db, models.Spindle{ 695 670 Owner: syntax.DID(did), 696 671 Instance: instance, 697 672 }) ··· 710 685 return err 711 686 } 712 687 713 - _, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did) 688 + _, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did) 714 689 if err != nil { 715 690 return fmt.Errorf("failed to mark verified: %w", err) 716 691 } ··· 720 695 case jmodels.CommitOperationDelete: 721 696 instance := e.Commit.RKey 722 697 723 - ddb, ok := i.Db.Execer.(*db.DB) 724 - if !ok { 725 - return fmt.Errorf("failed to index profile record, invalid db cast") 726 - } 727 - 728 698 // get record from db first 729 699 spindles, err := db.GetSpindles( 730 700 ctx, 731 - ddb, 701 + i.Db, 732 702 orm.FilterEq("owner", did), 733 703 orm.FilterEq("instance", instance), 734 704 ) ··· 737 707 } 738 708 spindle := spindles[0] 739 709 740 - tx, err := ddb.Begin() 710 + tx, err := i.Db.Begin() 741 711 if err != nil { 742 712 return err 743 713 } ··· 794 764 795 765 l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 796 766 l.Info("ingesting record") 797 - 798 - ddb, ok := i.Db.Execer.(*db.DB) 799 - if !ok { 800 - return fmt.Errorf("failed to index string record, invalid db cast") 801 - } 802 767 803 768 switch e.Commit.Operation { 804 769 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: ··· 817 782 return err 818 783 } 819 784 820 - if err = db.AddString(ddb, string); err != nil { 785 + if err = db.AddString(i.Db, string); err != nil { 821 786 l.Error("failed to add string", "err", err) 822 787 return err 823 788 } ··· 826 791 827 792 case jmodels.CommitOperationDelete: 828 793 if err := db.DeleteString( 829 - ddb, 794 + i.Db, 830 795 orm.FilterEq("did", did), 831 796 orm.FilterEq("rkey", rkey), 832 797 ); err != nil { ··· 911 876 912 877 domain := e.Commit.RKey 913 878 914 - ddb, ok := i.Db.Execer.(*db.DB) 915 - if !ok { 916 - return fmt.Errorf("failed to index profile record, invalid db cast") 917 - } 918 - 919 - err := db.AddKnot(ddb, domain, did) 879 + err := db.AddKnot(i.Db, domain, did) 920 880 if err != nil { 921 881 l.Error("failed to add knot to db", "err", err, "domain", domain) 922 882 return err ··· 934 894 return err 935 895 } 936 896 937 - err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did) 897 + err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did) 938 898 if err != nil { 939 899 return fmt.Errorf("failed to mark verified: %w", err) 940 900 } ··· 944 904 case jmodels.CommitOperationDelete: 945 905 domain := e.Commit.RKey 946 906 947 - ddb, ok := i.Db.Execer.(*db.DB) 948 - if !ok { 949 - return fmt.Errorf("failed to index knot record, invalid db cast") 950 - } 951 - 952 907 // get record from db first 953 908 registrations, err := db.GetRegistrations( 954 - ddb, 909 + i.Db, 955 910 orm.FilterEq("domain", domain), 956 911 orm.FilterEq("did", did), 957 912 ) ··· 963 918 } 964 919 registration := registrations[0] 965 920 966 - tx, err := ddb.Begin() 921 + tx, err := i.Db.Begin() 967 922 if err != nil { 968 923 return err 969 924 } ··· 1010 965 l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1011 966 l.Info("ingesting record") 1012 967 1013 - ddb, ok := i.Db.Execer.(*db.DB) 1014 - if !ok { 1015 - return fmt.Errorf("failed to index issue record, invalid db cast") 1016 - } 1017 - 1018 968 switch e.Commit.Operation { 1019 969 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1020 970 raw := json.RawMessage(e.Commit.Record) ··· 1044 994 } 1045 995 } 1046 996 1047 - tx, err := ddb.BeginTx(ctx, nil) 997 + tx, err := i.Db.BeginTx(ctx, nil) 1048 998 if err != nil { 1049 999 l.Error("failed to begin transaction", "err", err) 1050 1000 return err ··· 1066 1016 return nil 1067 1017 1068 1018 case jmodels.CommitOperationDelete: 1069 - tx, err := ddb.BeginTx(ctx, nil) 1019 + tx, err := i.Db.BeginTx(ctx, nil) 1070 1020 if err != nil { 1071 1021 l.Error("failed to begin transaction", "err", err) 1072 1022 return err ··· 1101 1051 l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1102 1052 l.Info("ingesting record") 1103 1053 1104 - ddb, ok := i.Db.Execer.(*db.DB) 1105 - if !ok { 1106 - return fmt.Errorf("failed to index pull record, invalid db cast") 1107 - } 1108 - 1109 1054 switch e.Commit.Operation { 1110 1055 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1111 1056 raw := json.RawMessage(e.Commit.Record) ··· 1188 1133 return fmt.Errorf("failed to validate pull: %w", err) 1189 1134 } 1190 1135 1191 - tx, err := ddb.BeginTx(ctx, nil) 1136 + tx, err := i.Db.BeginTx(ctx, nil) 1192 1137 if err != nil { 1193 1138 l.Error("failed to begin transaction", "err", err) 1194 1139 return err ··· 1210 1155 return nil 1211 1156 1212 1157 case jmodels.CommitOperationDelete: 1213 - tx, err := ddb.BeginTx(ctx, nil) 1158 + tx, err := i.Db.BeginTx(ctx, nil) 1214 1159 if err != nil { 1215 1160 l.Error("failed to begin transaction", "err", err) 1216 1161 return err ··· 1245 1190 l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1246 1191 l.Info("ingesting record") 1247 1192 1248 - ddb, ok := i.Db.Execer.(*db.DB) 1249 - if !ok { 1250 - return fmt.Errorf("failed to index issue comment record, invalid db cast") 1251 - } 1252 - 1253 1193 switch e.Commit.Operation { 1254 1194 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: 1255 1195 raw := json.RawMessage(e.Commit.Record) ··· 1268 1208 return fmt.Errorf("failed to validate comment: %w", err) 1269 1209 } 1270 1210 1271 - tx, err := ddb.Begin() 1211 + tx, err := i.Db.Begin() 1272 1212 if err != nil { 1273 1213 return fmt.Errorf("failed to start transaction: %w", err) 1274 1214 } ··· 1283 1223 1284 1224 case jmodels.CommitOperationDelete: 1285 1225 if err := db.DeleteIssueComments( 1286 - ddb, 1226 + i.Db, 1287 1227 orm.FilterEq("did", did), 1288 1228 orm.FilterEq("rkey", rkey), 1289 1229 ); err != nil { ··· 1304 1244 1305 1245 l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1306 1246 l.Info("ingesting record") 1307 - 1308 - ddb, ok := i.Db.Execer.(*db.DB) 1309 - if !ok { 1310 - return fmt.Errorf("failed to index label definition, invalid db cast") 1311 - } 1312 1247 1313 1248 switch e.Commit.Operation { 1314 1249 case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate: ··· 1328 1263 return fmt.Errorf("failed to validate labeldef: %w", err) 1329 1264 } 1330 1265 1331 - _, err = db.AddLabelDefinition(ddb, def) 1266 + _, err = db.AddLabelDefinition(i.Db, def) 1332 1267 if err != nil { 1333 1268 return fmt.Errorf("failed to create labeldef: %w", err) 1334 1269 } ··· 1337 1272 1338 1273 case jmodels.CommitOperationDelete: 1339 1274 if err := db.DeleteLabelDefinition( 1340 - ddb, 1275 + i.Db, 1341 1276 orm.FilterEq("did", did), 1342 1277 orm.FilterEq("rkey", rkey), 1343 1278 ); err != nil { ··· 1359 1294 l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey) 1360 1295 l.Info("ingesting record") 1361 1296 1362 - ddb, ok := i.Db.Execer.(*db.DB) 1363 - if !ok { 1364 - return fmt.Errorf("failed to index label op, invalid db cast") 1365 - } 1366 - 1367 1297 switch e.Commit.Operation { 1368 1298 case jmodels.CommitOperationCreate: 1369 1299 raw := json.RawMessage(e.Commit.Record) ··· 1379 1309 var repo *models.Repo 1380 1310 switch collection { 1381 1311 case tangled.RepoIssueNSID: 1382 - i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject)) 1312 + i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject)) 1383 1313 if err != nil || len(i) != 1 { 1384 1314 return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i)) 1385 1315 } ··· 1388 1318 return fmt.Errorf("unsupported label subject: %s", collection) 1389 1319 } 1390 1320 1391 - actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels)) 1321 + actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels)) 1392 1322 if err != nil { 1393 1323 return fmt.Errorf("failed to build label application ctx: %w", err) 1394 1324 } ··· 1405 1335 } 1406 1336 } 1407 1337 1408 - tx, err := ddb.Begin() 1338 + tx, err := i.Db.Begin() 1409 1339 if err != nil { 1410 1340 return err 1411 1341 }
+1 -2
appview/state/state.go
··· 118 118 119 119 mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver")) 120 120 121 - wrapper := db.DbWrapper{Execer: d} 122 121 jc, err := jetstream.NewJetstreamClient( 123 122 config.Jetstream.Endpoint, 124 123 "appview", ··· 142 141 }, 143 142 nil, 144 143 tlog.SubLogger(logger, "jetstream"), 145 - wrapper, 144 + d, 146 145 false, 147 146 148 147 // in-memory filter is inapplicable to appview so