Lewis: May this revision serve well! lewis@tangled.org
+451
-191
Diff
round #0
+25
cmd/knotmirror/main.go
+25
cmd/knotmirror/main.go
···
11
11
"github.com/urfave/cli/v3"
12
12
"tangled.org/core/knotmirror"
13
13
"tangled.org/core/knotmirror/config"
14
+
"tangled.org/core/knotmirror/db"
15
+
"tangled.org/core/knotmirror/migrate"
14
16
"tangled.org/core/log"
15
17
)
16
18
···
42
44
Action: runKnotMirror,
43
45
Flags: []cli.Flag{},
44
46
},
47
+
{
48
+
Name: "migrate-disk",
49
+
Usage: "rename mirror dirs from {did}/{rkey} -> {repo_did}; daemon must be stopped",
50
+
Action: runMigrateDisk,
51
+
Flags: []cli.Flag{},
52
+
},
45
53
}
46
54
return app.Run(ctx, args)
47
55
}
···
56
64
logger.Debug("config loaded:", "config", cfg)
57
65
return knotmirror.Run(ctx, cfg)
58
66
}
67
+
68
+
func runMigrateDisk(ctx context.Context, cmd *cli.Command) error {
69
+
logger := log.FromContext(ctx)
70
+
cfg, err := config.Load(ctx)
71
+
if err != nil {
72
+
return err
73
+
}
74
+
database, err := db.Make(ctx, cfg.DbUrl, 4)
75
+
if err != nil {
76
+
return err
77
+
}
78
+
defer database.Close()
79
+
80
+
stats, err := migrate.RenameDisk(ctx, cfg.GitRepoBasePath, database, logger)
81
+
logger.Info("migrate-disk complete", "stats", stats.String())
82
+
return err
83
+
}
+5
knotmirror/db/db.go
+5
knotmirror/db/db.go
···
7
7
"time"
8
8
9
9
_ "github.com/jackc/pgx/v5/stdlib"
10
+
"tangled.org/core/log"
10
11
)
11
12
12
13
func Make(ctx context.Context, dbUrl string, maxConns int) (*sql.DB, error) {
···
96
97
return nil, fmt.Errorf("initializing db schema: %w", err)
97
98
}
98
99
100
+
if err := RunMigrations(ctx, conn, log.FromContext(ctx), Migrations); err != nil {
101
+
return nil, fmt.Errorf("running migrations: %w", err)
102
+
}
103
+
99
104
return db, nil
100
105
}
+72
knotmirror/db/migrations.go
+72
knotmirror/db/migrations.go
···
1
+
package db
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
"log/slog"
8
+
)
9
+
10
+
type (
	// MigrationFn is the body of a schema migration. It is handed the
	// transaction opened for it and signals failure by returning a non-nil
	// error.
	MigrationFn = func(context.Context, *sql.Tx) error

	// Migration pairs a migration body with the name under which it is
	// recorded in the migrations table.
	Migration struct {
		Name string
		Fn   MigrationFn
	}
)
16
+
17
+
// ensureMigrationsTable creates the migrations bookkeeping table on conn if
// it is not already there. Each row holds a migration name (the primary key)
// and the time it was applied.
func ensureMigrationsTable(ctx context.Context, conn *sql.Conn) error {
	const ddl = `
		create table if not exists migrations (
			name text primary key,
			applied_at timestamptz not null default now()
		);
	`
	if _, err := conn.ExecContext(ctx, ddl); err != nil {
		return err
	}
	return nil
}
26
+
27
+
func RunMigration(ctx context.Context, conn *sql.Conn, logger *slog.Logger, m Migration) error {
28
+
logger = logger.With("migration", m.Name)
29
+
30
+
tx, err := conn.BeginTx(ctx, nil)
31
+
if err != nil {
32
+
return fmt.Errorf("begin migration tx: %w", err)
33
+
}
34
+
defer tx.Rollback()
35
+
36
+
var exists bool
37
+
if err := tx.QueryRowContext(ctx, `select exists (select 1 from migrations where name = $1)`, m.Name).Scan(&exists); err != nil {
38
+
return fmt.Errorf("checking migration state: %w", err)
39
+
}
40
+
if exists {
41
+
logger.Debug("migration already applied")
42
+
return nil
43
+
}
44
+
45
+
if err := m.Fn(ctx, tx); err != nil {
46
+
logger.Error("migration failed", "err", err)
47
+
return fmt.Errorf("running migration %s: %w", m.Name, err)
48
+
}
49
+
50
+
if _, err := tx.ExecContext(ctx, `insert into migrations (name) values ($1)`, m.Name); err != nil {
51
+
return fmt.Errorf("recording migration: %w", err)
52
+
}
53
+
54
+
if err := tx.Commit(); err != nil {
55
+
return fmt.Errorf("commit migration: %w", err)
56
+
}
57
+
58
+
logger.Info("migration applied")
59
+
return nil
60
+
}
61
+
62
+
func RunMigrations(ctx context.Context, conn *sql.Conn, logger *slog.Logger, ms []Migration) error {
63
+
if err := ensureMigrationsTable(ctx, conn); err != nil {
64
+
return fmt.Errorf("ensuring migrations table: %w", err)
65
+
}
66
+
for _, m := range ms {
67
+
if err := RunMigration(ctx, conn, logger, m); err != nil {
68
+
return err
69
+
}
70
+
}
71
+
return nil
72
+
}
+56
knotmirror/db/migrations_list.go
+56
knotmirror/db/migrations_list.go
···
1
+
package db
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"fmt"
7
+
8
+
"tangled.org/core/log"
9
+
)
10
+
11
+
var Migrations = []Migration{
12
+
{
13
+
Name: "repos_pk_to_repo_did",
14
+
Fn: reposPkToRepoDid,
15
+
},
16
+
}
17
+
18
+
func reposPkToRepoDid(ctx context.Context, tx *sql.Tx) error {
19
+
if _, err := tx.ExecContext(ctx,
20
+
`alter table repos add column if not exists repo_did text`,
21
+
); err != nil {
22
+
return fmt.Errorf("adding repo_did column: %w", err)
23
+
}
24
+
var bad int
25
+
if err := tx.QueryRowContext(ctx,
26
+
`select count(*) from repos where repo_did is null or repo_did = ''`,
27
+
).Scan(&bad); err != nil {
28
+
return fmt.Errorf("counting rows with null repo_did: %w", err)
29
+
}
30
+
if bad > 0 {
31
+
log.FromContext(ctx).Warn(
32
+
"dropping repos with null repo_did; their on-disk dirs will be orphaned. re-crawl via tap to restore",
33
+
"count", bad,
34
+
)
35
+
if _, err := tx.ExecContext(ctx,
36
+
`delete from repos where repo_did is null or repo_did = ''`,
37
+
); err != nil {
38
+
return fmt.Errorf("deleting null repo_did rows: %w", err)
39
+
}
40
+
}
41
+
return execAll(ctx, tx,
42
+
`alter table repos alter column repo_did set not null`,
43
+
`alter table repos drop constraint if exists repos_pkey`,
44
+
`alter table repos add constraint repos_pkey primary key (repo_did)`,
45
+
)
46
+
}
47
+
48
+
// execAll runs stmts on tx one after another and stops at the first failure.
//
// The returned error wraps the underlying error (so errors.Is / errors.As
// still match it) and names the position and text of the statement that
// failed. With no statements it returns nil without touching tx.
func execAll(ctx context.Context, tx *sql.Tx, stmts ...string) error {
	for i, stmt := range stmts {
		if _, err := tx.ExecContext(ctx, stmt); err != nil {
			return fmt.Errorf("executing statement %d/%d %q: %w", i+1, len(stmts), stmt, err)
		}
	}
	return nil
}
+54
-93
knotmirror/db/repos.go
+54
-93
knotmirror/db/repos.go
···
11
11
"tangled.org/core/knotmirror/models"
12
12
)
13
13
14
-
func AddRepo(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, cid syntax.CID, name, knot string) error {
15
-
if _, err := e.ExecContext(ctx,
16
-
`insert into repos (did, rkey, cid, name, knot_domain)
17
-
values ($1, $2, $3, $4, $5)`,
18
-
did, rkey, cid, name, knot,
19
-
); err != nil {
20
-
return fmt.Errorf("inserting repo: %w", err)
21
-
}
22
-
return nil
23
-
}
24
-
25
14
func UpsertRepo(ctx context.Context, e *sql.DB, repo *models.Repo) error {
15
+
if repo.RepoDid == "" {
16
+
return fmt.Errorf("upsert repo: repo_did is required")
17
+
}
26
18
if _, err := e.ExecContext(ctx,
27
-
`insert into repos (did, rkey, cid, name, knot_domain, git_rev, repo_sha, state, error_msg, retry_count, retry_after)
28
-
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
29
-
on conflict(did, rkey) do update set
19
+
`insert into repos (did, rkey, cid, name, knot_domain, repo_did, git_rev, repo_sha, state, error_msg, retry_count, retry_after)
20
+
values ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
21
+
on conflict(repo_did) do update set
22
+
did = excluded.did,
23
+
rkey = excluded.rkey,
30
24
cid = excluded.cid,
31
25
name = excluded.name,
32
26
knot_domain = excluded.knot_domain,
···
36
30
error_msg = excluded.error_msg,
37
31
retry_count = excluded.retry_count,
38
32
retry_after = excluded.retry_after`,
39
-
// where repos.cid != excluded.cid`,
40
33
repo.Did,
41
34
repo.Rkey,
42
35
repo.Cid,
43
36
repo.Name,
44
37
repo.KnotDomain,
38
+
repo.RepoDid,
45
39
repo.GitRev,
46
40
repo.RepoSha,
47
41
repo.State,
···
54
48
return nil
55
49
}
56
50
57
-
func UpdateRepoState(ctx context.Context, e *sql.DB, did syntax.DID, rkey syntax.RecordKey, state models.RepoState) error {
51
+
func UpdateRepoState(ctx context.Context, e *sql.DB, repoDid syntax.DID, state models.RepoState) error {
58
52
if _, err := e.ExecContext(ctx,
59
53
`update repos
60
54
set state = $1
61
-
where did = $2 and rkey = $3`,
55
+
where repo_did = $2`,
62
56
state,
63
-
did, rkey,
57
+
repoDid,
64
58
); err != nil {
65
59
return fmt.Errorf("updating repo: %w", err)
66
60
}
···
78
72
return nil
79
73
}
80
74
81
-
func GetRepoByName(ctx context.Context, e *sql.DB, did syntax.DID, name string) (*models.Repo, error) {
75
+
const repoColumns = `
76
+
did,
77
+
rkey,
78
+
cid,
79
+
name,
80
+
knot_domain,
81
+
repo_did,
82
+
git_rev,
83
+
repo_sha,
84
+
state,
85
+
error_msg,
86
+
retry_count,
87
+
retry_after`
88
+
89
+
func scanRepo(row interface{ Scan(...any) error }) (*models.Repo, error) {
82
90
var repo models.Repo
83
-
if err := e.QueryRowContext(ctx,
84
-
`select
85
-
did,
86
-
rkey,
87
-
cid,
88
-
name,
89
-
knot_domain,
90
-
git_rev,
91
-
repo_sha,
92
-
state,
93
-
error_msg,
94
-
retry_count,
95
-
retry_after
96
-
from repos
97
-
where did = $1 and name = $2`,
98
-
did,
99
-
name,
100
-
).Scan(
91
+
if err := row.Scan(
101
92
&repo.Did,
102
93
&repo.Rkey,
103
94
&repo.Cid,
104
95
&repo.Name,
105
96
&repo.KnotDomain,
97
+
&repo.RepoDid,
106
98
&repo.GitRev,
107
99
&repo.RepoSha,
108
100
&repo.State,
···
110
102
&repo.RetryCount,
111
103
&repo.RetryAfter,
112
104
); err != nil {
105
+
return nil, err
106
+
}
107
+
return &repo, nil
108
+
}
109
+
110
+
func GetRepoByRepoDid(ctx context.Context, e *sql.DB, repoDid syntax.DID) (*models.Repo, error) {
111
+
row := e.QueryRowContext(ctx,
112
+
`select`+repoColumns+`
113
+
from repos
114
+
where repo_did = $1`,
115
+
repoDid,
116
+
)
117
+
repo, err := scanRepo(row)
118
+
if err != nil {
113
119
if errors.Is(err, sql.ErrNoRows) {
114
120
return nil, nil
115
121
}
116
122
return nil, fmt.Errorf("querying repo: %w", err)
117
123
}
118
-
return &repo, nil
124
+
return repo, nil
119
125
}
120
126
121
127
func GetRepoByAtUri(ctx context.Context, e *sql.DB, aturi syntax.ATURI) (*models.Repo, error) {
122
-
var repo models.Repo
123
-
if err := e.QueryRowContext(ctx,
124
-
`select
125
-
did,
126
-
rkey,
127
-
cid,
128
-
name,
129
-
knot_domain,
130
-
git_rev,
131
-
repo_sha,
132
-
state,
133
-
error_msg,
134
-
retry_count,
135
-
retry_after
128
+
row := e.QueryRowContext(ctx,
129
+
`select`+repoColumns+`
136
130
from repos
137
131
where at_uri = $1`,
138
132
aturi,
139
-
).Scan(
140
-
&repo.Did,
141
-
&repo.Rkey,
142
-
&repo.Cid,
143
-
&repo.Name,
144
-
&repo.KnotDomain,
145
-
&repo.GitRev,
146
-
&repo.RepoSha,
147
-
&repo.State,
148
-
&repo.ErrorMsg,
149
-
&repo.RetryCount,
150
-
&repo.RetryAfter,
151
-
); err != nil {
133
+
)
134
+
repo, err := scanRepo(row)
135
+
if err != nil {
152
136
if errors.Is(err, sql.ErrNoRows) {
153
137
return nil, nil
154
138
}
155
139
return nil, fmt.Errorf("querying repo: %w", err)
156
140
}
157
-
return &repo, nil
141
+
return repo, nil
158
142
}
159
143
160
144
func ListRepos(ctx context.Context, e *sql.DB, page pagination.Page, did, knot, state string) ([]models.Repo, error) {
···
188
172
}
189
173
190
174
query := `
191
-
select
192
-
did,
193
-
rkey,
194
-
cid,
195
-
name,
196
-
knot_domain,
197
-
git_rev,
198
-
repo_sha,
199
-
state,
200
-
error_msg,
201
-
retry_count,
202
-
retry_after
175
+
select` + repoColumns + `
203
176
from repos
204
177
` + whereClause + pageClause
205
178
rows, err := e.QueryContext(ctx, query, args...)
···
210
183
211
184
var repos []models.Repo
212
185
for rows.Next() {
213
-
var repo models.Repo
214
-
if err := rows.Scan(
215
-
&repo.Did,
216
-
&repo.Rkey,
217
-
&repo.Cid,
218
-
&repo.Name,
219
-
&repo.KnotDomain,
220
-
&repo.GitRev,
221
-
&repo.RepoSha,
222
-
&repo.State,
223
-
&repo.ErrorMsg,
224
-
&repo.RetryCount,
225
-
&repo.RetryAfter,
226
-
); err != nil {
186
+
repo, err := scanRepo(rows)
187
+
if err != nil {
227
188
return nil, fmt.Errorf("scanning row: %w", err)
228
189
}
229
-
repos = append(repos, repo)
190
+
repos = append(repos, *repo)
230
191
}
231
192
if err := rows.Err(); err != nil {
232
193
return nil, fmt.Errorf("scanning rows: %w ", err)
+17
-37
knotmirror/git.go
+17
-37
knotmirror/git.go
···
19
19
20
20
type GitMirrorManager interface {
21
21
Exist(repo *models.Repo) (bool, error)
22
-
// RemoteSetUrl updates git repository 'origin' remote
23
-
RemoteSetUrl(ctx context.Context, repo *models.Repo) error
24
22
// Clone clones the repository as a mirror
25
23
Clone(ctx context.Context, repo *models.Repo) error
26
24
// Fetch fetches the repository
···
44
42
var _ GitMirrorManager = new(CliGitMirrorManager)
45
43
46
44
func (c *CliGitMirrorManager) makeRepoPath(repo *models.Repo) string {
47
-
return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String())
45
+
return filepath.Join(c.repoBasePath, repo.RepoDid.String())
48
46
}
49
47
50
48
func (c *CliGitMirrorManager) Exist(repo *models.Repo) (bool, error) {
51
49
return isDir(c.makeRepoPath(repo))
52
50
}
53
51
54
-
func (c *CliGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error {
55
-
path := c.makeRepoPath(repo)
56
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
57
-
if err != nil {
58
-
return fmt.Errorf("constructing repo remote url: %w", err)
59
-
}
60
-
cmd := exec.CommandContext(ctx, "git", "-C", path, "remote", "set-url", "origin", url)
61
-
if out, err := cmd.CombinedOutput(); err != nil {
62
-
if ctx.Err() != nil {
63
-
return ctx.Err()
64
-
}
65
-
msg := string(out)
66
-
return fmt.Errorf("running 'git remote set-url origin %s': %w\n%s", url, err, msg)
67
-
}
68
-
return nil
69
-
}
70
-
71
52
func (c *CliGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error {
72
53
path := c.makeRepoPath(repo)
73
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
54
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
74
55
if err != nil {
75
56
return fmt.Errorf("constructing repo remote url: %w", err)
76
57
}
···
94
75
95
76
func (c *CliGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error {
96
77
path := c.makeRepoPath(repo)
97
-
return c.fetch(ctx, path)
78
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
79
+
if err != nil {
80
+
return fmt.Errorf("constructing repo remote url: %w", err)
81
+
}
82
+
return c.fetch(ctx, path, url)
98
83
}
99
84
100
-
func (c *CliGitMirrorManager) fetch(ctx context.Context, path string) error {
101
-
// TODO: use `repo.Knot` instead of depending on origin
102
-
cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", "origin")
85
+
func (c *CliGitMirrorManager) fetch(ctx context.Context, path, url string) error {
86
+
cmd := exec.CommandContext(ctx, "git", "-C", path, "fetch", "--prune", url, "+refs/*:refs/*")
103
87
if out, err := cmd.CombinedOutput(); err != nil {
104
88
if ctx.Err() != nil {
105
89
return ctx.Err()
···
111
95
112
96
func (c *CliGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error {
113
97
path := c.makeRepoPath(repo)
114
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
98
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
115
99
if err != nil {
116
100
return fmt.Errorf("constructing repo remote url: %w", err)
117
101
}
···
125
109
return fmt.Errorf("cloning repo: %w", err)
126
110
}
127
111
} else {
128
-
if err := c.fetch(ctx, path); err != nil {
112
+
if err := c.fetch(ctx, path, url); err != nil {
129
113
return fmt.Errorf("fetching repo: %w", err)
130
114
}
131
115
}
···
191
175
var _ GitMirrorManager = new(GoGitMirrorManager)
192
176
193
177
func (c *GoGitMirrorManager) makeRepoPath(repo *models.Repo) string {
194
-
return filepath.Join(c.repoBasePath, repo.Did.String(), repo.Rkey.String())
178
+
return filepath.Join(c.repoBasePath, repo.RepoDid.String())
195
179
}
196
180
197
181
func (c *GoGitMirrorManager) Exist(repo *models.Repo) (bool, error) {
198
182
return isDir(c.makeRepoPath(repo))
199
183
}
200
184
201
-
func (c *GoGitMirrorManager) RemoteSetUrl(ctx context.Context, repo *models.Repo) error {
202
-
panic("unimplemented")
203
-
}
204
-
205
185
func (c *GoGitMirrorManager) Clone(ctx context.Context, repo *models.Repo) error {
206
186
path := c.makeRepoPath(repo)
207
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
187
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
208
188
if err != nil {
209
189
return fmt.Errorf("constructing repo remote url: %w", err)
210
190
}
···
224
204
225
205
func (c *GoGitMirrorManager) Fetch(ctx context.Context, repo *models.Repo) error {
226
206
path := c.makeRepoPath(repo)
227
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
207
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
228
208
if err != nil {
229
209
return fmt.Errorf("constructing repo remote url: %w", err)
230
210
}
···
250
230
251
231
func (c *GoGitMirrorManager) Sync(ctx context.Context, repo *models.Repo) error {
252
232
path := c.makeRepoPath(repo)
253
-
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), c.knotUseSSL)
233
+
url, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), c.knotUseSSL)
254
234
if err != nil {
255
235
return fmt.Errorf("constructing repo remote url: %w", err)
256
236
}
···
271
251
return nil
272
252
}
273
253
274
-
func makeRepoRemoteUrl(knot, didSlashRepo string, knotUseSSL bool) (string, error) {
254
+
func makeRepoRemoteUrl(knot, repoIdentifier string, knotUseSSL bool) (string, error) {
275
255
if !strings.Contains(knot, "://") {
276
256
if knotUseSSL {
277
257
knot = "https://" + knot
···
289
269
return "", fmt.Errorf("unsupported scheme: %s", u.Scheme)
290
270
}
291
271
292
-
u = u.JoinPath(didSlashRepo)
272
+
u = u.JoinPath(repoIdentifier)
293
273
return u.String(), nil
294
274
}
295
275
+30
-27
knotmirror/knotstream/slurper.go
+30
-27
knotmirror/knotstream/slurper.go
···
15
15
"github.com/bluesky-social/indigo/util/ssrf"
16
16
"github.com/carlmjohnson/versioninfo"
17
17
"github.com/gorilla/websocket"
18
-
"tangled.org/core/api/tangled"
19
18
"tangled.org/core/knotmirror/config"
20
19
"tangled.org/core/knotmirror/db"
21
20
"tangled.org/core/knotmirror/models"
···
262
261
}
263
262
}
264
263
264
+
// legacyGitRefUpdate is the JSON payload carried in LegacyGitEvent.Event:
// optional owner and repo DIDs plus the repo name.
type legacyGitRefUpdate struct {
	OwnerDid *string `json:"ownerDid,omitempty"`
	RepoDid  *string `json:"repoDid,omitempty"`
	RepoName string  `json:"repoName,omitempty"`
}
269
+
265
270
type LegacyGitEvent struct {
266
271
Rkey string
267
272
Nsid string
268
-
Event tangled.GitRefUpdate
273
+
Event legacyGitRefUpdate
269
274
}
270
275
271
276
func (s *KnotSlurper) ProcessEvent(ctx context.Context, task *Task) error {
···
280
285
return nil
281
286
}
282
287
288
+
// lookupRepoForRefUpdate resolves the local repo row for an incoming refUpdate
289
+
// via the stable RepoDid join. Returns (nil, "", nil) when the event has no
290
+
// repoDid (unjoinable) and (nil, key, nil) on a clean miss.
291
+
func (s *KnotSlurper) lookupRepoForRefUpdate(ctx context.Context, evt *LegacyGitEvent) (*models.Repo, string, error) {
292
+
if evt.Event.RepoDid == nil || *evt.Event.RepoDid == "" {
293
+
return nil, "", nil
294
+
}
295
+
repoDid := syntax.DID(*evt.Event.RepoDid)
296
+
curr, err := db.GetRepoByRepoDid(ctx, s.db, repoDid)
297
+
return curr, repoDid.String(), err
298
+
}
299
+
283
300
func (s *KnotSlurper) ProcessLegacyGitRefUpdate(ctx context.Context, source string, evt *LegacyGitEvent) error {
284
301
knotstreamEventsReceived.Inc()
285
302
286
303
l := s.logger.With("src", source)
287
304
288
-
ownerDid := ""
289
-
if evt.Event.OwnerDid != nil {
290
-
ownerDid = *evt.Event.OwnerDid
291
-
} else {
292
-
// handle legacy event
293
-
if evt.Event.RepoDid != nil {
294
-
ownerDid = *evt.Event.RepoDid
295
-
}
296
-
}
297
-
curr, err := db.GetRepoByName(ctx, s.db, syntax.DID(ownerDid), evt.Event.RepoName)
305
+
curr, lookupKey, err := s.lookupRepoForRefUpdate(ctx, evt)
298
306
if err != nil {
299
-
return fmt.Errorf("failed to get repo '%s': %w", ownerDid+"/"+evt.Event.RepoName, err)
307
+
return fmt.Errorf("failed to get repo '%s': %w", lookupKey, err)
300
308
}
301
309
if curr == nil {
302
-
// if repo doesn't exist in DB, just ignore the event. That repo is unknown.
303
-
//
304
-
// Normally did+name is already enough to perform git-fetch as that's
305
-
// what needed to fetch the repository.
306
-
// But we want to store that in did/rkey in knot-mirror.
307
-
// Therefore, we should ignore when the repository is unknown.
308
-
// Hopefully crawler will sync it later.
309
-
l.Warn("skipping event from unknown repo", "did/name", ownerDid+"/"+evt.Event.RepoName)
310
+
if lookupKey == "" {
311
+
l.Warn("skipping gitRefUpdate: event has no fields to join on",
312
+
"repo_did", evt.Event.RepoDid)
313
+
} else {
314
+
// if repo doesn't exist in DB, just ignore the event. That repo is unknown.
315
+
// Hopefully crawler/tap will sync it later.
316
+
l.Warn("skipping event from unknown repo", "key", lookupKey)
317
+
}
310
318
knotstreamEventsSkipped.Inc()
311
319
return nil
312
320
}
···
325
333
return nil
326
334
}
327
335
328
-
// if curr.State == models.RepoStateResyncing {
329
-
// firehoseEventsSkipped.Inc()
330
-
// return fp.events.addToResyncBuffer(ctx, commit)
331
-
// }
332
-
333
336
// can't skip anything, update repo state
334
-
if err := db.UpdateRepoState(ctx, s.db, curr.Did, curr.Rkey, models.RepoStateDesynchronized); err != nil {
337
+
if err := db.UpdateRepoState(ctx, s.db, curr.RepoDid, models.RepoStateDesynchronized); err != nil {
335
338
return err
336
339
}
337
340
+135
knotmirror/migrate/migrate.go
+135
knotmirror/migrate/migrate.go
···
1
+
package migrate
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"errors"
7
+
"fmt"
8
+
"log/slog"
9
+
"os"
10
+
"path/filepath"
11
+
"strings"
12
+
13
+
"github.com/bluesky-social/indigo/atproto/syntax"
14
+
"tangled.org/core/api/tangled"
15
+
"tangled.org/core/knotmirror/db"
16
+
)
17
+
18
+
// Stats tallies what a RenameDisk run did with the directories it visited.
type Stats struct {
	Renamed       int // repo dirs moved to their repo_did path
	Skipped       int // subdirectories without a HEAD file
	Orphaned      int // repo dirs with no DB row, or a row with an empty repo_did
	OwnerDirsRm   int // owner dirs removed after being emptied
	AlreadyExists int // repo dirs left alone because the target path existed
}

// String renders the counters as space-separated key=value pairs.
func (s Stats) String() string {
	fields := []string{
		fmt.Sprintf("renamed=%d", s.Renamed),
		fmt.Sprintf("skipped=%d", s.Skipped),
		fmt.Sprintf("orphaned=%d", s.Orphaned),
		fmt.Sprintf("owner_dirs_removed=%d", s.OwnerDirsRm),
		fmt.Sprintf("target_existed=%d", s.AlreadyExists),
	}
	return strings.Join(fields, " ")
}
32
+
33
+
func RenameDisk(ctx context.Context, base string, database *sql.DB, logger *slog.Logger) (Stats, error) {
34
+
entries, err := os.ReadDir(base)
35
+
if err != nil {
36
+
return Stats{}, fmt.Errorf("reading base path: %w", err)
37
+
}
38
+
return reduceEntries(ctx, entries, 0, Stats{}, ownerStep(base, database, logger))
39
+
}
40
+
41
+
type stepFn func(context.Context, os.DirEntry, Stats) (Stats, error)
42
+
43
+
func reduceEntries(ctx context.Context, entries []os.DirEntry, idx int, acc Stats, fn stepFn) (Stats, error) {
44
+
if idx >= len(entries) {
45
+
return acc, nil
46
+
}
47
+
if err := ctx.Err(); err != nil {
48
+
return acc, err
49
+
}
50
+
next, err := fn(ctx, entries[idx], acc)
51
+
if err != nil {
52
+
return next, err
53
+
}
54
+
return reduceEntries(ctx, entries, idx+1, next, fn)
55
+
}
56
+
57
+
func ownerStep(base string, database *sql.DB, logger *slog.Logger) stepFn {
58
+
return func(ctx context.Context, entry os.DirEntry, acc Stats) (Stats, error) {
59
+
if !entry.IsDir() || !strings.HasPrefix(entry.Name(), "did:") {
60
+
return acc, nil
61
+
}
62
+
ownerPath := filepath.Join(base, entry.Name())
63
+
if _, err := os.Stat(filepath.Join(ownerPath, "HEAD")); err == nil {
64
+
return acc, nil
65
+
}
66
+
subEntries, err := os.ReadDir(ownerPath)
67
+
if err != nil {
68
+
logger.Error("reading owner dir", "ownerPath", ownerPath, "err", err)
69
+
return acc, nil
70
+
}
71
+
next, err := reduceEntries(ctx, subEntries, 0, acc, rkeyStep(base, database, logger, syntax.DID(entry.Name()), ownerPath))
72
+
if err != nil {
73
+
return next, err
74
+
}
75
+
remaining, err := os.ReadDir(ownerPath)
76
+
if err == nil && len(remaining) == 0 {
77
+
if rmErr := os.Remove(ownerPath); rmErr == nil {
78
+
next.OwnerDirsRm++
79
+
logger.Info("removed empty owner dir", "ownerPath", ownerPath)
80
+
} else {
81
+
logger.Warn("failed to remove empty owner dir", "ownerPath", ownerPath, "err", rmErr)
82
+
}
83
+
}
84
+
return next, nil
85
+
}
86
+
}
87
+
88
+
func rkeyStep(base string, database *sql.DB, logger *slog.Logger, ownerDid syntax.DID, ownerPath string) stepFn {
89
+
return func(ctx context.Context, sub os.DirEntry, acc Stats) (Stats, error) {
90
+
if !sub.IsDir() {
91
+
return acc, nil
92
+
}
93
+
rkey := sub.Name()
94
+
subPath := filepath.Join(ownerPath, rkey)
95
+
l := logger.With("did", ownerDid, "rkey", rkey, "subPath", subPath)
96
+
97
+
if _, err := os.Stat(filepath.Join(subPath, "HEAD")); err != nil {
98
+
l.Warn("skipping non-repo subdir")
99
+
acc.Skipped++
100
+
return acc, nil
101
+
}
102
+
103
+
aturi := syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", ownerDid, tangled.RepoNSID, rkey))
104
+
repo, err := db.GetRepoByAtUri(ctx, database, aturi)
105
+
if err != nil {
106
+
return acc, fmt.Errorf("looking up repo by aturi %s: %w", aturi, err)
107
+
}
108
+
if repo == nil {
109
+
l.Warn("orphan disk repo, no DB row; leaving in place")
110
+
acc.Orphaned++
111
+
return acc, nil
112
+
}
113
+
if repo.RepoDid == "" {
114
+
l.Warn("DB row has empty repo_did; leaving in place")
115
+
acc.Orphaned++
116
+
return acc, nil
117
+
}
118
+
119
+
target := filepath.Join(base, repo.RepoDid.String())
120
+
if _, err := os.Stat(target); err == nil {
121
+
l.Warn("target path already exists; leaving source in place", "target", target)
122
+
acc.AlreadyExists++
123
+
return acc, nil
124
+
} else if !errors.Is(err, os.ErrNotExist) {
125
+
return acc, fmt.Errorf("stat target %s: %w", target, err)
126
+
}
127
+
128
+
if err := os.Rename(subPath, target); err != nil {
129
+
return acc, fmt.Errorf("rename %s -> %s: %w", subPath, target, err)
130
+
}
131
+
acc.Renamed++
132
+
l.Info("renamed", "target", target)
133
+
return acc, nil
134
+
}
135
+
}
+3
-2
knotmirror/models/models.go
+3
-2
knotmirror/models/models.go
···
14
14
// content of tangled.Repo
15
15
Name string
16
16
KnotDomain string
17
+
RepoDid syntax.DID
17
18
18
19
GitRev syntax.TID // last processed git.refUpdate revision
19
20
RepoSha string // sha256 sum of git refs (to avoid no-op git fetch)
···
27
28
return syntax.ATURI(fmt.Sprintf("at://%s/%s/%s", r.Did, tangled.RepoNSID, r.Rkey))
28
29
}
29
30
30
-
func (r *Repo) DidSlashRepo() string {
31
-
return fmt.Sprintf("%s/%s", r.Did, r.Name)
31
+
func (r *Repo) RepoIdentifier() string {
32
+
return r.RepoDid.String()
32
33
}
33
34
34
35
type RepoState string
+1
-1
knotmirror/resyncer.go
+1
-1
knotmirror/resyncer.go
···
278
278
279
279
// checkKnotReachability checks if Knot is reachable and is valid git remote server
280
280
func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error {
281
-
repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL)
281
+
repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.RepoIdentifier(), r.cfg.KnotUseSSL)
282
282
if err != nil {
283
283
return err
284
284
}
+7
-15
knotmirror/tapclient.go
+7
-15
knotmirror/tapclient.go
···
11
11
"strings"
12
12
"time"
13
13
14
+
"github.com/bluesky-social/indigo/atproto/syntax"
14
15
"tangled.org/core/api/tangled"
15
16
"tangled.org/core/knotmirror/config"
16
17
"tangled.org/core/knotmirror/db"
···
104
105
errMsg = "suspending non-public knot"
105
106
}
106
107
108
+
if record.RepoDid == nil || *record.RepoDid == "" {
109
+
t.logger.Warn("dropping repo record without repo_did", "did", evt.Did, "rkey", evt.Rkey)
110
+
return nil
111
+
}
107
112
repo := &models.Repo{
108
113
Did: evt.Did,
109
114
Rkey: evt.Rkey,
110
115
Cid: evt.CID,
111
-
Name: record.Name,
116
+
Name: evt.Rkey.String(),
112
117
KnotDomain: knotUrl,
118
+
RepoDid: syntax.DID(*record.RepoDid),
113
119
State: status,
114
120
ErrorMsg: errMsg,
115
121
RetryAfter: 0, // clear retry info
116
122
RetryCount: 0,
117
123
}
118
124
119
-
if evt.Action == tapc.RecordUpdateAction {
120
-
exist, err := t.gitm.Exist(repo)
121
-
if err != nil {
122
-
return fmt.Errorf("checking git repo existence: %w", err)
123
-
}
124
-
if exist {
125
-
// update git repo remote url
126
-
if err := t.gitm.RemoteSetUrl(ctx, repo); err != nil {
127
-
return fmt.Errorf("updating git repo remote url: %w", err)
128
-
}
129
-
}
130
-
}
131
-
132
-
t.logger.Debug("tap: upserting repo with knot", "knot", repo.KnotDomain)
133
125
if err := db.UpsertRepo(ctx, t.db, repo); err != nil {
134
126
return fmt.Errorf("upserting repo to db: %w", err)
135
127
}
+10
-8
knotmirror/xrpc/git_list_branches.go
+10
-8
knotmirror/xrpc/git_list_branches.go
···
9
9
10
10
"github.com/bluesky-social/indigo/atproto/atclient"
11
11
"github.com/bluesky-social/indigo/atproto/syntax"
12
+
"tangled.org/core/knotmirror/db"
12
13
"tangled.org/core/knotserver/git"
13
14
"tangled.org/core/types"
14
15
)
···
82
83
}
83
84
84
85
func (x *Xrpc) makeRepoPath(ctx context.Context, repo syntax.ATURI) (string, error) {
85
-
id, err := x.resolver.ResolveIdent(ctx, repo.Authority().String())
86
+
r, err := db.GetRepoByAtUri(ctx, x.db, repo)
86
87
if err != nil {
87
-
return "", err
88
+
return "", fmt.Errorf("looking up repo: %w", err)
88
89
}
89
-
90
-
return filepath.Join(
91
-
x.cfg.GitRepoBasePath,
92
-
id.DID.String(),
93
-
repo.RecordKey().String(),
94
-
), nil
90
+
if r == nil {
91
+
return "", fmt.Errorf("repo not found: %s", repo)
92
+
}
93
+
if r.RepoDid == "" {
94
+
return "", fmt.Errorf("repo missing repo_did: %s", repo)
95
+
}
96
+
return filepath.Join(x.cfg.GitRepoBasePath, r.RepoDid.String()), nil
95
97
}
+28
-7
knotmirror/xrpc/proxy.go
+28
-7
knotmirror/xrpc/proxy.go
···
40
40
}
41
41
42
42
type knotInfo struct {
43
-
baseURL string
44
-
didSlashRepo string
43
+
baseURL string
44
+
repoIdentifier string
45
45
}
46
46
47
47
func (x *Xrpc) resolveKnot(ctx context.Context, repoAt syntax.ATURI) (*knotInfo, error) {
···
49
49
if err == nil && repo != nil {
50
50
if repo.State != models.RepoStatePending && repo.State != models.RepoStateResyncing {
51
51
go func() {
52
-
if err := db.UpdateRepoState(context.Background(), x.db, repo.Did, repo.Rkey, models.RepoStatePending); err != nil {
52
+
if err := db.UpdateRepoState(context.Background(), x.db, repo.RepoDid, models.RepoStatePending); err != nil {
53
53
x.logger.Error("failed to mark repo for resync after proxy", "err", err)
54
54
}
55
55
}()
56
56
}
57
-
return &knotInfo{baseURL: repo.KnotDomain, didSlashRepo: repo.DidSlashRepo()}, nil
57
+
return &knotInfo{baseURL: repo.KnotDomain, repoIdentifier: repo.RepoIdentifier()}, nil
58
58
}
59
59
60
60
owner, err := x.resolver.ResolveIdent(ctx, repoAt.Authority().String())
···
69
69
}
70
70
71
71
record := out.Value.Val.(*tangled.Repo)
72
+
if record.RepoDid == nil || *record.RepoDid == "" {
73
+
return nil, fmt.Errorf("repo record has no repo_did")
74
+
}
72
75
knotURL := record.Knot
73
76
if !strings.Contains(record.Knot, "://") {
74
77
if host, _ := db.GetHost(ctx, x.db, record.Knot); host != nil {
···
83
86
}
84
87
}
85
88
89
+
rkey := repoAt.RecordKey().String()
90
+
repoDid := syntax.DID(*record.RepoDid)
91
+
go func() {
92
+
bgCtx := context.Background()
93
+
pending := &models.Repo{
94
+
Did: owner.DID,
95
+
Rkey: repoAt.RecordKey(),
96
+
Cid: (*syntax.CID)(out.Cid),
97
+
Name: rkey,
98
+
KnotDomain: knotURL,
99
+
RepoDid: repoDid,
100
+
State: models.RepoStatePending,
101
+
}
102
+
if upsertErr := db.UpsertRepo(bgCtx, x.db, pending); upsertErr != nil {
103
+
x.logger.Error("failed to upsert repo after proxy resolution", "err", upsertErr)
104
+
}
105
+
}()
106
+
86
107
return &knotInfo{
87
-
baseURL: knotURL,
88
-
didSlashRepo: fmt.Sprintf("%s/%s", owner.DID, record.Name),
108
+
baseURL: knotURL,
109
+
repoIdentifier: repoDid.String(),
89
110
}, nil
90
111
}
91
112
···
106
127
for k, v := range r.URL.Query() {
107
128
params[k] = v
108
129
}
109
-
params.Set("repo", knot.didSlashRepo)
130
+
params.Set("repo", knot.repoIdentifier)
110
131
111
132
target := fmt.Sprintf("%s/xrpc/%s?%s", knot.baseURL, knotNSID, params.Encode())
112
133
+8
-1
knotmirror/xrpc/sync_request_crawl.go
+8
-1
knotmirror/xrpc/sync_request_crawl.go
···
71
71
}
72
72
}
73
73
74
+
if record.RepoDid == nil || *record.RepoDid == "" {
75
+
l.Warn("dropping repo crawl request without repo_did", "did", owner.DID, "rkey", repoAt.RecordKey())
76
+
writeErr(w, fmt.Errorf("repo record missing repo_did"))
77
+
return
78
+
}
79
+
74
80
repo := &models.Repo{
75
81
Did: owner.DID,
76
82
Rkey: repoAt.RecordKey(),
77
83
Cid: (*syntax.CID)(out.Cid),
78
-
Name: record.Name,
84
+
Name: repoAt.RecordKey().String(),
79
85
KnotDomain: knotUrl,
86
+
RepoDid: syntax.DID(*record.RepoDid),
80
87
State: models.RepoStatePending,
81
88
ErrorMsg: "",
82
89
RetryAfter: 0,
History
1 round
0 comments
oyster.cafe
submitted
#0
1 commit
expand
collapse
knotmirror: key repos by repo_did + disk migration
Lewis: May this revision serve well! <lewis@tangled.org>
merge conflicts detected
expand
collapse
expand
collapse
- knotmirror/xrpc/proxy.go:49