this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

collection directory service lookup repos by collection (who has app.bsky.feed.post records ?) firehose consumer and crawl PDS by listRepos,describeRepo daily-active-users collections

authored by

Brian Olson and committed by
Brian Olson
29bad144 79a03573

+1973
+43
cmd/collectiondir/Dockerfile
# Build stage: compile the collectiondir binary with the Go toolchain.
FROM golang:1.23-bullseye AS build-env

ENV DEBIAN_FRONTEND=noninteractive
ENV TZ=Etc/UTC
ENV GODEBUG="netdns=go"
ENV GOOS="linux"
ENV GOARCH="amd64"
ENV CGO_ENABLED="1"

WORKDIR /usr/src/collectiondir

COPY . .

RUN go mod download && \
    go mod verify

RUN go build \
    -v \
    -trimpath \
    -tags timetzdata \
    -o /collectiondir-bin \
    ./cmd/collectiondir

# Runtime stage: slim Debian image carrying only the binary and its helpers.
FROM debian:bullseye-slim

ENV DEBIAN_FRONTEND="noninteractive"
ENV TZ=Etc/UTC
ENV GODEBUG="netdns=go"

# Remove the apt package index in the same layer so it is not shipped in the
# final image.
RUN apt-get update && apt-get install --yes \
    dumb-init \
    ca-certificates \
    runit \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /collectiondir
COPY --from=build-env /collectiondir-bin /usr/bin/collectiondir

# dumb-init runs as PID 1 in front of the service.
ENTRYPOINT ["/usr/bin/dumb-init", "--"]
CMD ["/usr/bin/collectiondir"]

