collection of golang services under the Red Dwarf umbrella server.reddwarf.app
bluesky reddwarf microcosm appview
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 547 lines 15 kB view raw
// Package backstream serves websocket "backfill" streams of AT Protocol repo
// records, either by walking full repo CAR files (getRepo) or by paging
// com.atproto.repo.listRecords, with optional zstd dictionary compression.
package backstream

import (
	//"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"io"
	"io/ioutil"
	"os"

	"runtime"
	"runtime/debug"

	"github.com/gorilla/websocket"
	"github.com/klauspost/compress/zstd"

	data "github.com/bluesky-social/indigo/atproto/atdata"
	atrepo "github.com/bluesky-social/indigo/atproto/repo"

	// "github.com/bluesky-social/indigo/repo"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/ipfs/go-cid"
)

const (
	// numWorkers is the number of concurrent per-DID fetch workers spawned
	// for every websocket connection.
	numWorkers = 20
)

// DefaultUpgrader is a websocket upgrader that accepts any Origin.
// NOTE(review): CheckOrigin always returning true disables CSRF/origin
// protection — acceptable for a public firehose-style endpoint, but confirm
// this is intentional.
var DefaultUpgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}

// BackfillHandler is an http.Handler that upgrades requests to websockets and
// streams backfilled records to the client.
type BackfillHandler struct {
	Upgrader         websocket.Upgrader // upgrader used for the HTTP -> websocket handshake
	SessionManager   *SessionManager    // tracks resumable per-ticket progress (project type)
	AtpClient        *ATProtoClient     // client for relay/PDS XRPC calls (project type)
	ZstdDict         []byte             // optional zstd dictionary; nil disables compression
	UseGetRepoMethod bool               // true: stream whole CAR via getRepo; false: page listRecords
}

// BackfillParams are the client-requested filters parsed from the query string.
type BackfillParams struct {
	WantedDIDs        []string // DIDs to backfill; ["*"] means the whole network
	WantedCollections []string // collections to include; ["*"] means all
	GetRecordFormat   bool     // true: emit getRecord-shaped output; false: Jetstream-like
}

// ServeHTTP upgrades the connection, builds (or resumes) a session for the
// client's ticket, and runs a producer -> workers -> writer pipeline until the
// backfill completes or the client disconnects.
func (h *BackfillHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Compression is opt-in via ?compress=true and only available when a
	// dictionary was loaded.
	compress := (r.URL.Query().Get("compress") == "true") && (h.ZstdDict != nil)

	conn, err := h.Upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Failed to upgrade connection: %v", err)
		return
	}
	defer conn.Close()

	if compress {
		log.Println("Client requested zstd compression. Enabling.")
	}

	params, ticket, err := h.parseQueryParams(r)
	if err != nil {
		h.sendError(conn, err.Error())
		return
	}

	log.Printf("New connection for ticket: %s. DIDs: %v, Collections: %v, Workers: %d", ticket, params.WantedDIDs, params.WantedCollections, numWorkers)

	session := h.SessionManager.GetOrCreate(ticket, params)
	session.LastAccessed = time.Now()

	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	// Reader goroutine: we never expect client messages; a read error means
	// the client went away, so cancel the whole pipeline.
	go func() {
		defer cancel()
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
					log.Printf("Client disconnected for ticket %s (read error): %v", ticket, err)
				}
				break
			}
		}
	}()

	var wg sync.WaitGroup
	jobs := make(chan string, numWorkers)
	results := make(chan interface{}, 100)

	for i := 1; i <= numWorkers; i++ {
		go h.worker(ctx, i, &wg, jobs, results, session)
	}

	// Exactly one writer goroutine owns conn writes while the pipeline runs
	// (gorilla/websocket allows at most one concurrent writer).
	writerDone := make(chan struct{})
	if compress {
		go h.compressedWriter(ctx, cancel, conn, results, writerDone)
	} else {
		go h.writer(ctx, cancel, conn, results, writerDone)
	}

	// The WaitGroup counts the producer itself plus one unit per queued job
	// (producer calls wg.Add(1) before each send; workers call wg.Done per
	// job). The producer's own unit, added here before launch, keeps the
	// counter nonzero, which makes the concurrent Adds inside the producer
	// safe with respect to this Wait.
	wg.Add(1)
	go h.producer(ctx, &wg, jobs, session)

	wg.Wait()
	close(results) // all sends finished: workers are drained and producer is done
	<-writerDone

	log.Printf("Backfill completed for ticket: %s", session.Ticket)
	// Safe to write directly now: the writer goroutine has exited.
	h.sendMessage(conn, map[string]string{"status": "complete", "message": "Backfill finished."})
}

