mail based rss feed aggregator
2
fork

Configure Feed

Select the types of activity you want to include in your feed.

rewrite a lot of `fetcher` also closes [#9](https://tangled.org/ollie.earth/eater/issues/9) and [#10](https://tangled.org/ollie.earth/eater/issues/10)

ollie edbdf033 74b209a3

+683 -371
+9 -7
db/migrations/20260331190734_initialize.sql
··· 14 14 -- See the Licence for the specific language governing permissions and limitations. [cite: 6] 15 15 16 16 -- migrate:up 17 + PRAGMA foreign_keys = ON; 18 + 17 19 CREATE TABLE users 18 20 ( 19 21 id uuid PRIMARY KEY NOT NULL UNIQUE, ··· 26 28 ( 27 29 user_id uuid NOT NULL, 28 30 feed_id uuid NOT NULL, 29 - FOREIGN KEY(user_id) REFERENCES users(id), 30 - FOREIGN KEY(feed_id) REFERENCES feeds(id), 31 + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, 32 + FOREIGN KEY(feed_id) REFERENCES feeds(id) ON DELETE CASCADE, 31 33 UNIQUE(user_id, feed_id) 32 34 ); 33 35 34 36 CREATE TABLE feeds 35 37 ( 36 - id uuid PRIMARY KEY NOT NULL, 37 - link TEXT NOT NULL UNIQUE, 38 - skip_n_times INT NOT NULL, 39 - failed_n_times INT NOT NULL 38 + id uuid PRIMARY KEY NOT NULL, 39 + link TEXT NOT NULL UNIQUE, 40 + next_check timestamp NOT NULL DEFAULT (unixepoch('now')), 41 + repeated_failures INT NOT NULL DEFAULT 0 40 42 ); 41 43 42 44 CREATE TABLE feed_updates 43 45 ( 44 46 user_id uuid NOT NULL, 45 47 link_to_post TEXT NOT NULL, 46 - FOREIGN KEY (user_id) REFERENCES users (id), 48 + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE, 47 49 UNIQUE(user_id, link_to_post) 48 50 ); 49 51
+7 -7
db/schema.sql
··· 10 10 ( 11 11 user_id uuid NOT NULL, 12 12 feed_id uuid NOT NULL, 13 - FOREIGN KEY(user_id) REFERENCES users(id), 14 - FOREIGN KEY(feed_id) REFERENCES feeds(id), 13 + FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE, 14 + FOREIGN KEY(feed_id) REFERENCES feeds(id) ON DELETE CASCADE, 15 15 UNIQUE(user_id, feed_id) 16 16 ); 17 17 CREATE TABLE feeds 18 18 ( 19 - id uuid PRIMARY KEY NOT NULL, 20 - link TEXT NOT NULL UNIQUE, 21 - skip_n_times INT NOT NULL, 22 - failed_n_times INT NOT NULL 19 + id uuid PRIMARY KEY NOT NULL, 20 + link TEXT NOT NULL UNIQUE, 21 + next_check timestamp NOT NULL DEFAULT (unixepoch('now')), 22 + repeated_failures INT NOT NULL DEFAULT 0 23 23 ); 24 24 CREATE TABLE feed_updates 25 25 ( 26 26 user_id uuid NOT NULL, 27 27 link_to_post TEXT NOT NULL, 28 - FOREIGN KEY (user_id) REFERENCES users (id), 28 + FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE, 29 29 UNIQUE(user_id, link_to_post) 30 30 ); 31 31 -- Dbmate schema migrations
+1
gleam.toml
··· 35 35 gleam_crypto = ">= 1.5.1 and < 2.0.0" 36 36 glaze_oat = ">= 3.0.0 and < 4.0.0" 37 37 lustre_portal = ">= 1.0.1 and < 2.0.0" 38 + gleam_time = ">= 1.8.0 and < 2.0.0" 38 39 39 40 [dev_dependencies] 40 41 gleeunit = ">= 1.0.0 and < 2.0.0"
+1
manifest.toml
··· 53 53 gleam_json = { version = ">= 3.1.0 and < 4.0.0" } 54 54 gleam_otp = { version = ">= 1.2.0 and < 2.0.0" } 55 55 gleam_stdlib = { version = ">= 0.44.0 and < 2.0.0" } 56 + gleam_time = { version = ">= 1.8.0 and < 2.0.0" } 56 57 gleeunit = { version = ">= 1.0.0 and < 2.0.0" } 57 58 group_registry = { version = ">= 1.0.0 and < 2.0.0" } 58 59 logging = { version = ">= 1.3.0 and < 2.0.0" }
+51 -24
src/eater/backend.gleam
··· 98 98 99 99 supervisor.new(supervisor.RestForOne) 100 100 |> supervisor.add(pubsub.supervised(registry)) 101 - |> supervisor.add(fetcher.factory( 102 - name: fetcher_factory, 103 - starter: fetcher_manager, 104 - registry:, 105 - database:, 106 - )) 107 101 |> supervisor.add(sender.factory( 108 102 name: sender_factory, 109 103 starter: sender_manager, ··· 111 105 database:, 112 106 smtp_environment:, 113 107 )) 108 + |> supervisor.add(fetcher.factory( 109 + name: fetcher_factory, 110 + starter: fetcher_manager, 111 + registry:, 112 + database:, 113 + )) 114 114 |> supervisor.add(backend_manager(names, database, smtp_environment)) 115 115 |> supervisor.supervised() 116 116 } 117 117 118 118 // main api --------------------------------------------------------------------- 119 119 120 + pub type NewSubscriptionError { 121 + InvalidFeed(fetcher.FetchError) 122 + Database(DatabaseErrorKind) 123 + } 124 + 125 + pub type DatabaseErrorKind { 126 + FailedToAddFeed 127 + FailedToAddSubscription 128 + } 129 + 120 130 /// add a new subscription for a user 121 131 /// - save to database 122 132 /// - notify backend processes ··· 125 135 backend backend: Reference, 126 136 user user: user.User, 127 137 feed feed: rss.Location, 128 - ) -> Result(Nil, Nil) { 138 + ) -> Result(Nil, NewSubscriptionError) { 139 + // verify the feed is actually a valid rss feed 140 + use _ <- result.try( 141 + fetcher.fetch_feed(feed.link) 142 + |> result.map_error(InvalidFeed), 143 + ) 144 + 145 + // add the feed, if it doesnt exist 129 146 use _ <- result.try( 130 147 database.add_feed(backend.database, feed) 131 148 |> result.map_error(fn(error) { 132 - log(woof.Error, "Failed to add user to database", [ 133 - woof.field("user-email", user.email), 134 - woof.field("user-id", user.id |> uuid.to_string()), 149 + log(woof.Error, "Failed to add feed to database", [ 150 + woof.field("feed-url", feed.link |> uri.to_string()), 151 + woof.field("feed-id", feed.id |> uuid.to_string()), 135 152 woof.field("details", string.inspect(error)), 136 153 ]) 154 + Database(FailedToAddFeed) 137 155 }), 138 156 ) 139 157 158 + // add the subscription 140 159 use _ <- result.try( 141 160 database.add_subscription(backend.database, user, feed) 142 161 |> result.map_error(fn(error) { 143 - log(woof.Error, "Failed to add user to database", [ 162 + log(woof.Error, "Failed to add subscription to database", [ 144 163 woof.field("user-email", user.email), 145 164 woof.field("user-id", user.id |> uuid.to_string()), 165 + woof.field("feed-url", feed.link |> uri.to_string()), 166 + woof.field("feed-id", feed.id |> uuid.to_string()), 146 167 woof.field("details", string.inspect(error)), 147 168 ]) 169 + 170 + Database(FailedToAddSubscription) 148 171 }), 149 172 ) 150 173 174 + // notify the backend 151 175 let backend = process.named_subject(backend.name) 152 176 process.send(backend, NewSubscription(user, feed)) 153 177 |> Ok ··· 290 314 // external messages ---------------------------------------------------------- 291 315 /// add a subscription for a user 292 316 /// 293 - /// - save to database 294 317 /// - notify sender 295 318 /// - start sender if not running 296 319 /// - start fetcher if not running ··· 299 322 300 323 /// remove a subscription from a user 301 324 /// 302 - /// - delete from database 303 325 /// - notify sender 304 326 /// - if noone is subscribed 305 327 /// - stop fetcher ··· 489 511 // - stop fetcher 490 512 fetcher.stop_for(state.names.fetcher_manager, feed) 491 513 // - remove feed from database 492 - use _ <- try_twice( 493 - fn() { database.delete_feed(feed, state.database) }, 494 - otherwise: log_and_stop( 495 - state.logger, 496 - "Failed to delete feed from database", 497 - [ 498 - woof.field("feed", feed.link |> uri.to_string()), 499 - ], 500 - ), 501 - ) 502 514 503 - actor.continue(state) 515 + case database.delete_feed(state.database, feed) { 516 + Ok(_) -> actor.continue(state) 517 + Error(error) -> { 518 + woof.log( 519 + state.logger, 520 + woof.Error, 521 + "Failed to delete feed from database", 522 + [ 523 + woof.field("feed-url", feed.link |> uri.to_string()), 524 + woof.field("feed-id", feed.id |> uuid.to_string()), 525 + woof.field("details", string.inspect(error)), 526 + ], 527 + ) 528 + actor.stop_abnormal("Failed to delete feed from database") 529 + } 530 + } 504 531 } 505 532 _ -> actor.continue(state) 506 533 }
+39 -23
src/eater/database.gleam
··· 19 19 import eater/sql 20 20 import eater/user 21 21 import gleam/dynamic/decode 22 + import gleam/float 22 23 import gleam/list 23 24 import gleam/option.{Some} 24 25 import gleam/result 26 + import gleam/time/timestamp 25 27 import gleam/uri 26 28 import parrot/dev 27 29 import sqlight ··· 37 39 feed feed: rss.Location, 38 40 ) -> Result(rss.Location, sqlight.Error) { 39 41 let #(sql, with) = 40 - sql.add_feed( 41 - feed.id |> uuid.to_bit_array(), 42 - feed.link |> uri.to_string(), 43 - failed_n_times: feed.failed_n_times, 44 - skip_n_times: feed.skip_n_times, 45 - ) 42 + sql.add_feed(feed.id |> uuid.to_bit_array(), feed.link |> uri.to_string()) 46 43 47 44 let with = list.map(with, parrot_to_sqlight) 48 45 ··· 68 65 rss.Location( 69 66 id:, 70 67 link:, 71 - failed_n_times: feed.failed_n_times, 72 - skip_n_times: feed.skip_n_times, 68 + next_check: feed.next_check, 69 + repeated_failures: feed.repeated_failures, 73 70 ) 74 71 }) 75 72 |> Ok ··· 89 86 90 87 // it is a `:one` query with limit 1 91 88 list.map(feeds, fn(feed) { 92 - let sql.FeedByLink(id:, link:, skip_n_times:, failed_n_times:) = feed 89 + let sql.FeedByLink(id:, link:, next_check:, repeated_failures:) = feed 93 90 94 91 let assert Ok(id) = uuid.from_bit_array(id) 95 92 as "invalid UUID from db UUID column?!" 96 93 97 94 let assert Ok(link) = uri.parse(link) as "Invalid uri from database?!" 98 95 99 - rss.Location(id:, link:, failed_n_times:, skip_n_times:) 96 + rss.Location(id:, link:, next_check:, repeated_failures:) 100 97 }) 101 98 |> Ok 102 99 } 103 100 104 101 /// delete a feed 102 + /// 103 + /// also deletes all associated subscriptions 105 104 /// 106 105 pub fn delete_feed( 107 - feed feed: rss.Location, 108 106 from on: sqlight.Connection, 107 + feed feed: rss.Location, 109 108 ) -> Result(Nil, sqlight.Error) { 110 109 let #(sql, with) = sql.delete_feed(id: feed.id |> uuid.to_bit_array) 111 110 let with = list.map(with, parrot_to_sqlight) ··· 114 113 |> result.replace(Nil) 115 114 } 116 115 117 - /// persist changes to `failed_n_times` and `skip_n_times` 116 + /// persist changes to `feed.next_check` 118 117 /// 119 - pub fn update_feed_cooldown( 118 + pub fn update_feed_next_check( 119 + in on: sqlight.Connection, 120 120 feed feed: rss.Location, 121 + ) -> Result(Nil, sqlight.Error) { 122 + let rss.Location(id:, link: _, next_check:, repeated_failures: _) = feed 123 + 124 + let #(sql, with) = 125 + sql.update_feed_next_check(next_check:, id: uuid.to_bit_array(id)) 126 + 127 + let with = list.map(with, parrot_to_sqlight) 128 + 129 + sqlight.query(sql, on:, with:, expecting: decode.success("")) 130 + |> result.replace(Nil) 131 + } 132 + 133 + /// persist changes to `failed_n_times` and `skip_n_times` 134 + /// 135 + pub fn update_feed_repeated_failures( 121 136 in on: sqlight.Connection, 137 + feed feed: rss.Location, 122 138 ) -> Result(Nil, sqlight.Error) { 123 - let rss.Location(id:, link: _, failed_n_times:, skip_n_times:) = feed 139 + let rss.Location(id:, link: _, next_check: _, repeated_failures:) = feed 124 140 125 141 let #(sql, with) = 126 - sql.update_feed_cooldown( 127 - skip_n_times:, 128 - failed_n_times:, 142 + sql.update_feed_repeated_failures( 143 + repeated_failures:, 129 144 id: uuid.to_bit_array(id), 130 145 ) 131 146 ··· 319 334 as "Invalid UUID from datbase?!" 320 335 321 336 use link <- option.then(feed.feed_link) 322 - use failed_n_times <- option.then(feed.feed_failed) 323 - use skip_n_times <- option.then(feed.feed_skip) 337 + use next_check <- option.then(feed.next_check) 338 + use repeated_failures <- option.then(feed.repeated_failures) 324 339 325 340 let assert Ok(link) = uri.parse(link) as "Invalid uri from database?!" 326 341 327 - rss.Location(id:, link:, failed_n_times:, skip_n_times:) 342 + rss.Location(id:, link:, next_check:, repeated_failures:) 328 343 |> Some 329 344 } 330 345 |> option.to_result(Nil) ··· 368 383 rss.Location( 369 384 id:, 370 385 link:, 371 - failed_n_times: feed.failed_n_times, 372 - skip_n_times: feed.skip_n_times, 386 + next_check: feed.next_check, 387 + repeated_failures: feed.repeated_failures, 373 388 ), 374 389 feed.count, 375 390 ) ··· 389 404 dev.ParamNullable(x) -> sqlight.nullable(fn(a) { parrot_to_sqlight(a) }, x) 390 405 dev.ParamList(_) -> panic as "sqlite does not implement lists" 391 406 dev.ParamDate(_) -> panic as "date parameter needs to be implemented" 392 - dev.ParamTimestamp(_) -> panic as "sqlite does not support timestamps" 407 + dev.ParamTimestamp(timestamp) -> 408 + sqlight.int(timestamp.to_unix_seconds(timestamp) |> float.round()) 393 409 dev.ParamDynamic(_) -> panic as "cannot process dynamic parameter" 394 410 } 395 411 }
+28 -25
src/eater/feed/rss.gleam
··· 17 17 // See the Licence for the specific language governing permissions and limitations. [cite: 6] 18 18 19 19 import gleam/dynamic/decode 20 + import gleam/time/duration 21 + import gleam/time/timestamp 20 22 import gleam/uri 21 23 import youid/uuid 22 24 ··· 79 81 // location --------------------------------------------------------------------- 80 82 81 83 pub type Location { 82 - /// `failed_n_times`: how many times (back to back) fetching this url has failed 84 + /// `repeated_failures`: how many times (back to back) fetching this url has failed 83 85 /// +1 when it fails | = 0 when it succeeds 84 86 /// 85 - /// `skip_n_times`: how many more times this url should be skipped 86 - /// try to fetch if = 0 87 - /// on failure: set = failed_n_times * 2 88 - /// 89 - Location(id: uuid.Uuid, link: uri.Uri, failed_n_times: Int, skip_n_times: Int) 87 + Location( 88 + id: uuid.Uuid, 89 + link: uri.Uri, 90 + next_check: timestamp.Timestamp, 91 + repeated_failures: Int, 92 + ) 90 93 } 91 94 92 95 /// creates a new Location using the given `link` 93 96 /// sets a new uuid 94 97 /// 95 - pub fn new_location(link link: uri.Uri) { 96 - Location(id: uuid.v7(), link:, failed_n_times: 0, skip_n_times: 0) 98 + pub fn new_location(link link: uri.Uri) -> Location { 99 + Location( 100 + id: uuid.v7(), 101 + link:, 102 + next_check: timestamp.system_time(), 103 + repeated_failures: 0, 104 + ) 97 105 } 98 106 99 - /// update a location in case of failure 107 + /// increment the failure count 100 108 /// 101 - pub fn failure(location: Location) { 102 - Location( 103 - ..location, 104 - failed_n_times: location.failed_n_times + 1, 105 - skip_n_times: { location.failed_n_times + 1 } * 2, 106 - ) 109 + pub fn failure(location: Location) -> Location { 110 + Location(..location, repeated_failures: location.repeated_failures + 1) 107 111 } 108 112 109 - /// applies one round of cooldown 113 + /// resets the failure count to 0 110 114 /// 111 - pub fn cooldown(location: Location) { 112 - case location { 113 - Location(skip_n_times:, ..) if skip_n_times > 0 -> 114 - Location(..location, skip_n_times: location.skip_n_times - 1) 115 - _ -> Location(..location, skip_n_times: 0) 116 - } 115 + pub fn success(location: Location) -> Location { 116 + Location(..location, repeated_failures: 0) 117 117 } 118 118 119 - /// resets the cooldown to 0 0 119 + /// update the locations `next_check` using the current system time and the supplied duration 120 120 /// 121 - pub fn reset_cooldown(location: Location) { 122 - Location(..location, skip_n_times: 0, failed_n_times: 0) 121 + pub fn next_check_in(feed: Location, duration: duration.Duration) -> Location { 122 + Location( 123 + ..feed, 124 + next_check: timestamp.system_time() |> timestamp.add(duration), 125 + ) 123 126 }
+353 -212
src/eater/fetcher.gleam
··· 19 19 import eater/database 20 20 import eater/feed/rss 21 21 import eater/pubsub 22 + import gleam/bool 22 23 import gleam/dict 23 24 import gleam/erlang/process 25 + import gleam/float 24 26 import gleam/http 25 27 import gleam/http/request 26 28 import gleam/http/response 27 29 import gleam/httpc 30 + import gleam/int 28 31 import gleam/list 32 + import gleam/order 29 33 import gleam/otp/actor 30 34 import gleam/otp/factory_supervisor as factory 31 35 import gleam/otp/static_supervisor 32 36 import gleam/otp/supervision 33 37 import gleam/result 34 38 import gleam/string 39 + import gleam/time/calendar 40 + import gleam/time/duration 41 + import gleam/time/timestamp 35 42 import gleam/uri 36 43 import parsed_it/xml 37 44 import sqlight 38 45 import woof 39 46 import youid/uuid 40 47 41 - const interval_in_ms: Int = 3_600_000 48 + /// this should really be a constant but 49 + /// is currently not supported 50 + /// 51 + fn duration_between_checks() { 52 + duration.milliseconds(3_600_000) 53 + } 42 54 43 55 /// log with module related structured data 44 56 /// ··· 140 152 |> woof.log(level, message, fields) 141 153 } 142 154 143 - supervision.worker(fn() { 144 - log(woof.Info, "Starting", []) 155 + use <- supervision.worker 156 + 157 + actor.new_with_initialiser(100, fn(self) { 158 + log(woof.Info, "Starting", [ 159 + woof.field("pid", string.inspect(process.self())), 160 + ]) 145 161 146 - actor.new_with_initialiser(100, fn(self) { 147 - process.send_after(self, 150, StartAll) 162 + process.send_after(self, 1000, StartAll) 148 163 149 - actor.initialised(ManagerState( 150 - self:, 151 - factory:, 152 - registry:, 153 - database:, 154 - fetchers: dict.new(), 155 - )) 156 - |> Ok 157 - }) 158 - |> actor.on_message(fn(state, message) { 159 - case message { 160 - StartAll -> { 161 - case database.all_feeds(database) { 162 - Ok(feeds) -> { 163 - // start all the fetchers 164 - list.map(feeds, fn(feed) { 165 - process.send(state.self, StartFor(feed)) 166 - }) 164 + actor.initialised(ManagerState( 165 + self:, 166 + factory:, 167 + registry:, 168 + database:, 169 + fetchers: dict.new(), 170 + )) 171 + |> Ok 172 + }) 173 + |> actor.on_message(fn(state, message) { 174 + case message { 175 + StartAll -> { 176 + case database.all_feeds(database) { 177 + Ok(feeds) -> { 178 + // start all the fetchers 179 + list.map(feeds, fn(feed) { 180 + process.send(state.self, StartFor(feed)) 181 + }) 167 182 168 - actor.continue(state) 169 - } 170 - // have the supervision handle retrying 171 - Error(_) -> actor.stop_abnormal("Failed to get feeds from database") 183 + actor.continue(state) 172 184 } 173 - 174 - actor.continue(state) 185 + // have the supervision handle retrying 186 + Error(_) -> actor.stop_abnormal("Failed to get feeds from database") 175 187 } 176 188 177 - // all starts go through this 178 - FetcherStarted(Ok(started), feed) -> { 179 - // if there is an existing sender for this 180 - // tell it to shut down 181 - case dict.get(state.fetchers, feed.id) { 182 - Ok(actor) -> 183 - case process.is_alive(actor.pid) { 184 - True -> process.send_exit(actor.pid) 185 - False -> Nil 186 - } 187 - Error(_) -> Nil 188 - } 189 + actor.continue(state) 190 + } 189 191 190 - let state = 191 - ManagerState( 192 - ..state, 193 - fetchers: dict.insert(state.fetchers, feed.id, started), 194 - ) 192 + // all starts go through this 193 + FetcherStarted(Ok(started), feed) -> { 194 + // if there is an existing sender for this 195 + // tell it to shut down 196 + case dict.get(state.fetchers, feed.id) { 197 + Ok(actor) -> 198 + case process.is_alive(actor.pid) { 199 + True -> process.send_exit(actor.pid) 200 + False -> Nil 201 + } 202 + Error(_) -> Nil 203 + } 204 + 205 + let state = 206 + ManagerState( 207 + ..state, 208 + fetchers: dict.insert(state.fetchers, feed.id, started), 209 + ) 195 210 196 - log(woof.Info, "Fetcher has registered itself", [ 197 - woof.field("feed", feed.link |> uri.to_string()), 198 - ]) 211 + log(woof.Info, "Fetcher has registered itself", [ 212 + woof.field("feed", feed.link |> uri.to_string()), 213 + woof.field("pid", started.pid |> string.inspect()), 214 + ]) 199 215 200 - actor.continue(state) 201 - } 216 + actor.continue(state) 217 + } 202 218 203 - FetcherStarted(Error(start_error), feed) -> { 204 - log(woof.Error, "Failed to start fetcher", [ 205 - woof.field("feed", feed.link |> uri.to_string()), 206 - woof.field("details", string.inspect(start_error)), 207 - ]) 219 + FetcherStarted(Error(start_error), feed) -> { 220 + log(woof.Error, "Failed to start fetcher", [ 221 + woof.field("feed", feed.link |> uri.to_string()), 222 + woof.field("details", string.inspect(start_error)), 223 + ]) 208 224 209 - actor.stop_abnormal( 210 - "Fetcher failed to start, i dont know how this would happen", 211 - ) 225 + actor.stop_abnormal( 226 + "Fetcher failed to start, i dont know how this would happen", 227 + ) 228 + } 229 + // runtime additions ---------------------------------------------------- 230 + StartFor(feed) -> { 231 + let start_new = fn() { 232 + let _ = 233 + start_new( 234 + factory, 235 + Start(database:, registry:, feed:, manager: name), 236 + ) 237 + Nil 212 238 } 213 - // runtime additions ---------------------------------------------------- 214 - StartFor(feed) -> { 215 - let start_new = fn() { 216 - let _ = 217 - start_new( 218 - factory, 219 - Start(database:, registry:, feed:, manager: name), 220 - ) 221 - Nil 222 - } 223 239 224 - case dict.get(state.fetchers, feed.id) { 225 - Ok(actor) -> { 226 - case process.is_alive(actor.pid) { 227 - True -> Nil 228 - False -> start_new() 229 - } 240 + case dict.get(state.fetchers, feed.id) { 241 + Ok(actor) -> { 242 + case process.is_alive(actor.pid) { 243 + True -> Nil 244 + False -> start_new() 230 245 } 231 - Error(_) -> start_new() 232 246 } 233 - 234 - actor.continue(state) 247 + Error(_) -> start_new() 235 248 } 236 - StopFor(feed) -> { 237 - case dict.get(state.fetchers, feed.id) { 238 - Ok(actor) -> { 239 - case process.is_alive(actor.pid) { 240 - True -> process.send_exit(actor.pid) 241 - False -> Nil 242 - } 249 + 250 + actor.continue(state) 251 + } 252 + StopFor(feed) -> { 253 + case dict.get(state.fetchers, feed.id) { 254 + Ok(actor) -> { 255 + case process.is_alive(actor.pid) { 256 + True -> process.send_exit(actor.pid) 257 + False -> Nil 243 258 } 244 - Error(_) -> Nil 245 259 } 246 - 247 - actor.continue(state) 260 + Error(_) -> Nil 248 261 } 249 - RestartFor(user) -> { 250 - process.send(state.self, StopFor(user)) 251 - process.send(state.self, StartFor(user)) 262 + 263 + actor.continue(state) 264 + } 265 + RestartFor(user) -> { 266 + process.send(state.self, StopFor(user)) 267 + process.send(state.self, StartFor(user)) 252 268 253 - actor.continue(state) 254 - } 255 - RecheckFor(feed) -> { 256 - case dict.get(state.fetchers, feed.id) { 257 - Ok(actor) -> { 258 - case process.is_alive(actor.pid) { 259 - True -> { 260 - process.send(actor.data, CheckFeeds) 261 - } 262 - False -> Nil 269 + actor.continue(state) 270 + } 271 + RecheckFor(feed) -> { 272 + case dict.get(state.fetchers, feed.id) { 273 + Ok(actor) -> { 274 + case process.is_alive(actor.pid) { 275 + True -> { 276 + process.send(actor.data, CheckFeed) 263 277 } 278 + False -> Nil 264 279 } 265 - Error(_) -> Nil 266 280 } 267 - 268 - actor.continue(state) 281 + Error(_) -> Nil 269 282 } 283 + 284 + actor.continue(state) 270 285 } 271 - }) 272 - |> actor.named(name) 273 - |> actor.start() 286 + } 274 287 }) 288 + |> actor.named(name) 289 + |> actor.start() 275 290 } 276 291 277 292 /// start a new fetcher in the fetcher factory ··· 299 314 } 300 315 301 316 pub type Message { 302 - CheckFeeds 317 + CheckFeed 303 318 } 304 319 305 320 type State { 306 321 /// `self` - own subject 322 + /// `fetch_timer` - the timer that raises `CheckFeed` every `duration_between_checks` 307 323 /// `feed` - the feed this instance fetches 308 324 /// `publish_to` - the group registry to publish the items to 309 325 /// `database` - used to update the feed cooldowns ··· 321 337 fn fetcher(args: Start) -> Result(actor.Started(_), actor.StartError) { 322 338 let Start(feed:, registry:, database:, manager:) = args 323 339 324 - log(woof.Info, "Starting", [woof.field("feed", feed.link |> uri.to_string())]) 325 - 326 340 let started = 327 341 actor.new_with_initialiser(100, fn(self) { 328 - // TODO: persist the next timestamp to check at to the database 329 - // so the actor restarting doesnt preeptively recheck the feed 330 - let fetch_timer = process.send_after(self, 150, CheckFeeds) 342 + log(woof.Info, "Starting", [ 343 + woof.field("feed", feed.link |> uri.to_string()), 344 + woof.field("pid", string.inspect(process.self())), 345 + ]) 346 + 347 + let fetch_timer = case 348 + timestamp.compare(feed.next_check, timestamp.system_time()) 349 + { 350 + // next_check is in the future 351 + // send a CheckFeed message accordingly 352 + order.Gt -> { 353 + let difference = 354 + feed.next_check 355 + |> timestamp.difference(timestamp.system_time()) 356 + 357 + log(woof.Info, "Next check is at", [ 358 + woof.field( 359 + "timestamp", 360 + feed.next_check |> timestamp.to_rfc3339(calendar.local_offset()), 361 + ), 362 + ]) 363 + 364 + difference 365 + |> duration.to_milliseconds() 366 + |> process.send_after(self, _, CheckFeed) 367 + } 368 + // next check is right now 369 + _ -> { 370 + log(woof.Info, "Next check is being scheduled now", []) 371 + 372 + process.send_after(self, 150, CheckFeed) 373 + } 374 + } 331 375 332 376 actor.initialised(State( 333 377 self: self, ··· 353 397 /// 354 398 fn on_message(state: State, message: Message) -> actor.Next(State, Message) { 355 399 case message { 356 - CheckFeeds -> { 400 + // gets periodically raised to check our feed 401 + CheckFeed -> { 357 402 log(woof.Info, "Checking feed", [ 358 403 woof.field("feed", state.feed.link |> uri.to_string()), 359 404 ]) 360 405 361 - // only actually check the feed if there are people subscribed to it 362 - case pubsub.subscriber_count_feed(state.feed, state.registry) { 363 - [] -> 364 - log(woof.Warning, "Skipping", [ 365 - woof.field("feed", state.feed.link |> uri.to_string()), 366 - woof.field("reason", "No sender is subscribed"), 367 - ]) 368 - [_, ..] -> { 369 - let handled = handle_feed(state, state.feed) 370 - case handled { 371 - Ok(_) -> Nil 372 - Error(error) -> 373 - log(woof.Error, "Checking feed failed", [ 374 - woof.field("feed", state.feed.link |> uri.to_string()), 375 - woof.field("details", string.inspect(error)), 406 + // if we've failed to fetch the feed too many times 407 + // stop fetching for this feed, until it gets started again 408 + case state.feed.repeated_failures { 409 + failures if failures > 10 -> failed_too_often(state) 410 + _ -> { 411 + let reset_timer_and_continue = fn(state: State) { 412 + // cancel old timer in case we received another CheckFeeds message that wasnt sent by the timer 413 + // from the Ui for example 414 + process.cancel_timer(state.fetch_timer) 415 + 416 + // update the `next_check` timestamp in the `rss.Location` 417 + let feed = rss.next_check_in(state.feed, duration_between_checks()) 418 + 419 + // and save it to the database 420 + case database.update_feed_next_check(state.database, feed) { 421 + Error(_) -> 422 + actor.stop_abnormal( 423 + "Failed to persist new next_check for " 424 + <> state.feed.id |> uuid.to_string() 425 + <> " " 426 + <> state.feed.link |> uri.to_string(), 427 + ) 428 + // we managed to delete it 429 + // lets notify the other 430 + Ok(_) -> { 431 + // check again after interval has elapsed 432 + let fetch_timer = 433 + process.send_after( 434 + state.self, 435 + duration_between_checks() |> duration.to_milliseconds(), 436 + CheckFeed, 437 + ) 438 + 439 + log(woof.Info, "Next check is at", [ 440 + woof.field( 441 + "timestamp", 442 + feed.next_check 443 + |> timestamp.to_rfc3339(calendar.local_offset()), 444 + ), 445 + ]) 446 + 447 + actor.continue(State(..state, fetch_timer:, feed:)) 448 + } 449 + } 450 + } 451 + 452 + // only actually check the feed if there are people subscribed to it 453 + case pubsub.subscriber_count_feed(state.feed, state.registry) { 454 + [] -> { 455 + log(woof.Warning, "Skipping", [ 456 + woof.field("feed-url", state.feed.link |> uri.to_string()), 457 + woof.field("feed-id", state.feed.id |> uuid.to_string()), 458 + woof.field("reason", "No sender is subscribed"), 376 459 ]) 460 + reset_timer_and_continue(state) 461 + } 462 + 463 + [_, ..] -> 464 + case handle_feed_fetching(state) { 465 + Ok(state) -> reset_timer_and_continue(state) 466 + Error(stop) -> stop 467 + } 377 468 } 378 469 } 379 470 } 380 - 381 - // cancel old timer in case we received another CheckFeeds message that wasnt sent by the timer 382 - process.cancel_timer(state.fetch_timer) 383 - 384 - // check again after interval has elapsed 385 - let fetch_timer = 386 - process.send_after(state.self, interval_in_ms, CheckFeeds) 387 - 388 - actor.continue(State(..state, fetch_timer:)) 389 471 } 390 472 } 391 473 } 392 474 393 - type HandleFeedError { 394 - FailedToDecreaseCooldown(sqlight.Error) 395 - FailedToPersistFailure(sqlight.Error, FetchError) 475 + /// stop and notify that we arent fetching this feed anymore 476 + /// - notify senders through pubsub 477 + /// - stop 478 + /// 479 + fn failed_too_often(state: State) -> actor.Next(State, Message) { 480 + log(woof.Warning, "Feed failed too often, fetching is being paused", [ 481 + woof.field("feed-url", state.feed.link |> uri.to_string()), 482 + woof.field("feed-id", state.feed.id |> uuid.to_string()), 483 + woof.int_field("feed-failures", state.feed.repeated_failures), 484 + woof.bool_field("failure-too-high", state.feed.repeated_failures > 10), 485 + ]) 486 + 487 + // tell the senders so they can message subscribed users about it 488 + pubsub.publish_feed_pausing(state.registry, state.feed) 489 + 490 + // stop 491 + actor.stop() 396 492 } 397 493 398 - // TODO: clean up 399 - /// tries to fetch a given feed and publish it to the group registry 400 - /// 401 - /// handles the failure cases and incrementsthe feed's cooldown accordingly 494 + /// handle fetching a feed and the associated failures 402 495 /// 403 - fn handle_feed( 404 - state: State, 405 - location: rss.Location, 406 - ) -> Result(Nil, HandleFeedError) { 407 - let feed = fetch_feed(location) 408 - 496 + /// the error is only used to `Error(actor.stop_abnormal(..))` 497 + /// 498 + fn handle_feed_fetching(state: State) -> Result(State, actor.Next(State, a)) { 499 + let feed = fetch_feed(state.feed.link) 409 500 case feed { 501 + Error(error) -> { 502 + // log that something went wrong while fetching 503 + log(woof.Warning, "Failed to fetch feed", [ 504 + woof.field("feed-url", state.feed.link |> uri.to_string()), 505 + woof.field("feed-id", state.feed.id |> uuid.to_string()), 506 + ..describe_fetch_error(error) 507 + ]) 508 + 509 + // update the feed failure stats 510 + let feed = rss.failure(state.feed) 511 + 512 + // persist the new failure stats 513 + use _ <- result.try( 514 + database.update_feed_repeated_failures(state.database, feed) 515 + |> result.replace_error(actor.stop_abnormal( 516 + "Failed to increase failure count", 517 + )), 518 + ) 519 + 520 + // continue with the new feed 521 + Ok(State(..state, feed:)) 522 + } 410 523 Ok(feed) -> { 411 524 feed.channel.items 412 525 // reverse so we publish the oldest items first ··· 419 532 ) 420 533 }) 421 534 422 - case location.failed_n_times != 0 || location.skip_n_times != 0 { 423 - True -> 424 - rss.reset_cooldown(location) 425 - |> database.update_feed_cooldown(state.database) 426 - |> result.map_error(fn(error) { 427 - log(woof.Error, "Failed to reset feed cooldown", [ 428 - woof.field("details", string.inspect(error)), 429 - ]) 430 - }) 431 - |> result.unwrap(Nil) 432 - False -> Nil 433 - } 434 - |> Ok 535 + Ok(state) 435 536 } 436 - Error(OnCooldown) -> 437 - rss.cooldown(location) 438 - |> database.update_feed_cooldown(state.database) 439 - |> result.map_error(FailedToDecreaseCooldown) 537 + } 538 + } 539 + 540 + // feed fetching ---------------------------------------------------------------- 541 + 542 + pub type FetchError { 543 + InvalidUri 544 + PathNotXml 545 + RequestFailed(httpc.HttpError) 546 + Non200Response(response.Response(String)) 547 + NonXmlResponse 548 + FailedToParseRssFeed(xml.XmlDecodeError) 549 + } 550 + 551 + fn describe_fetch_error(fetch_error: FetchError) { 552 + case fetch_error { 553 + InvalidUri -> [ 554 + woof.field("error-variant", "InvalidUri"), 555 + woof.field( 556 + "description", 557 + "The supplied uri could not be converted into a request", 558 + ), 559 + ] 560 + PathNotXml -> [ 561 + woof.field("error-variant", "InvalidUri"), 562 + woof.field("description", "The supplied uri does not end with .xml"), 563 + ] 564 + RequestFailed(httpc_error) -> [ 565 + woof.field("error-variant", "RequestFailed"), 566 + woof.field("description", "The request failed"), 567 + woof.field("httpc-error", case httpc_error { 568 + httpc.InvalidUtf8Response -> "Invalid UTF8" 569 + httpc.FailedToConnect(_, _) -> "Failed to connect" 570 + httpc.ResponseTimeout -> "Timeout" 571 + }), 572 + ] 573 + NonXmlResponse -> [ 574 + woof.field("error-variant", "NonXmlResponse"), 575 + woof.field("description", "The response was not text/xml"), 576 + ] 577 + FailedToParseRssFeed(error) -> [ 578 + woof.field("error-variant", "FailedToParseRssFeed"), 579 + woof.field("description", "The returned xml could not be parsed"), 580 + ..case error { 581 + xml.InvalidXml(error) -> [ 582 + woof.field("xml-error", "Invalid xml: " <> error), 583 + ] 584 + xml.UnableToDecode(decode_errors) -> 585 + list.index_fold(decode_errors, [], fn(acc, error, index) { 586 + let field_start = fn() { "decode-error-" <> int.to_string(index) } 440 587 441 - Error(error) -> 442 - rss.failure(location) 443 - |> database.update_feed_cooldown(state.database) 444 - |> result.map_error(FailedToPersistFailure(_, error)) 588 + [ 589 + woof.field(field_start() <> "expected", error.expected), 590 + woof.field(field_start() <> "found", error.found), 591 + woof.field(field_start() <> "path", string.join(error.path, ".")), 592 + ..acc 593 + ] 594 + }) 595 + } 596 + ] 597 + Non200Response(response) -> [ 598 + woof.field("error-variant", "Non200Response"), 599 + woof.int_field("status-code", response.status), 600 + ] 445 601 } 446 602 } 447 603 448 - /// gets an rss feed from a given location and parses it into a `rss.Feed` 604 + /// gets an rss feed from a given uri and parses it into a `rss.Feed` 449 605 /// 450 - fn fetch_feed(location: rss.Location) -> Result(rss.Feed, FetchError) { 451 - case location { 452 - rss.Location(skip_n_times: 0, ..) -> { 453 - log(woof.Info, "Fetching", [ 454 - woof.field("feed", location.link |> uri.to_string()), 455 - ]) 606 + pub fn fetch_feed(feed: uri.Uri) -> Result(rss.Feed, FetchError) { 607 + use <- bool.guard( 608 + bool.negate(feed.path |> string.ends_with(".xml")), 609 + return: Error(PathNotXml), 610 + ) 456 611 457 - use req <- result.try( 458 - request.from_uri(location.link) 459 - |> result.replace_error(FailedToParseUrl(location.link)), 460 - ) 612 + use req <- result.try( 613 + request.from_uri(feed) 614 + |> result.replace_error(InvalidUri), 615 + ) 461 616 462 - let req = 463 - req 464 - |> request.set_method(http.Get) 617 + let req = 618 + req 619 + |> request.set_method(http.Get) 465 620 466 - use response <- result.try( 467 - httpc.send(req) |> result.map_error(RequestFailed), 468 - ) 621 + use response <- result.try(httpc.send(req) |> result.map_error(RequestFailed)) 469 622 470 - case response { 471 - response.Response(status: 200, headers: _, body:) -> 623 + case response { 624 + response.Response(status: 200, headers: _, body:) -> { 625 + case 626 + response.get_header(response, "Content-Type") 627 + |> result.map(string.contains(_, "xml")) 628 + { 629 + Ok(True) -> 472 630 xml.parse(body, rss.decoder()) 473 631 |> result.map_error(FailedToParseRssFeed) 474 - response -> { 475 - log(woof.Warning, "Fetching failed", [ 476 - woof.field("feed", location.link |> uri.to_string()), 477 - woof.int_field("status", response.status), 478 - ]) 479 - Error(Non200Response(response)) 480 - } 632 + _ -> Error(NonXmlResponse) 481 633 } 482 634 } 483 - _ -> { 484 - log(woof.Info, "Skipping", [ 485 - woof.field("feed", location.link |> uri.to_string()), 486 - woof.field("reason", "cooldown"), 487 - ]) 488 - Error(OnCooldown) 635 + 636 + response -> { 637 + Error(Non200Response(response)) 489 638 } 490 639 } 491 640 } 492 - 493 - type FetchError { 494 - FailedToParseUrl(uri.Uri) 495 - RequestFailed(httpc.HttpError) 496 - Non200Response(response.Response(String)) 497 - FailedToParseRssFeed(xml.XmlDecodeError) 498 - OnCooldown 499 - }
+21
src/eater/pubsub.gleam
··· 60 60 61 61 pub type Message { 62 62 FeedUpdate(update: rss.FeedUpdate) 63 + FailedToFetchTooManyTimes(rss.Location) 63 64 } 64 65 65 66 /// get the string 'channel' for a given `rss.Location` ··· 93 94 log(woof.Info, "Feed published update", [ 94 95 woof.field("feed", feed.link |> uri.to_string()), 95 96 woof.field("update", update.new.link), 97 + ]) 98 + } 99 + 100 + /// publish info about a feed being paused because it has failed fetching too many times 101 + /// 102 + pub fn publish_feed_pausing( 103 + registry registry: Registry, 104 + feed feed: rss.Location, 105 + ) -> Nil { 106 + let registry = group_registry.get_registry(registry) 107 + let channel = feed_channel(feed) 108 + 109 + let members = group_registry.members(registry, channel) 110 + 111 + list.map(members, process.send(_, FailedToFetchTooManyTimes(feed))) 112 + 113 + log(woof.Warning, "Feed pausing has been published", [ 114 + woof.field("feed-id", feed.id |> uuid.to_string()), 115 + woof.field("feed-url", feed.link |> uri.to_string()), 116 + woof.int_field("feed-failure-count", feed.repeated_failures), 96 117 ]) 97 118 } 98 119
+71 -10
src/eater/sender.gleam
··· 25 25 import gleam/bool 26 26 import gleam/dict 27 27 import gleam/erlang/process 28 + import gleam/int 28 29 import gleam/list 29 30 import gleam/otp/actor 30 31 import gleam/otp/factory_supervisor as factory ··· 228 229 229 230 log(woof.Info, "Sender has registered itself", [ 230 231 woof.field("user", user.email), 232 + woof.field("pid", string.inspect(started.pid)), 231 233 ]) 232 234 233 235 actor.continue(state) ··· 334 336 /// the sender will register itself with the manager 335 337 /// 336 338 fn start_new(factory factory_name: FactoryName, with with: Start) { 337 - log(woof.Info, "Starting", [woof.field("user", with.user.email)]) 338 - 339 339 let factory = factory.get_by_name(factory_name) 340 340 341 341 factory.start_child(factory, with) ··· 365 365 366 366 let actor_started = 367 367 actor.new_with_initialiser(100, fn(self) { 368 + log(woof.Info, "Starting", [ 369 + woof.field("user", user.email), 370 + woof.field("pid", string.inspect(process.self())), 371 + ]) 372 + 368 373 // tell self to get this users feeds from the database 369 374 process.send_after(self, 150, GetFeeds) 370 375 ··· 519 524 ]) 520 525 actor.continue(state) 521 526 } 527 + PubSubMessage(pubsub.FailedToFetchTooManyTimes(feed)) -> { 528 + let state = drop_feed(state, feed) 529 + case send_feed_pausing_email(state, feed) { 530 + Ok(_) -> actor.continue(state) 531 + Error(error) -> { 532 + log(woof.Error, "Failed to send feed pausing email", [ 533 + woof.field("user", state.user.email), 534 + woof.field("feed-url", feed.link |> uri.to_string()), 535 + woof.field("feed-id", feed.id |> uuid.to_string()), 536 + woof.field("details", string.inspect(error)), 537 + ]) 538 + 539 + // try again in a minute 540 + process.send_after( 541 + state.self, 542 + 60_000, 543 + Retry(pubsub.FailedToFetchTooManyTimes(feed), 1), 544 + ) 545 + 546 + actor.continue(state) 547 + } 548 + } 549 + } 550 + Retry(message: pubsub.FailedToFetchTooManyTimes(feed), attempt: _) -> 551 + case send_feed_pausing_email(state, feed) { 552 + Ok(_) -> actor.continue(state) 553 + Error(error) -> { 554 + log( 555 + woof.Error, 556 + "Failed to send feed deletion email a second time. Wont retry again.", 557 + [ 558 + woof.field("user", state.user.email), 559 + woof.field("feed-url", feed.link |> uri.to_string()), 560 + woof.field("feed-id", feed.id |> uuid.to_string()), 561 + woof.field("details", string.inspect(error)), 562 + ], 563 + ) 564 + 565 + actor.continue(state) 566 + } 567 + } 522 568 } 523 569 } 524 570 ··· 528 574 /// 529 575 fn initial_selector(state: State) { 530 576 log(woof.Info, "Initializing selector", [ 531 - woof.field("user", state.user.email), 577 + woof.field("user-email", state.user.email), 578 + woof.field("user-id", state.user.id |> uuid.to_string()), 579 + ..list.index_fold(state.feeds, [], fn(acc, feed, index) { 580 + let field_name = fn(suffix) { 581 + "feed-" <> suffix <> "-" <> int.to_string(index) 582 + } 583 + 584 + [ 585 + woof.field(field_name("url"), feed.link |> uri.to_string()), 586 + woof.field(field_name("id"), feed.id |> uuid.to_string()), 587 + ..acc 588 + ] 589 + }) 532 590 ]) 533 591 534 592 let self = process.self() ··· 544 602 545 603 let selector = 546 604 list.fold(state.feeds, selector, fn(selector, feed) { 547 - log(woof.Info, "User is subscribed to feed", [ 548 - woof.field("user", state.user.email), 549 - woof.field("feed", feed.link |> uri.to_string()), 550 - ]) 551 - 552 605 pubsub.select_feed( 553 606 selector:, 554 607 feed:, ··· 625 678 ) 626 679 627 680 use _ <- result.try( 628 - send_email(state, update) |> result.map_error(FailedToSendEmail), 681 + send_update_email(state, update) |> result.map_error(FailedToSendEmail), 629 682 ) 630 683 631 684 log(woof.Info, "Sent update to user", [ ··· 646 699 Ok(Nil) 647 700 } 648 701 649 - fn send_email( 702 + fn send_update_email( 650 703 state: State, 651 704 update: rss.FeedUpdate, 652 705 ) -> Result(Nil, gcourier.Error) { 653 706 smtp.feed_update_to_email(state.smtp_environment, update, state.user) 654 707 |> smtp.send_message(state.smtp_environment) 655 708 } 709 + 710 + fn send_feed_pausing_email( 711 + state: State, 712 + feed: rss.Location, 713 + ) -> Result(Nil, gcourier.Error) { 714 + smtp.feed_pausing_to_email(state.smtp_environment, feed, state.user) 715 + |> smtp.send_message(state.smtp_environment) 716 + }
+31
src/eater/smtp.gleam
··· 23 23 import gleam/int 24 24 import gleam/option.{type Option} 25 25 import gleam/result 26 + import gleam/uri 26 27 import lustre/attribute 27 28 import lustre/element 28 29 import lustre/element/html ··· 82 83 html.body([], [ 83 84 html.h1([], [html.text("Your one time password")]), 84 85 html.p([], [html.text(one_time_password)]), 86 + ]), 87 + ]) 88 + |> element.to_readable_string, 89 + )) 90 + } 91 + 92 + /// create a message informing a user about a feed being deleted 93 + /// because it has failed to be fetched to many times 94 + /// 95 + pub fn feed_pausing_to_email( 96 + environment: SmtpEnvironment, 97 + deleted_feed: rss.Location, 98 + user: user.User, 99 + ) -> gcourier.Message { 100 + gcourier.new_message(sender(environment)) 101 + |> gcourier.set_subject("One of your feeds has been paused") 102 + |> gcourier.add_recipient(gcourier.To(user.email)) 103 + |> gcourier.set_content(gcourier.Html( 104 + html.html([], [ 105 + html.body([], [ 106 + html.h1([], [html.text("One of your feeds has been pause")]), 107 + html.p([], [ 108 + html.text("Checking of "), 109 + html.a([attribute.href(deleted_feed.link |> uri.to_string())], [ 110 + element.text(deleted_feed.link |> uri.to_string()), 111 + ]), 112 + html.text( 113 + " has been paused. Because fetching or parsing of it has ran into issues too many times.", 114 + ), 115 + ]), 85 116 ]), 86 117 ]) 87 118 |> element.to_readable_string,
+57 -51
src/eater/sql.gleam
··· 3 3 4 4 import gleam/dynamic/decode 5 5 import gleam/option.{type Option} 6 + import gleam/time/timestamp.{type Timestamp} 6 7 import parrot/dev 7 8 8 - pub fn add_feed( 9 - id id: BitArray, 10 - link link: String, 11 - failed_n_times failed_n_times: Int, 12 - skip_n_times skip_n_times: Int, 13 - ) { 9 + pub fn add_feed(id id: BitArray, link link: String) { 14 10 let sql = 15 11 " 16 12 INSERT OR IGNORE INTO feeds ( 17 13 id, 18 - link, 19 - failed_n_times, 20 - skip_n_times 14 + link 21 15 ) 22 16 values 23 - (?, ?, ?, ?)" 24 - #(sql, [ 25 - dev.ParamBitArray(id), 26 - dev.ParamString(link), 27 - dev.ParamInt(failed_n_times), 28 - dev.ParamInt(skip_n_times), 29 - ]) 17 + (?, ?)" 18 + #(sql, [dev.ParamBitArray(id), dev.ParamString(link)]) 30 19 } 31 20 32 21 pub type AllFeeds { 33 - AllFeeds(id: BitArray, link: String, skip_n_times: Int, failed_n_times: Int) 22 + AllFeeds( 23 + id: BitArray, 24 + link: String, 25 + next_check: Timestamp, 26 + repeated_failures: Int, 27 + ) 34 28 } 35 29 36 30 pub fn all_feeds() { 37 - let sql = "SELECT id, link, skip_n_times, failed_n_times FROM feeds" 31 + let sql = "SELECT id, link, next_check, repeated_failures FROM feeds" 38 32 #(sql, [], all_feeds_decoder()) 39 33 } 40 34 41 35 pub fn all_feeds_decoder() -> decode.Decoder(AllFeeds) { 42 36 use id <- decode.field(0, decode.bit_array) 43 37 use link <- decode.field(1, decode.string) 44 - use skip_n_times <- decode.field(2, decode.int) 45 - use failed_n_times <- decode.field(3, decode.int) 46 - decode.success(AllFeeds(id:, link:, skip_n_times:, failed_n_times:)) 38 + use next_check <- decode.field(2, dev.datetime_decoder()) 39 + use repeated_failures <- decode.field(3, decode.int) 40 + decode.success(AllFeeds(id:, link:, next_check:, repeated_failures:)) 47 41 } 48 42 49 43 pub fn delete_feed(id id: BitArray) { ··· 51 45 #(sql, [dev.ParamBitArray(id)]) 52 46 } 53 47 54 - pub fn update_feed_cooldown( 55 - skip_n_times skip_n_times: Int, 56 - failed_n_times failed_n_times: Int, 48 + pub fn update_feed_next_check(next_check next_check: Timestamp, id id: BitArray) { 49 + let sql = 50 + "UPDATE feeds SET next_check = ? 51 + WHERE id = ?" 52 + #(sql, [dev.ParamTimestamp(next_check), dev.ParamBitArray(id)]) 53 + } 54 + 55 + pub fn update_feed_repeated_failures( 56 + repeated_failures repeated_failures: Int, 57 57 id id: BitArray, 58 58 ) { 59 59 let sql = 60 - "UPDATE feeds SET skip_n_times = ?, failed_n_times = ? 60 + "UPDATE feeds SET repeated_failures = ? 61 61 WHERE id = ?" 62 - #(sql, [ 63 - dev.ParamInt(skip_n_times), 64 - dev.ParamInt(failed_n_times), 65 - dev.ParamBitArray(id), 66 - ]) 62 + #(sql, [dev.ParamInt(repeated_failures), dev.ParamBitArray(id)]) 67 63 } 68 64 69 65 pub type FeedByLink { 70 - FeedByLink(id: BitArray, link: String, skip_n_times: Int, failed_n_times: Int) 66 + FeedByLink( 67 + id: BitArray, 68 + link: String, 69 + next_check: Timestamp, 70 + repeated_failures: Int, 71 + ) 71 72 } 72 73 73 74 pub fn feed_by_link(link link: String) { 74 75 let sql = 75 - "SELECT id, link, skip_n_times, failed_n_times FROM feeds WHERE link = ? LIMIT 1" 76 + "SELECT id, link, next_check, repeated_failures FROM feeds WHERE link = ? LIMIT 1" 76 77 #(sql, [dev.ParamString(link)], feed_by_link_decoder()) 77 78 } 78 79 79 80 pub fn feed_by_link_decoder() -> decode.Decoder(FeedByLink) { 80 81 use id <- decode.field(0, decode.bit_array) 81 82 use link <- decode.field(1, decode.string) 82 - use skip_n_times <- decode.field(2, decode.int) 83 - use failed_n_times <- decode.field(3, decode.int) 84 - decode.success(FeedByLink(id:, link:, skip_n_times:, failed_n_times:)) 83 + use next_check <- decode.field(2, dev.datetime_decoder()) 84 + use repeated_failures <- decode.field(3, decode.int) 85 + decode.success(FeedByLink(id:, link:, next_check:, repeated_failures:)) 85 86 } 86 87 87 88 pub fn add_user( ··· 189 190 pub fn add_subscription(user_id user_id: BitArray, feed_id feed_id: BitArray) { 190 191 let sql = 191 192 " 192 - INSERT INTO subscriptions ( 193 + INSERT OR IGNORE INTO subscriptions ( 193 194 user_id, 194 195 feed_id 195 196 ) ··· 212 213 FeedsForUser( 213 214 feed_id: Option(BitArray), 214 215 feed_link: Option(String), 215 - feed_skip: Option(Int), 216 - feed_failed: Option(Int), 216 + next_check: Option(Timestamp), 217 + repeated_failures: Option(Int), 217 218 ) 218 219 } 219 220 ··· 222 223 "SELECT 223 224 feeds.id AS feed_id, 224 225 feeds.link AS feed_link, 225 - feeds.skip_n_times AS feed_skip, 226 - feeds.failed_n_times AS feed_failed 226 + feeds.next_check AS next_check, 227 + feeds.repeated_failures AS repeated_failures 227 228 FROM 228 229 subscriptions 229 230 LEFT JOIN ··· 237 238 pub fn feeds_for_user_decoder() -> decode.Decoder(FeedsForUser) { 238 239 use feed_id <- decode.field(0, decode.optional(decode.bit_array)) 239 240 use feed_link <- decode.field(1, decode.optional(decode.string)) 240 - use feed_skip <- decode.field(2, decode.optional(decode.int)) 241 - use feed_failed <- decode.field(3, decode.optional(decode.int)) 242 - decode.success(FeedsForUser(feed_id:, feed_link:, feed_skip:, feed_failed:)) 241 + use next_check <- decode.field(2, decode.optional(dev.datetime_decoder())) 242 + use repeated_failures <- decode.field(3, decode.optional(decode.int)) 243 + decode.success(FeedsForUser( 244 + feed_id:, 245 + feed_link:, 246 + next_check:, 247 + repeated_failures:, 248 + )) 243 249 } 244 250 245 251 pub type FeedSubscriptionCount { ··· 266 272 SubscriptionsPerFeed( 267 273 id: BitArray, 268 274 link: String, 269 - skip_n_times: Int, 270 - failed_n_times: Int, 275 + next_check: Timestamp, 276 + repeated_failures: Int, 271 277 count: Int, 272 278 ) 273 279 } ··· 277 283 "SELECT 278 284 feeds.id, 279 285 feeds.link, 280 - feeds.skip_n_times, 281 - feeds.failed_n_times, 286 + feeds.next_check, 287 + feeds.repeated_failures, 282 288 count(subscriptions.user_id) 283 289 FROM 284 290 feeds ··· 291 297 pub fn subscriptions_per_feed_decoder() -> decode.Decoder(SubscriptionsPerFeed) { 292 298 use id <- decode.field(0, decode.bit_array) 293 299 use link <- decode.field(1, decode.string) 294 - use skip_n_times <- decode.field(2, decode.int) 295 - use failed_n_times <- decode.field(3, decode.int) 300 + use next_check <- decode.field(2, dev.datetime_decoder()) 301 + use repeated_failures <- decode.field(3, decode.int) 296 302 use count <- decode.field(4, decode.int) 297 303 decode.success(SubscriptionsPerFeed( 298 304 id:, 299 305 link:, 300 - skip_n_times:, 301 - failed_n_times:, 306 + next_check:, 307 + repeated_failures:, 302 308 count:, 303 309 )) 304 310 }
+8 -6
src/eater/sql/feeds.sql
··· 16 16 -- name: AddFeed :exec 17 17 INSERT OR IGNORE INTO feeds ( 18 18 id, 19 - link, 20 - failed_n_times, 21 - skip_n_times 19 + link 22 20 ) 23 21 values 24 - (?, ?, ?, ?); 22 + (?, ?); 25 23 26 24 -- name: AllFeeds :many 27 25 SELECT * FROM feeds; ··· 29 27 -- name: DeleteFeed :exec 30 28 DELETE FROM feeds WHERE id = ?; 31 29 32 - -- name: UpdateFeedCooldown :exec 33 - UPDATE feeds SET skip_n_times = ?, failed_n_times = ? 30 + -- name: UpdateFeedNextCheck :exec 31 + UPDATE feeds SET next_check = ? 32 + WHERE id = ?; 33 + 34 + -- name: UpdateFeedRepeatedFailures :exec 35 + UPDATE feeds SET repeated_failures = ? 34 36 WHERE id = ?; 35 37 36 38 -- name: FeedByLink :one
+5 -5
src/eater/sql/subscriptions.sql
··· 14 14 -- See the Licence for the specific language governing permissions and limitations. [cite: 6] 15 15 16 16 -- name: AddSubscription :exec 17 - INSERT INTO subscriptions ( 17 + INSERT OR IGNORE INTO subscriptions ( 18 18 user_id, 19 19 feed_id 20 20 ) ··· 33 33 SELECT 34 34 feeds.id AS feed_id, 35 35 feeds.link AS feed_link, 36 - feeds.skip_n_times AS feed_skip, 37 - feeds.failed_n_times AS feed_failed 36 + feeds.next_check AS next_check, 37 + feeds.repeated_failures AS repeated_failures 38 38 FROM 39 39 subscriptions 40 40 LEFT JOIN ··· 54 54 SELECT 55 55 feeds.id, 56 56 feeds.link, 57 - feeds.skip_n_times, 58 - feeds.failed_n_times, 57 + feeds.next_check, 58 + feeds.repeated_failures, 59 59 count(subscriptions.user_id) 60 60 FROM 61 61 feeds
+1 -1
src/eater/ui/main_ui.gleam
··· 207 207 UserClickedUnsubscribe(rss.Location) 208 208 UnsubscribedInBackend(Result(rss.Location, Nil)) 209 209 UserSubmittedSubscription(Result(uri.Uri, Form(uri.Uri))) 210 - SubscribedInBackend(Result(rss.Location, Nil)) 211 210 BackendReturnedFeed(Result(rss.Location, Nil)) 211 + SubscribedInBackend(Result(rss.Location, backend.NewSubscriptionError)) 212 212 } 213 213 214 214 /// describe a message using structured data