tangled vouch map with historical data
7
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: fix backfill

+171 -49
+114 -48
cmd/backfill/main.go
··· 7 7 "fmt" 8 8 "io" 9 9 "log/slog" 10 + "math" 10 11 "net/http" 11 12 "os" 12 13 "os/signal" ··· 21 22 ) 22 23 23 24 const ( 24 - VouchCollection = "sh.tangled.graph.vouch" 25 - FollowCollection = "sh.tangled.graph.follow" 25 + VouchCollection = "sh.tangled.graph.vouch" 26 + FollowCollection = "sh.tangled.graph.follow" 26 27 KnotMemberCollection = "sh.tangled.knot.member" 27 - DefaultKnotDID = "did:plc:wshs7t2adsemcrrd4snkeqli" 28 - ListRecordsLimit = 100 29 - MaxWorkers = 20 28 + DefaultKnotDID = "did:plc:wshs7t2adsemcrrd4snkeqli" 29 + ListRecordsLimit = 100 30 + MaxWorkers = 20 31 + ProfileBatchSize = 50 32 + ProfileInterval = 30 * time.Second 33 + MaxRetries = 3 34 + RetryBaseDelay = 2 * time.Second 30 35 ) 31 36 32 37 var httpClient = &http.Client{ ··· 131 136 didCh := make(chan string, 10000) 132 137 var inFlight sync.WaitGroup 133 138 134 - // seed initial DIDs 135 139 for _, did := range cleanSeeds { 136 140 visited.Store(did, true) 137 141 inFlight.Add(1) ··· 144 148 var totalErrors atomic.Int64 145 149 var startTime = time.Now() 146 150 147 - // periodic progress ticker 148 151 go func() { 149 152 ticker := time.NewTicker(5 * time.Second) 150 153 defer ticker.Stop() ··· 153 156 case <-ctx.Done(): 154 157 return 155 158 case <-ticker.C: 156 - visited := totalVisited.Load() 159 + v := totalVisited.Load() 157 160 elapsed := time.Since(startTime).Round(time.Second) 158 - rate := float64(visited) / elapsed.Seconds() 161 + rate := float64(v) / elapsed.Seconds() 159 162 slog.Info("progress", 160 - "visited", visited, 163 + "visited", v, 161 164 "vouches", totalVouches.Load(), 162 165 "follows", totalFollows.Load(), 163 166 "errors", totalErrors.Load(), ··· 169 172 } 170 173 }() 171 174 172 - // closer: wait for all in-flight work to finish, then close the channel 175 + // background profile enrichment 176 + var profileWg sync.WaitGroup 177 + profileDIDs := make(chan string, 5000) 178 + profileWg.Add(1) 179 + go func() { 180 + defer profileWg.Done() 181 + batch := make([]string, 0, ProfileBatchSize) 182 + timer := time.NewTimer(ProfileInterval) 183 + defer timer.Stop() 184 + 185 + flush := func() { 186 + if len(batch) == 0 { 187 + return 188 + } 189 + dids := make([]string, len(batch)) 190 + copy(dids, batch) 191 + batch = batch[:0] 192 + 193 + enriched, err := resolve.BatchResolveAndStore(ctx, store, dids) 194 + if err != nil { 195 + slog.Warn("batch profile resolve had errors", "count", len(dids), "error", err) 196 + } 197 + if enriched > 0 { 198 + slog.Info("enriched profiles", "count", enriched) 199 + } 200 + } 201 + 202 + for { 203 + select { 204 + case <-ctx.Done(): 205 + flush() 206 + return 207 + case did, ok := <-profileDIDs: 208 + if !ok { 209 + flush() 210 + return 211 + } 212 + batch = append(batch, did) 213 + if len(batch) >= ProfileBatchSize { 214 + flush() 215 + timer.Reset(ProfileInterval) 216 + } 217 + case <-timer.C: 218 + flush() 219 + timer.Reset(ProfileInterval) 220 + } 221 + } 222 + }() 223 + 173 224 go func() { 174 225 inFlight.Wait() 175 226 close(didCh) ··· 206 257 inFlight.Add(1) 207 258 select { 208 259 case didCh <- newDID: 260 + // queue new DID for profile enrichment 261 + select { 262 + case profileDIDs <- newDID: 263 + default: 264 + } 209 265 case <-ctx.Done(): 210 266 inFlight.Done() 211 267 } ··· 217 273 } 218 274 219 275 wg.Wait() 276 + close(profileDIDs) 277 + profileWg.Wait() 220 278 221 279 elapsed := time.Since(startTime).Round(time.Second) 222 280 rate := float64(totalVisited.Load()) / elapsed.Seconds() ··· 228 286 "elapsed", elapsed.String(), 229 287 "rate", fmt.Sprintf("%.1f/s", rate), 230 288 ) 231 - 232 - slog.Info("enriching profiles...") 233 - enriched, err := resolve.EnrichMissing(ctx, store, 100) 234 - if err != nil { 235 - slog.Warn("profile enrichment had errors", "error", err) 236 - } 237 - slog.Info("profile enrichment complete", "enriched", enriched) 238 289 } 239 290 240 291 func bootstrapFromDB(store *db.Store) []string { ··· 261 312 default: 262 313 } 263 314 264 - u := fmt.Sprintf("%s/xrpc/com.atproto.repo.listRecords?repo=%s&collection=%s&limit=%d", 265 - pdsURL, knotDID, KnotMemberCollection, ListRecordsLimit) 266 - if cursor != "" { 267 - u += "&cursor=" + cursor 268 - } 269 - 270 - records, nextCursor, err := listRecords(ctx, pdsURL, knotDID, KnotMemberCollection, cursor) 315 + records, nextCursor, err := listRecordsWithRetry(ctx, pdsURL, knotDID, KnotMemberCollection, cursor) 271 316 if err != nil { 272 317 return members, err 273 318 } ··· 313 358 314 359 var newDIDs []string 315 360 316 - // try all collections in parallel 317 361 type result struct { 318 362 dids []string 319 363 count int ··· 353 397 354 398 func fetchVouches(ctx context.Context, store *db.Store, pdsURL, did string) ([]string, int, error) { 355 399 var newDIDs []string 356 - count := 0 400 + var vouches []db.Vouch 357 401 cursor := "" 358 402 359 403 for { 360 - records, nextCursor, err := listRecords(ctx, pdsURL, did, VouchCollection, cursor) 404 + records, nextCursor, err := listRecordsWithRetry(ctx, pdsURL, did, VouchCollection, cursor) 361 405 if err != nil { 362 - return newDIDs, count, err 406 + return newDIDs, 0, err 363 407 } 364 408 365 409 for _, rec := range records { ··· 378 422 updatedAt = t 379 423 } 380 424 381 - v := db.Vouch{ 425 + vouches = append(vouches, db.Vouch{ 382 426 VoucherDID: did, 383 427 VoucheeDID: rkey, 384 428 Kind: vouch.Kind, ··· 386 430 CreatedAt: vouch.CreatedAt, 387 431 Seq: 0, 388 432 UpdatedAt: updatedAt, 389 - } 390 - 391 - if err := store.UpsertVouch(v); err != nil { 392 - return newDIDs, count, fmt.Errorf("upsert vouch: %w", err) 393 - } 394 - count++ 433 + }) 395 434 if isValidDID(rkey) { 396 435 newDIDs = append(newDIDs, rkey) 397 436 } ··· 403 442 cursor = nextCursor 404 443 } 405 444 406 - return newDIDs, count, nil 445 + if len(vouches) > 0 { 446 + if err := store.BatchUpsertVouches(vouches); err != nil { 447 + return newDIDs, 0, fmt.Errorf("batch upsert vouches: %w", err) 448 + } 449 + } 450 + 451 + return newDIDs, len(vouches), nil 407 452 } 408 453 409 454 func fetchFollows(ctx context.Context, store *db.Store, pdsURL, did string) ([]string, int, error) { 410 455 var newDIDs []string 411 - count := 0 456 + var follows []db.Follow 412 457 cursor := "" 413 458 414 459 for { 415 - records, nextCursor, err := listRecords(ctx, pdsURL, did, FollowCollection, cursor) 460 + records, nextCursor, err := listRecordsWithRetry(ctx, pdsURL, did, FollowCollection, cursor) 416 461 if err != nil { 417 - return newDIDs, count, err 462 + return newDIDs, 0, err 418 463 } 419 464 420 465 for _, rec := range records { ··· 432 477 updatedAt = t 433 478 } 434 479 435 - f := db.Follow{ 480 + follows = append(follows, db.Follow{ 436 481 ActorDID: did, 437 482 SubjectDID: follow.Subject, 438 483 CreatedAt: follow.CreatedAt, 439 484 UpdatedAt: updatedAt, 440 - } 441 - 442 - if err := store.UpsertFollow(f); err != nil { 443 - return newDIDs, count, fmt.Errorf("upsert follow: %w", err) 444 - } 445 - count++ 485 + }) 446 486 newDIDs = append(newDIDs, follow.Subject) 447 487 } 448 488 ··· 452 492 cursor = nextCursor 453 493 } 454 494 455 - return newDIDs, count, nil 495 + if len(follows) > 0 { 496 + if err := store.BatchUpsertFollows(follows); err != nil { 497 + return newDIDs, 0, fmt.Errorf("batch upsert follows: %w", err) 498 + } 499 + } 500 + 501 + return newDIDs, len(follows), nil 456 502 } 457 503 458 504 type Record struct { 459 505 URI string 460 506 Value json.RawMessage 507 + } 508 + 509 + func listRecordsWithRetry(ctx context.Context, pdsURL, did, collection, cursor string) ([]Record, string, error) { 510 + var lastErr error 511 + for attempt := 0; attempt <= MaxRetries; attempt++ { 512 + if attempt > 0 { 513 + delay := RetryBaseDelay * time.Duration(math.Pow(2, float64(attempt-1))) 514 + select { 515 + case <-ctx.Done(): 516 + return nil, "", ctx.Err() 517 + case <-time.After(delay): 518 + } 519 + } 520 + records, nextCursor, err := listRecords(ctx, pdsURL, did, collection, cursor) 521 + if err == nil { 522 + return records, nextCursor, nil 523 + } 524 + lastErr = err 525 + } 526 + return nil, "", lastErr 461 527 } 462 528 463 529 func listRecords(ctx context.Context, pdsURL, did, collection, cursor string) ([]Record, string, error) {
+57 -1
internal/db/db.go
··· 173 173 return err 174 174 } 175 175 176 + func (s *Store) BatchUpsertVouches(vouches []Vouch) error { 177 + tx, err := s.db.Begin() 178 + if err != nil { 179 + return err 180 + } 181 + defer tx.Rollback() 182 + 183 + stmt, err := tx.Prepare(` 184 + INSERT INTO vouches (voucher_did, vouchee_did, kind, reason, created_at, seq, updated_at) 185 + VALUES (?, ?, ?, ?, ?, ?, ?) 186 + ON CONFLICT(voucher_did, vouchee_did) DO UPDATE SET 187 + kind = excluded.kind, 188 + reason = excluded.reason, 189 + created_at = excluded.created_at, 190 + seq = excluded.seq, 191 + updated_at = excluded.updated_at 192 + `) 193 + if err != nil { 194 + return err 195 + } 196 + defer stmt.Close() 197 + 198 + for _, v := range vouches { 199 + if _, err := stmt.Exec(v.VoucherDID, v.VoucheeDID, v.Kind, v.Reason, v.CreatedAt, v.Seq, v.UpdatedAt); err != nil { 200 + return err 201 + } 202 + } 203 + return tx.Commit() 204 + } 205 + 176 206 func (s *Store) DeleteVouch(voucherDID, voucheeDID string) error { 177 207 _, err := s.db.Exec(`DELETE FROM vouches WHERE voucher_did = ? AND vouchee_did = ?`, voucherDID, voucheeDID) 178 208 return err ··· 187 217 updated_at = excluded.updated_at 188 218 `, f.ActorDID, f.SubjectDID, f.CreatedAt, f.UpdatedAt) 189 219 return err 220 + } 221 + 222 + func (s *Store) BatchUpsertFollows(follows []Follow) error { 223 + tx, err := s.db.Begin() 224 + if err != nil { 225 + return err 226 + } 227 + defer tx.Rollback() 228 + 229 + stmt, err := tx.Prepare(` 230 + INSERT INTO follows (actor_did, subject_did, created_at, updated_at) 231 + VALUES (?, ?, ?, ?) 232 + ON CONFLICT(actor_did, subject_did) DO UPDATE SET 233 + created_at = excluded.created_at, 234 + updated_at = excluded.updated_at 235 + `) 236 + if err != nil { 237 + return err 238 + } 239 + defer stmt.Close() 240 + 241 + for _, f := range follows { 242 + if _, err := stmt.Exec(f.ActorDID, f.SubjectDID, f.CreatedAt, f.UpdatedAt); err != nil { 243 + return err 244 + } 245 + } 246 + return tx.Commit() 190 247 } 191 248 192 249 func (s *Store) DeleteFollow(actorDID, subjectDID string) error { ··· 418 475 SELECT MIN(t), MAX(t) FROM ( 419 476 SELECT updated_at as t FROM vouches WHERE updated_at > '0001-01-01' 420 477 UNION ALL SELECT updated_at FROM follows WHERE updated_at > '0001-01-01' 421 - UNION ALL SELECT updated_at FROM stars WHERE updated_at > '0001-01-01' 422 478 UNION ALL SELECT updated_at FROM knot_members WHERE updated_at > '0001-01-01' 423 479 ) 424 480 `).Scan(&min, &max)