Lasa is a stateless proxy that generates a RSS or an Atom feed from a Standard.site publication. lasa.anhgelus.world
rss atom atprotocol standard-site atproto
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat(cache): rss, atom and authors

+207 -77
+10 -1
cmd/lasad/config/config.go
··· 1 1 package config 2 2 3 3 import ( 4 + "context" 4 5 "os" 6 + "time" 5 7 6 8 "github.com/BurntSushi/toml" 7 9 glide "github.com/valkey-io/valkey-glide/go/v2" ··· 40 42 cfg = cfg.WithCredentials(config.NewServerCredentialsWithDefaultUsername(c.Auth.Password)) 41 43 } 42 44 } 43 - return glide.NewClient(cfg) 45 + client, err := glide.NewClient(cfg) 46 + if err != nil { 47 + return nil, err 48 + } 49 + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) 50 + defer cancel() 51 + _, err = client.Ping(ctx) 52 + return client, err 44 53 } 45 54 46 55 func Load(path string) (*Config, error) {
+1
cmd/lasad/context.go
··· 5 5 const ( 6 6 keyCfg = iota 7 7 keyClient 8 + keyDir 8 9 )
+179
cmd/lasad/directory.go
··· 1 + package main 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "errors" 7 + "fmt" 8 + "html/template" 9 + "io" 10 + "log/slog" 11 + "net/http" 12 + "time" 13 + 14 + glide "github.com/valkey-io/valkey-glide/go/v2" 15 + site "tangled.org/anhgelus.world/goat-site" 16 + "tangled.org/anhgelus.world/lasa" 17 + "tangled.org/anhgelus.world/xrpc" 18 + "tangled.org/anhgelus.world/xrpc/atproto" 19 + ) 20 + 21 + type Directory struct { 22 + cache *glide.Client 23 + duration time.Duration 24 + limiter *lasa.LimitManyRequests[[]byte] 25 + } 26 + 27 + func NewDirectory(cache *glide.Client, dur time.Duration) *Directory { 28 + return &Directory{ 29 + cache: cache, 30 + duration: dur, 31 + limiter: lasa.NewLimitManyRequests[[]byte](), 32 + } 33 + } 34 + 35 + func (d *Directory) fromCache(ctx context.Context, key string) []byte { 36 + if d.cache == nil { 37 + return nil 38 + } 39 + resp, err := d.cache.Get(ctx, key) 40 + if err != nil || resp.IsNil() { 41 + return nil 42 + } 43 + return []byte(resp.Value()) 44 + } 45 + 46 + func (d *Directory) toCache(ctx context.Context, key string, b []byte) { 47 + if d.cache == nil { 48 + return 49 + } 50 + _, err := d.cache.Set(ctx, key, string(b)) 51 + if err != nil { 52 + slog.Warn("cannot set bytes in cache", "bytes", b, "error", err) 53 + return 54 + } 55 + slog.Debug("bytes set in cache") 56 + _, err = d.cache.Expire(ctx, key, d.duration) 57 + if err != nil { 58 + slog.Warn("cannot set bytes expire", "bytes", b, "error", err) 59 + } 60 + } 61 + 62 + func (d *Directory) Author(ctx context.Context, did *atproto.DID) ([]byte, error) { 63 + key := did.String() + ":publications" 64 + b := d.fromCache(ctx, key) 65 + if b != nil { 66 + slog.Debug("author got from cache") 67 + } 68 + slog.Debug("cannot get author from cache", "did", did) 69 + 70 + return d.limiter.Do(key, func() ([]byte, error) { 71 + client := ctx.Value(keyClient).(xrpc.Client) 72 + doc, err := client.Directory().ResolveDID(ctx, did) 73 + if err != nil { 74 + return nil, err 75 + } 76 + h, _ := doc.Handle() 77 + v := struct { 78 + Author string 79 + Publications []Publication 80 + }{Author: h.String()} 81 + pubs, _, err := xrpc.ListRecords[*site.Publication](ctx, client, did, 0, "", false) 82 + if err != nil { 83 + return nil, err 84 + } 85 + v.Publications = make([]Publication, len(pubs)) 86 + for i, pub := range pubs { 87 + uri, err := pub.URI.URI(ctx, client.Directory()) 88 + if err != nil { 89 + slog.Error("cannot get uri for publication", "pub", pub.URI) 90 + continue 91 + } 92 + link := fmt.Sprintf("/%s/%s", did, uri.RecordKey()) 93 + v.Publications[i] = Publication{link, pub.Value.Name} 94 + } 95 + var bf bytes.Buffer 96 + err = template.Must(template.ParseFS(files, "author.html")).ExecuteTemplate(&bf, "author.html", v) 97 + if err != nil { 98 + return nil, err 99 + } 100 + b, err := io.ReadAll(&bf) 101 + if err != nil { 102 + return nil, err 103 + } 104 + d.toCache(ctx, key, b) 105 + return b, nil 106 + }) 107 + } 108 + 109 + func (d *Directory) Feed( 110 + ctx context.Context, 111 + w http.ResponseWriter, 112 + r *http.Request, 113 + kind string, 114 + gen func(context.Context, xrpc.Client, io.Writer, *atproto.DID, xrpc.RecordStored[*site.Publication]) error, 115 + ) error { 116 + w.Header().Set("Content-Type", "application/"+kind+"+xml") 117 + client := ctx.Value(keyClient).(xrpc.Client) 118 + did, err := lasa.Resolve(ctx, client.Directory(), r.PathValue("id")) 119 + if err != nil { 120 + w.WriteHeader(http.StatusBadRequest) 121 + return nil 122 + } 123 + rkey, err := atproto.ParseRecordKey(r.PathValue("rkey")) 124 + if err != nil { 125 + w.WriteHeader(http.StatusBadRequest) 126 + return nil 127 + } 128 + key := did.String() + ":" + rkey.String() + ":" + kind 129 + b := d.fromCache(ctx, key) 130 + if b != nil { 131 + slog.Debug("feed got from cache", "kind", kind) 132 + w.Write(b) 133 + return nil 134 + } 135 + slog.Debug("cannot get feed from cache", "did", did, "kind", kind) 136 + 137 + b, err = d.limiter.Do(key, func() ([]byte, error) { 138 + pub, ok, err := getPub(ctx, did, rkey) 139 + if !ok { 140 + return nil, nil 141 + } 142 + var bf bytes.Buffer 143 + err = gen(ctx, client, &bf, did, pub) 144 + if err != nil { 145 + return nil, err 146 + } 147 + b, err := io.ReadAll(&bf) 148 + if err != nil { 149 + return nil, err 150 + } 151 + d.toCache(ctx, key, b) 152 + return b, nil 153 + }) 154 + if err != nil { 155 + return err 156 + } 157 + if b == nil { 158 + w.WriteHeader(http.StatusNotFound) 159 + return nil 160 + } 161 + w.Write(b) 162 + return err 163 + } 164 + 165 + func getPub(ctx context.Context, did *atproto.DID, rkey atproto.RecordKey) (xrpc.RecordStored[*site.Publication], bool, error) { 166 + client := ctx.Value(keyClient).(xrpc.Client) 167 + pub, err := xrpc.GetRecord[*site.Publication](ctx, client, did, rkey, nil) 168 + if err != nil { 169 + if err, ok := errors.AsType[xrpc.ErrStandardResponse](err); ok { 170 + if errors.Is(err, xrpc.ErrRecordNotFound) { 171 + return pub, false, nil 172 + } 173 + return pub, false, err 174 + } else { 175 + return pub, false, err 176 + } 177 + } 178 + return pub, true, nil 179 + }
+9 -68
cmd/lasad/run.go
··· 3 3 import ( 4 4 "context" 5 5 "embed" 6 - "errors" 7 6 "fmt" 8 - "html/template" 9 7 "log/slog" 10 8 "net" 11 9 "net/http" ··· 15 13 "time" 16 14 17 15 glide "github.com/valkey-io/valkey-glide/go/v2" 18 - site "tangled.org/anhgelus.world/goat-site" 19 16 "tangled.org/anhgelus.world/lasa" 20 17 "tangled.org/anhgelus.world/lasa/cmd/internal" 21 18 "tangled.org/anhgelus.world/lasa/cmd/lasad/config" 22 19 "tangled.org/anhgelus.world/xrpc" 23 - "tangled.org/anhgelus.world/xrpc/atproto" 24 20 ) 25 21 26 22 //go:embed index.html author.html ··· 68 64 if err != nil { 69 65 panic(err) 70 66 } 67 + slog.Info("connected to valkey") 71 68 dur = time.Duration(cfg.Cache.Duration) * time.Minute 72 69 } 73 70 client := lasa.NewClient(http.DefaultClient, net.DefaultResolver, cache, dur, cfg.Domain) 74 71 ctx = context.WithValue(ctx, keyClient, client) 72 + ctx = context.WithValue(ctx, keyDir, NewDirectory(cache, dur)) 75 73 76 74 mux := http.NewServeMux() 77 75 mux.HandleFunc("GET /{id}/{rkey}/rss", func(w http.ResponseWriter, r *http.Request) { 78 - did, pub, ok := getPub(w, r) 79 - if !ok { 80 - return 81 - } 82 - w.Header().Set("Content-Type", "application/rss+xml") 83 - err = lasa.GenerateRSS(ctx, client, w, did, pub) 76 + dir := r.Context().Value(keyDir).(*Directory) 77 + err := dir.Feed(r.Context(), w, r, "rss", lasa.GenerateRSS) 84 78 if err != nil { 85 79 panic(err) 86 80 } 87 81 }) 88 82 mux.HandleFunc("GET /{id}/{rkey}/atom", func(w http.ResponseWriter, r *http.Request) { 89 - did, pub, ok := getPub(w, r) 90 - if !ok { 91 - return 92 - } 93 - w.Header().Set("Content-Type", "application/atom+xml") 94 - err = lasa.GenerateAtom(ctx, client, w, did, pub) 83 + dir := r.Context().Value(keyDir).(*Directory) 84 + err := dir.Feed(r.Context(), w, r, "atom", lasa.GenerateAtom) 95 85 if err != nil { 96 86 panic(err) 97 87 } ··· 104 94 w.WriteHeader(http.StatusBadRequest) 105 95 return 106 96 } 107 - doc, err := client.Directory().ResolveDID(ctx, did) 97 + dir := ctx.Value(keyDir).(*Directory) 98 + b, err := dir.Author(ctx, did) 108 99 if err != nil { 109 100 panic(err) 110 101 } 111 - h, _ := doc.Handle() 112 - v := struct { 113 - Author string 114 - Publications []Publication 115 - }{Author: h.String()} 116 - pubs, _, err := xrpc.ListRecords[*site.Publication](ctx, client, did, 0, "", false) 117 - if err != nil { 118 - panic(err) 119 - } 120 - v.Publications = make([]Publication, len(pubs)) 121 - for i, pub := range pubs { 122 - uri, err := pub.URI.URI(ctx, client.Directory()) 123 - if err != nil { 124 - panic(err) 125 - } 126 - link := fmt.Sprintf("/%s/%s", did, uri.RecordKey()) 127 - v.Publications[i] = Publication{link, pub.Value.Name} 128 - } 129 - err = template.Must(template.ParseFS(files, "author.html")).ExecuteTemplate(w, "author.html", v) 130 - if err != nil { 131 - panic(err) 132 - } 102 + w.Write(b) 133 103 }) 134 104 mux.HandleFunc("GET /{$}", func(w http.ResponseWriter, r *http.Request) { 135 105 b, err := files.ReadFile("index.html") ··· 168 138 h.ServeHTTP(w, r.WithContext(ctx)) 169 139 }) 170 140 } 171 - 172 - func getPub(w http.ResponseWriter, r *http.Request) (*atproto.DID, xrpc.RecordStored[*site.Publication], bool) { 173 - var pub xrpc.RecordStored[*site.Publication] 174 - ctx := r.Context() 175 - client := ctx.Value(keyClient).(xrpc.Client) 176 - did, err := lasa.Resolve(ctx, client.Directory(), r.PathValue("id")) 177 - if err != nil { 178 - w.WriteHeader(http.StatusBadRequest) 179 - return nil, pub, false 180 - } 181 - rkey, err := atproto.ParseRecordKey(r.PathValue("rkey")) 182 - if err != nil { 183 - w.WriteHeader(http.StatusBadRequest) 184 - return nil, pub, false 185 - } 186 - pub, err = xrpc.GetRecord[*site.Publication](ctx, client, did, rkey, nil) 187 - if err != nil { 188 - if err, ok := errors.AsType[xrpc.ErrStandardResponse](err); ok { 189 - if errors.Is(err, xrpc.ErrRecordNotFound) { 190 - w.WriteHeader(http.StatusNotFound) 191 - return nil, pub, false 192 - } 193 - panic(err) 194 - } else { 195 - panic(err) 196 - } 197 - } 198 - return did, pub, true 199 - }
+4 -4
directory.go
··· 15 15 inner atproto.Directory 16 16 cache *glide.Client 17 17 duration time.Duration 18 - limiter *limitManyRequests[*atproto.DIDDocument] 18 + limiter *LimitManyRequests[*atproto.DIDDocument] 19 19 } 20 20 21 21 func NewDirectory(dir atproto.Directory, cache *glide.Client, dur time.Duration) *Directory { 22 22 return &Directory{ 23 23 inner: dir, 24 24 cache: cache, 25 - limiter: newLimitManyRequests[*atproto.DIDDocument](), 25 + limiter: NewLimitManyRequests[*atproto.DIDDocument](), 26 26 duration: dur, 27 27 } 28 28 } ··· 60 60 return 61 61 } 62 62 slog.Debug("DIDDocument set in cache") 63 - _, err = d.cache.ExpireAt(ctx, key, time.Now().Add(d.duration)) 63 + _, err = d.cache.Expire(ctx, key, d.duration) 64 64 if err != nil { 65 - slog.Warn("cannot set DIDDocument expire at", "document", doc, "error", err) 65 + slog.Warn("cannot set DIDDocument expire", "document", doc, "error", err) 66 66 } 67 67 } 68 68
+4 -4
sync.go
··· 25 25 } 26 26 } 27 27 28 - type limitManyRequests[T any] struct { 28 + type LimitManyRequests[T any] struct { 29 29 content map[string]*limitRequests[T] 30 30 mu sync.RWMutex 31 31 } 32 32 33 - func newLimitManyRequests[T any]() *limitManyRequests[T] { 34 - return &limitManyRequests[T]{content: make(map[string]*limitRequests[T])} 33 + func NewLimitManyRequests[T any]() *LimitManyRequests[T] { 34 + return &LimitManyRequests[T]{content: make(map[string]*limitRequests[T])} 35 35 } 36 36 37 - func (lm *limitManyRequests[T]) Do(key string, fn func() (T, error)) (T, error) { 37 + func (lm *LimitManyRequests[T]) Do(key string, fn func() (T, error)) (T, error) { 38 38 lm.mu.RLock() 39 39 l, ok := lm.content[key] 40 40 if ok {