this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

some collectiondir fixes (#1024)

This isn't a full pass, just fixing some known issues:

- process wasn't shutting down with Ctrl-C because of a WaitGroup. fixed
the bug, but didn't clean up the (many) channels+goroutines
- many API response errors were strings, not the expected XRPC JSON
message format
- README didn't describe the actual API endpoint

authored by

bnewbold and committed by
GitHub
4fe62dbd 826fcdea

+31 -29
+20 -20
cmd/collectiondir/README.md
··· 1 - # Collection Directory 2 1 3 - Maintain a directory of which repos use which collections of records. 4 - 5 - e.g. "app.bsky.feed.post" is used by did:alice did:bob 2 + `collectiondir`: Directory of Accounts by Collection 3 + ==================================================== 6 4 7 - Firehose consumer and crawler of PDS via listRepos and describeRepo. 5 + This is a small atproto microservice which maintains a directory of which accounts in the network (DIDs) have data (records) for which collections (NSIDs). 8 6 9 - The primary query is: 7 + It primarily serves the `com.atproto.sync.listReposByCollection` API endpoint: 10 8 11 9 ``` 12 - /v1/getDidsForCollection?collection={}&cursor={} 13 - ``` 14 - 15 - It returns JSON: 10 + GET /xrpc/com.atproto.sync.listReposByCollection?collection=com.atproto.sync.listReposByCollection?collection=com.atproto.lexicon.schema&limit=3 16 11 17 - ```json 18 - {"dids":["did:A", "..."], 19 - "cursor":"opaque text"} 12 + { 13 + "repos": [ 14 + { "did": "did:plc:4sm3vprfyl55ui3yhjd7w4po" }, 15 + { "did": "did:plc:xhkqwjmxuo65vwbwuiz53qor" }, 16 + { "did": "did:plc:w3aonw33w3mz3mwws34x5of6" } 17 + ], 18 + "cursor": "QQAAAEkAAAGVgFFLb2RpZDpwbGM6dzNhb253MzN3M216M213d3MzNHg1b2Y2AA==" 19 + } 20 20 ``` 21 21 22 - query parameter `collection` may be repeated up to 10 times. They must always be sent in the same order or the cursor will break. 22 + Features and design points: 23 23 24 - If multiple collections are specified, the result stream is not guaranteed to be de-duplicated on Did and Dids may be repeated. 25 - (A merge window is used so that the service is _likely_ to not send duplicate Dids.) 24 + - persists data in a local key/value database (pebble) 25 + - consumes from the firehose to stay up to date with record creation 26 + - can bootstrap the full network using `com.atproto.sync.listRepos` and `com.atproto.repo.describeRepo` 27 + - single golang binary for easy deployment 26 28 27 29 28 - ### Analytics queries 30 + ## Analytics Endpoint 29 31 30 32 ``` 31 33 /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000} ··· 41 43 ``` 42 44 43 45 44 - ## Design 45 - 46 - ### Schema 46 + ## Database Schema 47 47 48 48 The primary database is (collection, seen time int64 milliseconds, did) 49 49
+11 -9
cmd/collectiondir/serve.go
··· 297 297 }() 298 298 cs.log.Info("api shutdown start...") 299 299 err := cs.apiServer.Shutdown(context.Background()) 300 - //err := cs.esrv.Shutdown(context.Background()) 301 300 cs.log.Info("api shutdown, thread wait...", "err", err) 302 301 cs.wg.Wait() 303 302 cs.log.Info("threads done, db close...") ··· 465 464 collection := c.QueryParam("collection") 466 465 _, err := syntax.ParseNSID(collection) 467 466 if err != nil { 468 - return c.String(http.StatusBadRequest, fmt.Sprintf("bad collection nsid, %s", err.Error())) 467 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("bad collection nsid, %s", err.Error())}) 469 468 } 470 469 cursor := c.QueryParam("cursor") 471 470 limit := getLimit(c, 1, 500, 10_000) 472 471 they, nextCursor, err := cs.pcd.ReadCollection(ctx, collection, cursor, limit) 473 472 if err != nil { 474 473 slog.Error("ReadCollection", "collection", collection, "cursor", cursor, "limit", limit, "err", err) 475 - return c.String(http.StatusInternalServerError, "oops") 474 + return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read DIDs for collection"}) 476 475 } 477 476 cs.log.Info("getDidsForCollection", "collection", collection, "cursor", cursor, "limit", limit, "count", len(they), "nextCursor", nextCursor) 478 477 var out comatproto.SyncListReposByCollection_Output ··· 620 619 if stalesecStr != "" && cs.isAdmin(c) { 621 620 stalesec, err := strconv.ParseInt(stalesecStr, 10, 64) 622 621 if err != nil { 623 - return c.String(http.StatusBadRequest, "bad stalesec") 622 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: "invalid 'stalesec' query parameter"}) 624 623 } 625 624 if stalesec == 0 { 626 625 stalenessAllowed = 1 ··· 632 631 stats, err := cs.getStatsCache(stalenessAllowed) 633 632 if err != nil { 634 633 slog.Error("getStatsCache", "err", err) 635 - return c.String(http.StatusInternalServerError, "oops") 634 + return c.JSON(http.StatusInternalServerError, xrpc.XRPCError{ErrStr: "DatabaseError", Message: "failed to read stats"}) 636 635 } 637 636 cursor := c.QueryParam("cursor") 638 637 collections, hasQueryCollections := c.QueryParams()["c"] ··· 705 704 } else { 706 705 errcount = 0 707 706 } 707 + case <-cs.shutdown: 708 + cs.log.Info("shutting down ingestReceiver") 709 + return 708 710 } 709 711 if errcount > 10 { 710 712 cs.log.Error("ingestReceiver too many errors") ··· 852 854 func (cs *collectionServer) crawlPds(c echo.Context) error { 853 855 isAdmin := cs.isAdmin(c) 854 856 if !isAdmin { 855 - return c.JSON(http.StatusForbidden, CrawlRequestResponse{Error: "nope"}) 857 + return c.JSON(http.StatusForbidden, xrpc.XRPCError{ErrStr: "AdminRequired", Message: "this endpoint requires admin auth"}) 856 858 } 857 859 hostQ := c.QueryParam("host") 858 860 if hostQ != "" { ··· 864 866 err := c.Bind(&req) 865 867 if err != nil { 866 868 cs.log.Info("bad crawl bind", "err", err) 867 - return c.String(http.StatusBadRequest, err.Error()) 869 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "BadRequest", Message: fmt.Sprintf("failed to parse body: %s", err)}) 868 870 } 869 871 if req.Host != "" { 870 872 go cs.crawlThread(req.Host) ··· 958 960 func (cs *collectionServer) crawlStatus(c echo.Context) error { 959 961 authHeader := c.Request().Header.Get("Authorization") 960 962 if authHeader != cs.ExepctedAuthHeader { 961 - return c.JSON(http.StatusForbidden, CrawlRequestResponse{Error: "nope"}) 963 + return c.JSON(http.StatusBadRequest, xrpc.XRPCError{ErrStr: "AdminAuthRequired", Message: "this endpoint requires admin-level auth"}) 962 964 } 963 965 var out CrawlStatusResponse 964 966 out.HostCrawls = make(map[string]HostCrawl) ··· 977 979 978 980 func (cs *collectionServer) healthz(c echo.Context) error { 979 981 // TODO: check database or upstream health? 980 - return c.String(http.StatusOK, "ok") 982 + return c.JSON(http.StatusOK, map[string]any{"status": "ok"}) 981 983 } 982 984 983 985 func loadBadwords(path string) (*BadwordsRE, error) {