// Part of the Red Dwarf umbrella of Go services (server.reddwarf.app).
// Related components: bluesky, reddwarf, microcosm, appview.
1package backstream
2
3import (
4 //"bytes"
5 "context"
6 "encoding/json"
7 "errors"
8 "fmt"
9 "log"
10 "net/http"
11 "strings"
12 "sync"
13 "time"
14
15 "io"
16 "io/ioutil"
17 "os"
18
19 "runtime"
20 "runtime/debug"
21
22 "github.com/gorilla/websocket"
23 "github.com/klauspost/compress/zstd"
24
25 data "github.com/bluesky-social/indigo/atproto/atdata"
26 atrepo "github.com/bluesky-social/indigo/atproto/repo"
27
28 // "github.com/bluesky-social/indigo/repo"
29 "github.com/bluesky-social/indigo/atproto/syntax"
30 "github.com/ipfs/go-cid"
31)
32
const (
	// numWorkers is the number of concurrent backfill workers started per
	// client connection (also used as the jobs channel buffer size).
	numWorkers = 20
)
36
// DefaultUpgrader is the websocket upgrader used when a handler does not
// configure its own.
//
// NOTE(review): CheckOrigin accepts every Origin header, which disables
// cross-site websocket hijacking protection — presumably intentional for a
// public firehose-style endpoint; confirm before serving browser clients
// with credentials.
var DefaultUpgrader = websocket.Upgrader{
	CheckOrigin: func(r *http.Request) bool {
		return true
	},
}
42
// BackfillHandler serves websocket backfill requests, replaying repo records
// to the client, optionally zstd-compressed with a shared dictionary.
type BackfillHandler struct {
	Upgrader         websocket.Upgrader // upgrades incoming HTTP requests to websockets
	SessionManager   *SessionManager    // tracks resumable per-ticket backfill progress
	AtpClient        *ATProtoClient     // client for relay/PDS calls (resolve, listRepos, getRepo, listRecords)
	ZstdDict         []byte             // zstd dictionary; nil disables compression support entirely
	UseGetRepoMethod bool               // true: download whole CAR repos; false: page through listRecords
}
50
// BackfillParams holds the client-requested filters for a backfill session.
type BackfillParams struct {
	WantedDIDs        []string // repo DIDs to replay; ["*"] means the whole network
	WantedCollections []string // collection NSIDs to include; ["*"] means all collections
	GetRecordFormat   bool     // true: getRecord-shaped output; false: Jetstream-like events
}
56
// ServeHTTP upgrades the connection to a websocket and runs one backfill
// session: a producer discovers DIDs, a pool of workers fetches their
// records, and a single writer goroutine streams results back to the client
// (zstd-compressed when ?compress=true and a dictionary is configured).
func (h *BackfillHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	// Compression is only honored when the server actually has a dictionary.
	compress := (r.URL.Query().Get("compress") == "true") && (h.ZstdDict != nil)

	conn, err := h.Upgrader.Upgrade(w, r, nil)
	if err != nil {
		log.Printf("Failed to upgrade connection: %v", err)
		return
	}
	defer conn.Close()

	if compress {
		log.Println("Client requested zstd compression. Enabling.")
	}

	params, ticket, err := h.parseQueryParams(r)
	if err != nil {
		h.sendError(conn, err.Error())
		return
	}

	log.Printf("New connection for ticket: %s. DIDs: %v, Collections: %v, Workers: %d", ticket, params.WantedDIDs, params.WantedCollections, numWorkers)

	// Sessions are keyed by ticket so a reconnecting client resumes where it
	// left off instead of re-streaming DIDs it already completed.
	session := h.SessionManager.GetOrCreate(ticket, params)
	session.LastAccessed = time.Now()

	ctx, cancel := context.WithCancel(r.Context())
	defer cancel()

	// Read pump: no client messages are expected, but a read loop is needed
	// to detect disconnects (and service control frames). Any read error
	// cancels the whole pipeline via the shared context.
	go func() {
		defer cancel()
		for {
			if _, _, err := conn.ReadMessage(); err != nil {
				if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
					log.Printf("Client disconnected for ticket %s (read error): %v", ticket, err)
				}
				break
			}
		}
	}()

	// wg counts the producer plus one unit per enqueued DID: the producer
	// calls wg.Add(1) before each send on jobs, and workers wg.Done per job.
	var wg sync.WaitGroup
	jobs := make(chan string, numWorkers)
	results := make(chan interface{}, 100)

	for i := 1; i <= numWorkers; i++ {
		go h.worker(ctx, i, &wg, jobs, results, session)
	}

	// Exactly one goroutine writes to conn while the pipeline runs
	// (gorilla/websocket supports at most one concurrent writer).
	writerDone := make(chan struct{})
	if compress {
		go h.compressedWriter(ctx, cancel, conn, results, writerDone)
	} else {
		go h.writer(ctx, cancel, conn, results, writerDone)
	}

	wg.Add(1)
	go h.producer(ctx, &wg, jobs, session)

	// Wait for the producer and every job, close results so the writer
	// drains and exits, and only then write the final status frame on the
	// same connection (avoiding a concurrent write with the writer).
	wg.Wait()
	close(results)
	<-writerDone

	log.Printf("Backfill completed for ticket: %s", session.Ticket)
	h.sendMessage(conn, map[string]string{"status": "complete", "message": "Backfill finished."})
}
122
123func (h *BackfillHandler) compressedWriter(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, results <-chan interface{}, done chan<- struct{}) {
124 defer close(done)
125
126 encoder, err := zstd.NewWriter(nil, zstd.WithEncoderDict(h.ZstdDict))
127 if err != nil {
128 log.Printf("ERROR: [CompressedWriter] Failed to create zstd encoder with dictionary: %v", err)
129 cancel()
130 return
131 }
132 defer encoder.Close()
133
134 for {
135 select {
136 case result, ok := <-results:
137 if !ok {
138 return
139 }
140
141 data, err := json.Marshal(result)
142 if err != nil {
143 log.Printf("ERROR: [CompressedWriter] Failed to marshal JSON: %v", err)
144 cancel()
145 return
146 }
147
148 compressed := encoder.EncodeAll(data, nil)
149
150 if err := conn.WriteMessage(websocket.BinaryMessage, compressed); err != nil {
151 log.Printf("ERROR: [CompressedWriter] Failed to write compressed message: %v", err)
152 cancel()
153 return
154 }
155 case <-ctx.Done():
156 log.Printf("[CompressedWriter] Context cancelled, stopping.")
157 return
158 }
159 }
160}
161
162func (h *BackfillHandler) writer(ctx context.Context, cancel context.CancelFunc, conn *websocket.Conn, results <-chan interface{}, done chan<- struct{}) {
163 defer close(done)
164 for {
165 select {
166 case result, ok := <-results:
167 if !ok {
168 return
169 }
170 if err := h.sendMessage(conn, result); err != nil {
171 log.Printf("ERROR: [Writer] Failed to write message, closing connection: %v", err)
172 cancel()
173 return
174 }
175 case <-ctx.Done():
176 log.Printf("[Writer] Context cancelled, stopping.")
177 return
178 }
179 }
180}
181
// producer discovers which DIDs need backfilling and feeds them to the
// worker pool via the jobs channel. For every DID it enqueues, it first
// calls wg.Add(1); the worker calls the matching wg.Done after processing.
// It closes jobs on return so the workers shut down.
//
// Three modes, chosen from the session params:
//  1. dids=* and collections=*  -> page through every repo on the relay.
//  2. dids=* with specific collections -> page repos per collection.
//  3. an explicit DID list -> enqueue those DIDs directly.
func (h *BackfillHandler) producer(ctx context.Context, wg *sync.WaitGroup, jobs chan<- string, session *Session) {
	defer close(jobs)
	defer wg.Done()

	isFullNetwork := len(session.Params.WantedDIDs) == 1 && session.Params.WantedDIDs[0] == "*"
	isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"

	if isFullNetwork {
		if isAllCollections {
			// --- Case 1: Full Network, All Collections (dids=*&collections=*) ---
			// We need to list *all* repos from the relay.
			log.Printf("[Producer] Starting full network scan for all collections.")
			for {
				select {
				case <-ctx.Done():
					log.Printf("[Producer] Context cancelled, stopping full repo fetch.")
					return
				default:
				}

				// NOTE(review): ListReposCursor is read here without holding
				// session.mu (writes below are locked) — presumably safe
				// because the producer is the only writer while this loop
				// runs; confirm against SessionManager's concurrency model.
				log.Printf("[Producer] Fetching all repos with cursor: %s", session.ListReposCursor)
				repos, nextCursor, err := h.AtpClient.ListRepos(ctx, session.ListReposCursor)
				if err != nil {
					log.Printf("ERROR: [Producer] Failed to list all repos: %v", err)
					return
				}

				for _, repo := range repos {
					// Skip DIDs an earlier run of this session already finished.
					if !session.IsDIDComplete(repo.DID) {
						wg.Add(1)
						jobs <- repo.DID
					}
				}

				// Persist pagination progress so a resumed session continues
				// from this cursor instead of restarting the relay scan.
				session.mu.Lock()
				session.ListReposCursor = nextCursor
				session.LastAccessed = time.Now()
				session.mu.Unlock()

				if nextCursor == "" {
					log.Printf("[Producer] Finished fetching all repos from relay.")
					break
				}
			}
		} else {
			// --- Case 2: Full Network, Specific Collections (dids=*&collections=a,b,c) ---
			// For each specific collection, page through all repos and send DIDs to workers.
			log.Printf("[Producer] Starting network scan for specific collections: %v", session.Params.WantedCollections)
			for _, collection := range session.Params.WantedCollections {
				for {
					select {
					case <-ctx.Done():
						log.Printf("[Producer] Context cancelled, stopping repo fetch.")
						return
					default:
					}

					// NOTE(review): a single ListReposCursor is shared across
					// all collections; a session resumed mid-collection may
					// apply the cursor to a different collection — verify.
					log.Printf("[Producer] Fetching repos for %s with cursor: %s", collection, session.ListReposCursor)
					repos, nextCursor, err := h.AtpClient.ListReposByCollection(ctx, collection, session.ListReposCursor)
					if err != nil {
						log.Printf("ERROR: [Producer] Failed to list repos for collection %s: %v", collection, err)
						return
					}

					for _, repo := range repos {
						if !session.IsDIDComplete(repo.DID) {
							wg.Add(1)
							jobs <- repo.DID
						}
					}

					session.mu.Lock()
					session.ListReposCursor = nextCursor
					session.LastAccessed = time.Now()
					session.mu.Unlock()

					if nextCursor == "" {
						log.Printf("[Producer] Finished fetching all repos for collection %s", collection)
						break
					}
				}
			}
		}
	} else {
		// --- Case 3: Specific List of DIDs (dids=a,b,c) ---
		// Send user-provided DIDs to workers.
		for _, did := range session.Params.WantedDIDs {
			select {
			case <-ctx.Done():
				log.Printf("[Producer] Context cancelled, stopping DID processing.")
				return
			default:
				if !session.IsDIDComplete(did) {
					wg.Add(1)
					jobs <- did
				} else {
					log.Printf("[Producer] Skipping already completed DID: %s", did)
				}
			}
		}
	}
}
284
285func (h *BackfillHandler) worker(ctx context.Context, id int, wg *sync.WaitGroup, jobs <-chan string, results chan<- interface{}, session *Session) {
286 for did := range jobs {
287 func(did string) {
288 defer func() {
289 wg.Done()
290
291 runtime.GC()
292 debug.FreeOSMemory()
293
294 log.Printf("[Worker %d] Cleaned up resources for DID: %s", id, did)
295 }()
296
297 select {
298 case <-ctx.Done():
299 return
300 default:
301 }
302
303 log.Printf("[Worker %d] Processing DID: %s", id, did)
304 pdsURL, err := h.AtpClient.ResolveDID(ctx, did)
305 if err != nil {
306 log.Printf("WARN: [Worker %d] Could not resolve DID %s, skipping. Error: %v", id, did, err)
307 return
308 }
309
310 if h.UseGetRepoMethod {
311 h.processDIDWithGetRepo(ctx, id, did, pdsURL, results, session)
312 } else {
313 h.processDIDWithListRecords(ctx, id, did, pdsURL, results, session)
314 }
315
316 session.MarkDIDComplete(did)
317 log.Printf("[Worker %d] Finished DID: %s", id, did)
318 }(did)
319 }
320}
321
322func (h *BackfillHandler) processDIDWithGetRepo(ctx context.Context, id int, did, pdsURL string, results chan<- interface{}, session *Session) {
323 log.Printf("[Worker %d] Using streaming getRepo method for %s", id, did)
324 isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"
325
326 wantedSet := make(map[string]struct{})
327 if !isAllCollections {
328 for _, coll := range session.Params.WantedCollections {
329 wantedSet[coll] = struct{}{}
330 }
331 }
332
333 respBody, err := h.AtpClient.GetRepo(ctx, pdsURL, did)
334 if err != nil {
335 log.Printf("WARN: [Worker %d] Failed to get repo stream for %s: %v", id, did, err)
336 return
337 }
338 defer respBody.Close()
339
340 if err := os.MkdirAll("./temp", 0o755); err != nil {
341 panic(err)
342 }
343 tempFile, err := ioutil.TempFile("./temp", "backstream-repo-*.car")
344 if err != nil {
345 log.Printf("ERROR: [Worker %d] Failed to create temp file for %s: %v", id, did, err)
346 return
347 }
348 defer os.Remove(tempFile.Name())
349
350 if _, err := io.Copy(tempFile, respBody); err != nil {
351 log.Printf("ERROR: [Worker %d] Failed to write repo to temp file for %s: %v", id, did, err)
352 return
353 }
354
355 if err := tempFile.Close(); err != nil {
356 log.Printf("ERROR: [Worker %d] Failed to close temp file for %s: %v", id, did, err)
357 return
358 }
359
360 readHandle, err := os.Open(tempFile.Name())
361 if err != nil {
362 log.Printf("ERROR: [Worker %d] Failed to open temp file for reading %s: %v", id, did, err)
363 return
364 }
365 defer readHandle.Close()
366
367 _, r, err := atrepo.LoadRepoFromCAR(ctx, readHandle)
368 if err != nil {
369 log.Printf("WARN: [Worker %d] Failed to read CAR stream for %s from temp file: %v", id, did, err)
370 return
371 }
372
373 err = r.MST.Walk(func(k []byte, v cid.Cid) error {
374 select {
375 case <-ctx.Done():
376 return errors.New("context cancelled during repo walk")
377 default:
378 }
379
380 path := string(k)
381 collection, rkey, err := syntax.ParseRepoPath(path)
382 if err != nil {
383 log.Printf("WARN: [Worker %d] Could not parse repo path '%s' for %s, skipping record", id, path, did)
384 return nil
385 }
386
387 if !isAllCollections {
388 if _, ok := wantedSet[string(collection)]; !ok {
389 return nil
390 }
391 }
392
393 recBytes, _, err := r.GetRecordBytes(ctx, collection, rkey)
394 if err != nil {
395 log.Printf("WARN: [Worker %d] Failed to get record bytes for %s: %v", id, path, err)
396 return nil
397 }
398
399 recordVal, err := data.UnmarshalCBOR(recBytes)
400 if err != nil {
401 log.Printf("WARN: [Worker %d] Failed to unmarshal record CBOR for %s: %v", id, path, err)
402 return nil
403 }
404
405 record := Record{
406 URI: fmt.Sprintf("at://%s/%s", did, path),
407 CID: v.String(),
408 Value: recordVal,
409 }
410
411 output := h.formatOutput(record, did, string(collection), session.Params.GetRecordFormat)
412 select {
413 case results <- output:
414 case <-ctx.Done():
415 return errors.New("context cancelled while sending result")
416 }
417
418 session.SetListRecordsCursor(did, string(collection), string(rkey))
419 return nil
420 })
421
422 if err != nil && !errors.Is(err, context.Canceled) {
423 log.Printf("WARN: [Worker %d] Error while walking repo for %s: %v", id, did, err)
424 }
425}
426
// processDIDWithListRecords backfills one DID by paging through
// listRecords for each wanted collection, resuming from any cursor the
// session already recorded for that (did, collection) pair.
//
// When the session asks for all collections ("*"), the actual collection
// list is discovered via describeRepo on the repo's PDS.
func (h *BackfillHandler) processDIDWithListRecords(ctx context.Context, id int, did, pdsURL string, results chan<- interface{}, session *Session) {
	log.Printf("[Worker %d] Using listRecords method for %s", id, did)
	isAllCollections := len(session.Params.WantedCollections) == 1 && session.Params.WantedCollections[0] == "*"
	var collectionsToProcess []string

	if isAllCollections {
		repoCollections, err := h.AtpClient.DescribeRepo(ctx, pdsURL, did)
		if err != nil {
			log.Printf("WARN: [Worker %d] Could not describe repo for %s to find collections, skipping. Error: %v", id, did, err)
			return
		}
		collectionsToProcess = repoCollections
		log.Printf("[Worker %d] Found %d collections for DID %s", id, len(collectionsToProcess), did)
	} else {
		collectionsToProcess = session.Params.WantedCollections
	}

	for _, collection := range collectionsToProcess {
		// Resume from the last recorded cursor for this did/collection.
		cursor := session.GetListRecordsCursor(did, collection)
		for {
			select {
			case <-ctx.Done():
				log.Printf("[Worker %d] Context cancelled for DID %s", id, did)
				return
			default:
			}

			records, nextCursor, err := h.AtpClient.ListRecords(ctx, pdsURL, did, collection, cursor)
			if err != nil {
				// A 400 is expected for collections the repo doesn't have
				// (e.g. when a requested collection is absent), so it is
				// silently skipped; anything else is logged.
				// NOTE(review): matching on the error string is fragile —
				// a typed/status-aware error from ListRecords would be safer.
				if !strings.Contains(err.Error(), "status: 400") {
					log.Printf("WARN: [Worker %d] Failed to list records for %s/%s, skipping. Error: %v", id, did, collection, err)
				}
				break
			}

			for _, record := range records {
				output := h.formatOutput(record, did, collection, session.Params.GetRecordFormat)
				select {
				case results <- output:
				case <-ctx.Done():
					log.Printf("[Worker %d] Context cancelled while sending results for %s", id, did)
					return
				}
			}

			// Persist pagination progress after each page so a resumed
			// session continues from here.
			session.SetListRecordsCursor(did, collection, nextCursor)
			cursor = nextCursor
			if cursor == "" {
				break
			}
		}
	}
}
480
481func (h *BackfillHandler) parseQueryParams(r *http.Request) (BackfillParams, string, error) {
482 query := r.URL.Query()
483 ticket := query.Get("ticket")
484
485 wantedDidsStr := query.Get("wantedDids")
486 wantedCollectionsStr := query.Get("wantedCollections")
487
488 if wantedCollectionsStr == "" && wantedDidsStr == "" && ticket == "" {
489 ticket = "jetstreamfalse"
490 } else if ticket == "" {
491 ticket = generateTicket()
492 }
493
494 if wantedDidsStr == "" {
495 log.Println("Query parameter 'wantedDids' not specified, defaulting to '*' (all repos).")
496 wantedDidsStr = "*"
497 }
498
499 if wantedCollectionsStr == "" {
500 log.Println("Query parameter 'wantedCollections' not specified, defaulting to '*' (all collections).")
501 wantedCollectionsStr = "*"
502 }
503
504 params := BackfillParams{
505 WantedDIDs: strings.Split(wantedDidsStr, ","),
506 WantedCollections: strings.Split(wantedCollectionsStr, ","),
507 GetRecordFormat: query.Get("getRecordFormat") == "true",
508 }
509 return params, ticket, nil
510}
511
512func (h *BackfillHandler) formatOutput(record Record, did, collection string, getRecordFormat bool) interface{} {
513 if getRecordFormat {
514 return GetRecordOutput{
515 URI: record.URI,
516 CID: record.CID,
517 Value: record.Value,
518 }
519 }
520 uriParts := strings.Split(record.URI, "/")
521 rkey := ""
522 if len(uriParts) == 5 {
523 rkey = uriParts[4]
524 }
525 return JetstreamLikeOutput{
526 Did: did,
527 Kind: "commit",
528 TimeUS: "1725911162329308",
529 Commit: JetstreamLikeCommit{
530 Rev: rkey,
531 Operation: "create",
532 Collection: collection,
533 RKey: rkey,
534 Record: record.Value,
535 CID: record.CID,
536 },
537 }
538}
539
540func (h *BackfillHandler) sendError(conn *websocket.Conn, message string) {
541 log.Printf("Sending error to client: %s", message)
542 _ = conn.WriteJSON(map[string]string{"error": message})
543}
544
// sendMessage writes v to the websocket as one JSON text frame. Callers must
// ensure only one goroutine writes to conn at a time.
func (h *BackfillHandler) sendMessage(conn *websocket.Conn, v interface{}) error {
	return conn.WriteJSON(v)
}