this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

add collection filtering

dholms 379225b1 092f704f

+62 -13
+16 -12
nexus/nexus.go
··· 21 21 echo *echo.Echo 22 22 logger *slog.Logger 23 23 24 - Dir identity.Directory 24 + Dir identity.Directory 25 + RelayHost string 25 26 26 27 outbox *Outbox 27 28 28 29 FirehoseConsumer *FirehoseConsumer 29 30 EventProcessor *EventProcessor 30 31 31 - FullNetworkMode bool 32 - RelayHost string 32 + FullNetworkMode bool 33 + CollectionFilters []string 33 34 34 35 claimJobMu sync.Mutex 35 36 } ··· 42 43 FullNetworkMode bool 43 44 DisableAcks bool 44 45 WebhookURL string 46 + CollectionFilters []string // e.g., ["app.bsky.feed.post", "app.bsky.graph.*"] 45 47 } 46 48 47 49 func NewNexus(config NexusConfig) (*Nexus, error) { ··· 79 81 echo: e, 80 82 logger: slog.Default().With("system", "nexus"), 81 83 82 - Dir: &cdir, 84 + Dir: &cdir, 85 + RelayHost: config.RelayHost, 83 86 84 87 outbox: NewOutbox(db, outboxMode, config.WebhookURL), 85 88 86 - FullNetworkMode: config.FullNetworkMode, 87 - RelayHost: config.RelayHost, 89 + FullNetworkMode: config.FullNetworkMode, 90 + CollectionFilters: config.CollectionFilters, 88 91 } 89 92 90 93 parallelism := config.FirehoseParallelism ··· 98 101 } 99 102 100 103 n.EventProcessor = &EventProcessor{ 101 - Logger: n.logger.With("component", "processor"), 102 - DB: db, 103 - Dir: n.Dir, 104 - RelayHost: config.RelayHost, 105 - Outbox: n.outbox, 106 - FullNetworkMode: config.FullNetworkMode, 104 + Logger: n.logger.With("component", "processor"), 105 + DB: db, 106 + Dir: n.Dir, 107 + RelayHost: config.RelayHost, 108 + Outbox: n.outbox, 109 + FullNetworkMode: config.FullNetworkMode, 110 + CollectionFilters: config.CollectionFilters, 107 111 } 108 112 109 113 rsc := &events.RepoStreamCallbacks{
+14 -1
nexus/processor.go
··· 24 24 RelayHost string 25 25 Outbox *Outbox 26 26 27 - FullNetworkMode bool 27 + FullNetworkMode bool 28 + CollectionFilters []string 28 29 29 30 lastSeq atomic.Int64 30 31 } ··· 73 74 ep.Logger.Error("failed to parse operations", "did", evt.Repo, "error", err) 74 75 return err 75 76 } 77 + 78 + // filter ops to only matching collections after validation (since all ops are necessary for commit validation) 79 + filteredOps := []CommitOp{} 80 + for _, op := range commit.Ops { 81 + if matchesCollection(op.Collection, ep.CollectionFilters) { 82 + filteredOps = append(filteredOps, op) 83 + } 84 + } 85 + if len(filteredOps) == 0 { 86 + return nil 87 + } 88 + commit.Ops = filteredOps 76 89 77 90 if curr.State == models.RepoStateResyncing { 78 91 if err := ep.addToResyncBuffer(commit); err != nil {
+5
nexus/resync.go
··· 155 155 rkeyStr := rkey.String() 156 156 cidStr := recCid.String() 157 157 158 + // Filter collections - only process if matches filters 159 + if !matchesCollection(collStr, n.EventProcessor.CollectionFilters) { 160 + return nil 161 + } 162 + 158 163 existingCid, exists := existingCids[recPath] 159 164 if exists && existingCid == cidStr { 160 165 return nil
+27
nexus/util.go
··· 1 + package main 2 + 3 + import "strings" 4 + 5 + // matchesCollection checks if a collection matches any of the provided filters. 6 + // Filters support wildcards at the end (e.g., "app.bsky.*" & "app.bsky.feed.*" both match "app.bsky.feed.post"). 7 + // If no filters are provided, all collections match. 8 + func matchesCollection(collection string, filters []string) bool { 9 + if len(filters) == 0 { 10 + return true 11 + } 12 + 13 + for _, filter := range filters { 14 + if strings.HasSuffix(filter, "*") { 15 + prefix := strings.TrimSuffix(filter, "*") 16 + if strings.HasPrefix(collection, prefix) { 17 + return true 18 + } 19 + } else { 20 + if collection == filter { 21 + return true 22 + } 23 + } 24 + } 25 + 26 + return false 27 + }