Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: witness cache to reduce PDS load

docs: witness cache plan

fix: invalidate session cache on record change, fetch from PDS

- Doesn't wait for witness cache to catch up since that takes time

authored by

Patrick Dewey and committed by tangled.org 7e335d19 357c2340

+1184 -34
+1
.prettierrc
··· 1 + { "proseWrap": "always", "printWidth": 80 }
+4
cmd/server/main.go
··· 351 351 // Wire up the feed index for like functionality 352 352 h.SetFeedIndex(feedIndex) 353 353 354 + // Wire up the witness cache so reads are served from the firehose index 355 + // before falling back to the PDS 356 + h.SetWitnessCache(feedIndex) 357 + 354 358 // Initialize moderation service and wire up to handler 355 359 moderatorsConfigPath := os.Getenv("ARABICA_MODERATORS_CONFIG") 356 360 moderationSvc, err := moderation.NewService(moderatorsConfigPath)
+373
docs/witness-cache-plan.md
··· 1 + # Witness Cache Implementation Plan 2 + 3 + **Goal:** Eliminate redundant PDS requests by using the firehose SQLite index as 4 + a local read cache for Arabica records. 5 + 6 + **Date:** 2026-03-22 7 + 8 + --- 9 + 10 + ## Problem 11 + 12 + Several pages make excessive PDS XRPC calls per page load: 13 + 14 + | Page | PDS Calls | Breakdown | 15 + | --------- | --------- | ----------------------------------------------------------------------------- | 16 + | Brew View | 5-7 | 1 brew + 4-5 sequential ref resolves (bean, roaster, grinder, brewer, recipe) | 17 + | Profile | 6-7 | 5 listRecords (all collections) + handle resolve + profile fetch | 18 + | Manage | 5+ | listRecords for all entity types | 19 + | Brew Form | 5+ | All entities for select dropdowns | 20 + 21 + The in-memory `SessionCache` (2min TTL, per-session) helps on rapid 22 + re-navigation but is volatile — server restarts and new sessions trigger full 23 + PDS re-fetches. 24 + 25 + ## Architecture 26 + 27 + Three-tier read path: 28 + 29 + ``` 30 + Request → L1: SessionCache (in-memory, 2min TTL, parsed Go structs) 31 + ↓ miss 32 + L2: WitnessCache (firehose SQLite, updated in real-time via relay) 33 + ↓ miss 34 + L3: PDS (remote XRPC calls) 35 + ``` 36 + 37 + The firehose index already stores every Arabica record it sees from the relay. 38 + We expose it as a read-only `WitnessCache` interface and inject it into 39 + `AtprotoStore`. 40 + 41 + Writes always go to PDS. The firehose keeps the witness cache consistent 42 + asynchronously (typically <1s propagation delay). 43 + 44 + ## Design Decisions 45 + 46 + ### Use firehose index directly, not a separate SQLite cache 47 + 48 + The firehose index already has all the data. A dedicated cache would duplicate 49 + storage and require its own invalidation logic. The firehose's relay 50 + subscription handles consistency for free. 51 + 52 + ### Resolve references from witness cache too 53 + 54 + The WIP branch's biggest flaw: after fetching a brew from the witness cache, it 55 + still called `ResolveBrewRefs` which made 4-5 individual PDS calls for the 56 + referenced bean/grinder/brewer/recipe. All of those records are also in the 57 + firehose index. 58 + 59 + A single brew view should go from 5-7 PDS calls to 0 when the witness cache is 60 + warm. 61 + 62 + ### Keep SessionCache as L1 63 + 64 + SessionCache holds parsed Go structs — zero deserialization cost on hit. The 65 + witness cache (L2) requires JSON → struct conversion but avoids network. Both 66 + layers serve different performance profiles. 67 + 68 + ### Don't cache profiles/handles in witness cache 69 + 70 + Profiles come from the AppView/PLC directory, not Arabica lexicons. The existing 71 + `ARABICA_PROFILE_CACHE_TTL` (1h) handles those. The witness cache only covers 72 + Arabica record collections. 73 + 74 + ### Fall through to PDS after writes 75 + 76 + When a user creates/updates/deletes, the SessionCache is invalidated (already 77 + happens today). The next read falls through to PDS since the firehose may not 78 + have propagated the change yet. This preserves read-your-writes consistency 79 + without any new invalidation logic. 80 + 81 + --- 82 + 83 + ## Tasks 84 + 85 + ### Task 1: WitnessCache interface and FeedIndex implementation 86 + 87 + **Files:** 88 + 89 + - Create: `internal/atproto/witness.go` 90 + - Modify: `internal/firehose/index.go` 91 + 92 + Define the interface in `atproto` (avoids import cycle with `firehose`): 93 + 94 + ```go 95 + // internal/atproto/witness.go 96 + package atproto 97 + 98 + type WitnessRecord struct { 99 + URI string 100 + DID string 101 + Collection string 102 + RKey string 103 + CID string 104 + Record json.RawMessage 105 + IndexedAt time.Time 106 + CreatedAt time.Time 107 + } 108 + 109 + type WitnessCache interface { 110 + // Returns (nil, nil) when not found. 111 + GetWitnessRecord(ctx context.Context, uri string) (*WitnessRecord, error) 112 + // Returns empty slice when none found. 113 + ListWitnessRecords(ctx context.Context, did, collection string) ([]*WitnessRecord, error) 114 + } 115 + ``` 116 + 117 + Add composite index to firehose schema for efficient DID+collection queries: 118 + 119 + ```sql 120 + CREATE INDEX IF NOT EXISTS idx_records_did_coll ON records(did, collection, created_at DESC); 121 + ``` 122 + 123 + Implement `GetWitnessRecord` and `ListWitnessRecords` on `FeedIndex`, with 124 + compile-time interface check: 125 + 126 + ```go 127 + var _ atproto.WitnessCache = (*FeedIndex)(nil) 128 + ``` 129 + 130 + --- 131 + 132 + ### Task 2: Witness cache helpers in AtprotoStore 133 + 134 + **Files:** 135 + 136 + - Modify: `internal/atproto/store.go` 137 + 138 + Add `witnessCache WitnessCache` field to `AtprotoStore`. Add 139 + `NewAtprotoStoreWithWitness` constructor. 140 + 141 + Add two private helpers: 142 + 143 + ```go 144 + // getFromWitness fetches a single record by collection+rkey. 145 + // Returns nil when cache is not configured or record not found. 146 + func (s *AtprotoStore) getFromWitness(ctx context.Context, collection, rkey string) *WitnessRecord 147 + 148 + // listFromWitness returns all cached records for a collection. 149 + // Returns nil when cache is not configured or empty. 150 + func (s *AtprotoStore) listFromWitness(ctx context.Context, collection string) []*WitnessRecord 151 + ``` 152 + 153 + Add `witnessRecordToMap` for converting `json.RawMessage` into the 154 + `map[string]interface{}` format the existing `Record*` conversion functions 155 + expect. 156 + 157 + --- 158 + 159 + ### Task 3: Cache-first List operations 160 + 161 + **Files:** 162 + 163 + - Modify: `internal/atproto/store.go` 164 + 165 + For each `List*` method (ListBeans, ListRoasters, ListGrinders, ListBrewers, 166 + ListRecipes, ListBrews): 167 + 168 + 1. Check SessionCache (unchanged) 169 + 2. Try `listFromWitness` — if records found, convert and populate SessionCache 170 + 3. Fall through to PDS on miss 171 + 172 + The read path becomes: 173 + 174 + ```go 175 + func (s *AtprotoStore) ListBeans(ctx context.Context) ([]*models.Bean, error) { 176 + // L1: session cache 177 + if cached := s.cache.Get(s.sessionID); cached != nil && cached.Beans != nil && cached.IsValid() { 178 + return cached.Beans, nil 179 + } 180 + // L2: witness cache 181 + if wRecords := s.listFromWitness(ctx, NSIDBean); wRecords != nil { 182 + beans := convertWitnessRecordsToBeans(wRecords) 183 + s.cache.SetBeans(s.sessionID, beans) 184 + return beans, nil 185 + } 186 + // L3: PDS (existing code, unchanged) 187 + ... 188 + } 189 + ``` 190 + 191 + --- 192 + 193 + ### Task 4: Cache-first Get operations with reference resolution from witness 194 + 195 + **Files:** 196 + 197 + - Modify: `internal/atproto/store.go` 198 + 199 + This is the highest-impact change. For `GetBrewByRKey`: 200 + 201 + 1. Get brew from witness cache 202 + 2. Get each reference (bean, grinder, brewer, recipe) from witness cache too 203 + 3. Only fall back to PDS if any witness lookup misses 204 + 205 + ```go 206 + func (s *AtprotoStore) GetBrewByRKey(ctx context.Context, rkey string) (*models.Brew, error) { 207 + if wr := s.getFromWitness(ctx, NSIDBrew, rkey); wr != nil { 208 + brew := convertWitnessRecordToBrew(wr) 209 + 210 + // Resolve refs from witness cache — NOT from PDS 211 + if beanURI != "" { 212 + if beanWR, _ := s.witnessCache.GetWitnessRecord(ctx, beanURI); beanWR != nil { 213 + brew.Bean = convertWitnessRecordToBean(beanWR) 214 + // Resolve roaster ref from witness too 215 + if roasterURI != "" { 216 + if roasterWR, _ := s.witnessCache.GetWitnessRecord(ctx, roasterURI); roasterWR != nil { 217 + brew.Bean.Roaster = convertWitnessRecordToRoaster(roasterWR) 218 + } 219 + } 220 + } 221 + } 222 + // Same for grinder, brewer, recipe... 223 + 224 + return brew, nil 225 + } 226 + 227 + // PDS fallback (existing code) 228 + ... 229 + } 230 + ``` 231 + 232 + **Impact:** Brew view page goes from 5-7 PDS calls to 0. 233 + 234 + Apply the same witness-first reference resolution to `GetBeanByRKey` (resolves 235 + roaster ref) and any other `Get*ByRKey` that resolves references. 236 + 237 + --- 238 + 239 + ### Task 5: Witness cache for public/profile reads 240 + 241 + **Files:** 242 + 243 + - Modify: `internal/handlers/profile.go` 244 + - Modify: `internal/handlers/brew.go` (public brew view path) 245 + - Modify: `internal/handlers/entity_views.go` (public entity view paths) 246 + 247 + The profile page and shared view pages use `PublicClient` which bypasses both 248 + SessionCache and WitnessCache entirely. The firehose index has these records for 249 + all known users. 250 + 251 + Give handlers access to the witness cache for public reads: 252 + 253 + ```go 254 + func (h *Handler) HandleProfilePage(w http.ResponseWriter, r *http.Request) { 255 + // Try witness cache for the target user's records 256 + if beans := h.listFromWitnessPublic(ctx, targetDID, NSIDBean); beans != nil { 257 + // Use cached data, skip PDS calls 258 + } 259 + } 260 + ``` 261 + 262 + Or create a `PublicWitnessStore` wrapper: 263 + 264 + ```go 265 + type PublicWitnessStore struct { 266 + witness WitnessCache 267 + publicClient *PublicClient // fallback 268 + } 269 + ``` 270 + 271 + This could be a follow-up if scoping is a concern — the authenticated path 272 + (Tasks 3-4) covers the most common case. 273 + 274 + --- 275 + 276 + ### Task 6: Metrics 277 + 278 + **Files:** 279 + 280 + - Modify: `internal/metrics/metrics.go` 281 + 282 + Add Prometheus counters: 283 + 284 + ```go 285 + var ( 286 + WitnessCacheHitsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ 287 + Name: "arabica_witness_cache_hits_total", 288 + Help: "Total witness cache hits (PDS request avoided)", 289 + }, []string{"collection"}) 290 + 291 + WitnessCacheMissesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ 292 + Name: "arabica_witness_cache_misses_total", 293 + Help: "Total witness cache misses (fell back to PDS)", 294 + }, []string{"collection"}) 295 + ) 296 + ``` 297 + 298 + Instrument every witness cache check point in store.go. This is critical for 299 + validating hit rates and identifying remaining cold-start gaps. 300 + 301 + --- 302 + 303 + ### Task 7: Wire into main.go and handlers 304 + 305 + **Files:** 306 + 307 + - Modify: `internal/handlers/handlers.go` 308 + - Modify: `cmd/server/main.go` 309 + 310 + Add `witnessCache atproto.WitnessCache` field to `Handler`. Add 311 + `SetWitnessCache` method. 312 + 313 + In `getAtprotoStore`, use `NewAtprotoStoreWithWitness` when witness cache is 314 + configured: 315 + 316 + ```go 317 + func (h *Handler) getAtprotoStore(r *http.Request) (database.Store, bool) { 318 + ... 319 + if h.witnessCache != nil { 320 + return atproto.NewAtprotoStoreWithWitness(client, did, sessionID, h.sessionCache, h.witnessCache), true 321 + } 322 + return atproto.NewAtprotoStore(client, did, sessionID, h.sessionCache), true 323 + } 324 + ``` 325 + 326 + In main.go, wire `feedIndex` as the witness cache after it's initialized. 327 + 328 + --- 329 + 330 + ### Task 8: Public feed cache invalidation 331 + 332 + **Files:** 333 + 334 + - Modify: `internal/feed/service.go` 335 + - Modify: `internal/handlers/brew.go`, `internal/handlers/entities.go` 336 + 337 + Add `InvalidatePublicFeedCache()` method to `feed.Service`. Call it from 338 + create/update/delete handlers so unauthenticated feed views reflect changes 339 + immediately. 340 + 341 + This was included in the WIP and is a good standalone improvement regardless of 342 + the witness cache. 343 + 344 + --- 345 + 346 + ## Non-Goals 347 + 348 + - **Profile/handle caching** — handled by existing `ARABICA_PROFILE_CACHE_TTL` 349 + - **CID-based staleness detection** — future optimization, not needed for v1 350 + - **Separate SQLite cache database** — firehose index already has the data 351 + - **Increasing SessionCache TTL** — 2 minutes is correct for multi-device sync 352 + 353 + ## Risks and Mitigations 354 + 355 + | Risk | Mitigation | 356 + | ------------------------------------------------------ | ------------------------------------------------------------------------- | 357 + | Firehose lag causes stale reads | PDS fallback on SessionCache invalidation after writes | 358 + | New user before firehose backfill | Witness miss → PDS fallback (transparent) | 359 + | Firehose down | Witness returns empty → PDS fallback (existing behavior) | 360 + | JSON parsing differences between witness and PDS paths | Same `Record*` conversion functions, same `map[string]interface{}` format | 361 + | Schema changes to firehose records table | WitnessCache interface isolates AtprotoStore from schema details | 362 + 363 + ## Expected Impact 364 + 365 + | Page | Before | After (warm cache) | 366 + | --------- | ------------- | ------------------ | 367 + | Brew View | 5-7 PDS calls | 0 | 368 + | Profile | 6-7 PDS calls | 0 (with Task 5) | 369 + | Manage | 5+ PDS calls | 0 | 370 + | Brew Form | 5+ PDS calls | 0 | 371 + 372 + Server restarts no longer cause a thundering herd of PDS requests — the witness 373 + cache survives restarts since it's backed by SQLite on disk.
+54 -7
internal/atproto/cache.go
··· 21 21 Recipes []*models.Recipe 22 22 Brews []*models.Brew 23 23 Timestamp time.Time 24 + // DirtyCollections tracks collections that were recently written to. 25 + // When a collection is dirty, the witness cache should be skipped 26 + // because firehose indexing may not have caught up yet. 27 + DirtyCollections map[string]bool 24 28 } 25 29 26 30 // IsValid returns true if the cache is still valid ··· 31 35 return time.Since(c.Timestamp) < CacheTTL 32 36 } 33 37 38 + // IsDirty returns true if the given collection was recently written to 39 + // and the witness cache should be skipped. 40 + func (c *UserCache) IsDirty(collection string) bool { 41 + if c == nil || c.DirtyCollections == nil { 42 + return false 43 + } 44 + return c.DirtyCollections[collection] 45 + } 46 + 34 47 // clone creates a shallow copy of the UserCache for safe modification 35 48 func (c *UserCache) clone() *UserCache { 36 49 if c == nil { 37 50 return &UserCache{Timestamp: time.Now()} 38 51 } 52 + // Copy dirty collections map 53 + var dirty map[string]bool 54 + if c.DirtyCollections != nil { 55 + dirty = make(map[string]bool, len(c.DirtyCollections)) 56 + for k, v := range c.DirtyCollections { 57 + dirty[k] = v 58 + } 59 + } 39 60 return &UserCache{ 40 - Beans: c.Beans, 41 - Roasters: c.Roasters, 42 - Grinders: c.Grinders, 43 - Brewers: c.Brewers, 44 - Recipes: c.Recipes, 45 - Brews: c.Brews, 46 - Timestamp: c.Timestamp, 61 + Beans: c.Beans, 62 + Roasters: c.Roasters, 63 + Grinders: c.Grinders, 64 + Brewers: c.Brewers, 65 + Recipes: c.Recipes, 66 + Brews: c.Brews, 67 + Timestamp: c.Timestamp, 68 + DirtyCollections: dirty, 47 69 } 48 70 } 49 71 ··· 145 167 sc.caches[sessionID] = newCache 146 168 } 147 169 170 + // markDirty sets a collection as dirty on the given cache, initializing the map if needed. 171 + func markDirty(cache *UserCache, collection string) { 172 + if cache.DirtyCollections == nil { 173 + cache.DirtyCollections = make(map[string]bool) 174 + } 175 + cache.DirtyCollections[collection] = true 176 + } 177 + 178 + // ClearDirty removes the dirty flag for a collection after fresh PDS data has been cached. 179 + func (sc *SessionCache) ClearDirty(sessionID, collection string) { 180 + sc.mu.Lock() 181 + defer sc.mu.Unlock() 182 + if cache, ok := sc.caches[sessionID]; ok { 183 + newCache := cache.clone() 184 + delete(newCache.DirtyCollections, collection) 185 + sc.caches[sessionID] = newCache 186 + } 187 + } 188 + 148 189 // InvalidateBeans marks that beans need to be refreshed using copy-on-write 149 190 func (sc *SessionCache) InvalidateBeans(sessionID string) { 150 191 sc.mu.Lock() ··· 152 193 if cache, ok := sc.caches[sessionID]; ok { 153 194 newCache := cache.clone() 154 195 newCache.Beans = nil 196 + markDirty(newCache, NSIDBean) 155 197 sc.caches[sessionID] = newCache 156 198 } 157 199 } ··· 165 207 newCache.Roasters = nil 166 208 // Also invalidate beans since they reference roasters 167 209 newCache.Beans = nil 210 + markDirty(newCache, NSIDRoaster) 168 211 sc.caches[sessionID] = newCache 169 212 } 170 213 } ··· 176 219 if cache, ok := sc.caches[sessionID]; ok { 177 220 newCache := cache.clone() 178 221 newCache.Grinders = nil 222 + markDirty(newCache, NSIDGrinder) 179 223 sc.caches[sessionID] = newCache 180 224 } 181 225 } ··· 187 231 if cache, ok := sc.caches[sessionID]; ok { 188 232 newCache := cache.clone() 189 233 newCache.Brewers = nil 234 + markDirty(newCache, NSIDBrewer) 190 235 sc.caches[sessionID] = newCache 191 236 } 192 237 } ··· 198 243 if cache, ok := sc.caches[sessionID]; ok { 199 244 newCache := cache.clone() 200 245 newCache.Recipes = nil 246 + markDirty(newCache, NSIDRecipe) 201 247 sc.caches[sessionID] = newCache 202 248 } 203 249 } ··· 209 255 if cache, ok := sc.caches[sessionID]; ok { 210 256 newCache := cache.clone() 211 257 newCache.Brews = nil 258 + markDirty(newCache, NSIDBrew) 212 259 sc.caches[sessionID] = newCache 213 260 } 214 261 }
+602 -26
internal/atproto/store.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "fmt" 6 7 "time" 7 8 8 9 "arabica/internal/database" 10 + "arabica/internal/metrics" 9 11 "arabica/internal/models" 10 12 11 13 "github.com/bluesky-social/indigo/atproto/syntax" ··· 16 18 // Context is passed as a parameter to each method rather than stored in the struct, 17 19 // following Go best practices for context propagation. 18 20 type AtprotoStore struct { 19 - client *Client 20 - did syntax.DID 21 - sessionID string 22 - cache *SessionCache 21 + client *Client 22 + did syntax.DID 23 + sessionID string 24 + cache *SessionCache 25 + witnessCache WitnessCache // optional; enables cache-first reads without PDS calls 23 26 } 24 27 25 28 // NewAtprotoStore creates a new atproto store for a specific user session. ··· 33 36 } 34 37 } 35 38 39 + // NewAtprotoStoreWithWitness creates a store that uses the witness cache for 40 + // cache-first reads, falling back to the PDS on cache misses. 41 + func NewAtprotoStoreWithWitness(client *Client, did syntax.DID, sessionID string, cache *SessionCache, witness WitnessCache) database.Store { 42 + return &AtprotoStore{ 43 + client: client, 44 + did: did, 45 + sessionID: sessionID, 46 + cache: cache, 47 + witnessCache: witness, 48 + } 49 + } 50 + 51 + // witnessRecordToMap unmarshals a WitnessRecord's raw JSON into the map format 52 + // expected by the existing Record* conversion functions. 53 + func witnessRecordToMap(wr *WitnessRecord) (map[string]interface{}, error) { 54 + var m map[string]interface{} 55 + if err := json.Unmarshal(wr.Record, &m); err != nil { 56 + return nil, err 57 + } 58 + return m, nil 59 + } 60 + 61 + // getFromWitness fetches a single record by collection+rkey from the witness cache. 62 + // Returns nil when the cache is not configured or the record is not found. 63 + func (s *AtprotoStore) getFromWitness(ctx context.Context, collection, rkey string) *WitnessRecord { 64 + if s.witnessCache == nil { 65 + return nil 66 + } 67 + uri := BuildATURI(s.did.String(), collection, rkey) 68 + wr, err := s.witnessCache.GetWitnessRecord(ctx, uri) 69 + if err != nil { 70 + log.Debug().Err(err).Str("uri", uri).Msg("witness: GetWitnessRecord error") 71 + return nil 72 + } 73 + return wr 74 + } 75 + 76 + // listFromWitness returns all cached records for a collection. 77 + // Returns nil when the cache is not configured or returns nothing. 78 + // Skips the witness cache if the collection was recently written to 79 + // (dirty), since the firehose may not have indexed the new record yet. 80 + func (s *AtprotoStore) listFromWitness(ctx context.Context, collection string) []*WitnessRecord { 81 + if s.witnessCache == nil { 82 + return nil 83 + } 84 + // Skip witness cache for collections with pending writes 85 + if userCache := s.cache.Get(s.sessionID); userCache.IsDirty(collection) { 86 + log.Debug().Str("collection", collection).Msg("witness: skipping dirty collection, falling back to PDS") 87 + return nil 88 + } 89 + records, err := s.witnessCache.ListWitnessRecords(ctx, s.did.String(), collection) 90 + if err != nil { 91 + log.Debug().Err(err).Str("collection", collection).Msg("witness: ListWitnessRecords error") 92 + return nil 93 + } 94 + if len(records) == 0 { 95 + return nil 96 + } 97 + return records 98 + } 99 + 100 + // getWitnessRecordByURI fetches a single record by full AT-URI from the witness cache. 101 + // Returns nil when the cache is not configured or the record is not found. 102 + func (s *AtprotoStore) getWitnessRecordByURI(ctx context.Context, uri string) *WitnessRecord { 103 + if s.witnessCache == nil { 104 + return nil 105 + } 106 + wr, err := s.witnessCache.GetWitnessRecord(ctx, uri) 107 + if err != nil { 108 + log.Debug().Err(err).Str("uri", uri).Msg("witness: GetWitnessRecord error") 109 + return nil 110 + } 111 + return wr 112 + } 113 + 114 + // resolveBrewRefsFromWitness resolves a brew's references (bean, grinder, brewer, recipe) 115 + // entirely from the witness cache, avoiding any PDS calls. Falls back to PDS-based resolution 116 + // only if a witness lookup fails for any referenced record. 117 + func (s *AtprotoStore) resolveBrewRefsFromWitness(ctx context.Context, brew *models.Brew, record map[string]interface{}) { 118 + // Resolve bean (and its roaster) 119 + if beanRef, _ := record["beanRef"].(string); beanRef != "" { 120 + if beanWR := s.getWitnessRecordByURI(ctx, beanRef); beanWR != nil { 121 + if beanMap, err := witnessRecordToMap(beanWR); err == nil { 122 + if bean, err := RecordToBean(beanMap, beanWR.URI); err == nil { 123 + bean.RKey = beanWR.RKey 124 + // Resolve roaster ref from witness too 125 + if roasterRef, ok := beanMap["roasterRef"].(string); ok && roasterRef != "" { 126 + if c, err := ResolveATURI(roasterRef); err == nil { 127 + bean.RoasterRKey = c.RKey 128 + } 129 + if roasterWR := s.getWitnessRecordByURI(ctx, roasterRef); roasterWR != nil { 130 + if roasterMap, err := witnessRecordToMap(roasterWR); err == nil { 131 + if roaster, err := RecordToRoaster(roasterMap, roasterWR.URI); err == nil { 132 + roaster.RKey = roasterWR.RKey 133 + bean.Roaster = roaster 134 + } 135 + } 136 + } 137 + } 138 + brew.Bean = bean 139 + } 140 + } 141 + } 142 + } 143 + 144 + // Resolve grinder 145 + if grinderRef, _ := record["grinderRef"].(string); grinderRef != "" { 146 + if grinderWR := s.getWitnessRecordByURI(ctx, grinderRef); grinderWR != nil { 147 + if grinderMap, err := witnessRecordToMap(grinderWR); err == nil { 148 + if grinder, err := RecordToGrinder(grinderMap, grinderWR.URI); err == nil { 149 + grinder.RKey = grinderWR.RKey 150 + brew.GrinderObj = grinder 151 + } 152 + } 153 + } 154 + } 155 + 156 + // Resolve brewer 157 + if brewerRef, _ := record["brewerRef"].(string); brewerRef != "" { 158 + if brewerWR := s.getWitnessRecordByURI(ctx, brewerRef); brewerWR != nil { 159 + if brewerMap, err := witnessRecordToMap(brewerWR); err == nil { 160 + if brewer, err := RecordToBrewer(brewerMap, brewerWR.URI); err == nil { 161 + brewer.RKey = brewerWR.RKey 162 + brew.BrewerObj = brewer 163 + } 164 + } 165 + } 166 + } 167 + 168 + // Resolve recipe 169 + if recipeRef, _ := record["recipeRef"].(string); recipeRef != "" { 170 + if recipeWR := s.getWitnessRecordByURI(ctx, recipeRef); recipeWR != nil { 171 + if recipeMap, err := witnessRecordToMap(recipeWR); err == nil { 172 + if recipe, err := RecordToRecipe(recipeMap, recipeWR.URI); err == nil { 173 + recipe.RKey = recipeWR.RKey 174 + // Resolve recipe's brewer ref from witness 175 + if brewerRef, ok := recipeMap["brewerRef"].(string); ok && brewerRef != "" { 176 + if c, err := ResolveATURI(brewerRef); err == nil { 177 + recipe.BrewerRKey = c.RKey 178 + } 179 + if brewerWR := s.getWitnessRecordByURI(ctx, brewerRef); brewerWR != nil { 180 + if brewerMap, err := witnessRecordToMap(brewerWR); err == nil { 181 + if brewer, err := RecordToBrewer(brewerMap, brewerWR.URI); err == nil { 182 + brewer.RKey = brewerWR.RKey 183 + recipe.BrewerObj = brewer 184 + } 185 + } 186 + } 187 + } 188 + brew.RecipeObj = recipe 189 + } 190 + } 191 + } 192 + } 193 + } 194 + 36 195 // ========== Brew Helpers ========== 37 196 38 197 // extractBrewRefRKeys extracts rkeys from AT-URI references in a brew record's raw values. ··· 151 310 } 152 311 153 312 func (s *AtprotoStore) GetBrewByRKey(ctx context.Context, rkey string) (*models.Brew, error) { 313 + // Try witness cache — resolve the brew AND its references from cache 314 + if wr := s.getFromWitness(ctx, NSIDBrew, rkey); wr != nil { 315 + m, err := witnessRecordToMap(wr) 316 + if err == nil { 317 + brew, err := RecordToBrew(m, wr.URI) 318 + if err == nil { 319 + metrics.WitnessCacheHitsTotal.WithLabelValues("brew").Inc() 320 + brew.RKey = rkey 321 + extractBrewRefRKeys(brew, m) 322 + s.resolveBrewRefsFromWitness(ctx, brew, m) 323 + return brew, nil 324 + } 325 + } 326 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert brew, falling back to PDS") 327 + } else { 328 + metrics.WitnessCacheMissesTotal.WithLabelValues("brew").Inc() 329 + } 330 + 154 331 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 155 332 Collection: NSIDBrew, 156 333 RKey: rkey, ··· 199 376 200 377 // GetBrewRecordByRKey fetches a brew by rkey and returns it with its AT Protocol metadata 201 378 func (s *AtprotoStore) GetBrewRecordByRKey(ctx context.Context, rkey string) (*BrewRecord, error) { 379 + // Try witness cache 380 + if wr := s.getFromWitness(ctx, NSIDBrew, rkey); wr != nil { 381 + m, err := witnessRecordToMap(wr) 382 + if err == nil { 383 + brew, err := RecordToBrew(m, wr.URI) 384 + if err == nil { 385 + metrics.WitnessCacheHitsTotal.WithLabelValues("brew").Inc() 386 + brew.RKey = rkey 387 + extractBrewRefRKeys(brew, m) 388 + s.resolveBrewRefsFromWitness(ctx, brew, m) 389 + return &BrewRecord{ 390 + Brew: brew, 391 + URI: wr.URI, 392 + CID: wr.CID, 393 + }, nil 394 + } 395 + } 396 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert brew record, falling back to PDS") 397 + } else { 398 + metrics.WitnessCacheMissesTotal.WithLabelValues("brew").Inc() 399 + } 400 + 202 401 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 203 402 Collection: NSIDBrew, 204 403 RKey: rkey, ··· 249 448 return userCache.Brews, nil 250 449 } 251 450 252 - // Use ListAllRecords to handle pagination automatically 253 - output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDBrew) 254 - if err != nil { 255 - return nil, fmt.Errorf("failed to list brew records: %w", err) 256 - } 451 + var brews []*models.Brew 257 452 258 - brews := make([]*models.Brew, 0, len(output.Records)) 453 + // Try witness cache 454 + if wRecords := s.listFromWitness(ctx, NSIDBrew); wRecords != nil { 455 + metrics.WitnessCacheHitsTotal.WithLabelValues("brew").Inc() 456 + brews = make([]*models.Brew, 0, len(wRecords)) 457 + for _, wr := range wRecords { 458 + m, err := witnessRecordToMap(wr) 459 + if err != nil { 460 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse brew") 461 + continue 462 + } 463 + brew, err := RecordToBrew(m, wr.URI) 464 + if err != nil { 465 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert brew") 466 + continue 467 + } 468 + brew.RKey = wr.RKey 469 + extractBrewRefRKeys(brew, m) 470 + brews = append(brews, brew) 471 + } 472 + } else { 473 + metrics.WitnessCacheMissesTotal.WithLabelValues("brew").Inc() 259 474 260 - for _, rec := range output.Records { 261 - brew, err := RecordToBrew(rec.Value, rec.URI) 475 + // Use ListAllRecords to handle pagination automatically 476 + output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDBrew) 262 477 if err != nil { 263 - log.Warn().Err(err).Str("uri", rec.URI).Msg("Failed to convert brew record") 264 - continue 478 + return nil, fmt.Errorf("failed to list brew records: %w", err) 265 479 } 266 480 267 - // Extract rkey from URI 268 - if components, err := ResolveATURI(rec.URI); err == nil { 269 - brew.RKey = components.RKey 270 - } 481 + brews = make([]*models.Brew, 0, len(output.Records)) 482 + 483 + for _, rec := range output.Records { 484 + brew, err := RecordToBrew(rec.Value, rec.URI) 485 + if err != nil { 486 + log.Warn().Err(err).Str("uri", rec.URI).Msg("Failed to convert brew record") 487 + continue 488 + } 489 + 490 + // Extract rkey from URI 491 + if components, err := ResolveATURI(rec.URI); err == nil { 492 + brew.RKey = components.RKey 493 + } 271 494 272 - // Extract rkeys from AT-URI references 273 - extractBrewRefRKeys(brew, rec.Value) 495 + // Extract rkeys from AT-URI references 496 + extractBrewRefRKeys(brew, rec.Value) 274 497 275 - brews = append(brews, brew) 498 + brews = append(brews, brew) 499 + } 276 500 } 277 501 278 502 // Resolve references using cached data instead of N+1 queries ··· 326 550 } 327 551 } 328 552 329 - // Update cache 553 + // Update cache and clear dirty flag since we fetched from PDS 330 554 s.cache.SetBrews(s.sessionID, brews) 555 + s.cache.ClearDirty(s.sessionID, NSIDBrew) 331 556 332 557 return brews, nil 333 558 } ··· 406 631 407 632 // GetBeanRecordByRKey fetches a bean by rkey and returns it with its AT Protocol metadata 408 633 func (s *AtprotoStore) GetBeanRecordByRKey(ctx context.Context, rkey string) (*BeanRecord, error) { 634 + // Try witness cache 635 + if wr := s.getFromWitness(ctx, NSIDBean, rkey); wr != nil { 636 + m, err := witnessRecordToMap(wr) 637 + if err == nil { 638 + bean, err := RecordToBean(m, wr.URI) 639 + if err == nil { 640 + metrics.WitnessCacheHitsTotal.WithLabelValues("bean").Inc() 641 + bean.RKey = rkey 642 + if roasterRef, ok := m["roasterRef"].(string); ok && roasterRef != "" { 643 + if c, err := ResolveATURI(roasterRef); err == nil { 644 + bean.RoasterRKey = c.RKey 645 + } 646 + if roasterWR := s.getWitnessRecordByURI(ctx, roasterRef); roasterWR != nil { 647 + if roasterMap, err := witnessRecordToMap(roasterWR); err == nil { 648 + if roaster, err := RecordToRoaster(roasterMap, roasterWR.URI); err == nil { 649 + roaster.RKey = roasterWR.RKey 650 + bean.Roaster = roaster 651 + } 652 + } 653 + } 654 + } 655 + return &BeanRecord{ 656 + Bean: bean, 657 + URI: wr.URI, 658 + CID: wr.CID, 659 + }, nil 660 + } 661 + } 662 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert bean record, falling back to PDS") 663 + } else { 664 + metrics.WitnessCacheMissesTotal.WithLabelValues("bean").Inc() 665 + } 666 + 409 667 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 410 668 Collection: NSIDBean, 411 669 RKey: rkey, ··· 451 709 452 710 // GetRoasterRecordByRKey fetches a roaster by rkey and returns it with its AT Protocol metadata 453 711 func (s *AtprotoStore) GetRoasterRecordByRKey(ctx context.Context, rkey string) (*RoasterRecord, error) { 712 + if wr := s.getFromWitness(ctx, NSIDRoaster, rkey); wr != nil { 713 + m, err := witnessRecordToMap(wr) 714 + if err == nil { 715 + roaster, err := RecordToRoaster(m, wr.URI) 716 + if err == nil { 717 + metrics.WitnessCacheHitsTotal.WithLabelValues("roaster").Inc() 718 + roaster.RKey = rkey 719 + return &RoasterRecord{Roaster: roaster, URI: wr.URI, CID: wr.CID}, nil 720 + } 721 + } 722 + } else { 723 + metrics.WitnessCacheMissesTotal.WithLabelValues("roaster").Inc() 724 + } 725 + 454 726 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 455 727 Collection: NSIDRoaster, 456 728 RKey: rkey, ··· 483 755 484 756 // GetGrinderRecordByRKey fetches a grinder by rkey and returns it with its AT Protocol metadata 485 757 func (s *AtprotoStore) GetGrinderRecordByRKey(ctx context.Context, rkey string) (*GrinderRecord, error) { 758 + if wr := s.getFromWitness(ctx, NSIDGrinder, rkey); wr != nil { 759 + m, err := witnessRecordToMap(wr) 760 + if err == nil { 761 + grinder, err := RecordToGrinder(m, wr.URI) 762 + if err == nil { 763 + metrics.WitnessCacheHitsTotal.WithLabelValues("grinder").Inc() 764 + grinder.RKey = rkey 765 + return &GrinderRecord{Grinder: grinder, URI: wr.URI, CID: wr.CID}, nil 766 + } 767 + } 768 + } else { 769 + metrics.WitnessCacheMissesTotal.WithLabelValues("grinder").Inc() 770 + } 771 + 486 772 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 487 773 Collection: NSIDGrinder, 488 774 RKey: rkey, ··· 515 801 516 802 // GetBrewerRecordByRKey fetches a brewer by rkey and returns it with its AT Protocol metadata 517 803 func (s *AtprotoStore) GetBrewerRecordByRKey(ctx context.Context, rkey string) (*BrewerRecord, error) { 804 + if wr := s.getFromWitness(ctx, NSIDBrewer, rkey); wr != nil { 805 + m, err := witnessRecordToMap(wr) 806 + if err == nil { 807 + brewer, err := RecordToBrewer(m, wr.URI) 808 + if err == nil { 809 + metrics.WitnessCacheHitsTotal.WithLabelValues("brewer").Inc() 810 + brewer.RKey = rkey 811 + return &BrewerRecord{Brewer: brewer, URI: wr.URI, CID: wr.CID}, nil 812 + } 813 + } 814 + } else { 815 + metrics.WitnessCacheMissesTotal.WithLabelValues("brewer").Inc() 816 + } 817 + 518 818 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 519 819 Collection: NSIDBrewer, 520 820 RKey: rkey, ··· 587 887 } 588 888 589 889 func (s *AtprotoStore) GetBeanByRKey(ctx context.Context, rkey string) (*models.Bean, error) { 890 + // Try witness cache 891 + if wr := s.getFromWitness(ctx, NSIDBean, rkey); wr != nil { 892 + m, err := witnessRecordToMap(wr) 893 + if err == nil { 894 + bean, err := RecordToBean(m, wr.URI) 895 + if err == nil { 896 + metrics.WitnessCacheHitsTotal.WithLabelValues("bean").Inc() 897 + bean.RKey = rkey 898 + if roasterRef, ok := m["roasterRef"].(string); ok && roasterRef != "" { 899 + if c, err := ResolveATURI(roasterRef); err == nil { 900 + bean.RoasterRKey = c.RKey 901 + } 902 + // Resolve roaster from witness cache 903 + if roasterWR := s.getWitnessRecordByURI(ctx, roasterRef); roasterWR != nil { 904 + if roasterMap, err := witnessRecordToMap(roasterWR); err == nil { 905 + if roaster, err := RecordToRoaster(roasterMap, roasterWR.URI); err == nil { 906 + roaster.RKey = roasterWR.RKey 907 + bean.Roaster = roaster 908 + } 909 + } 910 + } 911 + } 912 + return bean, nil 913 + } 914 + } 915 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert bean, falling back to PDS") 916 + } else { 917 + metrics.WitnessCacheMissesTotal.WithLabelValues("bean").Inc() 918 + } 919 + 590 920 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 591 921 Collection: NSIDBean, 592 922 RKey: rkey, ··· 628 958 return userCache.Beans, nil 629 959 } 630 960 961 + // Try witness cache 962 + if wRecords := s.listFromWitness(ctx, NSIDBean); wRecords != nil { 963 + metrics.WitnessCacheHitsTotal.WithLabelValues("bean").Inc() 964 + beans := make([]*models.Bean, 0, len(wRecords)) 965 + for _, wr := range wRecords { 966 + m, err := witnessRecordToMap(wr) 967 + if err != nil { 968 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse bean") 969 + continue 970 + } 971 + bean, err := RecordToBean(m, wr.URI) 972 + if err != nil { 973 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert bean") 974 + continue 975 + } 976 + bean.RKey = wr.RKey 977 + if roasterRef, ok := m["roasterRef"].(string); ok && roasterRef != "" { 978 + if c, err := ResolveATURI(roasterRef); err == nil { 979 + bean.RoasterRKey = c.RKey 980 + } 981 + } 982 + beans = append(beans, bean) 983 + } 984 + s.cache.SetBeans(s.sessionID, beans) 985 + return beans, nil 986 + } 987 + 988 + metrics.WitnessCacheMissesTotal.WithLabelValues("bean").Inc() 989 + 631 990 // Use ListAllRecords to handle pagination automatically 632 991 output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDBean) 633 992 if err != nil { ··· 659 1018 beans = append(beans, bean) 660 1019 } 661 1020 662 - // Update cache 1021 + // Update cache and clear dirty flag since we fetched from PDS 663 1022 s.cache.SetBeans(s.sessionID, beans) 1023 + s.cache.ClearDirty(s.sessionID, NSIDBean) 664 1024 665 1025 return beans, nil 666 1026 } ··· 782 1142 } 783 1143 784 1144 func (s *AtprotoStore) GetRoasterByRKey(ctx context.Context, rkey string) (*models.Roaster, error) { 1145 + // Try witness cache 1146 + if wr := s.getFromWitness(ctx, NSIDRoaster, rkey); wr != nil { 1147 + m, err := witnessRecordToMap(wr) 1148 + if err == nil { 1149 + roaster, err := RecordToRoaster(m, wr.URI) 1150 + if err == nil { 1151 + metrics.WitnessCacheHitsTotal.WithLabelValues("roaster").Inc() 1152 + roaster.RKey = rkey 1153 + return roaster, nil 1154 + } 1155 + } 1156 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert roaster, falling back to PDS") 1157 + } else { 1158 + metrics.WitnessCacheMissesTotal.WithLabelValues("roaster").Inc() 1159 + } 1160 + 785 1161 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 786 1162 Collection: NSIDRoaster, 787 1163 RKey: rkey, ··· 808 1184 return userCache.Roasters, nil 809 1185 } 810 1186 1187 + // Try witness cache 1188 + if wRecords := s.listFromWitness(ctx, NSIDRoaster); wRecords != nil { 1189 + metrics.WitnessCacheHitsTotal.WithLabelValues("roaster").Inc() 1190 + roasters := make([]*models.Roaster, 0, len(wRecords)) 1191 + for _, wr := range wRecords { 1192 + m, err := witnessRecordToMap(wr) 1193 + if err != nil { 1194 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse roaster") 1195 + continue 1196 + } 1197 + roaster, err := RecordToRoaster(m, wr.URI) 1198 + if err != nil { 1199 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert roaster") 1200 + continue 1201 + } 1202 + roaster.RKey = wr.RKey 1203 + roasters = append(roasters, roaster) 1204 + } 1205 + s.cache.SetRoasters(s.sessionID, roasters) 1206 + return roasters, nil 1207 + } 1208 + 1209 + metrics.WitnessCacheMissesTotal.WithLabelValues("roaster").Inc() 1210 + 811 1211 // Use ListAllRecords to handle pagination automatically 812 1212 output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDRoaster) 813 1213 if err != nil { ··· 831 1231 roasters = append(roasters, roaster) 832 1232 } 833 1233 834 - // Update cache 1234 + // Update cache and clear dirty flag since we fetched from PDS 835 1235 s.cache.SetRoasters(s.sessionID, roasters) 1236 + s.cache.ClearDirty(s.sessionID, NSIDRoaster) 836 1237 837 1238 return roasters, nil 838 1239 } ··· 928 1329 } 929 1330 930 1331 func (s *AtprotoStore) GetGrinderByRKey(ctx context.Context, rkey string) (*models.Grinder, error) { 1332 + // Try witness cache 1333 + if wr := s.getFromWitness(ctx, NSIDGrinder, rkey); wr != nil { 1334 + m, err := witnessRecordToMap(wr) 1335 + if err == nil { 1336 + grinder, err := RecordToGrinder(m, wr.URI) 1337 + if err == nil { 1338 + metrics.WitnessCacheHitsTotal.WithLabelValues("grinder").Inc() 1339 + grinder.RKey = rkey 1340 + return grinder, nil 1341 + } 1342 + } 1343 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert grinder, falling back to PDS") 1344 + } else { 1345 + metrics.WitnessCacheMissesTotal.WithLabelValues("grinder").Inc() 1346 + } 1347 + 931 1348 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 932 1349 Collection: NSIDGrinder, 933 1350 RKey: rkey, ··· 954 1371 return userCache.Grinders, nil 955 1372 } 956 1373 1374 + // Try witness cache 1375 + if wRecords := s.listFromWitness(ctx, NSIDGrinder); wRecords != nil { 1376 + metrics.WitnessCacheHitsTotal.WithLabelValues("grinder").Inc() 1377 + grinders := make([]*models.Grinder, 0, len(wRecords)) 1378 + for _, wr := range wRecords { 1379 + m, err := witnessRecordToMap(wr) 1380 + if err != nil { 1381 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse grinder") 1382 + continue 1383 + } 1384 + grinder, err := RecordToGrinder(m, wr.URI) 1385 + if err != nil { 1386 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert grinder") 1387 + continue 1388 + } 1389 + grinder.RKey = wr.RKey 1390 + grinders = append(grinders, grinder) 1391 + } 1392 + s.cache.SetGrinders(s.sessionID, grinders) 1393 + return grinders, nil 1394 + } 1395 + 1396 + metrics.WitnessCacheMissesTotal.WithLabelValues("grinder").Inc() 1397 + 957 1398 // Use ListAllRecords to handle pagination automatically 958 1399 output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDGrinder) 959 1400 if err != nil { ··· 977 1418 grinders = append(grinders, grinder) 978 1419 } 979 1420 980 - // Update cache 1421 + // Update cache and clear dirty flag since we fetched from PDS 981 1422 s.cache.SetGrinders(s.sessionID, grinders) 1423 + s.cache.ClearDirty(s.sessionID, NSIDGrinder) 982 1424 983 1425 return grinders, nil 984 1426 } ··· 1074 1516 } 1075 1517 1076 1518 func (s *AtprotoStore) GetBrewerByRKey(ctx context.Context, rkey string) (*models.Brewer, error) { 1519 + // Try witness cache 1520 + if wr := s.getFromWitness(ctx, NSIDBrewer, rkey); wr != nil { 1521 + m, err := witnessRecordToMap(wr) 1522 + if err == nil { 1523 + brewer, err := RecordToBrewer(m, wr.URI) 1524 + if err == nil { 1525 + metrics.WitnessCacheHitsTotal.WithLabelValues("brewer").Inc() 1526 + brewer.RKey = rkey 1527 + return brewer, nil 1528 + } 1529 + } 1530 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert brewer, falling back to PDS") 1531 + } else { 1532 + metrics.WitnessCacheMissesTotal.WithLabelValues("brewer").Inc() 1533 + } 1534 + 1077 1535 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 1078 1536 Collection: NSIDBrewer, 1079 1537 RKey: rkey, ··· 1100 1558 return userCache.Brewers, nil 1101 1559 } 1102 1560 1561 + // Try witness cache 1562 + if wRecords := s.listFromWitness(ctx, NSIDBrewer); wRecords != nil { 1563 + metrics.WitnessCacheHitsTotal.WithLabelValues("brewer").Inc() 1564 + brewers := make([]*models.Brewer, 0, len(wRecords)) 1565 + for _, wr := range wRecords { 1566 + m, err := witnessRecordToMap(wr) 1567 + if err != nil { 1568 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse brewer") 1569 + continue 1570 + } 1571 + brewer, err := RecordToBrewer(m, wr.URI) 1572 + if err != nil { 1573 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert brewer") 1574 + continue 1575 + } 1576 + brewer.RKey = wr.RKey 1577 + brewers = append(brewers, brewer) 1578 + } 1579 + s.cache.SetBrewers(s.sessionID, brewers) 1580 + return brewers, nil 1581 + } 1582 + 1583 + metrics.WitnessCacheMissesTotal.WithLabelValues("brewer").Inc() 1584 + 1103 1585 // Use ListAllRecords to handle pagination automatically 1104 1586 output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDBrewer) 1105 1587 if err != nil { ··· 1123 1605 brewers = append(brewers, brewer) 1124 1606 } 1125 1607 1126 - // Update cache 1608 + // Update cache and clear dirty flag since we fetched from PDS 1127 1609 s.cache.SetBrewers(s.sessionID, brewers) 1610 + s.cache.ClearDirty(s.sessionID, NSIDBrewer) 1128 1611 1129 1612 return brewers, nil 1130 1613 } ··· 1240 1723 } 1241 1724 1242 1725 func (s *AtprotoStore) GetRecipeByRKey(ctx context.Context, rkey string) (*models.Recipe, error) { 1726 + // Try witness cache 1727 + if wr := s.getFromWitness(ctx, NSIDRecipe, rkey); wr != nil { 1728 + m, err := witnessRecordToMap(wr) 1729 + if err == nil { 1730 + recipe, err := RecordToRecipe(m, wr.URI) 1731 + if err == nil { 1732 + metrics.WitnessCacheHitsTotal.WithLabelValues("recipe").Inc() 1733 + recipe.RKey = rkey 1734 + if brewerRef, ok := m["brewerRef"].(string); ok && brewerRef != "" { 1735 + if c, err := ResolveATURI(brewerRef); err == nil { 1736 + recipe.BrewerRKey = c.RKey 1737 + } 1738 + if brewerWR := s.getWitnessRecordByURI(ctx, brewerRef); brewerWR != nil { 1739 + if brewerMap, err := witnessRecordToMap(brewerWR); err == nil { 1740 + if brewer, err := RecordToBrewer(brewerMap, brewerWR.URI); err == nil { 1741 + brewer.RKey = brewerWR.RKey 1742 + recipe.BrewerObj = brewer 1743 + } 1744 + } 1745 + } 1746 + } 1747 + return recipe, nil 1748 + } 1749 + } 1750 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert recipe, falling back to PDS") 1751 + } else { 1752 + metrics.WitnessCacheMissesTotal.WithLabelValues("recipe").Inc() 1753 + } 1754 + 1243 1755 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 1244 1756 Collection: NSIDRecipe, 1245 1757 RKey: rkey, ··· 1272 1784 1273 1785 // GetRecipeRecordByRKey fetches a recipe by rkey and returns it with its AT Protocol metadata 1274 1786 func (s *AtprotoStore) GetRecipeRecordByRKey(ctx context.Context, rkey string) (*RecipeRecord, error) { 1787 + // Try witness cache 1788 + if wr := s.getFromWitness(ctx, NSIDRecipe, rkey); wr != nil { 1789 + m, err := witnessRecordToMap(wr) 1790 + if err == nil { 1791 + recipe, err := RecordToRecipe(m, wr.URI) 1792 + if err == nil { 1793 + metrics.WitnessCacheHitsTotal.WithLabelValues("recipe").Inc() 1794 + recipe.RKey = rkey 1795 + if brewerRef, ok := m["brewerRef"].(string); ok && brewerRef != "" { 1796 + if c, err := ResolveATURI(brewerRef); err == nil { 1797 + recipe.BrewerRKey = c.RKey 1798 + } 1799 + if brewerWR := s.getWitnessRecordByURI(ctx, brewerRef); brewerWR != nil { 1800 + if brewerMap, err := witnessRecordToMap(brewerWR); err == nil { 1801 + if brewer, err := RecordToBrewer(brewerMap, brewerWR.URI); err == nil { 1802 + brewer.RKey = brewerWR.RKey 1803 + recipe.BrewerObj = brewer 1804 + } 1805 + } 1806 + } 1807 + } 1808 + return &RecipeRecord{ 1809 + Recipe: recipe, 1810 + URI: wr.URI, 1811 + CID: wr.CID, 1812 + }, nil 1813 + } 1814 + } 1815 + log.Warn().Err(err).Str("rkey", rkey).Msg("witness: failed to convert recipe record, falling back to PDS") 1816 + } else { 1817 + metrics.WitnessCacheMissesTotal.WithLabelValues("recipe").Inc() 1818 + } 1819 + 1275 1820 output, err := s.client.GetRecord(ctx, s.did, s.sessionID, &GetRecordInput{ 1276 1821 Collection: NSIDRecipe, 1277 1822 RKey: rkey, ··· 1312 1857 return userCache.Recipes, nil 1313 1858 } 1314 1859 1860 + // Try witness cache 1861 + if wRecords := s.listFromWitness(ctx, NSIDRecipe); wRecords != nil { 1862 + metrics.WitnessCacheHitsTotal.WithLabelValues("recipe").Inc() 1863 + recipes := make([]*models.Recipe, 0, len(wRecords)) 1864 + for _, wr := range wRecords { 1865 + m, err := witnessRecordToMap(wr) 1866 + if err != nil { 1867 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to parse recipe") 1868 + continue 1869 + } 1870 + recipe, err := RecordToRecipe(m, wr.URI) 1871 + if err != nil { 1872 + log.Warn().Err(err).Str("uri", wr.URI).Msg("witness: failed to convert recipe") 1873 + continue 1874 + } 1875 + recipe.RKey = wr.RKey 1876 + if brewerRef, ok := m["brewerRef"].(string); ok && brewerRef != "" { 1877 + if c, err := ResolveATURI(brewerRef); err == nil { 1878 + recipe.BrewerRKey = c.RKey 1879 + } 1880 + } 1881 + recipes = append(recipes, recipe) 1882 + } 1883 + s.cache.SetRecipes(s.sessionID, recipes) 1884 + return recipes, nil 1885 + } 1886 + 1887 + metrics.WitnessCacheMissesTotal.WithLabelValues("recipe").Inc() 1888 + 1315 1889 output, err := s.client.ListAllRecords(ctx, s.did, s.sessionID, NSIDRecipe) 1316 1890 if err != nil { 1317 1891 return nil, fmt.Errorf("failed to list recipe records: %w", err) ··· 1340 1914 recipes = append(recipes, recipe) 1341 1915 } 1342 1916 1917 + // Clear dirty flag since we fetched from PDS 1343 1918 s.cache.SetRecipes(s.sessionID, recipes) 1919 + s.cache.ClearDirty(s.sessionID, NSIDRecipe) 1344 1920 1345 1921 return recipes, nil 1346 1922 }
+33
internal/atproto/witness.go
··· 1 + package atproto 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "time" 7 + ) 8 + 9 + // WitnessRecord is a record retrieved from the witness cache (firehose index). 10 + type WitnessRecord struct { 11 + URI string 12 + DID string 13 + Collection string 14 + RKey string 15 + CID string 16 + Record json.RawMessage 17 + IndexedAt time.Time 18 + CreatedAt time.Time 19 + } 20 + 21 + // WitnessCache is a read-only view of the firehose index that lets AtprotoStore 22 + // serve reads from the locally-indexed SQLite database instead of the PDS. 23 + // Implementations must be safe for concurrent use. 24 + type WitnessCache interface { 25 + // GetWitnessRecord retrieves a single record by AT-URI. 26 + // Returns (nil, nil) when the record is not present in the cache. 27 + GetWitnessRecord(ctx context.Context, uri string) (*WitnessRecord, error) 28 + 29 + // ListWitnessRecords returns all cached records for a DID+collection pair, 30 + // ordered by created_at descending. 31 + // Returns an empty (non-nil) slice when none are found. 32 + ListWitnessRecords(ctx context.Context, did, collection string) ([]*WitnessRecord, error) 33 + }
+10
internal/feed/service.go
··· 225 225 return "" 226 226 } 227 227 228 + // InvalidatePublicFeedCache clears the cached public feed so the next request 229 + // re-queries the firehose index. Call this after any record is created, updated, 230 + // or deleted so unauthenticated users don't see stale content. 231 + func (s *Service) InvalidatePublicFeedCache() { 232 + s.cache.mu.Lock() 233 + s.cache.items = nil 234 + s.cache.expiresAt = time.Time{} 235 + s.cache.mu.Unlock() 236 + } 237 + 228 238 // GetCachedPublicFeed returns cached feed items for unauthenticated users. 229 239 // It returns up to PublicFeedLimit items from the cache, refreshing if expired. 230 240 // The cache stores PublicFeedCacheSize items internally but only returns PublicFeedLimit.
+59
internal/firehose/index.go
··· 105 105 CREATE INDEX IF NOT EXISTS idx_records_created ON records(created_at DESC); 106 106 CREATE INDEX IF NOT EXISTS idx_records_did ON records(did); 107 107 CREATE INDEX IF NOT EXISTS idx_records_coll_created ON records(collection, created_at DESC); 108 + CREATE INDEX IF NOT EXISTS idx_records_did_coll ON records(did, collection, created_at DESC); 108 109 109 110 CREATE TABLE IF NOT EXISTS meta ( 110 111 key TEXT PRIMARY KEY, ··· 276 277 return idx, nil 277 278 } 278 279 280 + // Compile-time check: FeedIndex must satisfy the atproto.WitnessCache interface. 281 + var _ atproto.WitnessCache = (*FeedIndex)(nil) 282 + 279 283 // DB returns the underlying database connection for shared use by other stores. 280 284 func (idx *FeedIndex) DB() *sql.DB { 281 285 return idx.db 286 + } 287 + 288 + // GetWitnessRecord retrieves a single record by AT-URI from the index. 289 + // Returns (nil, nil) when the record is not found. 290 + func (idx *FeedIndex) GetWitnessRecord(ctx context.Context, uri string) (*atproto.WitnessRecord, error) { 291 + var rec atproto.WitnessRecord 292 + var recordStr, indexedAtStr, createdAtStr string 293 + 294 + err := idx.db.QueryRowContext(ctx, ` 295 + SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at 296 + FROM records WHERE uri = ? 297 + `, uri).Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey, 298 + &recordStr, &rec.CID, &indexedAtStr, &createdAtStr) 299 + if err == sql.ErrNoRows { 300 + return nil, nil 301 + } 302 + if err != nil { 303 + return nil, err 304 + } 305 + rec.Record = json.RawMessage(recordStr) 306 + rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr) 307 + rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr) 308 + return &rec, nil 309 + } 310 + 311 + // ListWitnessRecords returns all indexed records for a DID+collection pair, 312 + // ordered by created_at descending. Returns an empty slice when none are found. 313 + func (idx *FeedIndex) ListWitnessRecords(ctx context.Context, did, collection string) ([]*atproto.WitnessRecord, error) { 314 + rows, err := idx.db.QueryContext(ctx, ` 315 + SELECT uri, did, collection, rkey, record, cid, indexed_at, created_at 316 + FROM records WHERE did = ? AND collection = ? 317 + ORDER BY created_at DESC 318 + `, did, collection) 319 + if err != nil { 320 + return nil, err 321 + } 322 + defer rows.Close() 323 + 324 + records := make([]*atproto.WitnessRecord, 0) 325 + for rows.Next() { 326 + var rec atproto.WitnessRecord 327 + var recordStr, indexedAtStr, createdAtStr string 328 + if err := rows.Scan(&rec.URI, &rec.DID, &rec.Collection, &rec.RKey, 329 + &recordStr, &rec.CID, &indexedAtStr, &createdAtStr); err != nil { 330 + continue 331 + } 332 + rec.Record = json.RawMessage(recordStr) 333 + rec.IndexedAt, _ = time.Parse(time.RFC3339Nano, indexedAtStr) 334 + rec.CreatedAt, _ = time.Parse(time.RFC3339Nano, createdAtStr) 335 + records = append(records, &rec) 336 + } 337 + if err := rows.Err(); err != nil { 338 + return nil, err 339 + } 340 + return records, nil 282 341 } 283 342 284 343 // Close closes the index database
+4
internal/handlers/brew.go
··· 579 579 return 580 580 } 581 581 582 + h.invalidateFeedCache() 583 + 582 584 // Redirect to brew list 583 585 w.Header().Set("HX-Redirect", "/brews") 584 586 w.WriteHeader(http.StatusOK) ··· 673 675 handleStoreError(w, err, "Failed to update brew") 674 676 return 675 677 } 678 + 679 + h.invalidateFeedCache() 676 680 677 681 // Redirect to brew list 678 682 w.Header().Set("HX-Redirect", "/brews")
+8
internal/handlers/entities.go
··· 230 230 return 231 231 } 232 232 233 + h.invalidateFeedCache() 233 234 writeJSON(w, bean, "bean") 234 235 } 235 236 ··· 273 274 return 274 275 } 275 276 277 + h.invalidateFeedCache() 276 278 writeJSON(w, roaster, "roaster") 277 279 } 278 280 ··· 366 368 return 367 369 } 368 370 371 + h.invalidateFeedCache() 369 372 writeJSON(w, bean, "bean") 370 373 } 371 374 ··· 429 432 return 430 433 } 431 434 435 + h.invalidateFeedCache() 432 436 writeJSON(w, roaster, "roaster") 433 437 } 434 438 ··· 482 486 return 483 487 } 484 488 489 + h.invalidateFeedCache() 485 490 writeJSON(w, grinder, "grinder") 486 491 } 487 492 ··· 536 541 return 537 542 } 538 543 544 + h.invalidateFeedCache() 539 545 writeJSON(w, grinder, "grinder") 540 546 } 541 547 ··· 588 594 return 589 595 } 590 596 597 + h.invalidateFeedCache() 591 598 writeJSON(w, brewer, "brewer") 592 599 } 593 600 ··· 641 648 return 642 649 } 643 650 651 + h.invalidateFeedCache() 644 652 writeJSON(w, brewer, "brewer") 645 653 } 646 654
+19 -1
internal/handlers/handlers.go
··· 44 44 config Config 45 45 feedService *feed.Service 46 46 feedRegistry *feed.Registry 47 - feedIndex *firehose.FeedIndex 47 + feedIndex *firehose.FeedIndex 48 + witnessCache atproto.WitnessCache 48 49 49 50 // Moderation dependencies (optional) 50 51 moderationService *moderation.Service ··· 83 84 h.feedIndex = idx 84 85 } 85 86 87 + // SetWitnessCache configures the handler to use the witness cache for cache-first reads. 88 + func (h *Handler) SetWitnessCache(wc atproto.WitnessCache) { 89 + h.witnessCache = wc 90 + } 91 + 86 92 // SetModeration configures the handler with moderation service and store 87 93 func (h *Handler) SetModeration(svc *moderation.Service, store moderation.Store) { 88 94 h.moderationService = svc ··· 98 104 } 99 105 100 106 107 + // invalidateFeedCache clears the public feed cache after a mutation. 108 + func (h *Handler) invalidateFeedCache() { 109 + if h.feedService != nil { 110 + h.feedService.InvalidatePublicFeedCache() 111 + } 112 + } 113 + 101 114 // validateRKey validates and returns an rkey from a path parameter. 102 115 // Returns the rkey if valid, or writes an error response and returns empty string if invalid. 103 116 func validateRKey(w http.ResponseWriter, rkey string) string { ··· 208 221 } 209 222 210 223 // Create user-scoped atproto store with injected cache 224 + if h.witnessCache != nil { 225 + store := atproto.NewAtprotoStoreWithWitness(h.atprotoClient, did, sessionID, h.sessionCache, h.witnessCache) 226 + return store, true 227 + } 211 228 store := atproto.NewAtprotoStore(h.atprotoClient, did, sessionID, h.sessionCache) 212 229 return store, true 213 230 } ··· 249 266 handleStoreError(w, err, "Failed to delete "+entityName) 250 267 return 251 268 } 269 + h.invalidateFeedCache() 252 270 w.WriteHeader(http.StatusOK) 253 271 } 254 272
+4
internal/handlers/recipe.go
··· 67 67 return 68 68 } 69 69 70 + h.invalidateFeedCache() 70 71 writeJSON(w, recipe, "recipe") 71 72 } 72 73 ··· 128 129 return 129 130 } 130 131 132 + h.invalidateFeedCache() 131 133 w.WriteHeader(http.StatusOK) 132 134 } 133 135 ··· 150 152 return 151 153 } 152 154 155 + h.invalidateFeedCache() 153 156 w.WriteHeader(http.StatusOK) 154 157 } 155 158 ··· 247 250 return 248 251 } 249 252 253 + h.invalidateFeedCache() 250 254 writeJSON(w, recipe, "recipe") 251 255 } 252 256
+13
internal/metrics/metrics.go
··· 45 45 }, []string{"method", "collection"}) 46 46 ) 47 47 48 + // Witness cache metrics 49 + var ( 50 + WitnessCacheHitsTotal = promauto.NewCounterVec(prometheus.CounterOpts{ 51 + Name: "arabica_witness_cache_hits_total", 52 + Help: "Total witness cache hits (PDS request avoided)", 53 + }, []string{"collection"}) 54 + 55 + WitnessCacheMissesTotal = promauto.NewCounterVec(prometheus.CounterOpts{ 56 + Name: "arabica_witness_cache_misses_total", 57 + Help: "Total witness cache misses (fell back to PDS)", 58 + }, []string{"collection"}) 59 + ) 60 + 48 61 // Feed metrics 49 62 var ( 50 63 FeedCacheHitsTotal = promauto.NewCounter(prometheus.CounterOpts{