Monorepo for Tangled tangled.org
761
fork

Configure Feed

Select the types of activity you want to include in your feed.

knotmirror: key repos by repo_did + disk migration

Lewis: May this revision serve well! <lewis@tangled.org>

Lewis 6600c3a7 3c633a38

+451 -191
+25
cmd/knotmirror/main.go
··· 11 11 "github.com/urfave/cli/v3" 12 12 "tangled.org/core/knotmirror" 13 13 "tangled.org/core/knotmirror/config" 14 + "tangled.org/core/knotmirror/db" 15 + "tangled.org/core/knotmirror/migrate" 14 16 "tangled.org/core/log" 15 17 ) 16 18 ··· 42 44 Action: runKnotMirror, 43 45 Flags: []cli.Flag{}, 44 46 }, 47 + { 48 + Name: "migrate-disk", 49 + Usage: "rename mirror dirs from {did}/{rkey} -> {repo_did}; daemon must be stopped", 50 + Action: runMigrateDisk, 51 + Flags: []cli.Flag{}, 52 + }, 45 53 } 46 54 return app.Run(ctx, args) 47 55 } ··· 56 64 logger.Debug("config loaded:", "config", cfg) 57 65 return knotmirror.Run(ctx, cfg) 58 66 } 67 + 68 + func runMigrateDisk(ctx context.Context, cmd *cli.Command) error { 69 + logger := log.FromContext(ctx) 70 + cfg, err := config.Load(ctx) 71 + if err != nil { 72 + return err 73 + } 74 + database, err := db.Make(ctx, cfg.DbUrl, 4) 75 + if err != nil { 76 + return err 77 + } 78 + defer database.Close() 79 + 80 + stats, err := migrate.RenameDisk(ctx, cfg.GitRepoBasePath, database, logger) 81 + logger.Info("migrate-disk complete", "stats", stats.String()) 82 + return err 83 + }
+5
knotmirror/db/db.go
··· 7 7 "time" 8 8 9 9 _ "github.com/jackc/pgx/v5/stdlib" 10 + "tangled.org/core/log" 10 11 ) 11 12 12 13 func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) { ··· 94 95 `) 95 96 if err != nil { 96 97 return nil, fmt.Errorf("initializing db schema: %w", err) 98 + } 99 + 100 + if err := RunMigrations(ctx, conn, log.FromContext(ctx), Migrations); err != nil { 101 + return nil, fmt.Errorf("running migrations: %w", err) 97 102 } 98 103 99 104 return db, nil
+72
knotmirror/db/migrations.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + "log/slog" 8 + ) 9 + 10 + type MigrationFn = func(context.Context, *sql.Tx) error 11 + 12 + type Migration struct { 13 + Name string 14 + Fn MigrationFn 15 + } 16 + 17 + func ensureMigrationsTable(ctx context.Context, conn *sql.Conn) error { 18 + _, err := conn.ExecContext(ctx, ` 19 + create table if not exists migrations ( 20 + name text primary key, 21 + applied_at timestamptz not null default now() 22 + ); 23 + `) 24 + return err 25 + } 26 + 27 + func RunMigration(ctx context.Context, conn *sql.Conn, logger *slog.Logger, m Migration) error { 28 + logger = logger.With("migration", m.Name) 29 + 30 + tx, err := conn.BeginTx(ctx, nil) 31 + if err != nil { 32 + return fmt.Errorf("begin migration tx: %w", err) 33 + } 34 + defer tx.Rollback() 35 + 36 + var exists bool 37 + if err := tx.QueryRowContext(ctx, `select exists (select 1 from migrations where name = $1)`, m.Name).Scan(&exists); err != nil { 38 + return fmt.Errorf("checking migration state: %w", err) 39 + } 40 + if exists { 41 + logger.Debug("migration already applied") 42 + return nil 43 + } 44 + 45 + if err := m.Fn(ctx, tx); err != nil { 46 + logger.Error("migration failed", "err", err) 47 + return fmt.Errorf("running migration %s: %w", m.Name, err) 48 + } 49 + 50 + if _, err := tx.ExecContext(ctx, `insert into migrations (name) values ($1)`, m.Name); err != nil { 51 + return fmt.Errorf("recording migration: %w", err) 52 + } 53 + 54 + if err := tx.Commit(); err != nil { 55 + return fmt.Errorf("commit migration: %w", err) 56 + } 57 + 58 + logger.Info("migration applied") 59 + return nil 60 + } 61 + 62 + func RunMigrations(ctx context.Context, conn *sql.Conn, logger *slog.Logger, ms []Migration) error { 63 + if err := ensureMigrationsTable(ctx, conn); err != nil { 64 + return fmt.Errorf("ensuring migrations table: %w", err) 65 + } 66 + for _, m := range ms { 67 + if err := RunMigration(ctx, conn, logger, m); err != nil { 68 + return err 69 + } 70 + } 71 + return nil 72 + }
+56
knotmirror/db/migrations_list.go
··· 1 + package db 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "fmt" 7 + 8 + "tangled.org/core/log" 9 + ) 10 + 11 + var Migrations = []Migration{ 12 + { 13 + Name: "repos_pk_to_repo_did", 14 + Fn: reposPkToRepoDid, 15 + }, 16 + } 17 + 18 + func reposPkToRepoDid(ctx context.Context, tx *sql.Tx) error { 19 + if _, err := tx.ExecContext(ctx, 20 + `alter table repos add column if not exists repo_did text`, 21 + ); err != nil { 22 + return fmt.Errorf("adding repo_did column: %w", err) 23 + } 24 + var bad int 25 + if err := tx.QueryRowContext(ctx, 26 + `select count(*) from repos where repo_did is null or repo_did = ''`, 27 + ).Scan(&bad); err != nil { 28 + return fmt.Errorf("counting rows with null repo_did: %w", err) 29 + } 30 + if bad > 0 { 31 + log.FromContext(ctx).Warn( 32 + "dropping repos with null repo_did; their on-disk dirs will be orphaned. re-crawl via tap to restore", 33 + "count", bad, 34 + ) 35 + if _, err := tx.ExecContext(ctx, 36 + `delete from repos where repo_did is null or repo_did = ''`, 37 + ); err != nil { 38 + return fmt.Errorf("deleting null repo_did rows: %w", err) 39 + } 40 + } 41 + return execAll(ctx, tx, 42 + `alter table repos alter column repo_did set not null`, 43 + `alter table repos drop constraint if exists repos_pkey`, 44 + `alter table repos add constraint repos_pkey primary key (repo_did)`, 45 + ) 46 + } 47 + 48 + func execAll(ctx context.Context, tx *sql.Tx, stmts ...string) error { 49 + if len(stmts) == 0 { 50 + return nil 51 + } 52 + if _, err := tx.ExecContext(ctx, stmts[0]); err != nil { 53 + return err 54 + } 55 + return execAll(ctx, tx, stmts[1:]...) 56 + }
+54 -93
knotmirror/db/repos.go
··· 11 11 "tangled.org/core/knotmirror/models" 12 12 ) 13 13 14 - func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error { 15 - if _, err := e.ExecContext(ctx, 16 - `insert into repos (did, rkey, cid, name, knot_domain) 17 - values ($1, $2, $3, $4, $5)`, 18 - did, rkey, cid, name, knot, 19 - ); err != nil { 20 - return fmt.Errorf("inserting repo: %w", err) 14 + func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 15 + if repo.RepoDid == "" { 16 + return fmt.Errorf("upsert repo: repo_did is required") 21 17 } 22 - return nil 23 - } 24 - 25 - func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error { 26 18 if _, err := e.ExecContext(ctx, 27 - `insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 28 - values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 29 - on conflict(did, rkey) do update set 19 + `insert into repos (did, rkey, cid, name, knot_domain, repo_did, git_rev, repo_sha, state, error_msg, retry_count, retry_after) 20 + values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 21 + on conflict(repo_did) do update set 22 + did = excluded.did, 23 + rkey = excluded.rkey, 30 24 cid = excluded.cid, 31 25 name = excluded.name, 32 26 knot_domain = excluded.knot_domain, ··· 36 30 error_msg = excluded.error_msg, 37 31 retry_count = excluded.retry_count, 38 32 retry_after = excluded.retry_after`, 39 - // where repos.cid != excluded.cid`, 40 33 repo.Did, 41 34 repo.Rkey, 42 35 repo.Cid, 43 36 repo.Name, 44 37 repo.KnotDomain, 38 + repo.RepoDid, 45 39 repo.GitRev, 46 40 repo.RepoSha, 47 41 repo.State, ··· 54 48 return nil 55 49 } 56 50 57 - func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error { 51 + func UpdateRepoState(ctx context.Context, e *sql.DB, repoDid syntax.DID, state models.RepoState) error { 58 52 if _, err := e.ExecContext(ctx, 59 53 `update repos 60 54 set state = $1 61 - where did = $2 and rkey = $3`, 55 + where repo_did = $2`, 62 56 state, 63 - did, rkey, 57 + repoDid, 64 58 ); err != nil { 65 59 return fmt.Errorf("updating repo: %w", err) 66 60 } ··· 78 72 return nil 79 73 } 80 74 81 - func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) { 75 + const repoColumns = ` 76 + did, 77 + rkey, 78 + cid, 79 + name, 80 + knot_domain, 81 + repo_did, 82 + git_rev, 83 + repo_sha, 84 + state, 85 + error_msg, 86 + retry_count, 87 + retry_after` 88 + 89 + func scanRepo(row interface{ Scan(...any) error }) (*models.Repo, error) { 82 90 var repo models.Repo 83 - if err := e.QueryRowContext(ctx, 84 - `select 85 - did, 86 - rkey, 87 - cid, 88 - name, 89 - knot_domain, 90 - git_rev, 91 - repo_sha, 92 - state, 93 - error_msg, 94 - retry_count, 95 - retry_after 96 - from repos 97 - where did = $1 and name = $2`, 98 - did, 99 - name, 100 - ).Scan( 91 + if err := row.Scan( 101 92 &repo.Did, 102 93 &repo.Rkey, 103 94 &repo.Cid, 104 95 &repo.Name, 105 96 &repo.KnotDomain, 97 + &repo.RepoDid, 106 98 &repo.GitRev, 107 99 &repo.RepoSha, 108 100 &repo.State, ··· 110 102 &repo.RetryCount, 111 103 &repo.RetryAfter, 112 104 ); err != nil { 105 + return nil, err 106 + } 107 + return &repo, nil 108 + } 109 + 110 + func GetRepoByRepoDid(ctx context.Context, e *sql.DB, repoDid syntax.DID) (*models.Repo, error) { 111 + row := e.QueryRowContext(ctx, 112 + `select`+repoColumns+` 113 + from repos 114 + where repo_did = $1`, 115 + repoDid, 116 + ) 117 + repo, err := scanRepo(row) 118 + if err != nil { 113 119 if errors.Is(err, sql.ErrNoRows) { 114 120 return nil, nil 115 121 } 116 122 return nil, fmt.Errorf("querying repo: %w", err) 117 123 } 118 - return &repo, nil 124 + return repo, nil 119 125 } 120 126 121 127 func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) { 122 - var repo models.Repo 123 - if err := e.QueryRowContext(ctx, 124 - `select 125 - did, 126 - rkey, 127 - cid, 128 - name, 129 - knot_domain, 130 - git_rev, 131 - repo_sha, 132 - state, 133 - error_msg, 134 - retry_count, 135 - retry_after 128 + row := e.QueryRowContext(ctx, 129 + `select`+repoColumns+` 136 130 from repos 137 131 where at_uri = $1`, 138 132 aturi, 139 - ).Scan( 140 - &repo.Did, 141 - &repo.Rkey, 142 - &repo.Cid, 143 - &repo.Name, 144 - &repo.KnotDomain, 145 - &repo.GitRev, 146 - &repo.RepoSha, 147 - &repo.State, 148 - &repo.ErrorMsg, 149 - &repo.RetryCount, 150 - &repo.RetryAfter, 151 - ); err != nil { 133 + ) 134 + repo, err := scanRepo(row) 135 + if err != nil { 152 136 if errors.Is(err, sql.ErrNoRows) { 153 137 return nil, nil 154 138 } 155 139 return nil, fmt.Errorf("querying repo: %w", err) 156 140 } 157 - return &repo, nil 141 + return repo, nil 158 142 } 159 143 160 144 func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) { ··· 188 172 } 189 173 190 174 query := ` 191 - select 192 - did, 193 - rkey, 194 - cid, 195 - name, 196 - knot_domain, 197 - git_rev, 198 - repo_sha, 199 - state, 200 - error_msg, 201 - retry_count, 202 - retry_after 175 + select` + repoColumns + ` 203 176 from repos 204 177 ` + whereClause + pageClause 205 178 rows, err := e.QueryContext(ctx, query, args...) ··· 210 183 211 184 var repos []models.Repo 212 185 for rows.Next() { 213 - var repo models.Repo 214 - if err := rows.Scan( 215 - &repo.Did, 216 - &repo.Rkey, 217 - &repo.Cid, 218 - &repo.Name, 219 - &repo.KnotDomain, 220 - &repo.GitRev, 221 - &repo.RepoSha, 222 - &repo.State, 223 - &repo.ErrorMsg, 224 - &repo.RetryCount, 225 - &repo.RetryAfter, 226 - ); err != nil { 186 + repo, err := scanRepo(rows) 187 + if err != nil { 227 188 return nil, fmt.Errorf("scanning row: %w", err) 228 189 } 229 - repos = append(repos, repo) 190 + repos = append(repos, *repo) 230 191 } 231 192 if err := rows.Err(); err != nil { 232 193 return nil, fmt.Errorf("scanning rows: %w ", err)
+17 -37
knotmirror/git.go
··· 19 19 20 20 type GitMirrorManager interface { 21 21 Exist(repo *models.Repo) (bool, error) 22 - // RemoteSetUrl updates git repository 'origin' remote 23 - RemoteSetUrl(ctx context.Context, repo *models.Repo) error 24 22 // Clone clones the repository as a mirror 25 23 Clone(ctx context.Context, repo *models.Repo) error 26 24 // Fetch fetches the repository ··· 44 42 var _ GitMirrorManager = new(CliGitMirrorManager) 45 43 46 44 func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string { 47 - return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 45 + return filepath.Join(c.repoBasePath, repo.RepoDid.String()) 48 46 } 49 47 50 48 func (c *CliGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 51 49 return isDir(c.makeRepoPath(repo)) 52 50 } 53 51 54 - func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 55 - path := c.makeRepoPath(repo) 56 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 57 - if err != nil { 58 - return fmt.Errorf("constructing repo remote url: %w", err) 59 - } 60 - cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url) 61 - if out, err := cmd.CombinedOutput(); err != nil { 62 - if ctx.Err() != nil { 63 - return ctx.Err() 64 - } 65 - msg := string(out) 66 - return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg) 67 - } 68 - return nil 69 - } 70 - 71 52 func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 72 53 path := c.makeRepoPath(repo) 73 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 54 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 74 55 if err != nil { 75 56 return fmt.Errorf("constructing repo remote url: %w", err) 76 57 } ··· 94 75 95 76 func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 96 77 path := c.makeRepoPath(repo) 97 - return c.fetch(ctx, path) 78 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 79 + if err != nil { 80 + return fmt.Errorf("constructing repo remote url: %w", err) 81 + } 82 + return c.fetch(ctx, path, url) 98 83 } 99 84 100 - func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error { 101 - // TODO: use `repo.Knot` instead of depending on origin 102 - cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin") 85 + func (c *CliGitMirrorManager) fetch(ctx context.Context, path, url string) error { 86 + cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", url, "+refs/*:refs/*") 103 87 if out, err := cmd.CombinedOutput(); err != nil { 104 88 if ctx.Err() != nil { 105 89 return ctx.Err() ··· 111 95 112 96 func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 113 97 path := c.makeRepoPath(repo) 114 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 98 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 115 99 if err != nil { 116 100 return fmt.Errorf("constructing repo remote url: %w", err) 117 101 } ··· 125 109 return fmt.Errorf("cloning repo: %w", err) 126 110 } 127 111 } else { 128 - if err := c.fetch(ctx, path); err != nil { 112 + if err := c.fetch(ctx, path, url); err != nil { 129 113 return fmt.Errorf("fetching repo: %w", err) 130 114 } 131 115 } ··· 191 175 var _ GitMirrorManager = new(GoGitMirrorManager) 192 176 193 177 func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string { 194 - return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String()) 178 + return filepath.Join(c.repoBasePath, repo.RepoDid.String()) 195 179 } 196 180 197 181 func (c *GoGitMirrorManager) Exist(repo *models.Repo) (bool, error) { 198 182 return isDir(c.makeRepoPath(repo)) 199 183 } 200 184 201 - func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error { 202 - panic("unimplemented") 203 - } 204 - 205 185 func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error { 206 186 path := c.makeRepoPath(repo) 207 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 187 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 208 188 if err != nil { 209 189 return fmt.Errorf("constructing repo remote url: %w", err) 210 190 } ··· 224 204 225 205 func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error { 226 206 path := c.makeRepoPath(repo) 227 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 207 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 228 208 if err != nil { 229 209 return fmt.Errorf("constructing repo remote url: %w", err) 230 210 } ··· 250 230 251 231 func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error { 252 232 path := c.makeRepoPath(repo) 253 - url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL) 233 + url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL) 254 234 if err != nil { 255 235 return fmt.Errorf("constructing repo remote url: %w", err) 256 236 } ··· 271 251 return nil 272 252 } 273 253 274 - func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) { 254 + func makeRepoRemoteUrl(knot, repoIdentifier string, knotUseSSL bool) (string, error) { 275 255 if !strings.Contains(knot, "://") { 276 256 if knotUseSSL { 277 257 knot = "https://" + knot ··· 289 269 return "", fmt.Errorf("unsupported scheme: %s", u.Scheme) 290 270 } 291 271 292 - u = u.JoinPath(didSlashRepo) 272 + u = u.JoinPath(repoIdentifier) 293 273 return u.String(), nil 294 274 } 295 275
+30 -27
knotmirror/knotstream/slurper.go
··· 15 15 "github.com/bluesky-social/indigo/util/ssrf" 16 16 "github.com/carlmjohnson/versioninfo" 17 17 "github.com/gorilla/websocket" 18 - "tangled.org/core/api/tangled" 19 18 "tangled.org/core/knotmirror/config" 20 19 "tangled.org/core/knotmirror/db" 21 20 "tangled.org/core/knotmirror/models" ··· 262 261 } 263 262 } 264 263 264 + type legacyGitRefUpdate struct { 265 + OwnerDid *string `json:"ownerDid,omitempty"` 266 + RepoDid *string `json:"repoDid,omitempty"` 267 + RepoName string `json:"repoName,omitempty"` 268 + } 269 + 265 270 type LegacyGitEvent struct { 266 271 Rkey string 267 272 Nsid string 268 - Event tangled.GitRefUpdate 273 + Event legacyGitRefUpdate 269 274 } 270 275 271 276 func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error { ··· 280 285 return nil 281 286 } 282 287 288 + // lookupRepoForRefUpdate resolves the local repo row for an incoming refUpdate 289 + // via the stable RepoDid join. Returns (nil, "", nil) when the event has no 290 + // repoDid (unjoinable) and (nil, key, nil) on a clean miss. 291 + func (s *KnotSlurper) lookupRepoForRefUpdate(ctx context.Context, evt *LegacyGitEvent) (*models.Repo, string, error) { 292 + if evt.Event.RepoDid == nil || *evt.Event.RepoDid == "" { 293 + return nil, "", nil 294 + } 295 + repoDid := syntax.DID(*evt.Event.RepoDid) 296 + curr, err := db.GetRepoByRepoDid(ctx, s.db, repoDid) 297 + return curr, repoDid.String(), err 298 + } 299 + 283 300 func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error { 284 301 knotstreamEventsReceived.Inc() 285 302 286 303 l := s.logger.With("src", source) 287 304 288 - ownerDid := "" 289 - if evt.Event.OwnerDid != nil { 290 - ownerDid = *evt.Event.OwnerDid 291 - } else { 292 - // handle legacy event 293 - if evt.Event.RepoDid != nil { 294 - ownerDid = *evt.Event.RepoDid 295 - } 296 - } 297 - curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(ownerDid), evt.Event.RepoName) 305 + curr, lookupKey, err := s.lookupRepoForRefUpdate(ctx, evt) 298 306 if err != nil { 299 - return fmt.Errorf("failed to get repo '%s': %w", ownerDid+"/"+evt.Event.RepoName, err) 307 + return fmt.Errorf("failed to get repo '%s': %w", lookupKey, err) 300 308 } 301 309 if curr == nil { 302 - // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 303 - // 304 - // Normally did+name is already enough to perform git-fetch as that's 305 - // what needed to fetch the repository. 306 - // But we want to store that in did/rkey in knot-mirror. 307 - // Therefore, we should ignore when the repository is unknown. 308 - // Hopefully crawler will sync it later. 309 - l.Warn("skipping event from unknown repo", "did/name", ownerDid+"/"+evt.Event.RepoName) 310 + if lookupKey == "" { 311 + l.Warn("skipping gitRefUpdate: event has no fields to join on", 312 + "repo_did", evt.Event.RepoDid) 313 + } else { 314 + // if repo doesn't exist in DB, just ignore the event. That repo is unknown. 315 + // Hopefully crawler/tap will sync it later. 316 + l.Warn("skipping event from unknown repo", "key", lookupKey) 317 + } 310 318 knotstreamEventsSkipped.Inc() 311 319 return nil 312 320 } ··· 325 333 return nil 326 334 } 327 335 328 - // if curr.State == models.RepoStateResyncing { 329 - // firehoseEventsSkipped.Inc() 330 - // return fp.events.addToResyncBuffer(ctx, commit) 331 - // } 332 - 333 336 // can't skip anything, update repo state 334 - if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil { 337 + if err := db.UpdateRepoState(ctx, s.db, curr.RepoDid, models.RepoStateDesynchronized); err != nil { 335 338 return err 336 339 } 337 340
+135
knotmirror/migrate/migrate.go
··· 1 + package migrate 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "errors" 7 + "fmt" 8 + "log/slog" 9 + "os" 10 + "path/filepath" 11 + "strings" 12 + 13 + "github.com/bluesky-social/indigo/atproto/syntax" 14 + "tangled.org/core/api/tangled" 15 + "tangled.org/core/knotmirror/db" 16 + ) 17 + 18 + type Stats struct { 19 + Renamed int 20 + Skipped int 21 + Orphaned int 22 + OwnerDirsRm int 23 + AlreadyExists int 24 + } 25 + 26 + func (s Stats) String() string { 27 + return fmt.Sprintf( 28 + "renamed=%d skipped=%d orphaned=%d owner_dirs_removed=%d target_existed=%d", 29 + s.Renamed, s.Skipped, s.Orphaned, s.OwnerDirsRm, s.AlreadyExists, 30 + ) 31 + } 32 + 33 + func RenameDisk(ctx context.Context, base string, database *sql.DB, logger *slog.Logger) (Stats, error) { 34 + entries, err := os.ReadDir(base) 35 + if err != nil { 36 + return Stats{}, fmt.Errorf("reading base path: %w", err) 37 + } 38 + return reduceEntries(ctx, entries, 0, Stats{}, ownerStep(base, database, logger)) 39 + } 40 + 41 + type stepFn func(context.Context, os.DirEntry, Stats) (Stats, error) 42 + 43 + func reduceEntries(ctx context.Context, entries []os.DirEntry, idx int, acc Stats, fn stepFn) (Stats, error) { 44 + if idx >= len(entries) { 45 + return acc, nil 46 + } 47 + if err := ctx.Err(); err != nil { 48 + return acc, err 49 + } 50 + next, err := fn(ctx, entries[idx], acc) 51 + if err != nil { 52 + return next, err 53 + } 54 + return reduceEntries(ctx, entries, idx+1, next, fn) 55 + } 56 + 57 + func ownerStep(base string, database *sql.DB, logger *slog.Logger) stepFn { 58 + return func(ctx context.Context, entry os.DirEntry, acc Stats) (Stats, error) { 59 + if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") { 60 + return acc, nil 61 + } 62 + ownerPath := filepath.Join(base, entry.Name()) 63 + if _, err := os.Stat(filepath.Join(ownerPath, "HEAD")); err == nil { 64 + return acc, nil 65 + } 66 + subEntries, err := os.ReadDir(ownerPath) 67 + if err != nil { 68 + logger.Error("reading owner dir", "ownerPath", ownerPath, "err", err) 69 + return acc, nil 70 + } 71 + next, err := reduceEntries(ctx, subEntries, 0, acc, rkeyStep(base, database, logger, syntax.DID(entry.Name()), ownerPath)) 72 + if err != nil { 73 + return next, err 74 + } 75 + remaining, err := os.ReadDir(ownerPath) 76 + if err == nil && len(remaining) == 0 { 77 + if rmErr := os.Remove(ownerPath); rmErr == nil { 78 + next.OwnerDirsRm++ 79 + logger.Info("removed empty owner dir", "ownerPath", ownerPath) 80 + } else { 81 + logger.Warn("failed to remove empty owner dir", "ownerPath", ownerPath, "err", rmErr) 82 + } 83 + } 84 + return next, nil 85 + } 86 + } 87 + 88 + func rkeyStep(base string, database *sql.DB, logger *slog.Logger, ownerDid syntax.DID, ownerPath string) stepFn { 89 + return func(ctx context.Context, sub os.DirEntry, acc Stats) (Stats, error) { 90 + if !sub.IsDir() { 91 + return acc, nil 92 + } 93 + rkey := sub.Name() 94 + subPath := filepath.Join(ownerPath, rkey) 95 + l := logger.With("did", ownerDid, "rkey", rkey, "subPath", subPath) 96 + 97 + if _, err := os.Stat(filepath.Join(subPath, "HEAD")); err != nil { 98 + l.Warn("skipping non-repo subdir") 99 + acc.Skipped++ 100 + return acc, nil 101 + } 102 + 103 + aturi := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", ownerDid, tangled.RepoNSID, rkey)) 104 + repo, err := db.GetRepoByAtUri(ctx, database, aturi) 105 + if err != nil { 106 + return acc, fmt.Errorf("looking up repo by aturi %s: %w", aturi, err) 107 + } 108 + if repo == nil { 109 + l.Warn("orphan disk repo, no DB row; leaving in place") 110 + acc.Orphaned++ 111 + return acc, nil 112 + } 113 + if repo.RepoDid == "" { 114 + l.Warn("DB row has empty repo_did; leaving in place") 115 + acc.Orphaned++ 116 + return acc, nil 117 + } 118 + 119 + target := filepath.Join(base, repo.RepoDid.String()) 120 + if _, err := os.Stat(target); err == nil { 121 + l.Warn("target path already exists; leaving source in place", "target", target) 122 + acc.AlreadyExists++ 123 + return acc, nil 124 + } else if !errors.Is(err, os.ErrNotExist) { 125 + return acc, fmt.Errorf("stat target %s: %w", target, err) 126 + } 127 + 128 + if err := os.Rename(subPath, target); err != nil { 129 + return acc, fmt.Errorf("rename %s -> %s: %w", subPath, target, err) 130 + } 131 + acc.Renamed++ 132 + l.Info("renamed", "target", target) 133 + return acc, nil 134 + } 135 + }
+3 -2
knotmirror/models/models.go
··· 14 14 // content of tangled.Repo 15 15 Name string 16 16 KnotDomain string 17 + RepoDid syntax.DID 17 18 18 19 GitRev syntax.TID // last processed git.refUpdate revision 19 20 RepoSha string // sha256 sum of git refs (to avoid no-op git fetch) ··· 27 28 return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey)) 28 29 } 29 30 30 - func (r *Repo) DidSlashRepo() string { 31 - return fmt.Sprintf("%s/%s", r.Did, r.Name) 31 + func (r *Repo) RepoIdentifier() string { 32 + return r.RepoDid.String() 32 33 } 33 34 34 35 type RepoState string
+1 -1
knotmirror/resyncer.go
··· 278 278 279 279 // checkKnotReachability checks if Knot is reachable and is valid git remote server 280 280 func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 281 - repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL) 281 + repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL) 282 282 if err != nil { 283 283 return err 284 284 }
+7 -15
knotmirror/tapclient.go
··· 11 11 "strings" 12 12 "time" 13 13 14 + "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "tangled.org/core/api/tangled" 15 16 "tangled.org/core/knotmirror/config" 16 17 "tangled.org/core/knotmirror/db" ··· 104 105 errMsg = "suspending non-public knot" 105 106 } 106 107 108 + if record.RepoDid == nil || *record.RepoDid == "" { 109 + t.logger.Warn("dropping repo record without repo_did", "did", evt.Did, "rkey", evt.Rkey) 110 + return nil 111 + } 107 112 repo := &models.Repo{ 108 113 Did: evt.Did, 109 114 Rkey: evt.Rkey, 110 115 Cid: evt.CID, 111 - Name: record.Name, 116 + Name: evt.Rkey.String(), 112 117 KnotDomain: knotUrl, 118 + RepoDid: syntax.DID(*record.RepoDid), 113 119 State: status, 114 120 ErrorMsg: errMsg, 115 121 RetryAfter: 0, // clear retry info 116 122 RetryCount: 0, 117 123 } 118 124 119 - if evt.Action == tapc.RecordUpdateAction { 120 - exist, err := t.gitm.Exist(repo) 121 - if err != nil { 122 - return fmt.Errorf("checking git repo existence: %w", err) 123 - } 124 - if exist { 125 - // update git repo remote url 126 - if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil { 127 - return fmt.Errorf("updating git repo remote url: %w", err) 128 - } 129 - } 130 - } 131 - 132 - t.logger.Debug("tap: upserting repo with knot", "knot", repo.KnotDomain) 133 125 if err := db.UpsertRepo(ctx, t.db, repo); err != nil { 134 126 return fmt.Errorf("upserting repo to db: %w", err) 135 127 }
+10 -8
knotmirror/xrpc/git_list_branches.go
··· 9 9 10 10 "github.com/bluesky-social/indigo/atproto/atclient" 11 11 "github.com/bluesky-social/indigo/atproto/syntax" 12 + "tangled.org/core/knotmirror/db" 12 13 "tangled.org/core/knotserver/git" 13 14 "tangled.org/core/types" 14 15 ) ··· 82 83 } 83 84 84 85 func (x *Xrpc) makeRepoPath(ctx context.Context, repo syntax.ATURI) (string, error) { 85 - id, err := x.resolver.ResolveIdent(ctx, repo.Authority().String()) 86 + r, err := db.GetRepoByAtUri(ctx, x.db, repo) 86 87 if err != nil { 87 - return "", err 88 + return "", fmt.Errorf("looking up repo: %w", err) 89 + } 90 + if r == nil { 91 + return "", fmt.Errorf("repo not found: %s", repo) 92 + } 93 + if r.RepoDid == "" { 94 + return "", fmt.Errorf("repo missing repo_did: %s", repo) 88 95 } 89 - 90 - return filepath.Join( 91 - x.cfg.GitRepoBasePath, 92 - id.DID.String(), 93 - repo.RecordKey().String(), 94 - ), nil 96 + return filepath.Join(x.cfg.GitRepoBasePath, r.RepoDid.String()), nil 95 97 }
+28 -7
knotmirror/xrpc/proxy.go
··· 40 40 } 41 41 42 42 type knotInfo struct { 43 - baseURL string 44 - didSlashRepo string 43 + baseURL string 44 + repoIdentifier string 45 45 } 46 46 47 47 func (x *Xrpc) resolveKnot(ctx context.Context, repoAt syntax.ATURI) (*knotInfo, error) { ··· 49 49 if err == nil && repo != nil { 50 50 if repo.State != models.RepoStatePending && repo.State != models.RepoStateResyncing { 51 51 go func() { 52 - if err := db.UpdateRepoState(context.Background(), x.db, repo.Did, repo.Rkey, models.RepoStatePending); err != nil { 52 + if err := db.UpdateRepoState(context.Background(), x.db, repo.RepoDid, models.RepoStatePending); err != nil { 53 53 x.logger.Error("failed to mark repo for resync after proxy", "err", err) 54 54 } 55 55 }() 56 56 } 57 - return &knotInfo{baseURL: repo.KnotDomain, didSlashRepo: repo.DidSlashRepo()}, nil 57 + return &knotInfo{baseURL: repo.KnotDomain, repoIdentifier: repo.RepoIdentifier()}, nil 58 58 } 59 59 60 60 owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String()) ··· 69 69 } 70 70 71 71 record := out.Value.Val.(*tangled.Repo) 72 + if record.RepoDid == nil || *record.RepoDid == "" { 73 + return nil, fmt.Errorf("repo record has no repo_did") 74 + } 72 75 knotURL := record.Knot 73 76 if !strings.Contains(record.Knot, "://") { 74 77 if host, _ := db.GetHost(ctx, x.db, record.Knot); host != nil { ··· 83 86 } 84 87 } 85 88 89 + rkey := repoAt.RecordKey().String() 90 + repoDid := syntax.DID(*record.RepoDid) 91 + go func() { 92 + bgCtx := context.Background() 93 + pending := &models.Repo{ 94 + Did: owner.DID, 95 + Rkey: repoAt.RecordKey(), 96 + Cid: (*syntax.CID)(out.Cid), 97 + Name: rkey, 98 + KnotDomain: knotURL, 99 + RepoDid: repoDid, 100 + State: models.RepoStatePending, 101 + } 102 + if upsertErr := db.UpsertRepo(bgCtx, x.db, pending); upsertErr != nil { 103 + x.logger.Error("failed to upsert repo after proxy resolution", "err", upsertErr) 104 + } 105 + }() 106 + 86 107 return &knotInfo{ 87 - baseURL: knotURL, 88 - didSlashRepo: fmt.Sprintf("%s/%s", owner.DID, record.Name), 108 + baseURL: knotURL, 109 + repoIdentifier: repoDid.String(), 89 110 }, nil 90 111 } 91 112 ··· 106 127 for k, v := range r.URL.Query() { 107 128 params[k] = v 108 129 } 109 - params.Set("repo", knot.didSlashRepo) 130 + params.Set("repo", knot.repoIdentifier) 110 131 111 132 target := fmt.Sprintf("%s/xrpc/%s?%s", knot.baseURL, knotNSID, params.Encode()) 112 133
+8 -1
knotmirror/xrpc/sync_request_crawl.go
··· 71 71 } 72 72 } 73 73 74 + if record.RepoDid == nil || *record.RepoDid == "" { 75 + l.Warn("dropping repo crawl request without repo_did", "did", owner.DID, "rkey", repoAt.RecordKey()) 76 + writeErr(w, fmt.Errorf("repo record missing repo_did")) 77 + return 78 + } 79 + 74 80 repo := &models.Repo{ 75 81 Did: owner.DID, 76 82 Rkey: repoAt.RecordKey(), 77 83 Cid: (*syntax.CID)(out.Cid), 78 - Name: record.Name, 84 + Name: repoAt.RecordKey().String(), 79 85 KnotDomain: knotUrl, 86 + RepoDid: syntax.DID(*record.RepoDid), 80 87 State: models.RepoStatePending, 81 88 ErrorMsg: "", 82 89 RetryAfter: 0,