this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

redis-cached identity directory (#499)

This exact code is already in the repo, being used for automod. I'm
doing this PR for review from @whyrusleeping and/or @ericvolp12, and any
code cleanup or API changes. After this merges, I will update automod to
use this package in a separate PR.

Thee are no tests yet. Could probably use
https://github.com/go-redis/redismock plus the exiting directory mock
for testing.

authored by

bnewbold and committed by
GitHub
81c10d1a aa76a194

+372
+4
atproto/identity/redisdir/doc.go
··· 1 + /* 2 + Identity Directory implementation with tiered caching, using Redis. 3 + */ 4 + package redisdir
+36
atproto/identity/redisdir/metrics.go
··· 1 + package redisdir 2 + 3 + import ( 4 + "github.com/prometheus/client_golang/prometheus" 5 + "github.com/prometheus/client_golang/prometheus/promauto" 6 + ) 7 + 8 + var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 9 + Name: "atproto_redis_directory_handle_cache_hits", 10 + Help: "Number of cache hits for ATProto handle lookups", 11 + }) 12 + 13 + var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 14 + Name: "atproto_redis_directory_handle_cache_misses", 15 + Help: "Number of cache misses for ATProto handle lookups", 16 + }) 17 + 18 + var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 19 + Name: "atproto_redis_directory_identity_cache_hits", 20 + Help: "Number of cache hits for ATProto identity lookups", 21 + }) 22 + 23 + var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 24 + Name: "atproto_redis_directory_identity_cache_misses", 25 + Help: "Number of cache misses for ATProto identity lookups", 26 + }) 27 + 28 + var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 29 + Name: "atproto_redis_directory_identity_requests_coalesced", 30 + Help: "Number of identity requests coalesced", 31 + }) 32 + 33 + var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 34 + Name: "atproto_redis_directory_handle_requests_coalesced", 35 + Help: "Number of handle requests coalesced", 36 + })
+332
atproto/identity/redisdir/redis_directory.go
··· 1 + package redisdir 2 + 3 + import ( 4 + "context" 5 + "fmt" 6 + "sync" 7 + "time" 8 + 9 + "github.com/bluesky-social/indigo/atproto/identity" 10 + "github.com/bluesky-social/indigo/atproto/syntax" 11 + 12 + "github.com/go-redis/cache/v9" 13 + "github.com/redis/go-redis/v9" 14 + ) 15 + 16 + // prefix string for all the Redis keys this cache uses 17 + var redisDirPrefix string = "dir/" 18 + 19 + // Uses redis as a cache for identity lookups. 20 + // 21 + // Includes an in-process LRU cache as well (provided by the redis client library), for hot key (identities). 22 + type RedisDirectory struct { 23 + Inner identity.Directory 24 + ErrTTL time.Duration 25 + HitTTL time.Duration 26 + 27 + handleCache *cache.Cache 28 + identityCache *cache.Cache 29 + didLookupChans sync.Map 30 + handleLookupChans sync.Map 31 + } 32 + 33 + type handleEntry struct { 34 + Updated time.Time 35 + DID syntax.DID 36 + Err error 37 + } 38 + 39 + type identityEntry struct { 40 + Updated time.Time 41 + Identity *identity.Identity 42 + Err error 43 + } 44 + 45 + var _ identity.Directory = (*RedisDirectory)(nil) 46 + 47 + // Creates a new caching `identity.Directory` wrapper around an existing directory, using Redis and in-process LRU for caching. 48 + // 49 + // `redisURL` contains all the redis connection config options. 50 + // `hitTTL` and `errTTL` define how long successful and errored identity metadata should be cached (respectively). errTTL is expected to be shorted than hitTTL. 51 + // `lruSize` is the size of the in-process cache, for each of the handle and identity caches. 10000 is a reasonable default. 52 + func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL time.Duration, lruSize int) (*RedisDirectory, error) { 53 + opt, err := redis.ParseURL(redisURL) 54 + if err != nil { 55 + return nil, err 56 + } 57 + rdb := redis.NewClient(opt) 58 + // check redis connection 59 + _, err = rdb.Ping(context.TODO()).Result() 60 + if err != nil { 61 + return nil, err 62 + } 63 + handleCache := cache.New(&cache.Options{ 64 + Redis: rdb, 65 + LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 66 + }) 67 + identityCache := cache.New(&cache.Options{ 68 + Redis: rdb, 69 + LocalCache: cache.NewTinyLFU(lruSize, hitTTL), 70 + }) 71 + return &RedisDirectory{ 72 + Inner: inner, 73 + ErrTTL: errTTL, 74 + HitTTL: hitTTL, 75 + handleCache: handleCache, 76 + identityCache: identityCache, 77 + }, nil 78 + } 79 + 80 + func (d *RedisDirectory) isHandleStale(e *handleEntry) bool { 81 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 82 + return true 83 + } 84 + return false 85 + } 86 + 87 + func (d *RedisDirectory) isIdentityStale(e *identityEntry) bool { 88 + if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 89 + return true 90 + } 91 + return false 92 + } 93 + 94 + func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*handleEntry, error) { 95 + h = h.Normalize() 96 + ident, err := d.Inner.LookupHandle(ctx, h) 97 + if err != nil { 98 + he := handleEntry{ 99 + Updated: time.Now(), 100 + DID: "", 101 + Err: err, 102 + } 103 + err = d.handleCache.Set(&cache.Item{ 104 + Ctx: ctx, 105 + Key: redisDirPrefix + h.String(), 106 + Value: he, 107 + TTL: d.ErrTTL, 108 + }) 109 + if err != nil { 110 + return nil, err 111 + } 112 + return &he, nil 113 + } 114 + 115 + ident.ParsedPublicKey = nil 116 + entry := identityEntry{ 117 + Updated: time.Now(), 118 + Identity: ident, 119 + Err: nil, 120 + } 121 + he := handleEntry{ 122 + Updated: time.Now(), 123 + DID: ident.DID, 124 + Err: nil, 125 + } 126 + 127 + err = d.identityCache.Set(&cache.Item{ 128 + Ctx: ctx, 129 + Key: redisDirPrefix + ident.DID.String(), 130 + Value: entry, 131 + TTL: d.HitTTL, 132 + }) 133 + if err != nil { 134 + return nil, err 135 + } 136 + err = d.handleCache.Set(&cache.Item{ 137 + Ctx: ctx, 138 + Key: redisDirPrefix + h.String(), 139 + Value: he, 140 + TTL: d.HitTTL, 141 + }) 142 + if err != nil { 143 + return nil, err 144 + } 145 + return &he, nil 146 + } 147 + 148 + func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 149 + var entry handleEntry 150 + err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) 151 + if err != nil && err != cache.ErrCacheMiss { 152 + return "", err 153 + } 154 + if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { 155 + handleCacheHits.Inc() 156 + return entry.DID, entry.Err 157 + } 158 + handleCacheMisses.Inc() 159 + 160 + // Coalesce multiple requests for the same Handle 161 + res := make(chan struct{}) 162 + val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 163 + if loaded { 164 + handleRequestsCoalesced.Inc() 165 + // Wait for the result from the pending request 166 + select { 167 + case <-val.(chan struct{}): 168 + // The result should now be in the cache 169 + err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), entry) 170 + if err != nil && err != cache.ErrCacheMiss { 171 + return "", err 172 + } 173 + if err != cache.ErrCacheMiss && !d.isHandleStale(&entry) { 174 + return entry.DID, entry.Err 175 + } 176 + return "", fmt.Errorf("identity not found in cache after coalesce returned") 177 + case <-ctx.Done(): 178 + return "", ctx.Err() 179 + } 180 + } 181 + 182 + var did syntax.DID 183 + // Update the Handle Entry from PLC and cache the result 184 + newEntry, err := d.updateHandle(ctx, h) 185 + if err == nil && newEntry != nil { 186 + did = newEntry.DID 187 + } 188 + // Cleanup the coalesce map and close the results channel 189 + d.handleLookupChans.Delete(h.String()) 190 + // Callers waiting will now get the result from the cache 191 + close(res) 192 + 193 + return did, err 194 + } 195 + 196 + func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*identityEntry, error) { 197 + ident, err := d.Inner.LookupDID(ctx, did) 198 + // wipe parsed public key; it's a waste of space and can't serialize 199 + if nil == err { 200 + ident.ParsedPublicKey = nil 201 + } 202 + // persist the identity lookup error, instead of processing it immediately 203 + entry := identityEntry{ 204 + Updated: time.Now(), 205 + Identity: ident, 206 + Err: err, 207 + } 208 + var he *handleEntry 209 + // if *not* an error, then also update the handle cache 210 + if nil == err && !ident.Handle.IsInvalidHandle() { 211 + he = &handleEntry{ 212 + Updated: time.Now(), 213 + DID: did, 214 + Err: nil, 215 + } 216 + } 217 + 218 + err = d.identityCache.Set(&cache.Item{ 219 + Ctx: ctx, 220 + Key: redisDirPrefix + did.String(), 221 + Value: entry, 222 + TTL: d.HitTTL, 223 + }) 224 + if err != nil { 225 + return nil, err 226 + } 227 + if he != nil { 228 + err = d.handleCache.Set(&cache.Item{ 229 + Ctx: ctx, 230 + Key: redisDirPrefix + ident.Handle.String(), 231 + Value: *he, 232 + TTL: d.HitTTL, 233 + }) 234 + if err != nil { 235 + return nil, err 236 + } 237 + } 238 + return &entry, nil 239 + } 240 + 241 + func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 242 + var entry identityEntry 243 + err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 244 + if err != nil && err != cache.ErrCacheMiss { 245 + return nil, err 246 + } 247 + if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { 248 + identityCacheHits.Inc() 249 + return entry.Identity, entry.Err 250 + } 251 + identityCacheMisses.Inc() 252 + 253 + // Coalesce multiple requests for the same DID 254 + res := make(chan struct{}) 255 + val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 256 + if loaded { 257 + identityRequestsCoalesced.Inc() 258 + // Wait for the result from the pending request 259 + select { 260 + case <-val.(chan struct{}): 261 + // The result should now be in the cache 262 + err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 263 + if err != nil && err != cache.ErrCacheMiss { 264 + return nil, err 265 + } 266 + if err != cache.ErrCacheMiss && !d.isIdentityStale(&entry) { 267 + return entry.Identity, entry.Err 268 + } 269 + return nil, fmt.Errorf("identity not found in cache after coalesce returned") 270 + case <-ctx.Done(): 271 + return nil, ctx.Err() 272 + } 273 + } 274 + 275 + var doc *identity.Identity 276 + // Update the Identity Entry from PLC and cache the result 277 + newEntry, err := d.updateDID(ctx, did) 278 + if err == nil && newEntry != nil { 279 + doc = newEntry.Identity 280 + } 281 + // Cleanup the coalesce map and close the results channel 282 + d.didLookupChans.Delete(did.String()) 283 + // Callers waiting will now get the result from the cache 284 + close(res) 285 + 286 + return doc, err 287 + } 288 + 289 + func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { 290 + did, err := d.ResolveHandle(ctx, h) 291 + if err != nil { 292 + return nil, err 293 + } 294 + ident, err := d.LookupDID(ctx, did) 295 + if err != nil { 296 + return nil, err 297 + } 298 + 299 + declared, err := ident.DeclaredHandle() 300 + if err != nil { 301 + return nil, err 302 + } 303 + if declared != h { 304 + return nil, fmt.Errorf("handle does not match that declared in DID document") 305 + } 306 + return ident, nil 307 + } 308 + 309 + func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { 310 + handle, err := a.AsHandle() 311 + if nil == err { // if not an error, is a handle 312 + return d.LookupHandle(ctx, handle) 313 + } 314 + did, err := a.AsDID() 315 + if nil == err { // if not an error, is a DID 316 + return d.LookupDID(ctx, did) 317 + } 318 + return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") 319 + } 320 + 321 + func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { 322 + handle, err := a.AsHandle() 323 + if nil == err { // if not an error, is a handle 324 + handle = handle.Normalize() 325 + return d.handleCache.Delete(ctx, handle.String()) 326 + } 327 + did, err := a.AsDID() 328 + if nil == err { // if not an error, is a DID 329 + return d.identityCache.Delete(ctx, did.String()) 330 + } 331 + return fmt.Errorf("at-identifier neither a Handle nor a DID") 332 + }