Coffee journaling on ATProto (alpha) alpha.arabica.social
coffee
17
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: handle resolution cache race condition #19

open opened by pdewey.com targeting main from push-rkwvvmrkprrl
Labels

None yet.

assignee

None yet.

Participants 1
AT URI
at://did:plc:hm5f3dnm6jdhrc55qp2npdja/sh.tangled.repo.pull/3mlc7tacyow22
+236 -44
Diff #0
+33
internal/atproto/public_client.go
··· 92 92 return did, nil 93 93 } 94 94 95 + // InvalidateHandle removes a handle from the resolver cache so the next 96 + // ResolveHandle call refetches from the directory. Called when a firehose 97 + // identity event signals that a handle's DID mapping has changed. 98 + func (c *PublicClient) InvalidateHandle(handle string) { 99 + if handle == "" { 100 + return 101 + } 102 + c.handleMu.Lock() 103 + delete(c.handleCache, handle) 104 + c.handleMu.Unlock() 105 + } 106 + 107 + // InvalidateDID drops any cached entries pointing at this DID โ€” both the 108 + // PDS endpoint cache and any handleโ†’DID mappings whose resolved DID is the 109 + // given one. Used when a DID's repo is gone (account deleted/takendown) or 110 + // when a handle has been reassigned away from this DID. 111 + func (c *PublicClient) InvalidateDID(did string) { 112 + if did == "" { 113 + return 114 + } 115 + c.pdsMu.Lock() 116 + delete(c.pdsCache, did) 117 + c.pdsMu.Unlock() 118 + 119 + c.handleMu.Lock() 120 + for h, v := range c.handleCache { 121 + if v.value == did { 122 + delete(c.handleCache, h) 123 + } 124 + } 125 + c.handleMu.Unlock() 126 + } 127 + 95 128 // PublicListRecordsOutput represents the response from public listRecords API. 96 129 type PublicListRecordsOutput struct { 97 130 Records []PublicRecordEntry `json:"records"`
+143 -39
internal/firehose/index.go
··· 128 128 expires_at TEXT NOT NULL 129 129 ); 130 130 131 + CREATE TABLE IF NOT EXISTS did_by_handle ( 132 + handle TEXT PRIMARY KEY, 133 + did TEXT NOT NULL, 134 + updated_at TEXT NOT NULL 135 + ); 136 + CREATE INDEX IF NOT EXISTS idx_did_by_handle_did ON did_by_handle(did); 137 + 131 138 CREATE TABLE IF NOT EXISTS likes ( 132 139 subject_uri TEXT NOT NULL, 133 140 actor_did TEXT NOT NULL, ··· 289 296 profileCache: make(map[string]*CachedProfile), 290 297 } 291 298 299 + // One-time backfill: populate did_by_handle from any pre-existing profile rows 300 + // so handle resolution works for users observed before this table existed. 301 + if err := idx.backfillHandleIndex(); err != nil { 302 + log.Warn().Err(err).Msg("did_by_handle backfill failed; lookups will populate lazily") 303 + } 304 + 292 305 // If the database already has records from a previous run, mark ready immediately 293 306 // so the feed is served from persisted data while the firehose reconnects. 294 307 var count int ··· 299 312 return idx, nil 300 313 } 301 314 315 + // backfillHandleIndex populates did_by_handle from the profiles table. Idempotent. 316 + // Iterates every cached profile and inserts (handle, did) โ€” last writer wins, 317 + // matching the live storeProfile semantics, so a handle that existed on multiple 318 + // DIDs resolves to whichever profile was inserted most recently in the iteration. 319 + func (idx *FeedIndex) backfillHandleIndex() error { 320 + var n int 321 + if err := idx.db.QueryRow(`SELECT COUNT(*) FROM did_by_handle`).Scan(&n); err == nil && n > 0 { 322 + return nil 323 + } 324 + 325 + rows, err := idx.db.Query(`SELECT did, data FROM profiles`) 326 + if err != nil { 327 + return err 328 + } 329 + defer rows.Close() 330 + 331 + now := time.Now().Format(time.RFC3339Nano) 332 + for rows.Next() { 333 + var did, dataStr string 334 + if err := rows.Scan(&did, &dataStr); err != nil { 335 + continue 336 + } 337 + cached := &CachedProfile{} 338 + if err := json.Unmarshal([]byte(dataStr), cached); err != nil || cached.Profile == nil || cached.Profile.Handle == "" { 339 + continue 340 + } 341 + _, _ = idx.db.Exec( 342 + `INSERT OR REPLACE INTO did_by_handle (handle, did, updated_at) VALUES (?, ?, ?)`, 343 + cached.Profile.Handle, did, now) 344 + } 345 + return rows.Err() 346 + } 347 + 302 348 // Compile-time check: FeedIndex must satisfy the atproto.WitnessCache interface. 303 349 var _ atproto.WitnessCache = (*FeedIndex)(nil) 304 350 ··· 490 536 {`DELETE FROM notifications WHERE target_did = ? OR actor_did = ?`, []any{did, did}}, 491 537 {`DELETE FROM notifications_meta WHERE target_did = ?`, []any{did}}, 492 538 {`DELETE FROM profiles WHERE did = ?`, []any{did}}, 539 + {`DELETE FROM did_by_handle WHERE did = ?`, []any{did}}, 493 540 {`DELETE FROM known_dids WHERE did = ?`, []any{did}}, 494 541 {`DELETE FROM registered_dids WHERE did = ?`, []any{did}}, 495 542 {`DELETE FROM backfilled WHERE did = ?`, []any{did}}, ··· 1124 1171 return profile, nil 1125 1172 } 1126 1173 1127 - // storeProfile writes a profile to both in-memory and persistent caches. 1174 + // storeProfile writes a profile to both in-memory and persistent caches, and 1175 + // maintains the did_by_handle index so handle lookups stay accurate across 1176 + // handle changes and handle reassignment between DIDs. 1128 1177 func (idx *FeedIndex) storeProfile(ctx context.Context, did string, profile *atproto.Profile) { 1129 1178 now := time.Now() 1130 1179 cached := &CachedProfile{ ··· 1140 1189 data, _ := json.Marshal(cached) 1141 1190 _, _ = idx.db.ExecContext(ctx, `INSERT OR REPLACE INTO profiles (did, data, expires_at) VALUES (?, ?, ?)`, 1142 1191 did, string(data), cached.ExpiresAt.Format(time.RFC3339Nano)) 1143 - } 1144 1192 1145 - // GetDIDByHandle looks up a DID from the profile cache by handle. 1146 - // Returns the DID and true if found, or empty string and false if not cached. 1147 - // This avoids a ResolveHandle API call for known Arabica users. 1148 - func (idx *FeedIndex) GetDIDByHandle(ctx context.Context, handle string) (string, bool) { 1149 - // Check in-memory cache first 1150 - idx.profileCacheMu.RLock() 1151 - for did, cached := range idx.profileCache { 1152 - if cached.Profile != nil && cached.Profile.Handle == handle && time.Now().Before(cached.ExpiresAt) { 1153 - idx.profileCacheMu.RUnlock() 1154 - return did, true 1155 - } 1193 + if profile != nil && profile.Handle != "" { 1194 + // Drop any prior row pointing this DID at a different handle (handle change). 1195 + _, _ = idx.db.ExecContext(ctx, 1196 + `DELETE FROM did_by_handle WHERE did = ? AND handle != ?`, did, profile.Handle) 1197 + // Last writer wins on handle โ€” this naturally resolves handle reassignment 1198 + // from an old DID to a new one, since the new profile's INSERT OR REPLACE 1199 + // overwrites the old DID's mapping. 1200 + _, _ = idx.db.ExecContext(ctx, 1201 + `INSERT OR REPLACE INTO did_by_handle (handle, did, updated_at) VALUES (?, ?, ?)`, 1202 + profile.Handle, did, now.Format(time.RFC3339Nano)) 1156 1203 } 1157 - idx.profileCacheMu.RUnlock() 1204 + } 1158 1205 1159 - // Check persistent store 1160 - rows, err := idx.db.QueryContext(ctx, `SELECT did, data FROM profiles`) 1161 - if err != nil { 1206 + // GetDIDByHandle looks up a DID from the handle index. Returns the DID and 1207 + // true if found, or empty string and false if not indexed. 1208 + // 1209 + // Backed by the did_by_handle table โ€” last-writer-wins, so a handle that has 1210 + // been reassigned to a new DID resolves to that new DID once the new profile 1211 + // is observed (via the firehose profile watcher or a GetProfile call). 1212 + func (idx *FeedIndex) GetDIDByHandle(ctx context.Context, handle string) (string, bool) { 1213 + var did string 1214 + err := idx.db.QueryRowContext(ctx, 1215 + `SELECT did FROM did_by_handle WHERE handle = ?`, handle).Scan(&did) 1216 + if err != nil || did == "" { 1162 1217 return "", false 1163 1218 } 1164 - defer rows.Close() 1165 - 1166 - for rows.Next() { 1167 - var did, dataStr string 1168 - if err := rows.Scan(&did, &dataStr); err != nil { 1169 - continue 1170 - } 1171 - cached := &CachedProfile{} 1172 - if err := json.Unmarshal([]byte(dataStr), cached); err != nil || cached.Profile == nil { 1173 - continue 1174 - } 1175 - if cached.Profile.Handle == handle { 1176 - // Promote to in-memory cache 1177 - cached.ExpiresAt = time.Now().Add(idx.profileTTL) 1178 - idx.profileCacheMu.Lock() 1179 - idx.profileCache[did] = cached 1180 - idx.profileCacheMu.Unlock() 1181 - return did, true 1182 - } 1183 - } 1184 - 1185 - return "", false 1219 + return did, true 1186 1220 } 1187 1221 1188 1222 // InvalidateProfile removes a DID's profile from both the in-memory and persistent ··· 1193 1227 idx.profileCacheMu.Unlock() 1194 1228 1195 1229 _, _ = idx.db.Exec(`DELETE FROM profiles WHERE did = ?`, did) 1230 + _, _ = idx.db.Exec(`DELETE FROM did_by_handle WHERE did = ?`, did) 1196 1231 } 1197 1232 1198 1233 // RefreshProfile fetches a profile from the API and stores it in both caches. ··· 1208 1243 idx.storeProfile(ctx, did, profile) 1209 1244 } 1210 1245 1246 + // InvalidatePublicCachesForDID drops the public client's cached PDS endpoint 1247 + // and any handleโ†’DID mappings pointing at this DID. Used when an account is 1248 + // deleted/takendown so subsequent lookups don't keep hitting the tombstoned DID. 1249 + func (idx *FeedIndex) InvalidatePublicCachesForDID(did string) { 1250 + if idx.publicClient != nil { 1251 + idx.publicClient.InvalidateDID(did) 1252 + } 1253 + } 1254 + 1255 + // OnIdentityEvent reconciles caches when a Jetstream identity event reports 1256 + // that a DID's handle has changed. It is the only path through which a handle 1257 + // can be reassigned from one DID to another (handle release + reclaim by a 1258 + // different account), so this is where stale mappings must be evicted. 1259 + // 1260 + // Steps: 1261 + // 1. Look up this DID's previously cached handle (the old handle). 1262 + // 2. Find any *other* DID whose cached profile still claims the new handle โ€” 1263 + // that's the prior owner; invalidate its profile and resolver entries. 1264 + // 3. Drop the old handle from the resolver cache (it may now resolve to 1265 + // someone else, or to nothing). 1266 + // 4. Drop the new handle from the resolver cache so the next ResolveHandle 1267 + // re-fetches from the directory. 1268 + // 5. Refresh this DID's profile via the API; storeProfile then writes the 1269 + // authoritative did_by_handle row. 1270 + func (idx *FeedIndex) OnIdentityEvent(ctx context.Context, did, newHandle string) { 1271 + var oldHandle string 1272 + idx.profileCacheMu.RLock() 1273 + if cached, ok := idx.profileCache[did]; ok && cached.Profile != nil { 1274 + oldHandle = cached.Profile.Handle 1275 + } 1276 + idx.profileCacheMu.RUnlock() 1277 + if oldHandle == "" { 1278 + // Fall back to persistent store. 1279 + var dataStr string 1280 + if err := idx.db.QueryRowContext(ctx, `SELECT data FROM profiles WHERE did = ?`, did).Scan(&dataStr); err == nil { 1281 + cached := &CachedProfile{} 1282 + if err := json.Unmarshal([]byte(dataStr), cached); err == nil && cached.Profile != nil { 1283 + oldHandle = cached.Profile.Handle 1284 + } 1285 + } 1286 + } 1287 + 1288 + if newHandle != "" { 1289 + // Evict any prior owner of newHandle (other than `did` itself). 1290 + var priorDID string 1291 + err := idx.db.QueryRowContext(ctx, 1292 + `SELECT did FROM did_by_handle WHERE handle = ? AND did != ?`, newHandle, did).Scan(&priorDID) 1293 + if err == nil && priorDID != "" { 1294 + log.Warn(). 1295 + Str("handle", newHandle). 1296 + Str("prior_did", priorDID). 1297 + Str("new_did", did). 1298 + Msg("identity event: handle reassigned, invalidating prior owner") 1299 + idx.InvalidateProfile(priorDID) 1300 + idx.publicClient.InvalidateDID(priorDID) 1301 + } 1302 + } 1303 + 1304 + if oldHandle != "" && oldHandle != newHandle { 1305 + idx.publicClient.InvalidateHandle(oldHandle) 1306 + } 1307 + if newHandle != "" { 1308 + idx.publicClient.InvalidateHandle(newHandle) 1309 + } 1310 + idx.publicClient.InvalidateDID(did) 1311 + 1312 + idx.RefreshProfile(ctx, did) 1313 + } 1314 + 1211 1315 // GetKnownDIDs returns all DIDs that have created Arabica records 1212 1316 func (idx *FeedIndex) GetKnownDIDs(ctx context.Context) ([]string, error) { 1213 1317 rows, err := idx.db.QueryContext(ctx, `SELECT did FROM known_dids`)
+7 -5
internal/firehose/profile_watcher.go
··· 308 308 } 309 309 310 310 case "identity": 311 - // Handle change or PDS migration โ€” refresh the cached profile so handle 312 - // resolution stays accurate. Profile-commit events don't fire on handle 313 - // changes, so this is the only signal we get. 314 - pw.index.RefreshProfile(context.Background(), event.DID) 311 + // Handle change, handle reassignment, or PDS migration. OnIdentityEvent 312 + // reconciles the profile cache, did_by_handle index, and resolver caches 313 + // before refreshing โ€” this is also the only signal we get when a handle 314 + // moves from an abandoned DID to a new one. 315 315 handle := "" 316 316 if event.Identity != nil { 317 317 handle = event.Identity.Handle 318 318 } 319 - log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update, refreshed profile") 319 + pw.index.OnIdentityEvent(context.Background(), event.DID, handle) 320 + log.Info().Str("did", event.DID).Str("handle", handle).Msg("profile watcher: identity update reconciled") 320 321 321 322 case "account": 322 323 if event.Account == nil { ··· 336 337 log.Error().Err(err).Str("did", event.DID).Str("status", status).Msg("profile watcher: failed to delete user data") 337 338 return 338 339 } 340 + pw.index.InvalidatePublicCachesForDID(event.DID) 339 341 log.Warn().Str("did", event.DID).Str("status", status).Msg("profile watcher: purged all data for account") 340 342 pw.Unwatch(event.DID) 341 343 }
+51
internal/handlers/admin.go
··· 791 791 log.Error().Err(err).Str("did", didStr).Msg("witness export: encode failed") 792 792 } 793 793 } 794 + 795 + // HandleAdminPurgeDID removes every trace of a DID from the witness cache: 796 + // records, likes, comments (including ones targeting this DID's records), 797 + // notifications, profile cache, did_by_handle index, known/registered/backfilled 798 + // tracking, and user settings. Moderation tables are preserved as evidence. 799 + // 800 + // Required when an account orphans its data โ€” e.g. the user's PDS goes away 801 + // without the firehose ever emitting a deleted/takendown account event, so the 802 + // stale records sit in the cache forever. Auth and admin checks are handled by 803 + // RequireAdmin. 804 + func (h *Handler) HandleAdminPurgeDID(w http.ResponseWriter, r *http.Request) { 805 + rawDID := strings.TrimSpace(r.URL.Query().Get("did")) 806 + if rawDID == "" { 807 + // Form posts may put it in the body. 808 + if err := r.ParseForm(); err == nil { 809 + rawDID = strings.TrimSpace(r.FormValue("did")) 810 + } 811 + } 812 + if rawDID == "" { 813 + http.Error(w, "missing 'did' parameter", http.StatusBadRequest) 814 + return 815 + } 816 + did, err := syntax.ParseDID(rawDID) 817 + if err != nil { 818 + http.Error(w, fmt.Sprintf("invalid DID: %v", err), http.StatusBadRequest) 819 + return 820 + } 821 + if h.feedIndex == nil { 822 + http.Error(w, "feed index not configured", http.StatusServiceUnavailable) 823 + return 824 + } 825 + 826 + didStr := did.String() 827 + actor, _ := atproto.GetAuthenticatedDID(r.Context()) 828 + 829 + if err := h.feedIndex.DeleteAllByDID(r.Context(), didStr); err != nil { 830 + log.Error().Err(err).Str("did", didStr).Str("actor", actor).Msg("admin purge: DeleteAllByDID failed") 831 + http.Error(w, "purge failed", http.StatusInternalServerError) 832 + return 833 + } 834 + h.feedIndex.InvalidatePublicCachesForDID(didStr) 835 + 836 + log.Warn().Str("did", didStr).Str("actor", actor).Msg("admin purge: removed all witness data for DID") 837 + 838 + w.Header().Set("Content-Type", "application/json") 839 + _ = json.NewEncoder(w).Encode(map[string]any{ 840 + "did": didStr, 841 + "purged": true, 842 + "purgedAt": time.Now().UTC(), 843 + }) 844 + }
+2
internal/routing/routing.go
··· 198 198 middleware.RequireHTMXMiddleware(http.HandlerFunc(h.HandleAdminStats)))) 199 199 mux.Handle("GET /_mod/export", middleware.RequireAdmin(modSvc, 200 200 http.HandlerFunc(h.HandleAdminExportDID))) 201 + mux.Handle("POST /_mod/purge", cop.Handler( 202 + middleware.RequireAdmin(modSvc, http.HandlerFunc(h.HandleAdminPurgeDID)))) 201 203 202 204 // Static files (must come after specific routes) 203 205 fs := http.FileServer(http.Dir("static"))

History

1 round 0 comments
sign up or login to add to the discussion
pdewey.com submitted #0
1 commit
expand
fix: handle resolution cache race condition
merge conflicts detected
expand
  • internal/firehose/index.go:128
  • internal/firehose/profile_watcher.go:308
  • internal/handlers/admin.go:791
  • internal/routing/routing.go:198
expand 0 comments