Lasa is a stateless proxy that generates a RSS or an Atom feed from a Standard.site publication. lasa.anhgelus.world
rss atom atprotocol standard-site atproto
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cache): custom atproto directory

+169 -5
+99
directory.go
··· 1 + package lasa 2 + 3 + import ( 4 + "context" 5 + "encoding/json" 6 + "log/slog" 7 + "time" 8 + 9 + "github.com/valkey-io/valkey-go" 10 + "tangled.org/anhgelus.world/xrpc/atproto" 11 + ) 12 + 13 + type Directory struct { 14 + inner atproto.Directory 15 + cache valkey.Client 16 + duration time.Duration 17 + limiter *limitManyRequests[*atproto.DIDDocument] 18 + } 19 + 20 + func NewDirectory(dir atproto.Directory, cache valkey.Client) *Directory { 21 + return &Directory{ 22 + inner: dir, 23 + cache: cache, 24 + } 25 + } 26 + 27 + func (d *Directory) fromCache(ctx context.Context, key string) *atproto.DIDDocument { 28 + if d.cache == nil { 29 + return nil 30 + } 31 + resp := d.cache.Do(ctx, d.cache.B().Get().Key(key).Build()) 32 + err := resp.Error() 33 + var doc *atproto.DIDDocument 34 + if err == nil { 35 + b, err := resp.AsBytes() 36 + if err == nil { 37 + err = json.Unmarshal(b, &doc) 38 + if err == nil { 39 + return doc 40 + } else { 41 + slog.Warn("cannot unmarshal cache response into DIDDocument", "resp", b) 42 + } 43 + } else { 44 + slog.Warn("cannot convert cache response into bytes", "resp", resp) 45 + } 46 + } 47 + return nil 48 + } 49 + 50 + func (d *Directory) toCache(ctx context.Context, key string, doc *atproto.DIDDocument) { 51 + if d.cache == nil { 52 + return 53 + } 54 + b, err := json.Marshal(doc) 55 + if err != nil { 56 + slog.Warn("cannot marshal DIDDocument", "document", doc, "error", err) 57 + return 58 + } 59 + err = d.cache.Do(ctx, d.cache.B().Set().Key(key).Value(string(b)).Build()).Error() 60 + if err != nil { 61 + slog.Warn("cannot set DIDDocument in cache", "document", doc, "error", err) 62 + } 63 + } 64 + 65 + func (d *Directory) ResolveHandle(ctx context.Context, h atproto.Handle) (*atproto.DIDDocument, error) { 66 + key := h.String() 67 + doc := d.fromCache(ctx, key) 68 + if doc != nil { 69 + return doc, nil 70 + } 71 + slog.Debug("cannot get DIDDocument from cache") 72 + 73 + return d.limiter.Do(key, func() (*atproto.DIDDocument, error) { 74 + doc, err := d.inner.ResolveHandle(ctx, h) 75 + if err != nil { 76 + return nil, err 77 + } 78 + d.toCache(ctx, key, doc) 79 + return doc, nil 80 + }) 81 + } 82 + 83 + func (d *Directory) ResolveDID(ctx context.Context, did *atproto.DID) (*atproto.DIDDocument, error) { 84 + key := did.String() 85 + doc := d.fromCache(ctx, key) 86 + if doc != nil { 87 + return doc, nil 88 + } 89 + slog.Debug("cannot get DIDDocument from cache") 90 + 91 + return d.limiter.Do(key, func() (*atproto.DIDDocument, error) { 92 + doc, err := d.inner.ResolveDID(ctx, did) 93 + if err != nil { 94 + return nil, err 95 + } 96 + d.toCache(ctx, key, doc) 97 + return doc, nil 98 + }) 99 + }
+5 -1
go.mod
··· 5 5 require ( 6 6 github.com/BurntSushi/toml v1.6.0 7 7 github.com/valkey-io/valkey-go v1.0.73 8 + tangled.org/anhgelus.world/xrpc v0.1.0 8 9 ) 9 10 10 - require golang.org/x/sys v0.42.0 // indirect 11 + require ( 12 + golang.org/x/net v0.52.0 // indirect 13 + golang.org/x/sys v0.42.0 // indirect 14 + )
+8 -4
go.sum
··· 8 8 github.com/valkey-io/valkey-go v1.0.73/go.mod h1:VGhZ6fs68Qrn2+OhH+6waZH27bjpgQOiLyUQyXuYK5k= 9 9 go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= 10 10 go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= 11 - golang.org/x/net v0.48.0 h1:zyQRTTrjc33Lhh0fBgT/H3oZq9WuvRR5gPC70xpDiQU= 12 - golang.org/x/net v0.48.0/go.mod h1:+ndRgGjkh8FGtu1w1FGbEC31if4VrNVMuKTgcAAnQRY= 11 + golang.org/x/net v0.52.0 h1:He/TN1l0e4mmR3QqHMT2Xab3Aj3L9qjbhRm78/6jrW0= 12 + golang.org/x/net v0.52.0/go.mod h1:R1MAz7uMZxVMualyPXb+VaqGSa3LIaUqk0eEt3w36Sw= 13 13 golang.org/x/sys v0.42.0 h1:omrd2nAlyT5ESRdCLYdm3+fMfNFE/+Rf4bDIQImRJeo= 14 14 golang.org/x/sys v0.42.0/go.mod h1:4GL1E5IUh+htKOUEOaiffhrAeqysfVGipDYzABqnCmw= 15 - golang.org/x/text v0.32.0 h1:ZD01bjUt1FQ9WJ0ClOL5vxgxOI/sVCNgX1YtKwcY0mU= 16 - golang.org/x/text v0.32.0/go.mod h1:o/rUWzghvpD5TXrTIBuJU77MTaN0ljMWE47kxGJQ7jY= 15 + golang.org/x/text v0.35.0 h1:JOVx6vVDFokkpaq1AEptVzLTpDe9KGpj5tR4/X+ybL8= 16 + golang.org/x/text v0.35.0/go.mod h1:khi/HExzZJ2pGnjenulevKNX1W67CUy0AsXcNubPGCA= 17 + pgregory.net/rapid v1.2.0 h1:keKAYRcjm+e1F0oAuU5F5+YPAWcyxNNRK2wud503Gnk= 18 + pgregory.net/rapid v1.2.0/go.mod h1:PY5XlDGj0+V1FCq0o192FdRhpKHGTRIWBgqjDBTrq04= 19 + tangled.org/anhgelus.world/xrpc v0.1.0 h1:IFPgCf2c5j6c5IJmUx/Jid0MZdYJRxlxYAoLMF+nMRs= 20 + tangled.org/anhgelus.world/xrpc v0.1.0/go.mod h1:DW43uo9DKZHVN9fiH6lAYVQ+0cfSLoceo7aE5lE1jjw=
+57
sync.go
··· 1 + package lasa 2 + 3 + import ( 4 + "sync" 5 + "sync/atomic" 6 + ) 7 + 8 + type limitRequests[T any] struct { 9 + n atomic.Uint32 10 + ch chan T 11 + } 12 + 13 + func newLimitRequests[T any]() *limitRequests[T] { 14 + return &limitRequests[T]{ch: make(chan T)} 15 + } 16 + 17 + func (l *limitRequests[T]) Sub() T { 18 + l.n.Add(1) 19 + return <-l.ch 20 + } 21 + 22 + func (l *limitRequests[T]) Send(v T) { 23 + for range l.n.Load() { 24 + l.ch <- v 25 + } 26 + } 27 + 28 + type limitManyRequests[T any] struct { 29 + content map[string]*limitRequests[T] 30 + mu sync.RWMutex 31 + } 32 + 33 + func newLimitManyRequests[T any]() *limitManyRequests[T] { 34 + return &limitManyRequests[T]{content: make(map[string]*limitRequests[T])} 35 + } 36 + 37 + func (lm *limitManyRequests[T]) Do(key string, fn func() (T, error)) (T, error) { 38 + lm.mu.RLock() 39 + l, ok := lm.content[key] 40 + if ok { 41 + lm.mu.RUnlock() 42 + return l.Sub(), nil 43 + } 44 + lm.mu.RUnlock() 45 + 46 + l = newLimitRequests[T]() 47 + lm.mu.Lock() 48 + lm.content[key] = l 49 + lm.mu.Unlock() 50 + 51 + v, err := fn() 52 + if err != nil { 53 + return v, err 54 + } 55 + l.Send(v) 56 + return v, nil 57 + }