Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

knotserver: switch to using official jetstream pkg

+163 -280
+13 -8
go.mod
··· 8 8 github.com/Blank-Xu/sql-adapter v1.1.1 9 9 github.com/bluekeyes/go-gitdiff v0.8.0 10 10 github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20 11 + github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1 11 12 github.com/casbin/casbin/v2 v2.103.0 12 13 github.com/gliderlabs/ssh v0.3.5 13 14 github.com/go-chi/chi/v5 v5.2.0 ··· 35 34 github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect 36 35 github.com/carlmjohnson/versioninfo v0.22.5 // indirect 37 36 github.com/casbin/govaluate v1.3.0 // indirect 38 - github.com/cespare/xxhash/v2 v2.2.0 // indirect 37 + github.com/cespare/xxhash/v2 v2.3.0 // indirect 39 38 github.com/cloudflare/circl v1.4.0 // indirect 40 39 github.com/cyphar/filepath-securejoin v0.3.3 // indirect 41 40 github.com/davecgh/go-spew v1.1.1 // indirect ··· 45 44 github.com/go-git/go-billy/v5 v5.5.0 // indirect 46 45 github.com/go-logr/logr v1.4.1 // indirect 47 46 github.com/go-logr/stdr v1.2.2 // indirect 47 + github.com/goccy/go-json v0.10.2 // indirect 48 48 github.com/gogo/protobuf v1.3.2 // indirect 49 49 github.com/gorilla/css v1.0.1 // indirect 50 50 github.com/gorilla/securecookie v1.1.2 // indirect ··· 68 66 github.com/jbenet/go-context v0.0.0-20150711004518-d14ea06fba99 // indirect 69 67 github.com/jbenet/goprocess v0.1.4 // indirect 70 68 github.com/kevinburke/ssh_config v1.2.0 // indirect 69 + github.com/klauspost/compress v1.17.9 // indirect 71 70 github.com/klauspost/cpuid/v2 v2.2.7 // indirect 72 71 github.com/mattn/go-isatty v0.0.20 // indirect 73 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect 74 72 github.com/minio/sha256-simd v1.0.1 // indirect 75 73 github.com/mr-tron/base58 v1.2.0 // indirect 76 74 github.com/multiformats/go-base32 v0.1.0 // indirect ··· 82 80 github.com/pjbgf/sha1cd v0.3.0 // indirect 83 81 github.com/pmezard/go-difflib v1.0.0 // indirect 84 82 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 85 - github.com/prometheus/client_golang v1.17.0 // indirect 86 - github.com/prometheus/client_model v0.5.0 // indirect 87 - github.com/prometheus/common v0.45.0 // indirect 88 - github.com/prometheus/procfs v0.12.0 // indirect 83 + github.com/prometheus/client_golang v1.19.1 // indirect 84 + github.com/prometheus/client_model v0.6.1 // indirect 85 + github.com/prometheus/common v0.54.0 // indirect 86 + github.com/prometheus/procfs v0.15.1 // indirect 89 87 github.com/sergi/go-diff v1.3.1 // indirect 90 88 github.com/skeema/knownhosts v1.3.0 // indirect 91 89 github.com/spaolacci/murmur3 v1.1.0 // indirect ··· 103 101 golang.org/x/crypto v0.32.0 // indirect 104 102 golang.org/x/net v0.33.0 // indirect 105 103 golang.org/x/sys v0.29.0 // indirect 106 - golang.org/x/time v0.3.0 // indirect 107 - google.golang.org/protobuf v1.33.0 // indirect 104 + golang.org/x/time v0.5.0 // indirect 105 + google.golang.org/protobuf v1.34.2 // indirect 108 106 gopkg.in/warnings.v0 v0.1.2 // indirect 109 107 gopkg.in/yaml.v3 v3.0.1 // indirect 110 108 lukechampine.com/blake3 v1.2.1 // indirect ··· 113 111 replace github.com/sergi/go-diff => github.com/sergi/go-diff v1.1.0 114 112 115 113 replace github.com/go-git/go-git/v5 => github.com/go-git/go-git/v5 v5.6.1 114 + 115 + // from bluesky-social/indigo 116 + replace github.com/gocql/gocql => github.com/scylladb/gocql v1.14.4
+22 -18
go.sum
··· 22 22 github.com/bluekeyes/go-gitdiff v0.8.0/go.mod h1:WWAk1Mc6EgWarCrPFO+xeYlujPu98VuLW3Tu+B/85AE= 23 23 github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20 h1:yHusfYYi8odoCcsI6AurU+dRWb7itHAQNwt3/Rl9Vfs= 24 24 github.com/bluesky-social/indigo v0.0.0-20250123072624-9e3b84fdbb20/go.mod h1:Qp4YqWf+AQ3TwQCxV5Ls8O2tXE55zVTGVs3zTmn7BOg= 25 + github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1 h1:CFvRtYNSnWRAi/98M3O466t9dYuwtesNbu6FVPymRrA= 26 + github.com/bluesky-social/jetstream v0.0.0-20241210005130-ea96859b93d1/go.mod h1:WiYEeyJSdUwqoaZ71KJSpTblemUCpwJfh5oVXplK6T4= 25 27 github.com/bmatcuk/doublestar/v4 v4.6.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= 26 28 github.com/bmatcuk/doublestar/v4 v4.7.1 h1:fdDeAqgT47acgwd9bd9HxJRDmc9UAmPpc+2m0CXv75Q= 27 29 github.com/bmatcuk/doublestar/v4 v4.7.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= ··· 37 35 github.com/casbin/govaluate v1.2.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A= 38 36 github.com/casbin/govaluate v1.3.0 h1:VA0eSY0M2lA86dYd5kPPuNZMUD9QkWnOCnavGrw9myc= 39 37 github.com/casbin/govaluate v1.3.0/go.mod h1:G/UnbIjZk/0uMNaLwZZmFQrR72tYRZWQkO70si/iR7A= 40 - github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= 41 - github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 38 + github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 39 + github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 42 40 github.com/cloudflare/circl v1.1.0/go.mod h1:prBCrKB9DV4poKZY1l9zBXg2QJY7mvgRvtMxxK7fi4I= 43 41 github.com/cloudflare/circl v1.3.3/go.mod h1:5XYMA4rFBvNIrhs50XuiBJ15vF2pZn4nnUKZrLbUZFA= 44 42 github.com/cloudflare/circl v1.4.0 h1:BV7h5MgrktNzytKmWjpOtdYrf0lkkbF8YMlBGPhJQrY= ··· 75 73 github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= 76 74 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= 77 75 github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= 76 + github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU= 77 + github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= 78 78 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= 79 79 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= 80 80 github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc= ··· 151 147 github.com/kevinburke/ssh_config v1.2.0/go.mod h1:CT57kijsi8u/K/BOFA39wgDQJ9CxiF4nAY/ojJ6r6mM= 152 148 github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= 153 149 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= 150 + github.com/klauspost/compress v1.17.9 h1:6KIumPrER1LHsvBVuDa0r5xaG0Es51mhhB9BQB2qeMA= 151 + github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= 154 152 github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= 155 153 github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= 156 154 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= ··· 170 164 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 171 165 github.com/mattn/go-sqlite3 v1.14.24 h1:tpSp2G2KyMnnQu99ngJ47EIkWVmliIizyZBfPrBWDRM= 172 166 github.com/mattn/go-sqlite3 v1.14.24/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 173 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg= 174 - github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k= 175 167 github.com/microcosm-cc/bluemonday v1.0.27 h1:MpEUotklkwCSLeH+Qdx1VJgNqLlpY2KXwXFM08ygZfk= 176 168 github.com/microcosm-cc/bluemonday v1.0.27/go.mod h1:jFi9vgW+H7c3V0lb6nR74Ib/DIB5OBs92Dimizgw2cA= 177 169 github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= ··· 201 197 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= 202 198 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f h1:VXTQfuJj9vKR4TCkEuWIckKvdHFeJH/huIFJ9/cXOB0= 203 199 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= 204 - github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q= 205 - github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= 206 - github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= 207 - github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= 208 - github.com/prometheus/common v0.45.0 h1:2BGz0eBc2hdMDLnO/8n0jeB3oPrt2D08CekT0lneoxM= 209 - github.com/prometheus/common v0.45.0/go.mod h1:YJmSTw9BoKxJplESWWxlbyttQR4uaEcGyv9MZjVOJsY= 210 - github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo= 211 - github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo= 200 + github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE= 201 + github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho= 202 + github.com/prometheus/client_model v0.6.1 h1:ZKSh/rekM+n3CeS952MLRAdFwIKqeY8b62p8ais2e9E= 203 + github.com/prometheus/client_model v0.6.1/go.mod h1:OrxVMOVHjw3lKMa8+x6HeMGkHMQyHDk9E3jmP2AmGiY= 204 + github.com/prometheus/common v0.54.0 h1:ZlZy0BgJhTwVZUn7dLOkwCZHUkrAqd3WYtcFCWnM1D8= 205 + github.com/prometheus/common v0.54.0/go.mod h1:/TQgMJP5CuVYveyT7n/0Ix8yLNNXy9yRSkhnLTHPDIQ= 206 + github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0learggepc= 207 + github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= 212 208 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= 213 - github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDNXVBh4M= 214 - github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA= 209 + github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= 210 + github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= 215 211 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= 216 212 github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk= 217 213 github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= ··· 366 362 golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= 367 363 golang.org/x/text v0.21.0 h1:zyQAAkrwaneQ066sspRyJaG9VNi/YJ1NfzcGB3hZ/qo= 368 364 golang.org/x/text v0.21.0/go.mod h1:4IBbMaMmOPCJ8SecivzSH54+73PCFmPWxNTLm+vZkEQ= 369 - golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= 370 - golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= 365 + golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= 366 + golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= 371 367 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= 372 368 golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= 373 369 golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= ··· 388 384 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= 389 385 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 h1:+cNy6SZtPcJQH3LJVLOSmiC7MMxXNOb3PU/VUEz+EhU= 390 386 golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028/go.mod h1:NDW/Ps6MPRej6fsCIbMTohpP40sJ/P/vI1MoTEGwX90= 391 - google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= 392 - google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= 387 + google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= 388 + google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= 393 389 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 394 390 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 395 391 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+2 -3
knotserver/handler.go
··· 9 9 "github.com/go-chi/chi/v5" 10 10 "github.com/sotangled/tangled/knotserver/config" 11 11 "github.com/sotangled/tangled/knotserver/db" 12 - "github.com/sotangled/tangled/knotserver/jsclient" 13 12 "github.com/sotangled/tangled/rbac" 14 13 ) 15 14 ··· 19 20 type Handle struct { 20 21 c *config.Config 21 22 db *db.DB 22 - js *jsclient.JetstreamClient 23 + jc *JetstreamClient 23 24 e *rbac.Enforcer 24 25 l *slog.Logger 25 26 ··· 60 61 if len(dids) > 0 { 61 62 h.knotInitialized = true 62 63 close(h.init) 63 - h.js.UpdateDids(dids) 64 + h.jc.UpdateDids(dids) 64 65 } 65 66 66 67 r.Get("/", h.Index)
+123 -78
knotserver/jetstream.go
··· 8 8 "net/http" 9 9 "net/url" 10 10 "strings" 11 + "sync" 11 12 "time" 12 13 14 + "github.com/bluesky-social/jetstream/pkg/client" 15 + "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 16 + "github.com/bluesky-social/jetstream/pkg/models" 13 17 "github.com/sotangled/tangled/api/tangled" 14 18 "github.com/sotangled/tangled/knotserver/db" 15 - "github.com/sotangled/tangled/knotserver/jsclient" 16 19 "github.com/sotangled/tangled/log" 17 20 ) 21 + 22 + type JetstreamClient struct { 23 + cfg *client.ClientConfig 24 + client *client.Client 25 + reconnectCh chan struct{} 26 + mu sync.RWMutex 27 + } 18 28 19 29 func (h *Handle) StartJetstream(ctx context.Context) error { 20 30 l := h.l.With("component", "jetstream") ··· 37 27 return err 38 28 } 39 29 40 - h.js = jsclient.NewJetstreamClient(collections, dids) 41 - messages, err := h.js.ReadJetstream(ctx, lastTimeUs) 30 + cfg := client.DefaultClientConfig() 31 + cfg.WebsocketURL = "wss://jetstream1.us-west.bsky.network/subscribe" 32 + cfg.WantedCollections = collections 33 + cfg.WantedDids = dids 34 + 35 + sched := sequential.NewScheduler("knotserver", l, h.processMessages) 36 + 37 + client, err := client.NewClient(cfg, l, sched) 42 38 if err != nil { 43 - return fmt.Errorf("failed to read from jetstream: %w", err) 39 + l.Error("failed to create jetstream client", "error", err) 44 40 } 45 41 46 - go h.processMessages(ctx, messages) 42 + jc := &JetstreamClient{ 43 + cfg: cfg, 44 + client: client, 45 + reconnectCh: make(chan struct{}), 46 + } 47 47 48 + h.jc = jc 49 + 50 + go func() { 51 + for len(h.jc.cfg.WantedDids) == 0 { 52 + time.Sleep(time.Second) 53 + } 54 + h.connectAndRead(ctx, &lastTimeUs) 55 + }() 48 56 return nil 57 + } 58 + 59 + func (h *Handle) connectAndRead(ctx context.Context, cursor *int64) { 60 + l := log.FromContext(ctx) 61 + for { 62 + select { 63 + case <-h.jc.reconnectCh: 64 + l.Info("reconnecting jetstream client") 65 + h.jc.client.Scheduler.Shutdown() 66 + if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 67 + l.Error("error reading jetstream", "error", err) 68 + } 69 + default: 70 + if err := h.jc.client.ConnectAndRead(ctx, cursor); err != nil { 71 + l.Error("error reading jetstream", "error", err) 72 + } 73 + } 74 + } 75 + } 76 + 77 + func (j *JetstreamClient) UpdateDids(dids []string) { 78 + j.mu.Lock() 79 + j.cfg.WantedDids = dids 80 + j.mu.Unlock() 81 + j.reconnectCh <- struct{}{} 49 82 } 50 83 51 84 func (h *Handle) getLastTimeUs(ctx context.Context) (int64, error) { ··· 113 60 return lastTimeUs, nil 114 61 } 115 62 116 - func (h *Handle) processPublicKey(ctx context.Context, did string, record map[string]interface{}) error { 63 + func (h *Handle) processPublicKey(ctx context.Context, did string, record tangled.PublicKey) error { 117 64 l := log.FromContext(ctx) 118 - if err := h.db.AddPublicKeyFromRecord(did, record); err != nil { 65 + pk := db.PublicKey{ 66 + Did: did, 67 + PublicKey: record, 68 + } 69 + if err := h.db.AddPublicKey(pk); err != nil { 119 70 l.Error("failed to add public key", "error", err) 120 71 return fmt.Errorf("failed to add public key: %w", err) 121 72 } 122 73 l.Info("added public key from firehose", "did", did) 74 + return nil 75 + } 76 + 77 + func (h *Handle) processKnotMember(ctx context.Context, did string, record tangled.KnotMember) error { 78 + l := log.FromContext(ctx) 79 + 80 + if record.Domain != h.c.Server.Hostname { 81 + l.Error("domain mismatch", "domain", record.Domain, "expected", h.c.Server.Hostname) 82 + return fmt.Errorf("domain mismatch: %s != %s", record.Domain, h.c.Server.Hostname) 83 + } 84 + 85 + ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 86 + if err != nil || !ok { 87 + l.Error("failed to add member", "did", did) 88 + return fmt.Errorf("failed to enforce permissions: %w", err) 89 + } 90 + 91 + l.Info("adding member") 92 + if err := h.e.AddMember(ThisServer, record.Member); err != nil { 93 + l.Error("failed to add member", "error", err) 94 + return fmt.Errorf("failed to add member: %w", err) 95 + } 96 + l.Info("added member from firehose", "member", record.Member) 97 + 98 + if err := h.db.AddDid(did); err != nil { 99 + l.Error("failed to add did", "error", err) 100 + return fmt.Errorf("failed to add did: %w", err) 101 + } 102 + 103 + if err := h.fetchAndAddKeys(ctx, did); err != nil { 104 + return fmt.Errorf("failed to fetch and add keys: %w", err) 105 + } 106 + 107 + h.jc.UpdateDids([]string{did}) 123 108 return nil 124 109 } 125 110 ··· 178 87 defer resp.Body.Close() 179 88 180 89 if ct := resp.Header.Get("Content-Type"); !strings.HasPrefix(ct, "text/plain") { 181 - l.Error("unexpected content type", "content-type", ct) 182 90 return fmt.Errorf("unexpected content type: %s", ct) 183 91 } 184 92 ··· 203 113 return nil 204 114 } 205 115 206 - func (h *Handle) processKnotMember(ctx context.Context, did string, record map[string]interface{}) error { 207 - l := log.FromContext(ctx) 116 + func (h *Handle) processMessages(ctx context.Context, event *models.Event) error { 117 + did := event.Did 208 118 209 - if record["domain"] != h.c.Server.Hostname { 210 - l.Error("domain mismatch", "domain", record["domain"], "expected", h.c.Server.Hostname) 211 - return fmt.Errorf("domain mismatch: %s != %s", record["domain"], h.c.Server.Hostname) 119 + raw := json.RawMessage(event.Commit.Record) 120 + 121 + switch event.Commit.Collection { 122 + case tangled.PublicKeyNSID: 123 + var record tangled.PublicKey 124 + if err := json.Unmarshal(raw, &record); err != nil { 125 + return fmt.Errorf("failed to unmarshal record: %w", err) 126 + } 127 + if err := h.processPublicKey(ctx, did, record); err != nil { 128 + return fmt.Errorf("failed to process public key: %w", err) 129 + } 130 + 131 + case tangled.KnotMemberNSID: 132 + var record tangled.KnotMember 133 + if err := json.Unmarshal(raw, &record); err != nil { 134 + return fmt.Errorf("failed to unmarshal record: %w", err) 135 + } 136 + if err := h.processKnotMember(ctx, did, record); err != nil { 137 + return fmt.Errorf("failed to process knot member: %w", err) 138 + } 212 139 } 213 140 214 - ok, err := h.e.E.Enforce(did, ThisServer, ThisServer, "server:invite") 215 - if err != nil || !ok { 216 - l.Error("failed to add member", "did", did) 217 - return fmt.Errorf("failed to enforce permissions: %w", err) 141 + lastTimeUs := event.TimeUS 142 + if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 143 + return fmt.Errorf("failed to save last time us: %w", err) 218 144 } 219 145 220 - l.Info("adding member") 221 - if err := h.e.AddMember(ThisServer, record["member"].(string)); err != nil { 222 - l.Error("failed to add member", "error", err) 223 - return fmt.Errorf("failed to add member: %w", err) 224 - } 225 - l.Info("added member from firehose", "member", record["member"]) 226 - 227 - if err := h.db.AddDid(did); err != nil { 228 - l.Error("failed to add did", "error", err) 229 - return fmt.Errorf("failed to add did: %w", err) 230 - } 231 - 232 - if err := h.fetchAndAddKeys(ctx, did); err != nil { 233 - return fmt.Errorf("failed to fetch and add keys: %w", err) 234 - } 235 - 236 - h.js.UpdateDids([]string{did}) 237 146 return nil 238 - } 239 - 240 - func (h *Handle) processMessages(ctx context.Context, messages <-chan []byte) { 241 - l := log.FromContext(ctx) 242 - l.Info("waiting for knot to be initialized") 243 - <-h.init 244 - l.Info("initialized jetstream watcher") 245 - 246 - for msg := range messages { 247 - var data map[string]interface{} 248 - if err := json.Unmarshal(msg, &data); err != nil { 249 - l.Error("error unmarshaling message", "error", err) 250 - continue 251 - } 252 - 253 - if kind, ok := data["kind"].(string); ok && kind == "commit" { 254 - commit := data["commit"].(map[string]interface{}) 255 - did := data["did"].(string) 256 - record := commit["record"].(map[string]interface{}) 257 - 258 - var processErr error 259 - switch commit["collection"].(string) { 260 - case tangled.PublicKeyNSID: 261 - if err := h.processPublicKey(ctx, did, record); err != nil { 262 - processErr = fmt.Errorf("failed to process public key: %w", err) 263 - } 264 - case tangled.KnotMemberNSID: 265 - if err := h.processKnotMember(ctx, did, record); err != nil { 266 - processErr = fmt.Errorf("failed to process knot member: %w", err) 267 - } 268 - } 269 - 270 - if processErr != nil { 271 - l.Error("error processing message", "error", processErr) 272 - continue 273 - } 274 - 275 - lastTimeUs := int64(data["time_us"].(float64)) 276 - if err := h.db.SaveLastTimeUs(lastTimeUs); err != nil { 277 - l.Error("failed to save last time us", "error", err) 278 - continue 279 - } 280 - } 281 - } 282 147 }
-170
knotserver/jsclient/jetstream.go
··· 1 - package jsclient 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "log" 7 - "net/url" 8 - "sync" 9 - "time" 10 - 11 - "github.com/gorilla/websocket" 12 - ) 13 - 14 - type JetstreamClient struct { 15 - collections []string 16 - dids []string 17 - conn *websocket.Conn 18 - mu sync.RWMutex 19 - reconnectCh chan struct{} 20 - } 21 - 22 - func NewJetstreamClient(collections, dids []string) *JetstreamClient { 23 - return &JetstreamClient{ 24 - collections: collections, 25 - dids: dids, 26 - reconnectCh: make(chan struct{}, 1), 27 - } 28 - } 29 - 30 - func (j *JetstreamClient) buildWebsocketURL(queryParams string) url.URL { 31 - 32 - u := url.URL{ 33 - Scheme: "wss", 34 - Host: "jetstream1.us-west.bsky.network", 35 - Path: "/subscribe", 36 - RawQuery: queryParams, 37 - } 38 - 39 - return u 40 - } 41 - 42 - // UpdateCollections updates the collections list and triggers a reconnection 43 - func (j *JetstreamClient) UpdateCollections(collections []string) { 44 - j.mu.Lock() 45 - j.collections = collections 46 - j.mu.Unlock() 47 - j.triggerReconnect() 48 - } 49 - 50 - // UpdateDids updates the Dids list and triggers a reconnection 51 - func (j *JetstreamClient) UpdateDids(dids []string) { 52 - j.mu.Lock() 53 - j.dids = dids 54 - j.mu.Unlock() 55 - j.triggerReconnect() 56 - } 57 - 58 - // Adds one did to the did list 59 - func (j *JetstreamClient) AddDid(did string) { 60 - j.mu.Lock() 61 - j.dids = append(j.dids, did) 62 - j.mu.Unlock() 63 - j.triggerReconnect() 64 - } 65 - 66 - func (j *JetstreamClient) triggerReconnect() { 67 - select { 68 - case j.reconnectCh <- struct{}{}: 69 - default: 70 - // Channel already has a pending reconnect 71 - } 72 - } 73 - 74 - func (j *JetstreamClient) buildQueryParams(cursor int64) string { 75 - j.mu.RLock() 76 - defer j.mu.RUnlock() 77 - 78 - var collections, dids string 79 - if len(j.collections) > 0 { 80 - collections = fmt.Sprintf("wantedCollections=%s&cursor=%d", j.collections[0], cursor) 81 - for _, collection := range j.collections[1:] { 82 - collections += fmt.Sprintf("&wantedCollections=%s", collection) 83 - } 84 - } 85 - if len(j.dids) > 0 { 86 - for i, did := range j.dids { 87 - if i == 0 { 88 - dids = fmt.Sprintf("wantedDids=%s", did) 89 - } else { 90 - dids += fmt.Sprintf("&wantedDids=%s", did) 91 - } 92 - } 93 - } 94 - 95 - var queryStr string 96 - if collections != "" && dids != "" { 97 - queryStr = collections + "&" + dids 98 - } else if collections != "" { 99 - queryStr = collections 100 - } else if dids != "" { 101 - queryStr = dids 102 - } 103 - 104 - return queryStr 105 - } 106 - 107 - func (j *JetstreamClient) connect(cursor int64) error { 108 - queryParams := j.buildQueryParams(cursor) 109 - u := j.buildWebsocketURL(queryParams) 110 - 111 - dialer := websocket.Dialer{ 112 - HandshakeTimeout: 10 * time.Second, 113 - } 114 - 115 - conn, _, err := dialer.Dial(u.String(), nil) 116 - if err != nil { 117 - return err 118 - } 119 - 120 - if j.conn != nil { 121 - j.conn.Close() 122 - } 123 - j.conn = conn 124 - return nil 125 - } 126 - 127 - func (j *JetstreamClient) readMessages(ctx context.Context, messages chan []byte) { 128 - defer close(messages) 129 - defer j.conn.Close() 130 - 131 - ticker := time.NewTicker(1 * time.Second) 132 - defer ticker.Stop() 133 - 134 - for { 135 - select { 136 - case <-ctx.Done(): 137 - return 138 - case <-j.reconnectCh: 139 - // Reconnect with new parameters 140 - cursor := time.Now().Add(-5 * time.Second).UnixMicro() 141 - if err := j.connect(cursor); err != nil { 142 - log.Printf("error reconnecting to jetstream: %v", err) 143 - return 144 - } 145 - case <-ticker.C: 146 - _, message, err := j.conn.ReadMessage() 147 - if err != nil { 148 - log.Printf("error reading from websocket: %v", err) 149 - return 150 - } 151 - messages <- message 152 - } 153 - } 154 - } 155 - 156 - func (j *JetstreamClient) ReadJetstream(ctx context.Context, lastTimestamp int64) (chan []byte, error) { 157 - if lastTimestamp == 0 { 158 - lastTimestamp = time.Now().Add(-5 * time.Second).UnixMicro() 159 - } 160 - 161 - if err := j.connect(lastTimestamp); err != nil { 162 - log.Printf("error connecting to jetstream: %v", err) 163 - return nil, err 164 - } 165 - 166 - messages := make(chan []byte) 167 - go j.readMessages(ctx, messages) 168 - 169 - return messages, nil 170 - }
+3 -3
knotserver/routes.go
··· 436 436 return 437 437 } 438 438 439 - h.js.UpdateDids([]string{did}) 439 + h.jc.UpdateDids([]string{did}) 440 440 if err := h.e.AddMember(ThisServer, did); err != nil { 441 441 l.Error("adding member", "error", err.Error()) 442 442 writeError(w, err.Error(), http.StatusInternalServerError) ··· 472 472 writeError(w, err.Error(), http.StatusInternalServerError) 473 473 return 474 474 } 475 - h.js.UpdateDids([]string{data.Did}) 475 + h.jc.UpdateDids([]string{data.Did}) 476 476 477 477 repoName := filepath.Join(ownerDid, repo) 478 478 if err := h.e.AddRepo(data.Did, ThisServer, repoName); err != nil { ··· 520 520 return 521 521 } 522 522 523 - h.js.UpdateDids([]string{data.Did}) 523 + h.jc.UpdateDids([]string{data.Did}) 524 524 if err := h.e.AddOwner(ThisServer, data.Did); err != nil { 525 525 l.Error("adding owner", "error", err.Error()) 526 526 writeError(w, err.Error(), http.StatusInternalServerError)