// compressedWriter drains results, JSON-encodes each, compresses with the
// handler's zstd dictionary, and writes binary websocket frames. Any failure
// cancels the whole pipeline.
func (h *BackfillHandler) compressedWriter(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, results <-chan interface{}, done chan<- struct{}) {
	defer close(done)

	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(h.ZstdDict))
	if err != nil {
		log.Printf("ERROR: [CompressedWriter] Failed to create zstd encoder with dictionary: %v", err)
		cancel()
		return
	}
	defer encoder.Close()

	for {
		select {
		case result, ok := <-results:
			if !ok {
				return // channel closed: pipeline finished cleanly
			}

			// NOTE(review): this local `data` shadows the package alias
			// `data` (atproto/atdata); harmless here since the alias is not
			// used in this function, but worth renaming.
			data, err := json.Marshal(result)
			if err != nil {
				log.Printf("ERROR: [CompressedWriter] Failed to marshal JSON: %v", err)
				cancel()
				return
			}

			// EncodeAll performs one-shot, per-message compression; each
			// frame is independently decompressible with the same dictionary.
			compressed := encoder.EncodeAll(data, nil)

			if err := conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil {
				log.Printf("ERROR: [CompressedWriter] Failed to write compressed message: %v", err)
				cancel()
				return
			}
		case <-ctx.Done():
			log.Printf("[CompressedWriter] Context cancelled, stopping.")
			return
		}
	}
}

// writer drains results and writes each as a JSON text frame. A write failure
// cancels the pipeline.
func (h *BackfillHandler) writer(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, results <-chan interface{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case result, ok := <-results:
			if !ok {
				return // channel closed: pipeline finished cleanly
			}
			if err := h.sendMessage(conn, result); err != nil {
				log.Printf("ERROR: [Writer] Failed to write message, closing connection: %v", err)
				cancel()
				return
			}
		case <-ctx.Done():
			log.Printf("[Writer] Context cancelled, stopping.")
			return
		}
	}
}

// producer enumerates the DIDs to backfill and feeds them to the workers,
// incrementing wg once per queued job. It closes jobs when enumeration ends.
// DIDs the session has already completed are skipped, which makes tickets
// resumable across reconnects.
func (h *BackfillHandler) producer(ctx context.Context, wg *sync.WaitGroup, jobs chan<- string, session *Session) {
	defer close(jobs)
	defer wg.Done()

	isFullNetwork := len(session.Params.WantedDIDs) == 1 && session.Params.WantedDIDs[0] == "*"
	isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"

	if isFullNetwork {
		if isAllCollections {
			// --- Case 1: Full Network, All Collections (dids=*&collections=*) ---
			// We need to list *all* repos from the relay.
			log.Printf("[Producer] Starting full network scan for all collections.")
			for {
				select {
				case <-ctx.Done():
					log.Printf("[Producer] Context cancelled, stopping full repo fetch.")
					return
				default:
				}

				log.Printf("[Producer] Fetching all repos with cursor: %s", session.ListReposCursor)
				repos, nextCursor, err := h.AtpClient.ListRepos(ctx, session.ListReposCursor)
				if err != nil {
					log.Printf("ERROR: [Producer] Failed to list all repos: %v", err)
					return
				}

				for _, repo := range repos {
					if !session.IsDIDComplete(repo.DID) {
						wg.Add(1) // one unit per job; the worker calls wg.Done
						jobs <- repo.DID
					}
				}

				// Persist the page cursor so a reconnect resumes where we left off.
				session.mu.Lock()
				session.ListReposCursor = nextCursor
				session.LastAccessed = time.Now()
				session.mu.Unlock()

				if nextCursor == "" {
					log.Printf("[Producer] Finished fetching all repos from relay.")
					break
				}
			}
		} else {
			// --- Case 2: Full Network, Specific Collections (dids=*&collections=a,b,c) ---
			// For each specific collection, page through all repos and send DIDs to workers.
			// NOTE(review): session.ListReposCursor is shared across the
			// collections in this loop; a session resumed mid-scan could apply
			// a cursor from one collection to another — verify intended.
			log.Printf("[Producer] Starting network scan for specific collections: %v", session.Params.WantedCollections)
			for _, collection := range session.Params.WantedCollections {
				for {
					select {
					case <-ctx.Done():
						log.Printf("[Producer] Context cancelled, stopping repo fetch.")
						return
					default:
					}

					log.Printf("[Producer] Fetching repos for %s with cursor: %s", collection, session.ListReposCursor)
					repos, nextCursor, err := h.AtpClient.ListReposByCollection(ctx, collection, session.ListReposCursor)
					if err != nil {
						log.Printf("ERROR: [Producer] Failed to list repos for collection %s: %v", collection, err)
						return
					}

					for _, repo := range repos {
						if !session.IsDIDComplete(repo.DID) {
							wg.Add(1) // one unit per job; the worker calls wg.Done
							jobs <- repo.DID
						}
					}

					session.mu.Lock()
					session.ListReposCursor = nextCursor
					session.LastAccessed = time.Now()
					session.mu.Unlock()

					if nextCursor == "" {
						log.Printf("[Producer] Finished fetching all repos for collection %s", collection)
						break
					}
				}
			}
		}
	} else {
		// --- Case 3: Specific List of DIDs (dids=a,b,c) ---
		// Send user-provided DIDs to workers.
		for _, did := range session.Params.WantedDIDs {
			select {
			case <-ctx.Done():
				log.Printf("[Producer] Context cancelled, stopping DID processing.")
				return
			default:
				if !session.IsDIDComplete(did) {
					wg.Add(1)
					jobs <- did
				} else {
					log.Printf("[Producer] Skipping already completed DID: %s", did)
				}
			}
		}
	}
}

