Lewis: May this revision serve well! lewis@tangled.org
+29
-108
Diff
round #0
+2
-6
appview/db/jetstream.go
+2
-6
appview/db/jetstream.go
···
1
1
package db
2
2
3
-
type DbWrapper struct {
4
-
Execer
5
-
}
6
-
7
-
func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error {
3
+
func (db *DB) SaveLastTimeUs(lastTimeUs int64) error {
8
4
_, err := db.Exec(`
9
5
insert into _jetstream (id, last_time_us)
10
6
values (1, ?)
···
13
9
return err
14
10
}
15
11
16
-
func (db DbWrapper) GetLastTimeUs() (int64, error) {
12
+
func (db *DB) GetLastTimeUs() (int64, error) {
17
13
var lastTimeUs int64
18
14
row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`)
19
15
err := row.Scan(&lastTimeUs)
+26
-100
appview/ingester.go
+26
-100
appview/ingester.go
···
35
35
)
36
36
37
37
type Ingester struct {
38
-
Db db.DbWrapper
38
+
Db *db.DB
39
39
Enforcer *rbac.Enforcer
40
40
IdResolver *idresolver.Resolver
41
41
Cache *cache.Cache
···
494
494
PreferredHandle: preferredHandle,
495
495
}
496
496
497
-
ddb, ok := i.Db.Execer.(*db.DB)
498
-
if !ok {
499
-
return fmt.Errorf("failed to index profile record, invalid db cast")
500
-
}
501
-
502
-
tx, err := ddb.Begin()
497
+
tx, err := i.Db.Begin()
503
498
if err != nil {
504
499
return fmt.Errorf("failed to start transaction")
505
500
}
···
566
561
return err
567
562
}
568
563
569
-
ddb, ok := i.Db.Execer.(*db.DB)
570
-
if !ok {
571
-
return fmt.Errorf("invalid db cast")
572
-
}
573
-
574
-
err = db.AddSpindleMember(ddb, models.SpindleMember{
564
+
err = db.AddSpindleMember(i.Db, models.SpindleMember{
575
565
Did: syntax.DID(did),
576
566
Rkey: e.Commit.RKey,
577
567
Instance: record.Instance,
···
590
580
case jmodels.CommitOperationDelete:
591
581
rkey := e.Commit.RKey
592
582
593
-
ddb, ok := i.Db.Execer.(*db.DB)
594
-
if !ok {
595
-
return fmt.Errorf("failed to index profile record, invalid db cast")
596
-
}
597
-
598
583
// get record from db first
599
584
members, err := db.GetSpindleMembers(
600
-
ddb,
585
+
i.Db,
601
586
orm.FilterEq("did", did),
602
587
orm.FilterEq("rkey", rkey),
603
588
)
···
606
591
}
607
592
member := members[0]
608
593
609
-
tx, err := ddb.Begin()
594
+
tx, err := i.Db.Begin()
610
595
if err != nil {
611
596
return fmt.Errorf("failed to start txn: %w", err)
612
597
}
···
659
644
660
645
instance := e.Commit.RKey
661
646
662
-
ddb, ok := i.Db.Execer.(*db.DB)
663
-
if !ok {
664
-
return fmt.Errorf("failed to index profile record, invalid db cast")
665
-
}
666
-
667
-
err := db.AddSpindle(ddb, models.Spindle{
647
+
err := db.AddSpindle(i.Db, models.Spindle{
668
648
Owner: syntax.DID(did),
669
649
Instance: instance,
670
650
})
···
683
663
return err
684
664
}
685
665
686
-
_, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
666
+
_, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
687
667
if err != nil {
688
668
return fmt.Errorf("failed to mark verified: %w", err)
689
669
}
···
693
673
case jmodels.CommitOperationDelete:
694
674
instance := e.Commit.RKey
695
675
696
-
ddb, ok := i.Db.Execer.(*db.DB)
697
-
if !ok {
698
-
return fmt.Errorf("failed to index profile record, invalid db cast")
699
-
}
700
-
701
676
// get record from db first
702
677
spindles, err := db.GetSpindles(
703
678
ctx,
704
-
ddb,
679
+
i.Db,
705
680
orm.FilterEq("owner", did),
706
681
orm.FilterEq("instance", instance),
707
682
)
···
710
685
}
711
686
spindle := spindles[0]
712
687
713
-
tx, err := ddb.Begin()
688
+
tx, err := i.Db.Begin()
714
689
if err != nil {
715
690
return err
716
691
}
···
768
743
l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
769
744
l.Info("ingesting record")
770
745
771
-
ddb, ok := i.Db.Execer.(*db.DB)
772
-
if !ok {
773
-
return fmt.Errorf("failed to index string record, invalid db cast")
774
-
}
775
-
776
746
switch e.Commit.Operation {
777
747
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
778
748
raw := json.RawMessage(e.Commit.Record)
···
790
760
return err
791
761
}
792
762
793
-
if err = db.AddString(ddb, string); err != nil {
763
+
if err = db.AddString(i.Db, string); err != nil {
794
764
l.Error("failed to add string", "err", err)
795
765
return err
796
766
}
···
799
769
800
770
case jmodels.CommitOperationDelete:
801
771
if err := db.DeleteString(
802
-
ddb,
772
+
i.Db,
803
773
orm.FilterEq("did", did),
804
774
orm.FilterEq("rkey", rkey),
805
775
); err != nil {
···
884
854
885
855
domain := e.Commit.RKey
886
856
887
-
ddb, ok := i.Db.Execer.(*db.DB)
888
-
if !ok {
889
-
return fmt.Errorf("failed to index profile record, invalid db cast")
890
-
}
891
-
892
-
err := db.AddKnot(ddb, domain, did)
857
+
err := db.AddKnot(i.Db, domain, did)
893
858
if err != nil {
894
859
l.Error("failed to add knot to db", "err", err, "domain", domain)
895
860
return err
···
907
872
return err
908
873
}
909
874
910
-
err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
875
+
err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
911
876
if err != nil {
912
877
return fmt.Errorf("failed to mark verified: %w", err)
913
878
}
···
917
882
case jmodels.CommitOperationDelete:
918
883
domain := e.Commit.RKey
919
884
920
-
ddb, ok := i.Db.Execer.(*db.DB)
921
-
if !ok {
922
-
return fmt.Errorf("failed to index knot record, invalid db cast")
923
-
}
924
-
925
885
// get record from db first
926
886
registrations, err := db.GetRegistrations(
927
-
ddb,
887
+
i.Db,
928
888
orm.FilterEq("domain", domain),
929
889
orm.FilterEq("did", did),
930
890
)
···
936
896
}
937
897
registration := registrations[0]
938
898
939
-
tx, err := ddb.Begin()
899
+
tx, err := i.Db.Begin()
940
900
if err != nil {
941
901
return err
942
902
}
···
983
943
l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
984
944
l.Info("ingesting record")
985
945
986
-
ddb, ok := i.Db.Execer.(*db.DB)
987
-
if !ok {
988
-
return fmt.Errorf("failed to index issue record, invalid db cast")
989
-
}
990
-
991
946
switch e.Commit.Operation {
992
947
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
993
948
raw := json.RawMessage(e.Commit.Record)
···
1008
963
return fmt.Errorf("failed to validate issue: %w", err)
1009
964
}
1010
965
1011
-
if record.Repo != nil {
1012
-
repo, repoErr := db.GetRepoByAtUri(i.Db, *record.Repo)
1013
-
if repoErr == nil && repo.RepoDid != "" {
1014
-
if enqErr := db.EnqueuePdsRewrite(i.Db, did, repo.RepoDid, tangled.RepoIssueNSID, rkey, *record.Repo); enqErr != nil {
1015
-
l.Warn("failed to enqueue PDS rewrite for issue", "err", enqErr, "did", did, "repoDid", repo.RepoDid)
1016
-
}
1017
-
}
1018
-
}
1019
-
1020
-
tx, err := ddb.BeginTx(ctx, nil)
966
+
tx, err := i.Db.BeginTx(ctx, nil)
1021
967
if err != nil {
1022
968
l.Error("failed to begin transaction", "err", err)
1023
969
return err
···
1039
985
return nil
1040
986
1041
987
case jmodels.CommitOperationDelete:
1042
-
tx, err := ddb.BeginTx(ctx, nil)
988
+
tx, err := i.Db.BeginTx(ctx, nil)
1043
989
if err != nil {
1044
990
l.Error("failed to begin transaction", "err", err)
1045
991
return err
···
1074
1020
l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1075
1021
l.Info("ingesting record")
1076
1022
1077
-
ddb, ok := i.Db.Execer.(*db.DB)
1078
-
if !ok {
1079
-
return fmt.Errorf("failed to index pull record, invalid db cast")
1080
-
}
1081
-
1082
1023
switch e.Commit.Operation {
1083
1024
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1084
1025
raw := json.RawMessage(e.Commit.Record)
···
1161
1102
return fmt.Errorf("failed to validate pull: %w", err)
1162
1103
}
1163
1104
1164
-
tx, err := ddb.BeginTx(ctx, nil)
1105
+
tx, err := i.Db.BeginTx(ctx, nil)
1165
1106
if err != nil {
1166
1107
l.Error("failed to begin transaction", "err", err)
1167
1108
return err
···
1183
1124
return nil
1184
1125
1185
1126
case jmodels.CommitOperationDelete:
1186
-
tx, err := ddb.BeginTx(ctx, nil)
1127
+
tx, err := i.Db.BeginTx(ctx, nil)
1187
1128
if err != nil {
1188
1129
l.Error("failed to begin transaction", "err", err)
1189
1130
return err
···
1218
1159
l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1219
1160
l.Info("ingesting record")
1220
1161
1221
-
ddb, ok := i.Db.Execer.(*db.DB)
1222
-
if !ok {
1223
-
return fmt.Errorf("failed to index issue comment record, invalid db cast")
1224
-
}
1225
-
1226
1162
switch e.Commit.Operation {
1227
1163
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1228
1164
raw := json.RawMessage(e.Commit.Record)
···
1241
1177
return fmt.Errorf("failed to validate comment: %w", err)
1242
1178
}
1243
1179
1244
-
tx, err := ddb.Begin()
1180
+
tx, err := i.Db.Begin()
1245
1181
if err != nil {
1246
1182
return fmt.Errorf("failed to start transaction: %w", err)
1247
1183
}
···
1256
1192
1257
1193
case jmodels.CommitOperationDelete:
1258
1194
if err := db.DeleteIssueComments(
1259
-
ddb,
1195
+
i.Db,
1260
1196
orm.FilterEq("did", did),
1261
1197
orm.FilterEq("rkey", rkey),
1262
1198
); err != nil {
···
1278
1214
l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1279
1215
l.Info("ingesting record")
1280
1216
1281
-
ddb, ok := i.Db.Execer.(*db.DB)
1282
-
if !ok {
1283
-
return fmt.Errorf("failed to index label definition, invalid db cast")
1284
-
}
1285
-
1286
1217
switch e.Commit.Operation {
1287
1218
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1288
1219
raw := json.RawMessage(e.Commit.Record)
···
1301
1232
return fmt.Errorf("failed to validate labeldef: %w", err)
1302
1233
}
1303
1234
1304
-
_, err = db.AddLabelDefinition(ddb, def)
1235
+
_, err = db.AddLabelDefinition(i.Db, def)
1305
1236
if err != nil {
1306
1237
return fmt.Errorf("failed to create labeldef: %w", err)
1307
1238
}
···
1310
1241
1311
1242
case jmodels.CommitOperationDelete:
1312
1243
if err := db.DeleteLabelDefinition(
1313
-
ddb,
1244
+
i.Db,
1314
1245
orm.FilterEq("did", did),
1315
1246
orm.FilterEq("rkey", rkey),
1316
1247
); err != nil {
···
1332
1263
l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1333
1264
l.Info("ingesting record")
1334
1265
1335
-
ddb, ok := i.Db.Execer.(*db.DB)
1336
-
if !ok {
1337
-
return fmt.Errorf("failed to index label op, invalid db cast")
1338
-
}
1339
-
1340
1266
switch e.Commit.Operation {
1341
1267
case jmodels.CommitOperationCreate:
1342
1268
raw := json.RawMessage(e.Commit.Record)
···
1352
1278
var repo *models.Repo
1353
1279
switch collection {
1354
1280
case tangled.RepoIssueNSID:
1355
-
i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1281
+
i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1356
1282
if err != nil || len(i) != 1 {
1357
1283
return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1358
1284
}
···
1361
1287
return fmt.Errorf("unsupported label subject: %s", collection)
1362
1288
}
1363
1289
1364
-
actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1290
+
actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1365
1291
if err != nil {
1366
1292
return fmt.Errorf("failed to build label application ctx: %w", err)
1367
1293
}
···
1378
1304
}
1379
1305
}
1380
1306
1381
-
tx, err := ddb.Begin()
1307
+
tx, err := i.Db.Begin()
1382
1308
if err != nil {
1383
1309
return err
1384
1310
}
+1
-2
appview/state/state.go
+1
-2
appview/state/state.go
···
118
118
119
119
mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
120
121
-
wrapper := db.DbWrapper{Execer: d}
122
121
jc, err := jetstream.NewJetstreamClient(
123
122
config.Jetstream.Endpoint,
124
123
"appview",
···
142
141
},
143
142
nil,
144
143
tlog.SubLogger(logger, "jetstream"),
145
-
wrapper,
144
+
d,
146
145
false,
147
146
148
147
// in-memory filter is inapplicable to appview so
History
1 round
0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
no conflicts, ready to merge