rss email digests over ssh because you're a cool kid
herald.dunkirk.sh
go
rss
rss-reader
ssh
charm
1package ssh
2
import (
	"bytes"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"path/filepath"
	"strings"
	"time"

	"github.com/adhocore/gronx"
	"github.com/charmbracelet/log"
	"github.com/charmbracelet/ssh"
	"github.com/charmbracelet/wish/scp"
	"github.com/kierank/herald/config"
	"github.com/kierank/herald/ratelimit"
	"github.com/kierank/herald/scheduler"
	"github.com/kierank/herald/store"
)
23
// scpHandler implements the wish SCP handler callbacks, exposing each
// authenticated user's stored digest configs as a flat set of virtual
// .txt files (download via Glob/WalkDir/NewFileEntry, upload via Write).
type scpHandler struct {
	store *store.DB // persistence for users, configs, feeds, and seen items
	scheduler *scheduler.Scheduler // NOTE(review): not referenced by any method in this chunk — confirm it is used elsewhere or drop
	logger *log.Logger // structured logging (charmbracelet/log)
	rateLimiter *ratelimit.Limiter // per-user throttle applied to uploads in Write
}
30
31func (h *scpHandler) Glob(s ssh.Session, pattern string) ([]string, error) {
32 user, ok := s.Context().Value("user").(*store.User)
33 if !ok {
34 return nil, fmt.Errorf("no user in context")
35 }
36
37 configs, err := h.store.ListConfigs(s.Context(), user.ID)
38 if err != nil {
39 return nil, err
40 }
41
42 var matches []string
43 for _, cfg := range configs {
44 matched, _ := filepath.Match(pattern, cfg.Filename)
45 if matched || pattern == "*" || pattern == cfg.Filename {
46 matches = append(matches, cfg.Filename)
47 }
48 }
49 return matches, nil
50}
51
52func (h *scpHandler) WalkDir(s ssh.Session, path string, fn fs.WalkDirFunc) error {
53 user, ok := s.Context().Value("user").(*store.User)
54 if !ok {
55 return fmt.Errorf("no user in context")
56 }
57
58 configs, err := h.store.ListConfigs(s.Context(), user.ID)
59 if err != nil {
60 return err
61 }
62
63 for _, cfg := range configs {
64 info := &configFileInfo{cfg: cfg}
65 if err := fn(cfg.Filename, &configDirEntry{info: info}, nil); err != nil {
66 return err
67 }
68 }
69 return nil
70}
71
// NewDirEntry always fails: herald exposes only a flat namespace of config
// files, so directory downloads are rejected.
func (h *scpHandler) NewDirEntry(s ssh.Session, name string) (*scp.DirEntry, error) {
	return nil, fmt.Errorf("directories not supported")
}
75
76func (h *scpHandler) NewFileEntry(s ssh.Session, name string) (*scp.FileEntry, func() error, error) {
77 user, ok := s.Context().Value("user").(*store.User)
78 if !ok {
79 return nil, nil, fmt.Errorf("no user in context")
80 }
81
82 cfg, err := h.store.GetConfig(s.Context(), user.ID, name)
83 if err != nil {
84 return nil, nil, fmt.Errorf("config not found: %w", err)
85 }
86
87 content := []byte(cfg.RawText)
88 entry := &scp.FileEntry{
89 Name: cfg.Filename,
90 Mode: 0644,
91 Size: int64(len(content)),
92 Mtime: cfg.CreatedAt.Unix(),
93 Atime: cfg.CreatedAt.Unix(),
94 Reader: bytes.NewReader(content),
95 Filepath: cfg.Filename,
96 }
97
98 return entry, nil, nil
99}
100
// Mkdir always fails: uploads must be individual .txt config files, never
// directory trees.
func (h *scpHandler) Mkdir(s ssh.Session, entry *scp.DirEntry) error {
	return fmt.Errorf("directories not supported")
}
104
105func (h *scpHandler) Write(s ssh.Session, entry *scp.FileEntry) (int64, error) {
106 h.logger.Debug("SCP Write called", "name", entry.Name, "size", entry.Size)
107
108 user, ok := s.Context().Value("user").(*store.User)
109 if !ok {
110 return 0, fmt.Errorf("no user in context")
111 }
112
113 // Rate limit SCP uploads (per user)
114 if !h.rateLimiter.Allow(fmt.Sprintf("scp:%d", user.ID)) {
115 return 0, fmt.Errorf("rate limit exceeded, please try again later")
116 }
117
118 // Max file size: 1MB
119 if entry.Size > 1024*1024 {
120 return 0, fmt.Errorf("file too large (max 1MB)")
121 }
122
123 name := entry.Name
124 if !strings.HasSuffix(name, ".txt") {
125 return 0, fmt.Errorf("only .txt files are supported")
126 }
127
128 content, err := io.ReadAll(io.LimitReader(entry.Reader, 1024*1024))
129 if err != nil {
130 return 0, fmt.Errorf("failed to read file: %w", err)
131 }
132
133 parsed, err := config.Parse(string(content))
134 if err != nil {
135 return 0, fmt.Errorf("failed to parse config: %w", err)
136 }
137
138 if err := config.Validate(parsed); err != nil {
139 return 0, fmt.Errorf("invalid config: %w", err)
140 }
141
142 ctx := s.Context()
143
144 // Validate feed URLs by attempting to fetch them
145 if err := config.ValidateFeedURLs(ctx, parsed); err != nil {
146 return 0, fmt.Errorf("feed validation failed: %w", err)
147 }
148
149 nextRun, err := calculateNextRun(parsed.CronExpr)
150 if err != nil {
151 return 0, fmt.Errorf("failed to calculate next run: %w", err)
152 }
153
154 // Use transaction for config update
155 tx, err := h.store.BeginTx(ctx)
156 if err != nil {
157 return 0, fmt.Errorf("begin transaction: %w", err)
158 }
159 defer func() { _ = tx.Rollback() }()
160
161 // Try to get existing config
162 existingCfg, err := h.store.GetConfigTx(ctx, tx, user.ID, name)
163 var cfg *store.Config
164
165 if err == nil {
166 // Config exists - update it
167 if err := h.store.UpdateConfigTx(ctx, tx, existingCfg.ID, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun); err != nil {
168 return 0, fmt.Errorf("failed to update config: %w", err)
169 }
170 cfg = existingCfg
171 cfg.Email = parsed.Email
172 cfg.CronExpr = parsed.CronExpr
173 cfg.Digest = parsed.Digest
174 cfg.InlineContent = parsed.Inline
175 cfg.RawText = string(content)
176
177 // Sync feeds: match by URL, update/delete/add as needed
178 existingFeeds, err := h.store.GetFeedsByConfigTx(ctx, tx, cfg.ID)
179 if err != nil {
180 return 0, fmt.Errorf("failed to get existing feeds: %w", err)
181 }
182
183 // Build maps for comparison
184 existingByURL := make(map[string]*store.Feed)
185 for _, f := range existingFeeds {
186 existingByURL[f.URL] = f
187 }
188
189 newByURL := make(map[string]struct{ URL, Name string })
190 for _, f := range parsed.Feeds {
191 newByURL[f.URL] = struct{ URL, Name string }{URL: f.URL, Name: f.Name}
192 }
193
194 // Update existing feeds that are still present
195 for _, newFeed := range parsed.Feeds {
196 if existingFeed, exists := existingByURL[newFeed.URL]; exists {
197 // Feed still exists - update name if changed
198 if err := h.store.UpdateFeedTx(ctx, tx, existingFeed.ID, newFeed.Name); err != nil {
199 return 0, fmt.Errorf("failed to update feed: %w", err)
200 }
201 } else {
202 // New feed - create it and mark existing items as seen
203 newFeedRecord, err := h.store.CreateFeedTx(ctx, tx, cfg.ID, newFeed.URL, newFeed.Name)
204 if err != nil {
205 return 0, fmt.Errorf("failed to create feed: %w", err)
206 }
207 // Pre-seed seen items so we don't send old posts
208 if err := h.preseedSeenItems(ctx, tx, newFeedRecord); err != nil {
209 h.logger.Warn("failed to preseed seen items", "feed_url", newFeed.URL, "err", err)
210 }
211 }
212 }
213
214 // Delete feeds that are no longer present
215 for _, existingFeed := range existingFeeds {
216 if _, stillExists := newByURL[existingFeed.URL]; !stillExists {
217 if err := h.store.DeleteFeedTx(ctx, tx, existingFeed.ID); err != nil {
218 return 0, fmt.Errorf("failed to delete feed: %w", err)
219 }
220 }
221 }
222
223 h.logger.Debug("updated existing config", "filename", name)
224 } else {
225 // Config doesn't exist - create new one
226 cfg, err = h.store.CreateConfigTx(ctx, tx, user.ID, name, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun)
227 if err != nil {
228 return 0, fmt.Errorf("failed to create config: %w", err)
229 }
230
231 for _, feed := range parsed.Feeds {
232 if _, err := h.store.CreateFeedTx(ctx, tx, cfg.ID, feed.URL, feed.Name); err != nil {
233 return 0, fmt.Errorf("failed to create feed: %w", err)
234 }
235 }
236
237 h.logger.Debug("created new config", "filename", name)
238 }
239
240 if err := tx.Commit(); err != nil {
241 return 0, fmt.Errorf("commit transaction: %w", err)
242 }
243
244 h.logger.Info("config uploaded", "user_id", user.ID, "filename", name, "feeds", len(parsed.Feeds), "next_run", nextRun)
245 return int64(len(content)), nil
246}
247
248func calculateNextRun(cronExpr string) (time.Time, error) {
249 return gronx.NextTickAfter(cronExpr, time.Now().UTC(), true)
250}
251
// configFileInfo adapts a stored config to fs.FileInfo so directory
// listings and SCP downloads can report file metadata.
type configFileInfo struct {
	cfg *store.Config // the backing config record
}