// worker consumes DIDs from jobs, resolves each DID's PDS, backfills it via
// the configured method, and marks it complete in the session. Each job is
// processed inside a closure so its defers (wg.Done, cleanup) fire per
// iteration rather than at worker exit.
func (h *BackfillHandler) worker(ctx context.Context, id int, wg *sync.WaitGroup, jobs <-chan string, results chan<- interface{}, session *Session) {
	for did := range jobs {
		func(did string) {
			defer func() {
				wg.Done()

				// NOTE(review): forcing a GC + FreeOSMemory after every DID
				// is a heavy-handed way to cap RSS after large CAR parses;
				// it will hurt throughput — confirm this trade-off is wanted.
				runtime.GC()
				debug.FreeOSMemory()

				log.Printf("[Worker %d] Cleaned up resources for DID: %s", id, did)
			}()

			select {
			case <-ctx.Done():
				return
			default:
			}

			log.Printf("[Worker %d] Processing DID: %s", id, did)
			pdsURL, err := h.AtpClient.ResolveDID(ctx, did)
			if err != nil {
				log.Printf("WARN: [Worker %d] Could not resolve DID %s, skipping. Error: %v", id, did, err)
				return
			}

			if h.UseGetRepoMethod {
				h.processDIDWithGetRepo(ctx, id, did, pdsURL, results, session)
			} else {
				h.processDIDWithListRecords(ctx, id, did, pdsURL, results, session)
			}

			// NOTE(review): the DID is marked complete even when processing
			// above failed partway (both process methods swallow errors) —
			// confirm that partial backfills should not be retried.
			session.MarkDIDComplete(did)
			log.Printf("[Worker %d] Finished DID: %s", id, did)
		}(did)
	}
}

