Lewis: May this revision serve well! lewis@tangled.org
+29
-99
Diff
round #5
+2
-6
appview/db/jetstream.go
+2
-6
appview/db/jetstream.go
···
1
1
package db
2
2
3
-
type DbWrapper struct {
4
-
Execer
5
-
}
6
-
7
-
func (db DbWrapper) SaveLastTimeUs(lastTimeUs int64) error {
3
+
func (db *DB) SaveLastTimeUs(lastTimeUs int64) error {
8
4
_, err := db.Exec(`
9
5
insert into _jetstream (id, last_time_us)
10
6
values (1, ?)
···
13
9
return err
14
10
}
15
11
16
-
func (db DbWrapper) GetLastTimeUs() (int64, error) {
12
+
func (db *DB) GetLastTimeUs() (int64, error) {
17
13
var lastTimeUs int64
18
14
row := db.QueryRow(`select last_time_us from _jetstream where id = 1;`)
19
15
err := row.Scan(&lastTimeUs)
+26
-91
appview/ingester.go
+26
-91
appview/ingester.go
···
35
35
)
36
36
37
37
type Ingester struct {
38
-
Db db.DbWrapper
38
+
Db *db.DB
39
39
Enforcer *rbac.Enforcer
40
40
IdResolver *idresolver.Resolver
41
41
Cache *cache.Cache
···
494
494
PreferredHandle: preferredHandle,
495
495
}
496
496
497
-
ddb, ok := i.Db.Execer.(*db.DB)
498
-
if !ok {
499
-
return fmt.Errorf("failed to index profile record, invalid db cast")
500
-
}
501
-
502
-
tx, err := ddb.Begin()
497
+
tx, err := i.Db.Begin()
503
498
if err != nil {
504
499
return fmt.Errorf("failed to start transaction")
505
500
}
···
566
561
return err
567
562
}
568
563
569
-
ddb, ok := i.Db.Execer.(*db.DB)
570
-
if !ok {
571
-
return fmt.Errorf("invalid db cast")
572
-
}
573
-
574
-
err = db.AddSpindleMember(ddb, models.SpindleMember{
564
+
err = db.AddSpindleMember(i.Db, models.SpindleMember{
575
565
Did: syntax.DID(did),
576
566
Rkey: e.Commit.RKey,
577
567
Instance: record.Instance,
···
590
580
case jmodels.CommitOperationDelete:
591
581
rkey := e.Commit.RKey
592
582
593
-
ddb, ok := i.Db.Execer.(*db.DB)
594
-
if !ok {
595
-
return fmt.Errorf("failed to index profile record, invalid db cast")
596
-
}
597
-
598
583
// get record from db first
599
584
members, err := db.GetSpindleMembers(
600
-
ddb,
585
+
i.Db,
601
586
orm.FilterEq("did", did),
602
587
orm.FilterEq("rkey", rkey),
603
588
)
···
606
591
}
607
592
member := members[0]
608
593
609
-
tx, err := ddb.Begin()
594
+
tx, err := i.Db.Begin()
610
595
if err != nil {
611
596
return fmt.Errorf("failed to start txn: %w", err)
612
597
}
···
659
644
660
645
instance := e.Commit.RKey
661
646
662
-
ddb, ok := i.Db.Execer.(*db.DB)
663
-
if !ok {
664
-
return fmt.Errorf("failed to index profile record, invalid db cast")
665
-
}
666
-
667
-
err := db.AddSpindle(ddb, models.Spindle{
647
+
err := db.AddSpindle(i.Db, models.Spindle{
668
648
Owner: syntax.DID(did),
669
649
Instance: instance,
670
650
})
···
683
663
return err
684
664
}
685
665
686
-
_, err = serververify.MarkSpindleVerified(ddb, i.Enforcer, instance, did)
666
+
_, err = serververify.MarkSpindleVerified(i.Db, i.Enforcer, instance, did)
687
667
if err != nil {
688
668
return fmt.Errorf("failed to mark verified: %w", err)
689
669
}
···
693
673
case jmodels.CommitOperationDelete:
694
674
instance := e.Commit.RKey
695
675
696
-
ddb, ok := i.Db.Execer.(*db.DB)
697
-
if !ok {
698
-
return fmt.Errorf("failed to index profile record, invalid db cast")
699
-
}
700
-
701
676
// get record from db first
702
677
spindles, err := db.GetSpindles(
703
678
ctx,
704
-
ddb,
679
+
i.Db,
705
680
orm.FilterEq("owner", did),
706
681
orm.FilterEq("instance", instance),
707
682
)
···
710
685
}
711
686
spindle := spindles[0]
712
687
713
-
tx, err := ddb.Begin()
688
+
tx, err := i.Db.Begin()
714
689
if err != nil {
715
690
return err
716
691
}
···
768
743
l := i.Logger.With("handler", "ingestString", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
769
744
l.Info("ingesting record")
770
745
771
-
ddb, ok := i.Db.Execer.(*db.DB)
772
-
if !ok {
773
-
return fmt.Errorf("failed to index string record, invalid db cast")
774
-
}
775
-
776
746
switch e.Commit.Operation {
777
747
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
778
748
raw := json.RawMessage(e.Commit.Record)
···
790
760
return err
791
761
}
792
762
793
-
if err = db.AddString(ddb, string); err != nil {
763
+
if err = db.AddString(i.Db, string); err != nil {
794
764
l.Error("failed to add string", "err", err)
795
765
return err
796
766
}
···
799
769
800
770
case jmodels.CommitOperationDelete:
801
771
if err := db.DeleteString(
802
-
ddb,
772
+
i.Db,
803
773
orm.FilterEq("did", did),
804
774
orm.FilterEq("rkey", rkey),
805
775
); err != nil {
···
884
854
885
855
domain := e.Commit.RKey
886
856
887
-
ddb, ok := i.Db.Execer.(*db.DB)
888
-
if !ok {
889
-
return fmt.Errorf("failed to index profile record, invalid db cast")
890
-
}
891
-
892
-
err := db.AddKnot(ddb, domain, did)
857
+
err := db.AddKnot(i.Db, domain, did)
893
858
if err != nil {
894
859
l.Error("failed to add knot to db", "err", err, "domain", domain)
895
860
return err
···
907
872
return err
908
873
}
909
874
910
-
err = serververify.MarkKnotVerified(ddb, i.Enforcer, domain, did)
875
+
err = serververify.MarkKnotVerified(i.Db, i.Enforcer, domain, did)
911
876
if err != nil {
912
877
return fmt.Errorf("failed to mark verified: %w", err)
913
878
}
···
917
882
case jmodels.CommitOperationDelete:
918
883
domain := e.Commit.RKey
919
884
920
-
ddb, ok := i.Db.Execer.(*db.DB)
921
-
if !ok {
922
-
return fmt.Errorf("failed to index knot record, invalid db cast")
923
-
}
924
-
925
885
// get record from db first
926
886
registrations, err := db.GetRegistrations(
927
-
ddb,
887
+
i.Db,
928
888
orm.FilterEq("domain", domain),
929
889
orm.FilterEq("did", did),
930
890
)
···
936
896
}
937
897
registration := registrations[0]
938
898
939
-
tx, err := ddb.Begin()
899
+
tx, err := i.Db.Begin()
940
900
if err != nil {
941
901
return err
942
902
}
···
983
943
l := i.Logger.With("handler", "ingestIssue", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
984
944
l.Info("ingesting record")
985
945
986
-
ddb, ok := i.Db.Execer.(*db.DB)
987
-
if !ok {
988
-
return fmt.Errorf("failed to index issue record, invalid db cast")
989
-
}
990
-
991
946
switch e.Commit.Operation {
992
947
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
993
948
raw := json.RawMessage(e.Commit.Record)
···
1017
972
}
1018
973
}
1019
974
1020
-
tx, err := ddb.BeginTx(ctx, nil)
975
+
tx, err := i.Db.BeginTx(ctx, nil)
1021
976
if err != nil {
1022
977
l.Error("failed to begin transaction", "err", err)
1023
978
return err
···
1039
994
return nil
1040
995
1041
996
case jmodels.CommitOperationDelete:
1042
-
tx, err := ddb.BeginTx(ctx, nil)
997
+
tx, err := i.Db.BeginTx(ctx, nil)
1043
998
if err != nil {
1044
999
l.Error("failed to begin transaction", "err", err)
1045
1000
return err
···
1074
1029
l := i.Logger.With("handler", "ingestPull", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1075
1030
l.Info("ingesting record")
1076
1031
1077
-
ddb, ok := i.Db.Execer.(*db.DB)
1078
-
if !ok {
1079
-
return fmt.Errorf("failed to index pull record, invalid db cast")
1080
-
}
1081
-
1082
1032
switch e.Commit.Operation {
1083
1033
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1084
1034
raw := json.RawMessage(e.Commit.Record)
···
1161
1111
return fmt.Errorf("failed to validate pull: %w", err)
1162
1112
}
1163
1113
1164
-
tx, err := ddb.BeginTx(ctx, nil)
1114
+
tx, err := i.Db.BeginTx(ctx, nil)
1165
1115
if err != nil {
1166
1116
l.Error("failed to begin transaction", "err", err)
1167
1117
return err
···
1183
1133
return nil
1184
1134
1185
1135
case jmodels.CommitOperationDelete:
1186
-
tx, err := ddb.BeginTx(ctx, nil)
1136
+
tx, err := i.Db.BeginTx(ctx, nil)
1187
1137
if err != nil {
1188
1138
l.Error("failed to begin transaction", "err", err)
1189
1139
return err
···
1218
1168
l := i.Logger.With("handler", "ingestIssueComment", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1219
1169
l.Info("ingesting record")
1220
1170
1221
-
ddb, ok := i.Db.Execer.(*db.DB)
1222
-
if !ok {
1223
-
return fmt.Errorf("failed to index issue comment record, invalid db cast")
1224
-
}
1225
-
1226
1171
switch e.Commit.Operation {
1227
1172
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1228
1173
raw := json.RawMessage(e.Commit.Record)
···
1241
1186
return fmt.Errorf("failed to validate comment: %w", err)
1242
1187
}
1243
1188
1244
-
tx, err := ddb.Begin()
1189
+
tx, err := i.Db.Begin()
1245
1190
if err != nil {
1246
1191
return fmt.Errorf("failed to start transaction: %w", err)
1247
1192
}
···
1256
1201
1257
1202
case jmodels.CommitOperationDelete:
1258
1203
if err := db.DeleteIssueComments(
1259
-
ddb,
1204
+
i.Db,
1260
1205
orm.FilterEq("did", did),
1261
1206
orm.FilterEq("rkey", rkey),
1262
1207
); err != nil {
···
1278
1223
l := i.Logger.With("handler", "ingestLabelDefinition", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1279
1224
l.Info("ingesting record")
1280
1225
1281
-
ddb, ok := i.Db.Execer.(*db.DB)
1282
-
if !ok {
1283
-
return fmt.Errorf("failed to index label definition, invalid db cast")
1284
-
}
1285
-
1286
1226
switch e.Commit.Operation {
1287
1227
case jmodels.CommitOperationCreate, jmodels.CommitOperationUpdate:
1288
1228
raw := json.RawMessage(e.Commit.Record)
···
1301
1241
return fmt.Errorf("failed to validate labeldef: %w", err)
1302
1242
}
1303
1243
1304
-
_, err = db.AddLabelDefinition(ddb, def)
1244
+
_, err = db.AddLabelDefinition(i.Db, def)
1305
1245
if err != nil {
1306
1246
return fmt.Errorf("failed to create labeldef: %w", err)
1307
1247
}
···
1310
1250
1311
1251
case jmodels.CommitOperationDelete:
1312
1252
if err := db.DeleteLabelDefinition(
1313
-
ddb,
1253
+
i.Db,
1314
1254
orm.FilterEq("did", did),
1315
1255
orm.FilterEq("rkey", rkey),
1316
1256
); err != nil {
···
1332
1272
l := i.Logger.With("handler", "ingestLabelOp", "nsid", e.Commit.Collection, "did", did, "rkey", rkey)
1333
1273
l.Info("ingesting record")
1334
1274
1335
-
ddb, ok := i.Db.Execer.(*db.DB)
1336
-
if !ok {
1337
-
return fmt.Errorf("failed to index label op, invalid db cast")
1338
-
}
1339
-
1340
1275
switch e.Commit.Operation {
1341
1276
case jmodels.CommitOperationCreate:
1342
1277
raw := json.RawMessage(e.Commit.Record)
···
1352
1287
var repo *models.Repo
1353
1288
switch collection {
1354
1289
case tangled.RepoIssueNSID:
1355
-
i, err := db.GetIssues(ddb, orm.FilterEq("at_uri", subject))
1290
+
i, err := db.GetIssues(i.Db, orm.FilterEq("at_uri", subject))
1356
1291
if err != nil || len(i) != 1 {
1357
1292
return fmt.Errorf("failed to find subject: %w || subject count %d", err, len(i))
1358
1293
}
···
1361
1296
return fmt.Errorf("unsupported label subject: %s", collection)
1362
1297
}
1363
1298
1364
-
actx, err := db.NewLabelApplicationCtx(ddb, orm.FilterIn("at_uri", repo.Labels))
1299
+
actx, err := db.NewLabelApplicationCtx(i.Db, orm.FilterIn("at_uri", repo.Labels))
1365
1300
if err != nil {
1366
1301
return fmt.Errorf("failed to build label application ctx: %w", err)
1367
1302
}
···
1378
1313
}
1379
1314
}
1380
1315
1381
-
tx, err := ddb.Begin()
1316
+
tx, err := i.Db.Begin()
1382
1317
if err != nil {
1383
1318
return err
1384
1319
}
+1
-2
appview/state/state.go
+1
-2
appview/state/state.go
···
118
118
119
119
mentionsResolver := mentions.New(config, res, d, log.SubLogger(logger, "mentionsResolver"))
120
120
121
-
wrapper := db.DbWrapper{Execer: d}
122
121
jc, err := jetstream.NewJetstreamClient(
123
122
config.Jetstream.Endpoint,
124
123
"appview",
···
142
141
},
143
142
nil,
144
143
tlog.SubLogger(logger, "jetstream"),
145
-
wrapper,
144
+
d,
146
145
false,
147
146
148
147
// in-memory filter is inapplicable to appview so
History
9 rounds
0 comments
oyster.cafe
submitted
#8
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- api/tangled/cbor_gen.go:866
- api/tangled/feedstar.go:5
- api/tangled/gitrefUpdate.go:29
- api/tangled/repocollaborator.go:19
- api/tangled/repoissue.go:22
- api/tangled/repopull.go:39
- api/tangled/tangledrepo.go:24
- cmd/cborgen/cborgen.go:17
- knotserver/xrpc/merge.go:118
- lexicons/feed/star.json:10
- lexicons/git/refUpdate.json:11
- lexicons/issue/issue.json:9
- lexicons/pulls/pull.json:65
- lexicons/repo/collaborator.json:11
- lexicons/repo/repo.json:6
expand 0 comments
oyster.cafe
submitted
#7
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#6
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#5
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#4
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#3
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#2
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#1
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>
expand 0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
appview: drop DbWrapper, move jetstream helpers onto *DB
Lewis: May this revision serve well! <lewis@tangled.org>