Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror: introduce knotmirror

KnotMirror is an external service that is intended to be used by
appview. It will ingest all known git repos and provide xrpc methods to
inspect them, so appview won't need to fetch individual knots on every
page render.

Using postgres exclusively instead of sqlite to support A LOT of
concurrent writes.

Signed-off-by: Seongmin Lee <git@boltless.me>

authored by

Seongmin Lee and committed by tangled.org aa77df5a b1c60fc4

+2619 -38
+58
cmd/knotmirror/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "os" 7 + "os/signal" 8 + "syscall" 9 + 10 + "github.com/carlmjohnson/versioninfo" 11 + "github.com/urfave/cli/v3" 12 + "tangled.org/core/knotmirror" 13 + "tangled.org/core/knotmirror/config" 14 + "tangled.org/core/log" 15 + ) 16 + 17 + func main() { 18 + if err := run(os.Args); err != nil { 19 + slog.Error("error running knotmirror", "err", err) 20 + os.Exit(-1) 21 + } 22 + } 23 + 24 + func run(args []string) error { 25 + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) 26 + defer cancel() 27 + 28 + logger := log.New("knotmirror") 29 + slog.SetDefault(logger) 30 + ctx = log.IntoContext(ctx, logger) 31 + 32 + app := cli.Command{ 33 + Name: "knotmirror", 34 + Usage: "knot mirroring service", 35 + Version: versioninfo.Short(), 36 + } 37 + app.Flags = []cli.Flag{} 38 + app.Commands = []*cli.Command{ 39 + { 40 + Name: "serve", 41 + Usage: "run the knotmirror daemon", 42 + Action: runKnotMirror, 43 + Flags: []cli.Flag{}, 44 + }, 45 + } 46 + return app.Run(ctx, args) 47 + } 48 + 49 + func runKnotMirror(ctx context.Context, cmd *cli.Command) error { 50 + logger := log.FromContext(ctx) 51 + cfg, err := config.Load(ctx) 52 + if err != nil { 53 + return err 54 + } 55 + 56 + logger.Debug("config loaded:", "config", cfg) 57 + return knotmirror.Run(ctx, cfg) 58 + }
+3 -1
flake.nix
··· 107 107 knot = self.callPackage ./nix/pkgs/knot.nix {}; 108 108 dolly = self.callPackage ./nix/pkgs/dolly.nix {}; 109 109 tap = self.callPackage ./nix/pkgs/tap.nix {}; 110 + knotmirror = self.callPackage ./nix/pkgs/knot-mirror.nix {}; 110 111 }); 111 112 in { 112 113 overlays.default = final: prev: { 113 - inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly tap; 114 + inherit (mkPackageSet final) lexgen goat sqlite-lib spindle knot-unwrapped knot appview docs dolly tap knotmirror; 114 115 }; 115 116 116 117 packages = forAllSystems (system: let ··· 207 206 pkgs.coreutils # for those of us who are on systems that use busybox (alpine) 208 207 packages'.lexgen 209 208 packages'.treefmt-wrapper 209 + packages'.tap 210 210 ]; 211 211 shellHook = '' 212 212 mkdir -p appview/pages/static
+12 -7
go.mod
··· 35 35 github.com/hiddeco/sshsig v0.2.0 36 36 github.com/hpcloud/tail v1.0.0 37 37 github.com/ipfs/go-cid v0.5.0 38 + github.com/jackc/pgx/v5 v5.8.0 38 39 github.com/mattn/go-sqlite3 v1.14.24 39 40 github.com/microcosm-cc/bluemonday v1.0.27 40 41 github.com/openbao/openbao/api/v2 v2.3.0 41 42 github.com/posthog/posthog-go v1.5.5 43 + github.com/prometheus/client_golang v1.23.2 42 44 github.com/redis/go-redis/v9 v9.7.3 43 45 github.com/resend/resend-go/v2 v2.15.0 44 46 github.com/sethvargo/go-envconfig v1.1.0 45 47 github.com/srwiley/oksvg v0.0.0-20221011165216-be6e8873101c 46 48 github.com/srwiley/rasterx v0.0.0-20220730225603-2ab79fcdd4ef 47 - github.com/stretchr/testify v1.10.0 49 + github.com/stretchr/testify v1.11.1 48 50 github.com/urfave/cli/v3 v3.4.1 49 51 github.com/whyrusleeping/cbor-gen v0.3.1 50 52 github.com/yuin/goldmark v1.7.13 ··· 54 52 github.com/yuin/goldmark-highlighting/v2 v2.0.0-20230729083705-37449abec8cc 55 53 gitlab.com/staticnoise/goldmark-callout v0.0.0-20240609120641-6366b799e4ab 56 54 go.abhg.dev/goldmark/mermaid v0.6.0 57 - golang.org/x/crypto v0.40.0 55 + golang.org/x/crypto v0.41.0 58 56 golang.org/x/image v0.31.0 59 - golang.org/x/net v0.42.0 57 + golang.org/x/net v0.43.0 60 58 golang.org/x/xerrors v0.0.0-20240903120638-7835f813f4da 61 59 gopkg.in/yaml.v3 v3.0.1 62 60 ) ··· 163 161 github.com/ipfs/go-log v1.0.5 // indirect 164 162 github.com/ipfs/go-log/v2 v2.6.0 // indirect 165 163 github.com/ipfs/go-metrics-interface v0.3.0 // indirect 164 + github.com/jackc/pgpassfile v1.0.0 // indirect 165 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect 166 + github.com/jackc/puddle/v2 v2.2.2 // indirect 166 167 github.com/json-iterator/go v1.1.12 // indirect 167 168 github.com/kevinburke/ssh_config v1.2.0 // indirect 168 169 github.com/klauspost/compress v1.18.0 // indirect ··· 198 193 github.com/pkg/errors v0.9.1 // indirect 199 194 github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect 200 195 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f // indirect 201 - github.com/prometheus/client_golang v1.22.0 // indirect 202 196 github.com/prometheus/client_model v0.6.2 // indirect 203 - github.com/prometheus/common v0.64.0 // indirect 197 + github.com/prometheus/common v0.66.1 // indirect 204 198 github.com/prometheus/procfs v0.16.1 // indirect 205 199 github.com/rivo/uniseg v0.4.7 // indirect 206 200 github.com/ryanuber/go-glob v1.0.0 // indirect ··· 226 222 go.uber.org/atomic v1.11.0 // indirect 227 223 go.uber.org/multierr v1.11.0 // indirect 228 224 go.uber.org/zap v1.27.0 // indirect 225 + go.yaml.in/yaml/v2 v2.4.2 // indirect 229 226 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b // indirect 230 227 golang.org/x/sync v0.17.0 // indirect 231 - golang.org/x/sys v0.34.0 // indirect 228 + golang.org/x/sys v0.35.0 // indirect 232 229 golang.org/x/text v0.29.0 // indirect 233 230 golang.org/x/time v0.12.0 // indirect 234 231 google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect 235 232 google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect 236 233 google.golang.org/grpc v1.73.0 // indirect 237 - google.golang.org/protobuf v1.36.6 // indirect 234 + google.golang.org/protobuf v1.36.8 // indirect 238 235 gopkg.in/fsnotify.v1 v1.4.7 // indirect 239 236 gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect 240 237 gopkg.in/warnings.v0 v0.1.2 // indirect
+28 -16
go.sum
··· 350 350 github.com/ipfs/go-log/v2 v2.6.0/go.mod h1:p+Efr3qaY5YXpx9TX7MoLCSEZX5boSWj9wh86P5HJa8= 351 351 github.com/ipfs/go-metrics-interface v0.3.0 h1:YwG7/Cy4R94mYDUuwsBfeziJCVm9pBMJ6q/JR9V40TU= 352 352 github.com/ipfs/go-metrics-interface v0.3.0/go.mod h1:OxxQjZDGocXVdyTPocns6cOLwHieqej/jos7H4POwoY= 353 + github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= 354 + github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= 355 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= 356 + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= 357 + github.com/jackc/pgx/v5 v5.8.0 h1:TYPDoleBBme0xGSAX3/+NujXXtpZn9HBONkQC7IEZSo= 358 + github.com/jackc/pgx/v5 v5.8.0/go.mod h1:QVeDInX2m9VyzvNeiCJVjCkNFqzsNb43204HshNSZKw= 359 + github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= 360 + github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= 353 361 github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= 354 362 github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= 355 363 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= ··· 379 371 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= 380 372 github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= 381 373 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 374 + github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= 375 + github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= 382 376 github.com/lucasb-eyer/go-colorful v1.2.0 h1:1nnpGOrhyZZuNyfu1QjKiUICQ74+3FNCN69Aj6K7nkY= 383 377 github.com/lucasb-eyer/go-colorful v1.2.0/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0= 384 378 github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= ··· 482 472 github.com/polydawn/refmt v0.89.1-0.20221221234430-40501e09de1f/go.mod h1:/zvteZs/GwLtCgZ4BL6CBsk9IKIlexP43ObX9AxTqTw= 483 473 github.com/posthog/posthog-go v1.5.5 h1:2o3j7IrHbTIfxRtj4MPaXKeimuTYg49onNzNBZbwksM= 484 474 github.com/posthog/posthog-go v1.5.5/go.mod h1:3RqUmSnPuwmeVj/GYrS75wNGqcAKdpODiwc83xZWgdE= 485 - github.com/prometheus/client_golang v1.22.0 h1:rb93p9lokFEsctTys46VnV1kLCDpVZ0a/Y92Vm0Zc6Q= 486 - github.com/prometheus/client_golang v1.22.0/go.mod h1:R7ljNsLXhuQXYZYtw6GAE9AZg8Y7vEW5scdCXrWRXC0= 475 + github.com/prometheus/client_golang v1.23.2 h1:Je96obch5RDVy3FDMndoUsjAhG5Edi49h0RJWRi/o0o= 476 + github.com/prometheus/client_golang v1.23.2/go.mod h1:Tb1a6LWHB3/SPIzCoaDXI4I8UHKeFTEQ1YCr+0Gyqmg= 487 477 github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= 488 478 github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= 489 - github.com/prometheus/common v0.64.0 h1:pdZeA+g617P7oGv1CzdTzyeShxAGrTBsolKNOLQPGO4= 490 - github.com/prometheus/common v0.64.0/go.mod h1:0gZns+BLRQ3V6NdaerOhMbwwRbNh9hkGINtQAsP5GS8= 479 + github.com/prometheus/common v0.66.1 h1:h5E0h5/Y8niHc5DlaLlWLArTQI7tMrsfQjHV+d9ZoGs= 480 + github.com/prometheus/common v0.66.1/go.mod h1:gcaUsgf3KfRSwHY4dIMXLPV0K/Wg1oZ8+SbZk/HH/dA= 491 481 github.com/prometheus/procfs v0.16.1 h1:hZ15bTNuirocR6u0JZ6BAHHmwS1p8B4P6MRqxtzMyRg= 492 482 github.com/prometheus/procfs v0.16.1/go.mod h1:teAbpZRB1iIAJYREa1LsoWUXykVXA1KlTmWl8x/U+Is= 493 483 github.com/redis/go-redis/v9 v9.0.0-rc.4/go.mod h1:Vo3EsyWnicKnSKCA7HhgnvnyA74wOA69Cd2Meli5mmA= ··· 533 523 github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= 534 524 github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= 535 525 github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= 536 - github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= 537 - github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= 526 + github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= 527 + github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= 538 528 github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= 539 529 github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= 540 530 github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= ··· 618 608 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= 619 609 go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= 620 610 go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= 611 + go.yaml.in/yaml/v2 v2.4.2 h1:DzmwEr2rDGHl7lsFgAHxmNz/1NlQ7xLIrlN2h5d1eGI= 612 + go.yaml.in/yaml/v2 v2.4.2/go.mod h1:081UH+NErpNdqlCXm3TtEran0rJZGxAYx9hb/ELlsPU= 621 613 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= 622 614 golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= 623 615 golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= ··· 627 615 golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= 628 616 golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= 629 617 golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU= 630 - golang.org/x/crypto v0.40.0 h1:r4x+VvoG5Fm+eJcxMaY8CQM7Lb0l1lsmjGBQ6s8BfKM= 631 - golang.org/x/crypto v0.40.0/go.mod h1:Qr1vMER5WyS2dfPHAlsOj01wgLbsyWtFn/aY+5+ZdxY= 618 + golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= 619 + golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= 632 620 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b h1:M2rDM6z3Fhozi9O7NWsxAkg/yqS/lQJ6PmkyIV3YP+o= 633 621 golang.org/x/exp v0.0.0-20250620022241-b7579e27df2b/go.mod h1:3//PLf8L/X+8b4vuAfHzxeRUl04Adcb341+IGKfnqS8= 634 622 golang.org/x/image v0.31.0 h1:mLChjE2MV6g1S7oqbXC0/UcKijjm5fnJLUYKIYrLESA= ··· 663 651 golang.org/x/net v0.5.0/go.mod h1:DivGGAXEgPSlEBzxGzZI+ZLohi+xUj054jfeKui00ws= 664 652 golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= 665 653 golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= 666 - golang.org/x/net v0.42.0 h1:jzkYrhi3YQWD6MLBJcsklgQsoAcw89EcZbJw8Z614hs= 667 - golang.org/x/net v0.42.0/go.mod h1:FF1RA5d3u7nAYA4z2TkclSCKh68eSXtiFwcWQpPXdt8= 654 + golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= 655 + golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= 668 656 golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 669 657 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= 670 658 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= ··· 704 692 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 705 693 golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= 706 694 golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 707 - golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= 708 - golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 695 + golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= 696 + golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= 709 697 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= 710 698 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= 711 699 golang.org/x/term v0.1.0/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= ··· 715 703 golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= 716 704 golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= 717 705 golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk= 718 - golang.org/x/term v0.33.0 h1:NuFncQrRcaRvVmgRkvM3j/F00gWIAlcmlB8ACEKmGIg= 719 - golang.org/x/term v0.33.0/go.mod h1:s18+ql9tYWp1IfpV9DmCtQDDSRBUjKaw9M1eAv5UeF0= 706 + golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= 707 + golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= 720 708 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= 721 709 golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= 722 710 golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= ··· 769 757 google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= 770 758 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= 771 759 google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= 772 - google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= 773 - google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= 760 + google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= 761 + google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= 774 762 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 775 763 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= 776 764 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+182
knotmirror/adminpage.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "database/sql" 5 + "embed" 6 + "fmt" 7 + "html" 8 + "html/template" 9 + "log/slog" 10 + "net/http" 11 + "strconv" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/go-chi/chi/v5" 16 + "tangled.org/core/appview/pagination" 17 + "tangled.org/core/knotmirror/db" 18 + "tangled.org/core/knotmirror/models" 19 + ) 20 + 21 + //go:embed templates/*.html 22 + var templateFS embed.FS 23 + 24 + const repoPageSize = 20 25 + 26 + type AdminServer struct { 27 + db *sql.DB 28 + resyncer *Resyncer 29 + logger *slog.Logger 30 + } 31 + 32 + func NewAdminServer(l *slog.Logger, database *sql.DB, resyncer *Resyncer) *AdminServer { 33 + return &AdminServer{ 34 + db: database, 35 + resyncer: resyncer, 36 + logger: l, 37 + } 38 + } 39 + 40 + func (s *AdminServer) Router() http.Handler { 41 + r := chi.NewRouter() 42 + r.Get("/repos", s.handleRepos()) 43 + r.Get("/hosts", s.handleHosts()) 44 + 45 + r.Post("/api/triggerRepoResync", s.handleRepoResyncTrigger()) 46 + r.Post("/api/cancelRepoResync", s.handleRepoResyncCancel()) 47 + return r 48 + } 49 + 50 + func funcmap() template.FuncMap { 51 + return template.FuncMap{ 52 + "add": func(a, b int) int { return a + b }, 53 + "sub": func(a, b int) int { return a - b }, 54 + "readt": func(ts int64) string { 55 + if ts <= 0 { 56 + return "n/a" 57 + } 58 + return time.Unix(ts, 0).Format("2006-01-02 15:04") 59 + }, 60 + "const": func() map[string]any { 61 + return map[string]any{ 62 + "AllRepoStates": models.AllRepoStates, 63 + "AllHostStatuses": models.AllHostStatuses, 64 + } 65 + }, 66 + } 67 + } 68 + 69 + func (s *AdminServer) handleRepos() http.HandlerFunc { 70 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/repos.html")) 71 + return func(w http.ResponseWriter, r *http.Request) { 72 + pageNum, _ := strconv.Atoi(r.URL.Query().Get("page")) 73 + if pageNum < 1 { 74 + pageNum = 1 75 + } 76 + page := pagination.Page{ 77 + Offset: (pageNum - 1) * repoPageSize, 78 + Limit: repoPageSize, 79 + } 80 + 81 + var ( 82 + did = r.URL.Query().Get("did") 83 + knot = r.URL.Query().Get("knot") 84 + state = r.URL.Query().Get("state") 85 + ) 86 + 87 + repos, err := db.ListRepos(r.Context(), s.db, page, did, knot, state) 88 + if err != nil { 89 + http.Error(w, err.Error(), http.StatusInternalServerError) 90 + } 91 + counts, err := db.GetRepoCountsByState(r.Context(), s.db) 92 + if err != nil { 93 + http.Error(w, err.Error(), http.StatusInternalServerError) 94 + } 95 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 96 + "Repos": repos, 97 + "RepoCounts": counts, 98 + "Page": pageNum, 99 + "FilterByDid": did, 100 + "FilterByKnot": knot, 101 + "FilterByState": models.RepoState(state), 102 + }) 103 + if err != nil { 104 + slog.Error("failed to render", "err", err) 105 + } 106 + } 107 + } 108 + 109 + func (s *AdminServer) handleHosts() http.HandlerFunc { 110 + tpl := template.Must(template.New("").Funcs(funcmap()).ParseFS(templateFS, "templates/base.html", "templates/hosts.html")) 111 + return func(w http.ResponseWriter, r *http.Request) { 112 + var status = models.HostStatus(r.URL.Query().Get("status")) 113 + if status == "" { 114 + status = models.HostStatusActive 115 + } 116 + 117 + hosts, err := db.ListHosts(r.Context(), s.db, status) 118 + if err != nil { 119 + http.Error(w, err.Error(), http.StatusInternalServerError) 120 + } 121 + err = tpl.ExecuteTemplate(w, "base", map[string]any{ 122 + "Hosts": hosts, 123 + "FilterByStatus": models.HostStatus(status), 124 + }) 125 + if err != nil { 126 + slog.Error("failed to render", "err", err) 127 + } 128 + } 129 + } 130 + 131 + func (s *AdminServer) handleRepoResyncTrigger() http.HandlerFunc { 132 + return func(w http.ResponseWriter, r *http.Request) { 133 + var repoQuery = r.FormValue("repo") 134 + 135 + repo, err := syntax.ParseATURI(repoQuery) 136 + if err != nil || repo.RecordKey() == "" { 137 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 138 + return 139 + } 140 + 141 + if err := s.resyncer.TriggerResyncJob(r.Context(), repo); err != nil { 142 + s.logger.Error("failed to trigger resync job", "err", err) 143 + writeNotif(w, http.StatusInternalServerError, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 144 + return 145 + } 146 + writeNotif(w, http.StatusOK, "success") 147 + } 148 + } 149 + 150 + func (s *AdminServer) handleRepoResyncCancel() http.HandlerFunc { 151 + return func(w http.ResponseWriter, r *http.Request) { 152 + var repoQuery = r.FormValue("repo") 153 + 154 + repo, err := syntax.ParseATURI(repoQuery) 155 + if err != nil || repo.RecordKey() == "" { 156 + writeNotif(w, http.StatusBadRequest, fmt.Sprintf("repo parameter invalid: %s", repoQuery)) 157 + return 158 + } 159 + 160 + s.resyncer.CancelResyncJob(repo) 161 + writeNotif(w, http.StatusOK, "success") 162 + } 163 + } 164 + 165 + func writeNotif(w http.ResponseWriter, status int, msg string) { 166 + w.Header().Set("Content-Type", "text/html") 167 + w.WriteHeader(status) 168 + 169 + class := "info" 170 + switch { 171 + case status >= 500: 172 + class = "error" 173 + case status >= 400: 174 + class = "warn" 175 + } 176 + 177 + fmt.Fprintf(w, 178 + `<div hx-swap-oob="beforeend:#notifications"><div class="notif %s">%s</div></div>`, 179 + class, 180 + html.EscapeString(msg), 181 + ) 182 + }
+34
knotmirror/config/config.go
··· 1 + package config 2 + 3 + import ( 4 + "context" 5 + "time" 6 + 7 + "github.com/sethvargo/go-envconfig" 8 + ) 9 + 10 + type Config struct { 11 + TapUrl string `env:"MIRROR_TAP_URL, default=http://localhost:2480"` 12 + DbUrl string `env:"MIRROR_DB_URL, required"` 13 + KnotUseSSL bool `env:"MIRROR_KNOT_USE_SSL, default=false"` // use SSL for Knot when not scheme is not specified 14 + KnotSSRF bool `env:"MIRROR_KNOT_SSRF, default=false"` 15 + GitRepoBasePath string `env:"MIRROR_GIT_BASEPATH, default=repos"` 16 + GitRepoFetchTimeout time.Duration `env:"MIRROR_GIT_FETCH_TIMEOUT, default=600s"` 17 + ResyncParallelism int `env:"MIRROR_RESYNC_PARALLELISM, default=5"` 18 + Slurper SlurperConfig `env:",prefix=MIRROR_SLURPER_"` 19 + MetricsListen string `env:"MIRROR_METRICS_LISTEN, default=127.0.0.1:7100"` 20 + AdminListen string `env:"MIRROR_ADMIN_LISTEN, default=127.0.0.1:7200"` 21 + } 22 + 23 + type SlurperConfig struct { 24 + PersistCursorPeriod time.Duration `env:"PERSIST_CURSOR_PERIOD, default=4s"` 25 + ConcurrencyPerHost int `env:"CONCURRENCY, default=4"` 26 + } 27 + 28 + func Load(ctx context.Context) (*Config, error) { 29 + var cfg Config 30 + if err := envconfig.Process(ctx, &cfg); err != nil { 31 + return nil, err 32 + } 33 + return &cfg, nil 34 + }
+25
knotmirror/crawler.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "log/slog" 7 + 8 + "tangled.org/core/log" 9 + ) 10 + 11 + type Crawler struct { 12 + logger *slog.Logger 13 + db *sql.DB 14 + } 15 + 16 + func NewCrawler(l *slog.Logger, db *sql.DB) *Crawler { 17 + return &Crawler{ 18 + logger: log.SubLogger(l, "crawler"), 19 + db: db, 20 + } 21 + } 22 + 23 + func (c *Crawler) Start(ctx context.Context) { 24 + // TODO: repository crawler 25 + }
+100
knotmirror/db/db.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "time" 8 + 9 + _ "github.com/jackc/pgx/v5/stdlib" 10 + ) 11 + 12 + func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) { 13 + db, err := sql.Open("pgx", dbUrl) 14 + if err != nil { 15 + return nil, fmt.Errorf("opening db: %w", err) 16 + } 17 + 18 + db.SetMaxOpenConns(maxConns) 19 + db.SetMaxIdleConns(maxConns) 20 + db.SetConnMaxIdleTime(time.Hour) 21 + 22 + pingCtx, cancel := context.WithTimeout(ctx, 5*time.Second) 23 + defer cancel() 24 + if err := db.PingContext(pingCtx); err != nil { 25 + db.Close() 26 + return nil, fmt.Errorf("ping db: %w", err) 27 + } 28 + 29 + conn, err := db.Conn(ctx) 30 + if err != nil { 31 + return nil, err 32 + } 33 + defer conn.Close() 34 + 35 + _, err = conn.ExecContext(ctx, ` 36 + create table if not exists repos ( 37 + did text not null, 38 + rkey text not null, 39 + at_uri text generated always as ('at://' || did || '/' || 'sh.tangled.repo' || '/' || rkey) stored, 40 + cid text not null, 41 + 42 + -- record content 43 + name text not null, 44 + knot_domain text not null, 45 + 46 + -- sync data 47 + git_rev text not null, 48 + repo_sha text not null, 49 + state text not null default 'pending', 50 + error_msg text, 51 + retry_count integer not null default 0, 52 + retry_after integer not null default 0, 53 + db_created_at timestamptz not null default now(), 54 + db_updated_at timestamptz not null default now(), 55 + 56 + constraint repos_pkey primary key (did, rkey) 57 + ); 58 + 59 + -- knot hosts 60 + create table if not exists hosts ( 61 + hostname text not null, 62 + no_ssl boolean not null default false, 63 + status text not null default 'active', 64 + last_seq bigint not null default -1, 65 + db_created_at timestamptz not null default now(), 66 + db_updated_at timestamptz not null default now(), 67 + 68 + constraint hosts_pkey primary key (hostname) 69 + ); 70 + 71 + create index if not exists idx_repos_aturi on repos (at_uri); 72 + create index if not exists idx_repos_db_updated_at on repos (db_updated_at desc); 73 + create index if not exists idx_hosts_db_updated_at on hosts (db_updated_at desc); 74 + 75 + create or replace function set_updated_at() 76 + returns trigger as $$ 77 + begin 78 + new.db_updated_at = now(); 79 + return new; 80 + end; 81 + $$ language plpgsql; 82 + 83 + drop trigger if exists repos_set_updated_at on repos; 84 + create trigger repos_set_updated_at 85 + before update on repos 86 + for each row 87 + execute function set_updated_at(); 88 + 89 + drop trigger if exists hosts_set_updated_at on hosts; 90 + create trigger hosts_set_updated_at 91 + before update on hosts 92 + for each row 93 + execute function set_updated_at(); 94 + `) 95 + if err != nil { 96 + return nil, fmt.Errorf("initializing db schema: %w", err) 97 + } 98 + 99 + return db, nil 100 + }
+102
knotmirror/db/hosts.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log" 9 + 10 + "tangled.org/core/knotmirror/models" 11 + ) 12 + 13 + func UpsertHost(ctx context.Context, e *sql.DB, host *models.Host) error { 14 + if _, err := e.ExecContext(ctx, 15 + `insert into hosts (hostname, no_ssl, status, last_seq) 16 + values ($1, $2, $3, $4) 17 + on conflict(hostname) do update set 18 + no_ssl = excluded.no_ssl, 19 + status = excluded.status, 20 + last_seq = excluded.last_seq 21 + `, 22 + host.Hostname, 23 + host.NoSSL, 24 + host.Status, 25 + host.LastSeq, 26 + ); err != nil { 27 + return fmt.Errorf("upserting host: %w", err) 28 + } 29 + return nil 30 + } 31 + 32 + func GetHost(ctx context.Context, e *sql.DB, hostname string) (*models.Host, error) { 33 + var host models.Host 34 + if err := e.QueryRowContext(ctx, 35 + `select hostname, no_ssl, status, last_seq 36 + from hosts where hostname = $1`, 37 + hostname, 38 + ).Scan( 39 + &host.Hostname, 40 + &host.NoSSL, 41 + &host.Status, 42 + &host.LastSeq, 43 + ); err != nil { 44 + if errors.Is(err, sql.ErrNoRows) { 45 + return nil, nil 46 + } 47 + return nil, err 48 + } 49 + return &host, nil 50 + } 51 + 52 + func StoreCursors(ctx context.Context, e *sql.DB, cursors []models.HostCursor) error { 53 + tx, err := e.BeginTx(ctx, nil) 54 + if err != nil { 55 + return fmt.Errorf("starting transaction: %w", err) 56 + } 57 + defer tx.Rollback() 58 + for _, cur := range cursors { 59 + if cur.LastSeq <= 0 { 60 + continue 61 + } 62 + if _, err := tx.ExecContext(ctx, 63 + `update hosts set last_seq = $1 where hostname = $2`, 64 + cur.LastSeq, 65 + cur.Hostname, 66 + ); err != nil { 67 + log.Println("failed to persist host cursor", "host", cur.Hostname, "lastSeq", cur.LastSeq, "err", err) 68 + } 69 + } 70 + return tx.Commit() 71 + } 72 + 73 + func ListHosts(ctx context.Context, e *sql.DB, status models.HostStatus) ([]models.Host, error) { 74 + rows, err := e.QueryContext(ctx, 75 + `select hostname, no_ssl, status, last_seq 76 + from hosts 77 + where status = $1`, 78 + status, 79 + ) 80 + if err != nil { 81 + return nil, fmt.Errorf("querying hosts: %w", err) 82 + } 83 + defer rows.Close() 84 + 85 + var hosts []models.Host 86 + for rows.Next() { 87 + var host models.Host 88 + if err := rows.Scan( 89 + &host.Hostname, 90 + &host.NoSSL, 91 + &host.Status, 92 + &host.LastSeq, 93 + ); err != nil { 94 + return nil, fmt.Errorf("scanning row: %w", err) 95 + } 96 + hosts = append(hosts, host) 97 + } 98 + if err := rows.Err(); err != nil { 99 + return nil, fmt.Errorf("scanning rows: %w ", err) 100 + } 101 + return hosts, nil 102 + }
+275
knotmirror/db/repos.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + 9 + "github.com/bluesky-social/indigo/atproto/syntax" 10 + "tangled.org/core/appview/pagination" 11 + "tangled.org/core/knotmirror/models" 12 + ) 13 + 14 + func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 15 + if _, err := e.ExecContext(ctx, 16 + `insert into repos (did, rkey, cid, name, knot_domain) 17 + values ($1, $2, $3, $4, $5)`, 18 + did, rkey, cid, name, knot, 19 + ); err != nil { 20 + return fmt.Errorf("inserting repo: %w", err) 21 + } 22 + return nil 23 + } 24 + 25 + func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 26 + if _, err := e.ExecContext(ctx, 27 + `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 28 + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 29 + on conflict(did, rkey) do update set 30 + cid = excluded.cid, 31 + name = excluded.name, 32 + knot_domain = excluded.knot_domain, 33 + git_rev = excluded.git_rev, 34 + repo_sha = excluded.repo_sha, 35 + state = excluded.state, 36 + error_msg = excluded.error_msg, 37 + retry_count = excluded.retry_count, 38 + retry_after = excluded.retry_after`, 39 + // where repos.cid != excluded.cid`, 40 + repo.Did, 41 + repo.Rkey, 42 + repo.Cid, 43 + repo.Name, 44 + repo.KnotDomain, 45 + repo.GitRev, 46 + repo.RepoSha, 47 + repo.State, 48 + repo.ErrorMsg, 49 + repo.RetryCount, 50 + repo.RetryAfter, 51 + ); err != nil { 52 + return fmt.Errorf("upserting repo: %w", err) 53 + } 54 + return nil 55 + } 56 + 57 + func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 58 + if _, err := e.ExecContext(ctx, 59 + `update repos 60 + set state = $1 61 + where did = $2 and rkey = $3`, 62 + state, 63 + did, rkey, 64 + ); err != nil { 65 + return fmt.Errorf("updating repo: %w", err) 66 + } 67 + return nil 68 + } 69 + 70 + func DeleteRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey) error { 71 + if _, err := e.ExecContext(ctx, 72 + `delete from repos where did = $1 and rkey = $2`, 73 + did, 74 + rkey, 75 + ); err != nil { 76 + return fmt.Errorf("deleting repo: %w", err) 77 + } 78 + return nil 79 + } 80 + 81 + func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 82 + var repo models.Repo 83 + if err := e.QueryRowContext(ctx, 84 + `select 85 + did, 86 + rkey, 87 + cid, 88 + name, 89 + knot_domain, 90 + git_rev, 91 + repo_sha, 92 + state, 93 + error_msg, 94 + retry_count, 95 + retry_after 96 + from repos 97 + where did = $1 and name = $2`, 98 + did, 99 + name, 100 + ).Scan( 101 + &repo.Did, 102 + &repo.Rkey, 103 + &repo.Cid, 104 + &repo.Name, 105 + &repo.KnotDomain, 106 + &repo.GitRev, 107 + &repo.RepoSha, 108 + &repo.State, 109 + &repo.ErrorMsg, 110 + &repo.RetryCount, 111 + &repo.RetryAfter, 112 + ); err != nil { 113 + if errors.Is(err, sql.ErrNoRows) { 114 + return nil, nil 115 + } 116 + return nil, fmt.Errorf("querying repo: %w", err) 117 + } 118 + return &repo, nil 119 + } 120 + 121 + func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 122 + var repo models.Repo 123 + if err := e.QueryRowContext(ctx, 124 + `select 125 + did, 126 + rkey, 127 + cid, 128 + name, 129 + knot_domain, 130 + git_rev, 131 + repo_sha, 132 + state, 133 + error_msg, 134 + retry_count, 135 + retry_after 136 + from repos 137 + where at_uri = $1`, 138 + aturi, 139 + ).Scan( 140 + &repo.Did, 141 + &repo.Rkey, 142 + &repo.Cid, 143 + &repo.Name, 144 + &repo.KnotDomain, 145 + &repo.GitRev, 146 + &repo.RepoSha, 147 + &repo.State, 148 + &repo.ErrorMsg, 149 + &repo.RetryCount, 150 + &repo.RetryAfter, 151 + ); err != nil { 152 + if errors.Is(err, sql.ErrNoRows) { 153 + return nil, nil 154 + } 155 + return nil, fmt.Errorf("querying repo: %w", err) 156 + } 157 + return &repo, nil 158 + } 159 + 160 + func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) { 161 + var conditions []string 162 + var args []any 163 + 164 + pageClause := "" 165 + if page.Limit > 0 { 166 + pageClause = " limit $1 offset $2 " 167 + args = append(args, page.Limit, page.Offset) 168 + } 169 + 170 + whereClause := "" 171 + if did != "" { 172 + conditions = append(conditions, fmt.Sprintf("did = $%d", len(args)+1)) 173 + args = append(args, did) 174 + } 175 + if knot != "" { 176 + conditions = append(conditions, fmt.Sprintf("knot_domain = $%d", len(args)+1)) 177 + args = append(args, knot) 178 + } 179 + if state != "" { 180 + conditions = append(conditions, fmt.Sprintf("state = $%d", len(args)+1)) 181 + args = append(args, state) 182 + } 183 + if len(conditions) > 0 { 184 + whereClause = "WHERE " + conditions[0] 185 + for _, condition := range conditions[1:] { 186 + whereClause += " AND " + condition 187 + } 188 + } 189 + 190 + query := ` 191 + select 192 + did, 193 + rkey, 194 + cid, 195 + name, 196 + knot_domain, 197 + git_rev, 198 + repo_sha, 199 + state, 200 + error_msg, 201 + retry_count, 202 + retry_after 203 + from repos 204 + ` + whereClause + pageClause 205 + rows, err := e.QueryContext(ctx, query, args...) 206 + if err != nil { 207 + return nil, err 208 + } 209 + defer rows.Close() 210 + 211 + var repos []models.Repo 212 + for rows.Next() { 213 + var repo models.Repo 214 + if err := rows.Scan( 215 + &repo.Did, 216 + &repo.Rkey, 217 + &repo.Cid, 218 + &repo.Name, 219 + &repo.KnotDomain, 220 + &repo.GitRev, 221 + &repo.RepoSha, 222 + &repo.State, 223 + &repo.ErrorMsg, 224 + &repo.RetryCount, 225 + &repo.RetryAfter, 226 + ); err != nil { 227 + return nil, fmt.Errorf("scanning row: %w", err) 228 + } 229 + repos = append(repos, repo) 230 + } 231 + if err := rows.Err(); err != nil { 232 + return nil, fmt.Errorf("scanning rows: %w ", err) 233 + } 234 + 235 + return repos, nil 236 + } 237 + 238 + func GetRepoCountsByState(ctx context.Context, e *sql.DB) (map[models.RepoState]int64, error) { 239 + const q = ` 240 + SELECT state, COUNT(*) 241 + FROM repos 242 + GROUP BY state 243 + ` 244 + 245 + rows, err := e.QueryContext(ctx, q) 246 + if err != nil { 247 + return nil, err 248 + } 249 + defer rows.Close() 250 + 251 + counts := make(map[models.RepoState]int64) 252 + 253 + for rows.Next() { 254 + var state string 255 + var count int64 256 + 257 + if err := rows.Scan(&state, &count); err != nil { 258 + return nil, err 259 + } 260 + 261 + counts[models.RepoState(state)] = count 262 + } 263 + 264 + if err := rows.Err(); err != nil { 265 + return nil, err 266 + } 267 + 268 + for _, s := range models.AllRepoStates { 269 + if _, ok := counts[s]; !ok { 270 + counts[s] = 0 271 + } 272 + } 273 + 274 + return counts, nil 275 + }
+305
knotmirror/git.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "errors" 6 + "fmt" 7 + "net/url" 8 + "os" 9 + "os/exec" 10 + "path/filepath" 11 + "regexp" 12 + "strings" 13 + 14 + "github.com/go-git/go-git/v5" 15 + gitconfig "github.com/go-git/go-git/v5/config" 16 + "github.com/go-git/go-git/v5/plumbing/transport" 17 + "tangled.org/core/knotmirror/models" 18 + ) 19 + 20 + type GitMirrorManager interface { 21 + Exist(repo *models.Repo) (bool, error) 22 + // RemoteSetUrl updates git repository 'origin' remote 23 + RemoteSetUrl(ctx context.Context, repo *models.Repo) error 24 + // Clone clones the repository as a mirror 25 + Clone(ctx context.Context, repo *models.Repo) error 26 + // Fetch fetches the repository 27 + Fetch(ctx context.Context, repo *models.Repo) error 28 + // Sync mirrors the repository. It will clone the repository if repository doesn't exist. 29 + Sync(ctx context.Context, repo *models.Repo) error 30 + } 31 + 32 + type CliGitMirrorManager struct { 33 + repoBasePath string 34 + knotUseSSL bool 35 + } 36 + 37 + func NewCliGitMirrorManager(repoBasePath string, knotUseSSL bool) *CliGitMirrorManager { 38 + return &CliGitMirrorManager{ 39 + repoBasePath, 40 + knotUseSSL, 41 + } 42 + } 43 + 44 + var _ GitMirrorManager = new(CliGitMirrorManager) 45 + 46 + func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string { 47 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 48 + } 49 + 50 + func (c *CliGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 51 + return isDir(c.makeRepoPath(repo)) 52 + } 53 + 54 + func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 55 + path := c.makeRepoPath(repo) 56 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 57 + if err != nil { 58 + return fmt.Errorf("constructing repo remote url: %w", err) 59 + } 60 + cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url) 61 + if out, err := cmd.CombinedOutput(); err != nil { 62 + if ctx.Err() != nil { 63 + return ctx.Err() 64 + } 65 + msg := string(out) 66 + return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg) 67 + } 68 + return nil 69 + } 70 + 71 + func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 72 + path := c.makeRepoPath(repo) 73 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 74 + if err != nil { 75 + return fmt.Errorf("constructing repo remote url: %w", err) 76 + } 77 + return c.clone(ctx, path, url) 78 + } 79 + 80 + func (c *CliGitMirrorManager) clone(ctx context.Context, path, url string) error { 81 + cmd := exec.CommandContext(ctx, "git", "clone", "--mirror", url, path) 82 + if out, err := cmd.CombinedOutput(); err != nil { 83 + if ctx.Err() != nil { 84 + return ctx.Err() 85 + } 86 + msg := string(out) 87 + if classification := classifyCliError(msg); classification != nil { 88 + return classification 89 + } 90 + return fmt.Errorf("running 'git clone --mirror %s': %w\n%s", url, err, msg) 91 + } 92 + return nil 93 + } 94 + 95 + func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 96 + path := c.makeRepoPath(repo) 97 + return c.fetch(ctx, path) 98 + } 99 + 100 + func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error { 101 + // TODO: use `repo.Knot` instead of depending on origin 102 + cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 103 + if out, err := cmd.CombinedOutput(); err != nil { 104 + if ctx.Err() != nil { 105 + return ctx.Err() 106 + } 107 + return fmt.Errorf("running 'git fetch': %w\n%s", err, string(out)) 108 + } 109 + return nil 110 + } 111 + 112 + func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 113 + path := c.makeRepoPath(repo) 114 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 115 + if err != nil { 116 + return fmt.Errorf("constructing repo remote url: %w", err) 117 + } 118 + 119 + exist, err := isDir(path) 120 + if err != nil { 121 + return fmt.Errorf("checking repo path: %w", err) 122 + } 123 + if !exist { 124 + if err := c.clone(ctx, path, url); err != nil { 125 + return fmt.Errorf("cloning repo: %w", err) 126 + } 127 + } else { 128 + if err := c.fetch(ctx, path); err != nil { 129 + return fmt.Errorf("fetching repo: %w", err) 130 + } 131 + } 132 + return nil 133 + } 134 + 135 + var ( 136 + ErrDNSFailure = errors.New("git: knot: dns failure (could not resolve host)") 137 + ErrCertExpired = errors.New("git: knot: certificate has expired") 138 + ErrCertMismatch = errors.New("git: knot: certificate hostname mismatch") 139 + ErrTLSHandshake = errors.New("git: knot: tls handshake failure") 140 + ErrHTTPStatus = errors.New("git: knot: request url returned error") 141 + ErrUnreachable = errors.New("git: knot: could not connect to server") 142 + ErrRepoNotFound = errors.New("git: repo: repository not found") 143 + ) 144 + 145 + var ( 146 + reDNSFailure = regexp.MustCompile(`Could not resolve host:`) 147 + reCertExpired = regexp.MustCompile(`SSL certificate OpenSSL verify result: certificate has expired`) 148 + reCertMismatch = regexp.MustCompile(`SSL: no alternative certificate subject name matches target hostname`) 149 + reTLSHandshake = regexp.MustCompile(`TLS connect error: (.*)`) 150 + reHTTPStatus = regexp.MustCompile(`The requested URL returned error: (\d\d\d)`) 151 + reUnreachable = regexp.MustCompile(`Could not connect to server`) 152 + reRepoNotFound = regexp.MustCompile(`repository '.*?' not found`) 153 + ) 154 + 155 + // classifyCliError classifies git cli error message. It will return nil for unknown error messages 156 + func classifyCliError(stderr string) error { 157 + msg := strings.TrimSpace(stderr) 158 + if m := reTLSHandshake.FindStringSubmatch(msg); len(m) > 1 { 159 + return fmt.Errorf("%w: %s", ErrTLSHandshake, m[1]) 160 + } 161 + if m := reHTTPStatus.FindStringSubmatch(msg); len(m) > 1 { 162 + return fmt.Errorf("%w: %s", ErrHTTPStatus, m[1]) 163 + } 164 + switch { 165 + case reDNSFailure.MatchString(msg): 166 + return ErrDNSFailure 167 + case reCertExpired.MatchString(msg): 168 + return ErrCertExpired 169 + case reCertMismatch.MatchString(msg): 170 + return ErrCertMismatch 171 + case reUnreachable.MatchString(msg): 172 + return ErrUnreachable 173 + case reRepoNotFound.MatchString(msg): 174 + return ErrRepoNotFound 175 + } 176 + return nil 177 + } 178 + 179 + type GoGitMirrorManager struct { 180 + repoBasePath string 181 + knotUseSSL bool 182 + } 183 + 184 + func NewGoGitMirrorClient(repoBasePath string, knotUseSSL bool) *GoGitMirrorManager { 185 + return &GoGitMirrorManager{ 186 + repoBasePath, 187 + knotUseSSL, 188 + } 189 + } 190 + 191 + var _ GitMirrorManager = new(GoGitMirrorManager) 192 + 193 + func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string { 194 + return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 195 + } 196 + 197 + func (c *GoGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 198 + return isDir(c.makeRepoPath(repo)) 199 + } 200 + 201 + func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 202 + panic("unimplemented") 203 + } 204 + 205 + func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 206 + path := c.makeRepoPath(repo) 207 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 208 + if err != nil { 209 + return fmt.Errorf("constructing repo remote url: %w", err) 210 + } 211 + return c.clone(ctx, path, url) 212 + } 213 + 214 + func (c *GoGitMirrorManager) clone(ctx context.Context, path, url string) error { 215 + _, err := git.PlainCloneContext(ctx, path, true, &git.CloneOptions{ 216 + URL: url, 217 + Mirror: true, 218 + }) 219 + if err != nil && !errors.Is(err, transport.ErrEmptyRemoteRepository) { 220 + return fmt.Errorf("cloning repo: %w", err) 221 + } 222 + return nil 223 + } 224 + 225 + func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 226 + path := c.makeRepoPath(repo) 227 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 228 + if err != nil { 229 + return fmt.Errorf("constructing repo remote url: %w", err) 230 + } 231 + 232 + return c.fetch(ctx, path, url) 233 + } 234 + 235 + func (c *GoGitMirrorManager) fetch(ctx context.Context, path, url string) error { 236 + gr, err := git.PlainOpen(path) 237 + if err != nil { 238 + return fmt.Errorf("opening local repo: %w", err) 239 + } 240 + if err := gr.FetchContext(ctx, &git.FetchOptions{ 241 + RemoteURL: url, 242 + RefSpecs: []gitconfig.RefSpec{gitconfig.RefSpec("+refs/*:refs/*")}, 243 + Force: true, 244 + Prune: true, 245 + }); err != nil { 246 + return fmt.Errorf("fetching reppo: %w", err) 247 + } 248 + return nil 249 + } 250 + 251 + func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 252 + path := c.makeRepoPath(repo) 253 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 254 + if err != nil { 255 + return fmt.Errorf("constructing repo remote url: %w", err) 256 + } 257 + 258 + exist, err := isDir(path) 259 + if err != nil { 260 + return fmt.Errorf("checking repo path: %w", err) 261 + } 262 + if !exist { 263 + if err := c.clone(ctx, path, url); err != nil { 264 + return fmt.Errorf("cloning repo: %w", err) 265 + } 266 + } else { 267 + if err := c.fetch(ctx, path, url); err != nil { 268 + return fmt.Errorf("fetching repo: %w", err) 269 + } 270 + } 271 + return nil 272 + } 273 + 274 + func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) { 275 + if !strings.Contains(knot, "://") { 276 + if knotUseSSL { 277 + knot = "https://" + knot 278 + } else { 279 + knot = "http://" + knot 280 + } 281 + } 282 + 283 + u, err := url.Parse(knot) 284 + if err != nil { 285 + return "", err 286 + } 287 + 288 + if u.Scheme != "http" && u.Scheme != "https" { 289 + return "", fmt.Errorf("unsupported scheme: %s", u.Scheme) 290 + } 291 + 292 + u = u.JoinPath(didSlashRepo) 293 + return u.String(), nil 294 + } 295 + 296 + func isDir(path string) (bool, error) { 297 + info, err := os.Stat(path) 298 + if err == nil && info.IsDir() { 299 + return true, nil 300 + } 301 + if os.IsNotExist(err) { 302 + return false, nil 303 + } 304 + return false, err 305 + }
+117
knotmirror/knotmirror.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "net/http" 7 + _ "net/http/pprof" 8 + "time" 9 + 10 + "github.com/prometheus/client_golang/prometheus/promhttp" 11 + "tangled.org/core/knotmirror/config" 12 + "tangled.org/core/knotmirror/db" 13 + "tangled.org/core/knotmirror/knotstream" 14 + "tangled.org/core/knotmirror/models" 15 + "tangled.org/core/log" 16 + ) 17 + 18 + func Run(ctx context.Context, cfg *config.Config) error { 19 + // make sure every services are cleaned up on fast return 20 + ctx, cancel := context.WithCancel(ctx) 21 + defer cancel() 22 + 23 + logger := log.FromContext(ctx) 24 + 25 + db, err := db.Make(ctx, cfg.DbUrl, 32) 26 + if err != nil { 27 + return fmt.Errorf("initializing db: %w", err) 28 + } 29 + 30 + // NOTE: using plain git-cli for clone/fetch as go-git is too memory-intensive. 31 + gitm := NewCliGitMirrorManager(cfg.GitRepoBasePath, cfg.KnotUseSSL) 32 + 33 + res, err := db.ExecContext(ctx, 34 + `update repos set state = $1 where state = $2`, 35 + models.RepoStateDesynchronized, 36 + models.RepoStateResyncing, 37 + ) 38 + if err != nil { 39 + return fmt.Errorf("clearing resyning states: %w", err) 40 + } 41 + rows, err := res.RowsAffected() 42 + if err != nil { 43 + return fmt.Errorf("getting affected rows: %w", err) 44 + } 45 + logger.Info(fmt.Sprintf("clearing resyning states: %d records updated", rows)) 46 + 47 + knotstream := knotstream.NewKnotStream(logger, db, cfg) 48 + crawler := NewCrawler(logger, db) 49 + resyncer := NewResyncer(logger, db, gitm, cfg) 50 + adminpage := NewAdminServer(logger, db, resyncer) 51 + 52 + // maintain repository list with tap 53 + // NOTE: this can be removed once we introduce did-for-repo because then we can just listen to KnotStream for #identity events. 54 + tap := NewTapClient(logger, cfg, db, gitm, knotstream) 55 + 56 + // start metrics endpoint 57 + go func() { 58 + metricsAddr := cfg.MetricsListen 59 + logger.Info("starting metrics server", "addr", metricsAddr) 60 + http.Handle("/metrics", promhttp.Handler()) 61 + if err := http.ListenAndServe(metricsAddr, nil); err != nil { 62 + logger.Error("metrics server failed", "error", err) 63 + } 64 + }() 65 + 66 + // start admin page endpoint 67 + go func() { 68 + logger.Info("starting admin server", "addr", cfg.AdminListen) 69 + if err := http.ListenAndServe(cfg.AdminListen, adminpage.Router()); err != nil { 70 + logger.Error("admin server failed", "error", err) 71 + } 72 + }() 73 + 74 + tap.Start(ctx) 75 + 76 + resyncer.Start(ctx) 77 + 78 + // periodically crawl the entire network to mirror the repositories 79 + crawler.Start(ctx) 80 + 81 + // listen to knotstream (currently we don't have relay for knots, so subscribe every known knots) 82 + knotstream.Start(ctx) 83 + 84 + svcErr := make(chan error, 1) 85 + if err := knotstream.ResubscribeAllHosts(ctx); err != nil { 86 + svcErr <- fmt.Errorf("resubscribing known hosts: %w", err) 87 + } 88 + 89 + logger.Info("startup complete") 90 + select { 91 + case <-ctx.Done(): 92 + logger.Info("received shutdown signal", "reason", ctx.Err()) 93 + case err := <-svcErr: 94 + if err != nil { 95 + logger.Error("service error", "error", err) 96 + } 97 + cancel() 98 + } 99 + 100 + logger.Info("shutting down knotmirror") 101 + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 10*time.Second) 102 + defer shutdownCancel() 103 + 104 + var errs []error 105 + if err := knotstream.Shutdown(shutdownCtx); err != nil { 106 + errs = append(errs, err) 107 + } 108 + if err := db.Close(); err != nil { 109 + errs = append(errs, err) 110 + } 111 + for _, err := range errs { 112 + logger.Error("error during shutdown", "err", err) 113 + } 114 + 115 + logger.Info("shutdown complete") 116 + return nil 117 + }
+88
knotmirror/knotstream/knotstream.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + "time" 9 + 10 + "tangled.org/core/knotmirror/config" 11 + "tangled.org/core/knotmirror/db" 12 + "tangled.org/core/knotmirror/models" 13 + "tangled.org/core/log" 14 + ) 15 + 16 + type KnotStream struct { 17 + logger *slog.Logger 18 + db *sql.DB 19 + slurper *KnotSlurper 20 + } 21 + 22 + func NewKnotStream(l *slog.Logger, db *sql.DB, cfg *config.Config) *KnotStream { 23 + l = log.SubLogger(l, "knotstream") 24 + return &KnotStream{ 25 + logger: l, 26 + db: db, 27 + slurper: NewKnotSlurper(l, db, cfg.Slurper), 28 + } 29 + } 30 + 31 + func (s *KnotStream) Start(ctx context.Context) { 32 + go s.slurper.Run(ctx) 33 + } 34 + 35 + func (s *KnotStream) Shutdown(ctx context.Context) error { 36 + return s.slurper.Shutdown(ctx) 37 + } 38 + 39 + func (s *KnotStream) CheckIfSubscribed(hostname string) bool { 40 + return s.slurper.CheckIfSubscribed(hostname) 41 + } 42 + 43 + func (s *KnotStream) SubscribeHost(ctx context.Context, hostname string, noSSL bool) error { 44 + l := s.logger.With("hostname", hostname, "nossl", noSSL) 45 + l.Debug("subscribe") 46 + host, err := db.GetHost(ctx, s.db, hostname) 47 + if err != nil { 48 + return fmt.Errorf("loading host from db: %w", err) 49 + } 50 + 51 + if host == nil { 52 + host = &models.Host{ 53 + Hostname: hostname, 54 + NoSSL: noSSL, 55 + Status: models.HostStatusActive, 56 + LastSeq: 0, 57 + } 58 + 59 + if err := db.UpsertHost(ctx, s.db, host); err != nil { 60 + return fmt.Errorf("adding host to db: %w", err) 61 + } 62 + 63 + l.Info("adding new host subscription") 64 + } 65 + 66 + if host.Status == models.HostStatusBanned { 67 + return fmt.Errorf("cannot subscribe to banned knot") 68 + } 69 + return s.slurper.Subscribe(ctx, *host) 70 + } 71 + 72 + func (s *KnotStream) ResubscribeAllHosts(ctx context.Context) error { 73 + hosts, err := db.ListHosts(ctx, s.db, models.HostStatusActive) 74 + if err != nil { 75 + return fmt.Errorf("listing hosts: %w", err) 76 + } 77 + 78 + for _, host := range hosts { 79 + l := s.logger.With("hostname", host.Hostname) 80 + l.Info("re-subscribing to active host") 81 + if err := s.slurper.Subscribe(ctx, host); err != nil { 82 + l.Warn("failed to re-subscribe to host", "err", err) 83 + } 84 + // sleep for a very short period, so we don't open tons of sockets at the same time 85 + time.Sleep(1 * time.Millisecond) 86 + } 87 + return nil 88 + }
+28
knotmirror/knotstream/metrics.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // KnotStream metrics 9 + var ( 10 + knotstreamEventsReceived = promauto.NewCounter(prometheus.CounterOpts{ 11 + Name: "knotmirror_knotstream_events_received_total", 12 + Help: "Total number of events received from knotstream", 13 + }) 14 + knotstreamEventsProcessed = promauto.NewCounter(prometheus.CounterOpts{ 15 + Name: "knotmirror_knotstream_events_processed_total", 16 + Help: "Total number of events successfully processed", 17 + }) 18 + knotstreamEventsSkipped = promauto.NewCounter(prometheus.CounterOpts{ 19 + Name: "knotmirror_knotstream_events_skipped_total", 20 + Help: "Total number of events skipped (not tracked)", 21 + }) 22 + ) 23 + 24 + // slurper metrics 25 + var connectedInbound = promauto.NewGauge(prometheus.GaugeOpts{ 26 + Name: "knotmirror_connected_inbound", 27 + Help: "Number of inbound knotstream we are consuming", 28 + })
+102
knotmirror/knotstream/scheduler.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "log/slog" 6 + "sync" 7 + "sync/atomic" 8 + "time" 9 + 10 + "tangled.org/core/log" 11 + ) 12 + 13 + type ParallelScheduler struct { 14 + concurrency int 15 + 16 + do func(ctx context.Context, task *Task) error 17 + 18 + feeder chan *Task 19 + lk sync.Mutex 20 + scheduled map[string][]*Task 21 + lastSeq atomic.Int64 22 + 23 + logger *slog.Logger 24 + } 25 + 26 + type Task struct { 27 + key string 28 + message []byte 29 + } 30 + 31 + func NewParallelScheduler(maxC int, ident string, do func(context.Context, *Task) error) *ParallelScheduler { 32 + return &ParallelScheduler{ 33 + concurrency: maxC, 34 + do: do, 35 + feeder: make(chan *Task), 36 + scheduled: make(map[string][]*Task), 37 + logger: log.New("parallel-scheduler"), 38 + } 39 + } 40 + 41 + func (s *ParallelScheduler) Start(ctx context.Context) { 42 + for range s.concurrency { 43 + go s.ForEach(ctx, s.do) 44 + } 45 + } 46 + 47 + func (s *ParallelScheduler) AddTask(ctx context.Context, task *Task) { 48 + s.lk.Lock() 49 + if st, ok := s.scheduled[task.key]; ok { 50 + // schedule task 51 + s.scheduled[task.key] = append(st, task) 52 + s.lk.Unlock() 53 + return 54 + } 55 + s.scheduled[task.key] = []*Task{} 56 + s.lk.Unlock() 57 + 58 + select { 59 + case <-ctx.Done(): 60 + return 61 + case s.feeder <- task: 62 + return 63 + } 64 + } 65 + 66 + func (s *ParallelScheduler) ForEach(ctx context.Context, fn func(context.Context, *Task) error) { 67 + for task := range s.feeder { 68 + for task != nil { 69 + select { 70 + case <-ctx.Done(): 71 + return 72 + default: 73 + } 74 + if err := fn(ctx, task); err != nil { 75 + s.logger.Error("event handler failed", "err", err) 76 + } 77 + 78 + s.lk.Lock() 79 + func() { 80 + rem, ok := s.scheduled[task.key] 81 + if !ok { 82 + s.logger.Error("should always have an 'active' entry if a worker is processing a job") 83 + } 84 + if len(rem) == 0 { 85 + delete(s.scheduled, task.key) 86 + task = nil 87 + } else { 88 + task = rem[0] 89 + s.scheduled[task.key] = rem[1:] 90 + } 91 + 92 + // TODO: update seq from received message 93 + s.lastSeq.Store(time.Now().UnixNano()) 94 + }() 95 + s.lk.Unlock() 96 + } 97 + } 98 + } 99 + 100 + func (s *ParallelScheduler) LastSeq() int64 { 101 + return s.lastSeq.Load() 102 + }
+334
knotmirror/knotstream/slurper.go
··· 1 + package knotstream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "net/http" 11 + "sync" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "github.com/bluesky-social/indigo/util/ssrf" 16 + "github.com/carlmjohnson/versioninfo" 17 + "github.com/gorilla/websocket" 18 + "tangled.org/core/api/tangled" 19 + "tangled.org/core/knotmirror/config" 20 + "tangled.org/core/knotmirror/db" 21 + "tangled.org/core/knotmirror/models" 22 + "tangled.org/core/log" 23 + ) 24 + 25 + type KnotSlurper struct { 26 + logger *slog.Logger 27 + db *sql.DB 28 + cfg config.SlurperConfig 29 + 30 + subsLk sync.Mutex 31 + subs map[string]*subscription 32 + } 33 + 34 + func NewKnotSlurper(l *slog.Logger, db *sql.DB, cfg config.SlurperConfig) *KnotSlurper { 35 + return &KnotSlurper{ 36 + logger: log.SubLogger(l, "slurper"), 37 + db: db, 38 + cfg: cfg, 39 + subs: make(map[string]*subscription), 40 + } 41 + } 42 + 43 + func (s *KnotSlurper) Run(ctx context.Context) { 44 + for { 45 + select { 46 + case <-ctx.Done(): 47 + return 48 + case <-time.After(s.cfg.PersistCursorPeriod): 49 + if err := s.persistCursors(ctx); err != nil { 50 + s.logger.Error("failed to flush cursors", "err", err) 51 + } 52 + } 53 + } 54 + } 55 + 56 + func (s *KnotSlurper) CheckIfSubscribed(hostname string) bool { 57 + s.subsLk.Lock() 58 + defer s.subsLk.Unlock() 59 + 60 + _, ok := s.subs[hostname] 61 + return ok 62 + } 63 + 64 + func (s *KnotSlurper) Shutdown(ctx context.Context) error { 65 + s.logger.Info("starting shutdown host cursor flush") 66 + err := s.persistCursors(ctx) 67 + if err != nil { 68 + s.logger.Error("shutdown error", "err", err) 69 + } 70 + s.logger.Info("slurper shutdown complete") 71 + return err 72 + } 73 + 74 + func (s *KnotSlurper) persistCursors(ctx context.Context) error { 75 + // // gather cursor list from subscriptions and store them to DB 76 + // start := time.Now() 77 + 78 + s.subsLk.Lock() 79 + cursors := make([]models.HostCursor, len(s.subs)) 80 + i := 0 81 + for _, sub := range s.subs { 82 + cursors[i] = sub.HostCursor() 83 + i++ 84 + } 85 + s.subsLk.Unlock() 86 + 87 + err := db.StoreCursors(ctx, s.db, cursors) 88 + // s.logger.Info("finished persisting cursors", "count", len(cursors), "duration", time.Since(start).String(), "err", err) 89 + return err 90 + } 91 + 92 + func (s *KnotSlurper) Subscribe(ctx context.Context, host models.Host) error { 93 + s.subsLk.Lock() 94 + defer s.subsLk.Unlock() 95 + 96 + _, ok := s.subs[host.Hostname] 97 + if ok { 98 + return fmt.Errorf("already subscribed: %s", host.Hostname) 99 + } 100 + 101 + // TODO: include `cancel` function to kill subscription by hostname 102 + sub := &subscription{ 103 + hostname: host.Hostname, 104 + scheduler: NewParallelScheduler( 105 + s.cfg.ConcurrencyPerHost, 106 + host.Hostname, 107 + s.ProcessEvent, 108 + ), 109 + } 110 + s.subs[host.Hostname] = sub 111 + 112 + sub.scheduler.Start(ctx) 113 + go s.subscribeWithRedialer(ctx, host, sub) 114 + return nil 115 + } 116 + 117 + func (s *KnotSlurper) subscribeWithRedialer(ctx context.Context, host models.Host, sub *subscription) { 118 + l := s.logger.With("host", host.Hostname) 119 + 120 + dialer := websocket.Dialer{ 121 + HandshakeTimeout: time.Second * 5, 122 + } 123 + 124 + // if this isn't a localhost / private connection, then we should enable SSRF protections 125 + if !host.NoSSL { 126 + netDialer := ssrf.PublicOnlyDialer() 127 + dialer.NetDialContext = netDialer.DialContext 128 + } 129 + 130 + cursor := host.LastSeq 131 + 132 + connectedInbound.Inc() 133 + defer connectedInbound.Dec() 134 + 135 + var backoff int 136 + for { 137 + select { 138 + case <-ctx.Done(): 139 + return 140 + default: 141 + } 142 + u := host.LegacyEventsURL(cursor) 143 + l.Debug("made url with cursor", "cursor", cursor, "url", u) 144 + 145 + // NOTE: manual backoff retry implementation to explicitly handle fails 146 + hdr := make(http.Header) 147 + hdr.Add("User-Agent", userAgent()) 148 + conn, resp, err := dialer.DialContext(ctx, u, hdr) 149 + if err != nil { 150 + l.Warn("dialing failed", "err", err, "backoff", backoff) 151 + time.Sleep(sleepForBackoff(backoff)) 152 + backoff++ 153 + if backoff > 30 { 154 + l.Warn("host does not appear to be online, disabling for now") 155 + host.Status = models.HostStatusOffline 156 + if err := db.UpsertHost(ctx, s.db, &host); err != nil { 157 + l.Error("failed to update host status", "err", err) 158 + } 159 + return 160 + } 161 + continue 162 + } 163 + 164 + l.Debug("knot event subscription response", "code", resp.StatusCode, "url", u) 165 + 166 + if err := s.handleConnection(ctx, conn, sub); err != nil { 167 + // TODO: measure the last N connection error times and if they're coming too fast reconnect slower or don't reconnect and wait for requestCrawl 168 + l.Warn("host connection failed", "err", err, "backoff", backoff) 169 + } 170 + 171 + updatedCursor := sub.LastSeq() 172 + didProgress := updatedCursor > cursor 173 + l.Debug("cursor compare", "cursor", cursor, "updatedCursor", updatedCursor, "didProgress", didProgress) 174 + if cursor == 0 || didProgress { 175 + cursor = updatedCursor 176 + backoff = 0 177 + 178 + batch := []models.HostCursor{sub.HostCursor()} 179 + if err := db.StoreCursors(ctx, s.db, batch); err != nil { 180 + l.Error("failed to store cursors", "err", err) 181 + } 182 + } 183 + } 184 + } 185 + 186 + // handleConnection handles websocket connection. 187 + // Schedules task from received event and return when connection is closed 188 + func (s *KnotSlurper) handleConnection(ctx context.Context, conn *websocket.Conn, sub *subscription) error { 189 + // ping on every 30s 190 + ctx, cancel := context.WithCancel(ctx) 191 + defer cancel() // close the background ping job on connection close 192 + go func() { 193 + t := time.NewTicker(30 * time.Second) 194 + defer t.Stop() 195 + failcount := 0 196 + 197 + for { 198 + select { 199 + case <-t.C: 200 + if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil { 201 + s.logger.Warn("failed to ping", "err", err) 202 + failcount++ 203 + if failcount >= 4 { 204 + s.logger.Error("too many ping fails", "count", failcount) 205 + _ = conn.Close() 206 + return 207 + } 208 + } else { 209 + failcount = 0 // ok ping 210 + } 211 + case <-ctx.Done(): 212 + _ = conn.Close() 213 + return 214 + } 215 + } 216 + }() 217 + 218 + conn.SetPingHandler(func(message string) error { 219 + err := conn.WriteControl(websocket.PongMessage, []byte(message), time.Now().Add(time.Minute)) 220 + if err == websocket.ErrCloseSent { 221 + return nil 222 + } 223 + return err 224 + }) 225 + conn.SetPongHandler(func(_ string) error { 226 + if err := conn.SetReadDeadline(time.Now().Add(time.Minute)); err != nil { 227 + s.logger.Error("failed to set read deadline", "err", err) 228 + } 229 + return nil 230 + }) 231 + 232 + for { 233 + select { 234 + case <-ctx.Done(): 235 + return ctx.Err() 236 + default: 237 + } 238 + msgType, msg, err := conn.ReadMessage() 239 + if err != nil { 240 + return err 241 + } 242 + 243 + if msgType != websocket.TextMessage { 244 + continue 245 + } 246 + 247 + sub.scheduler.AddTask(ctx, &Task{ 248 + key: sub.hostname, // TODO: replace to repository AT-URI for better concurrency 249 + message: msg, 250 + }) 251 + } 252 + } 253 + 254 + type LegacyGitEvent struct { 255 + Rkey string 256 + Nsid string 257 + Event tangled.GitRefUpdate 258 + } 259 + 260 + func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { 261 + var legacyMessage LegacyGitEvent 262 + if err := json.Unmarshal(task.message, &legacyMessage); err != nil { 263 + return fmt.Errorf("unmarshaling message: %w", err) 264 + } 265 + 266 + if err := s.ProcessLegacyGitRefUpdate(ctx, &legacyMessage); err != nil { 267 + return fmt.Errorf("processing gitRefUpdate: %w", err) 268 + } 269 + return nil 270 + } 271 + 272 + func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, evt *LegacyGitEvent) error { 273 + knotstreamEventsReceived.Inc() 274 + 275 + curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(evt.Event.RepoDid), evt.Event.RepoName) 276 + if err != nil { 277 + return fmt.Errorf("failed to get repo '%s': %w", evt.Event.RepoDid+"/"+evt.Event.RepoName, err) 278 + } 279 + if curr == nil { 280 + // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 281 + // 282 + // Normally did+name is already enough to perform git-fetch as that's 283 + // what needed to fetch the repository. 284 + // But we want to store that in did/rkey in knot-mirror. 285 + // Therefore, we should ignore when the repository is unknown. 286 + // Hopefully crawler will sync it later. 287 + s.logger.Warn("skipping event from unknown repo", "did/repo", evt.Event.RepoDid+"/"+evt.Event.RepoName) 288 + knotstreamEventsSkipped.Inc() 289 + return nil 290 + } 291 + l := s.logger.With("repoAt", curr.AtUri()) 292 + 293 + // TODO: should plan resync to resyncBuffer on RepoStateResyncing 294 + if curr.State != models.RepoStateActive { 295 + l.Debug("skipping non-active repo") 296 + knotstreamEventsSkipped.Inc() 297 + return nil 298 + } 299 + 300 + if curr.GitRev != "" && evt.Rkey <= curr.GitRev.String() { 301 + l.Debug("skipping replayed event", "event.Rkey", evt.Rkey, "currentRev", curr.GitRev) 302 + knotstreamEventsSkipped.Inc() 303 + return nil 304 + } 305 + 306 + // if curr.State == models.RepoStateResyncing { 307 + // firehoseEventsSkipped.Inc() 308 + // return fp.events.addToResyncBuffer(ctx, commit) 309 + // } 310 + 311 + // can't skip anything, update repo state 312 + if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 313 + return err 314 + } 315 + 316 + l.Info("event processed", "eventRev", evt.Rkey) 317 + 318 + knotstreamEventsProcessed.Inc() 319 + return nil 320 + } 321 + 322 + func userAgent() string { 323 + return fmt.Sprintf("knotmirror/%s", versioninfo.Short()) 324 + } 325 + 326 + func sleepForBackoff(b int) time.Duration { 327 + if b == 0 { 328 + return 0 329 + } 330 + if b < 10 { 331 + return time.Millisecond * time.Duration((50*b)+rand.Intn(500)) 332 + } 333 + return time.Second * 30 334 + }
+22
knotmirror/knotstream/subscription.go
··· 1 + package knotstream 2 + 3 + import "tangled.org/core/knotmirror/models" 4 + 5 + // subscription represents websocket connection with that host 6 + type subscription struct { 7 + hostname string 8 + 9 + // embedded parallel job scheduler 10 + scheduler *ParallelScheduler 11 + } 12 + 13 + func (s *subscription) LastSeq() int64 { 14 + return s.scheduler.LastSeq() 15 + } 16 + 17 + func (s *subscription) HostCursor() models.HostCursor { 18 + return models.HostCursor{ 19 + Hostname: s.hostname, 20 + LastSeq: s.LastSeq(), 21 + } 22 + }
+29
knotmirror/metrics.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + // Resync metrics 9 + var ( 10 + // TODO: 11 + // - working / waiting resycner counts 12 + resyncsStarted = promauto.NewCounter(prometheus.CounterOpts{ 13 + Name: "knotmirror_resyncs_started_total", 14 + Help: "Total number of repo resyncs started", 15 + }) 16 + resyncsCompleted = promauto.NewCounter(prometheus.CounterOpts{ 17 + Name: "knotmirror_resyncs_completed_total", 18 + Help: "Total number of repo resyncs completed", 19 + }) 20 + resyncsFailed = promauto.NewCounter(prometheus.CounterOpts{ 21 + Name: "knotmirror_resyncs_failed_total", 22 + Help: "Total number of repo resyncs failed", 23 + }) 24 + resyncDuration = promauto.NewHistogram(prometheus.HistogramOpts{ 25 + Name: "knotmirror_resync_duration_seconds", 26 + Help: "Duration of repo resync operations", 27 + Buckets: prometheus.ExponentialBuckets(0.1, 2, 12), 28 + }) 29 + )
+110
knotmirror/models/models.go
··· 1 + package models 2 + 3 + import ( 4 + "fmt" 5 + 6 + "github.com/bluesky-social/indigo/atproto/syntax" 7 + "tangled.org/core/api/tangled" 8 + ) 9 + 10 + type Repo struct { 11 + Did syntax.DID 12 + Rkey syntax.RecordKey 13 + Cid *syntax.CID 14 + // content of tangled.Repo 15 + Name string 16 + KnotDomain string 17 + 18 + GitRev syntax.TID // last processed git.refUpdate revision 19 + RepoSha string // sha256 sum of git refs (to avoid no-op git fetch) 20 + State RepoState 21 + ErrorMsg string 22 + RetryCount int 23 + RetryAfter int64 // Unix timestamp (seconds) 24 + } 25 + 26 + func (r *Repo) AtUri() syntax.ATURI { 27 + return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey)) 28 + } 29 + 30 + func (r *Repo) DidSlashRepo() string { 31 + return fmt.Sprintf("%s/%s", r.Did, r.Name) 32 + } 33 + 34 + type RepoState string 35 + 36 + const ( 37 + RepoStatePending RepoState = "pending" 38 + RepoStateDesynchronized RepoState = "desynchronized" 39 + RepoStateResyncing RepoState = "resyncing" 40 + RepoStateActive RepoState = "active" 41 + RepoStateSuspended RepoState = "suspended" 42 + RepoStateError RepoState = "error" 43 + ) 44 + 45 + var AllRepoStates = []RepoState{ 46 + RepoStatePending, 47 + RepoStateDesynchronized, 48 + RepoStateResyncing, 49 + RepoStateActive, 50 + RepoStateSuspended, 51 + RepoStateError, 52 + } 53 + 54 + func (s RepoState) IsResyncing() bool { 55 + return s == RepoStateResyncing 56 + } 57 + 58 + type HostCursor struct { 59 + Hostname string 60 + LastSeq int64 61 + } 62 + 63 + type Host struct { 64 + Hostname string 65 + NoSSL bool 66 + Status HostStatus 67 + LastSeq int64 68 + } 69 + 70 + type HostStatus string 71 + 72 + const ( 73 + HostStatusActive HostStatus = "active" 74 + HostStatusIdle HostStatus = "idle" 75 + HostStatusOffline HostStatus = "offline" 76 + HostStatusThrottled HostStatus = "throttled" 77 + HostStatusBanned HostStatus = "banned" 78 + ) 79 + 80 + var AllHostStatuses = []HostStatus{ 81 + HostStatusActive, 82 + HostStatusIdle, 83 + HostStatusOffline, 84 + HostStatusThrottled, 85 + HostStatusBanned, 86 + } 87 + 88 + // func (h *Host) SubscribeGitRefsURL(cursor int64) string { 89 + // scheme := "wss" 90 + // if h.NoSSL { 91 + // scheme = "ws" 92 + // } 93 + // u := fmt.Sprintf("%s://%s/xrpc/%s", scheme, h.Hostname, tangled.SubscribeGitRefsNSID) 94 + // if cursor > 0 { 95 + // u = fmt.Sprintf("%s?cursor=%d", u, h.LastSeq) 96 + // } 97 + // return u 98 + // } 99 + 100 + func (h *Host) LegacyEventsURL(cursor int64) string { 101 + scheme := "wss" 102 + if h.NoSSL { 103 + scheme = "ws" 104 + } 105 + u := fmt.Sprintf("%s://%s/events", scheme, h.Hostname) 106 + if cursor > 0 { 107 + u = fmt.Sprintf("%s?cursor=%d", u, cursor) 108 + } 109 + return u 110 + }
+8
knotmirror/readme.md
··· 1 + # KnotMirror 2 + 3 + KnotMirror is a git mirror service for all known repos. Heavily inspired by [indigo/relay] and [indigo/tap]. 4 + 5 + KnotMirror syncs repo list using tap and subscribe to all known knots as KnotStream. 6 + 7 + [indigo/relay]: https://github.com/bluesky-social/indigo/tree/main/cmd/relay 8 + [indigo/tap]: https://github.com/bluesky-social/indigo/tree/main/cmd/tap
+273
knotmirror/resyncer.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "math/rand" 10 + "strings" 11 + "sync" 12 + "time" 13 + 14 + "github.com/bluesky-social/indigo/atproto/syntax" 15 + "tangled.org/core/knotmirror/config" 16 + "tangled.org/core/knotmirror/db" 17 + "tangled.org/core/knotmirror/models" 18 + "tangled.org/core/log" 19 + ) 20 + 21 + type Resyncer struct { 22 + logger *slog.Logger 23 + db *sql.DB 24 + gitm GitMirrorManager 25 + 26 + claimJobMu sync.Mutex 27 + 28 + runningJobs map[syntax.ATURI]context.CancelFunc 29 + runningJobsMu sync.Mutex 30 + 31 + repoFetchTimeout time.Duration 32 + manualResyncTimeout time.Duration 33 + parallelism int 34 + } 35 + 36 + func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 37 + return &Resyncer{ 38 + logger: log.SubLogger(l, "resyncer"), 39 + db: db, 40 + gitm: gitm, 41 + 42 + runningJobs: make(map[syntax.ATURI]context.CancelFunc), 43 + 44 + repoFetchTimeout: cfg.GitRepoFetchTimeout, 45 + manualResyncTimeout: 30 * time.Minute, 46 + parallelism: cfg.ResyncParallelism, 47 + } 48 + } 49 + 50 + func (r *Resyncer) Start(ctx context.Context) { 51 + for i := 0; i < r.parallelism; i++ { 52 + go r.runResyncWorker(ctx, i) 53 + } 54 + } 55 + 56 + func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 57 + l := r.logger.With("worker", workerID) 58 + for { 59 + select { 60 + case <-ctx.Done(): 61 + l.Info("resync worker shutting down", "error", ctx.Err()) 62 + return 63 + default: 64 + } 65 + repoAt, found, err := r.claimResyncJob(ctx) 66 + if err != nil { 67 + l.Error("failed to claim resync job", "error", err) 68 + time.Sleep(time.Second) 69 + continue 70 + } 71 + if !found { 72 + time.Sleep(time.Second) 73 + continue 74 + } 75 + l.Info("processing resync", "aturi", repoAt) 76 + if err := r.resyncRepo(ctx, repoAt); err != nil { 77 + l.Error("resync failed", "aturi", repoAt, "error", err) 78 + } 79 + } 80 + } 81 + 82 + func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 83 + r.runningJobsMu.Lock() 84 + defer r.runningJobsMu.Unlock() 85 + 86 + if _, exists := r.runningJobs[repo]; exists { 87 + return 88 + } 89 + r.runningJobs[repo] = cancel 90 + } 91 + 92 + func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 93 + r.runningJobsMu.Lock() 94 + defer r.runningJobsMu.Unlock() 95 + 96 + delete(r.runningJobs, repo) 97 + } 98 + 99 + func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 100 + r.runningJobsMu.Lock() 101 + defer r.runningJobsMu.Unlock() 102 + 103 + cancel, ok := r.runningJobs[repo] 104 + if !ok { 105 + return 106 + } 107 + delete(r.runningJobs, repo) 108 + cancel() 109 + } 110 + 111 + // TriggerResyncJob manually triggers the resync job 112 + func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 113 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 114 + if err != nil { 115 + return fmt.Errorf("failed to get repo: %w", err) 116 + } 117 + if repo == nil { 118 + return fmt.Errorf("repo not found: %s", repoAt) 119 + } 120 + 121 + if repo.State == models.RepoStateResyncing { 122 + return fmt.Errorf("repo already resyncing") 123 + } 124 + 125 + repo.State = models.RepoStatePending 126 + repo.RetryAfter = -1 // resyncer will prioritize this 127 + 128 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 129 + return fmt.Errorf("updating repo state to pending %w", err) 130 + } 131 + return nil 132 + } 133 + 134 + func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 135 + // use mutex to prevent duplicated jobs 136 + r.claimJobMu.Lock() 137 + defer r.claimJobMu.Unlock() 138 + 139 + var repoAt syntax.ATURI 140 + now := time.Now().Unix() 141 + if err := r.db.QueryRowContext(ctx, 142 + `update repos 143 + set state = $1 144 + where at_uri = ( 145 + select at_uri from repos 146 + where state in ($2, $3, $4) 147 + and (retry_after = -1 or retry_after = 0 or retry_after < $5) 148 + order by 149 + (retry_after = -1) desc, 150 + (retry_after = 0) desc, 151 + retry_after 152 + limit 1 153 + ) 154 + returning at_uri 155 + `, 156 + models.RepoStateResyncing, 157 + models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 158 + now, 159 + ).Scan(&repoAt); err != nil { 160 + if errors.Is(err, sql.ErrNoRows) { 161 + return "", false, nil 162 + } 163 + return "", false, err 164 + } 165 + 166 + return repoAt, true, nil 167 + } 168 + 169 + func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 170 + // ctx, span := tracer.Start(ctx, "resyncRepo") 171 + // span.SetAttributes(attribute.String("aturi", repoAt)) 172 + // defer span.End() 173 + 174 + resyncsStarted.Inc() 175 + startTime := time.Now() 176 + 177 + jobCtx, cancel := context.WithCancel(ctx) 178 + r.registerRunning(repoAt, cancel) 179 + defer r.unregisterRunning(repoAt) 180 + 181 + success, err := r.doResync(jobCtx, repoAt) 182 + if !success { 183 + resyncsFailed.Inc() 184 + resyncDuration.Observe(time.Since(startTime).Seconds()) 185 + return r.handleResyncFailure(ctx, repoAt, err) 186 + } 187 + 188 + resyncsCompleted.Inc() 189 + resyncDuration.Observe(time.Since(startTime).Seconds()) 190 + return nil 191 + } 192 + 193 + func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 194 + // ctx, span := tracer.Start(ctx, "doResync") 195 + // span.SetAttributes(attribute.String("aturi", repoAt)) 196 + // defer span.End() 197 + 198 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 199 + if err != nil { 200 + return false, fmt.Errorf("failed to get repo: %w", err) 201 + } 202 + if repo == nil { // untracked repo, skip 203 + return false, nil 204 + } 205 + 206 + // TODO: check if Knot is on backoff list. If so, return (false, nil) 207 + // TODO: detect rate limit error (http.StatusTooManyRequests) to put Knot in backoff list 208 + 209 + timeout := r.repoFetchTimeout 210 + if repo.RetryAfter == -1 { 211 + timeout = r.manualResyncTimeout 212 + } 213 + fetchCtx, cancel := context.WithTimeout(ctx, timeout) 214 + defer cancel() 215 + 216 + if err := r.gitm.Sync(fetchCtx, repo); err != nil { 217 + return false, err 218 + } 219 + 220 + // repo.GitRev = <processed git.refUpdate revision> 221 + // repo.RepoSha = <sha256 sum of git refs> 222 + repo.State = models.RepoStateActive 223 + repo.ErrorMsg = "" 224 + repo.RetryCount = 0 225 + repo.RetryAfter = 0 226 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 227 + return false, fmt.Errorf("updating repo state to active %w", err) 228 + } 229 + return true, nil 230 + } 231 + 232 + func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 233 + r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 234 + var state models.RepoState 235 + var errMsg string 236 + if err == nil { 237 + state = models.RepoStateDesynchronized 238 + errMsg = "" 239 + } else { 240 + state = models.RepoStateError 241 + errMsg = err.Error() 242 + } 243 + 244 + repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 245 + if err != nil { 246 + return fmt.Errorf("failed to get repo: %w", err) 247 + } 248 + if repo == nil { 249 + return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 250 + } 251 + 252 + // start a 1 min & go up to 1 hr between retries 253 + var retryCount = repo.RetryCount + 1 254 + var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 255 + 256 + // remove null bytes 257 + errMsg = strings.ReplaceAll(errMsg, "\x00", "") 258 + 259 + repo.State = state 260 + repo.ErrorMsg = errMsg 261 + repo.RetryCount = retryCount 262 + repo.RetryAfter = retryAfter 263 + if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 264 + return fmt.Errorf("failed to update repo state: %w", err) 265 + } 266 + return nil 267 + } 268 + 269 + func backoff(retries int, max int) time.Duration { 270 + dur := min(1<<retries, max) 271 + jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 272 + return time.Second*time.Duration(dur) + jitter 273 + }
+152
knotmirror/tapclient.go
··· 1 + package knotmirror 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "encoding/json" 7 + "fmt" 8 + "log/slog" 9 + "net/netip" 10 + "net/url" 11 + "time" 12 + 13 + "tangled.org/core/api/tangled" 14 + "tangled.org/core/knotmirror/config" 15 + "tangled.org/core/knotmirror/db" 16 + "tangled.org/core/knotmirror/knotstream" 17 + "tangled.org/core/knotmirror/models" 18 + "tangled.org/core/log" 19 + "tangled.org/core/tapc" 20 + ) 21 + 22 + type Tap struct { 23 + logger *slog.Logger 24 + cfg *config.Config 25 + tap tapc.Client 26 + db *sql.DB 27 + gitm GitMirrorManager 28 + ks *knotstream.KnotStream 29 + } 30 + 31 + func NewTapClient(l *slog.Logger, cfg *config.Config, db *sql.DB, gitm GitMirrorManager, ks *knotstream.KnotStream) *Tap { 32 + return &Tap{ 33 + logger: log.SubLogger(l, "tapclient"), 34 + cfg: cfg, 35 + tap: tapc.NewClient(cfg.TapUrl, ""), 36 + db: db, 37 + gitm: gitm, 38 + ks: ks, 39 + } 40 + } 41 + 42 + func (t *Tap) Start(ctx context.Context) { 43 + // TODO: better reconnect logic 44 + go func() { 45 + for { 46 + t.tap.Connect(ctx, &tapc.SimpleIndexer{ 47 + EventHandler: t.processEvent, 48 + }) 49 + time.Sleep(time.Second) 50 + } 51 + }() 52 + } 53 + 54 + func (t *Tap) processEvent(ctx context.Context, evt tapc.Event) error { 55 + l := t.logger.With("component", "tapIndexer") 56 + 57 + var err error 58 + switch evt.Type { 59 + case tapc.EvtRecord: 60 + switch evt.Record.Collection.String() { 61 + case tangled.RepoNSID: 62 + err = t.processRepo(ctx, evt.Record) 63 + } 64 + } 65 + 66 + if err != nil { 67 + l.Error("failed to process message. will retry later", "event.ID", evt.ID, "err", err) 68 + return err 69 + } 70 + return nil 71 + } 72 + 73 + func (t *Tap) processRepo(ctx context.Context, evt *tapc.RecordEventData) error { 74 + switch evt.Action { 75 + case tapc.RecordCreateAction, tapc.RecordUpdateAction: 76 + record := tangled.Repo{} 77 + if err := json.Unmarshal(evt.Record, &record); err != nil { 78 + return fmt.Errorf("parsing record: %w", err) 79 + } 80 + 81 + status := models.RepoStatePending 82 + errMsg := "" 83 + u, err := url.Parse("http://" + record.Knot) // parsing with fake scheme 84 + if err != nil { 85 + status = models.RepoStateSuspended 86 + errMsg = "failed to parse knot url" 87 + } else if t.cfg.KnotSSRF && isPrivate(u.Hostname()) { 88 + status = models.RepoStateSuspended 89 + errMsg = "suspending non-public knot" 90 + } 91 + 92 + repo := &models.Repo{ 93 + Did: evt.Did, 94 + Rkey: evt.Rkey, 95 + Cid: evt.CID, 96 + Name: record.Name, 97 + KnotDomain: record.Knot, 98 + State: status, 99 + ErrorMsg: errMsg, 100 + RetryAfter: 0, // clear retry info 101 + RetryCount: 0, 102 + } 103 + 104 + if evt.Action == tapc.RecordUpdateAction { 105 + exist, err := t.gitm.Exist(repo) 106 + if err != nil { 107 + return fmt.Errorf("checking git repo existance: %w", err) 108 + } 109 + if exist { 110 + // update git repo remote url 111 + if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 112 + return fmt.Errorf("updating git repo remote url: %w", err) 113 + } 114 + } 115 + } 116 + 117 + if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 118 + return fmt.Errorf("upserting repo to db: %w", err) 119 + } 120 + 121 + if !t.ks.CheckIfSubscribed(record.Knot) { 122 + if err := t.ks.SubscribeHost(ctx, record.Knot, !t.cfg.KnotUseSSL); err != nil { 123 + return fmt.Errorf("subscribing to knot: %w", err) 124 + } 125 + } 126 + 127 + case tapc.RecordDeleteAction: 128 + if err := db.DeleteRepo(ctx, t.db, evt.Did, evt.Rkey); err != nil { 129 + return fmt.Errorf("deleting repo from db: %w", err) 130 + } 131 + } 132 + return nil 133 + } 134 + 135 + // isPrivate checks if host is private network. It doesn't perform DNS resolution 136 + func isPrivate(host string) bool { 137 + if host == "localhost" { 138 + return true 139 + } 140 + addr, err := netip.ParseAddr(host) 141 + if err != nil { 142 + return false 143 + } 144 + return isPrivateAddr(addr) 145 + } 146 + 147 + func isPrivateAddr(addr netip.Addr) bool { 148 + return addr.IsLoopback() || 149 + addr.IsPrivate() || 150 + addr.IsLinkLocalUnicast() || 151 + addr.IsLinkLocalMulticast() 152 + }
+55
knotmirror/templates/base.html
··· 1 + {{define "base"}} 2 + <!DOCTYPE html> 3 + <html> 4 + <head> 5 + <title>KnotMirror Admin</title> 6 + <script src="https://cdn.jsdelivr.net/npm/htmx.org@2.0.8/dist/htmx.min.js" integrity="sha384-/TgkGk7p307TH7EXJDuUlgG3Ce1UVolAOFopFekQkkXihi5u/6OCvVKyz1W+idaz" crossorigin="anonymous"></script> 7 + <style> 8 + nav { margin-bottom: 20px; border-bottom: 1px solid #ccc; padding: 10px 0; } 9 + nav a { margin-right: 15px; } 10 + table { width: 100%; border-collapse: collapse; } 11 + th, td { text-align: left; padding: 8px; border: 1px solid #ddd; } 12 + .pagination { margin-top: 20px; } 13 + .filters { background: #f4f4f4; padding: 15px; margin-bottom: 20px; } 14 + #notifications { 15 + position: fixed; 16 + bottom: 8px; 17 + right: 8px; 18 + z-index: 1000; 19 + pointer-events: none; 20 + } 21 + .notif { 22 + pointer-events: auto; 23 + background: #333; 24 + color: #fff; 25 + padding: 2px 4px; 26 + margin: 4px 0; 27 + opacity: 0.95; 28 + } 29 + .notif.warn { background: #ed6c02 } 30 + .notif.error { background: #d32f2f } 31 + </style> 32 + </head> 33 + <body> 34 + <nav> 35 + <a href="/repos">Repositories</a> 36 + <a href="/hosts">Knot Hosts</a> 37 + </nav> 38 + <main id="main"> 39 + {{template "content" .}} 40 + </main> 41 + <div id="notifications"></div> 42 + <script> 43 + document.body.addEventListener("htmx:oobBeforeSwap", (evt) => { 44 + evt.detail.fragment.querySelectorAll(".notif").forEach((el) => { 45 + console.debug("set timeout to notif element", el) 46 + setTimeout(() => { 47 + console.debug("clearing notif element", el); 48 + el.remove(); 49 + }, 10 * 1000); 50 + }); 51 + }); 52 + </script> 53 + </body> 54 + </html> 55 + {{end}}
+44
knotmirror/templates/hosts.html
··· 1 + {{template "base" .}} 2 + {{define "content"}} 3 + <h2>Knot Hosts</h2> 4 + 5 + <div class="filters"> 6 + <form 7 + hx-get="" 8 + hx-target="#table" 9 + hx-select="#table" 10 + hx-swap="outerHTML" 11 + hx-trigger="every 10s" 12 + > 13 + <select name="status"> 14 + {{ range const.AllHostStatuses }} 15 + <option value="{{.}}" {{ if eq $.FilterByStatus . }}selected{{end}}>{{.}}</option> 16 + {{ end }} 17 + </select> 18 + <button type="submit">Filter</button> 19 + </form> 20 + </div> 21 + 22 + <table id="table"> 23 + <thead> 24 + <tr> 25 + <th>Hostname</th> 26 + <th>SSL</th> 27 + <th>Status</th> 28 + <th>Last Seq</th> 29 + </tr> 30 + </thead> 31 + <tbody> 32 + {{range .Hosts}} 33 + <tr> 34 + <td>{{.Hostname}}</td> 35 + <td>{{if .NoSSL}}False{{else}}True{{end}}</td> 36 + <td>{{.Status}}</td> 37 + <td>{{.LastSeq}}</td> 38 + </tr> 39 + {{else}} 40 + <tr><td colspan="4">No hosts registered.</td></tr> 41 + {{end}} 42 + </tbody> 43 + </table> 44 + {{end}}
+86
knotmirror/templates/repos.html
··· 1 + {{template "base" .}} 2 + {{define "content"}} 3 + <h2>Repositories</h2> 4 + 5 + <div class="filters"> 6 + <form 7 + hx-get="" 8 + hx-target="#table" 9 + hx-select="#table" 10 + hx-swap="outerHTML" 11 + hx-trigger="every 10s" 12 + > 13 + <input type="text" name="did" placeholder="DID" value="{{.FilterByDid}}"> 14 + <input type="text" name="knot" placeholder="Knot Domain" value="{{.FilterByKnot}}"> 15 + <select name="state"> 16 + <option value="">-- State --</option> 17 + {{ range const.AllRepoStates }} 18 + <option value="{{.}}" {{ if eq $.FilterByState . }}selected{{end}}>{{.}}</option> 19 + {{ end }} 20 + </select> 21 + <button type="submit">Filter</button> 22 + <a href="/repos">Clear</a> 23 + </form> 24 + </div> 25 + 26 + <div id="table"> 27 + <div class="repo-state-indicators"> 28 + {{range const.AllRepoStates}} 29 + <span class="state-pill state-{{.}}"> 30 + {{.}}: {{index $.RepoCounts .}} 31 + </span> 32 + {{end}} 33 + </div> 34 + <table> 35 + <thead> 36 + <tr> 37 + <th>DID</th> 38 + <th>Name</th> 39 + <th>Knot</th> 40 + <th>State</th> 41 + <th>Retry</th> 42 + <th>Retry After</th> 43 + <th>Error Message</th> 44 + <th>Action</th> 45 + </tr> 46 + </thead> 47 + <tbody> 48 + {{range .Repos}} 49 + <tr> 50 + <td><code>{{.Did}}</code></td> 51 + <td>{{.Name}}</td> 52 + <td>{{.KnotDomain}}</td> 53 + <td><strong>{{.State}}</strong></td> 54 + <td>{{.RetryCount}}</td> 55 + <td>{{readt .RetryAfter}}</td> 56 + <td>{{.ErrorMsg}}</td> 57 + <td> 58 + <form 59 + {{ if .State.IsResyncing -}} 60 + hx-post="/api/cancelRepoResync" 61 + {{- else -}} 62 + hx-post="/api/triggerRepoResync" 63 + {{- end }} 64 + hx-swap="none" 65 + hx-disabled-elt="find button" 66 + > 67 + <input type="hidden" name="repo" value="{{.AtUri}}"> 68 + <button type="submit">{{ if .State.IsResyncing }}cancel{{ else }}resync{{ end }}</button> 69 + </form> 70 + </td> 71 + </tr> 72 + {{else}} 73 + <tr><td colspan="99">No repositories found.</td></tr> 74 + {{end}} 75 + </tbody> 76 + </table> 77 + </div> 78 + 79 + <div class="pagination"> 80 + {{if gt .Page 1}} 81 + <a href="?page={{sub .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">« Previous</a> 82 + {{end}} 83 + <span>Page {{.Page}}</span> 84 + <a href="?page={{add .Page 1}}&did={{.FilterByDid}}&knot={{.FilterByKnot}}&state={{.FilterByState}}">Next »</a> 85 + </div> 86 + {{end}}
+29 -14
nix/gomod2nix.toml
··· 398 398 [mod."github.com/ipfs/go-metrics-interface"] 399 399 version = "v0.3.0" 400 400 hash = "sha256-b3tp3jxecLmJEGx2kW7MiKGlAKPEWg/LJ7hXylSC8jQ=" 401 + [mod."github.com/jackc/pgpassfile"] 402 + version = "v1.0.0" 403 + hash = "sha256-H0nFbC34/3pZUFnuiQk9W7yvAMh6qJDrqvHp+akBPLM=" 404 + [mod."github.com/jackc/pgservicefile"] 405 + version = "v0.0.0-20240606120523-5a60cdf6a761" 406 + hash = "sha256-ETpGsLAA2wcm5xJBayr/mZrCE1YsWbnkbSSX3ptrFn0=" 407 + [mod."github.com/jackc/pgx/v5"] 408 + version = "v5.8.0" 409 + hash = "sha256-Mq5/A/Obcceu6kKxUv30DPC2ZaVvD8Iq/YtmLm1BVec=" 410 + [mod."github.com/jackc/puddle/v2"] 411 + version = "v2.2.2" 412 + hash = "sha256-IUxdu4JYfsCh/qlz2SiUWu7EVPHhyooiVA4oaS2Z6yk=" 401 413 [mod."github.com/json-iterator/go"] 402 414 version = "v1.1.12" 403 415 hash = "sha256-To8A0h+lbfZ/6zM+2PpRpY3+L6725OPC66lffq6fUoM=" ··· 519 507 version = "v1.5.5" 520 508 hash = "sha256-ouhfDUCXsfpcgaCLfJE9oYprAQHuV61OJzb/aEhT0j8=" 521 509 [mod."github.com/prometheus/client_golang"] 522 - version = "v1.22.0" 523 - hash = "sha256-OJ/9rlWG1DIPQJAZUTzjykkX0o+f+4IKLvW8YityaMQ=" 510 + version = "v1.23.2" 511 + hash = "sha256-3GD4fBFa1tJu8MS4TNP6r2re2eViUE+kWUaieIOQXCg=" 524 512 [mod."github.com/prometheus/client_model"] 525 513 version = "v0.6.2" 526 514 hash = "sha256-q6Fh6v8iNJN9ypD47LjWmx66YITa3FyRjZMRsuRTFeQ=" 527 515 [mod."github.com/prometheus/common"] 528 - version = "v0.64.0" 529 - hash = "sha256-uy3KO60F2Cvhamz3fWQALGSsy13JiTk3NfpXgRuwqtI=" 516 + version = "v0.66.1" 517 + hash = "sha256-bqHPaV9IV70itx63wqwgy2PtxMN0sn5ThVxDmiD7+Tk=" 530 518 [mod."github.com/prometheus/procfs"] 531 519 version = "v0.16.1" 532 520 hash = "sha256-OBCvKlLW2obct35p0L9Q+1ZrxZjpTmbgHMP2rng9hpo=" ··· 559 547 version = "v0.0.0-20220730225603-2ab79fcdd4ef" 560 548 hash = "sha256-/XmSE/J+f6FLWXGvljh6uBK71uoCAK3h82XQEQ1Ki68=" 561 549 [mod."github.com/stretchr/testify"] 562 - version = "v1.10.0" 563 - hash = "sha256-fJ4gnPr0vnrOhjQYQwJ3ARDKPsOtA7d4olQmQWR+wpI=" 550 + version = "v1.11.1" 551 + hash = "sha256-sWfjkuKJyDllDEtnM8sb/pdLzPQmUYWYtmeWz/5suUc=" 564 552 [mod."github.com/tidwall/gjson"] 565 553 version = "v1.18.0" 566 554 hash = "sha256-CO6hqDu8Y58Po6A01e5iTpwiUBQ5khUZsw7czaJHw0I=" ··· 645 633 [mod."go.uber.org/zap"] 646 634 version = "v1.27.0" 647 635 hash = "sha256-8655KDrulc4Das3VRduO9MjCn8ZYD5WkULjCvruaYsU=" 636 + [mod."go.yaml.in/yaml/v2"] 637 + version = "v2.4.2" 638 + hash = "sha256-oC8RWdf1zbMYCtmR0ATy/kCkhIwPR9UqFZSMOKLVF/A=" 648 639 [mod."golang.org/x/crypto"] 649 - version = "v0.40.0" 650 - hash = "sha256-I6p2fqvz63P9MwAuoQrljI7IUbfZQvCem0ii4Q2zZng=" 640 + version = "v0.41.0" 641 + hash = "sha256-o5Di0lsFmYnXl7a5MBTqmN9vXMCRpE9ay71C1Ar8jEY=" 651 642 [mod."golang.org/x/exp"] 652 643 version = "v0.0.0-20250620022241-b7579e27df2b" 653 644 hash = "sha256-IsDTeuWLj4UkPO4NhWTvFeZ22WNtlxjoWiyAJh6zdig=" ··· 658 643 version = "v0.31.0" 659 644 hash = "sha256-ZFTlu9+4QToPPLA8C5UcG2eq/lQylq81RoG/WtYo9rg=" 660 645 [mod."golang.org/x/net"] 661 - version = "v0.42.0" 662 - hash = "sha256-YxileisIIez+kcAI+21kY5yk0iRuEqti2YdmS8jvP2s=" 646 + version = "v0.43.0" 647 + hash = "sha256-bf3iQFrsC8BoarVaS0uSspEFAcr1zHp1uziTtBpwV34=" 663 648 [mod."golang.org/x/sync"] 664 649 version = "v0.17.0" 665 650 hash = "sha256-M85lz4hK3/fzmcUViAp/CowHSxnr3BHSO7pjHp1O6i0=" 666 651 [mod."golang.org/x/sys"] 667 - version = "v0.34.0" 668 - hash = "sha256-5rZ7p8IaGli5X1sJbfIKOcOEwY4c0yQhinJPh2EtK50=" 652 + version = "v0.35.0" 653 + hash = "sha256-ZKM8pesQE6NAFZeKQ84oPn5JMhGr8g4TSwLYAsHMGSI=" 669 654 [mod."golang.org/x/text"] 670 655 version = "v0.29.0" 671 656 hash = "sha256-2cWBtJje+Yc+AnSgCANqBlIwnOMZEGkpQ2cFI45VfLI=" ··· 685 670 version = "v1.73.0" 686 671 hash = "sha256-LfVlwip++q2DX70RU6CxoXglx1+r5l48DwlFD05G11c=" 687 672 [mod."google.golang.org/protobuf"] 688 - version = "v1.36.6" 689 - hash = "sha256-lT5qnefI5FDJnowz9PEkAGylH3+fE+A3DJDkAyy9RMc=" 673 + version = "v1.36.8" 674 + hash = "sha256-yZN8ZON0b5HjUNUSubHst7zbvnMsOzd81tDPYQRtPgM=" 690 675 [mod."gopkg.in/fsnotify.v1"] 691 676 version = "v1.4.7" 692 677 hash = "sha256-j/Ts92oXa3k1MFU7Yd8/AqafRTsFn7V2pDKCyDJLah8="
+18
nix/pkgs/knot-mirror.nix
··· 1 + { 2 + buildGoApplication, 3 + modules, 4 + src, 5 + }: 6 + buildGoApplication { 7 + pname = "knotmirror"; 8 + version = "0.1.0"; 9 + inherit src modules; 10 + 11 + doCheck = false; 12 + 13 + subPackages = ["cmd/knotmirror"]; 14 + 15 + meta = { 16 + mainProgram = "knotmirror"; 17 + }; 18 + }