this repo has no description
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

switch over to now-merged `atproto/identity/redisdir` (#507)

This is just a copy of `automod/directory`, in a new package location,
and will be maintained/used by other services in addition to just
automod.

There was one very small tweak of the API which is to pass an int for
size of local (in-process) caching (this was hard-coded as 10k
previously).

authored by

bnewbold and committed by
GitHub
05273c62 0590ced0

+2 -358
-356
automod/directory/redis_directory.go
··· 1 - package directory 2 - 3 - import ( 4 - "context" 5 - "fmt" 6 - "sync" 7 - "time" 8 - 9 - "github.com/bluesky-social/indigo/atproto/identity" 10 - "github.com/bluesky-social/indigo/atproto/syntax" 11 - 12 - "github.com/go-redis/cache/v9" 13 - "github.com/prometheus/client_golang/prometheus" 14 - "github.com/prometheus/client_golang/prometheus/promauto" 15 - "github.com/redis/go-redis/v9" 16 - ) 17 - 18 - var redisDirPrefix string = "dir/" 19 - 20 - // uses redis as a cache for identity lookups. includes a local cache layer as well, for hot keys 21 - type RedisDirectory struct { 22 - Inner identity.Directory 23 - ErrTTL time.Duration 24 - HitTTL time.Duration 25 - 26 - handleCache *cache.Cache 27 - identityCache *cache.Cache 28 - didLookupChans sync.Map 29 - handleLookupChans sync.Map 30 - } 31 - 32 - type HandleEntry struct { 33 - Updated time.Time 34 - DID syntax.DID 35 - Err error 36 - } 37 - 38 - type IdentityEntry struct { 39 - Updated time.Time 40 - Identity *identity.Identity 41 - Err error 42 - } 43 - 44 - var _ identity.Directory = (*RedisDirectory)(nil) 45 - 46 - func NewRedisDirectory(inner identity.Directory, redisURL string, hitTTL, errTTL time.Duration) (*RedisDirectory, error) { 47 - opt, err := redis.ParseURL(redisURL) 48 - if err != nil { 49 - return nil, err 50 - } 51 - rdb := redis.NewClient(opt) 52 - // check redis connection 53 - _, err = rdb.Ping(context.TODO()).Result() 54 - if err != nil { 55 - return nil, err 56 - } 57 - handleCache := cache.New(&cache.Options{ 58 - Redis: rdb, 59 - LocalCache: cache.NewTinyLFU(10_000, hitTTL), 60 - }) 61 - identityCache := cache.New(&cache.Options{ 62 - Redis: rdb, 63 - LocalCache: cache.NewTinyLFU(10_000, hitTTL), 64 - }) 65 - return &RedisDirectory{ 66 - Inner: inner, 67 - ErrTTL: errTTL, 68 - HitTTL: hitTTL, 69 - handleCache: handleCache, 70 - identityCache: identityCache, 71 - }, nil 72 - } 73 - 74 - func (d *RedisDirectory) IsHandleStale(e *HandleEntry) bool { 75 - if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 76 - return true 77 - } 78 - return false 79 - } 80 - 81 - func (d *RedisDirectory) IsIdentityStale(e *IdentityEntry) bool { 82 - if e.Err != nil && time.Since(e.Updated) > d.ErrTTL { 83 - return true 84 - } 85 - return false 86 - } 87 - 88 - func (d *RedisDirectory) updateHandle(ctx context.Context, h syntax.Handle) (*HandleEntry, error) { 89 - h = h.Normalize() 90 - ident, err := d.Inner.LookupHandle(ctx, h) 91 - if err != nil { 92 - he := HandleEntry{ 93 - Updated: time.Now(), 94 - DID: "", 95 - Err: err, 96 - } 97 - err = d.handleCache.Set(&cache.Item{ 98 - Ctx: ctx, 99 - Key: redisDirPrefix + h.String(), 100 - Value: he, 101 - TTL: d.ErrTTL, 102 - }) 103 - if err != nil { 104 - return nil, err 105 - } 106 - return &he, nil 107 - } 108 - 109 - ident.ParsedPublicKey = nil 110 - entry := IdentityEntry{ 111 - Updated: time.Now(), 112 - Identity: ident, 113 - Err: nil, 114 - } 115 - he := HandleEntry{ 116 - Updated: time.Now(), 117 - DID: ident.DID, 118 - Err: nil, 119 - } 120 - 121 - err = d.identityCache.Set(&cache.Item{ 122 - Ctx: ctx, 123 - Key: redisDirPrefix + ident.DID.String(), 124 - Value: entry, 125 - TTL: d.HitTTL, 126 - }) 127 - if err != nil { 128 - return nil, err 129 - } 130 - err = d.handleCache.Set(&cache.Item{ 131 - Ctx: ctx, 132 - Key: redisDirPrefix + h.String(), 133 - Value: he, 134 - TTL: d.HitTTL, 135 - }) 136 - if err != nil { 137 - return nil, err 138 - } 139 - return &he, nil 140 - } 141 - 142 - func (d *RedisDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) { 143 - var entry HandleEntry 144 - err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), &entry) 145 - if err != nil && err != cache.ErrCacheMiss { 146 - return "", err 147 - } 148 - if err != cache.ErrCacheMiss && !d.IsHandleStale(&entry) { 149 - handleCacheHits.Inc() 150 - return entry.DID, entry.Err 151 - } 152 - handleCacheMisses.Inc() 153 - 154 - // Coalesce multiple requests for the same Handle 155 - res := make(chan struct{}) 156 - val, loaded := d.handleLookupChans.LoadOrStore(h.String(), res) 157 - if loaded { 158 - handleRequestsCoalesced.Inc() 159 - // Wait for the result from the pending request 160 - select { 161 - case <-val.(chan struct{}): 162 - // The result should now be in the cache 163 - err := d.handleCache.Get(ctx, redisDirPrefix+h.String(), entry) 164 - if err != nil && err != cache.ErrCacheMiss { 165 - return "", err 166 - } 167 - if err != cache.ErrCacheMiss && !d.IsHandleStale(&entry) { 168 - return entry.DID, entry.Err 169 - } 170 - return "", fmt.Errorf("identity not found in cache after coalesce returned") 171 - case <-ctx.Done(): 172 - return "", ctx.Err() 173 - } 174 - } 175 - 176 - var did syntax.DID 177 - // Update the Handle Entry from PLC and cache the result 178 - newEntry, err := d.updateHandle(ctx, h) 179 - if err == nil && newEntry != nil { 180 - did = newEntry.DID 181 - } 182 - // Cleanup the coalesce map and close the results channel 183 - d.handleLookupChans.Delete(h.String()) 184 - // Callers waiting will now get the result from the cache 185 - close(res) 186 - 187 - return did, err 188 - } 189 - 190 - func (d *RedisDirectory) updateDID(ctx context.Context, did syntax.DID) (*IdentityEntry, error) { 191 - ident, err := d.Inner.LookupDID(ctx, did) 192 - // wipe parsed public key; it's a waste of space and can't serialize 193 - if nil == err { 194 - ident.ParsedPublicKey = nil 195 - } 196 - // persist the identity lookup error, instead of processing it immediately 197 - entry := IdentityEntry{ 198 - Updated: time.Now(), 199 - Identity: ident, 200 - Err: err, 201 - } 202 - var he *HandleEntry 203 - // if *not* an error, then also update the handle cache 204 - if nil == err && !ident.Handle.IsInvalidHandle() { 205 - he = &HandleEntry{ 206 - Updated: time.Now(), 207 - DID: did, 208 - Err: nil, 209 - } 210 - } 211 - 212 - err = d.identityCache.Set(&cache.Item{ 213 - Ctx: ctx, 214 - Key: redisDirPrefix + did.String(), 215 - Value: entry, 216 - TTL: d.HitTTL, 217 - }) 218 - if err != nil { 219 - return nil, err 220 - } 221 - if he != nil { 222 - err = d.handleCache.Set(&cache.Item{ 223 - Ctx: ctx, 224 - Key: redisDirPrefix + ident.Handle.String(), 225 - Value: *he, 226 - TTL: d.HitTTL, 227 - }) 228 - if err != nil { 229 - return nil, err 230 - } 231 - } 232 - return &entry, nil 233 - } 234 - 235 - func (d *RedisDirectory) LookupDID(ctx context.Context, did syntax.DID) (*identity.Identity, error) { 236 - var entry IdentityEntry 237 - err := d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 238 - if err != nil && err != cache.ErrCacheMiss { 239 - return nil, err 240 - } 241 - if err != cache.ErrCacheMiss && !d.IsIdentityStale(&entry) { 242 - identityCacheHits.Inc() 243 - return entry.Identity, entry.Err 244 - } 245 - identityCacheMisses.Inc() 246 - 247 - // Coalesce multiple requests for the same DID 248 - res := make(chan struct{}) 249 - val, loaded := d.didLookupChans.LoadOrStore(did.String(), res) 250 - if loaded { 251 - identityRequestsCoalesced.Inc() 252 - // Wait for the result from the pending request 253 - select { 254 - case <-val.(chan struct{}): 255 - // The result should now be in the cache 256 - err = d.identityCache.Get(ctx, redisDirPrefix+did.String(), &entry) 257 - if err != nil && err != cache.ErrCacheMiss { 258 - return nil, err 259 - } 260 - if err != cache.ErrCacheMiss && !d.IsIdentityStale(&entry) { 261 - return entry.Identity, entry.Err 262 - } 263 - return nil, fmt.Errorf("identity not found in cache after coalesce returned") 264 - case <-ctx.Done(): 265 - return nil, ctx.Err() 266 - } 267 - } 268 - 269 - var doc *identity.Identity 270 - // Update the Identity Entry from PLC and cache the result 271 - newEntry, err := d.updateDID(ctx, did) 272 - if err == nil && newEntry != nil { 273 - doc = newEntry.Identity 274 - } 275 - // Cleanup the coalesce map and close the results channel 276 - d.didLookupChans.Delete(did.String()) 277 - // Callers waiting will now get the result from the cache 278 - close(res) 279 - 280 - return doc, err 281 - } 282 - 283 - func (d *RedisDirectory) LookupHandle(ctx context.Context, h syntax.Handle) (*identity.Identity, error) { 284 - did, err := d.ResolveHandle(ctx, h) 285 - if err != nil { 286 - return nil, err 287 - } 288 - ident, err := d.LookupDID(ctx, did) 289 - if err != nil { 290 - return nil, err 291 - } 292 - 293 - declared, err := ident.DeclaredHandle() 294 - if err != nil { 295 - return nil, err 296 - } 297 - if declared != h { 298 - return nil, fmt.Errorf("handle does not match that declared in DID document") 299 - } 300 - return ident, nil 301 - } 302 - 303 - func (d *RedisDirectory) Lookup(ctx context.Context, a syntax.AtIdentifier) (*identity.Identity, error) { 304 - handle, err := a.AsHandle() 305 - if nil == err { // if not an error, is a handle 306 - return d.LookupHandle(ctx, handle) 307 - } 308 - did, err := a.AsDID() 309 - if nil == err { // if not an error, is a DID 310 - return d.LookupDID(ctx, did) 311 - } 312 - return nil, fmt.Errorf("at-identifier neither a Handle nor a DID") 313 - } 314 - 315 - func (d *RedisDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error { 316 - handle, err := a.AsHandle() 317 - if nil == err { // if not an error, is a handle 318 - handle = handle.Normalize() 319 - return d.handleCache.Delete(ctx, handle.String()) 320 - } 321 - did, err := a.AsDID() 322 - if nil == err { // if not an error, is a DID 323 - return d.identityCache.Delete(ctx, did.String()) 324 - } 325 - return fmt.Errorf("at-identifier neither a Handle nor a DID") 326 - } 327 - 328 - var handleCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 329 - Name: "atproto_redis_directory_handle_cache_hits", 330 - Help: "Number of cache hits for ATProto handle lookups", 331 - }) 332 - 333 - var handleCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 334 - Name: "atproto_redis_directory_handle_cache_misses", 335 - Help: "Number of cache misses for ATProto handle lookups", 336 - }) 337 - 338 - var identityCacheHits = promauto.NewCounter(prometheus.CounterOpts{ 339 - Name: "atproto_redis_directory_identity_cache_hits", 340 - Help: "Number of cache hits for ATProto identity lookups", 341 - }) 342 - 343 - var identityCacheMisses = promauto.NewCounter(prometheus.CounterOpts{ 344 - Name: "atproto_redis_directory_identity_cache_misses", 345 - Help: "Number of cache misses for ATProto identity lookups", 346 - }) 347 - 348 - var identityRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 349 - Name: "atproto_redis_directory_identity_requests_coalesced", 350 - Help: "Number of identity requests coalesced", 351 - }) 352 - 353 - var handleRequestsCoalesced = promauto.NewCounter(prometheus.CounterOpts{ 354 - Name: "atproto_redis_directory_handle_requests_coalesced", 355 - Help: "Number of handle requests coalesced", 356 - })
+2 -2
cmd/hepa/main.go
··· 10 10 "time" 11 11 12 12 "github.com/bluesky-social/indigo/atproto/identity" 13 + "github.com/bluesky-social/indigo/atproto/identity/redisdir" 13 14 "github.com/bluesky-social/indigo/atproto/syntax" 14 15 "github.com/bluesky-social/indigo/automod/capture" 15 - "github.com/bluesky-social/indigo/automod/directory" 16 16 17 17 "github.com/carlmjohnson/versioninfo" 18 18 _ "github.com/joho/godotenv/autoload" ··· 117 117 } 118 118 var dir identity.Directory 119 119 if cctx.String("redis-url") != "" { 120 - rdir, err := directory.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2) 120 + rdir, err := redisdir.NewRedisDirectory(&baseDir, cctx.String("redis-url"), time.Hour*24, time.Minute*2, 10_000) 121 121 if err != nil { 122 122 return nil, err 123 123 }