Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: profile jetstream watcher

authored by

Patrick Dewey and committed by tangled.org 1797af5d 48d72e21

+279 -36
+6
cmd/server/main.go
··· 218 218 firehoseConsumer := firehose.NewConsumer(firehoseConfig, feedIndex) 219 219 firehoseConsumer.Start(ctx) 220 220 221 + // Create and start profile watcher (separate Jetstream connection filtered 222 + // to app.bsky.actor.profile events for known Arabica users only) 223 + profileWatcher := firehose.NewProfileWatcher(firehoseConfig, feedIndex) 224 + profileWatcher.Start(ctx) 225 + 221 226 // Wire up the feed service to use the firehose index 222 227 adapter := firehose.NewFeedIndexAdapter(feedIndex) 223 228 feedService.SetFirehoseIndex(adapter) ··· 327 332 // This ensures users are added to the feed even if they had an existing session 328 333 oauthManager.SetOnAuthSuccess(func(did string) { 329 334 feedRegistry.Register(did) 335 + profileWatcher.Watch(did) 330 336 // Backfill the user's records (BackfillUser creates its own span 331 337 // only when there is actual work to do, avoiding empty traces for 332 338 // already-backfilled users)
+4
internal/firehose/config.go
··· 14 14 "wss://jetstream2.us-west.bsky.network/subscribe", 15 15 } 16 16 17 + // NSIDBlueskyProfile is the AT Protocol collection for user profile records. 18 + // Watched by ProfileWatcher (separate connection) for known Arabica users only. 19 + const NSIDBlueskyProfile = "app.bsky.actor.profile" 20 + 17 21 // ArabicaCollections lists all Arabica lexicon collections to filter for 18 22 var ArabicaCollections = []string{ 19 23 atproto.NSIDBrew,
+10
internal/firehose/index.go
··· 1069 1069 return profile, nil 1070 1070 } 1071 1071 1072 + // InvalidateProfile removes a DID's profile from both the in-memory and persistent 1073 + // caches. The next GetProfile call will re-fetch from the API. 1074 + func (idx *FeedIndex) InvalidateProfile(did string) { 1075 + idx.profileCacheMu.Lock() 1076 + delete(idx.profileCache, did) 1077 + idx.profileCacheMu.Unlock() 1078 + 1079 + _, _ = idx.db.Exec(`DELETE FROM profiles WHERE did = ?`, did) 1080 + } 1081 + 1072 1082 // GetKnownDIDs returns all DIDs that have created Arabica records 1073 1083 func (idx *FeedIndex) GetKnownDIDs(ctx context.Context) ([]string, error) { 1074 1084 rows, err := idx.db.QueryContext(ctx, `SELECT did FROM known_dids`)
+251
internal/firehose/profile_watcher.go
··· 1 + package firehose 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "fmt" 7 + "net/url" 8 + "sync" 9 + "time" 10 + 11 + "github.com/gorilla/websocket" 12 + "github.com/rs/zerolog/log" 13 + ) 14 + 15 + // ProfileWatcher is a dedicated Jetstream connection that subscribes to 16 + // app.bsky.actor.profile events for known Arabica users only. Because Jetstream 17 + // uses AND semantics when both wantedCollections and wantedDids are set, this 18 + // must be a separate connection from the main consumer (which has no DID filter). 19 + type ProfileWatcher struct { 20 + index *FeedIndex 21 + endpoints []string 22 + 23 + conn *websocket.Conn 24 + connMu sync.Mutex 25 + 26 + watchedDIDs map[string]struct{} 27 + watchedDIDsMu sync.RWMutex 28 + 29 + endpointIdx int 30 + stopCh chan struct{} 31 + wg sync.WaitGroup 32 + } 33 + 34 + type profileOptionsUpdate struct { 35 + Type string `json:"type"` 36 + Payload struct { 37 + WantedCollections []string `json:"wantedCollections"` 38 + WantedDids []string `json:"wantedDids"` 39 + } `json:"payload"` 40 + } 41 + 42 + // NewProfileWatcher creates a ProfileWatcher seeded with all currently known 43 + // Arabica DIDs from the index. 44 + func NewProfileWatcher(config *Config, index *FeedIndex) *ProfileWatcher { 45 + dids, _ := index.GetKnownDIDs(context.Background()) 46 + watched := make(map[string]struct{}, len(dids)) 47 + for _, did := range dids { 48 + watched[did] = struct{}{} 49 + } 50 + return &ProfileWatcher{ 51 + index: index, 52 + endpoints: config.Endpoints, 53 + watchedDIDs: watched, 54 + stopCh: make(chan struct{}), 55 + } 56 + } 57 + 58 + // Watch adds a DID to the subscription. If connected, an options update is sent 59 + // immediately so Jetstream begins delivering that user's profile events. 60 + func (pw *ProfileWatcher) Watch(did string) { 61 + pw.watchedDIDsMu.Lock() 62 + _, already := pw.watchedDIDs[did] 63 + pw.watchedDIDs[did] = struct{}{} 64 + pw.watchedDIDsMu.Unlock() 65 + 66 + if !already { 67 + pw.sendOptionsUpdate() 68 + } 69 + } 70 + 71 + // Start begins the profile watcher in a background goroutine. It will reconnect 72 + // automatically on failure, rotating through endpoints with exponential backoff. 73 + func (pw *ProfileWatcher) Start(ctx context.Context) { 74 + pw.wg.Add(1) 75 + go func() { 76 + defer pw.wg.Done() 77 + pw.run(ctx) 78 + }() 79 + } 80 + 81 + // Stop gracefully shuts down the watcher. 82 + func (pw *ProfileWatcher) Stop() { 83 + close(pw.stopCh) 84 + pw.connMu.Lock() 85 + if pw.conn != nil { 86 + pw.conn.Close() 87 + } 88 + pw.connMu.Unlock() 89 + pw.wg.Wait() 90 + } 91 + 92 + func (pw *ProfileWatcher) run(ctx context.Context) { 93 + backoff := time.Second 94 + maxBackoff := 30 * time.Second 95 + 96 + for { 97 + select { 98 + case <-ctx.Done(): 99 + return 100 + case <-pw.stopCh: 101 + return 102 + default: 103 + } 104 + 105 + // Skip connecting if we have no DIDs to watch yet — wait for the first Watch() call 106 + pw.watchedDIDsMu.RLock() 107 + n := len(pw.watchedDIDs) 108 + pw.watchedDIDsMu.RUnlock() 109 + if n == 0 { 110 + select { 111 + case <-ctx.Done(): 112 + return 113 + case <-pw.stopCh: 114 + return 115 + case <-time.After(5 * time.Second): 116 + } 117 + continue 118 + } 119 + 120 + endpoint := pw.endpoints[pw.endpointIdx] 121 + err := pw.connectAndConsume(ctx, endpoint) 122 + 123 + if err != nil { 124 + log.Warn().Err(err).Str("endpoint", endpoint).Msg("profile watcher: connection error") 125 + pw.endpointIdx = (pw.endpointIdx + 1) % len(pw.endpoints) 126 + 127 + select { 128 + case <-ctx.Done(): 129 + return 130 + case <-pw.stopCh: 131 + return 132 + case <-time.After(backoff): 133 + } 134 + 135 + backoff *= 2 136 + if backoff > maxBackoff { 137 + backoff = maxBackoff 138 + } 139 + } else { 140 + backoff = time.Second 141 + } 142 + } 143 + } 144 + 145 + func (pw *ProfileWatcher) connectAndConsume(ctx context.Context, endpoint string) error { 146 + wsURL := pw.buildURL(endpoint) 147 + log.Info().Str("url", wsURL).Msg("profile watcher: connecting") 148 + 149 + dialer := websocket.Dialer{HandshakeTimeout: 10 * time.Second} 150 + conn, _, err := dialer.DialContext(ctx, wsURL, nil) 151 + if err != nil { 152 + return fmt.Errorf("failed to connect: %w", err) 153 + } 154 + 155 + pw.connMu.Lock() 156 + pw.conn = conn 157 + pw.connMu.Unlock() 158 + 159 + log.Info().Str("endpoint", endpoint).Msg("profile watcher: connected") 160 + 161 + defer func() { 162 + pw.connMu.Lock() 163 + if pw.conn != nil { 164 + pw.conn.Close() 165 + pw.conn = nil 166 + } 167 + pw.connMu.Unlock() 168 + }() 169 + 170 + for { 171 + select { 172 + case <-ctx.Done(): 173 + return ctx.Err() 174 + case <-pw.stopCh: 175 + return nil 176 + default: 177 + } 178 + 179 + conn.SetReadDeadline(time.Now().Add(60 * time.Second)) 180 + _, message, err := conn.ReadMessage() 181 + if err != nil { 182 + return fmt.Errorf("read error: %w", err) 183 + } 184 + 185 + pw.processMessage(message) 186 + } 187 + } 188 + 189 + func (pw *ProfileWatcher) buildURL(endpoint string) string { 190 + u, _ := url.Parse(endpoint) 191 + q := u.Query() 192 + q.Set("wantedCollections", NSIDBlueskyProfile) 193 + 194 + pw.watchedDIDsMu.RLock() 195 + for did := range pw.watchedDIDs { 196 + q.Add("wantedDids", did) 197 + } 198 + pw.watchedDIDsMu.RUnlock() 199 + 200 + u.RawQuery = q.Encode() 201 + return u.String() 202 + } 203 + 204 + func (pw *ProfileWatcher) sendOptionsUpdate() { 205 + pw.connMu.Lock() 206 + conn := pw.conn 207 + pw.connMu.Unlock() 208 + 209 + if conn == nil { 210 + return // will be applied via URL on next reconnect 211 + } 212 + 213 + pw.watchedDIDsMu.RLock() 214 + dids := make([]string, 0, len(pw.watchedDIDs)) 215 + for did := range pw.watchedDIDs { 216 + dids = append(dids, did) 217 + } 218 + pw.watchedDIDsMu.RUnlock() 219 + 220 + var msg profileOptionsUpdate 221 + msg.Type = "options_update" 222 + msg.Payload.WantedCollections = []string{NSIDBlueskyProfile} 223 + msg.Payload.WantedDids = dids 224 + 225 + data, err := json.Marshal(msg) 226 + if err != nil { 227 + return 228 + } 229 + 230 + pw.connMu.Lock() 231 + defer pw.connMu.Unlock() 232 + if pw.conn != nil { 233 + if err := pw.conn.WriteMessage(websocket.TextMessage, data); err != nil { 234 + log.Warn().Err(err).Msg("profile watcher: failed to send options update") 235 + } 236 + } 237 + } 238 + 239 + func (pw *ProfileWatcher) processMessage(data []byte) { 240 + var event JetstreamEvent 241 + if err := json.Unmarshal(data, &event); err != nil || event.Kind != "commit" || event.Commit == nil { 242 + return 243 + } 244 + if event.Commit.Collection != NSIDBlueskyProfile { 245 + return 246 + } 247 + if event.Commit.Operation == "create" || event.Commit.Operation == "update" { 248 + pw.index.InvalidateProfile(event.DID) 249 + log.Debug().Str("did", event.DID).Msg("profile watcher: invalidated profile cache") 250 + } 251 + }
+8 -36
internal/handlers/handlers.go
··· 8 8 "net/http" 9 9 "strconv" 10 10 "strings" 11 - "sync" 12 - "time" 13 11 14 12 "arabica/internal/atproto" 15 13 "arabica/internal/database" ··· 27 25 "github.com/rs/zerolog/log" 28 26 ) 29 27 30 - // profileCacheTTL controls how long user profiles are cached before re-fetching. 31 - // Profiles (avatar, display name) change infrequently so 1 hour is reasonable. 32 - const profileCacheTTL = 1 * time.Hour 33 - 34 - // cachedProfile holds a user profile with its fetch timestamp. 35 - type cachedProfile struct { 36 - profile *bff.UserProfile 37 - cachedAt time.Time 38 - } 39 28 40 29 // Config holds handler configuration options 41 30 type Config struct { ··· 70 59 pdsAdminURL string 71 60 pdsAdminToken string 72 61 73 - // profileCache caches user profiles (avatar, handle) by DID to avoid 74 - // hitting the Bluesky API on every page load. 75 - profileCache map[string]*cachedProfile 76 - profileCacheMu sync.RWMutex 77 62 } 78 63 79 64 // NewHandler creates a new Handler with all required dependencies. ··· 93 78 config: config, 94 79 feedService: feedService, 95 80 feedRegistry: feedRegistry, 96 - profileCache: make(map[string]*cachedProfile), 97 81 } 98 82 } 99 83 ··· 216 200 } 217 201 218 202 // getUserProfile fetches the profile for an authenticated user. 219 - // Results are cached by DID for profileCacheTTL to avoid hitting the 220 - // Bluesky API on every page load. 203 + // Routes through feedIndex (invalidated by ProfileWatcher on profile updates) 204 + // so the header stays fresh without a separate cache layer. 221 205 // Returns nil if unable to fetch profile (non-fatal error). 222 206 func (h *Handler) getUserProfile(ctx context.Context, did string) *bff.UserProfile { 223 207 if did == "" { 224 208 return nil 225 209 } 226 210 227 - // Check cache 228 - h.profileCacheMu.RLock() 229 - if cached, ok := h.profileCache[did]; ok && time.Since(cached.cachedAt) < profileCacheTTL { 230 - h.profileCacheMu.RUnlock() 231 - return cached.profile 211 + var profile *atproto.Profile 212 + var err error 213 + if h.feedIndex != nil { 214 + profile, err = h.feedIndex.GetProfile(ctx, did) 215 + } else { 216 + profile, err = atproto.NewPublicClient().GetProfile(ctx, did) 232 217 } 233 - h.profileCacheMu.RUnlock() 234 - 235 - publicClient := atproto.NewPublicClient() 236 - profile, err := publicClient.GetProfile(ctx, did) 237 218 if err != nil { 238 219 log.Warn().Err(err).Str("did", did).Msg("Failed to fetch user profile for header") 239 220 return nil ··· 248 229 if profile.Avatar != nil { 249 230 userProfile.Avatar = *profile.Avatar 250 231 } 251 - 252 - // Store in cache 253 - h.profileCacheMu.Lock() 254 - h.profileCache[did] = &cachedProfile{ 255 - profile: userProfile, 256 - cachedAt: time.Now(), 257 - } 258 - h.profileCacheMu.Unlock() 259 - 260 232 return userProfile 261 233 } 262 234