LABEL org.opencontainers.image.source=https://github.com/bluesky-social/indigo
LABEL org.opencontainers.image.description="collectiondir "
LABEL org.opencontainers.image.licenses=MIT
+56
cmd/collectiondir/README.md
··· 1 + # Collection Directory 2 + 3 + Maintain a directory of which repos use which collections of records. 4 + 5 + e.g. "app.bsky.feed.post" is used by did:alice did:bob 6 + 7 + Firehose consumer and crawler of PDS via listRepos and describeRepo. 8 + 9 + The primary query is: 10 + 11 + ``` 12 + /v1/getDidsForCollection?collection={}&cursor={} 13 + ``` 14 + 15 + It returns JSON: 16 + 17 + ```json 18 + {"dids":["did:A", "..."], 19 + "cursor":"opaque text"} 20 + ``` 21 + 22 + query parameter `collection` may be repeated up to 10 times. They must always be sent in the same order or the cursor will break. 23 + 24 + If multiple collections are specified, the result stream is not guaranteed to be de-duplicated on Did and Dids may be repeated. 25 + (A merge window is used so that the service is _likely_ to not send duplicate Dids.) 26 + 27 + 28 + ### Analytics queries 29 + 30 + ``` 31 + /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000} 32 + ``` 33 + 34 + `listCollections` returns JSON with a map of collection name to approximate number of dids implementing it. 35 + With no `c` parameter it returns all known collections with cursor paging. 36 + With up to 20 repeated `c` parameters it returns only those collections (no paging). 37 + It may be the cached result of a computation, up to several minutes out of date. 38 + ```json 39 + {"collections":{"app.bsky.feed.post": 123456789, "some collection": 42}, 40 + "cursor":"opaque text"} 41 + ``` 42 + 43 + 44 + ## Design 45 + 46 + ### Schema 47 + 48 + The primary database is (collection, seen time int64 milliseconds, did) 49 + 50 + This allows for efficient cursor fetching of more dids for a collection. 51 + 52 + e.g. A new service starts consuming the firehose for events it wants in collection `com.newservice.data.thing`, 53 + it then calls the collection directory for a list of repos which may have already created data in this collection, 54 + and does `getRepo` calls to those repos' PDSes to get prior data. 
55 + By the time it is done paging forward through the collection directory results and getting those repos, 56 + it will have backfilled data and new data it has collected live off the firehose.
+151
cmd/collectiondir/collectiondir.go
··· 1 + package main 2 + 3 + import ( 4 + "compress/gzip" 5 + "encoding/csv" 6 + "encoding/json" 7 + "errors" 8 + "fmt" 9 + "github.com/carlmjohnson/versioninfo" 10 + "github.com/urfave/cli/v2" 11 + "io" 12 + "log/slog" 13 + "os" 14 + "strings" 15 + ) 16 + 17 + func main() { 18 + app := cli.App{ 19 + Name: "collectiondir", 20 + Usage: "collection directory service", 21 + Version: versioninfo.Short(), 22 + Flags: []cli.Flag{ 23 + &cli.BoolFlag{ 24 + Name: "verbose", 25 + }, 26 + }, 27 + Commands: []*cli.Command{ 28 + serveCmd, 29 + crawlCmd, 30 + buildCmd, 31 + statsCmd, 32 + }, 33 + } 34 + err := app.Run(os.Args) 35 + if err != nil { 36 + fmt.Fprintf(os.Stderr, "%s\n", err.Error()) 37 + os.Exit(1) 38 + } 39 + } 40 + 41 + var statsCmd = &cli.Command{ 42 + Name: "stats", 43 + Flags: []cli.Flag{ 44 + &cli.StringFlag{ 45 + Name: "pebble", 46 + Usage: "path to store pebble db", 47 + Required: true, 48 + }, 49 + }, 50 + Action: func(cctx *cli.Context) error { 51 + logLevel := slog.LevelInfo 52 + if cctx.Bool("verbose") { 53 + logLevel = slog.LevelDebug 54 + } 55 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 56 + slog.SetDefault(log) 57 + pebblePath := cctx.String("pebble") 58 + var db PebbleCollectionDirectory 59 + db.log = log 60 + err := db.Open(pebblePath) 61 + if err != nil { 62 + return err 63 + } 64 + defer db.Close() 65 + 66 + stats, err := db.GetCollectionStats() 67 + if err != nil { 68 + return err 69 + } 70 + blob, err := json.MarshalIndent(stats, "", " ") 71 + os.Stdout.Write(blob) 72 + os.Stdout.Write([]byte{'\n'}) 73 + return nil 74 + }, 75 + } 76 + 77 + var buildCmd = &cli.Command{ 78 + Name: "build", 79 + Usage: "collect csv into a database", 80 + Flags: []cli.Flag{ 81 + &cli.StringFlag{ 82 + Name: "csv", 83 + Required: true, 84 + }, 85 + &cli.StringFlag{ 86 + Name: "pebble", 87 + Usage: "path to store pebble db", 88 + Required: true, 89 + }, 90 + }, 91 + Action: func(cctx *cli.Context) error { 92 + logLevel := 
slog.LevelInfo 93 + if cctx.Bool("verbose") { 94 + logLevel = slog.LevelDebug 95 + } 96 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})) 97 + slog.SetDefault(log) 98 + pebblePath := cctx.String("pebble") 99 + var db PebbleCollectionDirectory 100 + db.log = log 101 + err := db.Open(pebblePath) 102 + if err != nil { 103 + return err 104 + } 105 + defer db.Close() 106 + csvPath := cctx.String("csv") 107 + var fin io.Reader 108 + if csvPath == "-" { 109 + fin = os.Stdin 110 + } else if strings.HasSuffix(csvPath, ".gz") { 111 + osin, err := os.Open(csvPath) 112 + if err != nil { 113 + return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 114 + } 115 + defer osin.Close() 116 + gzin, err := gzip.NewReader(osin) 117 + if err != nil { 118 + return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 119 + } 120 + defer gzin.Close() 121 + fin = gzin 122 + } else { 123 + osin, err := os.Open(csvPath) 124 + if err != nil { 125 + return fmt.Errorf("%s: could not open csv, %w", csvPath, err) 126 + } 127 + defer osin.Close() 128 + fin = osin 129 + } 130 + reader := csv.NewReader(fin) 131 + rowcount := 0 132 + results := make(chan DidCollection, 100) 133 + go db.SetFromResults(results) 134 + for { 135 + row, err := reader.Read() 136 + if errors.Is(err, io.EOF) { 137 + break 138 + } 139 + did := row[0] 140 + collection := row[1] 141 + results <- DidCollection{ 142 + Did: did, 143 + Collection: collection, 144 + } 145 + rowcount++ 146 + } 147 + close(results) 148 + log.Debug("read csv", "rows", rowcount) 149 + return nil 150 + }, 151 + }
+152
cmd/collectiondir/crawl.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/csv" 6 + "fmt" 7 + "golang.org/x/time/rate" 8 + "io" 9 + "log/slog" 10 + "net/http" 11 + "net/url" 12 + "os" 13 + 14 + "github.com/urfave/cli/v2" 15 + 16 + "github.com/bluesky-social/indigo/api/atproto" 17 + "github.com/bluesky-social/indigo/xrpc" 18 + ) 19 + 20 + type DidCollection struct { 21 + Did string `json:"d"` 22 + Collection string `json:"c"` 23 + } 24 + 25 + func DidCollectionsToCsv(out io.Writer, sources <-chan DidCollection) { 26 + writer := csv.NewWriter(out) 27 + defer writer.Flush() 28 + var row [2]string 29 + for dc := range sources { 30 + row[0] = dc.Did 31 + row[1] = dc.Collection 32 + writer.Write(row[:]) 33 + } 34 + } 35 + 36 + var crawlCmd = &cli.Command{ 37 + Name: "crawl", 38 + Usage: "crawl a PDS", 39 + Flags: []cli.Flag{ 40 + &cli.StringFlag{ 41 + Name: "host", 42 + Usage: "hostname or URL of PDS", 43 + }, 44 + &cli.StringFlag{ 45 + Name: "csv-out", 46 + Usage: "path for output or - for stdout", 47 + }, 48 + &cli.Float64Flag{ 49 + Name: "qps", 50 + Usage: "queries per second to do vs target PDS", 51 + Value: 50, // large PDS: 500_000 repos, 10_000 seconds, ~3 hours 52 + }, 53 + &cli.StringFlag{ 54 + Name: "ratelimit-header", 55 + Usage: "secret for friend PDSes", 56 + EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 57 + }, 58 + }, 59 + Action: func(cctx *cli.Context) error { 60 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelInfo})) 61 + ctx, cancel := context.WithCancel(context.Background()) 62 + defer cancel() 63 + hostname := cctx.String("host") 64 + hosturl, err := url.Parse(hostname) 65 + if err != nil { 66 + hosturl = new(url.URL) 67 + hosturl.Scheme = "https" 68 + hosturl.Host = hostname 69 + } 70 + httpClient := http.Client{} 71 + rpcClient := xrpc.Client{ 72 + Host: hosturl.String(), 73 + Client: &httpClient, 74 + } 75 + if cctx.IsSet("ratelimit-header") { 76 + rpcClient.Headers = map[string]string{ 77 + 
"x-ratelimit-bypass": cctx.String("ratelimit-header"), 78 + } 79 + } 80 + log.Info("will crawl", "url", rpcClient.Host) 81 + csvOutPath := cctx.String("csv-out") 82 + var fout io.Writer = os.Stdout 83 + if csvOutPath != "" { 84 + if csvOutPath == "-" { 85 + fout = os.Stdout 86 + } else { 87 + fout, err = os.Create(csvOutPath) 88 + if err != nil { 89 + return fmt.Errorf("%s: could not open for writing: %w", csvOutPath, err) 90 + } 91 + } 92 + } 93 + qps := cctx.Float64("qps") 94 + results := make(chan DidCollection, 100) 95 + defer close(results) 96 + go DidCollectionsToCsv(fout, results) 97 + crawler := Crawler{ 98 + Ctx: ctx, 99 + RpcClient: &rpcClient, 100 + QPS: qps, 101 + Results: results, 102 + Log: log, 103 + } 104 + err = crawler.CrawlPDSRepoCollections() 105 + log.Info("done") 106 + 107 + return err 108 + }, 109 + } 110 + 111 + type Crawler struct { 112 + Ctx context.Context 113 + RpcClient *xrpc.Client 114 + QPS float64 115 + Results chan<- DidCollection 116 + Log *slog.Logger 117 + } 118 + 119 + // CrawlPDSRepoCollections 120 + // write results to chan 121 + // does _not_ close chan 122 + // (allow multiple threads of PDS queries running to one output chan, e.g. 
feeding into SetFromResults() ) 123 + func (cr *Crawler) CrawlPDSRepoCollections() error { 124 + var cursor string 125 + limiter := rate.NewLimiter(rate.Limit(cr.QPS), 1) 126 + for { 127 + limiter.Wait(cr.Ctx) 128 + repos, err := atproto.SyncListRepos(cr.Ctx, cr.RpcClient, cursor, 1000) 129 + if err != nil { 130 + // TODO: wait N seconds, retry M times 131 + return fmt.Errorf("%s: sync repos: %w", cr.RpcClient.Host, err) 132 + } 133 + slog.Info("got repo list", "count", len(repos.Repos)) 134 + for _, xr := range repos.Repos { 135 + limiter.Wait(cr.Ctx) 136 + desc, err := atproto.RepoDescribeRepo(cr.Ctx, cr.RpcClient, xr.Did) 137 + if err != nil { 138 + slog.Error("repo desc", "host", cr.RpcClient.Host, "did", xr.Did, "err", err) 139 + continue 140 + } 141 + for _, collection := range desc.Collections { 142 + cr.Results <- DidCollection{Did: xr.Did, Collection: collection} 143 + } 144 + } 145 + if repos.Cursor != nil { 146 + cursor = *repos.Cursor 147 + } else { 148 + break 149 + } 150 + } 151 + return nil 152 + }
+103
cmd/collectiondir/firehose.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "github.com/bluesky-social/indigo/events" 7 + "github.com/gorilla/websocket" 8 + "log/slog" 9 + "net/http" 10 + "net/url" 11 + "time" 12 + ) 13 + 14 + type Firehose struct { 15 + Log *slog.Logger 16 + 17 + Host string 18 + Seq int64 19 + 20 + events chan<- *events.XRPCStreamEvent 21 + } 22 + 23 + func (fh *Firehose) subscribeWithRedialer(ctx context.Context, fhevents chan<- *events.XRPCStreamEvent) error { 24 + defer close(fhevents) 25 + d := websocket.Dialer{} 26 + 27 + rurl, err := url.Parse(fh.Host) 28 + if err != nil { 29 + rurl = new(url.URL) 30 + rurl.Host = fh.Host 31 + rurl.Scheme = "wss" 32 + } else { 33 + if rurl.Scheme == fh.Host { 34 + rurl.Scheme = "wss" 35 + } 36 + if rurl.Scheme == "https" || rurl.Scheme == "wss" { 37 + rurl.Scheme = "wss" 38 + } else if rurl.Scheme == "http" || rurl.Scheme == "ws" { 39 + rurl.Scheme = "ws" 40 + } else if rurl.Scheme == "" { 41 + rurl.Scheme = "wss" 42 + } else { 43 + return fmt.Errorf("host unknown scheme %#v", rurl.Scheme) 44 + } 45 + } 46 + //protocol := "wss" 47 + subscribeReposUrl := rurl.JoinPath("/xrpc/com.atproto.sync.subscribeRepos") 48 + fh.events = fhevents 49 + 50 + var backoff int 51 + for { 52 + select { 53 + case <-ctx.Done(): 54 + return nil 55 + default: 56 + } 57 + 58 + header := http.Header{ 59 + "User-Agent": []string{"bgs-rainbow-v0"}, 60 + } 61 + 62 + if fh.Seq >= 0 { 63 + subscribeReposUrl.RawQuery = fmt.Sprintf("cursor=%d", fh.Seq) 64 + } 65 + url := subscribeReposUrl.String() 66 + con, res, err := d.DialContext(ctx, url, header) 67 + if err != nil { 68 + fh.Log.Warn("dialing failed", "url", url, "err", err, "backoff", backoff) 69 + time.Sleep(5 * time.Second) 70 + backoff++ 71 + 72 + continue 73 + } 74 + 75 + fh.Log.Info("event subscription response", "code", res.StatusCode) 76 + 77 + if err := fh.handleConnection(ctx, con); err != nil { 78 + fh.Log.Warn("connection failed", "host", fh.Host, "err", err) 79 + } 80 + } 81 + } 82 
+ 83 + func (fh *Firehose) handleConnection(ctx context.Context, con *websocket.Conn) error { 84 + ctx, cancel := context.WithCancel(ctx) 85 + defer cancel() 86 + 87 + return events.HandleRepoStream(ctx, con, fh, fh.Log) 88 + } 89 + 90 + // AddWork is part of events.Scheduler 91 + func (fh *Firehose) AddWork(ctx context.Context, repo string, val *events.XRPCStreamEvent) error { 92 + tsv, ok := val.GetSequence() 93 + if ok { 94 + fh.Seq = tsv 95 + } 96 + fh.events <- val 97 + return nil 98 + } 99 + 100 + // Shutdown is part of events.Scheduler 101 + func (fh *Firehose) Shutdown() { 102 + // unneeded in this usage 103 + }
+92
cmd/collectiondir/metrics.go
··· 1 + package main 2 + 3 + import ( 4 + "errors" 5 + "github.com/labstack/echo/v4" 6 + "github.com/prometheus/client_golang/prometheus" 7 + "github.com/prometheus/client_golang/prometheus/promauto" 8 + "net/http" 9 + "strconv" 10 + "time" 11 + ) 12 + 13 + var firehoseReceivedCounter = promauto.NewCounter(prometheus.CounterOpts{ 14 + Name: "collectiondir_firehose_received_total", 15 + Help: "number of events received from upstream firehose", 16 + }) 17 + var firehoseCommits = promauto.NewCounter(prometheus.CounterOpts{ 18 + Name: "collectiondir_firehose_commits", 19 + Help: "number of #commit events received from upstream firehose", 20 + }) 21 + var firehoseCommitOps = promauto.NewCounterVec(prometheus.CounterOpts{ 22 + Name: "collectiondir_firehose_commit_ops", 23 + Help: "number of #commit events received from upstream firehose", 24 + }, []string{"op"}) 25 + 26 + var firehoseDidcSet = promauto.NewCounter(prometheus.CounterOpts{ 27 + Name: "collectiondir_firehose_didc_total", 28 + }) 29 + 30 + var pebbleDup = promauto.NewCounter(prometheus.CounterOpts{ 31 + Name: "collectiondir_pebble_dup_total", 32 + }) 33 + 34 + var pebbleNew = promauto.NewCounter(prometheus.CounterOpts{ 35 + Name: "collectiondir_pebble_new_total", 36 + }) 37 + 38 + var pdsCrawledCounter = promauto.NewCounter(prometheus.CounterOpts{ 39 + Name: "collectiondir_pds_crawled_total", 40 + }) 41 + 42 + var reqDur = promauto.NewHistogramVec(prometheus.HistogramOpts{ 43 + Name: "http_request_duration_seconds", 44 + Help: "A histogram of latencies for requests.", 45 + Buckets: prometheus.ExponentialBuckets(0.001, 2, 15), 46 + }, []string{"code", "method", "path"}) 47 + 48 + var reqCnt = promauto.NewCounterVec(prometheus.CounterOpts{ 49 + Name: "http_requests_total", 50 + Help: "A counter for requests to the wrapped handler.", 51 + }, []string{"code", "method", "path"}) 52 + 53 + // MetricsMiddleware defines handler function for metrics middleware 54 + // TODO: reunify with bgs/metrics.go ? 
55 + func MetricsMiddleware(next echo.HandlerFunc) echo.HandlerFunc { 56 + return func(c echo.Context) error { 57 + path := c.Path() 58 + if path == "/metrics" || path == "/_health" { 59 + return next(c) 60 + } 61 + 62 + start := time.Now() 63 + //requestSize := computeApproximateRequestSize(c.Request()) 64 + 65 + err := next(c) 66 + 67 + status := c.Response().Status 68 + if err != nil { 69 + var httpError *echo.HTTPError 70 + if errors.As(err, &httpError) { 71 + status = httpError.Code 72 + } 73 + if status == 0 || status == http.StatusOK { 74 + status = http.StatusInternalServerError 75 + } 76 + } 77 + 78 + elapsed := float64(time.Since(start)) / float64(time.Second) 79 + 80 + statusStr := strconv.Itoa(status) 81 + method := c.Request().Method 82 + 83 + //responseSize := float64(c.Response().Size) 84 + 85 + reqDur.WithLabelValues(statusStr, method, path).Observe(elapsed) 86 + reqCnt.WithLabelValues(statusStr, method, path).Inc() 87 + //reqSz.WithLabelValues(statusStr, method, path).Observe(float64(requestSize)) 88 + //resSz.WithLabelValues(statusStr, method, path).Observe(responseSize) 89 + 90 + return err 91 + } 92 + }
+377
cmd/collectiondir/pebble.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/base64" 6 + "encoding/binary" 7 + "errors" 8 + "fmt" 9 + "github.com/cockroachdb/pebble" 10 + "log/slog" 11 + "time" 12 + ) 13 + 14 + func makeCollectionInternKey(collection string) []byte { 15 + out := make([]byte, len(collection)+1) 16 + out[0] = 'C' 17 + copy(out[1:], collection) 18 + return out 19 + } 20 + 21 + func parseCollectionInternKey(key []byte) string { 22 + if key[0] != 'C' { 23 + panic(fmt.Sprintf("collection key must start with C, got %v", key[0])) 24 + } 25 + return string(key[1:]) 26 + } 27 + 28 + func makePrimaryPebbleRow(collectionId uint32, did string, seenMs int64) []byte { 29 + out := make([]byte, 1+4+8+len(did)) 30 + out[0] = 'A' 31 + binary.BigEndian.PutUint32(out[1:], collectionId) 32 + pos := 1 + 4 33 + binary.BigEndian.PutUint64(out[pos:], uint64(seenMs)) 34 + pos += 8 35 + copy(out[pos:], did) 36 + return out 37 + } 38 + 39 + func parsePrimaryPebbleRow(row []byte) (collectionId uint32, did string, seenMs int64) { 40 + if row[0] != 'A' { 41 + panic(fmt.Sprintf("primary row key wanted A got %v", row[0])) 42 + } 43 + collectionId = binary.BigEndian.Uint32(row[1:5]) 44 + seenMs = int64(binary.BigEndian.Uint64(row[5:13])) 45 + did = string(row[13:]) 46 + return collectionId, did, seenMs 47 + } 48 + 49 + func makeByDidKey(did string, collectionId uint32) []byte { 50 + out := make([]byte, 1+len(did)+4) 51 + out[0] = 'D' 52 + copy(out[1:1+len(did)], did) 53 + pos := 1 + len(did) 54 + binary.BigEndian.PutUint32(out[pos:], collectionId) 55 + return out 56 + } 57 + 58 + func parseByDidKey(key []byte) (did string, collectionId uint32) { 59 + if key[0] != 'D' { 60 + panic(fmt.Sprintf("by did key wanted D got %v", key[0])) 61 + } 62 + last4 := len(key) - 5 63 + collectionId = binary.BigEndian.Uint32(key[last4:]) 64 + did = string(key[1 : last4+1]) 65 + return did, collectionId 66 + } 67 + 68 + // PebbleCollectionDirectory holds a DID<=>{collections} directory in pebble db. 
69 + // The primary database is (collection, seen time int64 milliseconds, did) 70 + // Inner schema: 71 + // C{collection} : {uint32 collectionId} 72 + // D{did}{uint32 collectionId} : {uint64 seen ms} 73 + // A{uint32 collectionId}{uint64 seen ms}{did} : 't' 74 + type PebbleCollectionDirectory struct { 75 + db *pebble.DB 76 + 77 + // collections can be LRU cache if it ever becomes too big 78 + collections map[string]uint32 79 + collectionNames map[uint32]string // TODO: B-tree would be nice 80 + maxCollectionId uint32 81 + 82 + log *slog.Logger 83 + } 84 + 85 + func (pcd *PebbleCollectionDirectory) Open(pebblePath string) error { 86 + db, err := pebble.Open(pebblePath, &pebble.Options{}) 87 + if err != nil { 88 + return fmt.Errorf("%s: could not open db, %w", pebblePath, err) 89 + } 90 + pcd.db = db 91 + pcd.collections = make(map[string]uint32) 92 + pcd.collectionNames = make(map[uint32]string) 93 + if pcd.log == nil { 94 + pcd.log = slog.Default() 95 + } 96 + return pcd.ReadAllCollectionInterns(context.Background()) 97 + } 98 + 99 + func (pcd *PebbleCollectionDirectory) Close() error { 100 + err := pcd.db.Flush() 101 + if err != nil { 102 + pcd.log.Error("pebble flush", "err", err) 103 + } 104 + err = pcd.db.Close() 105 + if err != nil { 106 + pcd.log.Error("pebble close", "err", err) 107 + } 108 + return err 109 + } 110 + 111 + func (pcd *PebbleCollectionDirectory) ReadAllCollectionInterns(ctx context.Context) error { 112 + lower := []byte{'C'} 113 + upper := []byte{'D'} 114 + iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 115 + LowerBound: lower, 116 + UpperBound: upper, 117 + }) 118 + if err != nil { 119 + return fmt.Errorf("collection iter start, %w", err) 120 + } 121 + defer iter.Close() 122 + count := 0 123 + for iter.First(); iter.Valid(); iter.Next() { 124 + key := iter.Key() 125 + value, err := iter.ValueAndErr() 126 + if err != nil { 127 + return fmt.Errorf("collection iter, %w", err) 128 + } 129 + collection := 
parseCollectionInternKey(key) 130 + collectionId := binary.BigEndian.Uint32(value) 131 + count++ 132 + pcd.collections[collection] = collectionId 133 + pcd.collectionNames[collectionId] = collection 134 + if collectionId > pcd.maxCollectionId { 135 + pcd.maxCollectionId = collectionId 136 + } 137 + pcd.log.Debug("collection", "name", collection, "id", collectionId) 138 + } 139 + pcd.log.Debug("read collections", "count", count, "max", pcd.maxCollectionId) 140 + return nil 141 + } 142 + 143 + type CollectionDidTime struct { 144 + Collection string 145 + Did string 146 + UnixMillis int64 147 + } 148 + 149 + func (pcd *PebbleCollectionDirectory) ReadAllPrimary(ctx context.Context, out chan<- CollectionDidTime) error { 150 + defer close(out) 151 + lower := []byte{'A'} 152 + upper := []byte{'B'} 153 + iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 154 + LowerBound: lower, 155 + UpperBound: upper, 156 + }) 157 + if err != nil { 158 + return fmt.Errorf("collection iter start, %w", err) 159 + } 160 + defer iter.Close() 161 + count := 0 162 + done := ctx.Done() 163 + for iter.First(); iter.Valid(); iter.Next() { 164 + key := iter.Key() 165 + collectionId, did, seenMs := parsePrimaryPebbleRow(key) 166 + count++ 167 + collection := pcd.collectionNames[collectionId] 168 + rec := CollectionDidTime{ 169 + Collection: collection, 170 + Did: did, 171 + UnixMillis: seenMs, 172 + } 173 + select { 174 + case <-done: 175 + return nil 176 + case out <- rec: 177 + } 178 + } 179 + pcd.log.Debug("read primary", "count", count) 180 + return nil 181 + } 182 + 183 + func (pcd *PebbleCollectionDirectory) ReadCollection(ctx context.Context, collection, cursor string, limit int) (result []CollectionDidTime, nextCursor string, err error) { 184 + var lower []byte 185 + collectionId, err := pcd.CollectionToId(collection) 186 + if err != nil { 187 + return nil, "", fmt.Errorf("collection id err, %w", err) 188 + } 189 + if cursor != "" { 190 + lower, err = 
base64.StdEncoding.DecodeString(cursor) 191 + if err != nil { 192 + return nil, "", fmt.Errorf("could not decode cursor, %w", err) 193 + } 194 + } else { 195 + lower = make([]byte, 1+4) 196 + lower[0] = 'A' 197 + binary.BigEndian.PutUint32(lower[1:], collectionId) 198 + } 199 + var upper [5]byte 200 + upper[0] = 'A' 201 + binary.BigEndian.PutUint32(upper[1:], collectionId+1) 202 + iter, err := pcd.db.NewIterWithContext(ctx, &pebble.IterOptions{ 203 + LowerBound: lower, 204 + UpperBound: upper[:], 205 + }) 206 + if err != nil { 207 + return nil, "", fmt.Errorf("collection iter start, %w", err) 208 + } 209 + defer iter.Close() 210 + count := 0 211 + done := ctx.Done() 212 + result = make([]CollectionDidTime, 0, limit) 213 + for iter.First(); iter.Valid(); iter.Next() { 214 + key := iter.Key() 215 + collectionId, did, seenMs := parsePrimaryPebbleRow(key) 216 + count++ 217 + collection := pcd.collectionNames[collectionId] 218 + rec := CollectionDidTime{ 219 + Collection: collection, 220 + Did: did, 221 + UnixMillis: seenMs, 222 + } 223 + result = append(result, rec) 224 + breaker := false 225 + if count >= limit { 226 + breaker = true 227 + } else { 228 + select { 229 + case <-done: 230 + breaker = true 231 + default: 232 + } 233 + } 234 + if breaker { 235 + prevKey := make([]byte, len(key), len(key)+1) 236 + copy(prevKey, key) 237 + prevKey = append(prevKey, 0) 238 + nextCursor = base64.StdEncoding.EncodeToString(prevKey) 239 + break 240 + } 241 + } 242 + pcd.log.Debug("read primary", "count", count) 243 + return result, nextCursor, nil 244 + } 245 + 246 + func (pcd *PebbleCollectionDirectory) CollectionToId(collection string) (uint32, error) { 247 + // easy mode: in cache 248 + collectionId, ok := pcd.collections[collection] 249 + if ok { 250 + return collectionId, nil 251 + } 252 + 253 + // read from db 254 + key := makeCollectionInternKey(collection) 255 + value, closer, err := pcd.db.Get(key) 256 + if closer != nil { 257 + defer closer.Close() 258 + } 259 + if err 
== nil { 260 + collectionId = binary.BigEndian.Uint32(value) 261 + return collectionId, nil 262 + } 263 + 264 + // make new id, write to db 265 + if errors.Is(err, pebble.ErrNotFound) { 266 + // ok, fall through 267 + } else if err != nil { 268 + return 0, fmt.Errorf("pebble get err, %w", err) 269 + } 270 + collectionId = pcd.maxCollectionId + 1 271 + pcd.maxCollectionId = collectionId 272 + var cib [4]byte 273 + binary.BigEndian.PutUint32(cib[:], collectionId) 274 + err = pcd.db.Set(key, cib[:], pebble.NoSync) 275 + if err != nil { 276 + return 0, fmt.Errorf("pebble set err, %w", err) 277 + } 278 + pcd.collections[collection] = collectionId 279 + pcd.collectionNames[collectionId] = collection 280 + return collectionId, nil 281 + } 282 + 283 + var trueValue = [1]byte{'t'} 284 + 285 + func (pcd *PebbleCollectionDirectory) MaybeSetCollection(did, collection string) error { 286 + collectionId, err := pcd.CollectionToId(collection) 287 + if err != nil { 288 + return err 289 + } 290 + dkey := makeByDidKey(did, collectionId) 291 + _, closer, err := pcd.db.Get(dkey) 292 + if closer != nil { 293 + defer closer.Close() 294 + } 295 + if err == nil { 296 + // already exists, done 297 + pebbleDup.Inc() 298 + return nil 299 + } 300 + if errors.Is(err, pebble.ErrNotFound) { 301 + // ok, fall through 302 + } else if err != nil { 303 + return fmt.Errorf("pebble get err, %w", err) 304 + } 305 + 306 + now := time.Now() 307 + pkey := makePrimaryPebbleRow(collectionId, did, now.UnixMilli()) 308 + err = pcd.db.Set(pkey, trueValue[:], pebble.NoSync) 309 + if err != nil { 310 + return fmt.Errorf("pebble set err, %w", err) 311 + } 312 + var timebytes [8]byte 313 + binary.BigEndian.PutUint64(timebytes[:], uint64(now.UnixMilli())) 314 + err = pcd.db.Set(dkey, timebytes[:], pebble.NoSync) 315 + if err != nil { 316 + return fmt.Errorf("pebble set err, %w", err) 317 + } 318 + pebbleNew.Inc() 319 + return nil 320 + } 321 + 322 + func (pcd *PebbleCollectionDirectory) SetFromResults(results 
<-chan DidCollection) { 323 + errcount := 0 324 + for result := range results { 325 + err := pcd.MaybeSetCollection(result.Did, result.Collection) 326 + if err != nil { 327 + errcount++ 328 + pcd.log.Error("set collection", "err", err) 329 + if errcount > 0 { 330 + // TODO: signal backpressure and shutdown 331 + return 332 + } 333 + } else { 334 + errcount = 0 335 + } 336 + } 337 + } 338 + 339 + type CollectionStats struct { 340 + CollectionCounts map[string]uint64 `json:"collections"` 341 + } 342 + 343 + func (pcd *PebbleCollectionDirectory) GetCollectionStats() (stats CollectionStats, err error) { 344 + ctx := context.Background() 345 + records := make(chan CollectionDidTime, 1000) 346 + go pcd.ReadAllPrimary(ctx, records) 347 + 348 + stats.CollectionCounts = make(map[string]uint64) 349 + 350 + for rec := range records { 351 + stats.CollectionCounts[rec.Collection]++ 352 + } 353 + 354 + return stats, nil 355 + } 356 + 357 + const seqKey = "Xseq" 358 + 359 + func (pcd *PebbleCollectionDirectory) SetSequence(seq int64) error { 360 + var seqb [8]byte 361 + binary.BigEndian.PutUint64(seqb[:], uint64(seq)) 362 + return pcd.db.Set([]byte(seqKey), seqb[:], pebble.NoSync) 363 + } 364 + func (pcd *PebbleCollectionDirectory) GetSequence() (int64, bool, error) { 365 + vbytes, closer, err := pcd.db.Get([]byte(seqKey)) 366 + if closer != nil { 367 + defer closer.Close() 368 + } 369 + if errors.Is(err, pebble.ErrNotFound) { 370 + return 0, false, nil 371 + } 372 + if err != nil { 373 + return 0, false, fmt.Errorf("pebble seq err, %w", err) 374 + } 375 + seq := int64(binary.BigEndian.Uint64(vbytes)) 376 + return seq, true, nil 377 + }
+107
cmd/collectiondir/pebble_test.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "encoding/csv" 6 + "log/slog" 7 + "strings" 8 + "testing" 9 + 10 + "github.com/cockroachdb/pebble" 11 + "github.com/cockroachdb/pebble/vfs" 12 + "github.com/stretchr/testify/assert" 13 + ) 14 + 15 + type debugWriter struct { 16 + t *testing.T 17 + } 18 + 19 + func (w *debugWriter) Write(p []byte) (n int, err error) { 20 + w.t.Helper() 21 + w.t.Log(string(p)) 22 + return len(p), nil 23 + } 24 + 25 + // make a new pebble that writes to memory and logs to test.Log 26 + func newMem(t *testing.T) *PebbleCollectionDirectory { 27 + memfs := vfs.NewMem() 28 + db, err := pebble.Open("wat", &pebble.Options{ 29 + FS: memfs, 30 + }) 31 + if err != nil { 32 + panic(err) 33 + } 34 + 35 + log := slog.New(slog.NewTextHandler(&debugWriter{t: t}, &slog.HandlerOptions{Level: slog.LevelDebug})) 36 + pcd := &PebbleCollectionDirectory{ 37 + db: db, 38 + collections: make(map[string]uint32), 39 + collectionNames: make(map[uint32]string), 40 + log: log, 41 + } 42 + if pcd.log == nil { 43 + pcd.log = slog.Default() 44 + } 45 + return pcd 46 + } 47 + 48 + // did, collection 49 + const testDataCsv = `alice,post 50 + alice,like 51 + bob,post 52 + bob,other 53 + carol,post 54 + eve,post 55 + eve,like 56 + eve,other` 57 + 58 + func TestPebbleCollectionDirectory(t *testing.T) { 59 + assert := assert.New(t) 60 + 61 + pcd := newMem(t) 62 + defer func() { 63 + err := pcd.Close() 64 + if err != nil { 65 + t.Error(err) 66 + } 67 + }() 68 + 69 + rows, err := csv.NewReader(strings.NewReader(testDataCsv)).ReadAll() 70 + assert.NoError(err) 71 + for _, row := range rows { 72 + err := pcd.MaybeSetCollection(row[0], row[1]) 73 + assert.NoError(err) 74 + } 75 + stats, err := pcd.GetCollectionStats() 76 + assert.NoError(err) 77 + t.Log(stats) 78 + assert.Equal(uint64(4), stats.CollectionCounts["post"]) 79 + assert.Equal(uint64(2), stats.CollectionCounts["like"]) 80 + assert.Equal(uint64(2), stats.CollectionCounts["other"]) 81 + 82 + 
t.Log(pcd.collections) 83 + 84 + wat, nextCursor, err := pcd.ReadCollection(context.Background(), "post", "", 1000) 85 + assert.NoError(err) 86 + assert.Equal("", nextCursor) 87 + for _, row := range wat { 88 + assert.Equal("post", row.Collection) 89 + } 90 + assert.Equal(4, len(wat)) 91 + 92 + wat, nextCursor, err = pcd.ReadCollection(context.Background(), "like", "", 1000) 93 + assert.NoError(err) 94 + assert.Equal("", nextCursor) 95 + for _, row := range wat { 96 + assert.Equal("like", row.Collection) 97 + } 98 + assert.Equal(2, len(wat)) 99 + 100 + wat, nextCursor, err = pcd.ReadCollection(context.Background(), "other", "", 1000) 101 + assert.NoError(err) 102 + assert.Equal("", nextCursor) 103 + for _, row := range wat { 104 + assert.Equal("other", row.Collection) 105 + } 106 + assert.Equal(2, len(wat)) 107 + }
+867
cmd/collectiondir/serve.go
··· 1 + package main 2 + 3 + import ( 4 + "compress/gzip" 5 + "context" 6 + "encoding/csv" 7 + "fmt" 8 + comatproto "github.com/bluesky-social/indigo/api/atproto" 9 + "github.com/bluesky-social/indigo/events" 10 + "github.com/bluesky-social/indigo/xrpc" 11 + "github.com/labstack/echo/v4" 12 + "github.com/labstack/echo/v4/middleware" 13 + "github.com/prometheus/client_golang/prometheus/promhttp" 14 + "github.com/urfave/cli/v2" 15 + "log/slog" 16 + "net" 17 + "net/http" 18 + "net/url" 19 + "os" 20 + "os/signal" 21 + "path/filepath" 22 + "sort" 23 + "strconv" 24 + "strings" 25 + "sync" 26 + "syscall" 27 + "time" 28 + ) 29 + 30 + var serveCmd = &cli.Command{ 31 + Name: "serve", 32 + Flags: []cli.Flag{ 33 + &cli.StringFlag{ 34 + Name: "api-listen", 35 + Value: ":2510", 36 + EnvVars: []string{"COLLECTIONS_API_LISTEN"}, 37 + }, 38 + &cli.StringFlag{ 39 + Name: "metrics-listen", 40 + Value: ":2511", 41 + EnvVars: []string{"COLLECTIONS_METRICS_LISTEN"}, 42 + }, 43 + &cli.StringFlag{ 44 + Name: "pebble", 45 + Usage: "path to store pebble db", 46 + Required: true, 47 + }, 48 + &cli.StringFlag{ 49 + Name: "dau-directory", 50 + Usage: "directory to store DAU pebble db", 51 + Required: true, 52 + }, 53 + &cli.StringFlag{ 54 + Name: "upstream", 55 + Usage: "URL, e.g. 
wss://bsky.network", 56 + EnvVars: []string{"COLLECTIONS_UPSTREAM"}, 57 + }, 58 + &cli.StringFlag{ 59 + Name: "admin-token", 60 + Usage: "admin authentication", 61 + EnvVars: []string{"COLLECTIONS_ADMIN_TOKEN"}, 62 + }, 63 + &cli.Float64Flag{ 64 + Name: "crawl-qps", 65 + Usage: "per-PDS crawl queries-per-second limit", 66 + Value: 100, 67 + }, 68 + &cli.StringFlag{ 69 + Name: "ratelimit-header", 70 + Usage: "secret for friend PDSes", 71 + EnvVars: []string{"BSKY_SOCIAL_RATE_LIMIT_SKIP", "RATE_LIMIT_HEADER"}, 72 + }, 73 + &cli.BoolFlag{ 74 + Name: "verbose", 75 + }, 76 + }, 77 + Action: func(cctx *cli.Context) error { 78 + var server collectionServer 79 + return server.run(cctx) 80 + }, 81 + } 82 + 83 + type collectionServer struct { 84 + ctx context.Context 85 + 86 + // the primary directory, all repos ever and their collections 87 + pcd *PebbleCollectionDirectory 88 + 89 + // daily-active-user directory, new directory every 00:00:00 UTC 90 + dauDirectory *PebbleCollectionDirectory 91 + dauDirectoryPath string // currently open dauDirectory, {dauDirectoryDir}/{YYYY}{mm}{dd}.pebble 92 + dauDay time.Time // YYYY-MM-DD 00:00:00 UTC 93 + dauTomorrow time.Time 94 + dauDirectoryDir string 95 + 96 + statsCache *CollectionStats 97 + statsCacheWhen time.Time 98 + statsCacheLock sync.Mutex 99 + statsCacheFresh sync.Cond 100 + statsCachePending bool 101 + 102 + // (did,collection) pairs from firehose 103 + ingestFirehose chan DidCollection 104 + // (did,collection) pairs from PDS crawl (don't apply to dauDirectory) 105 + ingestCrawl chan DidCollection 106 + 107 + log *slog.Logger 108 + 109 + AdminToken string 110 + ExepctedAuthHeader string 111 + PerPDSCrawlQPS float64 112 + 113 + activeCrawlHosts map[string]time.Time 114 + activeCrawlsLock sync.Mutex 115 + 116 + shutdown chan struct{} 117 + 118 + wg sync.WaitGroup 119 + 120 + ratelimitHeader string 121 + 122 + apiServer *http.Server 123 + //esrv *echo.Echo 124 + metricsServer *http.Server 125 + } 126 + 127 + const 
defaultPerPDSCrawlQPS = 100 128 + 129 + func (cs *collectionServer) run(cctx *cli.Context) error { 130 + signals := make(chan os.Signal, 1) 131 + signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM) 132 + cs.shutdown = make(chan struct{}) 133 + level := slog.LevelInfo 134 + if cctx.Bool("verbose") { 135 + level = slog.LevelDebug 136 + } 137 + if cctx.IsSet("ratelimit-header") { 138 + cs.ratelimitHeader = cctx.String("ratelimit-header") 139 + } 140 + log := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: level})) 141 + cs.ingestFirehose = make(chan DidCollection, 1000) 142 + cs.ingestCrawl = make(chan DidCollection, 1000) 143 + cs.wg.Add(1) 144 + go cs.ingestReceiver() 145 + cs.log = log 146 + cs.ctx = cctx.Context 147 + cs.AdminToken = cctx.String("admin-token") 148 + cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken 149 + pebblePath := cctx.String("pebble") 150 + cs.pcd = &PebbleCollectionDirectory{ 151 + log: cs.log, 152 + } 153 + err := cs.pcd.Open(pebblePath) 154 + if err != nil { 155 + return fmt.Errorf("%s: failed to open pebble db: %w", pebblePath, err) 156 + } 157 + cs.dauDirectoryDir = cctx.String("dau-directory") 158 + if cs.dauDirectoryDir != "" { 159 + err := cs.openDau() 160 + if err != nil { 161 + return err 162 + } 163 + } 164 + cs.statsCacheFresh.L = &cs.statsCacheLock 165 + errchan := make(chan error, 3) 166 + apiAddr := cctx.String("api-listen") 167 + cs.wg.Add(1) 168 + go func() { 169 + errchan <- cs.StartApiServer(cctx.Context, apiAddr) 170 + }() 171 + metricsAddr := cctx.String("metrics-listen") 172 + cs.wg.Add(1) 173 + go func() { 174 + errchan <- cs.StartMetricsServer(cctx.Context, metricsAddr) 175 + }() 176 + 177 + upstream := cctx.String("upstream") 178 + if upstream != "" { 179 + fh := Firehose{ 180 + Log: log, 181 + Host: upstream, 182 + Seq: -1, 183 + } 184 + seq, seqok, err := cs.pcd.GetSequence() 185 + if err != nil { 186 + cs.log.Warn("db get seq", "err", err) 187 + } else if seqok { 188 + fh.Seq = seq 189 + } 
190 + fhevents := make(chan *events.XRPCStreamEvent, 1000) 191 + cs.wg.Add(1) 192 + go cs.firehoseThread(&fh, fhevents) 193 + cs.wg.Add(1) 194 + go cs.handleFirehose(fhevents) 195 + } 196 + 197 + select { 198 + case <-signals: 199 + log.Info("received shutdown signal") 200 + go errchanlog(cs.log, "server error", errchan) 201 + return cs.Shutdown() 202 + case err := <-errchan: 203 + if err != nil { 204 + log.Error("server error", "err", err) 205 + go errchanlog(cs.log, "server error", errchan) 206 + return cs.Shutdown() 207 + } 208 + } 209 + return nil 210 + } 211 + 212 + func (cs *collectionServer) openDau() error { 213 + now := time.Now().UTC() 214 + ymd := now.Format("2006-01-02") 215 + fname := fmt.Sprintf("d%s.pebble", ymd) 216 + fpath := filepath.Join(cs.dauDirectoryDir, fname) 217 + daud := &PebbleCollectionDirectory{ 218 + log: cs.log, 219 + } 220 + err := daud.Open(fpath) 221 + if err != nil { 222 + return fmt.Errorf("%s: failed to open dau pebble db: %w", fpath, err) 223 + } 224 + cs.dauDirectory = daud 225 + cs.dauDirectoryPath = fpath 226 + cs.dauDay = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, time.UTC) 227 + cs.dauTomorrow = now.AddDate(0, 0, 1) 228 + return nil 229 + } 230 + 231 + func errchanlog(log *slog.Logger, msg string, errchan <-chan error) { 232 + for err := range errchan { 233 + log.Error(msg, "err", err) 234 + } 235 + } 236 + 237 + func (cs *collectionServer) Shutdown() error { 238 + close(cs.shutdown) 239 + go func() { 240 + cs.log.Info("metrics shutdown start") 241 + cs.metricsServer.Shutdown(context.Background()) 242 + cs.log.Info("metrics shutdown") 243 + }() 244 + cs.log.Info("api shutdown start...") 245 + err := cs.apiServer.Shutdown(context.Background()) 246 + //err := cs.esrv.Shutdown(context.Background()) 247 + cs.log.Info("api shutdown, thread wait...", "err", err) 248 + cs.wg.Wait() 249 + cs.log.Info("threads done, db close...") 250 + ee := cs.pcd.Close() 251 + if ee != nil { 252 + cs.log.Error("failed to shutdown 
pebble", "err", ee) 253 + } 254 + cs.log.Info("db done. done.") 255 + return err 256 + } 257 + 258 + // firehoseThreads is responsible for connecting to upstream firehose source 259 + func (cs *collectionServer) firehoseThread(fh *Firehose, fhevents chan<- *events.XRPCStreamEvent) { 260 + defer cs.wg.Done() 261 + defer cs.log.Info("firehoseThread exit") 262 + ctx, cancel := context.WithCancel(cs.ctx) 263 + go func() { 264 + <-cs.shutdown 265 + cancel() 266 + }() 267 + err := fh.subscribeWithRedialer(ctx, fhevents) 268 + if err != nil { 269 + cs.log.Error("failed to subscribe to redialer", "err", err) 270 + } 271 + if fh.Seq >= 0 { 272 + err := cs.pcd.SetSequence(fh.Seq) 273 + if err != nil { 274 + cs.log.Warn("db set seq", "err", err) 275 + } 276 + } 277 + } 278 + 279 + // handleFirehose consumes XRPCStreamEvent from firehoseThread(), further parses data and applies 280 + func (cs *collectionServer) handleFirehose(fhevents <-chan *events.XRPCStreamEvent) { 281 + defer cs.wg.Done() 282 + defer cs.log.Info("handleFirehose exit") 283 + defer close(cs.ingestFirehose) 284 + var lastSeq int64 285 + lastSeqSet := false 286 + notDone := true 287 + for notDone { 288 + select { 289 + case <-cs.shutdown: 290 + cs.log.Info("firehose handler shutdown") 291 + notDone = false 292 + case evt, ok := <-fhevents: 293 + if !ok { 294 + notDone = false 295 + cs.log.Info("firehose handler closed") 296 + break 297 + } 298 + firehoseReceivedCounter.Inc() 299 + seq, ok := evt.GetSequence() 300 + if ok { 301 + lastSeq = seq 302 + lastSeqSet = true 303 + } 304 + if evt.RepoCommit != nil { 305 + firehoseCommits.Inc() 306 + cs.handleCommit(evt.RepoCommit) 307 + } 308 + } 309 + } 310 + if lastSeqSet { 311 + cs.pcd.SetSequence(lastSeq) 312 + } 313 + } 314 + 315 + func (cs *collectionServer) handleCommit(commit *comatproto.SyncSubscribeRepos_Commit) { 316 + for _, op := range commit.Ops { 317 + // op.Path is collection/rkey 318 + slash := strings.IndexRune(op.Path, '/') 319 + if slash == -1 { 320 
+ cs.log.Warn("bad op path", "repo", commit.Repo) 321 + return 322 + } 323 + collection := op.Path[:slash] 324 + firehoseCommitOps.WithLabelValues(op.Action).Inc() 325 + if op.Action == "create" || op.Action == "update" { 326 + firehoseDidcSet.Inc() 327 + cs.ingestFirehose <- DidCollection{ 328 + Did: commit.Repo, 329 + Collection: collection, 330 + } 331 + } 332 + } 333 + } 334 + 335 + func (cs *collectionServer) StartMetricsServer(ctx context.Context, addr string) error { 336 + defer cs.wg.Done() 337 + defer cs.log.Info("metrics server exit") 338 + cs.metricsServer = &http.Server{ 339 + Addr: addr, 340 + Handler: promhttp.Handler(), 341 + } 342 + return cs.metricsServer.ListenAndServe() 343 + } 344 + 345 + func (cs *collectionServer) StartApiServer(ctx context.Context, addr string) error { 346 + defer cs.wg.Done() 347 + defer cs.log.Info("api server exit") 348 + var lc net.ListenConfig 349 + li, err := lc.Listen(ctx, "tcp", addr) 350 + if err != nil { 351 + return err 352 + } 353 + e := echo.New() 354 + e.HideBanner = true 355 + 356 + e.Use(MetricsMiddleware) 357 + e.Use(middleware.CORSWithConfig(middleware.CORSConfig{ 358 + AllowOrigins: []string{"*"}, 359 + AllowHeaders: []string{echo.HeaderOrigin, echo.HeaderContentType, echo.HeaderAccept, echo.HeaderAuthorization}, 360 + })) 361 + 362 + e.GET("/_health", cs.healthz) 363 + 364 + e.GET("/v1/getDidsForCollection", cs.getDidsForCollection) 365 + e.GET("/v1/listCollections", cs.listCollections) 366 + 367 + // TODO: allow public 'requestCrawl' API? 
368 + //e.GET("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds) 369 + //e.POST("/xrpc/com.atproto.sync.requestCrawl", cs.crawlPds) 370 + 371 + // admin auth heador required 372 + e.POST("/admin/pds/requestCrawl", cs.crawlPds) // same as relay 373 + e.GET("/v1/crawlStatus", cs.crawlStatus) 374 + 375 + e.Listener = li 376 + srv := &http.Server{ 377 + Handler: e, 378 + } 379 + cs.apiServer = srv 380 + return srv.Serve(li) 381 + } 382 + 383 + const statsCacheDuration = time.Second * 300 384 + 385 + type GetDidsForCollectionResponse struct { 386 + Dids []string `json:"dids"` 387 + Cursor string `json:"cursor"` 388 + } 389 + 390 + func getLimit(c echo.Context, min, defaultLim, max int) int { 391 + limstr := c.QueryParam("limit") 392 + if limstr == "" { 393 + return defaultLim 394 + } 395 + lvx, err := strconv.ParseInt(limstr, 10, 64) 396 + if err != nil { 397 + return defaultLim 398 + } 399 + lv := int(lvx) 400 + if lv < min { 401 + return min 402 + } 403 + if lv > max { 404 + return max 405 + } 406 + return lv 407 + } 408 + 409 + // /v1/getDidsForCollection?collection={}&cursor={} 410 + // 411 + // returns 412 + // {"dids":["did:A", "..."], "cursor":"opaque text"} 413 + func (cs *collectionServer) getDidsForCollection(c echo.Context) error { 414 + ctx := c.Request().Context() 415 + collection := c.QueryParam("collection") 416 + cursor := c.QueryParam("cursor") 417 + limit := getLimit(c, 50, 500, 1000) 418 + they, nextCursor, err := cs.pcd.ReadCollection(ctx, collection, cursor, limit) 419 + if err != nil { 420 + slog.Error("ReadCollection", "collection", collection, "cursor", cursor, "limit", limit, "err", err) 421 + return c.String(http.StatusInternalServerError, "oops") 422 + } 423 + cs.log.Info("getDidsForCollection", "collection", collection, "cursor", cursor, "limit", limit, "count", len(they), "nextCursor", nextCursor) 424 + var out GetDidsForCollectionResponse 425 + out.Dids = make([]string, len(they)) 426 + for i, rec := range they { 427 + out.Dids[i] = 
rec.Did 428 + } 429 + out.Cursor = nextCursor 430 + return c.JSON(http.StatusOK, out) 431 + } 432 + 433 + // return cached collection stats if they're fresh 434 + // return new collection stats if they can be calculated quicly 435 + // return stale cached collection stats if new stats take too long 436 + // just wait for fresh stats if there are no cached stats 437 + // stalenessAllowed is how old stats can be before we try to recalculate them, 0=default of 5 minutes 438 + func (cs *collectionServer) getStatsCache(stalenessAllowed time.Duration) (*CollectionStats, error) { 439 + if stalenessAllowed <= 0 { 440 + stalenessAllowed = statsCacheDuration 441 + } 442 + var statsCache *CollectionStats 443 + var staleCache *CollectionStats 444 + var waiter *freshStatsWaiter 445 + cs.statsCacheLock.Lock() 446 + if cs.statsCache != nil { 447 + if time.Since(cs.statsCacheWhen) < stalenessAllowed { 448 + // has fresh! 449 + statsCache = cs.statsCache 450 + } else if !cs.statsCachePending { 451 + cs.statsCachePending = true 452 + go cs.statsBuilder() 453 + staleCache = cs.statsCache 454 + } else { 455 + staleCache = cs.statsCache 456 + } 457 + if staleCache != nil { 458 + waiter = &freshStatsWaiter{ 459 + cs: cs, 460 + freshCache: make(chan *CollectionStats), 461 + } 462 + go waiter.waiter() 463 + } 464 + } else if !cs.statsCachePending { 465 + cs.statsCachePending = true 466 + go cs.statsBuilder() 467 + } 468 + cs.statsCacheLock.Unlock() 469 + 470 + if statsCache != nil { 471 + // return fresh-enough data 472 + return statsCache, nil 473 + } 474 + 475 + if staleCache == nil { 476 + // block forever waiting for fresh data 477 + cs.statsCacheLock.Lock() 478 + for cs.statsCache == nil { 479 + cs.statsCacheFresh.Wait() 480 + } 481 + statsCache = cs.statsCache 482 + cs.statsCacheLock.Unlock() 483 + return statsCache, nil 484 + } 485 + 486 + // wait for up to a second for fresh data, on timeout return stale data 487 + timeout := time.NewTimer(time.Second) 488 + defer timeout.Stop() 
489 + select { 490 + case <-timeout.C: 491 + cs.statsCacheLock.Lock() 492 + waiter.l.Lock() 493 + waiter.obsolete = true 494 + waiter.l.Unlock() 495 + cs.statsCacheLock.Unlock() 496 + return staleCache, nil 497 + case statsCache = <-waiter.freshCache: 498 + return statsCache, nil 499 + } 500 + } 501 + 502 + type freshStatsWaiter struct { 503 + cs *collectionServer 504 + l sync.Mutex 505 + obsolete bool 506 + freshCache chan *CollectionStats 507 + } 508 + 509 + func (fsw *freshStatsWaiter) waiter() { 510 + fsw.cs.statsCacheLock.Lock() 511 + defer fsw.cs.statsCacheLock.Unlock() 512 + fsw.cs.statsCacheFresh.Wait() 513 + fsw.l.Lock() 514 + defer fsw.l.Unlock() 515 + if fsw.obsolete { 516 + close(fsw.freshCache) 517 + } else { 518 + fsw.freshCache <- fsw.cs.statsCache 519 + } 520 + } 521 + 522 + func (cs *collectionServer) statsBuilder() { 523 + for { 524 + start := time.Now() 525 + stats, err := cs.pcd.GetCollectionStats() 526 + dt := time.Since(start) 527 + if err == nil { 528 + countsum := uint64(0) 529 + for _, v := range stats.CollectionCounts { 530 + countsum += v 531 + } 532 + cs.log.Info("stats built", "dt", dt, "total", countsum) 533 + cs.statsCacheLock.Lock() 534 + cs.statsCache = &stats 535 + cs.statsCacheWhen = time.Now() 536 + cs.statsCacheFresh.Broadcast() 537 + cs.statsCachePending = false 538 + cs.statsCacheLock.Unlock() 539 + return 540 + } else { 541 + cs.log.Error("GetCollectionStats", "dt", dt, "err", err) 542 + time.Sleep(2 * time.Second) 543 + } 544 + } 545 + } 546 + 547 + // /v1/listCollections?c={}&cursor={}&limit={50<=limit<=1000} 548 + // 549 + // admin may set ?stalesec={} for a maximum number of seconds stale data is accepted 550 + // 551 + // returns 552 + // {"collections":{"app.bsky.feed.post": 123456789, "some collection": 42}, "cursor":"opaque text"} 553 + func (cs *collectionServer) listCollections(c echo.Context) error { 554 + stalenessAllowed := statsCacheDuration 555 + stalesecStr := c.QueryParam("stalesec") 556 + if stalesecStr != 
"" && cs.isAdmin(c) { 557 + stalesec, err := strconv.ParseInt(stalesecStr, 10, 64) 558 + if err != nil { 559 + return c.String(http.StatusBadRequest, "bad stalesec") 560 + } 561 + if stalesec == 0 { 562 + stalenessAllowed = 1 563 + } else { 564 + stalenessAllowed = time.Duration(stalesec) * time.Second 565 + } 566 + cs.log.Info("stalesec", "q", stalesecStr, "d", stalenessAllowed) 567 + } 568 + stats, err := cs.getStatsCache(stalenessAllowed) 569 + if err != nil { 570 + slog.Error("getStatsCache", "err", err) 571 + return c.String(http.StatusInternalServerError, "oops") 572 + } 573 + cursor := c.QueryParam("cursor") 574 + collections, hasQueryCollections := c.QueryParams()["c"] 575 + limit := getLimit(c, 50, 500, 1000) 576 + var out ListCollectionsResponse 577 + if hasQueryCollections { 578 + out.Collections = make(map[string]uint64, len(collections)) 579 + for _, collection := range collections { 580 + count, ok := stats.CollectionCounts[collection] 581 + if ok { 582 + out.Collections[collection] = count 583 + } 584 + } 585 + } else { 586 + allCollections := make([]string, 0, len(stats.CollectionCounts)) 587 + for collection := range stats.CollectionCounts { 588 + allCollections = append(allCollections, collection) 589 + } 590 + sort.Strings(allCollections) 591 + out.Collections = make(map[string]uint64, limit) 592 + count := 0 593 + for _, collection := range allCollections { 594 + if (cursor == "") || (collection > cursor) { 595 + out.Collections[collection] = stats.CollectionCounts[collection] 596 + count++ 597 + if count >= limit { 598 + out.Cursor = collection 599 + } 600 + } 601 + } 602 + } 603 + return c.JSON(http.StatusOK, out) 604 + } 605 + 606 + type ListCollectionsResponse struct { 607 + Collections map[string]uint64 `json:"collections"` 608 + Cursor string `json:"cursor"` 609 + } 610 + 611 + func (cs *collectionServer) ingestReceiver() { 612 + defer cs.wg.Done() 613 + defer cs.log.Info("ingestReceiver exit") 614 + errcount := 0 615 + for { 616 + select 
{ 617 + case didc, ok := <-cs.ingestFirehose: 618 + if !ok { 619 + cs.log.Info("ingestFirehose closed") 620 + return 621 + } 622 + err := cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 623 + if err != nil { 624 + cs.log.Warn("pcd write", "err", err) 625 + errcount++ 626 + } else { 627 + errcount = 0 628 + } 629 + if cs.dauDirectory != nil { 630 + err = cs.maybeDauWrite(didc) 631 + if err != nil { 632 + cs.log.Warn("dau write", "err", err) 633 + errcount++ 634 + } else { 635 + errcount = 0 636 + } 637 + } 638 + case didc := <-cs.ingestCrawl: 639 + err := cs.pcd.MaybeSetCollection(didc.Did, didc.Collection) 640 + if err != nil { 641 + cs.log.Warn("pcd write", "err", err) 642 + errcount++ 643 + } else { 644 + errcount = 0 645 + } 646 + } 647 + if errcount > 10 { 648 + cs.log.Error("ingestReceiver too many errors") 649 + return // TODO: cancel parent somehow 650 + } 651 + } 652 + } 653 + 654 + // write {dauDirectoryDir}/d{YYYY-MM-DD}.pebble stats summary to {dauDirectoryDir}/d{YYYY-MM-DD}.csv.gz 655 + func dauStats(oldDau *PebbleCollectionDirectory, dauDay time.Time, dauDir string, log *slog.Logger) { 656 + stats, err := oldDau.GetCollectionStats() 657 + e2 := oldDau.Close() 658 + if e2 != nil { 659 + log.Error("old dau close", "err", e2) 660 + } 661 + if err != nil { 662 + log.Error("old dau stats", "err", err) 663 + } else { 664 + fname := fmt.Sprintf("d%s.csv.gz", dauDay.Format("2006-01-02")) 665 + outstatsPath := filepath.Join(dauDir, fname) 666 + pcdStatsToCsvGz(stats, outstatsPath, log) 667 + } 668 + } 669 + 670 + func pcdStatsToCsvGz(stats CollectionStats, outpath string, log *slog.Logger) { 671 + fout, err := os.Create(outpath) 672 + if err != nil { 673 + log.Error("dau stats open", "err", err) 674 + return 675 + } 676 + defer fout.Close() 677 + gzout := gzip.NewWriter(fout) 678 + csvout := csv.NewWriter(gzout) 679 + defer csvout.Flush() 680 + defer gzout.Close() 681 + err = csvout.Write([]string{"collection", "count"}) 682 + if err != nil { 683 + 
log.Error("dau stats header", "err", err) 684 + return 685 + } 686 + var row [2]string 687 + for collection, count := range stats.CollectionCounts { 688 + row[0] = collection 689 + row[1] = strconv.FormatUint(count, 10) 690 + err = csvout.Write(row[:]) 691 + if err != nil { 692 + log.Error("dau stats row", "err", err) 693 + return 694 + } 695 + } 696 + } 697 + 698 + func (cs *collectionServer) maybeDauWrite(didc DidCollection) error { 699 + now := time.Now() 700 + if now.After(cs.dauTomorrow) { 701 + go dauStats(cs.dauDirectory, cs.dauDay, cs.dauDirectoryDir, cs.log) 702 + cs.dauDirectory = nil 703 + err := cs.openDau() 704 + if err != nil { 705 + return fmt.Errorf("dau reopen, %w", err) 706 + } 707 + } 708 + return cs.dauDirectory.MaybeSetCollection(didc.Did, didc.Collection) 709 + } 710 + 711 + type CrawlRequest struct { 712 + Host string `json:"hostname,omitempty"` 713 + Hosts []string `json:"hosts,omitempty"` 714 + } 715 + 716 + type CrawlRequestResponse struct { 717 + Message string `json:"message,omitempty"` 718 + Error string `json:"error,omitempty"` 719 + } 720 + 721 + func hostOrUrlToUrl(host string) string { 722 + xu, err := url.Parse(host) 723 + if err != nil { 724 + xu = new(url.URL) 725 + xu.Host = host 726 + xu.Scheme = "https" 727 + return xu.String() 728 + } else if xu.Scheme == "" { 729 + xu.Scheme = "https" 730 + return xu.String() 731 + } 732 + return host 733 + } 734 + 735 + func (cs *collectionServer) isAdmin(c echo.Context) bool { 736 + authHeader := c.Request().Header.Get("Authorization") 737 + if authHeader == "" { 738 + return false 739 + } 740 + if authHeader == cs.ExepctedAuthHeader { 741 + return true 742 + } 743 + cs.log.Info("wrong auth header", "header", authHeader, "expected", cs.ExepctedAuthHeader) 744 + return false 745 + } 746 + 747 + // /v1/crawlRequest 748 + // requires header `Authorization: Bearer {admin token}` 749 + // 750 + // POST {"host":"one hostname or URL", "hosts":["up to 1000 hosts", "..."]} 751 + // OR 752 + // POST 
/v1/crawlRequest?host={one host} 753 + func (cs *collectionServer) crawlPds(c echo.Context) error { 754 + isAdmin := cs.isAdmin(c) 755 + if !isAdmin { 756 + return c.JSON(http.StatusForbidden, CrawlRequestResponse{Error: "nope"}) 757 + } 758 + hostQ := c.QueryParam("host") 759 + if hostQ != "" { 760 + go cs.crawlThread(hostQ) 761 + return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"}) 762 + } 763 + 764 + var req CrawlRequest 765 + err := c.Bind(&req) 766 + if err != nil { 767 + return c.String(http.StatusBadRequest, err.Error()) 768 + } 769 + if req.Host != "" { 770 + go cs.crawlThread(req.Host) 771 + } 772 + for _, host := range req.Hosts { 773 + go cs.crawlThread(host) 774 + } 775 + return c.JSON(http.StatusOK, CrawlRequestResponse{Message: "ok"}) 776 + } 777 + 778 + func (cs *collectionServer) crawlThread(hostIn string) { 779 + host := hostOrUrlToUrl(hostIn) 780 + if host != hostIn { 781 + cs.log.Info("going to crawl", "in", hostIn, "as", host) 782 + } 783 + httpClient := http.Client{} 784 + rpcClient := xrpc.Client{ 785 + Host: host, 786 + Client: &httpClient, 787 + } 788 + if cs.ratelimitHeader != "" { 789 + rpcClient.Headers = map[string]string{ 790 + "x-ratelimit-bypass": cs.ratelimitHeader, 791 + } 792 + } 793 + crawler := Crawler{ 794 + Ctx: cs.ctx, 795 + RpcClient: &rpcClient, 796 + QPS: cs.PerPDSCrawlQPS, 797 + Results: cs.ingestCrawl, 798 + Log: cs.log, 799 + } 800 + start := time.Now() 801 + ok := cs.recordCrawlStart(host, start) 802 + if !ok { 803 + cs.log.Info("not crawling dup", "host", host) 804 + } 805 + cs.log.Info("crawling", "host", host) 806 + err := crawler.CrawlPDSRepoCollections() 807 + cs.clearActiveCrawl(host) 808 + pdsCrawledCounter.Inc() 809 + if err != nil { 810 + cs.log.Warn("crawl err", "host", host, "err", err) 811 + } else { 812 + dt := time.Since(start) 813 + cs.log.Info("crawl done", "host", host, "dt", dt) 814 + } 815 + } 816 + 817 + // recordCrawlStart returns true if ok, false if duplicate 818 + func (cs 
*collectionServer) recordCrawlStart(host string, start time.Time) (ok bool) { 819 + cs.activeCrawlsLock.Lock() 820 + defer cs.activeCrawlsLock.Unlock() 821 + if cs.activeCrawlHosts == nil { 822 + cs.activeCrawlHosts = make(map[string]time.Time) 823 + cs.activeCrawlHosts[host] = start 824 + return true 825 + } else { 826 + _, dup := cs.activeCrawlHosts[host] 827 + if dup { 828 + return false 829 + } 830 + cs.activeCrawlHosts[host] = start 831 + return true 832 + } 833 + } 834 + 835 + func (cs *collectionServer) clearActiveCrawl(host string) { 836 + cs.activeCrawlsLock.Lock() 837 + defer cs.activeCrawlsLock.Unlock() 838 + if cs.activeCrawlHosts == nil { 839 + return 840 + } 841 + delete(cs.activeCrawlHosts, host) 842 + } 843 + 844 + type CrawlStatusResponse struct { 845 + HostStarts map[string]string `json:"host_starts"` 846 + } 847 + 848 + // GET /v1/crawlStatus 849 + func (cs *collectionServer) crawlStatus(c echo.Context) error { 850 + authHeader := c.Request().Header.Get("Authorization") 851 + if authHeader != cs.ExepctedAuthHeader { 852 + return c.JSON(http.StatusForbidden, CrawlRequestResponse{Error: "nope"}) 853 + } 854 + var out CrawlStatusResponse 855 + out.HostStarts = make(map[string]string) 856 + cs.activeCrawlsLock.Lock() 857 + defer cs.activeCrawlsLock.Unlock() 858 + for host, start := range cs.activeCrawlHosts { 859 + out.HostStarts[host] = start.UTC().Format(time.RFC3339) 860 + } 861 + return c.JSON(http.StatusOK, out) 862 + } 863 + 864 + func (cs *collectionServer) healthz(c echo.Context) error { 865 + // TODO: check database or upstream health? 866 + return c.String(http.StatusOK, "ok") 867 + }
+25
events/events.go
··· 455 455 } 456 456 } 457 457 458 + func (evt *XRPCStreamEvent) GetSequence() (int64, bool) { 459 + switch { 460 + case evt == nil: 461 + return -1, false 462 + case evt.RepoCommit != nil: 463 + return evt.RepoCommit.Seq, true 464 + case evt.RepoHandle != nil: 465 + return evt.RepoHandle.Seq, true 466 + case evt.RepoMigrate != nil: 467 + return evt.RepoMigrate.Seq, true 468 + case evt.RepoTombstone != nil: 469 + return evt.RepoTombstone.Seq, true 470 + case evt.RepoIdentity != nil: 471 + return evt.RepoIdentity.Seq, true 472 + case evt.RepoAccount != nil: 473 + return evt.RepoAccount.Seq, true 474 + case evt.RepoInfo != nil: 475 + return -1, false 476 + case evt.Error != nil: 477 + return -1, false 478 + default: 479 + return -1, false 480 + } 481 + } 482 + 458 483 func (em *EventManager) rmSubscriber(sub *Subscriber) { 459 484 em.subsLk.Lock() 460 485 defer em.subsLk.Unlock()