Monorepo for Tangled
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

at master 361 lines 9.3 kB view raw
1package knotmirror 2 3import ( 4 "context" 5 "database/sql" 6 "errors" 7 "fmt" 8 "log/slog" 9 "math/rand" 10 "net/http" 11 "net/url" 12 "strings" 13 "sync" 14 "time" 15 16 "github.com/bluesky-social/indigo/atproto/syntax" 17 "tangled.org/core/knotmirror/config" 18 "tangled.org/core/knotmirror/db" 19 "tangled.org/core/knotmirror/models" 20 "tangled.org/core/log" 21) 22 23type Resyncer struct { 24 logger *slog.Logger 25 db *sql.DB 26 gitm GitMirrorManager 27 cfg *config.Config 28 29 claimJobMu sync.Mutex 30 31 runningJobs map[syntax.ATURI]context.CancelFunc 32 runningJobsMu sync.Mutex 33 34 repoFetchTimeout time.Duration 35 manualResyncTimeout time.Duration 36 parallelism int 37 38 knotBackoff map[string]time.Time 39 knotBackoffMu sync.RWMutex 40 41 httpClient *http.Client 42} 43 44func NewResyncer(l *slog.Logger, db *sql.DB, gitm GitMirrorManager, cfg *config.Config) *Resyncer { 45 return &Resyncer{ 46 logger: log.SubLogger(l, "resyncer"), 47 db: db, 48 gitm: gitm, 49 cfg: cfg, 50 51 runningJobs: make(map[syntax.ATURI]context.CancelFunc), 52 53 repoFetchTimeout: cfg.GitRepoFetchTimeout, 54 manualResyncTimeout: 30 * time.Minute, 55 parallelism: cfg.ResyncParallelism, 56 57 knotBackoff: make(map[string]time.Time), 58 59 httpClient: &http.Client{Timeout: 30 * time.Second}, 60 } 61} 62 63func (r *Resyncer) Start(ctx context.Context) { 64 for i := 0; i < r.parallelism; i++ { 65 go r.runResyncWorker(ctx, i) 66 } 67} 68 69func (r *Resyncer) runResyncWorker(ctx context.Context, workerID int) { 70 l := r.logger.With("worker", workerID) 71 for { 72 select { 73 case <-ctx.Done(): 74 l.Info("resync worker shutting down", "error", ctx.Err()) 75 return 76 default: 77 } 78 repoAt, found, err := r.claimResyncJob(ctx) 79 if err != nil { 80 l.Error("failed to claim resync job", "error", err) 81 time.Sleep(time.Second) 82 continue 83 } 84 if !found { 85 time.Sleep(time.Second) 86 continue 87 } 88 l.Info("processing resync", "aturi", repoAt) 89 if err := r.resyncRepo(ctx, repoAt); err != nil { 90 l.Error("resync failed", "aturi", repoAt, "error", err) 91 } 92 } 93} 94 95func (r *Resyncer) registerRunning(repo syntax.ATURI, cancel context.CancelFunc) { 96 r.runningJobsMu.Lock() 97 defer r.runningJobsMu.Unlock() 98 99 if _, exists := r.runningJobs[repo]; exists { 100 return 101 } 102 r.runningJobs[repo] = cancel 103} 104 105func (r *Resyncer) unregisterRunning(repo syntax.ATURI) { 106 r.runningJobsMu.Lock() 107 defer r.runningJobsMu.Unlock() 108 109 delete(r.runningJobs, repo) 110} 111 112func (r *Resyncer) CancelResyncJob(repo syntax.ATURI) { 113 r.runningJobsMu.Lock() 114 defer r.runningJobsMu.Unlock() 115 116 cancel, ok := r.runningJobs[repo] 117 if !ok { 118 return 119 } 120 delete(r.runningJobs, repo) 121 cancel() 122} 123 124// TriggerResyncJob manually triggers the resync job 125func (r *Resyncer) TriggerResyncJob(ctx context.Context, repoAt syntax.ATURI) error { 126 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 127 if err != nil { 128 return fmt.Errorf("failed to get repo: %w", err) 129 } 130 if repo == nil { 131 return fmt.Errorf("repo not found: %s", repoAt) 132 } 133 134 if repo.State == models.RepoStateResyncing { 135 return fmt.Errorf("repo already resyncing") 136 } 137 138 repo.State = models.RepoStatePending 139 repo.RetryAfter = -1 // resyncer will prioritize this 140 141 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 142 return fmt.Errorf("updating repo state to pending %w", err) 143 } 144 return nil 145} 146 147func (r *Resyncer) claimResyncJob(ctx context.Context) (syntax.ATURI, bool, error) { 148 // use mutex to prevent duplicated jobs 149 r.claimJobMu.Lock() 150 defer r.claimJobMu.Unlock() 151 152 var repoAt syntax.ATURI 153 now := time.Now().Unix() 154 if err := r.db.QueryRowContext(ctx, 155 `update repos 156 set state = $1 157 where at_uri = ( 158 select at_uri from repos 159 where state in ($2, $3, $4) 160 and (retry_after = -1 or retry_after = 0 or retry_after < $5) 161 order by 162 (retry_after = -1) desc, 163 (retry_after = 0) desc, 164 retry_after 165 limit 1 166 ) 167 returning at_uri 168 `, 169 models.RepoStateResyncing, 170 models.RepoStatePending, models.RepoStateDesynchronized, models.RepoStateError, 171 now, 172 ).Scan(&repoAt); err != nil { 173 if errors.Is(err, sql.ErrNoRows) { 174 return "", false, nil 175 } 176 return "", false, err 177 } 178 179 return repoAt, true, nil 180} 181 182func (r *Resyncer) resyncRepo(ctx context.Context, repoAt syntax.ATURI) error { 183 // ctx, span := tracer.Start(ctx, "resyncRepo") 184 // span.SetAttributes(attribute.String("aturi", repoAt)) 185 // defer span.End() 186 187 resyncsStarted.Inc() 188 startTime := time.Now() 189 190 jobCtx, cancel := context.WithCancel(ctx) 191 r.registerRunning(repoAt, cancel) 192 defer r.unregisterRunning(repoAt) 193 194 success, err := r.doResync(jobCtx, repoAt) 195 if !success { 196 resyncsFailed.Inc() 197 resyncDuration.Observe(time.Since(startTime).Seconds()) 198 return r.handleResyncFailure(ctx, repoAt, err) 199 } 200 201 resyncsCompleted.Inc() 202 resyncDuration.Observe(time.Since(startTime).Seconds()) 203 return nil 204} 205 206func (r *Resyncer) doResync(ctx context.Context, repoAt syntax.ATURI) (bool, error) { 207 // ctx, span := tracer.Start(ctx, "doResync") 208 // span.SetAttributes(attribute.String("aturi", repoAt)) 209 // defer span.End() 210 211 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 212 if err != nil { 213 return false, fmt.Errorf("failed to get repo: %w", err) 214 } 215 if repo == nil { // untracked repo, skip 216 return false, nil 217 } 218 219 r.knotBackoffMu.RLock() 220 backoffUntil, inBackoff := r.knotBackoff[repo.KnotDomain] 221 r.knotBackoffMu.RUnlock() 222 if inBackoff && time.Now().Before(backoffUntil) { 223 return false, nil 224 } 225 226 // HACK: check knot reachability with short timeout before running actual fetch. 227 // This is crucial as git-cli doesn't support http connection timeout. 228 // `http.lowSpeedTime` is only applied _after_ the connection. 229 if err := r.checkKnotReachability(ctx, repo); err != nil { 230 if isRateLimitError(err) { 231 r.knotBackoffMu.Lock() 232 r.knotBackoff[repo.KnotDomain] = time.Now().Add(10 * time.Second) 233 r.knotBackoffMu.Unlock() 234 return false, nil 235 } 236 // TODO: suspend repo on 404. KnotStream updates will change the repo state back online 237 return false, fmt.Errorf("knot unreachable: %w", err) 238 } 239 240 timeout := r.repoFetchTimeout 241 if repo.RetryAfter == -1 { 242 timeout = r.manualResyncTimeout 243 } 244 fetchCtx, cancel := context.WithTimeout(ctx, timeout) 245 defer cancel() 246 247 if err := r.gitm.Sync(fetchCtx, repo); err != nil { 248 return false, err 249 } 250 251 // repo.GitRev = <processed git.refUpdate revision> 252 // repo.RepoSha = <sha256 sum of git refs> 253 repo.State = models.RepoStateActive 254 repo.ErrorMsg = "" 255 repo.RetryCount = 0 256 repo.RetryAfter = 0 257 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 258 return false, fmt.Errorf("updating repo state to active %w", err) 259 } 260 return true, nil 261} 262 263type knotStatusError struct { 264 StatusCode int 265} 266 267func (ke *knotStatusError) Error() string { 268 return fmt.Sprintf("request failed with status code (HTTP %d)", ke.StatusCode) 269} 270 271func isRateLimitError(err error) bool { 272 var knotErr *knotStatusError 273 if errors.As(err, &knotErr) { 274 return knotErr.StatusCode == http.StatusTooManyRequests 275 } 276 return false 277} 278 279// checkKnotReachability checks if Knot is reachable and is valid git remote server 280func (r *Resyncer) checkKnotReachability(ctx context.Context, repo *models.Repo) error { 281 repoUrl, err := makeRepoRemoteUrl(repo.KnotDomain, repo.DidSlashRepo(), r.cfg.KnotUseSSL) 282 if err != nil { 283 return err 284 } 285 286 repoUrl += "/info/refs?service=git-upload-pack" 287 288 r.logger.Debug("checking knot reachability", "url", repoUrl) 289 290 req, err := http.NewRequestWithContext(ctx, "GET", repoUrl, nil) 291 if err != nil { 292 return err 293 } 294 req.Header.Set("User-Agent", "git/2.x") 295 req.Header.Set("Accept", "*/*") 296 297 resp, err := r.httpClient.Do(req) 298 if err != nil { 299 var uerr *url.Error 300 if errors.As(err, &uerr) { 301 return fmt.Errorf("request failed: %w", uerr.Unwrap()) 302 } 303 return fmt.Errorf("request failed: %w", err) 304 } 305 defer resp.Body.Close() 306 307 if resp.StatusCode != http.StatusOK { 308 return &knotStatusError{resp.StatusCode} 309 } 310 311 // check if target is git server 312 ct := resp.Header.Get("Content-Type") 313 if !strings.Contains(ct, "application/x-git-upload-pack-advertisement") { 314 return fmt.Errorf("unexpected content-type: %s", ct) 315 } 316 317 return nil 318} 319 320func (r *Resyncer) handleResyncFailure(ctx context.Context, repoAt syntax.ATURI, err error) error { 321 r.logger.Debug("handleResyncFailure", "at_uri", repoAt, "err", err) 322 var state models.RepoState 323 var errMsg string 324 if err == nil { 325 state = models.RepoStateDesynchronized 326 errMsg = "" 327 } else { 328 state = models.RepoStateError 329 errMsg = err.Error() 330 } 331 332 repo, err := db.GetRepoByAtUri(ctx, r.db, repoAt) 333 if err != nil { 334 return fmt.Errorf("failed to get repo: %w", err) 335 } 336 if repo == nil { 337 return fmt.Errorf("failed to get repo. repo '%s' doesn't exist in db", repoAt) 338 } 339 340 // start a 1 min & go up to 1 hr between retries 341 var retryCount = repo.RetryCount + 1 342 var retryAfter = time.Now().Add(backoff(retryCount, 60) * 60).Unix() 343 344 // remove null bytes 345 errMsg = strings.ReplaceAll(errMsg, "\x00", "") 346 347 repo.State = state 348 repo.ErrorMsg = errMsg 349 repo.RetryCount = retryCount 350 repo.RetryAfter = retryAfter 351 if err := db.UpsertRepo(ctx, r.db, repo); err != nil { 352 return fmt.Errorf("failed to update repo state: %w", err) 353 } 354 return nil 355} 356 357func backoff(retries int, max int) time.Duration { 358 dur := min(1<<retries, max) 359 jitter := time.Millisecond * time.Duration(rand.Intn(1000)) 360 return time.Second*time.Duration(dur) + jitter 361}