// Forked from tangled.org/core (monorepo for Tangled).
1package knotmirror
2
3import (
4 "context"
5 "database/sql"
6 "errors"
7 "fmt"
8 "log/slog"
9 "math/rand"
10 "net/http"
11 "net/url"
12 "strings"
13 "sync"
14 "time"
15
16 "github.com/bluesky-social/indigo/atproto/syntax"
17 "tangled.org/core/knotmirror/config"
18 "tangled.org/core/knotmirror/db"
19 "tangled.org/core/knotmirror/models"
20 "tangled.org/core/log"
21)
22
// Resyncer claims repos that are due for resynchronization from the
// database and re-fetches them from their origin knots using a pool of
// worker goroutines.
type Resyncer struct {
	logger *slog.Logger
	db     *sql.DB
	gitm   GitMirrorManager
	cfg    *config.Config

	// claimJobMu serializes claimResyncJob so two workers cannot claim
	// the same repo row.
	claimJobMu sync.Mutex

	// runningJobs maps a repo's AT-URI to the cancel func of its
	// in-flight resync job; guarded by runningJobsMu.
	runningJobs   map[syntax.ATURI]context.CancelFunc
	runningJobsMu sync.Mutex

	repoFetchTimeout    time.Duration // budget for a regular git fetch
	manualResyncTimeout time.Duration // longer budget for manually triggered resyncs
	parallelism         int           // number of worker goroutines started by Start

	// knotBackoff maps a knot domain to a deadline before which that
	// knot should not be contacted again (set on HTTP 429); guarded by
	// knotBackoffMu.
	knotBackoff   map[string]time.Time
	knotBackoffMu sync.RWMutex

	httpClient *http.Client // used for the pre-fetch knot reachability probe
}
43
44func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer {
45 return &Resyncer{
46 logger: log.SubLogger(l, "resyncer"),
47 db: db,
48 gitm: gitm,
49 cfg: cfg,
50
51 runningJobs: make(map[syntax.ATURI]context.CancelFunc),
52
53 repoFetchTimeout: cfg.GitRepoFetchTimeout,
54 manualResyncTimeout: 30 * time.Minute,
55 parallelism: cfg.ResyncParallelism,
56
57 knotBackoff: make(map[string]time.Time),
58
59 httpClient: &http.Client{Timeout: 30 * time.Second},
60 }
61}
62
63func (r *Resyncer) Start(ctx context.Context) {
64 for i := 0; i < r.parallelism; i++ {
65 go r.runResyncWorker(ctx, i)
66 }
67}
68
69func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) {
70 l := r.logger.With("worker", workerID)
71 for {
72 select {
73 case <-ctx.Done():
74 l.Info("resync worker shutting down", "error", ctx.Err())
75 return
76 default:
77 }
78 repoAt, found, err := r.claimResyncJob(ctx)
79 if err != nil {
80 l.Error("failed to claim resync job", "error", err)
81 time.Sleep(time.Second)
82 continue
83 }
84 if !found {
85 time.Sleep(time.Second)
86 continue
87 }
88 l.Info("processing resync", "aturi", repoAt)
89 if err := r.resyncRepo(ctx, repoAt); err != nil {
90 l.Error("resync failed", "aturi", repoAt, "error", err)
91 }
92 }
93}
94
95func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) {
96 r.runningJobsMu.Lock()
97 defer r.runningJobsMu.Unlock()
98
99 if _, exists := r.runningJobs[repo]; exists {
100 return
101 }
102 r.runningJobs[repo] = cancel
103}
104
105func (r *Resyncer) unregisterRunning(repo syntax.ATURI) {
106 r.runningJobsMu.Lock()
107 defer r.runningJobsMu.Unlock()
108
109 delete(r.runningJobs, repo)
110}
111
112func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) {
113 r.runningJobsMu.Lock()
114 defer r.runningJobsMu.Unlock()
115
116 cancel, ok := r.runningJobs[repo]
117 if !ok {
118 return
119 }
120 delete(r.runningJobs, repo)
121 cancel()
122}
123
// TriggerResyncJob manually triggers the resync job
//
// It marks the repo as pending with RetryAfter = -1, which the claim
// query treats as highest priority; the actual fetch is then picked up
// by a worker. Returns an error when the repo is unknown or a resync is
// already in progress.
func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error {
	repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if err != nil {
		return fmt.Errorf("failed to get repo: %w", err)
	}
	if repo == nil {
		return fmt.Errorf("repo not found: %s", repoAt)
	}

	// Refuse to re-queue a repo whose resync is currently running.
	if repo.State == models.RepoStateResyncing {
		return fmt.Errorf("repo already resyncing")
	}

	repo.State = models.RepoStatePending
	repo.RetryAfter = -1 // resyncer will prioritize this

	if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
		return fmt.Errorf("updating repo state to pending %w", err)
	}
	return nil
}
146
// claimResyncJob atomically claims the highest-priority repo that is
// due for a resync, flipping its state to resyncing in the same UPDATE
// statement.
//
// Selection: repos in pending/desynchronized/error state whose
// retry_after is -1 (manual trigger), 0 (never retried), or already in
// the past. Ordering puts manual triggers first, then never-retried
// repos, then the oldest due retry. Returns ("", false, nil) when
// nothing is due.
func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) {
	// use mutex to prevent duplicated jobs
	r.claimJobMu.Lock()
	defer r.claimJobMu.Unlock()

	var repoAt syntax.ATURI
	now := time.Now().Unix()
	if err := r.db.QueryRowContext(ctx,
		`update repos
		set state = $1
		where at_uri = (
			select at_uri from repos
			where state in ($2, $3, $4)
			and (retry_after = -1 or retry_after = 0 or retry_after < $5)
			order by
				(retry_after = -1) desc,
				(retry_after = 0) desc,
				retry_after
			limit 1
		)
		returning at_uri
		`,
		models.RepoStateResyncing,
		models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError,
		now,
	).Scan(&repoAt); err != nil {
		// sql.ErrNoRows simply means no repo is currently due.
		if errors.Is(err, sql.ErrNoRows) {
			return "", false, nil
		}
		return "", false, err
	}

	return repoAt, true, nil
}
181
182func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error {
183 // ctx, span := tracer.Start(ctx, "resyncRepo")
184 // span.SetAttributes(attribute.String("aturi", repoAt))
185 // defer span.End()
186
187 resyncsStarted.Inc()
188 startTime := time.Now()
189
190 jobCtx, cancel := context.WithCancel(ctx)
191 r.registerRunning(repoAt, cancel)
192 defer r.unregisterRunning(repoAt)
193
194 success, err := r.doResync(jobCtx, repoAt)
195 if !success {
196 resyncsFailed.Inc()
197 resyncDuration.Observe(time.Since(startTime).Seconds())
198 return r.handleResyncFailure(ctx, repoAt, err)
199 }
200
201 resyncsCompleted.Inc()
202 resyncDuration.Observe(time.Since(startTime).Seconds())
203 return nil
204}
205
// doResync performs the actual resynchronization of repoAt: it loads
// the repo row, honors any per-knot backoff, probes the knot over HTTP,
// runs the git fetch, and on success resets the repo to the active
// state.
//
// The first return value reports success. When it is false, a nil
// error means "skip quietly" (untracked repo, or knot in backoff); a
// non-nil error carries the failure cause for handleResyncFailure.
func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) {
	// ctx, span := tracer.Start(ctx, "doResync")
	// span.SetAttributes(attribute.String("aturi", repoAt))
	// defer span.End()

	repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if err != nil {
		return false, fmt.Errorf("failed to get repo: %w", err)
	}
	if repo == nil { // untracked repo, skip
		return false, nil
	}

	// Skip the attempt entirely while this repo's knot is in backoff.
	r.knotBackoffMu.RLock()
	backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain]
	r.knotBackoffMu.RUnlock()
	if inBackoff && time.Now().Before(backoffUntil) {
		return false, nil
	}

	// HACK: check knot reachability with short timeout before running actual fetch.
	// This is crucial as git-cli doesn't support http connection timeout.
	// `http.lowSpeedTime` is only applied _after_ the connection.
	if err := r.checkKnotReachability(ctx, repo); err != nil {
		if isRateLimitError(err) {
			// HTTP 429: back off the whole knot domain briefly so other
			// repos on the same knot don't keep hammering it.
			r.knotBackoffMu.Lock()
			r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second)
			r.knotBackoffMu.Unlock()
			return false, nil
		}
		// TODO: suspend repo on 404. KnotStream updates will change the repo state back online
		return false, fmt.Errorf("knot unreachable: %w", err)
	}

	// RetryAfter == -1 marks a manually triggered resync (see
	// TriggerResyncJob); give those the longer timeout.
	timeout := r.repoFetchTimeout
	if repo.RetryAfter == -1 {
		timeout = r.manualResyncTimeout
	}
	fetchCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	if err := r.gitm.Sync(fetchCtx, repo); err != nil {
		return false, err
	}

	// repo.GitRev = <processed git.refUpdate revision>
	// repo.RepoSha = <sha256 sum of git refs>
	// Success: clear error/retry bookkeeping and mark the repo active.
	repo.State = models.RepoStateActive
	repo.ErrorMsg = ""
	repo.RetryCount = 0
	repo.RetryAfter = 0
	if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
		return false, fmt.Errorf("updating repo state to active %w", err)
	}
	return true, nil
}
262
// knotStatusError is returned by the knot reachability probe when the
// knot answers with a non-200 HTTP status; callers inspect StatusCode
// (e.g. isRateLimitError checks for 429).
type knotStatusError struct {
	StatusCode int
}