// processDIDWithGetRepo downloads the repo's full CAR file to a temp file,
// walks its MST, and emits every record matching the wanted collections.
// Spooling to disk keeps peak memory bounded for large repos.
func (h *BackfillHandler) processDIDWithGetRepo(ctx context.Context, id int, did, pdsURL string, results chan<- interface{}, session *Session) {
	log.Printf("[Worker %d] Using streaming getRepo method for %s", id, did)
	isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"

	// Set membership test for the collection filter (empty set == allow all).
	wantedSet := make(map[string]struct{})
	if !isAllCollections {
		for _, coll := range session.Params.WantedCollections {
			wantedSet[coll] = struct{}{}
		}
	}

	respBody, err := h.AtpClient.GetRepo(ctx, pdsURL, did)
	if err != nil {
		log.Printf("WARN: [Worker %d] Failed to get repo stream for %s: %v", id, did, err)
		return
	}
	defer respBody.Close()

	// NOTE(review): panicking in a request-serving goroutine on MkdirAll
	// failure will crash the process — consider logging and returning instead.
	if err := os.MkdirAll("./temp", 0o755); err != nil {
		panic(err)
	}
	// NOTE(review): ioutil.TempFile is deprecated since Go 1.17; os.CreateTemp
	// is the drop-in replacement.
	tempFile, err := ioutil.TempFile("./temp", "backstream-repo-*.car")
	if err != nil {
		log.Printf("ERROR: [Worker %d] Failed to create temp file for %s: %v", id, did, err)
		return
	}
	defer os.Remove(tempFile.Name())

	// NOTE(review): if io.Copy fails, tempFile is never closed (fd leak) —
	// a `defer tempFile.Close()` alongside the explicit Close would fix it.
	if _, err := io.Copy(tempFile, respBody); err != nil {
		log.Printf("ERROR: [Worker %d] Failed to write repo to temp file for %s: %v", id, did, err)
		return
	}

	if err := tempFile.Close(); err != nil {
		log.Printf("ERROR: [Worker %d] Failed to close temp file for %s: %v", id, did, err)
		return
	}

	readHandle, err := os.Open(tempFile.Name())
	if err != nil {
		log.Printf("ERROR: [Worker %d] Failed to open temp file for reading %s: %v", id, did, err)
		return
	}
	defer readHandle.Close()

	_, r, err := atrepo.LoadRepoFromCAR(ctx, readHandle)
	if err != nil {
		log.Printf("WARN: [Worker %d] Failed to read CAR stream for %s from temp file: %v", id, did, err)
		return
	}

	// Walk every record in the repo's MST; keys are "collection/rkey" paths.
	err = r.MST.Walk(func(k []byte, v cid.Cid) error {
		select {
		case <-ctx.Done():
			return errors.New("context cancelled during repo walk")
		default:
		}

		path := string(k)
		collection, rkey, err := syntax.ParseRepoPath(path)
		if err != nil {
			log.Printf("WARN: [Worker %d] Could not parse repo path '%s' for %s, skipping record", id, path, did)
			return nil
		}

		if !isAllCollections {
			if _, ok := wantedSet[string(collection)]; !ok {
				return nil // not a wanted collection
			}
		}

		recBytes, _, err := r.GetRecordBytes(ctx, collection, rkey)
		if err != nil {
			log.Printf("WARN: [Worker %d] Failed to get record bytes for %s: %v", id, path, err)
			return nil
		}

		recordVal, err := data.UnmarshalCBOR(recBytes)
		if err != nil {
			log.Printf("WARN: [Worker %d] Failed to unmarshal record CBOR for %s: %v", id, path, err)
			return nil
		}

		record := Record{
			URI:   fmt.Sprintf("at://%s/%s", did, path),
			CID:   v.String(),
			Value: recordVal,
		}

		output := h.formatOutput(record, did, string(collection), session.Params.GetRecordFormat)
		select {
		case results <- output:
		case <-ctx.Done():
			return errors.New("context cancelled while sending result")
		}

		// Record progress so a resumed session can continue from this rkey.
		session.SetListRecordsCursor(did, string(collection), string(rkey))
		return nil
	})

	if err != nil && !errors.Is(err, context.Canceled) {
		log.Printf("WARN: [Worker %d] Error while walking repo for %s: %v", id, did, err)
	}
}

