Mirror of @tangled.org/core. Running on a Raspberry Pi Zero 2 (Please be gentle).
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

nix,spindle: sync workflow files on `sh.tangled.git.refUpdate`

Spindle will sync git repo when new repo is registered

Spindle will listen to `sh.tangled.git.refUpdate` event from knot
stream and sync its local git repo instead. Spindle's git repo will
sparse-checkout only `/.tangled/workflows` directory.

Spindle now requires git version >=2.49 for `--revision` flag in `git
clone` command.

References:
- <https://stackoverflow.com/q/47541033/13150270>
- <https://stackoverflow.com/q/600079/13150270>

Signed-off-by: Seongmin Lee <git@boltless.me>

+165 -7
+1
go.mod
··· 29 29 github.com/gorilla/feeds v1.2.0 30 30 github.com/gorilla/sessions v1.4.0 31 31 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 32 + github.com/hashicorp/go-version v1.8.0 32 33 github.com/hiddeco/sshsig v0.2.0 33 34 github.com/hpcloud/tail v1.0.0 34 35 github.com/ipfs/go-cid v0.5.0
+2
go.sum
··· 265 265 github.com/hashicorp/go-secure-stdlib/strutil v0.1.2/go.mod h1:Gou2R9+il93BqX25LAKCLuM+y9U2T4hlwvT1yprcna4= 266 266 github.com/hashicorp/go-sockaddr v1.0.7 h1:G+pTkSO01HpR5qCxg7lxfsFEZaG+C0VssTy/9dbT+Fw= 267 267 github.com/hashicorp/go-sockaddr v1.0.7/go.mod h1:FZQbEYa1pxkQ7WLpyXJ6cbjpT8q0YgQaK/JakXqGyWw= 268 + github.com/hashicorp/go-version v1.8.0 h1:KAkNb1HAiZd1ukkxDFGmokVZe1Xy9HG6NUp+bPle2i4= 269 + github.com/hashicorp/go-version v1.8.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= 268 270 github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= 269 271 github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= 270 272 github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k=
+3
nix/gomod2nix.toml
··· 304 304 [mod."github.com/hashicorp/go-sockaddr"] 305 305 version = "v1.0.7" 306 306 hash = "sha256-p6eDOrGzN1jMmT/F/f/VJMq0cKNFhUcEuVVwTE6vSrs=" 307 + [mod."github.com/hashicorp/go-version"] 308 + version = "v1.8.0" 309 + hash = "sha256-KXtqERmYrWdpqPCViWcHbe6jnuH7k16bvBIcuJuevj8=" 307 310 [mod."github.com/hashicorp/golang-lru"] 308 311 version = "v1.0.2" 309 312 hash = "sha256-yy+5botc6T5wXgOe2mfNXJP3wr+MkVlUZ2JBkmmrA48="
+4
nix/modules/spindle.nix
··· 1 1 { 2 2 config, 3 + pkgs, 3 4 lib, 4 5 ... 5 6 }: let ··· 146 145 description = "spindle service"; 147 146 after = ["network.target" "docker.service" "spindle-tap.service"]; 148 147 wantedBy = ["multi-user.target"]; 148 + path = [ 149 + pkgs.git 150 + ]; 149 151 serviceConfig = { 150 152 LogsDirectory = "spindle"; 151 153 StateDirectory = "spindle";
+4
spindle/config/config.go
··· 27 27 return syntax.DID(fmt.Sprintf("did:web:%s", s.Hostname)) 28 28 } 29 29 30 + func (s Server) RepoDir() string { 31 + return filepath.Join(s.DataDir, "repos") 32 + } 33 + 30 34 func (s Server) DBPath() string { 31 35 return filepath.Join(s.DataDir, "spindle.db") 32 36 }
+73
spindle/git/git.go
··· 1 + package git 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "fmt" 7 + "os" 8 + "os/exec" 9 + "strings" 10 + 11 + "github.com/hashicorp/go-version" 12 + ) 13 + 14 + func Version() (*version.Version, error) { 15 + var buf bytes.Buffer 16 + cmd := exec.Command("git", "version") 17 + cmd.Stdout = &buf 18 + cmd.Stderr = os.Stderr 19 + err := cmd.Run() 20 + if err != nil { 21 + return nil, err 22 + } 23 + fields := strings.Fields(buf.String()) 24 + if len(fields) < 3 { 25 + return nil, fmt.Errorf("invalid git version: %s", buf.String()) 26 + } 27 + 28 + // version string is like: "git version 2.29.3" or "git version 2.29.3.windows.1" 29 + versionString := fields[2] 30 + if pos := strings.Index(versionString, "windows"); pos >= 1 { 31 + versionString = versionString[:pos-1] 32 + } 33 + return version.NewVersion(versionString) 34 + } 35 + 36 + const WorkflowDir = `/.tangled/workflows` 37 + 38 + func SparseSyncGitRepo(ctx context.Context, cloneUri, path, rev string) error { 39 + exist, err := isDir(path) 40 + if err != nil { 41 + return err 42 + } 43 + if rev == "" { 44 + rev = "HEAD" 45 + } 46 + if !exist { 47 + if err := exec.Command("git", "clone", "--no-checkout", "--depth=1", "--filter=tree:0", "--revision="+rev, cloneUri, path).Run(); err != nil { 48 + return fmt.Errorf("git clone: %w", err) 49 + } 50 + if err := exec.Command("git", "-C", path, "sparse-checkout", "set", "--no-cone", WorkflowDir).Run(); err != nil { 51 + return fmt.Errorf("git sparse-checkout set: %w", err) 52 + } 53 + } else { 54 + if err := exec.Command("git", "-C", path, "fetch", "--depth=1", "--filter=tree:0", "origin", rev).Run(); err != nil { 55 + return fmt.Errorf("git pull: %w", err) 56 + } 57 + } 58 + if err := exec.Command("git", "-C", path, "checkout", rev).Run(); err != nil { 59 + return fmt.Errorf("git checkout: %w", err) 60 + } 61 + return nil 62 + } 63 + 64 + func isDir(path string) (bool, error) { 65 + info, err := os.Stat(path) 66 + if err == nil && info.IsDir() { 67 + return true, nil 68 + } 69 + if os.IsNotExist(err) { 70 + return false, nil 71 + } 72 + return false, err 73 + }
+66 -5
spindle/server.go
··· 8 8 "log/slog" 9 9 "maps" 10 10 "net/http" 11 + "path/filepath" 11 12 "sync" 12 13 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/go-chi/chi/v5" 16 + "github.com/hashicorp/go-version" 15 17 "tangled.org/core/api/tangled" 16 18 "tangled.org/core/eventconsumer" 17 19 "tangled.org/core/eventconsumer/cursor" ··· 25 23 "tangled.org/core/spindle/db" 26 24 "tangled.org/core/spindle/engine" 27 25 "tangled.org/core/spindle/engines/nixery" 26 + "tangled.org/core/spindle/git" 28 27 "tangled.org/core/spindle/models" 29 28 "tangled.org/core/spindle/queue" 30 29 "tangled.org/core/spindle/secrets" ··· 57 54 func New(ctx context.Context, cfg *config.Config, engines map[string]models.Engine) (*Spindle, error) { 58 55 logger := log.FromContext(ctx) 59 56 60 - d, err := db.Make(ctx, cfg.Server.DBPath) 57 + if err := ensureGitVersion(); err != nil { 58 + return nil, fmt.Errorf("ensuring git version: %w", err) 59 + } 60 + 61 + d, err := db.Make(ctx, cfg.Server.DBPath()) 61 62 if err != nil { 62 63 return nil, fmt.Errorf("failed to setup db: %w", err) 63 64 } 64 65 65 - e, err := rbac2.NewEnforcer(cfg.Server.DBPath) 66 + e, err := rbac2.NewEnforcer(cfg.Server.DBPath()) 66 67 if err != nil { 67 68 return nil, fmt.Errorf("failed to setup rbac enforcer: %w", err) 68 69 } ··· 89 82 } 90 83 logger.Info("using openbao secrets provider", "proxy_address", cfg.Server.Secrets.OpenBao.ProxyAddr, "mount", cfg.Server.Secrets.OpenBao.Mount) 91 84 case "sqlite", "": 92 - vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath, secrets.WithTableName("secrets")) 85 + vault, err = secrets.NewSQLiteManager(cfg.Server.DBPath(), secrets.WithTableName("secrets")) 93 86 if err != nil { 94 87 return nil, fmt.Errorf("failed to setup sqlite secrets provider: %w", err) 95 88 } 96 - logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath) 89 + logger.Info("using sqlite secrets provider", "path", cfg.Server.DBPath()) 97 90 default: 98 91 return nil, fmt.Errorf("unknown secrets provider: %s", cfg.Server.Secrets.Provider) 99 92 } ··· 125 118 } 126 119 logger.Info("owner set", "did", cfg.Server.Owner) 127 120 128 - cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath) 121 + cursorStore, err := cursor.NewSQLiteStore(cfg.Server.DBPath()) 129 122 if err != nil { 130 123 return nil, fmt.Errorf("failed to setup sqlite3 cursor store: %w", err) 131 124 } ··· 282 275 } 283 276 284 277 func (s *Spindle) processPipeline(ctx context.Context, src eventconsumer.Source, msg eventconsumer.Message) error { 278 + l := log.FromContext(ctx).With("handler", "processKnotStream") 279 + l = l.With("src", src.Key(), "msg.Nsid", msg.Nsid, "msg.Rkey", msg.Rkey) 285 280 if msg.Nsid == tangled.PipelineNSID { 281 + return nil 286 282 tpl := tangled.Pipeline{} 287 283 err := json.Unmarshal(msg.EventJson, &tpl) 288 284 if err != nil { ··· 386 376 } else { 387 377 s.l.Error("failed to enqueue pipeline: queue is full") 388 378 } 379 + } else if msg.Nsid == tangled.GitRefUpdateNSID { 380 + event := tangled.GitRefUpdate{} 381 + if err := json.Unmarshal(msg.EventJson, &event); err != nil { 382 + l.Error("error unmarshalling", "err", err) 383 + return err 384 + } 385 + l = l.With("repoDid", event.RepoDid, "repoName", event.RepoName) 386 + 387 + // resolve repo name to rkey 388 + // TODO: git.refUpdate should respond with rkey instead of repo name 389 + repo, err := s.db.GetRepoWithName(syntax.DID(event.RepoDid), event.RepoName) 390 + if err != nil { 391 + return fmt.Errorf("get repo with did and name (%s/%s): %w", event.RepoDid, event.RepoName, err) 392 + } 393 + 394 + // NOTE: we are blindly trusting the knot that it will return only repos it own 395 + repoCloneUri := s.newRepoCloneUrl(src.Key(), event.RepoDid, event.RepoName) 396 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 397 + if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, event.NewSha); err != nil { 398 + return fmt.Errorf("sync git repo: %w", err) 399 + } 400 + l.Info("synced git repo") 401 + 402 + // TODO: plan the pipeline 389 403 } 390 404 405 + return nil 406 + } 407 + 408 + // newRepoPath creates a path to store repository by its did and rkey. 409 + // The path format would be: `/data/repos/did:plc:foo/sh.tangled.repo/repo-rkey 410 + func (s *Spindle) newRepoPath(did syntax.DID, rkey syntax.RecordKey) string { 411 + return filepath.Join(s.cfg.Server.RepoDir(), did.String(), tangled.RepoNSID, rkey.String()) 412 + } 413 + 414 + func (s *Spindle) newRepoCloneUrl(knot, did, name string) string { 415 + scheme := "https://" 416 + if s.cfg.Server.Dev { 417 + scheme = "http://" 418 + } 419 + return fmt.Sprintf("%s%s/%s/%s", scheme, knot, did, name) 420 + } 421 + 422 + const RequiredVersion = "2.49.0" 423 + 424 + func ensureGitVersion() error { 425 + v, err := git.Version() 426 + if err != nil { 427 + return fmt.Errorf("fetching git version: %w", err) 428 + } 429 + if v.LessThan(version.Must(version.NewVersion(RequiredVersion))) { 430 + return fmt.Errorf("installed git version %q is not supported, Spindle requires git version >= %q", v, RequiredVersion) 431 + } 391 432 return nil 392 433 }
+12 -2
spindle/tap.go
··· 10 10 "tangled.org/core/api/tangled" 11 11 "tangled.org/core/eventconsumer" 12 12 "tangled.org/core/spindle/db" 13 + "tangled.org/core/spindle/git" 13 14 "tangled.org/core/tap" 14 15 ) 15 16 ··· 226 225 return nil 227 226 } 228 227 229 - if err := s.db.PutRepo(&db.Repo{ 228 + repo := &db.Repo{ 230 229 Did: evt.Record.Did, 231 230 Rkey: evt.Record.Rkey, 232 231 Name: record.Name, 233 232 Knot: record.Knot, 234 - }); err != nil { 233 + } 234 + 235 + if err := s.db.PutRepo(repo); err != nil { 235 236 return fmt.Errorf("adding repo to db: %w", err) 236 237 } 237 238 ··· 244 241 // add this knot to the event consumer 245 242 src := eventconsumer.NewKnotSource(record.Knot) 246 243 s.ks.AddSource(context.Background(), src) 244 + 245 + // setup sparse sync 246 + repoCloneUri := s.newRepoCloneUrl(repo.Knot, repo.Did.String(), repo.Name) 247 + repoPath := s.newRepoPath(repo.Did, repo.Rkey) 248 + if err := git.SparseSyncGitRepo(ctx, repoCloneUri, repoPath, ""); err != nil { 249 + return fmt.Errorf("setting up sparse-clone git repo: %w", err) 250 + } 247 251 248 252 l.Info("added repo", "repo", evt.Record.AtUri()) 249 253 return nil