rss email digests over ssh because you're a cool kid herald.dunkirk.sh
go rss rss-reader ssh charm
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

at main 288 lines 8.7 kB view raw
1package ssh 2 3import ( 4 "bytes" 5 "context" 6 "database/sql" 7 "fmt" 8 "io" 9 "io/fs" 10 "path/filepath" 11 "strings" 12 "time" 13 14 "github.com/adhocore/gronx" 15 "github.com/charmbracelet/log" 16 "github.com/charmbracelet/ssh" 17 "github.com/charmbracelet/wish/scp" 18 "github.com/kierank/herald/config" 19 "github.com/kierank/herald/ratelimit" 20 "github.com/kierank/herald/scheduler" 21 "github.com/kierank/herald/store" 22) 23 24type scpHandler struct { 25 store *store.DB 26 scheduler *scheduler.Scheduler 27 logger *log.Logger 28 rateLimiter *ratelimit.Limiter 29} 30 31func (h *scpHandler) Glob(s ssh.Session, pattern string) ([]string, error) { 32 user, ok := s.Context().Value("user").(*store.User) 33 if !ok { 34 return nil, fmt.Errorf("no user in context") 35 } 36 37 configs, err := h.store.ListConfigs(s.Context(), user.ID) 38 if err != nil { 39 return nil, err 40 } 41 42 var matches []string 43 for _, cfg := range configs { 44 matched, _ := filepath.Match(pattern, cfg.Filename) 45 if matched || pattern == "*" || pattern == cfg.Filename { 46 matches = append(matches, cfg.Filename) 47 } 48 } 49 return matches, nil 50} 51 52func (h *scpHandler) WalkDir(s ssh.Session, path string, fn fs.WalkDirFunc) error { 53 user, ok := s.Context().Value("user").(*store.User) 54 if !ok { 55 return fmt.Errorf("no user in context") 56 } 57 58 configs, err := h.store.ListConfigs(s.Context(), user.ID) 59 if err != nil { 60 return err 61 } 62 63 for _, cfg := range configs { 64 info := &configFileInfo{cfg: cfg} 65 if err := fn(cfg.Filename, &configDirEntry{info: info}, nil); err != nil { 66 return err 67 } 68 } 69 return nil 70} 71 72func (h *scpHandler) NewDirEntry(s ssh.Session, name string) (*scp.DirEntry, error) { 73 return nil, fmt.Errorf("directories not supported") 74} 75 76func (h *scpHandler) NewFileEntry(s ssh.Session, name string) (*scp.FileEntry, func() error, error) { 77 user, ok := s.Context().Value("user").(*store.User) 78 if !ok { 79 return nil, nil, fmt.Errorf("no user in context") 80 } 81 82 cfg, err := h.store.GetConfig(s.Context(), user.ID, name) 83 if err != nil { 84 return nil, nil, fmt.Errorf("config not found: %w", err) 85 } 86 87 content := []byte(cfg.RawText) 88 entry := &scp.FileEntry{ 89 Name: cfg.Filename, 90 Mode: 0644, 91 Size: int64(len(content)), 92 Mtime: cfg.CreatedAt.Unix(), 93 Atime: cfg.CreatedAt.Unix(), 94 Reader: bytes.NewReader(content), 95 Filepath: cfg.Filename, 96 } 97 98 return entry, nil, nil 99} 100 101func (h *scpHandler) Mkdir(s ssh.Session, entry *scp.DirEntry) error { 102 return fmt.Errorf("directories not supported") 103} 104 105func (h *scpHandler) Write(s ssh.Session, entry *scp.FileEntry) (int64, error) { 106 h.logger.Debug("SCP Write called", "name", entry.Name, "size", entry.Size) 107 108 user, ok := s.Context().Value("user").(*store.User) 109 if !ok { 110 return 0, fmt.Errorf("no user in context") 111 } 112 113 // Rate limit SCP uploads (per user) 114 if !h.rateLimiter.Allow(fmt.Sprintf("scp:%d", user.ID)) { 115 return 0, fmt.Errorf("rate limit exceeded, please try again later") 116 } 117 118 // Max file size: 1MB 119 if entry.Size > 1024*1024 { 120 return 0, fmt.Errorf("file too large (max 1MB)") 121 } 122 123 name := entry.Name 124 if !strings.HasSuffix(name, ".txt") { 125 return 0, fmt.Errorf("only .txt files are supported") 126 } 127 128 content, err := io.ReadAll(io.LimitReader(entry.Reader, 1024*1024)) 129 if err != nil { 130 return 0, fmt.Errorf("failed to read file: %w", err) 131 } 132 133 parsed, err := config.Parse(string(content)) 134 if err != nil { 135 return 0, fmt.Errorf("failed to parse config: %w", err) 136 } 137 138 if err := config.Validate(parsed); err != nil { 139 return 0, fmt.Errorf("invalid config: %w", err) 140 } 141 142 ctx := s.Context() 143 144 // Validate feed URLs by attempting to fetch them 145 if err := config.ValidateFeedURLs(ctx, parsed); err != nil { 146 return 0, fmt.Errorf("feed validation failed: %w", err) 147 } 148 149 nextRun, err := calculateNextRun(parsed.CronExpr) 150 if err != nil { 151 return 0, fmt.Errorf("failed to calculate next run: %w", err) 152 } 153 154 // Use transaction for config update 155 tx, err := h.store.BeginTx(ctx) 156 if err != nil { 157 return 0, fmt.Errorf("begin transaction: %w", err) 158 } 159 defer func() { _ = tx.Rollback() }() 160 161 // Try to get existing config 162 existingCfg, err := h.store.GetConfigTx(ctx, tx, user.ID, name) 163 var cfg *store.Config 164 165 if err == nil { 166 // Config exists - update it 167 if err := h.store.UpdateConfigTx(ctx, tx, existingCfg.ID, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun); err != nil { 168 return 0, fmt.Errorf("failed to update config: %w", err) 169 } 170 cfg = existingCfg 171 cfg.Email = parsed.Email 172 cfg.CronExpr = parsed.CronExpr 173 cfg.Digest = parsed.Digest 174 cfg.InlineContent = parsed.Inline 175 cfg.RawText = string(content) 176 177 // Sync feeds: match by URL, update/delete/add as needed 178 existingFeeds, err := h.store.GetFeedsByConfigTx(ctx, tx, cfg.ID) 179 if err != nil { 180 return 0, fmt.Errorf("failed to get existing feeds: %w", err) 181 } 182 183 // Build maps for comparison 184 existingByURL := make(map[string]*store.Feed) 185 for _, f := range existingFeeds { 186 existingByURL[f.URL] = f 187 } 188 189 newByURL := make(map[string]struct{ URL, Name string }) 190 for _, f := range parsed.Feeds { 191 newByURL[f.URL] = struct{ URL, Name string }{URL: f.URL, Name: f.Name} 192 } 193 194 // Update existing feeds that are still present 195 for _, newFeed := range parsed.Feeds { 196 if existingFeed, exists := existingByURL[newFeed.URL]; exists { 197 // Feed still exists - update name if changed 198 if err := h.store.UpdateFeedTx(ctx, tx, existingFeed.ID, newFeed.Name); err != nil { 199 return 0, fmt.Errorf("failed to update feed: %w", err) 200 } 201 } else { 202 // New feed - create it and mark existing items as seen 203 newFeedRecord, err := h.store.CreateFeedTx(ctx, tx, cfg.ID, newFeed.URL, newFeed.Name) 204 if err != nil { 205 return 0, fmt.Errorf("failed to create feed: %w", err) 206 } 207 // Pre-seed seen items so we don't send old posts 208 if err := h.preseedSeenItems(ctx, tx, newFeedRecord); err != nil { 209 h.logger.Warn("failed to preseed seen items", "feed_url", newFeed.URL, "err", err) 210 } 211 } 212 } 213 214 // Delete feeds that are no longer present 215 for _, existingFeed := range existingFeeds { 216 if _, stillExists := newByURL[existingFeed.URL]; !stillExists { 217 if err := h.store.DeleteFeedTx(ctx, tx, existingFeed.ID); err != nil { 218 return 0, fmt.Errorf("failed to delete feed: %w", err) 219 } 220 } 221 } 222 223 h.logger.Debug("updated existing config", "filename", name) 224 } else { 225 // Config doesn't exist - create new one 226 cfg, err = h.store.CreateConfigTx(ctx, tx, user.ID, name, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun) 227 if err != nil { 228 return 0, fmt.Errorf("failed to create config: %w", err) 229 } 230 231 for _, feed := range parsed.Feeds { 232 if _, err := h.store.CreateFeedTx(ctx, tx, cfg.ID, feed.URL, feed.Name); err != nil { 233 return 0, fmt.Errorf("failed to create feed: %w", err) 234 } 235 } 236 237 h.logger.Debug("created new config", "filename", name) 238 } 239 240 if err := tx.Commit(); err != nil { 241 return 0, fmt.Errorf("commit transaction: %w", err) 242 } 243 244 h.logger.Info("config uploaded", "user_id", user.ID, "filename", name, "feeds", len(parsed.Feeds), "next_run", nextRun) 245 return int64(len(content)), nil 246} 247 248func calculateNextRun(cronExpr string) (time.Time, error) { 249 return gronx.NextTickAfter(cronExpr, time.Now().UTC(), true) 250} 251 252type configFileInfo struct { 253 cfg *store.Config 254} 255 256func (i *configFileInfo) Name() string { return i.cfg.Filename } 257func (i *configFileInfo) Size() int64 { return int64(len(i.cfg.RawText)) } 258func (i *configFileInfo) Mode() fs.FileMode { return 0644 } 259func (i *configFileInfo) ModTime() time.Time { return i.cfg.CreatedAt } 260func (i *configFileInfo) IsDir() bool { return false } 261func (i *configFileInfo) Sys() any { return nil } 262 263type configDirEntry struct { 264 info *configFileInfo 265} 266 267func (e *configDirEntry) Name() string { return e.info.Name() } 268func (e *configDirEntry) IsDir() bool { return false } 269func (e *configDirEntry) Type() fs.FileMode { return e.info.Mode() } 270func (e *configDirEntry) Info() (fs.FileInfo, error) { return e.info, nil } 271 272// preseedSeenItems fetches the feed and marks all current items as seen, 273// so that adding a new feed doesn't trigger emails for old posts. 274func (h *scpHandler) preseedSeenItems(ctx context.Context, tx *sql.Tx, feed *store.Feed) error { 275 result := scheduler.FetchFeed(ctx, feed) 276 if result.Error != nil { 277 return result.Error 278 } 279 280 for _, item := range result.Items { 281 if err := h.store.MarkItemSeenTx(ctx, tx, feed.ID, item.GUID, item.Title, item.Link); err != nil { 282 return err 283 } 284 } 285 286 h.logger.Debug("preseeded seen items for new feed", "feed_url", feed.URL, "count", len(result.Items)) 287 return nil 288}