// Error implements the error interface.
func (e *knotStatusError) Error() string {
	return fmt.Sprintf("request failed with status code (HTTP %d)", e.StatusCode)
}
270
271func isRateLimitError(err error) bool {
272 var knotErr *knotStatusError
273 if errors.As(err, &knotErr) {
274 return knotErr.StatusCode == http.StatusTooManyRequests
275 }
276 return false
277}
278
// checkKnotReachability checks if Knot is reachable and is valid git remote server
//
// It issues a cheap GET against the smart-HTTP ref-advertisement
// endpoint and verifies both the status code and the advertised
// content type. Non-200 responses come back as *knotStatusError so
// callers can inspect the status code.
func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error {
	repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL)
	if err != nil {
		return err
	}

	// Same URL git itself hits first when fetching over smart HTTP.
	repoUrl += "/info/refs?service=git-upload-pack"

	r.logger.Debug("checking knot reachability", "url", repoUrl)

	req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil)
	if err != nil {
		return err
	}
	// Git-like headers — presumably so the knot treats this probe like a
	// real git client; confirm against the knot's request handling.
	req.Header.Set("User-Agent", "git/2.x")
	req.Header.Set("Accept", "*/*")

	resp, err := r.httpClient.Do(req)
	if err != nil {
		// Unwrap *url.Error to drop its verbose method/URL prefix; the
		// URL was already logged above.
		var uerr *url.Error
		if errors.As(err, &uerr) {
			return fmt.Errorf("request failed: %w", uerr.Unwrap())
		}
		return fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return &knotStatusError{resp.StatusCode}
	}

	// check if target is git server
	ct := resp.Header.Get("Content-Type")
	if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") {
		return fmt.Errorf("unexpected content-type: %s", ct)
	}

	return nil
}
319
// handleResyncFailure records an unsuccessful resync attempt on the
// repo row and schedules the next retry with exponential backoff.
// A nil err (doResync returned false with no error) marks the repo
// desynchronized with an empty message; a non-nil err marks it errored
// with the error text.
func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error {
	r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err)
	var state models.RepoState
	var errMsg string
	if err == nil {
		state = models.RepoStateDesynchronized
		errMsg = ""
	} else {
		state = models.RepoStateError
		errMsg = err.Error()
	}

	repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt)
	if err != nil {
		return fmt.Errorf("failed to get repo: %w", err)
	}
	if repo == nil {
		return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt)
	}

	// start a 1 min & go up to 1 hr between retries
	// NOTE(review): backoff() already returns a seconds-scaled Duration;
	// multiplying by 60 scales base delay AND the millisecond jitter by
	// 60. As written the first retry lands at ~2 min (2s*60) and the cap
	// at 1 hr (60s*60) — confirm the intended schedule.
	var retryCount = repo.RetryCount + 1
	var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix()

	// remove null bytes
	errMsg = strings.ReplaceAll(errMsg, "\x00", "")

	repo.State = state
	repo.ErrorMsg = errMsg
	repo.RetryCount = retryCount
	repo.RetryAfter = retryAfter
	if err := db.UpsertRepo(ctx, r.db, repo); err != nil {
		return fmt.Errorf("failed to update repo state: %w", err)
	}
	return nil
}
356
357func backoff(retries int, max int) time.Duration {
358 dur := min(1<<retries, max)
359 jitter := time.Millisecond * time.Duration(rand.Intn(1000))
360 return time.Second*time.Duration(dur) + jitter
361}