func (i *configFileInfo) Name() string { return i.cfg.Filename }
func (i *configFileInfo) Size() int64 { return int64(len(i.cfg.RawText)) } // byte length of the raw config text
func (i *configFileInfo) Mode() fs.FileMode { return 0644 }
func (i *configFileInfo) ModTime() time.Time { return i.cfg.CreatedAt } // NOTE(review): creation time, not last update — confirm intended
func (i *configFileInfo) IsDir() bool { return false }
func (i *configFileInfo) Sys() any { return nil }
262
// configDirEntry adapts configFileInfo to fs.DirEntry for use with
// fs.WalkDirFunc in WalkDir above.
type configDirEntry struct {
	info *configFileInfo // the file metadata this entry wraps
}

func (e *configDirEntry) Name() string { return e.info.Name() }
func (e *configDirEntry) IsDir() bool { return false }
func (e *configDirEntry) Type() fs.FileMode { return e.info.Mode() }
func (e *configDirEntry) Info() (fs.FileInfo, error) { return e.info, nil }
271
272// preseedSeenItems fetches the feed and marks all current items as seen,
273// so that adding a new feed doesn't trigger emails for old posts.
274func (h *scpHandler) preseedSeenItems(ctx context.Context, tx *sql.Tx, feed *store.Feed) error {
275 result := scheduler.FetchFeed(ctx, feed)
276 if result.Error != nil {
277 return result.Error
278 }
279
280 for _, item := range result.Items {
281 if err := h.store.MarkItemSeenTx(ctx, tx, feed.ID, item.GUID, item.Title, item.Link); err != nil {
282 return err
283 }
284 }
285
286 h.logger.Debug("preseeded seen items for new feed", "feed_url", feed.URL, "count", len(result.Items))
287 return nil
288}