// processDIDWithListRecords backfills a DID by paging the PDS listRecords
// endpoint per collection, resuming from any cursor saved in the session.
func (h *BackfillHandler) processDIDWithListRecords(ctx context.Context, id int, did, pdsURL string, results chan<- interface{}, session *Session) {
	log.Printf("[Worker %d] Using listRecords method for %s", id, did)
	isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"
	var collectionsToProcess []string

	if isAllCollections {
		// Wildcard: ask the repo itself which collections it actually has.
		repoCollections, err := h.AtpClient.DescribeRepo(ctx, pdsURL, did)
		if err != nil {
			log.Printf("WARN: [Worker %d] Could not describe repo for %s to find collections, skipping. Error: %v", id, did, err)
			return
		}
		collectionsToProcess = repoCollections
		log.Printf("[Worker %d] Found %d collections for DID %s", id, len(collectionsToProcess), did)
	} else {
		collectionsToProcess = session.Params.WantedCollections
	}

	for _, collection := range collectionsToProcess {
		cursor := session.GetListRecordsCursor(did, collection)
		for {
			select {
			case <-ctx.Done():
				log.Printf("[Worker %d] Context cancelled for DID %s", id, did)
				return
			default:
			}

			records, nextCursor, err := h.AtpClient.ListRecords(ctx, pdsURL, did, collection, cursor)
			if err != nil {
				// A 400 is treated as "repo has no such collection" and
				// skipped quietly (best effort); anything else is logged.
				// NOTE(review): matching the status via the error string is
				// fragile — a typed/sentinel error would be sturdier.
				if !strings.Contains(err.Error(), "status: 400") {
					log.Printf("WARN: [Worker %d] Failed to list records for %s/%s, skipping. Error: %v", id, did, collection, err)
				}
				break
			}

			for _, record := range records {
				output := h.formatOutput(record, did, collection, session.Params.GetRecordFormat)
				select {
				case results <- output:
				case <-ctx.Done():
					log.Printf("[Worker %d] Context cancelled while sending results for %s", id, did)
					return
				}
			}

			// Persist page progress for resumability, then advance.
			session.SetListRecordsCursor(did, collection, nextCursor)
			cursor = nextCursor
			if cursor == "" {
				break // no more pages for this collection
			}
		}
	}
}

// parseQueryParams extracts the backfill parameters and resume ticket from the
// request. Missing DID/collection filters default to "*"; a request with no
// parameters at all shares the fixed "jetstreamfalse" ticket, while filtered
// requests without a ticket get a freshly generated one.
func (h *BackfillHandler) parseQueryParams(r *http.Request) (BackfillParams, string, error) {
	query := r.URL.Query()
	ticket := query.Get("ticket")

	wantedDidsStr := query.Get("wantedDids")
	wantedCollectionsStr := query.Get("wantedCollections")

	if wantedCollectionsStr == "" && wantedDidsStr == "" && ticket == "" {
		// NOTE(review): all no-parameter clients share this session/progress —
		// confirm that cross-client resume sharing is intended.
		ticket = "jetstreamfalse"
	} else if ticket == "" {
		ticket = generateTicket()
	}

	if wantedDidsStr == "" {
		log.Println("Query parameter 'wantedDids' not specified, defaulting to '*' (all repos).")
		wantedDidsStr = "*"
	}

	if wantedCollectionsStr == "" {
		log.Println("Query parameter 'wantedCollections' not specified, defaulting to '*' (all collections).")
		wantedCollectionsStr = "*"
	}

	params := BackfillParams{
		WantedDIDs:        strings.Split(wantedDidsStr, ","),
		WantedCollections: strings.Split(wantedCollectionsStr, ","),
		GetRecordFormat:   query.Get("getRecordFormat") == "true",
	}
	return params, ticket, nil
}

// formatOutput shapes a record either as a getRecord-style payload or as a
// Jetstream-like commit event, per the client's getRecordFormat flag.
func (h *BackfillHandler) formatOutput(record Record, did, collection string, getRecordFormat bool) interface{} {
	if getRecordFormat {
		return GetRecordOutput{
			URI:   record.URI,
			CID:   record.CID,
			Value: record.Value,
		}
	}
	// Derive the rkey from an at:// URI of the form
	// at://<did>/<collection>/<rkey> (5 slash-separated parts).
	uriParts := strings.Split(record.URI, "/")
	rkey := ""
	if len(uriParts) == 5 {
		rkey = uriParts[4]
	}
	return JetstreamLikeOutput{
		Did:  did,
		Kind: "commit",
		// NOTE(review): TimeUS is a hardcoded placeholder timestamp and Rev
		// reuses the rkey — downstream consumers should not rely on either.
		TimeUS: "1725911162329308",
		Commit: JetstreamLikeCommit{
			Rev:        rkey,
			Operation:  "create",
			Collection: collection,
			RKey:       rkey,
			Record:     record.Value,
			CID:        record.CID,
		},
	}
}

// sendError writes a JSON error object to the client, ignoring write failures
// (the connection is about to be closed anyway).
func (h *BackfillHandler) sendError(conn *websocket.Conn, message string) {
	log.Printf("Sending error to client: %s", message)
	_ = conn.WriteJSON(map[string]string{"error": message})
}

// sendMessage writes v to the client as a JSON text frame. Callers must ensure
// only one goroutine writes to conn at a time (gorilla/websocket requirement).
func (h *BackfillHandler) sendMessage(conn *websocket.Conn, v interface{}) error {
	return conn.WriteJSON(v)
}