Lewis: May this revision serve well! lewis@tangled.org
+30
-105
Diff
round #8
+2
-6
appview/db/jetstream.go
+2
-6
appview/db/jetstream.go
···
1
1
package db
2
2
3
-
type DbWrapper struct {
4
-
Execer
5
-
}
6
-
7
-
func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error {
3
+
func (db *DB) SaveLastTimeUs(lastTimeUs int64) error {
8
4
_, err := db.Exec(`
9
5
insert into _jetstream (id, last_time_us)
10
6
values (1, ?)
···
13
9
return err
14
10
}
15
11
16
-
func (db DbWrapper) GetLastTimeUs() (int64, error) {
12
+
func (db *DB) GetLastTimeUs() (int64, error) {
17
13
var lastTimeUs int64
18
14
row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`)
19
15
err := row.Scan(&lastTimeUs)
+27
-97
appview/ingester.go
+27
-97
appview/ingester.go
···
35
35
)
36
36
37
37
type Ingester struct {
38
-
Db db.DbWrapper
38
+
Db *db.DB
39
39
Enforcer *rbac.Enforcer
40
40
IdResolver *idresolver.Resolver
41
41
Cache *cache.Cache
···
270
270
evidences = append(evidences, uri)
271
271
}
272
272
273
-
ddb, ok := i.Db.Execer.(*db.DB)
274
-
if !ok {
275
-
return fmt.Errorf("failed to ingest vouch record, invalid db cast")
276
-
}
277
-
278
-
tx, txErr := ddb.Begin()
273
+
tx, txErr := i.Db.Begin()
279
274
if txErr != nil {
280
275
return fmt.Errorf("failed to start transaction: %w", txErr)
281
276
}
···
521
516
PreferredHandle: preferredHandle,
522
517
}
523
518
524
-
ddb, ok := i.Db.Execer.(*db.DB)
525
-
if !ok {
526
-
return fmt.Errorf("failed to index profile record, invalid db cast")
527
-
}
528
-
529
-
tx, err := ddb.Begin()
519
+
tx, err := i.Db.Begin()
530
520
if err != nil {
531
521
return fmt.Errorf("failed to start transaction")
532
522
}
···
593
583
return err
594
584
}
595
585
596
-
ddb, ok := i.Db.Execer.(*db.DB)
597
-
if !ok {
598
-
return fmt.Errorf("invalid db cast")
599
-
}
600
-
601
-
err = db.AddSpindleMember(ddb, models.SpindleMember{
586
+
err = db.AddSpindleMember(i.Db, models.SpindleMember{
602
587
Did: syntax.DID(did),
603
588
Rkey: e.Commit.RKey,
604
589
Instance: record.Instance,
···
617
602
case jmodels.CommitOperationDelete:
618
603
rkey := e.Commit.RKey
619
604
620
-
ddb, ok := i.Db.Execer.(*db.DB)
621
-
if !ok {
622
-
return fmt.Errorf("failed to index profile record, invalid db cast")
623
-
}
624
-
625
605
// get record from db first
626
606
members, err := db.GetSpindleMembers(
627
-
ddb,
607
+
i.Db,
628
608
orm.FilterEq("did", did),
629
609
orm.FilterEq("rkey", rkey),
630
610
)
···
633
613
}
634
614
member := members[0]
635
615
636
-
tx, err := ddb.Begin()
616
+
tx, err := i.Db.Begin()
637
617
if err != nil {
638
618
return fmt.Errorf("failed to start txn: %w", err)
639
619
}
···
686
666
687
667
instance := e.Commit.RKey
688
668
689
-
ddb, ok := i.Db.Execer.(*db.DB)
690
-
if !ok {
691
-
return fmt.Errorf("failed to index profile record, invalid db cast")
692
-
}
693
-
694
-
err := db.AddSpindle(ddb, models.Spindle{
669
+
err := db.AddSpindle(i.Db, models.Spindle{
695
670
Owner: syntax.DID(did),
696
671
Instance: instance,
697
672
})
···
710
685
return err
711
686
}
712
687
713
-
_, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
688
+
_, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
714
689
if err != nil {
715
690
return fmt.Errorf("failed to mark verified: %w", err)
716
691
}
···
720
695
case jmodels.CommitOperationDelete:
721
696
instance := e.Commit.RKey
722
697
723
-
ddb, ok := i.Db.Execer.(*db.DB)
724
-
if !ok {
725
-
return fmt.Errorf("failed to index profile record, invalid db cast")
726
-
}
727
-
728
698
// get record from db first
729
699
spindles, err := db.GetSpindles(
730
700
ctx,
731
-
ddb,
701
+
i.Db,
732
702
orm.FilterEq("owner", did),
733
703
orm.FilterEq("instance", instance),
734
704
)
···
737
707
}
738
708
spindle := spindles[0]
739
709
740
-
tx, err := ddb.Begin()
710
+
tx, err := i.Db.Begin()
741
711
if err != nil {
742
712
return err
743
713
}
···
795
765
l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
796
766
l.Info("ingesting record")
797
767
798
-
ddb, ok := i.Db.Execer.(*db.DB)
799
-
if !ok {
800
-
return fmt.Errorf("failed to index string record, invalid db cast")
801
-
}
802
-
803
768
switch e.Commit.Operation {
804
769
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
805
770
raw := json.RawMessage(e.Commit.Record)
···
817
782
return err
818
783
}
819
784
820
-
if err = db.AddString(ddb, string); err != nil {
785
+
if err = db.AddString(i.Db, string); err != nil {
821
786
l.Error("failed to add string", "err", err)
822
787
return err
823
788
}
···
826
791
827
792
case jmodels.CommitOperationDelete:
828
793
if err := db.DeleteString(
829
-
ddb,
794
+
i.Db,
830
795
orm.FilterEq("did", did),
831
796
orm.FilterEq("rkey", rkey),
832
797
); err != nil {
···
911
876
912
877
domain := e.Commit.RKey
913
878
914
-
ddb, ok := i.Db.Execer.(*db.DB)
915
-
if !ok {
916
-
return fmt.Errorf("failed to index profile record, invalid db cast")
917
-
}
918
-
919
-
err := db.AddKnot(ddb, domain, did)
879
+
err := db.AddKnot(i.Db, domain, did)
920
880
if err != nil {
921
881
l.Error("failed to add knot to db", "err", err, "domain", domain)
922
882
return err
···
934
894
return err
935
895
}
936
896
937
-
err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
897
+
err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
938
898
if err != nil {
939
899
return fmt.Errorf("failed to mark verified: %w", err)
940
900
}
···
944
904
case jmodels.CommitOperationDelete:
945
905
domain := e.Commit.RKey
946
906
947
-
ddb, ok := i.Db.Execer.(*db.DB)
948
-
if !ok {
949
-
return fmt.Errorf("failed to index knot record, invalid db cast")
950
-
}
951
-
952
907
// get record from db first
953
908
registrations, err := db.GetRegistrations(
954
-
ddb,
909
+
i.Db,
955
910
orm.FilterEq("domain", domain),
956
911
orm.FilterEq("did", did),
957
912
)
···
963
918
}
964
919
registration := registrations[0]
965
920
966
-
tx, err := ddb.Begin()
921
+
tx, err := i.Db.Begin()
967
922
if err != nil {
968
923
return err
969
924
}
···
1010
965
l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1011
966
l.Info("ingesting record")
1012
967
1013
-
ddb, ok := i.Db.Execer.(*db.DB)
1014
-
if !ok {
1015
-
return fmt.Errorf("failed to index issue record, invalid db cast")
1016
-
}
1017
-
1018
968
switch e.Commit.Operation {
1019
969
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1020
970
raw := json.RawMessage(e.Commit.Record)
···
1044
994
}
1045
995
}
1046
996
1047
-
tx, err := ddb.BeginTx(ctx, nil)
997
+
tx, err := i.Db.BeginTx(ctx, nil)
1048
998
if err != nil {
1049
999
l.Error("failed to begin transaction", "err", err)
1050
1000
return err
···
1066
1016
return nil
1067
1017
1068
1018
case jmodels.CommitOperationDelete:
1069
-
tx, err := ddb.BeginTx(ctx, nil)
1019
+
tx, err := i.Db.BeginTx(ctx, nil)
1070
1020
if err != nil {
1071
1021
l.Error("failed to begin transaction", "err", err)
1072
1022
return err
···
1101
1051
l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1102
1052
l.Info("ingesting record")
1103
1053
1104
-
ddb, ok := i.Db.Execer.(*db.DB)
1105
-
if !ok {
1106
-
return fmt.Errorf("failed to index pull record, invalid db cast")
1107
-
}
1108
-
1109
1054
switch e.Commit.Operation {
1110
1055
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1111
1056
raw := json.RawMessage(e.Commit.Record)
···
1188
1133
return fmt.Errorf("failed to validate pull: %w", err)
1189
1134
}
1190
1135
1191
-
tx, err := ddb.BeginTx(ctx, nil)
1136
+
tx, err := i.Db.BeginTx(ctx, nil)
1192
1137
if err != nil {
1193
1138
l.Error("failed to begin transaction", "err", err)
1194
1139
return err
···
1210
1155
return nil
1211
1156
1212
1157
case jmodels.CommitOperationDelete:
1213
-
tx, err := ddb.BeginTx(ctx, nil)
1158
+
tx, err := i.Db.BeginTx(ctx, nil)
1214
1159
if err != nil {
1215
1160
l.Error("failed to begin transaction", "err", err)
1216
1161
return err
···
1245
1190
l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1246
1191
l.Info("ingesting record")
1247
1192
1248
-
ddb, ok := i.Db.Execer.(*db.DB)
1249
-
if !ok {
1250
-
return fmt.Errorf("failed to index issue comment record, invalid db cast")
1251
-
}
1252
-
1253
1193
switch e.Commit.Operation {
1254
1194
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1255
1195
raw := json.RawMessage(e.Commit.Record)
···
1268
1208
return fmt.Errorf("failed to validate comment: %w", err)
1269
1209
}
1270
1210
1271
-
tx, err := ddb.Begin()
1211
+
tx, err := i.Db.Begin()
1272
1212
if err != nil {
1273
1213
return fmt.Errorf("failed to start transaction: %w", err)
1274
1214
}
···
1283
1223
1284
1224
case jmodels.CommitOperationDelete:
1285
1225
if err := db.DeleteIssueComments(
1286
-
ddb,
1226
+
i.Db,
1287
1227
orm.FilterEq("did", did),
1288
1228
orm.FilterEq("rkey", rkey),
1289
1229
); err != nil {
···
1305
1245
l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1306
1246
l.Info("ingesting record")
1307
1247
1308
-
ddb, ok := i.Db.Execer.(*db.DB)
1309
-
if !ok {
1310
-
return fmt.Errorf("failed to index label definition, invalid db cast")
1311
-
}
1312
-
1313
1248
switch e.Commit.Operation {
1314
1249
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1315
1250
raw := json.RawMessage(e.Commit.Record)
···
1328
1263
return fmt.Errorf("failed to validate labeldef: %w", err)
1329
1264
}
1330
1265
1331
-
_, err = db.AddLabelDefinition(ddb, def)
1266
+
_, err = db.AddLabelDefinition(i.Db, def)
1332
1267
if err != nil {
1333
1268
return fmt.Errorf("failed to create labeldef: %w", err)
1334
1269
}
···
1337
1272
1338
1273
case jmodels.CommitOperationDelete:
1339
1274
if err := db.DeleteLabelDefinition(
1340
-
ddb,
1275
+
i.Db,
1341
1276
orm.FilterEq("did", did),
1342
1277
orm.FilterEq("rkey", rkey),
1343
1278
); err != nil {
···
1359
1294
l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1360
1295
l.Info("ingesting record")
1361
1296
1362
-
ddb, ok := i.Db.Execer.(*db.DB)
1363
-
if !ok {
1364
-
return fmt.Errorf("failed to index label op, invalid db cast")
1365
-
}
1366
-
1367
1297
switch e.Commit.Operation {
1368
1298
case jmodels.CommitOperationCreate:
1369
1299
raw := json.RawMessage(e.Commit.Record)
···
1379
1309
var repo *models.Repo
1380
1310
switch collection {
1381
1311
case tangled.RepoIssueNSID:
1382
-
i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1312
+
i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1383
1313
if err != nil || len(i) != 1 {
1384
1314
return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1385
1315
}
···
1388
1318
return fmt.Errorf("unsupported label subject: %s", collection)
1389
1319
}
1390
1320
1391
-
actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1321
+
actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1392
1322
if err != nil {
1393
1323
return fmt.Errorf("failed to build label application ctx: %w", err)
1394
1324
}
···
1405
1335
}
1406
1336
}
1407
1337
1408
-
tx, err := ddb.Begin()
1338
+
tx, err := i.Db.Begin()
1409
1339
if err != nil {
1410
1340
return err
1411
1341
}
+1
-2
appview/state/state.go
+1
-2
appview/state/state.go
···
118
118
119
119
mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
120
121
-
wrapper := db.DbWrapper{Execer: d}
122
121
jc, err := jetstream.NewJetstreamClient(
123
122
config.Jetstream.Endpoint,
124
123
"appview",
···
142
141
},
143
142
nil,
144
143
tlog.SubLogger(logger, "jetstream"),
145
-
wrapper,
144
+
d,
146
145
false,
147
146
148
147
// in-memory filter is inapplicable to appview so
History
9 rounds
0 comments
oyster.cafe
submitted
#8
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- api/tangled/cbor_gen.go:866
- api/tangled/feedstar.go:5
- api/tangled/gitrefUpdate.go:29
- api/tangled/repocollaborator.go:19
- api/tangled/repoissue.go:22
- api/tangled/repopull.go:39
- api/tangled/tangledrepo.go:24
- cmd/cborgen/cborgen.go:17
- knotserver/xrpc/merge.go:118
- lexicons/feed/star.json:10
- lexicons/git/refUpdate.json:11
- lexicons/issue/issue.json:9
- lexicons/pulls/pull.json:65
- lexicons/repo/collaborator.json:11
- lexicons/repo/repo.json:6
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>